|
1 | 1 | # XXX TypeErrors on calling handlers, or on bad return values from a |
2 | 2 | # handler, are obscure and unhelpful. |
3 | 3 |
|
| 4 | +importabc |
| 5 | +importfunctools |
4 | 6 | importos |
5 | 7 | importplatform |
| 8 | +importre |
6 | 9 | importsys |
7 | 10 | importsysconfig |
| 11 | +importtextwrap |
8 | 12 | importunittest |
9 | 13 | importtraceback |
10 | 14 | fromioimportBytesIO |
11 | 15 | fromtestimportsupport |
12 | | -fromtest.supportimportos_helper |
| 16 | +fromtest.supportimportimport_helper,os_helper |
13 | 17 |
|
14 | 18 | fromxml.parsersimportexpat |
15 | 19 | fromxml.parsers.expatimporterrors |
@@ -848,5 +852,199 @@ def start_element(name, _): |
848 | 852 | self.assertEqual(started, ['doc']) |
849 | 853 |
|
850 | 854 |
|
| 855 | +classAttackProtectionTestBase(abc.ABC): |
| 856 | +""" |
| 857 | + Base class for testing protections against XML payloads with |
| 858 | + disproportionate amplification. |
| 859 | +
|
| 860 | + The protections being tested should detect and prevent attacks |
| 861 | + that leverage disproportionate amplification from small inputs. |
| 862 | + """ |
| 863 | + |
| 864 | +@staticmethod |
| 865 | +defexponential_expansion_payload(*,nrows,ncols,text='.'): |
| 866 | +"""Create a billion laughs attack payload. |
| 867 | +
|
| 868 | + Be careful: the number of total items is pow(n, k), thereby |
| 869 | + requiring at least pow(ncols, nrows) * sizeof(text) memory! |
| 870 | + """ |
| 871 | +template=textwrap.dedent(f"""\ |
| 872 | + <?xml version="1.0"?> |
| 873 | + <!DOCTYPE doc [ |
| 874 | + <!ENTITY row0 "{text}"> |
| 875 | + <!ELEMENT doc (#PCDATA)> |
| 876 | + {{body}} |
| 877 | + ]> |
| 878 | + <doc>&row{nrows};</doc> |
| 879 | + """).rstrip() |
| 880 | + |
| 881 | +body='\n'.join( |
| 882 | +f'<!ENTITY row{i+1} "{f"&row{i};"*ncols}">' |
| 883 | +foriinrange(nrows) |
| 884 | + ) |
| 885 | +body=textwrap.indent(body,' '*4) |
| 886 | +returntemplate.format(body=body) |
| 887 | + |
| 888 | +deftest_payload_generation(self): |
| 889 | +# self-test for exponential_expansion_payload() |
| 890 | +payload=self.exponential_expansion_payload(nrows=2,ncols=3) |
| 891 | +self.assertEqual(payload,textwrap.dedent("""\ |
| 892 | + <?xml version="1.0"?> |
| 893 | + <!DOCTYPE doc [ |
| 894 | + <!ENTITY row0 "."> |
| 895 | + <!ELEMENT doc (#PCDATA)> |
| 896 | + <!ENTITY row1 "&row0;&row0;&row0;"> |
| 897 | + <!ENTITY row2 "&row1;&row1;&row1;"> |
| 898 | + ]> |
| 899 | + <doc>&row2;</doc> |
| 900 | + """).rstrip()) |
| 901 | + |
| 902 | +defassert_root_parser_failure(self,func,/,*args,**kwargs): |
| 903 | +"""Check that func(*args, **kwargs) is invalid for a sub-parser.""" |
| 904 | +msg="parser must be a root parser" |
| 905 | +self.assertRaisesRegex(expat.ExpatError,msg,func,*args,**kwargs) |
| 906 | + |
| 907 | +@abc.abstractmethod |
| 908 | +defassert_rejected(self,func,/,*args,**kwargs): |
| 909 | +"""Assert that func(*args, **kwargs) triggers the attack protection. |
| 910 | +
|
| 911 | + Note: this method must ensure that the attack protection being tested |
| 912 | + is the one that is actually triggered at runtime, e.g., by matching |
| 913 | + the exact error message. |
| 914 | + """ |
| 915 | + |
| 916 | +@abc.abstractmethod |
| 917 | +defset_activation_threshold(self,parser,threshold): |
| 918 | +"""Set the activation threshold for the tested protection.""" |
| 919 | + |
| 920 | +@abc.abstractmethod |
| 921 | +defset_maximum_amplification(self,parser,max_factor): |
| 922 | +"""Set the maximum amplification factor for the tested protection.""" |
| 923 | + |
| 924 | +@abc.abstractmethod |
| 925 | +deftest_set_activation_threshold__threshold_reached(self): |
| 926 | +"""Test when the activation threshold is exceeded.""" |
| 927 | + |
| 928 | +@abc.abstractmethod |
| 929 | +deftest_set_activation_threshold__threshold_not_reached(self): |
| 930 | +"""Test when the activation threshold is not exceeded.""" |
| 931 | + |
| 932 | +deftest_set_activation_threshold__invalid_threshold_type(self): |
| 933 | +parser=expat.ParserCreate() |
| 934 | +setter=functools.partial(self.set_activation_threshold,parser) |
| 935 | + |
| 936 | +self.assertRaises(TypeError,setter,1.0) |
| 937 | +self.assertRaises(TypeError,setter,-1.5) |
| 938 | +self.assertRaises(ValueError,setter,-5) |
| 939 | + |
| 940 | +deftest_set_activation_threshold__invalid_threshold_range(self): |
| 941 | +_testcapi=import_helper.import_module("_testcapi") |
| 942 | +parser=expat.ParserCreate() |
| 943 | +setter=functools.partial(self.set_activation_threshold,parser) |
| 944 | + |
| 945 | +self.assertRaises(OverflowError,setter,_testcapi.ULLONG_MAX+1) |
| 946 | + |
| 947 | +deftest_set_activation_threshold__fail_for_subparser(self): |
| 948 | +parser=expat.ParserCreate() |
| 949 | +subparser=parser.ExternalEntityParserCreate(None) |
| 950 | +setter=functools.partial(self.set_activation_threshold,subparser) |
| 951 | +self.assert_root_parser_failure(setter,12345) |
| 952 | + |
| 953 | +@abc.abstractmethod |
| 954 | +deftest_set_maximum_amplification__amplification_exceeded(self): |
| 955 | +"""Test when the amplification factor is exceeded.""" |
| 956 | + |
| 957 | +@abc.abstractmethod |
| 958 | +deftest_set_maximum_amplification__amplification_not_exceeded(self): |
| 959 | +"""Test when the amplification factor is not exceeded.""" |
| 960 | + |
| 961 | +deftest_set_maximum_amplification__infinity(self): |
| 962 | +inf=float('inf')# an 'inf' threshold is allowed by Expat |
| 963 | +parser=expat.ParserCreate() |
| 964 | +self.assertIsNone(self.set_maximum_amplification(parser,inf)) |
| 965 | + |
| 966 | +deftest_set_maximum_amplification__invalid_max_factor_type(self): |
| 967 | +parser=expat.ParserCreate() |
| 968 | +setter=functools.partial(self.set_maximum_amplification,parser) |
| 969 | + |
| 970 | +self.assertRaises(TypeError,setter,None) |
| 971 | +self.assertRaises(TypeError,setter,'abc') |
| 972 | + |
| 973 | +deftest_set_maximum_amplification__invalid_max_factor_range(self): |
| 974 | +parser=expat.ParserCreate() |
| 975 | +setter=functools.partial(self.set_maximum_amplification,parser) |
| 976 | + |
| 977 | +msg=re.escape("'max_factor' must be at least 1.0") |
| 978 | +self.assertRaisesRegex(expat.ExpatError,msg,setter,float('nan')) |
| 979 | +self.assertRaisesRegex(expat.ExpatError,msg,setter,0.99) |
| 980 | + |
| 981 | +deftest_set_maximum_amplification__fail_for_subparser(self): |
| 982 | +parser=expat.ParserCreate() |
| 983 | +subparser=parser.ExternalEntityParserCreate(None) |
| 984 | +setter=functools.partial(self.set_maximum_amplification,subparser) |
| 985 | +self.assert_root_parser_failure(setter,123.45) |
| 986 | + |
| 987 | + |
| 988 | +@unittest.skipIf(expat.version_info< (2,7,2),"requires Expat >= 2.7.2") |
| 989 | +classMemoryProtectionTest(AttackProtectionTestBase,unittest.TestCase): |
| 990 | + |
| 991 | +# NOTE: with the default Expat configuration, the billion laughs protection |
| 992 | +# may hit before the allocation limiter if exponential_expansion_payload() |
| 993 | +# is not carefully parametrized. As such, the payloads should be chosen so |
| 994 | +# that either the allocation limiter is hit before other protections are |
| 995 | +# triggered or no protection at all is triggered. |
| 996 | + |
| 997 | +defassert_rejected(self,func,/,*args,**kwargs): |
| 998 | +"""Check that func(*args, **kwargs) hits the allocation limit.""" |
| 999 | +msg=r"out of memory: line \d+, column \d+" |
| 1000 | +self.assertRaisesRegex(expat.ExpatError,msg,func,*args,**kwargs) |
| 1001 | + |
| 1002 | +defset_activation_threshold(self,parser,threshold): |
| 1003 | +returnparser.SetAllocTrackerActivationThreshold(threshold) |
| 1004 | + |
| 1005 | +defset_maximum_amplification(self,parser,max_factor): |
| 1006 | +returnparser.SetAllocTrackerMaximumAmplification(max_factor) |
| 1007 | + |
| 1008 | +deftest_set_activation_threshold__threshold_reached(self): |
| 1009 | +parser=expat.ParserCreate() |
| 1010 | +# Choose a threshold expected to be always reached. |
| 1011 | +self.set_activation_threshold(parser,3) |
| 1012 | +# Check that the threshold is reached by choosing a small factor |
| 1013 | +# and a payload whose peak amplification factor exceeds it. |
| 1014 | +self.assertIsNone(self.set_maximum_amplification(parser,1.0)) |
| 1015 | +payload=self.exponential_expansion_payload(ncols=10,nrows=4) |
| 1016 | +self.assert_rejected(parser.Parse,payload,True) |
| 1017 | + |
| 1018 | +deftest_set_activation_threshold__threshold_not_reached(self): |
| 1019 | +parser=expat.ParserCreate() |
| 1020 | +# Choose a threshold expected to be never reached. |
| 1021 | +self.set_activation_threshold(parser,pow(10,5)) |
| 1022 | +# Check that the threshold is reached by choosing a small factor |
| 1023 | +# and a payload whose peak amplification factor exceeds it. |
| 1024 | +self.assertIsNone(self.set_maximum_amplification(parser,1.0)) |
| 1025 | +payload=self.exponential_expansion_payload(ncols=10,nrows=4) |
| 1026 | +self.assertIsNotNone(parser.Parse(payload,True)) |
| 1027 | + |
| 1028 | +deftest_set_maximum_amplification__amplification_exceeded(self): |
| 1029 | +parser=expat.ParserCreate() |
| 1030 | +# Unconditionally enable maximum activation factor. |
| 1031 | +self.set_activation_threshold(parser,0) |
| 1032 | +# Choose a max amplification factor expected to always be exceeded. |
| 1033 | +self.assertIsNone(self.set_maximum_amplification(parser,1.0)) |
| 1034 | +# Craft a payload for which the peak amplification factor is > 1.0. |
| 1035 | +payload=self.exponential_expansion_payload(ncols=1,nrows=2) |
| 1036 | +self.assert_rejected(parser.Parse,payload,True) |
| 1037 | + |
| 1038 | +deftest_set_maximum_amplification__amplification_not_exceeded(self): |
| 1039 | +parser=expat.ParserCreate() |
| 1040 | +# Unconditionally enable maximum activation factor. |
| 1041 | +self.set_activation_threshold(parser,0) |
| 1042 | +# Choose a max amplification factor expected to never be exceeded. |
| 1043 | +self.assertIsNone(self.set_maximum_amplification(parser,1e4)) |
| 1044 | +# Craft a payload for which the peak amplification factor is < 1e4. |
| 1045 | +payload=self.exponential_expansion_payload(ncols=1,nrows=2) |
| 1046 | +self.assertIsNotNone(parser.Parse(payload,True)) |
| 1047 | + |
| 1048 | + |
851 | 1049 | if__name__=="__main__": |
852 | 1050 | unittest.main() |