Publicando mensagens para o Kafka

Após entendermos como o Kafka funciona e como fazê-lo rodar na sua máquina, chegou a hora de você aprender a publicar as suas primeiras mensagens, seus primeiros eventos. Nesse post, vamos aprender as melhores práticas para desenvolver produtores de mensagens, como torná-los testáveis e sem agredir a performance da sua aplicação.

https://github.com/ftathiago/blogdoft-toycode/tree/feature/CreateKafkaProducer

Acesse o código da aplicação de exemplo

Sobre esse assunto, a primeira coisa que você precisa saber é:

Librdkafka: a biblioteca mágica

Essa é uma biblioteca escrita em C/C++ que abstrai todo o trato de conexão, envio e consumo de mensagens com o Kafka. Todo mesmo! Além da conexão com o broker, a librdkafka também provê controle e buffer de envio das mensagens, estratégias de retentativa e reconexão. E não menos importante, também temos interfaces síncronas e assíncronas para o envio das mensagens.

Em palavras diretas, você não precisa se preocupar em utilizar bibliotecas ou estratégias de retry – como as possibilitadas pelo Polly – na sua aplicação. E mais importante: A conexão é thread-safe, ou seja, você pode (e eu diria até que deve!) torná-la disponível como um singleton na sua aplicação. Isso fará como que você ganhe alguns segundos de performance a cada requisição da sua aplicação.

Para extrair o máximo dessa biblioteca, é necessário um mergulho na sua extensa documentação e detalhes de configuração. Não é o escopo, neste momento, abordamos detalhes de configuração. Em momento oportuno, podemos verificar algumas configurações que podem ser interessantes. Por hora, vamos focar no básico viável para produzirmos nossa mensagem.

O mínimo que você precisa saber sobre conexão

Para cada situação, você vai querer configurações diferentes. Em determinados contextos, você vai priorizar o throughput; talvez valorizar tanto ao ponto de não se importar com a idempotência garantida pelo Kafka, ou não se importar com a ordem de publicação das mensagens. Em outras situações, você quer garantir que as mensagens sempre vão chegar na mesma ordem ao broker e processadas na mesma ordem também. A estratégia que vou te explicar agora é das mais simples e conservadoras. Recomendo, novamente, a leitura da documentação antes de subir algo para produção.

Vamos às configurações:

  • Acks: Esta opção determina de quem estamos aguardando as respostas de confirmação de recebimento:
    • All: Todos os brokers ativos devem confirmar o recebimento da mensagem e caso nem todos confirmem, será considerada uma falha;
    • Leader: Espera-se apenas a confirmação por parte do nó líder para considerar a publicação como “bem sucedida”;
    • None: A mensagem é tida como recebida imediatamente após a sua produção; nenhuma retentativa será feita e nenhuma exceção levantada;
  • BootstrapServer: Uma lista de endereços, separada por vírgula, indicando os nós que formam o cluster Kafka. Antigamente era necessário especificar apenas o endereço do Zookeeper. No entanto, essa opção está depreciada dado que em breve o Zookeeper não será mais utilizado;
  • RequestTimeoutMs: Quando enviada a requisição, esta propriedade define quanto tempo (em milissegundos) a aplicação deverá esperar pelo retorno dos Brokers. Em caso de timeout, uma retentativa será executada. O padrão atual é de 30000ms.
  • MessageTimeoutMs: O tempo total (incluindo retentativas) em que uma mensagem deve ser entregue.

Existem outras configurações a serem exploradas. Repare, por exemplo, que não falei de autenticação nesta lista de parâmetros. O Kafka permite várias formas de autenticação: de um simples usuário e senha ao uso de certificados e afins. Veja com o seu vendor quais os meios de autenticação utilizados. E como podem transitar dados sensíveis, usar SSL (no mínimo) é moralmente obrigatório.

Idempotência?

Caso você não se lembre bem o que esta palavra significa, deixa eu te lembrar com um exemplo: O que acontece quando você faz uma requisição HTTP GET /users/123? Não importa quantas vezes você faz a mesma operação: você sempre terá o mesmo resultado. O mesmo nem sempre é esperado de uma requisição HTTP POST, afinal se você repetir essa operação, ou irá criar um novo recurso ou terá um erro de execução (como assim você está inserindo um CPF repetido???).

A idempotência é uma característica desejada pelas aplicações publicadoras e consumidoras. Se o produtor, por algum motivo, enviar a mesma mensagem duas vezes, deseja-se que o broker recuse a segunda tentativa – preferencialmente sem nenhuma mensagem de erro. E o consumer espera que o broker não lhe envie a mesma mensagem duas vezes. Caso o broker não garanta a idempotência, será papel de todas as aplicações pub/sub criarem meios de garanti-la.

O Kafka consegue ser idempotente, no consumo principalmente, com facilidade. Mas sempre há os corner cases, também conhecidas como as “exceção das exceções”; um caso tão raro e atípico que é difícil até de simular em laboratório. Mas que é possível, é. E dependendo da importância desses eventos para o seu negócio, é imprescindível que você também esteja preparado para esses corner cases.

Em termos de configuração, EnableIdempotence = true é o primeiro passo para garantir que a mesma mensagem não será publicada duas vezes. Outras configurações também serão necessárias, mas discutiremos isso em outro momento.

Construindo um provider

Como foi dito há pouco, manter a conexão com o Kafka como um singleton pode melhorar a performance da sua aplicação em alguns segundos por requisição! Todavia, a conexão é feita através do producer. A biblioteca de conexão com o Kafka nos disponibiliza uma interface que abstrai a produção de mensagems: Confluent.Kafka.IProducer<TKey, TValue>. Repare que a interface dispõe de uma generalização para a chave da mensagem e o objeto que está sendo desserializado.

Essa relação direta entre chave e valor é uma das razões que faz necessária a criação de um provider de producers. A combinação entre os recursos do Injetor de Dependências e a aplicação do padrão Singleton na classe provider, garantem o controle

Uma classe provider tem o papel de encapsular toda a complexidade envolvida na criação do producer. Esta complexidade envolve, além da aplicação das configurações de conexão, determinar as classes de serialização/desserilização dos objetos transmitidos, mecanismos de log, manipuladores de erro, estratégias de particionamento. Estas e outras capacidades estão disponíveis através da classe Confluent.Kafka.ProducerBuilder<TKey,TValue>.

Toda a lógica do ProducerBuilder poderia estar encapsulada dentro de um método IServiceCollection.AddSingleton. Porém, é da minha preferência ter uma classe Provider responsável por isso. A clareza de intenção, neste caso, cobra um preço baixo: Apenas que eu implemente o padrão disposable para garantir que o produtor gerado será descartado.

Veja a classe Producer.InfraKafka.Providers.ProducerProvider para ter um exemplo de implementação.

Construindo um producer

Ter o producer injetado diretamente nas classes de serviço, embora possível, é um erro arquitetural. Não se contamina a camada de serviço (que estaria mais próximo a um domínio) com detalhes técnicos. O ideal é isolar das classes de serviço, toda a complexidade de geração da mensagem. Parte dessa complexidade está em preparar os headers da mensagem, possibilitando a continuidade do tracing do processamento, por exemplo. Outro ponto é que o Kafka faz uso de um tipo particular de mensagem. Ter os seus objetos de evento também acoplados ao Kafka não faz muito sentido. Por isso gosto de ter uma classe producer.

Esta classe, além de converter o evento recebido num objeto Message<TKey, TValue>, encapsula a lógica de serialização (se você não tiver utilizado um serializador na configuração do ProducerBuilder) e a confirmação do envio para o tópico, além de manipular qualquer exceção. É uma oportunidade de você criar dispositivos que garantam a já falada idempotência.

Na classe de exemplo, implementamos um exemplo simples, que apenas serializa a mensagem e adiciona um correlationId como header. Em produção você poderia, por exemplo, serializar o seu objeto, seguindo alguma especificação de schema. Ainda no exemplo proposto, estou publicando a entidade diretamente. Esta opção é por pura objetividade. Sempre que possível, crie classes – ou qualquer outra estrutura de dados – que representem os seus eventos. Especialmente se o schema dos dados e do evento não estão fortemente ligados.

O exemplo está implementado na classe  Producer.InfraKafka.Producers.SaleEventPublisher. E eu quero que você também se atente ao detalhe de que, para a camada de negócio, a metáfora que utilizamos é a de publicação de eventos – que é o que faz sentido para o negócio. Já no detalhe técnico do Kafka, adotamos uma implementação mais próxima da linguagem técnica do Kafka.

Apêndices do projeto

Sim, é apenas isso que você precisa para publicar objetos no Kafka. Porém, você deve estar se perguntando o que são todos esses penduricalhos na aplicação de exemplo. Na classe SaleEventProducer, por exemplo, há um método que pode parecer inútil:

namespace Producer.InfraKafka.Producers
{
     public class SaleEventProducer : IEvent, IWarmUpCommand
     {
         Task IWarmUpCommand.Execute() =>
             Task.CompletedTask;
...

Este método satisfaz a interface IWarmupCommand, cuja função é diminuir os impactos da famosa “primeira requisição”. Esta é uma das práticas que estão em estudos/teste. E assim que eu tiver resultados mais apurados, pode deixar que eu conto aqui 😉

Outro detalhe que eu quero chamar a sua atenção é para a implementação das classes ServicesExtension. Dá um certo trabalho implementar as dependências diretamente em cada camada? Com certeza dá. Mas o que se ganha em organização e desacoplamento, compensa o esforço. Para entender melhor como eu gosto de organizar os meus projetos, dê uma olhada neste post sobre Como estruturar o seu projeto em C#

Por fim, adicionei ao projeto um docker-compose. Com um simples comando é possível levantar toda as dependências de infra do projeto e algumas mais. Temos o Zookeeper e o Kafka. A porta 9093 deve ser utilizada para conexões externas a rede criada no docker e a 9092 para conexões internas. Também temos uma imagem do SQLServer, que cria um banco de dados de exemplo. E por fim, temos a ELK stack configurada para prover APM da API. Vale muito a pena brincar um pouco com isso também (artigos em breve!).

Como também incluí uma imagem Docker da API, ao tentar levantar o docker-compose, utilize a opção --build.

docker-compose up --build

Espero que o projeto possa ser útil e te servir de inspiração para os seus próximos projetos. Nos próximos artigos, vamos aprofundar alguns conceitos que abordamos superficialmente, mas principalmente: vamos implementar os consumers!

Até lá!

Deixe um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

Esse site utiliza o Akismet para reduzir spam. Aprenda como seus dados de comentários são processados.