Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

field class refactoring#37

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
alexvolha merged 3 commits intomainfromfield-class-refactoring
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletiontranslator/app/translator/core/functions.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -58,7 +58,7 @@ def concat_kwargs(kwargs: Dict[str, str]) -> str:

@staticmethod
def map_field(field: Field, source_mapping: SourceMapping) -> str:
generic_field_name = field.generic_names_map[source_mapping.source_id]
generic_field_name = field.get_generic_field_name(source_mapping.source_id)
mapped_field = source_mapping.fields_mapping.get_platform_field_name(generic_field_name=generic_field_name)
if isinstance(mapped_field, list):
mapped_field = mapped_field[0]
Expand Down
4 changes: 4 additions & 0 deletionstranslator/app/translator/core/mapping.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -122,3 +122,7 @@ def get_suitable_source_mappings(self, *args, **kwargs) -> List[SourceMapping]:

def get_source_mapping(self, source_id: str) -> Optional[SourceMapping]:
return self._source_mappings.get(source_id)

@property
def default_mapping(self) -> SourceMapping:
return self._source_mappings[DEFAULT_MAPPING_NAME]
19 changes: 11 additions & 8 deletionstranslator/app/translator/core/mixins/logic.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,29 @@
from typing import List, Union

from app.translator.core.models.field import Field, Keyword
from app.translator.core.models.identifier import Identifier
from app.translator.core.custom_types.tokens import LogicalOperatorType, GroupType
from app.translator.core.models.field import FieldValue, Keyword
from app.translator.core.models.identifier import Identifier


class ANDLogicOperatorMixin:

@staticmethod
def get_missed_and_token_indices(tokens: List[Union[Field, Keyword, Identifier]]) -> List[int]:
def get_missed_and_token_indices(tokens: List[Union[FieldValue, Keyword, Identifier]]) -> List[int]:
missed_and_indices = []
for index in range(len(tokens) - 1):
token = tokens[index]
next_token = tokens[index + 1]
if (isinstance(token, (Field, Keyword))
and not (isinstance(next_token, Identifier) and (
next_token.token_type in LogicalOperatorType
or next_token.token_type == GroupType.R_PAREN))):
if ((isinstance(token, (FieldValue, Keyword))
or isinstance(token, Identifier) and token.token_type == GroupType.R_PAREN)
and not (isinstance(next_token, Identifier)
and (next_token.token_type
in (LogicalOperatorType.AND, LogicalOperatorType.OR, GroupType.R_PAREN)))):
missed_and_indices.append(index + 1)
return list(reversed(missed_and_indices))

def add_and_token_if_missed(self, tokens: List[Union[Field, Keyword, Identifier]]) -> List[Union[Field, Keyword, Identifier]]:
def add_and_token_if_missed(self,
tokens: List[Union[FieldValue, Keyword, Identifier]]
) -> List[Union[FieldValue, Keyword, Identifier]]:
indices = self.get_missed_and_token_indices(tokens=tokens)
for index in indices:
tokens.insert(index, Identifier(token_type=LogicalOperatorType.AND))
Expand Down
52 changes: 24 additions & 28 deletionstranslator/app/translator/core/models/field.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,36 @@
from typing import Union, Optional

from app.translator.core.mapping import SourceMapping, DEFAULT_MAPPING_NAME
from app.translator.core.models.identifier import Identifier
from app.translator.core.custom_types.tokens import OperatorType


class Field:
def __init__(self, source_name: str, operator: Identifier = None, value: Union[int, str, list, tuple] = None):
def __init__(self, source_name: str):
self.source_name = source_name
self.__generic_names_map = {}

def get_generic_field_name(self, source_id: str) -> Optional[str]:
return self.__generic_names_map.get(source_id)

def set_generic_names_map(self, source_mappings: list[SourceMapping], default_mapping: SourceMapping) -> None:
generic_names_map = {
source_mapping.source_id: source_mapping.fields_mapping.get_generic_field_name(self.source_name)
for source_mapping in source_mappings
}
if DEFAULT_MAPPING_NAME not in generic_names_map:
fields_mapping = default_mapping.fields_mapping
generic_names_map[DEFAULT_MAPPING_NAME] = fields_mapping.get_generic_field_name(self.source_name)

self.__generic_names_map = generic_names_map


class FieldValue:
def __init__(self, source_name: str, operator: Identifier, value: Union[int, str, list, tuple]):
self.field = Field(source_name=source_name)
self.operator = operator
self.values = []
self.__add_value(value)
self.source_name = source_name # input translation field name
self.generic_names_map = {}

