Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd068762

Browse files
authored
Document read reactive (#322)
1 parent6dfea76 commitd068762

File tree

1 file changed

+243
-0
lines changed

1 file changed

+243
-0
lines changed

‎src/test/java/io/kurrent/dbclient/samples/reading_events/ReadingEvents.java

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
importio.kurrent.dbclient.*;
44
importcom.fasterxml.jackson.core.JsonProcessingException;
55
importcom.fasterxml.jackson.databind.ObjectMapper;
6+
importorg.reactivestreams.*;
7+
importorg.reactivestreams.Subscription;
68

79
importjava.util.concurrent.ExecutionException;
810

11+
@SuppressWarnings("ALL")
912
publicclassReadingEvents {
1013
privatestaticvoidreadFromStream(KurrentDBClientclient)throwsExecutionException,InterruptedException,JsonProcessingException {
1114
// region read-from-stream
@@ -16,13 +19,41 @@ private static void readFromStream(KurrentDBClient client) throws ExecutionExcep
1619
ReadResultresult =client.readStream("some-stream",options)
1720
.get();
1821

22+
23+
// or using read reactive
24+
Publisher<ReadMessage>publisher =client.readStreamReactive("some-stream",options);
1925
// endregion read-from-stream
2026

2127
// region iterate-stream
2228
for (ResolvedEventresolvedEvent :result.getEvents()) {
2329
RecordedEventrecordedEvent =resolvedEvent.getOriginalEvent();
2430
System.out.println(newObjectMapper().writeValueAsString(recordedEvent.getEventData()));
2531
}
32+
33+
// or using read reactive
34+
publisher.subscribe(newSubscriber<ReadMessage>() {
35+
@Override
36+
publicvoidonSubscribe(Subscriptionsubscription) {
37+
}
38+
39+
@Override
40+
publicvoidonNext(ReadMessagereadMessage) {
41+
RecordedEventrecordedEvent =readMessage.getEvent().getOriginalEvent();
42+
try {
43+
System.out.println(newObjectMapper().writeValueAsString(recordedEvent.getEventData()));
44+
}catch (JsonProcessingExceptione) {
45+
thrownewRuntimeException(e);
46+
}
47+
}
48+
49+
@Override
50+
publicvoidonError(Throwablethrowable) {
51+
}
52+
53+
@Override
54+
publicvoidonComplete() {
55+
}
56+
});
2657
// endregion iterate-stream
2758
}
2859

@@ -36,13 +67,40 @@ private static void readFromStreamPosition(KurrentDBClient client) throws Execut
3667
ReadResultresult =client.readStream("some-stream",options)
3768
.get();
3869

70+
// or using read reactive
71+
Publisher<ReadMessage>publisher =client.readStreamReactive("some-stream",options);
3972
// endregion read-from-stream-position
4073

4174
// region iterate-stream
4275
for (ResolvedEventresolvedEvent :result.getEvents()) {
4376
RecordedEventrecordedEvent =resolvedEvent.getOriginalEvent();
4477
System.out.println(newObjectMapper().writeValueAsString(recordedEvent.getEventData()));
4578
}
79+
80+
// or using read reactive
81+
publisher.subscribe(newSubscriber<ReadMessage>() {
82+
@Override
83+
publicvoidonSubscribe(Subscriptionsubscription) {
84+
}
85+
86+
@Override
87+
publicvoidonNext(ReadMessagereadMessage) {
88+
RecordedEventrecordedEvent =readMessage.getEvent().getOriginalEvent();
89+
try {
90+
System.out.println(newObjectMapper().writeValueAsString(recordedEvent.getEventData()));
91+
}catch (JsonProcessingExceptione) {
92+
thrownewRuntimeException(e);
93+
}
94+
}
95+
96+
@Override
97+
publicvoidonError(Throwablethrowable) {
98+
}
99+
100+
@Override
101+
publicvoidonComplete() {
102+
}
103+
});
46104
// endregion iterate-stream
47105
}
48106

@@ -55,6 +113,9 @@ private static void readStreamOverridingUserCredentials(KurrentDBClient client)
55113

56114
ReadResultresult =client.readStream("some-stream",options)
57115
.get();
116+
117+
// Or using reactive stream
118+
Publisher<ReadMessage>publisher =client.readStreamReactive("some-stream",options);
58119
// endregion overriding-user-credentials
59120
}
60121

@@ -81,6 +142,39 @@ private static void readFromStreamPositionCheck(KurrentDBClient client) throws J
81142
RecordedEventrecordedEvent =resolvedEvent.getOriginalEvent();
82143
System.out.println(newObjectMapper().writeValueAsString(recordedEvent.getEventData()));
83144
}
145+
146+
// or using read reactive
147+
Publisher<ReadMessage>publisher =client.readStreamReactive("some-stream",options);
148+
149+
publisher.subscribe(newSubscriber<ReadMessage>() {
150+
@Override
151+
publicvoidonSubscribe(Subscriptionsubscription) {
152+
}
153+
154+
@Override
155+
publicvoidonNext(ReadMessagereadMessage) {
156+
RecordedEventrecordedEvent =readMessage.getEvent().getOriginalEvent();
157+
try {
158+
System.out.println(newObjectMapper().writeValueAsString(recordedEvent.getEventData()));
159+
}catch (JsonProcessingExceptione) {
160+
thrownewRuntimeException(e);
161+
}
162+
}
163+
164+
@Override
165+
publicvoidonError(Throwablethrowable) {
166+
ThrowableinnerException =throwable.getCause();
167+
168+
if (innerExceptioninstanceofStreamNotFoundException) {
169+
return;
170+
}
171+
// Handle other errors
172+
}
173+
174+
@Override
175+
publicvoidonComplete() {
176+
}
177+
});
84178
// endregion checking-for-stream-presence
85179
}
86180

@@ -97,6 +191,33 @@ private static void readFromStreamBackwards(KurrentDBClient client) throws JsonP
97191
RecordedEventrecordedEvent =resolvedEvent.getOriginalEvent();
98192
System.out.println(newObjectMapper().writeValueAsString(recordedEvent.getEventData()));
99193
}
194+
195+
// or using read reactive
196+
Publisher<ReadMessage>publisher =client.readStreamReactive("some-stream",options);
197+
198+
publisher.subscribe(newSubscriber<ReadMessage>() {
199+
@Override
200+
publicvoidonSubscribe(Subscriptionsubscription) {
201+
}
202+
203+
@Override
204+
publicvoidonNext(ReadMessagereadMessage) {
205+
RecordedEventrecordedEvent =readMessage.getEvent().getOriginalEvent();
206+
try {
207+
System.out.println(newObjectMapper().writeValueAsString(recordedEvent.getEventData()));
208+
}catch (JsonProcessingExceptione) {
209+
thrownewRuntimeException(e);
210+
}
211+
}
212+
213+
@Override
214+
publicvoidonError(Throwablethrowable) {
215+
}
216+
217+
@Override
218+
publicvoidonComplete() {
219+
}
220+
});
100221
// endregion reading-backwards
101222
}
102223

@@ -109,13 +230,40 @@ private static void readFromAllStream(KurrentDBClient client) throws JsonProcess
109230
ReadResultresult =client.readAll(options)
110231
.get();
111232

233+
// or using read reactive
234+
Publisher<ReadMessage>publisher =client.readAllReactive(options);
112235
// endregion read-from-all-stream
113236

114237
// region read-from-all-stream-iterate
115238
for (ResolvedEventresolvedEvent :result.getEvents()) {
116239
RecordedEventrecordedEvent =resolvedEvent.getOriginalEvent();
117240
System.out.println(newObjectMapper().writeValueAsString(recordedEvent.getEventData()));
118241
}
242+
243+
// or using read reactive
244+
publisher.subscribe(newSubscriber<ReadMessage>() {
245+
@Override
246+
publicvoidonSubscribe(Subscriptionsubscription) {
247+
}
248+
249+
@Override
250+
publicvoidonNext(ReadMessagereadMessage) {
251+
RecordedEventrecordedEvent =readMessage.getEvent().getOriginalEvent();
252+
try {
253+
System.out.println(newObjectMapper().writeValueAsString(recordedEvent.getEventData()));
254+
}catch (JsonProcessingExceptione) {
255+
thrownewRuntimeException(e);
256+
}
257+
}
258+
259+
@Override
260+
publicvoidonError(Throwablethrowable) {
261+
}
262+
263+
@Override
264+
publicvoidonComplete() {
265+
}
266+
});
119267
// endregion read-from-all-stream-iterate
120268
}
121269

@@ -128,6 +276,9 @@ private static void readAllOverridingUserCredentials(KurrentDBClient client) thr
128276

129277
ReadResultresult =client.readAll(options)
130278
.get();
279+
280+
// or using read reactive
281+
Publisher<ReadMessage>publisher =client.readAllReactive(options);
131282
// endregion read-all-overriding-user-credentials
132283
}
133284

