Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Add test for super stream exchange#357

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Draft
acogoluegnes wants to merge2 commits intomain
base:main
Choose a base branch
Loading
fromsuper-stream-exchange
Draft
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions.github/workflows/test-pr.yml
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -5,6 +5,9 @@ on:
branches:
- main

env:
RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:super-stream-exchange-type-otp-max-bazel'

jobs:
build:
runs-on: ubuntu-22.04
Expand Down
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;

import static com.rabbitmq.stream.impl.TestUtils.*;
import static com.rabbitmq.stream.impl.TestUtils.SuperStreamExchangeType.SUPER;
import static org.assertj.core.api.Assertions.assertThat;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.stream.*;
import io.netty.channel.EventLoopGroup;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;import java.util.stream.IntStream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
public class SuperStreamExchangeTest {

EventLoopGroup eventLoopGroup;

Environment environment;

Connection connection;
int partitions = 3;
int messageCount = 10_000;
String superStream;

@BeforeEach
void init(TestInfo info) throws Exception {
EnvironmentBuilder environmentBuilder =
Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
environmentBuilder.addressResolver(add -> localhost());
environment = environmentBuilder.build();
connection = new ConnectionFactory().newConnection();
superStream = TestUtils.streamName(info);
}

@AfterEach
void tearDown() throws Exception {
environment.close();
deleteSuperStreamTopology(connection, superStream, partitions);
connection.close();
}

@Test
void publish() throws Exception {
declareSuperStreamTopology(connection, superStream, SUPER, partitions);
List<String> routingKeys = new ArrayList<>(messageCount);
IntStream.range(0, messageCount)
.forEach(ignored -> routingKeys.add(UUID.randomUUID().toString()));

CountDownLatch publishLatch = new CountDownLatch(messageCount);
try (Producer producer =
environment
.producerBuilder()
.superStream(superStream)
.routing(msg -> msg.getProperties().getMessageIdAsString())
.producerBuilder()
.build()) {
ConfirmationHandler confirmationHandler = status -> publishLatch.countDown();
routingKeys.forEach(
rk ->
producer.send(
producer.messageBuilder().properties().messageId(rk).messageBuilder().build(),
confirmationHandler));
latchAssert(publishLatch).completes();
}

java.util.function.Consumer<Map<String, Set<String>>> consumeMessages =
receivedMessages -> {
CountDownLatch consumeLatch = new CountDownLatch(messageCount);
try (Consumer ignored =
environment
.consumerBuilder()
.superStream(superStream)
.offset(OffsetSpecification.first())
.messageHandler(
(ctx, msg) -> {
receivedMessages
.computeIfAbsent(ctx.stream(), k -> ConcurrentHashMap.newKeySet())
.add(msg.getProperties().getMessageIdAsString());
consumeLatch.countDown();
})
.build()) {

latchAssert(consumeLatch).completes();
assertThat(receivedMessages.values().stream().mapToInt(Set::size).sum())
.isEqualTo(messageCount);
}
};

Map<String, Set<String>> streamProducerMessages = new ConcurrentHashMap<>(partitions);
consumeMessages.accept(streamProducerMessages);

deleteSuperStreamTopology(connection, superStream, partitions);
declareSuperStreamTopology(connection, superStream, SUPER, partitions);

try (Channel channel = connection.createChannel()) {
channel.confirmSelect();
for (String rk : routingKeys) {
channel.basicPublish(
superStream, rk, new AMQP.BasicProperties.Builder().messageId(rk).build(), null);
}
channel.waitForConfirmsOrDie();
}

Map<String, Set<String>> amqpProducerMessages = new ConcurrentHashMap<>(partitions);
consumeMessages.accept(amqpProducerMessages);
assertThat(amqpProducerMessages).hasSameSizeAs(streamProducerMessages)
.containsKeys(streamProducerMessages.keySet().toArray(new String[]{}));

BiConsumer<Set<String>, Set<String>> compareSets = (s1, s2) -> {
assertThat(s1).hasSameSizeAs(s2);
s1.forEach(rk -> assertThat(s2).contains(rk));
};

amqpProducerMessages.forEach(
(key, value) -> compareSets.accept(value, streamProducerMessages.get(key)));
}
}
34 changes: 32 additions & 2 deletionssrc/test/java/com/rabbitmq/stream/impl/TestUtils.java
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -13,13 +13,13 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;

import static com.rabbitmq.stream.impl.TestUtils.SuperStreamExchangeType.DIRECT;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;

import ch.qos.logback.classic.Level;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.stream.Address;
Expand DownExpand Up@@ -272,18 +272,37 @@ static <T> void doIfNotNull(T obj, Consumer<T> action) {

static void declareSuperStreamTopology(Connection connection, String superStream, int partitions)
throws Exception {
declareSuperStreamTopology(connection, superStream, DIRECT, partitions);
}

static void declareSuperStreamTopology(
Connection connection,
String superStream,
SuperStreamExchangeType exchangeType,
int partitions)
throws Exception {
declareSuperStreamTopology(
connection,
superStream,
exchangeType,
IntStream.range(0, partitions).mapToObj(String::valueOf).toArray(String[]::new));
}

static void declareSuperStreamTopology(Connection connection, String superStream, String... rks)
throws Exception {
declareSuperStreamTopology(connection, superStream, DIRECT, rks);
}

static void declareSuperStreamTopology(
Connection connection,
String superStream,
SuperStreamExchangeType exchangeType,
String... rks)
throws Exception {
try (Channel ch = connection.createChannel()) {
ch.exchangeDeclare(
superStream,
BuiltinExchangeType.DIRECT,
exchangeType.value,
true,
false,
Collections.singletonMap("x-super-stream", true));
Expand All@@ -309,6 +328,17 @@ static void declareSuperStreamTopology(Connection connection, String superStream
}
}

public enum SuperStreamExchangeType {
DIRECT("direct"),
SUPER("x-super-stream");

final String value;

SuperStreamExchangeType(String value) {
this.value = value;
}
}

static void deleteSuperStreamTopology(Connection connection, String superStream, int partitions)
throws Exception {
deleteSuperStreamTopology(
Expand Down

[8]ページ先頭

©2009-2025 Movatter.jp