Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Potential Quadratic Complexity Vulnerabilities in theemail Module #136063

Open
Labels
3.10only security fixes3.11only security fixes3.12only security fixes3.13bugs and security fixes3.14bugs and security fixes3.15new features, bugs and security fixesstdlibStandard Library Python modules in the Lib/ directorytopic-emailtype-securityA security issue
@kexinoh

Description

@kexinoh

Bug Description:
A series of simple quadratic complexity vulnerabilities has been identified in theemail package. After confirmation by CPython's security team, these low-threat DOS vulnerabilities can be fixed with community assistance.

Vulnerability Locations (All Fixed):

  1. def_parseparam(s):

    2.
    defget_phrase(value):

    3.
    whilevalueand (value[0]=='\\'orvalue[0]notinPHRASE_ENDS):

    4.
    5.
    6.
    7.
    8.
    9.
    10.
    11.
    12.
    13.
    14.
    to_encode=to_encode[1:]

Below are the newly identified DoS risk points in theemail module, added on2026/01/23, updated from#144133:

  1. def_refold_parse_tree(parse_tree,*,policy):
  2. def_fold_mime_parameters(part,lines,maxlen,encoding):
  3. defset_boundary(self,boundary):
  4. def__add__(self,other):
  5. def__isub__(self,other):
  6. defall_mailboxes(self):

more:

TokenList.all_defects:

@property
defall_defects(self):
returnsum((x.all_defectsforxinself),self.defects)

AddressList.mailboxes:
@property
defmailboxes(self):
returnsum((x.mailboxes
forxinselfifx.token_type=='address'), [])

get_encoded_word:
defget_encoded_word(value,terminal_type='vtext'):
""" encoded-word = "=?" charset "?" encoding "?" encoded-text "?="
"""
ew=EncodedWord()
ifnotvalue.startswith('=?'):
raiseerrors.HeaderParseError(
"expected encoded word but found {}".format(value))
tok,*remainder=value[2:].split('?=',1)
iftok==value[2:]:
raiseerrors.HeaderParseError(
"expected encoded word but found {}".format(value))
remstr=''.join(remainder)
if (len(remstr)>1and
remstr[0]inhexdigitsand
remstr[1]inhexdigitsand
tok.count('?')<2):
# The ? after the CTE was followed by an encoded word escape (=XX).
rest,*remainder=remstr.split('?=',1)
tok=tok+'?='+rest
iflen(tok.split())>1:
ew.defects.append(errors.InvalidHeaderDefect(
"whitespace inside encoded word"))
ew.cte=value
value=''.join(remainder)
try:
text,charset,lang,defects=_ew.decode('=?'+tok+'?=')
except (ValueError,KeyError):
raise_InvalidEwError(
"encoded word format invalid: '{}'".format(ew.cte))
ew.charset=charset
ew.lang=lang
ew.defects.extend(defects)
whiletext:
iftext[0]inWSP:
token,text=get_fws(text)
ew.append(token)
continue
chars,*remainder=_wsp_splitter(text,1)
vtext=ValueTerminal(chars,terminal_type)
_validate_xtext(vtext)
ew.append(vtext)
text=''.join(remainder)
# Encoded words should be followed by a WS
ifvalueandvalue[0]notinWSP:
ew.defects.append(errors.InvalidHeaderDefect(
"missing trailing whitespace after encoded-word"))
returnew,value

get_unstructured:
defget_unstructured(value):
"""unstructured = (*([FWS] vchar) *WSP) / obs-unstruct
obs-unstruct = *((*LF *CR *(obs-utext) *LF *CR)) / FWS)
obs-utext = %d0 / obs-NO-WS-CTL / LF / CR
obs-NO-WS-CTL is control characters except WSP/CR/LF.
So, basically, we have printable runs, plus control characters or nulls in
the obsolete syntax, separated by whitespace. Since RFC 2047 uses the
obsolete syntax in its specification, but requires whitespace on either
side of the encoded words, I can see no reason to need to separate the
non-printable-non-whitespace from the printable runs if they occur, so we
parse this into xtext tokens separated by WSP tokens.
Because an 'unstructured' value must by definition constitute the entire
value, this 'get' routine does not return a remaining value, only the
parsed TokenList.
"""
# XXX: but what about bare CR and LF? They might signal the start or
# end of an encoded word. YAGNI for now, since our current parsers
# will never send us strings with bare CR or LF.
unstructured=UnstructuredTokenList()
whilevalue:
ifvalue[0]inWSP:
token,value=get_fws(value)
unstructured.append(token)
continue
valid_ew=True
ifvalue.startswith('=?'):
try:
token,value=get_encoded_word(value,'utext')
except_InvalidEwError:
valid_ew=False
excepterrors.HeaderParseError:
# XXX: Need to figure out how to register defects when
# appropriate here.
pass
else:
have_ws=True
iflen(unstructured)>0:
ifunstructured[-1].token_type!='fws':
unstructured.defects.append(errors.InvalidHeaderDefect(
"missing whitespace before encoded word"))
have_ws=False
ifhave_wsandlen(unstructured)>1:
ifunstructured[-2].token_type=='encoded-word':
unstructured[-1]=EWWhiteSpaceTerminal(
unstructured[-1],'fws')
unstructured.append(token)
continue
tok,*remainder=_wsp_splitter(value,1)
# Split in the middle of an atom if there is a rfc2047 encoded word
# which does not have WSP on both sides. The defect will be registered
# the next time through the loop.
# This needs to only be performed when the encoded word is valid;
# otherwise, performing it on an invalid encoded word can cause
# the parser to go in an infinite loop.
ifvalid_ewandrfc2047_matcher.search(tok):
tok,*remainder=value.partition('=?')
vtext=ValueTerminal(tok,'utext')
_validate_xtext(vtext)
unstructured.append(vtext)
value=''.join(remainder)
returnunstructured

get_bare_quoted_string:
defget_bare_quoted_string(value):
"""bare-quoted-string = DQUOTE *([FWS] qcontent) [FWS] DQUOTE
A quoted-string without the leading or trailing white space. Its
value is the text between the quote marks, with whitespace
preserved and quoted pairs decoded.
"""
ifnotvalueorvalue[0]!='"':
raiseerrors.HeaderParseError(
"expected '\"' but found '{}'".format(value))
bare_quoted_string=BareQuotedString()
value=value[1:]
ifvalueandvalue[0]=='"':
token,value=get_qcontent(value)
bare_quoted_string.append(token)
whilevalueandvalue[0]!='"':
ifvalue[0]inWSP:
token,value=get_fws(value)
elifvalue[:2]=='=?':
valid_ew=False
try:
token,value=get_encoded_word(value)
bare_quoted_string.defects.append(errors.InvalidHeaderDefect(
"encoded word inside quoted string"))
valid_ew=True
excepterrors.HeaderParseError:
token,value=get_qcontent(value)
# Collapse the whitespace between two encoded words that occur in a
# bare-quoted-string.
ifvalid_ewandlen(bare_quoted_string)>1:
if (bare_quoted_string[-1].token_type=='fws'and
bare_quoted_string[-2].token_type=='encoded-word'):
bare_quoted_string[-1]=EWWhiteSpaceTerminal(
bare_quoted_string[-1],'fws')
else:
token,value=get_qcontent(value)
bare_quoted_string.append(token)
ifnotvalue:
bare_quoted_string.defects.append(errors.InvalidHeaderDefect(
"end of header inside quoted string"))
returnbare_quoted_string,value
returnbare_quoted_string,value[1:]

get_comment:
defget_comment(value):
"""comment = "(" *([FWS] ccontent) [FWS] ")"
ccontent = ctext / quoted-pair / comment
We handle nested comments here, and quoted-pair in our qp-ctext routine.
"""
ifvalueandvalue[0]!='(':
raiseerrors.HeaderParseError(
"expected '(' but found '{}'".format(value))
comment=Comment()
value=value[1:]
whilevalueandvalue[0]!=")":
ifvalue[0]inWSP:
token,value=get_fws(value)
elifvalue[0]=='(':
token,value=get_comment(value)
else:
token,value=get_qp_ctext(value)
comment.append(token)
ifnotvalue:
comment.defects.append(errors.InvalidHeaderDefect(
"end of header inside comment"))
returncomment,value
returncomment,value[1:]

get_dot_atom_text:
defget_dot_atom_text(value):
""" dot-text = 1*atext *("." 1*atext)
"""
dot_atom_text=DotAtomText()
ifnotvalueorvalue[0]inATOM_ENDS:
raiseerrors.HeaderParseError("expected atom at a start of "
"dot-atom-text but found '{}'".format(value))
whilevalueandvalue[0]notinATOM_ENDS:
token,value=get_atext(value)
dot_atom_text.append(token)
ifvalueandvalue[0]=='.':
dot_atom_text.append(DOT)
value=value[1:]
ifdot_atom_text[-1]isDOT:
raiseerrors.HeaderParseError("expected atom at end of dot-atom-text "
"but found '{}'".format('.'+value))
returndot_atom_text,value

get_dot_atom:
defget_dot_atom(value):
""" dot-atom = [CFWS] dot-atom-text [CFWS]
Any place we can have a dot atom, we could instead have an rfc2047 encoded
word.
"""
dot_atom=DotAtom()
ifvalue[0]inCFWS_LEADER:
token,value=get_cfws(value)
dot_atom.append(token)
ifvalue.startswith('=?'):
try:
token,value=get_encoded_word(value)
excepterrors.HeaderParseError:
# XXX: need to figure out how to register defects when
# appropriate here.
token,value=get_dot_atom_text(value)
else:
token,value=get_dot_atom_text(value)
dot_atom.append(token)
ifvalueandvalue[0]inCFWS_LEADER:
token,value=get_cfws(value)
dot_atom.append(token)
returndot_atom,value

