Integrating MongoDB and Amazon Kinesis for Intelligent, Durable Streams

This article was originally published on MongoDB. Thank you for supporting the partners who make SitePoint possible.

You can build your online, operational workloads atop MongoDB and still respond to events in real time by kicking off Amazon Kinesis stream processing actions, using MongoDB Stitch Triggers.

Let’s look at an example scenario in which a stream of data is being generated as a result of actions users take on a website. We’ll durably store the data and simultaneously feed a Kinesis process to do streaming analytics on something like cart abandonment, product recommendations, or even credit card fraud detection.

We’ll do this by setting up a Stitch Trigger. When relevant data updates are made in MongoDB, the trigger will use a Stitch Function to call out to AWS Kinesis, as you can see in this architecture diagram:

What you’ll need to follow along

  1. An Atlas instance
    If you don’t already have an application running on Atlas, you can follow our getting started with Atlas guide here. In this example, we’ll be using a database called streamdata, with a collection called clickdata where we’re writing data from our web-based e-commerce application.
  2. An AWS account and a Kinesis stream
    In this example, we’ll use a Kinesis stream to send data downstream to additional applications such as Kinesis Analytics. This is the stream we want to feed our updates into.
  3. A Stitch application
    If you don’t already have a Stitch application, log into Atlas, and click Stitch Apps from the navigation on the left, then click Create New Application.

Create a Collection

The first step is to create a database and collection from the Stitch application console. Click Rules from the left navigation menu and click the Add Collection button. Type streamdata for the database and clickdata for the collection name. Select the template labeled Users can only read and write their own data and provide a field name where we’ll specify the user id.

Figure 2. Create a collection

Configuring Stitch to Talk to AWS

Stitch lets you configure Services to interact with external services such as AWS Kinesis. Choose Services from the navigation on the left, and click the Add a Service button, select the AWS service and set AWS Access Key ID, and Secret Access Key.

Figure 3. Service Configuration in Stitch

Services use Rules to specify what aspect of a service Stitch can use, and how. Add a rule which will enable that service to communicate with Kinesis by clicking the button labeled NEW RULE. Name the rule “kinesis” as we’ll be using this specific rule to enable communication with AWS Kinesis. In the section marked Action, select the API labeled Kinesis and select All Actions.

Figure 4. Add a rule to enable integration with Kinesis

Write a Function that Streams Documents into Kinesis

Now that we have a working AWS service, we can use it to put records into a Kinesis stream. The way we do that in Stitch is with Functions. Let’s set up a putKinesisRecord function.

Select Functions from the left-hand menu, and click Create New Function. Provide a name for the function and paste the following in the body of the function.

Figure 5. Example Function - putKinesisRecord

exports = function(event){
 const awsService = context.services.get('aws');
try{
   awsService.kinesis().PutRecord({
     Data: JSON.stringify(event.fullDocument),
     StreamName: "stitchStream",
     PartitionKey: "1"
      }).then(function(response) {
        return response;
      });
}
catch(error){
  console.log(JSON.parse(error));
}
};

Test Out the Function

Let’s make sure everything is working by calling that function manually. From the Function Editor, Click Console to view the interactive javascript console for Stitch.

Functions called from Triggers require an event. To test execution of our function, we’ll need to pass a dummy event to the function. Creating variables from the console in Stitch is simple. Simply set the value of the variable to a JSON document. For our simple example, use the following:

event = {
   "operationType": "replace",
   "fullDocument": {
       "color": "black",
       "inventory": {
           "$numberInt": "1"
       },
       "overview": "test document",
       "price": {
           "$numberDecimal": "123"
       },
       "type": "backpack"
   },
   "ns": {
       "db": "streamdata",
       "coll": "clickdata"
   }
}
exports(event);

Paste the above into the console and click the button labeled Run Function As. Select a user and the function will execute.

Ta-da!

Putting It Together with Stitch Triggers

We’ve got our MongoDB collection living in Atlas, receiving events from our web app. We’ve got our Kinesis stream ready for data. We’ve got a Stitch Function that can put data into a Kinesis stream.

Configuring Stitch Triggers is so simple it’s almost anticlimactic. Click Triggers from the left navigation, name your trigger, provide the database and collection context, and select the database events Stitch will react to with execution of a function.

For the database and collection, use the names from step one. Now we’ll set the operations we want to watch with our trigger. (Some triggers might care about all of them – inserts, updates, deletes, and replacements – while others can be more efficient because they logically can only matter for some of those.) In our case, we’re going to watch for insert, update and replace operations.

Now we specify our putKinesisRecord function as the linked function, and we’re done.

Figure 6. Trigger Configuration in Stitch

As part of trigger execution, Stitch will forward details associated with the trigger event, including the full document involved in the event (i.e. the newly inserted, updated, or deleted document from the collection.) This is where we can evaluate some condition or attribute of the incoming document and decide whether or not to put the record onto a stream.

Test the Trigger!

Amazon provides a dashboard which will enable you to view details associated with the data coming into your stream.

Figure 7. Kinesis Stream Monitoring

As you execute the function from within Stitch, you’ll begin to see the data entering the Kinesis stream.

Building More Functionality

So far our trigger is pretty basic – it watches a collection and when any updates or inserts happen, it feeds the entire document to our Kinesis stream. From here we can build out some more intelligent functionality. To wrap up this post, let’s look at what we can do with the data once it’s been durably stored in MongoDB and placed into a stream.

Once the record is in the Kinesis Stream you can configure additional services downstream to act on the data. A common use case incorporates Amazon Kinesis Data Analytics to perform analytics on the streaming data. Amazon Kinesis Data Analytics offers pre-configured templates to accomplish things like anomaly detection, simple alerts, aggregations, and more.

For example, our stream of data will contain orders resulting from purchases. These orders may originate from point-of-sale systems, as well as from our web-based e-commerce application. Kinesis Analytics can be leveraged to create applications that process the incoming stream of data. For our example, we could build a machine learning algorithm to detect anomalies in the data or create a product performance leaderboard from a sliding, or tumbling window of data from our stream.

Figure 8. Amazon Data Analytics - Anomaly Detection Example

Wrapping Up

Now you can connect MongoDB to Kinesis. From here, you’re able to leverage any one of the many services offered from Amazon Web Services to build on your application. In our next article in the series, we’ll focus on getting the data back from Kinesis into MongoDB. In the meantime, let us know what you’re building with Atlas, Stitch, and Kinesis!

Resources

MongoDB Atlas

MongoDB Stitch

Amazon Kinesis