I am running a CDC pipeline with a PostgreSQL source (Debezium Source Connector) and a PostgreSQL target, using the Confluent JDBC Sink Connector (version 10.6.4) to read data from Kafka. The issue I am facing is a persistent lag of approximately 40 messages, at a publishing rate of approximately 5 messages per second.
Here are the details of what I have done so far. I have set up a CDC pipeline to stream database changes from one database to another. Both the source and destination databases are PostgreSQL, serving different microservices. This is the Debezium Source Connector I am using.
Here are the relevant configurations (the properties that I have tuned) for the Confluent JDBC Sink Connector:
Connector Properties
- consumer.override.fetch.min.bytes=101200
- consumer.override.fetch.max.wait.ms=1000
- consumer.override.auto.commit.interval.ms=100
- consumer.override.max.poll.records=5000
- consumer.override.max.poll.interval.ms=300000
- consumer.override.offset.flush.interval.ms=200
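For reference, here is a minimal sketch of how the overrides listed above might be assembled into a payload for the Kafka Connect REST API. The connector name `jdbc_sink_example` and the helper `build_sink_config` are hypothetical, and settings the post does not give (topics, connection URL, insert mode) are deliberately left out, so this is not a complete, deployable config.

```python
import json

# Consumer overrides exactly as listed above. Values are kept as strings,
# which is how connector properties are usually submitted to Kafka Connect.
CONSUMER_OVERRIDES = {
    "consumer.override.fetch.min.bytes": "101200",
    "consumer.override.fetch.max.wait.ms": "1000",
    "consumer.override.auto.commit.interval.ms": "100",
    "consumer.override.max.poll.records": "5000",
    "consumer.override.max.poll.interval.ms": "300000",
    "consumer.override.offset.flush.interval.ms": "200",
}


def build_sink_config(name, overrides):
    """Return a {"name": ..., "config": ...} payload for a JDBC sink connector.

    Hypothetical helper: only connector.class and the overrides are set;
    topics, connection.url, etc. are omitted because the post does not give them.
    """
    config = {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector"}
    config.update(overrides)
    return {"name": name, "config": config}


# "jdbc_sink_example" is a placeholder name, not the real connector name.
payload = build_sink_config("jdbc_sink_example", CONSUMER_OVERRIDES)
print(json.dumps(payload, indent=2))
```

Writing the properties out this way makes it easier to diff one test run's configuration against another.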
The connector has also been tested in the following other cases.
Additional Information:
I have also tried running the Debezium JDBC Sink Connector (version 2.4.1), but faced timeouts ("Commit of offsets timed out").
Configurations for the Debezium JDBC Sink Connector:
- consumer.override.offset.flush.timeout.ms=5000
- consumer.override.max.poll.interval.ms=1000
- consumer.override.enable.auto.commit=true
- consumer.override.max.poll.records=1
- consumer.override.auto.commit.interval.ms=10000
Below are the logs of the Debezium JDBC Sink Connector:
{"debug_level":"INFO","debug_timestamp":"2023-11-27 15:38:33,051","debug_thread":"task-thread-debezium_sink_connector3-0","debug_file":"WorkerSinkTask.java", "debug_line":"352","debug_message":"WorkerSinkTask{id=debezium_sink_connector3-0} Committing offsets asynchronously using sequence number 4: {mesh.public.glusr_usr-0=OffsetAndMetadata{offset=141102239, leaderEpoch=null, metadata=''}}"}
{"debug_level":"WARN","debug_timestamp":"2023-11-27 15:38:51,675","debug_thread":"task-thread-debezium_sink_connector3-0","debug_file":"WorkerSinkTask.java", "debug_line":"225","debug_message":"WorkerSinkTask{id=debezium_sink_connector3-0} Commit of offsets timed out"}
{"debug_level":"INFO","debug_timestamp":"2023-11-27 15:39:25,408","debug_thread":"task-thread-debezium_sink_connector3-0","debug_file":"WorkerSinkTask.java", "debug_line":"352","debug_message":"WorkerSinkTask{id=debezium_sink_connector3-0} Committing offsets asynchronously using sequence number 5: {mesh.public.glusr_usr-0=OffsetAndMetadata{offset=141103017, leaderEpoch=null, metadata=''}}"}
{"debug_level":"WARN","debug_timestamp":"2023-11-27 15:39:59,140","debug_thread":"task-thread-debezium_sink_connector3-0","debug_file":"WorkerSinkTask.java", "debug_line":"225","debug_message":"WorkerSinkTask{id=debezium_sink_connector3-0} Commit of offsets timed out"}
Conclusion
- I am facing a persistent lag of around 10 seconds at the sink end, even though a single upsert query on the destination database takes only 1 ms.
- I have tried different Kafka consumer configurations, but the results are not promising.
- I have observed that changing the configuration affects the consumer lag, but I am unable to derive any relationship between the lag and the configuration.
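As a rough cross-check of the numbers in this post, the sketch below converts the message lag into a time lag and estimates how long a single fetch may be held back under `fetch.min.bytes` and `fetch.max.wait.ms`. The function names are hypothetical, and the average message size of 1024 bytes is purely an assumption for illustration; I have not measured it.

```python
def lag_seconds(lag_messages, messages_per_second):
    """Convert a lag measured in messages into seconds at a steady publish rate."""
    if messages_per_second <= 0:
        raise ValueError("messages_per_second must be positive")
    return lag_messages / messages_per_second


def max_fetch_wait_seconds(fetch_min_bytes, fetch_max_wait_ms,
                           messages_per_second, avg_message_bytes):
    """Estimate how long a fetch waits before the broker answers.

    The broker responds once fetch.min.bytes of data is available or
    fetch.max.wait.ms has elapsed, whichever comes first.
    """
    bytes_per_second = messages_per_second * avg_message_bytes
    time_to_fill = fetch_min_bytes / bytes_per_second
    return min(time_to_fill, fetch_max_wait_ms / 1000.0)


# Figures from the post: ~40 messages of lag at ~5 messages per second.
observed = lag_seconds(40, 5)

# ASSUMPTION: 1024 bytes per message (not measured, illustration only).
wait = max_fetch_wait_seconds(101200, 1000, 5, 1024)

print(f"lag implied by message count: {observed} s")
print(f"estimated wait per fetch: {wait} s")
```

Under these assumptions, 40 messages at 5 messages per second is about 8 seconds, which is in the same range as the roughly 10 seconds of lag observed, while the per-fetch wait is capped at 1 second by `fetch.max.wait.ms`.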