Building Modern Serverless APIs with CDK, Python, and GraphQL (Part 3)
Designing Event Driven Applications
Table of contents
Welcome to the third part of this article series.
In Part 1, we gave a brief overview of the concept of event-driven architectures, coupling, and defined all AWS services needed to build the API.
In Part 2, we created a new CDK project, and added CDK resources for our GraphQL API, alongside a GraphQL Schema, an SQS queue, a DynamoDB table, and an SNS Topic.
Let's proceed to build from where we left off in part 2.
AWS Step Functions Resources
We won't be looking at how to create a step functions workflow from scratch. I'll assume you at least have a basic understanding of the service.
If you don't, don't worry. I've written a couple of articles to get you up and running in no time.
The Step Functions workflow has 4 Lambda functions. Here's what it looks like visually. I exported this image from the Step Functions Visual Workflow.
I also exported the workflow as a JSON file and saved it inside a python package called step_function_workflow
in the project directory.
You can create a python package with the same name and get the file here. workflow.json
For the 4 Lambda functions, navigate to the lambda_fns
folder, create 4 python packages, and within each package, create a python file. These python files would serve as lambda functions.
Package name:
initialize_order
, File name:initialize_order.py
Package name:
process_payment
, File name:process_payment.py
Package name:
complete_order
, File name:complete_order.py
Package name:
cancel_failed_order
, File name:cancel_failed_order.py
Create a file called step_function.py
within the step_function_workflow
python package. We'll define all the lambda functions resources needed by the workflow within this file.
We have to create a Lambda resource for each of the above 4 python files, add them to the step functions workflow and then return a step functions instance.
In order to create a Lambda function resource, we need to first import the aws_lambda
class from CDK.
from aws_cdk import aws_lambda as lambda_
Create a function with these parameters. We'll be passing in the stack, lambda permissions, and an SNS topic we created in the stack file.
def create_step_function(stack, lambda_step_function_role, cfn_topic):
Next, using the function CfnFunction
found within the aws_lambda
class, let's create the lambda function resource for initialize_order.py
.
Firstly, we need to return a stream of the Lambda function, using the open
method. We'll repeat these same steps for the other lambda functions.
with open("lambda_fns/initialize_order/initialize_order.py", 'r') as file:
initialize_order = file.read()
Then, we define the resource the Lambda function needs
initialize_order_function = \
lambda_.CfnFunction(stack, "initialize-order-function",
code=lambda_.CfnFunction.CodeProperty(
zip_file=initialize_order
),
role=lambda_step_function_role.role_arn,
# the properties below are optional
architectures=["x86_64"],
description="lambda-ds",
environment=lambda_.CfnFunction.EnvironmentProperty(
variables={
"ORDER_TABLE": "ORDER",
"TOPIC_ARN": cfn_topic.attr_topic_arn
}
),
function_name="initialize-order-function",
handler="index.handler",
package_type="Zip",
runtime="python3.9",
timeout=123,
tracing_config=lambda_.CfnFunction.TracingConfigProperty(
mode="Active"
)
)
Here's what's happening in the above code.
To create a Lambda function, we need a deployment package and an execution role. Our deployment package in this case is a zip file, containing the initialize_order
Lambda code. You can also use a container image as your deployment package.
code=lambda_.CfnFunction.CodeProperty(zip_file=initialize_order),
We also need an execution role that grants the function permission to access other AWS services such as Step Functions, CloudWatch, etc.
role=lambda_step_function_role.role_arn,
Here are the policies we attached to the lambda function role in the stack file.
Full DynamoDB access (Because we'll be saving the order to the database)
Full SNS access for sending emails
Full CloudWatch access for logging
lambda_step_function_role = \
iam.Role(self, "LambdaStepFunctionRole",
assumed_by=iam.ServicePrincipal("lambda.amazonaws.com"),
managed_policies=[db_full_access_role,
cloud_watch_role_full_access,
sns_policy])
Then we set the environment variables that we'll be needing in our Lambda function. In this case, it's the DynamoDB table name and SNS topic Amazon resource name (ARN).
Next, we state the runtime (python 3.9), handler name (which we'll define later within the initialize_order.py
file, and the Lambda function timeout in seconds. Note that a lambda function can't run above 15 minutes.
Repeat these same steps for the other 3 lambda functions.
Use the file on Github as a reference: step_function.py
At the bottom of the step_function.py
file, we have to define the Step Functions workflow template and also add the created Lambda functions ARNs.
sf_workflow = \
Template(workflow).substitute(InitializeOrderArn=initialize_order_function.attr_arn,
ProcessPaymentArn=process_payment_function.attr_arn,
CompleteOrderArn=complete_order_function.attr_arn,
CancelFailedOrderArn=cancel_failed_order_function.attr_arn, dollar="$")
return sf_workflow
Inside the workflow.json
file, there's a line like this
"FunctionName": "$InitializeOrderArn"
We use the substitute
method above to replace that line with the ARN to where the Lambda function resides InitializeOrderArn=initialize_order_function.attr_arn
.
Now, go to the stack file of your application, and type in the following code to create a Step Functions state machine.
Firstly, we need to call the create_step_function(self, lambda_step_function_role, cfn_topic)
function we defined above and pass its return type to our state machine's definition.
Also, the state machine needs permission to interact with other AWS services. So we grant Step Functions permissions to execute lambda functions.
workflow = create_step_function(self, lambda_step_function_role, cfn_topic)
simple_state_machine = \
stepfunctions.CfnStateMachine(self, "SimpleStateMachine",
definition=json.loads(workflow),
role_arn=lambda_execution_role.role_arn
)
At this point, we've created resources for 75% of the application. Let's go ahead and start creating the endpoints to consume these resources.
Endpoints
Create Order
When a user places an order, a series of events happen
An AppSync Mutation is invoked. The input to the Mutation is bundled and sent as a message to an SQS Queue.
A Lambda function (post order Lambda) polls the messages from the SQS Queue, extracts the order information, and starts up a Step Functions workflow with order information as input.
The Step Functions workflow contains 4 different Lambda functions (Initialize Order, Process Payment, Complete Order, Cancel Order).
The Initialize Order function creates a new order in the database.
The Process Payment function randomly assigns a
success
orfailed
payment status to the order.If payment was successful, the Complete Order function updates the order in the database and sends an email to the client.
If payment failed, the order status is updated in the database and an email is sent with failed status to the client through the failed email Lambda function.
Create a python package inside the lambda-fns
folder called post_order
. Then inside that folder, create a python file called post_order.py
.
This Lambda function would poll messages from the SQS queue and start a Step Functions workflow. Therefore, it'll need permission to receive SQS queue messages and to start a Step Functions workflow.
Let's define the code within the post_order.py
file.
def handler(event, context):
new_order_list = []
print("event ", event)
for record in event["Records"]:
message_id = record["messageId"]
request_body = json.loads(record["body"])
order_data = request_body["input"]
print(f'post_orders reqeust_body {order_data} type: {type(order_data)}')
sfn_input = assemble_order(message_id, order_data)
response = start_sfn_exec(sfn_input, message_id)
print(f'start sfn execution: {response}')
new_order_list.append(response["executionArn"])
return new_order_list
Messages from a queue are polled as Records
. We have to iterate over each record in order to extract the order information, assemble the order, and use it to start a step functions workflow.
sfn_input = assemble_order(message_id, order_data)
This method simply adds more random data to the created order and returns the order.
def assemble_order(message_id, order_data):
now = datetime.now()
order_data["user_id"] = "demo_user"
order_data["id"] = message_id
order_data["orderStatus"] = DEFAULT_ORDER_STATUS
order_data["createdAt"] = now.isoformat()
return json.dumps(order_data, cls=DecimalEncoder)
response = start_sfn_exec(sfn_input, message_id)
Here's where we start the step functions workflow, using the Step Function ARN and the order data as input.
def start_sfn_exec(sfn_input, sfn_exec_id):
response = sfn.start_execution(
stateMachineArn=STATE_MACHINE_ARN,
name=sfn_exec_id,
input=json.dumps(sfn_input, cls=DecimalEncoder)
)
print(f'post_orders start sfn_exec_id {sfn_exec_id} and input {sfn_input}')
return response
Now, define the resources and permissions this Lambda function needs. It's pretty similar to the Step Functions lambda resources we defined above.
Within the stack folder, mine is event_driven_cdk_app
create a file called post_order_resource.py
and type in the following code.
def create_post_order_lambda_resource(stack, simple_state_machine, sqs_receive_message_role, queue):
with open("lambda_fns/post_order/post_order.py", 'r') as file:
post_function_file = file.read()
post_function = lambda_.CfnFunction(stack, "post",
code=lambda_.CfnFunction.CodeProperty(
zip_file=post_function_file
),
role=sqs_receive_message_role.role_arn,
# the properties below are optional
architectures=["x86_64"],
description="lambda-ds",
environment=lambda_.CfnFunction.EnvironmentProperty(
variables={
"ORDER_TABLE": "ORDER",
"STATE_MACHINE_ARN": simple_state_machine.attr_arn
}
),
function_name="post-order-function",
handler="index.handler",
package_type="Zip",
runtime="python3.9",
timeout=123,
tracing_config=lambda_.CfnFunction.TracingConfigProperty(
mode="Active"
)
)
lambda_.EventSourceMapping(scope=stack, id="MyEventSourceMapping",
target=post_function,
batch_size=5,
enabled=True,
event_source_arn=queue.attr_arn)
It's very identical to the one we defined above, except for the last part
lambda_.EventSourceMapping(scope=stack, id="MyEventSourceMapping",
target=post_function,
batch_size=5,
enabled=True,
event_source_arn=queue.attr_arn)
Remember, this function is responsible for polling messages from an SQS queue. We subscribe to the SQS queue using the EventSourceMapping
function of the lambda API by passing in the SQS queue's ARN as the event source event_source_arn=queue.attr_arn
.
The batch_size
indicates the number of messages to be polled from SQS at a time.
When this function invokes a Step Function workflow, the first Lambda function to run in the workflow is initialize_order.py
.
We created the file in the initialize_order
python package, but we didn't add any code to it. Open up initialize_order.py
and type in the following code.
All we do is save the order to the database and forward the order details to the next function, which is process_payment.py
def persist_order(order_item):
print(f'persist_order item {order_item} to table {TABLE_NAME}')
response = table.put_item(Item=order_item)
message = {"order_status": order_item["orderStatus"], "order_id": order_item["id"]}
print(f'new order pending payment {message}')
return {
"message": json.dumps(order_item, indent=4, cls=DecimalEncoder)
}
You can grab the complete code here
The process_payment.py
function randomly assigns a payment state (ok
or error
) and an error message if the randomly assigned state was error
, and then moves to the next function.
The next function is determined by a choice state based on the state of the payment. The next function would be complete_order.py
if the payment state was ok
or cancel_failed_order.py
if the payment state was error
.
In either of these functions, the order is updated in the database, and an email is sent using SNS.
For the complete order function, the order is updated like so
response = table.update_item(
Key={
"user_id": event["saveResults"]["user_id"],
"id": event["saveResults"]["id"]
},
UpdateExpression="set orderStatus = :s",
ExpressionAttributeValues={
":s": order_status
},
ReturnValues="UPDATED_NEW"
)
And an email is sent as such
sns.publish(
TopicArn=topic_arn,
Message=json.dumps(message),
Subject=f'Orders-App: Update for order {message["order_id"]}'
)
See the complete code here complete_order
Same thing with cancel_failed_order.py
function Cancel Failed Order
At this point, we could deploy and test the API. But I think it'll be better to add at least one more endpoint before deploying.
I would love to see a list of all the orders made.
But that'll be for the next article.
Conclusion
In this post, we added a Step functions workflow containing 4 Lambda functions to our API. We saw how to initialize an order and also send SNS email notifications of the status of an order. We'll continue building the API in the next post.
I'll love to know your thoughts about the series so far. Please leave a like or comment.
And until next time folks ✌🏾