|
| 1 | +packageio.kurrent.dbclient.connection; |
| 2 | + |
| 3 | +importio.kurrent.dbclient.*; |
| 4 | +importorg.junit.jupiter.api.Assertions; |
| 5 | +importorg.junit.jupiter.api.Test; |
| 6 | +importorg.junit.jupiter.api.Timeout; |
| 7 | + |
| 8 | +importjava.util.concurrent.CountDownLatch; |
| 9 | +importjava.util.concurrent.TimeUnit; |
| 10 | +importjava.util.concurrent.atomic.AtomicBoolean; |
| 11 | +importjava.util.concurrent.atomic.AtomicInteger; |
| 12 | +importjava.util.concurrent.atomic.AtomicReference; |
| 13 | + |
| 14 | +publicclassConnectionShutdownTests { |
| 15 | +@Test |
| 16 | +@Timeout(value =1,unit =TimeUnit.MINUTES) |
| 17 | +publicvoidtestDatabaseCleanupWithActiveSubscription()throwsThrowable { |
| 18 | +DatabasetestDatabase =DatabaseFactory.spawn(); |
| 19 | +KurrentDBClientclient =testDatabase.defaultClient(); |
| 20 | + |
| 21 | +finalAtomicIntegercount =newAtomicInteger(0); |
| 22 | +finalAtomicIntegerretryCount =newAtomicInteger(-1); |
| 23 | +finalAtomicBooleancancellationReceived =newAtomicBoolean(false); |
| 24 | +finalCountDownLatchcancellationLatch =newCountDownLatch(1); |
| 25 | +finalAtomicReference<Throwable>reconnectError =newAtomicReference<>(); |
| 26 | + |
| 27 | +SubscriptionListenerlistener =newSubscriptionListener() { |
| 28 | +@Override |
| 29 | +publicvoidonEvent(Subscriptionsubscription,ResolvedEventevent) { |
| 30 | +count.incrementAndGet(); |
| 31 | + } |
| 32 | + |
| 33 | +@Override |
| 34 | +publicvoidonCancelled(Subscriptionsubscription,Throwablethrowable) { |
| 35 | +cancellationReceived.set(true); |
| 36 | + |
| 37 | +retryCount.incrementAndGet(); |
| 38 | + |
| 39 | +try { |
| 40 | +client.subscribeToAll(this).get(10,TimeUnit.SECONDS); |
| 41 | + }catch (Throwableex) { |
| 42 | +reconnectError.set(ex); |
| 43 | + }finally { |
| 44 | +cancellationLatch.countDown(); |
| 45 | + } |
| 46 | + } |
| 47 | + }; |
| 48 | + |
| 49 | +client.subscribeToAll(listener).get(); |
| 50 | + |
| 51 | +testDatabase.dispose(); |
| 52 | + |
| 53 | +booleancallbackReceived =cancellationLatch.await(30,TimeUnit.SECONDS); |
| 54 | +Assertions.assertTrue(callbackReceived); |
| 55 | +Assertions.assertTrue(cancellationReceived.get()); |
| 56 | +Assertions.assertTrue(count.get() >0); |
| 57 | +Assertions.assertEquals(2,retryCount.get()); |
| 58 | + |
| 59 | +Throwableex =reconnectError.get(); |
| 60 | +Assertions.assertInstanceOf(ConnectionShutdownException.class,ex.getCause()); |
| 61 | + } |
| 62 | +} |