Arpad Toth for AWS Community Builders

Originally published at arpadt.com

Building MongoDB-based event-driven applications with DocumentDB

TL;DR: We can create event-driven architectures that react to changes in MongoDB. Change streams are supported in the latest DocumentDB engine, and we can trigger a Lambda function when a change occurs in the database. Although this article discusses how to send new MongoDB documents to OpenSearch, this is not the only use case. We can publish a message to SNS or EventBridge and inform other services of the database changes.


1. MongoDB vs DynamoDB

I like DynamoDB and use it where it makes sense. I like its speed, its elasticity and the fact that it often has a negligible cost compared to other database options.

But sometimes we have to consider other NoSQL options, like MongoDB. Why would we choose MongoDB over DynamoDB? Here are some reasons:

  • The application needs to store items as JSON documents.
  • Documents are deeply nested, and it would be a lot of effort to transform them into simple attributes that DynamoDB expects.
  • MongoDB has a simple JavaScript syntax. DynamoDB has a steeper learning curve.
  • Your team lead or line manager is afraid of anything new, and they want you to use the good old MongoDB.

There might be other reasons (feel free to write them in a comment), but these ones come to mind now.

2. The scenario

So, MongoDB is the chosen database. Do we have to abandon the event-driven patterns we can implement with DynamoDB Streams?

The good news is no, we don't.

MongoDB supports change streams, which applications can use to react to real-time data changes.

Luckily, AWS has a MongoDB-compatible NoSQL database called DocumentDB, and its latest engine version (5.0.0) supports change streams.

3. MongoDB change streams

Change streams store event objects for 3 hours by default, but we can extend the duration up to 7 days. We must explicitly enable change streams on a database or specific collections in the database.

A good spot for the change stream code is where we write (and probably cache) the MongoDB client code:

    // Create the connection and get the client
    const client = await connectToDatabase();

    // Enable change streams on the collection
    const adminDb = client.db('admin');
    await adminDb.command({
      modifyChangeStreams: 1,
      database: DB_NAME,
      collection: COLLECTION_NAME,
      enable: true,
    });

Part of the connectToDatabase() function may be similar to this:

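    import { MongoClient } from 'mongodb';
    import {
      SecretsManagerClient,
      GetSecretValueCommand,
      type GetSecretValueCommandInput,
    } from '@aws-sdk/client-secrets-manager';

    // DOCDB_ENDPOINT, DOCDB_PORT and DOCDB_DATABASE are configuration values
    // defined elsewhere in the project.
    const secretsManager = new SecretsManagerClient();

    // Holds the client so that it can be reused after the first connection
    let cachedClient: MongoClient | undefined;

    export async function connectToDatabase(): Promise<MongoClient> {
      // Get credentials from Secrets Manager
      const secretCommandInput: GetSecretValueCommandInput = {
        SecretId: process.env.DOCDB_SECRET_ARN,
      };
      const secretData = await secretsManager.send(
        new GetSecretValueCommand(secretCommandInput)
      );
      if (!secretData.SecretString) {
        throw new Error('Missing database credentials in config');
      }

      const secret = JSON.parse(secretData.SecretString);
      const encodedUsername = encodeURIComponent(secret.username);
      const encodedPassword = encodeURIComponent(secret.password);
      const port = DOCDB_PORT || '27017';
      const database = DOCDB_DATABASE || 'products';

      // Concatenate the parts so that no whitespace ends up inside the URI
      const uri =
        `mongodb://${encodedUsername}:${encodedPassword}` +
        `@${DOCDB_ENDPOINT}:${port}/${database}` +
        '?tls=true&replicaSet=rs0' +
        '&readPreference=secondaryPreferred&retryWrites=false';

      const client = new MongoClient(uri, {
        tlsCAFile: '<PATH_TO_ROOT_CERTIFICATE_PEM_FILE>',
        tls: true,
      });
      await client.connect();
      cachedClient = client;
      return client;
    }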

As you can see, it's a standard MongoDB connection function.

I want to highlight one thing in the code. DocumentDB integrates with Secrets Manager, where database secrets (username and password) are stored. The function must fetch these credentials before it can create the connection URI.
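The URI-building step can be pulled out into a small pure function, which makes it easy to check that special characters in the credentials are encoded and that no stray whitespace sneaks into the connection string. This is only a sketch: the buildDocDbUri name, its parameter object and the sample endpoint are assumptions made up for illustration, not part of the original project.

```typescript
// Hypothetical helper; names and sample values are illustrative assumptions.
interface DocDbConnectionParams {
  username: string;
  password: string;
  endpoint: string;
  port?: string;
  database?: string;
}

function buildDocDbUri(params: DocDbConnectionParams): string {
  const { username, password, endpoint, port = '27017', database = 'products' } = params;

  // Percent-encode the credentials so characters like '@' or '/' cannot break the URI.
  const credentials = `${encodeURIComponent(username)}:${encodeURIComponent(password)}`;

  // The same connection options as in the connection function, joined without whitespace.
  const options = [
    'tls=true',
    'replicaSet=rs0',
    'readPreference=secondaryPreferred',
    'retryWrites=false',
  ].join('&');

  return `mongodb://${credentials}@${endpoint}:${port}/${database}?${options}`;
}

// Example with made-up credentials and a made-up cluster endpoint.
const uri = buildDocDbUri({
  username: 'admin',
  password: 'p@ss/word',
  endpoint: 'example-cluster.cluster-abc123.eu-central-1.docdb.amazonaws.com',
});
console.log(uri);
```

In a real function, the username and password would come from the parsed Secrets Manager secret instead of literals.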

Now that we have enabled the change streams, let's look at the architecture. How can we use the change stream feature in our applications?

4. Architecture

This article will present the simple design of an application where admin users can add products, and users can search for them.

4.1. Overview

The connection point to the application is an API Gateway, which has two endpoints: POST /product and GET /search.

Figure: From DocumentDB to OpenSearch (architecture diagram)

Administrators upload the new product info by sending the product object in the request body to the /product endpoint. In this sample project, part of the product schema looks like this:

    {
      "productId": 1001,
      "productName": "Wireless Headphones",
      "description": "Noise-cancelling wireless headphones",
      "brand": "SoundPro",
      "category": "Electronics",
      "price": 199.99,
      "currency": "USD",
      "ratings": {
        "averageRating": 4.7
      },
      // other product-related properties, many of which are nested
    }

A Lambda function will act as the endpoint integration and save new products to the database.

As discussed above, I chose DocumentDB for the project because of the many nested properties in the document, and I want to store the document as is. I didn't bother making the database and collection names very complex, and named both products.

A second Lambda function called changeStreamHandler watches the MongoDB change stream and indexes the new product documents in the OpenSearch Serverless service. The function iterates over the records in the events array and adds them to OpenSearch.

Finally, users can invoke the /search endpoint with a search expression to fetch matching product details. The search function in my little POC performs a multi_match query:
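To make the iteration step more concrete, here is a minimal sketch of how a handler might pull the new product documents out of the payload before indexing them. The interfaces reflect my understanding of the event that Lambda delivers for a DocumentDB event source (an events array whose items wrap a change stream event), typed only as far as this example needs. The extractNewProducts name and the sample payload are hypothetical.

```typescript
// Assumed shape of the payload delivered by a DocumentDB event source
// mapping; only the fields used in this sketch are typed.
interface ChangeStreamRecord {
  event: {
    operationType: string;
    ns: { db: string; coll: string };
    documentKey: { _id: unknown };
    fullDocument?: Record<string, unknown>;
  };
}

interface DocumentDbEvent {
  eventSourceArn: string;
  eventSource: string;
  events: ChangeStreamRecord[];
}

// Keep only newly inserted documents. The MongoDB _id is removed because
// OpenSearch treats _id as a metadata field, not as part of the document body.
function extractNewProducts(payload: DocumentDbEvent): Record<string, unknown>[] {
  const products: Record<string, unknown>[] = [];
  for (const record of payload.events) {
    const { operationType, fullDocument } = record.event;
    if (operationType !== 'insert' || !fullDocument) continue;
    const { _id, ...product } = fullDocument;
    products.push(product);
  }
  return products;
}

// Made-up payload with one insert and one delete event.
const samplePayload: DocumentDbEvent = {
  eventSourceArn: 'arn:aws:rds:eu-central-1:123456789012:cluster:example',
  eventSource: 'aws:docdb',
  events: [
    {
      event: {
        operationType: 'insert',
        ns: { db: 'products', coll: 'products' },
        documentKey: { _id: { $oid: '0123456789abcdef01234567' } },
        fullDocument: {
          _id: { $oid: '0123456789abcdef01234567' },
          productId: 1001,
          productName: 'Wireless Headphones',
        },
      },
    },
    {
      event: {
        operationType: 'delete',
        ns: { db: 'products', coll: 'products' },
        documentKey: { _id: { $oid: '89abcdef0123456701234567' } },
      },
    },
  ],
};

const newProducts = extractNewProducts(samplePayload);
console.log(newProducts);
```

The handler would then pass each returned object to the OpenSearch client for indexing. Filtering on the insert operation type is a design choice for this POC, where only new products are indexed.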

    // ...other function code

    // Search in OpenSearch
    const response = await opensearchClient.search({
      index: 'products',
      body: {
        query: {
          multi_match: {
            query, // User's search expression
            fields: [
              'productName^3',
              'description^2',
              'brand',
              'category',
              'ratings.averageRating',
            ],
          },
        },
        sort: [{ _score: { order: 'desc' } }],
      },
    });

As the code above shows, the search expression looks for matching data in five fields.

Requests to OpenSearch must be signed with AWS Signature V4. The OpenSearch project npm package exports the AwsSigv4Signer helper, which makes signing simple:

    import { Client } from '@opensearch-project/opensearch';
    import { AwsSigv4Signer } from '@opensearch-project/opensearch/aws';

    // ...

    const client = new Client({
      node: endpoint,
      ...AwsSigv4Signer({
        region: 'eu-central-1',
        service: 'aoss', // Amazon OpenSearch Serverless
      }),
    });

Let's take a closer look at some key elements.

4.2. Detailed view

Functions in the VPC. It's a shame, but as of today, DocumentDB is only available in a cluster format that runs managed instances. The cluster must be provisioned in at least two (preferably private) subnets in a VPC. There's currently no serverless version available!

The product and changeStreamHandler Lambda functions interact with the database. As such, we must provision them in the VPC too, possibly in the same private subnets. (Let's not go into other VPC design options here.)

Security groups. The functions' security groups must allow outbound traffic to the security group assigned to the DocumentDB cluster. The DocumentDB security group must allow inbound traffic on port 27017 (the MongoDB default port) from the Lambda functions' security group.

Event source mapping. Since the changeStreamHandler reads events from the change stream, Lambda will use the polling invocation model. We must configure an event source mapping where, besides the usual stream settings like batchSize or startingPosition, we must set the authentication option and specify the database and collection names the function will read events from. Sample CDK code with a low-level construct may look like this:

    const eventSourceMapping = new cdk.CfnResource(
      this,
      'ChangeStreamEventSourceMapping',
      {
        type: 'AWS::Lambda::EventSourceMapping',
        properties: {
          FunctionName: changeStreamLambda.functionArn,
          EventSourceArn: docdbClusterArn,
          StartingPosition: 'LATEST',
          BatchSize: 10,
          Enabled: true,
          SourceAccessConfigurations: [
            {
              Type: 'BASIC_AUTH',
              // The ARN of the Secrets Manager secret
              // that holds the connection info
              URI: docdbSecret.secretArn,
            },
          ],
          DocumentDBEventSourceConfig: {
            DatabaseName: 'products',
            CollectionName: 'products',
          },
        },
      }
    );

We must also specify the ARN of the Secrets Manager secret that contains the DocumentDB cluster connection information (username and password).

NAT Gateway. For the functions to interact with DocumentDB via the event source mapping, we need to provision either a NAT Gateway or some VPC interface endpoints. The documentation describes in more detail which endpoints are required. For the sake of simplicity, I created a NAT Gateway, but it might not be the best option for some industries where companies need to comply with strict regulations.

OpenSearch Serverless. It's a great option for proofs-of-concept (like this one) or teams that don't want to manage OpenSearch clusters. It's a fully managed service where we don't have to worry about VPCs, security groups and NAT Gateways. The search function connects to OpenSearch Serverless through an HTTPS endpoint.

Permissions. Lambda functions provisioned in a VPC must have specific permissions other than those they use to connect to DocumentDB. The Lambda service needs to create ENIs (Elastic Network Interfaces, virtual network cards in the VPC), so we must add the required permissions to the function's execution role.

The product function also needs Secrets Manager (secretsmanager:GetSecretValue and secretsmanager:DescribeSecret) and KMS (kms:Decrypt) permissions since it needs to authenticate to DocumentDB.
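As a rough illustration, the permissions listed above could be collected into a single IAM policy document. This is a sketch, not the project's actual policy: the buildProductFunctionPolicy name and the sample ARNs are made up, and the three EC2 actions shown are only the core ENI-related ones. The AWS managed policy AWSLambdaVPCAccessExecutionRole covers the complete set, so check it before relying on this list.

```typescript
interface PolicyStatement {
  Effect: 'Allow';
  Action: string[];
  Resource: string | string[];
}

interface PolicyDocument {
  Version: '2012-10-17';
  Statement: PolicyStatement[];
}

// Hypothetical helper that assembles the permissions discussed above.
function buildProductFunctionPolicy(secretArn: string, kmsKeyArn: string): PolicyDocument {
  return {
    Version: '2012-10-17',
    Statement: [
      {
        // Lets the Lambda service manage ENIs in the VPC subnets
        Effect: 'Allow',
        Action: [
          'ec2:CreateNetworkInterface',
          'ec2:DescribeNetworkInterfaces',
          'ec2:DeleteNetworkInterface',
        ],
        Resource: '*',
      },
      {
        // Read the DocumentDB credentials
        Effect: 'Allow',
        Action: ['secretsmanager:GetSecretValue', 'secretsmanager:DescribeSecret'],
        Resource: secretArn,
      },
      {
        // Decrypt the secret if it is encrypted with a customer managed key
        Effect: 'Allow',
        Action: ['kms:Decrypt'],
        Resource: kmsKeyArn,
      },
    ],
  };
}

// Made-up ARNs for illustration.
const policy = buildProductFunctionPolicy(
  'arn:aws:secretsmanager:eu-central-1:123456789012:secret:docdb-credentials-AbCdEf',
  'arn:aws:kms:eu-central-1:123456789012:key/11111111-2222-3333-4444-555555555555'
);
console.log(JSON.stringify(policy, null, 2));
```

Scoping the Secrets Manager and KMS statements to specific ARNs instead of a wildcard keeps the execution role close to least privilege.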

5. Considerations

The result is a simple event-driven application, where we use streams to react to MongoDB data changes.

I want to highlight a couple of things, though.

5.1. Not production-ready

As it might be clear from the discussion above, this article does not cover the entire application.

The application is only a POC and is not production-ready. It only contains some core infrastructure elements.

Error handling, retries and monitoring should also be implemented in production workloads.

5.2. Testing the app

Besides the basic "let's call the endpoint and see what happens" method, I tested the application with a script, which made a batch of 5 parallel requests per second to the POST /product endpoint until 500 documents got uploaded to the database.
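A test script along these lines might look like the sketch below. It is not the script used for the article: planBatches, runLoadTest and the SendProduct callback are hypothetical names, and the actual HTTP call is left to the caller so the batching logic can be tried without a deployed endpoint. Because each batch waits for its requests to finish before pausing, the real rate is roughly, not exactly, one batch per second.

```typescript
// The caller supplies the function that performs one POST /product request.
type SendProduct = (productId: number) => Promise<void>;

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Split the ids 1..total into consecutive batches of at most batchSize.
function planBatches(total: number, batchSize: number): number[][] {
  const batches: number[][] = [];
  for (let start = 1; start <= total; start += batchSize) {
    const end = Math.min(start + batchSize - 1, total);
    const batch: number[] = [];
    for (let id = start; id <= end; id++) batch.push(id);
    batches.push(batch);
  }
  return batches;
}

// Send each batch in parallel, then pause before the next one.
async function runLoadTest(
  send: SendProduct,
  total = 500,
  batchSize = 5,
  pauseMs = 1000
): Promise<number> {
  let sent = 0;
  for (const batch of planBatches(total, batchSize)) {
    await Promise.all(batch.map((id) => send(id)));
    sent += batch.length;
    await sleep(pauseMs);
  }
  return sent;
}
```

With the defaults, the run consists of 100 batches of 5 requests. The send callback would typically wrap an HTTP client call that posts a generated product object to the API Gateway URL.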

The architecture elements scaled well when needed, and I didn't experience any bottlenecks.

5.3. Connecting to the database cluster

If you are like me and prefer connecting to databases from the terminal vs using a GUI, you are probably familiar with the MongoDB Shell.

But how can we connect to the DocumentDB cluster, which is provisioned in a private network, from outside?

We have a couple of options. One of them is to launch an EC2 instance in the same VPC, install mongosh on the instance, and copy the cluster connection command from the DocumentDB page in the console.

For example, we can then check which databases and collections have the change streams setting enabled:

    db.runCommand({
      aggregate: 1,
      pipeline: [{ $listChangeStreams: 1 }],
      cursor: {}
    })

The response will look like this:

    {
      waitedMS: Long('0'),
      cursor: {
        firstBatch: [
          { database: 'DB_NAME', collection: 'COLLECTION_NAME' }
        ],
        id: Long('0'),
        ns: 'test.$cmd'
      },
      operationTime: Timestamp({ t: 1747661570, i: 1 }),
      ok: 1
    }

Alternatively, you can use the usual GUI solutions like Studio 3T. Instructions on connecting to DocumentDB can be found in the official documentation.

5.4. Direct integration

I discussed one possible solution to connect OpenSearch and DocumentDB.

OpenSearch Ingestion pipelines support DocumentDB as a source and keep the database in sync with OpenSearch without using an intermediary compute resource, like the changeStreamHandler function in this example. Ingestion pipelines support filtering, data transformation and enrichment, replacing the Lambda function with a managed feature.

The need to control the business logic and cost considerations (ingestion pipelines vs running a Lambda function) can help make the right architecture decision.

5.5. Other event-driven options

Indexing documents in OpenSearch is not the only action we can perform by building on change streams.

We can create fanout patterns by having the Lambda function publish a message to an SNS topic or an EventBridge event bus, allowing other (micro)services to react to the MongoDB state changes.
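For the EventBridge variant, the function would map each change stream event to an entry for the PutEvents API. The sketch below only builds that entry. The toEventBridgeEntry name, the Source and DetailType naming scheme and the bus name are all assumptions chosen for illustration.

```typescript
// The parts of a change stream event that this sketch uses.
interface ProductChange {
  operationType: string;
  ns: { db: string; coll: string };
  fullDocument?: Record<string, unknown>;
}

// Fields of one entry as accepted by the EventBridge PutEvents API.
interface PutEventsEntry {
  EventBusName: string;
  Source: string;
  DetailType: string;
  Detail: string; // must be a JSON string
}

// Hypothetical mapping; the Source and DetailType values are arbitrary conventions.
function toEventBridgeEntry(change: ProductChange, eventBusName: string): PutEventsEntry {
  return {
    EventBusName: eventBusName,
    Source: `docdb.${change.ns.db}.${change.ns.coll}`,
    DetailType: `product.${change.operationType}`,
    Detail: JSON.stringify(change.fullDocument ?? {}),
  };
}

// Made-up change event and bus name.
const entry = toEventBridgeEntry(
  {
    operationType: 'insert',
    ns: { db: 'products', coll: 'products' },
    fullDocument: { productId: 1001, productName: 'Wireless Headphones' },
  },
  'product-events'
);
console.log(entry);
```

Consumers can then match on the source and detail type in their EventBridge rules and react only to the changes they care about.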

6. Summary

MongoDB change streams offer the option to implement event-driven patterns that react to document changes in the database.

DocumentDB is a NoSQL database with MongoDB compatibility, where version 5.0.0 supports change streams.

One implementation pattern is to index the newly created document in OpenSearch and provide users with a search box to perform advanced searches.

7. Further reading

Amazon OpenSearch Service Pricing - Ingestion pipelines pricing included

Create your first Lambda function - If you need help in creating Lambda functions

Creating an Amazon DocumentDB cluster - I can't add more to the title
