# Copyright (c) 2015-2025 Vector 35 Inc## Permission is hereby granted, free of charge, to any person obtaining a copy# of this software and associated documentation files (the "Software"), to# deal in the Software without restriction, including without limitation the# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or# sell copies of the Software, and to permit persons to whom the Software is# furnished to do so, subject to the following conditions:## The above copyright notice and this permission notice shall be included in# all copies or substantial portions of the Software.## THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS# IN THE SOFTWARE.importargparseimportcmdimportctypesimportjsonimporttracebackfromtypingimportList,Union,Callable,Optional,Any# Binary Ninja componentsimportbinaryninjafrom.logimportlog_error_for_exceptionfrom.import_binaryninjacoreascorefrom.flowgraphimportFlowGraph,CoreFlowGraphfrom.importbinaryviewfrom.importfunctionas_functionfrom.importlowlevelilfrom.importmediumlevelilfrom.importhighlevelilActivityType=Union['Activity',str][docs]classAnalysisContext:""":class:`AnalysisContext` is a proxy object that provides access to the current analysis context,including the associated :class:`BinaryView`, :class:`Function`, and intermediate language (IL)representations. It provides APIs to retrieve and modify the in-progress analysis state and allowsusers to notify the analysis system of any changes or updates."""[docs]def__init__(self,handle:core.BNAnalysisContextHandle):asserthandleisnotNoneself.handle=handle @propertydefview(self)->Optional['binaryview.BinaryView']:"""BinaryView for the current AnalysisContext (writable)"""result=core.BNAnalysisContextGetBinaryView(self.handle)ifnotresult:returnNonereturnbinaryview.BinaryView(handle=result)@propertydeffunction(self)->Optional['_function.Function']:"""Function for the current AnalysisContext (read-only)"""result=core.BNAnalysisContextGetFunction(self.handle)ifnotresult:returnNonereturn_function.Function(handle=result)@propertydeflifted_il(self)->Optional[lowlevelil.LowLevelILFunction]:"""LowLevelILFunction used to represent lifted IL (writable)"""result=core.BNAnalysisContextGetLiftedILFunction(self.handle)ifnotresult:returnNonereturnlowlevelil.LowLevelILFunction(handle=result)@lifted_il.setterdeflifted_il(self,lifted_il:lowlevelil.LowLevelILFunction)->None:core.BNSetLiftedILFunction(self.handle,lifted_il.handle)@propertydefllil(self)->Optional[lowlevelil.LowLevelILFunction]:"""LowLevelILFunction used to represent Low Level IL (writable)"""result=core.BNAnalysisContextGetLowLevelILFunction(self.handle)ifnotresult:returnNonereturnlowlevelil.LowLevelILFunction(handle=result)@llil.setterdefllil(self,value:lowlevelil.LowLevelILFunction)->None:core.BNSetLowLevelILFunction(self.handle,value.handle)@propertydefmlil(self)->Optional[mediumlevelil.MediumLevelILFunction]:"""MediumLevelILFunction used to represent Medium Level IL (writable)"""result=core.BNAnalysisContextGetMediumLevelILFunction(self.handle)ifnotresult:returnNonereturnmediumlevelil.MediumLevelILFunction(handle=result)@mlil.setterdefmlil(self,value:mediumlevelil.MediumLevelILFunction)->None:self.set_mlil_function(value)[docs]defset_mlil_function(self,new_func:mediumlevelil.MediumLevelILFunction,llil_ssa_to_mlil_instr_map:Optional['mediumlevelil.LLILSSAToMLILInstructionMapping']=None,llil_ssa_to_mlil_expr_map:Optional['mediumlevelil.LLILSSAToMLILExpressionMapping']=None,)->None:"""Set the Medium Level IL function in the current analysis, giving updatedLow Level IL (SSA) to Medium Level IL instruction and expression mappings.:param new_func: New MLIL function:param llil_ssa_to_mlil_instr_map: Mapping from every LLIL SSA instruction to \ every MLIL instruction:param llil_ssa_to_mlil_expr_map: Mapping from every LLIL SSA expression to \ one or more MLIL expressions (first expression \ will be the primary)"""ifllil_ssa_to_mlil_instr_mapisNoneorllil_ssa_to_mlil_expr_mapisNone:# Build up maps from existing data in the functionllil_ssa_to_mlil_instr_map=new_func._get_llil_ssa_to_mlil_instr_map(True)llil_ssa_to_mlil_expr_map=new_func._get_llil_ssa_to_mlil_expr_map(True)# Number of instructionsinstr_count=0iflen(llil_ssa_to_mlil_instr_map)>0:instr_count=max(llil_ssa_to_mlil_instr_map.keys())+1ffi_instr_map=(ctypes.c_size_t*instr_count)()foriinrange(instr_count):ffi_instr_map[i]=0xfffffffffffffffffor(key,value)inllil_ssa_to_mlil_instr_map.items():ffi_instr_map[key]=value# Number of map entries, not highest indexexpr_count=len(llil_ssa_to_mlil_expr_map)ffi_expr_map=(core.BNExprMapInfo*expr_count)()fori,mapinenumerate(llil_ssa_to_mlil_expr_map):ffi_expr_map[i]=map._to_core_struct()core.BNSetMediumLevelILFunction(self.handle,new_func.handle,ffi_instr_map,instr_count,ffi_expr_map,expr_count) @propertydefhlil(self)->Optional[highlevelil.HighLevelILFunction]:"""HighLevelILFunction used to represent High Level IL (writable)"""result=core.BNAnalysisContextGetHighLevelILFunction(self.handle)ifnotresult:returnNonereturnhighlevelil.HighLevelILFunction(handle=result)@hlil.setterdefhlil(self,value:highlevelil.HighLevelILFunction)->None:core.BNSetHighLevelILFunction(self.handle,value.handle)@propertydefbasic_blocks(self)->'_function.BasicBlockList':"""function.BasicBlockList of BasicBlocks in the current function (writable)"""return_function.BasicBlockList(self.function)@basic_blocks.setterdefbasic_blocks(self,value:'_function.BasicBlockList')->None:core.BNSetBasicBlockList(self.handle,value._blocks,value._count)[docs]definform(self,request:str)->bool:returncore.BNAnalysisContextInform(self.handle,request) [docs]classActivity(object):""":class:`Activity` in Binary Ninja represents an individual analysis or action to be performed on a:class:`BinaryView` or :class:`Function` object.Activities are the fundamental units of execution within a :class:`Workflow`. Each Activity encapsulatesa specific task and defines its own behavior, dependencies, and eligibility criteria. Activities areexecuted in the context of an :class:`AnalysisContext`, which provides access to binary data, analysisstate, and utility functions."""_action_callbacks={}_eligibility_callbacks={}[docs]def__init__(self,configuration:str="",handle:Optional[core.BNActivityHandle]=None,action:Optional[Callable[[Any],None]]=None,eligibility:Optional[Callable[[Any],bool]]=None):ifhandleisNone:action_callback=ctypes.CFUNCTYPE(None,ctypes.c_void_p,ctypes.POINTER(core.BNAnalysisContext))(lambdactxt,ac:self._action(ac))ifeligibility:eligibility_callback=ctypes.CFUNCTYPE(ctypes.c_bool,ctypes.c_void_p,ctypes.POINTER(core.BNActivity),ctypes.POINTER(core.BNAnalysisContext))(lambdactxt,act,ac:eligibility(act,ac))_handle=core.BNCreateActivityWithEligibility(configuration,None,action_callback,eligibility_callback)self.__class__._eligibility_callbacks[len(self.__class__._eligibility_callbacks)]=eligibility_callbackelse:_handle=core.BNCreateActivity(configuration,None,action_callback)self.action=actionself.__class__._action_callbacks[len(self.__class__._action_callbacks)]=action_callbackelse:_handle=handleassert_handleisnotNone,"Activity instantiation failed"self.handle=_handle def_action(self,ac:Any):try:ifself.actionisnotNone:self.action(AnalysisContext(ac))except:log_error_for_exception("Unhandled Python exception in Activity._action")def__del__(self):ifcoreisnotNone:core.BNFreeActivity(self.handle)def__repr__(self):returnf"<{self.__class__.__name__}:{self.name}>"def__str__(self):returnself.namedef__eq__(self,other):ifnotisinstance(other,self.__class__):returnNotImplementedreturnctypes.addressof(self.handle.contents)==ctypes.addressof(other.handle.contents)def__ne__(self,other):ifnotisinstance(other,self.__class__):returnNotImplementedreturnnot(self==other)def__hash__(self):returnhash(ctypes.addressof(self.handle.contents))@propertydefname(self)->str:"""Activity name (read-only)"""returncore.BNActivityGetName(self.handle) class_WorkflowMetaclass(type):@propertydeflist(self)->List['Workflow']:"""List all Workflows (read-only)"""binaryninja._init_plugins()count=ctypes.c_ulonglong()workflows=core.BNGetWorkflowList(count)assertworkflowsisnotNone,"core.BNGetWorkflowList returned None"result=[]try:foriinrange(0,count.value):handle=core.BNNewWorkflowReference(workflows[i])asserthandleisnotNone,"core.BNNewWorkflowReference returned None"result.append(Workflow(handle=handle))returnresultfinally:core.BNFreeWorkflowList(workflows,count.value)def__iter__(self):binaryninja._init_plugins()count=ctypes.c_ulonglong()workflows=core.BNGetWorkflowList(count)assertworkflowsisnotNone,"core.BNGetWorkflowList returned None"try:foriinrange(0,count.value):handle=core.BNNewWorkflowReference(workflows[i])asserthandleisnotNone,"core.BNNewWorkflowReference returned None"yieldWorkflow(handle=handle)finally:core.BNFreeWorkflowList(workflows,count.value)def__getitem__(self,value):binaryninja._init_plugins()workflow=core.BNWorkflowGetOrCreate(str(value))returnWorkflow(handle=workflow)[docs]classWorkflow(metaclass=_WorkflowMetaclass):"""``class Workflow`` in Binary Ninja defines the set of analyses to perform on a binary,including their dependencies and execution order.Workflows are represented as Directed Acyclic Graphs (DAGs), where each node corresponds toan :class:`Activity` (an individual analysis or action). Workflows are used to tailor theanalysis process for :class:`BinaryView` or :class:`Function` objects, providing granularcontrol over analysis tasks at module or function levels.A Workflow starts in an unregistered state, either by creating a new empty Workflow or bycloning an existing one. While unregistered, it is possible to add and remove :class:`Activity`objects, as well as modify the execution strategy. To apply a Workflow to a binary, it must beregistered. Once registered, the Workflow becomes immutable and is available for use.:Example:.. code-block:: python# Define the custom activity configurationconfiguration = json.dumps({"name": "analysis.plugins.xorStringDecoder","title": "XOR String Decoder","description": "This analysis step transforms XOR-encoded strings within the current function.","eligibility": {"auto": {"default": False}}})# Clone the meta function workflow for customizationworkflow = Workflow("core.function.metaAnalysis").clone()# Register a new activityworkflow.register_activity(Activity(configuration,action=lambda analysis_context: log_warn(f"Decoder running for function: {hex(analysis_context.function.start)}"# Insert decoder logic here :P)))# Insert the new activity before the "generateHighLevelIL" stepworkflow.insert("core.function.generateHighLevelIL", ["analysis.plugins.xorStringDecoder"])# Register the modified meta function workflowworkflow.register()"""[docs]def__init__(self,name:str="",handle:core.BNWorkflowHandle=None,query_registry:bool=True,object_handle:Union[core.BNFunctionHandle,core.BNBinaryViewHandle]=None):ifhandleisNone:ifquery_registry:_handle=core.BNWorkflowGetOrCreate(str(name))else:_handle=core.BNCreateWorkflow(name)else:_handle=handleassert_handleisnotNoneself.handle=_handleself._name=core.BNGetWorkflowName(self.handle)self._machine=Noneifobject_handleisnotNone:self._machine=WorkflowMachine(object_handle) def__del__(self):ifcoreisnotNone:core.BNFreeWorkflow(self.handle)def__len__(self):returnint(core.BNWorkflowSize(self.handle))def__repr__(self):ifcore.BNWorkflowIsRegistered(self.handle):returnf"<Workflow:{self.name}>"else:returnf"<Workflow [Unregistered]:{self.name}>"def__str__(self):returnself.name@propertydefname(self)->str:returnself._namedef__eq__(self,other):ifnotisinstance(other,self.__class__):returnNotImplementedreturnctypes.addressof(self.handle.contents)==ctypes.addressof(other.handle.contents)def__ne__(self,other):ifnotisinstance(other,self.__class__):returnNotImplementedreturnnot(self==other)def__hash__(self):returnhash(ctypes.addressof(self.handle.contents))[docs]defregister(self,configuration:str="")->bool:"""``register`` Register this Workflow, making it immutable and available for use.:param str configuration: a JSON representation of the workflow configuration:return: True on Success, False otherwise:rtype: bool"""returncore.BNRegisterWorkflow(self.handle,str(configuration)) [docs]defclone(self,name:str=None,activity:ActivityType="")->"Workflow":"""``clone`` Clone a new Workflow, copying all Activities and the execution strategy.:param str name: if specified, name the new Workflow, otherwise the name is copied from the original:param str activity: if specified, perform the clone operation using ``activity`` as the root:return: a new Workflow:rtype: Workflow"""ifnameisNone:name=""workflow=core.BNWorkflowClone(self.handle,str(name),str(activity))returnWorkflow(handle=workflow) [docs]defregister_activity(self,activity:Activity,subactivities:List[ActivityType]=[])->Optional[Activity]:"""``register_activity`` Register an Activity with this Workflow.:param Activity activity: the Activity to register:param list[str] subactivities: the list of Activities to assign:return: the registered Activity:rtype: Activity"""ifactivityisNone:returnNoneinput_list=(ctypes.c_char_p*len(subactivities))()foriinrange(0,len(subactivities)):input_list[i]=str(subactivities[i]).encode('charmap')handle=core.BNWorkflowRegisterActivity(self.handle,activity.handle,input_list,len(subactivities))ifhandleisNone:returnNonereturnactivity [docs]defcontains(self,activity:ActivityType)->bool:"""``contains`` Determine if an Activity exists in this Workflow.:param ActivityType activity: the Activity name:return: True if the Activity exists, False otherwise:rtype: bool"""returncore.BNWorkflowContains(self.handle,str(activity)) [docs]defconfiguration(self,activity:ActivityType="")->str:"""``configuration`` Retrieve the configuration as an adjacency list in JSON for the Workflow, or if specified just for the given ``activity``.:param ActivityType activity: if specified, return the configuration for the ``activity``:return: an adjacency list representation of the configuration in JSON:rtype: str"""returncore.BNWorkflowGetConfiguration(self.handle,str(activity)) @propertydefregistered(self)->bool:"""``registered`` Whether this Workflow is registered or not. A Workflow becomes immutable once it is registered.:type: bool"""returncore.BNWorkflowIsRegistered(self.handle)[docs]defget_activity(self,activity:ActivityType)->Optional[Activity]:"""``get_activity`` Retrieve the Activity object for the specified ``activity``.:param str activity: the Activity name:return: the Activity object:rtype: Activity"""handle=core.BNWorkflowGetActivity(self.handle,str(activity))ifhandleisNone:returnNonereturnActivity(str(activity),handle) [docs]defactivity_roots(self,activity:ActivityType="")->List[str]:"""``activity_roots`` Retrieve the list of activity roots for the Workflow, or if specified just for the given ``activity``.:param str activity: if specified, return the roots for the ``activity``:return: list of root activity names:rtype: list[str]"""length=ctypes.c_ulonglong()result=core.BNWorkflowGetActivityRoots(self.handle,str(activity),ctypes.byref(length))assertresultisnotNone,"core.BNWorkflowGetActivityRoots returned None"out_list=[]try:foriinrange(length.value):out_list.append(result[i].decode('utf-8'))returnout_listfinally:core.BNFreeStringList(result,length.value) [docs]defsubactivities(self,activity:ActivityType="",immediate:bool=True)->List[str]:"""``subactivities`` Retrieve the list of all activities, or optionally a filtered list.:param str activity: if specified, return the direct children and optionally the descendants of the ``activity`` (includes ``activity``):param bool immediate: whether to include only direct children of ``activity`` or all descendants:return: list of activity names:rtype: list[str]"""length=ctypes.c_ulonglong()result=core.BNWorkflowGetSubactivities(self.handle,str(activity),immediate,ctypes.byref(length))assertresultisnotNone,"core.BNWorkflowGetSubactivities returned None"out_list=[]try:foriinrange(length.value):out_list.append(result[i].decode('utf-8'))returnout_listfinally:core.BNFreeStringList(result,length.value) [docs]defassign_subactivities(self,activity:Activity,activities:Union[List[str],str])->bool:"""``assign_subactivities`` Assign the list of ``activities`` as the new set of children for the specified ``activity``.:param str activity: the Activity node to assign children:param list[str] activities: the list of Activities to assign:return: True on success, False otherwise:rtype: bool"""ifisinstance(activities,str):activities=[activities]input_list=(ctypes.c_char_p*len(activities))()foriinrange(0,len(activities)):input_list[i]=str(activities[i]).encode('charmap')returncore.BNWorkflowAssignSubactivities(self.handle,str(activity),input_list,len(activities)) [docs]defclear(self)->bool:"""``clear`` Remove all Activity nodes from this Workflow.:return: True on success, False otherwise:rtype: bool"""returncore.BNWorkflowClear(self.handle) [docs]definsert(self,activity:ActivityType,activities:Union[List[str],str])->bool:"""``insert`` Insert the list of ``activities`` before the specified ``activity`` and at the same level.:param str activity: the Activity node for which to insert ``activities`` before:param list[str] activities: the list of Activities to insert:return: True on success, False otherwise:rtype: bool"""ifisinstance(activities,str):activities=[activities]input_list=(ctypes.c_char_p*len(activities))()foriinrange(0,len(activities)):input_list[i]=str(activities[i]).encode('charmap')returncore.BNWorkflowInsert(self.handle,str(activity),input_list,len(activities)) [docs]definsert_after(self,activity:ActivityType,activities:Union[List[str],str])->bool:"""``insert_after`` Insert the list of ``activities`` after the specified ``activity`` and at the same level.:param str activity: the Activity node for which to insert ``activities`` after:param list[str] activities: the list of Activities to insert:return: True on success, False otherwise:rtype: bool"""ifisinstance(activities,str):activities=[activities]input_list=(ctypes.c_char_p*len(activities))()foriinrange(0,len(activities)):input_list[i]=str(activities[i]).encode('charmap')returncore.BNWorkflowInsertAfter(self.handle,str(activity),input_list,len(activities)) [docs]defremove(self,activity:ActivityType)->bool:"""``remove`` Remove the specified ``activity``.:param str activity: the Activity to remove:return: True on success, False otherwise:rtype: bool"""returncore.BNWorkflowRemove(self.handle,str(activity)) [docs]defreplace(self,activity:ActivityType,new_activity:str)->bool:"""``replace`` Replace the specified ``activity``.:param str activity: the Activity to replace:param str new_activity: the replacement Activity:return: True on success, False otherwise:rtype: bool"""returncore.BNWorkflowReplace(self.handle,str(activity),str(new_activity)) [docs]defgraph(self,activity:ActivityType="",sequential:bool=False,show:bool=True)->Optional[FlowGraph]:"""``graph`` Generate a FlowGraph object for the current Workflow and optionally show it in the UI.:param str activity: if specified, generate the Flowgraph using ``activity`` as the root:param bool sequential: whether to generate a **Composite** or **Sequential** style graph:param bool show: whether to show the graph in the UI or not:return: FlowGraph object on success, None on failure:rtype: FlowGraph"""graph=core.BNWorkflowGetGraph(self.handle,str(activity),sequential)ifnotgraph:returnNonegraph=CoreFlowGraph(graph)ifshow:core.BNShowGraphReport(None,f'{self.name} <{activity}>'ifactivityelseself.name,graph.handle)returngraph [docs]defshow_topology(self)->None:"""``show_topology`` Show the Workflow topology in the UI.:rtype: None"""core.BNWorkflowShowReport(self.handle,"topology") [docs]defeligibility_settings(self)->List[str]:"""``eligibility_settings`` Retrieve the list of eligibility settings for the Workflow.:return: list of eligibility settings:rtype: list[str]"""length=ctypes.c_ulonglong()result=core.BNWorkflowGetEligibilitySettings(self.handle,ctypes.byref(length))assertresultisnotNone,"core.BNWorkflowGetEligibilitySettings returned None"out_list=[]try:foriinrange(length.value):out_list.append(result[i].decode('utf-8'))returnout_listfinally:core.BNFreeStringList(result,length.value) @propertydefmachine(self):ifself._machineisnotNone:returnself._machineelse:raiseAttributeError("Machine does not exist.") [docs]classWorkflowMachine:[docs]def__init__(self,handle:Union[core.BNFunctionHandle,core.BNBinaryViewHandle]=None):ifisinstance(handle,core.BNFunctionHandle):self.is_function_machine=Trueelifisinstance(handle,core.BNBinaryViewHandle):self.is_function_machine=Falseelse:raiseValueError("WorkflowMachine requires a Function or BinaryView handle!")self.handle=handle [docs]defshow_metrics(self)->None:ifself.is_function_machine:core.BNShowWorkflowReportForFunction(self.handle,"metrics")else:core.BNShowWorkflowReportForBinaryView(self.handle,"metrics") [docs]defshow_topology(self)->None:ifself.is_function_machine:core.BNShowWorkflowReportForFunction(self.handle,"topology")else:core.BNShowWorkflowReportForBinaryView(self.handle,"topology") [docs]defshow_trace(self)->None:ifself.is_function_machine:core.BNShowWorkflowReportForFunction(self.handle,"trace")else:core.BNShowWorkflowReportForBinaryView(self.handle,"trace") [docs]deflog(self,enable:bool=True,is_global:bool=False):request=json.dumps({"command":"log","enable":enable,"global":is_global})ifself.is_function_machine:returnjson.loads(core.BNPostWorkflowRequestForFunction(self.handle,request))else:returnjson.loads(core.BNPostWorkflowRequestForBinaryView(self.handle,request)) [docs]defmetrics(self,enable:bool=True,is_global:bool=False):request=json.dumps({"command":"metrics","enable":enable,"global":is_global})ifself.is_function_machine:returnjson.loads(core.BNPostWorkflowRequestForFunction(self.handle,request))else:returnjson.loads(core.BNPostWorkflowRequestForBinaryView(self.handle,request)) [docs]defdump(self):request=json.dumps({"command":"dump"})ifself.is_function_machine:returnjson.loads(core.BNPostWorkflowRequestForFunction(self.handle,request))else:returnjson.loads(core.BNPostWorkflowRequestForBinaryView(self.handle,request)) [docs]defconfigure(self,advanced:bool=True,incremental:bool=False):ifself.is_function_machine:request=json.dumps({"command":"configure","advanced":advanced,"incremental":incremental})returnjson.loads(core.BNPostWorkflowRequestForFunction(self.handle,request))else:request=json.dumps({"command":"configure"})returnjson.loads(core.BNPostWorkflowRequestForBinaryView(self.handle,request)) [docs]defresume(self,advanced:bool=True,incremental:bool=False):ifself.is_function_machine:request=json.dumps({"command":"resume","advanced":advanced,"incremental":incremental})returnjson.loads(core.BNPostWorkflowRequestForFunction(self.handle,request))else:request=json.dumps({"command":"resume"})returnjson.loads(core.BNPostWorkflowRequestForBinaryView(self.handle,request)) [docs]defrun(self,advanced:bool=True,incremental:bool=False):ifself.is_function_machine:request=json.dumps({"command":"run","advanced":advanced,"incremental":incremental})returnjson.loads(core.BNPostWorkflowRequestForFunction(self.handle,request))else:request=json.dumps({"command":"run"})returnjson.loads(core.BNPostWorkflowRequestForBinaryView(self.handle,request)) [docs]defhalt(self):request=json.dumps({"command":"halt"})ifself.is_function_machine:returnjson.loads(core.BNPostWorkflowRequestForFunction(self.handle,request))else:returnjson.loads(core.BNPostWorkflowRequestForBinaryView(self.handle,request)) [docs]defreset(self):request=json.dumps({"command":"reset"})ifself.is_function_machine:returnjson.loads(core.BNPostWorkflowRequestForFunction(self.handle,request))else:returnjson.loads(core.BNPostWorkflowRequestForBinaryView(self.handle,request)) [docs]defenable(self):request=json.dumps({"command":"enable"})ifself.is_function_machine:returnjson.loads(core.BNPostWorkflowRequestForFunction(self.handle,request))else:returnjson.loads(core.BNPostWorkflowRequestForBinaryView(self.handle,request)) [docs]defdisable(self):request=json.dumps({"command":"disable"})ifself.is_function_machine:returnjson.loads(core.BNPostWorkflowRequestForFunction(self.handle,request))else:returnjson.loads(core.BNPostWorkflowRequestForBinaryView(self.handle,request)) [docs]defstep(self):request=json.dumps({"command":"step"})ifself.is_function_machine:returnjson.loads(core.BNPostWorkflowRequestForFunction(self.handle,request))else:returnjson.loads(core.BNPostWorkflowRequestForBinaryView(self.handle,request)) [docs]defbreakpoint_delete(self,activities):ifisinstance(activities,str):activities=[activities]request=json.dumps({"command":"breakpoint","action":"delete","activities":activities})ifself.is_function_machine:returnjson.loads(core.BNPostWorkflowRequestForFunction(self.handle,request))else:returnjson.loads(core.BNPostWorkflowRequestForBinaryView(self.handle,request)) [docs]defbreakpoint_query(self):request=json.dumps({"command":"breakpoint","action":"query"})ifself.is_function_machine:returnjson.loads(core.BNPostWorkflowRequestForFunction(self.handle,request))else:returnjson.loads(core.BNPostWorkflowRequestForBinaryView(self.handle,request)) [docs]defbreakpoint_set(self,activities):ifisinstance(activities,str):activities=[activities]request=json.dumps({"command":"breakpoint","action":"set","activities":activities})ifself.is_function_machine:returnjson.loads(core.BNPostWorkflowRequestForFunction(self.handle,request))else:returnjson.loads(core.BNPostWorkflowRequestForBinaryView(self.handle,request)) [docs]defstatus(self):request=json.dumps({"command":"status"})ifself.is_function_machine:returnjson.loads(core.BNPostWorkflowRequestForFunction(self.handle,request))else:returnjson.loads(core.BNPostWorkflowRequestForBinaryView(self.handle,request)) [docs]defoverride_clear(self,activity):request=json.dumps({"command":"override","action":"clear","activity":activity})ifself.is_function_machine:returnjson.loads(core.BNPostWorkflowRequestForFunction(self.handle,request))else:returnjson.loads(core.BNPostWorkflowRequestForBinaryView(self.handle,request)) [docs]defoverride_query(self,activity=None):ifactivityisNone:activity=""request=json.dumps({"command":"override","action":"query","activity":activity})ifself.is_function_machine:returnjson.loads(core.BNPostWorkflowRequestForFunction(self.handle,request))else:returnjson.loads(core.BNPostWorkflowRequestForBinaryView(self.handle,request)) [docs]defoverride_set(self,activity,enable):request=json.dumps({"command":"override","action":"set","activity":activity,"enable":enable})ifself.is_function_machine:returnjson.loads(core.BNPostWorkflowRequestForFunction(self.handle,request))else:returnjson.loads(core.BNPostWorkflowRequestForBinaryView(self.handle,request)) [docs]defdelay(self,duration):request=json.dumps({"command":"delay","duration":duration})ifself.is_function_machine:returnjson.loads(core.BNPostWorkflowRequestForFunction(self.handle,request))else:returnjson.loads(core.BNPostWorkflowRequestForBinaryView(self.handle,request)) [docs]defrequest(self,request):ifself.is_function_machine:returnjson.loads(core.BNPostWorkflowRequestForFunction(self.handle,request))else:returnjson.loads(core.BNPostWorkflowRequestForBinaryView(self.handle,request)) [docs]defcli(self):WorkflowMachineCLI(self).cmdloop() [docs]classWorkflowMachineCLI(cmd.Cmd):intro="Welcome to the Workflow Orchestrator. Type 'help' to list available commands."prompt="(dechora) "aliases={"l":"log","m":"metrics","d":"dump","r":"run","c":"resume","h":"halt","s":"step","b":"breakpoint","o":"override","q":"quit"}[docs]def__init__(self,machine:WorkflowMachine):super().__init__()self.machine=machine [docs]defdo_log(self,line):"""Control workflow logging."""parser=argparse.ArgumentParser(exit_on_error=False)parser.add_argument("--enable",action="store_true",default=None,help="Enable logging")parser.add_argument("--disable",action="store_true",default=False,help="Disable logging")parser.add_argument("--global",action="store_true",default=False,dest="is_global",help="Enable or disable logging globally")try:args=parser.parse_args(line.split())ifargs.enableisNone:enable=notargs.disableelse:enable=args.enablestatus=self.machine.log(enable=enable,is_global=args.is_global)print(json.dumps(status,indent=4))exceptargparse.ArgumentErrorase:print("ArgumentError:",e)exceptSystemExit:pass [docs]defdo_metrics(self,line):"""Control workflow metrics collection."""parser=argparse.ArgumentParser(exit_on_error=False)parser.add_argument("--enable",action="store_true",default=None,help="Enable logging")parser.add_argument("--disable",action="store_true",default=False,help="Disable logging")parser.add_argument("--global",action="store_true",default=False,dest="is_global",help="Enable or disable logging globally")try:args=parser.parse_args(line.split())ifargs.enableisNone:enable=notargs.disableelse:enable=args.enablestatus=self.machine.metrics(enable=enable,is_global=args.is_global)print(json.dumps(status,indent=4))exceptargparse.ArgumentErrorase:print("ArgumentError:",e)exceptSystemExit:pass [docs]defdo_dump(self,line):"""Dump metrics from the workflow system."""status=self.machine.dump()accepted=status.get('commandStatus',{}).get('accepted',False)ifaccepted:response=status.pop("response",None)print(json.dumps(status,indent=4))ifacceptedandresponse:print(json.dumps(response,indent=None)) [docs]defdo_configure(self,line):"""Configure the workflow machine."""parser=argparse.ArgumentParser(exit_on_error=False)parser.add_argument("--advanced",action="store_true",default=True,help="Enable advanced configuration (default: True)")parser.add_argument("--incremental",action="store_true",default=None,help="Enable incremental configuration (default: True if provided, False if omitted)")try:args=parser.parse_args(line.split())ifargs.incrementalisNone:args.incremental=Falsestatus=self.machine.configure(advanced=args.advanced,incremental=args.incremental)print(json.dumps(status,indent=4))exceptargparse.ArgumentErrorase:print("ArgumentError:",e)exceptSystemExit:pass [docs]defdo_resume(self,line):"""Continue/Resume execution of a workflow."""status=self.machine.resume()print(json.dumps(status,indent=4)) [docs]defdo_run(self,line):"""Run the workflow machine and generate a default configuration if the workflow is not configured."""status=self.machine.run()print(json.dumps(status,indent=4)) [docs]defdo_halt(self,line):"""Halt the workflow machine."""status=self.machine.halt()print(json.dumps(status,indent=4)) [docs]defdo_reset(self,line):"""Reset the workflow machine."""status=self.machine.reset()print(json.dumps(status,indent=4)) [docs]defdo_enable(self,line):"""Enable the workflow machine."""status=self.machine.enable()print(json.dumps(status,indent=4)) [docs]defdo_disable(self,line):"""Disable the workflow machine."""status=self.machine.disable()print(json.dumps(status,indent=4)) [docs]defdo_step(self,line):"""Step to the next activity in the workflow machine."""status=self.machine.step()print(json.dumps(status,indent=4)) [docs]defdo_breakpoint(self,line):"""Handle breakpoint commands."""parser=argparse.ArgumentParser(exit_on_error=False)parser.add_argument("action",choices=["delete","set","query"],help="Action to perform: delete, set, or, query.")parser.add_argument("activities",type=str,nargs="*",help="The breakpoint(s) to set/delete.")try:args=parser.parse_args(line.split())args.activities=[activity[1:-1]ifactivity.startswith('"')andactivity.endswith('"')elseactivityforactivityinargs.activities]ifargs.action=="delete":status=self.machine.breakpoint_delete(args.activities)print(json.dumps(status,indent=4))elifargs.action=="query":status=self.machine.breakpoint_query()accepted=status.get('commandStatus',{}).get('accepted',False)ifaccepted:response=status.pop("response",None)print(json.dumps(status,indent=4))ifacceptedandresponse:print(json.dumps(response,indent=None))elifargs.action=="set":status=self.machine.breakpoint_set(args.activities)print(json.dumps(status,indent=4))exceptargparse.ArgumentErrorase:print("ArgumentError:",e)exceptSystemExit:pass [docs]defdo_status(self,line):"""Retrieve the current machine status."""status=self.machine.status()print(json.dumps(status,indent=4)) [docs]defdo_override(self,line):"""Handle override commands."""parser=argparse.ArgumentParser(exit_on_error=False)parser.add_argument("action",choices=["clear","set","query"],help="Action to perform: clear, set, or, query.")parser.add_argument("activity",type=str,nargs="?",default="",help="The activity to set/clear.")parser.add_argument("--enable",action="store_true",default=None,help="Enable the specified activity.")parser.add_argument("--disable",action="store_true",default=False,help="Disable the specified activity.")try:args=parser.parse_args(line.split())args.activity=args.activity[1:-1]ifargs.activityandargs.activity.startswith('"')andargs.activity.endswith('"')elseargs.activityifargs.action=="clear":status=self.machine.override_clear(args.activity)print(json.dumps(status,indent=4))elifargs.action=="query":status=self.machine.override_query(args.activity)accepted=status.get('commandStatus',{}).get('accepted',False)ifaccepted:response=status.pop("response",None)print(json.dumps(status,indent=4))ifacceptedandresponse:print(json.dumps(response,indent=None))elifargs.action=="set":ifargs.enableisNone:enable=notargs.disableelse:enable=args.enablestatus=self.machine.override_set(args.activity,enable)print(json.dumps(status,indent=4))exceptargparse.ArgumentErrorase:print("ArgumentError:",e)exceptSystemExit:pass [docs]defdo_quit(self,line):"""Exit the WorkflowMachine CLI."""print("Exiting WorkflowMachine CLI...")returnTrue [docs]defprecmd(self,line):words=line.split()ifwordsandwords[0]inself.aliases:words[0]=self.aliases[words[0]]line=' '.join(words)returnline [docs]defhelp(self,arg):ifarginself.aliases:arg=self.aliases[arg]super().help(arg)