InfluxDB 2.0 python client


influxdata/influxdb-client-python


This repository contains the Python client library for InfluxDB 2.0.

Note: Use this client library with InfluxDB 2.x and InfluxDB 1.8+. For connecting to InfluxDB 1.7 or earlier instances, use the influxdb-python client library. The API of influxdb-client-python is not backwards-compatible with the old influxdb-python client.

Documentation

This section contains links to the client library documentation.

InfluxDB 2.0 client features

Installation

The InfluxDB Python library uses RxPY - The Reactive Extensions for Python (RxPY).

Python 3.7 or later is required.

Note

It is recommended to use ciso8601 with the client for parsing dates. ciso8601 is much faster than the built-in Python datetime. Since it's written as a C module, the best way is to build it from sources:

Windows:

You have to install Visual C++ Build Tools 2015 to build ciso8601 by pip.

conda:

Install from sources: conda install -c conda-forge/label/cf202003 ciso8601.

pip install

The python package is hosted on PyPI; you can install the latest version directly:

pip install 'influxdb-client'

Then import the package:

import influxdb_client

There are additional package extras that pull in optional dependencies:

  • async: async/await support
  • ciso: faster date and time parsing
  • extra: Pandas and NumPy support
  • sql: SQL client support

For example, if your application uses async/await in Python, you can install the async extra:

$ pip install influxdb-client[async]

For more info see How to use Asyncio.

Setuptools

Install via Setuptools.

python setup.py install --user

(or sudo python setup.py install to install the package for all users)

Getting Started

Please follow the Installation instructions and then run the following:

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

bucket = "my-bucket"

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)

write_api.write(bucket=bucket, record=p)

## using Table structure
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

for table in tables:
    print(table)
    for row in table.records:
        print(row.values)


## using csv library
csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)')
val_count = 0
for row in csv_result:
    for cell in row:
        val_count += 1

Client configuration

Via File

A client can be configured via a *.ini file in the influx2 segment.

The following options are supported:

  • url - the url to connect to InfluxDB
  • org - default destination organization for writes and queries
  • token - the token to use for the authorization
  • timeout - socket timeout in ms (default value is 10000)
  • verify_ssl - set this to false to skip verifying the SSL certificate when calling the API from an https server
  • ssl_ca_cert - set this to customize the certificate file to verify the peer
  • cert_file - path to the certificate that will be used for mTLS authentication
  • cert_key_file - path to the file containing the private key for the mTLS certificate
  • cert_key_password - string or function which returns the password for decrypting the mTLS private key
  • connection_pool_maxsize - set the number of connections to save that can be reused by urllib3
  • auth_basic - enable HTTP basic authentication when talking to an InfluxDB 1.8.x that does not use authentication but is accessed via a reverse proxy with basic authentication (defaults to false)
  • profilers - set the list of enabled Flux profilers

self.client = InfluxDBClient.from_config_file("config.ini")

[influx2]
url=http://localhost:8086
org=my-org
token=my-token
timeout=6000
verify_ssl=False

Via Environment Properties

A client can be configured via environment properties.

Supported properties are:

  • INFLUXDB_V2_URL - the url to connect to InfluxDB
  • INFLUXDB_V2_ORG - default destination organization for writes and queries
  • INFLUXDB_V2_TOKEN - the token to use for the authorization
  • INFLUXDB_V2_TIMEOUT - socket timeout in ms (default value is 10000)
  • INFLUXDB_V2_VERIFY_SSL - set this to false to skip verifying the SSL certificate when calling the API from an https server
  • INFLUXDB_V2_SSL_CA_CERT - set this to customize the certificate file to verify the peer
  • INFLUXDB_V2_CERT_FILE - path to the certificate that will be used for mTLS authentication
  • INFLUXDB_V2_CERT_KEY_FILE - path to the file containing the private key for the mTLS certificate
  • INFLUXDB_V2_CERT_KEY_PASSWORD - string or function which returns the password for decrypting the mTLS private key
  • INFLUXDB_V2_CONNECTION_POOL_MAXSIZE - set the number of connections to save that can be reused by urllib3
  • INFLUXDB_V2_AUTH_BASIC - enable HTTP basic authentication when talking to an InfluxDB 1.8.x that does not use authentication but is accessed via a reverse proxy with basic authentication (defaults to false)
  • INFLUXDB_V2_PROFILERS - set the list of enabled Flux profilers

self.client = InfluxDBClient.from_env_properties()
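
For example, a minimal sketch (with illustrative values) that sets the properties in-process before constructing the client:

import os

from influxdb_client import InfluxDBClient

# Illustrative values; in practice these would come from the deployment environment.
os.environ["INFLUXDB_V2_URL"] = "http://localhost:8086"
os.environ["INFLUXDB_V2_ORG"] = "my-org"
os.environ["INFLUXDB_V2_TOKEN"] = "my-token"

client = InfluxDBClient.from_env_properties()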

Profile query

The Flux Profiler package provides performance profiling tools for Flux queries and operations.

You can enable printing profiler information for a Flux query in the client library by one of:

  • setting QueryOptions.profilers in QueryApi,
  • setting the INFLUXDB_V2_PROFILERS environment variable,
  • setting the profilers option in the configuration file.

When the profiler is enabled, the result of a Flux query contains additional "profiler/*" tables. In order to have consistent behaviour with the profiler enabled or disabled, FluxCSVParser excludes "profiler/*" measurements from the result.
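
For the environment-variable route, a minimal sketch (assuming the property takes a comma-separated list of profiler names):

import os

from influxdb_client import InfluxDBClient

# Assumption: the property takes a comma-separated list of profiler names.
os.environ["INFLUXDB_V2_PROFILERS"] = "query,operator"

client = InfluxDBClient.from_env_properties()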

Example of how to enable profilers using the API:

q = '''
    from(bucket: stringParam)
      |> range(start: -5m, stop: now())
      |> filter(fn: (r) => r._measurement == "mem")
      |> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
      |> aggregateWindow(every: 1m, fn: mean)
      |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
'''
p = {
    "stringParam": "my-bucket",
}

query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"]))
csv_result = query_api.query(query=q, params=p)

Example of a profiler output:

===============
Profiler: query
===============
from(bucket: stringParam)
  |> range(start: -5m, stop: now())
  |> filter(fn: (r) => r._measurement == "mem")
  |> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
  |> aggregateWindow(every: 1m, fn: mean)
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
========================
Profiler: profiler/query
========================
result              : _profiler
table               : 0
_measurement        : profiler/query
TotalDuration       : 8924700
CompileDuration     : 350900
QueueDuration       : 33800
PlanDuration        : 0
RequeueDuration     : 0
ExecuteDuration     : 8486500
Concurrency         : 0
MaxAllocated        : 2072
TotalAllocated      : 0
flux/query-plan     :

digraph {
  ReadWindowAggregateByTime11
  // every = 1m, aggregates = [mean], createEmpty = true, timeColumn = "_stop"
  pivot8
  generated_yield

  ReadWindowAggregateByTime11 -> pivot8
  pivot8 -> generated_yield
}

influxdb/scanned-bytes: 0
influxdb/scanned-values: 0
===========================
Profiler: profiler/operator
===========================
result              : _profiler
table               : 1
_measurement        : profiler/operator
Type                : *universe.pivotTransformation
Label               : pivot8
Count               : 3
MinDuration         : 32600
MaxDuration         : 126200
DurationSum         : 193400
MeanDuration        : 64466.666666666664
===========================
Profiler: profiler/operator
===========================
result              : _profiler
table               : 1
_measurement        : profiler/operator
Type                : *influxdb.readWindowAggregateSource
Label               : ReadWindowAggregateByTime11
Count               : 1
MinDuration         : 940500
MaxDuration         : 940500
DurationSum         : 940500
MeanDuration        : 940500.0

You can also use a callback function to get profiler output. The return value of this callback is of type FluxRecord.

Example of how to use profilers with a callback:

class ProfilersCallback(object):
    def __init__(self):
        self.records = []

    def __call__(self, flux_record):
        self.records.append(flux_record.values)


callback = ProfilersCallback()

query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"],
                                                        profiler_callback=callback))
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

for profiler in callback.records:
    print(f'Custom processing of profiler result: {profiler}')

Example output of this callback:

Custom processing of profiler result: {'result': '_profiler', 'table': 0, '_measurement': 'profiler/query', 'TotalDuration': 18843792, 'CompileDuration': 1078666, 'QueueDuration': 93375, 'PlanDuration': 0, 'RequeueDuration': 0, 'ExecuteDuration': 17371000, 'Concurrency': 0, 'MaxAllocated': 448, 'TotalAllocated': 0, 'RuntimeErrors': None, 'flux/query-plan': 'digraph {\r\n  ReadRange2\r\n  generated_yield\r\n\r\n  ReadRange2 -> generated_yield\r\n}\r\n\r\n', 'influxdb/scanned-bytes': 0, 'influxdb/scanned-values': 0}
Custom processing of profiler result: {'result': '_profiler', 'table': 1, '_measurement': 'profiler/operator', 'Type': '*influxdb.readFilterSource', 'Label': 'ReadRange2', 'Count': 1, 'MinDuration': 3274084, 'MaxDuration': 3274084, 'DurationSum': 3274084, 'MeanDuration': 3274084.0}

How to use

Writes

The WriteApi supports synchronous, asynchronous and batching writes into InfluxDB 2.0. The data should be passed as InfluxDB Line Protocol, a Data Point or an Observable stream.

Warning

The WriteApi in batching mode (default mode) is supposed to run as a singleton. To flush all your data, wrap the execution using a with client.write_api(...) as write_api: statement or call write_api.close() at the end of your script.

The default instance of WriteApi uses batching.

The data can be written as:

  1. string or bytes that is formatted as InfluxDB's line protocol
  2. Data Point structure
  3. Dictionary style mapping with keys: measurement, tags, fields and time, or a custom structure
  4. NamedTuple
  5. Data Classes (see the sketch below)
  6. Pandas DataFrame
  7. List of above items
  8. A batching type of write also supports an Observable that produces one of the above items

You can find write examples at GitHub: influxdb-client-python/examples.
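
As a sketch of items 4 and 5 in the list above: NamedTuple and Data Class records are mapped onto measurement, tags, fields and time via the record_* keyword arguments of write. The Temperature and Sensor types here are illustrative, not part of the library:

from collections import namedtuple
from dataclasses import dataclass

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

# Illustrative record types (not part of the library).
Temperature = namedtuple('Temperature', ['location', 'value', 'time'])


@dataclass
class Sensor:
    location: str
    value: float
    time: int


with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    with client.write_api(write_options=SYNCHRONOUS) as write_api:
        # NamedTuple record: map its attributes onto measurement/tags/fields/time.
        write_api.write(bucket="my-bucket",
                        record=Temperature(location="Prague", value=25.3, time=1),
                        record_measurement_name="temperature",
                        record_tag_keys=["location"],
                        record_field_keys=["value"],
                        record_time_key="time")
        # A dataclass record uses the same mapping arguments.
        write_api.write(bucket="my-bucket",
                        record=Sensor(location="New York", value=24.3, time=2),
                        record_measurement_name="temperature",
                        record_tag_keys=["location"],
                        record_field_keys=["value"],
                        record_time_key="time")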

Batching

The batching is configurable by write_options:

| Property | Description | Default Value |
| --- | --- | --- |
| batch_size | the number of data points to collect in a batch | 1000 |
| flush_interval | the number of milliseconds before the batch is written | 1000 |
| jitter_interval | the number of milliseconds to increase the batch flush interval by a random amount | 0 |
| retry_interval | the number of milliseconds to retry the first unsuccessful write. The next retry delay is computed using exponential random backoff. The retry interval is used when the InfluxDB server does not specify a "Retry-After" header. | 5000 |
| max_retry_time | maximum total retry timeout in milliseconds | 180_000 |
| max_retries | the number of max retries when a write fails | 5 |
| max_retry_delay | the maximum delay between each retry attempt in milliseconds | 125_000 |
| exponential_base | the base for the exponential retry delay; the next delay is computed using random exponential backoff as a random value within the interval retry_interval * exponential_base^(attempts-1) and retry_interval * exponential_base^(attempts). Example: for retry_interval=5_000, exponential_base=2, max_retry_delay=125_000, total=5, the retry delays are randomly distributed values within the ranges [5_000-10_000, 10_000-20_000, 20_000-40_000, 40_000-80_000, 80_000-125_000] | 2 |
from datetime import datetime, timedelta

import pandas as pd
import reactivex as rx
from reactivex import operators as ops

from influxdb_client import InfluxDBClient, Point, WriteOptions

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as _client:

    with _client.write_api(write_options=WriteOptions(batch_size=500,
                                                      flush_interval=10_000,
                                                      jitter_interval=2_000,
                                                      retry_interval=5_000,
                                                      max_retries=5,
                                                      max_retry_delay=30_000,
                                                      exponential_base=2)) as _write_client:

        """
        Write Line Protocol formatted as string
        """
        _write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1")
        _write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2",
                                                    "h2o_feet,location=coyote_creek water_level=3.0 3"])

        """
        Write Line Protocol formatted as byte array
        """
        _write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1".encode())
        _write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2".encode(),
                                                    "h2o_feet,location=coyote_creek water_level=3.0 3".encode()])

        """
        Write Dictionary-style object
        """
        _write_client.write("my-bucket", "my-org", {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
                                                    "fields": {"water_level": 1.0}, "time": 1})
        _write_client.write("my-bucket", "my-org", [{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
                                                     "fields": {"water_level": 2.0}, "time": 2},
                                                    {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
                                                     "fields": {"water_level": 3.0}, "time": 3}])

        """
        Write Data Point
        """
        _write_client.write("my-bucket", "my-org",
                            Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 4.0).time(4))
        _write_client.write("my-bucket", "my-org",
                            [Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 5.0).time(5),
                             Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 6.0).time(6)])

        """
        Write Observable stream
        """
        _data = rx \
            .range(7, 11) \
            .pipe(ops.map(lambda i: "h2o_feet,location=coyote_creek water_level={0}.0 {0}".format(i)))

        _write_client.write("my-bucket", "my-org", _data)

        """
        Write Pandas DataFrame
        """
        _now = datetime.utcnow()
        _data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
                                   index=[_now, _now + timedelta(hours=1)],
                                   columns=["location", "water_level"])

        _write_client.write("my-bucket", "my-org", record=_data_frame, data_frame_measurement_name='h2o_feet',
                            data_frame_tag_columns=['location'])

Default Tags

Sometimes it is useful to store the same information in every measurement, e.g. hostname, location, customer. The client is able to use a static value or an env property as a tag value.

The expressions:

  • California Miner - static value
  • ${env.hostname} - environment property
Via API

point_settings = PointSettings()
point_settings.add_default_tag("id", "132-987-655")
point_settings.add_default_tag("customer", "California Miner")
point_settings.add_default_tag("data_center", "${env.data_center}")

self.write_client = self.client.write_api(write_options=SYNCHRONOUS,
                                          point_settings=point_settings)

self.write_client = self.client.write_api(write_options=SYNCHRONOUS,
                                          point_settings=PointSettings(**{"id": "132-987-655",
                                                                          "customer": "California Miner"}))
Via Configuration file

In an init configuration file you are able to specify default tags by the tags segment.

self.client = InfluxDBClient.from_config_file("config.ini")

[influx2]
url=http://localhost:8086
org=my-org
token=my-token
timeout=6000

[tags]
id = 132-987-655
customer = California Miner
data_center = ${env.data_center}

You can also use a TOML or a JSON format for the configuration file.

Via Environment Properties

You are able to specify default tags by environment properties with the prefix INFLUXDB_V2_TAG_.

Examples:

  • INFLUXDB_V2_TAG_ID
  • INFLUXDB_V2_TAG_HOSTNAME

self.client = InfluxDBClient.from_env_properties()

Synchronous client

Data is written in a synchronous HTTP request.

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)

_point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
_point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)

write_api.write(bucket="my-bucket", record=[_point1, _point2])

client.close()

Queries

The result retrieved by QueryApi can be formatted as:

  1. Flux data structure: FluxTable, FluxColumn and FluxRecord
  2. :class:`~influxdb_client.client.flux_table.CSVIterator` which will iterate over CSV lines
  3. Raw unprocessed results as a str iterator
  4. Pandas DataFrame

The API also supports streaming FluxRecord via query_stream; see the example below:

import datetime  # needed for the timedelta bind parameters below

from influxdb_client import InfluxDBClient, Point, Dialect
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

"""
Prepare data
"""
_point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
_point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)

write_api.write(bucket="my-bucket", record=[_point1, _point2])

"""
Query: using Table structure
"""
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

for table in tables:
    print(table)
    for record in table.records:
        print(record.values)

print()
print()

"""
Query: using Bind parameters
"""
p = {"_start": datetime.timedelta(hours=-1),
     "_location": "Prague",
     "_desc": True,
     "_floatParam": 25.1,
     "_every": datetime.timedelta(minutes=5)
     }

tables = query_api.query('''
    from(bucket:"my-bucket") |> range(start: _start)
        |> filter(fn: (r) => r["_measurement"] == "my_measurement")
        |> filter(fn: (r) => r["_field"] == "temperature")
        |> filter(fn: (r) => r["location"] == _location and r["_value"] > _floatParam)
        |> aggregateWindow(every: _every, fn: mean, createEmpty: true)
        |> sort(columns: ["_time"], desc: _desc)
''', params=p)

for table in tables:
    print(table)
    for record in table.records:
        print(str(record["_time"]) + " - " + record["location"] + ": " + str(record["_value"]))

print()
print()

"""
Query: using Stream
"""
records = query_api.query_stream('from(bucket:"my-bucket") |> range(start: -10m)')

for record in records:
    print(f'Temperature in {record["location"]} is {record["_value"]}')

"""
Interrupt a stream after retrieving the required data
"""
large_stream = query_api.query_stream('from(bucket:"my-bucket") |> range(start: -100d)')
for record in large_stream:
    if record["location"] == "New York":
        print(f'New York temperature: {record["_value"]}')
        break

large_stream.close()

print()
print()

"""
Query: using csv library
"""
csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)',
                                 dialect=Dialect(header=False, delimiter=",", comment_prefix="#", annotations=[],
                                                 date_time_format="RFC3339"))
for csv_line in csv_result:
    if not len(csv_line) == 0:
        print(f'Temperature in {csv_line[9]} is {csv_line[6]}')

"""
Close client
"""
client.close()

Pandas DataFrame

Note

For DataFrame querying you should install the Pandas dependency via pip install 'influxdb-client[extra]'.

Note

Note that if a query returns more than one table, the client generates a DataFrame for each of them.

The client is able to retrieve data in Pandas DataFrame format through query_data_frame:

from influxdb_client import InfluxDBClient, Point, Dialect
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

"""
Prepare data
"""
_point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
_point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)

write_api.write(bucket="my-bucket", record=[_point1, _point2])

"""
Query: using Pandas DataFrame
"""
data_frame = query_api.query_data_frame('from(bucket:"my-bucket") '
                                        '|> range(start: -10m) '
                                        '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") '
                                        '|> keep(columns: ["location", "temperature"])')
print(data_frame.to_string())

"""
Close client
"""
client.close()

Output:

    result table  location  temperature
0  _result     0  New York         24.3
1  _result     1    Prague         25.3
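
As the note above says, a query that yields more than one Flux table makes query_data_frame return a list of DataFrames rather than a single one. A small sketch (reusing query_api from the example above) that handles both shapes:

# query_api is assumed to come from the example above.
result = query_api.query_data_frame('from(bucket:"my-bucket") |> range(start: -10m)')

# A single table comes back as one DataFrame, multiple tables as a list of them.
data_frames = result if isinstance(result, list) else [result]
for data_frame in data_frames:
    print(data_frame.to_string())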

Examples

How to efficiently import large dataset

The following example shows how to import a dataset of dozens of megabytes. If you would like to import gigabytes of data, use our multiprocessing example import_data_set_multiprocessing.py to use the full capability of your hardware.

"""
Import VIX - CBOE Volatility Index - from "vix-daily.csv" file into InfluxDB 2.0

https://datahub.io/core/finance-vix#data
"""
from collections import OrderedDict
from csv import DictReader

import reactivex as rx
from reactivex import operators as ops

from influxdb_client import InfluxDBClient, Point, WriteOptions


def parse_row(row: OrderedDict):
    """Parse row of CSV file into Point with structure:

        financial-analysis,type=ily close=18.47,high=19.82,low=18.28,open=19.82 1198195200000000000

    CSV format:
        Date,VIX Open,VIX High,VIX Low,VIX Close\n
        2004-01-02,17.96,18.68,17.54,18.22\n
        2004-01-05,18.45,18.49,17.44,17.49\n
        2004-01-06,17.66,17.67,16.19,16.73\n
        2004-01-07,16.72,16.75,15.5,15.5\n
        2004-01-08,15.42,15.68,15.32,15.61\n
        2004-01-09,16.15,16.88,15.57,16.75\n
        ...

    :param row: the row of CSV file
    :return: Parsed csv row to [Point]
    """

    """
    For better performance it is sometimes useful to directly create a LineProtocol
    to avoid unnecessary escaping overhead:
    """
    # from datetime import timezone
    # import ciso8601
    # from influxdb_client.client.write.point import EPOCH
    #
    # time = (ciso8601.parse_datetime(row["Date"]).replace(tzinfo=timezone.utc) - EPOCH).total_seconds() * 1e9
    # return f"financial-analysis,type=vix-daily" \
    #        f" close={float(row['VIX Close'])},high={float(row['VIX High'])},low={float(row['VIX Low'])},open={float(row['VIX Open'])} " \
    #        f" {int(time)}"

    return Point("financial-analysis") \
        .tag("type", "vix-daily") \
        .field("open", float(row['VIX Open'])) \
        .field("high", float(row['VIX High'])) \
        .field("low", float(row['VIX Low'])) \
        .field("close", float(row['VIX Close'])) \
        .time(row['Date'])


"""
Converts vix-daily.csv into sequence of data points
"""
data = rx \
    .from_iterable(DictReader(open('vix-daily.csv', 'r'))) \
    .pipe(ops.map(lambda row: parse_row(row)))

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True)

"""
Create client that writes data in batches with 50_000 items.
"""
write_api = client.write_api(write_options=WriteOptions(batch_size=50_000, flush_interval=10_000))

"""
Write data into InfluxDB
"""
write_api.write(bucket="my-bucket", record=data)
write_api.close()

"""
Querying max value of CBOE Volatility Index
"""
query = 'from(bucket:"my-bucket")' \
        ' |> range(start: 0, stop: now())' \
        ' |> filter(fn: (r) => r._measurement == "financial-analysis")' \
        ' |> max()'
result = client.query_api().query(query=query)

"""
Processing results
"""
print()
print("=== results ===")
print()
for table in result:
    for record in table.records:
        print('max {0:5} = {1}'.format(record.get_field(), record.get_value()))

"""
Close client
"""
client.close()

Efficiently write data from an IoT sensor

"""
Efficiently write data from an IoT sensor - write changed temperature every minute
"""
import atexit
import platform
from datetime import timedelta

import psutil as psutil
import reactivex as rx
from reactivex import operators as ops

from influxdb_client import InfluxDBClient, WriteApi, WriteOptions


def on_exit(db_client: InfluxDBClient, write_api: WriteApi):
    """Close clients after terminate a script.

    :param db_client: InfluxDB client
    :param write_api: WriteApi
    :return: nothing
    """
    write_api.close()
    db_client.close()


def sensor_temperature():
    """Read a CPU temperature. The [psutil] doesn't support MacOS so we use [sysctl].

    :return: actual CPU temperature
    """
    os_name = platform.system()
    if os_name == 'Darwin':
        from subprocess import check_output
        output = check_output(["sysctl", "machdep.xcpm.cpu_thermal_level"])
        import re
        return re.findall(r'\d+', str(output))[0]
    else:
        return psutil.sensors_temperatures()["coretemp"][0]


def line_protocol(temperature):
    """Create a InfluxDB line protocol with structure:

        iot_sensor,hostname=mine_sensor_12,type=temperature value=68

    :param temperature: the sensor temperature
    :return: Line protocol to write into InfluxDB
    """
    import socket
    return 'iot_sensor,hostname={},type=temperature value={}'.format(socket.gethostname(), temperature)


"""
Read temperature every minute; distinct_until_changed - produce only if temperature change
"""
data = rx \
    .interval(period=timedelta(seconds=60)) \
    .pipe(ops.map(lambda t: sensor_temperature()),
          ops.distinct_until_changed(),
          ops.map(lambda temperature: line_protocol(temperature)))

_db_client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True)

"""
Create client that writes data into InfluxDB
"""
_write_api = _db_client.write_api(write_options=WriteOptions(batch_size=1))
_write_api.write(bucket="my-bucket", record=data)

"""
Call after terminate a script
"""
atexit.register(on_exit, _db_client, _write_api)

input()

Connect to InfluxDB Cloud

The following example demonstrates the simplest way to write and query data with InfluxDB Cloud.

First, create an authentication token as described here.

After that, configure the properties influx_cloud_url, influx_cloud_token, bucket and org in the influx_cloud.py example.

The last step is to run the Python script via: python3 influx_cloud.py.

"""
Connect to InfluxDB 2.0 - write data and query them
"""
from datetime import datetime

from influxdb_client import Point, InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

"""
Configure credentials
"""
influx_cloud_url = 'https://us-west-2-1.aws.cloud2.influxdata.com'
influx_cloud_token = '...'
bucket = '...'
org = '...'

client = InfluxDBClient(url=influx_cloud_url, token=influx_cloud_token)
try:
    kind = 'temperature'
    host = 'host1'
    device = 'opt-123'

    """
    Write data by Point structure
    """
    point = Point(kind).tag('host', host).tag('device', device).field('value', 25.3).time(time=datetime.utcnow())

    print(f'Writing to InfluxDB cloud: {point.to_line_protocol()} ...')

    write_api = client.write_api(write_options=SYNCHRONOUS)
    write_api.write(bucket=bucket, org=org, record=point)

    print()
    print('success')
    print()
    print()

    """
    Query written data
    """
    query = f'from(bucket: "{bucket}") |> range(start: -1d) |> filter(fn: (r) => r._measurement == "{kind}")'
    print(f'Querying from InfluxDB cloud: "{query}" ...')
    print()

    query_api = client.query_api()
    tables = query_api.query(query=query, org=org)

    for table in tables:
        for row in table.records:
            print(f'{row.values["_time"]}: host={row.values["host"]},device={row.values["device"]} '
                  f'{row.values["_value"]} °C')

    print()
    print('success')

except Exception as e:
    print(e)
finally:
    client.close()

How to use Jupyter + Pandas + InfluxDB 2

The first example shows how to use client capabilities to predict stock price via Keras, TensorFlow, sklearn:

The example is taken from Kaggle.

https://raw.githubusercontent.com/influxdata/influxdb-client-python/master/docs/images/stock-price-prediction.gif

Result:

https://raw.githubusercontent.com/influxdata/influxdb-client-python/master/docs/images/stock-price-prediction-results.png

The second example shows how to use client capabilities for realtime visualization via hvPlot, Streamz, RxPY:

https://raw.githubusercontent.com/influxdata/influxdb-client-python/master/docs/images/realtime-result.gif

Other examples

You can find all examples at GitHub: influxdb-client-python/examples.

Advanced Usage

Gzip support

InfluxDBClient does not enable gzip compression for HTTP requests by default. If you want to enable gzip to reduce the size of transferred data, you can call:

from influxdb_client import InfluxDBClient

_db_client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", enable_gzip=True)

Authenticate to the InfluxDB

InfluxDBClient supports three options for authorizing a connection:

  • Token
  • Username & Password
  • HTTP Basic

Token

Use the token to authenticate to the InfluxDB API. In your API requests, an Authorization header will be sent. The header value provides the word Token followed by a space and an InfluxDB API token. The word Token is case-sensitive.

from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="my-token") as client:
    ...  # use the client here

Note

Note that this is the preferred way to authenticate to the InfluxDB API.

Username & Password

Authenticates via username and password credentials. If successful, creates a new session for the user.

from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", username="my-user", password="my-password") as client:
    ...  # use the client here

Warning

The username/password auth is based on HTTP "Basic" authentication. The authorization expires when the time-to-live (TTL) (default 60 minutes) is reached and the client produces an unauthorized exception.

HTTP Basic

Use this to enable basic authentication when talking to an InfluxDB 1.8.x that does not have auth enabled but is protected by a reverse proxy with basic authentication.

from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", auth_basic=True, token="my-proxy-secret") as client:
    ...  # use the client here

Warning

Don't use this when directly talking to InfluxDB 2.

Proxy configuration

You can configure the client to tunnel requests through an HTTP proxy. The following proxy options are supported:

  • proxy - Set this to configure the HTTP proxy to be used, e.g. http://localhost:3128
  • proxy_headers - A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.

from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086",
                    token="my-token",
                    org="my-org",
                    proxy="http://localhost:3128") as client:
    ...  # use the client here

Note

If your proxy notifies the client with a permanent redirect (HTTP 301) to a different host, the client removes the Authorization header, because otherwise its contents would be sent to third parties, which is a security vulnerability.

You can change this behaviour by:

from urllib3 import Retry

Retry.DEFAULT_REMOVE_HEADERS_ON_REDIRECT = frozenset()
Retry.DEFAULT.remove_headers_on_redirect = Retry.DEFAULT_REMOVE_HEADERS_ON_REDIRECT

Delete data

The delete_api.py supports deleting points from an InfluxDB bucket.

from influxdb_client import InfluxDBClient

client = InfluxDBClient(url="http://localhost:8086", token="my-token")

delete_api = client.delete_api()

"""
Delete Data
"""
start = "1970-01-01T00:00:00Z"
stop = "2021-02-01T00:00:00Z"
delete_api.delete(start, stop, '_measurement="my_measurement"', bucket='my-bucket', org='my-org')

"""
Close client
"""
client.close()

InfluxDB 1.8 API compatibility

InfluxDB 1.8.0 introduced forward compatibility APIs for InfluxDB 2.0. This allows you to easily move from InfluxDB 1.x to InfluxDB 2.0 Cloud or open source.

The following forward compatible APIs are available:

| API | Endpoint | Description |
| --- | --- | --- |
| query_api.py | /api/v2/query | Query data in InfluxDB 1.8.0+ using the InfluxDB 2.0 API and Flux (endpoint should be enabled by the flux-enabled option) |
| write_api.py | /api/v2/write | Write data to InfluxDB 1.8.0+ using the InfluxDB 2.0 API |
| ping() | /ping | Check the status of your InfluxDB instance |

For detailed info see the InfluxDB 1.8 example.
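
A minimal sketch of connecting through these APIs, assuming the documented 1.8 convention that the token is your username:password pair, org is ignored ('-' by convention), and the bucket names the database and retention policy:

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

# For InfluxDB 1.8: token is "username:password", org is ignored ("-" by convention).
client = InfluxDBClient(url="http://localhost:8086", token="my-user:my-password", org="-")

write_api = client.write_api(write_options=SYNCHRONOUS)
# The bucket addresses "database/retention-policy"; values here are illustrative.
write_api.write(bucket="telegraf/autogen", record="mem,host=host1 used_percent=23.4")

client.close()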

Handling Errors

Errors happen, and it's important that your code is prepared for them. All client-related exceptions are derived from InfluxDBError. If an exception cannot be recovered in the client, it is returned to the application. These exceptions are left for the developer to handle.

Almost all APIs directly return unrecoverable exceptions to be handled this way:

from influxdb_client import InfluxDBClient
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.client.write_api import SYNCHRONOUS

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    try:
        client.write_api(write_options=SYNCHRONOUS).write("my-bucket", record="mem,tag=a value=86")
    except InfluxDBError as e:
        if e.response.status == 401:
            raise Exception("Insufficient write permissions to 'my-bucket'.") from e
        raise

The only exception is the batching WriteApi (for more info see Batching), where you need to register custom callbacks to handle batch events. This is because this API runs in the background in a separate thread and it isn't possible to directly return the underlying exceptions.

from influxdb_client import InfluxDBClient
from influxdb_client.client.exceptions import InfluxDBError


class BatchingCallback(object):

    def success(self, conf: (str, str, str), data: str):
        print(f"Written batch: {conf}, data: {data}")

    def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

    def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    callback = BatchingCallback()
    with client.write_api(success_callback=callback.success,
                          error_callback=callback.error,
                          retry_callback=callback.retry) as write_api:
        pass

HTTP Retry Strategy

By default, the client uses a retry strategy only for batching writes (for more info see Batching). For other HTTP requests there is no single retry strategy, but it can be configured by the retries parameter of InfluxDBClient.

For more info about how to configure HTTP retries see details in the urllib3 documentation.

from urllib3 import Retry

from influxdb_client import InfluxDBClient

retries = Retry(connect=5, read=2, redirect=5)
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", retries=retries)

Nanosecond precision

Python's datetime doesn't support nanosecond precision, so during writes and queries the library ignores everything after microseconds.

If you would like to use datetime with nanosecond precision, you should use pandas.Timestamp, which is a replacement for the Python datetime.datetime object, and you should also set a proper DateTimeHelper on the client.

from influxdb_client import Point, InfluxDBClient
from influxdb_client.client.util.date_utils_pandas import PandasDateTimeHelper
from influxdb_client.client.write_api import SYNCHRONOUS

"""
Set PandasDate helper which supports nanoseconds.
"""
import influxdb_client.client.util.date_utils as date_utils

date_utils.date_helper = PandasDateTimeHelper()

"""
Prepare client.
"""
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

"""
Prepare data
"""
point = Point("h2o_feet") \
    .field("water_level", 10) \
    .tag("location", "pacific") \
    .time('1996-02-25T21:20:00.001001231Z')

print(f'Time serialized with nanosecond precision: {point.to_line_protocol()}')
print()

write_api.write(bucket="my-bucket", record=point)

"""
Query: using Stream
"""
query = '''
    from(bucket:"my-bucket")
        |> range(start: 0, stop: now())
        |> filter(fn: (r) => r._measurement == "h2o_feet")
'''
records = query_api.query_stream(query)

for record in records:
    print(f'Temperature in {record["location"]} is {record["_value"]} at time: {record["_time"]}')

"""
Close client
"""
client.close()

How to use Asyncio

Starting from version 1.27.0 for Python 3.7+, the influxdb-client package supports async/await based on asyncio, aiohttp and aiocsv. You can install aiohttp and aiocsv directly:

$ python -m pip install influxdb-client aiohttp aiocsv

or use the[async] extra:

$ python -m pip install influxdb-client[async]

Warning

The InfluxDBClientAsync should be initialised inside an async coroutine, otherwise there can be unexpected behaviour. For more info see: Why is creating a ClientSession outside of an event loop dangerous?.

Async APIs

All async APIs are available via :class:`~influxdb_client.client.influxdb_client_async.InfluxDBClientAsync`. The async version of the client supports the asynchronous write, query, delete and management APIs shown below.

It can also check the readiness of InfluxDB via the /ping endpoint:

import asyncio

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        ready = await client.ping()
        print(f"InfluxDB: {ready}")


if __name__ == "__main__":
    asyncio.run(main())

Async Write API

The :class:`~influxdb_client.client.write_api_async.WriteApiAsync` supports ingesting data in the same formats as the synchronous API, for example:

import asyncio

from influxdb_client import Point
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        write_api = client.write_api()

        _point1 = Point("async_m").tag("location", "Prague").field("temperature", 25.3)
        _point2 = Point("async_m").tag("location", "New York").field("temperature", 24.3)

        successfully = await write_api.write(bucket="my-bucket", record=[_point1, _point2])

        print(f" > successfully: {successfully}")


if __name__ == "__main__":
    asyncio.run(main())

Async Query API

The :class:`~influxdb_client.client.query_api_async.QueryApiAsync` supports retrieving data in the same formats as the synchronous API, for example as a stream of FluxRecords:

import asyncio

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        # Stream of FluxRecords
        query_api = client.query_api()
        records = await query_api.query_stream('from(bucket:"my-bucket") '
                                               '|> range(start: -10m) '
                                               '|> filter(fn: (r) => r["_measurement"] == "async_m")')
        async for record in records:
            print(record)


if __name__ == "__main__":
    asyncio.run(main())

Async Delete API

import asyncio
from datetime import datetime

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        start = datetime.utcfromtimestamp(0)
        stop = datetime.now()
        # Delete data with location = 'Prague'
        successfully = await client.delete_api().delete(start=start, stop=stop, bucket="my-bucket",
                                                        predicate="location = \"Prague\"")
        print(f" > successfully: {successfully}")


if __name__ == "__main__":
    asyncio.run(main())

Management API

import asyncio

from influxdb_client import OrganizationsService
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
    async with InfluxDBClientAsync(url='http://localhost:8086', token='my-token', org='my-org') as client:
        # Initialize async OrganizationsService
        organizations_service = OrganizationsService(api_client=client.api_client)

        # Find organization with name 'my-org'
        organizations = await organizations_service.get_orgs(org='my-org')
        for organization in organizations.orgs:
            print(f'name: {organization.name}, id: {organization.id}')


if __name__ == "__main__":
    asyncio.run(main())

Proxy and redirects

You can configure the client to tunnel requests through an HTTP proxy. The following proxy options are supported:

  • proxy - Set this to configure the HTTP proxy to be used, e.g. http://localhost:3128
  • proxy_headers - A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

async with InfluxDBClientAsync(url="http://localhost:8086",
                               token="my-token",
                               org="my-org",
                               proxy="http://localhost:3128") as client:
    ...  # use the client here

Note

If your proxy notifies the client with a permanent redirect (HTTP 301) to a different host, the client removes the Authorization header, because otherwise its contents would be sent to third parties, which is a security vulnerability.

The client automatically follows HTTP redirects. The default redirect policy is to follow up to 10 consecutive requests. The redirects can be configured via the following options (see the sketch after this list):

  • allow_redirects - If set to False, do not follow HTTP redirects. True by default.
  • max_redirects - Maximum number of HTTP redirects to follow. 10 by default.
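
A minimal sketch of tightening that policy (assuming InfluxDBClientAsync accepts the options above as constructor keyword arguments):

import asyncio

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
    # Follow at most 3 redirects; set allow_redirects=False to disable them entirely.
    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org",
                                   max_redirects=3) as client:
        print(await client.ping())


if __name__ == "__main__":
    asyncio.run(main())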

SQL Client Support

The ability to query InfluxDB with SQL was introduced with the IOx backend. To make use of the SQL support, users can create a SQL client with this library:

from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False) as client:
    sql_client = client.sql_client("my-bucket")
    reader = sql_client.query("select * from cpu limit 10")

    print(reader.read_all())

Warning

The SQLClient only works with InfluxDB instances that have SQL support enabled. This does not apply to all InfluxDB versions.

Logging

The client uses Python's logging facility for logging the library activity. The following logger categories are exposed:

  • influxdb_client.client.influxdb_client
  • influxdb_client.client.influxdb_client_async
  • influxdb_client.client.write_api
  • influxdb_client.client.write_api_async
  • influxdb_client.client.write.retry
  • influxdb_client.client.write.dataframe_serializer
  • influxdb_client.client.util.multiprocessing_helper
  • influxdb_client.client.http
  • influxdb_client.client.exceptions

The default logging level is warning, without a configured logger output. You can use the standard logger interface to change the log level and handler:

import logging
import sys

from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    for _, logger in client.conf.loggers.items():
        logger.setLevel(logging.DEBUG)
        logger.addHandler(logging.StreamHandler(sys.stdout))

Debugging

For debugging purposes you can enable verbose logging of HTTP requests and set the debug level for all of the client's logger categories:

client = InfluxDBClient(url="http://localhost:8086", token="my-token", debug=True)

Note

Both HTTP request headers and body will be logged to standard output.
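
Because the debug output goes through Python's standard logging, it can be redirected. A sketch, building on the logger loop from the Logging section above, that sends it to a file instead of stdout (the file name is illustrative):

import logging

from influxdb_client import InfluxDBClient

client = InfluxDBClient(url="http://localhost:8086", token="my-token", debug=True)

# Replace the default stdout handlers with a file handler on every client logger.
for _, logger in client.conf.loggers.items():
    logger.handlers = [logging.FileHandler("influxdb-client.log")]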

Local tests

# start/restart InfluxDB2 on local machine using docker
./scripts/influxdb-restart.sh

# install requirements
pip install -e . --user
pip install -e .\[extra\] --user
pip install -e .\[test\] --user

# run unit & integration tests
pytest tests

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/influxdata/influxdb-client-python.

License

The library is available as open source under the terms of the MIT License.

