Treasure Data API library for Python
`td-client` supports the following versions of Python:
- Python 3.5+
- PyPy
You can install the releases from PyPI.
$ pip install td-client
It is also recommended to install `certifi` to enable SSL certificate verification.
$ pip install certifi
Please also see the examples in the Treasure Data Documentation.

The td-client documentation is hosted at https://tdclient.readthedocs.io/, or you can go directly to the API documentation.

For information on the parameters that may be used when reading particular types of data, see File import parameters.
The Treasure Data API key will be read from the environment variable `TD_API_KEY` if none is given via the `apikey=` argument passed to `tdclient.Client`.
The Treasure Data API endpoint `https://api.treasuredata.com` is used by default. You can override this with the environment variable `TD_API_SERVER`, which in turn can be overridden via the `endpoint=` argument passed to `tdclient.Client`. A list of available Treasure Data sites and corresponding API endpoints can be found here.
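For illustration, a client can also be configured explicitly rather than through environment variables. A minimal sketch, where the API key value is a placeholder:

```python
import tdclient

# apikey= and endpoint= take precedence over TD_API_KEY and TD_API_SERVER;
# when omitted, the environment variables (and then the default endpoint) are used.
with tdclient.Client(apikey="YOUR_API_KEY", endpoint="https://api.treasuredata.com/") as td:
    for db in td.databases():
        print(db.name)
```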
Listing jobs on Treasure Data:

```python
import tdclient

with tdclient.Client() as td:
    for job in td.jobs():
        print(job.job_id)
```
Running jobs on Treasure Data:

```python
import tdclient

with tdclient.Client() as td:
    job = td.query("sample_datasets", "SELECT COUNT(1) FROM www_access", type="hive")
    job.wait()
    for row in job.result():
        print(repr(row))
```
td-client-python implements PEP 0249 (Python Database API v2.0). You can use td-client-python with external libraries that support the Database API, such as pandas.
```python
import pandas
import tdclient

def on_waiting(cursor):
    print(cursor.job_status())

with tdclient.connect(db="sample_datasets", type="presto", wait_callback=on_waiting) as td:
    data = pandas.read_sql("SELECT symbol, COUNT(1) AS c FROM nasdaq GROUP BY symbol", td)
    print(repr(data))
```
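Because the package follows the Database API, a plain DB-API cursor also works without pandas. A minimal sketch using the standard `cursor()`, `execute()`, and `fetchall()` methods:

```python
import tdclient

with tdclient.connect(db="sample_datasets", type="presto") as conn:
    # cursor(), execute(), and fetchall() follow PEP 0249
    cur = conn.cursor()
    cur.execute("SELECT symbol, COUNT(1) AS c FROM nasdaq GROUP BY symbol")
    for row in cur.fetchall():
        print(row)
```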
We offer another package for pandas named pytd, with some advanced features. You may prefer it if you need to do more complicated things, such as exporting result data to Treasure Data or printing a job's progress during a long execution.
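For reference, a minimal pytd sketch; this assumes pytd's documented `Client` API, so check the pytd README for the current interface:

```python
import pytd

# Assumes pytd's documented Client API; credentials default to
# environment variables when apikey=/endpoint= are omitted.
client = pytd.Client(database="sample_datasets")
result = client.query("SELECT symbol, COUNT(1) AS c FROM nasdaq GROUP BY symbol")
print(result["columns"])
```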
Importing data into Treasure Data in a streaming manner, similar to what fluentd does:
```python
import sys

import tdclient

with tdclient.Client() as td:
    # skip sys.argv[0] (the script name) and import each listed file
    for file_name in sys.argv[1:]:
        td.import_file("mydb", "mytbl", "csv", file_name)
```
**Warning:** Importing data in a streaming manner requires a certain amount of time before the data is ready to query, since the schema update is executed with a delay.
Importing data into Treasure Data in a batch manner:
```python
import sys
import uuid
import warnings

import tdclient

if len(sys.argv) <= 1:
    sys.exit(0)

with tdclient.Client() as td:
    session_name = "session-{}".format(uuid.uuid1())
    bulk_import = td.create_bulk_import(session_name, "mydb", "mytbl")
    try:
        for file_name in sys.argv[1:]:
            part_name = "part-{}".format(file_name)
            bulk_import.upload_file(part_name, "json", file_name)
        bulk_import.freeze()
    except:
        bulk_import.delete()
        raise
    bulk_import.perform(wait=True)
    if 0 < bulk_import.error_records:
        warnings.warn("detected {} error records.".format(bulk_import.error_records))
    if 0 < bulk_import.valid_records:
        print("imported {} records.".format(bulk_import.valid_records))
    else:
        raise RuntimeError("no records have been imported: {}".format(bulk_import.name))
    bulk_import.commit(wait=True)
    bulk_import.delete()
```
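Note the lifecycle in this example: parts are uploaded into a bulk import session, the session is frozen, the import is performed as a job, the record counts are checked, and the session is committed and finally deleted. If any upload fails, the session is deleted so it is not left behind.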
If you want to import data in msgpack format, you can write it as follows:
```python
import io
import time
import uuid
import warnings

import tdclient

t1 = int(time.time())
l1 = [{"a": 1, "b": 2, "time": t1}, {"a": 3, "b": 9, "time": t1}]

with tdclient.Client() as td:
    session_name = "session-{}".format(uuid.uuid1())
    bulk_import = td.create_bulk_import(session_name, "mydb", "mytbl")
    try:
        _bytes = tdclient.util.create_msgpack(l1)
        bulk_import.upload_file("part", "msgpack", io.BytesIO(_bytes))
        bulk_import.freeze()
    except:
        bulk_import.delete()
        raise
    bulk_import.perform(wait=True)
    # same as the above example
```
The `td-client` package will generally make sensible choices on how to read the columns in CSV and TSV data, but sometimes the user needs to override the default mechanism. This can be done using the optional file import parameters `dtypes` and `converters`.
For instance, consider CSV data that starts with the following records:
```
time,col1,col2,col3
1575454204,a,0001,a;b;c
1575454204,b,0002,d;e;f
```
If that data is read using the defaults, it will produce values that look like:
```
1575454204,"a",1,"a;b;c"
1575454204,"b",2,"d;e;f"
```
that is, an integer, a string, an integer and another string.
If the user wants to keep the leading zeroes in `col2`, then they can specify the column datatype as string. For instance, using `bulk_import.upload_file` to read data from `input_data`:
```python
bulk_import.upload_file(
    "part", "msgpack", input_data,
    dtypes={"col2": "str"},
)
```
which would produce:
```
1575454204,"a","0001","a;b;c"
1575454204,"b","0002","d;e;f"
```
If they also wanted to treat `col3` as a sequence of strings, separated by semicolons, then they could specify a function to process `col3`:
```python
bulk_import.upload_file(
    "part", "msgpack", input_data,
    dtypes={"col2": "str"},
    converters={"col3": lambda x: x.split(";")},
)
```
which would produce:
```
1575454204,"a","0001",["a","b","c"]
1575454204,"b","0002",["d","e","f"]
```
Run tests:
$ pytest tdclient
You can run tests against all supported Python versions. We recommend installing pyenv to manage multiple Python versions.
$ pyenv shell system
$ for version in $(cat .python-version); do [ -d "$(pyenv root)/versions/${version}" ] || pyenv install "${version}"; done
$ pyenv shell --unset
Install tox:
$ pip install tox
Then, run `tox`:
$ tox
To release a new version:

- Update the version x.x.x in pyproject.toml.
- Create a PR with a release-x.x.x branch. Request review and merge the PR.
- Create and push a tag x.x.x on the release-x.x.x merge commit.
- Creating a Release on GitHub will publish the new version to PyPI.
If you want to release manually, you can upload with twine:

$ python -m build
$ twine upload dist/*
This project is licensed under the Apache Software License, Version 2.0.