All Questions

0 votes
0 answers
498 views

Produce to Confluent Kafka Topic with SASL_SSL and OAUTHBEARER mechanism using the Python client

I'm trying to produce one message to a topic in my RBAC-enabled and TLS-enabled Confluent Kafka cluster. My solution only works partially. I'm seeking help to successfully publish a message to the ...
Moritz Wolff's user avatar
1 vote
0 answers
71 views

How to set the produced JSON key as the document _id for MongoDB Atlas

I am building a data pipeline where I am sending scraped data via Kafka to MongoDB Atlas (all in the cloud, not local). I initialized a Producer p with my Confluent Kafka configs. Then I extract WKNs ...
ku11's user avatar
  • 53
1 vote
1 answer
1k views

Kerberos error while authenticating on Confluent Kafka

I've been trying to understand Apache Beam, Confluent Kafka and Dataflow integration with Python 3.8 and Beam SDK 2.7. The desired result is to build a pipeline (which is going to be run on Dataflow) ...
Christian's user avatar
0 votes
0 answers
123 views

What will happen if the commit() method is called on a consumer running in auto-commit mode?

I'm calling the consumer.commit() method on each message of a consumer running in auto.commit=True mode. When I tried it locally it worked without any data loss or data duplication. What are the effects ...
Akash Pagar's user avatar
1 vote
0 answers
418 views

How to handle ValueDeserializationError in confluent_kafka python?

This is the base consumer class I'm using for creating new consumers. It works fine for a consumer with "enable.auto.commit": True. But when I create a consumer with enable.auto.commit=False and any ...
Akash Pagar's user avatar
0 votes
0 answers
3k views

How to serialize the key as a string in Kafka messages

I am using confluent-kafka and I need to serialize my keys as strings and produce some messages. I have working code for the case where I retrieve the schema from the schema registry and use it to ...
Eypros's user avatar
  • 5,703
0 votes
1 answer
819 views

Unable to push JSON to a Kafka topic

I am trying to push data in JSON format to a Kafka topic, but without success. I used the following Avro schema: {"schemaType":"AVRO","schema":"{\"title\":\&...
Phil's user avatar
  • 314
1 vote
0 answers
2k views

Unable to catch confluent-kafka-python timeout errors

My confluent-kafka-python (version 1.7.0) producer code is as below: from confluent_kafka import KafkaError, KafkaException def publish_data(): try: producer.produce( topic=...
Zaks's user avatar
  • 690
0 votes
1 answer
642 views

Kafka offset manual commit not updating offset value

My python confluent kafka code to read from the Kafka broker looks as below self.consumer = Consumer( { "auto.offset.reset": "earliest", ...
Zaks's user avatar
  • 690
0 votes
2 answers
4k views

Disable Certificate validation in SchemaRegistryClient Confluent Kafka

So, I want to read a topic from Kafka (Confluent) where the data is in Avro format. For certain unavoidable reasons, I would like to disable certificate validation. I am using security.protocol= ...
Saugat Mukherjee's user avatar
2 votes
3 answers
6k views

Cannot import Producer from confluent_kafka

I'm using a conda environment with Python 3.9 and the confluent_kafka library (installed with pip install confluent-kafka). I have also installed librdkafka. From PyCharm I cannot import: from ...
Krzysiek's user avatar
0 votes
1 answer
677 views

No such configuration property: "schema.compatibility.level" when trying to initialise a kafka producer

I am using confluent-kafka. My code is producer = SimpleAvroProducer(producer_id="producer_1", topic_name="events_topic", ...
darthsithius's user avatar
0 votes
0 answers
307 views

Schema Evolution in Kinesis for python consumer and producer

How can I use a schema registry with Kinesis in Python? AWS Glue is in Java; I want something similar in Python. Can I use an open-source schema registry (e.g. Confluent Schema Registry) with AWS Kinesis?
jeevitha's user avatar
3 votes
0 answers
837 views

Confluent Kafka python schema parser causes conflict with fastavro

I am running Python 3.9 with Confluent Kafka 1.7.0, avro-python3 1.10.0 and fastavro 1.4.1. The following code uses Avro schema encoder in order to encode a message, which succeeds only if we ...
gt6989b's user avatar
  • 4,193
1 vote
0 answers
39 views

How does message size affect message transfer speed in Kafka?

I am developing a wrapper to transfer specific types of messages over Kafka. Incoming data has a high throughput. I am looking to minimize the delay in Kafka. What are the factors that affect delay ...
sam's user avatar
  • 335
1 vote
1 answer
3k views

When is a delivery callback called with an error in the Confluent Kafka client?

The Confluent Kafka library (python version in this case) has a produce method which takes a delivery callback function: kafka_producer.produce(topic=topic, key=key, ...
coderelliot's user avatar
0 votes
1 answer
642 views

Kafka confluent proxy api - send message - Internal server error

I'm trying to wrap the Confluent kafka proxy api in one class that will handle producing and consuming. Following this link: https://docs.confluent.io/platform/current/kafka-rest/api.html I tried to ...
zbeedatm's user avatar
  • 157
2 votes
2 answers
2k views

Consume Data from Confluent Kafka Topic and Exit using Python

I am trying to write Python code to consume data from a Confluent Kafka topic and perform data validation as part of a testing project. I am able to read the data, however the consume process is in ...
AK_sat's user avatar
  • 101
0 votes
0 answers
3k views

Unknown KafkaError 87 from Kafka

I am writing an Avro message to Kafka using Confluent: import avro_validator import avro.schema from confluent_kafka import avro as confluent_avro from confluent_kafka.cimpl import KafkaError, Message ...
Max's user avatar
  • 755
6 votes
2 answers
8k views

Get Latest Message for a Confluent Kafka Topic in Python

Here's what I've tried so far: from confluent_kafka import Consumer c = Consumer({... several security/server settings skipped... 'auto.offset.reset': 'beginning', 'group....
ArtOfWarfare's user avatar
  • 21.4k
1 vote
2 answers
2k views

KSQL CLI is working but KSQL Python API is not working while KSQL server is running

I have managed to set up KSQL server and KSQL CLI client to consume a stream using KSQL, and it's working using the CLI. For example, the SHOW TOPICS command works, as below: ksql> SHOW TOPICS; Kafka Topic ...
iamabhaykmr's user avatar
  • 2,001
1 vote
0 answers
211 views

Confluent Kafka - return messages after timestamp

Using this code I iterate over a cloud hosted Confluent Kafka topic: import pyspark import copy import numpy as np from collections import namedtuple import json import sklearn from confluent_kafka ...
blue-sky's user avatar
  • 53.7k
1 vote
0 answers
539 views

confluent-kafka-python didn't catch timed out MetadataRequest in flight in producer

Hi, our Kafka was down for about 2 days. During that time, one of our producers using Confluent Kafka for Python still tried producing messages, and we received logs that say Timed out ...
dfhyr123's user avatar
1 vote
0 answers
471 views

Why doesn't librdkafka throw any error/exception even when wrong broker hostnames are passed?

I am writing a Python app that uses the confluent_kafka API (which internally uses librdkafka) to calculate Kafka throughput. However, in this process I observed that even when I pass a dummy broker name then ...
PapaDiHatti's user avatar
  • 1,921
1 vote
1 answer
1k views

Is there any point in calling poll() after produce() if we are not using a delivery callback in Kafka?

We are trying to optimize our Python app that produces messages to a Kafka topic via the Confluent Python Kafka client. Using Confluent Control Center, we can check whether a particular message is delivered ...
PapaDiHatti's user avatar
  • 1,921
0 votes
0 answers
246 views

Which confluent-kafka Python client version is suitable for confluent-kafka version 5.5.1?

Which confluent-kafka Python client version is suitable for confluent-kafka version 5.5.1? The Confluent documentation just says to install confluent-kafka, without any version details.
Rena76's user avatar
  • 46
0 votes
1 answer
2k views

How to describe the configuration of a Kafka topic?

I want to describe the configuration of one topic. I developed a script using the confluent-kafka-python library (version 1.5.0) and my version of Python is 2.7. My final goal is to be able to change ...
LilyAZ's user avatar
  • 163
0 votes
1 answer
901 views

Non-blocking 'while True' using asyncio

Using the code below, I'm attempting to start 2 infinite loops using asyncio: async def do_job_1(): while True : print('do_job_1') await asyncio.sleep(5) async def do_job_2(): ...
blue-sky's user avatar
  • 53.7k
1 vote
1 answer
891 views

Unable to write avro data to kafka using python

I'm using Kafka (kafka_2.11-0.11.0.2) and Confluent version 3.3.0 for schema registry. I have defined an Avro schema as follows: { "namespace": "com.myntra.search", "type": ...
nish's user avatar
  • 7,260
0 votes
1 answer
3k views

Confluent Kafka Python Error: Metadata request failed

I'm getting the error below and am not sure what's wrong. %5|1591739081.630|REQTMOUT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/290: Timed out HeartbeatRequest in flight (after 10622ms, timeout #...
Hardik Patira's user avatar
0 votes
2 answers
761 views

Store messages from Kafka to HDFS

I'm using confluent-kafka with Python. I want to connect Kafka to HDFS and store some particular data directly in HDFS. I found the Confluent HDFS connector, but I didn't get how to connect it with my ...
Ayoub's user avatar
  • 19
4 votes
2 answers
7k views

How to determine if a kafka topic exists using confluent-kafka-python

I'm using the confluent-kafka-python package to interface with a Kafka server. I can successfully create the topic and push events to it. However, my problem lies when I spin up multiple nodes (...
nalyd88's user avatar
  • 5,128
0 votes
0 answers
2k views

Kafka Python Consumer - Confluent Kafka

I have written a Python consumer using the confluent-kafka package. After a few hours of running, the consumer dies with the error below: cimpl.KafkaException: KafkaError{code=_TIMED_OUT,val=-185,str="...
Rajib Deb's user avatar
  • 1,734
4 votes
1 answer
7k views

Kafka python Consumer group session timed out

I use confluent-kafka v1.3.0 and I have the following problem with a consumer group session timeout. My config looks like: c['KAFKA'] = { 'bootstrap.servers': 'host.docker.internal:9104', '...
XWizard's user avatar
  • 339
1 vote
2 answers
1k views

Confluent_kafka Producer does not publish messages into topic

I tried to install Kafka on my Raspberry. And test it on 'hello-kafka' topic: ~ $ /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello-kafka >Test message 1 &...
Максим Виговский's user avatar
0 votes
1 answer
2k views

How to get schema from confluent schema registry with schema id and version using python

Can we pass both schema id and version to get the schema from schema registry? I know about these functions, Getting schema by ID sr = SchemaRegistryClient('localhost:8081') my_schema = sr.get_by_id(...
1689's user avatar
  • 127
0 votes
0 answers
487 views

Unable to produce any Kafka messages

I am new to Kafka. I was trying to run the example code stated in https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro-cli.py. As per the code, this is how producer and ...
Dee's user avatar
  • 51
0 votes
2 answers
2k views

How can I produce Avro-type messages without a schema with Python?

I use the code below to send a message to Kafka, and it works. But I want to send the message without the schema because I already have a schema on the Kafka topic; I register it first. I do not want to send the schema every time. ...
CompEng's user avatar
  • 7,366
4 votes
2 answers
7k views

Cannot pipenv install confluent-kafka 1.4.0 from source (pypi) - no workaround seems to work

There seems to be an issue logged with the current Confluent-Kafka package on pypi: I have a Dockerfile with the following code, which used to work until the issue happened: RUN cd /tmp && git ...
banditKing's user avatar
  • 9,579
0 votes
2 answers
428 views

Kafka Consumer: How to read specific Avro field in Python?

In the below snippet of the consumer, I am able to receive the data sent. How do I access particular values from the entire data to work with? from confluent_kafka import KafkaError from ...
Aerluft's user avatar
  • 41
8 votes
0 answers
2k views

confluent kafka python Consumer.poll() always returns None while official console-consumer works properly

The Consumer instance from the confluent-kafka Python client always returns None when calling poll() with a timeout set. The topic does contain some messages and the official console consumer works fine: ...
Alex's user avatar
  • 353
2 votes
1 answer
5k views

Confluent kafka python SSL verification

I am using confluent-kafka-python (https://github.com/confluentinc/confluent-kafka-python) to write an application. Both Kafka and the schema registry are secured and use HTTPS endpoints. While running ...
SunilS's user avatar
  • 2,268
0 votes
1 answer
67 views

What is the behaviour of kafka when a commit is made without reading the message?

I have code that looks like this: def message_reader(consumer): consumed_message = consumer.consume_batch() if consumed_message: #do something def run_reader(): process_consumer = ...
darthsithius's user avatar
1 vote
1 answer
3k views

How to configure Kafka-Python client.dns.lookup = "use_all_dns_ips"

I want to configure my Python producer/consumer with client.dns.lookup = "use_all_dns_ips", but none of the Python Kafka clients, such as kafka-python (2.0.1) and confluent-kafka (1.3.0), support this. This is ...
user3524049's user avatar
1 vote
1 answer
532 views

Trying to consume messages with python multiprocessing

I am not able to consume messages with the below code. I am able to consume if I just directly call consOne.startLoop(). What am I missing here? I'd appreciate the help. from confluent_kafka import Consumer, ...
neelabalan's user avatar
1 vote
1 answer
241 views

Kafka persistent state store does not work with combined use of Python and Java

Today I found a very strange thing in the Kafka state store. I googled a lot but didn't find the reason for the behavior. Consider the below state store written in Java: private KeyValueStore<String, ...
Mohit Singh's user avatar
1 vote
3 answers
753 views

Kafka: different partitions are selected by Python confluent_kafka library vs. Apache Kafka Java

I am publishing the same data (Topic, Key & Value) from a Python confluent_kafka library based producer vs. a Java Apache library based producer, but when the messages are checked on Kafka, they are published ...
Sat's user avatar
  • 161
0 votes
1 answer
296 views

Python confluent_kafka: consume(0) cannot trigger callbacks

I am using confluent-kafka-python and librdkafka. But I think, confluent_kafka.Consumer.consume(0) cannot trigger stats_cb. Consumer_consume(0) => rd_kafka_consume_batch_queue => ...
BAE's user avatar
  • 8,924
2 votes
1 answer
4k views

How do I scale Kafka Consumers in python?

This probably has multiple questions, so bear with me. I am still figuring out the right way to use the Kafka architecture. I know that the partitions of a topic are divided between the consumers. What ...
Vineeth Vishwanath's user avatar
2 votes
0 answers
2k views

Connecting to Confluent Cloud with AIOKafka client

I'm trying to connect to my Confluent Cloud Kafka cluster using a modified version of the AIOKafka ssl_consume_produce.py example in the AIOKafka repo at https://github.com/aio-libs/aiokafka/blob/...
galen211's user avatar