Using Amazon Kinesis Data Firehose to generate business insights
Ajani Motta
This article focuses on using Amazon Kinesis Data Firehose to route Lambda destination logs to S3, then connecting the bucket to QuickSight in order to analyze product performance from a business perspective.
Event-driven architecture is a popular application design approach which uses events to trigger and communicate between decoupled services. An event can be any change in state or an update: a button press, credit card swipe, item being placed in a cart, et cetera. We used AWS Lambda and EventBridge to build out our event-driven architecture. AWS gives us the ability to route the execution results of each Lambda invocation to other AWS services — this is the concept of Lambda destinations. Lambda destinations allow you to route asynchronous function results as an execution record to a destination resource without writing additional code. An execution record contains information about the request and response (in JSON format).
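For reference, the execution record has a consistent shape. Below is a rough TypeScript sketch of that shape; the field names follow the format AWS documents for destination records, but treat the exact structure as an assumption to verify against the records your own functions emit.

// Rough shape of a Lambda destination execution record (an assumption
// to verify against real records, not an official type definition).
interface ExecutionRecord {
  version: string;
  timestamp: string; // ISO 8601
  requestContext: {
    requestId: string;
    functionArn: string;
    condition: 'Success' | 'RetriesExhausted';
    approximateInvokeCount: number;
  };
  requestPayload: unknown; // the event the function was invoked with
  responseContext: {
    statusCode: number;
    executedVersion: string;
    functionError?: string;
  };
  responsePayload: unknown; // the function's return value or error
}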
As these events trigger Lambda executions, it is common to want to track the performance of these Lambdas, answering questions like:
- What percentage of new user form submissions failed during our onboarding in October?
- Did we maintain the threshold of successful credit card swipes during the holiday period?
The answers to these sorts of questions can provide valuable business insights. A development team can use these metrics to take a more data-driven approach to product development, pointing to concrete data to support its decision-making process. The added product visibility usually benefits the development cycle.
Project Setup
This implementation utilizes the Serverless framework: an open-source tool that builds, compiles, and packages code for serverless deployment, then deploys the package to the cloud.
How can we set up a system to answer performance questions like these?
The first step in analysis is to gather and store data, so it makes sense to add an S3 bucket as the Lambda destination. The problem is that Lambda results can only be routed to one of four destination types:
- another Lambda function
- SNS
- SQS
- EventBridge
This raises the question: how can we route these Lambda results to S3? The answer is Kinesis Data Firehose, a tool that streams data into storage (in our case S3) and converts it into the formats required for analysis, without the need to build custom processing pipelines.
Architecture
We configure each Lambda with EventBridge destinations for both the success and failure cases. These success/failure events trigger the sendEventToFirehose Lambda, which is responsible for sending the event to Kinesis Data Firehose. Once the data is streamed into the S3 bucket, we configure a QuickSight instance to use that bucket as a data source and generate an analytics dashboard.
Implementation
In this implementation, we assign a pattern event type to each Lambda. The pattern acts as a schema, helping AWS filter out the relevant events to forward to the function. Each Lambda function also has destinations for the success and failure cases, in the form of event bus identifiers or ARNs. We abstract out the event bus names/ARNs because this serverless.yml is shared between multiple developer environments, each with its own stage and event bus.
exampleLambda:
  handler: path/to/handler
  events:
    - eventBridge:
        eventBus: ${self:custom.eventBus}
        pattern:
          source:
            - Event Source (e.g. Lambda)
          detail-type:
            - Event Detail Type (e.g. Example Lambda)
  destinations:
    onSuccess: ${self:custom.eventBusArn}
    onFailure: ${self:custom.eventBusArn}
  environment:
    EVENT_BUS_NAME: ${self:custom.eventBus}
The sendEventToFirehose Lambda (configuration below) is triggered by events whose detail types correspond to successes/failures of Lambdas like the one above. The environment key defines any environment variables used in the function; in this case, the name of the Kinesis Data Firehose delivery stream, which is a property of the CloudFormation stack used to host the Kinesis Data Firehose instance. We do not assign any destinations to this Lambda, as it does not represent any business logic we are looking to analyze.
## sendEventToFirehose Lambda configuration
sendEventToFirehose:
  handler: path/to/sendEventToFirehose.handler
  events:
    - eventBridge:
        eventBus: ${self:custom.eventBus}
        pattern:
          source:
            - Lambda
          detail-type:
            - Lambda Function Invocation Result - Success
            - Lambda Function Invocation Result - Failure
  environment:
    DELIVERY_STREAM_NAME: ${cf:firehose-${sls:stage}.DeliveryStreamName}
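To make the pattern concrete, an event this Lambda receives might look like the following sketch (all values are hypothetical, and in practice the detail field carries the full execution record rather than the simplified shape typed here):

// Illustrative EventBridge event (hypothetical values)
import { EventBridgeEvent } from 'aws-lambda';

const sampleEvent: EventBridgeEvent<
  'Lambda Function Invocation Result - Success',
  { eventName: string }
> = {
  version: '0',
  id: 'a1b2c3d4-0000-0000-0000-000000000000', // hypothetical event id
  'detail-type': 'Lambda Function Invocation Result - Success',
  source: 'Lambda',
  account: '123456789012', // hypothetical account id
  time: '2022-10-01T12:00:00Z',
  region: 'us-east-1',
  resources: [],
  detail: { eventName: 'exampleLambda' },
};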
The next order of business is to implement this newly configured Lambda function. In order to route the event to Kinesis Data Firehose, we create a PutRecordCommand which is to be sent by the Firehose client. The input for this command includes:
- DeliveryStreamName: the name of the delivery stream to put the data record into
- Record.Data: the data blob to put into the record, which is base64-encoded when the blob is serialized
import { EventBridgeHandler } from 'aws-lambda';
import {
  FirehoseClient,
  PutRecordCommand,
  PutRecordCommandInput,
} from '@aws-sdk/client-firehose';

// Reuse a single client across invocations of the same Lambda container.
const client = new FirehoseClient({ region: 'us-east-1' });

export const sendEventToFirehose: EventBridgeHandler<
  | 'Lambda Function Invocation Result - Success'
  | 'Lambda Function Invocation Result - Failure',
  { eventName: string },
  void
> = async (event) => {
  const input: PutRecordCommandInput = {
    DeliveryStreamName: process.env.DELIVERY_STREAM_NAME,
    // Append a newline so records are separated within the S3 objects.
    Record: { Data: Buffer.from(`${JSON.stringify(event)}\n`) },
  };
  const command = new PutRecordCommand(input);
  await client.send(command);
};
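As a side note, PutRecordCommand sends one record per network call. If the tracked Lambdas produce events at high volume, a possible variation (a sketch, not part of the original setup) is to buffer events and flush them with PutRecordBatchCommand, which accepts up to 500 records per call:

// Sketch: batching events into one Firehose call (assumes the same
// FirehoseClient instance and DELIVERY_STREAM_NAME variable as above).
import { PutRecordBatchCommand } from '@aws-sdk/client-firehose';

const sendEventsToFirehoseBatch = async (events: object[]): Promise<void> => {
  const response = await client.send(
    new PutRecordBatchCommand({
      DeliveryStreamName: process.env.DELIVERY_STREAM_NAME,
      Records: events.map((event) => ({
        Data: Buffer.from(`${JSON.stringify(event)}\n`),
      })),
    }),
  );
  // FailedPutCount reports records Firehose did not ingest; a production
  // version would retry the failed entries from RequestResponses.
  if (response.FailedPutCount) {
    console.warn(`${response.FailedPutCount} records failed to ingest`);
  }
};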
Firehose CloudFormation Stack
We deploy the Firehose setup on its own CloudFormation stack, separate from the serverless flow it is tracking (.cform below). We assign StageName as a parameter, then define all the resources that this CloudFormation stack will deploy:
- the S3 bucket to which the events are routed
- a CloudWatch Log Group to allow us to inspect logs
- a Firehose role: this IAM role allows resources inside our AWS account to access the Firehose we created, and allows the Firehose stream to use our S3 bucket and CloudWatch
- the Firehose delivery stream, which connects Firehose to the S3 bucket and CloudWatch log stream
In order to connect this newly formed CloudFormation stack to our Serverless setup, we declare the name of the Firehose delivery stream as an output (which we referenced earlier as an environment variable in our sendEventToFirehose Lambda).
## firehoseStack.cform
Parameters:
  StageName:
    Type: String
Resources:
  EventsBucket:
    Type: AWS::S3::Bucket
  EventsCloudWatchLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub /aws/kinesisfirehose/EventsFirehose-${StageName}
  EventsCloudWatchLogStream:
    Type: AWS::Logs::LogStream
    Properties:
      LogGroupName: !Ref EventsCloudWatchLogGroup
      LogStreamName: !Sub EventsFirehoseLogStream-${StageName}
  EventsFirehoseRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub EventsFirehoseRole-${StageName}
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - firehose.amazonaws.com
            Action: sts:AssumeRole
            Condition:
              StringEquals:
                sts:ExternalId: !Ref 'AWS::AccountId'
      Policies:
        - PolicyName: EventsFirehoseRolePolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: s3:*
                Resource:
                  - !GetAtt EventsBucket.Arn
                  - !Sub ${EventsBucket.Arn}/*
              - Effect: Allow
                Action: logs:PutLogEvents
                Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/EventsFirehose-*
  EventsFirehose:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: !Sub EventsFirehose-${StageName}
      S3DestinationConfiguration:
        CloudWatchLoggingOptions:
          Enabled: True
          LogGroupName: !Ref EventsCloudWatchLogGroup
          LogStreamName: !Ref EventsCloudWatchLogStream
        BucketARN: !GetAtt EventsBucket.Arn
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 5
        CompressionFormat: UNCOMPRESSED
        RoleARN:
          Fn::GetAtt: [EventsFirehoseRole, Arn]
Outputs:
  DeliveryStreamName:
    Description: Events Firehose Delivery Stream Name
    Value: !Ref EventsFirehose
    Export:
      Name: !Sub DeliveryStreamName-${StageName}
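The ${cf:firehose-${sls:stage}.DeliveryStreamName} variable in serverless.yml resolves this output at deploy time. If the exported name ever needs to be looked up at runtime instead, a minimal sketch (assuming the @aws-sdk/client-cloudformation package and cloudformation:ListExports permission) could look like this:

// Sketch: resolving the exported delivery stream name at runtime
import {
  CloudFormationClient,
  ListExportsCommand,
} from '@aws-sdk/client-cloudformation';

const cfn = new CloudFormationClient({ region: 'us-east-1' });

const getDeliveryStreamName = async (stage: string): Promise<string> => {
  let nextToken: string | undefined;
  do {
    // ListExports is paginated, so walk the pages until the export is found.
    const page = await cfn.send(new ListExportsCommand({ NextToken: nextToken }));
    const match = page.Exports?.find(
      (exp) => exp.Name === `DeliveryStreamName-${stage}`,
    );
    if (match?.Value) {
      return match.Value;
    }
    nextToken = page.NextToken;
  } while (nextToken);
  throw new Error(`Export DeliveryStreamName-${stage} not found`);
};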
Connecting to QuickSight
Connecting this Firehose stream to QuickSight is as easy as following the steps AWS provides in its documentation. JSON manifest files are used to specify files in Amazon S3 to import into Amazon QuickSight. In our case, we use the following fields:
- fileLocations: specifies the files to import
  - URIPrefixes: an array that lists URI prefixes for S3 buckets/folders
- globalUploadSettings (optional): specifies import settings for those files (i.e. text qualifiers); default values are used if not specified
  - format: the format of the files to be imported
  - textqualifier: the character used to signify where text begins and ends
// manifest.json
{
  "fileLocations": [
    {
      "URIPrefixes": [
        "s3://<bucket name>/"
      ]
    }
  ],
  "globalUploadSettings": {
    "format": "JSON",
    "textqualifier": "\""
  }
}
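QuickSight reads the manifest from S3, so the file needs to be uploaded to the bucket first. Here is a minimal sketch of doing that programmatically (assuming the @aws-sdk/client-s3 package; the bucket name is the same placeholder as in the manifest):

// Sketch: uploading manifest.json so QuickSight can read it
import { readFileSync } from 'fs';
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';

const s3 = new S3Client({ region: 'us-east-1' });

const uploadManifest = async (): Promise<void> => {
  await s3.send(
    new PutObjectCommand({
      Bucket: '<bucket name>', // placeholder, as in the manifest above
      Key: 'manifest.json',
      Body: readFileSync('manifest.json'),
      ContentType: 'application/json',
    }),
  );
};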
Final Output
QuickSight can break the data down into groups, allowing for a variety of visuals (performance by date, performance by function, and more). The QuickSight dashboard offers a whole assortment of views, filters, and metrics to play around with in order to craft the most insightful data representation.
Conclusion
Congratulations, the setup is complete! You can build a custom analytics dashboard and present your findings to business stakeholders. Now you can use these metrics to drive product direction:
- What should we iterate on?
- Where should we focus our attention?
The choice is yours.