Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit e2661b5

Browse files
Add write_timeout policy configuration option (#7499)
This adds a new `write_timeout` configuration option which controls the slow consumer behaviour more precisely — specifically, what happens when the `write_deadline` is reached. This option can be set for clients, routes, gateways or leafnodes independently. Three values are supported:
* `default`: functionality as it stands today — `retry` for route/gateway/leafnode, `close` for client
* `retry`: when hitting the `write_deadline`, keep retrying writes until either we succeed or the ping interval hits
* `close`: when hitting the `write_deadline`, close the connection immediately
Signed-off-by: Neil Twigg <neil@nats.io>
2 parents 5d63c16 + ab6896a, commit e2661b5

File tree

10 files changed

+441
-81
lines changed

10 files changed

+441
-81
lines changed

‎server/client.go‎

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,26 @@ const (
237237
pmrMsgImportedFromService
238238
)
239239

240+
typeWriteTimeoutPolicyuint8
241+
242+
const (
243+
WriteTimeoutPolicyDefault=iota
244+
WriteTimeoutPolicyClose
245+
WriteTimeoutPolicyRetry
246+
)
247+
248+
// String returns a human-friendly value. Only used in varz.
249+
func (pWriteTimeoutPolicy)String()string {
250+
switchp {
251+
caseWriteTimeoutPolicyClose:
252+
return"close"
253+
caseWriteTimeoutPolicyRetry:
254+
return"retry"
255+
default:
256+
return_EMPTY_
257+
}
258+
}
259+
240260
typeclientstruct {
241261
// Here first because of use of atomics, and memory alignment.
242262
stats
@@ -328,15 +348,16 @@ type pinfo struct {
328348

329349
// outbound holds pending data for a socket.
330350
typeoutboundstruct {
331-
nb net.Buffers// Pending buffers for send, each has fixed capacity as per nbPool below.
332-
wnb net.Buffers// Working copy of "nb", reused on each flushOutbound call, partial writes may leave entries here for next iteration.
333-
pbint64// Total pending/queued bytes.
334-
fspint32// Flush signals that are pending per producer from readLoop's pcd.
335-
sg*sync.Cond// To signal writeLoop that there is data to flush.
336-
wdl time.Duration// Snapshot of write deadline.
337-
mpint64// Snapshot of max pending for client.
338-
lft time.Duration// Last flush time for Write.
339-
stcchanstruct{}// Stall chan we create to slow down producers on overrun, e.g. fan-in.
351+
nb net.Buffers// Pending buffers for send, each has fixed capacity as per nbPool below.
352+
wnb net.Buffers// Working copy of "nb", reused on each flushOutbound call, partial writes may leave entries here for next iteration.
353+
pbint64// Total pending/queued bytes.
354+
fspint32// Flush signals that are pending per producer from readLoop's pcd.
355+
wtpWriteTimeoutPolicy// What do we do on a write timeout?
356+
sg*sync.Cond// To signal writeLoop that there is data to flush.
357+
wdl time.Duration// Snapshot of write deadline.
358+
mpint64// Snapshot of max pending for client.
359+
lft time.Duration// Last flush time for Write.
360+
stcchanstruct{}// Stall chan we create to slow down producers on overrun, e.g. fan-in.
340361
cw*s2.Writer
341362
}
342363

@@ -698,6 +719,24 @@ func (c *client) initClient() {
698719
casec.kind==LEAF&&opts.LeafNode.WriteDeadline>0:
699720
c.out.wdl=opts.LeafNode.WriteDeadline
700721
}
722+
switchc.kind {
723+
caseROUTER:
724+
ifc.out.wtp=opts.Cluster.WriteTimeout;c.out.wtp==WriteTimeoutPolicyDefault {
725+
c.out.wtp=WriteTimeoutPolicyRetry
726+
}
727+
caseLEAF:
728+
ifc.out.wtp=opts.LeafNode.WriteTimeout;c.out.wtp==WriteTimeoutPolicyDefault {
729+
c.out.wtp=WriteTimeoutPolicyRetry
730+
}
731+
caseGATEWAY:
732+
ifc.out.wtp=opts.Gateway.WriteTimeout;c.out.wtp==WriteTimeoutPolicyDefault {
733+
c.out.wtp=WriteTimeoutPolicyRetry
734+
}
735+
default:
736+
ifc.out.wtp=opts.WriteTimeout;c.out.wtp==WriteTimeoutPolicyDefault {
737+
c.out.wtp=WriteTimeoutPolicyClose
738+
}
739+
}
701740
c.out.mp=opts.MaxPending
702741
// Snapshot max control line since currently can not be changed on reload and we
703742
// were checking it on each call to parse. If this changes and we allow MaxControlLine
@@ -1849,7 +1888,7 @@ func (c *client) handleWriteTimeout(written, attempted int64, numChunks int) boo
18491888
scState,c.out.wdl,numChunks,attempted)
18501889

18511890
// We always close CLIENT connections, or when nothing was written at all...
1852-
ifc.kind==CLIENT||written==0 {
1891+
ifc.out.wtp==WriteTimeoutPolicyClose||written==0 {
18531892
c.markConnAsClosed(SlowConsumerWriteDeadline)
18541893
returntrue
18551894
}else {

‎server/client_test.go‎

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3686,3 +3686,112 @@ func TestLogConnectionAuthInfo(t *testing.T) {
36863686
}
36873687
})
36883688
}
3689+
3690+
funcTestClientConfigureWriteTimeoutPolicy(t*testing.T) {
3691+
forname,policy:=rangemap[string]WriteTimeoutPolicy{
3692+
"Default":WriteTimeoutPolicyDefault,
3693+
"Retry":WriteTimeoutPolicyRetry,
3694+
"Close":WriteTimeoutPolicyClose,
3695+
} {
3696+
t.Run(name,func(t*testing.T) {
3697+
opts:=DefaultOptions()
3698+
opts.WriteTimeout=policy
3699+
s:=RunServer(opts)
3700+
defers.Shutdown()
3701+
3702+
nc:=natsConnect(t,fmt.Sprintf("nats://%s:%d",opts.Host,opts.Port))
3703+
defernc.Close()
3704+
3705+
s.mu.RLock()
3706+
defers.mu.RUnlock()
3707+
3708+
for_,r:=ranges.clients {
3709+
ifpolicy==WriteTimeoutPolicyDefault {
3710+
require_Equal(t,r.out.wtp,WriteTimeoutPolicyClose)
3711+
}else {
3712+
require_Equal(t,r.out.wtp,policy)
3713+
}
3714+
}
3715+
})
3716+
}
3717+
}
3718+
3719+
// TestClientFlushOutboundWriteTimeoutPolicy relies on specifically having
3720+
// written at least one byte in order to not trip the "written == 0" close
3721+
// condition, so just setting an unrealistically low write deadline won't
3722+
// work. Instead what we'll do is write the first byte very quickly and then
3723+
// slow down, so that we can trip a more honest slow consumer condition.
3724+
typewriteTimeoutPolicyWriterstruct {
3725+
net.Conn
3726+
deadline time.Time
3727+
writtenint
3728+
}
3729+
3730+
func (w*writeTimeoutPolicyWriter)SetWriteDeadline(deadline time.Time)error {
3731+
w.deadline=deadline
3732+
returnw.Conn.SetWriteDeadline(deadline)
3733+
}
3734+
3735+
func (w*writeTimeoutPolicyWriter)Write(b []byte) (int,error) {
3736+
ifw.written==0 {
3737+
w.written++
3738+
returnw.Conn.Write(b[:1])
3739+
}
3740+
time.Sleep(time.Until(w.deadline)+10*time.Millisecond)
3741+
returnw.Conn.Write(b)
3742+
}
3743+
3744+
funcTestClientFlushOutboundWriteTimeoutPolicy(t*testing.T) {
3745+
forname,policy:=rangemap[string]WriteTimeoutPolicy{
3746+
"Retry":WriteTimeoutPolicyRetry,
3747+
"Close":WriteTimeoutPolicyClose,
3748+
} {
3749+
t.Run(name,func(t*testing.T) {
3750+
opts:=DefaultOptions()
3751+
opts.PingInterval=250*time.Millisecond
3752+
opts.WriteDeadline=100*time.Millisecond
3753+
opts.WriteTimeout=policy
3754+
s:=RunServer(opts)
3755+
defers.Shutdown()
3756+
3757+
nc1:=natsConnect(t,fmt.Sprintf("nats://%s:%d",opts.Host,opts.Port))
3758+
defernc1.Close()
3759+
3760+
_,err:=nc1.Subscribe("test",func(_*nats.Msg) {})
3761+
require_NoError(t,err)
3762+
3763+
nc2:=natsConnect(t,fmt.Sprintf("nats://%s:%d",opts.Host,opts.Port))
3764+
defernc2.Close()
3765+
3766+
cid,err:=nc1.GetClientID()
3767+
require_NoError(t,err)
3768+
3769+
client:=s.getClient(cid)
3770+
client.mu.Lock()
3771+
client.out.wdl=100*time.Millisecond
3772+
client.nc=&writeTimeoutPolicyWriter{Conn:client.nc}
3773+
client.mu.Unlock()
3774+
3775+
require_NoError(t,nc2.Publish("test",make([]byte,1024*1024)))
3776+
3777+
checkFor(t,5*time.Second,10*time.Millisecond,func()error {
3778+
client.mu.Lock()
3779+
deferclient.mu.Unlock()
3780+
switch {
3781+
case!client.flags.isSet(connMarkedClosed):
3782+
returnfmt.Errorf("connection not closed yet")
3783+
casepolicy==WriteTimeoutPolicyRetry&&client.flags.isSet(isSlowConsumer):
3784+
// Retry policy should have marked the client as a slow consumer and
3785+
// continued to retry flushes.
3786+
returnnil
3787+
casepolicy==WriteTimeoutPolicyClose&&!client.flags.isSet(isSlowConsumer):
3788+
// Close policy shouldn't have marked the client as a slow consumer,
3789+
// it will just close it instead.
3790+
returnnil
3791+
default:
3792+
returnfmt.Errorf("client not in correct state yet")
3793+
}
3794+
})
3795+
})
3796+
}
3797+
}

‎server/gateway_test.go‎

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7537,7 +7537,6 @@ func TestGatewayConfigureWriteDeadline(t *testing.T) {
75377537
defers1.Shutdown()
75387538

75397539
o2:=testGatewayOptionsFromToWithServers(t,"A","B",s1)
7540-
o2.Gateway.WriteDeadline=6*time.Second
75417540
s2:=runGatewayServer(o2)
75427541
defers2.Shutdown()
75437542

@@ -7546,17 +7545,60 @@ func TestGatewayConfigureWriteDeadline(t *testing.T) {
75467545
waitForOutboundGateways(t,s1,1,time.Second)
75477546

75487547
s1.mu.RLock()
7549-
s2.mu.RLock()
75507548
defers1.mu.RUnlock()
7551-
defers2.mu.RUnlock()
75527549

7553-
s1.forEachRemote(func(r*client) {
7554-
require_Equal(t,r.out.wdl,6*time.Second)
7555-
})
7550+
for_,r:=ranges1.gateway.out {
7551+
require_Equal(t,r.out.wdl,5*time.Second)
7552+
}
75567553

7557-
s2.forEachRemote(func(r*client) {
7554+
for_,r:=ranges1.gateway.in {
75587555
require_Equal(t,r.out.wdl,5*time.Second)
7559-
})
7556+
}
7557+
}
7558+
7559+
funcTestGatewayConfigureWriteTimeoutPolicy(t*testing.T) {
7560+
forname,policy:=rangemap[string]WriteTimeoutPolicy{
7561+
"Default":WriteTimeoutPolicyDefault,
7562+
"Retry":WriteTimeoutPolicyRetry,
7563+
"Close":WriteTimeoutPolicyClose,
7564+
} {
7565+
t.Run(name,func(t*testing.T) {
7566+
o1:=testDefaultOptionsForGateway("B")
7567+
o1.Gateway.WriteTimeout=policy
7568+
s1:=runGatewayServer(o1)
7569+
defers1.Shutdown()
7570+
7571+
o2:=testGatewayOptionsFromToWithServers(t,"A","B",s1)
7572+
s2:=runGatewayServer(o2)
7573+
defers2.Shutdown()
7574+
7575+
waitForOutboundGateways(t,s2,1,time.Second)
7576+
waitForInboundGateways(t,s1,1,time.Second)
7577+
waitForOutboundGateways(t,s1,1,time.Second)
7578+
7579+
s1.mu.RLock()
7580+
defers1.mu.RUnlock()
7581+
7582+
s1.gateway.RLock()
7583+
defers1.gateway.RUnlock()
7584+
7585+
for_,r:=ranges1.gateway.out {
7586+
ifpolicy==WriteTimeoutPolicyDefault {
7587+
require_Equal(t,r.out.wtp,WriteTimeoutPolicyRetry)
7588+
}else {
7589+
require_Equal(t,r.out.wtp,policy)
7590+
}
7591+
}
7592+
7593+
for_,r:=ranges1.gateway.in {
7594+
ifpolicy==WriteTimeoutPolicyDefault {
7595+
require_Equal(t,r.out.wtp,WriteTimeoutPolicyRetry)
7596+
}else {
7597+
require_Equal(t,r.out.wtp,policy)
7598+
}
7599+
}
7600+
})
7601+
}
75607602
}
75617603

75627604
funcTestGatewayProcessRSubNoBlockingAccountFetch(t*testing.T) {

‎server/leafnode_test.go‎

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10810,25 +10810,52 @@ func TestLeafNodeConfigureWriteDeadline(t *testing.T) {
1081010810

1081110811
s1URL,_:=url.Parse(fmt.Sprintf("nats://127.0.0.1:%d",o1.LeafNode.Port))
1081210812
o2.Cluster.Name="somethingelse"
10813-
o2.LeafNode.WriteDeadline=6*time.Second
1081410813
o2.LeafNode.Remotes= []*RemoteLeafOpts{{URLs: []*url.URL{s1URL}}}
1081510814
s2:=RunServer(o2)
1081610815
defers2.Shutdown()
1081710816

1081810817
checkLeafNodeConnected(t,s2)
1081910818

1082010819
s1.mu.RLock()
10821-
s2.mu.RLock()
1082210820
defers1.mu.RUnlock()
10823-
defers2.mu.RUnlock()
1082410821

10825-
s1.forEachRemote(func(r*client) {
10822+
for_,r:=ranges1.leafs {
1082610823
require_Equal(t,r.out.wdl,5*time.Second)
10827-
})
10824+
}
10825+
}
1082810826

10829-
s2.forEachRemote(func(r*client) {
10830-
require_Equal(t,r.out.wdl,6*time.Second)
10831-
})
10827+
funcTestLeafNodeConfigureWriteTimeoutPolicy(t*testing.T) {
10828+
forname,policy:=rangemap[string]WriteTimeoutPolicy{
10829+
"Default":WriteTimeoutPolicyDefault,
10830+
"Retry":WriteTimeoutPolicyRetry,
10831+
"Close":WriteTimeoutPolicyClose,
10832+
} {
10833+
t.Run(name,func(t*testing.T) {
10834+
o1:=testDefaultOptionsForGateway("B")
10835+
o1.Gateway.WriteTimeout=policy
10836+
s1:=runGatewayServer(o1)
10837+
defers1.Shutdown()
10838+
10839+
o2:=testGatewayOptionsFromToWithServers(t,"A","B",s1)
10840+
s2:=runGatewayServer(o2)
10841+
defers2.Shutdown()
10842+
10843+
waitForOutboundGateways(t,s2,1,time.Second)
10844+
waitForInboundGateways(t,s1,1,time.Second)
10845+
waitForOutboundGateways(t,s1,1,time.Second)
10846+
10847+
s1.mu.RLock()
10848+
defers1.mu.RUnlock()
10849+
10850+
for_,r:=ranges1.leafs {
10851+
ifpolicy==WriteTimeoutPolicyDefault {
10852+
require_Equal(t,r.out.wtp,WriteTimeoutPolicyRetry)
10853+
}else {
10854+
require_Equal(t,r.out.wtp,policy)
10855+
}
10856+
}
10857+
})
10858+
}
1083210859
}
1083310860

1083410861
// https://github.com/nats-io/nats-server/issues/7441

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp