Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Create and run commands over a RPC protocol stream

License

NotificationsYou must be signed in to change notification settings

little-core-labs/rpc-protocol

Repository files navigation

Create and run commands over a RPC protocol stream

Installation

$ npm install rpc-protocol

Usage

Echo server over web sockets:

constWebSocketServer=require('simple-websocket/server')constprotocol=require('rpc-protocol')constserver=newWebSocketServer({port:3000})server.on('connection',onconnection)functiononconnection(socket){constrpc=protocol({stream:socket})rpc.command('echo',(req)=>{returnreq.arguments})}

Echo client over web sockets

constWebSocket=require('simple-websocket')constprotocol=require('rpc-protocol')constsocket=newWebSocket('ws://localhost:3000')socket.on('connect',onconnect)functiononconnect(){constrpc=protocol({stream:socket})rpc.call('echo','hello world',(err,res)=>{console.log(res)// [ 'hello world' ]})}

API

rpc = require('rpc-protocol')(opts)

Creates a new RPC protocol Duplex stream whereopts can be:

  • opts.encoding is an optionalencoding object thatcontains anencode(value) anddecode(buffer) functions forconverting values to and from buffers that can be sent over a binarystream
  • opts.stream is an optional stream to pump data in and out of intothe protocol stream

rpc.command(commandName, callback)

Create a callback for a command given by the stringcommandName thatis called whenrpc.call(commandName, ...) is called from the clientand wherecallback contains a command request object. Results can bereturned to the caller by returning a value, which can be aPromise.

Example

Below is a simple example of a command that proxies afetch() call overthe stream. The caller would simply need to callrpc.call('fetch', ['https://github.com', { mode: 'cors' }, ], callback).

rpc.command('fetch',async(req)=>{const[resource,init]=req.argumentsreturnfetch(resource,init).then(async(res)=>{return{arrayBuffer:Buffer.from(awaitres.clone().arrayBuffer),text:Buffer.from(awaitres.clone().text),json:Buffer.from(awaitres.clone().json),statusText:res.statusText,redirected:res.redirected,bodyUsed:res.bodyUsed,headers:res.headers,status:res.status,type:res.type,url:res.url,ok:res.ok,}})})

Command Streams

Commands can stream responses by to the client by making use of thesecond argument given to the command callback calledreply(err, results).It is a function that accepts an optional error and an array of results tosend back to the client.

Below is an example of a simple counter stream. Given astart andend range with aninterval the command will reply with anincrementedi value at someinterval.

rpc.command('counter',(req,reply)=>{const[start,end,interval=100]=req.argumentsleti=startontick()functionontick(){if(i<=end){// reply returns true that means we can write againif(reply(null,i++)){setTimeout(ontick,interval)}}else{// signal end of streamreply(null,null)}}})
didWrite = reply(err, response)

Thereply() function replies to the caller with an errorerr and anarray of results as aresponse. The command can continue to call thisfunction with more results or errors. The return value of thereplyfunction will indicate if the stream is still open to write to.

command = rpc.call(commandName, arguments, callback)

A command can be invoked by callingrpc.call() with a command nameand an optional array of arguments along with acallback(err, res)function that will be called when the command responds with a reply.Response results are always given as an array as the command mayreturn more than value. A command can only throw one error at a time.

rpc.call('echo',['hello','world'],(err,res)=>{if(err){// handle Error}else{console.log(res)// ['hello', 'world']}})

stream = rpc.createReadStream(command)

Read streams can be created from an existing command. They are useful ifthe command can reply with multiple values over a period of time.

rpc.command('fs.createReadStream',(req,reply)=>{const[path]=req.argumentsconststream=fs.createReadStream(path)stream.on('data',(data)=>reply(null,data))stream.on('end',()=>reply(null,null))// 'null' signals end of stream})constcommand=rpc.call('fs.createReadStream',['/path/to/file.txt'])conststream=rpc.createReadStream(command)constchunks[]stream.on('data',(data)=>{chunks.push(data)})stream.on('end',()=>console.log(Buffer.concat(chunks).toString()))

rpc.send(...)

rpc.send() is the low level function that makes a request for anexisting command, an extension, or an arbitrary buffer.

Sending An Extension

Sending an extension message requires you use the extension type and somevalue that the extension encodes.

rpc.send(EXTENSION_TYPE,'some value',(err,res)=>{// called when extension replies with a response})

Sending A Command

Sending a command requires you create an instance ofCommand with anencoding, command name, array of arguments, and a callback that iscalled when the command replies with a response

constcommand=newCommand(rpc.encoding,'echo',['hello'],(err,res)=>{// called when the command replies with a response})rpc.send(command.id,command,command.pack())

Sending An Arbitrary Message

Sending an arbitrary message is possible but replies cannot be linked tothe sent message and there is no guarantee that the message was read.

rpc.send(Buffer.from('hello'),(err)=>{// It is impossible for a reply as this is an arbitrary message})

rpc.cancel(command)

An alias torpc.fin(command).

rpc.fin(command)

Send aFin packet to the stream tied to the command request.

constcommand=rpc.call('echo',['hello','world'],console.log)rpc.fin(command)

rpc.extension(extensionType, encoding)

Extensions provide a way to extend the protocol with user suppliedbinary encodings. Therpc.extension(extensionType, encoding) functionaccepts an integerextensionType and anencoding object thatcontainsencode(value) anddecode(buffer) functions for encodingvalues to and from buffers.

Extensions can be used by making use of therpc.send() function thatexpects anextensionType, an array of arguments that will be encodedby the extension encoding and a callback that will be called when theextension replies with a response.

Extension Wire Interface

Extensionsmust provide a way of encoding anid into the extensionencoding which must be available afterdecode(buffer) is called onthe return value. Theid is used internally to track requests andresponse from callers.

If an extension provides a way to encode and decodeError properties,then they will be propagated to theerr argument in thereply(err)function.

Example Extension

constkeyPair=require('hypercore-crypto')constpbs=require('protocol-buffers')constKEY_PAIR_EXTENSION=0xfedconst{ KeyPair}=pbc(`  message KeyPair {    bytes id = 1;    bytes publicKey = 2;    bytes secretKey = 3;  }`)constserver=protocol()constclient=protocol()server.extension(KEY_PAIR_EXTENSION,KeyPair)client.extension(KEY_PAIR_EXTENSION,KeyPair)server.on('extension',(req,type,buffer,reply)=>{if(KEY_PAIR_EXTENSION===type){reply(null,keyPair())}})client.send(KEY_PAIR_EXTENSION,(err,res)=>{console.log(res)// { publicKey: <Buffer ...>, secretKey: <Buffer ...>})

License

MIT

About

Create and run commands over a RPC protocol stream

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Contributors3

  •  
  •  
  •  

[8]ページ先頭

©2009-2025 Movatter.jp