Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit655dce9

Browse files
feat: implement retry mechanism
Syntax:```jsconst socket = io({ retries: 3, ackTimeout: 10000});// "my-event" will be sent up to 4 times (1 + 3), until the server sends an acknowledgementsocket.emit("my-event", (err) => {});```Notes:- the order of the packets is guaranteed, as we send packets one by one- the same packet id is reused for consecutive retries, in order toallow deduplication on the server side
1 parent9f32925 commit655dce9

File tree

4 files changed

+187
-3
lines changed

4 files changed

+187
-3
lines changed

‎lib/socket.ts‎

Lines changed: 108 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,29 @@ export interface SocketOptions {
5555
/**
5656
* the authentication payload sent when connecting to the Namespace
5757
*/
58-
auth:{[key:string]:any}|((cb:(data:object)=>void)=>void);
58+
auth?:{[key:string]:any}|((cb:(data:object)=>void)=>void);
59+
/**
60+
* The maximum number of retries. Above the limit, the packet will be discarded.
61+
*
62+
* Using `Infinity` means the delivery guarantee is "at-least-once" (instead of "at-most-once" by default), but a
63+
* smaller value like 10 should be sufficient in practice.
64+
*/
65+
retries?:number;
66+
/**
67+
* The default timeout in milliseconds used when waiting for an acknowledgement.
68+
*/
69+
ackTimeout?:number;
5970
}
6071

72+
typeQueuedPacket={
73+
id:number;
74+
args:unknown[];
75+
flags:Flags;
76+
pending:boolean;
77+
tryCount:number;
78+
ack?:(err?:Error, ...args:unknown[])=>void;
79+
};
80+
6181
/**
6282
* Internal events.
6383
* These events can't be emitted by the user.
@@ -76,6 +96,7 @@ interface Flags {
7696
compress?:boolean;
7797
volatile?:boolean;
7898
timeout?:number;
99+
withRetry?:boolean;
79100
}
80101

81102
exporttypeDisconnectDescription=
@@ -198,8 +219,16 @@ export class Socket<
198219
* Buffer for packets that will be sent once the socket is connected
199220
*/
200221
publicsendBuffer:Array<Packet>=[];
222+
/**
223+
* The queue of packets to be sent with retry in case of failure.
224+
*
225+
* Packets are sent one by one, each waiting for the server acknowledgement, in order to guarantee the delivery order.
226+
*@private
227+
*/
228+
private_queue:Array<QueuedPacket>=[];
201229

202230
privatereadonlynsp:string;
231+
privatereadonly_opts:SocketOptions;
203232

204233
privateids:number=0;
205234
privateacks:object={};
@@ -218,6 +247,7 @@ export class Socket<
218247
if(opts&&opts.auth){
219248
this.auth=opts.auth;
220249
}
250+
this._opts=Object.assign({},opts);
221251
if(this.io._autoConnect)this.open();
222252
}
223253

@@ -350,6 +380,24 @@ export class Socket<
350380
}
351381

352382
args.unshift(ev);
383+
384+
if(this._opts.retries&&!this.flags.withRetry&&!this.flags.volatile){
385+
letack;
386+
if(typeofargs[args.length-1]==="function"){
387+
ack=args.pop();
388+
}
389+
this._queue.push({
390+
id:this.ids++,
391+
tryCount:0,
392+
pending:false,
393+
args,
394+
ack,
395+
flags:Object.assign({withRetry:true},this.flags),
396+
});
397+
this._drainQueue();
398+
returnthis;
399+
}
400+
353401
constpacket:any={
354402
type:PacketType.EVENT,
355403
data:args,
@@ -393,7 +441,7 @@ export class Socket<
393441
*@private
394442
*/
395443
private_registerAckCallback(id:number,ack:Function){
396-
consttimeout=this.flags.timeout;
444+
consttimeout=this.flags.timeout??this._opts.ackTimeout;
397445
if(timeout===undefined){
398446
this.acks[id]=ack;
399447
return;
@@ -440,7 +488,8 @@ export class Socket<
440488
...args:AllButLast<EventParams<EmitEvents,Ev>>
441489
):Promise<FirstArg<Last<EventParams<EmitEvents,Ev>>>>{
442490
// the timeout flag is optional
443-
constwithErr=this.flags.timeout!==undefined;
491+
constwithErr=
492+
this.flags.timeout!==undefined||this._opts.ackTimeout!==undefined;
444493
returnnewPromise((resolve,reject)=>{
445494
args.push((arg1,arg2)=>{
446495
if(withErr){
@@ -453,6 +502,62 @@ export class Socket<
453502
});
454503
}
455504

505+
/**
506+
* Send the first packet of the queue, and wait for an acknowledgement from the server.
507+
*@private
508+
*/
509+
private_drainQueue(){
510+
debug("draining queue");
511+
if(this._queue.length===0){
512+
return;
513+
}
514+
constpacket=this._queue[0];
515+
if(packet.pending){
516+
debug(
517+
"packet [%d] has already been sent and is waiting for an ack",
518+
packet.id
519+
);
520+
return;
521+
}
522+
packet.pending=true;
523+
packet.tryCount++;
524+
debug("sending packet [%d] (try n°%d)",packet.id,packet.tryCount);
525+
constcurrentId=this.ids;
526+
this.ids=packet.id;// the same id is reused for consecutive retries, in order to allow deduplication on the server side
527+
this.flags=packet.flags;
528+
packet.args.push((err, ...responseArgs)=>{
529+
if(packet!==this._queue[0]){
530+
// the packet has already been acknowledged
531+
return;
532+
}
533+
consthasError=err!==null;
534+
if(hasError){
535+
if(packet.tryCount>this._opts.retries){
536+
debug(
537+
"packet [%d] is discarded after %d tries",
538+
packet.id,
539+
packet.tryCount
540+
);
541+
this._queue.shift();
542+
if(packet.ack){
543+
packet.ack(err);
544+
}
545+
}
546+
}else{
547+
debug("packet [%d] was successfully sent",packet.id);
548+
this._queue.shift();
549+
if(packet.ack){
550+
packet.ack(null, ...responseArgs);
551+
}
552+
}
553+
packet.pending=false;
554+
returnthis._drainQueue();
555+
});
556+
557+
this.emit.apply(this,packet.args);
558+
this.ids=currentId;// restore offset
559+
}
560+
456561
/**
457562
* Sends a packet.
458563
*

‎test/index.ts‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ import "./connection";
33
import"./socket";
44
import"./node";
55
import"./connection-state-recovery";
6+
import"./retry";

‎test/retry.ts‎

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
importexpectfrom"expect.js";
2+
import{io}from"..";
3+
import{wrap,BASE_URL,success}from"./support/util";
4+
5+
describe("retry",()=>{
6+
it("should preserve the order of the packets",()=>{
7+
returnwrap((done)=>{
8+
constsocket=io(BASE_URL,{
9+
forceNew:true,
10+
retries:1,
11+
ackTimeout:50,
12+
});
13+
14+
socket.emit("echo",1,()=>{
15+
//@ts-ignore
16+
expect(socket._queue.length).to.eql(2);
17+
});
18+
19+
//@ts-ignore
20+
expect(socket._queue.length).to.eql(1);
21+
22+
socket.emit("echo",2,()=>{
23+
//@ts-ignore
24+
expect(socket._queue.length).to.eql(1);
25+
});
26+
27+
//@ts-ignore
28+
expect(socket._queue.length).to.eql(2);
29+
30+
socket.emit("echo",3,(err,val)=>{
31+
expect(err).to.be(null);
32+
expect(val).to.eql(3);
33+
//@ts-ignore
34+
expect(socket._queue.length).to.eql(0);
35+
36+
success(done,socket);
37+
});
38+
39+
//@ts-ignore
40+
expect(socket._queue.length).to.eql(3);
41+
});
42+
});
43+
44+
it("should fail when the server does not acknowledge the packet",()=>{
45+
returnwrap((done)=>{
46+
constsocket=io(BASE_URL,{
47+
forceNew:true,
48+
retries:3,
49+
ackTimeout:50,
50+
});
51+
52+
letcount=0;
53+
54+
socket.emit("ack",()=>{
55+
expect(count).to.eql(4);
56+
57+
success(done,socket);
58+
});
59+
60+
socket.on("ack",()=>{
61+
count++;
62+
});
63+
});
64+
});
65+
});

‎test/socket.ts‎

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,5 +618,18 @@ describe("socket", () => {
618618
}
619619
});
620620
});
621+
622+
it("should use the default value",()=>{
623+
returnwrap((done)=>{
624+
constsocket=io(BASE_URL+"/",{
625+
ackTimeout:50,
626+
});
627+
628+
socket.emit("unknown",(err)=>{
629+
expect(err).to.be.an(Error);
630+
success(done,socket);
631+
});
632+
});
633+
});
621634
});
622635
});

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp