
ROR is a pipelining framework for Python that makes it easier to define complex ML and data-processing stages.
To get started with creating your first pipeline, you can base it on the following example, which defines a simple Gaussian Mixture Model (GMM) pipeline. First, we import the relevant packages.
```python
import matplotlib.pyplot as plt
from sklearn import datasets
from sklearn.mixture import GaussianMixture
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from dataclasses import dataclass
from typing import Tuple

from ror.schemas import BaseSchema
from ror.schemas.fields import field_perishable, field_persistance
from ror.stages import IInitStage, ITerminalStage, IForwardStage
from ror.controlers import BaseController
```
Then we can define the schemas, which determine the structure of the data communicated between the different stages. Fields marked `field_persistance` are carried forward to later stages via `get_carry()`, while `field_perishable` fields are dropped once the receiving stage has consumed them.
```python
@dataclass
class InitStageInput(BaseSchema):
    data: object = field_perishable()

@dataclass
class InitStageOutput(BaseSchema):
    X_pca: object = field_persistance()
    X_std: object = field_perishable()
    model: object = field_persistance()

@dataclass
class InferenceStageOutput(BaseSchema):
    X_pca: object = field_perishable()
    model: object = field_perishable()
    labels: object = field_persistance()

@dataclass
class VisStageOutput(BaseSchema):
    labels: object = field_persistance()
```
We can then define the logical stages that use these schemas as input and output. Each stage is parameterized by its input schema, its output schema and, for non-terminal stages, the next stage to hand off to.
```python
class VisStage(ITerminalStage[InferenceStageOutput, VisStageOutput]):
    def compute(self) -> None:
        # Visualize the clusters
        plt.figure(figsize=(8, 6))
        colors = ['r', 'g', 'b']
        for i in range(3):
            plt.scatter(
                self.input.X_pca[self.input.labels == i, 0],
                self.input.X_pca[self.input.labels == i, 1],
                color=colors[i],
                label=f'Cluster {i + 1}'
            )

        plt.title('Gaussian Mixture Model Clustering')
        plt.xlabel('Principal Component 1')
        plt.ylabel('Principal Component 2')
        plt.legend()
        plt.show()

        self._output = self.input.get_carry()

    def get_output(self) -> VisStageOutput:
        return VisStageOutput(**self._output)

class InferenceStage(IForwardStage[InitStageOutput, InferenceStageOutput, VisStage]):
    def compute(self) -> None:
        # Fit the Gaussian mixture model to the dataset
        self.input.model.fit(self.input.X_std)

        # Predict the cluster labels
        labels = self.input.model.predict(self.input.X_std)

        self._output = {
            "labels": labels,
            **self.input.get_carry()
        }

    def get_output(self) -> Tuple[VisStage, InferenceStageOutput]:
        return VisStage(), InferenceStageOutput(**self._output)

class InitStage(IInitStage[InitStageInput, InitStageOutput, InferenceStage]):
    def compute(self) -> None:
        # Load the dataset
        X = self.input.data.data

        # Standardize the features
        scaler = StandardScaler()
        X_std = scaler.fit_transform(X)

        # Apply PCA to reduce dimensionality for visualization
        pca = PCA(n_components=2)
        X_pca = pca.fit_transform(X_std)

        # Define a Gaussian Mixture Model (fitted later in InferenceStage)
        gmm = GaussianMixture(n_components=3, random_state=42)

        self._output = {
            "X_pca": X_pca,
            "X_std": X_std,
            "model": gmm,
            **self.input.get_carry()
        }

    def get_output(self) -> Tuple[InferenceStage, InitStageOutput]:
        return InferenceStage(), InitStageOutput(**self._output)
```
Then we can define a simple controller, which is given the init stage and the input data to be passed through the pipeline.
```python
iris = datasets.load_iris()
input_data = InitStageInput(data=iris)

controller = BaseController(init_data=input_data, init_stage=InitStage)
controller.discover()  # Shows a table of the connected stages

output, run_id = controller.start()
```
And that's it! With this you can define logical processing stages for your ML inference pipelines whilst keeping a high level of separation between them.
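As a final sanity check, you can inspect what the pipeline hands back. The snippet below is a minimal sketch assuming, consistent with the example above, that `controller.start()` returns the terminal stage's output schema (here `VisStageOutput`) together with a run identifier:

```python
# Minimal sketch: assumes `output` is the VisStageOutput instance produced by
# the terminal VisStage, and `run_id` identifies this pipeline run.
print(f"Run ID: {run_id}")
print(f"First 10 predicted labels: {output.labels[:10]}")
```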