|
3 | 3 | importcom.eventstore.dbclient.proto.shared.Shared;
|
4 | 4 | importcom.eventstore.dbclient.proto.streams.StreamsGrpc;
|
5 | 5 | importcom.eventstore.dbclient.proto.streams.StreamsOuterClass;
|
| 6 | +importio.grpc.ManagedChannel; |
6 | 7 |
|
7 | 8 | importjava.util.concurrent.CompletableFuture;
|
8 | 9 |
|
@@ -32,38 +33,42 @@ protected AbstractRegularSubscription(GrpcClient client, OptionsWithBackPressure
|
32 | 33 | protectedabstractStreamsOuterClass.ReadReq.Options.BuildercreateOptions();
|
33 | 34 |
|
34 | 35 | publicCompletableFuture<Subscription>execute() {
|
35 |
| -CompletableFuture<Subscription>future =newCompletableFuture<>(); |
36 |
| - |
37 |
| -this.client.getWorkItemArgs().whenComplete((args,error) -> { |
38 |
| -if (error !=null) { |
39 |
| -future.completeExceptionally(error); |
40 |
| -return; |
41 |
| - } |
42 |
| - |
43 |
| -ReadResponseObserverobserver =createObserver(args,future); |
44 |
| -observer.onConnected(args); |
| 36 | +returnthis.client.run(channel -> { |
| 37 | +CompletableFuture<Subscription>future =newCompletableFuture<>(); |
45 | 38 |
|
46 | 39 | StreamsOuterClass.ReadReqreadReq =StreamsOuterClass.ReadReq.newBuilder()
|
47 | 40 | .setOptions(createOptions())
|
48 | 41 | .build();
|
49 | 42 |
|
50 |
| -StreamsGrpc.StreamsStubstreamsClient =GrpcUtils.configureStub(StreamsGrpc.newStub(args.getChannel()),this.client.getSettings(),this.options); |
| 43 | +StreamsGrpc.StreamsStubstreamsClient =GrpcUtils.configureStub( |
| 44 | +StreamsGrpc.newStub(channel), |
| 45 | +this.client.getSettings(), |
| 46 | +this.options |
| 47 | + ); |
| 48 | + |
| 49 | +ReadResponseObserverobserver =createObserver(channel,future); |
51 | 50 | streamsClient.read(readReq,observer);
|
52 |
| - }); |
53 | 51 |
|
54 |
| -returnfuture; |
| 52 | +returnfuture; |
| 53 | + }); |
55 | 54 | }
|
56 | 55 |
|
57 |
| -privateReadResponseObservercreateObserver(WorkItemArgsargs,CompletableFuture<Subscription>future) { |
58 |
| -StreamConsumerconsumer =newSubscriptionStreamConsumer(this.listener,this.checkpointer,future, (subscriptionId,event,action) -> { |
59 |
| -ClientTelemetry.traceSubscribe( |
60 |
| -action, |
61 |
| -subscriptionId, |
62 |
| -args.getChannel(), |
63 |
| -client.getSettings(), |
64 |
| -options.getCredentials(), |
65 |
| -event); |
66 |
| - }); |
| 56 | +privateReadResponseObservercreateObserver(ManagedChannelchannel,CompletableFuture<Subscription>future) { |
| 57 | +StreamConsumerconsumer =newSubscriptionStreamConsumer( |
| 58 | +this.listener, |
| 59 | +this.checkpointer, |
| 60 | +future, |
| 61 | + (subscriptionId,event,action) -> { |
| 62 | +ClientTelemetry.traceSubscribe( |
| 63 | +action, |
| 64 | +subscriptionId, |
| 65 | +channel, |
| 66 | +client.getSettings(), |
| 67 | +options.getCredentials(), |
| 68 | +event |
| 69 | + ); |
| 70 | + } |
| 71 | + ); |
67 | 72 |
|
68 | 73 | returnnewReadResponseObserver(this.options,consumer);
|
69 | 74 | }
|
|