Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8833c97

Browse files
authored
fix: Close bq read client (#3644)
* Shutdown bqReadClient after high throughput read* Code formatted to google-java-format
1 parentfadd992 commit8833c97

File tree

1 file changed

+13
-3
lines changed

1 file changed

+13
-3
lines changed

‎google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java‎

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ class ConnectionImpl implements Connection {
9797
privatefinalLoggerlogger =Logger.getLogger(this.getClass().getName());
9898
privateBigQueryReadClientbqReadClient;
9999
privatestaticfinallongEXECUTOR_TIMEOUT_SEC =10;
100+
privatestaticfinallongBIGQUERY_TIMEOUT_SEC =10;
100101
privateBlockingQueue<AbstractList<FieldValue>>
101102
bufferFvl;// initialized lazily iff we end up using the tabledata.list end point
102103
privateBlockingQueue<BigQueryResultImpl.Row>
@@ -148,8 +149,15 @@ public synchronized boolean close() throws BigQuerySQLException {
148149
flagEndOfStream();// an End of Stream flag in the buffer so that the `ResultSet.next()` stops
149150
// advancing the cursor
150151
queryTaskExecutor.shutdownNow();
152+
booleanisBqReadClientTerminated =true;
151153
try {
152-
if (queryTaskExecutor.awaitTermination(EXECUTOR_TIMEOUT_SEC,TimeUnit.SECONDS)) {
154+
if (bqReadClient !=null) {
155+
bqReadClient.shutdownNow();
156+
isBqReadClientTerminated =
157+
bqReadClient.awaitTermination(BIGQUERY_TIMEOUT_SEC,TimeUnit.SECONDS);
158+
}
159+
if (queryTaskExecutor.awaitTermination(EXECUTOR_TIMEOUT_SEC,TimeUnit.SECONDS)
160+
&&isBqReadClientTerminated) {
153161
returntrue;
154162
}// else queryTaskExecutor.isShutdown() will be returned outside this try block
155163
}catch (InterruptedExceptione) {
@@ -159,7 +167,9 @@ public synchronized boolean close() throws BigQuerySQLException {
159167
e);// Logging InterruptedException instead of throwing the exception back, close method
160168
// will return queryTaskExecutor.isShutdown()
161169
}
162-
returnqueryTaskExecutor.isShutdown();// check if the executor has been shutdown
170+
171+
returnqueryTaskExecutor.isShutdown()
172+
&&isBqReadClientTerminated;// check if the executor has been shutdown
163173
}
164174

165175
/**
@@ -992,7 +1002,6 @@ BigQueryResult highThroughPutRead(
9921002
// DO a regex check using order by and use multiple streams
9931003
;
9941004
ReadSessionreadSession =bqReadClient.createReadSession(builder.build());
995-
9961005
bufferRow =newLinkedBlockingDeque<>(getBufferSize());
9971006
Map<String,Integer>arrowNameToIndex =newHashMap<>();
9981007
// deserialize and populate the buffer async, so that the client isn't blocked
@@ -1050,6 +1059,7 @@ private void processArrowStreamAsync(
10501059
"\n" +Thread.currentThread().getName() +" Interrupted @ markLast",
10511060
e);
10521061
}
1062+
bqReadClient.shutdownNow();// Shutdown the read client
10531063
queryTaskExecutor.shutdownNow();// Shutdown the thread pool
10541064
}
10551065
};

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp