|
| 1 | +importitertools |
1 | 2 | fromcontextlibimportcontextmanager |
2 | 3 | fromcollectionsimportOrderedDict |
3 | 4 | importdatetime |
@@ -52,6 +53,7 @@ def __init__(self, method_name): |
52 | 53 | # If running in local mode, just use environment variables for params. |
53 | 54 | self.arguments=os.environifget_args_from_envelse {} |
54 | 55 | self.arraysize=1000 |
| 56 | +self.buffer_size_bytes=10485760 |
55 | 57 |
|
56 | 58 | defconnection_params(self,arguments): |
57 | 59 | params= { |
@@ -84,7 +86,7 @@ def connection(self, extra_params=()): |
84 | 86 | @contextmanager |
85 | 87 | defcursor(self,extra_params=()): |
86 | 88 | withself.connection(extra_params)asconn: |
87 | | -cursor=conn.cursor(arraysize=self.arraysize) |
| 89 | +cursor=conn.cursor(arraysize=self.arraysize,buffer_size_bytes=self.buffer_size_bytes) |
88 | 90 | try: |
89 | 91 | yieldcursor |
90 | 92 | finally: |
@@ -633,6 +635,27 @@ def test_closing_a_closed_connection_doesnt_fail(self): |
633 | 635 |
|
634 | 636 | self.assertTrue(expected_message_was_found,"Did not find expected log messages") |
635 | 637 |
|
| 638 | +@skipUnless(pysql_supports_arrow(),'needs arrow support') |
| 639 | +deftest_cloud_fetch(self): |
| 640 | +# This test can take several minutes to run |
| 641 | +limits= [100000,600000] |
| 642 | +threads= [10,25] |
| 643 | +self.buffer_size_bytes=104857600 |
| 644 | +self.arraysize=100000 |
| 645 | +base_query="SELECT * FROM store_sales " |
| 646 | +fornum_limit,num_threads,lz4_compressioninitertools.product(limits,threads, [True,False]): |
| 647 | +withself.subTest(num_limit=num_limit,num_threads=num_threads,lz4_compression=lz4_compression): |
| 648 | +cf_result,noop_result=None,None |
| 649 | +query=base_query+"LIMIT "+str(num_limit) |
| 650 | +withself.cursor({"use_cloud_fetch":True,"max_download_threads":num_threads})ascursor: |
| 651 | +cursor.execute(query) |
| 652 | +cf_result=cursor.fetchall() |
| 653 | +withself.cursor({})ascursor: |
| 654 | +cursor.execute(query) |
| 655 | +noop_result=cursor.fetchall() |
| 656 | +assertlen(cf_result)==len(noop_result) |
| 657 | +foriinrange(len(cf_result)): |
| 658 | +assertcf_result[i]==noop_result[i] |
636 | 659 |
|
637 | 660 | # use a RetrySuite to encapsulate these tests which we'll typically want to run together; however keep |
638 | 661 | # the 429/503 subsuites separate since they execute under different circumstances. |
|