- Notifications
You must be signed in to change notification settings - Fork928
fix: Deadlock and race inpeer
, test improvements#3086
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Changes fromall commits
eaacbca
5b5dde9
8061d4c
077b02e
0474aa9
5e33a9a
ca349ca
445e8b0
File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -106,12 +106,15 @@ func (c *Channel) init() { | ||
// write operations to block once the threshold is set. | ||
c.dc.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold) | ||
c.dc.OnBufferedAmountLow(func() { | ||
// Grab the lock to protect the sendMore channel from being | ||
// closed in between the isClosed check and the send. | ||
c.closeMutex.Lock() | ||
deferc.closeMutex.Unlock() | ||
ifc.isClosed() { | ||
return | ||
} | ||
select { | ||
case<-c.closed: | ||
Comment on lines 113 to 117 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. Why do we need this check? Is it a problem if ifc.isClosed() {return} Idk, just seems redundant. Grabbing the closed lock doesn't seem necessary because of the same argument. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. Both (check and mutex) are needed because By holding the mutex, we ensure that closure doesn't happen between the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. Oh btw, I was also first of the impression that the check might not be needed, but without it the
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. Will a select send something on a closed channel? If so TIL. 👍 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others.Learn more. Yup, if you try to run this playground a couple of times, you will notice that sometimes it exits 0, and sometimes panics:https://go.dev/play/p/c35kE0948kl | ||
casec.sendMore<-struct{}{}: | ||
default: | ||
} | ||
@@ -122,15 +125,16 @@ func (c *Channel) init() { | ||
}) | ||
c.dc.OnOpen(func() { | ||
c.closeMutex.Lock() | ||
c.conn.logger().Debug(context.Background(),"datachannel opening",slog.F("id",c.dc.ID()),slog.F("label",c.dc.Label())) | ||
varerrerror | ||
c.rwc,err=c.dc.Detach() | ||
iferr!=nil { | ||
c.closeMutex.Unlock() | ||
_=c.closeWithError(xerrors.Errorf("detach: %w",err)) | ||
return | ||
} | ||
c.closeMutex.Unlock() | ||
// pion/webrtc will return an io.ErrShortBuffer when a read | ||
// is triggerred with a buffer size less than the chunks written. | ||
// | ||
@@ -189,9 +193,6 @@ func (c *Channel) init() { | ||
// | ||
// This will block until the underlying DataChannel has been opened. | ||
func (c*Channel)Read(bytes []byte) (int,error) { | ||
err:=c.waitOpened() | ||
iferr!=nil { | ||
return0,err | ||
@@ -228,9 +229,6 @@ func (c *Channel) Write(bytes []byte) (n int, err error) { | ||
c.writeMutex.Lock() | ||
deferc.writeMutex.Unlock() | ||
err=c.waitOpened() | ||
iferr!=nil { | ||
return0,err | ||
@@ -308,6 +306,10 @@ func (c *Channel) isClosed() bool { | ||
func (c*Channel)waitOpened()error { | ||
select { | ||
case<-c.opened: | ||
// Re-check the closed channel to prioritize closure. | ||
ifc.isClosed() { | ||
returnc.closeError | ||
} | ||
returnnil | ||
case<-c.closed: | ||
returnc.closeError | ||