get_word:
defget_word(value):
"""word = atom / quoted-string
Either atom or quoted-string may start with CFWS. We have to peel off this
CFWS first to determine which type of word to parse. Afterward we splice
the leading CFWS, if any, into the parsed sub-token.
If neither an atom or a quoted-string is found before the next special, a
HeaderParseError is raised.
The token returned is either an Atom or a QuotedString, as appropriate.
This means the 'word' level of the formal grammar is not represented in the
parse tree; this is because having that extra layer when manipulating the
parse tree is more confusing than it is helpful.
"""
ifvalue[0]inCFWS_LEADER:
leader,value=get_cfws(value)
else:
leader=None
ifnotvalue:
raiseerrors.HeaderParseError(
"Expected 'atom' or 'quoted-string' but found nothing.")
ifvalue[0]=='"':
token,value=get_quoted_string(value)
elifvalue[0]inSPECIALS:
raiseerrors.HeaderParseError("Expected 'atom' or 'quoted-string' "
"but found '{}'".format(value))
else:
token,value=get_atom(value)
ifleaderisnotNone:
token[:0]= [leader]
returntoken,value

get_local_part:
defget_local_part(value):
""" local-part = dot-atom / quoted-string / obs-local-part
"""
local_part=LocalPart()
leader=None
ifvalueandvalue[0]inCFWS_LEADER:
leader,value=get_cfws(value)
ifnotvalue:
raiseerrors.HeaderParseError(
"expected local-part but found '{}'".format(value))
try:
token,value=get_dot_atom(value)
excepterrors.HeaderParseError:
try:
token,value=get_word(value)
excepterrors.HeaderParseError:
ifvalue[0]!='\\'andvalue[0]inPHRASE_ENDS:
raise
token=TokenList()
ifleaderisnotNone:
token[:0]= [leader]
local_part.append(token)
ifvalueand (value[0]=='\\'orvalue[0]notinPHRASE_ENDS):
obs_local_part,value=get_obs_local_part(str(local_part)+value)
ifobs_local_part.token_type=='invalid-obs-local-part':
local_part.defects.append(errors.InvalidHeaderDefect(
"local-part is not dot-atom, quoted-string, or obs-local-part"))
else:
local_part.defects.append(errors.ObsoleteHeaderDefect(
"local-part is not a dot-atom (contains CFWS)"))
local_part[0]=obs_local_part
try:
local_part.value.encode('ascii')
exceptUnicodeEncodeError:
local_part.defects.append(errors.NonASCIILocalPartDefect(
"local-part contains non-ASCII characters)"))
returnlocal_part,value

get_domain:
defget_domain(value):
""" domain = dot-atom / domain-literal / obs-domain
obs-domain = atom *("." atom))
"""
domain=Domain()
leader=None
ifvalueandvalue[0]inCFWS_LEADER:
leader,value=get_cfws(value)
ifnotvalue:
raiseerrors.HeaderParseError(
"expected domain but found '{}'".format(value))
ifvalue[0]=='[':
token,value=get_domain_literal(value)
ifleaderisnotNone:
token[:0]= [leader]
domain.append(token)
returndomain,value
try:
token,value=get_dot_atom(value)
excepterrors.HeaderParseError:
token,value=get_atom(value)
ifvalueandvalue[0]=='@':
raiseerrors.HeaderParseError('Invalid Domain')
ifleaderisnotNone:
token[:0]= [leader]
domain.append(token)
ifvalueandvalue[0]=='.':
domain.defects.append(errors.ObsoleteHeaderDefect(
"domain is not a dot-atom (contains CFWS)"))
ifdomain[0].token_type=='dot-atom':
domain[:]=domain[0]
whilevalueandvalue[0]=='.':
domain.append(DOT)
token,value=get_atom(value[1:])
domain.append(token)
returndomain,value

get_addr_spec:
defget_addr_spec(value):
""" addr-spec = local-part "@" domain
"""
addr_spec=AddrSpec()
token,value=get_local_part(value)
addr_spec.append(token)
ifnotvalueorvalue[0]!='@':
addr_spec.defects.append(errors.InvalidHeaderDefect(
"addr-spec local part with no domain"))
returnaddr_spec,value
addr_spec.append(ValueTerminal('@','address-at-symbol'))
token,value=get_domain(value[1:])
addr_spec.append(token)
returnaddr_spec,value

get_angle_addr:
defget_angle_addr(value):
""" angle-addr = [CFWS] "<" addr-spec ">" [CFWS] / obs-angle-addr
obs-angle-addr = [CFWS] "<" obs-route addr-spec ">" [CFWS]
"""
angle_addr=AngleAddr()
ifvalueandvalue[0]inCFWS_LEADER:
token,value=get_cfws(value)
angle_addr.append(token)
ifnotvalueorvalue[0]!='<':
raiseerrors.HeaderParseError(
"expected angle-addr but found '{}'".format(value))
angle_addr.append(ValueTerminal('<','angle-addr-start'))
value=value[1:]
# Although it is not legal per RFC5322, SMTP uses '<>' in certain
# circumstances.
ifvalueandvalue[0]=='>':
angle_addr.append(ValueTerminal('>','angle-addr-end'))
angle_addr.defects.append(errors.InvalidHeaderDefect(
"null addr-spec in angle-addr"))
value=value[1:]
returnangle_addr,value
try:
token,value=get_addr_spec(value)
excepterrors.HeaderParseError:
try:
token,value=get_obs_route(value)
angle_addr.defects.append(errors.ObsoleteHeaderDefect(
"obsolete route specification in angle-addr"))
excepterrors.HeaderParseError:
raiseerrors.HeaderParseError(
"expected addr-spec or obs-route but found '{}'".format(value))
angle_addr.append(token)
token,value=get_addr_spec(value)
angle_addr.append(token)
ifvalueandvalue[0]=='>':
value=value[1:]
else:
angle_addr.defects.append(errors.InvalidHeaderDefect(
"missing trailing '>' on angle-addr"))
angle_addr.append(ValueTerminal('>','angle-addr-end'))
ifvalueandvalue[0]inCFWS_LEADER:
token,value=get_cfws(value)
angle_addr.append(token)
returnangle_addr,value

get_display_name:
defget_display_name(value):
""" display-name = phrase
Because this is simply a name-rule, we don't return a display-name
token containing a phrase, but rather a display-name token with
the content of the phrase.
"""
display_name=DisplayName()
token,value=get_phrase(value)
display_name.extend(token[:])
display_name.defects=token.defects[:]
returndisplay_name,value

get_name_addr:
defget_name_addr(value):
""" name-addr = [display-name] angle-addr
"""
name_addr=NameAddr()
# Both the optional display name and the angle-addr can start with cfws.
leader=None
ifnotvalue:
raiseerrors.HeaderParseError(
"expected name-addr but found '{}'".format(value))
ifvalue[0]inCFWS_LEADER:
leader,value=get_cfws(value)
ifnotvalue:
raiseerrors.HeaderParseError(
"expected name-addr but found '{}'".format(leader))
ifvalue[0]!='<':
ifvalue[0]inPHRASE_ENDS:
raiseerrors.HeaderParseError(
"expected name-addr but found '{}'".format(value))
token,value=get_display_name(value)
ifnotvalue:
raiseerrors.HeaderParseError(
"expected name-addr but found '{}'".format(token))
ifleaderisnotNone:
ifisinstance(token[0],TokenList):
token[0][:0]= [leader]
else:
token[:0]= [leader]
leader=None
name_addr.append(token)
token,value=get_angle_addr(value)
ifleaderisnotNone:
token[:0]= [leader]
name_addr.append(token)
returnname_addr,value

get_mailbox:
defget_mailbox(value):
""" mailbox = name-addr / addr-spec
"""
# The only way to figure out if we are dealing with a name-addr or an
# addr-spec is to try parsing each one.
mailbox=Mailbox()
try:
token,value=get_name_addr(value)
excepterrors.HeaderParseError:
try:
token,value=get_addr_spec(value)
excepterrors.HeaderParseError:
raiseerrors.HeaderParseError(
"expected mailbox but found '{}'".format(value))
ifany(isinstance(x,errors.InvalidHeaderDefect)
forxintoken.all_defects):
mailbox.token_type='invalid-mailbox'
mailbox.append(token)
returnmailbox,value

get_mailbox_list:
defget_mailbox_list(value):
""" mailbox-list = (mailbox *("," mailbox)) / obs-mbox-list
obs-mbox-list = *([CFWS] ",") mailbox *("," [mailbox / CFWS])
For this routine we go outside the formal grammar in order to improve error
handling. We recognize the end of the mailbox list only at the end of the
value or at a ';' (the group terminator). This is so that we can turn
invalid mailboxes into InvalidMailbox tokens and continue parsing any
remaining valid mailboxes. We also allow all mailbox entries to be null,
and this condition is handled appropriately at a higher level.
"""
mailbox_list=MailboxList()
whilevalueandvalue[0]!=';':
try:
token,value=get_mailbox(value)
mailbox_list.append(token)
excepterrors.HeaderParseError:
leader=None
ifvalue[0]inCFWS_LEADER:
leader,value=get_cfws(value)
ifnotvalueorvalue[0]in',;':
mailbox_list.append(leader)
mailbox_list.defects.append(errors.ObsoleteHeaderDefect(
"empty element in mailbox-list"))
else:
token,value=get_invalid_mailbox(value,',;')
ifleaderisnotNone:
token[:0]= [leader]
mailbox_list.append(token)
mailbox_list.defects.append(errors.InvalidHeaderDefect(
"invalid mailbox in mailbox-list"))
elifvalue[0]==',':
mailbox_list.defects.append(errors.ObsoleteHeaderDefect(
"empty element in mailbox-list"))
else:
token,value=get_invalid_mailbox(value,',;')
ifleaderisnotNone:
token[:0]= [leader]
mailbox_list.append(token)
mailbox_list.defects.append(errors.InvalidHeaderDefect(
"invalid mailbox in mailbox-list"))
ifvalueandvalue[0]notin',;':
# Crap after mailbox; treat it as an invalid mailbox.
# The mailbox info will still be available.
mailbox=mailbox_list[-1]
mailbox.token_type='invalid-mailbox'
token,value=get_invalid_mailbox(value,',;')
mailbox.extend(token)
mailbox_list.defects.append(errors.InvalidHeaderDefect(
"invalid mailbox in mailbox-list"))
ifvalueandvalue[0]==',':
mailbox_list.append(ListSeparator)
value=value[1:]
returnmailbox_list,value

get_group_list:
defget_group_list(value):
""" group-list = mailbox-list / CFWS / obs-group-list
obs-group-list = 1*([CFWS] ",") [CFWS]
"""
group_list=GroupList()
ifnotvalue:
group_list.defects.append(errors.InvalidHeaderDefect(
"end of header before group-list"))
returngroup_list,value
leader=None
ifvalueandvalue[0]inCFWS_LEADER:
leader,value=get_cfws(value)
ifnotvalue:
# This should never happen in email parsing, since CFWS-only is a
# legal alternative to group-list in a group, which is the only
# place group-list appears.
group_list.defects.append(errors.InvalidHeaderDefect(
"end of header in group-list"))
group_list.append(leader)
returngroup_list,value
ifvalue[0]==';':
group_list.append(leader)
returngroup_list,value
token,value=get_mailbox_list(value)
iflen(token.all_mailboxes)==0:
ifleaderisnotNone:
group_list.append(leader)
group_list.extend(token)
group_list.defects.append(errors.ObsoleteHeaderDefect(
"group-list with empty entries"))
returngroup_list,value
ifleaderisnotNone:
token[:0]= [leader]
group_list.append(token)
returngroup_list,value

get_group:
defget_group(value):
""" group = display-name ":" [group-list] ";" [CFWS]
"""
group=Group()
token,value=get_display_name(value)
ifnotvalueorvalue[0]!=':':
raiseerrors.HeaderParseError("expected ':' at end of group "
"display name but found '{}'".format(value))
group.append(token)
group.append(ValueTerminal(':','group-display-name-terminator'))
value=value[1:]
ifvalueandvalue[0]==';':
group.append(ValueTerminal(';','group-terminator'))
returngroup,value[1:]
token,value=get_group_list(value)
group.append(token)
ifnotvalue:
group.defects.append(errors.InvalidHeaderDefect(
"end of header in group"))
elifvalue[0]!=';':
raiseerrors.HeaderParseError(
"expected ';' at end of group but found {}".format(value))
group.append(ValueTerminal(';','group-terminator'))
value=value[1:]
ifvalueandvalue[0]inCFWS_LEADER:
token,value=get_cfws(value)
group.append(token)
returngroup,value

get_address:
defget_address(value):
""" address = mailbox / group
Note that counter-intuitively, an address can be either a single address or
a list of addresses (a group). This is why the returned Address object has
a 'mailboxes' attribute which treats a single address as a list of length
one. When you need to differentiate between to two cases, extract the single
element, which is either a mailbox or a group token.
"""
# The formal grammar isn't very helpful when parsing an address. mailbox
# and group, especially when allowing for obsolete forms, start off very
# similarly. It is only when you reach one of @, <, or : that you know
# what you've got. So, we try each one in turn, starting with the more
# likely of the two. We could perhaps make this more efficient by looking
# for a phrase and then branching based on the next character, but that
# would be a premature optimization.
address=Address()
try:
token,value=get_group(value)
excepterrors.HeaderParseError:
try:
token,value=get_mailbox(value)
excepterrors.HeaderParseError:
raiseerrors.HeaderParseError(
"expected address but found '{}'".format(value))
address.append(token)
returnaddress,value

get_address_list:
defget_address_list(value):
""" address_list = (address *("," address)) / obs-addr-list
obs-addr-list = *([CFWS] ",") address *("," [address / CFWS])
We depart from the formal grammar here by continuing to parse until the end
of the input, assuming the input to be entirely composed of an
address-list. This is always true in email parsing, and allows us
to skip invalid addresses to parse additional valid ones.
"""
address_list=AddressList()
whilevalue:
try:
token,value=get_address(value)
address_list.append(token)
excepterrors.HeaderParseError:
leader=None
ifvalue[0]inCFWS_LEADER:
leader,value=get_cfws(value)
ifnotvalueorvalue[0]==',':
address_list.append(leader)
address_list.defects.append(errors.ObsoleteHeaderDefect(
"address-list entry with no content"))
else:
token,value=get_invalid_mailbox(value,',')
ifleaderisnotNone:
token[:0]= [leader]
address_list.append(Address([token]))
address_list.defects.append(errors.InvalidHeaderDefect(
"invalid address in address-list"))
elifvalue[0]==',':
address_list.defects.append(errors.ObsoleteHeaderDefect(
"empty element in address-list"))
else:
token,value=get_invalid_mailbox(value,',')
ifleaderisnotNone:
token[:0]= [leader]
address_list.append(Address([token]))
address_list.defects.append(errors.InvalidHeaderDefect(
"invalid address in address-list"))
ifvalueandvalue[0]!=',':
# Crap after address; treat it as an invalid mailbox.
# The mailbox info will still be available.
mailbox=address_list[-1][0]
mailbox.token_type='invalid-mailbox'
token,value=get_invalid_mailbox(value,',')
mailbox.extend(token)
address_list.defects.append(errors.InvalidHeaderDefect(
"invalid address in address-list"))
ifvalue:# Must be a , at this point.
address_list.append(ListSeparator)
value=value[1:]
returnaddress_list,value

