258 questions
0 votes · 0 answers · 21 views
Kafka message processed by multiple instances of the same application all in the same consumer group
I am processing Kafka messages in Spark Streaming. My topic has 2 partitions and one consumer group. An error condition led to 3-4 or more instances of the same application running, all in the same ...
0 votes · 0 answers · 51 views
spark on EMR error when using `foreachBatch`: "terminated with exception: Error while obtaining a new communication channel"
I use Spark on EMR with versions: emr-6.13.0, Spark 3.4.1
I try to run a simple Spark streaming job that reads from Kafka and writes to a memory table using foreachBatch, and get the failure "Error while ...
0 votes · 0 answers · 120 views
Unable to sync non-partitioned Hudi table with BigQuery
I'm trying to write my structured streaming data to Apache Hudi in a non-partitioned table and then sync it with BigQuery. But even though it is a new table and I've set no partitioning ...
1 vote · 2 answers · 94 views
Issue with Multiple Spark Structured Streaming Jobs Consuming Same Kafka Topic
I have two separate Python scripts (job1.py and job2.py) that use Spark Structured Streaming to consume data from the Kafka topic test1. Both scripts are configured with the same Kafka consumer group (...
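Worth noting for setups like this: Spark's Structured Streaming Kafka source tracks offsets itself (via checkpoints) and by default generates a unique consumer group per query, so two jobs sharing one `kafka.group.id` will have the broker split partitions between them. A minimal sketch of giving each job its own group prefix instead (topic and server names here are assumptions for illustration):

```python
# Sketch: per-job Kafka source options for two Structured Streaming jobs.
# Spark's Kafka source manages offsets through its own checkpoints; forcing
# both jobs into one consumer group makes the broker divide partitions
# between them, so each job sees only part of the topic.

def kafka_reader_options(job_name: str, topic: str) -> dict:
    return {
        "kafka.bootstrap.servers": "localhost:9092",  # assumed broker
        "subscribe": topic,
        # Distinct prefix per job, so each query gets its own consumer
        # group and therefore reads every partition of the topic.
        "groupIdPrefix": f"{job_name}-grp",
        "startingOffsets": "earliest",
    }

job1_opts = kafka_reader_options("job1", "test1")
job2_opts = kafka_reader_options("job2", "test1")
```

Each dict would then be passed as `spark.readStream.format("kafka").options(**opts).load()` in the respective script.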
0 votes · 0 answers · 74 views
How to get starting offset and ending offset of a kafka batch in Spark Streaming (Java)
In Spark streaming (using Java), I am trying to print the start offset and last offset of a specific batch taken from Kafka, along with the batch-related metrics (like processingStartTime, ...
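In Structured Streaming, the per-batch Kafka offsets are exposed in the query's progress events (`query.lastProgress`), whose `sources` entries carry `startOffset`/`endOffset` maps of topic → partition → offset. A small sketch of pulling them out (the sample dict is illustrative data, not real output):

```python
# Sketch: extract per-partition start/end offsets of a micro-batch from a
# Structured Streaming progress event (the dict returned by
# query.lastProgress for a Kafka source).

def batch_offsets(progress: dict) -> dict:
    """Return {(topic, partition): (start, end)} for the batch."""
    out = {}
    for source in progress["sources"]:
        start, end = source["startOffset"], source["endOffset"]
        for topic, parts in end.items():
            for part, end_off in parts.items():
                start_off = start.get(topic, {}).get(part, 0)
                out[(topic, int(part))] = (start_off, end_off)
    return out

sample = {
    "sources": [{
        "startOffset": {"orders": {"0": 100, "1": 250}},
        "endOffset":   {"orders": {"0": 180, "1": 300}},
    }]
}
print(batch_offsets(sample))
# {('orders', 0): (100, 180), ('orders', 1): (250, 300)}
```

The same progress object also carries timing metrics (e.g. `durationMs`), which is the Structured Streaming counterpart of the DStream-era batch info the question mentions.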
0 votes · 0 answers · 31 views
Spark streaming with Kafka: ERROR in fold checkpoint Multiple streaming queries
I am a beginner in Apache Spark. I am researching the problem of restarting a job after an error occurs when using Spark streaming with Kafka.
I tried deleting the latest file in the commit folder and ...
0 votes · 0 answers · 25 views
Spark stream to stream join failing with error java.lang.IllegalStateException: Error reading delta file
I am reading two streams as dataframes from Kafka, joining them, and trying to process the joined DF as below
private def readStream(spark: SparkSession, topicName: String, schema: StructType): ...
0 votes · 0 answers · 114 views
How to correctly parse data format Kafka from Azure EventHub
I am new to Kafka and I am trying to read stream events coming from an Azure EventHub using Python, but I am not able to correctly extract or parse the data.
kafka_options = {
# Port 9093 is the ...
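For reference, Event Hubs' Kafka-compatible endpoint listens on port 9093 with SASL_SSL/PLAIN and the literal username `$ConnectionString`. A sketch of the option dict (the namespace, hub name, and connection string below are placeholders, not real values):

```python
# Sketch: Kafka options for reading an Azure Event Hub through its
# Kafka-compatible endpoint. NAMESPACE and CONN_STR are placeholders.

NAMESPACE = "my-namespace"        # assumed Event Hubs namespace
CONN_STR = "Endpoint=sb://..."    # the Event Hubs connection string

kafka_options = {
    "kafka.bootstrap.servers": f"{NAMESPACE}.servicebus.windows.net:9093",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": (
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        f'username="$ConnectionString" password="{CONN_STR}";'
    ),
    "subscribe": "my-event-hub",  # the Event Hub name acts as the topic
    "startingOffsets": "earliest",
}
```

The payload then arrives in the `value` column as bytes, so it typically needs `CAST(value AS STRING)` before JSON parsing.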
0 votes · 0 answers · 37 views
java.lang.NoClassDefFoundError: Could not initialize class kafka.utils.Json$
I'm trying to use Spark Streaming with Kafka in a Java application, and I'm encountering issues when using spark-submit with Spark 2.11.0. I've set up my dependencies in Maven as follows:
<...
1 vote · 0 answers · 126 views
Spark foreachBatch concurrency and time-based triggers
Is the foreachBatch method guaranteed to run its callback "exclusively", or will it be started asynchronously? I'm working on a custom sink that relies on consecutive, non-concurrent ...
0 votes · 0 answers · 100 views
Buffer Overflow Exception in Spark Structured Streaming with Kafka
I am getting the below error while streaming data from Kafka using Spark Structured Streaming.
java.lang.IllegalStateException: Buffer overflow when available data size(16384)>=application buffer size (...
0 votes · 1 answer · 466 views
Periodic processing time spikes in spark structured streaming
I am wondering why every 4th batch of my spark streaming application has a huge spike.
Some details
This is stateful processing using the RocksDB state store
Reading from Kafka with 180 partitions
Writing ...
0 votes · 0 answers · 124 views
py4j.protocol.Py4JJavaError: An error occurred while calling o37.load. : java.lang.NoClassDefFoundError: scala/$less$colon$less
I am encountering an issue when trying to load data from Kafka using Apache Spark version 3.3.4 with Scala version 2.12.8. The error message I receive is as follows:
py4j.protocol.Py4JJavaError: An ...
0 votes · 0 answers · 45 views
java.lang.NoClassDefFoundError running example streaming.DirectKafkaWordCount
I am trying to run an example program DirectKafkaWordCount using this command:
bin/run-example --packages "org.apache.kafka:kafka-clients:3.6.1,org.apache.spark:spark-streaming-kafka_2.11:1.6. ...
0 votes · 1 answer · 244 views
Dynamic Filter with Spark Structured Streaming
I am working on my Spark Streaming project with the goal of creating a simple app to notify users when a data stream meets a condition (e.g., sending a notification when a stock price > x).
df = ...
0 votes · 0 answers · 101 views
pyspark streaming fill all millisecond
I'm new to PySpark streaming. I have a data stream from Kafka that I want to fill every 0.1 seconds; the timestamps from Kafka are irregular, and I'm trying to fill all the timestamps that are not there with the ...
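The core of the gap-filling logic can be sketched on plain timestamps: generate every 100 ms tick between the first and last event and carry the last seen value forward. This assumes events already land on 100 ms boundaries; in PySpark the same idea is usually expressed with a generated `sequence()` of timestamps joined to the stream plus `last(..., ignorenulls=True)`:

```python
# Sketch: forward-fill a value onto a regular 100 ms grid.
# events: sorted (timestamp_ms, value) pairs aligned to 100 ms ticks.

def fill_100ms(events: list) -> list:
    """Return one (timestamp_ms, value) row per 100 ms tick."""
    filled, last_val = [], None
    by_ts = dict(events)
    for t in range(events[0][0], events[-1][0] + 1, 100):
        if t in by_ts:
            last_val = by_ts[t]       # new observation at this tick
        filled.append((t, last_val))  # otherwise carry the last value
    return filled

print(fill_100ms([(0, 1.0), (300, 2.0)]))
# [(0, 1.0), (100, 1.0), (200, 1.0), (300, 2.0)]
```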
0 votes · 0 answers · 39 views
Kafka consumer fetches new messages only after restarting the consumer
I'm facing an issue with my Kafka consumer job written in Scala. When we start the consumer, it fetches all messages available in the broker from the last consumed offset and processes those JSON messages ...
0 votes · 1 answer · 462 views
Disable Kafka Warnings for Spark Streaming Application
I use Spark Structured Streaming (PySpark) to read data from a Kafka topic. It works well, but when I open the executors' stderr, my whole log page is WARNs from Kafka saying that
kafkadataconsumer is not ...
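One common approach (a sketch, assuming the log4j 2 setup that Spark 3.3+ ships with; older versions use `log4j.properties` with log4j 1.x syntax) is to raise the threshold for the chatty Kafka packages in the executors' log4j configuration:

```properties
# Fragment for conf/log4j2.properties: silence Kafka consumer WARNs
# on the executors by raising these loggers to ERROR.
logger.kafka.name = org.apache.kafka
logger.kafka.level = error
logger.sparkkafka.name = org.apache.spark.sql.kafka010
logger.sparkkafka.level = error
```

The file then has to reach the executors, e.g. shipped with `--files` and pointed at via `spark.executor.extraJavaOptions`, so the executor JVMs pick it up rather than the driver's configuration alone.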
0 votes · 0 answers · 1k views
spark streaming kafka auth with ssl certificates
I am facing a problem with authentication to a Kafka topic using SSL from Spark Streaming.
I've got 3 SSL certs in PEM format for authentication to the Kafka topic:
ssl_cafile
ssl_certfile
...
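A sketch of wiring those three PEM files into the Spark Kafka source directly (PEM stores are supported by Kafka clients 2.7+; older clients need the PEMs converted into JKS/PKCS12 keystores first — file paths below are placeholders):

```python
# Sketch: SSL options for the Spark Kafka source using PEM files.
# ca.pem ~ ssl_cafile, client.pem ~ ssl_certfile + ssl_keyfile combined.

kafka_ssl_options = {
    "kafka.bootstrap.servers": "broker:9093",   # assumed broker
    "kafka.security.protocol": "SSL",
    # CA certificate that signed the broker certs
    "kafka.ssl.truststore.type": "PEM",
    "kafka.ssl.truststore.location": "/etc/certs/ca.pem",
    # client certificate plus private key, for mutual TLS
    "kafka.ssl.keystore.type": "PEM",
    "kafka.ssl.keystore.location": "/etc/certs/client.pem",
}
```

Note the keystore PEM must contain both the client certificate and its private key; if the key is encrypted, `kafka.ssl.key.password` is needed as well.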
0 votes · 0 answers · 134 views
Convert some specific columns that have 0 and 1 values in Kafka messages to False and True in PySpark
Requirement
We are consuming messages from Kafka using PySpark. In these JSON messages, there are some keys corresponding to which we have values such as 0 and 1.
Now the requirement here is to ...
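Since Spark SQL casts integers to booleans (0 → false, non-zero → true), one way is to build `selectExpr()` strings that cast the known flag columns and pass the rest through. The column names here are assumptions for illustration; in PySpark the result would be used as `df.selectExpr(*exprs)`:

```python
# Sketch: generate selectExpr() strings casting 0/1 flag columns to BOOLEAN.

def bool_cast_exprs(all_cols: list, flag_cols: list) -> list:
    return [
        f"CAST({c} AS BOOLEAN) AS {c}" if c in flag_cols else c
        for c in all_cols
    ]

exprs = bool_cast_exprs(["id", "is_active", "is_deleted"],
                        ["is_active", "is_deleted"])
print(exprs)
# ['id', 'CAST(is_active AS BOOLEAN) AS is_active',
#  'CAST(is_deleted AS BOOLEAN) AS is_deleted']
```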
1 vote · 0 answers · 673 views
Spark Streaming - NoSuchMethodError: scala.collection.immutable.Map$.apply
I have a simple Spark streaming java program:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache....
0 votes · 1 answer · 312 views
Async Checkpointing in Spark Structured Streaming using RocksDB
I am currently exploring enabling async checkpointing in Spark Structured Streaming, but am not able to find any way to do so. Databricks offers this for its flavour of Spark.
Spark ...
1 vote · 0 answers · 28 views
Spark Streaming query dependency
I have a use case where two Spark streaming queries are running and consuming data from the same topic. I want a guarantee that Query1 consumes and processes the data from the topic before Query2 ...
0 votes · 1 answer · 866 views
Error-Queries with streaming sources must be executed with writeStream.start();; kafka
I was trying to handle real-time data streaming from Kafka using PySpark.
I have a table that gets updated in real time. Whenever there is content in the table, I need to aggregate it and stream the ...
0 votes · 0 answers · 110 views
Distinct operation in spark structured streaming with a window operation
I want to implement a distinct operation in Spark Structured Streaming code.
I have already watermarked it and windowed it, but still, Spark is not able to execute it.
FYI - Distinct comes under the list of ...
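The streaming-supported form is `dropDuplicates()` combined with a watermark, e.g. `df.withWatermark("ts", "10 minutes").dropDuplicates(["id", "ts"])`. Its semantics can be illustrated on plain tuples (a simplification of what Spark's state store does, not its actual implementation): a duplicate key is suppressed only while its state survives the watermark; once the state expires, the same key can appear again.

```python
# Sketch of dropDuplicates-with-watermark semantics on (event_time, key)
# tuples in arrival order. `delay` plays the role of the watermark delay.

def dedupe_with_watermark(events, delay):
    seen, max_ts, kept = {}, 0, []
    for ts, key in events:
        max_ts = max(max_ts, ts)
        watermark = max_ts - delay
        # expire state older than the watermark
        seen = {k: t for k, t in seen.items() if t >= watermark}
        if ts >= watermark and key not in seen:
            seen[key] = ts
            kept.append((ts, key))
    return kept

print(dedupe_with_watermark([(1, "a"), (2, "a"), (20, "a")], delay=10))
# [(1, 'a'), (20, 'a')]
```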
0 votes · 1 answer · 504 views
Not able to read data through kafka spark streaming in pyspark
I am working on creating a basic streaming app which reads streaming data from Kafka and processes the data. Below is the code I am trying in PySpark
spark = SparkSession.builder.appName("testing&...
1 vote · 0 answers · 47 views
Spark Streaming Kafka consumer auto close
I don't want to use one consumer for all topics; I want to use this method to improve consumption efficiency
val kafkaParams = Map(
ConsumerConfig.GROUP_ID_CONFIG -> group,
...
1 vote · 0 answers · 462 views
kafka stream with AWS Glue - authorization issue
I want to stream a Kafka topic from a Glue job, but I get the following error
StreamingQueryException: Not authorized to access topics:
[topic_name]
This my current script
# Script generated for node ...
1 vote · 0 answers · 239 views
pyspark ml training ALS: No ratings available from MapPartitionsRDD
I'm trying to train ALS with data in each batch from Kafka using Spark streaming, and am facing the below error.
I think it's because the rating column is negative or something invalid like wrong data ...
2 votes · 1 answer · 1k views
Spark not giving equal tasks to all executors
I am reading from a Kafka topic which has 5 partitions. Since 5 cores are not sufficient to handle the load, I am repartitioning the input to 30. I have given 30 cores to my Spark process with 6 ...
1 vote · 1 answer · 789 views
Spark speculative tasks and its performance overhead
I am currently exploring Spark's speculative tasks option.
Below is the configuration I am planning to use.
I am reading the data from kafka and using repartition() I am creating around 200+ ...
0 votes · 1 answer · 728 views
Spark structured streaming job not processing stages and showing in hung state
I am running one streaming application, processing data from Kafka to Kafka using Spark.
If I am using latest then it works as expected and runs without any issue,
but in source we have done ...
0 votes · 1 answer · 2k views
offset management in spark streaming
As far as I understand, for a Spark streaming application (Structured Streaming or otherwise), to manually manage the offsets, Spark provides the feature of checkpointing where you just have to configure ...
1 vote · 1 answer · 300 views
kafkaUtils.createDirectStream gives an error
I changed a line from createStream to createDirectStream since the new library does not support createStream
I have checked it from here https://codewithgowtham.blogspot.com/2022/02/spark-streaming-...
0 votes · 0 answers · 120 views
Problem integrating kafka and spark streaming no messages received in spark streaming
My Spark streaming context is successfully subscribed to my Kafka topic, where my tweets are streamed using my Twitter producer. But no messages are being streamed from the topic in my Spark streaming!
Here ...
0 votes · 0 answers · 92 views
spark-streaming-kafka-0-10 does not support message handler
My use case is to print the offset number, partition, and topic for each record that has been read from Kafka in a Spark streaming application.
Currently my code to create the discretized stream looks like this.
...
1 vote · 0 answers · 2k views
How to use environment variables in spark which deployed on cluster mode?
When I set the environment variable using IntelliJ, the code below works, but when I deploy the code with spark-submit it does not work, since the environment variables do not exist on the entire cluster.
import com....
0 votes · 1 answer · 412 views
Spark Structured Streaming processing already processed records on failure
I am stuck with a very weird issue in Spark Structured Streaming. Whenever I shut down the stream and restart it, it again processes already processed records.
I tried to use spark.conf.set("...
0 votes · 0 answers · 227 views
Spark streaming provides 2 kinds of streams when integrating with Kafka: 1) Receiver-based 2) Direct. What kind of stream does Structured Streaming use?
Spark streaming provides 2 kinds of streams when integrating with Kafka:
Receiver-based
Direct
What kind of stream does Structured Streaming use when we do spark.readStream.format("kafka")?
1 vote · 0 answers · 180 views
Spark task failure with ClassCastException [C cannot be cast to [J, at org.apache.spark.unsafe.memory.HeapMemoryAllocator.allocate
We have a Java Spark streaming application which does SCD2 operations on Delta Lake.
We were using Spark 3.0.0 and Delta Lake 0.7.0; after upgrading to Spark 3.2.0 and Delta 1.1.0, we can see the ...
1 vote · 1 answer · 176 views
Apache Spark with kafka stream - Missing Kafka
I am trying to set up Apache Spark with Kafka and wrote a simple program locally; it is failing, and I am not able to figure out why from debugging.
build.gradle.kts
implementation ("org.jetbrains.kotlin:...
1 vote · 0 answers · 106 views
Write Spark Stream to Phoenix table
I am trying to figure out how to write a Spark Stream to a Phoenix table in the least convoluted way.
So far I have only found this solution: kafka-to-phoenix, which requires some deep ad-hoc ...
0 votes · 1 answer · 253 views
Spark Structured Streaming from Kafka to Elastic Search
I want to write a Spark Streaming job from Kafka to Elasticsearch. Here I want to detect the schema dynamically while reading it from Kafka.
Can you help me do that?
I know, this can be done in ...
1 vote · 1 answer · 1k views
foreach() method with Spark Streaming errors
I'm trying to write data pulled from Kafka to a BigQuery table every 120 seconds.
I would like to do some additional operations which by documentation should be possible inside the .foreach() or ...
0 votes · 1 answer · 5k views
Available options for a source/destination format of Spark structured streaming
When we use DataStreamReader API for a format in Spark, we specify options for the format used using option/options method. For example, In the below code, I'm using Kafka as the source and passing ...
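The valid keys are not discoverable through the DataStreamReader API itself; they are listed per source in that connector's documentation (e.g. the Kafka integration guide). A sketch with a few commonly used Kafka keys (server and topic names are assumptions), showing that `option()` and `options()` are interchangeable:

```python
# Sketch: connector-specific options for the Kafka source. Keys prefixed
# "kafka." are passed through to the Kafka consumer; the rest are
# interpreted by the Spark connector itself.

kafka_opts = {
    "kafka.bootstrap.servers": "host1:9092,host2:9092",
    "subscribe": "topic1",            # or "assign" / "subscribePattern"
    "startingOffsets": "latest",
    "maxOffsetsPerTrigger": "10000",  # cap records per micro-batch
}

# reader = spark.readStream.format("kafka").options(**kafka_opts).load()
# is equivalent to chaining .option(k, v) for each pair.
```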
1 vote · 1 answer · 773 views
Spark and Kafka: how to increase parallelism for producer sending large batch of records improving network usage?
I am diving in to understand how I can send (produce) a large batch of records to a Kafka topic from Spark.
From the docs I can see that there is an attempt to use the same producer across tasks in the ...
1 vote · 1 answer · 655 views
Can I use Airflow to start/stop spark streaming job [closed]
I have two types of jobs: Spark batch jobs and Spark streaming jobs.
I would like to schedule and manage them both with Airflow.
Airflow is meant for jobs that stop. But I want to use it for my ...
0 votes · 1 answer · 1k views
How do I get the data of one row of a Structured Streaming Dataframe in pyspark?
I have a Kafka broker with a topic connected to Spark Structured Streaming. My topic sends data to my streaming dataframe, and I'd like to get information on each row for this topic (because I need to ...
0 votes · 1 answer · 756 views
How to merge multiple datatypes of an union type in avro schema to show one data type in the value field instead of member0 member1
I have the following avro schema
{
"name": "MyClass",
"type": "record",
"namespace": "com.acme.avro",
"fields": [
{
...
0 votes · 0 answers · 224 views
How to configure thread count on Spark Driver node?
We are running a Spark streaming job in standalone cluster mode with the deploy mode as client. This streaming job polls messages from a Kafka topic periodically, and the logs generated at the driver ...