I am running a CDC pipeline with a PostgreSQL source (Debezium Source Connector) and a PostgreSQL target, using the Confluent JDBC Sink Connector (version 10.6.4) to read data from Kafka. The issue I am facing is a persistent consumer lag of approximately 40 messages at a publishing rate of approximately 5 messages per second.
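As a rough sanity check, the message lag can be converted into a time lag. This is only a sketch that assumes the publishing rate is steady; the function name `lag_seconds` is made up for illustration.

```python
def lag_seconds(lag_messages: float, rate_per_second: float) -> float:
    """Approximate how far behind the sink is, assuming a steady publishing rate."""
    if rate_per_second <= 0:
        raise ValueError("rate_per_second must be positive")
    return lag_messages / rate_per_second


# Figures from this question: about 40 messages of lag at about 5 messages per second.
print(lag_seconds(40, 5))  # 8.0
```

That works out to about 8 seconds, which is in the same range as the roughly 10-second delay I observe at the sink.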

The details of what I have done so far are below. I have set up a CDC pipeline to stream database changes from one database to another. Both the source and destination databases are PostgreSQL, serving different microservices. This is the Debezium Source Connector I am using.

[Image: Pipeline]

Here are the relevant configurations (the properties that I have tuned) for the Confluent JDBC Sink Connector:

Connector Properties

  • consumer.override.fetch.min.bytes=101200
  • consumer.override.fetch.max.wait.ms=1000
  • consumer.override.auto.commit.interval.ms=100
  • consumer.override.max.poll.records=5000
  • consumer.override.max.poll.interval.ms=300000
  • consumer.override.offset.flush.interval.ms=200
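To reason about how `fetch.min.bytes` and `fetch.max.wait.ms` interact at this publishing rate, here is a minimal sketch. It relies on the documented Kafka behaviour that the broker holds a fetch request until at least `fetch.min.bytes` are available or `fetch.max.wait.ms` has elapsed. The average message size of 1024 bytes is a hypothetical value (I have not measured it), the helper name is made up, and the model only covers a fetch that starts with no data waiting.

```python
def estimated_fetch_wait_ms(fetch_min_bytes: int,
                            fetch_max_wait_ms: int,
                            rate_per_second: float,
                            avg_message_bytes: float) -> float:
    """Estimate how long the broker may hold one fetch before answering.

    The fetch returns once fetch_min_bytes have accumulated or
    fetch_max_wait_ms has elapsed, whichever happens first.
    """
    bytes_per_ms = rate_per_second * avg_message_bytes / 1000.0
    if bytes_per_ms <= 0:
        # No incoming data: the fetch is held for the full wait time.
        return float(fetch_max_wait_ms)
    time_to_fill_ms = fetch_min_bytes / bytes_per_ms
    return min(float(fetch_max_wait_ms), time_to_fill_ms)


# Settings from the list above; avg_message_bytes=1024 is an assumed value.
print(estimated_fetch_wait_ms(101200, 1000, 5, 1024))  # 1000.0
```

Under these assumptions the byte threshold is never reached within the wait window, so each fetch would be held for the full `fetch.max.wait.ms` of 1000 ms.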

The other cases for which the connector has been tested are shown here:

[Image: other tested cases]

Additional Information:

I have also tried running the Debezium JDBC Sink Connector (version 2.4.1), but faced timeouts ("Commit of offsets timed out").

Configurations for the Debezium JDBC Sink Connector:

  • consumer.override.offset.flush.timeout.ms=5000
  • consumer.override.max.poll.interval.ms=1000
  • consumer.override.enable.auto.commit=true
  • consumer.override.max.poll.records=1
  • consumer.override.auto.commit.interval.ms=10000

Below are the logs of the Debezium JDBC Sink Connector:

{"debug_level":"INFO","debug_timestamp":"2023-11-27 15:38:33,051","debug_thread":"task-thread-debezium_sink_connector3-0","debug_file":"WorkerSinkTask.java", "debug_line":"352","debug_message":"WorkerSinkTask{id=debezium_sink_connector3-0} Committing offsets asynchronously using sequence number 4: {mesh.public.glusr_usr-0=OffsetAndMetadata{offset=141102239, leaderEpoch=null, metadata=''}}"}
{"debug_level":"WARN","debug_timestamp":"2023-11-27 15:38:51,675","debug_thread":"task-thread-debezium_sink_connector3-0","debug_file":"WorkerSinkTask.java", "debug_line":"225","debug_message":"WorkerSinkTask{id=debezium_sink_connector3-0} Commit of offsets timed out"}
{"debug_level":"INFO","debug_timestamp":"2023-11-27 15:39:25,408","debug_thread":"task-thread-debezium_sink_connector3-0","debug_file":"WorkerSinkTask.java", "debug_line":"352","debug_message":"WorkerSinkTask{id=debezium_sink_connector3-0} Committing offsets asynchronously using sequence number 5: {mesh.public.glusr_usr-0=OffsetAndMetadata{offset=141103017, leaderEpoch=null, metadata=''}}"}
{"debug_level":"WARN","debug_timestamp":"2023-11-27 15:39:59,140","debug_thread":"task-thread-debezium_sink_connector3-0","debug_file":"WorkerSinkTask.java", "debug_line":"225","debug_message":"WorkerSinkTask{id=debezium_sink_connector3-0} Commit of offsets timed out"} 

Conclusion

  • I am facing a persistent lag of around 10 seconds at the sink end, even though a single upsert query at the destination database takes only 1 ms.
  • I have tried different Kafka consumer configurations, but the results are not promising.
  • I have observed that changing the configuration affects the consumer lag, but I am unable to derive any relationship between the lag and the configuration.
