Source code for requests.utils

"""requests.utils~~~~~~~~~~~~~~This module provides utility functions that are used within Requeststhat are also useful for external consumption."""importcodecsimportcontextlibimportioimportosimportreimportsocketimportstructimportsysimporttempfileimportwarningsimportzipfilefromcollectionsimportOrderedDictfromurllib3.utilimportmake_headers,parse_urlfrom.importcertsfrom.__version__import__version__# to_native_string is unused here, but imported here for backwards compatibilityfrom._internal_utilsimport(# noqa: F401_HEADER_VALIDATORS_BYTE,_HEADER_VALIDATORS_STR,HEADER_VALIDATORS,to_native_string,)from.compatimport(Mapping,basestring,bytes,getproxies,getproxies_environment,integer_types,is_urllib3_1,)from.compatimportparse_http_listas_parse_list_headerfrom.compatimport(proxy_bypass,proxy_bypass_environment,quote,str,unquote,urlparse,urlunparse,)from.cookiesimportcookiejar_from_dictfrom.exceptionsimport(FileModeWarning,InvalidHeader,InvalidURL,UnrewindableBodyError,)from.structuresimportCaseInsensitiveDictNETRC_FILES=(".netrc","_netrc")DEFAULT_CA_BUNDLE_PATH=certs.where()DEFAULT_PORTS={"http":80,"https":443}# Ensure that ', ' is used to preserve previous delimiter behavior.DEFAULT_ACCEPT_ENCODING=", ".join(re.split(r",\s*",make_headers(accept_encoding=True)["accept-encoding"]))ifsys.platform=="win32":# provide a proxy_bypass version on Windows without DNS lookupsdefproxy_bypass_registry(host):try:importwinregexceptImportError:returnFalsetry:internetSettings=winreg.OpenKey(winreg.HKEY_CURRENT_USER,r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",)# ProxyEnable could be REG_SZ or REG_DWORD, normalizing itproxyEnable=int(winreg.QueryValueEx(internetSettings,"ProxyEnable")[0])# ProxyOverride is almost always a stringproxyOverride=winreg.QueryValueEx(internetSettings,"ProxyOverride")[0]except(OSError,ValueError):returnFalseifnotproxyEnableornotproxyOverride:returnFalse# make a check value list from the registry entry: replace the# '<local>' string by the localhost entry and the corresponding# canonical entry.proxyOverride=proxyOverride.split(";")# filter out empty strings to avoid re.match return true in the following code.proxyOverride=filter(None,proxyOverride)# now check if we match one of the registry values.fortestinproxyOverride:iftest=="<local>":if"."notinhost:returnTruetest=test.replace(".",r"\.")# mask dotstest=test.replace("*",r".*")# change glob sequencetest=test.replace("?",r".")# change glob charifre.match(test,host,re.I):returnTruereturnFalsedefproxy_bypass(host):# noqa"""Return True, if the host should be bypassed.        Checks proxy settings gathered from the environment, if specified,        or the registry.        """ifgetproxies_environment():returnproxy_bypass_environment(host)else:returnproxy_bypass_registry(host)defdict_to_sequence(d):"""Returns an internal sequence dictionary update."""ifhasattr(d,"items"):d=d.items()returnddefsuper_len(o):total_length=Nonecurrent_position=0ifnotis_urllib3_1andisinstance(o,str):# urllib3 2.x+ treats all strings as utf-8 instead# of latin-1 (iso-8859-1) like http.client.o=o.encode("utf-8")ifhasattr(o,"__len__"):total_length=len(o)elifhasattr(o,"len"):total_length=o.lenelifhasattr(o,"fileno"):try:fileno=o.fileno()except(io.UnsupportedOperation,AttributeError):# AttributeError is a surprising exception, seeing as how we've just checked# that `hasattr(o, 'fileno')`.  


def super_len(o):
    total_length = None
    current_position = 0

    if not is_urllib3_1 and isinstance(o, str):
        # urllib3 2.x+ treats all strings as utf-8 instead
        # of latin-1 (iso-8859-1) like http.client.
        o = o.encode("utf-8")

    if hasattr(o, "__len__"):
        total_length = len(o)

    elif hasattr(o, "len"):
        total_length = o.len

    elif hasattr(o, "fileno"):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError is a surprising exception, seeing as how we've just checked
            # that `hasattr(o, 'fileno')`. It happens for objects obtained via
            # `Tarfile.extractfile()`, per issue 5229.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode.
            if "b" not in o.mode:
                warnings.warn(
                    (
                        "Requests has determined the content-length for this "
                        "request using the binary size of the file: however, the "
                        "file has been opened in text mode (i.e. without the 'b' "
                        "flag in the mode). This may lead to an incorrect "
                        "content-length. In Requests 3.0, support will be removed "
                        "for files in text mode."
                    ),
                    FileModeWarning,
                )

    if hasattr(o, "tell"):
        try:
            current_position = o.tell()
        except OSError:
            # This can happen in some weird situations, such as when the file
            # is actually a special file descriptor like stdin. In this
            # instance, we don't know what the length is, so set it to zero and
            # let requests chunk it instead.
            if total_length is not None:
                current_position = total_length
        else:
            if hasattr(o, "seek") and total_length is None:
                # StringIO and BytesIO have seek but no usable fileno
                try:
                    # seek to end of file
                    o.seek(0, 2)
                    total_length = o.tell()

                    # seek back to current position to support
                    # partially read file-like objects
                    o.seek(current_position or 0)
                except OSError:
                    total_length = 0

    if total_length is None:
        total_length = 0

    return max(0, total_length - current_position)


def get_netrc_auth(url, raise_errors=False):
    """Returns the Requests tuple auth for a given url from netrc."""

    netrc_file = os.environ.get("NETRC")
    if netrc_file is not None:
        netrc_locations = (netrc_file,)
    else:
        netrc_locations = (f"~/{f}" for f in NETRC_FILES)

    try:
        from netrc import NetrcParseError, netrc

        netrc_path = None

        for f in netrc_locations:
            loc = os.path.expanduser(f)
            if os.path.exists(loc):
                netrc_path = loc
                break

        # Abort early if there isn't one.
        if netrc_path is None:
            return

        ri = urlparse(url)

        host = ri.hostname

        try:
            _netrc = netrc(netrc_path).authenticators(host)
            if _netrc:
                # Return with login / password
                login_i = 0 if _netrc[0] else 1
                return (_netrc[login_i], _netrc[2])
        except (NetrcParseError, OSError):
            # If there was a parsing error or a permissions issue reading the file,
            # we'll just skip netrc auth unless explicitly asked to raise errors.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass


def guess_filename(obj):
    """Tries to guess the filename of the given object."""
    name = getattr(obj, "name", None)
    if name and isinstance(name, basestring) and name[0] != "<" and name[-1] != ">":
        return os.path.basename(name)
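

# Illustrative sketch (not part of the upstream module): super_len reports the
# bytes remaining between the current read position and EOF, which is how
# Requests sizes a partially consumed request body.
#
#     >>> import io
#     >>> buf = io.BytesIO(b"hello world")
#     >>> _ = buf.read(6)   # consume six bytes first
#     >>> super_len(buf)    # 11 bytes total - 6 already read
#     5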
"""ifos.path.exists(path):# this is already a valid path, no need to do anything furtherreturnpath# find the first valid part of the provided path and treat that as a zip archive# assume the rest of the path is the name of a member in the archivearchive,member=os.path.split(path)whilearchiveandnotos.path.exists(archive):archive,prefix=os.path.split(archive)ifnotprefix:# If we don't check for an empty prefix after the split (in other words, archive remains unchanged after the split),# we _can_ end up in an infinite loop on a rare corner case affecting a small number of usersbreakmember="/".join([prefix,member])ifnotzipfile.is_zipfile(archive):returnpathzip_file=zipfile.ZipFile(archive)ifmembernotinzip_file.namelist():returnpath# we have a valid zip archive and a valid member of that archivetmp=tempfile.gettempdir()extracted_path=os.path.join(tmp,member.split("/")[-1])ifnotos.path.exists(extracted_path):# use read + write to avoid the creating nested folders, we only want the file, avoids mkdir racing conditionwithatomic_open(extracted_path)asfile_handler:file_handler.write(zip_file.read(member))returnextracted_path@contextlib.contextmanagerdefatomic_open(filename):"""Write a file to the disk in an atomic fashion"""tmp_descriptor,tmp_name=tempfile.mkstemp(dir=os.path.dirname(filename))try:withos.fdopen(tmp_descriptor,"wb")astmp_handler:yieldtmp_handleros.replace(tmp_name,filename)exceptBaseException:os.remove(tmp_name)raisedeffrom_key_val_list(value):"""Take an object and test to see if it can be represented as a    dictionary. Unless it can not be represented as such, return an    OrderedDict, e.g.,    ::        >>> from_key_val_list([('key', 'val')])        OrderedDict([('key', 'val')])        >>> from_key_val_list('string')        Traceback (most recent call last):        ...        ValueError: cannot encode objects that are not 2-tuples        >>> from_key_val_list({'key': 'val'})        OrderedDict([('key', 'val')])    :rtype: OrderedDict    """ifvalueisNone:returnNoneifisinstance(value,(str,bytes,bool,int)):raiseValueError("cannot encode objects that are not 2-tuples")returnOrderedDict(value)defto_key_val_list(value):"""Take an object and test to see if it can be represented as a    dictionary. If it can be, return a list of tuples, e.g.,    ::        >>> to_key_val_list([('key', 'val')])        [('key', 'val')]        >>> to_key_val_list({'key': 'val'})        [('key', 'val')]        >>> to_key_val_list('string')        Traceback (most recent call last):        ...        ValueError: cannot encode objects that are not 2-tuples    :rtype: list    """ifvalueisNone:returnNoneifisinstance(value,(str,bytes,bool,int)):raiseValueError("cannot encode objects that are not 2-tuples")ifisinstance(value,Mapping):value=value.items()returnlist(value)# From mitsuhiko/werkzeug (used with permission).defparse_list_header(value):"""Parse lists as described by RFC 2068 Section 2.    In particular, parse comma-separated lists where the elements of    the list may include quoted-strings.  A quoted-string could    contain a comma.  A non-quoted string could have quotes in the    middle.  Quotes are removed automatically after parsing.    It basically works like :func:`parse_set_header` just that items    may appear multiple times and case sensitivity is preserved.    The return value is a standard :class:`list`:    >>> parse_list_header('token, "quoted value"')    ['token', 'quoted value']    To create a header from the :class:`list` again, use the    :func:`dump_header` function.    


# From mitsuhiko/werkzeug (used with permission).
def parse_list_header(value):
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings.  A quoted-string could
    contain a comma.  A non-quoted string could have quotes in the
    middle.  Quotes are removed automatically after parsing.

    It basically works like :func:`parse_set_header` just that items
    may appear multiple times and case sensitivity is preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    :rtype: list
    """
    result = []
    for item in _parse_list_header(value):
        if item[:1] == item[-1:] == '"':
            item = unquote_header_value(item[1:-1])
        result.append(item)
    return result


# From mitsuhiko/werkzeug (used with permission).
def parse_dict_header(value):
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict:

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    :param value: a string with a dict header.
    :return: :class:`dict`
    :rtype: dict
    """
    result = {}
    for item in _parse_list_header(value):
        if "=" not in item:
            result[item] = None
            continue
        name, value = item.split("=", 1)
        if value[:1] == value[-1:] == '"':
            value = unquote_header_value(value[1:-1])
        result[name] = value
    return result


# From mitsuhiko/werkzeug (used with permission).
def unquote_header_value(value, is_filename=False):
    r"""Unquotes a header value.  (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    :param value: the header value to unquote.
    :rtype: str
    """
    if value and value[0] == value[-1] == '"':
        # this is not the real unquoting, but fixing this so that the
        # RFC is met will result in bugs with internet explorer and
        # probably some other browsers as well.  IE for example is
        # uploading files with "C:\foo\bar.txt" as filename
        value = value[1:-1]

        # if this is a filename and the starting characters look like
        # a UNC path, then just return the value without quotes.  Using the
        # replace sequence below on a UNC path has the effect of turning
        # the leading double slash into a single slash and then
        # _fix_ie_filename() doesn't work correctly.  See #458.
        if not is_filename or value[:2] != "\\\\":
            return value.replace("\\\\", "\\").replace('\\"', '"')
    return value
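

# Illustrative sketch (not part of the upstream module): unquote_header_value
# strips one layer of surrounding quotes, mirroring browser behaviour rather
# than strict RFC unquoting, and leaves unquoted values alone.
#
#     >>> unquote_header_value('"token"')
#     'token'
#     >>> unquote_header_value('no quotes')
#     'no quotes'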


def dict_from_cookiejar(cj):
    """Returns a key/value dictionary from a CookieJar.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """
    cookie_dict = {cookie.name: cookie.value for cookie in cj}
    return cookie_dict


def add_dict_to_cookiejar(cj, cookie_dict):
    """Returns a CookieJar from a key/value dictionary.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """
    return cookiejar_from_dict(cookie_dict, cj)
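

# Illustrative sketch (not part of the upstream module): the two helpers above
# round-trip between plain dicts and CookieJar objects.
#
#     >>> jar = cookiejar_from_dict({"session": "abc123"})
#     >>> dict_from_cookiejar(jar)
#     {'session': 'abc123'}
#     >>> jar = add_dict_to_cookiejar(jar, {"theme": "dark"})
#     >>> sorted(dict_from_cookiejar(jar))
#     ['session', 'theme']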


def get_encodings_from_content(content):
    """Returns encodings from given content string.

    :param content: bytestring to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    charset_re = re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I)
    pragma_re = re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I)
    xml_re = re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]')

    return (
        charset_re.findall(content)
        + pragma_re.findall(content)
        + xml_re.findall(content)
    )


def _parse_content_type_header(header):
    """Returns content type and parameters from given header

    :param header: string
    :return: tuple containing content type and dictionary of parameters
    """
    tokens = header.split(";")
    content_type, params = tokens[0].strip(), tokens[1:]
    params_dict = {}
    items_to_strip = "\"' "

    for param in params:
        param = param.strip()
        if param:
            key, value = param, True
            index_of_equals = param.find("=")
            if index_of_equals != -1:
                key = param[:index_of_equals].strip(items_to_strip)
                value = param[index_of_equals + 1 :].strip(items_to_strip)
            params_dict[key.lower()] = value
    return content_type, params_dict
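

# Illustrative sketch (not part of the upstream module): the parser splits the
# media type from its parameters, lowercasing parameter names and stripping
# surrounding quotes from values.
#
#     >>> _parse_content_type_header('text/html; charset="UTF-8"')
#     ('text/html', {'charset': 'UTF-8'})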


def get_encoding_from_headers(headers):
    """Returns encodings from given HTTP Header Dict.

    :param headers: dictionary to extract encoding from.
    :rtype: str
    """
    content_type = headers.get("content-type")

    if not content_type:
        return None

    content_type, params = _parse_content_type_header(content_type)

    if "charset" in params:
        return params["charset"].strip("'\"")

    if "text" in content_type:
        return "ISO-8859-1"

    if "application/json" in content_type:
        # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
        return "utf-8"
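

# Illustrative sketch (not part of the upstream module): an explicit charset
# wins; otherwise text/* falls back to ISO-8859-1 and JSON defaults to UTF-8.
#
#     >>> get_encoding_from_headers({"content-type": "text/html; charset=utf-8"})
#     'utf-8'
#     >>> get_encoding_from_headers({"content-type": "text/plain"})
#     'ISO-8859-1'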


def stream_decode_response_unicode(iterator, r):
    """Stream decodes an iterator."""

    if r.encoding is None:
        yield from iterator
        return

    decoder = codecs.getincrementaldecoder(r.encoding)(errors="replace")
    for chunk in iterator:
        rv = decoder.decode(chunk)
        if rv:
            yield rv
    rv = decoder.decode(b"", final=True)
    if rv:
        yield rv


def iter_slices(string, slice_length):
    """Iterate over slices of a string."""
    pos = 0
    if slice_length is None or slice_length <= 0:
        slice_length = len(string)
    while pos < len(string):
        yield string[pos : pos + slice_length]
        pos += slice_length
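

# Illustrative sketch (not part of the upstream module): iter_slices is the
# chunking primitive used once a response body has already been read into
# memory. None or a non-positive length yields the whole string at once.
#
#     >>> list(iter_slices("abcdef", 4))
#     ['abcd', 'ef']
#     >>> list(iter_slices("abcdef", None))
#     ['abcdef']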


def get_unicode_from_response(r):
    """Returns the requested content back in unicode.

    :param r: Response object to get unicode content from.

    Tried:

    1. charset from content-type
    2. fall back and replace all unicode characters

    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    tried_encodings = []

    # Try charset from content-type
    encoding = get_encoding_from_headers(r.headers)

    if encoding:
        try:
            return str(r.content, encoding)
        except UnicodeError:
            tried_encodings.append(encoding)

    # Fall back:
    try:
        return str(r.content, encoding, errors="replace")
    except TypeError:
        return r.content


# The unreserved URI characters (RFC 3986)
UNRESERVED_SET = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)


def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :rtype: str
    """
    parts = uri.split("%")
    for i in range(1, len(parts)):
        h = parts[i][0:2]
        if len(h) == 2 and h.isalnum():
            try:
                c = chr(int(h, 16))
            except ValueError:
                raise InvalidURL(f"Invalid percent-escape sequence: '{h}'")

            if c in UNRESERVED_SET:
                parts[i] = c + parts[i][2:]
            else:
                parts[i] = f"%{parts[i]}"
        else:
            parts[i] = f"%{parts[i]}"
    return "".join(parts)


def requote_uri(uri):
    """Re-quote the given URI.

    This function passes the given URI through an unquote/quote cycle to
    ensure that it is fully and consistently quoted.

    :rtype: str
    """
    safe_with_percent = "!#$%&'()*+,/:;=?@[]~"
    safe_without_percent = "!#$&'()*+,/:;=?@[]~"
    try:
        # Unquote only the unreserved characters
        # Then quote only illegal characters (do not quote reserved,
        # unreserved, or '%')
        return quote(unquote_unreserved(uri), safe=safe_with_percent)
    except InvalidURL:
        # We couldn't unquote the given URI, so let's try quoting it, but
        # there may be unquoted '%'s in the URI. We need to make sure they're
        # properly quoted so they do not cause issues elsewhere.
        return quote(uri, safe=safe_without_percent)


def address_in_network(ip, net):
    """This function allows you to check if an IP belongs to a network subnet

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :rtype: bool
    """
    ipaddr = struct.unpack("=L", socket.inet_aton(ip))[0]
    netaddr, bits = net.split("/")
    netmask = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0]
    network = struct.unpack("=L", socket.inet_aton(netaddr))[0] & netmask
    return (ipaddr & netmask) == (network & netmask)


def dotted_netmask(mask):
    """Converts mask from /xx format to xxx.xxx.xxx.xxx

    Example: if mask is 24 function returns 255.255.255.0

    :rtype: str
    """
    bits = 0xFFFFFFFF ^ (1 << 32 - mask) - 1
    return socket.inet_ntoa(struct.pack(">I", bits))


def is_ipv4_address(string_ip):
    """
    :rtype: bool
    """
    try:
        socket.inet_aton(string_ip)
    except OSError:
        return False
    return True


def is_valid_cidr(string_network):
    """
    Very simple check of the cidr format in no_proxy variable.

    :rtype: bool
    """
    if string_network.count("/") == 1:
        try:
            mask = int(string_network.split("/")[1])
        except ValueError:
            return False

        if mask < 1 or mask > 32:
            return False

        try:
            socket.inet_aton(string_network.split("/")[0])
        except OSError:
            return False
    else:
        return False
    return True


@contextlib.contextmanager
def set_environ(env_name, value):
    """Set the environment variable 'env_name' to 'value'

    Save previous value, yield, and then restore the previous value stored in
    the environment variable 'env_name'.

    If 'value' is None, do nothing"""
    value_changed = value is not None
    if value_changed:
        old_value = os.environ.get(env_name)
        os.environ[env_name] = value
    try:
        yield
    finally:
        if value_changed:
            if old_value is None:
                del os.environ[env_name]
            else:
                os.environ[env_name] = old_value
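

# Illustrative sketch (not part of the upstream module): the network helpers
# drive no_proxy matching, while requote_uri normalizes percent-encoding.
#
#     >>> dotted_netmask(24)
#     '255.255.255.0'
#     >>> address_in_network("192.168.1.1", "192.168.1.0/24")
#     True
#     >>> requote_uri("http://example.com/path with spaces")
#     'http://example.com/path%20with%20spaces'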


def should_bypass_proxies(url, no_proxy):
    """
    Returns whether we should bypass proxies or not.

    :rtype: bool
    """

    # Prioritize lowercase environment variables over uppercase
    # to keep a consistent behaviour with other http projects (curl, wget).
    def get_proxy(key):
        return os.environ.get(key) or os.environ.get(key.upper())

    # First check whether no_proxy is defined. If it is, check that the URL
    # we're getting isn't in the no_proxy list.
    no_proxy_arg = no_proxy
    if no_proxy is None:
        no_proxy = get_proxy("no_proxy")
    parsed = urlparse(url)

    if parsed.hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # We need to check whether we match here. We need to see if we match
        # the end of the hostname, both with and without the port.
        no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)

        if is_ipv4_address(parsed.hostname):
            for proxy_ip in no_proxy:
                if is_valid_cidr(proxy_ip):
                    if address_in_network(parsed.hostname, proxy_ip):
                        return True
                elif parsed.hostname == proxy_ip:
                    # If no_proxy ip was defined in plain IP notation instead
                    # of cidr notation & matches the IP of the index
                    return True
        else:
            host_with_port = parsed.hostname
            if parsed.port:
                host_with_port += f":{parsed.port}"

            for host in no_proxy:
                if parsed.hostname.endswith(host) or host_with_port.endswith(host):
                    # The URL does match something in no_proxy, so we don't want
                    # to apply the proxies on this URL.
                    return True

    with set_environ("no_proxy", no_proxy_arg):
        # parsed.hostname can be `None` in cases such as a file URI.
        try:
            bypass = proxy_bypass(parsed.hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    if bypass:
        return True

    return False


def get_environ_proxies(url, no_proxy=None):
    """
    Return a dict of environment proxies.

    :rtype: dict
    """
    if should_bypass_proxies(url, no_proxy=no_proxy):
        return {}
    else:
        return getproxies()


def select_proxy(url, proxies):
    """Select a proxy for the url, if applicable.

    :param url: The url being for the request
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    """
    proxies = proxies or {}
    urlparts = urlparse(url)
    if urlparts.hostname is None:
        return proxies.get(urlparts.scheme, proxies.get("all"))

    proxy_keys = [
        urlparts.scheme + "://" + urlparts.hostname,
        urlparts.scheme,
        "all://" + urlparts.hostname,
        "all",
    ]
    proxy = None
    for proxy_key in proxy_keys:
        if proxy_key in proxies:
            proxy = proxies[proxy_key]
            break

    return proxy


def resolve_proxies(request, proxies, trust_env=True):
    """This method takes proxy information from a request and configuration
    input to resolve a mapping of target proxies. This will consider settings
    such as NO_PROXY to strip proxy configurations.

    :param request: Request or PreparedRequest
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :param trust_env: Boolean declaring whether to trust environment configs

    :rtype: dict
    """
    proxies = proxies if proxies is not None else {}
    url = request.url
    scheme = urlparse(url).scheme
    no_proxy = proxies.get("no_proxy")
    new_proxies = proxies.copy()

    if trust_env and not should_bypass_proxies(url, no_proxy=no_proxy):
        environ_proxies = get_environ_proxies(url, no_proxy=no_proxy)

        proxy = environ_proxies.get(scheme, environ_proxies.get("all"))

        if proxy:
            new_proxies.setdefault(scheme, proxy)
    return new_proxies


def default_user_agent(name="python-requests"):
    """
    Return a string representing the default user agent.

    :rtype: str
    """
    return f"{name}/{__version__}"


def default_headers():
    """
    :rtype: requests.structures.CaseInsensitiveDict
    """
    return CaseInsensitiveDict(
        {
            "User-Agent": default_user_agent(),
            "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
            "Accept": "*/*",
            "Connection": "keep-alive",
        }
    )
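

# Illustrative sketch (not part of the upstream module): select_proxy prefers
# the most specific key, scheme+host before bare scheme. The proxy URLs below
# are hypothetical.
#
#     >>> proxies = {"http://example.com": "http://special:8080",
#     ...            "http": "http://plain:3128"}
#     >>> select_proxy("http://example.com/page", proxies)
#     'http://special:8080'
#     >>> select_proxy("http://other.org/", proxies)
#     'http://plain:3128'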


def parse_header_links(value):
    """Return a list of parsed link headers.

    i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"

    :rtype: list
    """

    links = []

    replace_chars = " '\""

    value = value.strip(replace_chars)
    if not value:
        return links

    for val in re.split(", *<", value):
        try:
            url, params = val.split(";", 1)
        except ValueError:
            url, params = val, ""

        link = {"url": url.strip("<> '\"")}

        for param in params.split(";"):
            try:
                key, value = param.split("=")
            except ValueError:
                break

            link[key.strip(replace_chars)] = value.strip(replace_chars)

        links.append(link)

    return links


# Null bytes; no need to recreate these on each call to guess_json_utf
_null = "\x00".encode("ascii")  # encoding to ASCII for Python 3
_null2 = _null * 2
_null3 = _null * 3


def guess_json_utf(data):
    """
    :rtype: str
    """
    # JSON always starts with two ASCII characters, so detection is as
    # easy as counting the nulls and from their location and count
    # determine the encoding. Also detect a BOM, if present.
    sample = data[:4]
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if sample[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included
    nullcount = sample.count(_null)
    if nullcount == 0:
        return "utf-8"
    if nullcount == 2:
        if sample[::2] == _null2:  # 1st and 3rd are null
            return "utf-16-be"
        if sample[1::2] == _null2:  # 2nd and 4th are null
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    if nullcount == 3:
        if sample[:3] == _null3:
            return "utf-32-be"
        if sample[1:] == _null3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None


def prepend_scheme_if_needed(url, new_scheme):
    """Given a URL that may or may not have a scheme, prepend the given scheme.
    Does not replace a present scheme with the one provided as an argument.

    :rtype: str
    """
    parsed = parse_url(url)
    scheme, auth, host, port, path, query, fragment = parsed

    # A defect in urlparse determines that there isn't a netloc present in some
    # urls. We previously assumed parsing was overly cautious, and swapped the
    # netloc and path. Due to a lack of tests on the original defect, this is
    # maintained with parse_url for backwards compatibility.
    netloc = parsed.netloc
    if not netloc:
        netloc, path = path, netloc

    if auth:
        # parse_url doesn't provide the netloc with auth
        # so we'll add it ourselves.
        netloc = "@".join([auth, netloc])
    if scheme is None:
        scheme = new_scheme
    if path is None:
        path = ""

    return urlunparse((scheme, netloc, path, "", query, fragment))


def get_auth_from_url(url):
    """Given a url with authentication components, extract them into a tuple of
    username,password.

    :rtype: (str,str)
    """
    parsed = urlparse(url)

    try:
        auth = (unquote(parsed.username), unquote(parsed.password))
    except (AttributeError, TypeError):
        auth = ("", "")

    return auth
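

# Illustrative sketch (not part of the upstream module): these helpers handle
# pagination links, JSON byte-order detection, and scheme normalization.
#
#     >>> parse_header_links('<http://example.com/page2>; rel="next"')
#     [{'url': 'http://example.com/page2', 'rel': 'next'}]
#     >>> guess_json_utf('{"a": 1}'.encode("utf-16-le"))
#     'utf-16-le'
#     >>> prepend_scheme_if_needed("example.com/pub", "https")
#     'https://example.com/pub'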
"""name,value=header_validate_header_part(header,name,0)_validate_header_part(header,value,1)def_validate_header_part(header,header_part,header_validator_index):ifisinstance(header_part,str):validator=_HEADER_VALIDATORS_STR[header_validator_index]elifisinstance(header_part,bytes):validator=_HEADER_VALIDATORS_BYTE[header_validator_index]else:raiseInvalidHeader(f"Header part ({header_part!r}) from{header} "f"must be of type str or bytes, not{type(header_part)}")ifnotvalidator.match(header_part):header_kind="name"ifheader_validator_index==0else"value"raiseInvalidHeader(f"Invalid leading whitespace, reserved character(s), or return "f"character(s) in header{header_kind}:{header_part!r}")defurldefragauth(url):""" Given a url remove the fragment and the authentication part. :rtype: str """scheme,netloc,path,params,query,fragment=urlparse(url)# see func:`prepend_scheme_if_needed`ifnotnetloc:netloc,path=path,netlocnetloc=netloc.rsplit("@",1)[-1]returnurlunparse((scheme,netloc,path,params,query,""))defrewind_body(prepared_request):"""Move file pointer back to its recorded starting position so it can be read again on redirect. """body_seek=getattr(prepared_request.body,"seek",None)ifbody_seekisnotNoneandisinstance(prepared_request._body_position,integer_types):try:body_seek(prepared_request._body_position)exceptOSError:raiseUnrewindableBodyError("An error occurred when rewinding request body for redirect.")else:raiseUnrewindableBodyError("Unable to rewind request body for redirect.")