1+ #!/usr/bin/env python3
2+ """
3+ Add risk assessment tags to rule package JSON files.
4+
5+ This script:
6+ 1. Iterates through each JSON file in the rule_packages directory
7+ 2. Looks for CERT-C or CERT-C++ sections
8+ 3. For each rule, finds the corresponding markdown file
9+ 4. Extracts risk assessment data from the markdown file
10+ 5. Adds risk assessment data as tags to each query in the JSON file
11+ """
12+
13+ import os
14+ import json
15+ import re
16+ import glob
17+ from bs4 import BeautifulSoup
18+ import logging
19+
# Configure the root logger once at import time so every message emitted by
# this script carries a timestamp and its severity level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
22+
def find_rule_packages(repo_root=None):
    """Find all JSON rule package files in the rule_packages directory.

    Args:
        repo_root: Directory that contains ``rule_packages``. When omitted,
            the parent of the directory holding this script is used.

    Returns:
        A sorted list with the path of every ``*.json`` file found at any
        depth below ``<repo_root>/rule_packages``. The list is empty when the
        directory does not exist or holds no JSON files.
    """
    if repo_root is None:
        repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

    pattern = os.path.join(repo_root, "rule_packages", "**", "*.json")

    # glob yields entries in filesystem order; sort so that the processing
    # order (and therefore the log output) is the same on every run.
    return sorted(glob.glob(pattern, recursive=True))