get_msg_id:
defget_msg_id(value):
"""msg-id = [CFWS] "<" id-left '@' id-right ">" [CFWS]
id-left = dot-atom-text / obs-id-left
id-right = dot-atom-text / no-fold-literal / obs-id-right
no-fold-literal = "[" *dtext "]"
"""
msg_id=MsgID()
ifvalueandvalue[0]inCFWS_LEADER:
token,value=get_cfws(value)
msg_id.append(token)
ifnotvalueorvalue[0]!='<':
raiseerrors.HeaderParseError(
"expected msg-id but found '{}'".format(value))
msg_id.append(ValueTerminal('<','msg-id-start'))
value=value[1:]
# Parse id-left.
try:
token,value=get_dot_atom_text(value)
excepterrors.HeaderParseError:
try:
# obs-id-left is same as local-part of add-spec.
token,value=get_obs_local_part(value)
msg_id.defects.append(errors.ObsoleteHeaderDefect(
"obsolete id-left in msg-id"))
excepterrors.HeaderParseError:
raiseerrors.HeaderParseError(
"expected dot-atom-text or obs-id-left"
" but found '{}'".format(value))
msg_id.append(token)
ifnotvalueorvalue[0]!='@':
msg_id.defects.append(errors.InvalidHeaderDefect(
"msg-id with no id-right"))
# Even though there is no id-right, if the local part
# ends with `>` let's just parse it too and return
# along with the defect.
ifvalueandvalue[0]=='>':
msg_id.append(ValueTerminal('>','msg-id-end'))
value=value[1:]
returnmsg_id,value
msg_id.append(ValueTerminal('@','address-at-symbol'))
value=value[1:]
# Parse id-right.
try:
token,value=get_dot_atom_text(value)
excepterrors.HeaderParseError:
try:
token,value=get_no_fold_literal(value)
excepterrors.HeaderParseError:
try:
token,value=get_domain(value)
msg_id.defects.append(errors.ObsoleteHeaderDefect(
"obsolete id-right in msg-id"))
excepterrors.HeaderParseError:
raiseerrors.HeaderParseError(
"expected dot-atom-text, no-fold-literal or obs-id-right"
" but found '{}'".format(value))
msg_id.append(token)
ifvalueandvalue[0]=='>':
value=value[1:]
else:
msg_id.defects.append(errors.InvalidHeaderDefect(
"missing trailing '>' on msg-id"))
msg_id.append(ValueTerminal('>','msg-id-end'))
ifvalueandvalue[0]inCFWS_LEADER:
token,value=get_cfws(value)
msg_id.append(token)
returnmsg_id,value

parse_message_id:
defparse_message_id(value):
"""message-id = "Message-ID:" msg-id CRLF
"""
message_id=MessageID()
try:
token,value=get_msg_id(value)
message_id.append(token)
excepterrors.HeaderParseErrorasex:
token=get_unstructured(value)
message_id=InvalidMessageID(token)
message_id.defects.append(
errors.InvalidHeaderDefect("Invalid msg-id: {!r}".format(ex)))
else:
# Value after parsing a valid msg_id should be None.
ifvalue:
message_id.defects.append(errors.InvalidHeaderDefect(
"Unexpected {!r}".format(value)))
returnmessage_id

parse_mime_parameters:
defparse_mime_parameters(value):
""" parameter *( ";" parameter )
That BNF is meant to indicate this routine should only be called after
finding and handling the leading ';'. There is no corresponding rule in
the formal RFC grammar, but it is more convenient for us for the set of
parameters to be treated as its own TokenList.
This is 'parse' routine because it consumes the remaining value, but it
would never be called to parse a full header. Instead it is called to
parse everything after the non-parameter value of a specific MIME header.
"""
mime_parameters=MimeParameters()
whilevalue:
try:
token,value=get_parameter(value)
mime_parameters.append(token)
excepterrors.HeaderParseError:
leader=None
ifvalue[0]inCFWS_LEADER:
leader,value=get_cfws(value)
ifnotvalue:
mime_parameters.append(leader)
returnmime_parameters
ifvalue[0]==';':
ifleaderisnotNone:
mime_parameters.append(leader)
mime_parameters.defects.append(errors.InvalidHeaderDefect(
"parameter entry with no content"))
else:
token,value=get_invalid_parameter(value)
ifleader:
token[:0]= [leader]
mime_parameters.append(token)
mime_parameters.defects.append(errors.InvalidHeaderDefect(
"invalid parameter {!r}".format(token)))
ifvalueandvalue[0]!=';':
# Junk after the otherwise valid parameter. Mark it as
# invalid, but it will have a value.
param=mime_parameters[-1]
param.token_type='invalid-parameter'
token,value=get_invalid_parameter(value)
param.extend(token)
mime_parameters.defects.append(errors.InvalidHeaderDefect(
"parameter with invalid trailing text {!r}".format(token)))
ifvalue:
# Must be a ';' at this point.
mime_parameters.append(ValueTerminal(';','parameter-separator'))
value=value[1:]
returnmime_parameters

