Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit8e5f873

Browse files
authored
fix: subscription error handling during server shutdown (#311)
1 parent59971ab commit8e5f873

File tree

2 files changed

+33
-23
lines changed

2 files changed

+33
-23
lines changed

‎db-client-java/src/main/java/com/eventstore/dbclient/AbstractRegularSubscription.java

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
importcom.eventstore.dbclient.proto.shared.Shared;
44
importcom.eventstore.dbclient.proto.streams.StreamsGrpc;
55
importcom.eventstore.dbclient.proto.streams.StreamsOuterClass;
6+
importio.grpc.ManagedChannel;
67

78
importjava.util.concurrent.CompletableFuture;
89

@@ -32,38 +33,42 @@ protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressure
3233
protectedabstractStreamsOuterClass.ReadReq.Options.BuildercreateOptions();
3334

3435
publicCompletableFuture<Subscription>execute() {
35-
CompletableFuture<Subscription>future =newCompletableFuture<>();
36-
37-
this.client.getWorkItemArgs().whenComplete((args,error) -> {
38-
if (error !=null) {
39-
future.completeExceptionally(error);
40-
return;
41-
}
42-
43-
ReadResponseObserverobserver =createObserver(args,future);
44-
observer.onConnected(args);
36+
returnthis.client.run(channel -> {
37+
CompletableFuture<Subscription>future =newCompletableFuture<>();
4538

4639
StreamsOuterClass.ReadReqreadReq =StreamsOuterClass.ReadReq.newBuilder()
4740
.setOptions(createOptions())
4841
.build();
4942

50-
StreamsGrpc.StreamsStubstreamsClient =GrpcUtils.configureStub(StreamsGrpc.newStub(args.getChannel()),this.client.getSettings(),this.options);
43+
StreamsGrpc.StreamsStubstreamsClient =GrpcUtils.configureStub(
44+
StreamsGrpc.newStub(channel),
45+
this.client.getSettings(),
46+
this.options
47+
);
48+
49+
ReadResponseObserverobserver =createObserver(channel,future);
5150
streamsClient.read(readReq,observer);
52-
});
5351

54-
returnfuture;
52+
returnfuture;
53+
});
5554
}
5655

57-
privateReadResponseObservercreateObserver(WorkItemArgsargs,CompletableFuture<Subscription>future) {
58-
StreamConsumerconsumer =newSubscriptionStreamConsumer(this.listener,this.checkpointer,future, (subscriptionId,event,action) -> {
59-
ClientTelemetry.traceSubscribe(
60-
action,
61-
subscriptionId,
62-
args.getChannel(),
63-
client.getSettings(),
64-
options.getCredentials(),
65-
event);
66-
});
56+
privateReadResponseObservercreateObserver(ManagedChannelchannel,CompletableFuture<Subscription>future) {
57+
StreamConsumerconsumer =newSubscriptionStreamConsumer(
58+
this.listener,
59+
this.checkpointer,
60+
future,
61+
(subscriptionId,event,action) -> {
62+
ClientTelemetry.traceSubscribe(
63+
action,
64+
subscriptionId,
65+
channel,
66+
client.getSettings(),
67+
options.getCredentials(),
68+
event
69+
);
70+
}
71+
);
6772

6873
returnnewReadResponseObserver(this.options,consumer);
6974
}

‎db-client-java/src/main/java/com/eventstore/dbclient/SubscriptionStreamConsumer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ public void onFellBehind() {
6767

6868
@Override
6969
publicvoidonCancelled(Throwableexception) {
70+
// error occurred before subscription was confirmed
71+
if (this.subscription ==null) {
72+
this.future.completeExceptionally(exception);
73+
}
74+
7075
this.listener.onCancelled(this.subscription,exception);
7176
}
7277

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp