Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

added mitre-attack parsing + mitre-attack render to platforms#23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
alex-bredikhin merged 2 commits intomainfromadded-mittre-attack-tags-parsing
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletionssiem-converter/app/converter/core/mitre.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
import json
import os
import urllib.request
import ssl
from urllib.error import HTTPError

from app.converter.tools.singleton_meta import SingletonMeta
from const import ROOT_PROJECT_PATH


class MitreConfig(metaclass=SingletonMeta):
config_url: str = 'https://raw.githubusercontent.com/mitre/cti/master/enterprise-attack/enterprise-attack.json'
mitre_source_types: tuple = ('mitre-attack', )
tactics: dict = {}
techniques: dict = {}

@staticmethod
def __revoked_or_deprecated(entry: dict) -> bool:
if entry.get("revoked") or entry.get("x_mitre_deprecated"):
return True
return False

def __get_mitre_json(self) -> dict:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE

try:
with urllib.request.urlopen(self.config_url, context=ctx) as cti_json:
return json.loads(cti_json.read().decode())
except HTTPError:
return {}
def update_mitre_config(self) -> None:
if not (mitre_json := self.__get_mitre_json()):
self.__load_mitre_configs_from_files()
return

tactic_map = {}
technique_map = {}

# Map the tatics
for entry in mitre_json["objects"]:
if not entry["type"] == "x-mitre-tactic" or self.__revoked_or_deprecated(entry):
continue
for ref in entry["external_references"]:
if ref["source_name"] == 'mitre-attack':
tactic_map[entry["x_mitre_shortname"]] = entry["name"]
self.tactics[entry["name"].replace(' ', '_').lower()] = {
"external_id": ref["external_id"],
"url": ref["url"],
"tactic": entry["name"]
}
break

# Map the techniques
for entry in mitre_json["objects"]:
if not entry["type"] == "attack-pattern" or self.__revoked_or_deprecated(entry):
continue
if entry.get("x_mitre_is_subtechnique"):
continue
for ref in entry["external_references"]:
if ref["source_name"] in self.mitre_source_types:
technique_map[ref["external_id"]] = entry["name"]
sub_tactics = []
# Get Mitre Tactics (Kill-Chains)
for tactic in entry["kill_chain_phases"]:
if tactic["kill_chain_name"] in self.mitre_source_types:
# Map the short phase_name to tactic name
sub_tactics.append(tactic_map[tactic["phase_name"]])
self.techniques[ref["external_id"].lower()] = {
"technique_id": ref["external_id"],
"technique": entry["name"],
"url": ref["url"],
"tactic": sub_tactics
}
break

## Map the sub-techniques
for entry in mitre_json["objects"]:
if not entry["type"] == "attack-pattern" or self.__revoked_or_deprecated(entry):
continue
if entry.get("x_mitre_is_subtechnique"):
for ref in entry["external_references"]:
if ref["source_name"] in self.mitre_source_types:
sub_technique_id = ref["external_id"]
sub_technique_name = entry["name"]
parent_technique_name = technique_map[sub_technique_id.split(".")[0]]
sub_technique_name = "{} : {}".format(parent_technique_name, sub_technique_name)
self.techniques[ref["external_id"].lower()] = {
"technique_id": ref["external_id"],
"technique": sub_technique_name,
"url": ref["url"],
}
break

def __load_mitre_configs_from_files(self) -> None:
with open(os.path.join(ROOT_PROJECT_PATH, 'app/dictionaries/tactics.json'), 'r') as file:
self.tactics = json.load(file)

with open(os.path.join(ROOT_PROJECT_PATH, 'app/dictionaries/techniques.json'), 'r') as file:
self.techniques = json.load(file)

def get_tactic(self, tactic: str) -> dict:
tactic = tactic.replace('.', '_')
return self.tactics.get(tactic, {})

def get_technique(self, technique_id: str) -> dict:
return self.techniques.get(technique_id, {})
21 changes: 21 additions & 0 deletionssiem-converter/app/converter/core/mixins/rule.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
import json
from typing import List

import yaml

