DataSource SDK for analytics developers using WISE-PaaS AFS.


WISE-AIFS/afs2-datasource


The AFS2-DataSource SDK package allows developers to easily access PostgreSQL, MongoDB, InfluxDB, S3 and APM.

Installation

Supports Python 3.6 or later.

pip install afs2-datasource

Development

pip install -e .

Notice

AFS2-DataSource SDK uses the asyncio package. The Jupyter kernel also uses asyncio and already runs an event loop, and these loops can't be nested (jupyter/notebook#3397).

If you use AFS2-DataSource SDK in Jupyter Notebook, add the following code to resolve this issue:

    !pip install nest_asyncio
    import nest_asyncio
    nest_asyncio.apply()
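To see why nesting is a problem, the following minimal sketch uses only the standard library, not the SDK. It tries to start a second event loop from inside a running one, which is roughly the situation a Jupyter kernel creates. The function names `inner` and `outer` are illustrative only.

```python
import asyncio


async def inner():
    return 42


async def outer():
    # We are already inside a running event loop here, much like code
    # executed in a Jupyter cell.
    coro = inner()
    try:
        asyncio.run(coro)  # attempts to start a second, nested loop
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)
    return "no error"


message = asyncio.run(outer())
print(message)
```

The printed message reports that `asyncio.run()` cannot be called from a running event loop. `nest_asyncio.apply()` patches asyncio so that such nested calls are allowed.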

API

DBManager


Init DBManager

With Database Config

Import database config via Python.

    from afs2datasource import DBManager, constant

    # For MySQL
    manager = DBManager(
      db_type=constant.DB_TYPE['MYSQL'],
      username=username,
      password=password,
      host=host,
      port=port,
      database=database,
      querySql="select {field} from {table}"
    )

    # For SQLServer
    manager = DBManager(
      db_type=constant.DB_TYPE['SQLSERVER'],
      username=username,
      password=password,
      host=host,
      port=port,
      database=database,
      querySql="select {field} from {table}"  # only support `SELECT`
    )

    # For PostgreSQL
    manager = DBManager(
      db_type=constant.DB_TYPE['POSTGRES'],
      username=username,
      password=password,
      host=host,
      port=port,
      database=database,
      querySql="select {field} from {schema}.{table}"
    )

    # For MongoDB
    manager = DBManager(
      db_type=constant.DB_TYPE['MONGODB'],
      username=username,
      password=password,
      host=host,
      port=port,
      database=database,
      collection=collection,
      querySql="{}"
    )

    # For InfluxDB
    manager = DBManager(
      db_type=constant.DB_TYPE['INFLUXDB'],
      username=username,
      password=password,
      host=host,
      port=port,
      database=database,
      querySql="select {field_key} from {measurement_name}"
    )

    # For Oracle Database
    manager = DBManager(
      db_type=constant.DB_TYPE['ORACLEDB'],
      username=username,
      password=password,
      host=host,
      port=port,
      database=database,
      querySql="select {field_key} from {measurement_name}"  # only support `SELECT`
    )

    # For S3
    manager = DBManager(
      db_type=constant.DB_TYPE['S3'],
      endpoint=endpoint,
      access_key=access_key,
      secret_key=secret_key,
      is_verify=False,
      buckets=[{
        'bucket': 'bucket_name',
        'blobs': {
          'files': ['file_name'],
          'folders': ['folder_name']
        }
      }]
    )

    # For AWS S3
    manager = DBManager(
      db_type=constant.DB_TYPE['AWS'],
      access_key=access_key,
      secret_key=secret_key,
      buckets=[{
        'bucket': 'bucket_name',
        'blobs': {
          'files': ['file_name'],
          'folders': ['folder_name']
        }
      }]
    )

    # For APM
    manager = DBManager(
      db_type=constant.DB_TYPE['APM'],
      username=username,  # sso username
      password=password,  # sso password
      apmUrl=apmUrl,
      apm_config=[{
        'name': name,  # dataset name
        'machines': [{
          'id': machine_id  # node_id in APM
        }],
        'parameters': [
          parameter1,  # parameter in APM
          parameter2
        ]
      }],
      mongouri=mongouri,
      # timeRange or timeLast
      timeRange=[{'start': start_ts, 'end': end_ts}],
      timeLast={'lastDays': lastDay, 'lastHours': lastHour, 'lastMins': lastMin}
    )

    # For Azure Blob
    manager = DBManager(
      db_type=constant.DB_TYPE['AZUREBLOB'],
      account_name=account_name,
      account_key=account_key,
      containers=[{
        'container': container_name,
        'blobs': {
          'files': ['file_name'],
          'folders': ['folder_name']
        }
      }]
    )

    # For DataHub
    manager = DBManager(
      db_type=constant.DB_TYPE['DATAHUB'],
      username=username,  # sso username
      password=password,  # sso password
      datahub_url=datahub_url,
      datahub_config=[{
        "name": "string",  # dataset name
        "project_id": "project_id",
        "node_id": "node_id",
        "device_id": "device_id",
        "tags": [
          "tag_name"
        ]
      }],
      uri=mongouri,  # mongouri or influxuri
      # timeRange or timeLast
      timeRange=[{'start': start_ts, 'end': end_ts}],
      timeLast={'lastDays': lastDay, 'lastHours': lastHour, 'lastMins': lastMin}
    )

How to get APM machine id and parameters

How to get DataHub project id, node id, device id and tag
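The `buckets` value used by the S3 and AWS S3 configs is a list of nested dictionaries, which is easy to mistype by hand. As a small sketch, a helper can assemble one entry in the shape shown in the config above. `make_bucket_entry` is a hypothetical name and is not part of the SDK.

```python
def make_bucket_entry(bucket, files=(), folders=()):
    """Build one entry for the `buckets` list in the shape the README shows.

    Hypothetical helper, not an SDK function.
    """
    return {
        'bucket': bucket,
        'blobs': {
            'files': list(files),
            'folders': list(folders),
        },
    }


buckets = [
    make_bucket_entry('bucket_name', files=['file_name'], folders=['folder_name']),
]
print(buckets)
```

The resulting list could then be passed as `buckets=buckets` when constructing the manager.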


DBManager.connect()

Connect to MySQL, PostgreSQL, MongoDB, InfluxDB, S3 or APM using the given config.

manager.connect()

DBManager.disconnect()

Close the connection. Note: the S3 datasource does not support this function.

manager.disconnect()
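Because `connect()` and `disconnect()` come in pairs, one possible pattern is a small context manager that always disconnects, even when a query raises. This sketch is not part of the SDK. `connected` is a hypothetical helper and `FakeManager` is a stand-in object, so the example runs without a database. Since the S3 datasource does not support `disconnect()`, the wrapper would not suit S3 as written.

```python
from contextlib import contextmanager


@contextmanager
def connected(manager):
    """Connect on entry and always disconnect on exit (hypothetical helper)."""
    manager.connect()
    try:
        yield manager
    finally:
        manager.disconnect()


class FakeManager:
    """Stand-in for a DBManager; it only records the calls it receives."""

    def __init__(self):
        self.calls = []

    def connect(self):
        self.calls.append("connect")

    def disconnect(self):
        self.calls.append("disconnect")


fake = FakeManager()
try:
    with connected(fake) as m:
        m.calls.append("work")
        raise ValueError("query failed")  # simulate a failing query
except ValueError:
    pass

print(fake.calls)  # ['connect', 'work', 'disconnect']
```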

DBManager.is_connected()

Return whether the connection is established.

manager.is_connected()

DBManager.is_connecting()

Return whether the connection is currently being established.

manager.is_connecting()

DBManager.get_dbtype()

Return the database type of the connection.

    manager.get_dbtype()  # Return: str

DBManager.get_query()

Return the query in the config.

    manager.get_query()

    # MySQL, Oracle Database
    # Return type: String
    """
    select {field} from {table} {condition}
    """

    # PostgreSQL
    # Return type: String
    """
    select {field} from {schema}.{table}
    """

    # MongoDB
    # Return type: String
    """
    {"{key}": {value}}
    """

    # InfluxDB
    # Return type: String
    """
    select {field_key} from {measurement_name}
    """

    # S3
    # Return type: List
    """
    [{
      'bucket': 'bucket_name',
      'blobs': {
        'files': ['file_name'],
        'folders': ['folder_name']
      }
    }]
    """

    # Azure Blob
    # Return type: List
    """
    [{
      'container': container_name,
      'blobs': {
        'files': ['file_name'],
        'folders': ['folder_name']
      }
    }]
    """

    # APM
    # Return type: Dict
    """
    {
      'apm_config': [{
        'name': name,  # dataset name
        'machines': [{
          'id': machine_id  # node_id in APM
        }],
        'parameters': [
          parameter1,  # parameter in APM
          parameter2
        ]
      }],
      'time_range': [{'start': start_ts, 'end': end_ts}],
      'time_last': {'lastDays': lastDay, 'lastHours': lastHour, 'lastMins': lastMin}
    }
    """

    # DataHub
    # Return type: Dict
    """
    {
      'config': [{
        "name": "string",  # dataset name
        "project_id": "project_id",
        "node_id": "node_id",
        "device_id": "device_id",
        "tags": [
          "tag_name"
        ]
      }],
      'time_range': [{'start': start_ts, 'end': end_ts}],
      'time_last': {'lastDays': lastDay, 'lastHours': lastHour, 'lastMins': lastMin}
    }
    """

DBManager.execute_query(querySql=None)

Return the result from MySQL, PostgreSQL, MongoDB or InfluxDB after executing the querySql in the config or the querySql parameter.

Download the files specified in buckets in the S3 config or in containers in the Azure Blob config, and return an array of the bucket or container names. If only one CSV file is downloaded, return a DataFrame instead.

For APM, return a DataFrame of the configured Machine and Parameter values within timeRange or timeLast. For DataHub, return a DataFrame of the configured Tag values within timeRange or timeLast.

    # For MySQL, Postgres, MongoDB, InfluxDB, Oracle Database, APM and DataHub
    df = manager.execute_query()
    # Return type: DataFrame
    """
          Age  Cabin  Embarked      Fare  ...  Sex  Survived  Ticket_info  Title2
    0    22.0    7.0       2.0    7.2500  ...  1.0       0.0          2.0     2.0
    1    38.0    2.0       0.0   71.2833  ...  0.0       1.0         14.0     3.0
    2    26.0    7.0       2.0    7.9250  ...  0.0       1.0         31.0     1.0
    3    35.0    2.0       2.0   53.1000  ...  0.0       1.0         36.0     3.0
    4    35.0    7.0       2.0    8.0500  ...  1.0       0.0         36.0     2.0
    ...
    """

    # For Azure Blob
    container_names = manager.execute_query()
    # Return type: Array
    """
    ['container1', 'container2']
    """
    # or Return type: DataFrame
    """
          Age  Cabin  Embarked      Fare  ...  Sex  Survived  Ticket_info  Title2
    0    22.0    7.0       2.0    7.2500  ...  1.0       0.0          2.0     2.0
    1    38.0    2.0       0.0   71.2833  ...  0.0       1.0         14.0     3.0
    2    26.0    7.0       2.0    7.9250  ...  0.0       1.0         31.0     1.0
    3    35.0    2.0       2.0   53.1000  ...  0.0       1.0         36.0     3.0
    4    35.0    7.0       2.0    8.0500  ...  1.0       0.0         36.0     2.0
    ...
    """

    # For S3
    bucket_names = manager.execute_query()
    # Return type: Array
    """
    ['bucket1', 'bucket2']
    """
    # or Return type: DataFrame
    """
          Age  Cabin  Embarked      Fare  ...  Sex  Survived  Ticket_info  Title2
    0    22.0    7.0       2.0    7.2500  ...  1.0       0.0          2.0     2.0
    1    38.0    2.0       0.0   71.2833  ...  0.0       1.0         14.0     3.0
    2    26.0    7.0       2.0    7.9250  ...  0.0       1.0         31.0     1.0
    3    35.0    2.0       2.0   53.1000  ...  0.0       1.0         36.0     3.0
    4    35.0    7.0       2.0    8.0500  ...  1.0       0.0         36.0     2.0
    ...
    """
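Since the S3 and Azure Blob variants may return either an array of names or a DataFrame, calling code may want to branch on the result type. The sketch below shows one way to do that with duck typing, so it runs without pandas installed. `summarize_result` and `FakeFrame` are hypothetical names used only for illustration.

```python
def summarize_result(result):
    """Describe an execute_query()-style result (hypothetical helper)."""
    if isinstance(result, list):
        # Array of bucket or container names
        return "names: " + ", ".join(result)
    if hasattr(result, "columns") and hasattr(result, "shape"):
        # DataFrame-like object
        rows, cols = result.shape
        return f"table: {rows} rows x {cols} columns"
    raise TypeError(f"unexpected result type: {type(result).__name__}")


class FakeFrame:
    """Minimal stand-in exposing the two DataFrame attributes used above."""
    columns = ["Age", "Fare"]
    shape = (5, 2)


print(summarize_result(["bucket1", "bucket2"]))  # names: bucket1, bucket2
print(summarize_result(FakeFrame()))             # table: 5 rows x 2 columns
```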

DBManager.create_table(table_name, columns=[])

Create a table in the database for MySQL, Postgres, MongoDB and InfluxDB. Note: to create a new measurement in InfluxDB, simply insert data into the measurement.

Create a bucket/container in S3/Azure Blob.

Note: for PostgreSQL, the table_name format is schema.table.

    # For MySQL, Postgres, MongoDB and InfluxDB
    table_name = 'titanic'
    columns = [
      {'name': 'index', 'type': 'INTEGER', 'is_primary': True},
      {'name': 'survived', 'type': 'FLOAT', 'is_not_null': True},
      {'name': 'age', 'type': 'FLOAT'},
      {'name': 'embarked', 'type': 'INTEGER'}
    ]
    manager.create_table(table_name=table_name, columns=columns)

    # For S3
    bucket_name = 'bucket'
    manager.create_table(table_name=bucket_name)

    # For Azure Blob
    container_name = 'container'
    manager.create_table(table_name=container_name)
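To make the `columns` format concrete, here is a sketch of how such a specification could be translated into a SQL `CREATE TABLE` statement. This is an illustration only and not the SDK's actual implementation. `build_create_table_sql` is a hypothetical function, and the statement is checked against an in-memory SQLite database from the standard library rather than against MySQL or Postgres.

```python
import sqlite3


def build_create_table_sql(table_name, columns):
    """Translate a README-style column spec into SQL (hypothetical helper)."""
    parts = []
    for col in columns:
        name, col_type = col['name'], col['type']
        # Naive double-quoting so reserved words such as "index" are accepted
        piece = f'"{name}" {col_type}'
        if col.get('is_primary'):
            piece += " PRIMARY KEY"
        if col.get('is_not_null'):
            piece += " NOT NULL"
        parts.append(piece)
    return f'CREATE TABLE "{table_name}" ({", ".join(parts)})'


columns = [
    {'name': 'index', 'type': 'INTEGER', 'is_primary': True},
    {'name': 'survived', 'type': 'FLOAT', 'is_not_null': True},
    {'name': 'age', 'type': 'FLOAT'},
    {'name': 'embarked', 'type': 'INTEGER'},
]
sql = build_create_table_sql('titanic', columns)

conn = sqlite3.connect(":memory:")
conn.execute(sql)
column_names = [row[1] for row in conn.execute("PRAGMA table_info(titanic)")]
print(sql)
print(column_names)
```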

DBManager.is_table_exist(table_name)

Return whether the table exists in MySQL, Postgres, MongoDB or InfluxDB.

Return whether the bucket exists in S3.

Return whether the container exists in Azure Blob.

    # For Postgres, MongoDB and InfluxDB
    table_name = 'titanic'
    manager.is_table_exist(table_name=table_name)

    # For S3
    bucket_name = 'bucket'
    manager.is_table_exist(table_name=bucket_name)

    # For Azure Blob
    container_name = 'container'
    manager.is_table_exist(table_name=container_name)

DBManager.is_file_exist(table_name, file_name)

Return whether the file exists in the bucket in S3 or AWS S3.

Note: this function only supports S3 and AWS S3.

    # For S3 & AWS S3
    bucket_name = 'bucket'
    file_name = 'test.csv'
    manager.is_file_exist(table_name=bucket_name, file_name=file_name)
    # Return: Boolean

DBManager.insert(table_name, columns=[], records=[], source='', destination='')

Insert records into a table in MySQL, Postgres, MongoDB or InfluxDB.

Upload a file to S3 or Azure Blob.

    # For MySQL, Postgres, MongoDB and InfluxDB
    table_name = 'titanic'
    columns = ['index', 'survived', 'age', 'embarked']
    records = [
      [0, 1, 22.0, 7.0],
      [1, 1, 2.0, 0.0],
      [2, 0, 26.0, 7.0]
    ]
    manager.insert(table_name=table_name, columns=columns, records=records)

    # For S3
    bucket_name = 'bucket'
    source = 'test.csv'  # local file path
    destination = 'test_s3.csv'  # the file path and name in s3
    manager.insert(table_name=bucket_name, source=source, destination=destination)

    # For Azure Blob
    container_name = 'container'
    source = 'test.csv'  # local file path
    destination = 'test_s3.csv'  # the file path and name in Azure blob
    manager.insert(table_name=container_name, source=source, destination=destination)
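Each entry in `records` lines up positionally with `columns`, so a record with the wrong number of values is an easy mistake. The sketch below shows that alignment with a length check and a parameterized `INSERT`, run against an in-memory SQLite database from the standard library. `insert_rows` is a hypothetical function and does not describe how the SDK performs inserts internally.

```python
import sqlite3


def insert_rows(conn, table_name, columns, records):
    """Check that every record matches `columns`, then insert (hypothetical)."""
    for i, record in enumerate(records):
        if len(record) != len(columns):
            raise ValueError(
                f"record {i} has {len(record)} values, expected {len(columns)}"
            )
    col_list = ", ".join(f'"{c}"' for c in columns)
    placeholders = ", ".join("?" for _ in columns)
    sql = f'INSERT INTO "{table_name}" ({col_list}) VALUES ({placeholders})'
    conn.executemany(sql, records)
    return len(records)


conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE "titanic" '
    '("index" INTEGER, "survived" FLOAT, "age" FLOAT, "embarked" INTEGER)'
)
columns = ['index', 'survived', 'age', 'embarked']
records = [
    [0, 1, 22.0, 7.0],
    [1, 1, 2.0, 0.0],
    [2, 0, 26.0, 7.0],
]
count = insert_rows(conn, 'titanic', columns, records)
total = conn.execute('SELECT COUNT(*) FROM "titanic"').fetchone()[0]
print(count, total)  # 3 3
```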

Use APM data source

  • Get historical raw data from the SCADA Mongo database
  • Required
    • username: APM SSO username
    • password: APM SSO password
    • mongouri: Mongo database URI
    • apmurl: APM API URL
    • apm_config: APM config (type:Array)
      • name: dataset name
      • machines: APM machine list (type:Array)
        • id: APM machine Id
      • parameters: APM parameter name list (type:Array)
    • time range: Training date range
      • example:
      [{'start':'2019-05-01', 'end':'2019-05-31'}]
    • time last: Training date range
      • example:
      {'lastDays': 1, 'lastHours': 2, 'lastMins': 3}
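The two time settings can be assembled from Python date objects rather than typed by hand. This is a minimal sketch that assumes the 'YYYY-MM-DD' string format shown in the example above is what the SDK expects. `make_time_range` and `make_time_last` are hypothetical helper names.

```python
from datetime import date


def make_time_range(start, end):
    """Build a one-element time range list from two dates (hypothetical)."""
    if end < start:
        raise ValueError("end must not be earlier than start")
    # Assumption: dates are passed as 'YYYY-MM-DD' strings, as in the example
    return [{'start': start.isoformat(), 'end': end.isoformat()}]


def make_time_last(days=0, hours=0, mins=0):
    """Build the relative 'time last' dictionary (hypothetical)."""
    return {'lastDays': days, 'lastHours': hours, 'lastMins': mins}


time_range = make_time_range(date(2019, 5, 1), date(2019, 5, 31))
time_last = make_time_last(days=1, hours=2, mins=3)
print(time_range)  # [{'start': '2019-05-01', 'end': '2019-05-31'}]
print(time_last)   # {'lastDays': 1, 'lastHours': 2, 'lastMins': 3}
```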

DBManager.delete_table(table_name)

Delete the table in MySQL, Postgres, MongoDB or InfluxDB, and return whether the table was deleted successfully.

Delete the bucket in S3 and return whether the bucket was deleted successfully.

Delete the container in Azure Blob and return whether the container was deleted successfully.

    # For Postgres, MongoDB or InfluxDB
    table_name = 'titanic'
    is_success = manager.delete_table(table_name=table_name)
    # Return: Boolean

    # For S3
    bucket_name = 'bucket'
    is_success = manager.delete_table(table_name=bucket_name)
    # Return: Boolean

    # For Azure Blob
    container_name = 'container'
    is_success = manager.delete_table(table_name=container_name)
    # Return: Boolean

DBManager.delete_record(table_name, file_name, condition)

Delete the records matching condition in table_name in MySQL, Postgres and MongoDB, and return whether the deletion succeeded.

Delete a file in a bucket in S3 or in a container in Azure Blob, and return whether the file was deleted successfully.

Note: InfluxDB does not support this function.

    # For MySQL, Postgres
    table_name = 'titanic'
    condition = 'passenger_id = 1'
    is_success = manager.delete_record(table_name=table_name, condition=condition)
    # Return: Boolean

    # For MongoDB
    table_name = 'titanic'
    condition = {'passenger_id': 1}
    is_success = manager.delete_record(table_name=table_name, condition=condition)
    # Return: Boolean

    # For S3
    bucket_name = 'bucket'
    file_name = 'data/titanic.csv'
    is_success = manager.delete_record(table_name=bucket_name, file_name=file_name)
    # Return: Boolean

    # For Azure Blob
    container_name = 'container'
    file_name = 'data/titanic.csv'
    is_success = manager.delete_record(table_name=container_name, file_name=file_name)
    # Return: Boolean

Example

MongoDB Example

    import datetime
    from afs2datasource import DBManager, constant

    # Init DBManager
    manager = DBManager(
      db_type=constant.DB_TYPE['MONGODB'],
      username={USERNAME},
      password={PASSWORD},
      host={HOST},
      port={PORT},
      database={DATABASE},
      collection={COLLECTION},
      querySql={QUERYSQL}
    )

    ## Mongo query ISODate Example
    QUERYSQL = "{\"ts\": {\"$lte\": ISODate(\"2020-09-26T02:53:00Z\")}}"
    QUERYSQL = {'ts': {'$lte': datetime.datetime(2020, 9, 26, 2, 53, 0)}}

    # Connect DB
    manager.connect()

    # Check the status of connection
    is_connected = manager.is_connected()
    # Return type: boolean

    # Check whether the table exists
    table_name = 'titanic'
    manager.is_table_exist(table_name)
    # Return type: boolean

    # Create Table
    columns = [
      {'name': 'index', 'type': 'INTEGER', 'is_not_null': True},
      {'name': 'survived', 'type': 'INTEGER'},
      {'name': 'age', 'type': 'FLOAT'},
      {'name': 'embarked', 'type': 'INTEGER'}
    ]
    manager.create_table(table_name=table_name, columns=columns)

    # Insert Record
    columns = ['index', 'survived', 'age', 'embarked']
    records = [
      [0, 1, 22.0, 7.0],
      [1, 1, 2.0, 0.0],
      [2, 0, 26.0, 7.0]
    ]
    manager.insert(table_name=table_name, columns=columns, records=records)

    # Execute querySql in DB config
    data = manager.execute_query()
    # Return type: DataFrame
    """
          index  survived   age   embarked
    0         0         1   22.0       7.0
    1         1         1    2.0       0.0
    2         2         0   26.0       7.0
    ...
    """

    # Delete Document
    condition = {'survived': 0}
    is_success = manager.delete_record(table_name=table_name, condition=condition)
    # Return type: Boolean

    # Delete Table
    is_success = manager.delete_table(table_name=table_name)
    # Return type: Boolean

    # Disconnect from DB
    manager.disconnect()

S3 Example

    from afs2datasource import DBManager, constant

    # Init DBManager
    manager = DBManager(
      db_type=constant.DB_TYPE['S3'],
      endpoint={ENDPOINT},
      access_key={ACCESSKEY},
      secret_key={SECRETKEY},
      buckets=[{
        'bucket': {BUCKET_NAME},
        'blobs': {
          'files': ['titanic.csv'],
          'folders': ['models/']
        }
      }]
    )

    # Connect S3
    manager.connect()

    # Check whether the bucket exists
    bucket_name = 'titanic'
    manager.is_table_exist(table_name=bucket_name)
    # Return type: boolean

    # Create Bucket
    manager.create_table(table_name=bucket_name)

    # Upload File to S3
    local_file = '../titanic.csv'
    s3_file = 'titanic.csv'
    manager.insert(table_name=bucket_name, source=local_file, destination=s3_file)

    # Download files in blob_list
    # Download all files in directory
    bucket_names = manager.execute_query()
    # Return type: Array

    # Check whether the file exists
    is_exist = manager.is_file_exist(table_name=bucket_name, file_name=s3_file)
    # Return type: Boolean

    # Delete the file in Bucket and return whether the file was deleted successfully
    is_success = manager.delete_record(table_name=bucket_name, file_name=s3_file)
    # Return type: Boolean

    # Delete Bucket
    is_success = manager.delete_table(table_name=bucket_name)
    # Return type: Boolean

APM Data source example

    APMDSHelper(username, password, apmurl, machineIdList, parameterList, mongouri, timeRange)
    APMDSHelper.execute()

Azure Blob Example

    from afs2datasource import DBManager, constant

    # Init DBManager
    manager = DBManager(
      db_type=constant.DB_TYPE['AZUREBLOB'],
      account_key={ACCESS_KEY},
      account_name={ACCESS_NAME},
      containers=[{
        'container': {CONTAINER_NAME},
        'blobs': {
          'files': ['titanic.csv'],
          'folders': ['test/']
        }
      }]
    )

    # Connect Azure Blob
    manager.connect()

    # Check whether the container exists
    container_name = 'container'
    manager.is_table_exist(table_name=container_name)
    # Return type: boolean

    # Create container
    manager.create_table(table_name=container_name)

    # Upload File to Azure Blob
    local_file = '../titanic.csv'
    azure_file = 'titanic.csv'
    manager.insert(table_name=container_name, source=local_file, destination=azure_file)

    # Download files in `containers`
    # Download all files in directory
    container_names = manager.execute_query()
    # Return type: Array

    # Check whether the file exists in the container
    is_exist = manager.is_file_exist(table_name=container_name, file_name=azure_file)
    # Return type: Boolean

    # Delete File
    is_success = manager.delete_record(table_name=container_name, file_name=azure_file)

    # Delete Container
    is_success = manager.delete_table(table_name=container_name)
    # Return type: Boolean

Oracle Example

Notice

    from afs2datasource import DBManager, constant

    # Init DBManager
    manager = DBManager(
      db_type=constant.DB_TYPE['ORACLEDB'],
      username=username,
      password=password,
      host=host,
      port=port,
      dsn=dsn,
      querySql="select {field_key} from {measurement_name}"  # only support `SELECT`
    )

    # Connect OracleDB
    manager.connect()

    # Check whether the table exists
    table_name = 'table'
    manager.is_table_exist(table_name=table_name)
    # Return type: boolean

    # Execute querySql in DB config
    data = manager.execute_query()
    # Return type: DataFrame
    """
          index  survived   age   embarked
    0         0         1   22.0       7.0
    1         1         1    2.0       0.0
    2         2         0   26.0       7.0
    ...
    """
