MongoDB CDC - Load Data in Real time

Luke Smith
Enterprise Solutions Architect
December 2, 2022
Matt Tanner
Developer Relations Lead
December 2, 2022

In the past, traditional offline data warehouse architecture was the norm. The preferred approach for moving data between a source and destination data store consisted of collection, batch processing, and overwriting. Using this approach led to poor data availability when it mattered most. Using data as it was created, or even within the same day, was nearly impossible with a batch-style approach to moving data. To resolve this, real-time data pipelines and warehouses were deployed and changed the landscape. By using data collection tools and real-time computing engines, faster and more efficient ways to collect and analyze data were possible.

Over time, Change Data Capture, abbreviated to CDC, became a key tool in building real-time data pipelines that fed data to data warehouses and other platforms. CDC is the process of identifying data changes in a source and applying the changes to the data in a destination system, like a big data or analytics platform. It enables real-time synchronized data across various applications, and systems like data lakes, data pipelines, and data warehouses. Using CDC also helps ensure data integrity and data quality while data is synchronized between source and target.

There are many ways to implement CDC depending on the technologies available at your disposal. This write-up will explain how CDC can be used within MongoDB. MongoDB, a cross-platform document-oriented database, can use CDC to detect and apply changes made via transaction log files. The detected changes can then be applied to other downstream applications that require the most up-to-date data. Instead of querying for changes in tables/collections and causing additional burdens on the database, log-based CDC can be implemented. Using log-based CDC allows the database system to remain performant while still enabling real-time data synchronization between source and target.

The significance of MongoDB CDC should not be underestimated. CDC has wide applicability including being used for application integration, zero-downtime data migration, data analytics, auditing, and much more.

What is MongoDB?

MongoDB is an open-source, cross-platform, document-based database. Unlike relational databases, it follows the NoSQL paradigm: data is not stored in tabular format or relational tables but as documents within collections, essentially large JSON-like objects. This flexible-schema approach means data is not modeled with strict relationships, which makes it possible to build scalable internet applications and is a big reason many developers prefer NoSQL databases when building and deploying large, scalable applications.

MongoDB is particularly suited for working with large sets of distributed data. As mentioned previously, MongoDB manages document-oriented information by using a JSON-like format for the storage and retrieval of information. MongoDB makes it easy to store structured and unstructured data, handle a high volume of data, and scale the data both vertically and horizontally.

MongoDB supports easy-to-use and configure drivers for all major programming languages so connectivity is simple. Using MongoDB allows developers to start building applications immediately without having to configure a database first since no schema is required to store data. 

What is MongoDB CDC?

Change Data Capture, abbreviated to CDC, is a concept or mechanism that is used to replicate data. CDC identifies and records data changes that occur in a datastore and replicates these changes in other data stores in real-time. It is used to synchronize data across databases and other data stores. Keeping data in sync between stores ensures data consistency for applications and other systems that are using the data.

CDC within MongoDB relies on an internal log of changes. Many databases implement this as a Write-Ahead Log (WAL), a log that records every change before it is applied to the database's objects and indexes; MongoDB's equivalent structure is the operations log, or oplog. The CDC process reads this log instead of having to query database objects to scan for changes, so any changes are easily detected and applied from the data already recorded in the log.

MongoDB, as has been explained in the previous section, is a NoSQL database management solution providing flexible JSON-like document storage. As capable as it is for most use cases, there may arise situations where you might need to get a copy of data into another platform. Typical platforms that require the application data stored in MongoDB include data lakes, data warehouses, and other big data platforms. Replicating this data into the platforms where it is required is where CDC comes in.

A Change Data Capture process running on MongoDB converts changes in the datastore into a stream of CDC events. A CDC event is a message containing a representation of the changes performed on a data store. When a CDC event is created, the change is copied from the source datastore to the target datastore(s) through a process known as data replication, which applies the changes made in the source to the target so the two stay in sync. Options for implementing CDC on MongoDB include tools like Apache Kafka, Debezium, Qlik Replicate, Arcion, and a few others. Some companies may also choose to create their own CDC handler, although that approach involves much more development and support work.

The next section of this post will delve into how CDC works on MongoDB. We will also cover how it can be implemented so you can maintain fresh versions of your data for real-time analysis and improve your company’s decision-making processes.

Methods To Set Up Change Data Capture (CDC) in MongoDB

As the need to report and analyze data increases in the data world today, it has become really important that the data being produced is synced in real time across various platforms. MongoDB can handle this using CDC to copy data and changes made to it in a source database to other target databases. The methods listed show various options for implementing CDC with MongoDB, as well as some other considerations.

Using A Timestamp Field

The timestamp method involves updating a field whenever a document in a database collection changes. The field is updated whenever a change is made within the affected document, including when insert and update operations occur. The timestamp field does not use a version number; it simply records the exact time the record was last changed. Using a timestamp field or column is one of the most common and simplest CDC mechanisms to implement.

When a document is created initially, the timestamp field is set to the date and time the record was inserted. Subsequent updates to the fields in the document will overwrite that timestamp with the current one to show the date and time the change occurred. 

Some database technologies take care of these updates automatically; other platforms use database triggers to maintain a timestamp field or column. In MongoDB, the creation and update logic for the timestamp field needs to be built directly into the application logic. A specific timestamp field must be set when the document is created and then updated through the application logic every time a record changes.
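To make this concrete, here is a minimal sketch of what that application logic might look like. It is plain JavaScript runnable without a MongoDB server: the `withTimestamp` helper is a hypothetical name, and the in-memory object stands in for a document that would really be passed to `insertOne()` or `updateOne()`.

```javascript
// Hypothetical helper that stamps every write with a lastModified field.
// In a real application, the merged object would be sent to
// collection.insertOne() / collection.updateOne() instead.
function withTimestamp(update) {
  return { ...update, lastModified: new Date() };
}

// Insert: the document gets its initial lastModified value.
let doc = withTimestamp({ _id: 1, sku: "A-100", quantity: 25 });

// Update: application logic overwrites lastModified on every change.
doc = { ...doc, ...withTimestamp({ quantity: 8 }) };

console.log(doc.quantity);                     // 8
console.log(doc.lastModified instanceof Date); // true
```

The key point is that every write path in the application must go through a helper like this; any code path that forgets the timestamp breaks the CDC mechanism silently.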

The benefit of using a timestamp field in MongoDB is that the target system does not have to keep a reference table. The target system simply requests all records with a timestamp greater than the last time the data was updated on the target. Since only updated records are returned by the query, the overhead on the target system is minimal. Conversely, significant overhead is created on the MongoDB source database because of the application logic needed to update the timestamp field whenever a document changes.
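The target-side pull described above can be sketched as follows. This is plain JavaScript with an in-memory array standing in for the source collection; in MongoDB the equivalent would be a query like `db.stock.find({ lastModified: { $gt: lastSyncedAt } })`.

```javascript
// In-memory stand-in for the source collection's documents.
const source = [
  { _id: 1, lastModified: new Date("2023-01-01T00:00:00Z") },
  { _id: 2, lastModified: new Date("2023-01-03T00:00:00Z") },
  { _id: 3, lastModified: new Date("2023-01-05T00:00:00Z") },
];

// Fetch only documents changed since the last successful sync.
function pullChangesSince(docs, lastSyncedAt) {
  return docs.filter((d) => d.lastModified > lastSyncedAt);
}

const lastSyncedAt = new Date("2023-01-02T00:00:00Z");
const changed = pullChangesSince(source, lastSyncedAt);
console.log(changed.map((d) => d._id)); // [ 2, 3 ]
```

After each successful sync, the target records the newest timestamp it has seen and uses it as `lastSyncedAt` on the next poll.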

Using Change Streams

Another approach to implementing Change Data Capture in MongoDB is through the use of change streams. MongoDB change streams employ the publisher-subscriber pattern: applications subscribe to any data changes that occur on a single collection, a database, or an entire deployment. Once a change is detected, subscribing applications are notified and can react immediately. This feature can be used to receive updates to data in real time, since each change is accompanied by an event document detailing the change that occurred in the MongoDB instance.

Change streams use MongoDB’s operations log, or oplog, as their data source; the oplog is MongoDB’s equivalent of the write-ahead log mentioned earlier. It is a capped collection that records all the most recent writes and updates to the data. Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog directly. Because change streams are built on the aggregation framework, applications can filter for specific changes or transform the notifications at will, and the aggregation pipeline stages are optimized for efficient resource utilization and fast execution.

Change streams are only available for MongoDB replica sets and sharded clusters; a standalone MongoDB instance cannot produce a change stream. To open a change stream against a replica set, you can issue the open change stream operation from any of the data-bearing members. For a sharded cluster, you must issue the operation from the mongos. Drivers for languages such as C, C#, Go, Java, Node.js, PHP, and Python (including the asynchronous Motor driver) can be used to do so.

Here is an example of how to implement a change stream in MongoDB:

  1. Shut down your current standalone MongoDB instance, if it’s currently running.
  2. Restart the instance as a replica set, using the --replSet option to specify the name of the new replica set. Here is an example of what this command would look like:
mongod --port 27017 --dbpath /srv/mongodb/db0 --replSet rs0 --bind_ip localhost
  3. Connect mongosh to the mongod instance.
  4. Use rs.initiate() to initiate the new replica set, as shown below.
rs.initiate()

The replica set is now operational, and every write operation in MongoDB will be recorded in the oplog, which change streams read from to produce change events.

With the replica set instance running, rs.conf() is used to view the replica set configuration, rs.status() is used to check the status of the replica set, and rs.add() is used to add members.

Below are a few examples of how to use change streams in MongoDB. The first is an example of a change stream application that monitors and handles changes occurring in a stock collection.

conn = new Mongo("mongodb://localhost:27017/demo?replicaSet=rs");
db = conn.getDB("demo");
collection = db.stock;

const changeStreamCursor = collection.watch();

pollStream(changeStreamCursor);

// This function polls a change stream and prints out each change as it comes in.
function pollStream(cursor) {
  while (!cursor.isExhausted()) {
    if (cursor.hasNext()) {
      change = cursor.next();
      print(JSON.stringify(change));
    }
  }
  pollStream(cursor);
}

By using the parameterless watch() method, the change stream signals every write to the stock collection. This is monitored through the changeStreamCursor which is fed into the pollStream function. The pollStream function polls a change stream and prints out each change as it comes in.
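Each notification delivered by the cursor is a change event document. The sketch below, in plain JavaScript runnable without a server, shows a representative insert event and how a consumer might dispatch on it. The field names (`_id` resume token, `operationType`, `ns`, `documentKey`, `fullDocument`) follow MongoDB's change event format; the values are illustrative.

```javascript
// Representative shape of a change stream event for an insert into demo.stock.
// Values are illustrative; field names follow MongoDB's change event format.
const event = {
  _id: { _data: "resume-token-bytes" }, // opaque resume token
  operationType: "insert",
  ns: { db: "demo", coll: "stock" },
  documentKey: { _id: 101 },
  fullDocument: { _id: 101, sku: "A-100", quantity: 25 },
};

// A consumer typically dispatches on operationType.
function describe(ev) {
  return `${ev.operationType} on ${ev.ns.db}.${ev.ns.coll} (_id: ${ev.documentKey._id})`;
}

console.log(describe(event)); // insert on demo.stock (_id: 101)
```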

The next example shows how to create a targeted change stream. This change stream will show when the inventory of each item in the stock collection, same as the example above, falls below a certain threshold. In this specific example, we only want to be notified when the available quantity is less than or equal to 10.

const changeStream = collection.watch(
  [{
    $match: {
      $and: [
        { "updateDescription.updatedFields.quantity": { $lte: 10 } },
        { operationType: "update" }
      ]
    }
  }],
  {
    fullDocument: "updateLookup"
  }
);

The completed MongoDB shell script of both components will look like this:

conn = new Mongo("mongodb://localhost:27017/demo?replicaSet=rs");
db = conn.getDB("demo");
collection = db.stock;

let updateOps = {
  $match: {
    $and: [
      { "updateDescription.updatedFields.quantity": { $lte: 10 } },
      { operationType: "update" }
    ]
  }
};

const changeStreamCursor = collection.watch([updateOps]);

pollStream(changeStreamCursor);

// This function polls a change stream and prints out each change as it comes in.
function pollStream(cursor) {
  while (!cursor.isExhausted()) {
    if (cursor.hasNext()) {
      change = cursor.next();
      print(JSON.stringify(change));
    }
  }
  pollStream(cursor);
}

The next example is used to print all changes that are happening in a database called organization_db. Using a change stream cursor to help listen to changes, the getStream() function will print out the change stream data in a JSON format using JSON.stringify().

conn = new Mongo("mongodb://localhost:27017/organization_db?replicaSet=rs");
db = conn.getDB("organization_db");
collection = db.employee;

const changeCursor = collection.watch();

getStream();

// This function polls the change stream and prints each change as JSON.
function getStream() {
  while (!changeCursor.isExhausted()) {
    if (changeCursor.hasNext()) {
      changed_value = changeCursor.next();
      print(JSON.stringify(changed_value));
    }
  }
  getStream();
}

This last example shows how you can resume a change stream after an application fails and wants to pick up where it left off. This is done by specifying a resume token to either resumeAfter or startAfter when opening the cursor. The example below passes the resume token to resumeAfter, as shown.

conn = new Mongo("mongodb://localhost:27017/demo?replicaSet=rs");
db = conn.getDB("demo");
collection = db.stock;

const changeStreamCursor = collection.watch();
resumeStream(changeStreamCursor, true);

function resumeStream(changeStreamCursor, forceResume = false) {
  let resumeToken;
  while (!changeStreamCursor.isExhausted()) {
    if (changeStreamCursor.hasNext()) {
      change = changeStreamCursor.next();
      print(JSON.stringify(change));
      resumeToken = change._id;
      if (forceResume === true) {
        print("\r\nSimulating app failure for 10 seconds...");
        sleep(10000); // mongosh built-in sleep (milliseconds)
        changeStreamCursor.close();
        const newChangeStreamCursor = collection.watch([], {
          resumeAfter: resumeToken
        });
        print("\r\nResuming change stream with token " + JSON.stringify(resumeToken) + "\r\n");
        resumeStream(newChangeStreamCursor);
      }
    }
  }
  resumeStream(changeStreamCursor, forceResume);
}

Using resume tokens allows applications consuming the change stream to resume from the correct place in the stream. It is up to the application listening to the change stream to ensure that it has not already processed a given change.
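That deduplication responsibility can be sketched as follows. This is plain JavaScript runnable without a server; `handleOnce` and the in-memory Set are hypothetical names standing in for whatever durable store the application uses to track processed tokens.

```javascript
// At-least-once handling on the consumer side: remember the resume token
// (the event's _id) of each processed event and skip replays after a restart.
// A real application would persist this set durably, not in memory.
const processed = new Set();

function handleOnce(event, apply) {
  const token = JSON.stringify(event._id);
  if (processed.has(token)) return false; // already applied, skip
  apply(event);
  processed.add(token);
  return true;
}

let applied = 0;
const ev = { _id: { _data: "t1" }, operationType: "insert" };
handleOnce(ev, () => applied++); // first delivery: applied
handleOnce(ev, () => applied++); // redelivery after resume: skipped
console.log(applied); // 1
```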

For more information about MongoDB change streams, see the official MongoDB documentation.

Using Custom Code

If none of the prebuilt CDC handlers is suited to the kind of operations you want to execute, you can create your own custom CDC handler, generally using a MongoDB Kafka Connector sink connector. More details on the specific steps required for this approach can be found in the MongoDB docs.
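At the heart of any hand-rolled handler is a function that applies change events to the target store. The sketch below, in plain JavaScript with a Map standing in for the destination system, shows the basic dispatch; `applyChange` is a hypothetical name, and real handlers would also cover operations like drop and rename.

```javascript
// In-memory stand-in for the destination datastore.
const target = new Map();

// Apply one change event to the target, keyed by the document's _id.
function applyChange(event) {
  const key = event.documentKey._id;
  switch (event.operationType) {
    case "insert":
    case "replace":
      target.set(key, event.fullDocument);
      break;
    case "update":
      // Merge only the fields the event reports as updated.
      target.set(key, { ...target.get(key), ...event.updateDescription.updatedFields });
      break;
    case "delete":
      target.delete(key);
      break;
  }
}

applyChange({ operationType: "insert", documentKey: { _id: 1 },
              fullDocument: { _id: 1, quantity: 25 } });
applyChange({ operationType: "update", documentKey: { _id: 1 },
              updateDescription: { updatedFields: { quantity: 8 } } });
console.log(target.get(1).quantity); // 8
```

Even this small sketch hints at why the custom route is costly: every operation type, error path, and reconnect scenario has to be handled by your own code.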

Using custom code can be difficult, as you need to always be on the lookout for bugs and logic errors, and supporting the code comes with its own challenges. When the network breaks due to an error or timeout, the connection to the MongoDB cluster will be lost, and in most cases your script will need built-in logic to resume listening for change events. Without that logic, the CDC process, and the data that flows through it, can become inconsistent.

Another disadvantage of the custom route is that the developers creating the custom CDC solution need knowledge of both the MongoDB source and the destination sink, and custom code is needed to actually facilitate the copying of the data. This means that if a CDC solution targets many different destinations, time and resource needs may increase. Developers will also need specific skills and knowledge to surmount any obstacles that arise during implementation and after.

Using Arcion For MongoDB CDC

Of course, sometimes it’s easier to use a CDC tool which allows for deep customization and high performance without the effort of a custom solution. When it comes to using CDC with MongoDB, it’s obvious from the above discussions that it is much simpler to use Arcion to implement and maintain CDC functionality within MongoDB. 

Arcion is a go-to solution for many enterprises who are looking to select a data pipeline tool that is scalable, reliable, and extremely easy to configure and use. It provides robust data pipelines that offer high availability, leverage log-based CDC, and auto-scalable features. Available with multiple deployment options, Arcion can migrate data to and from on-prem data sources, cloud-based data sources or a mix of both. If you’re interested in learning more about how Arcion can help your MongoDB real-time replication, visit our MongoDB page.

 

The zero-code approach allows users to easily configure Arcion and build their data pipelines without having to write any code. Arcion can be set up and configured strictly through configuration files or by using Arcion’s intuitive and easy-to-use UI to set up pipelines in a matter of minutes. Compared to homegrown solutions or ones that mismatch a bunch of different technologies, Arcion makes implementation smooth by providing 24/7 support through extensive documentation, tutorials, blogs, and customer support.

Let’s take a look at some specific features that are relevant for MongoDB CDC.

Sub-Second Latency 

Many other existing CDC solutions don’t scale for high-volume, high-velocity data, resulting in slow pipelines and slow delivery to the target systems. Arcion is the only distributed, end-to-end multi-threaded CDC solution that auto-scales vertically and horizontally. Any process that runs on source and target is parallelized using patent-pending techniques to achieve maximum throughput; there isn’t a single step within the pipeline that is single-threaded. This means Arcion users get ultra-low-latency CDC replication and can always keep up with the ever-increasing data volume on the source system.

100% Agentless Change Data Capture

Arcion is the only CDC vendor in the market that offers 100% agentless CDC to all its supported 20+ connectors. The agentless CDC applies to all complex enterprise databases including MongoDB. Arcion reads directly from database logs, never reading from the database itself. Previously, data teams faced administrative nightmares and security risks associated with running agent-based software in production environments. You can now replicate data in real-time, at scale, with guaranteed delivery — but without the inherent performance issues or the security concerns.

Data Consistency Guaranteed

Arcion provides transactional integrity and data consistency through its CDC technology. To further this effort, Arcion also has built-in data validation support that works automatically and efficiently to ensure data integrity is always maintained. It offers a solution for both scalable data migration and replication while making sure that zero data loss has occurred.

Automatic schema conversion & schema evolution support

Arcion handles schema changes out of the box, requiring no user intervention. This helps mitigate data loss and eliminate downtime caused by pipeline-breaking schema changes by intercepting changes in the source database and propagating them while ensuring compatibility with the target's schema evolution. Other solutions reload (re-do the snapshot of) the data when there is a schema change in the source database, which causes pipeline downtime and consumes a lot of compute resources, which can be expensive.

Pre-Built 20+ Enterprise Data Connectors

Arcion has a library of pre-built data connectors. These connectors provide support for more than 20 enterprise databases, data warehouses, and cloud-native analytics platforms (see full list). Unlike other ETL tools, Arcion provides full control over data while still maintaining a high degree of automation. Data can be moved from one source to multiple targets or from multiple sources to a single target, depending on your use case. This means that if you branch out into other technologies outside of MongoDB, you’ll already have the capability within Arcion to handle your new sources and targets without the need for another pipeline technology.

Before you leave, we also have three videos to show you how Arcion works with MongoDB in action:

  1. Arcion Self-hosted Demo: Oracle To MongoDB (https://youtu.be/dTChAc9GpSc)
  2. Oracle to MongoDB with Denormalization with Arcion Self-hosted (https://youtu.be/sK3tZmpb1YI)
  3. Arcion Cloud Demo: Oracle To MongoDB (https://youtu.be/8IgKtaeq5F0)

Wrapping up

This article has covered MongoDB CDC and the methods through which you can effectively capture changes to your data in MongoDB. MongoDB is a scalable and powerful NoSQL database with lots of functionality, and pairing it with a CDC solution opens up greater possibilities while reducing the burden on production instances by offloading load-intensive tasks.

You were also shown how to use timestamp fields, change streams, and custom code as methods for integration. Using a timestamp field requires changes on every write operation, adding complexity to the source system. Change streams listen to the operations log (oplog), allowing applications to subscribe to all data changes in real time without the complexity and risk of tailing the oplog directly. Custom code can be limiting, as it requires specialized skills to build and maintain. Lastly, we looked at Arcion and the benefits it brings to implementing and maintaining a CDC pipeline for MongoDB instances. To try out Arcion for yourself, download Arcion Self-hosted for free today and build CDC pipelines for your MongoDB instances in a matter of minutes.

Matt is a developer at heart with a passion for data, software architecture, and writing technical content. In the past, Matt worked at some of the largest finance and insurance companies in Canada before pivoting to working for fast-growing startups.
Luke has two decades of experience working with database technologies and has worked for companies like Oracle, AWS, and MariaDB. He is experienced in C++, Python, and JavaScript. He now works at Arcion as an Enterprise Solutions Architect to help companies simplify their data replication process.
