
Treasure Data API library for Python

Requirements

td-client supports the following versions of Python.

  • Python 3.5+
  • PyPy

Install

You can install the releases from PyPI.

$ pip install td-client

You should also install certifi to enable SSL certificate verification.

$ pip install certifi

Examples

Please also see the examples in the Treasure Data Documentation.

The td-client documentation is hosted at https://tdclient.readthedocs.io/, or you can go directly to the API documentation.

For information on the parameters that may be used when reading particular types of data, see File import parameters.

Listing jobs

The Treasure Data API key will be read from the environment variable TD_API_KEY if none is given via the apikey= argument passed to tdclient.Client.

The Treasure Data API endpoint https://api.treasuredata.com is used by default. You can override this with the environment variable TD_API_SERVER, which in turn can be overridden via the endpoint= argument passed to tdclient.Client. A list of available Treasure Data sites and their corresponding API endpoints can be found here.

import tdclient

with tdclient.Client() as td:
    for job in td.jobs():
        print(job.job_id)
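
If you prefer not to rely on environment variables, the same settings can be passed explicitly. A minimal sketch (the API key value is a placeholder):

import tdclient

# apikey= and endpoint= take precedence over the TD_API_KEY and
# TD_API_SERVER environment variables.
with tdclient.Client(apikey="YOUR_API_KEY", endpoint="https://api.treasuredata.com") as td:
    for job in td.jobs():
        print(job.job_id)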

Running jobs

Running jobs on Treasure Data.

import tdclient

with tdclient.Client() as td:
    job = td.query("sample_datasets", "SELECT COUNT(1) FROM www_access", type="hive")
    job.wait()
    for row in job.result():
        print(repr(row))

Running jobs via DBAPI2

td-client-python implements PEP 0249 (Python Database API v2.0). You can use td-client-python with external libraries that support the Database API, such as pandas.

import pandas
import tdclient

def on_waiting(cursor):
    print(cursor.job_status())

with tdclient.connect(db="sample_datasets", type="presto", wait_callback=on_waiting) as td:
    data = pandas.read_sql("SELECT symbol, COUNT(1) AS c FROM nasdaq GROUP BY symbol", td)
    print(repr(data))
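
Because the Database API is implemented, you can also drive the connection directly through a standard PEP 249 cursor, without pandas. A minimal sketch:

import tdclient

with tdclient.connect(db="sample_datasets", type="presto") as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(1) FROM www_access")
    # fetchall() returns the result rows once the job has finished.
    for row in cursor.fetchall():
        print(row)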

We offer another pandas-oriented package named pytd with some advanced features. You may prefer it if you need to do more complicated things, such as exporting result data to Treasure Data or printing a job's progress during a long execution.

Importing data

Importing data into Treasure Data in a streaming manner, similar to how fluentd does it.

import sys
import tdclient

with tdclient.Client() as td:
    for file_name in sys.argv[1:]:
        td.import_file("mydb", "mytbl", "csv", file_name)

Warning

Importing data in a streaming manner requires a certain amount of time before the data is ready to query, since the schema update is executed with a delay.

Bulk import

Importing data into Treasure Data in a batch manner.

import sys
import tdclient
import uuid
import warnings

if len(sys.argv) <= 1:
    sys.exit(0)

with tdclient.Client() as td:
    session_name = "session-{}".format(uuid.uuid1())
    bulk_import = td.create_bulk_import(session_name, "mydb", "mytbl")
    try:
        for file_name in sys.argv[1:]:
            part_name = "part-{}".format(file_name)
            bulk_import.upload_file(part_name, "json", file_name)
        bulk_import.freeze()
    except:
        bulk_import.delete()
        raise
    bulk_import.perform(wait=True)
    if 0 < bulk_import.error_records:
        warnings.warn("detected {} error records.".format(bulk_import.error_records))
    if 0 < bulk_import.valid_records:
        print("imported {} records.".format(bulk_import.valid_records))
    else:
        raise RuntimeError("no records have been imported: {}".format(bulk_import.name))
    bulk_import.commit(wait=True)
    bulk_import.delete()

If you want to import data in msgpack format, you can write it as follows:

import io
import time
import uuid
import warnings
import tdclient

t1 = int(time.time())
l1 = [{"a": 1, "b": 2, "time": t1}, {"a": 3, "b": 9, "time": t1}]

with tdclient.Client() as td:
    session_name = "session-{}".format(uuid.uuid1())
    bulk_import = td.create_bulk_import(session_name, "mydb", "mytbl")
    try:
        _bytes = tdclient.util.create_msgpack(l1)
        bulk_import.upload_file("part", "msgpack", io.BytesIO(_bytes))
        bulk_import.freeze()
    except:
        bulk_import.delete()
        raise
    bulk_import.perform(wait=True)
    # same as the above example

Changing how CSV and TSV columns are read

The td-client package will generally make sensible choices on how to read the columns in CSV and TSV data, but sometimes the user needs to override the default mechanism. This can be done using the optional file import parameters dtypes and converters.

For instance, consider CSV data that starts with the following records:

time,col1,col2,col3
1575454204,a,0001,a;b;c
1575454204,b,0002,d;e;f

If that data is read using the defaults, it will produce values that look like:

1575454204,"a",1,"a;b;c"1575454204,"b",2,"d;e;f"

that is, an integer, a string, an integer and another string.

If the user wants to keep the leading zeroes in col2, then they can specify the column datatype as string. For instance, using bulk_import.upload_file to read data from input_data:

bulk_import.upload_file("part","msgpack",input_data,dtypes={"col2":"str"},)

which would produce:

1575454204,"a","0001","a;b;c"1575454204,"b","0002","d;e;f"

If they also wanted to treat col3 as a sequence of strings, separated by semicolons, then they could specify a function to process col3:

bulk_import.upload_file("part","msgpack",input_data,dtypes={"col2":"str"},converters={"col3",lambdax:x.split(";")},)

which would produce:

1575454204,"a","0001", ["a","b","c"]1575454204,"b","0002", ["d","e","f"]

Development

Running tests

Run tests.

$ pytest tdclient

Running tests (tox)

You can run tests against all supported Python versions. I'd recommend installing pyenv to manage Python versions.

$ pyenv shell system
$ for version in $(cat .python-version); do [ -d "$(pyenv root)/versions/${version}" ] || pyenv install "${version}"; done
$ pyenv shell --unset

Install tox.

$ pip install tox

Then, run tox.

$ tox

Release

  1. Update the version x.x.x in pyproject.toml.
  2. Create a PR from a release-x.x.x branch. Request a review and merge the PR.
  3. Create and push a tag x.x.x on the release-x.x.x merge commit.
  4. Creating a Release on GitHub will publish the new version to PyPI.

Manual release

If you want to release manually, you can upload with twine.

$ python -m build
$ twine upload dist/*

License

Apache Software License, Version 2.0
