Tailing the CosmosDB Change Feed

This will hopefully be a short post about how to listen to changes from your CosmosDB SQL API using the Node SDK. For some reason, it reminds me of the oplog tailing mechanism in MongoDB that Meteor used in the past to achieve real-time updates, hence this article's title.

I do not have much time right now, but let us talk a bit about my use case for this. I am currently working on an IoT project that uses several Azure technologies as its infrastructure. As you might have guessed, it pipes data from the IoT sensors into Azure Event Hubs. The data then gets ingested in Azure Databricks, ultimately ending up in an intermediate data warehouse (for dashboards and further analysis) and a NoSQL application data store (coming up next).

This NoSQL application data store is the CosmosDB instance that we are going to tail. The objective was to push changes from this data store to a mobile application in real-time or at least near real-time (as opposed to long polling or querying at set intervals). To make a long story short, I ended up tailing CosmosDB's Change Feed in a GraphQL service application to make it easier for the client application to implement a PubSub system. More about this in an older post that I published early this year.

Before we dig into the code, let me just say that we are not going to go through how to initialize your CosmosDB collection or how to write a facade / repository class in NodeJS. We will go straight to the Change Feed part but first, make sure you have the correct Node SDK in your project:

npm install --save @azure/cosmos

Once you have set up your CosmosDB boilerplate code in Node, you can access the change feed iterator using the code below:

const changeFeed = container.items.readChangeFeed(partitionKey, options);

(If you need to brush up on iterators in JavaScript, you can check this post that I wrote last year.)

And that's it! You now have a change feed iterable where you can listen to changes from CosmosDB, but let us see what options we have in doing so. The Node SDK currently gives us four ways of listening to the change feed and they are:

1. Start from now

const options = {};

2. Start from a continuation

const { headers } = await container.items.create({ ... });
const lsn = headers["lsn"];
const options = { continuation: lsn };
// I have not used this specific method yet, this example is from here.

3. Start from a specific point in time

const options = { startTime: new Date() }; // any Date object will do

4. Start from the beginning

const options = { startFromBeginning: true };
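If the starting point is decided at runtime (from a config flag, say), the four variants above could be wrapped in a small helper. This is only a sketch: changeFeedOptions and its mode names are hypothetical and not part of the SDK. Only the option keys (continuation, startTime, startFromBeginning) come from the snippets above.

```javascript
// Hypothetical helper: maps a mode name to one of the options objects above.
function changeFeedOptions(mode, value) {
  switch (mode) {
    case 'now':
      return {};
    case 'continuation':
      return { continuation: value };
    case 'time':
      if (!(value instanceof Date)) {
        throw new TypeError('startTime must be a Date');
      }
      return { startTime: value };
    case 'beginning':
      return { startFromBeginning: true };
    default:
      throw new Error(`Unknown change feed mode: ${mode}`);
  }
}

// Example: start reading from one hour ago.
const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000);
const options = changeFeedOptions('time', oneHourAgo);
```

The resulting object would then be passed as the second argument to readChangeFeed, exactly like the hand-written ones.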

As all of these are self-explanatory, we will go ahead and use our changeFeed iterator. To retrieve the next value, we can use await changeFeed.executeNext() or we can loop through the next values like this:

while (changeFeed.hasMoreResults) {
  const { result } = await changeFeed.executeNext();
  // do what you want with the result / changes
}
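One caveat worth checking against your SDK version: as far as the SDK source suggests, hasMoreResults turns false once the feed has caught up, so the loop above exits instead of waiting for future changes. To keep tailing, the read can be wrapped in a polling loop. Below is a minimal sketch, assuming executeNext() can be called again on the same iterator after it has caught up. The names tailChangeFeed, intervalMs and shouldStop are made up for this example and are not part of @azure/cosmos.

```javascript
// Hypothetical helper, not part of @azure/cosmos.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Keeps reading from the change feed; when a read comes back empty,
// waits intervalMs before asking again. Stops once shouldStop() is true.
async function tailChangeFeed(
  changeFeed,
  onChanges,
  { intervalMs = 1000, shouldStop = () => false } = {}
) {
  while (!shouldStop()) {
    const { result } = await changeFeed.executeNext();
    if (Array.isArray(result) && result.length > 0) {
      await onChanges(result);
    } else {
      // Caught up with the feed: back off before polling again.
      await sleep(intervalMs);
    }
  }
}

// Usage with the iterator from above (runs until shouldStop returns true):
// tailChangeFeed(changeFeed, (changes) => console.log(changes));
```

The interval is a trade-off between latency and request volume, so pick it based on how "real-time" your client actually needs to be.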

Reading the source code of the Node SDK revealed that it also exposes a real async generator (the function signature being public async *getAsyncIterator(): AsyncIterable<ChangeFeedResponse<Array<T & Resource>>>). This would have allowed a more elegant for await...of construct, but unfortunately I bumped into a few issues regarding Symbols when I tried it. If you have used it in the past, please feel free to share in the comments!
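For reference, this is roughly what consuming an async generator with for await...of looks like. Since the Symbol issues mentioned above are unresolved, the sketch uses a stand-in generator (fakeChangeFeed, hypothetical) rather than the SDK's getAsyncIterator(). With the real SDK the loop would presumably iterate over changeFeed.getAsyncIterator() instead. Keep in mind that for await...of relies on Symbol.asyncIterator, which older Node releases do not provide without a polyfill.

```javascript
// Stand-in for changeFeed.getAsyncIterator(): yields objects shaped like
// { result: [...] }, mirroring the responses used in the while loop above.
async function* fakeChangeFeed() {
  yield { result: [{ id: '1', temperature: 21.5 }] };
  yield {
    result: [
      { id: '2', temperature: 22.1 },
      { id: '3', temperature: 22.4 }
    ]
  };
}

// Flattens every batch from the async iterable into a single array.
async function collectChanges(iterable) {
  const changes = [];
  for await (const { result } of iterable) {
    changes.push(...result);
  }
  return changes;
}

collectChanges(fakeChangeFeed()).then((changes) => {
  console.log(`received ${changes.length} changes`);
});
```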

That will be all for now, and I hope you learned something in this post.

Real-time GraphQL Subscriptions Part 1: Server

In this post, we will not go through GraphQL from the ground up. My assumption is that you are here to learn how to implement real-time through the use of the graphql-subscriptions package and that you already have a basic understanding of GraphQL types, queries and mutations.

We are going to use Apollo for this tutorial. Apollo was made by the same guys who made Meteor and is a bit opinionated, but it is also arguably one of the most popular full-featured GraphQL libraries around. We will also use React and create-react-app to bootstrap our client application in the second part of this tutorial. That being said, some knowledge of higher order components is also assumed (in Part 2).

Server Boilerplate

Let's start outlining our backend. Initialize a Node project by issuing npm init in your preferred folder, and then install dependencies like so:

npm i --save express body-parser cors graphql graphql-tools apollo-server-express

Next, create the three files that we will use for this short tutorial:

touch index.js resolvers.js schema.js

We will then define a type, a root query and a mutation that we will use for our subscription:

const { makeExecutableSchema } = require('graphql-tools');
const resolvers = require('./resolvers');

const typeDefs = `
  type Message {
    message: String
  }

  type Query {
    getMessages: [Message]
  }

  type Mutation {
    addMessage(message: String!): [Message]
  }

  schema {
    query: Query
    mutation: Mutation
  }
`;

module.exports = makeExecutableSchema({ typeDefs, resolvers });

Okay, so at this point, we have the schema for a GraphQL server that allows you to send a mutation to add a Message, and a query that allows you to fetch all messages in the server. Let's implement resolvers.js so we can start using our schema:

const messages = [];

const resolvers = {
  Query: {
    getMessages(parentValue, params) {
      return messages;
    }
  },
  Mutation: {
    addMessage(parentValue, { message }) {
      messages.push({ message });
      return messages;
    }
  }
};

module.exports = resolvers;
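Because resolvers are plain functions, they can be sanity-checked without a server. The sketch below repeats the resolver map from above so that it runs on its own, then calls the functions directly in the same argument order the GraphQL executor uses (parent value first, arguments second).

```javascript
const messages = [];

const resolvers = {
  Query: {
    getMessages(parentValue, params) {
      return messages;
    }
  },
  Mutation: {
    addMessage(parentValue, { message }) {
      messages.push({ message });
      return messages;
    }
  }
};

// Root fields have no parent value, so pass undefined as the first argument.
resolvers.Mutation.addMessage(undefined, { message: 'hello' });
resolvers.Mutation.addMessage(undefined, { message: 'world' });

console.log(resolvers.Query.getMessages(undefined, {}));
```

Note that messages lives in memory only, so everything is lost when the process restarts. That is fine for this tutorial.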

Oh shoot. We have defined a schema and the functions that will resolve their return values, but we have not set our server up. At least not yet. We are going to use express and apollo-server-express to serve our GraphQL implementation through HTTP:

const express = require('express');
const bodyParser = require('body-parser');
const cors = require('cors');
const { graphqlExpress, graphiqlExpress } = require('apollo-server-express');
const { createServer } = require('http');

const schema = require('./schema');

const app = express();

app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));

app.use(cors());
app.use(
  '/graphql',
  graphqlExpress({
    schema
  })
);

app.use(
  '/graphiql',
  graphiqlExpress({
    endpointURL: '/graphql'
  })
);

const PORT = process.env.PORT || 3030;

const server = createServer(app);
server.listen(PORT, () => {
  console.log(`Server now running at port ${PORT}`);
});

We now have a working GraphQL server running on http://localhost:3030/graphql by issuing node index.js. Since we have configured the interactive GraphiQL as well, we can explore our schema and issue some sample queries and mutations on http://localhost:3030/graphiql.
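If you would rather hit the endpoint from a script than from the browser, a GraphQL request over HTTP is just a JSON POST. Below is a minimal sketch, assuming Node 18 or newer (for the global fetch) and the server from above listening on port 3030. postGraphQL is a hypothetical helper, and its fetchImpl parameter only exists so that the function can be exercised without a running server.

```javascript
// Hypothetical helper: sends a GraphQL operation as a JSON POST request.
async function postGraphQL(url, query, variables = {}, fetchImpl = globalThis.fetch) {
  const response = await fetchImpl(url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ query, variables })
  });
  return response.json();
}

// Matches the addMessage mutation defined in schema.js.
const ADD_MESSAGE = `
  mutation AddMessage($message: String!) {
    addMessage(message: $message) {
      message
    }
  }
`;

// With the server running:
// postGraphQL('http://localhost:3030/graphql', ADD_MESSAGE, { message: 'hello' })
//   .then(({ data }) => console.log(data.addMessage));
```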

Adding real-time through Subscriptions and PubSub

Server Configuration

Our simple GraphQL server is running. That means we can now proceed to the interesting part: implementing real-time through Apollo PubSub. As with all modern real-time frameworks, the implementation is often done on top of WebSockets. We need to install additional dependencies to make use of this transport layer:

npm i --save graphql-subscriptions subscriptions-transport-ws

We then need to make use of these libraries to enable WebSockets support in index.js:

const { execute, subscribe } = require('graphql');
const { SubscriptionServer } = require('subscriptions-transport-ws');

. . .

const server = createServer(app);
server.listen(PORT, () => {
    console.log(`Server now running at port ${PORT}`);
    new SubscriptionServer(
        {
            execute,
            subscribe,
            schema
        },
        {
            server,
            path: '/subscriptions'
        }
    );
});

Let's modify our /graphiql endpoint as well to make use of our new transport layer, so we can demonstrate that this is working through GraphiQL once we are done:

app.use(
    '/graphiql',
    graphiqlExpress({
        endpointURL: '/graphql',
        subscriptionsEndpoint: 'ws://localhost:3030/subscriptions'
    })
);
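One small thing to watch out for: the WebSocket URL above hardcodes port 3030, while the server itself honors process.env.PORT. If you run on a different port, the two will drift apart. A possible way around it is to derive the URL from the same values. This is a sketch only, and subscriptionsUrl and SUBSCRIPTIONS_PATH are hypothetical names:

```javascript
const PORT = process.env.PORT || 3030;
const SUBSCRIPTIONS_PATH = '/subscriptions';

// Hypothetical helper: builds the WebSocket URL for a local server.
function subscriptionsUrl(port, path) {
  return `ws://localhost:${port}${path}`;
}

// Options object that would be handed to graphiqlExpress(...).
const graphiqlOptions = {
  endpointURL: '/graphql',
  subscriptionsEndpoint: subscriptionsUrl(PORT, SUBSCRIPTIONS_PATH)
};
```

This requires PORT to be declared before the /graphiql route is registered, and the same SUBSCRIPTIONS_PATH constant can be reused for the path option of SubscriptionServer.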

That's it for the server setup! Let's proceed to fleshing out the subscription implementation.

Defining Subscriptions

In GraphQL, a subscription is just a type, pretty much like query and mutation. Go ahead and define our subscription in schema.js:

  const typeDefs = `
  
  . . .
  
  type Subscription {
    newMessageAdded: Message
  }

  schema {
    query: Query
    mutation: Mutation
    subscription: Subscription
  }
`;

We have just defined our first subscription. It will allow applications or clients to subscribe and receive updates whenever new messages are added (through a mutation). Just to make sure everything is working correctly, open the Documentation Explorer in GraphiQL; you should now be able to see Subscription and newMessageAdded.

If there are no errors and you can see the definition above, then we are ready to make this work by, you guessed it, implementing the resolver function for newMessageAdded.

Implementing the Subscription and Publishing Messages

With the transport configuration and the type definitions done, the only thing we need to do now is to implement newMessageAdded and the actual message publication. The flow will be like this:

1. A client will subscribe to `newMessageAdded`
2. Every time our `addMessage` mutation is executed, we will publish a message to `newMessageAdded`, using the new `message` as the payload.

We need to tweak our resolvers.js to import helpers from graphql-subscriptions. We will use them to implement our newMessageAdded subscription query:

const { PubSub, withFilter } = require('graphql-subscriptions');
const pubsub = new PubSub();

. . .

const resolvers = {
  Query: {
    . . .
  },
  Mutation: {
    . . .
  },
  Subscription: {
    newMessageAdded: {
      subscribe: withFilter(
        () => pubsub.asyncIterator('newMessageAdded'),
        (params, variables) => true
      )
    }
  }
};

module.exports = resolvers;

We just implemented our first subscription query! Every time our server publishes a message to newMessageAdded, clients that are subscribed will get the published payload.
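If the async iterator part feels like magic, the pattern can be tried in isolation with nothing but Node's built-in events module. This is an analogy only, not the graphql-subscriptions implementation: events.on() returns an async iterator over emitted events, which plays the role of pubsub.asyncIterator('newMessageAdded'), and emit() plays the role of pubsub.publish(). The demo and nextPayload functions are made up for this sketch, which needs a Node version that ships events.on.

```javascript
const { EventEmitter, on } = require('events');

// events.on() yields the emitted arguments as an array, so unwrap the first.
async function nextPayload(iterator) {
  const { value } = await iterator.next();
  return value[0];
}

async function demo() {
  const emitter = new EventEmitter();

  // "Subscribe": the iterator starts listening as soon as it is created.
  const subscription = on(emitter, 'newMessageAdded');

  // "Publish": the payload is keyed by the subscription field name.
  emitter.emit('newMessageAdded', { newMessageAdded: { message: 'hello' } });

  const payload = await nextPayload(subscription);
  await subscription.return(); // "Unsubscribe": removes the listeners.
  return payload;
}

demo().then((payload) => console.log(payload.newMessageAdded.message));
```

The takeaway is that a subscriber simply awaits the next value, and a publisher pushes values in from somewhere else in the application.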

As an aside, the helper function withFilter is not actually required in our example here, since our filter always returns true (just subscribe: () => pubsub.asyncIterator('newMessageAdded') will do for this tutorial), but I figured that this will be helpful if you want to try something useful with this whole pubsub ordeal, like say, a classic chat app.

The second function that you pass as an argument to withFilter allows you to filter which subscribers will receive the message. This is done by using the fields in the actual payload that is about to get published (params) and the GraphQL query variables from the subscription (variables). All you need to do is return a truthy value if you want the payload sent to this particular subscriber. It will look roughly similar to this: return params.receiverId === variables.userId. Of course, that is assuming that a query variable called userId was sent along with the subscription.
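To make that a bit more concrete, here is a sketch of such a filter for a hypothetical chat schema. Neither receiverId nor a userId subscription variable exists in this tutorial's schema; both are assumptions carried over from the one-liner above. One detail to watch for: the first argument is the whole published object, so when the payload is published keyed by the subscription name (newMessageAdded here), the field sits one level down.

```javascript
// Hypothetical filter: deliver a message only to the user it is addressed to.
// payload   -> the object handed to pubsub.publish(...)
// variables -> the variables sent along with the subscription query
function isForSubscriber(payload, variables) {
  return payload.newMessageAdded.receiverId === variables.userId;
}

// It would be passed as the second argument to withFilter:
//   subscribe: withFilter(
//     () => pubsub.asyncIterator('newMessageAdded'),
//     isForSubscriber
//   )

const payload = { newMessageAdded: { message: 'hi', receiverId: 'u1' } };
console.log(isForSubscriber(payload, { userId: 'u1' }));
console.log(isForSubscriber(payload, { userId: 'u2' }));
```

Keeping the filter as a named, standalone function also makes it easy to unit test without spinning up the WebSocket server.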

Since we do not have an application that will subscribe to our server yet, why don't we try this out with GraphiQL?

If you can see the same message above, great! Everything is working awesome. But if we do not publish anything anywhere on our server, nothing will happen. Yep, we are about to do just that.

In fact, we just need to add one line to our addMessage resolver:

  Mutation: {
    addMessage(parentValue, { message }) {
      messages.push({ message });
      
      // blame prettier for not making this a one-liner 
      pubsub.publish('newMessageAdded', {
        newMessageAdded: { message }
      });
      
      return messages;
    }
  }

We can now test this using GraphiQL in two browser windows. The first browser will be the subscriber, and the second one will send the mutations.

As soon as you send an addMessage mutation on the second browser, the first browser receives the message and displays it instantly! How cool is that? Let's wrap up what we learned in this short tutorial.

Wrap up

In this tutorial, we learned how to set up subscriptions and publish messages to subscribers using graphql-subscriptions. In the next part of this tutorial, we will use apollo-client with react-apollo to see how this will work with a real application as the subscriber.

The complete source code for this tutorial can be found here.

If you encountered any errors or have any questions about this, let me know in the comments section below!