AddrlistClass.getaddress:
defgetaddress(self):
"""Parse the next address."""
self.commentlist= []
self.gotonext()
oldpos=self.pos
oldcl=self.commentlist
plist=self.getphraselist()
self.gotonext()
returnlist= []
ifself.pos>=len(self.field):
# Bad email address technically, no domain.
ifplist:
returnlist= [(SPACE.join(self.commentlist),plist[0])]
elifself.field[self.pos]in'.@':
# email address is just an addrspec
# this isn't very efficient since we start over
self.pos=oldpos
self.commentlist=oldcl
addrspec=self.getaddrspec()
returnlist= [(SPACE.join(self.commentlist),addrspec)]
elifself.field[self.pos]==':':
# address is a group
returnlist= []
fieldlen=len(self.field)
self.pos+=1
whileself.pos<len(self.field):
self.gotonext()
ifself.pos<fieldlenandself.field[self.pos]==';':
self.pos+=1
break
returnlist=returnlist+self.getaddress()
elifself.field[self.pos]=='<':
# Address is a phrase then a route addr
routeaddr=self.getrouteaddr()
ifself.commentlist:
returnlist= [(SPACE.join(plist)+' ('+
' '.join(self.commentlist)+')',routeaddr)]
else:
returnlist= [(SPACE.join(plist),routeaddr)]
else:
ifplist:
returnlist= [(SPACE.join(self.commentlist),plist[0])]
elifself.field[self.pos]inself.specials:
self.pos+=1
self.gotonext()
ifself.pos<len(self.field)andself.field[self.pos]==',':
self.pos+=1
returnreturnlist

Charset.header_encode_lines:
defheader_encode_lines(self,string,maxlengths):
"""Header-encode a string by converting it first to bytes.
This is similar to `header_encode()` except that the string is fit
into maximum line lengths as given by the argument.
:param string: A unicode string for the header. It must be possible
to encode this string to bytes using the character set's
output codec.
:param maxlengths: Maximum line length iterator. Each element
returned from this iterator will provide the next maximum line
length. This parameter is used as an argument to built-in next()
and should never be exhausted. The maximum line lengths should
not count the RFC 2047 chrome. These line lengths are only a
hint; the splitter does the best it can.
:return: Lines of encoded strings, each with RFC 2047 chrome.
"""
# See which encoding we should use.
codec=self.output_codecor'us-ascii'
header_bytes=_encode(string,codec)
encoder_module=self._get_encoder(header_bytes)
encoder=partial(encoder_module.header_encode,charset=codec)
# Calculate the number of characters that the RFC 2047 chrome will
# contribute to each line.
charset=self.get_output_charset()
extra=len(charset)+RFC2047_CHROME_LEN
# Now comes the hard part. We must encode bytes but we can't split on
# bytes because some character sets are variable length and each
# encoded word must stand on its own. So the problem is you have to
# encode to bytes to figure out this word's length, but you must split
# on characters. This causes two problems: first, we don't know how
# many octets a specific substring of unicode characters will get
# encoded to, and second, we don't know how many ASCII characters
# those octets will get encoded to. Unless we try it. Which seems
# inefficient. In the interest of being correct rather than fast (and
# in the hope that there will be few encoded headers in any such
# message), brute force it. :(
lines= []
current_line= []
maxlen=next(maxlengths)-extra
forcharacterinstring:
current_line.append(character)
this_line=EMPTYSTRING.join(current_line)
length=encoder_module.header_length(_encode(this_line,charset))
iflength>maxlen:
# This last character doesn't fit so pop it off.
current_line.pop()
# Does nothing fit on the first line?
ifnotlinesandnotcurrent_line:
lines.append(None)
else:
joined_line=EMPTYSTRING.join(current_line)
header_bytes=_encode(joined_line,codec)
lines.append(encoder(header_bytes))
current_line= [character]
maxlen=next(maxlengths)-extra
joined_line=EMPTYSTRING.join(current_line)
header_bytes=_encode(joined_line,codec)
lines.append(encoder(header_bytes))
returnlines

decode_header:
defdecode_header(header):
"""Decode a message header value without converting charset.
For historical reasons, this function may return either:
1. A list of length 1 containing a pair (str, None).
2. A list of (bytes, charset) pairs containing each of the decoded
parts of the header. Charset is None for non-encoded parts of the header,
otherwise a lower-case string containing the name of the character set
specified in the encoded string.
header may be a string that may or may not contain RFC2047 encoded words,
or it may be a Header object.
An email.errors.HeaderParseError may be raised when certain decoding error
occurs (e.g. a base64 decoding exception).
This function exists for backwards compatibility only. For new code, we
recommend using email.headerregistry.HeaderRegistry instead.
"""
# If it is a Header object, we can just return the encoded chunks.
ifhasattr(header,'_chunks'):
return [(_charset._encode(string,str(charset)),str(charset))
forstring,charsetinheader._chunks]
# If no encoding, just return the header with no charset.
ifnotecre.search(header):
return [(header,None)]
# First step is to parse all the encoded parts into triplets of the form
# (encoded_string, encoding, charset). For unencoded strings, the last
# two parts will be None.
words= []
forlineinheader.splitlines():
parts=ecre.split(line)
first=True
whileparts:
unencoded=parts.pop(0)
iffirst:
unencoded=unencoded.lstrip()
first=False
ifunencoded:
words.append((unencoded,None,None))
ifparts:
charset=parts.pop(0).lower()
encoding=parts.pop(0).lower()
encoded=parts.pop(0)
words.append((encoded,encoding,charset))
# Now loop over words and remove words that consist of whitespace
# between two encoded strings.
droplist= []
forn,winenumerate(words):
ifn>1andw[1]andwords[n-2][1]andwords[n-1][0].isspace():
droplist.append(n-1)
fordinreversed(droplist):
delwords[d]
# The next step is to decode each encoded word by applying the reverse
# base64 or quopri transformation. decoded_words is now a list of the
# form (decoded_word, charset).
decoded_words= []
forencoded_string,encoding,charsetinwords:
ifencodingisNone:
# This is an unencoded word.
decoded_words.append((encoded_string,charset))
elifencoding=='q':
word=email.quoprimime.header_decode(encoded_string)
decoded_words.append((word,charset))
elifencoding=='b':
paderr=len(encoded_string)%4# Postel's law: add missing padding
ifpaderr:
encoded_string+='==='[:4-paderr]
try:
word=email.base64mime.decode(encoded_string)
exceptbinascii.Error:
raiseHeaderParseError('Base64 decoding error')
else:
decoded_words.append((word,charset))
else:
raiseAssertionError('Unexpected encoding: '+encoding)
# Now convert all words to bytes and collapse consecutive runs of
# similarly encoded words.
collapsed= []
last_word=last_charset=None
forword,charsetindecoded_words:
ifisinstance(word,str):
word=bytes(word,'raw-unicode-escape')
iflast_wordisNone:
last_word=word
last_charset=charset
elifcharset!=last_charset:
collapsed.append((last_word,last_charset))
last_word=word
last_charset=charset
eliflast_charsetisNone:
last_word+=BSPACE+word
else:
last_word+=word
collapsed.append((last_word,last_charset))
returncollapsed

Header._normalize:
def_normalize(self):
# Step 1: Normalize the chunks so that all runs of identical charsets
# get collapsed into a single unicode string.
chunks= []
last_charset=None
last_chunk= []
forstring,charsetinself._chunks:
ifcharset==last_charset:
last_chunk.append(string)
else:
iflast_charsetisnotNone:
chunks.append((SPACE.join(last_chunk),last_charset))
last_chunk= [string]
last_charset=charset
iflast_chunk:
chunks.append((SPACE.join(last_chunk),last_charset))
self._chunks=chunks

Message.set_param:
defset_param(self,param,value,header='Content-Type',requote=True,
charset=None,language='',replace=False):
"""Set a parameter in the Content-Type header.
If the parameter already exists in the header, its value will be
replaced with the new value.
If header is Content-Type and has not yet been defined for this
message, it will be set to "text/plain" and the new parameter and
value will be appended as per RFC 2045.
An alternate header can be specified in the header argument, and all
parameters will be quoted as necessary unless requote is False.
If charset is specified, the parameter will be encoded according to RFC
2231. Optional language specifies the RFC 2231 language, defaulting
to the empty string. Both charset and language should be strings.
"""
ifnotisinstance(value,tuple)andcharset:
value= (charset,language,value)
ifheadernotinselfandheader.lower()=='content-type':
ctype='text/plain'
else:
ctype=self.get(header)
ifnotself.get_param(param,header=header):
ifnotctype:
ctype=_formatparam(param,value,requote)
else:
ctype=SEMISPACE.join(
[ctype,_formatparam(param,value,requote)])
else:
ctype=''
forold_param,old_valueinself.get_params(header=header,
unquote=requote):
append_param=''
ifold_param.lower()==param.lower():
append_param=_formatparam(param,value,requote)
else:
append_param=_formatparam(old_param,old_value,requote)
ifnotctype:
ctype=append_param
else:
ctype=SEMISPACE.join([ctype,append_param])
ifctype!=self.get(header):
ifreplace:
self.replace_header(header,ctype)
else:
delself[header]
self[header]=ctype

Message.del_param:
defdel_param(self,param,header='content-type',requote=True):
"""Remove the given parameter completely from the Content-Type header.
The header will be re-written in place without the parameter or its
value. All values will be quoted as necessary unless requote is
False. Optional header specifies an alternative to the Content-Type
header.
"""
ifheadernotinself:
return
new_ctype=''
forp,vinself.get_params(header=header,unquote=requote):
ifp.lower()!=param.lower():
ifnotnew_ctype:
new_ctype=_formatparam(p,v,requote)
else:
new_ctype=SEMISPACE.join([new_ctype,
_formatparam(p,v,requote)])
ifnew_ctype!=self.get(header):
delself[header]
self[header]=new_ctype

Repair Status:

Common Information:

  • CPython Version: main branch
  • Operating System: Linux
  • Credits: Finder is kexinoh (Xiangfan Wu) from QI-ANXIN Technology Research Institute.

Linked PRs

Metadata

Metadata

Assignees

No one assigned

    Labels

    3.10only security fixes3.11only security fixes3.12only security fixes3.13bugs and security fixes3.14bugs and security fixes3.15new features, bugs and security fixesstdlibStandard Library Python modules in the Lib/ directorytopic-emailtype-securityA security issue

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions


      [8]ページ先頭

      ©2009-2026 Movatter.jp