0 votes
0 answers
21 views

Kafka message processed by multiple instances of the same application, all in the same consumer group

I am processing Kafka messages in Spark Streaming. My topic has 2 partitions and one consumer group. An error condition led to 3, 4, or more instances of the same application running, all in the same ...
user1860447 (reputation 1,436)
0 votes
0 answers
51 views

Spark on EMR error when using `foreachBatch`: "terminated with exception: Error while obtaining a new communication channel"

I use Spark on EMR with versions emr-6.13.0 and Spark 3.4.1. I am trying to run a simple Spark streaming job that reads from Kafka and writes to a memory table using foreachBatch, and I get the failure "Error while ...
shayms8 (reputation 771)
0 votes
0 answers
120 views

Unable to sync non-partitioned Hudi table with BigQuery

I'm trying to write my structured streaming data to Apache Hudi in a non-partitioned table and then sync it with BigQuery. But even though it is a new table and I've set no partitioning ...
Vinayak Gupta
1 vote
2 answers
94 views

Issue with Multiple Spark Structured Streaming Jobs Consuming Same Kafka Topic

I have two separate Python scripts (job1.py and job2.py) that use Spark Structured Streaming to consume data from the Kafka topic test1. Both scripts are configured with the same Kafka consumer group (...
Nagaraj (reputation 57)
0 votes
0 answers
74 views

How to get the starting and ending offsets of a Kafka batch in Spark Streaming (Java)

In Spark Streaming (using Java), I am trying to print the start offset and last offset of a specific batch taken from Kafka, along with the batch-related metrics (like processingStartTime, ...
Ravi (reputation 961)
0 votes
0 answers
31 views

Spark streaming with Kafka: error in the checkpoint folder with multiple streaming queries

I am a beginner in Apache Spark. I am researching how to restart a job after an error occurs when using Spark Streaming with Kafka. I tried deleting the latest file in the commit folder and ...
Đức Hân Trần
0 votes
0 answers
25 views

Spark stream-to-stream join failing with error java.lang.IllegalStateException: Error reading delta file

I am reading two streams from Kafka as DataFrames, joining them, and trying to process the joined DataFrame as below: private def readStream(spark: SparkSession, topicName: String, schema: StructType): ...
Sanket Sanath
0 votes
0 answers
114 views

How to correctly parse Kafka-format data from Azure EventHub

I am new to Kafka and I am trying to read stream events coming from an Azure EventHub using Python, but I am not able to correctly extract or parse the data. kafka_options = { # Port 9093 is the ...
s528060 (reputation 73)
0 votes
0 answers
37 views

java.lang.NoClassDefFoundError: Could not initialize class kafka.utils.Json$

I'm trying to use Spark Streaming with Kafka in a Java application, and I'm encountering issues when using spark-submit with Spark 2.11.0. I've set up my dependencies in Maven as follows: <...
user23740029
1 vote
0 answers
126 views

Spark foreachBatch concurrency and time-based triggers

Is the foreachBatch method guaranteed to run a callback "exclusively", or will it be started asynchronously? I'm working on a custom sink that relies on consecutive, non-concurrent ...
Timurib (reputation 2,743)
0 votes
0 answers
100 views

Buffer Overflow Exception in Spark Structured Streaming with Kafka

I am getting the error below while streaming data from Kafka using Spark Structured Streaming. java.lang.IllegalStateException: Buffer overflow when available data size(16384)>=application buffer size (...
Om Prakash
0 votes
1 answer
466 views

Periodic processing time spikes in spark structured streaming

I am wondering why every 4th batch of my Spark streaming application has a huge spike. Some details: this is stateful processing using the RocksDB state store, reading from Kafka with 180 partitions, writing ...
PiyushC (reputation 308)
0 votes
0 answers
124 views

py4j.protocol.Py4JJavaError: An error occurred while calling o37.load. : java.lang.NoClassDefFoundError: scala/$less$colon$less

I am encountering an issue when trying to load data from Kafka using Apache Spark version 3.3.4 with Scala version 2.12.8. The error message I receive is as follows: py4j.protocol.Py4JJavaError: An ...
Kunal (reputation 1)
0 votes
0 answers
45 views

java.lang.NoClassDefFoundError running example streaming.DirectKafkaWordCount

I am trying to run an example program DirectKafkaWordCount using this command: bin/run-example --packages "org.apache.kafka:kafka-clients:3.6.1,org.apache.spark:spark-streaming-kafka_2.11:1.6. ...
Raghu (reputation 1)
0 votes
1 answer
244 views

Dynamic Filter with Spark Structured Streaming

I am working on my Spark Streaming project with the goal of creating a simple app to notify users when a data stream meets a condition (e.g., sending a notification when a stock price > x). df = ...
Baubau Tran
0 votes
0 answers
101 views

PySpark streaming: fill all milliseconds

I'm new to PySpark streaming. I have a data stream from Kafka that I want to fill every 0.1 seconds. The timestamps from Kafka are irregular, and I'm trying to fill in all the timestamps that are missing with the ...
kreemo (reputation 5)
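The gap-filling idea in this question can be illustrated in plain Python, independently of Spark. This is a minimal sketch, assuming each event is a `(timestamp_ms, value)` pair and that every missing 100 ms slot should take the most recent value seen at or before it (forward fill). The helper name `fill_grid` is hypothetical, and inside a streaming job the same logic would still need to be adapted to Spark's windowing and state APIs.

```python
from bisect import bisect_right


def fill_grid(events, step_ms=100):
    """Forward-fill irregular (timestamp_ms, value) events onto a regular grid."""
    if not events:
        return []
    # Sort by timestamp only, so values never need to be comparable.
    events = sorted(events, key=lambda e: e[0])
    times = [t for t, _ in events]
    start = -(-times[0] // step_ms) * step_ms  # first grid slot at or after the first event
    end = times[-1] // step_ms * step_ms       # last grid slot at or before the last event
    filled = []
    slot = start
    while slot <= end:
        i = bisect_right(times, slot) - 1      # index of the latest event at or before this slot
        filled.append((slot, events[i][1]))
        slot += step_ms
    return filled


print(fill_grid([(0, "a"), (250, "b"), (430, "c")]))
```

With events at 0 ms, 250 ms, and 430 ms, the slots 0 to 400 ms receive a, a, a, b, b; the event at 430 ms falls after the last complete slot, so it is not emitted.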
0 votes
0 answers
39 views

Kafka consumer fetches new messages only after restarting the consumer

I'm facing an issue with my Kafka consumer job written in Scala. When we start the consumer, it fetches all messages available in the broker from the last consumed offset, processes those JSON messages ...
Mani Ganesh
0 votes
1 answer
462 views

Disable Kafka Warnings for Spark Streaming Application

I use Spark Structured Streaming (PySpark) to read data from a Kafka topic. It works well, but when I open the executors' stderr, my whole log page is WARN messages from Kafka saying that kafkadataconsumer is not ...
Huvi (reputation 73)
0 votes
0 answers
1k views

Spark streaming Kafka auth with SSL certificates

I'm facing a problem authenticating to a Kafka topic using SSL from spark-streaming. I have 3 SSL certificates in PEM format for authenticating to the Kafka topic: ssl_cafile ssl_certfile ...
nagibator_archivator
0 votes
0 answers
134 views

Convert some specific columns that have 0 and 1 values in Kafka messages to False and True in PySpark

Requirement: We are consuming messages from Kafka using PySpark. In these JSON messages, some keys have values such as 0 and 1. Now the requirement here is to ...
tall-e.stark
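As a plain-Python illustration of this requirement, here is a minimal sketch that converts selected 0/1 keys of a single JSON message to booleans. The helper `to_bool_fields` and the key names in the example are hypothetical. In PySpark itself, casting an integer column with `Column.cast("boolean")` is one option; Spark maps 0 to false and any non-zero value to true.

```python
import json


def to_bool_fields(message, keys):
    """Parse a JSON message and turn 0/1 values of the given keys into False/True."""
    record = json.loads(message)
    for key in keys:
        value = record.get(key)
        # Convert only exact integer 0/1; leave other values (5, "1", 1.0, missing keys) untouched.
        if type(value) is int and value in (0, 1):
            record[key] = bool(value)
    return record


print(to_bool_fields('{"active": 1, "deleted": 0, "count": 5}', ["active", "deleted", "count"]))
```

Restricting the conversion to an explicit list of keys avoids accidentally turning genuine numeric fields (such as counts) into booleans.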
1 vote
0 answers
673 views

Spark Streaming - NoSuchMethodError: scala.collection.immutable.Map$.apply

I have a simple Spark streaming java program: import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.spark.SparkConf; import org.apache.spark.streaming.Durations; import org.apache....
Eugene Goldberg
0 votes
1 answer
312 views

Async Checkpointing in Spark Structured Streaming using RocksDB

I am currently exploring enabling async checkpointing in Spark Structured Streaming, but I am not able to find any way to do it. Databricks offers this for its flavour of Spark. Spark ...
Aviral Kumar
1 vote
0 answers
28 views

Spark Streaming query dependency

I have a use case where two Spark streaming queries are running and consuming data from the same topic. I want a guarantee that Query1 consumes and processes the data from the topic before Query2 ...
V Dineshkumar
0 votes
1 answer
866 views

Error: Queries with streaming sources must be executed with writeStream.start();; kafka

I was trying to handle real-time data streaming from Kafka using PySpark. I have a table that gets updated in real time. Whenever there is content in the table, I need to aggregate it and stream the ...
Scarlett Code
0 votes
0 answers
110 views

Distinct operation in spark structured streaming with a window operation

I want to implement a distinct operation in Spark Structured Streaming code. I have already watermarked it and windowed it, but Spark is still not able to execute it. FYI: distinct comes under the list of ...
Deval (reputation 97)
0 votes
1 answer
504 views

Not able to read data through kafka spark streaming in pyspark

I am working on creating a basic streaming app that reads streaming data from Kafka and processes the data. Below is the code I am trying in PySpark: spark = SparkSession.builder.appName("testing&...
Nikhil (reputation 121)
1 vote
0 answers
47 views

Spark Streaming Kafka consumer auto close

I don't want to use one consumer for all topics; I want to use this method to improve consumption efficiency: val kafkaParams = Map( ConsumerConfig.GROUP_ID_CONFIG -> group, ...
1580923067
1 vote
0 answers
462 views

Kafka stream with AWS Glue: authorization issue

I want to stream a Kafka topic from a Glue job, but I got the following error: StreamingQueryException: Not authorized to access topics: [topic_name] This is my current script: # Script generated for node ...
Smaillns (reputation 3,119)
1 vote
0 answers
239 views

pyspark ml training ALS: No ratings available from MapPartitionsRDD

I'm trying to train ALS with the data in each batch from Kafka using Spark Streaming, and I'm facing the error below. I think it's because the rating column is negative or something invalid, like wrong data ...
Ngọc An
2 votes
1 answer
1k views

Spark not giving equal tasks to all executors

I am reading from a Kafka topic that has 5 partitions. Since 5 cores are not sufficient to handle the load, I am repartitioning the input to 30. I have given 30 cores to my Spark process with 6 ...
best wishes (reputation 6,564)
1 vote
1 answer
789 views

Spark speculative tasks and their performance overhead

I am currently exploring Spark's speculative tasks option. Below is the configuration I am planning to use. I am reading the data from Kafka, and using repartition() I am creating around 200+ ...
Shane (reputation 626)
0 votes
1 answer
728 views

Spark Structured Streaming job not processing stages and showing a hung state

I am running a streaming application that processes data from Kafka to Kafka using Spark. If I use latest, it works as expected and runs without any issue, but in the source we have done ...
Sonu (reputation 77)
0 votes
1 answer
2k views

Offset management in Spark Streaming

As far as I understand, for a Spark streaming application (Structured Streaming or otherwise), to manually manage the offsets, Spark provides the checkpointing feature, where you just have to configure ...
Gaurav Gupta
1 vote
1 answer
300 views

kafkaUtils.createDirectStream gives an error

I changed a line from createStream to createDirectStream, since the new library does not support createStream. I checked it here: https://codewithgowtham.blogspot.com/2022/02/spark-streaming-...
evirac (reputation 63)
0 votes
0 answers
120 views

Problem integrating Kafka and Spark Streaming: no messages received in Spark Streaming

My Spark streaming context is successfully subscribed to my Kafka topic, where my tweets are streamed by my Twitter producer. But no messages are being streamed from the topic into my Spark streaming job! Here ...
bigdata1800
0 votes
0 answers
92 views

spark-streaming-kafka-0-10 does not support message handler

My use case is to print the offset number, partition, and topic for each record read from Kafka in a Spark streaming application. Currently, my code to create the discretized stream looks like this. ...
amarnath harish
1 vote
0 answers
2k views

How to use environment variables in Spark deployed in cluster mode?

When I set the environment variable using IntelliJ, the code below works, but when I deploy the code with spark-submit it does not work, since the environment variables do not exist across the cluster. import com....
Enes Uğuroğlu
0 votes
1 answer
412 views

Spark Structured Streaming processing already-processed records on failure

I am stuck with a very weird issue in Spark Structured Streaming. Whenever I shut down the stream and restart it, it processes already-processed records again. I tried to use spark.conf.set("...
Deepak (reputation 41)
0 votes
0 answers
227 views

Spark Streaming provides 2 kinds of streams when integrating with Kafka, 1) receiver-based and 2) direct. What kind of stream does Structured Streaming use?

Spark Streaming provides 2 kinds of streams when integrating with Kafka: receiver-based and direct. What kind of stream does Structured Streaming use when we do spark.readStream.format("kafka")?
Abhinav Kumar
1 vote
0 answers
180 views

Spark task failure with ClassCastException [C cannot be cast to [J, at org.apache.spark.unsafe.memory.HeapMemoryAllocator.allocate

We have a Java Spark streaming application that does SCD2 operations on Delta Lake. We were using Spark 3.0.0 and Delta Lake 0.7.0; after upgrading to Spark 3.2.0 and Delta 1.1.0, we can see the ...
Arun Benoy
1 vote
1 answer
176 views

Apache Spark with kafka stream - Missing Kafka

I have been trying to set up Apache Spark with Kafka and wrote a simple program locally, but it's failing and I am not able to figure out why from debugging. build.gradle.kts implementation ("org.jetbrains.kotlin:...
BaskarNatarajan
1 vote
0 answers
106 views

Write Spark Stream to Phoenix table

I am trying to figure out how to write a Spark Stream to a Phoenix table in the least convoluted way. So far I have only found this solution: kafka-to-phoenix, which requires some deep ad-hoc ...
cyberZamp (reputation 195)
0 votes
1 answer
253 views

Spark Structured Streaming from Kafka to Elasticsearch

I want to write a Spark Streaming job from Kafka to Elasticsearch. Here I want to detect the schema dynamically while reading it from Kafka. Can you help me do that? I know this can be done in ...
Siva Samraj
1 vote
1 answer
1k views

foreach() method with Spark Streaming errors

I'm trying to write data pulled from Kafka to a BigQuery table every 120 seconds. I would like to do some additional operations that, according to the documentation, should be possible inside the .foreach() or ...
nonoDa (reputation 453)
0 votes
1 answer
5k views

Available options for a source/destination format of Spark structured streaming

When we use the DataStreamReader API for a format in Spark, we specify options for that format using the option/options methods. For example, in the code below, I'm using Kafka as the source and passing ...
Scarface (reputation 407)
1 vote
1 answer
773 views

Spark and Kafka: how to increase parallelism for producer sending large batch of records improving network usage?

I am diving in to understand how I can send (produce) a large batch of records to a Kafka topic from Spark. From the docs I can see that there is an attempt to use the same producer across tasks in the ...
YFl (reputation 1,299)
1 vote
1 answer
655 views

Can I use Airflow to start/stop a Spark streaming job? [closed]

I have two types of jobs: Spark batch jobs and Spark streaming jobs. I would like to schedule and manage them both with Airflow. Airflow is used for jobs that stop. But I want to use it for my ...
Hiệp (reputation 37)
0 votes
1 answer
1k views

How do I get the data of one row of a Structured Streaming Dataframe in pyspark?

I have a Kafka broker with a topic connected to Spark Structured Streaming. My topic sends data to my streaming dataframe, and I'd like to get information on each row for this topic (because I need to ...
Donsitoz
0 votes
1 answer
756 views

How to merge the multiple data types of a union type in an Avro schema to show one data type in the value field instead of member0, member1

I have the following Avro schema: { "name": "MyClass", "type": "record", "namespace": "com.acme.avro", "fields": [ { ...
Beluga (reputation 63)
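In Spark's Avro support, a complex union is represented as a struct whose fields are named `member0`, `member1`, and so on, which matches what the question title describes; for a given record, at most one member holds the actual value. A minimal plain-Python sketch of collapsing such a struct into a single value, assuming the decoded record is available as a dict. The helper `collapse_union` is hypothetical and not part of any Spark or Avro API.

```python
import re

# Matches field names such as member0, member1, member10.
_MEMBER = re.compile(r"member(\d+)")


def collapse_union(value):
    """Return the first non-null member of a memberN-style struct, else the value unchanged."""
    if isinstance(value, dict) and value and all(_MEMBER.fullmatch(k) for k in value):
        # Order members numerically so that member10 sorts after member2.
        members = sorted(value.items(), key=lambda kv: int(_MEMBER.fullmatch(kv[0]).group(1)))
        for _, member in members:
            if member is not None:
                return member
        return None  # every branch is null, e.g. the union's null branch was taken
    return value


print(collapse_union({"member0": None, "member1": "abc"}))
```

The collapsed value can have a different type per record (an int in one row, a string in another). A Spark column needs a single type, so one option is to cast every member to a common type, such as string, before coalescing them.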
0 votes
0 answers
224 views

How to configure thread count on Spark Driver node?

We are running a Spark streaming job on a standalone cluster with deploy mode set to client. This streaming job polls messages from a Kafka topic periodically, and the logs generated at the driver ...
Anoop Deshpande
