Add IT that uses a mocked schema registry.

This commit is contained in:
Florian THIERRY
2021-05-06 10:31:03 +02:00
parent df3b76f166
commit 045e2aee6d
3 changed files with 223 additions and 0 deletions

View File

@@ -19,8 +19,16 @@
<integration-test.source.directory>src/integration-test/java</integration-test.source.directory>
<integration-test.resources.directory>src/integration-test/resources</integration-test.resources.directory>
<maven-failsafe-plugin.version>2.22.2</maven-failsafe-plugin.version>
<avro.version>1.9.2</avro.version>
</properties>
<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
@@ -34,6 +42,27 @@
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-compiler</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<!-- kafka-avro-serializer version aligned with Confluent Platform 6.0.0 -->
<version>6.0.0</version>
</dependency>
<dependency>
<groupId>com.ippon.trainning.kafkaintegrationtest</groupId>

View File

@@ -0,0 +1,162 @@
package com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.consumer;
import com.ippon.trainning.kafkaintegrationtest.kafkacommontest.extension.EmbeddedKafkaExtension;
import com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.service.MessageService;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.awaitility.Durations;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.web.client.RestClientException;
import java.io.IOException;
import java.util.UUID;
import static com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.model.AvroSchemas.avroGeneratedObjectSchema;
import static org.awaitility.Awaitility.await;
import static org.mockito.BDDMockito.then;
@ExtendWith({
        SpringExtension.class,
        EmbeddedKafkaExtension.class,
        MockitoExtension.class
})
@SpringBootTest(classes = {
        KafkaAvroConsumerIT.KafkaAvroConsumerITConfiguration.class
}, properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@EmbeddedKafka
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ActiveProfiles({"test"})
public class KafkaAvroConsumerIT {

    @TestConfiguration
    public static class KafkaAvroConsumerITConfiguration {

        /** Properties filled in {@code application-test.yml}. */
        private final KafkaProperties kafkaProperties;

        public KafkaAvroConsumerITConfiguration(KafkaProperties kafkaProperties) {
            this.kafkaProperties = kafkaProperties;
        }

        /* ******************************************************************* */
        /* Kafka template configuration used to drive the consumer under test. */
        /* ******************************************************************* */

        /**
         * Mock schema registry bean used by the Kafka Avro Serde since the
         * {@code @EmbeddedKafka} setup doesn't include a schema registry.
         * The value schema is pre-registered under the {@code <topic>-value}
         * subject, matching the default topic-name subject strategy.
         *
         * @param topic the topic under test, read from {@code server.kafka.topic}
         * @return MockSchemaRegistryClient instance
         * @throws IOException if the mocked registration fails
         * @throws io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
         *         if the (mocked) registry rejects the registration
         */
        @Bean
        public MockSchemaRegistryClient schemaRegistryClient(@Value("${server.kafka.topic}") String topic) throws IOException, io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException {
            // NB: only the checked Confluent RestClientException is declared here.
            // The previous extra `throws RestClientException` resolved to Spring's
            // unchecked org.springframework.web.client.RestClientException, which
            // register() never throws — it was misleading and has been removed.
            MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();
            mockSchemaRegistryClient.register(topic + "-value", avroGeneratedObjectSchema);
            return mockSchemaRegistryClient;
        }

        /** Value serializer wired to the mocked registry (no HTTP calls). */
        @Bean
        public KafkaAvroSerializer kafkaAvroSerializer(MockSchemaRegistryClient mockSchemaRegistryClient) {
            return new KafkaAvroSerializer(mockSchemaRegistryClient);
        }

        /**
         * Producer factory whose value serializer goes through the mocked registry.
         *
         * @param kafkaAvroSerializer registry-aware value serializer
         * @return DefaultKafkaProducerFactory instance
         */
        @Bean
        public DefaultKafkaProducerFactory<String, String> producerFactory(KafkaAvroSerializer kafkaAvroSerializer) {
            // Raw type on purpose: the bean is declared <String, String> but the
            // serializers passed in are Avro-based (Serializer<Object>), which the
            // generic constructor would reject.
            // NOTE(review): the KEY serializer is a bare `new KafkaAvroSerializer()`
            // with no mock registry, although keys are plain strings — confirm
            // whether a StringSerializer was intended here.
            //noinspection unchecked
            return new DefaultKafkaProducerFactory(
                    kafkaProperties.buildProducerProperties(),
                    new KafkaAvroSerializer(),
                    kafkaAvroSerializer
            );
        }

        @Bean
        public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
            return new KafkaTemplate<>(producerFactory);
        }

        /* ********************************************************************* */
        /* Consumer factory configuration so it uses the mocked schema registry. */
        /* ********************************************************************* */

        /**
         * KafkaAvroDeserializer that uses the MockSchemaRegistryClient.
         * The consumer props must be provided so that {@code specific.avro.reader: true}
         * is set. Without this, the consumer will receive GenericData records.
         *
         * @param schemaRegistryClient the mocked registry bean
         * @return KafkaAvroDeserializer instance
         */
        @Bean
        public KafkaAvroDeserializer kafkaAvroDeserializer(MockSchemaRegistryClient schemaRegistryClient) {
            return new KafkaAvroDeserializer(schemaRegistryClient, kafkaProperties.buildConsumerProperties());
        }

        /**
         * Configures the kafka consumer factory to use the overridden
         * KafkaAvroDeserializer so that the MockSchemaRegistryClient
         * is used rather than trying to reach out via HTTP to a schema registry.
         *
         * @param kafkaAvroDeserializer registry-aware value deserializer
         * @return DefaultKafkaConsumerFactory instance
         */
        @Bean
        @Primary
        public DefaultKafkaConsumerFactory<String, /* Replace by AVRO generated class */ Object> consumerFactory(
                KafkaAvroDeserializer kafkaAvroDeserializer
        ) {
            // Raw type for the same reason as producerFactory above.
            // NOTE(review): the KEY deserializer is a bare `new KafkaAvroDeserializer()`
            // although keys are strings — confirm it matches the producer side.
            return new DefaultKafkaConsumerFactory(
                    kafkaProperties.buildConsumerProperties(),
                    new KafkaAvroDeserializer(),
                    kafkaAvroDeserializer
            );
        }

        /**
         * Configure the ListenerContainerFactory to use the overridden
         * consumer factory so that the MockSchemaRegistryClient is used
         * under the covers by all consumers when deserializing Avro data.
         *
         * @param consumerFactory the registry-aware consumer factory
         * @return ConcurrentKafkaListenerContainerFactory instance
         */
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, /* Replace by AVRO generated class */ Object> kafkaListenerContainerFactory(
                ConsumerFactory<String, /* Replace by AVRO generated class */ Object> consumerFactory
        ) {
            var factory = new ConcurrentKafkaListenerContainerFactory<String, /* Replace by AVRO generated class */ Object>();
            factory.setConsumerFactory(consumerFactory);
            return factory;
        }
    }

    /** Topic under test, configured in {@code application-test.yml}. */
    @Value("${server.kafka.topic}")
    private String topic;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /** Mocked so the test can verify the listener handed the message over. */
    @MockBean
    private MessageService messageService;

    @Test
    void should_consume_a_message_from_topic() {
        // given
        String key = UUID.randomUUID().toString();
        String value = "A message to consume";

        // when
        kafkaTemplate.send(topic, key, value);

        // then: the listener should eventually forward the record to the service
        await().atMost(Durations.FIVE_SECONDS).untilAsserted(() -> then(messageService).should().handleMessage(key, value));
    }
}

View File

@@ -0,0 +1,32 @@
package com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.model;
import org.apache.avro.Schema;
public class AvroSchemas {
public static final org.apache.avro.Schema avroGeneratedObjectSchema = new org.apache.avro.Schema.Parser().parse("{\n" +
" \"type\": \"record\",\n" +
" \"name\": \"AvroGeneratedObject\",\n" +
" \"namespace\": \"com.ippon.trainning.kafkaintegrationtest.kafkaconsumer.model.avro\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"name\": \"oneStringField\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
" {\n" +
" \"type\": \"string\",\n" +
" \"avro.java.string\": \"String\"\n" +
" }\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"name\": \"oneIntegerField\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
" \"int\"\n" +
" ],\n" +
" \"default\": null,\n" +
" \"comment\": \"\"\n" +
" }\n" +
" ]\n" +
"}");
}