Building an Event-Driven Application with AWS AppSync, EventBridge, and Step Functions

Building an Event-Driven Application with AWS AppSync, EventBridge, and Step Functions

Over the past years, event-driven architecture has become more and more popular. With the arrival of services such as EventBridge and its integration with many other AWS services, building such applications has become easier than ever.

Last year, AWS announced support for Amazon EventBridge as a Data Source for AWS AppSync, and more recently, it was the turn of EventBridge to integrate with AppSync. The two-way integration between those two services opens many opportunities.

In this article, I will show you how you can create an event-driven system with real-time updates using AWS AppSync, EventBridge, and Step Functions.

💡
To maintain brevity, this article assumes a certain level of familiarity with those services.

What We Will Build

We will build a simple food-ordering service where users can place an order for their meal. The order will be received by the back-end through an AWS AppSync GraphQL API. The Orders service will process it and send real-time updates to the user about the different status updates (e.g. Paid, Preparing, Out for delivery, Delivered, etc.). The Orders service is a Step Functions workflow that orchestrates the different steps of the process, while Amazon EventBridge asynchronously coordinates all the services.

Here is a high-level diagram of what this looks like.

To build this, we will use the CDK as our Infrastructure as Code (IaC).

👨‍💻
Just want to see the code? You can find the final solution on GitHub.

The AppSync API

First, we'll need an AppSync API. It will be the entry point for our users to place orders. Here is the simplified schema.

type Product {
  name: String!
  quantity: Int!
}

type Order {
  id: ID!
  status: OrderStatus!
  products: [Product]!
  createdAt: AWSDateTime!
  updatedAt: AWSDateTime!
}

input ProductInput {
  name: String!
  quantity: Int!
}

input CreateOrderInput {
  products: [ProductInput!]!
}

input NotifyOrderUpdatedInput {
  id: ID!
  status: OrderStatus!
  updatedAt: AWSDateTime!
}

type OrderUpdated @aws_api_key @aws_iam {
  id: ID!
  status: OrderStatus!
  updatedAt: AWSDateTime!
}

enum OrderStatus {
  PENDING
  PAID
  PREPARING
  OUT_FOR_DELIVERY
  DELIVERED
  CANCELLED
}

type Mutation {
  createOrder(input: CreateOrderInput!): Order
  notifyOrderUpdated(input: NotifyOrderUpdatedInput!): OrderUpdated! @aws_iam
}

type Subscription {
  onOrderUpdated(id: ID!): OrderUpdated
    @aws_subscribe(mutations: ["notifyOrderUpdated"])
}

The schema contains 2 mutations:

  • createOrder: can be used by the front end to place an order

This mutation is attached to a pipeline resolver with two functions: createOrder and putEvent.

createOrder uses a DynamoDB data source to persist the order in a table, and prepares an order.created event containing its details before storing it in the stash.

// createOrder resolver
import { util } from '@aws-appsync/utils';
import { put } from '@aws-appsync/utils/dynamodb';

export const request = (ctx) => {
  const order = {
    id: util.autoId(),
    status: 'PENDING',
    products: ctx.arguments.input.products,
    createdAt: util.time.nowISO8601(),
    updatedAt: util.time.nowISO8601(),
  };

  return put({
    key: {
      id: order.id,
    },
    item: order,
  });
};

export const response = (ctx) => {
  // Here, we prepare the `order.created` event for the next step
  ctx.stash.event = { detailType: 'order.created', detail: ctx.result };

  return ctx.result;
};

putEvent is attached to an EventBridge data source. It receives an event (from the stash) and puts it into the event bus. We'll see later how it is used.

// putEvent resolver
import { util } from '@aws-appsync/utils';

export function request(ctx) {
  const { event } = ctx.stash;

  if (!event) {
    console.error('No event found in stash');
    util.error('InternalError');
  }

  return {
    operation: 'PutEvents',
    events: [
      {
        source: 'order.api',
        ...event,
      },
    ],
  };
}

export function response(ctx) {
  return ctx.prev.result;
}
  • notifyOrderUpdate will be used by the back end to send real-time updates.

It is attached to a resolver with a None data source because this mutation does not persist anything in any store. Its only purpose is to trigger a notification to the onOrderUpdated subscription.

The API also uses two authorizers: API_KEY (for users to place orders) and IAM for the back end to call the notifyOrderUpdated mutation (this is a requirement from the EventBridge integration).

💡
In a real app, we'd probably use a Cognito User Pool, or OIDC, for user-facing authentication. I used an API key for simplicity.

The Step Functions State Machine

The second component of our application is a Step Functions state machine. This state machine orchestrates the different steps of processing an order: payment, waiting for the restaurant to prepare the meal, waiting for delivery, etc.

For simplicity, I used wait tasks to simulate the workflow. In a real application, the state machine would wait for real processes (e.g. payment gateway), or humans (e.g. meal preparation) before transitioning between the different states (hint: probably using the callback pattern).

The most important part is the EventBridgePutEvents tasks. After every step in the order processing workflow, it receives the updated order and puts an event into the event bus with all its details. In the next section, we'll see how they are being handled.

// lib/state-machine-construct.ts

createNotifyUpdate(name: string) {
  return new EventBridgePutEvents(this, `Notify ${name}`, {
    entries: [
      // generates an event from the task input
      {
        source: 'order.processing',
        detailType: 'order.updated',
        detail: TaskInput.fromObject({
          'order.$': '$.order',
        }),
        eventBus: this.eventBus,
      },
    ],
    resultPath: '$.eventBridgeResult',
  });
}

Put It All Together

So far, we've learned about the AppSync API, and the Step Functions workflow. Did you notice that both those services place events into EventBridge? What I haven't shown you yet is how those events are used and how they play together.

In our application, the first thing that happens is a user placing an order. That's the starting point of the whole process, and it results in an order.created event. If you look at our state machine construct, you'll notice that it subscribes to those events to start a new execution.

new Rule(this, 'OrderHandlerRule', {
  eventBus: eventBus,
  eventPattern: {
    // subscribe to order.created events, coming from the api.
    source: ['order.api'],
    detailType: ['order.created'],
  },
  targets: [
    new SfnStateMachine(sm, {
      input: RuleTargetInput.fromObject({
        order: EventField.fromPath('$.detail'),
      }),
    }),
  ],
});

This means, that each time an order is placed, a Step Functions workflow will start with the order details as its input.

Then, as the state machine goes over all the different steps of the order processing, it emits order.updated events, which in turn are picked up by an EventBridge-AppSync rule.

new CfnRule(scope, 'UpdateOrder', {
  eventBusName: eventBus.eventBusName,
  eventPattern: {
    source: ['order.processing'],
    'detail-type': ['order.updated'],
  },
  targets: [
    {
      id: 'OrderUpdated',
      arn: (api.node.defaultChild as CfnGraphQLApi).attrGraphQlEndpointArn,
      roleArn: ebRuleRole.roleArn,
      appSyncParameters: {
        graphQlOperation: `mutation NotifyOrderUpdated($input: NotifyOrderUpdatedInput!) { notifyOrderUpdated(input: $input) { id status updatedAt } }`,
      },
      inputTransformer: {
        inputPathsMap: {
          id: '$.detail.order.id',
          status: '$.detail.order.status',
          updatedAt: '$.detail.order.updatedAt',
        },
        inputTemplate: JSON.stringify({
          input: {
            id: '<id>',
            status: '<status>',
            updatedAt: '<updatedAt>',
          },
        }),
      },
    },
  ],
});

This is a direct integration between EventBridge and AWS AppSync. EventBridge invokes the notifyOrderUpdated mutation and uses the event's details attributes to build the input variables of the request. In turn, the mutation emits a message to all subscribers of the onOrderUpdated subscription.

💡
At the time of writing this article, there is no L2 construct for the EventBridge-AppSync integration, but we can use the L1 CfnRule construct.

With all this in place, the front end can use the onOrderUpdate subscription to get real-time updates as soon as it places an order.

subscription OnOrderUpdated($id: ID!) {
  onOrderUpdated(id: $id) {
    id
    status
    updatedAt
  }
}

Here is a simulation using GraphBolt. Look at the subscription messages coming in. Pay attention to the status field.

💡
GraphBolt is a desktop app to build, test, debug, and manage AWS AppSync APIs. Try it for free today!

Conclusion

Leveraging event-driven architecture with AWS AppSync, Step Functions, and EventBridge offers a seamless solution for building efficient, asynchronous, and real-time applications. In this article, I explained with a practical use case how you can achieve it, and the best part is that we almost did not write any code for it (outside the infrastructure).

If you liked this content, please share! You can also follow me on X, Hashnode, and LinkedIn for more, or subscribe to this blog's newsletter.

Thank you!