@property
def value(self):
Expand All@@ -30,31 +50,7 @@ def __add__(self, other):
self.values.append(other)

def __repr__(self):
if self.operator:
return f"{self.source_name} {self.operator.token_type} {self.values}"

return f"{self.source_name}"

def __eq__(self, other):
if isinstance(other, Field):
return self._hash == other._hash
"""For OR operator check"""
if self.source_name == other.source_name and self.operator == other.operator:
return True
return False

def __neq__(self, other):
"""For AND operator check"""
if self.source_name != other.source_name:
return True
return False

@property
def _hash(self):
return hash(str(self))

def __hash__(self):
return hash(str(self))
return f"{self.field.source_name} {self.operator.token_type} {self.values}"


class Keyword:
Expand Down
4 changes: 2 additions & 2 deletionstranslator/app/translator/core/models/functions/base.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -3,14 +3,14 @@
from dataclasses import dataclass, field
from typing import List, Union

from app.translator.core.models.field import Field, Keyword
from app.translator.core.models.field import Field,FieldValue,Keyword
from app.translator.core.models.identifier import Identifier


@dataclass
class Function:
name: str = None
args: List[Union[Field, Keyword, Function, Identifier]] = field(default_factory=list)
args: List[Union[Field,FieldValue,Keyword, Function, Identifier]] = field(default_factory=list)
as_clause: str = None
by_clauses: List[Field] = field(default_factory=list)

Expand Down
14 changes: 7 additions & 7 deletionstranslator/app/translator/core/parser.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -21,7 +21,7 @@

