|
| 1 | +importjson |
| 2 | +importrequests |
| 3 | +importtime |
| 4 | +fromtypingimportDict,Optional,Tuple,List |
| 5 | + |
| 6 | +fromservices.base.cache_serviceimportCacheService |
| 7 | +fromconstantsimportGITHUB_API_URL |
| 8 | +fromproxy_utilsimportget_proxy_config |
| 9 | + |
| 10 | + |
| 11 | +classGitHubService(CacheService): |
| 12 | +"""Service for fetching and caching GitHub exploit data.""" |
| 13 | + |
| 14 | +def__init__(self,logger=None): |
| 15 | +"""Initialize the GitHubService.""" |
| 16 | +super().__init__("github_exploits",logger=logger) |
| 17 | +self.api_url=GITHUB_API_URL |
| 18 | + |
| 19 | +defmake_request( |
| 20 | +self,url:str,max_retries:int=3,timeout:int=60,**kwargs |
| 21 | + )->Optional[requests.Response]: |
| 22 | +"""Make HTTP request with retries and timeout""" |
| 23 | +forattemptinrange(max_retries): |
| 24 | +try: |
| 25 | +response=requests.get(url,timeout=timeout,**kwargs) |
| 26 | +response.raise_for_status() |
| 27 | +returnresponse |
| 28 | +except (requests.RequestException,requests.Timeout)ase: |
| 29 | +ifattempt==max_retries-1: |
| 30 | +self.logger.error( |
| 31 | +f"[GitHubService] Request failed after{max_retries} attempts:{str(e)}" |
| 32 | + ) |
| 33 | +returnNone |
| 34 | +self.logger.warning( |
| 35 | +f"[GitHubService] Request failed (attempt{attempt+1}/{max_retries}):{str(e)}" |
| 36 | + ) |
| 37 | +time.sleep(1)# Wait 1 second before retrying |
| 38 | + |
| 39 | +deffetch_exploits( |
| 40 | +self,cve_id:str,proxy_settings:Optional[Dict]=None |
| 41 | + )->Tuple[Optional[Dict],Optional[str]]: |
| 42 | +""" |
| 43 | + Fetch GitHub exploit information for a given CVE. |
| 44 | +
|
| 45 | + Args: |
| 46 | + cve_id: The CVE ID to fetch exploits for |
| 47 | + proxy_settings: Optional proxy configuration |
| 48 | +
|
| 49 | + Returns: |
| 50 | + Tuple containing: |
| 51 | + - Dictionary with exploit data if successful, None if failed |
| 52 | + - Error message if failed, None if successful |
| 53 | + """ |
| 54 | +self.logger.info(f"[GitHubService] Fetching exploits for{cve_id}") |
| 55 | + |
| 56 | +# Try to get from cache first |
| 57 | +cache_key=f"github_{cve_id}" |
| 58 | +cached_data,error=self._get_from_cache(cache_key) |
| 59 | +ifcached_data: |
| 60 | +if"pocs"incached_data: |
| 61 | +self.logger.info( |
| 62 | +f"[GitHubService] Found{len(cached_data['pocs'])} exploits for{cve_id} in cache" |
| 63 | + ) |
| 64 | +returncached_data,None |
| 65 | + |
| 66 | +# If no cache or cache is expired, fetch fresh data |
| 67 | +try: |
| 68 | +proxies=get_proxy_config(proxy_settings) |
| 69 | +verify=proxies.pop("verify",True) |
| 70 | + |
| 71 | +response=self.make_request( |
| 72 | +self.api_url,params={"cve_id":cve_id},proxies=proxies,verify=verify |
| 73 | + ) |
| 74 | + |
| 75 | +ifnotresponse: |
| 76 | +returnNone,"Failed to get response from GitHub API" |
| 77 | + |
| 78 | +data=response.json() |
| 79 | + |
| 80 | +if"pocs"indata: |
| 81 | +self.logger.info( |
| 82 | +f"[GitHubService] Found{len(data['pocs'])} exploits for{cve_id}" |
| 83 | + ) |
| 84 | +# Save to cache |
| 85 | +self._save_to_cache(cache_key,data) |
| 86 | +else: |
| 87 | +self.logger.info(f"[GitHubService] No exploits found for{cve_id}") |
| 88 | + |
| 89 | +returndata,None |
| 90 | + |
| 91 | +exceptExceptionase: |
| 92 | +error_msg=f"Error fetching GitHub exploits:{str(e)}" |
| 93 | +self.logger.error(f"[GitHubService]{error_msg}") |
| 94 | +returnNone,error_msg |
| 95 | + |
| 96 | +defget_formatted_exploits( |
| 97 | +self,cve_id:str,proxy_settings:Optional[Dict]=None |
| 98 | + )->List[Dict]: |
| 99 | +""" |
| 100 | + Get formatted GitHub exploit information for a given CVE. |
| 101 | +
|
| 102 | + Args: |
| 103 | + cve_id: The CVE ID to fetch exploits for |
| 104 | + proxy_settings: Optional proxy configuration |
| 105 | +
|
| 106 | + Returns: |
| 107 | + List of formatted exploit data dictionaries |
| 108 | + """ |
| 109 | +exploits_data= [] |
| 110 | +data,error=self.fetch_exploits(cve_id,proxy_settings) |
| 111 | + |
| 112 | +ifdataand"pocs"indata: |
| 113 | +forpocindata["pocs"]: |
| 114 | +exploit_data= { |
| 115 | +"created_at":poc.get("created_at","N/A"), |
| 116 | +"updated_at":poc.get("updated_at","N/A"), |
| 117 | +"url":poc.get("html_url","N/A"), |
| 118 | + } |
| 119 | +exploits_data.append(exploit_data) |
| 120 | + |
| 121 | +returnexploits_data |