@@ -147,6 +298,38 @@ private static void ignoreSystemEvents(KurrentDBClient client) throws JsonProces
147298
}
148299
System.out.println(newObjectMapper().writeValueAsString(recordedEvent.getEventData()));
149300
}
301+
302+
// or using read reactive
303+
Publisher<ReadMessage>publisher =client.readAllReactive(options);
304+
305+
publisher.subscribe(newSubscriber<ReadMessage>() {
306+
@Override
307+
publicvoidonSubscribe(Subscriptionsubscription) {
308+
}
309+
310+
@Override
311+
publicvoidonNext(ReadMessagereadMessage) {
312+
RecordedEventrecordedEvent =readMessage.getEvent().getOriginalEvent();
313+
314+
if (recordedEvent.getEventType().startsWith("$")) {
315+
return;
316+
}
317+
318+
try {
319+
System.out.println(newObjectMapper().writeValueAsString(recordedEvent.getEventData()));
320+
}catch (JsonProcessingExceptione) {
321+
thrownewRuntimeException(e);
322+
}
323+
}
324+
325+
@Override
326+
publicvoidonError(Throwablethrowable) {
327+
}
328+
329+
@Override
330+
publicvoidonComplete() {
331+
}
332+
});
150333
// endregion ignore-system-events
151334
}
152335

@@ -159,13 +342,40 @@ private static void readFromAllStreamBackwards(KurrentDBClient client) throws Js
159342
ReadResultresult =client.readAll(options)
160343
.get();
161344

345+
// or using read reactive
346+
Publisher<ReadMessage>publisher =client.readAllReactive(options);
162347
// endregion read-from-all-stream-backwards
163348

164349
// region read-from-all-stream-iterate
165350
for (ResolvedEventresolvedEvent :result.getEvents()) {
166351
RecordedEventrecordedEvent =resolvedEvent.getOriginalEvent();
167352
System.out.println(newObjectMapper().writeValueAsString(recordedEvent.getEventData()));
168353
}
354+
355+
// or using read reactive
356+
publisher.subscribe(newSubscriber<ReadMessage>() {
357+
@Override
358+
publicvoidonSubscribe(Subscriptionsubscription) {
359+
}
360+
361+
@Override
362+
publicvoidonNext(ReadMessagereadMessage) {
363+
RecordedEventrecordedEvent =readMessage.getEvent().getOriginalEvent();
364+
try {
365+
System.out.println(newObjectMapper().writeValueAsString(recordedEvent.getEventData()));
366+
}catch (JsonProcessingExceptione) {
367+
thrownewRuntimeException(e);
368+
}
369+
}
370+
371+
@Override
372+
publicvoidonError(Throwablethrowable) {
373+
}
374+
375+
@Override
376+
publicvoidonComplete() {
377+
}
378+
});
169379
// endregion read-from-all-stream-iterate
170380
}
171381

@@ -184,6 +394,36 @@ private static void filteringOutSystemEvents(KurrentDBClient client) throws Json
184394
}
185395
System.out.println(newObjectMapper().writeValueAsString(recordedEvent.getEventData()));
186396
}
397+
398+
// or using read reactive
399+
Publisher<ReadMessage>publisher =client.readAllReactive(options);
400+
401+
publisher.subscribe(newSubscriber<ReadMessage>() {
402+
@Override
403+
publicvoidonSubscribe(Subscriptionsubscription) {
404+
}
405+
406+
@Override
407+
publicvoidonNext(ReadMessagereadMessage) {
408+
RecordedEventrecordedEvent =readMessage.getEvent().getOriginalEvent();
409+
if (!recordedEvent.getEventType().startsWith("$")) {
410+
return;
411+
}
412+
try {
413+
System.out.println(newObjectMapper().writeValueAsString(recordedEvent.getEventData()));
414+
}catch (JsonProcessingExceptione) {
415+
thrownewRuntimeException(e);
416+
}
417+
}
418+
419+
@Override
420+
publicvoidonError(Throwablethrowable) {
421+
}
422+
423+
@Override
424+
publicvoidonComplete() {
425+
}
426+
});
187427
}
188428

189429
privatestaticvoidreadFromStreamResolvingLinkTos(KurrentDBClientclient)throwsJsonProcessingException,ExecutionException,InterruptedException {
@@ -196,6 +436,9 @@ private static void readFromStreamResolvingLinkTos(KurrentDBClient client) throw
196436
ReadResultresult =client.readAll(options)
197437
.get();
198438

439+
// or using read reactive
440+
Publisher<ReadMessage>publisher =client.readAllReactive(options);
441+
199442
// endregion read-from-all-stream-resolving-link-Tos
200443
for (ResolvedEventresolvedEvent :result.getEvents()) {
201444
RecordedEventrecordedEvent =resolvedEvent.getOriginalEvent();

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp