Handling Multiple AWS Lambda Event Types with Go

By Robert Bruce – Chief Technology Officer

Here at synvert TCM (formerly Crimson Macaw), we have typically created AWS Lambda functions in Python and used the abstract factory design pattern as a way to handle different AWS event structures. Achieving the same principle within Go requires a different approach. Read on to find out how we handle multiple AWS Lambda event types in Go.

Being able to trigger a lambda function from multiple sources gives great flexibility to deploy our solutions into different configurations depending on our clients’ requirements.

A good example is a Lambda function that handles AWS S3 events. S3 can trigger the function directly, or it can send its events to AWS SNS or AWS SQS, which is then set up as the event source for the Lambda function.

Expected Data Structures

When S3, SNS or SQS trigger a lambda function, the JSON structure contains a top-level Records array; however, the structure of each record differs depending on the source.

{
  "Records": []
}
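
Since only the top-level Records array is shared, one way to inspect a payload before committing to a concrete type is to keep each record as raw JSON. The following is a minimal sketch using only the standard library; the envelope type and the splitRecords helper are hypothetical names used for illustration and are not part of the AWS Lambda for Go library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope (hypothetical) captures only the shared top-level shape. Each
// record is kept as raw JSON so it can be decoded later, once its source
// is known.
type envelope struct {
	Records []json.RawMessage `json:"Records"`
}

// splitRecords (hypothetical) returns the undecoded records of a payload.
func splitRecords(data []byte) ([]json.RawMessage, error) {
	var e envelope
	if err := json.Unmarshal(data, &e); err != nil {
		return nil, err
	}
	return e.Records, nil
}

func main() {
	payload := []byte(`{"Records":[{"eventSource":"aws:s3"},{"eventSource":"aws:sqs"}]}`)

	records, err := splitRecords(payload)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for i, raw := range records {
		fmt.Printf("record %d: %s\n", i, raw)
	}
}
```

Deferring the decoding in this way keeps the first pass independent of the record layout, which is the property the rest of this article relies on.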

Direct From S3

In this setup, we configure an S3 bucket so that its events invoke a Lambda function directly.

Diagram showing AWS Lambda Function triggered by AWS S3

When S3 asynchronously invokes the Lambda function, an example of the JSON structure would be:

{
  "Records": [
    {
      "eventVersion": "2.1",
      "eventSource": "aws:s3",
      "awsRegion": "eu-west-1",
      "eventTime": "2020-04-05T19:37:27.192Z",
      "eventName": "ObjectCreated:Put",
      "userIdentity": {
        "principalId": "AWS:AIDAINPONIXQXHT3IKHL2"
      },
      "requestParameters": {
        "sourceIPAddress": "205.255.255.255"
      },
      "responseElements": {
        "x-amz-request-id": "D82B88E5F771F645",
        "x-amz-id-2": "vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo="
      },
      "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "828aa6fc-f7b5-4305-8584-487c791949c1",
        "bucket": {
          "name": "lambda-artifacts-deafc19498e3f2df",
          "ownerIdentity": {
            "principalId": "A3I5XTEXAMAI3E"
          },
          "arn": "arn:aws:s3:::lambda-artifacts-deafc19498e3f2df"
        },
        "object": {
          "key": "b21b84d653bb07b05b1e6b33684dc11b",
          "size": 1305107,
          "eTag": "b21b84d653bb07b05b1e6b33684dc11b",
          "sequencer": "0C0F6F405D6ED209E1"
        }
      }
    }
  ]
}

Referenced from https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html

AWS S3 events via AWS SNS

In this setup, we configure S3 to publish events to an SNS Topic. We add the Lambda function as a subscriber of the SNS Topic.

Diagram showing AWS Lambda Function being triggered by AWS S3 via AWS SNS

When events occur on the S3 bucket, it pushes messages to the SNS Topic in the same format as above.

When SNS asynchronously invokes the Lambda function, after S3 has published a message, an example of the JSON structure would be:

{
  "Records": [
    {
      "EventVersion": "1.0",
      "EventSubscriptionArn": "arn:aws:sns:us-east-2:123456789012:sns-lambda:21be56ed-a058-49f5-8c98-aedd2564c486",
      "EventSource": "aws:sns",
      "Sns": {
        "SignatureVersion": "1",
        "Timestamp": "2020-04-05T19:37:27.318Z",
        "Signature": "tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==",
        "SigningCertUrl": "https://sns.us-east-2.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem",
        "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
        "Message": "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"eu-west-1\",\"eventTime\":\"2020-04-05T19:37:27.192Z\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:AIDAINPONIXQXHT3IKHL2\"},\"requestParameters\":{\"sourceIPAddress\":\"205.255.255.255\"},\"responseElements\":{\"x-amz-request-id\":\"D82B88E5F771F645\",\"x-amz-id-2\":\"vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo=\"},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"828aa6fc-f7b5-4305-8584-487c791949c1\",\"bucket\":{\"name\":\"lambda-artifacts-deafc19498e3f2df\",\"ownerIdentity\":{\"principalId\":\"A3I5XTEXAMAI3E\"},\"arn\":\"arn:aws:s3:::lambda-artifacts-deafc19498e3f2df\"},\"object\":{\"key\":\"b21b84d653bb07b05b1e6b33684dc11b\",\"size\":1305107,\"eTag\":\"b21b84d653bb07b05b1e6b33684dc11b\",\"sequencer\":\"0C0F6F405D6ED209E1\"}}}]}",
        "MessageAttributes": {},
        "Type": "Notification",
        "UnsubscribeUrl": "https://sns.eu-west-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-west-1:123456789012:test-lambda:21be56ed-a058-49f5-8c98-aedd2564c486",
        "TopicArn": "arn:aws:sns:eu-west-1:123456789012:sns-lambda",
        "Subject": "TestInvoke"
      }
    }
  ]
}

Referenced from https://docs.aws.amazon.com/lambda/latest/dg/with-sns.html

As you can see, the S3 event is the original JSON structure but encoded as a string in the Message property.
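
Because the Message value is a JSON string rather than a nested object, it has to be decoded in a second pass. Below is a minimal sketch of that two-step decode using only the standard library. The trimmed-down s3Notification and snsEnvelope types and the decodeSNSMessage helper are hypothetical names for illustration; they keep only a few fields and are not the types from the AWS Lambda for Go library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// s3Notification is a hypothetical, trimmed-down view of an S3 event.
type s3Notification struct {
	Records []struct {
		EventSource string `json:"eventSource"`
		S3          struct {
			Bucket struct {
				Name string `json:"name"`
			} `json:"bucket"`
			Object struct {
				Key string `json:"key"`
			} `json:"object"`
		} `json:"s3"`
	} `json:"Records"`
}

// snsEnvelope is a hypothetical, trimmed-down view of an SNS event.
type snsEnvelope struct {
	Records []struct {
		EventSource string `json:"EventSource"`
		Sns         struct {
			Message string `json:"Message"`
		} `json:"Sns"`
	} `json:"Records"`
}

// decodeSNSMessage decodes the outer SNS envelope first, then decodes each
// Message string as an S3 notification in a second pass.
func decodeSNSMessage(data []byte) ([]s3Notification, error) {
	var envelope snsEnvelope
	if err := json.Unmarshal(data, &envelope); err != nil {
		return nil, fmt.Errorf("decoding sns envelope: %w", err)
	}

	notifications := make([]s3Notification, 0, len(envelope.Records))
	for _, record := range envelope.Records {
		var n s3Notification
		if err := json.Unmarshal([]byte(record.Sns.Message), &n); err != nil {
			return nil, fmt.Errorf("decoding sns message: %w", err)
		}
		notifications = append(notifications, n)
	}
	return notifications, nil
}

func main() {
	payload := []byte(`{"Records":[{"EventSource":"aws:sns","Sns":{"Message":"{\"Records\":[{\"eventSource\":\"aws:s3\",\"s3\":{\"bucket\":{\"name\":\"my-bucket\"},\"object\":{\"key\":\"my-key\"}}}]}"}}]}`)

	notifications, err := decodeSNSMessage(payload)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, n := range notifications {
		for _, r := range n.Records {
			fmt.Println(r.S3.Bucket.Name, r.S3.Object.Key)
		}
	}
}
```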

AWS S3 events via AWS SQS

In this setup, we configure S3 to send events to an SQS Queue. We add the SQS Queue as an event source to the Lambda function.

Diagram showing AWS Lambda Function being triggered by AWS S3 via AWS SQS

When events occur on the S3 bucket, messages are pushed to the SQS Queue in the same format as above.

When Lambda reads from the SQS Queue and synchronously invokes the function, after S3 has sent a message, an example of the JSON structure would be:

{
  "Records": [
    {
      "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
      "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
      "body": "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"eu-west-1\",\"eventTime\":\"2020-04-05T19:37:27.192Z\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:AIDAINPONIXQXHT3IKHL2\"},\"requestParameters\":{\"sourceIPAddress\":\"205.255.255.255\"},\"responseElements\":{\"x-amz-request-id\":\"D82B88E5F771F645\",\"x-amz-id-2\":\"vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo=\"},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"828aa6fc-f7b5-4305-8584-487c791949c1\",\"bucket\":{\"name\":\"lambda-artifacts-deafc19498e3f2df\",\"ownerIdentity\":{\"principalId\":\"A3I5XTEXAMAI3E\"},\"arn\":\"arn:aws:s3:::lambda-artifacts-deafc19498e3f2df\"},\"object\":{\"key\":\"b21b84d653bb07b05b1e6b33684dc11b\",\"size\":1305107,\"eTag\":\"b21b84d653bb07b05b1e6b33684dc11b\",\"sequencer\":\"0C0F6F405D6ED209E1\"}}}]}",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1586111847318",
        "SenderId": "AIDAIENQZJOLO23YVJ4VO",
        "ApproximateFirstReceiveTimestamp": "15861118483091"
      },
      "messageAttributes": {},
      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:eu-west-1:123456789012:my-queue",
      "awsRegion": "eu-west-1"
    }
  ]
}

Referenced from https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html

Just like when we use SNS, the S3 event is the original JSON structure but encoded as a string in the body of the SQS message.

AWS Lambda in Python

Decoding the variant structures in Python is relatively simple because the event arrives as a dict. Python classes that accept **kwargs in their __init__ method are enough to decode the structure recursively.

An incomplete example:

from abc import ABC


class Record(ABC):
    @staticmethod
    def factory(record: dict) -> 'Record':
        # try each known record type in turn; a KeyError or ValueError
        # means the record does not match that type
        try:
            return S3Record(**record)
        except (KeyError, ValueError):
            pass

        try:
            return SNSRecord(**record)
        except (KeyError, ValueError):
            pass

        try:
            return SQSRecord(**record)
        except (KeyError, ValueError):
            pass

        raise TypeError


class S3Record(Record):
    __slots__ = ('event_source',)

    def __init__(self, **kwargs):
        self.event_source = kwargs.pop('eventSource')
        if 'aws:s3' != self.event_source:
            raise ValueError


class SNSRecord(Record):
    __slots__ = ('event_source',)

    def __init__(self, **kwargs):
        # SNS capitalises the key name
        self.event_source = kwargs.pop('EventSource')
        if 'aws:sns' != self.event_source:
            raise ValueError


class SQSRecord(Record):
    __slots__ = ('event_source',)

    def __init__(self, **kwargs):
        self.event_source = kwargs.pop('eventSource')
        if 'aws:sqs' != self.event_source:
            raise ValueError

AWS Lambda in Go

Using the AWS Lambda for Go library, it is not so simple, because the library decodes the event internally with the encoding/json package before the handler is called.

Fortunately, the library already contains the expected data structures as struct types with the json tags needed for decoding. These can be used for much of the leg work, but to handle the variant structure, the json.Unmarshaler interface needs to be implemented on the data type that the Lambda handler function accepts.
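
For readers who have not implemented this interface before, here is a minimal sketch of the mechanism in isolation, unrelated to AWS. The stringList type and decodeList helper are hypothetical; the example assumes a field that may arrive either as a single JSON string or as an array of strings, which is a small-scale version of the variant-structure problem.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stringList is a hypothetical type that accepts either a single JSON
// string or an array of strings.
type stringList []string

// UnmarshalJSON satisfies json.Unmarshaler, so encoding/json calls it
// instead of applying its default decoding rules for this type.
func (s *stringList) UnmarshalJSON(data []byte) error {
	// first try the single-string form
	var single string
	if err := json.Unmarshal(data, &single); err == nil {
		*s = stringList{single}
		return nil
	}

	// otherwise fall back to the array form; decoding into a plain
	// []string avoids calling this method recursively
	var many []string
	if err := json.Unmarshal(data, &many); err != nil {
		return err
	}
	*s = many
	return nil
}

// decodeList is a small helper for the example.
func decodeList(data string) (stringList, error) {
	var out stringList
	err := json.Unmarshal([]byte(data), &out)
	return out, err
}

func main() {
	for _, input := range []string{`"a"`, `["a","b"]`} {
		list, err := decodeList(input)
		fmt.Println(list, err)
	}
}
```

The same idea scales up: the method receives the raw bytes, decides which shape they have, and then delegates to ordinary json.Unmarshal calls on types that do not implement the interface themselves.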

Set up the main

If you are unfamiliar with writing Lambda functions in Go, then I would recommend reading the AWS Lambda Function Handler in Go documentation.

I create a custom struct type called Event, which I know will always be an object containing a field called Records.

Each Record will always be about an S3 event, but it could arrive from any of the sources above. To keep the original data, I store the event source information and, if an SNS or SQS payload is received, save that too in additional fields.

package main

import (
	"context"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

// Record each data record
type Record struct {
	EventSource    string
	EventSourceArn string
	AWSRegion      string
	S3             events.S3Entity
	SQS            events.SQSMessage
	SNS            events.SNSEntity
}

// Event incoming event
type Event struct {
	Records []Record
}

func handle(ctx context.Context, event Event) {
	// execute the lambda
}

func main() {
	lambda.Start(handle)
}

Implement the Unmarshaler interface

The json.Unmarshaler interface requires only a single method, UnmarshalJSON. At first, the most obvious type to implement it on would be Record; however, as an SNS or SQS event could theoretically contain multiple S3 records, the method must exist on the Event type.

func (event *Event) UnmarshalJSON(data []byte) error {
	// the decoding logic is built up in the following sections
	return nil
}

Detecting the Event Source

Before I decode the data, I first need to detect the structure type. I add another method on *Event to get the event type; our actual implementation of this has much more error handling, but a simplified version would be:

// requires "encoding/json" in the import block

type eventType int

const (
	unknownEventType eventType = iota
	s3EventType
	snsEventType
	sqsEventType
)

func (event *Event) getEventType(data []byte) eventType {
	temp := make(map[string]interface{})
	if err := json.Unmarshal(data, &temp); err != nil {
		return unknownEventType
	}

	// guard against a missing or empty Records array before indexing it
	recordsList, _ := temp["Records"].([]interface{})
	if len(recordsList) == 0 {
		return unknownEventType
	}
	record, _ := recordsList[0].(map[string]interface{})

	// SNS capitalises the key; S3 and SQS do not
	var eventSource string
	if es, ok := record["EventSource"].(string); ok {
		eventSource = es
	} else if es, ok := record["eventSource"].(string); ok {
		eventSource = es
	}

	switch eventSource {
	case "aws:s3":
		return s3EventType
	case "aws:sns":
		return snsEventType
	case "aws:sqs":
		return sqsEventType
	}

	return unknownEventType
}
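
The detection step could also be sketched with a small probe struct instead of a generic map. This relies on a documented property of encoding/json: when matching object keys to struct fields it prefers an exact match but also accepts a case-insensitive one, so a single field reads both EventSource (SNS) and eventSource (S3 and SQS). The probe type and detectSource function are hypothetical names, and this is an alternative illustration rather than the implementation used above.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// probe (hypothetical) decodes only the field needed to identify the
// event source. The tag matches "eventSource" exactly and "EventSource"
// through encoding/json's case-insensitive fallback.
type probe struct {
	Records []struct {
		EventSource string `json:"eventSource"`
	} `json:"Records"`
}

// detectSource (hypothetical) returns the event source of the first record.
func detectSource(data []byte) (string, error) {
	var p probe
	if err := json.Unmarshal(data, &p); err != nil {
		return "", err
	}
	if len(p.Records) == 0 {
		return "", errors.New("event contains no records")
	}
	return p.Records[0].EventSource, nil
}

func main() {
	payloads := []string{
		`{"Records":[{"eventSource":"aws:s3"}]}`,
		`{"Records":[{"EventSource":"aws:sns"}]}`,
		`{"Records":[{"eventSource":"aws:sqs"}]}`,
	}
	for _, payload := range payloads {
		source, err := detectSource([]byte(payload))
		fmt.Println(source, err)
	}
}
```

Compared with the map-based version, the probe avoids type assertions entirely, at the cost of depending on the case-insensitive matching rule.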

The first step in the UnmarshalJSON function is now to call the getEventType function.

func (event *Event) UnmarshalJSON(data []byte) error {
	eType := event.getEventType(data)
	_ = eType // acted on in the next section

	return nil
}

Mapping the Data

Now that I know the event type, I can safely use the event structures inside the AWS Lambda for Go package as these have json struct tags for decoding the data.

func (event *Event) UnmarshalJSON(data []byte) error {
	switch event.getEventType(data) {
	case s3EventType:
		s3Event := &events.S3Event{}
		if err := json.Unmarshal(data, s3Event); err != nil {
			return err
		}
		return event.mapS3EventRecords(s3Event)

	case snsEventType:
		snsEvent := &events.SNSEvent{}
		if err := json.Unmarshal(data, snsEvent); err != nil {
			return err
		}
		return event.mapSNSEventRecords(snsEvent)

	case sqsEventType:
		sqsEvent := &events.SQSEvent{}
		if err := json.Unmarshal(data, sqsEvent); err != nil {
			return err
		}
		return event.mapSQSEventRecords(sqsEvent)
	}

	// report an unrecognised source rather than silently returning an empty Event
	return errors.New("unsupported event source")
}

In the above, I have referenced some map functions. These take each source event and map each record to our Record structure.

Mapping S3 Records

The mapS3EventRecords function is the simplest: each S3EventRecord can be mapped directly to my Record structure.

func (event *Event) mapS3EventRecords(s3Event *events.S3Event) error {
	event.Records = make([]Record, 0)

	for _, s3Record := range s3Event.Records {
		event.Records = append(event.Records, Record{
			EventSource:    s3Record.EventSource,
			EventSourceArn: s3Record.S3.Bucket.Arn,
			AWSRegion:      s3Record.AWSRegion,
			S3:             s3Record.S3,
		})
	}

	return nil
}

Mapping SNS Records

The mapSNSEventRecords function requires a little bit extra.

An SNS event does not contain any region information in its payload, but by using the arn.Parse function available in the AWS SDK for Go, I can extract the region from the SNS TopicArn.

I also use the json.Unmarshal function to decode the SNS message into an S3Event. As this itself holds an array, mapping a single SNSEventRecord can produce multiple records.

You may also notice the use of the github.com/pkg/errors package here!

func (event *Event) mapSNSEventRecords(snsEvent *events.SNSEvent) error {
	event.Records = make([]Record, 0)

	for _, snsRecord := range snsEvent.Records {
		// decode sns message to s3 event
		s3Event := &events.S3Event{}
		err := json.Unmarshal([]byte(snsRecord.SNS.Message), s3Event)
		if err != nil {
			return errors.Wrap(err, "Failed to decode sns message to an S3 event")
		}

		if len(s3Event.Records) == 0 {
			return errors.New("S3 Event Records is empty")
		}

		// the topic ARN is the same for every S3 record in this message, so parse it once
		topicArn, err := arn.Parse(snsRecord.SNS.TopicArn)
		if err != nil {
			return err
		}

		for _, s3Record := range s3Event.Records {
			event.Records = append(event.Records, Record{
				EventSource:    snsRecord.EventSource,
				EventSourceArn: snsRecord.SNS.TopicArn,
				AWSRegion:      topicArn.Region,
				SNS:            snsRecord.SNS,
				S3:             s3Record.S3,
			})
		}
	}

	return nil
}
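
To make the region extraction concrete, the sketch below shows the idea without the SDK. It assumes the general ARN layout arn:partition:service:region:account-id:resource; the regionFromARN helper is a hypothetical stand-in for reading the Region field of the value returned by arn.Parse, and it does far less validation than a real parser would.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// regionFromARN (hypothetical) returns the region component of an ARN.
// It assumes the layout arn:partition:service:region:account-id:resource.
func regionFromARN(value string) (string, error) {
	// split into at most six parts so that colons inside the resource
	// component are left untouched
	parts := strings.SplitN(value, ":", 6)
	if len(parts) != 6 || parts[0] != "arn" {
		return "", errors.New("not a valid ARN: " + value)
	}
	return parts[3], nil
}

func main() {
	region, err := regionFromARN("arn:aws:sns:eu-west-1:123456789012:sns-lambda")
	fmt.Println(region, err)
}
```

Note that some ARNs, such as the S3 bucket ARN in the examples above, leave the region component empty, which is why the S3 mapping takes the region from the record itself.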

Mapping SQS Records

The mapSQSEventRecords function is similar to mapSNSEventRecords, except that the region is already part of the SQS structure.

func (event *Event) mapSQSEventRecords(sqsEvent *events.SQSEvent) error {
	event.Records = make([]Record, 0)

	for _, sqsRecord := range sqsEvent.Records {
		// decode sqs body to s3 event
		s3Event := &events.S3Event{}
		err := json.Unmarshal([]byte(sqsRecord.Body), s3Event)
		if err != nil {
			return errors.Wrap(err, "Failed to decode sqs body to an S3 event")
		}

		if len(s3Event.Records) == 0 {
			return errors.New("S3 Event Records is empty")
		}

		for _, s3Record := range s3Event.Records {
			event.Records = append(event.Records, Record{
				EventSource:    sqsRecord.EventSource,
				EventSourceArn: sqsRecord.EventSourceARN,
				AWSRegion:      sqsRecord.AWSRegion,
				SQS:            sqsRecord,
				S3:             s3Record.S3,
			})
		}
	}

	return nil
}

Conclusion

As you can see from the above, by using the internal functionality of Go’s encoding/json package, it is possible to populate your lambda function’s event type dynamically.

This approach, of course, can be extended to any underlying data type you wish.

Find out more about how we’ve used json to help our clients here.