Init.
This commit is contained in:
59
kafka-common-test/pom.xml
Normal file
59
kafka-common-test/pom.xml
Normal file
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Module: kafka-common-test
  Shared test utilities for the kafka integration-test training project:
  AssertJ-style assertions for ConsumerRecord, a JUnit 5 embedded-kafka
  extension, and a reusable topic listener.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.ippon.trainning.kafkaintegrationtest</groupId>
        <artifactId>kafka-integration-test-parent</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>

    <artifactId>kafka-common-test</artifactId>
    <!-- Version is intentionally NOT re-declared: it is inherited from the parent
         (0.0.1-SNAPSHOT). Duplicating it here risks the two drifting out of sync. -->

    <properties>
        <java.version>11</java.version>
        <jupiter.version>5.7.0</jupiter.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <exclusions>
                <!-- Exclude JUnit 4: this module is JUnit 5 (Jupiter) only. -->
                <exclusion>
                    <groupId>junit</groupId>
                    <artifactId>junit</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>${jupiter.version}</version>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>${jupiter.version}</version>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-params</artifactId>
            <version>${jupiter.version}</version>
        </dependency>
        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
        </dependency>
    </dependencies>
</project>
package com.ippon.trainning.kafkaintegrationtest.kafkacommontest.assertor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.assertj.core.api.AbstractObjectAssert;
import org.assertj.core.api.Assertions;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/**
 * AssertJ-style assertions for kafka {@link ConsumerRecord} instances.
 * <p>
 * Entry point is {@link #assertThat(ConsumerRecord)}; assertions are chainable:
 * {@code assertThat(record).hasKey("k").hasValue(payload).hasHeader("h", value)}.
 * </p>
 *
 * @param <K> Record key type.
 * @param <V> Record value type.
 */
public class ConsumerRecordAssert<K, V> extends AbstractObjectAssert<ConsumerRecordAssert<K, V>, ConsumerRecord<K, V>> {
    /** Used to serialize expected header values to JSON for byte-level comparison. */
    private final ObjectMapper objectMapper = new ObjectMapper();

    private ConsumerRecordAssert(ConsumerRecord<K, V> record) {
        super(record, ConsumerRecordAssert.class);
    }

    /**
     * Creates an assertion object for the given record.
     *
     * @param record The consumed record under test (may be {@code null}; assertions will then fail).
     * @return A new assertion instance wrapping {@code record}.
     */
    public static <K, V> ConsumerRecordAssert<K, V> assertThat(ConsumerRecord<K, V> record) {
        return new ConsumerRecordAssert<>(record);
    }

    /**
     * Asserts that the record value equals {@code expectedValue} using recursive field-by-field comparison.
     *
     * @param expectedValue Expected record value.
     * @return {@code this} for chaining.
     */
    public ConsumerRecordAssert<K, V> hasValue(V expectedValue) {
        isNotNull();

        Assertions.assertThat(actual.value()).usingRecursiveComparison().isEqualTo(expectedValue);

        return this;
    }

    /**
     * Asserts that the record key equals {@code expectedKey} using recursive field-by-field comparison.
     *
     * @param expectedKey Expected record key.
     * @return {@code this} for chaining.
     */
    public ConsumerRecordAssert<K, V> hasKey(K expectedKey) {
        isNotNull();

        Assertions.assertThat(actual.key()).usingRecursiveComparison().isEqualTo(expectedKey);

        return this;
    }

    /**
     * Asserts that the record carries a header named {@code headerName} whose bytes, decoded as UTF-8,
     * equal the JSON serialization of {@code expectedHeaderValue}.
     *
     * @param headerName          Name of the expected header.
     * @param expectedHeaderValue Expected header value; compared via its Jackson JSON form.
     * @return {@code this} for chaining.
     * @throws IllegalArgumentException If {@code expectedHeaderValue} cannot be serialized to JSON.
     */
    public ConsumerRecordAssert<K, V> hasHeader(String headerName, Object expectedHeaderValue) {
        isNotNull();

        String expectedHeaderValueAsJsonString;
        try {
            expectedHeaderValueAsJsonString = objectMapper.writeValueAsString(expectedHeaderValue);
        } catch (JsonProcessingException ex) {
            // Preserve the cause so the serialization failure stays diagnosable.
            throw new IllegalArgumentException(String.format(
                    "Could not initialize <hasHeader> assertion due to JSON serialization of expected value <%s>",
                    expectedHeaderValue
            ), ex);
        }

        Assertions.assertThat(actual.headers()).isNotEmpty();

        Iterator<Header> headerIterator = actual.headers().headers(headerName).iterator();
        if (!headerIterator.hasNext()) {
            // failWithMessage throws, so execution never reaches the decode below in this case.
            failWithMessage("<%s> should contain header <%s> with value <%s>, but it does not.",
                    ConsumerRecord.class.getSimpleName(),
                    headerName,
                    expectedHeaderValueAsJsonString);
        }
        // Decode explicitly as UTF-8: Jackson emits UTF-8 JSON, and relying on the
        // platform default charset would make this assertion environment-dependent.
        String headerValue = new String(headerIterator.next().value(), StandardCharsets.UTF_8);
        if (!expectedHeaderValueAsJsonString.equals(headerValue)) {
            failWithMessage("<%s> should contain header <%s> with value <%s>, but has <%s>.",
                    ConsumerRecord.class.getSimpleName(),
                    headerName,
                    expectedHeaderValueAsJsonString,
                    headerValue);
        }

        return this;
    }
}
package com.ippon.trainning.kafkaintegrationtest.kafkacommontest.extension;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * JUnit 5 extension that, before each test, starts a String/String listener container
 * against the {@link EmbeddedKafkaBroker} bean found in the Spring test context, waits
 * for partition assignment, and stops the container after each test.
 * <p>
 * Requires the test class to also run with {@link SpringExtension} so the application
 * context (and the {@link EmbeddedKafkaBroker} bean) is available.
 * </p>
 */
public class EmbeddedKafkaExtension implements BeforeEachCallback, AfterEachCallback {
    /** Topic consumed when no topic is supplied explicitly. */
    private static final String DEFAULT_TOPIC = "test-topic";

    /** Topic the listener container subscribes to. */
    private final String topic;

    private EmbeddedKafkaBroker embeddedKafkaBroker;
    private KafkaMessageListenerContainer<String, String> container;
    /**
     * This queue is required for {@link EmbeddedKafkaExtension#container} to start.
     * <p>
     * We can improve this extension by providing this queue as a Spring bean but with setting up of a sort
     * of factory to define genericity of {@link ConsumerRecord}.
     * </p>
     */
    @SuppressWarnings({"FieldCanBeLocal", "MismatchedQueryAndUpdateOfCollection"})
    private BlockingQueue<ConsumerRecord<String, String>> records;

    /**
     * Creates an extension consuming the default {@value #DEFAULT_TOPIC} topic.
     */
    public EmbeddedKafkaExtension() {
        this(DEFAULT_TOPIC);
    }

    /**
     * Creates an extension consuming the given topic.
     *
     * @param topic Name of the topic the internal listener container subscribes to.
     */
    public EmbeddedKafkaExtension(String topic) {
        this.topic = topic;
    }

    @Override
    public void beforeEach(ExtensionContext context) throws Exception {
        ApplicationContext applicationContext = SpringExtension.getApplicationContext(context);
        embeddedKafkaBroker = applicationContext.getBean(EmbeddedKafkaBroker.class);

        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(getConsumerProperties());
        ContainerProperties containerProperties = new ContainerProperties(topic);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, String>) records::add);
        container.start();
        // Block until the consumer has been assigned all partitions, so tests never
        // race against a container that is not yet consuming.
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    /**
     * Consumer configuration pointing at the embedded broker; String key/value
     * deserialization, earliest offset reset, auto-commit enabled.
     */
    private Map<String, Object> getConsumerProperties() {
        return Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString(),
                ConsumerConfig.GROUP_ID_CONFIG, "consumer",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
                ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10",
                ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
        );
    }

    @Override
    public void afterEach(ExtensionContext context) {
        container.stop();
    }
}
package com.ippon.trainning.kafkaintegrationtest.kafkacommontest.utils;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

import static java.util.concurrent.TimeUnit.SECONDS;

/**
 * Listener that will consume a kafka topic and store consumed message.
 * <p>
 * This class serves to test a kafka producer.
 * </p>
 * @param <K> The kafka producer messages key type.
 * @param <V> The kafka producer messages type.
 */
public class KafkaTopicListener<K, V> {
    /**
     * Queue to store consumed messages.
     */
    private final BlockingQueue<ConsumerRecord<K, V>> messages = new LinkedBlockingDeque<>();

    /**
     * Serves to consume a kafka topic, and store consumed messages in the queue.
     * <p>
     * This method has to be annotated with {@link KafkaListener} annotation in classes that extend this one.
     * </p>
     * <p>
     * Example:
     * <pre>{@code
     * new KafkaTopicListener<>() {
     *     @KafkaListener(
     *         topics = "topic-name",
     *         groupId = "groupId-name",
     *         containerFactory = "container-factory-bean-name"
     *     )
     *     @Override
     *     public void listen(ConsumerRecord<String, String> record) {
     *         super.listen(record);
     *     }
     * };
     * }</pre>
     * </p>
     * <p>
     * Note: The {@code container-factory-bean-name} is a bean that you have to declare
     * (with a {@code @Bean} annotated method) that should be an instance of
     * {@link org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory}.
     * </p>
     * @param message Topic consumed message.
     */
    public void listen(ConsumerRecord<K, V> message) {
        messages.add(message);
    }

    /**
     * Waits at most 5 seconds that kafka listener consumes a message,
     * and return it if one was consumed in this period, otherwise, {@code null} will be returned.
     * @return The kafka consumed message, or {@code null}.
     * @throws AssertionError If the waiting thread is interrupted; the interrupt status is restored first.
     */
    public ConsumerRecord<K, V> getMessage() {
        try {
            return messages.poll(5, SECONDS);
        } catch (InterruptedException ex) {
            // Restore the interrupt status so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new AssertionError(ex);
        }
    }
}
Reference in New Issue
Block a user