"""requests.sessions~~~~~~~~~~~~~~~~~This module provides a Session object to manage and persist settings acrossrequests (cookies, auth, proxies)."""importosimportsysimporttimefromcollectionsimportOrderedDictfromdatetimeimporttimedeltafrom._internal_utilsimportto_native_stringfrom.adaptersimportHTTPAdapterfrom.authimport_basic_auth_strfrom.compatimportMapping,cookielib,urljoin,urlparsefrom.cookiesimport(RequestsCookieJar,cookiejar_from_dict,extract_cookies_to_jar,merge_cookies,)from.exceptionsimport(ChunkedEncodingError,ContentDecodingError,InvalidSchema,TooManyRedirects,)from.hooksimportdefault_hooks,dispatch_hook# formerly defined here, reexposed here for backward compatibilityfrom.modelsimport(# noqa: F401DEFAULT_REDIRECT_LIMIT,REDIRECT_STATI,PreparedRequest,Request,)from.status_codesimportcodesfrom.structuresimportCaseInsensitiveDictfrom.utilsimport(# noqa: F401DEFAULT_PORTS,default_headers,get_auth_from_url,get_environ_proxies,get_netrc_auth,requote_uri,resolve_proxies,rewind_body,should_bypass_proxies,to_key_val_list,)# Preferred clock, based on which one is more accurate on a given system.ifsys.platform=="win32":preferred_clock=time.perf_counterelse:preferred_clock=time.timedefmerge_setting(request_setting,session_setting,dict_class=OrderedDict):"""Determines appropriate setting for a given request, taking into account the explicit setting on that request, and the setting in the session. If a setting is a dictionary, they will be merged together using `dict_class` """ifsession_settingisNone:returnrequest_settingifrequest_settingisNone:returnsession_setting# Bypass if not a dictionary (e.g. verify)ifnot(isinstance(session_setting,Mapping)andisinstance(request_setting,Mapping)):returnrequest_settingmerged_setting=dict_class(to_key_val_list(session_setting))merged_setting.update(to_key_val_list(request_setting))# Remove keys that are set to None. Extract keys first to avoid altering# the dictionary during iteration.none_keys=[kfor(k,v)inmerged_setting.items()ifvisNone]forkeyinnone_keys:delmerged_setting[key]returnmerged_settingdefmerge_hooks(request_hooks,session_hooks,dict_class=OrderedDict):"""Properly merges both requests and session hooks. This is necessary because when request_hooks == {'response': []}, the merge breaks Session hooks entirely. """ifsession_hooksisNoneorsession_hooks.get("response")==[]:returnrequest_hooksifrequest_hooksisNoneorrequest_hooks.get("response")==[]:returnsession_hooksreturnmerge_setting(request_hooks,session_hooks,dict_class)classSessionRedirectMixin:defget_redirect_target(self,resp):"""Receives a Response. Returns a redirect URI or ``None``"""# Due to the nature of how requests processes redirects this method will# be called at least once upon the original response and at least twice# on each subsequent redirect response (if any).# If a custom mixin is used to handle this logic, it may be advantageous# to cache the redirect location onto the response object as a private# attribute.ifresp.is_redirect:location=resp.headers["location"]# Currently the underlying http module on py3 decode headers# in latin1, but empirical evidence suggests that latin1 is very# rarely used with non-ASCII characters in HTTP headers.# It is more likely to get UTF8 header rather than latin1.# This causes incorrect handling of UTF8 encoded location headers.# To solve this, we re-encode the location in latin1.location=location.encode("latin1")returnto_native_string(location,"utf8")returnNonedefshould_strip_auth(self,old_url,new_url):"""Decide whether Authorization header should be removed when redirecting"""old_parsed=urlparse(old_url)new_parsed=urlparse(new_url)ifold_parsed.hostname!=new_parsed.hostname:returnTrue# Special case: allow http -> https redirect when using the standard# ports. This isn't specified by RFC 7235, but is kept to avoid# breaking backwards compatibility with older versions of requests# that allowed any redirects on the same host.if(old_parsed.scheme=="http"andold_parsed.portin(80,None)andnew_parsed.scheme=="https"andnew_parsed.portin(443,None)):returnFalse# Handle default port usage corresponding to scheme.changed_port=old_parsed.port!=new_parsed.portchanged_scheme=old_parsed.scheme!=new_parsed.schemedefault_port=(DEFAULT_PORTS.get(old_parsed.scheme,None),None)if(notchanged_schemeandold_parsed.portindefault_portandnew_parsed.portindefault_port):returnFalse# Standard case: root URI must matchreturnchanged_portorchanged_schemedefresolve_redirects(self,resp,req,stream=False,timeout=None,verify=True,cert=None,proxies=None,yield_requests=False,**adapter_kwargs,):"""Receives a Response. Returns a generator of Responses or Requests."""hist=[]# keep track of historyurl=self.get_redirect_target(resp)previous_fragment=urlparse(req.url).fragmentwhileurl:prepared_request=req.copy()# Update history and keep track of redirects.# resp.history must ignore the original request in this loophist.append(resp)resp.history=hist[1:]try:resp.content# Consume socket so it can be releasedexcept(ChunkedEncodingError,ContentDecodingError,RuntimeError):resp.raw.read(decode_content=False)iflen(resp.history)>=self.max_redirects:raiseTooManyRedirects(f"Exceeded{self.max_redirects} redirects.",response=resp)# Release the connection back into the pool.resp.close()# Handle redirection without scheme (see: RFC 1808 Section 4)ifurl.startswith("//"):parsed_rurl=urlparse(resp.url)url=":".join([to_native_string(parsed_rurl.scheme),url])# Normalize url case and attach previous fragment if needed (RFC 7231 7.1.2)parsed=urlparse(url)ifparsed.fragment==""andprevious_fragment:parsed=parsed._replace(fragment=previous_fragment)elifparsed.fragment:previous_fragment=parsed.fragmenturl=parsed.geturl()# Facilitate relative 'location' headers, as allowed by RFC 7231.# (e.g. '/path/to/resource' instead of 'http://domain.tld/path/to/resource')# Compliant with RFC3986, we percent encode the url.ifnotparsed.netloc:url=urljoin(resp.url,requote_uri(url))else:url=requote_uri(url)prepared_request.url=to_native_string(url)self.rebuild_method(prepared_request,resp)# https://github.com/psf/requests/issues/1084ifresp.status_codenotin(codes.temporary_redirect,codes.permanent_redirect,):# https://github.com/psf/requests/issues/3490purged_headers=("Content-Length","Content-Type","Transfer-Encoding")forheaderinpurged_headers:prepared_request.headers.pop(header,None)prepared_request.body=Noneheaders=prepared_request.headersheaders.pop("Cookie",None)# Extract any cookies sent on the response to the cookiejar# in the new request. Because we've mutated our copied prepared# request, use the old one that we haven't yet touched.extract_cookies_to_jar(prepared_request._cookies,req,resp.raw)merge_cookies(prepared_request._cookies,self.cookies)prepared_request.prepare_cookies(prepared_request._cookies)# Rebuild auth and proxy information.proxies=self.rebuild_proxies(prepared_request,proxies)self.rebuild_auth(prepared_request,resp)# A failed tell() sets `_body_position` to `object()`. This non-None# value ensures `rewindable` will be True, allowing us to raise an# UnrewindableBodyError, instead of hanging the connection.rewindable=prepared_request._body_positionisnotNoneand("Content-Length"inheadersor"Transfer-Encoding"inheaders)# Attempt to rewind consumed file-like object.ifrewindable:rewind_body(prepared_request)# Override the original request.req=prepared_requestifyield_requests:yieldreqelse:resp=self.send(req,stream=stream,timeout=timeout,verify=verify,cert=cert,proxies=proxies,allow_redirects=False,**adapter_kwargs,)extract_cookies_to_jar(self.cookies,prepared_request,resp.raw)# extract redirect url, if any, for the next loopurl=self.get_redirect_target(resp)yieldrespdefrebuild_auth(self,prepared_request,response):"""When being redirected we may want to strip authentication from the request to avoid leaking credentials. This method intelligently removes and reapplies authentication where possible to avoid credential loss. """headers=prepared_request.headersurl=prepared_request.urlif"Authorization"inheadersandself.should_strip_auth(response.request.url,url):# If we get redirected to a new host, we should strip out any# authentication headers.delheaders["Authorization"]# .netrc might have more auth for us on our new host.new_auth=get_netrc_auth(url)ifself.trust_envelseNoneifnew_authisnotNone:prepared_request.prepare_auth(new_auth)defrebuild_proxies(self,prepared_request,proxies):"""This method re-evaluates the proxy configuration by considering the environment variables. If we are redirected to a URL covered by NO_PROXY, we strip the proxy configuration. Otherwise, we set missing proxy keys for this URL (in case they were stripped by a previous redirect). This method also replaces the Proxy-Authorization header where necessary. :rtype: dict """headers=prepared_request.headersscheme=urlparse(prepared_request.url).schemenew_proxies=resolve_proxies(prepared_request,proxies,self.trust_env)if"Proxy-Authorization"inheaders:delheaders["Proxy-Authorization"]try:username,password=get_auth_from_url(new_proxies[scheme])exceptKeyError:username,password=None,None# urllib3 handles proxy authorization for us in the standard adapter.# Avoid appending this to TLS tunneled requests where it may be leaked.ifnotscheme.startswith("https")andusernameandpassword:headers["Proxy-Authorization"]=_basic_auth_str(username,password)returnnew_proxiesdefrebuild_method(self,prepared_request,response):"""When being redirected we may want to change the method of the request based on certain specs or browser behavior. """method=prepared_request.method# https://tools.ietf.org/html/rfc7231#section-6.4.4ifresponse.status_code==codes.see_otherandmethod!="HEAD":method="GET"# Do what the browsers do, despite standards...# First, turn 302s into GETs.ifresponse.status_code==codes.foundandmethod!="HEAD":method="GET"# Second, if a POST is responded to with a 301, turn it into a GET.# This bizarre behaviour is explained in Issue 1704.ifresponse.status_code==codes.movedandmethod=="POST":method="GET"prepared_request.method=method[docs]classSession(SessionRedirectMixin):"""A Requests session. Provides cookie persistence, connection-pooling, and configuration. Basic Usage:: >>> import requests >>> s = requests.Session() >>> s.get('https://httpbin.org/get') <Response [200]> Or as a context manager:: >>> with requests.Session() as s: ... s.get('https://httpbin.org/get') <Response [200]> """__attrs__=["headers","cookies","auth","proxies","hooks","params","verify","cert","adapters","stream","trust_env","max_redirects",]def__init__(self):#: A case-insensitive dictionary of headers to be sent on each#: :class:`Request <Request>` sent from this#: :class:`Session <Session>`.self.headers=default_headers()#: Default Authentication tuple or object to attach to#: :class:`Request <Request>`.self.auth=None#: Dictionary mapping protocol or protocol and host to the URL of the proxy#: (e.g. {'http': 'foo.bar:3128', 'http://host.name': 'foo.bar:4012'}) to#: be used on each :class:`Request <Request>`.self.proxies={}#: Event-handling hooks.self.hooks=default_hooks()#: Dictionary of querystring data to attach to each#: :class:`Request <Request>`. The dictionary values may be lists for#: representing multivalued query parameters.self.params={}#: Stream response content default.self.stream=False#: SSL Verification default.#: Defaults to `True`, requiring requests to verify the TLS certificate at the#: remote end.#: If verify is set to `False`, requests will accept any TLS certificate#: presented by the server, and will ignore hostname mismatches and/or#: expired certificates, which will make your application vulnerable to#: man-in-the-middle (MitM) attacks.#: Only set this to `False` for testing.self.verify=True#: SSL client certificate default, if String, path to ssl client#: cert file (.pem). If Tuple, ('cert', 'key') pair.self.cert=None#: Maximum number of redirects allowed. If the request exceeds this#: limit, a :class:`TooManyRedirects` exception is raised.#: This defaults to requests.models.DEFAULT_REDIRECT_LIMIT, which is#: 30.self.max_redirects=DEFAULT_REDIRECT_LIMIT#: Trust environment settings for proxy configuration, default#: authentication and similar.self.trust_env=True#: A CookieJar containing all currently outstanding cookies set on this#: session. By default it is a#: :class:`RequestsCookieJar <requests.cookies.RequestsCookieJar>`, but#: may be any other ``cookielib.CookieJar`` compatible object.self.cookies=cookiejar_from_dict({})# Default connection adapters.self.adapters=OrderedDict()self.mount("https://",HTTPAdapter())self.mount("http://",HTTPAdapter())def__enter__(self):returnselfdef__exit__(self,*args):self.close()[docs]defprepare_request(self,request):"""Constructs a :class:`PreparedRequest <PreparedRequest>` for transmission and returns it. The :class:`PreparedRequest` has settings merged from the :class:`Request <Request>` instance and those of the :class:`Session`. :param request: :class:`Request` instance to prepare with this session's settings. :rtype: requests.PreparedRequest """cookies=request.cookiesor{}# Bootstrap CookieJar.ifnotisinstance(cookies,cookielib.CookieJar):cookies=cookiejar_from_dict(cookies)# Merge with session cookiesmerged_cookies=merge_cookies(merge_cookies(RequestsCookieJar(),self.cookies),cookies)# Set environment's basic authentication if not explicitly set.auth=request.authifself.trust_envandnotauthandnotself.auth:auth=get_netrc_auth(request.url)p=PreparedRequest()p.prepare(method=request.method.upper(),url=request.url,files=request.files,data=request.data,json=request.json,headers=merge_setting(request.headers,self.headers,dict_class=CaseInsensitiveDict),params=merge_setting(request.params,self.params),auth=merge_setting(auth,self.auth),cookies=merged_cookies,hooks=merge_hooks(request.hooks,self.hooks),)returnp [docs]defrequest(self,method,url,params=None,data=None,headers=None,cookies=None,files=None,auth=None,timeout=None,allow_redirects=True,proxies=None,hooks=None,stream=None,verify=None,cert=None,json=None,):"""Constructs a :class:`Request <Request>`, prepares it and sends it. Returns :class:`Response <Response>` object. :param method: method for the new :class:`Request` object. :param url: URL for the new :class:`Request` object. :param params: (optional) Dictionary or bytes to be sent in the query string for the :class:`Request`. :param data: (optional) Dictionary, list of tuples, bytes, or file-like object to send in the body of the :class:`Request`. :param json: (optional) json to send in the body of the :class:`Request`. :param headers: (optional) Dictionary of HTTP Headers to send with the :class:`Request`. :param cookies: (optional) Dict or CookieJar object to send with the :class:`Request`. :param files: (optional) Dictionary of ``'filename': file-like-objects`` for multipart encoding upload. :param auth: (optional) Auth tuple or callable to enable Basic/Digest/Custom HTTP Auth. :param timeout: (optional) How many seconds to wait for the server to send data before giving up, as a float, or a :ref:`(connect timeout, read timeout) <timeouts>` tuple. :type timeout: float or tuple :param allow_redirects: (optional) Set to True by default. :type allow_redirects: bool :param proxies: (optional) Dictionary mapping protocol or protocol and hostname to the URL of the proxy. :param hooks: (optional) Dictionary mapping hook name to one event or list of events, event must be callable. :param stream: (optional) whether to immediately download the response content. Defaults to ``False``. :param verify: (optional) Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use. Defaults to ``True``. When set to ``False``, requests will accept any TLS certificate presented by the server, and will ignore hostname mismatches and/or expired certificates, which will make your application vulnerable to man-in-the-middle (MitM) attacks. Setting verify to ``False`` may be useful during local development or testing. :param cert: (optional) if String, path to ssl client cert file (.pem). If Tuple, ('cert', 'key') pair. :rtype: requests.Response """# Create the Request.req=Request(method=method.upper(),url=url,headers=headers,files=files,data=dataor{},json=json,params=paramsor{},auth=auth,cookies=cookies,hooks=hooks,)prep=self.prepare_request(req)proxies=proxiesor{}settings=self.merge_environment_settings(prep.url,proxies,stream,verify,cert)# Send the request.send_kwargs={"timeout":timeout,"allow_redirects":allow_redirects,}send_kwargs.update(settings)resp=self.send(prep,**send_kwargs)returnresp [docs]defget(self,url,**kwargs):r"""Sends a GET request. Returns :class:`Response` object. :param url: URL for the new :class:`Request` object. :param \*\*kwargs: Optional arguments that ``request`` takes. :rtype: requests.Response """kwargs.setdefault("allow_redirects",True)returnself.request("GET",url,**kwargs) [docs]defoptions(self,url,**kwargs):r"""Sends a OPTIONS request. Returns :class:`Response` object. :param url: URL for the new :class:`Request` object. :param \*\*kwargs: Optional arguments that ``request`` takes. :rtype: requests.Response """kwargs.setdefault("allow_redirects",True)returnself.request("OPTIONS",url,**kwargs) [docs]defhead(self,url,**kwargs):r"""Sends a HEAD request. Returns :class:`Response` object. :param url: URL for the new :class:`Request` object. :param \*\*kwargs: Optional arguments that ``request`` takes. :rtype: requests.Response """kwargs.setdefault("allow_redirects",False)returnself.request("HEAD",url,**kwargs) [docs]defpost(self,url,data=None,json=None,**kwargs):r"""Sends a POST request. Returns :class:`Response` object. :param url: URL for the new :class:`Request` object. :param data: (optional) Dictionary, list of tuples, bytes, or file-like object to send in the body of the :class:`Request`. :param json: (optional) json to send in the body of the :class:`Request`. :param \*\*kwargs: Optional arguments that ``request`` takes. :rtype: requests.Response """returnself.request("POST",url,data=data,json=json,**kwargs) [docs]defput(self,url,data=None,**kwargs):r"""Sends a PUT request. Returns :class:`Response` object. :param url: URL for the new :class:`Request` object. :param data: (optional) Dictionary, list of tuples, bytes, or file-like object to send in the body of the :class:`Request`. :param \*\*kwargs: Optional arguments that ``request`` takes. :rtype: requests.Response """returnself.request("PUT",url,data=data,**kwargs) [docs]defpatch(self,url,data=None,**kwargs):r"""Sends a PATCH request. Returns :class:`Response` object. :param url: URL for the new :class:`Request` object. :param data: (optional) Dictionary, list of tuples, bytes, or file-like object to send in the body of the :class:`Request`. :param \*\*kwargs: Optional arguments that ``request`` takes. :rtype: requests.Response """returnself.request("PATCH",url,data=data,**kwargs) [docs]defdelete(self,url,**kwargs):r"""Sends a DELETE request. Returns :class:`Response` object. :param url: URL for the new :class:`Request` object. :param \*\*kwargs: Optional arguments that ``request`` takes. :rtype: requests.Response """returnself.request("DELETE",url,**kwargs) [docs]defsend(self,request,**kwargs):"""Send a given PreparedRequest. :rtype: requests.Response """# Set defaults that the hooks can utilize to ensure they always have# the correct parameters to reproduce the previous request.kwargs.setdefault("stream",self.stream)kwargs.setdefault("verify",self.verify)kwargs.setdefault("cert",self.cert)if"proxies"notinkwargs:kwargs["proxies"]=resolve_proxies(request,self.proxies,self.trust_env)# It's possible that users might accidentally send a Request object.# Guard against that specific failure case.ifisinstance(request,Request):raiseValueError("You can only send PreparedRequests.")# Set up variables needed for resolve_redirects and dispatching of hooksallow_redirects=kwargs.pop("allow_redirects",True)stream=kwargs.get("stream")hooks=request.hooks# Get the appropriate adapter to useadapter=self.get_adapter(url=request.url)# Start time (approximately) of the requeststart=preferred_clock()# Send the requestr=adapter.send(request,**kwargs)# Total elapsed time of the request (approximately)elapsed=preferred_clock()-startr.elapsed=timedelta(seconds=elapsed)# Response manipulation hooksr=dispatch_hook("response",hooks,r,**kwargs)# Persist cookiesifr.history:# If the hooks create history then we want those cookies tooforrespinr.history:extract_cookies_to_jar(self.cookies,resp.request,resp.raw)extract_cookies_to_jar(self.cookies,request,r.raw)# Resolve redirects if allowed.ifallow_redirects:# Redirect resolving generator.gen=self.resolve_redirects(r,request,**kwargs)history=[respforrespingen]else:history=[]# Shuffle things around if there's history.ifhistory:# Insert the first (original) request at the starthistory.insert(0,r)# Get the last request mader=history.pop()r.history=history# If redirects aren't being followed, store the response on the Request for Response.next().ifnotallow_redirects:try:r._next=next(self.resolve_redirects(r,request,yield_requests=True,**kwargs))exceptStopIteration:passifnotstream:r.contentreturnr [docs]defmerge_environment_settings(self,url,proxies,stream,verify,cert):""" Check the environment and merge it with some settings. :rtype: dict """# Gather clues from the surrounding environment.ifself.trust_env:# Set environment's proxies.no_proxy=proxies.get("no_proxy")ifproxiesisnotNoneelseNoneenv_proxies=get_environ_proxies(url,no_proxy=no_proxy)fork,vinenv_proxies.items():proxies.setdefault(k,v)# Look for requests environment configuration# and be compatible with cURL.ifverifyisTrueorverifyisNone:verify=(os.environ.get("REQUESTS_CA_BUNDLE")oros.environ.get("CURL_CA_BUNDLE")orverify)# Merge all the kwargs.proxies=merge_setting(proxies,self.proxies)stream=merge_setting(stream,self.stream)verify=merge_setting(verify,self.verify)cert=merge_setting(cert,self.cert)return{"proxies":proxies,"stream":stream,"verify":verify,"cert":cert} [docs]defget_adapter(self,url):""" Returns the appropriate connection adapter for the given URL. :rtype: requests.adapters.BaseAdapter """forprefix,adapterinself.adapters.items():ifurl.lower().startswith(prefix.lower()):returnadapter# Nothing matches :-/raiseInvalidSchema(f"No connection adapters were found for{url!r}") [docs]defclose(self):"""Closes all adapters and as such the session"""forvinself.adapters.values():v.close() [docs]defmount(self,prefix,adapter):"""Registers a connection adapter to a prefix. Adapters are sorted in descending order by prefix length. """self.adapters[prefix]=adapterkeys_to_move=[kforkinself.adaptersiflen(k)<len(prefix)]forkeyinkeys_to_move:self.adapters[key]=self.adapters.pop(key) def__getstate__(self):state={attr:getattr(self,attr,None)forattrinself.__attrs__}returnstatedef__setstate__(self,state):forattr,valueinstate.items():setattr(self,attr,value) defsession():""" Returns a :class:`Session` for context-management. .. deprecated:: 1.0.0 This method has been deprecated since version 1.0.0 and is only kept for backwards compatibility. New code should use :class:`~requests.sessions.Session` to create a session. This may be removed at a future date. :rtype: Session """returnSession()