태그 보관물: apache-kafka

apache-kafka

메시지 처리에 실패하면 동일한 메시지를 다시 사용하십시오.

Confluent.Kafka .NET 클라이언트 버전 1.3.0을 사용하고 있습니다. 나는 문서를 따르고있다 :

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "server1, server2",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = true,
    EnableAutoOffsetStore = false,
    GroupId = this.groupId,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = this.kafkaUsername,
    SaslPassword = this.kafkaPassword,
};

using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
    var cancellationToken = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cancellationToken.Cancel();
    };

    consumer.Subscribe("my-topic");
    while (true)
    {
        try
        {
            var consumerResult = consumer.Consume();
            // process message
            consumer.StoreOffset(consumerResult);
        }
        catch (ConsumeException e)
        {
            // log
        }
        catch (KafkaException e)
        {
            // log
        }
        catch (OperationCanceledException e)
        {
            // log
        }
    }
}

문제는 내가 줄을 주석으로 경우에도 있다는 것입니다 consumer.StoreOffset(consumerResult);, 나는 다음 사용되지 않은 메시지를 내가 다음 번가 계속 소모를 즉, 유지 오프셋 (offset)는, 즉 무엇 문서가 요구 될 것 같지 않는 증가 적어도 하나의 전달 .

심지어 내가 설정하는 경우 EnableAutoCommit = false와 제거 ‘EnableAutoOffsetStore = 거짓’는 설정에서, 그리고 교체 consumer.StoreOffset(consumerResult)consumer.Commit()내가 주석 즉하더라도, 난 여전히 같은 행동을보고 Commit, 나는 아직도 다음 사용되지 않은 메시지가 계속.

나는 여기에 근본적인 것이 빠져 있다고 생각하지만 무엇을 알 수는 없습니다. 도움을 주셔서 감사합니다!



답변

댓글을 아직 추가 할 수 없습니다. Kafka 소비자는 메시지를 일괄 처리하므로 백그라운드 스레드에서 미리 가져온 일괄 처리를 반복 할 수 있습니다 .

kafka util을 사용하여 소비자가 실제로 오프셋을 커밋하는지 여부를 확인할 수 있습니다 kafka-consumer-groups.sh

kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group consumer_group  --describe


답변

5 번과 같이 정해진 횟수만큼 각 메시지를 처리하기위한 재시도 로직을 원할 수 있습니다. 5 번의 재시도 중에 성공하지 못하면이 메시지를 다른 주제에 추가하여 모든 메시지를 처리 ​​할 수 ​​있습니다 실제 주제보다 우선하는 실패한 메시지 또는 실패한 메시지를 동일한 주제에 추가하여 다른 모든 메시지가 사용 된 후에 나중에 선택되도록 할 수 있습니다.

5 번의 재시도 내에서 메시지 처리에 성공하면 큐의 다음 메시지로 건너 뛸 수 있습니다.


답변