Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit4ce19d4

Browse files
authored
feat: supports new caught up and fell behind events. (#325)
1 parent26c72a2 commit4ce19d4

File tree

7 files changed

+74
-18
lines changed

7 files changed

+74
-18
lines changed

‎src/main/java/io/kurrent/dbclient/ReadResponseObserver.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
importorg.slf4j.LoggerFactory;
1313

1414
importjava.nio.charset.Charset;
15+
importjava.time.Instant;
1516
importjava.util.concurrent.atomic.AtomicBoolean;
1617
importjava.util.concurrent.atomic.AtomicInteger;
1718

@@ -118,11 +119,21 @@ else if (value.hasLastStreamPosition())
118119
elseif (value.hasLastAllStreamPosition()) {
119120
Shared.AllStreamPositionposition =value.getLastAllStreamPosition();
120121
consumer.onLastAllStreamPosition(position.getCommitPosition(),position.getPreparePosition());
121-
}elseif (value.hasCaughtUp())
122-
consumer.onCaughtUp();
123-
elseif (value.hasFellBehind())
124-
consumer.onFellBehind();
125-
else {
122+
}elseif (value.hasCaughtUp()) {
123+
StreamsOuterClass.ReadResp.CaughtUpcaughtUp =value.getCaughtUp();
124+
Instanttimestamp =Instant.ofEpochSecond(caughtUp.getTimestamp().getSeconds(),caughtUp.getTimestamp().getNanos());
125+
LongstreamRevision =caughtUp.hasStreamRevision() ?caughtUp.getStreamRevision() :null;
126+
Positionposition =caughtUp.hasPosition() ?newPosition(caughtUp.getPosition().getCommitPosition(),caughtUp.getPosition().getPreparePosition()) :null;
127+
128+
consumer.onCaughtUp(timestamp,streamRevision,position);
129+
}elseif (value.hasFellBehind()) {
130+
StreamsOuterClass.ReadResp.FellBehindfellBehind =value.getFellBehind();
131+
Instanttimestamp =Instant.ofEpochSecond(fellBehind.getTimestamp().getSeconds(),fellBehind.getTimestamp().getNanos());
132+
LongstreamRevision =fellBehind.hasStreamRevision() ?fellBehind.getStreamRevision() :null;
133+
Positionposition =fellBehind.hasPosition() ?newPosition(fellBehind.getPosition().getCommitPosition(),fellBehind.getPosition().getPreparePosition()) :null;
134+
135+
consumer.onFellBehind(timestamp,streamRevision,position);
136+
}else {
126137
logger.warn("received unknown message variant");
127138
}
128139

‎src/main/java/io/kurrent/dbclient/ReadStreamConsumer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
importorg.reactivestreams.Subscriber;
44

5+
importjava.time.Instant;
6+
57
classReadStreamConsumerimplementsStreamConsumer {
68
privatefinalSubscriber<?superReadMessage>subscriber;
79

@@ -41,10 +43,10 @@ public void onLastAllStreamPosition(long commit, long prepare) {
4143
}
4244

4345
@Override
44-
publicvoidonCaughtUp() {}
46+
publicvoidonCaughtUp(Instanttimestamp,LongstreamRevision,Positionposition) {}
4547

4648
@Override
47-
publicvoidonFellBehind() {}
49+
publicvoidonFellBehind(Instanttimestamp,LongstreamRevision,Positionposition) {}
4850

4951
@Override
5052
publicvoidonCancelled(Throwableexception) {
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
packageio.kurrent.dbclient;
22

3+
importjava.time.Instant;
4+
35
publicinterfaceStreamConsumer {
46
defaultvoidonSubscribe(org.reactivestreams.Subscriptionsubscription) {}
57
voidonEvent(ResolvedEventevent);
@@ -9,8 +11,8 @@ default void onSubscribe(org.reactivestreams.Subscription subscription) {}
911
voidonFirstStreamPosition(longposition);
1012
voidonLastStreamPosition(longposition);
1113
voidonLastAllStreamPosition(longcommit,longprepare);
12-
voidonCaughtUp();
13-
voidonFellBehind();
14+
voidonCaughtUp(Instanttimestamp,LongstreamRevision,Positionposition);
15+
voidonFellBehind(Instanttimestamp,LongstreamRevision,Positionposition);
1416
voidonCancelled(Throwableexception);
1517
voidonComplete();
1618
}

‎src/main/java/io/kurrent/dbclient/SubscriptionListener.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
packageio.kurrent.dbclient;
22

3+
importjava.time.Instant;
4+
35
/**
46
* Listener used to handle catch-up subscription notifications raised throughout its lifecycle.
57
*/
@@ -28,12 +30,12 @@ public void onConfirmation(Subscription subscription) {}
2830
* Called when the subscription has reached the head of the stream.
2931
* @param subscription handle to the subscription.
3032
*/
31-
publicvoidonCaughtUp(Subscriptionsubscription) {}
33+
publicvoidonCaughtUp(Subscriptionsubscription,Instanttimestamp,LongstreamRevision,Positionposition) {}
3234

3335
/**
3436
* Called when the subscription has fallen behind, meaning it's no longer keeping up with the
3537
* stream's pace.
3638
* @param subscription handle to the subscription.
3739
*/
38-
publicvoidonFellBehind(Subscriptionsubscription) {}
40+
publicvoidonFellBehind(Subscriptionsubscription,Instanttimestamp,LongstreamRevision,Positionposition) {}
3941
}

‎src/main/java/io/kurrent/dbclient/SubscriptionStreamConsumer.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
packageio.kurrent.dbclient;
22

33

4+
importjava.time.Instant;
45
importjava.util.concurrent.CompletableFuture;
56

67
classSubscriptionStreamConsumerimplementsStreamConsumer {
@@ -56,13 +57,13 @@ public void onLastStreamPosition(long position) {}
5657
publicvoidonLastAllStreamPosition(longcommit,longprepare) {}
5758

5859
@Override
59-
publicvoidonCaughtUp() {
60-
this.listener.onCaughtUp(this.subscription);
60+
publicvoidonCaughtUp(Instanttimestamp,LongstreamRevision,Positionposition) {
61+
this.listener.onCaughtUp(this.subscription,timestamp,streamRevision,position);
6162
}
6263

6364
@Override
64-
publicvoidonFellBehind() {
65-
this.listener.onFellBehind(this.subscription);
65+
publicvoidonFellBehind(Instanttimestamp,LongstreamRevision,Positionposition) {
66+
this.listener.onFellBehind(this.subscription,timestamp,streamRevision,position);
6667
}
6768

6869
@Override

‎src/main/proto/streams.proto

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,37 @@ message ReadResp {
103103
FellBehindfell_behind=9;
104104
}
105105

106-
messageCaughtUp {}
106+
// The $all or stream subscription has caught up and become live.
107+
messageCaughtUp {
108+
// Current time in the server when the subscription caught up
109+
google.protobuf.Timestamptimestamp=1;
110+
111+
// Checkpoint for resuming a stream subscription.
112+
// For stream subscriptions it is populated unless the stream is empty.
113+
// For $all subscriptions it is not populated.
114+
optionalint64stream_revision=2;
115+
116+
// Checkpoint for resuming a $all subscription.
117+
// For stream subscriptions it is not populated.
118+
// For $all subscriptions it is populated unless the database is empty.
119+
optionalPositionposition=3;
120+
}
121+
122+
// The $all or stream subscription has fallen back into catchup mode and is no longer live.
123+
messageFellBehind {
124+
// Current time in the server when the subscription fell behind
125+
google.protobuf.Timestamptimestamp=1;
126+
127+
// Checkpoint for resuming a stream subscription.
128+
// For stream subscriptions it is populated unless the stream is empty.
129+
// For $all subscriptions it is not populated.
130+
optionalint64stream_revision=2;
107131

108-
messageFellBehind {}
132+
// Checkpoint for resuming a $all subscription.
133+
// For stream subscriptions it is not populated.
134+
// For $all subscriptions it is populated unless the database is empty.
135+
optionalPositionposition=3;
136+
}
109137

110138
messageReadEvent {
111139
RecordedEventevent=1;
@@ -132,7 +160,16 @@ message ReadResp {
132160
messageCheckpoint {
133161
uint64commit_position=1;
134162
uint64prepare_position=2;
163+
164+
// Current time in the server when the checkpoint was reached
165+
google.protobuf.Timestamptimestamp=3;
135166
}
167+
168+
messagePosition {
169+
uint64commit_position=1;
170+
uint64prepare_position=2;
171+
}
172+
136173
messageStreamNotFound {
137174
event_store.client.StreamIdentifierstream_identifier=1;
138175
}

‎src/test/java/io/kurrent/dbclient/streams/SubscriptionTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
importorg.junit.jupiter.api.Timeout;
88

99
importjava.io.IOException;
10+
importjava.time.Instant;
1011
importjava.util.ArrayList;
1112
importjava.util.Optional;
1213
importjava.util.concurrent.CountDownLatch;
@@ -176,7 +177,7 @@ default void testCaughtUpMessageIsReceived() throws Throwable {
176177

177178
Subscriptionsubscription =client.subscribeToStream(streamName,newSubscriptionListener() {
178179
@Override
179-
publicvoidonCaughtUp(Subscriptionsubscription) {
180+
publicvoidonCaughtUp(Subscriptionsubscription,Instanttimestamp,LongstreamRevision,Positionposition) {
180181
caughtUp.countDown();
181182
}
182183
},SubscribeToStreamOptions.get().fromStart()).get();

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp