Como configurar um consumer para o Kafka

Das principais vantagens do Kafka, uma delas é o consumer receber uma mensagem uma única vez. Isso nos livra de ter de controlar a idempotência do processamento manual. No entanto, para ter acesso a essa vantagem, se faz necessário a correta configuração do seu consumer. E é sobre isso que vamos falar hoje: Como configurar um consumer para o Kafka.

Como assim uma única vez?

Essa é uma das maiores preocupações em quando se trabalha com mensageria. Talvez você esteja implementando um sistema de notificações. E vamos concordar que, em alguns casos, receber dois e-mails (ou até nenhum) pode não gerar maiores problemas. Agora, se estivermos falando de um sistema financeiro, é importantíssimo que as informações de saque/depósito sejam processadas apenas uma vez. Do contrário, os efeitos podem ser catastróficos!

Por isso a preocupação em receber a mensagem uma única vez. Acontece que a nossa preocupação não deveria ser apenas esta. O que realmente desejamos é que uma mensagem seja processada apenas uma única vez. E em alguns casos, na exata ordem dos eventos. Neste cenário se torna imprescindível que recebamos a mesma mensagem tantas vezes o processamento dela falhe. E quando o processamento for finalizado com sucesso, nunca mais receber a mesma mensagem.

Você pode adotar uma estratégia de validar se as mensagens já foram processadas – com algum tipo de persistência de chaves – ou ainda partir para uma estratégia de persistir localmente as mensagens recebidas e criar uma máquina de estado que seja capaz de processar as mensagens do banco. São estratégias válidas, mas que com o Kafka, podem representar um overengineering. As configurações que vou apresentar a seguir buscam alcançar esse fluxo de “Consumo – Processamento – Commit”.

EnableAutoOffsetStore = false

Para o cenário em que eu preciso consumir a mensagem, processar a mensagem e confirmar o processamento, algumas configurações do consumidor precisam ser alteradas. A primeira delas é EnableAutoOffsetStore. Como você já sabe, para cada consumer-group há uma espécie de ponteiro dizendo qual é a próxima mensagem a ser consumida. Esse ponteiro é o offset. Quando consumimos uma mensagem, automaticamente o Kafka passa a apontar para o próximo offset da partição. Com EnableAutoOffsetStore= false, nós alteramos esse comportamento e passamos a controlar quando o offset é atualizado. Neste caso, preferencialmente, após o final do processamento bem-sucedido.

var consumerResult = _consumer.Consume(token);
await messageHandler(
    consumerResult.Message.Headers.ToDictionary(),
    consumerResult.Message.Value);
_consumer.StoreOffset(consumerResult);
_consumer.Commit();

Com esse código garantimos que, caso messageHandler levante alguma exceção, o offset da partição não será atualizado. Mas só isso não é suficiente para garantir que a mensagem será consumida apenas uma vez.

O valor padrão desta propriedade é true.

ATENÇÃO! Você pode pensar que pelo simples fato de não chamar StoreOffset, uma chamada a Consume() irá retornar a mesma mensagem. Isso não é necessariamente verdade. Uma nova chamada a Consume() irá trazer a próxima mensagem (se houver) da fila. Para obter esse mesmo comportamento, seria preciso desconectar do broker e fazer uma nova conexão. Você pode fazer isso de várias formas: desde simplesmente destruir a conexão atual e solicitar uma nova (nesse caso, o consumer não pode ser singleton) ou se estiver trabalhando com Kubernetes, finalizar a aplicação. Por isso é importantíssimo ter bem claro o motivo e o comportamento da mensagem em caso de falha. Quero tentar novamente? Não guarde o offset. Não quero tentar novamente (a mensagem está inválida talvez)? Defina uma estratégia de descarte. Geralmente usa-se dead letter queue.

EnableAutoCommit = false

Só a chamada para StoreOffset não é suficiente. É preciso que as alterações sejam confirmadas no Kafka. Se isso não acontecer, caso um rebalance aconteça, o novo consumidor irá consumir as mensagens cujo Offset não foi atualizado. O EnableAutoCommit = true é indicado para casos em que o rebalance não acontece com frequência e se acontecer, o possível duplo consumo da mensagem manterá a operação idempotente. Nesta opção, o commit é feito automaticamente pela biblioteca e em background, no intervalo de tempo determinado pela propriedade `AutoCommitIntervalMs`.

Já com a chave desligada, o commit não acontecerá automaticamente. Desta forma você precisará fazer uma chamada para _consumer.Commit() a cada requisição – ou em qualquer outra frequência que você achar cabível. No nosso caso, preferimos fazer a chamada a Commit() para cada mensagem consumida.

O valor padrão para esta propriedade é true.

Com essas duas propriedades conseguimos alterar o fluxo das mensagens. Agora sim conseguimos implementar um consumer-> process -> commit. Algo semelhante ao Acknowledge presente em outros message brokers. Ainda assim, temos um ponto que estamos deixando passar batido: E se a aplicação “morrer” durante o processamento ou entrar em um estado instável que não a permita confirmar o fim do processamento da mensagem?

Estratégias de retry e timeout no processamento das mensagens no Kafka

Não é incomum que implementemos estratégias de retry nas nossas aplicações. Uma das estratégias mais comuns é a de configurar um número máximo de retentativas em um intervalo de tempo crescente. Se dissermos que um processamento deve tentar 4 vezes, com incremento baseado em Fibonacci, teríamos:

  • Tentativa 1: 1 segundo
  • Tentativa 2: 2 segundos
  • Tentativa 3: 3 segundos
  • Tentativa 4: 5 segundos
  • Falha

O tempo de execução desse processamento, no pior caso e até falhar, levaria 11 segundos. Mas consideremos que cada uma dessas tentativas faça uma requisição HTTP que leva 30 segundos até retornar um erro de timeout. Assim teríamos:

  • Tentativa 1: 30 + 1 segundo
  • Tentativa 2: 30 + 2 segundos
  • Tentativa 3: 30 + 3 segundos
  • Tentativa 4: 30 + 5 segundos
  • Falha

Neste caso, teríamos um tempo de processamento de 2 minutos e 11 segundos. Agora imagine que você iniciou no seu prompt de comando o consumo das mensagens e está esperando o fim do processamento. O que você faria após 1 minuto sem a aplicação responder? Provavelmente você cancelaria a execução e tentaria novamente. Você o Kafka tem algo em comum.

MaxPollIntervalMs

O Kafka observa o tempo entre cada chamada a Consume() – que internamente é traduzir para a biblioteca Librdkafka como uma chamada Poll(). Caso esse tempo exceda o valor configurado na propriedade, o consumo será considerado como perdido (sua aplicação falhou) e o Kafka irá reatribuir a partition para outro consumidor.

O valor dessa propriedade é muito importante porque ela dá ao Kafka a informação de falha no processamento. Uma configuração equivocada dessa mensagem, causando a má interpretação de que a aplicação está fora do ar, poderá levar ao reprocessamento da última mensagem. Por isso, ao implementar estratégias de retry na sua aplicação, fique atento se a soma de todo o tempo não irá extrapolar o valor de MaxPollIntervalMs.

O valor padrão dessa propriedade é 300000 (ou 5 minutos).

HeartbeatIntervalMs

Além do tempo entre as requisições de mensagens, o Kafka também utiliza outra informação para verificar se a aplicação está no ar. É o famoso heart beat. É uma chamada bastante leve que serve apenas para confirmar que as aplicações estão respondendo. Esta propriedade especifica o tempo entre cada “batida do coração”.

O valor padrão dessa propriedade é 3000 (3 segundos).

SessionTimeoutMs

É uma variável com importância diretamente ligada a HeartbeatIntervalMs. Apenas para continuar na metáfora do coração, imagine que a aplicação pode “sofrer de arritmia”, demorando um pouco para enviar o heartbeat para o Kafka. A propriedade SessionTimeoutMs estabelece um tempo de tolerância. Caso a negociação do heartbeat não aconteça no tempo determinado, a conexão com o Kafka é encerrada e um rebalance acontece.

O valor padrão dessa propriedade é de 10000 (10 segundos).

ATENÇÃO! Na versão 1.7.0 da Confluent.Kafka não é feita nenhuma validação da relação entre as propriedades HeartbeatIntervalMs e SessionTimeoutMs. Se você configurar valores muito próximos ou até inválidos, você poderá ter constantes erros:
Aplicação:
     _configuration.HeartbeatIntervalMs = 11000;
     _configuration.SessionTimeoutMs = 10000;
Log: 
 [thrd:main]: Consumer group session timed out (in join-state steady) after 10093 ms without a successful response from the group coordinator (broker 1001, last error was Success): revoking assignment and rejoining group

Tudo isso é o suficiente?

Essas configurações ajudam bastante do lado do consumer. É impressionante o quanto o Kafka facilita a nossa vida. Mas nem tudo se resolve apenas nele. Ainda existem alguns corner cases que são difíceis de reproduzir e raros de acontecer, mas que ainda assim existem em teoria. Por exemplo: o que acontece se a energia cair entre a finalização do processamento e a chamada a StoreOffset()? Em casos como esse a solução está no desenho arquitetural e não em alguma simples configuração de biblioteca e/ou código. É preciso medir, portanto, os custos e benefícios de manter um processo 100% à prova de falhas. Ou melhor: falhar rápido e voltar rápido.

Existem muitas outras configurações disponíveis para o Kafka. Algumas configuram os tempos de retry da aplicação, outras permitem que você resgate do servidor mais de uma mensagem – diminuindo a latência do consumo – e muitas mais. Eu aconselho que você visite a página com a lista de configurações da biblioteca e faça as combinações que melhor se aplicarem para o seu cenário.

E com base nesses comentários todos, no próximo artigo vamos realmente implementar o nosso consumer.

Deixe um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

Esse site utiliza o Akismet para reduzir spam. Aprenda como seus dados de comentários são processados.