|
| 1 | +importitertools |
1 | 2 | fromcontextlibimportcontextmanager |
2 | 3 | fromcollectionsimportOrderedDict |
3 | 4 | importdatetime |
@@ -47,6 +48,7 @@ def __init__(self, method_name): |
47 | 48 | # If running in local mode, just use environment variables for params. |
48 | 49 | self.arguments=os.environifget_args_from_envelse {} |
49 | 50 | self.arraysize=1000 |
| 51 | +self.buffer_size_bytes=10485760 |
50 | 52 |
|
51 | 53 | defconnection_params(self,arguments): |
52 | 54 | params= { |
@@ -79,7 +81,7 @@ def connection(self, extra_params=()): |
79 | 81 | @contextmanager |
80 | 82 | defcursor(self,extra_params=()): |
81 | 83 | withself.connection(extra_params)asconn: |
82 | | -cursor=conn.cursor(arraysize=self.arraysize) |
| 84 | +cursor=conn.cursor(arraysize=self.arraysize,buffer_size_bytes=self.buffer_size_bytes) |
83 | 85 | try: |
84 | 86 | yieldcursor |
85 | 87 | finally: |
@@ -610,7 +612,27 @@ def test_close_connection_closes_cursors(self): |
610 | 612 | ifhasattr(cm,"exception"): |
611 | 613 | assert"RESOURCE_DOES_NOT_EXIST"incm.exception.message |
612 | 614 |
|
613 | | - |
| 615 | +@skipUnless(pysql_supports_arrow(),'needs arrow support') |
| 616 | +deftest_cloud_fetch(self): |
| 617 | +# This test can take several minutes to run |
| 618 | +limits= [100000,600000] |
| 619 | +threads= [10,25] |
| 620 | +self.buffer_size_bytes=104857600 |
| 621 | +self.arraysize=100000 |
| 622 | +base_query="SELECT * FROM store_sales " |
| 623 | +fornum_limit,num_threads,lz4_compressioninitertools.product(limits,threads, [True,False]): |
| 624 | +withself.subTest(num_limit=num_limit,num_threads=num_threads,lz4_compression=lz4_compression): |
| 625 | +cf_result,noop_result=None,None |
| 626 | +query=base_query+"LIMIT "+str(num_limit) |
| 627 | +withself.cursor({"use_cloud_fetch":True,"max_download_threads":num_threads})ascursor: |
| 628 | +cursor.execute(query) |
| 629 | +cf_result=cursor.fetchall() |
| 630 | +withself.cursor({})ascursor: |
| 631 | +cursor.execute(query) |
| 632 | +noop_result=cursor.fetchall() |
| 633 | +assertlen(cf_result)==len(noop_result) |
| 634 | +foriinrange(len(cf_result)): |
| 635 | +assertcf_result[i]==noop_result[i] |
614 | 636 |
|
615 | 637 |
|
616 | 638 | # use a RetrySuite to encapsulate these tests which we'll typically want to run together; however keep |
|