"""
requests.auth
~~~~~~~~~~~~~

This module contains the authentication handlers for Requests.
"""

import hashlib
import os
import re
import threading
import time
import warnings
from base64 import b64encode

from ._internal_utils import to_native_string
from .compat import basestring, str, urlparse
from .cookies import extract_cookies_to_jar
from .utils import parse_dict_header

CONTENT_TYPE_FORM_URLENCODED = "application/x-www-form-urlencoded"
CONTENT_TYPE_MULTI_PART = "multipart/form-data"


def _basic_auth_str(username, password):
    """Returns a Basic Auth string.

    Both values are coerced to ``str`` if they are not string-like (with a
    ``DeprecationWarning``), encoded as latin1 when they are text, joined
    with ``:`` and base64-encoded.

    :param username: user name; text, bytes, or (deprecated) any object.
    :param password: password; text, bytes, or (deprecated) any object.
    :rtype: str
    """

    # "I want us to put a big-ol' comment on top of it that
    # says that this behaviour is dumb but we need to preserve
    # it because people are relying on it."
    #    - Lukasa
    #
    # These are here solely to maintain backwards compatibility
    # for things like ints. This will be removed in 3.0.0.
    if not isinstance(username, basestring):
        warnings.warn(
            "Non-string usernames will no longer be supported in Requests "
            "3.0.0. Please convert the object you've passed in ({!r}) to "
            "a string or bytes object in the near future to avoid "
            "problems.".format(username),
            category=DeprecationWarning,
        )
        username = str(username)

    if not isinstance(password, basestring):
        # Only the *type* of the password is reported, never its value.
        warnings.warn(
            "Non-string passwords will no longer be supported in Requests "
            "3.0.0. Please convert the object you've passed in ({!r}) to "
            "a string or bytes object in the near future to avoid "
            "problems.".format(type(password)),
            category=DeprecationWarning,
        )
        password = str(password)
    # -- End Removal --

    if isinstance(username, str):
        username = username.encode("latin1")

    if isinstance(password, str):
        password = password.encode("latin1")

    authstr = "Basic " + to_native_string(
        b64encode(b":".join((username, password))).strip()
    )

    return authstr


class AuthBase:
    """Base class that all auth implementations derive from"""

    def __call__(self, r):
        raise NotImplementedError("Auth hooks must be callable.")


class HTTPBasicAuth(AuthBase):
    """Attaches HTTP Basic Authentication to the given Request object."""

    def __init__(self, username, password):
        self.username = username
        self.password = password

    def __eq__(self, other):
        # Compared by attribute so that any object exposing the same
        # ``username``/``password`` pair is considered equal.
        return all(
            [
                self.username == getattr(other, "username", None),
                self.password == getattr(other, "password", None),
            ]
        )

    def __ne__(self, other):
        return not self == other

    def __call__(self, r):
        r.headers["Authorization"] = _basic_auth_str(self.username, self.password)
        return r


class HTTPProxyAuth(HTTPBasicAuth):
    """Attaches HTTP Proxy Authentication to a given Request object."""

    def __call__(self, r):
        r.headers["Proxy-Authorization"] = _basic_auth_str(self.username, self.password)
        return r


class HTTPDigestAuth(AuthBase):
    """Attaches HTTP Digest Authentication to the given Request object."""

    def __init__(self, username, password):
        self.username = username
        self.password = password
        # Keep state in per-thread local storage
        self._thread_local = threading.local()

    def init_per_thread_state(self):
        """Create the digest bookkeeping attributes for the current thread."""
        # Ensure state is initialized just once per-thread
        if not hasattr(self._thread_local, "init"):
            self._thread_local.init = True
            self._thread_local.last_nonce = ""
            self._thread_local.nonce_count = 0
            self._thread_local.chal = {}
            self._thread_local.pos = None
            self._thread_local.num_401_calls = None

    def build_digest_header(self, method, url):
        """Build the ``Authorization`` header value for the stored challenge.

        Reads the challenge parameters from ``self._thread_local.chal`` and
        updates the per-thread nonce counter and last-seen nonce.

        :param method: HTTP method of the request being authenticated.
        :param url: URL of the request; only its path and query are used.
        :returns: the header value, or ``None`` when the challenge names an
            algorithm that is not handled here or a ``qop`` that does not
            include ``auth``.
        :rtype: str
        """

        realm = self._thread_local.chal["realm"]
        nonce = self._thread_local.chal["nonce"]
        qop = self._thread_local.chal.get("qop")
        algorithm = self._thread_local.chal.get("algorithm")
        opaque = self._thread_local.chal.get("opaque")
        hash_utf8 = None

        if algorithm is None:
            _algorithm = "MD5"
        else:
            _algorithm = algorithm.upper()
        # lambdas assume digest modules are imported at the top level
        if _algorithm == "MD5" or _algorithm == "MD5-SESS":

            def md5_utf8(x):
                if isinstance(x, str):
                    x = x.encode("utf-8")
                return hashlib.md5(x).hexdigest()

            hash_utf8 = md5_utf8
        elif _algorithm == "SHA":

            def sha_utf8(x):
                if isinstance(x, str):
                    x = x.encode("utf-8")
                return hashlib.sha1(x).hexdigest()

            hash_utf8 = sha_utf8
        elif _algorithm == "SHA-256":

            def sha256_utf8(x):
                if isinstance(x, str):
                    x = x.encode("utf-8")
                return hashlib.sha256(x).hexdigest()

            hash_utf8 = sha256_utf8
        elif _algorithm == "SHA-512":

            def sha512_utf8(x):
                if isinstance(x, str):
                    x = x.encode("utf-8")
                return hashlib.sha512(x).hexdigest()

            hash_utf8 = sha512_utf8

        KD = lambda s, d: hash_utf8(f"{s}:{d}")  # noqa:E731

        if hash_utf8 is None:
            return None

        # XXX not implemented yet
        entdig = None
        p_parsed = urlparse(url)
        #: path is request-uri defined in RFC 2616 which should not be empty
        path = p_parsed.path or "/"
        if p_parsed.query:
            path += f"?{p_parsed.query}"

        A1 = f"{self.username}:{realm}:{self.password}"
        A2 = f"{method}:{path}"

        HA1 = hash_utf8(A1)
        HA2 = hash_utf8(A2)

        # The nonce count restarts whenever the server hands out a new nonce.
        if nonce == self._thread_local.last_nonce:
            self._thread_local.nonce_count += 1
        else:
            self._thread_local.nonce_count = 1
        ncvalue = f"{self._thread_local.nonce_count:08x}"
        # Client nonce: hash of the counter, server nonce, current time and
        # 8 random bytes, truncated to 16 hex characters.
        s = str(self._thread_local.nonce_count).encode("utf-8")
        s += nonce.encode("utf-8")
        s += time.ctime().encode("utf-8")
        s += os.urandom(8)

        cnonce = hashlib.sha1(s).hexdigest()[:16]
        if _algorithm == "MD5-SESS":
            HA1 = hash_utf8(f"{HA1}:{nonce}:{cnonce}")

        if not qop:
            respdig = KD(HA1, f"{nonce}:{HA2}")
        elif qop == "auth" or "auth" in qop.split(","):
            noncebit = f"{nonce}:{ncvalue}:{cnonce}:auth:{HA2}"
            respdig = KD(HA1, noncebit)
        else:
            # XXX handle auth-int.
            return None

        self._thread_local.last_nonce = nonce

        # XXX should the partial digests be encoded too?
        base = (
            f'username="{self.username}", realm="{realm}", nonce="{nonce}", '
            f'uri="{path}", response="{respdig}"'
        )
        if opaque:
            base += f', opaque="{opaque}"'
        if algorithm:
            base += f', algorithm="{algorithm}"'
        if entdig:
            base += f', digest="{entdig}"'
        if qop:
            base += f', qop="auth", nc={ncvalue}, cnonce="{cnonce}"'

        # The auth scheme must be separated from its parameters by a space.
        return f"Digest {base}"

    def handle_redirect(self, r, **kwargs):
        """Reset num_401_calls counter on redirects."""
        if r.is_redirect:
            self._thread_local.num_401_calls = 1

    def handle_401(self, r, **kwargs):
        """
        Takes the given response and tries digest-auth, if needed.

        :param r: the response received for the original request.
        :param kwargs: forwarded unchanged to ``r.connection.send``.
        :rtype: requests.Response
        """

        # If response is not 4xx, do not auth
        # See https://github.com/psf/requests/issues/3772
        if not 400 <= r.status_code < 500:
            self._thread_local.num_401_calls = 1
            return r

        if self._thread_local.pos is not None:
            # Rewind the file position indicator of the body to where
            # it was to resend the request.
            r.request.body.seek(self._thread_local.pos)
        s_auth = r.headers.get("www-authenticate", "")

        # Retry at most once per request: num_401_calls starts at 1.
        if "digest" in s_auth.lower() and self._thread_local.num_401_calls < 2:
            self._thread_local.num_401_calls += 1
            pat = re.compile(r"digest ", flags=re.IGNORECASE)
            self._thread_local.chal = parse_dict_header(pat.sub("", s_auth, count=1))

            # Consume content and release the original connection
            # to allow our new request to reuse the same one.
            r.content
            r.close()
            prep = r.request.copy()
            extract_cookies_to_jar(prep._cookies, r.request, r.raw)
            prep.prepare_cookies(prep._cookies)

            prep.headers["Authorization"] = self.build_digest_header(
                prep.method, prep.url
            )
            _r = r.connection.send(prep, **kwargs)
            _r.history.append(r)
            _r.request = prep

            return _r

        self._thread_local.num_401_calls = 1
        return r

    def __call__(self, r):
        # Initialize per-thread state, if needed
        self.init_per_thread_state()
        # If we have a saved nonce, skip the 401
        if self._thread_local.last_nonce:
            r.headers["Authorization"] = self.build_digest_header(r.method, r.url)
        try:
            self._thread_local.pos = r.body.tell()
        except AttributeError:
            # In the case of HTTPDigestAuth being reused and the body of
            # the previous request was a file-like object, pos has the
            # file position of the previous body. Ensure it's set to
            # None.
            self._thread_local.pos = None
        r.register_hook("response", self.handle_401)
        r.register_hook("response", self.handle_redirect)
        self._thread_local.num_401_calls = 1

        return r

    def __eq__(self, other):
        # Compared by attribute so that any object exposing the same
        # ``username``/``password`` pair is considered equal.
        return all(
            [
                self.username == getattr(other, "username", None),
                self.password == getattr(other, "password", None),
            ]
        )

    def __ne__(self, other):
        return not self == other