|
1 | 1 | importunittest |
| 2 | +fromunittestimportmock |
2 | 3 | fromtestimportsupport |
3 | 4 | fromtest.supportimport ( |
4 | 5 | is_apple,os_helper,refleak_helper,socket_helper,threading_helper |
@@ -179,6 +180,17 @@ def _have_socket_bluetooth(): |
179 | 180 | returnTrue |
180 | 181 |
|
181 | 182 |
|
| 183 | +def_have_socket_bluetooth_l2cap(): |
| 184 | +"""Check whether BTPROTO_L2CAP sockets are supported on this host.""" |
| 185 | +try: |
| 186 | +s=socket.socket(socket.AF_BLUETOOTH,socket.SOCK_SEQPACKET,socket.BTPROTO_L2CAP) |
| 187 | +except (AttributeError,OSError): |
| 188 | +returnFalse |
| 189 | +else: |
| 190 | +s.close() |
| 191 | +returnTrue |
| 192 | + |
| 193 | + |
182 | 194 | def_have_socket_hyperv(): |
183 | 195 | """Check whether AF_HYPERV sockets are supported on this host.""" |
184 | 196 | try: |
@@ -239,6 +251,8 @@ def downgrade_malformed_data_warning(): |
239 | 251 |
|
240 | 252 | HAVE_SOCKET_BLUETOOTH=_have_socket_bluetooth() |
241 | 253 |
|
| 254 | +HAVE_SOCKET_BLUETOOTH_L2CAP=_have_socket_bluetooth_l2cap() |
| 255 | + |
242 | 256 | HAVE_SOCKET_HYPERV=_have_socket_hyperv() |
243 | 257 |
|
244 | 258 | # Size in bytes of the int type |
@@ -2622,6 +2636,116 @@ def testCreateScoSocket(self): |
2622 | 2636 | withsocket.socket(socket.AF_BLUETOOTH,socket.SOCK_SEQPACKET,socket.BTPROTO_SCO)ass: |
2623 | 2637 | pass |
2624 | 2638 |
|
| 2639 | +@unittest.skipUnless(HAVE_SOCKET_BLUETOOTH_L2CAP,'Bluetooth L2CAP sockets required for this test') |
| 2640 | +deftestBindBrEdrL2capSocket(self): |
| 2641 | +withsocket.socket(socket.AF_BLUETOOTH,socket.SOCK_SEQPACKET,socket.BTPROTO_L2CAP)asf: |
| 2642 | +# First user PSM in BR/EDR L2CAP |
| 2643 | +psm=0x1001 |
| 2644 | +f.bind((socket.BDADDR_ANY,psm)) |
| 2645 | +addr=f.getsockname() |
| 2646 | +self.assertEqual(addr, (socket.BDADDR_ANY,psm)) |
| 2647 | + |
| 2648 | +@unittest.skipUnless(HAVE_SOCKET_BLUETOOTH_L2CAP,'Bluetooth L2CAP sockets required for this test') |
| 2649 | +deftestBadL2capAddr(self): |
| 2650 | +withsocket.socket(socket.AF_BLUETOOTH,socket.SOCK_SEQPACKET,socket.BTPROTO_L2CAP)asf: |
| 2651 | +withself.assertRaises(OSError): |
| 2652 | +f.bind((socket.BDADDR_ANY,0,0)) |
| 2653 | +withself.assertRaises(OSError): |
| 2654 | +f.bind((socket.BDADDR_ANY,)) |
| 2655 | +withself.assertRaises(OSError): |
| 2656 | +f.bind(socket.BDADDR_ANY) |
| 2657 | +withself.assertRaises(OSError): |
| 2658 | +f.bind((socket.BDADDR_ANY.encode(),0x1001)) |
| 2659 | + |
| 2660 | +deftestBindRfcommSocket(self): |
| 2661 | +withsocket.socket(socket.AF_BLUETOOTH,socket.SOCK_STREAM,socket.BTPROTO_RFCOMM)ass: |
| 2662 | +channel=0 |
| 2663 | +try: |
| 2664 | +s.bind((socket.BDADDR_ANY,channel)) |
| 2665 | +exceptOSErroraserr: |
| 2666 | +ifsys.platform=='win32'anderr.winerror==10050: |
| 2667 | +self.skipTest(str(err)) |
| 2668 | +raise |
| 2669 | +addr=s.getsockname() |
| 2670 | +self.assertEqual(addr, (mock.ANY,channel)) |
| 2671 | +self.assertRegex(addr[0],r'(?i)[0-9a-f]{2}(?::[0-9a-f]{2}){4}') |
| 2672 | +ifsys.platform!='win32': |
| 2673 | +self.assertEqual(addr, (socket.BDADDR_ANY,channel)) |
| 2674 | +withsocket.socket(socket.AF_BLUETOOTH,socket.SOCK_STREAM,socket.BTPROTO_RFCOMM)ass: |
| 2675 | +s.bind(addr) |
| 2676 | +addr2=s.getsockname() |
| 2677 | +self.assertEqual(addr2,addr) |
| 2678 | + |
| 2679 | +deftestBadRfcommAddr(self): |
| 2680 | +withsocket.socket(socket.AF_BLUETOOTH,socket.SOCK_STREAM,socket.BTPROTO_RFCOMM)ass: |
| 2681 | +channel=0 |
| 2682 | +withself.assertRaises(OSError): |
| 2683 | +s.bind((socket.BDADDR_ANY.encode(),channel)) |
| 2684 | +withself.assertRaises(OSError): |
| 2685 | +s.bind((socket.BDADDR_ANY,)) |
| 2686 | +withself.assertRaises(OSError): |
| 2687 | +s.bind((socket.BDADDR_ANY,channel,0)) |
| 2688 | +withself.assertRaises(OSError): |
| 2689 | +s.bind((socket.BDADDR_ANY+'\0',channel)) |
| 2690 | +withself.assertRaises(OSError): |
| 2691 | +s.bind(('invalid',channel)) |
| 2692 | + |
| 2693 | +@unittest.skipUnless(hasattr(socket,'BTPROTO_HCI'),'Bluetooth HCI sockets required for this test') |
| 2694 | +deftestBindHciSocket(self): |
| 2695 | +withsocket.socket(socket.AF_BLUETOOTH,socket.SOCK_RAW,socket.BTPROTO_HCI)ass: |
| 2696 | +ifsys.platform.startswith(('netbsd','dragonfly','freebsd')): |
| 2697 | +s.bind(socket.BDADDR_ANY.encode()) |
| 2698 | +addr=s.getsockname() |
| 2699 | +self.assertEqual(addr,socket.BDADDR_ANY) |
| 2700 | +else: |
| 2701 | +dev=0 |
| 2702 | +s.bind((dev,)) |
| 2703 | +addr=s.getsockname() |
| 2704 | +self.assertEqual(addr,dev) |
| 2705 | + |
| 2706 | +@unittest.skipUnless(hasattr(socket,'BTPROTO_HCI'),'Bluetooth HCI sockets required for this test') |
| 2707 | +deftestBadHciAddr(self): |
| 2708 | +withsocket.socket(socket.AF_BLUETOOTH,socket.SOCK_RAW,socket.BTPROTO_HCI)ass: |
| 2709 | +ifsys.platform.startswith(('netbsd','dragonfly','freebsd')): |
| 2710 | +withself.assertRaises(OSError): |
| 2711 | +s.bind(socket.BDADDR_ANY) |
| 2712 | +withself.assertRaises(OSError): |
| 2713 | +s.bind((socket.BDADDR_ANY.encode(),)) |
| 2714 | +ifsys.platform.startswith('freebsd'): |
| 2715 | +withself.assertRaises(ValueError): |
| 2716 | +s.bind(socket.BDADDR_ANY.encode()+b'\0') |
| 2717 | +withself.assertRaises(ValueError): |
| 2718 | +s.bind(socket.BDADDR_ANY.encode()+b' '*100) |
| 2719 | +withself.assertRaises(OSError): |
| 2720 | +s.bind(b'invalid') |
| 2721 | +else: |
| 2722 | +dev=0 |
| 2723 | +withself.assertRaises(OSError): |
| 2724 | +s.bind(()) |
| 2725 | +withself.assertRaises(OSError): |
| 2726 | +s.bind((dev,0)) |
| 2727 | +withself.assertRaises(OSError): |
| 2728 | +s.bind(dev) |
| 2729 | +withself.assertRaises(OSError): |
| 2730 | +s.bind(socket.BDADDR_ANY.encode()) |
| 2731 | + |
| 2732 | +@unittest.skipUnless(hasattr(socket,'BTPROTO_SCO'),'Bluetooth SCO sockets required for this test') |
| 2733 | +deftestBindScoSocket(self): |
| 2734 | +withsocket.socket(socket.AF_BLUETOOTH,socket.SOCK_SEQPACKET,socket.BTPROTO_SCO)ass: |
| 2735 | +s.bind(socket.BDADDR_ANY.encode()) |
| 2736 | +addr=s.getsockname() |
| 2737 | +self.assertEqual(addr,socket.BDADDR_ANY) |
| 2738 | + |
| 2739 | +@unittest.skipUnless(hasattr(socket,'BTPROTO_SCO'),'Bluetooth SCO sockets required for this test') |
| 2740 | +deftestBadScoAddr(self): |
| 2741 | +withsocket.socket(socket.AF_BLUETOOTH,socket.SOCK_SEQPACKET,socket.BTPROTO_SCO)ass: |
| 2742 | +withself.assertRaises(OSError): |
| 2743 | +s.bind(socket.BDADDR_ANY) |
| 2744 | +withself.assertRaises(OSError): |
| 2745 | +s.bind((socket.BDADDR_ANY.encode(),)) |
| 2746 | +withself.assertRaises(OSError): |
| 2747 | +s.bind(b'invalid') |
| 2748 | + |
2625 | 2749 |
|
2626 | 2750 | @unittest.skipUnless(HAVE_SOCKET_HYPERV, |
2627 | 2751 | 'Hyper-V sockets required for this test.') |
|