# AFS2-DataSource SDK

DataSource SDK for WISE-PaaS AFS analytics developers.
The AFS2-DataSource SDK package allows developers to easily access PostgreSQL, MongoDB, InfluxDB, S3 and APM.
Supports Python 3.6 or later.

## Installation

Install from PyPI:

```
pip install afs2-datasource
```

Or install from source (from the repository root):

```
pip install -e .
```
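As a quick smoke test after installing, you can import the package and inspect the supported database types (a minimal sketch; `constant.DB_TYPE` is the mapping used throughout this README):

```python
from afs2datasource import DBManager, constant

# List the supported db_type keys, e.g. MYSQL, MONGODB, INFLUXDB, S3, APM, ...
print(constant.DB_TYPE)
```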
The AFS2-DataSource SDK uses the `asyncio` package. The Jupyter kernel also uses `asyncio` and runs its own event loop, and these loops cannot be nested (jupyter/notebook#3397). If you use the AFS2-DataSource SDK in a Jupyter Notebook, add the following code to resolve this issue:

```python
!pip install nest_asyncio

import nest_asyncio
nest_asyncio.apply()
```
## API

- Init DBManager
- `DBManager.connect()`
- `DBManager.disconnect()`
- `DBManager.is_connected()`
- `DBManager.is_connecting()`
- `DBManager.get_dbtype()`
- `DBManager.get_query()`
- `DBManager.execute_query()`
- `DBManager.create_table(table_name, columns)`
- `DBManager.is_table_exist(table_name)`
- `DBManager.is_file_exist(table_name, file_name)`
- `DBManager.insert(table_name, columns, records, source, destination)`
- `DBManager.delete_table(table_name)`
- `DBManager.delete_record(table_name, file_name, condition)`
### Init DBManager

Import the database config via Python:
```python
from afs2datasource import DBManager, constant

# For MySQL
manager = DBManager(
    db_type=constant.DB_TYPE['MYSQL'],
    username=username,
    password=password,
    host=host,
    port=port,
    database=database,
    querySql="select {field} from {table}"
)

# For SQLServer
manager = DBManager(
    db_type=constant.DB_TYPE['SQLSERVER'],
    username=username,
    password=password,
    host=host,
    port=port,
    database=database,
    querySql="select {field} from {table}"  # only supports `SELECT`
)

# For PostgreSQL
manager = DBManager(
    db_type=constant.DB_TYPE['POSTGRES'],
    username=username,
    password=password,
    host=host,
    port=port,
    database=database,
    querySql="select {field} from {schema}.{table}"
)

# For MongoDB
manager = DBManager(
    db_type=constant.DB_TYPE['MONGODB'],
    username=username,
    password=password,
    host=host,
    port=port,
    database=database,
    collection=collection,
    querySql="{}"
)

# For InfluxDB
manager = DBManager(
    db_type=constant.DB_TYPE['INFLUXDB'],
    username=username,
    password=password,
    host=host,
    port=port,
    database=database,
    querySql="select {field_key} from {measurement_name}"
)

# For Oracle Database
manager = DBManager(
    db_type=constant.DB_TYPE['ORACLEDB'],
    username=username,
    password=password,
    host=host,
    port=port,
    database=database,
    querySql="select {field_key} from {measurement_name}"  # only supports `SELECT`
)

# For S3
manager = DBManager(
    db_type=constant.DB_TYPE['S3'],
    endpoint=endpoint,
    access_key=access_key,
    secret_key=secret_key,
    is_verify=False,
    buckets=[{
        'bucket': 'bucket_name',
        'blobs': {
            'files': ['file_name'],
            'folders': ['folder_name']
        }
    }]
)

# For AWS S3
manager = DBManager(
    db_type=constant.DB_TYPE['AWS'],
    access_key=access_key,
    secret_key=secret_key,
    buckets=[{
        'bucket': 'bucket_name',
        'blobs': {
            'files': ['file_name'],
            'folders': ['folder_name']
        }
    }]
)

# For APM
manager = DBManager(
    db_type=constant.DB_TYPE['APM'],
    username=username,  # SSO username
    password=password,  # SSO password
    apmUrl=apmUrl,
    apm_config=[{
        'name': name,  # dataset name
        'machines': [{
            'id': machine_id  # node_id in APM
        }],
        'parameters': [
            parameter1,  # parameter in APM
            parameter2
        ]
    }],
    mongouri=mongouri,
    # timeRange or timeLast
    timeRange=[{'start': start_ts, 'end': end_ts}],
    timeLast={'lastDays': lastDay, 'lastHours': lastHour, 'lastMins': lastMin}
)

# For Azure Blob
manager = DBManager(
    db_type=constant.DB_TYPE['AZUREBLOB'],
    account_name=account_name,
    account_key=account_key,
    containers=[{
        'container': container_name,
        'blobs': {
            'files': ['file_name'],
            'folders': ['folder_name']
        }
    }]
)

# For DataHub
manager = DBManager(
    db_type=constant.DB_TYPE['DATAHUB'],
    username=username,  # SSO username
    password=password,  # SSO password
    datahub_url=datahub_url,
    datahub_config=[{
        "name": "string",  # dataset name
        "project_id": "project_id",
        "node_id": "node_id",
        "device_id": "device_id",
        "tags": [
            "tag_name"
        ]
    }],
    uri=mongouri,  # mongouri or influxuri
    # timeRange or timeLast
    timeRange=[{'start': start_ts, 'end': end_ts}],
    timeLast={'lastDays': lastDay, 'lastHours': lastHour, 'lastMins': lastMin}
)
```
### DBManager.connect()

Connect to MySQL, PostgreSQL, MongoDB, InfluxDB, S3, or APM as specified by the given config.

```python
manager.connect()
```
### DBManager.disconnect()

Close the connection. Note: the S3 data source does not support this function.

```python
manager.disconnect()
```
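Since every data source except S3 supports `disconnect()`, a simple way to guarantee cleanup is a `try`/`finally` block. A minimal sketch using only the calls documented in this README:

```python
# Connect, query, and always close the connection, even if the query raises.
manager.connect()
try:
    df = manager.execute_query()
finally:
    manager.disconnect()  # skip for the S3 data source, which does not support it
```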
### DBManager.is_connected()

Return whether the connection is connected.

```python
manager.is_connected()
```
### DBManager.is_connecting()

Return whether the connection is currently being established.

```python
manager.is_connecting()
```
### DBManager.get_dbtype()

Return the database type of the connection.

```python
manager.get_dbtype()  # Return: str
```
### DBManager.get_query()

Return the query in the config.

```python
manager.get_query()

# MySQL, Oracle Database
# Return type: String
"""select {field} from {table} {condition}"""

# PostgreSQL
# Return type: String
"""select {field} from {schema}.{table}"""

# MongoDB
# Return type: String
"""{"{key}": {value}}"""

# InfluxDB
# Return type: String
"""select {field_key} from {measurement_name}"""

# S3
# Return type: List
"""
[{
    'bucket': 'bucket_name',
    'blobs': {
        'files': ['file_name'],
        'folders': ['folder_name']
    }
}]
"""

# Azure Blob
# Return type: List
"""
[{
    'container': container_name,
    'blobs': {
        'files': ['file_name'],
        'folders': ['folder_name']
    }
}]
"""

# APM
# Return type: Dict
"""
{
    'apm_config': [{
        'name': name,  # dataset name
        'machines': [{
            'id': machine_id  # node_id in APM
        }],
        'parameters': [
            parameter1,  # parameter in APM
            parameter2
        ]
    }],
    'time_range': [{'start': start_ts, 'end': end_ts}],
    'time_last': {'lastDays': lastDay, 'lastHours': lastHour, 'lastMins': lastMin}
}
"""

# DataHub
# Return type: Dict
"""
{
    'config': [{
        "name": "string",  # dataset name
        "project_id": "project_id",
        "node_id": "node_id",
        "device_id": "device_id",
        "tags": ["tag_name"]
    }],
    'time_range': [{'start': start_ts, 'end': end_ts}],
    'time_last': {'lastDays': lastDay, 'lastHours': lastHour, 'lastMins': lastMin}
}
"""
```
### DBManager.execute_query()

Return the result from MySQL, PostgreSQL, MongoDB, or InfluxDB after executing the `querySql` in the config (or a `querySql` passed as a parameter).

For S3 and Azure Blob, download the files specified in `buckets` (S3 config) or `containers` (Azure Blob config) and return an array of the bucket or container names. If only a single CSV file is downloaded, return a `DataFrame` instead.

For APM, return a `DataFrame` of the listed `Machine` and `Parameter` values within `timeRange` or `timeLast`. For DataHub, return a `DataFrame` of the listed `Tag` values within `timeRange` or `timeLast`.
```python
# For MySQL, Postgres, MongoDB, InfluxDB, Oracle Database, APM and DataHub
df = manager.execute_query()
# Return type: DataFrame
"""
     Age  Cabin  Embarked     Fare  ...  Sex  Survived  Ticket_info  Title
0   22.0    7.0       2.0   7.2500  ...  1.0       0.0          2.0    2.0
1   38.0    2.0       0.0  71.2833  ...  0.0       1.0         14.0    3.0
2   26.0    7.0       2.0   7.9250  ...  0.0       1.0         31.0    1.0
3   35.0    2.0       2.0  53.1000  ...  0.0       1.0         36.0    3.0
4   35.0    7.0       2.0   8.0500  ...  1.0       0.0         36.0    2.0
...
"""

# For Azure Blob
container_names = manager.execute_query()
# Return type: Array
"""['container1', 'container2']"""
# or Return type: DataFrame (when only a single CSV file is downloaded; same format as above)

# For S3
bucket_names = manager.execute_query()
# Return type: Array
"""['bucket1', 'bucket2']"""
# or Return type: DataFrame (when only a single CSV file is downloaded; same format as above)
```
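The description above mentions that the query can also be supplied at call time rather than in the config. A minimal sketch, assuming `execute_query` accepts a `querySql` keyword argument and using a hypothetical table and columns:

```python
# Hedged sketch: override the config's query at call time.
# `titanic`, `age` and `survived` are hypothetical names.
df = manager.execute_query(querySql="select age, survived from titanic")
print(df.head())
```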
### DBManager.create_table(table_name, columns)

Create a table in MySQL, Postgres, MongoDB, or InfluxDB. Note that to create a new measurement in InfluxDB, you simply insert data into the measurement.

Create a bucket/container in S3/Azure Blob.

Note: the PostgreSQL `table_name` uses the format `schema.table`.
```python
# For MySQL, Postgres, MongoDB and InfluxDB
table_name = 'titanic'
columns = [
    {'name': 'index', 'type': 'INTEGER', 'is_primary': True},
    {'name': 'survived', 'type': 'FLOAT', 'is_not_null': True},
    {'name': 'age', 'type': 'FLOAT'},
    {'name': 'embarked', 'type': 'INTEGER'}
]
manager.create_table(table_name=table_name, columns=columns)

# For S3
bucket_name = 'bucket'
manager.create_table(table_name=bucket_name)

# For Azure Blob
container_name = 'container'
manager.create_table(table_name=container_name)
```
### DBManager.is_table_exist(table_name)

Return whether the table exists in MySQL, Postgres, MongoDB, or InfluxDB.
Return whether the bucket exists in S3.
Return whether the container exists in Azure Blob.
```python
# For MySQL, Postgres, MongoDB and InfluxDB
table_name = 'titanic'
manager.is_table_exist(table_name=table_name)

# For S3
bucket_name = 'bucket'
manager.is_table_exist(table_name=bucket_name)

# For Azure Blob
container_name = 'container'
manager.is_table_exist(table_name=container_name)
```
### DBManager.is_file_exist(table_name, file_name)

Return whether the file exists in the bucket in S3 or AWS S3.
Note: this function only supports S3 and AWS S3.
# For S3 & AWS S3bucket_name='bucket'file_name= 'test.csvmanager.is_file_exist(table_name=bucket_name,file_name=file_name)# Return: Boolean
### DBManager.insert(table_name, columns, records, source, destination)

Insert records into a table in MySQL, Postgres, MongoDB, or InfluxDB.
Upload a file to S3 or Azure Blob.
```python
# For MySQL, Postgres, MongoDB and InfluxDB
table_name = 'titanic'
columns = ['index', 'survived', 'age', 'embarked']
records = [
    [0, 1, 22.0, 7.0],
    [1, 1, 2.0, 0.0],
    [2, 0, 26.0, 7.0]
]
manager.insert(table_name=table_name, columns=columns, records=records)

# For S3
bucket_name = 'bucket'
source = 'test.csv'          # local file path
destination = 'test_s3.csv'  # the file path and name in S3
manager.insert(table_name=bucket_name, source=source, destination=destination)

# For Azure Blob
container_name = 'container'
source = 'test.csv'          # local file path
destination = 'test_s3.csv'  # the file path and name in Azure Blob
manager.insert(table_name=container_name, source=source, destination=destination)
```
### APMDSHelper

Get historical raw data from the SCADA Mongo database (see the APM example below).

- Required:
  - `username`: APM SSO username
  - `password`: APM SSO password
  - `mongouri`: Mongo database URI
  - `apmurl`: APM API URL
  - `apm_config`: APM config (type: Array)
    - `name`: dataset name
    - `machines`: APM machine list (type: Array)
      - `id`: APM machine id
    - `parameters`: APM parameter name list (type: Array)
  - `timeRange`: training date range
    - example: `[{'start': '2019-05-01', 'end': '2019-05-31'}]`
  - `timeLast`: training date range
    - example: `{'lastDays': 1, 'lastHours': 2, 'lastMins': 3}`
### DBManager.delete_table(table_name)

Delete a table in MySQL, Postgres, MongoDB, or InfluxDB, and return whether the table is deleted successfully.
Delete the bucket in S3 and return whether it is deleted successfully.
Delete the container in Azure Blob and return whether it is deleted successfully.
```python
# For Postgres, MongoDB or InfluxDB
table_name = 'titanic'
is_success = manager.delete_table(table_name=table_name)  # Return: Boolean

# For S3
bucket_name = 'bucket'
is_success = manager.delete_table(table_name=bucket_name)  # Return: Boolean

# For Azure Blob
container_name = 'container'
is_success = manager.delete_table(table_name=container_name)  # Return: Boolean
```
### DBManager.delete_record(table_name, file_name, condition)

Delete records matching `condition` from `table_name` in MySQL, Postgres, or MongoDB, and return whether the deletion succeeded.
Delete a file in a bucket in S3, or in a container in Azure Blob, and return whether the file is deleted successfully.
Note: InfluxDB does not support this function.
```python
# For MySQL, Postgres
table_name = 'titanic'
condition = 'passenger_id = 1'
is_success = manager.delete_record(table_name=table_name, condition=condition)  # Return: Boolean

# For MongoDB
table_name = 'titanic'
condition = {'passenger_id': 1}
is_success = manager.delete_record(table_name=table_name, condition=condition)  # Return: Boolean

# For S3
bucket_name = 'bucket'
file_name = 'data/titanic.csv'
is_success = manager.delete_record(table_name=bucket_name, file_name=file_name)  # Return: Boolean

# For Azure Blob
container_name = 'container'
file_name = 'data/titanic.csv'
is_success = manager.delete_record(table_name=container_name, file_name=file_name)  # Return: Boolean
```
## Examples

### MongoDB example

```python
import datetime

from afs2datasource import DBManager, constant

# Init DBManager
manager = DBManager(
    db_type=constant.DB_TYPE['MONGODB'],
    username={USERNAME},
    password={PASSWORD},
    host={HOST},
    port={PORT},
    database={DATABASE},
    collection={COLLECTION},
    querySql={QUERYSQL}
)

## Mongo query ISODate example
QUERYSQL = "{\"ts\": {\"$lte\": ISODate(\"2020-09-26T02:53:00Z\")}}"
# or the equivalent datetime form
QUERYSQL = {'ts': {'$lte': datetime.datetime(2020, 9, 26, 2, 53, 0)}}

# Connect DB
manager.connect()

# Check the status of the connection
is_connected = manager.is_connected()  # Return type: Boolean

# Check whether the table exists
table_name = 'titanic'
manager.is_table_exist(table_name)  # Return type: Boolean

# Create table
columns = [
    {'name': 'index', 'type': 'INTEGER', 'is_not_null': True},
    {'name': 'survived', 'type': 'INTEGER'},
    {'name': 'age', 'type': 'FLOAT'},
    {'name': 'embarked', 'type': 'INTEGER'}
]
manager.create_table(table_name=table_name, columns=columns)

# Insert records
columns = ['index', 'survived', 'age', 'embarked']
records = [
    [0, 1, 22.0, 7.0],
    [1, 1, 2.0, 0.0],
    [2, 0, 26.0, 7.0]
]
manager.insert(table_name=table_name, columns=columns, records=records)

# Execute querySql in DB config
data = manager.execute_query()
# Return type: DataFrame
"""
   index  survived   age  embarked
0      0         1  22.0       7.0
1      1         1   2.0       0.0
2      2         0  26.0       7.0
...
"""

# Delete document
condition = {'survived': 0}
is_success = manager.delete_record(table_name=table_name, condition=condition)  # Return type: Boolean

# Delete table
is_success = manager.delete_table(table_name=table_name)  # Return type: Boolean

# Disconnect from DB
manager.disconnect()
```
### S3 example

```python
from afs2datasource import DBManager, constant

# Init DBManager
manager = DBManager(
    db_type=constant.DB_TYPE['S3'],
    endpoint={ENDPOINT},
    access_key={ACCESSKEY},
    secret_key={SECRETKEY},
    buckets=[{
        'bucket': {BUCKET_NAME},
        'blobs': {
            'files': ['titanic.csv'],
            'folders': ['models/']
        }
    }]
)

# Connect to S3
manager.connect()

# Check whether the bucket exists
bucket_name = 'titanic'
manager.is_table_exist(table_name=bucket_name)  # Return type: Boolean

# Create bucket
manager.create_table(table_name=bucket_name)

# Upload file to S3
local_file = '../titanic.csv'
s3_file = 'titanic.csv'
manager.insert(table_name=bucket_name, source=local_file, destination=s3_file)

# Download the files in the blob list
# (downloads all files in each listed directory)
bucket_names = manager.execute_query()  # Return type: Array

# Check whether the file exists
is_exist = manager.is_file_exist(table_name=bucket_name, file_name=s3_file)  # Return type: Boolean

# Delete the file in the bucket and return whether the file is deleted successfully
is_success = manager.delete_record(table_name=bucket_name, file_name=s3_file)  # Return type: Boolean

# Delete bucket
is_success = manager.delete_table(table_name=bucket_name)  # Return type: Boolean
```
### APM example

```python
APMDSHelper(
    username,
    password,
    apmurl,
    machineIdList,
    parameterList,
    mongouri,
    timeRange
)
APMDSHelper.execute()
```
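As a concrete sketch of the parameter list above: the values below are all hypothetical, and passing them as keyword arguments assumes the constructor's parameter names match the signature shown (the import path is assumed to mirror `DBManager`):

```python
# Hedged sketch: APMDSHelper with the required parameters listed earlier.
from afs2datasource import APMDSHelper  # assumed import path

helper = APMDSHelper(
    username='sso_username',                # APM SSO username
    password='sso_password',                # APM SSO password
    apmurl='https://apm.example.com',       # APM API URL (hypothetical)
    machineIdList=['machine_id'],           # node ids in APM
    parameterList=['parameter1'],           # parameter names in APM
    mongouri='mongodb://host:27017/db',     # Mongo database URI (hypothetical)
    timeRange=[{'start': '2019-05-01', 'end': '2019-05-31'}]
)
data = helper.execute()
```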
### Azure Blob example

```python
from afs2datasource import DBManager, constant

# Init DBManager
manager = DBManager(
    db_type=constant.DB_TYPE['AZUREBLOB'],
    account_key={ACCESS_KEY},
    account_name={ACCESS_NAME},
    containers=[{
        'container': {CONTAINER_NAME},
        'blobs': {
            'files': ['titanic.csv'],
            'folders': ['test/']
        }
    }]
)

# Connect to Azure Blob
manager.connect()

# Check whether the container exists
container_name = 'container'
manager.is_table_exist(table_name=container_name)  # Return type: Boolean

# Create container
manager.create_table(table_name=container_name)

# Upload file to Azure Blob
local_file = '../titanic.csv'
azure_file = 'titanic.csv'
manager.insert(table_name=container_name, source=local_file, destination=azure_file)

# Download the files in `containers`
# (downloads all files in each listed directory)
container_names = manager.execute_query()  # Return type: Array

# Check whether the file exists in the container
is_exist = manager.is_file_exist(table_name=container_name, file_name=azure_file)  # Return type: Boolean

# Delete file
is_success = manager.delete_record(table_name=container_name, file_name=azure_file)  # Return type: Boolean

# Delete container
is_success = manager.delete_table(table_name=container_name)  # Return type: Boolean
```
### Oracle Database example

- Install the OracleDB client first (see the OracleDB client installation documents).
```python
from afs2datasource import DBManager, constant

# Init DBManager
manager = DBManager(
    db_type=constant.DB_TYPE['ORACLEDB'],
    username=username,
    password=password,
    host=host,
    port=port,
    dsn=dsn,
    querySql="select {field_key} from {measurement_name}"  # only supports `SELECT`
)

# Connect to OracleDB
manager.connect()

# Check whether the table exists
table_name = 'table'
manager.is_table_exist(table_name=table_name)  # Return type: Boolean

# Execute querySql in DB config
data = manager.execute_query()
# Return type: DataFrame
"""
   index  survived   age  embarked
0      0         1  22.0       7.0
1      1         1   2.0       0.0
2      2         0  26.0       7.0
...
"""
```