All Questions
Tagged with confluent-platform python
126 questions
0
votes
0
answers
498
views
Produce to Confluent Kafka Topic with SASL_SSL and OAUTHBEARER mechanism using python client
I'm trying to produce one message to a topic in my RBAC-enabled and TLS-enabled Confluent Kafka cluster. My solution only works partially. I'm seeking help to successfully publish a message to the ...
1
vote
0
answers
71
views
How to set produced JSON key as document _id for MongoDB Atlas
I am building a data pipeline where I send scraped data via Kafka to MongoDB Atlas (all in the cloud, not local).
I initialized a Producer p with my Confluent Kafka configs. Then I extract WKNs ...
1
vote
1
answer
1k
views
Kerberos error while authenticating on Confluent Kafka
I've been trying to understand Apache Beam, Confluent Kafka and Dataflow integration with Python 3.8 and Beam SDK 2.7. The desired result is to build a pipeline (which is going to be run on Dataflow) ...
0
votes
0
answers
123
views
What happens if commit() is called on a consumer running in auto-commit mode?
I'm calling the consumer.commit() method for each message on a consumer running in auto.commit=True mode. When I tried it locally, it worked without any data loss or duplication.
What are the effects ...
1
vote
0
answers
418
views
How to handle ValueDeserializationError in confluent_kafka python?
This is the base consumer class I'm using for creating new consumers. It works fine for a consumer with "enable.auto.commit": True. But when I create a consumer with enable.auto.commit=False and any ...
0
votes
0
answers
3k
views
How to serialize the key as a string in Kafka messages
I am using confluent-kafka and I need to serialize my keys as strings and produce some messages. I have working code for the case where I retrieve the schema from the schema registry and use it to ...
0
votes
1
answer
819
views
Unable to push JSON to a Kafka topic
I am trying to push data in JSON format to a Kafka topic, but without success.
I used the following Avro schema:
{"schemaType":"AVRO","schema":"{\"title\":\"...
1
vote
0
answers
2k
views
Unable to catch confluent-kafka-python timeout errors
My confluent-kafka-python (version 1.7.0) producer code is below:
from confluent_kafka import KafkaError, KafkaException
def publish_data():
    try:
        producer.produce(
            topic=...
0
votes
1
answer
642
views
Kafka offset manual commit not updating offset value
My Python confluent-kafka code to read from the Kafka broker looks like this:
self.consumer = Consumer(
    {
        "auto.offset.reset": "earliest",
...
0
votes
2
answers
4k
views
Disable Certificate validation in SchemaRegistryClient Confluent Kafka
I want to read a topic from Kafka (Confluent) where the data is in Avro format.
For certain unavoidable reasons, I would like to disable certificate validation.
I am using security.protocol= ...
2
votes
3
answers
6k
views
Cannot import Producer from confluent_kafka
I'm using a conda environment with Python 3.9 and the confluent_kafka library (installed with pip install confluent-kafka). I have also installed librdkafka. From PyCharm I cannot import:
from ...
0
votes
1
answer
677
views
No such configuration property: "schema.compatibility.level" when trying to initialise a kafka producer
I am using confluent-kafka.
My code is
producer = SimpleAvroProducer(producer_id="producer_1",
topic_name="events_topic",
...
0
votes
0
answers
307
views
Schema evolution in Kinesis for Python consumer and producer
How can I use a schema registry with Kinesis in Python? AWS Glue is in Java; I want something similar in Python. Can I use an open-source schema registry (e.g., Confluent Schema Registry) with AWS Kinesis?
3
votes
0
answers
837
views
Confluent Kafka python schema parser causes conflict with fastavro
I am running Python 3.9 with Confluent Kafka 1.7.0, avro-python3 1.10.0 and fastavro 1.4.1.
The following code uses Avro schema encoder in order to encode a message, which succeeds only if we ...
1
vote
0
answers
39
views
How does message size affect message transfer speed in Kafka?
I am developing a wrapper to transfer specific types of messages over Kafka. Incoming data has a high throughput. I am looking to minimize the delay in Kafka. What are the factors that affect delay ...
1
vote
1
answer
3k
views
When is a delivery callback called with an error in the Confluent Kafka client?
The Confluent Kafka library (python version in this case) has a produce method which takes a delivery callback function:
kafka_producer.produce(topic=topic,
                       key=key,
...
0
votes
1
answer
642
views
Kafka confluent proxy api - send message - Internal server error
I'm trying to wrap the Confluent kafka proxy api in one class that will handle producing and consuming.
Following this link: https://docs.confluent.io/platform/current/kafka-rest/api.html I tried to ...
2
votes
2
answers
2k
views
Consume Data from Confluent Kafka Topic and Exit using Python
I am trying to write Python code to consume data from a Confluent Kafka topic and perform data validation as part of a testing project. I am able to read the data; however, the consume process is in ...
0
votes
0
answers
3k
views
Unknown KafkaError 87 from Kafka
I am writing an Avro message to Kafka using Confluent:
import avro_validator
import avro.schema
from confluent_kafka import avro as confluent_avro
from confluent_kafka.cimpl import KafkaError, Message
...
6
votes
2
answers
8k
views
Get Latest Message for a Confluent Kafka Topic in Python
Here's what I've tried so far:
from confluent_kafka import Consumer
c = Consumer({... several security/server settings skipped...
'auto.offset.reset': 'beginning',
'group....
1
vote
2
answers
2k
views
KSQL CLI working but KSQL Python API not working while KSQL server is running
I have managed to set up a KSQL server and the KSQL CLI client to consume a stream using KSQL, and it works from the CLI. For example, the SHOW TOPICS command works, as shown below:
ksql> SHOW TOPICS;
Kafka Topic ...
1
vote
0
answers
211
views
Confluent Kafka - return messages after timestamp
Using this code I iterate over a cloud hosted Confluent Kafka topic:
import pyspark
import copy
import numpy as np
from collections import namedtuple
import json
import sklearn
from confluent_kafka ...
1
vote
0
answers
539
views
confluent-kafka-python didn't catch timed-out MetadataRequest in flight in producer
Hi, our Kafka crashed and stayed down for about 2 days. During that time, one of our producers using confluent-kafka for Python still tried producing messages, and we received logs that say Timed out ...
1
vote
0
answers
471
views
Why doesn't librdkafka throw any error/exception even when wrong broker hostnames are passed?
I am writing a Python app that uses the confluent_kafka API (which internally uses librdkafka) to calculate Kafka throughput. However, in the process I observed that even when I pass a dummy broker name then ...
1
vote
1
answer
1k
views
Is there any point in calling poll() after produce() if we are not using a delivery callback in Kafka?
We are trying to optimize our Python app that produces messages to a Kafka topic via the Confluent Python Kafka client, and using Confluent Control Center we can check whether a particular message is delivered ...
0
votes
0
answers
246
views
Which confluent-kafka Python client version is suitable for confluent-kafka 5.5.1?
Which confluent-kafka Python client version is suitable for confluent-kafka version 5.5.1? The Confluent documentation only says to install confluent-kafka and gives no version details.
0
votes
1
answer
2k
views
How to describe the configuration of a Kafka topic?
I want to describe the configuration of one topic. I developed a script using the confluent-kafka-python library (version 1.5.0), and my Python version is 2.7.
My final goal is to be able to change ...
0
votes
1
answer
901
views
Non-blocking 'while True' using asyncio
Using below code I'm attempting to start 2 infinite loops using asyncio:
async def do_job_1():
    while True:
        print('do_job_1')
        await asyncio.sleep(5)
async def do_job_2():
...
1
vote
1
answer
891
views
Unable to write avro data to kafka using python
I'm using kafka kafka_2.11-0.11.0.2 and confluent version 3.3.0 for schema registry.
I have defined an avro schema as follows:
{
"namespace": "com.myntra.search",
"type": ...
0
votes
1
answer
3k
views
Confluent Kafka Python Error: Metadata request failed
I am getting the error below and am not sure what's wrong.
%5|1591739081.630|REQTMOUT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/290: Timed out HeartbeatRequest in flight (after 10622ms, timeout #...
0
votes
2
answers
761
views
Store messages from Kafka to HDFS
I'm using confluent-kafka with Python.
I want to connect Kafka to HDFS and store some particular data directly in HDFS.
I found the Confluent HDFS connector, but I don't understand how to connect it with my ...
4
votes
2
answers
7k
views
How to determine if a kafka topic exists using confluent-kafka-python
I'm using the confluent-kafka-python package to interface with a Kafka server. I can successfully create the topic and push events to it. However, my problem lies when I spin up multiple nodes (...
0
votes
0
answers
2k
views
Kafka Python Consumer - Confluent Kafka
I have written a Python consumer using the confluent-kafka package. After a few hours of running, the consumer dies with the error below:
cimpl.KafkaException:
KafkaError{code=_TIMED_OUT,val=-185,str="...
4
votes
1
answer
7k
views
Kafka python Consumer group session timed out
I use confluent-kafka v1.3.0 and I have the following problem with consumer group session timeout.
My config looks like:
c['KAFKA'] = {
    'bootstrap.servers': 'host.docker.internal:9104',
'...
1
vote
2
answers
1k
views
confluent_kafka Producer does not publish messages to the topic
I tried to install Kafka on my Raspberry and test it on the 'hello-kafka' topic:
~ $ /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello-kafka
>Test message 1
>...
0
votes
1
answer
2k
views
How to get schema from confluent schema registry with schema id and version using python
Can we pass both schema id and version to get the schema from schema registry? I know about these functions,
Getting schema by ID
sr = SchemaRegistryClient('localhost:8081')
my_schema = sr.get_by_id(...
0
votes
0
answers
487
views
Unable to produce any Kafka messages
I am new to Kafka. I was trying to run the example code stated in https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro-cli.py.
As per the code, this is how producer and ...
0
votes
2
answers
2k
views
How can I produce Avro messages without a schema in Python?
I use the code below to send messages to Kafka. It works.
But I want to send messages without the schema, because the schema is already on the Kafka topic; I register it first. I do not want to send the schema every time.
...
4
votes
2
answers
7k
views
Cannot pipenv install confluent-kafka 1.4.0 from source (pypi) - no workaround seems to work
There seems to be an issue logged with the current confluent-kafka package on PyPI:
I have a Dockerfile with the following code, which used to work until the issue happened:
RUN cd /tmp && git ...
0
votes
2
answers
428
views
Kafka Consumer: How to read specific Avro field in Python?
In the consumer snippet below, I am able to receive the data sent. How do I access particular values from the data to work with?
from confluent_kafka import KafkaError
from ...
8
votes
0
answers
2k
views
confluent kafka python Consumer.poll() always returns None while official console-consumer works properly
The Consumer instance from the confluent-kafka Python client always returns None when calling poll() with a timeout set.
The topic does contain some messages, and the official console consumer works fine:
...
2
votes
1
answer
5k
views
Confluent kafka python SSL verification
I am using confluent kafka python 'https://github.com/confluentinc/confluent-kafka-python' to write an application. Both Kafka and Schema Registry are secured and use HTTPS endpoints.
While running ...
0
votes
1
answer
67
views
What is the behaviour of kafka when a commit is made without reading the message?
I have code that looks like this:
def message_reader(consumer):
    consumed_message = consumer.consume_batch()
    if consumed_message:
        # do something
def run_reader():
    process_consumer = ...
1
vote
1
answer
3k
views
How to configure Kafka-Python client.dns.lookup = "use_all_dns_ips"
I want to configure my Python producer/consumer with client.dns.lookup = "use_all_dns_ips",
but none of the Python Kafka clients, such as kafka-python (2.0.1) and confluent-kafka (1.3.0), support this.
This is ...
1
vote
1
answer
532
views
Trying to consume messages with python multiprocessing
I am not able to consume messages with the code below. I am able to consume if I just call consOne.startLoop() directly. What am I missing here? I'd appreciate the help.
from confluent_kafka import Consumer, ...
1
vote
1
answer
241
views
Kafka Persistence State-Store Does Not Work with Combined Use of Python and Java
Today I found a very strange thing in the Kafka state store. I googled a lot but didn't find the reason for the behavior.
Consider the state store below, written in Java:
private KeyValueStore<String, ...
1
vote
3
answers
753
views
Kafka: different partitions are selected by the Python confluent_kafka library vs. Apache Kafka Java
I am publishing the same data (topic, key & value) from a Python confluent_kafka-based producer and from a Java Apache library-based producer, but when the messages are checked on Kafka, they are published ...
0
votes
1
answer
296
views
Python confluent_kafka: consume(0) cannot trigger callbacks
I am using confluent-kafka-python and librdkafka.
But I think, confluent_kafka.Consumer.consume(0) cannot trigger stats_cb.
Consumer_consume(0) => rd_kafka_consume_batch_queue
=> ...
2
votes
1
answer
4k
views
How do I scale Kafka Consumers in python?
This probably contains multiple questions, so bear with me. I am still figuring out the right way to use the Kafka architecture. I know that the partitions of a topic are divided between the consumers.
What ...
2
votes
0
answers
2k
views
Connecting to Confluent Cloud with AIOKafka client
I'm trying to connect to my Confluent Cloud Kafka cluster using a modified version of the AIOKafka ssl_consume_produce.py example in the AIOKafka repo at https://github.com/aio-libs/aiokafka/blob/...