commit df3b76f166fa7103d1b9d656431e9927f428a22b Author: Florian THIERRY Date: Fri Apr 30 15:44:28 2021 +0200 Init. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..64eb4e2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,34 @@ +HELP.md +kafka-consumer/target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ +**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/Insomnia-REST-client.json b/Insomnia-REST-client.json new file mode 100644 index 0000000..9d85e8f --- /dev/null +++ b/Insomnia-REST-client.json @@ -0,0 +1 @@ +{"_type":"export","__export_format":4,"__export_date":"2021-01-11T16:08:14.068Z","__export_source":"insomnia.desktop.app:v2020.5.2","resources":[{"_id":"req_72df07e95f3a48c0b49c9f0a93f9f82f","parentId":"fld_952979615ccb49a48fa38d35fcae9957","modified":1610378023564,"created":1610377899987,"url":"http://{{ _.kafkaProducer.host }}:{{ _.kafkaProducer.port }}/messages","name":"Send message","description":"","method":"POST","body":{"mimeType":"application/json","text":"\"Hello world!\""},"parameters":[],"headers":[{"name":"Content-Type","value":"application/json","id":"pair_184c9a3648114ff68c5a1bf3fb37b198"}],"authentication":{},"metaSortKey":-1610377899987,"isPrivate":false,"settingStoreCookies":true,"settingSendCookies":true,"settingDisableRenderRequestBody":false,"settingEncodeUrl":true,"settingRebuildPath":true,"settingFollowRedirects":"global","_type":"request"},{"_id":"fld_952979615ccb49a48fa38d35fcae9957","parentId":"wrk_05388a4751734d0e82825b35e6ef8689","modified":1610377891368,"created":1610377891368,"name":"KafkaProducer","description":"","environment":{},"environmentPropertyOrder":null,"metaSortKey":-1610377891368,"_type":"request_group"},{"_id":"wrk_05388a4751734d0e82825b35e6ef8689","parentId":null,"modified":1610377834700,"created":1610377834700,"name":"Insomnia","description":"","scope":null,"_type":"workspace"},{"_id":"env_339b114ac8e86fda9f4de7d2181e4b03f3650c0b","parentId":"wrk_05388a4751734d0e82825b35e6ef8689","modified":1610377970871,"created":1610377834725,"name":"Base Environment","data":{"kafkaProducer":{"host":"localhost","port":"8081"},"kafkaConsumer":{"host":"localhost","port":"8082"}},"dataPropertyOrder":{"&":["kafkaProducer","kafkaConsumer"],"&~|kafkaProducer":["host","port"],"&~|kafkaConsumer":["host","port"]},"color":null,"isPrivate":false,"metaSortKey":1610377834725,"_type":"environment"},{"_id":"jar_339b114ac8e86fda9f4de7d2181e4b03f3650c0b","parentId":"wrk_05388a4751734d0e82825b35e6ef8689","modified":1610377834726,"created":1610377834726,"name":"Default Jar","cookies":[],"_type":"cookie_jar"},{"_id":"spc_e64ff1b4a38a4c988025b81b1d7e1003","parentId":"wrk_05388a4751734d0e82825b35e6ef8689","modified":1610377834700,"created":1610377834700,"fileName":"Insomnia","contents":"","contentType":"yaml","_type":"api_spec"},{"_id":"env_54124313de4d4467b7f6feccaba48848","parentId":"env_339b114ac8e86fda9f4de7d2181e4b03f3650c0b","modified":1610381253855,"created":1610377974133,"name":"Localhost","data":{"kafkaConsumer":{"host":"localhost","port":"8081"},"kafkaProducer":{"host":"localhost","port":"8082"}},"dataPropertyOrder":{"&":["kafkaConsumer","kafkaProducer"],"&~|kafkaConsumer":["host","port"],"&~|kafkaProducer":["host","port"]},"color":null,"isPrivate":false,"metaSortKey":1610377974133,"_type":"environment"}]} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..f5dabf0 --- /dev/null +++ b/README.md @@ -0,0 +1,11 @@ +# Kafka +## Commands +### Launch CMD Producer +```bash +$ kafka-console-producer.sh --broker-list localhost:9092 --topic test +``` + +### Launch CMD Consumer +```bash +$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test +``` \ No newline at end of file diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100644 index 0000000..ba43945 --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,13 @@ +version: '3' +services: + zookeeper: + image: wurstmeister/zookeeper + ports: + - "2181:2181" + kafka: + image: wurstmeister/kafka + ports: + - "9092:9092" + environment: + KAFKA_ADVERTISED_HOST_NAME: "192.168.0.11" + KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" \ No newline at end of file diff --git a/kafka-common-test/pom.xml b/kafka-common-test/pom.xml new file mode 100644 index 0000000..980ca1f --- /dev/null +++ b/kafka-common-test/pom.xml @@ -0,0 +1,59 @@ + + + 4.0.0 + + com.ippon.trainning.kafkaintegrationtest + kafka-integration-test-parent + 0.0.1-SNAPSHOT + ../pom.xml + + + kafka-common-test + 0.0.1-SNAPSHOT + + + 11 + 5.7.0 + + + + + org.springframework.kafka + spring-kafka + + + org.springframework.boot + spring-boot-starter-test + + + junit + junit + + + + + org.springframework.kafka + spring-kafka-test + + + org.junit.jupiter + junit-jupiter-engine + ${jupiter.version} + + + org.junit.jupiter + junit-jupiter-api + ${jupiter.version} + + + org.junit.jupiter + junit-jupiter-params + ${jupiter.version} + + + org.awaitility + awaitility + + + diff --git a/kafka-common-test/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkacommontest/assertor/ConsumerRecordAssert.java b/kafka-common-test/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkacommontest/assertor/ConsumerRecordAssert.java new file mode 100644 index 0000000..49c00ad --- /dev/null +++ b/kafka-common-test/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkacommontest/assertor/ConsumerRecordAssert.java @@ -0,0 +1,73 @@ +package com.ippon.trainning.kafkaintegrationtest.kafkacommontest.assertor; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.assertj.core.api.AbstractObjectAssert; + +import java.util.Iterator; + +import org.assertj.core.api.Assertions; + +public class ConsumerRecordAssert extends AbstractObjectAssert, ConsumerRecord> { + private final ObjectMapper objectMapper = new ObjectMapper(); + + private ConsumerRecordAssert(ConsumerRecord record) { + super(record, ConsumerRecordAssert.class); + } + + public static ConsumerRecordAssert assertThat(ConsumerRecord record) { + return new ConsumerRecordAssert<>(record); + } + + public ConsumerRecordAssert hasValue(V expectedValue) { + isNotNull(); + + Assertions.assertThat(actual.value()).usingRecursiveComparison().isEqualTo(expectedValue); + + return this; + } + + public ConsumerRecordAssert hasKey(K expectedKey) { + isNotNull(); + + Assertions.assertThat(actual.key()).usingRecursiveComparison().isEqualTo(expectedKey); + + return this; + } + + public ConsumerRecordAssert hasHeader(String headerName, Object expectedHeaderValue) { + isNotNull(); + + String expectedHeaderValueAsJsonString = null; + try { + expectedHeaderValueAsJsonString = objectMapper.writeValueAsString(expectedHeaderValue); + } catch (JsonProcessingException ex) { + throw new IllegalArgumentException(String.format( + "Could not initialize assertion due to JSON serialization of expected value <%s>", + expectedHeaderValue + )); + } + + Assertions.assertThat(actual.headers()).isNotEmpty(); + + Iterator
headerIterator = actual.headers().headers(headerName).iterator(); + if (!headerIterator.hasNext()) { + failWithMessage("<%s> should contain header <%s> with value <%s>, but it does not.", + ConsumerRecord.class.getSimpleName(), + headerName, + expectedHeaderValueAsJsonString); + } + String headerValue = new String(headerIterator.next().value()); + if (!expectedHeaderValueAsJsonString.equals(headerValue)) { + failWithMessage("<%s> should contain header <%s> with value <%s>, but has <%s>.", + ConsumerRecord.class.getSimpleName(), + headerName, + expectedHeaderValueAsJsonString, + headerValue); + } + + return this; + } +} diff --git a/kafka-common-test/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkacommontest/extension/EmbeddedKafkaExtension.java b/kafka-common-test/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkacommontest/extension/EmbeddedKafkaExtension.java new file mode 100644 index 0000000..1cf95b0 --- /dev/null +++ b/kafka-common-test/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkacommontest/extension/EmbeddedKafkaExtension.java @@ -0,0 +1,66 @@ +package com.ippon.trainning.kafkaintegrationtest.kafkacommontest.extension; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.springframework.context.ApplicationContext; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.KafkaMessageListenerContainer; +import org.springframework.kafka.listener.MessageListener; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.utils.ContainerTestUtils; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +public class EmbeddedKafkaExtension implements BeforeEachCallback, AfterEachCallback { + private EmbeddedKafkaBroker embeddedKafkaBroker; + private KafkaMessageListenerContainer container; + /** + * This queue is required for {@link EmbeddedKafkaExtension#container} to start. + *

+ * We can improve this extension by providing this queue as a Spring bean but with setting up of a sort + * of factory to define genericity of {@link ConsumerRecord}. + *

+ */ + @SuppressWarnings({"FieldCanBeLocal", "MismatchedQueryAndUpdateOfCollection"}) + private BlockingQueue> records; + + @Override + public void beforeEach(ExtensionContext context) throws Exception { + ApplicationContext applicationContext = SpringExtension.getApplicationContext(context); + embeddedKafkaBroker = applicationContext.getBean(EmbeddedKafkaBroker.class); + + DefaultKafkaConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory<>(getConsumerProperties()); + ContainerProperties containerProperties = new ContainerProperties("test-topic"); + container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties); + records = new LinkedBlockingQueue<>(); + container.setupMessageListener((MessageListener) records::add); + container.start(); + ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic()); + } + + private Map getConsumerProperties() { + return Map.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString(), + ConsumerConfig.GROUP_ID_CONFIG, "consumer", + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true", + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10", + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" + ); + } + + @Override + public void afterEach(ExtensionContext context) { + container.stop(); + } +} \ No newline at end of file diff --git a/kafka-common-test/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkacommontest/utils/KafkaTopicListener.java b/kafka-common-test/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkacommontest/utils/KafkaTopicListener.java new file mode 100644 index 0000000..cd3f15d --- /dev/null +++ b/kafka-common-test/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkacommontest/utils/KafkaTopicListener.java @@ -0,0 +1,68 @@ +package com.ippon.trainning.kafkaintegrationtest.kafkacommontest.utils; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; + +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * Listener that will consume a kafka topic and store consumed message. + *

+ * This class serves to test a kafka producer. + *

+ * @param The kafka producer messages type. + */ +public class KafkaTopicListener { + /** + * Queue to store consumed messages. + */ + private final BlockingQueue> messages = new LinkedBlockingDeque<>(); + + /** + * Serves to consume a kafka topic, and store consumed messages in the queue. + *

+ * This method has to be annotated with {@link KafkaListener} annotation in classes that extend this one. + *

+ *

+ * Example: + *

{@code
+     *     new KafkaTopicListener<>() {
+     *         @KafkaListener(
+     *             topics = "topic-name",
+     *             groupId = "groupId-name",
+     *             containerFactory = "container-factory-bean-name"
+     *         )
+     *         @Override
+     *         public void listen(ConsumerRecord record) {
+     *             super.listen(record);
+     *         }
+     *     };
+     *     }
+ *

+ *

+ * Note: The {@code container-factory-bean-name} is a bean that you have to declare + * (with a {@code @Bean} annotated method) that should be an instance of + * {@link org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory}. + *

+ * @param message Topic consumed message. + */ + public void listen(ConsumerRecord message) { + messages.add(message); + } + + /** + * Waits at most 5 seconds that kafka listener consumes a message, + * and return it if one was consumed in this period, otherwise, {@code null} will be returned. + * @return The kafka consumed message, or {@code null}. + */ + public ConsumerRecord getMessage() { + try { + return messages.poll(5, SECONDS); + } catch (InterruptedException ex) { + throw new AssertionError(ex); + } + } +} \ No newline at end of file diff --git a/kafka-consumer/pom.xml b/kafka-consumer/pom.xml new file mode 100644 index 0000000..4931528 --- /dev/null +++ b/kafka-consumer/pom.xml @@ -0,0 +1,131 @@ + + + 4.0.0 + + com.ippon.trainning.kafkaintegrationtest + kafka-integration-test-parent + 0.0.1-SNAPSHOT + ../pom.xml + + + kafka-consumer + 0.0.1-SNAPSHOT + + + 11 + 5.7.0 + 3.2.0 + src/integration-test/java + src/integration-test/resources + 2.22.2 + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.kafka + spring-kafka + + + org.projectlombok + lombok + + + + com.ippon.trainning.kafkaintegrationtest + kafka-common-test + 0.0.1-SNAPSHOT + test + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.kafka + spring-kafka-test + test + + + org.junit.jupiter + junit-jupiter-engine + ${jupiter.version} + test + + + org.junit.jupiter + junit-jupiter-api + ${jupiter.version} + test + + + org.junit.jupiter + junit-jupiter-params + ${jupiter.version} + test + + + org.awaitility + awaitility + test + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + exec + + + + + + org.codehaus.mojo + build-helper-maven-plugin + ${build-helper-maven-plugin.version} + + + add-integration-test + + add-test-source + add-test-resource + + + + ${integration-test.source.directory} + + + + ${integration-test.resources.directory} + + + + + + + + + org.apache.maven.plugins + maven-failsafe-plugin + ${maven-failsafe-plugin.version} + + + + integration-test + verify + + + + + + + diff --git a/kafka-consumer/src/integration-test/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/consumer/KafkaConsumerIT.java b/kafka-consumer/src/integration-test/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/consumer/KafkaConsumerIT.java new file mode 100644 index 0000000..bcdd5ea --- /dev/null +++ b/kafka-consumer/src/integration-test/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/consumer/KafkaConsumerIT.java @@ -0,0 +1,85 @@ +package com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.consumer; + +import com.ippon.trainning.kafkaintegrationtest.kafkacommontest.extension.EmbeddedKafkaExtension; +import com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.service.MessageService; +import org.apache.kafka.common.serialization.StringSerializer; +import org.awaitility.Durations; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import java.util.Map; +import java.util.UUID; + +import static org.apache.kafka.clients.producer.ProducerConfig.*; +import static org.awaitility.Awaitility.await; +import static org.mockito.BDDMockito.then; + +@ExtendWith({ + SpringExtension.class, + MockitoExtension.class, + EmbeddedKafkaExtension.class +}) +@SpringBootTest(classes = { + KafkaConsumerIT.KafkaConsumerITConfiguration.class +}, properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}") +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@EmbeddedKafka +@ActiveProfiles({"test"}) +public class KafkaConsumerIT { + @TestConfiguration + public static class KafkaConsumerITConfiguration { + @Bean + public DefaultKafkaProducerFactory producerFactory( + @Value("${server.kafka.bootstrapAddress}") String bootstrapAddress + ) { + Map configProperties = Map.of( + BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress, + KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class, + VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class + ); + + return new DefaultKafkaProducerFactory<>(configProperties); + } + + @Bean + public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) { + return new KafkaTemplate<>(producerFactory); + } + } + + @Autowired + private KafkaTemplate kafkaTemplate; + @Value("${server.kafka.topic}") + private String topic; + + @MockBean + private MessageService messageService; + + @Test + void should_comsume_a_message_from_kafka_topic() { + // given + String key = UUID.randomUUID().toString(); + String message = "A message to consume"; + + // when + kafkaTemplate.send(topic, key, message); + + // then + await().atMost(Durations.FIVE_SECONDS).untilAsserted(() -> then(messageService).should().handleMessage(key, message)); + } +} diff --git a/kafka-consumer/src/integration-test/resources/application-test.yml b/kafka-consumer/src/integration-test/resources/application-test.yml new file mode 100644 index 0000000..8d0d79c --- /dev/null +++ b/kafka-consumer/src/integration-test/resources/application-test.yml @@ -0,0 +1,5 @@ +server: + kafka: + bootstrapAddress: "${spring.embedded.kafka.brokers:}" + topic: topic-test + groupId: groupId-test diff --git a/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/KafkaConsumerApplication.java b/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/KafkaConsumerApplication.java new file mode 100644 index 0000000..4e2eb8c --- /dev/null +++ b/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/KafkaConsumerApplication.java @@ -0,0 +1,11 @@ +package com.ippon.trainning.kafkaintegrationtest.kafkaconsumer; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class KafkaConsumerApplication { + public static void main(String[] args) { + SpringApplication.run(KafkaConsumerApplication.class, args); + } +} diff --git a/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/config/KafkaConsumerConfiguration.java b/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/config/KafkaConsumerConfiguration.java new file mode 100644 index 0000000..7980033 --- /dev/null +++ b/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/config/KafkaConsumerConfiguration.java @@ -0,0 +1,46 @@ +package com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.config; + +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; + +import java.util.Map; + +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; + +@Configuration +public class KafkaConsumerConfiguration { + private final String bootstrapAddress; + private final String groupId; + + public KafkaConsumerConfiguration(@Value("${server.kafka.bootstrapAddress}") String bootstrapAddress, + @Value("${server.kafka.groupId}") String groupId) { + this.bootstrapAddress = bootstrapAddress; + this.groupId = groupId; + } + + @Bean + public ConsumerFactory consumerFactory() { + Map props = Map.of( + BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress, + VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class, + KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class, + GROUP_ID_CONFIG, groupId + ); + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory containerFactory(ConsumerFactory consumerFactory) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + return factory; + } +} diff --git a/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/consumer/KafkaConsumer.java b/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/consumer/KafkaConsumer.java new file mode 100644 index 0000000..7ff61a3 --- /dev/null +++ b/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/consumer/KafkaConsumer.java @@ -0,0 +1,25 @@ +package com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.consumer; + +import com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.service.MessageService; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +public class KafkaConsumer { + private final MessageService messageService; + + public KafkaConsumer(MessageService messageService) { + this.messageService = messageService; + } + + @KafkaListener( + containerFactory = "containerFactory", + topics = "${server.kafka.topic}" + ) + public void listenTopic(ConsumerRecord record) { + messageService.handleMessage(record.key(), record.value()); + } +} diff --git a/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/service/MessageService.java b/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/service/MessageService.java new file mode 100644 index 0000000..c2782fa --- /dev/null +++ b/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/service/MessageService.java @@ -0,0 +1,12 @@ +package com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.service; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +public class MessageService { + public void handleMessage(String key, String message) { + log.info("Message consumed: <{}> - <{}>", key, message); + } +} diff --git a/kafka-consumer/src/main/resources/application.yml b/kafka-consumer/src/main/resources/application.yml new file mode 100644 index 0000000..bf30e1d --- /dev/null +++ b/kafka-consumer/src/main/resources/application.yml @@ -0,0 +1,6 @@ +server: + port: 8081 + kafka: + bootstrapAddress: localhost:9092 + topic: topic-test + groupId: groupId-test \ No newline at end of file diff --git a/kafka-consumer/src/test/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/KafkaConsumerApplicationTests.java b/kafka-consumer/src/test/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/KafkaConsumerApplicationTests.java new file mode 100644 index 0000000..0f25a4b --- /dev/null +++ b/kafka-consumer/src/test/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/KafkaConsumerApplicationTests.java @@ -0,0 +1,13 @@ +package com.ippon.trainning.kafkaintegrationtest.kafkaconsumer; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class KafkaConsumerApplicationTests { + + @Test + void contextLoads() { + } + +} diff --git a/kafka-producer/pom.xml b/kafka-producer/pom.xml new file mode 100644 index 0000000..0487d5b --- /dev/null +++ b/kafka-producer/pom.xml @@ -0,0 +1,137 @@ + + + 4.0.0 + + com.ippon.trainning.kafkaintegrationtest + kafka-integration-test-parent + 0.0.1-SNAPSHOT + ../pom.xml + + + kafka-producer + 0.0.1-SNAPSHOT + + + 11 + 5.7.0 + 3.2.0 + src/integration-test/java + src/integration-test/resources + 2.22.2 + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.kafka + spring-kafka + + + org.projectlombok + lombok + + + + com.ippon.trainning.kafkaintegrationtest + kafka-common-test + 0.0.1-SNAPSHOT + test + + + org.springframework.boot + spring-boot-starter-test + test + + + junit + junit + + + + + org.springframework.kafka + spring-kafka-test + test + + + org.junit.jupiter + junit-jupiter-engine + ${jupiter.version} + test + + + org.junit.jupiter + junit-jupiter-api + ${jupiter.version} + test + + + org.junit.jupiter + junit-jupiter-params + ${jupiter.version} + test + + + org.awaitility + awaitility + test + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + exec + + + + + + org.codehaus.mojo + build-helper-maven-plugin + ${build-helper-maven-plugin.version} + + + add-integration-test + + add-test-source + add-test-resource + + + + ${integration-test.source.directory} + + + + ${integration-test.resources.directory} + + + + + + + + + org.apache.maven.plugins + maven-failsafe-plugin + ${maven-failsafe-plugin.version} + + + + integration-test + verify + + + + + + + diff --git a/kafka-producer/src/integration-test/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/producer/ProducerIT.java b/kafka-producer/src/integration-test/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/producer/ProducerIT.java new file mode 100644 index 0000000..bf5b743 --- /dev/null +++ b/kafka-producer/src/integration-test/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/producer/ProducerIT.java @@ -0,0 +1,96 @@ +package com.ippon.trainning.kafkaintegrationtest.kafkaproducer.producer; + +import com.ippon.trainning.kafkaintegrationtest.kafkacommontest.extension.EmbeddedKafkaExtension; +import com.ippon.trainning.kafkaintegrationtest.kafkacommontest.utils.KafkaTopicListener; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import java.util.HashMap; +import java.util.Map; + +import static com.ippon.trainning.kafkaintegrationtest.kafkacommontest.assertor.ConsumerRecordAssert.assertThat; + +@ExtendWith({ + SpringExtension.class, + EmbeddedKafkaExtension.class +}) +@SpringBootTest(classes = { + ProducerIT.ProducerITConfiguration.class +}, properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}") +@EmbeddedKafka +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@ActiveProfiles({"test"}) +public class ProducerIT { + @TestConfiguration + public static class ProducerITConfiguration { + @Bean("message-container-factory") + public ConcurrentKafkaListenerContainerFactory containerFactory( + @Value("${server.kafka.bootstrapAddress}") String bootstrapAddress + ) { + var factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(consumerFactory(bootstrapAddress)); + factory.setConcurrency(1); + return factory; + } + + private ConsumerFactory consumerFactory(String bootstrapAddress) { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + public KafkaTopicListener messageKafkaTopicListener() { + return new KafkaTopicListener<>() { + @KafkaListener( + topics = "${server.kafka.topic}", + groupId = "${server.kafka.groupId}", + containerFactory = "message-container-factory" + ) + @Override + public void listen(ConsumerRecord message) { + super.listen(message); + } + }; + } + } + + @Autowired + private KafkaProducer producer; + @Autowired + @Qualifier("messageKafkaTopicListener") + private KafkaTopicListener messageKafkaTopicListener; + + @Test + void should_send_a_message_into_kafka_topic() { + // given + String message = "A test message to send into topic"; + + // when + producer.sendMessage(message); + + // then + ConsumerRecord messageConsumed = messageKafkaTopicListener.getMessage(); + assertThat(messageConsumed) + .hasValue(message); + } +} diff --git a/kafka-producer/src/integration-test/resources/application-test.yml b/kafka-producer/src/integration-test/resources/application-test.yml new file mode 100644 index 0000000..8d0d79c --- /dev/null +++ b/kafka-producer/src/integration-test/resources/application-test.yml @@ -0,0 +1,5 @@ +server: + kafka: + bootstrapAddress: "${spring.embedded.kafka.brokers:}" + topic: topic-test + groupId: groupId-test diff --git a/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/KafkaProducerApplication.java b/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/KafkaProducerApplication.java new file mode 100644 index 0000000..536d946 --- /dev/null +++ b/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/KafkaProducerApplication.java @@ -0,0 +1,11 @@ +package com.ippon.trainning.kafkaintegrationtest.kafkaproducer; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class KafkaProducerApplication { + public static void main(String[] args) { + SpringApplication.run(KafkaProducerApplication.class, args); + } +} diff --git a/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/config/KafkaProducerConfiguration.java b/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/config/KafkaProducerConfiguration.java new file mode 100644 index 0000000..c6b9eeb --- /dev/null +++ b/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/config/KafkaProducerConfiguration.java @@ -0,0 +1,37 @@ +package com.ippon.trainning.kafkaintegrationtest.kafkaproducer.config; + +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +import java.util.Map; + +import static org.apache.kafka.clients.producer.ProducerConfig.*; + +@Configuration +public class KafkaProducerConfiguration { + private final String bootstrapAddress; + + public KafkaProducerConfiguration(@Value("${server.kafka.bootstrapAddress}") String bootstrapAddress) { + this.bootstrapAddress = bootstrapAddress; + } + + @Bean + public ProducerFactory producerFactory() { + Map configProperties = Map.of( + BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress, + KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class, + VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class + ); + return new DefaultKafkaProducerFactory<>(configProperties); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} diff --git a/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/controller/MessageController.java b/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/controller/MessageController.java new file mode 100644 index 0000000..b64bd2d --- /dev/null +++ b/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/controller/MessageController.java @@ -0,0 +1,30 @@ +package com.ippon.trainning.kafkaintegrationtest.kafkaproducer.controller; + +import com.ippon.trainning.kafkaintegrationtest.kafkaproducer.producer.KafkaProducer; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +import static org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR; + +@RestController +public class MessageController { + private final KafkaProducer kafkaProducer; + + public MessageController(KafkaProducer kafkaProducer) { + this.kafkaProducer = kafkaProducer; + } + + @PostMapping("/messages") + public ResponseEntity sendMessage(@RequestBody String message) { + ResponseEntity response; + try { + kafkaProducer.sendMessage(message); + response = ResponseEntity.ok("Message sent."); + } catch(Exception ex) { + response = ResponseEntity.status(INTERNAL_SERVER_ERROR).body("Message no sent."); + } + return response; + } +} diff --git a/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/producer/KafkaProducer.java b/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/producer/KafkaProducer.java new file mode 100644 index 0000000..51069f9 --- /dev/null +++ b/kafka-producer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/producer/KafkaProducer.java @@ -0,0 +1,23 @@ +package com.ippon.trainning.kafkaintegrationtest.kafkaproducer.producer; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +import java.util.UUID; + +@Service +public class KafkaProducer { + private final KafkaTemplate kafkaTemplate; + private final String topic; + + public KafkaProducer(KafkaTemplate kafkaTemplate, + @Value("${server.kafka.topic}") String topic) { + this.kafkaTemplate = kafkaTemplate; + this.topic = topic; + } + + public void sendMessage(String message) { + kafkaTemplate.send(topic, UUID.randomUUID().toString(), message); + } +} diff --git a/kafka-producer/src/main/resources/application.yml b/kafka-producer/src/main/resources/application.yml new file mode 100644 index 0000000..3fa7f96 --- /dev/null +++ b/kafka-producer/src/main/resources/application.yml @@ -0,0 +1,5 @@ +server: + port: 8082 + kafka: + bootstrapAddress: "localhost:9092" + topic: topic-test \ No newline at end of file diff --git a/kafka-producer/src/test/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/KafkaProducerApplicationTests.java b/kafka-producer/src/test/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/KafkaProducerApplicationTests.java new file mode 100644 index 0000000..22ed70e --- /dev/null +++ b/kafka-producer/src/test/java/com/ippon/trainning/kafkaintegrationtest/kafkaproducer/KafkaProducerApplicationTests.java @@ -0,0 +1,13 @@ +package com.ippon.trainning.kafkaintegrationtest.kafkaproducer; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class KafkaProducerApplicationTests { + + @Test + void contextLoads() { + } + +} diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..1ce8955 --- /dev/null +++ b/pom.xml @@ -0,0 +1,36 @@ + + + 4.0.0 + + com.ippon.trainning.kafkaintegrationtest + kafka-integration-test-parent + 0.0.1-SNAPSHOT + pom + + + UTF-8 + UTF-8 + 11 + ${java.version} + ${java.version} + + + + kafka-common-test + kafka-consumer + kafka-producer + + + + + + org.springframework.boot + spring-boot-dependencies + 2.4.5 + pom + import + + + +