Initial commit.

This commit is contained in:
Florian THIERRY
2021-02-03 10:43:53 +01:00
commit 9891cf5da7
11 changed files with 329 additions and 0 deletions

33
.gitignore vendored Normal file
View File

@@ -0,0 +1,33 @@
# Ignore rules for a Maven-based Spring Boot project.
# Build output (keep the Maven wrapper jar and any source dirs named "target").
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/

15
docker/docker-compose.yml Normal file
View File

@@ -0,0 +1,15 @@
# Local development stack: a single Zookeeper node plus a single Kafka broker,
# both exposed on their standard ports for the demo application.
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      # NOTE(review): hardcoded LAN address — presumably the developer machine's IP.
      # The application config points at localhost:9092; confirm this advertised
      # host is reachable from your clients, or they will connect to the broker
      # and then be redirected to an unreachable address.
      KAFKA_ADVERTISED_HOST_NAME: 192.168.0.14
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

View File

@@ -0,0 +1,13 @@
package com.example.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
// Application entry point: boots the Spring context, which component-scans
// com.example.demo and starts the Kafka producer/scheduler beans.
@SpringBootApplication
public class DemoApplication {

    /** Delegates to Spring Boot's standard bootstrap. */
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

View File

@@ -0,0 +1,47 @@
package com.example.demo.configuration;
import com.example.demo.model.User;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.errors.TimeoutException;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
@Configuration
public class HealthConfiguration {

    /**
     * Actuator health indicator that pings Kafka by listing topics through a
     * short-lived consumer created from the injected factory.
     */
    @Bean
    public HealthIndicator trackUnitKafkaHealthIndicator(ConsumerFactory<String, User> consumerFactory) {
        return new KafkaHealthIndicator(consumerFactory);
    }

    private static class KafkaHealthIndicator implements HealthIndicator {

        private final ConsumerFactory<String, User> consumerFactory;

        public KafkaHealthIndicator(ConsumerFactory<String, User> consumerFactory) {
            this.consumerFactory = consumerFactory;
        }

        /**
         * Returns UP when the broker answers and at least one topic exists,
         * DOWN otherwise. All client failures are converted into a DOWN
         * status instead of propagating out of the actuator endpoint.
         */
        @Override
        public Health health() {
            // try-with-resources: the consumer holds a live broker connection.
            try (Consumer<String, User> consumer = consumerFactory.createConsumer()) {
                if (consumer.listTopics().isEmpty()) {
                    return Health.down()
                            .withDetail("reason", "There is no topic in kafka.")
                            .build();
                }
                return Health.up().build();
            } catch (TimeoutException ex) {
                // Attach the exception so the actuator response shows the cause
                // (the original code silently dropped it).
                return Health.down()
                        .withDetail("reason", "Timeout while ping kafka partner")
                        .withException(ex)
                        .build();
            } catch (Exception ex) {
                // Previously only TimeoutException was handled: any other Kafka
                // client failure (auth, connection refused, ...) escaped the
                // health check. Report it as DOWN instead.
                return Health.down(ex).build();
            }
        }
    }
}

View File

@@ -0,0 +1,75 @@
package com.example.demo.configuration;
import com.example.demo.model.User;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfiguration {

    // Injected from application.yml (kafka.demo.*).
    private final String bootstrapAddress;
    private final String usersGroupId;

    public KafkaConfiguration(@Value("${kafka.demo.bootstrapAddress}") String bootstrapAddress,
                              @Value("${kafka.demo.topics.users.groupId}") String usersGroupId) {
        this.bootstrapAddress = bootstrapAddress;
        this.usersGroupId = usersGroupId;
    }

    /**
     * Producer factory for the users topic: String key (user id) and String
     * value (user serialized to JSON by UserProducer).
     */
    @Bean
    public ProducerFactory<String, String> userProducerFactory() {
        final Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, usersGroupId);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 5000);
        props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 5000);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5000);
        props.put(ProducerConfig.RETRIES_CONFIG, 4); // Retry number regarding partition count
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
        props.put("metadata.max.idle.ms", 5000);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 30);
        // Kafka requires delivery.timeout.ms >= request.timeout.ms + linger.ms (5030).
        props.put("delivery.timeout.ms", 5120);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> usersKafkaTemplate() {
        return new KafkaTemplate<>(userProducerFactory());
    }

    /**
     * Consumer factory for the users topic. Values are JSON documents
     * produced by UserProducer and must be deserialized back into User.
     */
    @Bean
    public ConsumerFactory<String, User> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, usersGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // BUG FIX: the factory is typed ConsumerFactory<String, User> but was
        // configured with StringDeserializer for values, so every consumed
        // record would actually be a String and listeners expecting User would
        // fail with ClassCastException. Use Spring Kafka's JsonDeserializer
        // targeted at User instead.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, User.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.demo.model");
        // The producer sends plain JSON strings without type headers.
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 5000);
        props.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 5000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 20971520);
        props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 2000);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

View File

@@ -0,0 +1,9 @@
package com.example.demo.configuration;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
// Enables Spring's scheduled-task support so @Scheduled methods
// (e.g. UserService.sendUsersInKafka) are detected and executed.
@Configuration
@EnableScheduling
public class SpringConfiguration {
}

View File

@@ -0,0 +1,17 @@
package com.example.demo.model;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.util.UUID;
// Payload exchanged on the users Kafka topic.
// Lombok generates getters, setters, the no-args and the all-args constructors.
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class User {

    // Unique identifier; also used as the Kafka record key (see UserProducer).
    private UUID id;

    // Display name of the user.
    private String name;
}

View File

@@ -0,0 +1,55 @@
package com.example.demo.producer;
import com.example.demo.model.User;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@Component
@Slf4j
public class UserProducer {

    private final KafkaTemplate<String, String> usersKafkaTemplate;
    private final ObjectMapper objectMapper;
    private final String usersTopicName;

    public UserProducer(KafkaTemplate<String, String> usersKafkaTemplate,
                        ObjectMapper objectMapper,
                        @Value("${kafka.demo.topics.users.name}") String usersTopicName) {
        this.usersKafkaTemplate = usersKafkaTemplate;
        this.objectMapper = objectMapper;
        this.usersTopicName = usersTopicName;
    }

    /**
     * Serializes the user to JSON and publishes it asynchronously on the users
     * topic, keyed by the user id. Failures (serialization or send) are logged
     * and never propagated to the caller.
     */
    public void send(User user) {
        serialize(user)
                .ifPresent(serializedUser -> {
                    log.info("Send user {}", user.getId());
                    usersKafkaTemplate.send(usersTopicName, user.getId().toString(), serializedUser)
                            .completable()
                            .handle((u, throwable) -> {
                                if (throwable != null) {
                                    log.error("Error while sending user in kafka :", throwable);
                                }
                                return null;
                            });
                });
    }

    /**
     * @return the JSON form of the user, or empty when Jackson cannot
     *         serialize it.
     */
    private Optional<String> serialize(User user) {
        try {
            return Optional.of(objectMapper.writeValueAsString(user));
        } catch (JsonProcessingException e) {
            // BUG FIX: the original message claimed the failure meant the user
            // "is null" — but writeValueAsString(null) succeeds ("null"), and a
            // JsonProcessingException can have many causes. Log the actual
            // exception instead of a misleading fixed message.
            log.warn("Unable to serialize User as JSON string", e);
            return Optional.empty();
        }
    }
}

View File

@@ -0,0 +1,27 @@
package com.example.demo.service;
import com.example.demo.model.User;
import com.example.demo.producer.UserProducer;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Service
public class UserService {

    private final UserProducer userProducer;

    public UserService(UserProducer userProducer) {
        this.userProducer = userProducer;
    }

    /**
     * Every two seconds, builds a fresh demo user with a random id and hands
     * it to the producer for publication on the users topic.
     */
    @Async
    @Scheduled(fixedRate = 2000)
    public void sendUsersInKafka() {
        // Lombok's all-args constructor yields the same object as the
        // no-args constructor followed by the two setters.
        User demoUser = new User(UUID.randomUUID(), "User name");
        userProducer.send(demoUser);
    }
}

View File

@@ -0,0 +1,25 @@
# These properties are used to configure the actuator healthCheck.
management:
  endpoint:
    health:
      show-details: always
  health:
    db:
      enabled: false
    diskSpace:
      enabled: false

# Application-specific Kafka settings read by KafkaConfiguration / UserProducer.
kafka:
  demo:
    bootstrapAddress: localhost:9092
    topics:
      users:
        name: users-topic
        groupId: users-groupId

# BUG FIX: the original file left "logging.level.org.apache:" dangling with
# only a commented-out child, so the key resolved to a null logging level.
# Comment out the whole chain instead; uncomment it to silence Kafka client logs.
#logging:
#  level:
#    org:
#      apache:
#        kafka: ERROR

View File

@@ -0,0 +1,13 @@
package com.example.demo;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
// Smoke test: boots the full Spring context and fails if any bean definition
// (Kafka configuration, producer, health indicator, scheduler) cannot be created.
@SpringBootTest
class DemoApplicationTests {

    // Intentionally empty: successful context startup is the assertion.
    @Test
    void contextLoads() {
    }
}