From 92d2e752187fbec3d1b9649602f0be2666c18003 Mon Sep 17 00:00:00 2001 From: Florian THIERRY Date: Wed, 3 Feb 2021 10:43:53 +0100 Subject: [PATCH] Initial commit. --- .gitignore | 33 ++++++++ docker/docker-compose.yml | 15 ++++ pom.xml | 73 ++++++++++++++++++ .../com/example/demo/DemoApplication.java | 13 ++++ .../configuration/HealthConfiguration.java | 47 ++++++++++++ .../configuration/KafkaConfiguration.java | 75 +++++++++++++++++++ .../configuration/SpringConfiguration.java | 9 +++ .../java/com/example/demo/model/User.java | 17 +++++ .../example/demo/producer/UserProducer.java | 55 ++++++++++++++ .../com/example/demo/service/UserService.java | 27 +++++++ src/main/resources/application.yml | 25 +++++++ .../example/demo/DemoApplicationTests.java | 13 ++++ 12 files changed, 402 insertions(+) create mode 100644 .gitignore create mode 100644 docker/docker-compose.yml create mode 100644 pom.xml create mode 100644 src/main/java/com/example/demo/DemoApplication.java create mode 100644 src/main/java/com/example/demo/configuration/HealthConfiguration.java create mode 100644 src/main/java/com/example/demo/configuration/KafkaConfiguration.java create mode 100644 src/main/java/com/example/demo/configuration/SpringConfiguration.java create mode 100644 src/main/java/com/example/demo/model/User.java create mode 100644 src/main/java/com/example/demo/producer/UserProducer.java create mode 100644 src/main/java/com/example/demo/service/UserService.java create mode 100644 src/main/resources/application.yml create mode 100644 src/test/java/com/example/demo/DemoApplicationTests.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..549e00a --- /dev/null +++ b/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100644 index 0000000..b56943b --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,15 @@ +version: '2' +services: + zookeeper: + image: wurstmeister/zookeeper + ports: + - "2181:2181" + kafka: + image: wurstmeister/kafka + ports: + - "9092:9092" + environment: + KAFKA_ADVERTISED_HOST_NAME: 192.168.0.14 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + volumes: + - /var/run/docker.sock:/var/run/docker.sock \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..a2659e9 --- /dev/null +++ b/pom.xml @@ -0,0 +1,73 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.4.2 + + + com.example + demo + 0.0.1-SNAPSHOT + demo + Demo project for Spring Boot + + 11 + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.kafka + spring-kafka + + + + org.springframework.boot + spring-boot-devtools + runtime + true + + + org.projectlombok + lombok + true + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.kafka + spring-kafka-test + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + org.projectlombok + lombok + + + + + + + + diff --git a/src/main/java/com/example/demo/DemoApplication.java b/src/main/java/com/example/demo/DemoApplication.java new file mode 100644 index 0000000..64b538a --- /dev/null +++ b/src/main/java/com/example/demo/DemoApplication.java @@ -0,0 +1,13 @@ +package com.example.demo; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class DemoApplication { + + public static void main(String[] args) { + SpringApplication.run(DemoApplication.class, args); + } + +} diff --git a/src/main/java/com/example/demo/configuration/HealthConfiguration.java b/src/main/java/com/example/demo/configuration/HealthConfiguration.java new file mode 100644 index 0000000..6b964ee --- /dev/null +++ b/src/main/java/com/example/demo/configuration/HealthConfiguration.java @@ -0,0 +1,47 @@ +package com.example.demo.configuration; + +import com.example.demo.model.User; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.errors.TimeoutException; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.HealthIndicator; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.ConsumerFactory; + +@Configuration +public class HealthConfiguration { + @Bean + public HealthIndicator trackUnitKafkaHealthIndicator(ConsumerFactory consumerFactory) { + return new KafkaHealthIndicator(consumerFactory); + } + + private static class KafkaHealthIndicator implements HealthIndicator { + private final ConsumerFactory consumerFactory; + + public KafkaHealthIndicator(ConsumerFactory consumerFactory) { + this.consumerFactory = consumerFactory; + } + + @Override + public Health health() { + Health result; + + try (Consumer consumer = consumerFactory.createConsumer()) { + if (consumer.listTopics().isEmpty()) { + result = Health.down() + .withDetail("reason", "There is no topic in kafka.") + .build(); + } else { + result = Health.up().build(); + } + } catch(TimeoutException ex) { + result = Health.down() + .withDetail("reason", "Timeout while ping kafka partner") + .build(); + } + + return result; + } + } +} diff --git a/src/main/java/com/example/demo/configuration/KafkaConfiguration.java b/src/main/java/com/example/demo/configuration/KafkaConfiguration.java new file mode 100644 index 0000000..3a453b5 --- /dev/null +++ b/src/main/java/com/example/demo/configuration/KafkaConfiguration.java @@ -0,0 +1,75 @@ +package com.example.demo.configuration; + +import com.example.demo.model.User; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.*; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +public class KafkaConfiguration { + private final String bootstrapAddress; + private final String usersGroupId; + + public KafkaConfiguration(@Value("${kafka.demo.bootstrapAddress}") String bootstrapAddress, + @Value("${kafka.demo.topics.users.groupId}") String usersGroupId) { + this.bootstrapAddress = bootstrapAddress; + this.usersGroupId = usersGroupId; + } + + @Bean + public ProducerFactory userProducerFactory() { + final Map props = new HashMap<>(); + + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + props.put(ProducerConfig.CLIENT_ID_CONFIG, usersGroupId); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 5000); + props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 5000); + props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5000); + props.put(ProducerConfig.RETRIES_CONFIG, 4); // Retry number regarding partition count + props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); + props.put("metadata.max.idle.ms", 5000); + props.put(ProducerConfig.LINGER_MS_CONFIG, 30); + props.put("delivery.timeout.ms", 5120); // request.timeout.ms + linger.ms) * retries + + return new DefaultKafkaProducerFactory<>(props); + } + + @Bean + public KafkaTemplate usersKafkaTemplate() { + return new KafkaTemplate<>(userProducerFactory()); + } + + @Bean + public ConsumerFactory consumerFactory() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + props.put(ConsumerConfig.GROUP_ID_CONFIG, usersGroupId); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 5000); + props.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 5000); + props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); + props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000); + props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 20971520); + props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 2000); + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + return factory; + } +} diff --git a/src/main/java/com/example/demo/configuration/SpringConfiguration.java b/src/main/java/com/example/demo/configuration/SpringConfiguration.java new file mode 100644 index 0000000..864e037 --- /dev/null +++ b/src/main/java/com/example/demo/configuration/SpringConfiguration.java @@ -0,0 +1,9 @@ +package com.example.demo.configuration; + +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableScheduling; + +@Configuration +@EnableScheduling +public class SpringConfiguration { +} diff --git a/src/main/java/com/example/demo/model/User.java b/src/main/java/com/example/demo/model/User.java new file mode 100644 index 0000000..be24fc9 --- /dev/null +++ b/src/main/java/com/example/demo/model/User.java @@ -0,0 +1,17 @@ +package com.example.demo.model; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +import java.util.UUID; + +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +public class User { + private UUID id; + private String name; +} diff --git a/src/main/java/com/example/demo/producer/UserProducer.java b/src/main/java/com/example/demo/producer/UserProducer.java new file mode 100644 index 0000000..0f0d3cf --- /dev/null +++ b/src/main/java/com/example/demo/producer/UserProducer.java @@ -0,0 +1,55 @@ +package com.example.demo.producer; + +import com.example.demo.model.User; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Component; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +@Component +@Slf4j +public class UserProducer { + private final KafkaTemplate usersKafkaTemplate; + private final ObjectMapper objectMapper; + private final String usersTopicName; + + public UserProducer(KafkaTemplate usersKafkaTemplate, + ObjectMapper objectMapper, + @Value("${kafka.demo.topics.users.name}") String usersTopicName) { + this.usersKafkaTemplate = usersKafkaTemplate; + this.objectMapper = objectMapper; + this.usersTopicName = usersTopicName; + } + + public void send(User user) { + serialize(user) + .ifPresent(serializedUser -> { + log.info("Send user {}", user.getId()); + usersKafkaTemplate.send(usersTopicName, user.getId().toString(), serializedUser) + .completable() + .handle((u, throwable) -> { + if (throwable != null) { + log.error("Error while sending user in kafka :", throwable); + } + return null; + }); + }); + } + + + private Optional serialize(User user) { + Optional result = Optional.empty(); + try { + result = Optional.of(objectMapper.writeValueAsString(user)); + } catch (JsonProcessingException e) { + log.warn("Unable to serialize as JSON string the User object because it is null."); + } + return result; + } +} diff --git a/src/main/java/com/example/demo/service/UserService.java b/src/main/java/com/example/demo/service/UserService.java new file mode 100644 index 0000000..d4c09ca --- /dev/null +++ b/src/main/java/com/example/demo/service/UserService.java @@ -0,0 +1,27 @@ +package com.example.demo.service; + +import com.example.demo.model.User; +import com.example.demo.producer.UserProducer; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import java.util.UUID; + +@Service +public class UserService { + private final UserProducer userProducer; + + public UserService(UserProducer userProducer) { + this.userProducer = userProducer; + } + + @Async + @Scheduled(fixedRate = 2000) + public void sendUsersInKafka() { + User user = new User(); + user.setId(UUID.randomUUID()); + user.setName("User name"); + userProducer.send(user); + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..cbcf548 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,25 @@ +# These properties are used to configure the actuator healthCheck. +management: + endpoint: + health: + show-details: always + health: + db: + enabled: false + diskSpace: + enabled: false + + +kafka: + demo: + bootstrapAddress: localhost:9092 + topics: + users: + name: users-topic + groupId: users-groupId + +logging: + level: + org: + apache: +# kafka: ERROR \ No newline at end of file diff --git a/src/test/java/com/example/demo/DemoApplicationTests.java b/src/test/java/com/example/demo/DemoApplicationTests.java new file mode 100644 index 0000000..2778a6a --- /dev/null +++ b/src/test/java/com/example/demo/DemoApplicationTests.java @@ -0,0 +1,13 @@ +package com.example.demo; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class DemoApplicationTests { + + @Test + void contextLoads() { + } + +}