- Notifications
You must be signed in to change notification settings - Fork1
Tails MongoDB oplog - provides interfaces for Observables, Node.js streams, and more.
License
ORESoftware/oplog.rx
Folders and files
Name | Name | Last commit message | Last commit date | |
---|---|---|---|---|
Repository files navigation
Tails the MongoDB Oplog for you. An improvement over existing libraries.
Has interfaces for both Node.js streams and RxJS Observables + better TypeScript typingsThis library will switch to native Observables once they become available.Until then, simply using the latest version of RxJS5.
The MongoDB oplog is simply a capped collection that is tailable, using Cursor.stream();The structure of an Oplog document is like so:
{"ts":"6533791416483577857","t":4,"h":"8859258976700926266","v":2,"op":"i","ns":"test.foo","o":{"_id":"5ab94bb","username":"fox"}}
field | description |
---|---|
ts | 64bit timestamp |
op | the type of operation (i is insert, u is update, d is delete, etc.) |
ns | <db>.<collection> |
o | the document that changed (it should always be the complete document, not just the changed part). |
t | the election "term" of the replicaset (not really important) |
v | Version of the oplog format (unfortunately not the version of the document object) |
h | The hash field gives each oplog entry a unique id |
This article is pretty good on the subject:
https://engineering.tes.com/post/mongodb-oplog/
import{ObservableOplog}from'oplog.rx';constoplog=newObservableOplog();oplog.tail().then(function(){console.log('successfully started tailing the oplog.');});oplog.getEmitter().on('update',function(){}).on('insert',function(){}).on('delete',function(){});
import{ObservableOplog}from'oplog.rx';import{Timestamp}from'bson';constoplog=newObservableOplog({ts:Timestamp.fromInt(Date.now()-45000),// search for documents that are younger than 45 seconds agons:{$in:['mydb.coll1','mydb.coll2',/mydb2\.*/],}});// or if you need something very custom, use query:constoplog=newObservableOplog({query:{ts:{$gt:Timestamp.fromInt(Date.now()-45000)},$and:[{ns:{$nin:[/foo/,/rolo/]}},{ns:{$in:[/bar/]}},]}});// if the query parameter is provided, it will be used directly to search the oplog.rs collection:// like so:constcoll=db.collection('oplog.rs');constcursor=coll.find(query);
constoplog=newObservableOplog();oplog.tail().then(function(){console.log('successfully started tailing the oplog.');});constops=oplog.getOps();ops.insert.subscribe(v=>{});ops.delete.subscribe(v=>{});ops.update.subscribe(v=>{});// or use:const{insert, update, del}=oplog.getOps();
constoplog=newObservableOplog();oplog.tail().then(function(){console.log('successfully started tailing the oplog.');});// create a transform stream which only forwards the desired dataconstt=oplog.getFilteredStream({namespace:'foobar'});// the above stream is a transform stream which you can pipe elsewhere// to send the data to another process, convert it to JSON firstconstJSONStdio=require('json-stdio');consttransform=JSONStdio.transformObject2JSON();constsocket=getClientConnection();// get a tcp connection from wherevert.pipe(transform).pipe(socket);// with the above code, you can listen for certain events// and pipe the data to wherever it needs to go// streams are especially useful for performant networking between processes.
In the above section, we piped JSON into a socket connection.The above might have been a TCP server that's tailing the oplog.Below we have code that might reside on a client process that's connected to the TCP server.The client receives JSON (representing oplog events) through a socket stream.We use a helper function from the 'oplog.rx' library to parse oplog events from the stream.
importnet= require('net');importJSONStdio= require('json-stdio');import{getOplogStreamInterpreter}from'oplog.rx';constc=net.createConnection(6969,'localhost');constjsonParser=JSONStdio.createParser();conststrm=c.pipe(jsonParser);// parse the JSON stream into JS objectsconst{ops, emitter}=getOplogStreamInterpreter(strm);// listen for data events// we can use observablesops.delete.subscribe(v=>{console.log('delete happened.');});ops.insert.subscribe(v=>{console.log('insert happened.');});ops.update.subscribe(v=>{console.log('update happened.');});// or just use an event emitteremitter.on('update',function(){console.log('update happened.');});emitter.on('delete',function(){console.log('delete happened.');});emitter.on('insert',function(){console.log('insert happened.');});
About
Tails MongoDB oplog - provides interfaces for Observables, Node.js streams, and more.
Topics
Resources
License
Uh oh!
There was an error while loading.Please reload this page.
Stars
Watchers
Forks
Releases
Packages0
Uh oh!
There was an error while loading.Please reload this page.
Contributors2
Uh oh!
There was an error while loading.Please reload this page.