Web

These are helpers for making various web requests.

Note that these helpers can be invoked directly from self.helpers, e.g.:

self.helpers.request("https://www.evilcorp.com")
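The `request` docstring on this page states that a failed request returns None unless `raise_error` is True. A minimal sketch of guarding against that, assuming a hypothetical stand-in (`FakeHelpers`) in place of the real `self.helpers` and a hypothetical wrapper `fetch_status`; neither name is part of BBOT, and the stub raises `ConnectionError` where BBOT would raise its own error class:

```python
import asyncio
from types import SimpleNamespace


class FakeHelpers:
    """Hypothetical stand-in for self.helpers, used only to make this sketch runnable."""

    async def request(self, url, raise_error=False, **kwargs):
        # Simulate a failed connection for any URL containing "unreachable".
        if "unreachable" in url:
            if raise_error:
                raise ConnectionError(f"could not connect to {url}")
            return None
        # Simulate a successful response exposing a status_code attribute.
        return SimpleNamespace(status_code=200)


async def fetch_status(helpers, url):
    """Return the status code, or None if the request failed."""
    response = await helpers.request(url)
    if response is None:
        # Failed requests yield None by default, so check before using the response.
        return None
    return response.status_code


if __name__ == "__main__":
    print(asyncio.run(fetch_status(FakeHelpers(), "https://www.evilcorp.com")))
```

Inside a module the same guard would apply to `await self.helpers.request(...)`; pass `raise_error=True` if an exception is preferable to a None check.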

WebHelper

Bases: EngineClient

Source code in bbot/core/helpers/web/web.py

class WebHelper(EngineClient):
    SERVER_CLASS = HTTPEngine
    ERROR_CLASS = WebError

    """
    Main utility class for managing HTTP operations in BBOT. It serves as a wrapper around the BBOTAsyncClient,
    which itself is a subclass of httpx.AsyncClient. The class provides functionalities to make HTTP requests,
    download files, and handle cached wordlists.

    Attributes:
        parent_helper (object): The parent helper object containing scan configurations.
        http_debug (bool): Flag to indicate whether HTTP debugging is enabled.
        ssl_verify (bool): Flag to indicate whether SSL verification is enabled.
        web_client (BBOTAsyncClient): An instance of BBOTAsyncClient for making HTTP requests.
        client_only_options (tuple): A tuple of options only applicable to the web client.

    Examples:
        Basic web request:
        >>> response = await self.helpers.request("https://www.evilcorp.com")

        Download file:
        >>> filename = await self.helpers.download("https://www.evilcorp.com/passwords.docx")

        Download wordlist (cached for 30 days by default):
        >>> filename = await self.helpers.wordlist("https://www.evilcorp.com/wordlist.txt")
    """

    def __init__(self, parent_helper):
        self.parent_helper = parent_helper
        self.preset = self.parent_helper.preset
        self.config = self.preset.config
        self.web_config = self.config.get("web", {})
        self.web_spider_depth = self.web_config.get("spider_depth", 1)
        self.web_spider_distance = self.web_config.get("spider_distance", 0)
        self.web_clients = {}
        self.target = self.preset.target
        self.ssl_verify = self.config.get("ssl_verify", False)
        engine_debug = self.config.get("engine", {}).get("debug", False)
        super().__init__(
            server_kwargs={"config": self.config, "target": self.parent_helper.preset.target},
            debug=engine_debug,
        )

    def AsyncClient(self, *args, **kwargs):
        # cache by retries to prevent unwanted accumulation of clients
        # (they are not garbage-collected)
        retries = kwargs.get("retries", 1)
        try:
            return self.web_clients[retries]
        except KeyError:
            from .client import BBOTAsyncClient

            client = BBOTAsyncClient.from_config(self.config, self.target, *args, persist_cookies=False, **kwargs)
            self.web_clients[client.retries] = client
            return client

    async def request(self, *args, **kwargs):
        """
        Asynchronous function for making HTTP requests, intended to be the most basic web request function
        used widely across BBOT and within this helper class. Handles various exceptions and timeouts
        that might occur during the request.

        This function automatically respects the scan's global timeout, proxy, headers, etc.
        Headers you specify will be merged with the scan's. Your arguments take ultimate precedence,
        meaning you can override the scan's values if you want.

        Args:
            url (str): The URL to send the request to.
            method (str, optional): The HTTP method to use for the request. Defaults to 'GET'.
            headers (dict, optional): Dictionary of HTTP headers to send with the request.
            params (dict, optional): Dictionary, list of tuples, or bytes to send in the query string.
            cookies (dict, optional): Dictionary or CookieJar object containing cookies.
            json (Any, optional): A JSON serializable Python object to send in the body.
            data (dict, optional): Dictionary, list of tuples, or bytes to send in the body.
            files (dict, optional): Dictionary of 'name': file-like-objects for multipart encoding upload.
            auth (tuple, optional): Auth tuple to enable Basic/Digest/Custom HTTP auth.
            timeout (float, optional): The maximum time to wait for the request to complete.
            proxy (str, optional): HTTP proxy URL.
            allow_redirects (bool, optional): Enables or disables redirection. Defaults to None.
            stream (bool, optional): Enables or disables response streaming.
            raise_error (bool, optional): Whether to raise exceptions for HTTP connect, timeout errors. Defaults to False.
            client (httpx.AsyncClient, optional): A specific httpx.AsyncClient to use for the request. Defaults to self.web_client.
            cache_for (int, optional): Time in seconds to cache the request. Not used currently. Defaults to None.

        Raises:
            httpx.TimeoutException: If the request times out.
            httpx.ConnectError: If the connection fails.
            httpx.RequestError: For other request-related errors.

        Returns:
            httpx.Response or None: The HTTP response object returned by the httpx library.

        Examples:
            >>> response = await self.helpers.request("https://www.evilcorp.com")

            >>> response = await self.helpers.request("https://api.evilcorp.com/", method="POST", data="stuff")

        Note:
            If the web request fails, it will return None unless `raise_error` is `True`.
        """
        raise_error = kwargs.get("raise_error", False)
        result = await self.run_and_return("request", *args, **kwargs)
        if isinstance(result, dict) and "_request_error" in result:
            if raise_error:
                error_msg = result["_request_error"]
                response = result["_response"]
                error = self.ERROR_CLASS(error_msg)
                error.response = response
                raise error
        return result

    async def request_batch(self, urls, *args, **kwargs):
        """
        Given a list of URLs, request them in parallel and yield responses as they come in.

        Args:
            urls (list[str]): List of URLs to visit
            *args: Positional arguments to pass through to httpx
            **kwargs: Keyword arguments to pass through to httpx

        Examples:
            >>> async for url, response in self.helpers.request_batch(urls, headers={"X-Test": "Test"}):
            >>>     if response is not None and response.status_code == 200:
            >>>         self.hugesuccess(response)
        """
        agen = self.run_and_yield("request_batch", urls, *args, **kwargs)
        while 1:
            try:
                yield await agen.__anext__()
            except (StopAsyncIteration, GeneratorExit):
                await agen.aclose()
                break

    async def request_custom_batch(self, urls_and_kwargs):
        """
        Make web requests in parallel with custom options for each request. Yield responses as they come in.

        Similar to `request_batch` except it allows individual arguments for each URL.

        Args:
            urls_and_kwargs (list[tuple]): List of tuples in the format: (url, kwargs, custom_tracker)
                where custom_tracker is an optional value for your own internal use. You may use it to
                help correlate requests, etc.

        Examples:
            >>> urls_and_kwargs = [
            >>>     ("http://evilcorp.com/1", {"method": "GET"}, "request-1"),
            >>>     ("http://evilcorp.com/2", {"method": "POST"}, "request-2"),
            >>> ]
            >>> async for url, kwargs, custom_tracker, response in self.helpers.request_custom_batch(
            >>>     urls_and_kwargs
            >>> ):
            >>>     if response is not None and response.status_code == 200:
            >>>         self.hugesuccess(response)
        """
        agen = self.run_and_yield("request_custom_batch", urls_and_kwargs)
        while 1:
            try:
                yield await agen.__anext__()
            except (StopAsyncIteration, GeneratorExit):
                await agen.aclose()
                break

    async def download(self, url, **kwargs):
        """
        Asynchronous function for downloading files from a given URL. Supports caching with an optional
        time period in hours via the "cache_hrs" keyword argument. In case of successful download,
        returns the full path of the saved filename. If the download fails, returns None.

        Args:
            url (str): The URL of the file to download.
            filename (str, optional): The filename to save the downloaded file as.
                If not provided, will generate based on URL.
            max_size (str or int): Maximum filesize as a string ("5MB") or integer in bytes.
            cache_hrs (float, optional): The number of hours to cache the downloaded file.
                A negative value disables caching. Defaults to -1.
            method (str, optional): The HTTP method to use for the request, defaults to 'GET'.
            raise_error (bool, optional): Whether to raise exceptions for HTTP connect, timeout errors. Defaults to False.
            **kwargs: Additional keyword arguments to pass to the httpx request.

        Returns:
            Path or None: The full path of the downloaded file as a Path object if successful, otherwise None.

        Examples:
            >>> filepath = await self.helpers.download("https://www.evilcorp.com/passwords.docx", cache_hrs=24)
        """
        success = False
        raise_error = kwargs.get("raise_error", False)
        filename = kwargs.pop("filename", self.parent_helper.cache_filename(url))
        filename = truncate_filename(Path(filename).resolve())
        kwargs["filename"] = filename
        max_size = kwargs.pop("max_size", None)
        if max_size is not None:
            max_size = self.parent_helper.human_to_bytes(max_size)
            kwargs["max_size"] = max_size
        cache_hrs = float(kwargs.pop("cache_hrs", -1))
        if cache_hrs > 0 and self.parent_helper.is_cached(url):
            log.debug(f"{url} is cached at {self.parent_helper.cache_filename(url)}")
            success = True
        else:
            result = await self.run_and_return("download", url, **kwargs)
            if isinstance(result, dict) and "_download_error" in result:
                if raise_error:
                    error_msg = result["_download_error"]
                    response = result["_response"]
                    error = self.ERROR_CLASS(error_msg)
                    error.response = response
                    raise error
            elif result:
                success = True

        if success:
            return filename

    async def wordlist(self, path, lines=None, zip=False, zip_filename=None, **kwargs):
        """
        Asynchronous function for retrieving wordlists, either from a local path or a URL.
        Allows for optional line-based truncation and caching. Returns the full path of the wordlist
        file or a truncated version of it.

        Args:
            path (str): The local or remote path of the wordlist.
            lines (int, optional): Number of lines to read from the wordlist.
                If specified, will return a truncated wordlist with this many lines.
            zip (bool, optional): Whether to unzip the file after downloading. Defaults to False.
            zip_filename (str, optional): The name of the file to extract from the ZIP archive.
                Required if zip is True.
            cache_hrs (float, optional): Number of hours to cache the downloaded wordlist.
                Defaults to 720 hours (30 days) for remote wordlists.
            **kwargs: Additional keyword arguments to pass to the 'download' function for remote wordlists.

        Returns:
            Path: The full path of the wordlist (or its truncated version) as a Path object.

        Raises:
            WordlistError: If the path is invalid or the wordlist could not be retrieved or found.

        Examples:
            Fetching full wordlist
            >>> wordlist_path = await self.helpers.wordlist("https://www.evilcorp.com/wordlist.txt")

            Fetching and truncating to the first 100 lines
            >>> wordlist_path = await self.helpers.wordlist("/root/rockyou.txt", lines=100)
        """
        import zipfile

        if not path:
            raise WordlistError(f"Invalid wordlist: {path}")
        if "cache_hrs" not in kwargs:
            kwargs["cache_hrs"] = 720
        if self.parent_helper.is_url(path):
            filename = await self.download(str(path), **kwargs)
            if filename is None:
                raise WordlistError(f"Unable to retrieve wordlist from {path}")
        else:
            filename = Path(path).resolve()
            if not filename.is_file():
                raise WordlistError(f"Unable to find wordlist at {path}")

        if zip:
            if not zip_filename:
                raise WordlistError("zip_filename must be specified when zip is True")
            try:
                with zipfile.ZipFile(filename, "r") as zip_ref:
                    if zip_filename not in zip_ref.namelist():
                        raise WordlistError(f"File {zip_filename} not found in the zip archive {filename}")
                    zip_ref.extract(zip_filename, filename.parent)
                    filename = filename.parent / zip_filename
            except Exception as e:
                raise WordlistError(f"Error unzipping file {filename}: {e}")

        if lines is None:
            return filename
        else:
            lines = int(lines)
            with open(filename) as f:
                read_lines = f.readlines()
            cache_key = f"{filename}:{lines}"
            truncated_filename = self.parent_helper.cache_filename(cache_key)
            with open(truncated_filename, "w") as f:
                for line in read_lines[:lines]:
                    f.write(line)
            return truncated_filename

    async def curl(self, *args, **kwargs):
        """
        An asynchronous function that runs a cURL command with specified arguments and options.

        This function constructs and executes a cURL command based on the provided parameters.
        It offers support for various cURL options such as headers, post data, and cookies.

        Args:
            *args: Variable length argument list for positional arguments. Unused in this function.
            url (str): The URL for the cURL request. Mandatory.
            raw_path (bool, optional): If True, activates '--path-as-is' in cURL. Defaults to False.
            headers (dict, optional): A dictionary of HTTP headers to include in the request.
            ignore_bbot_global_settings (bool, optional): If True, ignores the global settings of BBOT. Defaults to False.
            post_data (dict, optional): A dictionary containing data to be sent in the request body.
            method (str, optional): The HTTP method to use for the request (e.g., 'GET', 'POST').
            cookies (dict, optional): A dictionary of cookies to include in the request.
            path_override (str, optional): Overrides the request-target to use in the HTTP request line.
            head_mode (bool, optional): If True, includes '-I' to fetch headers only. Defaults to None.
            raw_body (str, optional): Raw string to be sent in the body of the request.
            **kwargs: Arbitrary keyword arguments that will be forwarded to the HTTP request function.

        Returns:
            str: The output of the cURL command.

        Raises:
            CurlError: If 'url' is not supplied.

        Examples:
            >>> output = await curl(url="https://example.com", headers={"X-Header": "Wat"})
            >>> print(output)
        """
        url = kwargs.get("url", "")

        if not url:
            raise CurlError("No URL supplied to CURL helper")

        curl_command = ["curl", url, "-s"]

        raw_path = kwargs.get("raw_path", False)
        if raw_path:
            curl_command.append("--path-as-is")

        # respect global ssl verify settings
        if self.ssl_verify is not True:
            curl_command.append("-k")

        headers = kwargs.get("headers", {})
        cookies = kwargs.get("cookies", {})

        ignore_bbot_global_settings = kwargs.get("ignore_bbot_global_settings", False)

        if ignore_bbot_global_settings:
            http_timeout = 20  # setting 20 as a worse-case setting
            log.debug("ignore_bbot_global_settings enabled. Global settings will not be applied")
        else:
            http_timeout = self.parent_helper.web_config.get("http_timeout", 20)
            user_agent = self.parent_helper.web_config.get("user_agent", "BBOT")

            if "User-Agent" not in headers:
                headers["User-Agent"] = user_agent

            # only add custom headers / cookies if the URL is in-scope
            if self.parent_helper.preset.in_scope(url):
                for hk, hv in self.web_config.get("http_headers", {}).items():
                    # Only add the header if it doesn't already exist in the headers dictionary
                    if hk not in headers:
                        headers[hk] = hv

                for ck, cv in self.web_config.get("http_cookies", {}).items():
                    # don't clobber cookies
                    if ck not in cookies:
                        cookies[ck] = cv

        # add the timeout
        if "timeout" not in kwargs:
            timeout = http_timeout

        curl_command.append("-m")
        curl_command.append(str(timeout))

        for k, v in headers.items():
            if isinstance(v, list):
                for x in v:
                    curl_command.append("-H")
                    curl_command.append(f"{k}: {x}")

            else:
                curl_command.append("-H")
                curl_command.append(f"{k}: {v}")

        post_data = kwargs.get("post_data", {})
        if len(post_data.items()) > 0:
            curl_command.append("-d")
            post_data_str = ""
            for k, v in post_data.items():
                post_data_str += f"&{k}={v}"
            curl_command.append(post_data_str.lstrip("&"))

        method = kwargs.get("method", "")
        if method:
            curl_command.append("-X")
            curl_command.append(method)

        cookies = kwargs.get("cookies", "")
        if cookies:
            curl_command.append("-b")
            cookies_str = ""
            for k, v in cookies.items():
                cookies_str += f"{k}={v}; "
            curl_command.append(f"{cookies_str.rstrip(' ')}")

        path_override = kwargs.get("path_override", None)
        if path_override:
            curl_command.append("--request-target")
            curl_command.append(f"{path_override}")

        head_mode = kwargs.get("head_mode", None)
        if head_mode:
            curl_command.append("-I")

        raw_body = kwargs.get("raw_body", None)
        if raw_body:
            curl_command.append("-d")
            curl_command.append(raw_body)
        log.verbose(f"Running curl command: {curl_command}")
        output = (await self.parent_helper.run(curl_command)).stdout
        return output

    def beautifulsoup(
        self,
        markup,
        features="html.parser",
        builder=None,
        parse_only=None,
        from_encoding=None,
        exclude_encodings=None,
        element_classes=None,
        **kwargs,
    ):
        """
        Naviate, Search, Modify, Parse, or PrettyPrint HTML Content.
        More information at https://beautiful-soup-4.readthedocs.io/en/latest/

        Args:
            markup: A string or a file-like object representing markup to be parsed.
            features: Desirable features of the parser to be used.
                This may be the name of a specific parser ("lxml",
                "lxml-xml", "html.parser", or "html5lib") or it may be
                the type of markup to be used ("html", "html5", "xml").
                Defaults to 'html.parser'.
            builder: A TreeBuilder subclass to instantiate (or instance to use)
                instead of looking one up based on `features`.
            parse_only: A SoupStrainer. Only parts of the document
                matching the SoupStrainer will be considered.
            from_encoding: A string indicating the encoding of the
                document to be parsed.
            exclude_encodings = A list of strings indicating
                encodings known to be wrong.
            element_classes = A dictionary mapping BeautifulSoup
                classes like Tag and NavigableString, to other classes you'd
                like to be instantiated instead as the parse tree is
                built.
            **kwargs = For backwards compatibility purposes.

        Returns:
            soup: An instance of the BeautifulSoup class

        Todo:
            - Write tests for this function

        Examples:
            >>> soup = self.helpers.beautifulsoup(event.data["body"], "html.parser")
            Perform an html parse of the 'markup' argument and return a soup instance

            >>> email_type = soup.find(type="email")
            Searches the soup instance for all occurrences of the passed in argument
        """
        try:
            soup = BeautifulSoup(
                markup, features, builder, parse_only, from_encoding, exclude_encodings, element_classes, **kwargs
            )
            return soup
        except Exception as e:
            log.debug(f"Error parsing beautifulsoup: {e}")
            return False

    def response_to_json(self, response):
        """
        Convert web response to JSON object, similar to the output of `httpx -irr -json`
        """

        if response is None:
            return

        import mmh3
        from datetime import datetime
        from hashlib import md5, sha256
        from bbot.core.helpers.misc import tagify, urlparse, split_host_port, smart_decode

        request = response.request
        url = str(request.url)
        parsed_url = urlparse(url)
        netloc = parsed_url.netloc
        scheme = parsed_url.scheme.lower()
        host, port = split_host_port(f"{scheme}://{netloc}")

        raw_headers = "\r\n".join([f"{k}: {v}" for k, v in response.headers.items()])
        raw_headers_encoded = raw_headers.encode()

        headers = {}
        for k, v in response.headers.items():
            k = tagify(k, delimiter="_")
            headers[k] = v

        j = {
            "timestamp": datetime.now().isoformat(),
            "hash": {
                "body_md5": md5(response.content).hexdigest(),
                "body_mmh3": mmh3.hash(response.content),
                "body_sha256": sha256(response.content).hexdigest(),
                # "body_simhash": "TODO",
                "header_md5": md5(raw_headers_encoded).hexdigest(),
                "header_mmh3": mmh3.hash(raw_headers_encoded),
                "header_sha256": sha256(raw_headers_encoded).hexdigest(),
                # "header_simhash": "TODO",
            },
            "header": headers,
            "body": smart_decode(response.content),
            "content_type": headers.get("content_type", "").split(";")[0].strip(),
            "url": url,
            "host": str(host),
            "port": port,
            "scheme": scheme,
            "method": response.request.method,
            "path": parsed_url.path,
            "raw_header": raw_headers,
            "status_code": response.status_code,
        }

        return j

ERROR_CLASS class-attribute instance-attribute

ERROR_CLASS = WebError

Main utility class for managing HTTP operations in BBOT. It serves as a wrapper around the BBOTAsyncClient, which itself is a subclass of httpx.AsyncClient. The class provides functionalities to make HTTP requests, download files, and handle cached wordlists.

Attributes:

  • parent_helper (object) –

    The parent helper object containing scan configurations.

  • http_debug (bool) –

    Flag to indicate whether HTTP debugging is enabled.

  • ssl_verify (bool) –

    Flag to indicate whether SSL verification is enabled.

  • web_client (BBOTAsyncClient) –

    An instance of BBOTAsyncClient for making HTTP requests.

  • client_only_options (tuple) –

    A tuple of options only applicable to the web client.

Examples:

Basic web request:

>>> response = await self.helpers.request("https://www.evilcorp.com")

Download file:

>>> filename = await self.helpers.download("https://www.evilcorp.com/passwords.docx")

Download wordlist (cached for 30 days by default):

>>> filename = await self.helpers.wordlist("https://www.evilcorp.com/wordlist.txt")

beautifulsoup

beautifulsoup(markup, features='html.parser', builder=None, parse_only=None, from_encoding=None, exclude_encodings=None, element_classes=None, **kwargs)

Navigate, Search, Modify, Parse, or PrettyPrint HTML Content. More information at https://beautiful-soup-4.readthedocs.io/en/latest/

Parameters:

  • markup

    A string or a file-like object representing markup to be parsed.

  • features

    Desirable features of the parser to be used. This may be the name of a specific parser ("lxml", "lxml-xml", "html.parser", or "html5lib") or it may be the type of markup to be used ("html", "html5", "xml"). Defaults to 'html.parser'.

  • builder

    A TreeBuilder subclass to instantiate (or instance to use) instead of looking one up based on `features`.

  • parse_only

    A SoupStrainer. Only parts of the document matching the SoupStrainer will be considered.

  • from_encoding

    A string indicating the encoding of the document to be parsed.

  • exclude_encodings

    A list of strings indicating encodings known to be wrong.

  • element_classes

    A dictionary mapping BeautifulSoup classes like Tag and NavigableString to other classes you'd like to be instantiated instead as the parse tree is built.

  • **kwargs

    For backwards compatibility purposes.

Returns:

  • soup

    An instance of the BeautifulSoup class

Todo
  • Write tests for this function

Examples:

>>> soup = self.helpers.beautifulsoup(event.data["body"], "html.parser")
Performs an HTML parse of the 'markup' argument and returns a soup instance (or False if parsing raises an exception).

>>> email_type = soup.find(type="email")
Searches the soup instance for the first element whose attributes match the given argument.

Source code in bbot/core/helpers/web/web.py

def beautifulsoup(
    self,
    markup,
    features="html.parser",
    builder=None,
    parse_only=None,
    from_encoding=None,
    exclude_encodings=None,
    element_classes=None,
    **kwargs,
):
    """
    Naviate, Search, Modify, Parse, or PrettyPrint HTML Content.
    More information at https://beautiful-soup-4.readthedocs.io/en/latest/

    Args:
        markup: A string or a file-like object representing markup to be parsed.
        features: Desirable features of the parser to be used.
            This may be the name of a specific parser ("lxml",
            "lxml-xml", "html.parser", or "html5lib") or it may be
            the type of markup to be used ("html", "html5", "xml").
            Defaults to 'html.parser'.
        builder: A TreeBuilder subclass to instantiate (or instance to use)
            instead of looking one up based on `features`.
        parse_only: A SoupStrainer. Only parts of the document
            matching the SoupStrainer will be considered.
        from_encoding: A string indicating the encoding of the
            document to be parsed.
        exclude_encodings = A list of strings indicating
            encodings known to be wrong.
        element_classes = A dictionary mapping BeautifulSoup
            classes like Tag and NavigableString, to other classes you'd
            like to be instantiated instead as the parse tree is
            built.
        **kwargs = For backwards compatibility purposes.

    Returns:
        soup: An instance of the BeautifulSoup class

    Todo:
        - Write tests for this function

    Examples:
        >>> soup = self.helpers.beautifulsoup(event.data["body"], "html.parser")
        Perform an html parse of the 'markup' argument and return a soup instance

        >>> email_type = soup.find(type="email")
        Searches the soup instance for all occurrences of the passed in argument
    """
    try:
        soup = BeautifulSoup(
            markup, features, builder, parse_only, from_encoding, exclude_encodings, element_classes, **kwargs
        )
        return soup
    except Exception as e:
        log.debug(f"Error parsing beautifulsoup: {e}")
        return False

curl async

curl(*args, **kwargs)

An asynchronous function that runs a cURL command with specified arguments and options.

This function constructs and executes a cURL command based on the provided parameters. It offers support for various cURL options such as headers, post data, and cookies.

Parameters:

  • *args

    Variable length argument list for positional arguments. Unused in this function.

  • url (str) –

    The URL for the cURL request. Mandatory.

  • raw_path (bool) –

    If True, activates '--path-as-is' in cURL. Defaults to False.

  • headers (dict) –

    A dictionary of HTTP headers to include in the request.

  • ignore_bbot_global_settings (bool) –

    If True, ignores the global settings of BBOT. Defaults to False.

  • post_data (dict) –

    A dictionary containing data to be sent in the request body.

  • method (str) –

    The HTTP method to use for the request (e.g., 'GET', 'POST').

  • cookies (dict) –

    A dictionary of cookies to include in the request.

  • path_override (str) –

    Overrides the request-target to use in the HTTP request line.

  • head_mode (bool) –

    If True, includes '-I' to fetch headers only. Defaults to None.

  • raw_body (str) –

    Raw string to be sent in the body of the request.

  • **kwargs

    Arbitrary keyword arguments that will be forwarded to the HTTP request function.

Returns:

  • str

    The output of the cURL command.

Raises:

  • CurlError

    If 'url' is not supplied.

Examples:

>>> output = await self.helpers.curl(url="https://example.com", headers={"X-Header": "Wat"})
>>> print(output)

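To make the parameter-to-flag mapping concrete, here is a simplified sketch of how an argument list for cURL could be assembled from headers, post data, a method, and cookies. The function name `build_curl_command` and its signature are hypothetical; it loosely follows the order of the source listing on this page but omits scope checks, global BBOT settings, `raw_path`, `path_override`, `head_mode`, and `raw_body`, and it joins cookies without the trailing semicolon the listing produces.

```python
def build_curl_command(url, headers=None, post_data=None, cookies=None,
                       method="", timeout=20, ssl_verify=False):
    """Hypothetical helper: return a cURL argument list for the given options."""
    if not url:
        raise ValueError("No URL supplied")

    cmd = ["curl", url, "-s"]  # -s: silent mode

    # Skip certificate verification unless it is explicitly enabled.
    if ssl_verify is not True:
        cmd.append("-k")

    # -m: maximum time in seconds for the whole operation.
    cmd += ["-m", str(timeout)]

    # A header value may be a single string or a list of strings.
    for name, value in (headers or {}).items():
        values = value if isinstance(value, list) else [value]
        for v in values:
            cmd += ["-H", f"{name}: {v}"]

    # Form-style body: key=value pairs joined with "&".
    if post_data:
        cmd += ["-d", "&".join(f"{k}={v}" for k, v in post_data.items())]

    if method:
        cmd += ["-X", method]

    # Cookie string: name=value pairs joined with "; ".
    if cookies:
        cmd += ["-b", "; ".join(f"{k}={v}" for k, v in cookies.items())]

    return cmd


if __name__ == "__main__":
    print(build_curl_command("https://example.com", headers={"X-Header": "Wat"}))
```

Building the command as a list rather than a single string means each value reaches cURL as one argument, so no shell quoting is needed.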
Source code in bbot/core/helpers/web/web.py

async def curl(self, *args, **kwargs):
    """
    An asynchronous function that runs a cURL command with specified arguments and options.

    This function constructs and executes a cURL command based on the provided parameters.
    It offers support for various cURL options such as headers, post data, and cookies.

    Args:
        *args: Variable length argument list for positional arguments. Unused in this function.
        url (str): The URL for the cURL request. Mandatory.
        raw_path (bool, optional): If True, activates '--path-as-is' in cURL. Defaults to False.
        headers (dict, optional): A dictionary of HTTP headers to include in the request.
        ignore_bbot_global_settings (bool, optional): If True, ignores the global settings of BBOT. Defaults to False.
        post_data (dict, optional): A dictionary containing data to be sent in the request body.
        method (str, optional): The HTTP method to use for the request (e.g., 'GET', 'POST').
        cookies (dict, optional): A dictionary of cookies to include in the request.
        path_override (str, optional): Overrides the request-target to use in the HTTP request line.
        head_mode (bool, optional): If True, includes '-I' to fetch headers only. Defaults to None.
        raw_body (str, optional): Raw string to be sent in the body of the request.
        **kwargs: Arbitrary keyword arguments that will be forwarded to the HTTP request function.

    Returns:
        str: The output of the cURL command.

    Raises:
        CurlError: If 'url' is not supplied.

    Examples:
        >>> output = await curl(url="https://example.com", headers={"X-Header": "Wat"})
        >>> print(output)
    """
    url = kwargs.get("url", "")

    if not url:
        raise CurlError("No URL supplied to CURL helper")

    curl_command = ["curl", url, "-s"]

    raw_path = kwargs.get("raw_path", False)
    if raw_path:
        curl_command.append("--path-as-is")

    # respect global ssl verify settings
    if self.ssl_verify is not True:
        curl_command.append("-k")

    headers = kwargs.get("headers", {})
    cookies = kwargs.get("cookies", {})

    ignore_bbot_global_settings = kwargs.get("ignore_bbot_global_settings", False)

    if ignore_bbot_global_settings:
        http_timeout = 20  # setting 20 as a worse-case setting
        log.debug("ignore_bbot_global_settings enabled. Global settings will not be applied")
    else:
        http_timeout = self.parent_helper.web_config.get("http_timeout", 20)
        user_agent = self.parent_helper.web_config.get("user_agent", "BBOT")

        if "User-Agent" not in headers:
            headers["User-Agent"] = user_agent

        # only add custom headers / cookies if the URL is in-scope
        if self.parent_helper.preset.in_scope(url):
            for hk, hv in self.web_config.get("http_headers", {}).items():
                # Only add the header if it doesn't already exist in the headers dictionary
                if hk not in headers:
                    headers[hk] = hv

            for ck, cv in self.web_config.get("http_cookies", {}).items():
                # don't clobber cookies
                if ck not in cookies:
                    cookies[ck] = cv

    # add the timeout
    if "timeout" not in kwargs:
        timeout = http_timeout

    curl_command.append("-m")
    curl_command.append(str(timeout))

    for k, v in headers.items():
        if isinstance(v, list):
            for x in v:
                curl_command.append("-H")
                curl_command.append(f"{k}: {x}")

        else:
            curl_command.append("-H")
            curl_command.append(f"{k}: {v}")

    post_data = kwargs.get("post_data", {})
    if len(post_data.items()) > 0:
        curl_command.append("-d")
        post_data_str = ""
        for k, v in post_data.items():
            post_data_str += f"&{k}={v}"
        curl_command.append(post_data_str.lstrip("&"))

    method = kwargs.get("method", "")
    if method:
        curl_command.append("-X")
        curl_command.append(method)

    cookies = kwargs.get("cookies", "")
    if cookies:
        curl_command.append("-b")
        cookies_str = ""
        for k, v in cookies.items():
            cookies_str += f"{k}={v}; "
        curl_command.append(f"{cookies_str.rstrip(' ')}")

    path_override = kwargs.get("path_override", None)
    if path_override:
        curl_command.append("--request-target")
        curl_command.append(f"{path_override}")

    head_mode = kwargs.get("head_mode", None)
    if head_mode:
        curl_command.append("-I")

    raw_body = kwargs.get("raw_body", None)
    if raw_body:
        curl_command.append("-d")
        curl_command.append(raw_body)
    log.verbose(f"Running curl command: {curl_command}")
    output = (await self.parent_helper.run(curl_command)).stdout
    return output

download async

download(url,**kwargs)

Asynchronous function for downloading files from a given URL. Supports caching with an optional time period in hours via the "cache_hrs" keyword argument. On a successful download, returns the full path of the saved file. If the download fails, returns None.

Parameters:

  • url (str) –

    The URL of the file to download.

  • filename (str) –

    The filename to save the downloaded file as. If not provided, one is generated from the URL.

  • max_size (str or int) –

    Maximum filesize as a string ("5MB") or integer in bytes.

  • cache_hrs (float) –

    The number of hours to cache the downloaded file. A negative value disables caching. Defaults to -1.

  • method (str) –

    The HTTP method to use for the request, defaults to 'GET'.

  • raise_error (bool) –

    Whether to raise an exception on HTTP connection or timeout errors. Defaults to False.

  • **kwargs

    Additional keyword arguments to pass to the httpx request.

Returns:

  • Path or None: The full path of the downloaded file as a Path object if successful, otherwise None.

Examples:

>>> filepath = await self.helpers.download("https://www.evilcorp.com/passwords.docx", cache_hrs=24)
Source code in bbot/core/helpers/web/web.py

async def download(self, url, **kwargs):
    """
    Asynchronous function for downloading files from a given URL. Supports caching with an optional
    time period in hours via the "cache_hrs" keyword argument. In case of successful download,
    returns the full path of the saved filename. If the download fails, returns None.

    Args:
        url (str): The URL of the file to download.
        filename (str, optional): The filename to save the downloaded file as.
            If not provided, will generate based on URL.
        max_size (str or int): Maximum filesize as a string ("5MB") or integer in bytes.
        cache_hrs (float, optional): The number of hours to cache the downloaded file.
            A negative value disables caching. Defaults to -1.
        method (str, optional): The HTTP method to use for the request, defaults to 'GET'.
        raise_error (bool, optional): Whether to raise exceptions for HTTP connect, timeout errors. Defaults to False.
        **kwargs: Additional keyword arguments to pass to the httpx request.

    Returns:
        Path or None: The full path of the downloaded file as a Path object if successful, otherwise None.

    Examples:
        >>> filepath = await self.helpers.download("https://www.evilcorp.com/passwords.docx", cache_hrs=24)
    """
    success = False

    raise_error = kwargs.get("raise_error", False)
    filename = kwargs.pop("filename", self.parent_helper.cache_filename(url))
    filename = truncate_filename(Path(filename).resolve())
    kwargs["filename"] = filename
    max_size = kwargs.pop("max_size", None)
    if max_size is not None:
        max_size = self.parent_helper.human_to_bytes(max_size)
        kwargs["max_size"] = max_size
    cache_hrs = float(kwargs.pop("cache_hrs", -1))
    if cache_hrs > 0 and self.parent_helper.is_cached(url):
        log.debug(f"{url} is cached at {self.parent_helper.cache_filename(url)}")
        success = True
    else:
        result = await self.run_and_return("download", url, **kwargs)
        if isinstance(result, dict) and "_download_error" in result:
            if raise_error:
                error_msg = result["_download_error"]
                response = result["_response"]
                error = self.ERROR_CLASS(error_msg)
                error.response = response
                raise error
        elif result:
            success = True

    if success:
        return filename
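The cache decision in download (reuse a cached copy only when cache_hrs is positive and a cached file exists) can be illustrated with a small standalone sketch. The is_fresh function and its age-based rule are assumptions made for illustration only; BBOT's own is_cached helper is not shown in this section and may apply a different freshness rule.

```python
import os
import tempfile
import time
from pathlib import Path


def is_fresh(path: Path, cache_hrs: float) -> bool:
    """Hypothetical stand-in for a cache check (not BBOT's is_cached)."""
    # A non-positive cache_hrs disables caching entirely
    if cache_hrs <= 0:
        return False
    # Nothing to reuse if the file was never downloaded
    if not path.is_file():
        return False
    age_hrs = (time.time() - path.stat().st_mtime) / 3600
    return age_hrs < cache_hrs


with tempfile.TemporaryDirectory() as tmp:
    cached = Path(tmp) / "passwords.docx"
    cached.write_bytes(b"example")

    fresh_now = is_fresh(cached, cache_hrs=24)   # just written, within 24 hours
    disabled = is_fresh(cached, cache_hrs=-1)    # caching switched off

    # Backdate the file by 48 hours to simulate a stale cache entry
    two_days_ago = time.time() - 48 * 3600
    os.utime(cached, (two_days_ago, two_days_ago))
    stale = is_fresh(cached, cache_hrs=24)
```

In this sketch a stale or missing file means the caller would fall through to a real download, which mirrors the else branch of the listing above.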

request async

request(*args,**kwargs)

Asynchronous function for making HTTP requests, intended to be the most basic web request function used widely across BBOT and within this helper class. Handles various exceptions and timeouts that might occur during the request.

This function automatically respects the scan's global timeout, proxy, headers, etc. Headers you specify will be merged with the scan's. Your arguments take ultimate precedence, meaning you can override the scan's values if you want.

Parameters:

  • url (str) –

    The URL to send the request to.

  • method (str) –

    The HTTP method to use for the request. Defaults to 'GET'.

  • headers (dict) –

    Dictionary of HTTP headers to send with the request.

  • params (dict) –

    Dictionary, list of tuples, or bytes to send in the query string.

  • cookies (dict) –

    Dictionary or CookieJar object containing cookies.

  • json (Any) –

    A JSON serializable Python object to send in the body.

  • data (dict) –

    Dictionary, list of tuples, or bytes to send in the body.

  • files (dict) –

    Dictionary of 'name': file-like-objects for multipart encoding upload.

  • auth (tuple) –

    Auth tuple to enable Basic/Digest/Custom HTTP auth.

  • timeout (float) –

    The maximum time to wait for the request to complete.

  • proxy (str) –

    HTTP proxy URL.

  • allow_redirects (bool) –

    Enables or disables redirection. Defaults to None.

  • stream (bool) –

    Enables or disables response streaming.

  • raise_error (bool) –

    Whether to raise an exception on HTTP connection or timeout errors. Defaults to False.

  • client (AsyncClient) –

    A specific httpx.AsyncClient to use for the request. Defaults to self.web_client.

  • cache_for (int) –

    Time in seconds to cache the request. Not used currently. Defaults to None.

Raises:

  • TimeoutException

    If the request times out.

  • ConnectError

    If the connection fails.

  • RequestError

    For other request-related errors.

Returns:

  • httpx.Response or None: The HTTP response object returned by the httpx library.

Examples:

>>> response = await self.helpers.request("https://www.evilcorp.com")
>>> response = await self.helpers.request("https://api.evilcorp.com/", method="POST", data="stuff")

Note

If the web request fails, it will return None unless raise_error is True.

Source code in bbot/core/helpers/web/web.py

async def request(self, *args, **kwargs):
    """
    Asynchronous function for making HTTP requests, intended to be the most basic web request function
    used widely across BBOT and within this helper class. Handles various exceptions and timeouts
    that might occur during the request.

    This function automatically respects the scan's global timeout, proxy, headers, etc.
    Headers you specify will be merged with the scan's. Your arguments take ultimate precedence,
    meaning you can override the scan's values if you want.

    Args:
        url (str): The URL to send the request to.
        method (str, optional): The HTTP method to use for the request. Defaults to 'GET'.
        headers (dict, optional): Dictionary of HTTP headers to send with the request.
        params (dict, optional): Dictionary, list of tuples, or bytes to send in the query string.
        cookies (dict, optional): Dictionary or CookieJar object containing cookies.
        json (Any, optional): A JSON serializable Python object to send in the body.
        data (dict, optional): Dictionary, list of tuples, or bytes to send in the body.
        files (dict, optional): Dictionary of 'name': file-like-objects for multipart encoding upload.
        auth (tuple, optional): Auth tuple to enable Basic/Digest/Custom HTTP auth.
        timeout (float, optional): The maximum time to wait for the request to complete.
        proxy (str, optional): HTTP proxy URL.
        allow_redirects (bool, optional): Enables or disables redirection. Defaults to None.
        stream (bool, optional): Enables or disables response streaming.
        raise_error (bool, optional): Whether to raise exceptions for HTTP connect, timeout errors. Defaults to False.
        client (httpx.AsyncClient, optional): A specific httpx.AsyncClient to use for the request. Defaults to self.web_client.
        cache_for (int, optional): Time in seconds to cache the request. Not used currently. Defaults to None.

    Raises:
        httpx.TimeoutException: If the request times out.
        httpx.ConnectError: If the connection fails.
        httpx.RequestError: For other request-related errors.

    Returns:
        httpx.Response or None: The HTTP response object returned by the httpx library.

    Examples:
        >>> response = await self.helpers.request("https://www.evilcorp.com")
        >>> response = await self.helpers.request("https://api.evilcorp.com/", method="POST", data="stuff")

    Note:
        If the web request fails, it will return None unless `raise_error` is `True`.
    """
    raise_error = kwargs.get("raise_error", False)
    result = await self.run_and_return("request", *args, **kwargs)
    if isinstance(result, dict) and "_request_error" in result:
        if raise_error:
            error_msg = result["_request_error"]
            response = result["_response"]
            error = self.ERROR_CLASS(error_msg)
            error.response = response
            raise error
    return result
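The precedence rule described for request (scan-wide headers are merged with yours, and your values win) can be sketched with plain dictionaries. This is only an illustration of the rule: merge_headers and the sample header values are hypothetical, and the real merge happens inside BBOT's HTTP engine, which is not shown here.

```python
def merge_headers(scan_headers: dict, request_headers: dict) -> dict:
    """Merge two header dicts; values in request_headers win on conflict."""
    merged = dict(scan_headers)      # start from the scan-wide defaults
    merged.update(request_headers)   # per-request values override them
    return merged


# Hypothetical scan-wide defaults
scan_headers = {"User-Agent": "BBOT", "X-Scan-Id": "1234"}
# Hypothetical per-request headers supplied by a module
my_headers = {"User-Agent": "custom-agent", "X-Test": "Test"}

merged = merge_headers(scan_headers, my_headers)
print(merged)
```

Note that this sketch compares header names case-sensitively, whereas HTTP header names are case-insensitive, so a real implementation would need to normalize them first.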

request_batch async

request_batch(urls,*args,**kwargs)

Given a list of URLs, request them in parallel and yield responses as they come in.

Parameters:

  • urls (list[str]) –

    List of URLs to visit

  • *args

    Positional arguments to pass through to httpx

  • **kwargs

    Keyword arguments to pass through to httpx

Examples:

>>> async for url, response in self.helpers.request_batch(urls, headers={"X-Test": "Test"}):
>>>     if response is not None and response.status_code == 200:
>>>         self.hugesuccess(response)

Source code in bbot/core/helpers/web/web.py

async def request_batch(self, urls, *args, **kwargs):
    """
    Given a list of URLs, request them in parallel and yield responses as they come in.

    Args:
        urls (list[str]): List of URLs to visit
        *args: Positional arguments to pass through to httpx
        **kwargs: Keyword arguments to pass through to httpx

    Examples:
        >>> async for url, response in self.helpers.request_batch(urls, headers={"X-Test": "Test"}):
        >>>     if response is not None and response.status_code == 200:
        >>>         self.hugesuccess(response)
    """
    agen = self.run_and_yield("request_batch", urls, *args, **kwargs)
    while 1:
        try:
            yield await agen.__anext__()
        except (StopAsyncIteration, GeneratorExit):
            await agen.aclose()
            break
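The "request in parallel and yield responses as they come in" behavior can be sketched with the standard library alone. The names fake_fetch and batch, the URLs, and the artificial delays are all assumptions for illustration; this sketch uses asyncio.as_completed and is not how BBOT's engine implements request_batch.

```python
import asyncio


async def fake_fetch(url: str, delay: float):
    """Hypothetical stand-in for an HTTP request; sleeps instead of using the network."""
    await asyncio.sleep(delay)
    return url, f"response for {url}"


async def batch(urls_and_delays):
    """Start every fetch at once and yield (url, response) pairs in completion order."""
    tasks = [asyncio.create_task(fake_fetch(url, delay)) for url, delay in urls_and_delays]
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def collect():
    order = []
    # The slow URL is listed first, but the fast one is yielded first
    jobs = [("http://slow.example", 0.2), ("http://fast.example", 0.01)]
    async for url, response in batch(jobs):
        order.append(url)
    return order


order = asyncio.run(collect())
print(order)
```

Because results arrive in completion order rather than input order, each yielded item carries its URL, which is also why the real helper yields (url, response) pairs.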

request_custom_batch async

request_custom_batch(urls_and_kwargs)

Make web requests in parallel with custom options for each request. Yield responses as they come in.

Similar to request_batch except it allows individual arguments for each URL.

Parameters:

  • urls_and_kwargs (list[tuple]) –

    List of tuples in the format: (url, kwargs, custom_tracker), where custom_tracker is an optional value for your own internal use. You may use it to help correlate requests, etc.

Examples:

>>> urls_and_kwargs = [
>>>     ("http://evilcorp.com/1", {"method": "GET"}, "request-1"),
>>>     ("http://evilcorp.com/2", {"method": "POST"}, "request-2"),
>>> ]
>>> async for url, kwargs, custom_tracker, response in self.helpers.request_custom_batch(
>>>     urls_and_kwargs
>>> ):
>>>     if response is not None and response.status_code == 200:
>>>         self.hugesuccess(response)

Source code in bbot/core/helpers/web/web.py

async def request_custom_batch(self, urls_and_kwargs):
    """
    Make web requests in parallel with custom options for each request. Yield responses as they come in.

    Similar to `request_batch` except it allows individual arguments for each URL.

    Args:
        urls_and_kwargs (list[tuple]): List of tuples in the format: (url, kwargs, custom_tracker)
            where custom_tracker is an optional value for your own internal use. You may use it to
            help correlate requests, etc.

    Examples:
        >>> urls_and_kwargs = [
        >>>     ("http://evilcorp.com/1", {"method": "GET"}, "request-1"),
        >>>     ("http://evilcorp.com/2", {"method": "POST"}, "request-2"),
        >>> ]
        >>> async for url, kwargs, custom_tracker, response in self.helpers.request_custom_batch(
        >>>     urls_and_kwargs
        >>> ):
        >>>     if response is not None and response.status_code == 200:
        >>>         self.hugesuccess(response)
    """
    agen = self.run_and_yield("request_custom_batch", urls_and_kwargs)
    while 1:
        try:
            yield await agen.__anext__()
        except (StopAsyncIteration, GeneratorExit):
            await agen.aclose()
            break
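One way to use the custom_tracker slot is to key results back to whatever produced each request. The sketch below assumes a stand-in generator, fake_custom_batch, that yields the same four-element tuples as the documented helper but returns plain dicts instead of httpx responses; the tracker names and paths are hypothetical.

```python
import asyncio


async def fake_custom_batch(urls_and_kwargs):
    """Hypothetical stand-in: yields (url, kwargs, custom_tracker, response) per input tuple."""
    for url, kwargs, tracker in urls_and_kwargs:
        await asyncio.sleep(0)  # pretend to do network I/O
        # A dict plays the role of the response object in this sketch
        yield url, kwargs, tracker, {"status_code": 200, "method": kwargs.get("method", "GET")}


async def run():
    # Map our own identifiers to the paths we want to probe
    probes = {"probe-1": "/1", "probe-2": "/2"}
    urls_and_kwargs = [
        (f"http://evilcorp.com{path}", {"method": "GET"}, tracker)
        for tracker, path in probes.items()
    ]
    results = {}
    async for url, kwargs, tracker, response in fake_custom_batch(urls_and_kwargs):
        # The tracker lets us file each response under the probe that caused it
        results[tracker] = (url, response["status_code"])
    return results


results = asyncio.run(run())
print(results)
```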

response_to_json

response_to_json(response)

Convert web response to JSON object, similar to the output of httpx -irr -json

Source code in bbot/core/helpers/web/web.py

def response_to_json(self, response):
    """
    Convert web response to JSON object, similar to the output of `httpx -irr -json`
    """

    if response is None:
        return

    import mmh3
    from datetime import datetime
    from hashlib import md5, sha256
    from bbot.core.helpers.misc import tagify, urlparse, split_host_port, smart_decode

    request = response.request
    url = str(request.url)
    parsed_url = urlparse(url)
    netloc = parsed_url.netloc
    scheme = parsed_url.scheme.lower()
    host, port = split_host_port(f"{scheme}://{netloc}")

    raw_headers = "\r\n".join([f"{k}: {v}" for k, v in response.headers.items()])
    raw_headers_encoded = raw_headers.encode()

    headers = {}
    for k, v in response.headers.items():
        k = tagify(k, delimiter="_")
        headers[k] = v

    j = {
        "timestamp": datetime.now().isoformat(),
        "hash": {
            "body_md5": md5(response.content).hexdigest(),
            "body_mmh3": mmh3.hash(response.content),
            "body_sha256": sha256(response.content).hexdigest(),
            # "body_simhash": "TODO",
            "header_md5": md5(raw_headers_encoded).hexdigest(),
            "header_mmh3": mmh3.hash(raw_headers_encoded),
            "header_sha256": sha256(raw_headers_encoded).hexdigest(),
            # "header_simhash": "TODO",
        },
        "header": headers,
        "body": smart_decode(response.content),
        "content_type": headers.get("content_type", "").split(";")[0].strip(),
        "url": url,
        "host": str(host),
        "port": port,
        "scheme": scheme,
        "method": response.request.method,
        "path": parsed_url.path,
        "raw_header": raw_headers,
        "status_code": response.status_code,
    }

    return j
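The hashing and header-normalization steps in the listing above can be tried out with the standard library only. The sketch below is a simplified approximation: summarize is a hypothetical name, the mmh3 hashes are left out because mmh3 is a third-party package, and BBOT's tagify is replaced with a lowercase-and-underscore rule that is assumed to behave similarly for ordinary header names.

```python
from hashlib import md5, sha256


def summarize(body: bytes, headers: dict) -> dict:
    """Build a reduced version of the JSON summary from a raw body and header dict."""
    raw_headers = "\r\n".join(f"{k}: {v}" for k, v in headers.items())
    raw_headers_encoded = raw_headers.encode()

    # Approximation of tagify(k, delimiter="_"): lowercase, dashes to underscores
    normalized = {k.lower().replace("-", "_"): v for k, v in headers.items()}

    return {
        "hash": {
            "body_md5": md5(body).hexdigest(),
            "body_sha256": sha256(body).hexdigest(),
            "header_md5": md5(raw_headers_encoded).hexdigest(),
            "header_sha256": sha256(raw_headers_encoded).hexdigest(),
        },
        "header": normalized,
        # Keep only the media type, dropping parameters such as charset
        "content_type": normalized.get("content_type", "").split(";")[0].strip(),
        "raw_header": raw_headers,
    }


summary = summarize(
    b"<html></html>",
    {"Content-Type": "text/html; charset=utf-8", "Server": "nginx"},
)
print(summary["content_type"])
```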

wordlist async

wordlist(path,lines=None,zip=False,zip_filename=None,**kwargs)

Asynchronous function for retrieving wordlists, either from a local path or a URL. Allows for optional line-based truncation and caching. Returns the full path of the wordlist file or a truncated version of it.

Parameters:

  • path (str) –

    The local or remote path of the wordlist.

  • lines (int, default: None) –

    Number of lines to read from the wordlist. If specified, will return a truncated wordlist with this many lines.

  • zip (bool, default: False) –

    Whether to unzip the file after downloading. Defaults to False.

  • zip_filename (str, default: None) –

    The name of the file to extract from the ZIP archive. Required if zip is True.

  • cache_hrs (float) –

    Number of hours to cache the downloaded wordlist. Defaults to 720 hours (30 days) for remote wordlists.

  • **kwargs

    Additional keyword arguments to pass to the 'download' function for remote wordlists.

Returns:

  • Path

    The full path of the wordlist (or its truncated version) as a Path object.

Raises:

  • WordlistError

    If the path is invalid or the wordlist could not be retrieved or found.

Examples:

Fetching full wordlist

>>> wordlist_path = await self.helpers.wordlist("https://www.evilcorp.com/wordlist.txt")

Fetching and truncating to the first 100 lines

>>> wordlist_path = await self.helpers.wordlist("/root/rockyou.txt", lines=100)
Source code in bbot/core/helpers/web/web.py

async def wordlist(self, path, lines=None, zip=False, zip_filename=None, **kwargs):
    """
    Asynchronous function for retrieving wordlists, either from a local path or a URL.
    Allows for optional line-based truncation and caching. Returns the full path of the wordlist
    file or a truncated version of it.

    Args:
        path (str): The local or remote path of the wordlist.
        lines (int, optional): Number of lines to read from the wordlist.
            If specified, will return a truncated wordlist with this many lines.
        zip (bool, optional): Whether to unzip the file after downloading. Defaults to False.
        zip_filename (str, optional): The name of the file to extract from the ZIP archive.
            Required if zip is True.
        cache_hrs (float, optional): Number of hours to cache the downloaded wordlist.
            Defaults to 720 hours (30 days) for remote wordlists.
        **kwargs: Additional keyword arguments to pass to the 'download' function for remote wordlists.

    Returns:
        Path: The full path of the wordlist (or its truncated version) as a Path object.

    Raises:
        WordlistError: If the path is invalid or the wordlist could not be retrieved or found.

    Examples:
        Fetching full wordlist
        >>> wordlist_path = await self.helpers.wordlist("https://www.evilcorp.com/wordlist.txt")

        Fetching and truncating to the first 100 lines
        >>> wordlist_path = await self.helpers.wordlist("/root/rockyou.txt", lines=100)
    """
    import zipfile

    if not path:
        raise WordlistError(f"Invalid wordlist: {path}")
    if "cache_hrs" not in kwargs:
        kwargs["cache_hrs"] = 720
    if self.parent_helper.is_url(path):
        filename = await self.download(str(path), **kwargs)
        if filename is None:
            raise WordlistError(f"Unable to retrieve wordlist from {path}")
    else:
        filename = Path(path).resolve()
        if not filename.is_file():
            raise WordlistError(f"Unable to find wordlist at {path}")

    if zip:
        if not zip_filename:
            raise WordlistError("zip_filename must be specified when zip is True")
        try:
            with zipfile.ZipFile(filename, "r") as zip_ref:
                if zip_filename not in zip_ref.namelist():
                    raise WordlistError(f"File {zip_filename} not found in the zip archive {filename}")
                zip_ref.extract(zip_filename, filename.parent)
                filename = filename.parent / zip_filename
        except Exception as e:
            raise WordlistError(f"Error unzipping file {filename}: {e}")

    if lines is None:
        return filename
    else:
        lines = int(lines)
        with open(filename) as f:
            read_lines = f.readlines()
        cache_key = f"{filename}:{lines}"
        truncated_filename = self.parent_helper.cache_filename(cache_key)
        with open(truncated_filename, "w") as f:
            for line in read_lines[:lines]:
                f.write(line)
        return truncated_filename
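The line-based truncation that wordlist performs when lines is given can be reproduced on a temporary file. In this sketch, truncate_wordlist and the file names are hypothetical, and the destination path is passed in explicitly rather than derived from a cache key as in the listing above.

```python
import tempfile
from pathlib import Path


def truncate_wordlist(source: Path, lines: int, dest: Path) -> Path:
    """Copy the first `lines` lines of `source` into `dest` and return `dest`."""
    with open(source) as f:
        read_lines = f.readlines()
    with open(dest, "w") as f:
        # Slicing past the end is safe: short wordlists are copied whole
        f.writelines(read_lines[:lines])
    return dest


with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "wordlist.txt"
    source.write_text("admin\nlogin\ntest\ndev\n")

    truncated = truncate_wordlist(source, 2, Path(tmp) / "wordlist_2.txt")
    first_two = truncated.read_text()
    print(first_two)
```

Like the original, this reads the whole file into memory before slicing; for very large wordlists, itertools.islice over the open file would avoid that.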
