@@ -9,18 +9,19 @@
from pgml.exceptions import PgMLException
from pgml.sql import q


class Project(object):
    """
    Use projects to refine multiple models of a particular dataset on a specific objective.

    Attributes:
        id (int): a unique identifier
        name (str): a human friendly unique identifier
        objective (str): the purpose of this project
        created_at (Timestamp): when this project was created
        updated_at (Timestamp): when this project was last updated
    """

    _cache = {}

    def __init__(self):
@@ -36,11 +37,14 @@ def find(cls, id: int):
        Returns:
            Project or None: instantiated from the database if found
        """
        result = plpy.execute(
            f"""
            SELECT *
            FROM pgml.projects
            WHERE id = {q(id)}
            """,
            1,
        )

        if len(result) == 0:
            return None
@@ -53,25 +57,28 @@ def find(cls, id: int):
    @classmethod
    def find_by_name(cls, name: str):
        """
        Get a Project from the database by name.

        This is the preferred API to retrieve projects, and they are cached by
        name to avoid needing to go to the database on every usage.

        Args:
            name (str): the project name
        Returns:
            Project or None: instantiated from the database if found
        """
        if name in cls._cache:
            return cls._cache[name]

        result = plpy.execute(
            f"""
            SELECT *
            FROM pgml.projects
            WHERE name = {q(name)}
            """,
            1,
        )
        if len(result) == 0:
            return None

        project = Project()
@@ -84,7 +91,7 @@ def find_by_name(cls, name: str):
    def create(cls, name: str, objective: str):
        """
        Create a Project and save it to the database.

        Args:
            name (str): a human friendly identifier
            objective (str): valid values are ["regression", "classification"].
@@ -93,11 +100,16 @@ def create(cls, name: str, objective: str):
        """

        project = Project()
        project.__dict__ = dict(
            plpy.execute(
                f"""
                INSERT INTO pgml.projects (name, objective)
                VALUES ({q(name)}, {q(objective)})
                RETURNING *
                """,
                1,
            )[0]
        )
        project.__init__()
        cls._cache[name] = project
        return project
@@ -112,10 +124,11 @@ def deployed_model(self):
            self._deployed_model = Model.find_deployed(self.id)
        return self._deployed_model


class Snapshot(object):
    """
    Snapshots capture a set of training & test data for repeatability.

    Attributes:
        id (int): a unique identifier
        relation_name (str): the name of the table or view to snapshot
@@ -126,11 +139,18 @@ class Snapshot(object):
        created_at (Timestamp): when this snapshot was created
        updated_at (Timestamp): when this snapshot was last updated
    """

    @classmethod
    def create(
        cls,
        relation_name: str,
        y_column_name: str,
        test_size: float or int,
        test_sampling: str,
    ):
        """
        Create a Snapshot and save it to the database.

        This creates both a metadata record in the snapshots table, as well as creating a new table
        that holds a snapshot of all the data currently present in the relation, so that training
        runs may be repeated, or further analysis may be conducted against the input.
@@ -145,32 +165,46 @@ def create(cls, relation_name: str, y_column_name: str, test_size: float or int,
        """
        snapshot = Snapshot()
        snapshot.__dict__ = dict(
            plpy.execute(
                f"""
                INSERT INTO pgml.snapshots (relation_name, y_column_name, test_size, test_sampling, status)
                VALUES ({q(relation_name)}, {q(y_column_name)}, {q(test_size)}, {q(test_sampling)}, 'new')
                RETURNING *
                """,
                1,
            )[0]
        )
        plpy.execute(
            f"""
            CREATE TABLE pgml."snapshot_{snapshot.id}" AS
            SELECT * FROM "{snapshot.relation_name}";
            """
        )
        snapshot.__dict__ = dict(
            plpy.execute(
                f"""
                UPDATE pgml.snapshots
                SET status = 'created'
                WHERE id = {q(snapshot.id)}
                RETURNING *
                """,
                1,
            )[0]
        )
        return snapshot

    def data(self):
        """
        Returns:
            list, list, list, list: All rows from the snapshot split into X_train, X_test, y_train, y_test sets.
        """
        data = plpy.execute(
            f"""
            SELECT *
            FROM pgml."snapshot_{self.id}"
            """
        )
        print(data)

        # Sanity check the data
@@ -203,10 +237,10 @@ def data(self):
            y.append(y_)

        # Split into training and test sets
        if self.test_sampling == "random":
            return train_test_split(X, y, test_size=self.test_size, random_state=0)
        else:
            if self.test_sampling == "first":
                X.reverse()
                y.reverse()
                if isinstance(split, float):
@@ -216,9 +250,9 @@ def data(self):
                split = int(self.test_size * len(X))
            return X[:split], X[split:], y[:split], y[split:]

        # TODO normalize and clean data


class Model(object):
    """Models use an algorithm on a snapshot of data to record the parameters learned.
@@ -234,23 +268,26 @@ class Model(object):
        pickle (bytes): the serialized version of the model parameters
        algorithm: the in memory version of the model parameters that can make predictions
    """

    @classmethod
    def create(cls, project: Project, snapshot: Snapshot, algorithm_name: str):
        """
        Create a Model and save it to the database.

        Args:
            project (Project): the project this model belongs to
            snapshot (Snapshot): the data used to train this model
            algorithm_name (str): the algorithm to train
        Returns:
            Model: instantiated from the database
        """
        result = plpy.execute(
            f"""
            INSERT INTO pgml.models (project_id, snapshot_id, algorithm_name, status)
            VALUES ({q(project.id)}, {q(snapshot.id)}, {q(algorithm_name)}, 'new')
            RETURNING *
            """
        )
        model = Model()
        model.__dict__ = dict(result[0])
        model.__init__()
@@ -265,15 +302,17 @@ def find_deployed(cls, project_id: int):
        Returns:
            Model: that should currently be used for predictions of the project
        """
        result = plpy.execute(
            f"""
            SELECT models.*
            FROM pgml.models
            JOIN pgml.deployments
                ON deployments.model_id = models.id
                AND deployments.project_id = {q(project_id)}
            ORDER BY deployments.created_at DESC
            LIMIT 1
            """
        )

        if len(result) == 0:
            return None
@@ -303,19 +342,19 @@ def algorithm(self):
            self._algorithm = pickle.loads(self.pickle)
        else:
            self._algorithm = {
                "linear_regression": LinearRegression,
                "random_forest_regression": RandomForestRegressor,
                "random_forest_classification": RandomForestClassifier,
            }[self.algorithm_name + "_" + self.project.objective]()

        return self._algorithm

    def fit(self, snapshot: Snapshot):
        """
        Learns the parameters of this model and records them in the database.

        Args:
            snapshot (Snapshot): dataset used to train this model
        """
        X_train, X_test, y_train, y_test = snapshot.data()
@@ -328,22 +367,28 @@ def fit(self, snapshot: Snapshot):
        r2 = r2_score(y_test, y_pred)

        # Save the model
        self.__dict__ = dict(
            plpy.execute(
                f"""
                UPDATE pgml.models
                SET pickle = '\\x{pickle.dumps(self.algorithm).hex()}',
                    status = 'successful',
                    mean_squared_error = {q(msq)},
                    r2_score = {q(r2)}
                WHERE id = {q(self.id)}
                RETURNING *
                """
            )[0]
        )

    def deploy(self):
        """Promote this model to the active version for the project that will be used for predictions"""
        plpy.execute(
            f"""
            INSERT INTO pgml.deployments (project_id, model_id)
            VALUES ({q(self.project_id)}, {q(self.id)})
            """
        )

    def predict(self, data: list):
        """Use the model for a set of features.
@@ -358,12 +403,12 @@ def predict(self, data: list):

def train(
    project_name: str,
    objective: str,
    relation_name: str,
    y_column_name: str,
    test_size: float or int = 0.1,
    test_sampling: str = "random",
):
    """Create a regression model from a table or view filled with training data.
@@ -390,5 +435,5 @@ def train(
        model.fit(snapshot)
        if best_error is None or model.mean_squared_error < best_error:
            best_error = model.mean_squared_error
            best_model = model

    best_model.deploy()
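

# Illustrative usage sketch, assuming this module is imported inside
# PL/Python, where PostgreSQL supplies `plpy` at runtime. `_example_usage`,
# the project name, the relation "my_training_data", and the label column
# "target" are hypothetical placeholders; train(), Project.find_by_name()
# and deployed_model are the APIs defined above.
def _example_usage():
    """Hypothetical walkthrough of the training and deployment flow."""
    train(
        project_name="My First Project",
        objective="regression",            # or "classification"
        relation_name="my_training_data",  # hypothetical table or view
        y_column_name="target",            # hypothetical label column
    )
    # train() deploys the model with the lowest mean squared error; it can be
    # retrieved later through the cached project to make predictions.
    return Project.find_by_name("My First Project").deployed_model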