27
27
//! }
28
28
//! ```
29
29
use crate :: {
30
- CanAddr , CanAnyFrame , CanFdFrame , CanFrame , Error , IoResult , Result , Socket , SocketOptions ,
30
+ frame :: AsPtr , CanAddr , CanAnyFrame , CanFrame , Error , IoResult , Result , Socket , SocketOptions ,
31
31
} ;
32
32
use futures:: { prelude:: * , ready, task:: Context } ;
33
33
use std:: {
@@ -187,9 +187,12 @@ pub type CanFdSocket = AsyncCanSocket<crate::CanFdSocket>;
187
187
188
188
impl CanFdSocket {
189
189
/// Write a CAN FD frame to the socket asynchronously
190
- pub async fn write_frame ( & self , frame : CanFdFrame ) ->IoResult < ( ) > {
190
+ pub async fn write_frame < F > ( & self , frame : & F ) ->IoResult < ( ) >
191
+ where
192
+ F : Into < CanAnyFrame > +AsPtr ,
193
+ {
191
194
self . 0
192
- . async_io ( Interest :: WRITABLE , |inner| inner. write_frame ( & frame) )
195
+ . async_io ( Interest :: WRITABLE , |inner| inner. write_frame ( frame) )
193
196
. await
194
197
}
195
198
@@ -215,7 +218,7 @@ impl Stream for CanFdSocket {
215
218
}
216
219
}
217
220
218
- impl Sink < CanFdFrame > for CanFdSocket {
221
+ impl Sink < CanAnyFrame > for CanFdSocket {
219
222
type Error =Error ;
220
223
221
224
fn poll_ready ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) ->Poll < Result < ( ) > > {
@@ -233,7 +236,7 @@ impl Sink<CanFdFrame> for CanFdSocket {
233
236
Poll :: Ready ( Ok ( ( ) ) )
234
237
}
235
238
236
- fn start_send ( self : Pin < & mut Self > , item : CanFdFrame ) ->Result < ( ) > {
239
+ fn start_send ( self : Pin < & mut Self > , item : CanAnyFrame ) ->Result < ( ) > {
237
240
self . 0 . get_ref ( ) . write_frame_insist ( & item) ?;
238
241
Ok ( ( ) )
239
242
}
@@ -294,7 +297,7 @@ mod tests {
294
297
use super :: * ;
295
298
use crate :: {
296
299
frame:: { can_frame_default, AsPtr } ,
297
- CanFrame , Frame , IoErrorKind , StandardId ,
300
+ CanFdFrame , CanFrame , Frame , IoErrorKind , StandardId ,
298
301
} ;
299
302
use embedded_can:: Frame as EmbeddedFrame ;
300
303
use futures:: { select, try_join} ;
@@ -369,11 +372,18 @@ mod tests {
369
372
)
370
373
}
371
374
372
- /// Write a test frame to the CanSocket
373
- async fn write_frame_fd ( socket : & CanFdSocket ) ->Result < ( ) > {
375
+ /// Write a testCANFD frame to the CanSocket
376
+ async fn write_frame_fd_canfd ( socket : & CanFdSocket ) ->Result < ( ) > {
374
377
let test_frame =
375
378
CanFdFrame :: new ( StandardId :: new ( 0x1 ) . unwrap ( ) , & [ 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ] ) . unwrap ( ) ;
376
- socket. write_frame ( test_frame) . await ?;
379
+ socket. write_frame ( & test_frame) . await ?;
380
+ Ok ( ( ) )
381
+ }
382
+
383
+ /// Write a test CAN frame to the CanSocket
384
+ async fn write_frame_fd_can ( socket : & CanFdSocket ) ->Result < ( ) > {
385
+ let test_frame =CanFrame :: new ( StandardId :: new ( 0x1 ) . unwrap ( ) , & [ 0 ] ) . unwrap ( ) ;
386
+ socket. write_frame ( & test_frame) . await ?;
377
387
Ok ( ( ) )
378
388
}
379
389
@@ -442,11 +452,34 @@ mod tests {
442
452
443
453
#[ serial]
444
454
#[ tokio:: test]
445
- async fn test_receive_can_fd ( ) ->Result < ( ) > {
455
+ async fn test_receive_can_fd_canfd ( ) ->Result < ( ) > {
456
+ let socket1 =CanFdSocket :: open ( "vcan0" ) . unwrap ( ) ;
457
+ let socket2 =CanFdSocket :: open ( "vcan0" ) . unwrap ( ) ;
458
+
459
+ let send_frames = future:: try_join (
460
+ write_frame_fd_canfd ( & socket1) ,
461
+ write_frame_fd_canfd ( & socket1) ,
462
+ ) ;
463
+
464
+ let recv_frames =async {
465
+ let socket2 =recv_frame_fd ( socket2) . await ?;
466
+ let _socket2 =recv_frame_fd ( socket2) . await ;
467
+ Ok ( ( ) )
468
+ } ;
469
+
470
+ try_join ! ( recv_frames, send_frames) ?;
471
+
472
+ Ok ( ( ) )
473
+ }
474
+
475
+ #[ serial]
476
+ #[ tokio:: test]
477
+ async fn test_receive_can_fd_can ( ) ->Result < ( ) > {
446
478
let socket1 =CanFdSocket :: open ( "vcan0" ) . unwrap ( ) ;
447
479
let socket2 =CanFdSocket :: open ( "vcan0" ) . unwrap ( ) ;
448
480
449
- let send_frames = future:: try_join ( write_frame_fd ( & socket1) , write_frame_fd ( & socket1) ) ;
481
+ let send_frames =
482
+ future:: try_join ( write_frame_fd_can ( & socket1) , write_frame_fd_can ( & socket1) ) ;
450
483
451
484
let recv_frames =async {
452
485
let socket2 =recv_frame_fd ( socket2) . await ?;
@@ -461,11 +494,34 @@ mod tests {
461
494
462
495
#[ serial]
463
496
#[ tokio:: test]
464
- async fn test_receive_can_fd_with_stream ( ) ->Result < ( ) > {
497
+ async fn test_receive_can_fd_canfd_with_stream ( ) ->Result < ( ) > {
465
498
let socket1 =CanFdSocket :: open ( "vcan0" ) . unwrap ( ) ;
466
499
let socket2 =CanFdSocket :: open ( "vcan0" ) . unwrap ( ) ;
467
500
468
- let send_frames = future:: try_join ( write_frame_fd ( & socket1) , write_frame_fd ( & socket1) ) ;
501
+ let send_frames = future:: try_join (
502
+ write_frame_fd_canfd ( & socket1) ,
503
+ write_frame_fd_canfd ( & socket1) ,
504
+ ) ;
505
+
506
+ let recv_frames =async {
507
+ let socket2 =recv_frame_fd_with_stream ( socket2) . await ?;
508
+ let _socket2 =recv_frame_fd_with_stream ( socket2) . await ;
509
+ Ok ( ( ) )
510
+ } ;
511
+
512
+ try_join ! ( recv_frames, send_frames) ?;
513
+
514
+ Ok ( ( ) )
515
+ }
516
+
517
+ #[ serial]
518
+ #[ tokio:: test]
519
+ async fn test_receive_can_fd_can_with_stream ( ) ->Result < ( ) > {
520
+ let socket1 =CanFdSocket :: open ( "vcan0" ) . unwrap ( ) ;
521
+ let socket2 =CanFdSocket :: open ( "vcan0" ) . unwrap ( ) ;
522
+
523
+ let send_frames =
524
+ future:: try_join ( write_frame_fd_can ( & socket1) , write_frame_fd_can ( & socket1) ) ;
469
525
470
526
let recv_frames =async {
471
527
let socket2 =recv_frame_fd_with_stream ( socket2) . await ?;
@@ -532,7 +588,7 @@ mod tests {
532
588
533
589
#[ serial]
534
590
#[ tokio:: test]
535
- async fn test_sink_stream_fd ( ) ->Result < ( ) > {
591
+ async fn test_sink_stream_fd_canfd ( ) ->Result < ( ) > {
536
592
let socket1 =CanFdSocket :: open ( "vcan0" ) . unwrap ( ) ;
537
593
let socket2 =CanFdSocket :: open ( "vcan0" ) . unwrap ( ) ;
538
594
@@ -555,9 +611,49 @@ mod tests {
555
611
. fold ( 0u8 , |acc, _frame|async move { acc +1 } ) ;
556
612
557
613
let send_frames =async {
558
- let _frame_1 = sink. send ( frame_id_1) . await ?;
559
- let _frame_2 = sink. send ( frame_id_2) . await ?;
560
- let _frame_3 = sink. send ( frame_id_3) . await ?;
614
+ let _frame_1 = sink. send ( frame_id_1. into ( ) ) . await ?;
615
+ let _frame_2 = sink. send ( frame_id_2. into ( ) ) . await ?;
616
+ let _frame_3 = sink. send ( frame_id_3. into ( ) ) . await ?;
617
+ println ! ( "Sent 3 frames" ) ;
618
+ Ok :: < ( ) , Error > ( ( ) )
619
+ } ;
620
+
621
+ let ( x, frame_send_r) = future:: join ( count_ids_less_than_3, send_frames) . await ;
622
+ frame_send_r?;
623
+
624
+ assert_eq ! ( x, 2 ) ;
625
+
626
+ Ok ( ( ) )
627
+ }
628
+
629
+ #[ serial]
630
+ #[ tokio:: test]
631
+ async fn test_sink_stream_fd_can ( ) ->Result < ( ) > {
632
+ let socket1 =CanFdSocket :: open ( "vcan0" ) . unwrap ( ) ;
633
+ let socket2 =CanFdSocket :: open ( "vcan0" ) . unwrap ( ) ;
634
+
635
+ let frame_id_1 =CanFrame :: from_raw_id ( 0x01 , & [ 0u8 ] ) . unwrap ( ) ;
636
+ let frame_id_2 =CanFrame :: from_raw_id ( 0x02 , & [ 0u8 ] ) . unwrap ( ) ;
637
+ let frame_id_3 =CanFrame :: from_raw_id ( 0x03 , & [ 0u8 ] ) . unwrap ( ) ;
638
+
639
+ let ( mut sink, _stream) = socket1. split ( ) ;
640
+ let ( _sink, stream) = socket2. split ( ) ;
641
+
642
+ let count_ids_less_than_3 = stream
643
+ . map ( |x| x. unwrap ( ) )
644
+ . take_while ( |frame|{
645
+ if let CanAnyFrame :: Normal ( frame) = frame{
646
+ future:: ready ( frame. raw_id ( ) <3 )
647
+ } else {
648
+ future:: ready ( false )
649
+ }
650
+ } )
651
+ . fold ( 0u8 , |acc, _frame|async move { acc +1 } ) ;
652
+
653
+ let send_frames =async {
654
+ let _frame_1 = sink. send ( frame_id_1. into ( ) ) . await ?;
655
+ let _frame_2 = sink. send ( frame_id_2. into ( ) ) . await ?;
656
+ let _frame_3 = sink. send ( frame_id_3. into ( ) ) . await ?;
561
657
println ! ( "Sent 3 frames" ) ;
562
658
Ok :: < ( ) , Error > ( ( ) )
563
659
} ;