Building an Event-Driven Application with AWS AppSync, EventBridge, and Step Functions
Over the past few years, event-driven architecture has become increasingly popular. With services such as EventBridge, which integrates with many other AWS services, building such applications has become easier than ever.
Last year, AWS announced support for Amazon EventBridge as a Data Source for AWS AppSync, and more recently, it was the turn of EventBridge to integrate with AppSync. The two-way integration between those two services opens many opportunities.
In this article, I will show you how you can create an event-driven system with real-time updates using AWS AppSync, EventBridge, and Step Functions.
What We Will Build
We will build a simple food-ordering service where users can place an order for their meal. The order will be received by the back end through an AWS AppSync GraphQL API. The Orders service will process it and send the user real-time updates as the order's status changes (e.g. Paid, Preparing, Out for delivery, Delivered, etc.). The Orders service is a Step Functions workflow that orchestrates the different steps of the process, while Amazon EventBridge asynchronously coordinates all the services.
Here is a high-level diagram of what this looks like.
To build this, we will use the AWS CDK as our Infrastructure as Code (IaC) tool.
The AppSync API
First, we'll need an AppSync API. It will be the entry point for our users to place orders. Here is the simplified schema.
type Product {
  name: String!
  quantity: Int!
}

type Order {
  id: ID!
  status: OrderStatus!
  products: [Product]!
  createdAt: AWSDateTime!
  updatedAt: AWSDateTime!
}

input ProductInput {
  name: String!
  quantity: Int!
}

input CreateOrderInput {
  products: [ProductInput!]!
}

input NotifyOrderUpdatedInput {
  id: ID!
  status: OrderStatus!
  updatedAt: AWSDateTime!
}

type OrderUpdated @aws_api_key @aws_iam {
  id: ID!
  status: OrderStatus!
  updatedAt: AWSDateTime!
}

enum OrderStatus {
  PENDING
  PAID
  PREPARING
  OUT_FOR_DELIVERY
  DELIVERED
  CANCELLED
}

type Mutation {
  createOrder(input: CreateOrderInput!): Order
  notifyOrderUpdated(input: NotifyOrderUpdatedInput!): OrderUpdated! @aws_iam
}

type Subscription {
  onOrderUpdated(id: ID!): OrderUpdated
    @aws_subscribe(mutations: ["notifyOrderUpdated"])
}
The schema contains two mutations:
createOrder: used by the front end to place an order. This mutation is attached to a pipeline resolver with two functions: createOrder and putEvent.
The createOrder function uses a DynamoDB data source to persist the order in a table, and prepares an order.created event containing its details before storing it in the stash.
// createOrder resolver
import { util } from '@aws-appsync/utils';
import { put } from '@aws-appsync/utils/dynamodb';

export const request = (ctx) => {
  // Use a single timestamp so createdAt and updatedAt match on creation
  const now = util.time.nowISO8601();
  const order = {
    id: util.autoId(),
    status: 'PENDING',
    products: ctx.arguments.input.products,
    createdAt: now,
    updatedAt: now,
  };

  // Persist the order in the DynamoDB table
  return put({
    key: {
      id: order.id,
    },
    item: order,
  });
};

export const response = (ctx) => {
  // Here, we prepare the `order.created` event for the next step
  ctx.stash.event = { detailType: 'order.created', detail: ctx.result };
  return ctx.result;
};
The putEvent function is attached to an EventBridge data source. It receives an event (from the stash) and puts it onto the event bus. We'll see later how it is used.
// putEvent resolver
import { util } from '@aws-appsync/utils';

export function request(ctx) {
  const { event } = ctx.stash;

  // The previous function must have prepared an event
  if (!event) {
    console.error('No event found in stash');
    util.error('InternalError');
  }

  return {
    operation: 'PutEvents',
    events: [
      {
        source: 'order.api',
        ...event,
      },
    ],
  };
}

export function response(ctx) {
  // Return the order produced by the previous function (createOrder)
  return ctx.prev.result;
}
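To make the hand-off between the two pipeline functions concrete, here is a minimal sketch of the request that putEvent ends up building once createOrder has stored the event in the stash. The StashEvent type, the buildPutEventsRequest helper, and the sample order values are hypothetical and exist only for this illustration; the real handler runs inside AppSync.

```typescript
// Hypothetical type for illustration; not part of @aws-appsync/utils.
type StashEvent = { detailType: string; detail: Record<string, unknown> };

// Mirrors the putEvent request handler: adds the source and wraps the event.
function buildPutEventsRequest(event: StashEvent) {
  return {
    operation: 'PutEvents',
    events: [{ source: 'order.api', ...event }],
  };
}

// Sample stash content, shaped like what createOrder's response handler sets.
const stashEvent: StashEvent = {
  detailType: 'order.created',
  detail: {
    id: 'order-123',
    status: 'PENDING',
    products: [{ name: 'Pizza', quantity: 1 }],
  },
};

const putEventsRequest = buildPutEventsRequest(stashEvent);
console.log(JSON.stringify(putEventsRequest, null, 2));
```

The resulting entry carries the source (order.api), the detail type (order.created), and the full order as its detail, which is everything a rule needs to route it.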
notifyOrderUpdated: used by the back end to send real-time updates.
It is attached to a resolver with a None data source because this mutation does not persist anything in any store. Its only purpose is to trigger a notification to the onOrderUpdated subscription.
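The article does not show this resolver, so the following is only a sketch of what a None data source resolver for notifyOrderUpdated could look like. The NotifyContext type is a simplified assumption declared here to keep the example self-contained; in a real resolver, AppSync provides the context object.

```typescript
// Simplified context shape (an assumption for this sketch).
type NotifyContext = {
  arguments: { input: { id: string; status: string; updatedAt: string } };
  result?: unknown;
};

// Request handler: a None data source performs no external call,
// it simply forwards the payload to the response handler.
export function request(ctx: NotifyContext) {
  return { payload: ctx.arguments.input };
}

// Response handler: return the forwarded payload as the mutation result.
// That result is what subscribers of onOrderUpdated receive.
export function response(ctx: NotifyContext) {
  return ctx.result;
}
```

Because nothing is stored, the mutation acts as a pure pass-through: whatever the back end sends as input is what the subscription publishes.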
The API also uses two authorizers: API_KEY (for users to place orders) and IAM (for the back end to call the notifyOrderUpdated mutation; this is a requirement of the EventBridge integration).
The Step Functions State Machine
The second component of our application is a Step Functions state machine. This state machine orchestrates the different steps of processing an order: payment, waiting for the restaurant to prepare the meal, waiting for delivery, etc.
For simplicity, I used wait tasks to simulate the workflow. In a real application, the state machine would wait for real processes (e.g. payment gateway), or humans (e.g. meal preparation) before transitioning between the different states (hint: probably using the callback pattern).
The most important parts are the EventBridgePutEvents tasks. After every step in the order processing workflow, one of these tasks receives the updated order and puts an event onto the event bus with all its details. In the next section, we'll see how these events are handled.
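// lib/state-machine-construct.ts
import { TaskInput } from 'aws-cdk-lib/aws-stepfunctions';
import { EventBridgePutEvents } from 'aws-cdk-lib/aws-stepfunctions-tasks';

// Method of the state machine construct
createNotifyUpdate(name: string) {
  return new EventBridgePutEvents(this, `Notify ${name}`, {
    entries: [
      // generates an event from the task input
      {
        source: 'order.processing',
        detailType: 'order.updated',
        detail: TaskInput.fromObject({
          'order.$': '$.order',
        }),
        eventBus: this.eventBus,
      },
    ],
    // keep the original input and store the PutEvents result next to it
    resultPath: '$.eventBridgeResult',
  });
}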
Put It All Together
So far, we've covered the AppSync API and the Step Functions workflow. Did you notice that both of those services place events onto EventBridge? What I haven't shown you yet is how those events are used and how they work together.
In our application, the first thing that happens is a user placing an order. That's the starting point of the whole process, and it results in an order.created event. If you look at our state machine construct, you'll notice that it subscribes to those events to start a new execution.
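import { EventField, Rule, RuleTargetInput } from 'aws-cdk-lib/aws-events';
import { SfnStateMachine } from 'aws-cdk-lib/aws-events-targets';

new Rule(this, 'OrderHandlerRule', {
  eventBus: eventBus,
  eventPattern: {
    // subscribe to order.created events, coming from the api.
    source: ['order.api'],
    detailType: ['order.created'],
  },
  targets: [
    // start an execution with the event's detail as the order
    new SfnStateMachine(sm, {
      input: RuleTargetInput.fromObject({
        order: EventField.fromPath('$.detail'),
      }),
    }),
  ],
});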
This means that each time an order is placed, a Step Functions execution starts with the order details as its input.
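As a rough illustration of what this rule does, the sketch below simulates the two steps locally: matching an incoming event against the pattern, then deriving the state machine input from the event's detail. The OrderEvent type, the matchesRule and toStateMachineInput helpers, and the sample events are hypothetical; EventBridge performs the real matching, and real events carry more fields.

```typescript
// Simplified event shape (an assumption for this sketch).
type OrderEvent = {
  source: string;
  'detail-type': string;
  detail: Record<string, unknown>;
};

// The rule's pattern: each key lists the accepted values for that field.
// CDK's `detailType` maps to the `detail-type` field of the event.
const pattern = { source: ['order.api'], 'detail-type': ['order.created'] };

// True when the event satisfies every field of the pattern.
function matchesRule(event: OrderEvent): boolean {
  return (
    pattern.source.indexOf(event.source) !== -1 &&
    pattern['detail-type'].indexOf(event['detail-type']) !== -1
  );
}

// Mirrors RuleTargetInput.fromObject({ order: EventField.fromPath('$.detail') }).
function toStateMachineInput(event: OrderEvent) {
  return { order: event.detail };
}

const createdEvent: OrderEvent = {
  source: 'order.api',
  'detail-type': 'order.created',
  detail: { id: 'order-123', status: 'PENDING' },
};

const updatedEvent: OrderEvent = {
  source: 'order.processing',
  'detail-type': 'order.updated',
  detail: { order: { id: 'order-123', status: 'PAID' } },
};

console.log(matchesRule(createdEvent)); // true: starts an execution
console.log(matchesRule(updatedEvent)); // false: ignored by this rule
console.log(toStateMachineInput(createdEvent));
```

Wrapping the detail under an order key is what lets the workflow's tasks read the order from $.order.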
Then, as the state machine goes through the different steps of the order processing, it emits order.updated events, which in turn are picked up by an EventBridge-AppSync rule.
import { CfnGraphQLApi } from 'aws-cdk-lib/aws-appsync';
import { CfnRule } from 'aws-cdk-lib/aws-events';

new CfnRule(scope, 'UpdateOrder', {
  eventBusName: eventBus.eventBusName,
  eventPattern: {
    // subscribe to order.updated events, coming from the state machine
    source: ['order.processing'],
    'detail-type': ['order.updated'],
  },
  targets: [
    {
      id: 'OrderUpdated',
      arn: (api.node.defaultChild as CfnGraphQLApi).attrGraphQlEndpointArn,
      roleArn: ebRuleRole.roleArn,
      appSyncParameters: {
        graphQlOperation: `mutation NotifyOrderUpdated($input: NotifyOrderUpdatedInput!) { notifyOrderUpdated(input: $input) { id status updatedAt } }`,
      },
      // build the mutation variables from the event's detail
      inputTransformer: {
        inputPathsMap: {
          id: '$.detail.order.id',
          status: '$.detail.order.status',
          updatedAt: '$.detail.order.updatedAt',
        },
        inputTemplate: JSON.stringify({
          input: {
            id: '<id>',
            status: '<status>',
            updatedAt: '<updatedAt>',
          },
        }),
      },
    },
  ],
});
This is a direct integration between EventBridge and AWS AppSync. EventBridge invokes the notifyOrderUpdated mutation and uses the attributes of the event's detail to build the input variables of the request. In turn, the mutation emits a message to all subscribers of the onOrderUpdated subscription.
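To see how the input transformer turns an event into GraphQL variables, here is a simplified local simulation. It is not EventBridge's implementation: the resolvePath and applyInputTransformer helpers are hypothetical, they only handle plain dotted paths and string values that need no JSON escaping, and the sample event is made up.

```typescript
// Resolve a simple dotted path such as '$.detail.order.id' (sketch only).
function resolvePath(event: unknown, path: string): unknown {
  const keys = path.replace(/^\$\./, '').split('.');
  let current: any = event;
  for (const key of keys) {
    if (current == null) return undefined;
    current = current[key];
  }
  return current;
}

// Substitute every <name> placeholder in the template with the value
// found at the corresponding path, then parse the result as JSON.
function applyInputTransformer(
  event: unknown,
  inputPathsMap: Record<string, string>,
  inputTemplate: string,
): Record<string, any> {
  let output = inputTemplate;
  for (const name of Object.keys(inputPathsMap)) {
    const value = String(resolvePath(event, inputPathsMap[name]));
    output = output.split(`<${name}>`).join(value);
  }
  return JSON.parse(output);
}

// Sample order.updated event, shaped like the one the state machine emits.
const orderUpdatedEvent = {
  source: 'order.processing',
  'detail-type': 'order.updated',
  detail: {
    order: { id: 'order-123', status: 'PAID', updatedAt: '2024-01-01T12:00:00.000Z' },
  },
};

const variables = applyInputTransformer(
  orderUpdatedEvent,
  {
    id: '$.detail.order.id',
    status: '$.detail.order.status',
    updatedAt: '$.detail.order.updatedAt',
  },
  JSON.stringify({ input: { id: '<id>', status: '<status>', updatedAt: '<updatedAt>' } }),
);

console.log(variables);
```

The output has the shape expected by the $input variable of the NotifyOrderUpdated operation: an input object with id, status, and updatedAt.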
Note that this rule is defined with the low-level CfnRule construct.
With all this in place, the front end can use the onOrderUpdated subscription to get real-time updates as soon as it places an order.
subscription OnOrderUpdated($id: ID!) {
  onOrderUpdated(id: $id) {
    id
    status
    updatedAt
  }
}
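The id argument matters here: AppSync delivers a mutation result to a subscriber only when the subscription's arguments match the corresponding fields of that result, so each client only sees updates for its own order. The sketch below mimics that behavior locally; the isDelivered helper and the sample message are hypothetical, and the actual filtering happens inside AppSync.

```typescript
type OrderUpdated = { id: string; status: string; updatedAt: string };

// A message is delivered only if every subscription argument
// equals the matching field of the mutation result.
function isDelivered(args: Partial<OrderUpdated>, message: OrderUpdated): boolean {
  const keys = Object.keys(args) as (keyof OrderUpdated)[];
  return keys.every((key) => args[key] === message[key]);
}

// Sample result of a notifyOrderUpdated mutation.
const message: OrderUpdated = {
  id: 'order-123',
  status: 'OUT_FOR_DELIVERY',
  updatedAt: '2024-01-01T12:30:00.000Z',
};

console.log(isDelivered({ id: 'order-123' }, message)); // true
console.log(isDelivered({ id: 'order-456' }, message)); // false
```

Subscribers can only receive fields that the triggering mutation selected, which is why the graphQlOperation in the rule requests id, status, and updatedAt.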
Here is a simulation using GraphBolt. Look at the subscription messages coming in, and pay attention to the status field.
Conclusion
Combining AWS AppSync, Step Functions, and EventBridge in an event-driven architecture is an effective way to build asynchronous, real-time applications. In this article, I walked through a practical use case showing how to achieve it, and the best part is that we wrote almost no code for it (outside the infrastructure).
Thank you!