@@ -81,7 +81,7 @@ impl OverlappedResult {
81
81
82
82
/// Convert the OverlappedResult into a Result<...> indicating success or failure of the referent operation.
83
83
pub fn ok ( & self ) ->Result < ( ) , Fail > {
84
- let win32_error: WIN32_ERROR =translate_ntstatus ( self . result ) ;
84
+ let win32_error =translate_ntstatus ( self . result ) ;
85
85
if win32_error. is_ok ( ) {
86
86
Ok ( ( ) )
87
87
} else {
@@ -93,7 +93,7 @@ impl OverlappedResult {
93
93
impl < S : Unpin > IoCompletionPort < S > {
94
94
/// Create a new I/O completion port.
95
95
pub fn new ( ) ->Result < IoCompletionPort < S > , Fail > {
96
- let iocp: HANDLE =match unsafe { CreateIoCompletionPort ( INVALID_HANDLE_VALUE , None , 0 , 1 ) } {
96
+ let iocp =match unsafe { CreateIoCompletionPort ( INVALID_HANDLE_VALUE , None , 0 , 1 ) } {
97
97
Ok ( handle) => handle,
98
98
Err ( err) =>return Err ( err. into ( ) ) ,
99
99
} ;
@@ -150,7 +150,7 @@ impl<S: Unpin> IoCompletionPort<S> {
150
150
for < ' a > F2 : FnOnce ( Pin < & ' a mut S > , OverlappedResult ) ->Result < R , Fail > ,
151
151
{
152
152
// Allocate a new Overlapped completion in the pin slab.
153
- let pinslab_index: usize =match self . ops . insert ( OverlappedCompletion :: new ( state) ) {
153
+ let pinslab_index =match self . ops . insert ( OverlappedCompletion :: new ( state) ) {
154
154
Some ( index) => index,
155
155
None =>{
156
156
return Err ( Fail :: new (
@@ -160,21 +160,20 @@ impl<S: Unpin> IoCompletionPort<S> {
160
160
} ,
161
161
} ;
162
162
// Grab a reference to the new completion in the pin slab.
163
- let mut pinned_completion: Pin < & mut OverlappedCompletion < S > > =
164
- expect_some ! ( self . ops. get_pin_mut( pinslab_index) , "Just inserted this" ) ;
163
+ let mut pinned_completion =expect_some ! ( self . ops. get_pin_mut( pinslab_index) , "Just inserted this" ) ;
165
164
166
165
// Set the pinslab index so the I/O processor can remove it later if necessary.
167
166
pinned_completion. as_mut ( ) . set_pinslab_index ( pinslab_index) ;
168
167
169
- let overlapped: * mut OVERLAPPED = pinned_completion. as_mut ( ) . marshal ( ) ;
170
- let result: Result < R , Fail > =match start ( pinned_completion. as_mut ( ) . get_state ( ) , overlapped) {
168
+ let overlapped = pinned_completion. as_mut ( ) . marshal ( ) ;
169
+ let result =match start ( pinned_completion. as_mut ( ) . get_state ( ) , overlapped) {
171
170
// Operation in progress, pending overlapped completion.
172
171
Ok ( ( ) ) =>{
173
172
while let Some ( mut cv) = pinned_completion. as_ref ( ) . get_cv ( ) {
174
173
cv. wait ( ) . await ;
175
174
}
176
175
177
- let overlapped_result: OverlappedResult =OverlappedResult :: new (
176
+ let overlapped_result =OverlappedResult :: new (
178
177
pinned_completion. as_mut ( ) . overlapped ,
179
178
pinned_completion. as_mut ( ) . completion_key ,
180
179
) ;
@@ -219,10 +218,10 @@ impl<S: Unpin> IoCompletionPort<S> {
219
218
pub fn process_events ( & mut self ) ->Result < ( ) , Fail > {
220
219
// Arbitrarily chosen batch size; should be updated after tuning.
221
220
const BATCH_SIZE : usize =4 ;
222
- let mut entries: [ OVERLAPPED_ENTRY ; BATCH_SIZE ] =[ OVERLAPPED_ENTRY :: default ( ) ; BATCH_SIZE ] ;
221
+ let mut entries =[ OVERLAPPED_ENTRY :: default ( ) ; BATCH_SIZE ] ;
223
222
224
223
loop {
225
- let mut dequeued: u32 =0 ;
224
+ let mut dequeued =0 ;
226
225
match unsafe { GetQueuedCompletionStatusEx ( self . iocp , entries. as_mut_slice ( ) , & mut dequeued, 0 , FALSE ) } {
227
226
Ok ( ( ) ) =>{
228
227
for iin 0 ..dequeued{
@@ -247,7 +246,7 @@ impl<S: Unpin> IoCompletionPort<S> {
247
246
248
247
impl < S : Unpin > OverlappedCompletion < S > {
249
248
pub fn new ( state : S ) ->Self {
250
- let cv: SharedConditionVariable =SharedConditionVariable :: default ( ) ;
249
+ let cv =SharedConditionVariable :: default ( ) ;
251
250
252
251
Self {
253
252
overlapped : OVERLAPPED :: default ( ) ,
@@ -313,7 +312,7 @@ mod tests {
313
312
use crate :: {
314
313
ensure_eq,
315
314
runtime:: { conditional_yield_with_timeout, SharedDemiRuntime } ,
316
- OperationResult , QDesc , QToken ,
315
+ OperationResult , QDesc ,
317
316
} ;
318
317
use futures:: pin_mut;
319
318
use std:: {
@@ -425,7 +424,7 @@ mod tests {
425
424
426
425
#[ test]
427
426
fn test_marshal_unmarshal ( ) ->Result < ( ) > {
428
- let mut completion: Pin < Box < OverlappedCompletion < ( ) > > > =Box :: pin ( OverlappedCompletion :: new ( ( ) ) ) ;
427
+ let mut completion =Box :: pin ( OverlappedCompletion :: new ( ( ) ) ) ;
429
428
430
429
// Ensure that the marshal returns the address of the overlapped member.
431
430
ensure_eq ! (
@@ -445,8 +444,8 @@ mod tests {
445
444
( completion. as_ref( ) . get_ref( ) as * const OverlappedCompletion <( ) >) as usize
446
445
) ;
447
446
448
- let overlapped_ptr: NonNull < OVERLAPPED > =NonNull :: new ( completion. as_mut ( ) . marshal ( ) ) . unwrap ( ) ;
449
- let unmarshalled: Pin < & mut OverlappedCompletion < ( ) > > =OverlappedCompletion :: unmarshal ( overlapped_ptr) ;
447
+ let overlapped_ptr =NonNull :: new ( completion. as_mut ( ) . marshal ( ) ) . unwrap ( ) ;
448
+ let unmarshalled =OverlappedCompletion :: unmarshal ( overlapped_ptr) ;
450
449
451
450
// Test that unmarshal returns an address which is the same as the OVERLAPPED, which is the same as the original
452
451
// OverlappedCompletionWithState. This implies that OVERLAPPED is at member offset 0 in all structs.
@@ -463,19 +462,19 @@ mod tests {
463
462
fn test_event_processor ( ) ->Result < ( ) > {
464
463
const COMPLETION_KEY : usize =123 ;
465
464
let mut iocp: IoCompletionPort < ( ) > =make_iocp ( ) ?;
466
- let overlapped: OverlappedCompletion < ( ) > =OverlappedCompletion :: new ( ( ) ) ;
467
- let mut cv: SharedConditionVariable = overlapped. condition_variable . clone ( ) . unwrap ( ) ;
465
+ let overlapped =OverlappedCompletion :: new ( ( ) ) ;
466
+ let mut cv = overlapped. condition_variable . clone ( ) . unwrap ( ) ;
468
467
pin_mut ! ( overlapped) ;
469
468
470
469
// Insert coroutine
471
- let mut runtime: SharedDemiRuntime =SharedDemiRuntime :: default ( ) ;
470
+ let mut runtime =SharedDemiRuntime :: default ( ) ;
472
471
let server =run_as_io_op ( async move {
473
472
cv. wait ( ) . await ;
474
473
Ok ( OperationResult :: Close )
475
474
} )
476
475
. fuse ( ) ;
477
476
478
- let server_task: QToken = runtime
477
+ let server_task = runtime
479
478
. insert_nonpolling_coroutine ( "ioc_server" , Box :: pin ( server) )
480
479
. unwrap ( ) ;
481
480
post_completion ( & iocp, overlapped. as_mut ( ) . marshal ( ) , COMPLETION_KEY ) ?;
@@ -503,7 +502,7 @@ mod tests {
503
502
const PIPE_NAME : PCSTR =s ! ( r"\\.\pipe\demikernel-test-pipe" ) ;
504
503
const BUFFER_SIZE : u32 =128 ;
505
504
const COMPLETION_KEY : usize =0xFEEDF00D ;
506
- let server_pipe: SafeHandle =SafeHandle ( unsafe {
505
+ let server_pipe =SafeHandle ( unsafe {
507
506
CreateNamedPipeA (
508
507
PIPE_NAME ,
509
508
PIPE_ACCESS_DUPLEX |FILE_FLAG_FIRST_PIPE_INSTANCE |FILE_FLAG_OVERLAPPED ,
@@ -515,11 +514,11 @@ mod tests {
515
514
None ,
516
515
)
517
516
} ?) ;
518
- let server_state: Rc < AtomicU32 > =Rc :: new ( AtomicU32 :: new ( 0 ) ) ;
519
- let server_state_view: Rc < AtomicU32 > = server_state. clone ( ) ;
520
- let mut iocp: UnsafeCell < IoCompletionPort < Rc < Vec < u8 > > > > =UnsafeCell :: new ( make_iocp ( ) . map_err ( anyhow_fail) ?) ;
517
+ let server_state =Rc :: new ( AtomicU32 :: new ( 0 ) ) ;
518
+ let server_state_view = server_state. clone ( ) ;
519
+ let mut iocp =UnsafeCell :: new ( make_iocp ( ) . map_err ( anyhow_fail) ?) ;
521
520
iocp. get_mut ( ) . associate_handle ( server_pipe. 0 , COMPLETION_KEY ) ?;
522
- let iocp_ref: & mut IoCompletionPort < Rc < Vec < u8 > > > =unsafe { & mut * iocp. get ( ) } ;
521
+ let iocp_ref =unsafe { & mut * iocp. get ( ) } ;
523
522
524
523
let server =Box :: pin (
525
524
run_as_io_op ( async move {
@@ -537,13 +536,12 @@ mod tests {
537
536
538
537
server_state. fetch_add ( 1 , Ordering :: Relaxed ) ;
539
538
540
- let mut buffer: Rc < Vec < u8 > > =
541
- Rc :: new ( iter:: repeat ( 0u8 ) . take ( BUFFER_SIZE as usize ) . collect :: < Vec < u8 > > ( ) ) ;
539
+ let mut buffer =Rc :: new ( iter:: repeat ( 0u8 ) . take ( BUFFER_SIZE as usize ) . collect :: < Vec < u8 > > ( ) ) ;
542
540
buffer =unsafe {
543
541
iocp_ref. do_io (
544
542
buffer,
545
543
|state : Pin < & mut Rc < Vec < u8 > > > , overlapped : * mut OVERLAPPED | ->Result < ( ) , Fail > {
546
- let vec: & mut Vec < u8 > =Rc :: get_mut ( state. get_mut ( ) ) . unwrap ( ) ;
544
+ let vec =Rc :: get_mut ( state. get_mut ( ) ) . unwrap ( ) ;
547
545
vec. resize ( BUFFER_SIZE as usize , 0u8 ) ;
548
546
is_overlapped_ok ( ReadFile (
549
547
server_pipe. 0 ,
@@ -572,10 +570,10 @@ mod tests {
572
570
}
573
571
. await ?;
574
572
575
- let message: & str = std:: str:: from_utf8 ( buffer. as_slice ( ) )
573
+ let message = std:: str:: from_utf8 ( buffer. as_slice ( ) )
576
574
. map_err ( |_|Fail :: new ( libc:: EINVAL , "utf8 conversion failed" ) ) ?;
577
575
if message !=MESSAGE {
578
- let err_msg: String =format ! ( "expected\" {}\" , got\" {}\" " , MESSAGE , message) ;
576
+ let err_msg =format ! ( "expected\" {}\" , got\" {}\" " , MESSAGE , message) ;
579
577
Err ( Fail :: new ( libc:: EINVAL , err_msg. as_str ( ) ) )
580
578
} else {
581
579
// Dummy result
@@ -585,8 +583,8 @@ mod tests {
585
583
. fuse ( ) ,
586
584
) ;
587
585
588
- let mut runtime: SharedDemiRuntime =SharedDemiRuntime :: default ( ) ;
589
- let server_task: QToken = runtime. insert_nonpolling_coroutine ( "ioc_server" , server) . unwrap ( ) ;
586
+ let mut runtime =SharedDemiRuntime :: default ( ) ;
587
+ let server_task = runtime. insert_nonpolling_coroutine ( "ioc_server" , server) . unwrap ( ) ;
590
588
591
589
let mut wait_for_state = |state| ->Result < ( ) , Fail > {
592
590
while server_state_view. load ( Ordering :: Relaxed ) < state{
@@ -604,7 +602,7 @@ mod tests {
604
602
605
603
wait_for_state ( 1 ) ?;
606
604
607
- let client_handle: SafeHandle =SafeHandle ( unsafe {
605
+ let client_handle =SafeHandle ( unsafe {
608
606
CreateFileA (
609
607
PIPE_NAME ,
610
608
GENERIC_WRITE . 0 ,
@@ -618,7 +616,7 @@ mod tests {
618
616
619
617
wait_for_state ( 2 ) ?;
620
618
621
- let mut bytes_written: u32 =0 ;
619
+ let mut bytes_written =0 ;
622
620
unsafe {
623
621
WriteFile (
624
622
client_handle. 0 ,
@@ -630,7 +628,7 @@ mod tests {
630
628
631
629
std:: mem:: drop ( client_handle) ;
632
630
633
- let result: OperationResult =loop {
631
+ let result =loop {
634
632
iocp. get_mut ( ) . process_events ( ) ?;
635
633
if let Ok ( ( _, result) ) = runtime. wait ( server_task, Duration :: ZERO ) {
636
634
break result;
@@ -650,7 +648,7 @@ mod tests {
650
648
const PIPE_NAME : PCSTR =s ! ( r"\\.\pipe\demikernel-test-cancel-pipe" ) ;
651
649
const BUFFER_SIZE : u32 =128 ;
652
650
const COMPLETION_KEY : usize =0xFEEDF00D ;
653
- let server_pipe: SafeHandle =SafeHandle ( unsafe {
651
+ let server_pipe =SafeHandle ( unsafe {
654
652
CreateNamedPipeA (
655
653
PIPE_NAME ,
656
654
PIPE_ACCESS_DUPLEX |FILE_FLAG_FIRST_PIPE_INSTANCE |FILE_FLAG_OVERLAPPED ,
@@ -662,11 +660,11 @@ mod tests {
662
660
None ,
663
661
)
664
662
} ?) ;
665
- let server_state: Rc < AtomicU32 > =Rc :: new ( AtomicU32 :: new ( 0 ) ) ;
666
- let server_state_view: Rc < AtomicU32 > = server_state. clone ( ) ;
667
- let mut iocp: UnsafeCell < IoCompletionPort < ( ) > > =UnsafeCell :: new ( make_iocp ( ) . map_err ( anyhow_fail) ?) ;
663
+ let server_state =Rc :: new ( AtomicU32 :: new ( 0 ) ) ;
664
+ let server_state_view = server_state. clone ( ) ;
665
+ let mut iocp =UnsafeCell :: new ( make_iocp ( ) . map_err ( anyhow_fail) ?) ;
668
666
iocp. get_mut ( ) . associate_handle ( server_pipe. 0 , COMPLETION_KEY ) ?;
669
- let iocp_ref: & mut IoCompletionPort < ( ) > =unsafe { & mut * iocp. get ( ) } ;
667
+ let iocp_ref =unsafe { & mut * iocp. get ( ) } ;
670
668
671
669
let server =run_as_io_op ( async move {
672
670
match conditional_yield_with_timeout (
@@ -690,8 +688,8 @@ mod tests {
690
688
} )
691
689
. fuse ( ) ;
692
690
693
- let mut runtime: SharedDemiRuntime =SharedDemiRuntime :: default ( ) ;
694
- let server_task: QToken = runtime
691
+ let mut runtime =SharedDemiRuntime :: default ( ) ;
692
+ let server_task = runtime
695
693
. insert_nonpolling_coroutine ( "ioc_server" , Box :: pin ( server) )
696
694
. unwrap ( ) ;
697
695
@@ -701,11 +699,11 @@ mod tests {
701
699
"server execution should have started"
702
700
) ;
703
701
704
- let iocp_ref: & mut IoCompletionPort < ( ) > =unsafe { & mut * iocp. get ( ) } ;
702
+ let iocp_ref =unsafe { & mut * iocp. get ( ) } ;
705
703
iocp_ref. process_events ( ) ?;
706
704
707
705
// Poll the runtime again, which
708
- let result: OperationResult =loop {
706
+ let result =loop {
709
707
// Move time forward, which should time out the operation.
710
708
runtime. advance_clock ( Instant :: now ( ) ) ;
711
709
iocp. get_mut ( ) . process_events ( ) ?;