Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Commitc903d73

Browse files
authored
Fix chunked query to return chunk resultsets (#753)
When querying large data sets, it's vital to get a chunked responses tomanage memory usage. Wrapping the query response in a generator andstreaming the request provides the desired result.It also fixes `InfluxDBClient.query()` behavior for chunked queries thatis currently not working according to[specs](https://github.com/influxdata/influxdb-python/blob/master/influxdb/client.py#L429)Closes#585.Closes#531.Closes#538.
1 parentd6192a7 commitc903d73

File tree

2 files changed

+25
-29
lines changed

2 files changed

+25
-29
lines changed

‎influxdb/client.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ def switch_user(self, username, password):
249249
self._username=username
250250
self._password=password
251251

252-
defrequest(self,url,method='GET',params=None,data=None,
252+
defrequest(self,url,method='GET',params=None,data=None,stream=False,
253253
expected_response_code=200,headers=None):
254254
"""Make a HTTP request to the InfluxDB API.
255255
@@ -261,6 +261,8 @@ def request(self, url, method='GET', params=None, data=None,
261261
:type params: dict
262262
:param data: the data of the request, defaults to None
263263
:type data: str
264+
:param stream: True if a query uses chunked responses
265+
:type stream: bool
264266
:param expected_response_code: the expected response code of
265267
the request, defaults to 200
266268
:type expected_response_code: int
@@ -312,6 +314,7 @@ def request(self, url, method='GET', params=None, data=None,
312314
auth=(self._username,self._password),
313315
params=params,
314316
data=data,
317+
stream=stream,
315318
headers=headers,
316319
proxies=self._proxies,
317320
verify=self._verify_ssl,
@@ -398,17 +401,17 @@ def write(self, data, params=None, expected_response_code=204,
398401

399402
@staticmethod
400403
def_read_chunked_response(response,raise_errors=True):
401-
result_set= {}
402404
forlineinresponse.iter_lines():
403405
ifisinstance(line,bytes):
404406
line=line.decode('utf-8')
405407
data=json.loads(line)
408+
result_set= {}
406409
forresultindata.get('results', []):
407410
for_keyinresult:
408411
ifisinstance(result[_key],list):
409412
result_set.setdefault(
410413
_key, []).extend(result[_key])
411-
returnResultSet(result_set,raise_errors=raise_errors)
414+
yieldResultSet(result_set,raise_errors=raise_errors)
412415

413416
defquery(self,
414417
query,
@@ -499,6 +502,7 @@ def query(self,
499502
method=method,
500503
params=params,
501504
data=None,
505+
stream=chunked,
502506
expected_response_code=expected_response_code
503507
)
504508

‎influxdb/tests/client_test.py

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1400,40 +1400,32 @@ def test_invalid_port_fails(self):
14001400
deftest_chunked_response(self):
14011401
"""Test chunked reponse for TestInfluxDBClient object."""
14021402
example_response= \
1403-
u'{"results":[{"statement_id":0,"series":' \
1404-
'[{"name":"cpu","columns":["fieldKey","fieldType"],"values":' \
1405-
'[["value","integer"]]}],"partial":true}]}\n{"results":' \
1406-
'[{"statement_id":0,"series":[{"name":"iops","columns":' \
1407-
'["fieldKey","fieldType"],"values":[["value","integer"]]}],' \
1408-
'"partial":true}]}\n{"results":[{"statement_id":0,"series":' \
1409-
'[{"name":"load","columns":["fieldKey","fieldType"],"values":' \
1410-
'[["value","integer"]]}],"partial":true}]}\n{"results":' \
1411-
'[{"statement_id":0,"series":[{"name":"memory","columns":' \
1412-
'["fieldKey","fieldType"],"values":[["value","integer"]]}]}]}\n'
1403+
u'{"results":[{"statement_id":0,"series":[{"columns":["key"],' \
1404+
'"values":[["cpu"],["memory"],["iops"],["network"]],"partial":' \
1405+
'true}],"partial":true}]}\n{"results":[{"statement_id":0,' \
1406+
'"series":[{"columns":["key"],"values":[["qps"],["uptime"],' \
1407+
'["df"],["mount"]]}]}]}\n'
14131408

14141409
withrequests_mock.Mocker()asm:
14151410
m.register_uri(
14161411
requests_mock.GET,
14171412
"http://localhost:8086/query",
14181413
text=example_response
14191414
)
1420-
response=self.cli.query('show series limit 4 offset 0',
1415+
response=self.cli.query('show series',
14211416
chunked=True,chunk_size=4)
1422-
self.assertTrue(len(response)==4)
1423-
self.assertEqual(response.__repr__(),ResultSet(
1424-
{'series': [{'values': [['value','integer']],
1425-
'name':'cpu',
1426-
'columns': ['fieldKey','fieldType']},
1427-
{'values': [['value','integer']],
1428-
'name':'iops',
1429-
'columns': ['fieldKey','fieldType']},
1430-
{'values': [['value','integer']],
1431-
'name':'load',
1432-
'columns': ['fieldKey','fieldType']},
1433-
{'values': [['value','integer']],
1434-
'name':'memory',
1435-
'columns': ['fieldKey','fieldType']}]}
1436-
).__repr__())
1417+
res=list(response)
1418+
self.assertTrue(len(res)==2)
1419+
self.assertEqual(res[0].__repr__(),ResultSet(
1420+
{'series': [{
1421+
'columns': ['key'],
1422+
'values': [['cpu'], ['memory'], ['iops'], ['network']]
1423+
}]}).__repr__())
1424+
self.assertEqual(res[1].__repr__(),ResultSet(
1425+
{'series': [{
1426+
'columns': ['key'],
1427+
'values': [['qps'], ['uptime'], ['df'], ['mount']]
1428+
}]}).__repr__())
14371429

14381430

14391431
classFakeClient(InfluxDBClient):

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp