Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

linter issues fix#41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
alexvolha merged 3 commits intomainfromrefactoring
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletiontranslator/Dockerfile
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -4,6 +4,6 @@ WORKDIR /siem_converter
COPY . .
RUN pip install --upgrade pip && \
python -m pip install --upgrade setuptools && \
pip install --trusted-host=pypi.python.org --trusted-host=pypi.org --trusted-host=files.pythonhosted.org --no-cache-dir -Ur requirements.txt
pip install --trusted-host=pypi.python.org --trusted-host=pypi.org --trusted-host=files.pythonhosted.org --no-cache-dir -Ur requirements/requirements_prod.txt
EXPOSE 8000
CMD ["python", "server.py"]
39 changes: 15 additions & 24 deletionstranslator/app/routers/assistance.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,37 @@
import datetime
import json
from datetime import datetime
from typing import List, Dict
import os

from collections.abc import Generator
from contextlib import asynccontextmanager
from datetime import datetime

from fastapi import APIRouter, FastAPI

from const import ROOT_PROJECT_PATH

from app.translator.core.mitre import MitreConfig
from const import ROOT_PROJECT_PATH

assistance_router = APIRouter()

suggestions = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
async def lifespan(app: FastAPI) -> Generator[None, None, None]: # noqa: ARG001
MitreConfig().update_mitre_config()
with open(os.path.join(ROOT_PROJECT_PATH, 'app/dictionaries/uncoder_meta_info_roota.json'), 'r') as file:
json_f = json.load(file)
suggestions['roota'] = json_f
with open(os.path.join(ROOT_PROJECT_PATH, 'app/dictionaries/uncoder_meta_info_sigma.json'), 'r') as file:
json_f = json.load(file)
suggestions['sigma'] = json_f
with open(os.path.join(ROOT_PROJECT_PATH, "app/dictionaries/uncoder_meta_info_roota.json")) as file:
suggestions["roota"] = json.load(file)
with open(os.path.join(ROOT_PROJECT_PATH, "app/dictionaries/uncoder_meta_info_sigma.json")) as file:
suggestions["sigma"] = json.load(file)
yield


@assistance_router.get(
'/suggestions/{parser_id}',
tags=["assistance"],
description="Get suggestions"
)
async def get_suggestions(parser_id: str) -> List[Dict]:
@assistance_router.get("/suggestions/{parser_id}", tags=["assistance"], description="Get suggestions")
async def get_suggestions(parser_id: str) -> list[dict]:
parser_dict = suggestions.get(parser_id, [])
if parser_id =='roota':
today = datetime.today().strftime('%Y-%m-%d')
if parser_id =="roota":
today = datetime.today().strftime("%Y-%m-%d")
for i in parser_dict:
if i['title'] =='Date':
for v in i['dictionary']:
v['name'] = today
if i["title"] =="Date":
for v in i["dictionary"]:
v["name"] = today
return parser_dict
return parser_dict
60 changes: 31 additions & 29 deletionstranslator/app/routers/ioc_translate.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,48 @@
from typing import Optional, List
from typing import Optional

from fastapi import APIRouter, Body

from app.translator.tools.const import IOCType, HashType, IocParsingRule
from app.translator.cti_translator import CTIConverter
from app.models.ioc_translation import CTIPlatform, OneTranslationCTIData
from app.models.translation import InfoMessage
from app.translator.cti_translator import CTIConverter
from app.translator.tools.const import HashType, IocParsingRule, IOCType

iocs_router = APIRouter()
converter = CTIConverter()