from app.converter.core.exceptions.core import InvalidYamlStructure, InvalidJSONStructure
from app.converter.core.mitre import MitreConfig


class JsonRuleMixin:
Expand All@@ -15,9 +17,28 @@ def load_rule(self, text):


class YamlRuleMixin:
mitre_config: MitreConfig = MitreConfig()

def load_rule(self, text):
try:
return yaml.safe_load(text)
except yaml.YAMLError as err:
raise InvalidYamlStructure(error=str(err))

def parse_mitre_attack(self, tags: List[str]) -> dict[str, list]:
result = {
'tactics': [],
'techniques': []
}
for tag in tags:
tag = tag.lower()
if tag.startswith('attack.'):
tag = tag[7::]
if tag.startswith('t'):
if technique := self.mitre_config.get_technique(tag):
result['techniques'].append(technique)
else:
if tactic := self.mitre_config.get_tactic(tag):
result['tactics'].append(tactic)

return result
4 changes: 2 additions & 2 deletionssiem-converter/app/converter/core/models/parser_output.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -17,8 +17,8 @@ def __init__(self, *,
license_: str = None,
severity: str = None,
references: List[str] = None,
tags:List[str] = None,
mitre_attack:List[str] = None,
tags:list[str] = None,
mitre_attack:dict[str, list] = None,
status: str = None,
false_positives: List[str] = None,
source_mapping_ids: List[str] = None
Expand Down
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -7,8 +7,10 @@
version = "0.01"
rule_id = "<rule_id_place_holder>"
status = "<status_place_holder>"
severity = "<severity_place_holder>"
tags = "<tags_place_holder>"
falsepositives = "<falsepositives_place_holder>"
severity = "<severity_place_holder>"
created = "<created_place_holder>"

events:
<query_placeholder>
Expand Down
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -96,4 +96,6 @@ def finalize_query(self, prefix: str, query: str, functions: str, meta_info: Met
rule = rule.replace("<severity_place_holder>", meta_info.severity)
rule = rule.replace("<status_place_holder>", meta_info.status)
rule = rule.replace("<falsepositives_place_holder>", ', '.join(meta_info.false_positives))
rule = rule.replace("<tags_place_holder>", ", ".join(meta_info.tags))
rule = rule.replace("<created_place_holder>", str(meta_info.date))
return rule
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -19,6 +19,7 @@

import copy
import json
from typing import Union

from app.converter.platforms.elasticsearch.const import ELASTICSEARCH_DETECTION_RULE, elasticsearch_rule_details
from app.converter.platforms.elasticsearch.mapping import ElasticSearchMappings, elasticsearch_mappings
Expand All@@ -27,6 +28,7 @@
from app.converter.core.models.platform_details import PlatformDetails
from app.converter.core.models.parser_output import MetaInfoContainer
from app.converter.tools.utils import concatenate_str, get_mitre_attack_str
from app.converter.core.mitre import MitreConfig


class ElasticSearchRuleFieldValue(ElasticSearchFieldValue):
Expand All@@ -36,6 +38,7 @@ class ElasticSearchRuleFieldValue(ElasticSearchFieldValue):
class ElasticSearchRuleRender(ElasticSearchQueryRender):
details: PlatformDetails = elasticsearch_rule_details
mappings: ElasticSearchMappings = elasticsearch_mappings
mitre: MitreConfig = MitreConfig()

or_token = "OR"
and_token = "AND"
Expand All@@ -44,6 +47,46 @@ class ElasticSearchRuleRender(ElasticSearchQueryRender):
field_value_map = ElasticSearchRuleFieldValue(or_token=or_token)
query_pattern = "{prefix} {query} {functions}"

def __create_mitre_threat(self, mitre_attack: dict) -> Union[list, list[dict]]:
if not mitre_attack.get('techniques'):
return []
threat = []

if not mitre_attack.get('tactics'):
for technique in mitre_attack.get('techniques'):
technique_name = technique['technique']
if '.' in technique_name:
technique_name = technique_name[:technique_name.index('.')]
threat.append(technique_name)
return threat

for tactic in mitre_attack['tactics']:
tactic_render = {
'id': tactic['external_id'],
'name': tactic['tactic'],
'reference': tactic['url']
}
sub_threat = {
'tactic': tactic_render,
'framework': 'MITRE ATT&CK',
'technique': []
}
for technique in mitre_attack['techniques']:
technique_id = technique['technique_id'].lower()
if '.' in technique_id:
technique_id = technique_id[:technique['technique_id'].index('.')]
main_technique = self.mitre.get_technique(technique_id)
if tactic['tactic'] in main_technique['tactic']:
sub_threat['technique'].append({
"id": main_technique['technique_id'],
"name": main_technique['technique'],
"reference": main_technique['url']
})
if len(sub_threat['technique']) > 0:
threat.append(sub_threat)

return threat

def finalize_query(self, prefix: str, query: str, functions: str, meta_info: MetaInfoContainer,
source_mapping: SourceMapping = None, not_supported_functions: list = None):
query = super().finalize_query(prefix=prefix, query=query, functions=functions, meta_info=meta_info)
Expand All@@ -61,7 +104,8 @@ def finalize_query(self, prefix: str, query: str, functions: str, meta_info: Met
"severity": meta_info.severity,
"references": meta_info.references,
"license": meta_info.license,
"tags": meta_info.mitre_attack,
"tags": meta_info.tags,
"threat": self.__create_mitre_threat(meta_info.mitre_attack),
"false_positives": meta_info.false_positives
})
rule_str = json.dumps(rule, indent=4, sort_keys=False, ensure_ascii=False)
Expand Down
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -52,8 +52,7 @@ def finalize_query(self, prefix: str, query: str, functions: str, meta_info: Met
"<description_place_holder>",
get_rule_description_str(
description=meta_info.description,
license=meta_info.license,
mitre_attack=meta_info.mitre_attack
license=meta_info.license
)
)
rule = rule.replace("<title_place_holder>", meta_info.title)
Expand Down
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -53,8 +53,7 @@ def finalize_query(self, prefix: str, query: str, functions: str, meta_info: Met
author=meta_info.author,
rule_id=meta_info.id,
license=meta_info.license,
references=meta_info.references,
mitre_attack=meta_info.mitre_attack
references=meta_info.references
)
rule_str = json.dumps(rule, indent=4, sort_keys=False)
if not_supported_functions:
Expand Down
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -25,7 +25,7 @@
from app.converter.core.mapping import SourceMapping
from app.converter.core.models.platform_details import PlatformDetails
from app.converter.core.models.parser_output import MetaInfoContainer
from app.converter.tools.utils import get_rule_description_str
from app.converter.tools.utils import get_rule_description_str, get_mitre_attack_str


_AUTOGENERATED_TITLE = "Autogenerated Falcon LogScale Alert"
Expand All@@ -45,10 +45,14 @@ def finalize_query(self, prefix: str, query: str, functions: str, meta_info: Met
rule = copy.deepcopy(DEFAULT_LOGSCALE_ALERT)
rule['query']['queryString'] = query
rule['name'] = meta_info.title or _AUTOGENERATED_TITLE
mitre_attack = []
if meta_info.mitre_attack:
mitre_attack = [f"ATTACK.{i['tactic']}" for i in meta_info.mitre_attack.get('tactics', [])]
mitre_attack.extend([f"ATTACK.{i['technique_id']}" for i in meta_info.mitre_attack.get('techniques', [])])
rule['description'] = get_rule_description_str(
description=meta_info.description,
license=meta_info.license,
mitre_attack=meta_info.mitre_attack,
mitre_attack=mitre_attack,
author=meta_info.author
)

Expand Down
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -40,6 +40,18 @@ class MicrosoftSentinelRuleRender(MicrosoftSentinelQueryRender):
or_token = "or"
field_value_map = MicrosoftSentinelRuleFieldValue(or_token=or_token)

def __create_mitre_threat(self, meta_info: MetaInfoContainer) -> tuple[list, list]:
tactics = []
techniques = []

for tactic in meta_info.mitre_attack.get('tactics'):
tactics.append(tactic['tactic'])

for technique in meta_info.mitre_attack.get('techniques'):
techniques.append(technique['technique_id'])

return tactics, techniques

def finalize_query(self, prefix: str, query: str, functions: str, meta_info: MetaInfoContainer,
source_mapping: SourceMapping = None, not_supported_functions: list = None):
query = super().finalize_query(prefix=prefix, query=query, functions=functions, meta_info=meta_info)
Expand All@@ -52,7 +64,9 @@ def finalize_query(self, prefix: str, query: str, functions: str, meta_info: Met
license=meta_info.license
)
rule["severity"] = meta_info.severity
rule["techniques"] = [el.upper() for el in meta_info.mitre_attack]
mitre_tactics, mitre_techniques = self.__create_mitre_threat(meta_info=meta_info)
rule['tactics'] = mitre_tactics
rule['techniques'] = mitre_techniques
json_rule = json.dumps(rule, indent=4, sort_keys=False)
if not_supported_functions:
rendered_not_supported = self.render_not_supported_functions(not_supported_functions)
Expand Down
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -15,6 +15,7 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-----------------------------------------------------------------
"""
import re

from app.converter.core.exceptions.core import UnsupportedRootAParser, RootARuleValidationException
from app.converter.core.mixins.rule import YamlRuleMixin
Expand All@@ -27,16 +28,19 @@ class RootAParser(YamlRuleMixin):
parsers = parser_manager
mandatory_fields = {"name", "details", "author", "severity", "mitre-attack", "detection", "references", "license"}

@staticmethod
def __update_meta_info(meta_info: MetaInfoContainer, rule: dict) -> MetaInfoContainer:
def __update_meta_info(self, meta_info: MetaInfoContainer, rule: dict) -> MetaInfoContainer:
mitre_attack = rule.get("mitre-attack") or []
mitre_attack = [i.strip("") for i in mitre_attack.split(",")] if isinstance(mitre_attack, str) else mitre_attack
mitre_tags = [i.strip("") for i in mitre_attack.split(",")] if isinstance(mitre_attack, str) else mitre_attack
mitre_attack = self.parse_mitre_attack(mitre_tags)
rule_tags = rule.get('tags', [])
rule_tags += mitre_tags

meta_info.title = rule.get("name")
meta_info.description = rule.get("details")
meta_info.id = rule.get("uuid", meta_info.id)
meta_info.references = rule.get("references")
meta_info.license = rule.get("license", meta_info.license)
meta_info.tags =rule.get("tags",meta_info.tags)
meta_info.tags =rule_tags ormeta_info.tags
meta_info.mitre_attack = mitre_attack
meta_info.date = rule.get("date", meta_info.date)
meta_info.author = rule.get("author", meta_info.author)
Expand Down
12 changes: 2 additions & 10 deletionssiem-converter/app/converter/platforms/sigma/parsers/sigma.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -38,15 +38,6 @@ class SigmaParser(YamlRuleMixin):
mappings: SigmaMappings = sigma_mappings
mandatory_fields = {"title", "description", "references", "logsource", "detection"}

@staticmethod
def __parse_mitre_attack(tags: List[str]) -> List[str]:
result = []
for tag in tags:
if search := re.search(r"[tT]\d{4}(?:\.\d{3})?", tag):
result.append(search.group())

return result

@staticmethod
def __parse_false_positives(false_positives: Union[str, List[str], None]) -> list:
if isinstance(false_positives, str):
Expand All@@ -62,9 +53,10 @@ def _get_meta_info(self, rule: dict, source_mapping_ids: List[str]) -> MetaInfoC
date=rule.get("date"),
references=rule.get("references", []),
license_=rule.get("license"),
mitre_attack=self.__parse_mitre_attack(rule.get("tags", [])),
mitre_attack=self.parse_mitre_attack(rule.get("tags", [])),
severity=rule.get("level"),
status=rule.get("status"),
tags=rule.get("tags"),
false_positives=self.__parse_false_positives(rule.get("falsepositives")),
source_mapping_ids=source_mapping_ids
)
Expand Down
Loading

[8]ページ先頭

©2009-2025 Movatter.jp