from app.translator.core.functions import PlatformFunctions
from app.translator.core.mapping import BasePlatformMappings, SourceMapping
from app.translator.core.models.field importField
from app.translator.core.models.field importFieldValue
from app.translator.core.models.functions.base import ParsedFunctions
from app.translator.core.models.platform_details import PlatformDetails
from app.translator.core.models.parser_output import SiemContainer, MetaInfoContainer
Expand DownExpand Up@@ -50,15 +50,15 @@ def get_tokens_and_source_mappings(self,
if not query:
raise TokenizerGeneralException("Can't translate empty query. Please provide more details")
tokens = self.tokenizer.tokenize(query=query)
field_tokens = self.tokenizer.filter_tokens(tokens,Field)
field_tokens =[token.field for token inself.tokenizer.filter_tokens(tokens,FieldValue)]
field_names = [field.source_name for field in field_tokens]
suitable_source_mappings = self.mappings.get_suitable_source_mappings(field_names=field_names, **log_sources)
self.tokenizer.set_field_generic_names_map(field_tokens,suitable_source_mappings, self.mappings)
source_mappings = self.mappings.get_suitable_source_mappings(field_names=field_names, **log_sources)
self.tokenizer.set_field_tokens_generic_names_map(field_tokens,source_mappings, self.mappings.default_mapping)

return tokens,suitable_source_mappings
return tokens,source_mappings

def set_functions_fields_generic_names(self,
functions: ParsedFunctions,
source_mappings: List[SourceMapping]) -> None:
field_tokens = self.tokenizer.filter_function_tokens(tokens=functions.functions)
self.tokenizer.set_field_generic_names_map(field_tokens, source_mappings, self.mappings)
field_tokens = self.tokenizer.get_field_tokens_from_func_args(args=functions.functions)
self.tokenizer.set_field_tokens_generic_names_map(field_tokens, source_mappings, self.mappings.default_mapping)
23 changes: 14 additions & 9 deletionstranslator/app/translator/core/render.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -27,11 +27,12 @@
from app.translator.core.exceptions.parser import UnsupportedOperatorException
from app.translator.core.functions import PlatformFunctions
from app.translator.core.mapping import BasePlatformMappings, SourceMapping, LogSourceSignature, DEFAULT_MAPPING_NAME
from app.translator.core.models.field import Field, Keyword
from app.translator.core.models.field import Field,FieldValue,Keyword
from app.translator.core.models.functions.base import Function, ParsedFunctions
from app.translator.core.models.identifier import Identifier
from app.translator.core.models.platform_details import PlatformDetails
from app.translator.core.models.parser_output import MetaInfoContainer
from app.translator.core.custom_types.tokens import LogicalOperatorType, OperatorType, GroupType
from app.translator.core.custom_types.tokens import LogicalOperatorType, OperatorType


class BaseQueryFieldValue(ABC):
Expand DownExpand Up@@ -133,7 +134,7 @@ def generate_functions(self, functions: List[Function], source_mapping: SourceMa
return self.platform_functions.render(functions, source_mapping) if self.platform_functions else ""

def map_field(self, field: Field, source_mapping: SourceMapping) -> List[str]:
generic_field_name = field.generic_names_map[source_mapping.source_id]
generic_field_name = field.get_generic_field_name(source_mapping.source_id)
# field can be mapped to corresponding platform field name or list of platform field names
mapped_field = source_mapping.fields_mapping.get_platform_field_name(generic_field_name=generic_field_name)
if not mapped_field and self.is_strict_mapping:
Expand All@@ -145,10 +146,10 @@ def map_field(self, field: Field, source_mapping: SourceMapping) -> List[str]:
return mapped_field if mapped_field else [generic_field_name] if generic_field_name else [field.source_name]

def apply_token(self,
token: Union[Field, Keyword,LogicalOperatorType, GroupType],
token: Union[FieldValue, Keyword,Identifier],
source_mapping: SourceMapping) -> str:
if isinstance(token,(Field, Keyword)):
mapped_fields = self.map_field(token, source_mapping) if isinstance(token, Field) else [None]
if isinstance(token,FieldValue):
mapped_fields = self.map_field(token.field, source_mapping)
if len(mapped_fields) > 1:
return self.group_token % self.operator_map[LogicalOperatorType.OR].join([
self.field_value_map.apply_field_value(field=field, operator=token.operator, value=token.value)
Expand All@@ -158,12 +159,17 @@ def apply_token(self,
return self.field_value_map.apply_field_value(field=mapped_fields[0],
operator=token.operator,
value=token.value)
elif isinstance(token, Keyword):
return self.field_value_map.apply_field_value(field=None,
operator=token.operator,
value=token.value)
elif token.token_type in LogicalOperatorType:
return self.operator_map.get(token.token_type)

return token.token_type

def generate_query(self,
query: List[Union[Field, Keyword,LogicalOperatorType, GroupType]],
query: List[Union[FieldValue, Keyword,Identifier]],
source_mapping: SourceMapping) -> str:
result_values = []
for token in query:
Expand All@@ -173,8 +179,7 @@ def generate_query(self,
def wrap_query_with_meta_info(self, meta_info: MetaInfoContainer, query: str):
if meta_info and (meta_info.id or meta_info.title):
query_meta_info = "\n".join(
self.wrap_with_comment(f"{key}{value}")
for key, value in {"name: ": meta_info.title, "uuid: ": meta_info.id}.items() if value
self.wrap_with_comment(f"{key}{value}") for key, value in {"name: ": meta_info.title, "uuid: ": meta_info.id}.items() if value
)
query = f"{query}\n\n{query_meta_info}"
return query
Expand Down
64 changes: 29 additions & 35 deletionstranslator/app/translator/core/tokenizer.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -27,20 +27,20 @@
TokenizerGeneralException,
QueryParenthesesException
)
from app.translator.core.mapping import SourceMapping, DEFAULT_MAPPING_NAME, BasePlatformMappings
from app.translator.core.models.field import Field, Keyword
from app.translator.core.mapping import SourceMapping
from app.translator.core.models.field import Field,FieldValue,Keyword
from app.translator.core.models.functions.base import Function
from app.translator.core.models.functions.sort import SortArg
from app.translator.core.models.identifier import Identifier
from app.translator.core.custom_types.tokens import OperatorType, GroupType
from app.translator.tools.utils import get_match_group

TOKEN_TYPE = Union[Field, Keyword, Identifier]
TOKEN_TYPE = Union[FieldValue, Keyword, Identifier]


class BaseTokenizer(ABC):
@abstractmethod
def tokenize(self, query: str) -> List[Union[Field, Keyword, Identifier]]:
def tokenize(self, query: str) -> List[Union[FieldValue, Keyword, Identifier]]:
raise NotImplementedError()


Expand DownExpand Up@@ -180,18 +180,18 @@ def process_value_wildcard_symbols(self,
return self._clean_value(value, wildcard_symbol), op

@staticmethod
defcreate_field(field_name: str, operator: Identifier, value: Union[str, List]) ->Field:
returnField(operator=operator, value=value, source_name=field_name)
defcreate_field_value(field_name: str, operator: Identifier, value: Union[str, List]) ->FieldValue:
returnFieldValue(source_name=field_name,operator=operator, value=value)

def search_field_value(self, query):
def search_field_value(self, query) -> Tuple[FieldValue, str]:
field_name = self.search_field(query)
operator = self.search_operator(query, field_name)
query, operator, value = self.search_value(query=query, operator=operator, field_name=field_name)
value, operator_token = self.process_value_wildcard_symbols(value=value,
operator=operator,
wildcard_symbol=self.wildcard_symbol)
field = self.create_field(field_name=field_name, operator=operator_token, value=value)
returnfield, query
field_value = self.create_field_value(field_name=field_name, operator=operator_token, value=value)
returnfield_value, query

def _match_field_value(self, query: str, white_space_pattern: str = r"\s+") -> bool:
single_value_operator_group = fr"(?:{'|'.join(self.single_value_operators_map)})"
Expand All@@ -208,7 +208,7 @@ def _match_field_value(self, query: str, white_space_pattern: str = r"\s+") -> b

return False

def _get_identifier(self, query: str) -> Tuple[Union[Field, Keyword, Identifier], str]:
def _get_identifier(self, query: str) -> Tuple[Union[FieldValue, Keyword, Identifier], str]:
query = query.strip("\n").strip(" ").strip("\n")
if query.startswith(GroupType.L_PAREN):
return Identifier(token_type=GroupType.L_PAREN), query[1:]
Expand DownExpand Up@@ -240,7 +240,7 @@ def _validate_parentheses(tokens):
raise QueryParenthesesException()
return True

def tokenize(self, query: str) -> List[Union[Field, Keyword, Identifier]]:
def tokenize(self, query: str) -> List[Union[FieldValue, Keyword, Identifier]]:
tokenized = []
while query:
identifier, query = self._get_identifier(query=query)
Expand All@@ -250,34 +250,28 @@ def tokenize(self, query: str) -> List[Union[Field, Keyword, Identifier]]:

@staticmethod
def filter_tokens(tokens: List[TOKEN_TYPE],
token_type: Union[Type[Field], Type[Keyword], Type[Identifier]]) -> List[TOKEN_TYPE]:
token_type: Union[Type[FieldValue], Type[Keyword], Type[Identifier]]) -> List[TOKEN_TYPE]:
return [token for token in tokens if isinstance(token, token_type)]

def filter_function_tokens(self,
tokens: List[Union[Field, Keyword, Identifier, Function, SortArg]]) -> List[TOKEN_TYPE]:
def get_field_tokens_from_func_args(self,
args: List[Union[Field, FieldValue, Keyword, Identifier, Function, SortArg]]
) -> List[Field]:
result = []
for token in tokens:
if isinstance(token, Field):
result.append(token)
elif isinstance(token, Function):
result.extend(self.filter_function_tokens(tokens=token.args))
result.extend(self.filter_function_tokens(tokens=token.by_clauses))
elif isinstance(token, SortArg):
result.append(token.field)
for arg in args:
if isinstance(arg, Field):
result.append(arg)
elif isinstance(arg, FieldValue):
result.append(arg.field)
elif isinstance(arg, Function):
result.extend(self.get_field_tokens_from_func_args(args=arg.args))
result.extend(self.get_field_tokens_from_func_args(args=arg.by_clauses))
elif isinstance(arg, SortArg):
result.append(arg.field)
return result

@staticmethod
defset_field_generic_names_map(tokens: List[Field],
source_mappings: List[SourceMapping],
platform_mappings: BasePlatformMappings) -> None:
defset_field_tokens_generic_names_map(tokens: List[Field],
source_mappings: List[SourceMapping],
default_mapping: SourceMapping) -> None:
for token in tokens:
generic_names_map = {
source_mapping.source_id: source_mapping.fields_mapping.get_generic_field_name(token.source_name)
for source_mapping in source_mappings
}
if DEFAULT_MAPPING_NAME not in generic_names_map:
default_source_mapping = platform_mappings.get_source_mapping(DEFAULT_MAPPING_NAME)
fields_mapping = default_source_mapping.fields_mapping
generic_names_map[DEFAULT_MAPPING_NAME] = fields_mapping.get_generic_field_name(token.source_name)

token.generic_names_map = generic_names_map
token.set_generic_names_map(source_mappings, default_mapping)
Loading

[8]ページ先頭

©2009-2025 Movatter.jp