@iocs_router.post(
"/iocs/translate",
description="Parse IOCs from text.",
)
@iocs_router.post("/iocs/translate", description="Parse IOCs from text.")
@iocs_router.post("/iocs/translate", include_in_schema=False)
def parse_and_translate_iocs(
text: str = Body(..., description="Text to parse IOCs from", embed=True),
iocs_per_query: int = Body(25, description="Platforms to parse IOCs to", embed=True),
platform: CTIPlatform = Body(..., description="Platforms to parse IOCs to", embed=True),
include_ioc_types: Optional[List[IOCType]] = Body(
None, description="List of IOC types to include. By default all types are enabled.", embed=True),
include_hash_types: Optional[List[HashType]] = Body(
None, description="List of hash types to include. By default all hash types are enabled.", embed=True),
exceptions: Optional[List[str]] = Body(
None, description="List of exceptions. IOC is ignored if it contains one of exception values.", embed=True),
ioc_parsing_rules: Optional[List[IocParsingRule]] = Body(
None, embed=True, description="Additional parsing parameters."),
include_source_ip: Optional[bool] = Body(
False, description="Include source IP in query. By default it is false."
)
include_ioc_types: Optional[list[IOCType]] = Body(
None, description="List of IOC types to include. By default all types are enabled.", embed=True
),
include_hash_types: Optional[list[HashType]] = Body(
None, description="List of hash types to include. By default all hash types are enabled.", embed=True
),
exceptions: Optional[list[str]] = Body(
None, description="List of exceptions. IOC is ignored if it contains one of exception values.", embed=True
),
ioc_parsing_rules: Optional[list[IocParsingRule]] = Body(
None, embed=True, description="Additional parsing parameters."
),
include_source_ip: Optional[bool] = Body(False, description="Include source IP in query. By default it is false."),
) -> OneTranslationCTIData:
status, translations = converter.convert(text=text,
platform_data=platform,
iocs_per_query=iocs_per_query,
include_ioc_types=include_ioc_types,
include_hash_types=include_hash_types,
exceptions=exceptions,
ioc_parsing_rules=ioc_parsing_rules,
include_source_ip=include_source_ip)
status, translations = converter.convert(
text=text,
platform_data=platform,
iocs_per_query=iocs_per_query,
include_ioc_types=include_ioc_types,
include_hash_types=include_hash_types,
exceptions=exceptions,
ioc_parsing_rules=ioc_parsing_rules,
include_source_ip=include_source_ip,
)
if status:
return OneTranslationCTIData(status=status, translations=translations, target_siem_type=platform.name)
else:
info_message = InfoMessage(message=translations, severity="error")
return OneTranslationCTIData(info=info_message, status=status, target_siem_type=platform.name)

info_message = InfoMessage(message=translations, severity="error")
return OneTranslationCTIData(info=info_message, status=status, target_siem_type=platform.name)
105 changes: 47 additions & 58 deletionstranslator/app/routers/translate.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -1,101 +1,90 @@
from fastapi import APIRouter, Body

from app.translator.translator importSiemConverter
from app.models.translation importConvertorPlatforms, InfoMessage, OneTranslationData, Platform
from app.translator.cti_translator import CTIConverter
from app.models.translation importOneTranslationData, ConvertorPlatforms, Platform, InfoMessage
from app.translator.translator importSiemConverter

st_router = APIRouter()

converter = SiemConverter()


@st_router.post(
"/translate",
tags=["siem_translate"],
description="Generate target translation",
)
@st_router.post("/translate", tags=["siem_translate"], description="Generate target translation")
@st_router.post("/translate/", include_in_schema=False)
def generate_one_translation(
source_siem: str = Body(..., embed=True),
source_scheme: str = Body(None, embed=True),
source_scheme: str = Body(None, embed=True), # noqa: ARG001
target_siem: str = Body(..., embed=True),
target_scheme: str = Body(None, embed=True),
target_scheme: str = Body(None, embed=True), # noqa: ARG001
text: str = Body(..., embed=True),

) -> OneTranslationData:
status, data = converter.generate_translation(
text=text,
source=source_siem,
target=target_siem
)
status, data = converter.generate_translation(text=text, source=source_siem, target=target_siem)
if status:
return OneTranslationData(
status=status,
translation=data,
target_siem_type=target_siem)
else:
info_message = InfoMessage(message=data, severity="error")
return OneTranslationData(
info=info_message,
status=status,
target_siem_type=target_siem)
return OneTranslationData(status=status, translation=data, target_siem_type=target_siem)

info_message = InfoMessage(message=data, severity="error")
return OneTranslationData(info=info_message, status=status, target_siem_type=target_siem)


@st_router.post(
"/translate/all",
tags=["siem_translate"],
description="Generate all translations",
)
@st_router.post("/translate/all", tags=["siem_translate"], description="Generate all translations")
@st_router.post("/translate/all/", include_in_schema=False)
def generate_all_translations(
source_siem: str = Body(..., embed=True),
source_scheme: str = Body(None, embed=True),
source_scheme: str = Body(None, embed=True), # noqa: ARG001
text: str = Body(..., embed=True),
) -> list[OneTranslationData]:
result = converter.generate_all_translation(
text=text,
source=source_siem
)
result = converter.generate_all_translation(text=text, source=source_siem)
translations = []
for siem_result in result:
if siem_result.get("status"):
translations.append(OneTranslationData(
status=siem_result.get("status", True),
translation=siem_result.get("result"),
target_siem_type=siem_result.get("siem_type"))
translations.append(
OneTranslationData(
status=siem_result.get("status", True),
translation=siem_result.get("result"),
target_siem_type=siem_result.get("siem_type"),
)
)
else:
translations.append(OneTranslationData(
status=siem_result.get("status", False),
info=InfoMessage(message=siem_result.get("result"), severity="error"),
target_siem_type=siem_result.get("siem_type"))
translations.append(
OneTranslationData(
status=siem_result.get("status", False),
info=InfoMessage(message=siem_result.get("result"), severity="error"),
target_siem_type=siem_result.get("siem_type"),
)
)
return translations


@st_router.get(
"/platforms",
tags=["siem_translate"],
description="Get translator platforms",
)
@st_router.get("/platforms", tags=["siem_translate"], description="Get translator platforms")
@st_router.get("/platforms/", include_in_schema=False)
def get_convertor_platforms() -> ConvertorPlatforms:
renders, parsers = converter.get_all_platforms()
return ConvertorPlatforms(renders=renders, parsers=parsers)


@st_router.get(
"/all_platforms",
description="Get Sigma, RootA and iocs platforms",
)
@st_router.get("/all_platforms", description="Get Sigma, RootA and iocs platforms")
@st_router.get("/all_platforms/", include_in_schema=False)
def get_all_platforms() -> list:
converter_renders, converter_platforms = converter.get_all_platforms()
return [
Platform(id="roota", name="RootA", code="roota", group_name="RootA", group_id="roota",
renders=converter_renders, parsers=converter_platforms),
Platform(id="sigma", name="Sigma", code="sigma", group_name="Sigma", group_id="sigma",
renders=[render for render in converter_renders if render.code != "sigma"]),
Platform(id="ioc", name="IOCs", code="ioc", group_name="IOCs", group_id="ioc",
renders=CTIConverter().get_renders())
Platform(
id="roota",
name="RootA",
code="roota",
group_name="RootA",
group_id="roota",
renders=converter_renders,
parsers=converter_platforms,
),
Platform(
id="sigma",
name="Sigma",
code="sigma",
group_name="Sigma",
group_id="sigma",
renders=[render for render in converter_renders if render.code != "sigma"],
),
Platform(
id="ioc", name="IOCs", code="ioc", group_name="IOCs", group_id="ioc", renders=CTIConverter().get_renders()
),
]
4 changes: 2 additions & 2 deletionstranslator/app/translator/const.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
from os.path import abspath, dirname
from typing import Union, List
from typing import Union

APP_PATH = dirname(abspath(__file__))

CTI_MIN_LIMIT_QUERY = 10000

CTI_IOCS_PER_QUERY_LIMIT = 25

DEFAULT_VALUE_TYPE = Union[Union[int, str,List[int],List[str]]]
DEFAULT_VALUE_TYPE = Union[int, str,list[int],list[str]]
8 changes: 8 additions & 0 deletionstranslator/app/translator/core/custom_types/meta_info.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
from app.translator.tools.custom_enum import CustomEnum


class SeverityType(CustomEnum):
critical = "critical"
high = "high"
medium = "medium"
low = "low"
10 changes: 5 additions & 5 deletionstranslator/app/translator/core/escape_manager.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
import re
from abc import ABC
from typing import Union
from typing importClassVar,Union

from app.translator.core.custom_types.values import ValueType
from app.translator.core.models.escape_details import EscapeDetails


class EscapeManager(ABC):
escape_map: dict[str, EscapeDetails] = {}
escape_map:ClassVar[dict[str, EscapeDetails]] = {}

def escape(self, value: Union[str, int], value_type: str = ValueType.value) -> Union[str, int]:
if isinstance(value, int):
Expand All@@ -17,8 +17,8 @@ def escape(self, value: Union[str, int], value_type: str = ValueType.value) -> U
value = symbols_pattern.sub(escape_details.escape_symbols, value)
return value

def remove_escape(self, value: Union[str, int]) -> Union[str, int]:
@staticmethod
def remove_escape(value: Union[str, int]) -> Union[str, int]:
if isinstance(value, int):
return value
value = value.encode().decode("unicode_escape")
return value
return value.encode().decode("unicode_escape")
Loading

[8]ページ先頭

©2009-2025 Movatter.jp