Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit09d223a

Browse files
authored
Merge pull request#183 from calloway-jacob/fix-confirm-receipt-early
Fix race condition on confirms
2 parentse4711f3 +65674cf commit09d223a

File tree

3 files changed

+37
-12
lines changed

3 files changed

+37
-12
lines changed

‎channel.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1435,6 +1435,11 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex
14351435
ch.m.Lock()
14361436
deferch.m.Unlock()
14371437

1438+
vardc*DeferredConfirmation
1439+
ifch.confirming {
1440+
dc=ch.confirms.publish()
1441+
}
1442+
14381443
iferr:=ch.send(&basicPublish{
14391444
Exchange:exchange,
14401445
RoutingKey:key,
@@ -1457,14 +1462,13 @@ func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, ex
14571462
AppId:msg.AppId,
14581463
},
14591464
});err!=nil {
1465+
ifch.confirming {
1466+
ch.confirms.unpublish()
1467+
}
14601468
returnnil,err
14611469
}
14621470

1463-
ifch.confirming {
1464-
returnch.confirms.Publish(),nil
1465-
}
1466-
1467-
returnnil,nil
1471+
returndc,nil
14681472
}
14691473

14701474
/*

‎confirms.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,23 @@ func (c *confirms) Listen(l chan Confirmation) {
3939
}
4040

4141
// Publish increments the publishing counter
42-
func (c*confirms)Publish()*DeferredConfirmation {
42+
func (c*confirms)publish()*DeferredConfirmation {
4343
c.publishedMut.Lock()
4444
deferc.publishedMut.Unlock()
4545

4646
c.published++
4747
returnc.deferredConfirmations.Add(c.published)
4848
}
4949

50+
// unpublish decrements the publishing counter and removes the
51+
// DeferredConfirmation. It must be called immediately after a publish fails.
52+
func (c*confirms)unpublish() {
53+
c.publishedMut.Lock()
54+
deferc.publishedMut.Unlock()
55+
c.deferredConfirmations.remove(c.published)
56+
c.published--
57+
}
58+
5059
// confirm confirms one publishing, increments the expecting delivery tag, and
5160
// removes bookkeeping for that delivery tag.
5261
func (c*confirms)confirm(confirmationConfirmation) {
@@ -135,6 +144,18 @@ func (d *deferredConfirmations) Add(tag uint64) *DeferredConfirmation {
135144
returndc
136145
}
137146

147+
// remove is only used to drop a tag whose publish failed
148+
func (d*deferredConfirmations)remove(taguint64) {
149+
d.m.Lock()
150+
deferd.m.Unlock()
151+
dc,found:=d.confirmations[tag]
152+
if!found {
153+
return
154+
}
155+
close(dc.done)
156+
delete(d.confirmations,tag)
157+
}
158+
138159
func (d*deferredConfirmations)Confirm(confirmationConfirmation) {
139160
d.m.Lock()
140161
deferd.m.Unlock()

‎confirms_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func TestConfirmOneResequences(t *testing.T) {
2626
c.Listen(l)
2727

2828
fori:=rangefixtures {
29-
ifwant,got:=uint64(i+1),c.Publish();want!=got.DeliveryTag {
29+
ifwant,got:=uint64(i+1),c.publish();want!=got.DeliveryTag {
3030
t.Fatalf("expected publish to return the 1 based delivery tag published, want: %d, got: %d",want,got.DeliveryTag)
3131
}
3232
}
@@ -64,7 +64,7 @@ func TestConfirmAndPublishDoNotDeadlock(t *testing.T) {
6464
}()
6565

6666
fori:=0;i<iterations;i++ {
67-
c.Publish()
67+
c.publish()
6868
<-l
6969
}
7070
}
@@ -82,7 +82,7 @@ func TestConfirmMixedResequences(t *testing.T) {
8282
c.Listen(l)
8383

8484
forrangefixtures {
85-
c.Publish()
85+
c.publish()
8686
}
8787

8888
c.One(fixtures[0])
@@ -117,7 +117,7 @@ func TestConfirmMultipleResequences(t *testing.T) {
117117
c.Listen(l)
118118

119119
forrangefixtures {
120-
c.Publish()
120+
c.publish()
121121
}
122122

123123
c.Multiple(fixtures[len(fixtures)-1])
@@ -141,7 +141,7 @@ func BenchmarkSequentialBufferedConfirms(t *testing.B) {
141141
ifi>cap(l)-1 {
142142
<-l
143143
}
144-
c.One(Confirmation{c.Publish().DeliveryTag,true})
144+
c.One(Confirmation{c.publish().DeliveryTag,true})
145145
}
146146
}
147147

@@ -159,7 +159,7 @@ func TestConfirmsIsThreadSafe(t *testing.T) {
159159
c.Listen(l)
160160

161161
fori:=0;i<count;i++ {
162-
gofunc() {pub<-Confirmation{c.Publish().DeliveryTag,true} }()
162+
gofunc() {pub<-Confirmation{c.publish().DeliveryTag,true} }()
163163
}
164164

165165
fori:=0;i<count;i++ {

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp