Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd49f3dd

Browse files
dsymeKevinRansom
authored andcommitted
fix cancellation for PostAndAsyncReply (dotnet#4477)
* fix cancellation for PostAndAsyncReply* fix tests
1 parent47e74f5 commitd49f3dd

File tree

3 files changed

+116
-72
lines changed

3 files changed

+116
-72
lines changed

‎src/fsharp/FSharp.Core/control.fs‎

Lines changed: 74 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -329,21 +329,13 @@ namespace Microsoft.FSharp.Control
329329

330330

331331

332-
// Reify exceptional results as exceptions
332+
/// Reify exceptional results as exceptions
333333
letcommit res=
334334
match reswith
335335
| Ok res-> res
336336
| Error edi-> edi.ThrowAny()
337337
| Canceled exn-> raise exn
338338

339-
// Reify exceptional results as exceptionsJIT 64 doesn't always take tailcalls correctly
340-
341-
letcommitWithPossibleTimeout res=
342-
match reswith
343-
| None-> raise(System.TimeoutException())
344-
| Some res-> commit res
345-
346-
347339
//----------------------------------
348340
// PRIMITIVE ASYNC INVOCATION
349341

@@ -713,11 +705,15 @@ namespace Microsoft.FSharp.Control
713705
[<AutoSerializable(false)>]
714706
typeResultCell<'T>()=
715707
let mutableresult= None
708+
716709
// The continuations for the result
717710
let mutablesavedConts:list<SuspendedAsync<'T>>=[]
711+
718712
// The WaitHandle event for the result. Only created if needed, and set to null when disposed.
719713
let mutableresEvent=null
714+
720715
let mutabledisposed=false
716+
721717
// All writers of result are protected by lock on syncRoot.
722718
letsyncRoot=new Object()
723719

@@ -752,13 +748,11 @@ namespace Microsoft.FSharp.Control
752748
interface IDisposablewith
753749
memberx.Dispose()= x.Close()// ; System.GC.SuppressFinalize(x)
754750

755-
756751
memberx.GrabResult()=
757752
match resultwith
758753
| Some res-> res
759754
| None-> failwith"Unexpected no result"
760755

761-
762756
/// Record the result in the ResultCell.
763757
memberx.RegisterResult(res:'T,reuseThread)=
764758
letgrabbedConts=
@@ -795,7 +789,10 @@ namespace Microsoft.FSharp.Control
795789

796790
memberx.ResultAvailable= result.IsSome
797791

798-
memberx.AwaitResult=
792+
/// Await the result of a result cell, without a direct timeout or direct
793+
/// cancellation. That is, the underlying computation must fill the result
794+
/// if cancellation or timeout occurs.
795+
memberx.AwaitResult_NoDirectCancelOrTimeout=
799796
unprotectedPrimitive(fun args->
800797
// Check if a result is available synchronously
801798
letresOpt=
@@ -860,10 +857,10 @@ namespace Microsoft.FSharp.Control
860857
// If timeout is provided, we govern the async by our own CTS, to cancel
861858
// when execution times out. Otherwise, the user-supplied token governs the async.
862859
match timeoutwith
863-
|None-> token,None
864-
|Some_->
865-
letsubSource=new LinkedSubSource(token)
866-
subSource.Token, Some subSource
860+
| None-> token,None
861+
| Some_->
862+
letsubSource=new LinkedSubSource(token)
863+
subSource.Token, Some subSource
867864

868865
use resultCell=new ResultCell<AsyncImplResult<_>>()
869866
queueAsync
@@ -1252,7 +1249,8 @@ namespace Microsoft.FSharp.Control
12521249
aux.econt edi
12531250
)
12541251

1255-
static memberAwaitWaitHandle(waitHandle:WaitHandle,?millisecondsTimeout:int)=
1252+
/// Wait for a wait handle. Both timeout and cancellation are supported
1253+
static memberAwaitWaitHandle(waitHandle:WaitHandle,?millisecondsTimeout:int)=
12561254
letmillisecondsTimeout= defaultArg millisecondsTimeout Threading.Timeout.Infinite
12571255
if millisecondsTimeout=0then
12581256
async.Delay(fun()->
@@ -1312,61 +1310,61 @@ namespace Microsoft.FSharp.Control
13121310
return! Async.AwaitWaitHandle(iar.AsyncWaitHandle, ?millisecondsTimeout=millisecondsTimeout)}
13131311

13141312

1315-
///Await the result of a result cell without a timeout
1316-
static memberReifyResult(result:AsyncImplResult<'T>):Async<'T>=
1313+
///Bind the result of a result cell, calling the appropriate continuation.
1314+
static memberBindResult(result:AsyncImplResult<'T>):Async<'T>=
13171315
unprotectedPrimitive(fun({aux=aux}asargs)->
13181316
(match resultwith
13191317
| Ok v-> args.cont v
13201318
| Error exn-> aux.econt exn
13211319
| Canceled exn-> aux.ccont exn))
13221320

1323-
/// Await the result of a result cell without a timeout
1324-
static memberAwaitAndReifyResult(resultCell:ResultCell<AsyncImplResult<'T>>):Async<'T>=
1321+
/// Await and use the result of a result cell. The resulting async doesn't support cancellation
1322+
/// or timeout directly, rather the underlying computation must fill the result if cancellation
1323+
/// or timeout occurs.
1324+
static memberAwaitAndBindResult_NoDirectCancelOrTimeout(resultCell:ResultCell<AsyncImplResult<'T>>):Async<'T>=
13251325
async{
1326-
let!result= resultCell.AwaitResult
1327-
return! Async.ReifyResult(result)
1326+
let!result= resultCell.AwaitResult_NoDirectCancelOrTimeout
1327+
return! Async.BindResult(result)
13281328
}
1329-
1330-
13311329

1332-
/// Await the result of a result cell without a timeout
1333-
///
1334-
/// Always resyncs to the synchronization context if needed, by virtue of it being built
1335-
/// from primitives which resync.
1336-
static memberAsyncWaitAsyncWithTimeout(innerCTS:CancellationTokenSource,resultCell:ResultCell<AsyncImplResult<'T>>,millisecondsTimeout):Async<'T>=
1330+
/// Await the result of a result cell belonging to a child computation. The resulting async supports timeout and if
1331+
/// it happens the child computation will be cancelled. The resulting async doesn't support cancellation
1332+
/// directly, rather the underlying computation must fill the result if cancellation occurs.
1333+
static memberAwaitAndBindChildResult(innerCTS:CancellationTokenSource,resultCell:ResultCell<AsyncImplResult<'T>>,millisecondsTimeout):Async<'T>=
13371334
match millisecondsTimeoutwith
13381335
| None| Some-1->
1339-
resultCell|> Async.AwaitAndReifyResult
1336+
resultCell|> Async.AwaitAndBindResult_NoDirectCancelOrTimeout
13401337

13411338
| Some0->
13421339
async{if resultCell.ResultAvailablethen
13431340
return commit(resultCell.GrabResult())
13441341
else
1345-
returncommitWithPossibleTimeout None}
1342+
returnraise(System.TimeoutException())}
13461343
|_->
13471344
async{try
13481345
if resultCell.ResultAvailablethen
13491346
return commit(resultCell.GrabResult())
13501347
else
1351-
let!ok= Async.AwaitWaitHandle(resultCell.GetWaitHandle(),?millisecondsTimeout=millisecondsTimeout)
1348+
let!ok= Async.AwaitWaitHandle(resultCell.GetWaitHandle(),?millisecondsTimeout=millisecondsTimeout)
13521349
if okthen
1353-
returncommitWithPossibleTimeout(Some(resultCell.GrabResult()))
1350+
returncommit(resultCell.GrabResult())
13541351
else// timed out
13551352
// issue cancellation signal
13561353
innerCTS.Cancel()
13571354
// wait for computation to quiesce
13581355
let!_= Async.AwaitWaitHandle(resultCell.GetWaitHandle())
1359-
returncommitWithPossibleTimeout None
1356+
returnraise(System.TimeoutException())
13601357
finally
13611358
resultCell.Close()}
13621359

13631360

1364-
static memberFromBeginEnd(beginAction,endAction,?cancelAction):Async<'T>=
1361+
static memberFromBeginEnd(beginAction,endAction,?cancelAction):Async<'T>=
13651362
async{let!cancellationToken= getCancellationToken()
13661363
letresultCell=new ResultCell<_>()
13671364

13681365
letonce= Once()
13691366
letregistration:CancellationTokenRegistration=
1367+
13701368
letonCancel(_:obj)=
13711369
// Call the cancellation routine
13721370
match cancelActionwith
@@ -1381,7 +1379,9 @@ namespace Microsoft.FSharp.Control
13811379
// If we get an exception from a cooperative cancellation function
13821380
// we assume the operation has already completed.
13831381
try cancel()with_->()
1382+
13841383
cancellationToken.Register(Action<obj>(onCancel),null)
1384+
13851385
letcallback=
13861386
new System.AsyncCallback(fun iar->
13871387
ifnot iar.CompletedSynchronouslythen
@@ -1405,15 +1405,15 @@ namespace Microsoft.FSharp.Control
14051405
// ResultCell allows a race and throws away whichever comes last.
14061406
resultCell.RegisterResult(res,reuseThread=true)|> unfake
14071407
else())
1408-
1409-
14101408

14111409
let(iar:IAsyncResult)= beginAction(callback,(null:obj))
14121410
if iar.CompletedSynchronouslythen
14131411
registration.Dispose()
14141412
return endAction iar
14151413
else
1416-
return! Async.AwaitAndReifyResult(resultCell)}
1414+
// Note: ok to use "NoDirectCancel" here because cancellation has been registered above
1415+
// Note: ok to use "NoDirectTimeout" here because no timeout parameter to this method
1416+
return! Async.AwaitAndBindResult_NoDirectCancelOrTimeout(resultCell)}
14171417

14181418

14191419
static memberFromBeginEnd(arg,beginAction,endAction,?cancelAction):Async<'T>=
@@ -1567,7 +1567,9 @@ namespace Microsoft.FSharp.Control
15671567
event.AddHandler(del)
15681568

15691569
// Return the async computation that allows us to await the result
1570-
return! Async.AwaitAndReifyResult(resultCell)}
1570+
// Note: ok to use "NoDirectCancel" here because cancellation has been registered above
1571+
// Note: ok to use "NoDirectTimeout" here because no timeout parameter to this method
1572+
return! Async.AwaitAndBindResult_NoDirectCancelOrTimeout(resultCell)}
15711573

15721574
typeAsyncwith
15731575
static memberIgnore(computation:Async<'T>)= bindA computation(fun _-> doneA)
@@ -1597,7 +1599,7 @@ namespace Microsoft.FSharp.Control
15971599
computation
15981600
|> unfake
15991601

1600-
return Async.AsyncWaitAsyncWithTimeout(innerCTS, resultCell,millisecondsTimeout)}
1602+
return Async.AwaitAndBindChildResult(innerCTS, resultCell,millisecondsTimeout)}
16011603

16021604
static memberSwitchToContext syncContext=
16031605
async{match syncContextwith
@@ -1681,10 +1683,6 @@ namespace Microsoft.FSharp.Control
16811683
Async.FromBeginEnd(buffer,offset,count,stream.BeginWrite,stream.EndWrite)
16821684
#endif
16831685

1684-
typeSystem.Threading.WaitHandlewith
1685-
memberwaitHandle.AsyncWaitOne(?millisecondsTimeout:int)=// only used internally, not a public API
1686-
Async.AwaitWaitHandle(waitHandle,?millisecondsTimeout=millisecondsTimeout)
1687-
16881686
typeIObservable<'Args>with
16891687

16901688
[<CompiledName("AddToObservable")>]// give the extension member a 'nice', unmangled compiled name, unique within this module
@@ -1715,7 +1713,7 @@ namespace Microsoft.FSharp.Control
17151713
|:? System.Net.WebExceptionas webExn
17161714
when webExn.Status= System.Net.WebExceptionStatus.RequestCanceled&&!canceled->
17171715

1718-
Async.ReifyResult(AsyncImplResult.Canceled(OperationCanceledException webExn.Message))
1716+
Async.BindResult(AsyncImplResult.Canceled(OperationCanceledException webExn.Message))
17191717
|_->
17201718
edi.ThrowAny())
17211719

@@ -1791,7 +1789,10 @@ namespace Microsoft.FSharp.Control
17911789
)
17921790
start a1 Choice1Of2
17931791
start a2 Choice2Of2
1794-
let!result= c.AwaitResult
1792+
// Note: It is ok to use "NoDirectCancel" here because the started computations use the same
1793+
// cancellation token and will register a cancelled result if cancellation occurs.
1794+
// Note: It is ok to use "NoDirectTimeout" here because there is no specific timeout log to this routine.
1795+
let!result= c.AwaitResult_NoDirectCancelOrTimeout
17951796
return! reify result
17961797
}
17971798
lettimeout msec cancellationToken=
@@ -1805,7 +1806,10 @@ namespace Microsoft.FSharp.Control
18051806
exceptionContinuation=ignore,
18061807
cancellationContinuation=ignore,
18071808
cancellationToken= cancellationToken)
1808-
c.AwaitResult
1809+
// Note: It is ok to use "NoDirectCancel" here because the started computations use the same
1810+
// cancellation token and will register a cancelled result if cancellation occurs.
1811+
// Note: It is ok to use "NoDirectTimeout" here because the child compuation above looks after the timeout.
1812+
c.AwaitResult_NoDirectCancelOrTimeout
18091813

18101814
[<Sealed>]
18111815
[<AutoSerializable(false)>]
@@ -1854,7 +1858,7 @@ namespace Microsoft.FSharp.Control
18541858
failwith"multiple waiting reader continuations for mailbox")
18551859

18561860
letwaitOneWithCancellation(timeout)=
1857-
ensurePulse().AsyncWaitOne(millisecondsTimeout=timeout)
1861+
Async.AwaitWaitHandle(ensurePulse(),millisecondsTimeout=timeout)
18581862

18591863
letwaitOne(timeout)=
18601864
if timeout<0&&not cancellationSupportedthen
@@ -2125,36 +2129,34 @@ namespace Microsoft.FSharp.Control
21252129
letmsg= buildMessage(new AsyncReplyChannel<_>(fun reply->
21262130
// Note the ResultCell may have been disposed if the operation
21272131
// timed out. In this case RegisterResult drops the result on the floor.
2128-
resultCell.RegisterResult(reply,reuseThread=false)|> unfake))
2132+
resultCell.RegisterResult(reply,reuseThread=false)|> unfake))
21292133
mailbox.Post(msg)
21302134
match timeoutwith
2131-
| Threading.Timeout.Infinite->
2132-
async{let!result= resultCell.AwaitResult
2133-
return Some(result)
2134-
}
2135+
| Threading.Timeout.Infinitewhennot cancellationSupported->
2136+
async{let!result= resultCell.AwaitResult_NoDirectCancelOrTimeout
2137+
return Some result}
21352138

2136-
|_->
2137-
async{use _disposeCell= resultCell
2138-
let!ok= resultCell.GetWaitHandle().AsyncWaitOne(millisecondsTimeout=timeout)
2139-
letres=(if okthen Some(resultCell.GrabResult())else None)
2140-
return res}
2139+
|_->
2140+
async{use _disposeCell= resultCell
2141+
let!ok=Async.AwaitWaitHandle(resultCell.GetWaitHandle(),millisecondsTimeout=timeout)
2142+
letres=(if okthen Some(resultCell.GrabResult())else None)
2143+
return res}
21412144

21422145
memberx.PostAndAsyncReply(buildMessage,?timeout:int)=
21432146
lettimeout= defaultArg timeout defaultTimeout
21442147
match timeoutwith
2145-
| Threading.Timeout.Infinite->
2146-
// Nothing to dispose, no wait handles used
2147-
letresultCell=new ResultCell<_>()
2148-
letmsg= buildMessage(new AsyncReplyChannel<_>(fun reply-> resultCell.RegisterResult(reply,reuseThread=false)|> unfake))
2149-
mailbox.Post(msg)
2150-
resultCell.AwaitResult
2151-
|_->
2152-
letasyncReply= x.PostAndTryAsyncReply(buildMessage,timeout=timeout)
2153-
async{let!res= asyncReply
2154-
match reswith
2155-
| None->return! raise(TimeoutException(SR.GetString(SR.mailboxProcessorPostAndAsyncReplyTimedOut)))
2156-
| Some res->return res
2157-
}
2148+
| Threading.Timeout.Infinitewhennot cancellationSupported->
2149+
// Nothing to dispose, no wait handles used
2150+
letresultCell=new ResultCell<_>()
2151+
letmsg= buildMessage(new AsyncReplyChannel<_>(fun reply-> resultCell.RegisterResult(reply,reuseThread=false)|> unfake))
2152+
mailbox.Post(msg)
2153+
resultCell.AwaitResult_NoDirectCancelOrTimeout
2154+
|_->
2155+
letasyncReply= x.PostAndTryAsyncReply(buildMessage,timeout=timeout)
2156+
async{let!res= asyncReply
2157+
match reswith
2158+
| None->return! raise(TimeoutException(SR.GetString(SR.mailboxProcessorPostAndAsyncReplyTimedOut)))
2159+
| Some res->return res}
21582160

21592161
memberx.Receive(?timeout)= mailbox.Receive(timeout=defaultArg timeout defaultTimeout)
21602162
memberx.TryReceive(?timeout)= mailbox.TryReceive(timeout=defaultArg timeout defaultTimeout)

‎tests/FSharp.Core.UnitTests/FSharp.Core/Microsoft.FSharp.Control/Cancellation.fs‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,3 +306,5 @@ type CancellationType() =
306306
Assert.IsFalse((r1a<> r1a'))
307307
Assert.IsTrue((r1a<> r1b))
308308
Assert.IsTrue((r1a<> r2))
309+
310+

‎tests/FSharp.Core.UnitTests/FSharp.Core/Microsoft.FSharp.Control/MailboxProcessorType.fs‎

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,3 +292,43 @@ type MailboxProcessorType() =
292292

293293
test()
294294

295+
[<Test>]
296+
memberthis.PostAndAsyncReply_Cancellation()=
297+
298+
use cancel=new CancellationTokenSource(500)
299+
let mutablegotGood=false
300+
let mutablegotBad=false
301+
302+
letgoodAsync=async{
303+
try
304+
for iin Seq.initInfinite(fun i-> i)do
305+
if i%10000000=0then
306+
printfn"good async working..."
307+
finally
308+
printfn"good async exited - that's what we want"
309+
gotGood<-true
310+
}
311+
312+
letbadAsync(mbox:MailboxProcessor<AsyncReplyChannel<int>>)=async{
313+
try
314+
printfn"bad async working..."
315+
let!result= mbox.PostAndAsyncReply id// <- got stuck in here forever
316+
printfn"%d" result
317+
finally
318+
printfn"bad async exited - that's what we want"// <- we never got here
319+
gotBad<-true
320+
}
321+
322+
letmbox= MailboxProcessor.Start(fun inbox->async{
323+
let!(reply:AsyncReplyChannel<int>)= inbox.Receive()
324+
do! Async.Sleep1000000
325+
reply.Reply(200)
326+
}, cancel.Token)
327+
328+
[goodAsync; badAsync mbox]
329+
|> Async.Parallel
330+
|> Async.Ignore
331+
|>fun x-> Async.Start(x, cancel.Token)
332+
System.Threading.Thread.Sleep(1000)// cancellation after 500
333+
ifnot gotGood||not gotBadthen
334+
failwith"Exected both good and bad async's to be cancelled afteMailbox should not fail!"

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp