All Questions
Tagged with amazon-emr apache-spark-sql
215 questions
1
vote
0
answers
57
views
Spark Shuffle FetchFailedException for large dataset in EMR
My job takes as input a 400 TB Parquet dataset from S3. The job runs with 250 r7.16xlarge instances (each having 64 vCores, 488 GiB memory). The job fails with the error below: org.apache.spark.shuffle....
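For context on questions like this one, the usual first mitigations for FetchFailedException on very large shuffles are more (and therefore smaller) shuffle partitions plus more tolerant fetch retries. A minimal PySpark sketch with illustrative, untuned values and a hypothetical input path:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("large-shuffle-job")
    # More, smaller shuffle blocks keep any single fetch well under the 2 GiB block limit
    .config("spark.sql.shuffle.partitions", "20000")
    # Tolerate slow or overloaded shuffle servers instead of failing the stage
    .config("spark.network.timeout", "600s")
    .config("spark.shuffle.io.maxRetries", "10")
    .config("spark.shuffle.io.retryWait", "30s")
    .getOrCreate()
)

df = spark.read.parquet("s3://my-bucket/input/")  # hypothetical path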
5
votes
0
answers
100
views
Spark Repartition/shuffle optimization
I am trying to repartition before applying any transformation logic. This takes a lot of time. The code and a snapshot of the UI are below. Can any optimization be applied here?
Cluster: AWS EMR, 200 Task ...
1
vote
0
answers
108
views
Spark EMR Shuffle Read Fetch Wait Time is 4 hrs
One of my Spark jobs failed (see emr-spark-shuffle-fetchfailedexception-with-65tb-data-with-aqe-enabled) and has a high Shuffle Read Fetch Wait Time. Is there any way it can be improved?
Spark-submit
spark-...
0
votes
2
answers
135
views
What does retry in the Spark UI mean?
I have Spark executed in two different runs:
spark.sql.adaptive.coalescePartitions.enabled=false
spark.sql.adaptive.coalescePartitions.enabled=true
In the first instance, the stage graphs have ...
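For readers comparing runs like these, a minimal sketch of how the toggle is usually applied; AQE settings are SQL configs, so they can also be flipped at runtime between queries:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Run 1: AQE on, post-shuffle partition coalescing off
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")

# Run 2: coalescing on, so AQE merges small shuffle partitions after each stage
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")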
0
votes
0
answers
35
views
ClassCastException in Spark SQL Incremental Load with DBT
I'm encountering a ClassCastException error when running an incremental load using DBT and Spark SQL. The error message indicates an issue with casting in the Spark execution plan:
org.apache.hive....
1
vote
1
answer
83
views
Spark EMR jobs: Is the number of tasks defined by AQE (adaptive.enabled)?
I see that the number of tasks in the Spark job is only 1000 after the initial read, whereas the number of cores available is 9000 (1800 executors * 5 cores each). I have enabled AQE and shuffle partition coalescing. In ...
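Background that helps with this question: AQE's coalescing only governs post-shuffle stages (the task count right after the initial read comes from the input splits), and the coalesced partition count is derived from data size, not core count. A hedged sketch of the knobs involved, with illustrative values:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# Ceiling that AQE coalesces down from; raise it so parallelism can approach the core count
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "18000")
# Target bytes per coalesced partition; smaller values yield more, smaller tasks
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")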
0
votes
1
answer
209
views
Spark Driver going OOM
For certain DataFrames, applying the withColumn method in Spark 3.4.1 causes the driver to choke and run out of memory. However, the same DataFrames are processed successfully in Spark 2.4.0.
Heap ...
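One pattern worth ruling out for this kind of driver OOM is a long chain of withColumn calls, each of which copies and extends the logical plan on the driver. A minimal sketch of the usual mitigation (column names hypothetical; withColumns requires Spark 3.3+):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 2), (3, 4)], ["a", "b"])  # stand-in for the real data

# Instead of df.withColumn("a2", ...).withColumn("b2", ...) repeated many times,
# add all derived columns in a single plan node:
df = df.withColumns({
    "a2": F.col("a") * 2,
    "b2": F.col("b") * 2,
})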
0
votes
0
answers
42
views
Spark SQL - performance degradation after adding a new column
My code is in Scala and I'm using Spark SQL syntax to make a union of 3 dataframes.
Currently I am working on adding a new field. It's applicable to only one of the dataframes, so the ...
0
votes
2
answers
800
views
400 Bad Request error when trying to write to S3 from an EMR 7.0.0 cluster
I have a Spark application working flawlessly on emr-5.29.0 with Spark 2.4.4. This app writes to S3 using Spark SQL like this:
df
.repartition($"year", $"month", $"day&...
0
votes
0
answers
132
views
Is repartition(num) faster than repartition(num, col)?
Shouldn't repartition(num_of_partitions_wanted) be faster than repartition(num_of_partitions_wanted, 'id')? I'm wondering about other strategies to get even partitions when writing out to Parquet on EMR.
My ...
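For reference while reading this one: repartition(n) does a round-robin shuffle and yields evenly sized partitions, while repartition(n, col) hash-partitions on the column, so skew in the column becomes skew in the partitions. A small sketch (path hypothetical):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])  # stand-in for the real data

even = df.repartition(200)                # round-robin: balanced partition sizes
by_id = df.repartition(200, F.col("id"))  # hash on id: co-locates ids, inherits id skew

even.write.mode("overwrite").parquet("s3://my-bucket/out/")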
0
votes
0
answers
41
views
How can we remove unwanted data from a Databricks cluster after filtering the required data?
I have a folder in S3 and the total size of the data within the folder is more than 80 GB. I am taking this as the input to my Spark SQL job, but I am using only a few columns from the imported data. In ...
0
votes
0
answers
101
views
PySpark structured streaming - data from previous record
I have a use case where I have a stream of records with gps coordinates.
Schema:
latitude: Float
longitude: Float
I want to use PySpark to calculate the distance between my current record and my ...
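For the distance computation itself, a sketch of the haversine formula as PySpark column expressions (the "previous record" part is the hard bit: lag-style ordered windows are not supported inside a structured streaming query, so this helper is shown for the batch case):

from pyspark.sql import functions as F

def haversine_km(lat1, lon1, lat2, lon2):
    # Great-circle distance in kilometres between two (lat, lon) column pairs
    dlat = F.radians(lat2 - lat1)
    dlon = F.radians(lon2 - lon1)
    a = (F.sin(dlat / 2) ** 2
         + F.cos(F.radians(lat1)) * F.cos(F.radians(lat2)) * F.sin(dlon / 2) ** 2)
    return 6371.0 * 2 * F.asin(F.sqrt(a))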
1
vote
1
answer
115
views
PySpark gives different df.count() result on every other run
I am using AWS EMR (v6.11.1) and PySpark (v3.3.2). After some transformations on the data (mainly groupBy and dropDuplicates), I am getting different values from pyspark.sql.DataFrame.count() on every ...
0
votes
1
answer
98
views
Multiple CSV scans in Spark structured streaming for the same file in S3
I have a Spark 3.4.1 structured streaming job which uses the custom s3-sqs connector by AWS: it reads messages from SQS, takes the path provided in each SQS message, and then reads from S3. Now I need to split ...
0
votes
1
answer
379
views
AWS EMR YARN does not allocate all requested executors
The situation:
1x Primary node:
4 cores
8 GiB memory
2x core nodes:
16 cores
64 GiB memory
I tried to request 6 executors, each running 5 cores, so that my application would utilise 30 cores total. ...
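For comparison while reading the numbers above, the request roughly corresponds to this configuration (memory value illustrative; note that YARN must also fit spark.executor.memoryOverhead into each container, a common reason fewer executors are granted than the core arithmetic suggests):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.executor.instances", "6")  # 6 executors x 5 cores = 30 cores
    .config("spark.executor.cores", "5")
    .config("spark.executor.memory", "18g")   # + overhead must fit the YARN container
    .getOrCreate()
)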
1
vote
1
answer
1k
views
pyspark & iceberg: `update *` not working in `merge into`?
I run PySpark on AWS EMR Studio.
MERGE INTO iceberg_catalog.staging.tbl AS t
USING tempview AS s
ON t.number = s.number
WHEN MATCHED THEN UPDATE *
WHEN NOT MATCHED THEN INSERT *
but got
pyspark.sql....
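A common workaround when the UPDATE * / INSERT * shorthand is rejected is to spell the column assignments out explicitly; a hedged sketch against the same table names (the value column is hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.sql("""
    MERGE INTO iceberg_catalog.staging.tbl AS t
    USING tempview AS s
    ON t.number = s.number
    WHEN MATCHED THEN UPDATE SET t.value = s.value
    WHEN NOT MATCHED THEN INSERT (number, value) VALUES (s.number, s.value)
""")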
0
votes
1
answer
343
views
How to read .csv and .csv.gz files with all executors in an EMR cluster
I am using the AWS EMR service (EMR 6.9.0) with one core node and 8 executors of instance type m5.2xlarge. My requirement is to read a CSV or csv.gz file through a Spark Scala job and write it into ...
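One detail relevant here: gzip is not a splittable codec, so a single .csv.gz file is decompressed by one task regardless of executor count; parallelism is only recovered by repartitioning after the read. A minimal sketch (paths hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.option("header", "true").csv("s3://my-bucket/in/data.csv.gz")
df = df.repartition(64)  # spread the rows across the cluster before heavy work
df.write.mode("overwrite").parquet("s3://my-bucket/out/")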
0
votes
1
answer
2k
views
Executors do not seem to be created or scaled up for a Spark application on AWS EMR Serverless
I would appreciate your help with my problem.
I'm running a spark application on AWS EMR serverless with emr 6.11 release.
I'm using Spark 3.3.2 with Java 17, with configuration: maximum resources of ...
0
votes
0
answers
256
views
Spark Scala job in AWS EMR fails randomly with the error org.xml.sax.SAXParseException; Premature end of file
I have a Spark (2.4.6) Scala job running in AWS EMR (emr-5.31.0) that fails randomly with the error org.xml.sax.SAXParseException; Premature end of file. The job consistently overwrites Parquet files in ...
0
votes
1
answer
313
views
Does saving a Spark DF to an S3 path write data to an EBS volume first?
I am curious to know what happens behind the scenes when writing a Spark DF as a Parquet file to an S3 location. Does it first store it on the local file system (EBS in our case) and then push ...
0
votes
0
answers
445
views
Implementing exponential backoff in spark between consecutive attempts
On our AWS EMR Spark cluster we have 100-150 jobs running at any given time. Each cluster task node is allocated 7 of its 8 cores. But at certain hours of the day the load average touches 10, causing task ...
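Spark itself has no backoff between task attempts, so this is usually implemented at the application layer around an action. A minimal sketch of the pattern (retry counts, delays, and output path illustrative):

import random
import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(1000)  # stand-in for the real output DataFrame

def with_backoff(action, max_attempts=5, base_delay_s=10):
    # Retry a Spark action with exponentially growing sleeps plus jitter
    for attempt in range(max_attempts):
        try:
            return action()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            time.sleep(base_delay_s * (2 ** attempt) + random.uniform(0, 5))

with_backoff(lambda: df.write.mode("overwrite").parquet("s3://my-bucket/out/"))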
0
votes
1
answer
479
views
How can I reuse spark SQL view/table across multiple AWS EMR steps?
I am submitting multiple steps (concurrency = 1) to an AWS EMR cluster with the command 'spark-submit --deploy-mode client --master yarn <>', one after another.
In the first step I'm reading from S3 and ...
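Background for this question: temp views are scoped to one SparkSession, and each EMR step is a separate spark-submit, so nothing in-session survives between steps. The usual pattern is to persist the intermediate result as a table (or S3 path) in one step and read it back in the next; a sketch with hypothetical names:

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Step 1: materialise the intermediate result
df = spark.read.parquet("s3://my-bucket/raw/")
df.write.mode("overwrite").saveAsTable("mydb.staged")

# Step 2 (a separate spark-submit): read it back
staged = spark.table("mydb.staged")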
0
votes
0
answers
112
views
How to solve an OutOfMemory exception when I load a big number of JSON files in Spark using an HDFS source
The problem:
I have an HDFS source hdfs:///data that contains 500 GB of JSON files.
My executor node memory limit is 64 GB, my Spark version is 3.3.0, and I am running on AWS EMR 6.8.
val df = spark....
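One frequent contributor to OOM on big JSON reads is schema inference, which scans the input before the real job starts; supplying the schema up front avoids that pass. A hedged PySpark sketch (field names hypothetical; the question's own code is Scala):

from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("id", LongType()),
    StructField("payload", StringType()),
])

# With an explicit schema, Spark skips the inference scan over the 500 GB of JSON
df = spark.read.schema(schema).json("hdfs:///data")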
3
votes
0
answers
544
views
Why are raw strings not supported in Spark SQL when running in EMR?
In Pyspark, using a Spark SQL function such as regexp_extract or regexp_replace, raw strings (string literals prefixed with r) are supported when running locally, but not when running on EMR.
A simple ...
1
vote
0
answers
2k
views
Spark SQL unable to read Delta Lake table
Has anyone run into this error while reading a Delta Lake table using spark.sql?
I am running Spark on EMR and using S3 to store my tables. I am able to create the DB and tables to read from my S3 ...
1
vote
1
answer
1k
views
How to make this PySpark UDF faster?
I am executing this UDF through PySpark on EMR, using Spark 3.0.1 with the YARN manager. How can I make this UDF faster?
I am using the external parser zipcodes; the matching takes a lot of time.
@udf(...
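When a Python UDF only looks each row up in a static reference dataset (such as a zip-code table), a broadcast join usually beats it, since it avoids per-row Python serialization. A hedged sketch with hypothetical names and path:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("10001",), ("94105",)], ["zipcode"])  # stand-in for the real data

# Load the reference data once as a DataFrame instead of parsing per row in the UDF
zips = spark.read.csv("s3://my-bucket/zipcodes.csv", header=True)

# The broadcast join replaces the per-row UDF call entirely
df = df.join(F.broadcast(zips), on="zipcode", how="left")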
0
votes
0
answers
370
views
PySpark job stuck writing to an S3 bucket
I am running PySpark on EMR, and my cluster gets stuck (see the screenshot below) while writing data into an S3 bucket. I can see a temp folder in the bucket, and it has been taking 30 min without any ...
0
votes
0
answers
34
views
RDD collect OOM issues
I am trying to parse the input RDD into a map for broadcast, but I am running into memory issues on EMR. Below is the code:
val data = sparkContext.textFile("inputPath")
val result = sparkContext....
1
vote
1
answer
492
views
Task has a long Scheduler Delay in the Spark UI and fails because of GC overhead limit exceeded
I'm running a Spark application which loads 1 million rows of data from a database into 201 partitions and then writes them as Parquet files. The application code looks like:
dataframe = spark.read.format(...
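For background, the number and balance of partitions in a JDBC read are set by the read options, which is where a specific partition count like 201 comes from; a sketch of a bounded, column-partitioned read (URL, table, and bounds illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://host:5432/db")  # hypothetical URL
    .option("dbtable", "events")                      # hypothetical table
    .option("partitionColumn", "id")                  # numeric column to split on
    .option("lowerBound", "1")
    .option("upperBound", "1000000")
    .option("numPartitions", "200")                   # ~200 roughly even ranges
    .load()
)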
1
vote
0
answers
324
views
Use Glue Catalog for Spark On EMR with Ranger plugin
I'm trying to set up Apache Ranger-integrated EMR.
So I installed Ranger Server (Admin Server) and EMR Plug-in for EMR-FS and EMR-SPARK.
EMR-FS works fine. However, it doesn't seem to work as intended ...
3
votes
0
answers
855
views
Error: 'Requested role is not associated to cluster' when trying to read a Redshift table from PySpark on EMR
Trying to read a Redshift table from PySpark on EMR, I am getting a 'requested role is not associated to cluster' error.
The role is already attached to Redshift.
df.count() is working; df.show() throws the above ...
1
vote
1
answer
1k
views
regexp_extract PySpark SQL: ParseException Literals of type 'R' are currently not supported
I'm using PySpark SQL with regexp_extract in this way:
df = spark.createDataFrame([['id_20_30', 10], ['id_40_50', 30]], ['id', 'age'])
df.createOrReplaceTempView("table")
sql_statement="...
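The usual workaround for this (and the related raw-strings question above) is to drop the SQL-side r'...' literal and double the backslashes inside an ordinary SQL string; a minimal sketch against the example DataFrame and temp view above:

# Python raw string; the doubled backslash is what Spark SQL's own parser expects
sql_statement = r"SELECT id, regexp_extract(id, 'id_(\\d+)_(\\d+)', 1) AS first_num FROM table"
spark.sql(sql_statement).show()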
-1
votes
1
answer
579
views
Spark is inconsistent with unusually encoded CSV file
Context:
As part of a data pipeline, I am working with some flat CSV files.
Those files have unusual encoding and escaping rules.
My intention is to preprocess those and convert them to Parquet for subsequent ...
0
votes
0
answers
38
views
Running more than one Spark application in a cluster: not all applications run optimally, as some complete sooner
I am running 20 Spark applications on an EMR cluster of 2 worker nodes and 1 master node with c5.24xlarge instances; thus I have 192 cores in total and 1024 GB of RAM in total.
Each application is processing ...
0
votes
1
answer
319
views
Is there an optimal way to write lots of tiny files with PySpark?
I have a job that requires writing a single JSON file to S3 for each row in a Spark dataframe (which then gets picked up by another process).
df.repartition(col("id")).write.mode(&...
0
votes
1
answer
307
views
emr-dynamodb-connector control/throttle the writes to DDB (Throughput exceeds the current throughput limit for your account.)
I am trying to write 1 TB / 30 million documents to a DDB table.
The DDB table is set for on-demand capacity.
For that I am using the emr-dynamodb-connector, running a Spark job on an EMR cluster. The code looks like ...
0
votes
1
answer
163
views
emr-dynamodb-connector: don't save if the primary key is already present in DynamoDB
We are using a Spark job with the emr-dynamodb-connector to load data from S3 files into DynamoDB.
https://github.com/awslabs/emr-dynamodb-connector
But if a document is already present in DynamoDB, my ...
1
vote
0
answers
1k
views
Scala - java.lang.IllegalArgumentException: requirement failed: All input types must be the same except
Here I'm trying to add a date to the data frame (line 232), but I'm getting an exception on line 234:
232 - val df5Final = df5Phone.withColumn(colName, regexp_replace(col(colName), date_pattern, "...
1
vote
1
answer
1k
views
EMR 6.x cross-account access to Glue catalog
We're using SparkSQL on EMR version 6.2.0. To run the SparkSQL scripts, we're using Zeppelin notebooks on EMR. We've been required to access the Glue catalog cross-account (both read and write).
I ...
0
votes
0
answers
174
views
Spark optimisation not working as expected
I am trying to run a set of different SQL operations in PySpark. However, the optimization is not happening as expected.
Ideally, Spark optimizes the whole execution plan before doing the actual work....
1
vote
1
answer
687
views
Best way to automate AWS EMR creation, termination, and PySpark jobs
I have PySpark code in S3; I will execute it, write test cases in PySpark, and load the results into Snowflake. My job runs for 1 min on a daily basis, and I also need logs if it has failed.
I am new to ...
1
vote
0
answers
369
views
PySpark: read from multiple sources and load into multiple tables
I have the architecture below:
Source 1 --> EMR (Spark transformation) --> into Snowflake Tables A,B,C
Source 2 --> EMR (Spark transformation) --> into Snowflake Tables D,E,F
Source 3 --&...
2
votes
1
answer
3k
views
Spark SQL queries against Delta Lake Tables using Symlink Format Manifest
I'm running Spark 3.1.1 on an AWS emr-6.3.0 cluster with the following Hive/metastore configurations:
spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.delta....
3
votes
0
answers
254
views
PySpark indexing dataframe: No space left on device
The source table has many columns, namely ...
The table is partitioned on year, year_month, and tran_datetime.
Table size: 590000000.
Each partition has roughly 50000000 rows.
The data is being written to S3.
...
0
votes
1
answer
2k
views
How to fix 'Failed to convert the JSON string 'varchar(2)' to a data type.'
We want to move from Spark 3.0.1 to 3.1.2. According to the migration guide, varchar data types are now supported in table schemas. Unfortunately, data onboarded with the new version can't be queried by the old Spark ...
1
vote
0
answers
858
views
Connect Glue catalog as Spark SQL metastore on EMR
We are trying to connect to the Glue catalog using Spark on EMR 5.30.2 with applications: Spark 2.4.5.
I am able to connect to it with a small program.
import org.apache.spark.sql.SparkSession;
...
0
votes
1
answer
736
views
How can I force a Spark executor to spawn more threads per task?
I am running an EMR Spark cluster with this setup:
Master: 1 of m5.xlarge
Core: 4 of m5.xlarge
spark.executor.instances 4
spark.executor.cores 4
spark.driver.memory ...
3
votes
0
answers
2k
views
Spark buffer holder size limit issue
I am doing an aggregate function at the column level, like
df.groupby("a").agg(collect_set(b))
The column value grows beyond the default size limit of 2 GB.
Error details:
Spark job fails with an ...
-1
votes
1
answer
87
views
Best practice to read data from EMR to physical server
I am using PySpark to read data from EMR. But if the EMR cluster is fully occupied, and I can see in the cluster manager that all the memory is occupied by some ETL job, can I still run this script on ...
0
votes
0
answers
139
views
Spark job taking a long time to append data to S3
I'm running a Spark job on EMR, trying to convert a large zipped file (15 GB) to Parquet, but it is taking too long to write to S3.
I'm using r5 instances for the master (1 instance) and core (3 instances) nodes.
...