5
\$\begingroup\$

Today, I've got a smallFlowLogger for you to review. The idea is not to log pure messages, but focus on the flow of the app that might contain such items as:

  • state - json-snapshot of variables etc
  • event - something worth logging has occured
  • started - the flow has started
  • altered - the flow has been altered by some condition like anif
  • canceled - the flow has been canceled before it run to completion
  • faulted - the flow experienced an exception
  • completed - the flow has succesfuly run to completion

Implementation

The APIs are implemented by theFlowLogger that is build on top of the standard logging without exposing the original methods:

import jsonimport loggingimport functoolsimport pyodbcfrom typing import Union, Listfrom timeit import default_timer as timerfrom datetime import datetimefrom logging import Handler
class FlowLogger:    def __init__(self, name: str, handlers: List[logging.Handler]):        self._logger = logging.Logger(name)        for h in handlers:            self._logger.addHandler(h)    def state(self, **kwargs):        self._logger.info(json.dumps(kwargs, sort_keys=True), (), **dict(extra={"flow": FlowLogger.state.__name__}))    def event(self, name: str):        self._logger.info(name, (), **dict(extra={"flow": FlowLogger.event().__name__}))    def started(self, name: Union[str | None] = None, **kwargs):        self._logger.info(name, **dict(extra=dict(**kwargs, **{"flow": FlowLogger.started.__name__})))    def altered(self, reason: str):        self._logger.info(reason, **dict(extra={"flow": FlowLogger.altered.__name__}))    def canceled(self, reason: Union[str | None]):        self._logger.warning(reason, **dict(extra={"flow": FlowLogger.canceled.__name__}))    def faulted(self, **kwargs):        self._logger.exception(None, extra=dict(**kwargs, **{"flow": FlowLogger.faulted.__name__}))    def completed(self, **kwargs):        self._logger.info(None, **dict(extra=dict(**kwargs, **{"flow": FlowLogger.completed.__name__})))

In order to not repeat myself when logging the flow of entire methods thetelemetry decorator takes over:

def telemetry(flow: FlowLogger):    def decorate(decoratee):        @functools.wraps(decoratee)        def wrapper(*decoratee_args, **decoratee_kwargs):            with FlowScope(flow, decoratee.__name__):                return decoratee(*decoratee_args, **decoratee_kwargs)        return wrapper    return decorate

It's supported by theFlowScope that handles the time measurement and exception handling:

class FlowScope:    def __init__(self, flow: FlowLogger, api: str):        self.flow = flow        self.api = api        self.start = timer()        pass    def __enter__(self):        flow.started(api=self.api)    def __exit__(self, exc_type, exc_val, exc_tb):        if exc_type:            flow.faulted(api=self.api, elapsed=timer() - self.start)        else:            flow.completed(api=self.api, elapsed=timer() - self.start)

The output is handled by theSqlServerHandler:

    create table log(        [id] int identity(1,1) primary key,        [timestamp] datetime2,        [logger] nvarchar(50),        [module] nvarchar(200),        [api] nvarchar(200),        [flow] nvarchar(50),        [elapsed] float,        [message] nvarchar(max),        [line] int    )
class SqlServerHandler(Handler):    def __init__(self, server: str, database: str, username: str, password: str):        super().__init__(logging.INFO)        connection = pyodbc.connect(f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password}")        self.db = connection.cursor()    def emit(self, record):        if record.exc_info:            record.exc_text = logging.Formatter().formatException(record.exc_info)        self.db.execute(            "INSERT INTO log([timestamp], [logger], [module], [api], [flow], [elapsed], [message], [line]) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",            datetime.fromtimestamp(record.created),            record.name,            record.module,            record.api if hasattr(record, "api") else record.funcName,            record.flow if hasattr(record, "flow") else None,            record.elapsed if hasattr(record, "elapsed") else None,            record.exc_text if record.exc_text else record.msg,            record.lineno        )        self.db.commit()

Example

This is mydirty code for how I'm testing it:

def initialize_telemetry() -> FlowLogger:    stream_handler = logging.StreamHandler()    stream_handler.setFormatter(logging.Formatter(style="{", fmt="{asctime} | {module}.{funcName} | {flow} | {message}", defaults={"flow": "<flow>", "message": "<message>"}))    sql_server_handler = SqlServerHandler(server="localhost,1433", database="master", username="sa", password="***")    flow = FlowLogger("test-logger", [stream_handler, sql_server_handler])    try:        return flow    finally:        flow.completed()flow = initialize_telemetry()@telemetry(flow)def flow_decorator_test():    raise ValueError    passdef flow_test():    flow.started("Custom flow.")    flow.state(foo="bar")    if True:        flow.altered("The value was true.")    try:        raise ValueError    except:        flow.faulted()    flow.completed()if __name__ == "__main__":    flow_test()    flow_decorator_test()

What do you think? Would say I'm doing something terribly wrong here?

200_success's user avatar
200_success
146k22 gold badges191 silver badges481 bronze badges
askedAug 29, 2022 at 15:35
t3chb0t's user avatar
\$\endgroup\$
1
  • \$\begingroup\$There is a new version, a follow-up of this questionhere.\$\endgroup\$CommentedOct 25, 2023 at 10:43

0

You mustlog in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.