Kafka Guide
Kafka Guide
Kafka Guide
Architettura
dei Sistemi
Software
Comunicazione asincrona:
Kafka
dispensa asw840 When you come out of the storm,
you won’t be the same person
marzo 2021 who walked in.
That’s what this storm’s all about.
Haruki Murakami (Kafka on the Shore)
1 Comunicazione asincrona: Kafka Luca Cabibbo ASW
- Riferimenti
Luca Cabibbo. Architettura del Software: Strutture e Qualità. Edizioni
Efesto, 2021.
Capitolo 25, Comunicazione asincrona
Argomenti
introduzione a Kafka
esempi
discussione
* Introduzione a Kafka
Apache Kafka è una piattaforma distribuita per lo streaming
Kafka fornisce tre capacità fondamentali
publish-subscribe su stream (flussi) di record – ovvero, è in
grado di agire da message broker
memorizzazione di stream di record in modo duraturo e
tollerante ai guasti
elaborazione di stream di record, mentre occorrono (mentre
vengono prodotti)
Produttori e consumatori
Produttori e consumatori (e flussi di record)
Kafka
Topic e partizioni
Un topic, con le sue partizioni e i suoi record
record (the content
Topic: alpha is not shown)
1 1
Partition 0 0 1 2 3 4 5 6 7 8 9
0 1
8 offset
Partition 1 0 1 2 3 4 5 6 7
within partition
(e.g., 9)
1
Partition 2 0 1 2 3 4 5 6 7 8 9 each new record
0
is appended to a
partition
Old New
nota: i record mostrati in questa figura sono tutti distinti tra loro
– all’interno è mostrato l’offset del record, non il contenuto
8 Comunicazione asincrona: Kafka Luca Cabibbo ASW
Produttori
Ogni produttore, durante la sua esistenza, può pubblicare molti
record sui topic che vuole
ciascun record pubblicato su un topic viene appeso a una sola
delle partizioni del topic
la scelta della partizione può avvenire in modalità round-
robin oppure utilizzando una qualche funzione di
partizionamento semantico
Consumatori e gruppi
Ogni consumatore (istanza di consumatore), per ricevere i record
di un topic, deve abbonarsi al topic
quando un consumatore si abbona a un topic, lo fa
specificando il nome del suo consumer group (gruppo) – che
viene utilizzato da Kafka per la distribuzione dei record del topic
ai consumatori abbonati al topic
Kafka distribuisce i record del topic consegnando ciascun
record pubblicato sul topic a un consumatore (istanza di
consumatore) per ciascuno dei gruppi
ogni record di un topic viene dunque consegnato a molti
consumatori – viene consegnato a tutti i gruppi, e
precisamente a un solo consumatore per ciascun gruppo
nell’ambito di un gruppo, i diversi record di un topic vengono
in genere consegnati a consumatori differenti di quel gruppo
(e non tutti a uno stesso consumatore)
Consumatori e gruppi
Kafka distribuisce i record di un topic consegnando ciascun record
pubblicato sul topic a un solo consumatore (istanza di
consumatore) per gruppo
consideriamo alcuni consumatori per un topic suddivisi su più
gruppi
Group A A1
Group B B1 B2 B3
Group C C1 C2
- Un produttore e un consumatore
Consideriamo ora un semplice esempio, con un produttore e un
consumatore, che si scambiano messaggi testuali su un topic
(alpha)
realizziamo il produttore come un’applicazione Spring Boot, il
cui package di base è asw.kafka.simpleproducer
realizziamo anche il consumatore come un’altra applicazione
Spring Boot, il cui package di base è
asw.kafka.simpleconsumer
utilizziamo il progetto Spring for Apache Kafka, che semplifica
l’accesso a Kafka, mediante l’uso di template
va utilizzata la dipendenza starter
org.springframework.kafka:spring-kafka
Produttore
Ecco una porzione di esempio del produttore “finale”
è qui che va definita la logica applicativa del produttore
package asw.kafka.simpleproducer.domain;
import ...;
@Component
public class SimpleProducerRunner implements CommandLineRunner {
@Autowired
private SimpleProducerService simpleProducerService;
public void run(String[] args) {
String message = ... produce un messaggio message ...
simpleProducerService.publish(message);
}
Qui va definita la
}
logica applicativa del
produttore.
package asw.kafka.simpleproducer.domain;
public interface SimpleMessagePublisher {
public void publish(String message);
}
Produttore
Per consentire l’invio di messaggi su Kafka è necessario un
outbound adapter (messagepublisher) per Kafka
ecco la sua implementazione
package asw.kafka.simpleproducer.messagepublisher;
import asw.kafka.simpleproducer.domain.SimpleMessagePublisher;
import org.springframework.kafka.core.KafkaTemplate;
import ...
@Component
public class SimpleMessagePublisherImpl
implements SimpleMessagePublisher {
... vedi dopo ...
} in rosso indichiamo
il codice relativo a
Kafka
@Value("${asw.kafka.channel.out}")
private String channel;
# application.properties
asw.kafka.channel.out=alpha
@Autowired
private KafkaTemplate<String, String> template;
public void publish(String message) {
template.send(channel, message);
}
Produttore
Un’occhiata al file application.properties
nome del canale su cui inviare messaggi
# NON ESEGUIRE COME APPLICAZIONE WEB
spring.main.web-application-type=NONE
# MESSAGING
asw.kafka.channel.out=alpha nome del gruppo del componente
asw.kafka.groupid=simple-producer (irrilevante in questo caso)
# KAFKA
spring.kafka.bootstrap-servers=10.11.1.121:9092
spring.kafka.producer.key-serializer=
org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=
org.springframework.kafka.support.serializer.JsonSerializer
Consumatore
Per consentire la ricezione di messaggi da Kafka è necessario un
inbound adapter (messagelistener) per Kafka
ecco la sua implementazione
package asw.kafka.simpleconsumer.messagelistener;
import asw.kafka.simpleconsumer.domain.SimpleConsumerService;
import org.springframework.kafka.annotation.KafkaListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import ...
@Component
public class SimpleMessageListener {
... vedi dopo ...
}
Consumatore
Per consentire la ricezione di messaggi da Kafka è necessario un
inbound adapter (messagelistener) per Kafka
l’annotazione @KafkaListener, gestita dal framework Spring,
svolge quasi tutto il lavoro
all’avvio dell’applicazione, Spring richiede a Kafka di
abbonare questo consumatore (istanza di consumatore) ai
topic elencati (in questo caso, al solo topic alpha) usando il
gruppo specificato (in questo caso, simple-consumer) – in
corrispondenza, Kafka gli assegna (dinamicamente) zero,
una o più partizioni del topic alpha
per ogni messaggio pubblicato su una di queste partizioni
del topic alpha, Kafka (tramite Spring) consegna il
messaggio a questo consumatore (istanza di consumatore),
invocando proprio il metodo listen annotato con
@KafkaListener (consumo in modalità “subscription”)
Consumatore
Nel file application.properties si noti anche la proprietà
spring.kafka.consumer.auto-offset-reset
questa proprietà consente di regolare gli aspetti temporali della
consegna di messaggi a un gruppo di consumatori su un topic
il valore latest specifica che i consumatori di quel gruppo
debbano ricevere solo i messaggi pubblicati sul topic dal
momento del loro abbonamento – escludendo quelli
pubblicati prima dell’inizio dell’abbonamento
il valore earliest specifica invece che i consumatori di quel
gruppo debbano ricevere tutti i messaggi pubblicati sul topic
– compresi quelli pubblicati in passato, anche prima del loro
abbonamento
conseguenze
C riceve N messaggi (nell’ordine in cui sono stati inviati)
1 2 3 4
P
1 2 3 4
C
conseguenze
C riceve N messaggi (non necessariamente nell’ordine in cui
sono stati inviati)
1 2 3 4
P
1 2 4 3
C
conseguenze
C riceve N messaggi (non necessariamente nell’ordine in cui
sono stati inviati)
1 2 3 4
P
1 3 2 4
C
conseguenze
C non riceve alcun messaggio
1 2 3 4
P
conseguenze
C riceve N1+N2 messaggi
1 2 3 4
P1
A B C
P2
1 A 2 3 B C 4
C
conseguenze
il consumatore C1 riceve X messaggi
l’altro consumatore C2 riceve gli altri N-X messaggi
1 2 3 4 5 6
P
1 2 4 6
C1
3 5
C2
conseguenze
il consumatore C1 riceve tutti i messaggi
l’altro consumatore C2 non riceve alcun messaggio
1 2 3 4 5 6
P
1 2 3 4 5 6
C1
C2
conseguenze
il consumatore C1 riceve X messaggi
l’altro consumatore C2 riceve gli altri N1+N2-X messaggi
1 2 3 4
P1
A B C
P2
1 B C 4
C1
A 2 3
C2
41 Comunicazione asincrona: Kafka Luca Cabibbo ASW
conseguenze
ciascuno dei consumatori C1 e C2 riceve N messaggi
1 2 3 4 5 6
P
1 2 3 4 5 6
C1
1 2 3 4 5 6
C2
conseguenze
il consumatore C1 riceve N messaggi
C2’ riceve X messaggi, C2’’ riceve gli altri N-X messaggi
1 2 3 4 5 6
P
1 2 3 4 5 6
C1
1 2 4 6
C2’
3 5
C2’’
43 Comunicazione asincrona: Kafka Luca Cabibbo ASW
Filtro
Il filtro definisce un servizio per l’elaborazione di messaggi testuali
è qui che va definita la logica applicativa del filtro
package asw.kafka.simplefilter.domain;
import org.springframework.stereotype.Service;
import org.springframework.beans.factory.annotation.Autowired
@Service
public class SimpleFilterService {
@Autowired
private SimpleMessagePublisher simpleMessagePublisher;
public void filter(String inMessage) {
String outMessage = ... elabora il messaggio message ricevuto ...
simpleMessagePublisher.publish(outMessage);
}
} Qui va definita la
logica applicativa del
filtro.
package asw.kafka.simplefilter.domain;
public interface SimpleMessagePublisher {
public void publish(String message);
}
Filtro
L’implementazione dell’outbound adapter (messagepublisher)
è come per il produttore – in questo caso cambia il package e il
file di configurazione application.properties
package asw.kafka.simplefilter.messagepublisher;
import asw.kafka.simplefilter.domain.SimpleMessagePublisher;
import org.springframework.kafka.core.KafkaTemplate;
import ...
@Component
public class SimpleMessagePublisherImpl
implements SimpleMessagePublisher {
... vedi dopo ...
}
Filtro
L’implementazione dell’inbound adapter (messagelistener)
è simile a quella del consumatore – cambia il package, il file di
configurazione application.properties, e soprattutto il servizio
invocato quando viene ricevuto un messaggio
package asw.kafka.simplefilter.messagelistener;
import asw.kafka.simplefilter.domain.SimpleFilterService;
import org.springframework.kafka.annotation.KafkaListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import ...
@Component
public class SimpleMessageListener {
... vedi dopo ...
}
Filtro
Un’occhiata al file application.properties
# NON ESEGUIRE COME APPLICAZIONE WEB
spring.main.web-application-type=NONE
# MESSAGING
asw.kafka.channel.in=alpha
asw.kafka.channel.out=beta
asw.kafka.groupid=simple-filter
# KAFKA
spring.kafka.bootstrap-servers=10.11.1.121:9092
spring.kafka.consumer.group-id=${asw.kafka.groupid}
# spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=
org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=
org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.key-serializer=
org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=
org.springframework.kafka.support.serializer.JsonSerializer
52 Comunicazione asincrona: Kafka Luca Cabibbo ASW
- Il servizio restaurant-service
Consideriamo ora il servizio restaurant-service per la gestione di
un insieme di ristoranti – nell’ambito di un’applicazione efood per
la gestione di un servizio di ordinazione e spedizione a domicilio di
pasti da ristoranti, su scala nazionale – già introdotto in una
dispensa precedente
i ristoranti sono definiti come un’entità JPA Restaurant – con
attributi id, name e location
la gestione dei ristoranti avviene tramite il servizio
RestaurantService
Servizio restaurant-service e
comunicazione asincrona
Ecco alcune possibili applicazioni della comunicazione asincrona
per il servizio restaurant-service – nel contesto dell’applicazione
efood, in cui ci sono diversi servizi applicativi
pubblicazione di eventi relativi a cambiamenti di stato avvenuti
in questo servizio
altri servizi potrebbero essere interessati a questi eventi, per
poter eseguire delle azioni in corrispondenza al loro
verificarsi
ascolto di eventi pubblicati da altri servizi applicativi
questo servizio potrebbe essere interessato a tali eventi, per
poter eseguire delle azioni in corrispondenza al loro
verificarsi
Pubblicazione di eventi
Il servizio per la gestione dei ristoranti può pubblicare eventi di
dominio mediante un outbound adapter (eventpublisher)
questo richiede
la definizione degli eventi del dominio dei ristoranti
la specifica del canale su cui scambiare gli eventi del
dominio dei ristoranti
la definizione di un’interfaccia per l’adapter eventpublisher e
la sua implementazione
l’utilizzo dell’adapter eventpublisher – ad es., da parte del
servizio RestaurantService
package asw.efood.common.api.event;
public interface DomainEvent {
}
Pubblicazione di eventi
La definizione degli eventi del dominio dei ristoranti
l’evento di dominio RestaurantCreatedEvent
package asw.efood.restaurantservice.api.event;
import asw.efood.common.api.event.DomainEvent;
public class RestaurantCreatedEvent implements DomainEvent {
private Long id;
private String name;
private String location;
… costruttori e metodi get, set e toString …
}
Pubblicazione di eventi
La definizione di un’interfaccia per l’adapter eventpublisher e la
sua implementazione
package asw.efood.restaurantservice.domain;
import asw.efood.common.api.event.DomainEvent;
public interface RestaurantDomainEventPublisher {
public void publish(DomainEvent event);
}
Pubblicazione di eventi
L’utilizzo dell’adapter eventpublisher – ad es., da parte del
servizio RestaurantService
sono evidenziate in rosso le differenze rispetto alla versione
precedente del servizio
package asw.efood.restaurantservice.domain;
import …
@Service
@Transactional
public class RestaurantService {
@Autowired
private RestaurantRepository restaurantRepository;
@Autowired
private RestaurantDomainEventPublisher domainEventPublisher;
... vedi dopo ...
}
Ricezione di comandi
Il servizio per la gestione dei ristoranti può ricevere comandi per le
proprie operazioni mediante un inbound adapter
(commandlistener)
questo richiede
la definizione dei comandi del servizio dei ristoranti
la specifica del canale su cui scambiare i comandi del
servizio dei ristoranti
l’implementazione di un command handler (gestore dei
comandi) per il servizio dei ristoranti
l’implementazione dell’adapter commandlistener
package asw.efood.common.api.command;
public interface Command {
}
Ricezione di comandi
La definizione dei comandi del servizio dei ristoranti
il comando CreateRestaurantCommand
package asw.efood.restaurantservice.api.command;
import asw.efood.common.api.command.Command;
public class CreateRestaurantCommand implements Command {
private String name;
private String location;
… costruttori e metodi get, set e toString …
}
Ricezione di comandi
L’implementazione di un command handler per il servizio dei
ristoranti
definisce il metodo pubblico onCommand per la gestione dei
comandi
package asw.efood.restaurantservice.domain;
import asw.efood.common.api.command.Command;
import asw.efood.restaurantservice.api.command.*;
import …
@Service
public class RestaurantCommandHandler {
@Autowired
private RestaurantService restaurantService;
public void onCommand(Command command) {
... vedi dopo ...
}
}
68 Comunicazione asincrona: Kafka Luca Cabibbo ASW
Ricezione di comandi
L’implementazione di un command handler per il servizio dei
ristoranti
definisce il metodo pubblico onCommand per la gestione dei
comandi
public void onCommand(Command command) {
if (command.getClass().equals(CreateRestaurantCommand.class)) {
CreateRestaurantCommand c = (CreateRestaurantCommand) command;
this.createRestaurant(c);
} else if (command.getClass().equals(AnotherOpCommand.class)) {
AnotherOpCommand c = (AnotherOpCommand) command;
this.anotherOp(c);
} else {
... unknown command ...
}
}
69 Comunicazione asincrona: Kafka Luca Cabibbo ASW
Ricezione di comandi
L’implementazione di un command handler per il servizio dei
ristoranti
inoltre, definisce un metodo di supporto per ciascuno dei
comandi
private void createRestaurant(CreateRestaurantCommand command) {
restaurantService.createRestaurant(
command.getName(),
command.getLocation()
);
}
Configurazione
Un’occhiata al file application.properties del servizio dei ristoranti
– limitatamente alla configurazione di Kafka
# KAFKA
spring.kafka.bootstrap-servers=10.11.1.121:9092
spring.kafka.consumer.group-id=${spring.application.name}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=
org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=
org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.key-deserializer=
org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=
org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
- Discussione
Ecco alcune considerazioni sull’utilizzo di Kafka
consente di inviare e ricevere messaggi (record) tramite canali
(topic) – con un modello basato su gruppi che generalizza
quello dei canali point-to-point e publish-subscribe
con riferimento all’architettura esagonale
l’invio di messaggi, da parte di un produttore, richiede la
definizione di un outbound adapter
la ricezione di messaggi, da parte di un consumatore,
richiede la definizione di un inbound adapter
un componente può agire sia da produttore che da
consumatore di messaggi
i messaggi scambiati possono essere documenti, eventi di
dominio e comandi – ciascuna tipologia di essi richiederà un
canale specifico
* Discussione
In questa dispensa abbiamo presentato Apache Kafka come
piattaforma per la comunicazione asincrona
Kafka consente di agire da message broker – ovvero supporta
il pattern publish-subscribe per la trasmissione di stream di
record (flussi di messaggi)
i canali (chiamati topic e organizzati in partizioni) consentono di
pubblicare e di ricevere record
i produttori possono pubblicare flussi di messaggi (record) su
uno o più topic
i consumatori possono ricevere flussi di messaggi (record) da
uno o più topic
i consumatori di messaggi sono organizzati in gruppi – utilizzati
per la distribuzione dei messaggi ai consumatori – secondo un
modello che generalizza quello dei canali point-to-point e
publish-subscribe
76 Comunicazione asincrona: Kafka Luca Cabibbo ASW