Dynamic state machines with AWS Lambda, SST and XState

Photo by Martin Adams on Unsplash

Introduction

I had a requirement to run dynamic state machines in serverless on AWS.

A dynamic machine consisted of a few well known states, but the states could be configured into any design that a user wanted.

Dynamic state machines

The configuration would be stored in JSON and used later to recreate the users machine.

Step functions won't work here

AWS provides step functions to run state machines but it's designed more for running pre-designed state machines.

You're limited to 10,000 state machine definitions by default.

This is OK if you have < 10k known machine configurations, but it isn't enough where you have to enable multiple dynamic machines for each user in your system.

State machine in lambda

The alternative is to run the machine in a single lambda using some state machine library.

Running a state machine in lambda poses some interesting challenges.

  1. How do you define the machine?
  2. How do you run the machine safely on demand?
  3. How do you persist the machine state?

Defining a machine with XState

XState is the most popular, well tested, state machine library in javascript-land. It has great tooling, including a neat graphical state machine designer that can output code.

See https://stately.ai/docs/xstate for the docs.

With XState your machine is all based on code. And you can provide implementations via an options object making dynamic machines possible.

A simple machine configuration might look like this.

const countMachine = createMachine({
  context: {
    count: 0,
  },
  on: {
    INC: {
      actions: assign({
        count: ({ context }) => context.count + 1,
      }),
    },
    DEC: {
      actions: assign({
        count: ({ context }) => context.count - 1,
      }),
    },
    SET: {
      invoke: {
        id: 'emailNewCount',
        src: 'emailNewCount',
      }
      actions: assign({
        count: ({ event }) => event.value,
      }),
    },
  },
}, implementations);

const implementations = {
  actors: {
    emailNewCount: fromPromise((context) => {
      return someEmailService.send(context.count);
    }),
  }
}

So now implementaions and even machine step config for a discrete state type in our system can be combined in a nice way.

In the following code I define the implementation for a method in an object and i just reference the implementation by src in the machine config.

export const askAQuestion = {
    steps: { SET: {
      invoke: {
        id: 'emailNewCount',
        src: 'emailNewCount',
      }
      actions: assign({
        count: ({ event }) => event.value,
      }),
    },},
    implementations: {
      actors: {
        emailNewCount: fromPromise((context) => {
          return someEmailService.send(context.count);
        }),
      }
    }
}

You can see how this could be extended to include more steps and more implementations.

Then just map/deserialise configuration for a new machine instance from some json to objects like above, and spread them into a creaetMachine() config.

Running a machine in lambda

State machines are a bit tricky in lambda because you only want to run them when they have work to do. You want them to sleep when they have reached certain states requiring input and you absolutely only want one instance of a given machine running at one time.

The basic architecture will be

  1. Some ingress or source that receives a request to start a machine or send an event to a machine (http lambda here for example)
  2. A FIFO queue to ensure that only one instance of a machine is running at a time. The MessageGroupId will be important here.
  3. A lambda that runs the machine and sends events to it
  4. A dynamodb table to persist the machine state

Architecture

If you use SST you can define all of this in a few lines of code.

// I'll ignore the ingress http lambda here for brevity.
// But something must send a message to the queue with the event to send to the machine.

export function StateMachineStack({stack}: StackContext) {
  // create a db for storing the state. Xstate
    const db = new DynamoDBTable(stack, 'db', {
      fields: {
        id: { type: FieldType.STRING },
      },
      primaryIndex: { partitionKey: 'id' },
    });

    const queue = new SQSQueue(stack, 'stateMachineQueue', {
      consumer: {
        function: {
          handler: 'src/queue.handleMachineEvent', // this is where we will do the work!
          timeout: "30 seconds",
          permissions: [db]
          retryAttempts: 1,
        },
        cdk: {
          eventSource: {
            batchSize: 1, // process only one message per group id at a time
          },
        }
      },
      cdk: {
        queue: {
          fifo: true, // queue must be fifo to ensure only one instance of a machine is running at a time
          contentBasedDeduplication: true, // and dedupe on the machine id via MessageGroupId when queueing messages
          visibilityTimeout: Duration.seconds(60),
        // add a dlq here
        }
      }
    });

    queue.bind([db]);
    // pass the queue url to http lambdas or anywhere else you need it
    // you can use envrionment or sst config for this
    return { db, queue };
}

Now you can send a message to the queue with the machine id and the event to send to the machine.

SST makes it pretty easy to pass the queue url to other lambdas or services. Refer to the SST docs for more info.

// Send a message to machine via the queue from some other lambda or service
    const command = new SendMessageCommand({
      QueueUrl: queueUrl,
      MessageBody: JSON.stringify({
        machineId: machineId,
        event: event,
      }),
      MessageGroupId: machineId,
    });

Now we need to write the code that will run the machine.

// This is src/queue.ts from the SST config

import { dbClient } from './dbClient'; // i won't implement this stuff here. just get and update a record in dynamodb
import { SQSEvent } from 'aws-lambda';
import {Snapshot, waitFor } from "xstate"

export async function handleMachineEvent(event: SQSEvent, _context:unknown) {

  for (const event of event.Records){
    const eventBody = JSON.parse(event.body);
    const machineContext = await dbClient.getMachineContext(eventBody.machineId);

    // I wont implement this mapper but you need something to create xstate compatible configuration
    // objects from some serialised definition dsl
    const customStateConfiguration = mapCustomStateConfiguration(machineContext.customMachineDefinition);
    const initialStateId = Object.keys(customStateConfiguration)[0];

    const machine = createMachine({
      context: ({input}) => ({input}),
      id: machineContext.id,
      initial: initialStateId,
      states:{
        ...customStateConfiguration.states,
        // you can add constant states here too like "done" or "error". or "idle" as initial.
        DONE: {
          type: 'final',
        },
        ERROR: {
          type: 'final',
        },
      }
     // the implementations for actions in custom steps
    }, customStateConfiguration.implementations);

    // now create the actor to run the machine
    // with our input context and the stored snapshow (or null if this is a new machine)
    //(xstate has excellent docs on actors)
    const hydratedActor = createActor(machine, {
      input: machineContext.input, // input is the "variables" to start the machine with
      snapshot: machineContext.snapshot});

    hydratedActor.start();

    // run until the machine is in a final state or error or matches a special tag we set on
    // states where we know we need new information from the user (see xstate docs on tags)
    await waitFor(hydratedActor, (state) => {
      return state.status !== "active" || state.tags.has("awaitingInput");
    },{
      timeout: 25_000 // should be less than the lambda timeout so the thing can save state
    });

    // now save the state and snapshot
    await dbClient.updateMachineContext(eventBody.machineId, {
      snapshot: hydratedActor.getPersistedSnapshot(),
    });

    // finally stop the machine to shut down the lambda
    hydratedActor.stop();
  }

}

Now all your logic can live in nicely defined state machine blocks with actions, guards, services, etc.

One thing I noticed is that the xstate snapshot contains some nulls that the dynamoDbClient object mapper doesn't like.

So just use the JSON.stringify serialiser and store as a string.

Conclusion

This is a pretty simple way to run dynamic state machines in lambda.

XState does all the heavy lifting for us - defining and running the machine, serialising the state. We just provide some AWS primitives via SST to make it all work.

I hope this helps someone else out there trying to do the same thing!

Hit me up on twitter if you have any questions or comments.