Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit9be86a4

Browse files
committed
Upgrade to RxJava 3.0.0-RC3, adjust internals
1 parent0b90b7f commit9be86a4

25 files changed

+156
-259
lines changed

‎README.md‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ RxJava 3.x implementation of extra sources, operators and components and ports o
1313

1414
```
1515
dependencies {
16-
compile "com.github.akarnokd:rxjava3-extensions:3.0.0-RC2"
16+
compile "com.github.akarnokd:rxjava3-extensions:3.0.0-RC3"
1717
}
1818
```
1919

‎build.gradle‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ dependencies {
4949
signature'org.codehaus.mojo.signature:java16:1.1@signature'
5050

5151
compile"org.reactivestreams:reactive-streams:1.0.3"
52-
compile"io.reactivex.rxjava3:rxjava:3.0.0-RC2"
52+
compile"io.reactivex.rxjava3:rxjava:3.0.0-RC3"
5353

5454
testCompilegroup:'junit',name:'junit',version:'4.12'
5555

‎gradle.properties‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=3.0.0-RC2
1+
version=3.0.0-RC3

‎src/main/java/hu/akarnokd/rxjava3/basetypes/NonoConcat.java‎

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
importio.reactivex.rxjava3.internal.queue.*;
2626
importio.reactivex.rxjava3.internal.subscriptions.*;
2727
importio.reactivex.rxjava3.internal.util.*;
28-
importio.reactivex.rxjava3.plugins.RxJavaPlugins;
2928

3029
/**
3130
* Concatenate sources emitted by a Publisher one after another and complete after each complete.
@@ -64,7 +63,7 @@ abstract static class AbstractConcatSubscriber extends BasicIntQueueSubscription
6463

6564
finalintlimit;
6665

67-
finalAtomicThrowableerror;
66+
finalAtomicThrowableerrors;
6867

6968
finalInnerSubscriberinner;
7069

@@ -86,7 +85,7 @@ abstract static class AbstractConcatSubscriber extends BasicIntQueueSubscription
8685
this.downstream =downstream;
8786
this.prefetch =prefetch;
8887
this.limit =prefetch - (prefetch >>2);
89-
this.error =newAtomicThrowable();
88+
this.errors =newAtomicThrowable();
9089
this.inner =newInnerSubscriber();
9190
}
9291

@@ -240,14 +239,14 @@ static final class ConcatImmediateSubscriber extends AbstractConcatSubscriber {
240239

241240
@Override
242241
publicvoidonError(Throwablet) {
243-
cancel();
244-
HalfSerializer.onError(downstream,t,this,error);
242+
cancelIf(true);
243+
HalfSerializer.onError(downstream,t,this,errors);
245244
}
246245

247246
@Override
248247
publicvoidinnerError(Throwablet) {
249-
cancel();
250-
HalfSerializer.onError(downstream,t,this,error);
248+
cancelIf(true);
249+
HalfSerializer.onError(downstream,t,this,errors);
251250
}
252251

253252
@Override
@@ -258,13 +257,21 @@ public void onComplete() {
258257

259258
@Override
260259
publicvoidcancel() {
260+
cancelIf(false);
261+
}
262+
263+
voidcancelIf(booleanerror) {
261264
cancelled =true;
262265
upstream.cancel();
263266
inner.dispose();
267+
if (!error) {
268+
errors.tryTerminateAndReport();
269+
}
264270

265271
if (wip.getAndIncrement() ==0) {
266272
queue.clear();
267273
}
274+
268275
}
269276

270277
@Override
@@ -289,14 +296,14 @@ public void drain() {
289296
Exceptions.throwIfFatal(ex);
290297
upstream.cancel();
291298
queue.clear();
292-
HalfSerializer.onError(downstream,ex,this,error);
299+
HalfSerializer.onError(downstream,ex,this,errors);
293300
return;
294301
}
295302

296303
booleanempty =np ==null;
297304

298305
if (d &&empty) {
299-
HalfSerializer.onComplete(downstream,this,error);
306+
HalfSerializer.onComplete(downstream,this,errors);
300307
return;
301308
}
302309

@@ -324,11 +331,9 @@ static final class ConcatDelayedSubscriber extends AbstractConcatSubscriber {
324331

325332
@Override
326333
publicvoidonError(Throwablet) {
327-
if (error.addThrowable(t)) {
334+
if (errors.tryAddThrowableOrReport(t)) {
328335
done =true;
329336
drain();
330-
}else {
331-
RxJavaPlugins.onError(t);
332337
}
333338
}
334339

@@ -343,6 +348,7 @@ public void cancel() {
343348
cancelled =true;
344349
upstream.cancel();
345350
inner.dispose();
351+
errors.tryTerminateAndReport();
346352

347353
if (getAndIncrement() ==0) {
348354
queue.clear();
@@ -362,9 +368,9 @@ void drain() {
362368
}
363369

364370
if (!active) {
365-
if (!tillTheEnd &&error.get() !=null) {
371+
if (!tillTheEnd &&errors.get() !=null) {
366372
queue.clear();
367-
downstream.onError(error.terminate());
373+
errors.tryTerminateConsumer(downstream);
368374
return;
369375
}
370376

@@ -378,21 +384,16 @@ void drain() {
378384
Exceptions.throwIfFatal(ex);
379385
upstream.cancel();
380386
queue.clear();
381-
error.addThrowable(ex);
387+
errors.tryAddThrowableOrReport(ex);
382388

383-
downstream.onError(error.terminate());
389+
errors.tryTerminateConsumer(downstream);
384390
return;
385391
}
386392

387393
booleanempty =np ==null;
388394

389395
if (d &&empty) {
390-
Throwableex =error.terminate();
391-
if (ex !=null) {
392-
downstream.onError(ex);
393-
}else {
394-
downstream.onComplete();
395-
}
396+
errors.tryTerminateConsumer(downstream);
396397
return;
397398
}
398399

@@ -408,14 +409,12 @@ void drain() {
408409

409410
@Override
410411
voidinnerError(Throwablet) {
411-
if (error.addThrowable(t)) {
412+
if (errors.tryAddThrowableOrReport(t)) {
412413
if (!tillTheEnd) {
413414
upstream.cancel();
414415
}
415416
active =false;
416417
drain();
417-
}else {
418-
RxJavaPlugins.onError(t);
419418
}
420419
}
421420
}

‎src/main/java/hu/akarnokd/rxjava3/basetypes/NonoConcatArray.java‎

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ static final class ConcatSubscriber extends BasicRefQueueSubscription<Void, Subs
7070
@Override
7171
publicvoidcancel() {
7272
SubscriptionHelper.cancel(this);
73+
if (errors !=null) {
74+
errors.tryTerminateAndReport();
75+
}
7376
}
7477

7578
@Override
@@ -86,9 +89,10 @@ public void onNext(Void t) {
8689
publicvoidonError(Throwablet) {
8790
AtomicThrowableerr =errors;
8891
if (err !=null) {
89-
err.addThrowable(t);
90-
active =false;
91-
drain();
92+
if (err.tryAddThrowableOrReport(t)) {
93+
active =false;
94+
drain();
95+
}
9296
}else {
9397
downstream.onError(t);
9498
}
@@ -113,9 +117,8 @@ void drain() {
113117
if (!active) {
114118
intidx =index;
115119
if (idx ==sources.length) {
116-
Throwableex =errors !=null ?errors.terminate() :null;
117-
if (ex !=null) {
118-
downstream.onError(ex);
120+
if (errors !=null) {
121+
errors.tryTerminateConsumer(downstream);
119122
}else {
120123
downstream.onComplete();
121124
}
@@ -129,8 +132,8 @@ void drain() {
129132
if (np ==null) {
130133
NullPointerExceptionnpe =newNullPointerException("One of the sources is null");
131134
if (errors !=null) {
132-
errors.addThrowable(npe);
133-
downstream.onError(errors.terminate());
135+
errors.tryAddThrowableOrReport(npe);
136+
errors.tryTerminateConsumer(downstream);
134137
}else {
135138
downstream.onError(npe);
136139
}

‎src/main/java/hu/akarnokd/rxjava3/basetypes/NonoConcatIterable.java‎

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ static final class ConcatSubscriber extends BasicRefQueueSubscription<Void, Subs
8181
@Override
8282
publicvoidcancel() {
8383
SubscriptionHelper.cancel(this);
84+
if (errors !=null) {
85+
errors.tryTerminateAndReport();
86+
}
8487
}
8588

8689
@Override
@@ -97,9 +100,10 @@ public void onNext(Void t) {
97100
publicvoidonError(Throwablet) {
98101
AtomicThrowableerr =errors;
99102
if (err !=null) {
100-
err.addThrowable(t);
101-
active =false;
102-
drain();
103+
if (err.tryAddThrowableOrReport(t)) {
104+
active =false;
105+
drain();
106+
}
103107
}else {
104108
downstream.onError(t);
105109
}
@@ -133,17 +137,16 @@ void drain() {
133137
}catch (Throwableex) {
134138
Exceptions.throwIfFatal(ex);
135139
if (errors !=null) {
136-
errors.addThrowable(ex);
137-
downstream.onError(errors.terminate());
140+
errors.tryAddThrowableOrReport(ex);
141+
errors.tryTerminateConsumer(downstream);
138142
}else {
139143
downstream.onError(ex);
140144
}
141145
return;
142146
}
143147
if (!b) {
144-
Throwableex =errors !=null ?errors.terminate() :null;
145-
if (ex !=null) {
146-
downstream.onError(ex);
148+
if (errors !=null) {
149+
errors.tryTerminateConsumer(downstream);
147150
}else {
148151
downstream.onComplete();
149152
}

‎src/main/java/hu/akarnokd/rxjava3/basetypes/NonoMerge.java‎

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,11 @@ public void onNext(Nono t) {
9898

9999
@Override
100100
publicvoidonError(Throwablet) {
101-
if (errors.addThrowable(t)) {
101+
if (errors.tryAddThrowable(t)) {
102102
if (!delayErrors) {
103103
set.dispose();
104104

105-
Throwableex =errors.terminate();
106-
if (ex !=ExceptionHelper.TERMINATED) {
107-
downstream.onError(ex);
108-
}
105+
errors.tryTerminateConsumer(downstream);
109106
}else {
110107
onComplete();
111108
}
@@ -117,12 +114,7 @@ public void onError(Throwable t) {
117114
@Override
118115
publicvoidonComplete() {
119116
if (decrementAndGet() ==0) {
120-
Throwableex =errors.terminate();
121-
if (ex !=null) {
122-
downstream.onError(ex);
123-
}else {
124-
downstream.onComplete();
125-
}
117+
errors.tryTerminateConsumer(downstream);
126118
}
127119
}
128120

@@ -146,7 +138,7 @@ void complete() {
146138

147139
voidinnerError(Disposableinner,Throwableerror) {
148140
set.delete(inner);
149-
if (errors.addThrowable(error)) {
141+
if (errors.tryAddThrowableOrReport(error)) {
150142
if (!delayErrors) {
151143
set.dispose();
152144

@@ -157,15 +149,14 @@ void innerError(Disposable inner, Throwable error) {
157149
}else {
158150
complete();
159151
}
160-
}else {
161-
RxJavaPlugins.onError(error);
162152
}
163153
}
164154

165155
@Override
166156
publicvoidcancel() {
167157
upstream.cancel();
168158
set.dispose();
159+
errors.tryTerminateAndReport();
169160
}
170161

171162
finalclassMergeInnerSubscriberextendsAtomicReference<Subscription>implementsSubscriber<Void>,Disposable {

‎src/main/java/hu/akarnokd/rxjava3/basetypes/NonoMergeArray.java‎

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@
2222

2323
importhu.akarnokd.rxjava3.util.CompositeSubscription;
2424
importio.reactivex.rxjava3.internal.subscriptions.*;
25-
importio.reactivex.rxjava3.internal.util.*;
26-
importio.reactivex.rxjava3.plugins.RxJavaPlugins;
25+
importio.reactivex.rxjava3.internal.util.AtomicThrowable;
2726

2827
/**
2928
* Run Nono sources in parallel and complete when all complete.
@@ -108,6 +107,7 @@ public void request(long n) {
108107
publicvoidcancel() {
109108
cancelled =true;
110109
set.cancel();
110+
errors.tryTerminateAndReport();
111111
}
112112

113113
voidsubscribe(intn) {
@@ -143,16 +143,13 @@ void subscribe(int n) {
143143
Nononp =srcs[i];
144144

145145
if (np ==null) {
146-
errors.addThrowable(newNullPointerException("A source is null"));
146+
errors.tryAddThrowableOrReport(newNullPointerException("A source is null"));
147147
if (delayErrors) {
148148
i =f;
149149
break;
150150
}
151151
set.cancel();
152-
Throwableex =errors.terminate();
153-
if (ex !=ExceptionHelper.TERMINATED) {
154-
downstream.onError(ex);
155-
}
152+
errors.tryTerminateConsumer(downstream);
156153
return;
157154
}
158155

@@ -190,20 +187,15 @@ void subscribe(int n) {
190187
@Override
191188
publicvoidinnerError(InnerSubscriberinner,Throwableex) {
192189
set.delete(inner);
193-
if (errors.addThrowable(ex)) {
190+
if (errors.tryAddThrowableOrReport(ex)) {
194191
if (!delayErrors) {
195192
set.cancel();
196193

197-
ex =errors.terminate();
198-
if (ex !=ExceptionHelper.TERMINATED) {
199-
downstream.onError(ex);
200-
}
194+
errors.tryTerminateConsumer(downstream);
201195
}else {
202196
subscribe(1);
203197
complete();
204198
}
205-
}else {
206-
RxJavaPlugins.onError(ex);
207199
}
208200
}
209201

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp