Building Modern Serverless APIs with CDK, Python, and GraphQL (Part 2)
Designing Event Driven Applications
In the first part of this article series, we gave a brief overview of the concept of event-driven architectures, coupling, and defined all AWS services needed to build the API.
Let's proceed.
Prerequisite
Before proceeding, please confirm you have all these dependencies installed on your computer
Creating a new CDK python project
From the command line interface (Terminal), create and change the directory into the newly created folder using the command
mkdir eventDrivenCdk && cd $_
I named my project eventDrivenCdk
, feel free to give yours a different name.
Within the newly created project, initialize a python CDK project using the command
cdk init --language=python
This project is set up like a standard Python project. The initialization process also creates a virtualenv
within this project, stored under the .venv
directory.
To create the virtualenv
it assumes that there is a python3
(or python
for Windows) executable in your path with access to the venv
package.
If for any reason the automatic creation of the virtualenv fails, you can create the virtualenv manually.
To manually create a virtualenv on MacOS and Linux:
$ python3 -m venv .venv
After the init process completes and the virtualenv is created, you can use the following step to activate your virtualenv.
$ source .venv/bin/activate
If you are a Windows platform, you would activate the virtualenv like this:
% .venv\Scripts\activate.bat
Once the virtualenv is activated, you can install the required dependencies.
Add boto3
to the requirements.txt
before running the command.
$ pip install -r requirements.txt
Boto3 is the aws sdk for python.
Graphql Schema
In the root directory, create a file called schema.graphql
and type in the following code. This file is a description of our Graphql API. It contains all the types, queries, mutations, and subscriptions for our Graphql API.
type Schema {
query: Query
mutation: Mutation
}
type Order {
name: String!
quantity: Int!
restaurantId: String!
}
input OrderInput {
name: String!
quantity: Int!
restaurantId: String!
}
input UpdateOrderInput {
id: String!
name: String!
quantity: Int!
restaurantId: String!
}
type Query {
orders: [Order ]!
order(id: String!): Order!
}
type Mutation {
postOrder(input: OrderInput!): Order!
updateOrder(input: UpdateOrderInput!): Order!
deleteOrder(id: String!): String
}
From the schema, our API has 3 mutations and 2 queries. Before delving into the implementation details of these endpoints, we need to first define all the resources the app needs in order to run effectively.
Defining the GraphQL API in Stack
The first step is to import the appsync
class from the aws-cdk-lib
.
import aws_cdk.aws_appsync as appsync
Then, use the CfnGraphQLApi
method within the appsync class to create the API. This method takes a myriad of parameters, but for our use case, all we need is an API name, the authentication type, x-ray, and cloudwatch for tracing and logging.
api = appsync.CfnGraphQLApi(self, "Api",
name="event_driven_cdk",
authentication_type="API_KEY",
xray_enabled=True,
log_config=log_config
)
log_config = appsync.CfnGraphQLApi.LogConfigProperty(
cloud_watch_logs_role_arn=appsync_cloud_watch_role.role_arn,
exclude_verbose_content=False,
field_log_level="ALL")
This line cloud_watch_logs_role_arn=appsync_cloud_watch_role.role_arn,
gives AppSync permissions to push logs to CloudWatch. Here's how we define the role and attach its policies.
cloud_watch_role_full_access = iam.ManagedPolicy.from_managed_policy_arn(self, "cloudWatchLogRole",
'arn:aws:iam::aws:policy/CloudWatchLogsFullAccess')
appsync_cloud_watch_role = iam.Role(self, "AppSyncCloudWatchRole",
assumed_by=iam.ServicePrincipal("appsync.amazonaws.com"),
managed_policies=[
cloud_watch_role_full_access
])
After creating the API, the next logical step is to attach the schema. We use the CfnGraphQLSchema
method from the AppSync class to achieve this. This method takes in a scope, an id, an api_id which should be a unique AWS Appsync GraphQL API identifier, and a definition (the schema file itself).
dirname = path.dirname(__file__)
with open(os.path.join(dirname, "../schema.graphql"), 'r') as file:
data_schema = file.read().replace('\n', '')
schema = appsync.CfnGraphQLSchema(scope=self, id="schema", api_id=api.attr_api_id, definition=data_schema)
Defining the Queue
Let's define and attach the SQS queue to AppSync. Firstly, we import the SQS class from cdk.
import aws_cdk.aws_sqs as sqs
Then, we'll create 2 queues and use one as the Dead letter queue (DLQ). A Dead letter Queue is a message queue that'll store all the messages that couldn't be processed successfully. The developer can always go back to the DLQ and redrive the unsuccessful messages.
# SQS
queue = sqs.CfnQueue(
self, "CdkAccelerateQueue",
visibility_timeout=300,
queue_name="sqs-queue"
)
deadLetterQueue = sqs.Queue(
self, "CdkAccelerateDLQueue",
visibility_timeout=Duration.minutes(10),
queue_name="dead-letter-queue"
)
sqs.DeadLetterQueue(max_receive_count=4, queue=deadLetterQueue)
The visibility_timeout
is the time taken for a consumer to process and delete a message once dequeued. While this timeout is valid, the message is made unavailable to other consumers. If the timeout expires when the message hasn't been successfully processed and delivered, the message is sent back into the queue and made available for other consumers to pick up.
If you don't specify a value for the visibility_timeout
, AWS CloudFormation uses the default value of 30 seconds. Default: Duration.seconds(30)
The max_receive_count
is the number of times a message can be unsuccessfully dequeued before being moved to the dead-letter queue. We set the value to 4, meaning after 4 unsuccessful dequeue attempts, that message would be sent to the DLQ.
Now, let's attach the SQS queue to Appsync.
api.add_dependency(queue)
That's all. Remember the name of the GraphQL API we created above was api
.
Defining DynamoDB Resources
Import the dynamodb class for cdk.
import aws_cdk.aws_dynamodb as dynamodb
Create a table called ORDER
with a composite key.
user_id
for the primary key and id
as the sort key.
# DynamoDB
dynamodb.CfnTable(self, "Table",
key_schema=[dynamodb.CfnTable.KeySchemaProperty(
attribute_name="user_id",
key_type="HASH"
),
dynamodb.CfnTable.KeySchemaProperty(
attribute_name="id",
key_type="RANGE"
)],
billing_mode="PAY_PER_REQUEST",
table_name="ORDER",
attribute_definitions=[dynamodb.CfnTable.AttributeDefinitionProperty(
attribute_name="user_id",
attribute_type="S"
),
dynamodb.CfnTable.AttributeDefinitionProperty(
attribute_name="id",
attribute_type="S"
)]
)
Defining SNS Resources
Create an SNS topic with the topic name sns-topic
. As usual, we'll import the SNS class from AWS CDK
from aws_cdk import aws_sns as sns
Then use CfnTopic
and CfnTopicPolicy
methods from the SNS class to create and grant policies to the SNS topic.
cfn_topic = sns.CfnTopic(self, "MyCfnTopic",
display_name="sns-topic",
fifo_topic=False,
topic_name="sns-topic"
)
sns_publish_policy = sns.CfnTopicPolicy(self, "MyCfnTopicPolicy",
policy_document=iam.PolicyDocument(
statements=[iam.PolicyStatement(
actions=["sns:Publish", "sns:Subscribe"
],
principals=[iam.AnyPrincipal()],
resources=["*"]
)]
),
topics=[cfn_topic.attr_topic_arn]
)
Bear in mind that, we've assumed the client is already signed in to our application at this point. So we have their email address. We'll use that email address to subscribe to the SNS topic to receive email updates on success
or failed
order payments.
For the purpose of this tutorial, we'll pass in the email address as a parameter from the CLI, when deploying the application later.
For now, we need to add email
as a subscriber using the CfnSubscription
method from the SNS class.
email_address = CfnParameter(self, "subscriptionEmail")
sns.CfnSubscription(self, "EmailSubscription",
topic_arn=cfn_topic.attr_topic_arn,
protocol="email",
endpoint=email_address.value_as_string
)
Conclusion
In this episode,
We Created A GraphQL API and attached a schema to it.
Created and attached a queue to the AppSync API.
Created a Dead Letter Queue to catch unprocessed messages.
Created an SNS topic, with an email as a subscriber.
Gave IAM roles and policies to all created resources.
In the next episode, we'll continue creating more resources for the API. Please stay tuned. Thanks for reading. If you enjoyed it, please leave a like or a comment. I'll love to know what you think. Take Care.