From df3b76f166fa7103d1b9d656431e9927f428a22b Mon Sep 17 00:00:00 2001
From: Florian THIERRY
Date: Fri, 30 Apr 2021 15:44:28 +0200
Subject: [PATCH] Init.
---
.gitignore | 34 +++++
Insomnia-REST-client.json | 1 +
README.md | 11 ++
docker/docker-compose.yml | 13 ++
kafka-common-test/pom.xml | 59 ++++++++
.../assertor/ConsumerRecordAssert.java | 73 ++++++++++
.../extension/EmbeddedKafkaExtension.java | 66 +++++++++
.../utils/KafkaTopicListener.java | 68 +++++++++
kafka-consumer/pom.xml | 131 +++++++++++++++++
.../consumer/KafkaConsumerIT.java | 85 +++++++++++
.../resources/application-test.yml | 5 +
.../KafkaConsumerApplication.java | 11 ++
.../config/KafkaConsumerConfiguration.java | 46 ++++++
.../kafkaconsumer/consumer/KafkaConsumer.java | 25 ++++
.../kafkaconsumer/service/MessageService.java | 12 ++
.../src/main/resources/application.yml | 6 +
.../KafkaConsumerApplicationTests.java | 13 ++
kafka-producer/pom.xml | 137 ++++++++++++++++++
.../kafkaproducer/producer/ProducerIT.java | 96 ++++++++++++
.../resources/application-test.yml | 5 +
.../KafkaProducerApplication.java | 11 ++
.../config/KafkaProducerConfiguration.java | 37 +++++
.../controller/MessageController.java | 30 ++++
.../kafkaproducer/producer/KafkaProducer.java | 23 +++
.../src/main/resources/application.yml | 5 +
.../KafkaProducerApplicationTests.java | 13 ++
pom.xml | 36 +++++
27 files changed, 1052 insertions(+)
create mode 100644 .gitignore
create mode 100644 Insomnia-REST-client.json
create mode 100644 README.md
create mode 100644 docker/docker-compose.yml
create mode 100644 kafka-common-test/pom.xml
create mode 100644 kafka-common-test/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkacommontest/assertor/ConsumerRecordAssert.java
create mode 100644 kafka-common-test/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkacommontest/extension/EmbeddedKafkaExtension.java
create mode 100644 kafka-common-test/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkacommontest/utils/KafkaTopicListener.java
create mode 100644 kafka-consumer/pom.xml
create mode 100644 kafka-consumer/src/integration-test/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/consumer/KafkaConsumerIT.java
create mode 100644 kafka-consumer/src/integration-test/resources/application-test.yml
create mode 100644 kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/KafkaConsumerApplication.java
create mode 100644 kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/config/KafkaConsumerConfiguration.java
create mode 100644 kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/consumer/KafkaConsumer.java
create mode 100644 kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/service/MessageService.java
create mode 100644 kafka-consumer/src/main/resources/application.yml
create mode 100644 kafka-consumer/src/test/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/KafkaConsumerApplicationTests.java
create mode 100644 kafka-producer/pom.xml
create mode 100644 kafka-producer/src/integration-test/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/producer/ProducerIT.java
create mode 100644 kafka-producer/src/integration-test/resources/application-test.yml
create mode 100644 kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/KafkaProducerApplication.java
create mode 100644 kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/config/KafkaProducerConfiguration.java
create mode 100644 kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/controller/MessageController.java
create mode 100644 kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/producer/KafkaProducer.java
create mode 100644 kafka-producer/src/main/resources/application.yml
create mode 100644 kafka-producer/src/test/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/KafkaProducerApplicationTests.java
create mode 100644 pom.xml
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..64eb4e2
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,34 @@
+HELP.md
+kafka-consumer/target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
diff --git a/Insomnia-REST-client.json b/Insomnia-REST-client.json
new file mode 100644
index 0000000..9d85e8f
--- /dev/null
+++ b/Insomnia-REST-client.json
@@ -0,0 +1 @@
+{"_type":"export","__export_format":4,"__export_date":"2021-01-11T16:08:14.068Z","__export_source":"insomnia.desktop.app:v2020.5.2","resources":[{"_id":"req_72df07e95f3a48c0b49c9f0a93f9f82f","parentId":"fld_952979615ccb49a48fa38d35fcae9957","modified":1610378023564,"created":1610377899987,"url":"http://{{ _.kafkaProducer.host }}:{{ _.kafkaProducer.port }}/messages","name":"Send message","description":"","method":"POST","body":{"mimeType":"application/json","text":"\"Hello world!\""},"parameters":[],"headers":[{"name":"Content-Type","value":"application/json","id":"pair_184c9a3648114ff68c5a1bf3fb37b198"}],"authentication":{},"metaSortKey":-1610377899987,"isPrivate":false,"settingStoreCookies":true,"settingSendCookies":true,"settingDisableRenderRequestBody":false,"settingEncodeUrl":true,"settingRebuildPath":true,"settingFollowRedirects":"global","_type":"request"},{"_id":"fld_952979615ccb49a48fa38d35fcae9957","parentId":"wrk_05388a4751734d0e82825b35e6ef8689","modified":1610377891368,"created":1610377891368,"name":"KafkaProducer","description":"","environment":{},"environmentPropertyOrder":null,"metaSortKey":-1610377891368,"_type":"request_group"},{"_id":"wrk_05388a4751734d0e82825b35e6ef8689","parentId":null,"modified":1610377834700,"created":1610377834700,"name":"Insomnia","description":"","scope":null,"_type":"workspace"},{"_id":"env_339b114ac8e86fda9f4de7d2181e4b03f3650c0b","parentId":"wrk_05388a4751734d0e82825b35e6ef8689","modified":1610377970871,"created":1610377834725,"name":"Base Environment","data":{"kafkaProducer":{"host":"localhost","port":"8081"},"kafkaConsumer":{"host":"localhost","port":"8082"}},"dataPropertyOrder":{"&":["kafkaProducer","kafkaConsumer"],"&~|kafkaProducer":["host","port"],"&~|kafkaConsumer":["host","port"]},"color":null,"isPrivate":false,"metaSortKey":1610377834725,"_type":"environment"},{"_id":"jar_339b114ac8e86fda9f4de7d2181e4b03f3650c0b","parentId":"wrk_05388a4751734d0e82825b35e6ef8689","modified":1610377834726,"created":1610377834726,"name":"Default Jar","cookies":[],"_type":"cookie_jar"},{"_id":"spc_e64ff1b4a38a4c988025b81b1d7e1003","parentId":"wrk_05388a4751734d0e82825b35e6ef8689","modified":1610377834700,"created":1610377834700,"fileName":"Insomnia","contents":"","contentType":"yaml","_type":"api_spec"},{"_id":"env_54124313de4d4467b7f6feccaba48848","parentId":"env_339b114ac8e86fda9f4de7d2181e4b03f3650c0b","modified":1610381253855,"created":1610377974133,"name":"Localhost","data":{"kafkaConsumer":{"host":"localhost","port":"8081"},"kafkaProducer":{"host":"localhost","port":"8082"}},"dataPropertyOrder":{"&":["kafkaConsumer","kafkaProducer"],"&~|kafkaConsumer":["host","port"],"&~|kafkaProducer":["host","port"]},"color":null,"isPrivate":false,"metaSortKey":1610377974133,"_type":"environment"}]}
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..f5dabf0
--- /dev/null
+++ b/README.md
@@ -0,0 +1,11 @@
+# Kafka
+## Commands
+### Launch CMD Producer
+```bash
+$ kafka-console-producer.sh --broker-list localhost:9092 --topic test
+```
+
+### Launch CMD Consumer
+```bash
+$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
+```
\ No newline at end of file
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
new file mode 100644
index 0000000..ba43945
--- /dev/null
+++ b/docker/docker-compose.yml
@@ -0,0 +1,13 @@
+version: '3'
+services:
+ zookeeper:
+ image: wurstmeister/zookeeper
+ ports:
+ - "2181:2181"
+ kafka:
+ image: wurstmeister/kafka
+ ports:
+ - "9092:9092"
+ environment:
+ KAFKA_ADVERTISED_HOST_NAME: "192.168.0.11"
+ KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
\ No newline at end of file
diff --git a/kafka-common-test/pom.xml b/kafka-common-test/pom.xml
new file mode 100644
index 0000000..980ca1f
--- /dev/null
+++ b/kafka-common-test/pom.xml
@@ -0,0 +1,59 @@
+
+
+ 4.0.0
+
+ com.ippon.trainning.kafkaintegrationtest
+ kafka-integration-test-parent
+ 0.0.1-SNAPSHOT
+ ../pom.xml
+
+
+ kafka-common-test
+ 0.0.1-SNAPSHOT
+
+
+ 11
+ 5.7.0
+
+
+
+
+ org.springframework.kafka
+ spring-kafka
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+
+
+ junit
+ junit
+
+
+
+
+ org.springframework.kafka
+ spring-kafka-test
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ ${jupiter.version}
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ ${jupiter.version}
+
+
+ org.junit.jupiter
+ junit-jupiter-params
+ ${jupiter.version}
+
+
+ org.awaitility
+ awaitility
+
+
+
diff --git a/kafka-common-test/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkacommontest/assertor/ConsumerRecordAssert.java b/kafka-common-test/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkacommontest/assertor/ConsumerRecordAssert.java
new file mode 100644
index 0000000..49c00ad
--- /dev/null
+++ b/kafka-common-test/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkacommontest/assertor/ConsumerRecordAssert.java
@@ -0,0 +1,73 @@
+package com.ippon.trainning.kafkaintegrationtest.kafkacommontest.assertor;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.assertj.core.api.AbstractObjectAssert;
+
+import java.util.Iterator;
+
+import org.assertj.core.api.Assertions;
+
+public class ConsumerRecordAssert extends AbstractObjectAssert, ConsumerRecord> {
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ private ConsumerRecordAssert(ConsumerRecord record) {
+ super(record, ConsumerRecordAssert.class);
+ }
+
+ public static ConsumerRecordAssert assertThat(ConsumerRecord record) {
+ return new ConsumerRecordAssert<>(record);
+ }
+
+ public ConsumerRecordAssert hasValue(V expectedValue) {
+ isNotNull();
+
+ Assertions.assertThat(actual.value()).usingRecursiveComparison().isEqualTo(expectedValue);
+
+ return this;
+ }
+
+ public ConsumerRecordAssert hasKey(K expectedKey) {
+ isNotNull();
+
+ Assertions.assertThat(actual.key()).usingRecursiveComparison().isEqualTo(expectedKey);
+
+ return this;
+ }
+
+ public ConsumerRecordAssert hasHeader(String headerName, Object expectedHeaderValue) {
+ isNotNull();
+
+ String expectedHeaderValueAsJsonString = null;
+ try {
+ expectedHeaderValueAsJsonString = objectMapper.writeValueAsString(expectedHeaderValue);
+ } catch (JsonProcessingException ex) {
+ throw new IllegalArgumentException(String.format(
+ "Could not initialize assertion due to JSON serialization of expected value <%s>",
+ expectedHeaderValue
+ ));
+ }
+
+ Assertions.assertThat(actual.headers()).isNotEmpty();
+
+ Iterator
+ *
+ * Note: The {@code container-factory-bean-name} is a bean that you have to declare
+ * (with a {@code @Bean} annotated method) that should be an instance of
+ * {@link org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory}.
+ *
+ * @param message Topic consumed message.
+ */
+ public void listen(ConsumerRecord message) {
+ messages.add(message);
+ }
+
+ /**
+ * Waits at most 5 seconds that kafka listener consumes a message,
+ * and return it if one was consumed in this period, otherwise, {@code null} will be returned.
+ * @return The kafka consumed message, or {@code null}.
+ */
+ public ConsumerRecord getMessage() {
+ try {
+ return messages.poll(5, SECONDS);
+ } catch (InterruptedException ex) {
+ throw new AssertionError(ex);
+ }
+ }
+}
\ No newline at end of file
diff --git a/kafka-consumer/pom.xml b/kafka-consumer/pom.xml
new file mode 100644
index 0000000..4931528
--- /dev/null
+++ b/kafka-consumer/pom.xml
@@ -0,0 +1,131 @@
+
+
+ 4.0.0
+
+ com.ippon.trainning.kafkaintegrationtest
+ kafka-integration-test-parent
+ 0.0.1-SNAPSHOT
+ ../pom.xml
+
+
+ kafka-consumer
+ 0.0.1-SNAPSHOT
+
+
+ 11
+ 5.7.0
+ 3.2.0
+ src/integration-test/java
+ src/integration-test/resources
+ 2.22.2
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+ org.springframework.kafka
+ spring-kafka
+
+
+ org.projectlombok
+ lombok
+
+
+
+ com.ippon.trainning.kafkaintegrationtest
+ kafka-common-test
+ 0.0.1-SNAPSHOT
+ test
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.springframework.kafka
+ spring-kafka-test
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ ${jupiter.version}
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ ${jupiter.version}
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-params
+ ${jupiter.version}
+ test
+
+
+ org.awaitility
+ awaitility
+ test
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+ exec
+
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ ${build-helper-maven-plugin.version}
+
+
+ add-integration-test
+
+ add-test-source
+ add-test-resource
+
+
+
+ ${integration-test.source.directory}
+
+
+
+ ${integration-test.resources.directory}
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-failsafe-plugin
+ ${maven-failsafe-plugin.version}
+
+
+
+ integration-test
+ verify
+
+
+
+
+
+
+
diff --git a/kafka-consumer/src/integration-test/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/consumer/KafkaConsumerIT.java b/kafka-consumer/src/integration-test/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/consumer/KafkaConsumerIT.java
new file mode 100644
index 0000000..bcdd5ea
--- /dev/null
+++ b/kafka-consumer/src/integration-test/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/consumer/KafkaConsumerIT.java
@@ -0,0 +1,85 @@
+package com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.consumer;
+
+import com.ippon.trainning.kafkaintegrationtest.kafkacommontest.extension.EmbeddedKafkaExtension;
+import com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.service.MessageService;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.awaitility.Durations;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.kafka.clients.producer.ProducerConfig.*;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.BDDMockito.then;
+
+@ExtendWith({
+ SpringExtension.class,
+ MockitoExtension.class,
+ EmbeddedKafkaExtension.class
+})
+@SpringBootTest(classes = {
+ KafkaConsumerIT.KafkaConsumerITConfiguration.class
+}, properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@EmbeddedKafka
+@ActiveProfiles({"test"})
+public class KafkaConsumerIT {
+ @TestConfiguration
+ public static class KafkaConsumerITConfiguration {
+ @Bean
+ public DefaultKafkaProducerFactory producerFactory(
+ @Value("${server.kafka.bootstrapAddress}") String bootstrapAddress
+ ) {
+ Map configProperties = Map.of(
+ BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress,
+ KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
+ VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
+ );
+
+ return new DefaultKafkaProducerFactory<>(configProperties);
+ }
+
+ @Bean
+ public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) {
+ return new KafkaTemplate<>(producerFactory);
+ }
+ }
+
+ @Autowired
+ private KafkaTemplate kafkaTemplate;
+ @Value("${server.kafka.topic}")
+ private String topic;
+
+ @MockBean
+ private MessageService messageService;
+
+ @Test
+ void should_comsume_a_message_from_kafka_topic() {
+ // given
+ String key = UUID.randomUUID().toString();
+ String message = "A message to consume";
+
+ // when
+ kafkaTemplate.send(topic, key, message);
+
+ // then
+ await().atMost(Durations.FIVE_SECONDS).untilAsserted(() -> then(messageService).should().handleMessage(key, message));
+ }
+}
diff --git a/kafka-consumer/src/integration-test/resources/application-test.yml b/kafka-consumer/src/integration-test/resources/application-test.yml
new file mode 100644
index 0000000..8d0d79c
--- /dev/null
+++ b/kafka-consumer/src/integration-test/resources/application-test.yml
@@ -0,0 +1,5 @@
+server:
+ kafka:
+ bootstrapAddress: "${spring.embedded.kafka.brokers:}"
+ topic: topic-test
+ groupId: groupId-test
diff --git a/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/KafkaConsumerApplication.java b/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/KafkaConsumerApplication.java
new file mode 100644
index 0000000..4e2eb8c
--- /dev/null
+++ b/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/KafkaConsumerApplication.java
@@ -0,0 +1,11 @@
+package com.ippon.trainning.kafkaintegrationtest.kafkaconsumer;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class KafkaConsumerApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(KafkaConsumerApplication.class, args);
+ }
+}
diff --git a/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/config/KafkaConsumerConfiguration.java b/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/config/KafkaConsumerConfiguration.java
new file mode 100644
index 0000000..7980033
--- /dev/null
+++ b/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/config/KafkaConsumerConfiguration.java
@@ -0,0 +1,46 @@
+package com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.config;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+
+import java.util.Map;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.CommonClientConfigs.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+@Configuration
+public class KafkaConsumerConfiguration {
+ private final String bootstrapAddress;
+ private final String groupId;
+
+ public KafkaConsumerConfiguration(@Value("${server.kafka.bootstrapAddress}") String bootstrapAddress,
+ @Value("${server.kafka.groupId}") String groupId) {
+ this.bootstrapAddress = bootstrapAddress;
+ this.groupId = groupId;
+ }
+
+ @Bean
+ public ConsumerFactory consumerFactory() {
+ Map props = Map.of(
+ BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress,
+ VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
+ KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
+ GROUP_ID_CONFIG, groupId
+ );
+ return new DefaultKafkaConsumerFactory<>(props);
+ }
+
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory containerFactory(ConsumerFactory consumerFactory) {
+ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(consumerFactory);
+ return factory;
+ }
+}
diff --git a/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/consumer/KafkaConsumer.java b/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/consumer/KafkaConsumer.java
new file mode 100644
index 0000000..7ff61a3
--- /dev/null
+++ b/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/consumer/KafkaConsumer.java
@@ -0,0 +1,25 @@
+package com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.consumer;
+
+import com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.service.MessageService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Service;
+
+@Service
+@Slf4j
+public class KafkaConsumer {
+ private final MessageService messageService;
+
+ public KafkaConsumer(MessageService messageService) {
+ this.messageService = messageService;
+ }
+
+ @KafkaListener(
+ containerFactory = "containerFactory",
+ topics = "${server.kafka.topic}"
+ )
+ public void listenTopic(ConsumerRecord record) {
+ messageService.handleMessage(record.key(), record.value());
+ }
+}
diff --git a/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/service/MessageService.java b/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/service/MessageService.java
new file mode 100644
index 0000000..c2782fa
--- /dev/null
+++ b/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/service/MessageService.java
@@ -0,0 +1,12 @@
+package com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+@Service
+@Slf4j
+public class MessageService {
+ public void handleMessage(String key, String message) {
+ log.info("Message consumed: <{}> - <{}>", key, message);
+ }
+}
diff --git a/kafka-consumer/src/main/resources/application.yml b/kafka-consumer/src/main/resources/application.yml
new file mode 100644
index 0000000..bf30e1d
--- /dev/null
+++ b/kafka-consumer/src/main/resources/application.yml
@@ -0,0 +1,6 @@
+server:
+ port: 8081
+ kafka:
+ bootstrapAddress: localhost:9092
+ topic: topic-test
+ groupId: groupId-test
\ No newline at end of file
diff --git a/kafka-consumer/src/test/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/KafkaConsumerApplicationTests.java b/kafka-consumer/src/test/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/KafkaConsumerApplicationTests.java
new file mode 100644
index 0000000..0f25a4b
--- /dev/null
+++ b/kafka-consumer/src/test/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/KafkaConsumerApplicationTests.java
@@ -0,0 +1,13 @@
+package com.ippon.trainning.kafkaintegrationtest.kafkaconsumer;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class KafkaConsumerApplicationTests {
+
+ @Test
+ void contextLoads() {
+ }
+
+}
diff --git a/kafka-producer/pom.xml b/kafka-producer/pom.xml
new file mode 100644
index 0000000..0487d5b
--- /dev/null
+++ b/kafka-producer/pom.xml
@@ -0,0 +1,137 @@
+
+
+ 4.0.0
+
+ com.ippon.trainning.kafkaintegrationtest
+ kafka-integration-test-parent
+ 0.0.1-SNAPSHOT
+ ../pom.xml
+
+
+ kafka-producer
+ 0.0.1-SNAPSHOT
+
+
+ 11
+ 5.7.0
+ 3.2.0
+ src/integration-test/java
+ src/integration-test/resources
+ 2.22.2
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+ org.springframework.kafka
+ spring-kafka
+
+
+ org.projectlombok
+ lombok
+
+
+
+ com.ippon.trainning.kafkaintegrationtest
+ kafka-common-test
+ 0.0.1-SNAPSHOT
+ test
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ junit
+ junit
+
+
+
+
+ org.springframework.kafka
+ spring-kafka-test
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ ${jupiter.version}
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ ${jupiter.version}
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-params
+ ${jupiter.version}
+ test
+
+
+ org.awaitility
+ awaitility
+ test
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+ exec
+
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ ${build-helper-maven-plugin.version}
+
+
+ add-integration-test
+
+ add-test-source
+ add-test-resource
+
+
+
+ ${integration-test.source.directory}
+
+
+
+ ${integration-test.resources.directory}
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-failsafe-plugin
+ ${maven-failsafe-plugin.version}
+
+
+
+ integration-test
+ verify
+
+
+
+
+
+
+
diff --git a/kafka-producer/src/integration-test/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/producer/ProducerIT.java b/kafka-producer/src/integration-test/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/producer/ProducerIT.java
new file mode 100644
index 0000000..bf5b743
--- /dev/null
+++ b/kafka-producer/src/integration-test/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/producer/ProducerIT.java
@@ -0,0 +1,96 @@
+package com.ippon.trainning.kafkaintegrationtest.kafkaproducer.producer;
+
+import com.ippon.trainning.kafkaintegrationtest.kafkacommontest.extension.EmbeddedKafkaExtension;
+import com.ippon.trainning.kafkaintegrationtest.kafkacommontest.utils.KafkaTopicListener;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.ippon.trainning.kafkaintegrationtest.kafkacommontest.assertor.ConsumerRecordAssert.assertThat;
+
+@ExtendWith({
+ SpringExtension.class,
+ EmbeddedKafkaExtension.class
+})
+@SpringBootTest(classes = {
+ ProducerIT.ProducerITConfiguration.class
+}, properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
+@EmbeddedKafka
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@ActiveProfiles({"test"})
+public class ProducerIT {
+ @TestConfiguration
+ public static class ProducerITConfiguration {
+ @Bean("message-container-factory")
+ public ConcurrentKafkaListenerContainerFactory containerFactory(
+ @Value("${server.kafka.bootstrapAddress}") String bootstrapAddress
+ ) {
+ var factory = new ConcurrentKafkaListenerContainerFactory();
+ factory.setConsumerFactory(consumerFactory(bootstrapAddress));
+ factory.setConcurrency(1);
+ return factory;
+ }
+
+ private ConsumerFactory consumerFactory(String bootstrapAddress) {
+ Map props = new HashMap<>();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ return new DefaultKafkaConsumerFactory<>(props);
+ }
+
+ @Bean
+ public KafkaTopicListener messageKafkaTopicListener() {
+ return new KafkaTopicListener<>() {
+ @KafkaListener(
+ topics = "${server.kafka.topic}",
+ groupId = "${server.kafka.groupId}",
+ containerFactory = "message-container-factory"
+ )
+ @Override
+ public void listen(ConsumerRecord message) {
+ super.listen(message);
+ }
+ };
+ }
+ }
+
+ @Autowired
+ private KafkaProducer producer;
+ @Autowired
+ @Qualifier("messageKafkaTopicListener")
+ private KafkaTopicListener messageKafkaTopicListener;
+
+ @Test
+ void should_send_a_message_into_kafka_topic() {
+ // given
+ String message = "A test message to send into topic";
+
+ // when
+ producer.sendMessage(message);
+
+ // then
+ ConsumerRecord messageConsumed = messageKafkaTopicListener.getMessage();
+ assertThat(messageConsumed)
+ .hasValue(message);
+ }
+}
diff --git a/kafka-producer/src/integration-test/resources/application-test.yml b/kafka-producer/src/integration-test/resources/application-test.yml
new file mode 100644
index 0000000..8d0d79c
--- /dev/null
+++ b/kafka-producer/src/integration-test/resources/application-test.yml
@@ -0,0 +1,5 @@
+server:
+ kafka:
+ bootstrapAddress: "${spring.embedded.kafka.brokers:}"
+ topic: topic-test
+ groupId: groupId-test
diff --git a/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/KafkaProducerApplication.java b/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/KafkaProducerApplication.java
new file mode 100644
index 0000000..536d946
--- /dev/null
+++ b/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/KafkaProducerApplication.java
@@ -0,0 +1,11 @@
+package com.ippon.trainning.kafkaintegrationtest.kafkaproducer;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class KafkaProducerApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(KafkaProducerApplication.class, args);
+ }
+}
diff --git a/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/config/KafkaProducerConfiguration.java b/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/config/KafkaProducerConfiguration.java
new file mode 100644
index 0000000..c6b9eeb
--- /dev/null
+++ b/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/config/KafkaProducerConfiguration.java
@@ -0,0 +1,37 @@
+package com.ippon.trainning.kafkaintegrationtest.kafkaproducer.config;
+
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+import java.util.Map;
+
+import static org.apache.kafka.clients.producer.ProducerConfig.*;
+
+@Configuration
+public class KafkaProducerConfiguration {
+ private final String bootstrapAddress;
+
+ public KafkaProducerConfiguration(@Value("${server.kafka.bootstrapAddress}") String bootstrapAddress) {
+ this.bootstrapAddress = bootstrapAddress;
+ }
+
+ @Bean
+ public ProducerFactory producerFactory() {
+ Map configProperties = Map.of(
+ BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress,
+ KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
+ VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
+ );
+ return new DefaultKafkaProducerFactory<>(configProperties);
+ }
+
+ @Bean
+ public KafkaTemplate kafkaTemplate() {
+ return new KafkaTemplate<>(producerFactory());
+ }
+}
diff --git a/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/controller/MessageController.java b/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/controller/MessageController.java
new file mode 100644
index 0000000..b64bd2d
--- /dev/null
+++ b/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/controller/MessageController.java
@@ -0,0 +1,30 @@
+package com.ippon.trainning.kafkaintegrationtest.kafkaproducer.controller;
+
+import com.ippon.trainning.kafkaintegrationtest.kafkaproducer.producer.KafkaProducer;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+
+import static org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR;
+
+@RestController
+public class MessageController {
+ private final KafkaProducer kafkaProducer;
+
+ public MessageController(KafkaProducer kafkaProducer) {
+ this.kafkaProducer = kafkaProducer;
+ }
+
+ @PostMapping("/messages")
+ public ResponseEntity sendMessage(@RequestBody String message) {
+ ResponseEntity response;
+ try {
+ kafkaProducer.sendMessage(message);
+ response = ResponseEntity.ok("Message sent.");
+ } catch(Exception ex) {
+ response = ResponseEntity.status(INTERNAL_SERVER_ERROR).body("Message no sent.");
+ }
+ return response;
+ }
+}
diff --git a/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/producer/KafkaProducer.java b/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/producer/KafkaProducer.java
new file mode 100644
index 0000000..51069f9
--- /dev/null
+++ b/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/producer/KafkaProducer.java
@@ -0,0 +1,23 @@
+package com.ippon.trainning.kafkaintegrationtest.kafkaproducer.producer;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Service;
+
+import java.util.UUID;
+
+@Service
+public class KafkaProducer {
+ private final KafkaTemplate kafkaTemplate;
+ private final String topic;
+
+ public KafkaProducer(KafkaTemplate kafkaTemplate,
+ @Value("${server.kafka.topic}") String topic) {
+ this.kafkaTemplate = kafkaTemplate;
+ this.topic = topic;
+ }
+
+ public void sendMessage(String message) {
+ kafkaTemplate.send(topic, UUID.randomUUID().toString(), message);
+ }
+}
diff --git a/kafka-producer/src/main/resources/application.yml b/kafka-producer/src/main/resources/application.yml
new file mode 100644
index 0000000..3fa7f96
--- /dev/null
+++ b/kafka-producer/src/main/resources/application.yml
@@ -0,0 +1,5 @@
+server:
+ port: 8082
+ kafka:
+ bootstrapAddress: "localhost:9092"
+ topic: topic-test
\ No newline at end of file
diff --git a/kafka-producer/src/test/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/KafkaProducerApplicationTests.java b/kafka-producer/src/test/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/KafkaProducerApplicationTests.java
new file mode 100644
index 0000000..22ed70e
--- /dev/null
+++ b/kafka-producer/src/test/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/KafkaProducerApplicationTests.java
@@ -0,0 +1,13 @@
+package com.ippon.trainning.kafkaintegrationtest.kafkaproducer;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class KafkaProducerApplicationTests {
+
+ @Test
+ void contextLoads() {
+ }
+
+}
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..1ce8955
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,36 @@
+
+
+ 4.0.0
+
+ com.ippon.trainning.kafkaintegrationtest
+ kafka-integration-test-parent
+ 0.0.1-SNAPSHOT
+ pom
+
+
+ UTF-8
+ UTF-8
+ 11
+ ${java.version}
+ ${java.version}
+
+
+
+ kafka-common-test
+ kafka-consumer
+ kafka-producer
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-dependencies
+ 2.4.5
+ pom
+ import
+
+
+
+