diff --git a/kafka-consumer/pom.xml b/kafka-consumer/pom.xml index 4931528..97ac126 100644 --- a/kafka-consumer/pom.xml +++ b/kafka-consumer/pom.xml @@ -19,8 +19,16 @@ src/integration-test/java src/integration-test/resources 2.22.2 + 1.9.2 + + + confluent + https://packages.confluent.io/maven/ + + + org.springframework.boot @@ -34,6 +42,27 @@ org.projectlombok lombok + + org.apache.avro + avro + ${avro.version} + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + org.apache.avro + avro-compiler + ${avro.version} + + + io.confluent + kafka-avro-serializer + + 6.0.0 + com.ippon.trainning.kafkaintegrationtest diff --git a/kafka-consumer/src/integration-test/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/consumer/KafkaAvroConsumerIT.java b/kafka-consumer/src/integration-test/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/consumer/KafkaAvroConsumerIT.java new file mode 100644 index 0000000..65a9657 --- /dev/null +++ b/kafka-consumer/src/integration-test/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/consumer/KafkaAvroConsumerIT.java @@ -0,0 +1,162 @@ +package com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.consumer; + +import com.ippon.trainning.kafkaintegrationtest.kafkacommontest.extension.EmbeddedKafkaExtension; +import com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.service.MessageService; +import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import org.awaitility.Durations; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.*; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.springframework.web.client.RestClientException; + +import java.io.IOException; +import java.util.UUID; + +import static com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.model.AvroSchemas.avroGeneratedObjectSchema; +import static org.awaitility.Awaitility.await; +import static org.mockito.BDDMockito.then; + +@ExtendWith({ + SpringExtension.class, + EmbeddedKafkaExtension.class, + MockitoExtension.class +}) +@SpringBootTest(classes = { + KafkaAvroConsumerIT.KafkaAvroConsumerITConfiguration.class +}, properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}") +@EmbeddedKafka +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@ActiveProfiles({"test"}) +public class KafkaAvroConsumerIT { + @TestConfiguration + public static class KafkaAvroConsumerITConfiguration { + /** Properties filled in {@code application-test.yml}. */ + private final KafkaProperties kafkaProperties; + + public KafkaAvroConsumerITConfiguration(KafkaProperties kafkaProperties) { + this.kafkaProperties = kafkaProperties; + } + + /* ******************************************************************** */ + /* Configuration du kafka template pour déclencher le test du consumer. */ + /* ******************************************************************** */ + /** + * Mock schema registry bean used by Kafka Avro Serde since + * the @EmbeddedKafka setup doesn't include a schema registry. + * @return MockSchemaRegistryClient instance + */ + @Bean + public MockSchemaRegistryClient schemaRegistryClient(@Value("${server.kafka.topic}") String topic) throws IOException, RestClientException, io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException { + MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient(); + mockSchemaRegistryClient.register(topic + "-value", avroGeneratedObjectSchema); + return mockSchemaRegistryClient; + } + + @Bean + public KafkaAvroSerializer kafkaAvroSerializer(MockSchemaRegistryClient mockSchemaRegistryClient) { + return new KafkaAvroSerializer(mockSchemaRegistryClient); + } + + @Bean + public DefaultKafkaProducerFactory producerFactory(KafkaAvroSerializer kafkaAvroSerializer) { + //noinspection unchecked + return new DefaultKafkaProducerFactory( + kafkaProperties.buildProducerProperties(), + new KafkaAvroSerializer(), + kafkaAvroSerializer + ); + } + + @Bean + public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) { + return new KafkaTemplate<>(producerFactory); + } + + + + + /* *********************************************************************************** */ + /* Configuration de la consumer factory pour qu'elle utilise le schema registry mocké. */ + /* *********************************************************************************** */ + /** + * KafkaAvroDeserializer that uses the MockSchemaRegistryClient. + * The props must be provided so that specific.avro.reader: true + * is set. Without this, the consumer will receive GenericData records. + * @return KafkaAvroDeserializer instance + */ + @Bean + public KafkaAvroDeserializer kafkaAvroDeserializer(MockSchemaRegistryClient schemaRegistryClient) { + return new KafkaAvroDeserializer(schemaRegistryClient, kafkaProperties.buildConsumerProperties()); + } + + /** + * Configures the kafka consumer factory to use the overridden + * KafkaAvroSerializer so that the MockSchemaRegistryClient + * is used rather than trying to reach out via HTTP to a schema registry + * @return DefaultKafkaConsumerFactory instance + */ + @Bean + @Primary + public DefaultKafkaConsumerFactory consumerFactory( + KafkaAvroDeserializer kafkaAvroDeserializer + ) { + return new DefaultKafkaConsumerFactory( + kafkaProperties.buildConsumerProperties(), + new KafkaAvroDeserializer(), + kafkaAvroDeserializer + ); + } + + /** + * Configure the ListenerContainerFactory to use the overridden + * consumer factory so that the MockSchemaRegistryClient is used + * under the covers by all consumers when deserializing Avro data. + * @return ConcurrentKafkaListenerContainerFactory instance + */ + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory consumerFactory + ) { + var factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(consumerFactory); + return factory; + } + } + + @Value("${server.kafka.topic}") + private String topic; + + @Autowired + private KafkaTemplate kafkaTemplate; + @MockBean + private MessageService messageService; + + @Test + void should_consume_a_message_from_topic() { + // given + String key = UUID.randomUUID().toString(); + String value = "A message to consume"; + + // when + kafkaTemplate.send(topic, key, value); + + // then + await().atMost(Durations.FIVE_SECONDS).untilAsserted(() -> then(messageService).should().handleMessage(key, value)); + } +} diff --git a/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/model/AvroSchemas.java b/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/model/AvroSchemas.java new file mode 100644 index 0000000..f1efaa6 --- /dev/null +++ b/kafka-consumer/src/main/java/com/ippon/trainning/kafkaintegrationtest/kafkaconsumer/model/AvroSchemas.java @@ -0,0 +1,32 @@ +package com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.model; + +import org.apache.avro.Schema; + +public class AvroSchemas { + public static final org.apache.avro.Schema avroGeneratedObjectSchema = new org.apache.avro.Schema.Parser().parse("{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"AvroGeneratedObject\",\n" + + " \"namespace\": \"com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.model.avro\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"oneStringField\",\n" + + " \"type\": [\n" + + " \"null\",\n" + + " {\n" + + " \"type\": \"string\",\n" + + " \"avro.java.string\": \"String\"\n" + + " }\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"name\": \"oneIntegerField\",\n" + + " \"type\": [\n" + + " \"null\",\n" + + " \"int\"\n" + + " ],\n" + + " \"default\": null,\n" + + " \"comment\": \"\"\n" + + " }\n" + + " ]\n" + + "}"); +}