I am running a CDC pipeline with a PostgreSQL source (Debezium Source Connector) and a PostgreSQL target using the Confluent JDBC Sink Connector (version 10.6.4) to read data from Kafka. The issue I am facing is a persistent lag of approximately 40 messages, with a publishing rate of approx 5 messages per second.

Please find the details here for what all I have done till now. I have setup up a CDC pipeline to stream database changes from one database to another. Both Source and destination DB are PostgresSQL serving different Microservices. This is the Debezium Source Connector I am using.

Pipeline enter image description here

Here are the relevant configurations(properties that I have tuned) for the Confluent JDBC Sink Connector:

Connector Properties

  • consumer.override.fetch.min.bytes=101200
  • consumer.override.fetch.max.wait.ms=1000
  • consumer.override.auto.commit.interval.ms=100
  • consumer.override.max.poll.records=5000
  • consumer.override.max.poll.interval.ms=300000
  • consumer.override.offset.flush.interval.ms=200

Please find the other cases for which connector has been tested enter image description here

Additional Information:

I have tried running Debezium Sink Connectoras well (version 2.4.1) faced timeouts (Commit of offsets timed out).

  • Configurations for Debezium JDBC Sink Connector:
  • consumer.override.offset.flush.timeout.ms=5000
  • consumer.override.max.poll.interval.ms=1000
  • consumer.override.enable.auto.commit=true
  • consumer.override.max.poll.records=1
  • consumer.override.auto.commit.interval.ms=10000

Below are the logs of Debezium Sink Connector

{"debug_level":"INFO","debug_timestamp":"2023-11-27 15:38:33,051","debug_thread":"task-thread-debezium_sink_connector3-0","debug_file":"WorkerSinkTask.java", "debug_line":"352","debug_message":"WorkerSinkTask{id=debezium_sink_connector3-0} Committing offsets asynchronously using sequence number 4: {mesh.public.glusr_usr-0=OffsetAndMetadata{offset=141102239, leaderEpoch=null, metadata=''}}"}
{"debug_level":"WARN","debug_timestamp":"2023-11-27 15:38:51,675","debug_thread":"task-thread-debezium_sink_connector3-0","debug_file":"WorkerSinkTask.java", "debug_line":"225","debug_message":"WorkerSinkTask{id=debezium_sink_connector3-0} Commit of offsets timed out"}
{"debug_level":"INFO","debug_timestamp":"2023-11-27 15:39:25,408","debug_thread":"task-thread-debezium_sink_connector3-0","debug_file":"WorkerSinkTask.java", "debug_line":"352","debug_message":"WorkerSinkTask{id=debezium_sink_connector3-0} Committing offsets asynchronously using sequence number 5: {mesh.public.glusr_usr-0=OffsetAndMetadata{offset=141103017, leaderEpoch=null, metadata=''}}"}
{"debug_level":"WARN","debug_timestamp":"2023-11-27 15:39:59,140","debug_thread":"task-thread-debezium_sink_connector3-0","debug_file":"WorkerSinkTask.java", "debug_line":"225","debug_message":"WorkerSinkTask{id=debezium_sink_connector3-0} Commit of offsets timed out"} 


  • I am facing a persistence lag of around 10sec at sink end even though a single upsert query at destination database is taking only 1ms.
  • I have tried different kafka consumer configurations but the results are not promising.
  • I have observed that a change in configuration has effect on the consumer lag but we are unable to derive any relation ship between lag and configuration.


Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.