28+
def extract_risk_assessment_from_md(md_file_path):
    """Extract risk assessment data from a rule's markdown file.

    The file is searched for a ``## Risk Assessment`` section containing an
    HTML ``<table>``. The ``<th>`` cells of the table's first row are used as
    keys and the ``<td>`` cells of its second row as the matching values.

    Args:
        md_file_path: Path of the markdown file; it is read as UTF-8.

    Returns:
        A dict mapping each header text to its value text (both stripped of
        surrounding whitespace). The dict is empty when the section or table
        is missing, the table has fewer than two rows, the header and value
        counts differ, or any error occurs while reading or parsing. Each of
        those conditions is logged rather than raised.
    """
    risk_data = {}

    try:
        with open(md_file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # The section extends to the next "##" marker, or to the end of the
        # file when Risk Assessment is the last section.
        section_match = re.search(
            r'## Risk Assessment(.*?)(?:##|\Z)', content, re.DOTALL
        )
        if not section_match:
            logger.warning("No Risk Assessment section found in %s", md_file_path)
            return risk_data

        # Accept an opening tag with attributes (<table class="...">) as well
        # as the bare <table> form.
        table_match = re.search(
            r'<table\b[^>]*>.*?</table>', section_match.group(1), re.DOTALL
        )
        if not table_match:
            logger.warning("No risk assessment table found in %s", md_file_path)
            return risk_data

        soup = BeautifulSoup(table_match.group(0), 'html.parser')

        rows = soup.find_all('tr')
        if len(rows) < 2:  # Need at least a header row and a data row
            logger.warning("Incomplete risk assessment table in %s", md_file_path)
            return risk_data

        headers = [th.get_text().strip() for th in rows[0].find_all('th')]
        values = [td.get_text().strip() for td in rows[1].find_all('td')]

        # Only pair the cells up when every header has exactly one value;
        # otherwise the columns cannot be matched reliably.
        if len(headers) == len(values):
            risk_data = dict(zip(headers, values))
        else:
            logger.warning("Header and value count mismatch in %s", md_file_path)

    except Exception as e:
        # Best effort: one unreadable or malformed file must not stop the
        # caller from processing the remaining rules.
        logger.error(
            "Error extracting risk assessment from %s: %s", md_file_path, e
        )

    return risk_data
78+
def find_md_file(rule_id, short_name, language, repo_root=None):
    """Find the markdown file for the given rule ID and short name.

    The file is looked up in
    ``<repo_root>/<language>/cert/src/rules/<rule_id>/``, first as
    ``<short_name>.md`` and then as ``<rule_id>.md``.

    Args:
        rule_id: Rule identifier; names the rule's directory and the
            fallback file.
        short_name: Query short name; names the preferred file.
        language: Top-level language directory to search in.
        repo_root: Directory that contains the language directories. When
            omitted, the parent of the directory holding this script is used.

    Returns:
        The path of the first candidate that exists as a file, or ``None``
        (after logging a warning) when neither candidate exists.
    """
    if repo_root is None:
        repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

    rule_dir = os.path.join(repo_root, language, "cert", "src", "rules", rule_id)

    # Prefer the file named after the query; sometimes the file is named
    # after the rule ID instead.
    for stem in (short_name, rule_id):
        candidate = os.path.join(rule_dir, f"{stem}.md")
        if os.path.isfile(candidate):
            return candidate

    logger.warning(
        "Could not find markdown file for %s rule %s (%s)",
        language, rule_id, short_name,
    )
    return None
94+
def _risk_assessment_tags(risk_data):
    """Convert risk assessment header/value pairs into ``external/cert/`` tags."""
    tags = []
    for key, value in risk_data.items():
        key_sanitized = key.lower().replace(" ", "-")
        if key_sanitized == "rule":
            # Skip the rule column, as that is already in the rule ID.
            continue
        tags.append(f"external/cert/{key_sanitized}/{value.lower()}")
    return tags


def process_rule_package(rule_package_file):
    """Add risk assessment tags to the queries of one rule package JSON file.

    For every query with a ``short_name`` under the ``CERT-C`` and
    ``CERT-C++`` sections, the rule's markdown file is located, its risk
    assessment data is extracted, and one ``external/cert/<key>/<value>`` tag
    per entry is appended to the query's ``tags`` list unless already present.
    The file is rewritten (2-space indented JSON) only when at least one tag
    was added.

    Args:
        rule_package_file: Path of the rule package JSON file (UTF-8).

    Returns:
        None. Any error raised while reading, processing or writing the file
        is logged and not propagated, so the caller can continue with the
        next file.
    """
    # Section key in the JSON file -> language directory of the markdown files.
    cert_languages = {"CERT-C": "c", "CERT-C++": "cpp"}

    try:
        with open(rule_package_file, 'r', encoding='utf-8') as f:
            data = json.load(f)

        modified = False

        # Look for CERT-C and CERT-C++ sections
        for cert_key, language in cert_languages.items():
            if cert_key not in data:
                continue

            for rule_id, rule_data in data[cert_key].items():
                if "queries" not in rule_data:
                    continue

                for query in rule_data["queries"]:
                    if "short_name" not in query:
                        continue

                    md_file = find_md_file(rule_id, query["short_name"], language)
                    if not md_file:
                        continue

                    risk_data = extract_risk_assessment_from_md(md_file)
                    if not risk_data:
                        continue

                    tags = query.setdefault("tags", [])
                    for tag in _risk_assessment_tags(risk_data):
                        if tag in tags:
                            continue
                        tags.append(tag)
                        modified = True
                        logger.info(
                            "Added tag %s to %s (%s)",
                            tag, rule_id, query["short_name"],
                        )

        # Save the modified data back to the file if any changes were made
        if modified:
            with open(rule_package_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)
            logger.info("Updated %s", rule_package_file)
        else:
            logger.info("No changes made to %s", rule_package_file)

    except Exception as e:
        # Best effort: a single broken package must not abort the whole run.
        logger.error("Error processing %s: %s", rule_package_file, e)
145+
def main():
    """Process every rule package file and log the progress of the run.

    Collects the rule package files with ``find_rule_packages`` and passes
    each one to ``process_rule_package`` in turn.
    """
    logger.info("Starting risk assessment tag addition process")

    rule_packages = find_rule_packages()
    logger.info("Found %d rule package files", len(rule_packages))

    for rule_package in rule_packages:
        logger.info("Processing %s", rule_package)
        process_rule_package(rule_package)

    logger.info("Completed risk assessment tag addition process")
158+
# Only run main() when this file is executed directly as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()