
I am working on a Beam application that uses KafkaIO as an input:

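KafkaIO.<Long, GenericRecord>read()
            .withBootstrapServers("bootstrapServers")
            .withTopic("topicName")
            .withConsumerConfigUpdates(confs) // Map<String, Object> of Kafka consumer properties
            .withKeyDeserializer(LongDeserializer.class)
            // GenericRecordDeserializer is a hypothetical name for your Deserializer<GenericRecord> class
            .withValueDeserializer(GenericRecordDeserializer.class)
            .commitOffsetsInFinalize()
            .withoutMetadata();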

I am trying to understand how exactly commitOffsetsInFinalize() works.

How can the streaming job be finalized? The last step in the pipeline is a custom DoFn that writes the messages to DynamoDB. Is there any way to manually call some finalize() method there, so that the offsets are committed after each successful execution of the DoFn?

Also, I am having a hard time understanding the relation between checkpoints and finalization. If checkpointing is not enabled on the pipeline, will I still be able to finalize and get commitOffsetsInFinalize() to work?

P.S. As the pipeline stands now, even with commitOffsetsInFinalize(), every message that is read is committed regardless of whether there is a failure downstream, which causes data loss.

Thank you!

1 Answer


Here, "finalize" refers to the finalization of the checkpoint, that is, the point at which the data has been durably committed into Beam's runtime state (so that a worker failure or reassignment is retried without having to read this message from Kafka again). It does not mean that the data has made its way through the rest of the pipeline.
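To make the ordering concrete, here is a toy model of the semantics described above. It is not Beam code and not how any particular runner (Dataflow, Flink) is implemented; all class names (`ToyKafkaPartition`, `ToyCheckpointMark`, `ToyRunner`) are hypothetical. `ToyCheckpointMark.finalizeCheckpoint()` only plays the role of Beam's `UnboundedSource.CheckpointMark#finalizeCheckpoint()`, which is where `commitOffsetsInFinalize()` commits offsets.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Toy model, NOT Beam code: all class names here are hypothetical.
// It only illustrates the ordering described in the answer.
class ToyKafkaPartition {
    final List<String> log = new ArrayList<>();
    long committedOffset = 0; // where the consumer group would resume reading
}

class ToyCheckpointMark {
    private final ToyKafkaPartition partition;
    private final long nextOffset;

    ToyCheckpointMark(ToyKafkaPartition partition, long nextOffset) {
        this.partition = partition;
        this.nextOffset = nextOffset;
    }

    // Plays the role of UnboundedSource.CheckpointMark#finalizeCheckpoint():
    // with commitOffsetsInFinalize(), this is where the offset gets committed.
    void finalizeCheckpoint() {
        partition.committedOffset = nextOffset;
    }
}

class ToyRunner {
    // Runner-owned durable state; assumed to survive worker failures.
    final Deque<String> durableState = new ArrayDeque<>();

    // Read everything after the committed offset, persist it, then finalize.
    void readAndCheckpoint(ToyKafkaPartition p) {
        for (long i = p.committedOffset; i < p.log.size(); i++) {
            durableState.add(p.log.get((int) i));
        }
        // In this model the commit happens before any downstream step has run.
        new ToyCheckpointMark(p, p.log.size()).finalizeCheckpoint();
    }

    // Downstream step (e.g. the DynamoDB write). On failure the element stays
    // in runner state and is retried from there, not re-read from Kafka.
    boolean processOne(Predicate<String> sink) {
        String element = durableState.peek();
        if (element == null || !sink.test(element)) {
            return false;
        }
        durableState.poll();
        return true;
    }
}
```

In this model a failed downstream write does not roll back the committed offset; recovery relies on the runner retrying from its own state rather than from Kafka.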

  • Thank you robertwb. What happens if no checkpoint is ever configured? What is the default checkpoint mechanism when the application runs on Flink? Also, my understanding is that unless I set ENABLE_AUTO_COMMIT_CONFIG to false, the messages will still be committed regardless of whether commitOffsetsInFinalize() is set. Is that correct? Commented Dec 12, 2020 at 0:14
  • Correct, this is only useful if AUTO_COMMIT is not also set in the Kafka consumer configuration.
    – robertwb
    Commented Dec 14, 2020 at 21:56
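As a sketch of the consumer configuration the comments refer to, the `confs` map passed to `withConsumerConfigUpdates(confs)` could disable the Kafka client's auto-commit explicitly, so that offsets are only committed on checkpoint finalization. The class and method names (`KafkaConsumerConfs`, `forCommitOnFinalize`) and the group id `"my-group"` are hypothetical; the keys are the standard Kafka consumer property names `enable.auto.commit` and `group.id` (the values of `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG` and `ConsumerConfig.GROUP_ID_CONFIG` in kafka-clients).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper building the `confs` map for withConsumerConfigUpdates(confs).
// Literal property names are used so the sketch does not need kafka-clients;
// ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG / GROUP_ID_CONFIG hold the same strings.
class KafkaConsumerConfs {
    static Map<String, Object> forCommitOnFinalize(String groupId) {
        Map<String, Object> confs = new HashMap<>();
        // Turn off the Kafka client's own periodic auto-commit, so offsets are
        // committed only when Beam finalizes a checkpoint.
        confs.put("enable.auto.commit", false);
        // Kafka tracks committed offsets per consumer group, so a group id is needed.
        confs.put("group.id", groupId);
        return confs;
    }
}
```

The map would then be passed as `.withConsumerConfigUpdates(KafkaConsumerConfs.forCommitOnFinalize("my-group"))` alongside `.commitOffsetsInFinalize()`.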
