All Questions

1 vote
0 answers
57 views

Spark Shuffle FetchFailedException for large dataset in EMR

My job takes as input a 400 TB Parquet dataset from S3. It runs on 250 r7 16xlarge instances (each with 64 vCores and 488 GiB of memory). The job fails with the error below: org.apache.spark.shuffle....
user3858193 • 1,518
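
At this scale the usual first suspects are oversized shuffle blocks and overloaded shuffle servers. A minimal sketch of shuffle-hardening settings often tried for FetchFailedException; the values are illustrative assumptions, not tuned numbers:

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("shuffle-hardening-sketch")
        # More shuffle partitions -> smaller shuffle blocks per task.
        .config("spark.sql.shuffle.partitions", "20000")
        # Give shuffle servers more time under load before a fetch fails.
        .config("spark.network.timeout", "600s")
        # Throttle concurrent fetches hitting any single shuffle server.
        .config("spark.reducer.maxReqsInFlight", "1")
        .config("spark.reducer.maxBlocksInFlightPerAddress", "100")
        .getOrCreate()
    )
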
5 votes
0 answers
100 views

Spark Repartition/shuffle optimization

I am trying to repartition before applying any transformation logic. This takes a lot of time. The code and a snapshot of the UI are below. Can any optimization be applied here? Cluster: AWS EMR, 200 task ...
user3858193 • 1,518
1 vote
0 answers
108 views

Spark EMR Shuffle Read Fetch Wait Time is 4 hrs

One of my Spark jobs, which failed as described in emr-spark-shuffle-fetchfailedexception-with-65tb-data-with-aqe-enabled, has a high Shuffle Read Fetch Wait Time. Is there any way it can be improved? Spark-submit spark-...
user3858193 • 1,518
0 votes
2 answers
135 views

What does retry in the Spark UI mean?

I have Spark executed in two different instances: spark.sql.adaptive.coalescePartitions.enabled=false and spark.sql.adaptive.coalescePartitions.enabled=true. In the first instance, the stage graphs have ...
user3858193 • 1,518
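
For context: stage attempts beyond the first are labelled "retry" in the Spark UI, and with AQE enabled the plan can be re-optimized between attempts. A minimal sketch of the two configurations being compared, assuming everything else in the job is identical:

    from pyspark.sql import SparkSession

    def build_session(coalesce_enabled: bool) -> SparkSession:
        # Only the AQE partition-coalescing flag differs between the runs.
        return (
            SparkSession.builder
            .appName(f"aqe-coalesce-{coalesce_enabled}")
            .config("spark.sql.adaptive.enabled", "true")
            .config("spark.sql.adaptive.coalescePartitions.enabled",
                    str(coalesce_enabled).lower())
            .getOrCreate()
        )
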
0 votes
0 answers
35 views

ClassCastException in Spark SQL Incremental Load with DBT

I'm encountering a ClassCastException error when running an incremental load using DBT and Spark SQL. The error message indicates an issue with casting in the Spark execution plan: org.apache.hive....
Raul Zinezi
1 vote
1 answer
83 views

Spark EMR jobs: Is the number of tasks defined by AQE (adaptive.enabled)?

I see that the number of tasks in the Spark job is only 1000 after the initial read, whereas the number of cores available is 9000 (1800 executors * 5 cores each). I have enabled AQE and coalesce shuffle partitions. In ...
user3858193 • 1,518
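
A hedged sketch of the knobs involved: the first read's task count comes from input splits, while post-shuffle task counts are governed by AQE coalescing. The config names are real Spark 3.x settings; the values are assumptions:

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        # The read stage's task count comes from input splits, capped by:
        .config("spark.sql.files.maxPartitionBytes", "128m")
        # After a shuffle, AQE starts from initialPartitionNum and merges
        # partitions toward the advisory size, which can leave far fewer
        # tasks than available cores.
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum",
                "18000")
        .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
        .getOrCreate()
    )
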
0 votes
1 answer
209 views

Spark Driver going OOM

For certain DataFrames, applying the withColumn method in Spark 3.4.1 causes the driver to choke and run out of memory. However, the same DataFrames are processed successfully in Spark 2.4.0. Heap ...
Gaurav Madan
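
One known contributor (not a confirmed diagnosis for this case): chained withColumn calls grow the analyzed plan the driver must hold, while a single select keeps it flat. A sketch of the substitution, with trim standing in for the real expressions:

    from pyspark.sql import DataFrame, functions as F

    def trim_columns(df: DataFrame, cols: list) -> DataFrame:
        # Calling df.withColumn(...) once per column grows the plan the
        # driver must analyze; one select keeps it a single projection.
        exprs = [
            F.trim(F.col(c)).alias(c) if c in cols else F.col(c)
            for c in df.columns
        ]
        return df.select(*exprs)
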
0 votes
0 answers
42 views

Spark SQL - performance degradation after adding a new column

My code is in Scala and I'm using Spark SQL syntax to union 3 dataframes. Currently I am working on adding a new field. It's applicable only for one of the dataframes, so the ...
nelyanne • 106
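
When a new field exists in only one input, the usual patterns are a typed null placeholder or unionByName with allowMissingColumns (Spark 3.1+). A PySpark sketch of both (the question is Scala, but the API is the same shape):

    from pyspark.sql import DataFrame, functions as F

    def union_three(a: DataFrame, b: DataFrame, c: DataFrame) -> DataFrame:
        # Option 1: add a typed null placeholder to the frames missing it:
        #   b = b.withColumn("new_field", F.lit(None).cast("string"))
        # Option 2 (Spark 3.1+): fill missing columns with nulls.
        return (
            a.unionByName(b, allowMissingColumns=True)
             .unionByName(c, allowMissingColumns=True)
        )
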
0 votes
2 answers
800 views

400 Bad Request error when trying to write to S3 from an EMR 7.0.0 cluster

I have a Spark application working flawlessly using emr-5.29.0 and Spark 2.4.4. This app writes to S3 using Spark SQL like this: df.repartition($"year", $"month", $"day"...
beikern • 33
0 votes
0 answers
132 views

Is repartition(num) faster than repartition(num, col)?

Shouldn't repartition(num_of_partitions_wanted) be faster than repartition(num_of_partitions_wanted, 'id')? I'm also wondering about other strategies to get even partitions in write-outs to Parquet in EMR. My ...
Eugene Choi
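
For context, the two calls shuffle differently: repartition(n) round-robins rows, while repartition(n, col) hash-partitions on the column. A sketch on stand-in data:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.appName("repartition-sketch").getOrCreate()
    df = spark.range(1_000_000).withColumn("key", F.col("id") % 100)

    # Round-robin: evenly sized partitions; rows for a key are scattered.
    evenly_sized = df.repartition(200)

    # Hash on the column: rows for a key are co-located, but partition
    # sizes follow the key distribution, so hot keys give skewed files.
    colocated = df.repartition(200, "key")
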
0 votes
0 answers
41 views

How can we remove unwanted data from a Databricks cluster after filtering the required data?

I have a folder in S3 and the total size of the data within the folder is more than 80 GB. I am taking this as my input in my Spark SQL, but I am using only a few columns from the imported folder data. In ...
Asif Khan • 171
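
If only a few columns are needed, selecting them immediately after the read lets a columnar format prune the rest at the source. A minimal sketch; the path and column names are placeholders:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("prune-sketch").getOrCreate()

    # With columnar formats such as Parquet, selecting early means Spark
    # fetches only these columns from S3; the rest of the 80 GB is never
    # pulled onto the cluster.
    df = spark.read.parquet("s3://bucket/folder/").select("col_a", "col_b")
    df.createOrReplaceTempView("source")
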
0 votes
0 answers
101 views

PySpark structured streaming - data from previous record

I have a use case where I have a stream of records with GPS coordinates. Schema: latitude: Float, longitude: Float. I want to use PySpark to calculate the distance between my current record and my ...
David Cunningham
1 vote
1 answer
115 views

PySpark gives a different df.count() result on every other run

I am using AWS EMR (v6.11.1) and PySpark (v3.3.2). After some transformations (mainly groupBy and dropDuplicates) on the data, I am getting different values in the output of pyspark.sql.DataFrame.count() on every ...
Atul G • 11
0 votes
1 answer
98 views

Multiple CSV scans in Spark structured streaming for the same file in S3

I have a Spark 3.4.1 structured streaming job which uses the custom s3-sqs connector by AWS: it reads messages from SQS, reads the provided path in the SQS message, and then reads from S3. Now I need to split ...
Umang Desai
0 votes
1 answer
379 views

AWS EMR YARN does not allocate all requested executors

The situation: 1x primary node (4 cores, 8 GiB memory), 2x core nodes (16 cores, 64 GiB memory each). I tried to request 6 executors, each running 5 cores, so that my application would utilise 30 cores in total. ...
William
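
A hedged sketch of the sizing arithmetic that usually explains missing executors; the memory values are illustrative assumptions:

    from pyspark.sql import SparkSession

    # 2 core nodes x 16 cores = 32 cores, so 6 executors x 5 cores (30)
    # fits on cores alone. Memory is usually what blocks the grant:
    # executors-per-node x (executor memory + memoryOverhead) must stay
    # under YARN's per-node allocation, or YARN grants fewer containers.
    spark = (
        SparkSession.builder
        .config("spark.executor.instances", "6")
        .config("spark.executor.cores", "5")
        .config("spark.executor.memory", "18g")         # illustrative
        .config("spark.executor.memoryOverhead", "2g")  # illustrative
        .getOrCreate()
    )
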
1 vote
1 answer
1k views

pyspark & iceberg: `update *` not working in `merge into`?

I run PySpark on AWS EMR Studio: MERGE INTO iceberg_catalog.staging.tbl AS t USING tempview AS s ON t.number = s.number WHEN MATCHED THEN UPDATE * WHEN NOT MATCHED THEN INSERT *, but got pyspark.sql....
BAE • 8,924
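
Since the error text is truncated this is only a guess, but Spark's MERGE grammar spells these clauses UPDATE SET * and INSERT *, so the missing SET keyword is worth checking:

    from pyspark.sql import SparkSession

    # Assumes a session already configured for the Iceberg catalog,
    # as in the EMR Studio notebook from the question.
    spark = SparkSession.builder.getOrCreate()

    spark.sql("""
        MERGE INTO iceberg_catalog.staging.tbl AS t
        USING tempview AS s
        ON t.number = s.number
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
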
0 votes
1 answer
343 views

How to read .csv and .csv.gz files with all executors in an EMR cluster

I am using the AWS EMR service (EMR 6.9.0) with one core node and 8 executors of instance type m5.2xlarge. My requirement is to read a CSV or csv.gz file through a Spark Scala job and write it into ...
Shan • 31
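
Worth noting: gzip is not splittable, so a .csv.gz is read by a single task regardless of executor count. A PySpark sketch of the usual workaround (the question is Scala, but the calls are the same):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("gz-read-sketch").getOrCreate()

    # A .csv.gz is read by one task no matter how many executors exist;
    # a plain .csv can be split across tasks.
    df = spark.read.option("header", "true").csv("s3://bucket/input.csv.gz")

    # Fan the rows out so downstream work uses the whole cluster
    # (the partition count is illustrative).
    df = df.repartition(64)
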
0 votes
1 answer
2k views

Executors do not seem to be created or scaled up for a Spark application on AWS EMR Serverless

I would appreciate your help with my problem. I'm running a Spark application on AWS EMR Serverless with the emr-6.11 release. I'm using Spark 3.3.2 with Java 17, with a configuration of maximum resources of ...
Shai Barak
0 votes
0 answers
256 views

Spark Scala job in AWS EMR fails randomly with the error org.xml.sax.SAXParseException; Premature end of file

I have a Spark (2.4.6) Scala job running in AWS EMR (emr-5.31.0) that fails randomly with the error org.xml.sax.SAXParseException; Premature end of file. The job consistently overwrites parquet files in ...
sgallagher
0 votes
1 answer
313 views

Does saving a Spark DF to an S3 path write data to an EBS volume first?

I am curious to know what happens behind the scenes when writing a Spark DF as a Parquet file to an S3 location. Does it first store it locally on the local file system (EBS in our case) and then push ...
Venkatesan Muniappan
0 votes
0 answers
445 views

Implementing exponential backoff in Spark between consecutive attempts

On our AWS EMR Spark cluster we have 100-150 jobs running at any given time. Each cluster task node is allocated 7 of 8 cores. But at some hour of the day the load average touches 10, causing task ...
chendu • 759
0 votes
1 answer
479 views

How can I reuse a Spark SQL view/table across multiple AWS EMR steps?

I am submitting multiple steps (concurrency: 1) to an AWS EMR cluster with the command 'spark-submit --deploy-mode client --master yarn <>', one after the other. In the first step I'm reading from S3 and ...
Rajat Goel • 2,305
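
Each EMR step is a separate spark-submit, so temp views die with the application; the usual workaround is a metastore-backed table written in one step and re-read in the next. A sketch with placeholder names:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    # Step 1: materialize the intermediate result to a metastore-backed
    # table (path and table names are placeholders).
    df = spark.read.parquet("s3://bucket/raw/")
    df.write.mode("overwrite") \
        .option("path", "s3://bucket/intermediate/") \
        .saveAsTable("db.intermediate")

    # A later step, running as a new application, can then simply do:
    # spark.table("db.intermediate")
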
0 votes
0 answers
112 views

How to solve an OutOfMemory exception when I load a big number of JSON files in Spark using an HDFS source

The problem: I have an HDFS source hdfs:///data that contains 500 GB of JSON files. My executor node memory limit is 64 GB, my Spark version is 3.3.0, and I am running on AWS EMR 6.8. val df = spark....
Raphael Mansuy
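
One common contributor, offered as an assumption here: schema inference scans the JSON before the real read, and supplying the schema avoids that pass. A sketch with a placeholder schema:

    from pyspark.sql import SparkSession
    from pyspark.sql.types import LongType, StringType, StructField, StructType

    spark = SparkSession.builder.getOrCreate()

    # Placeholder schema; the real one must match the files under /data.
    schema = StructType([
        StructField("id", LongType()),
        StructField("payload", StringType()),
    ])

    # An explicit schema skips the inference pass over 500 GB of JSON.
    df = spark.read.schema(schema).json("hdfs:///data")
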
3 votes
0 answers
544 views

Why are raw strings not supported in Spark SQL when running in EMR?

In Pyspark, using a Spark SQL function such as regexp_extract or regexp_replace, raw strings (string literals prefixed with r) are supported when running locally, but not when running on EMR. A simple ...
danielcahall • 2,742
1 vote
0 answers
2k views

Spark SQL unable to read Delta Lake table

Has anyone run into this error while reading a Delta Lake table using spark.sql? I am running Spark on EMR and using S3 to store my tables. I am able to create the DB and tables to read from my S3 ...
rodrigo sejas Jaldin
1 vote
1 answer
1k views

How to make this PySpark UDF faster?

I am executing this UDF through PySpark on EMR, using Spark 3.0.1 with the YARN manager. How can I make this UDF faster? I am using the external parser zipcodes; the matching takes most of the time. @udf(...
Xi12 • 1,191
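
Without the full UDF body, the generic speedup is swapping the row-at-a-time @udf for a vectorized pandas_udf. A sketch with a hypothetical lookup table standing in for the zipcodes parser:

    import pandas as pd
    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.types import StringType

    # Hypothetical lookup standing in for the external `zipcodes` parser.
    ZIP_TO_CITY = {"10001": "New York", "94105": "San Francisco"}

    @pandas_udf(StringType())
    def city_for_zip(zips: pd.Series) -> pd.Series:
        # One vectorized map per Arrow batch instead of one Python call
        # per row.
        return zips.map(ZIP_TO_CITY)

    # Usage: df.withColumn("city", city_for_zip("zip_code"))
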
0 votes
0 answers
370 views

PySpark job stuck while writing to an S3 bucket

I am running PySpark on EMR, and my cluster is getting stuck at the screenshot below while writing data into an S3 bucket. I am able to see a temp folder in the bucket, and it has been taking 30 min without any ...
Xi12 • 1,191
0 votes
0 answers
34 views

RDD collect OOM issues

I am trying to parse the input RDD into a map with a broadcast, but I am running into memory issues on EMR. Below is the code: val data = sparkContext.textFile("inputPath") val result = sparkContext....
user0712
1 vote
1 answer
492 views

Task has a long Scheduler Delay in the Spark UI and failed because of GC overhead limit exceeded

I'm running a Spark application which loads 1 million rows of data from a database into 201 partitions and then writes them as parquet files. The application code may look like: dataframe = spark.read.format(...
Rinze • 852
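
For context, a JDBC read only spreads rows across partitions when the four partitioning options are set; otherwise everything lands in one task and GC pressure follows. The connection details below are placeholders:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Placeholder URL/table/credentials; bounds should reflect real data.
    dataframe = (
        spark.read.format("jdbc")
        .option("url", "jdbc:postgresql://host:5432/db")
        .option("dbtable", "public.events")
        .option("user", "user")
        .option("password", "secret")
        .option("partitionColumn", "id")  # numeric, date, or timestamp
        .option("lowerBound", "1")
        .option("upperBound", "1000000")
        .option("numPartitions", "201")
        .load()
    )
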
1 vote
0 answers
324 views

Use Glue Catalog for Spark on EMR with the Ranger plugin

I'm trying to set up Apache Ranger-integrated EMR. So I installed the Ranger server (admin server) and the EMR plug-ins for EMRFS and EMR Spark. EMRFS works fine. However, it doesn't seem to work as intended ...
spaaaaark
3 votes
0 answers
855 views

Error: Requested role is not associated to cluster, when trying to read a Redshift table from PySpark in EMR

Trying to read a Redshift table from PySpark in EMR, I get a “requested role is not associated to cluster” error. The role is already attached to Redshift. df.count() works; df.show() throws the above ...
remeezraja abdulrahman
1 vote
1 answer
1k views

regexp_extract in PySpark SQL: ParseException: Literals of type 'R' are currently not supported

I'm using PySpark SQL with regexp_extract in this way: df = spark.createDataFrame([['id_20_30', 10], ['id_40_50', 30]], ['id', 'age']) df.createOrReplaceTempView("table") sql_statement="...
solopiu • 756
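
A workaround that avoids the unsupported r'...' literal inside the SQL text is to double the backslashes in an ordinary string. A sketch based on the question's own dataframe:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([['id_20_30', 10], ['id_40_50', 30]],
                               ['id', 'age'])
    df.createOrReplaceTempView("table")

    # Python turns \\\\d into \\d, and the SQL parser turns \\d into the
    # regex \d, so no r'...' literal is needed in the SQL text.
    sql_statement = (
        "SELECT id, regexp_extract(id, 'id_(\\\\d+)_(\\\\d+)', 1) "
        "AS first_num FROM table"
    )
    spark.sql(sql_statement).show()
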
-1 votes
1 answer
579 views

Spark is inconsistent with an unusually encoded CSV file

Context: as part of a data pipeline, I am working on some flat CSV files. Those files have unusual encoding and escaping rules. My intention is to preprocess those and convert them to Parquet for subsequent ...
Łukasz Rogalski
0 votes
0 answers
38 views

Running multiple Spark applications in a cluster: not all applications run optimally, as some complete sooner

I am running 20 Spark applications on an EMR cluster of 2 worker nodes and 1 master node with c5.24xlarge instances, so I have 192 cores and 1,024 GB of RAM in total. Each application is processing ...
sriparth
0 votes
1 answer
319 views

Is there an optimal way to write lots of tiny files with PySpark?

I have a job that requires writing a single JSON file to S3 for each row in a Spark dataframe (which then gets picked up by another process). df.repartition(col("id")).write.mode("...
Jaqq Darude
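
For reference, repartition(col("id")) hashes ids into spark.sql.shuffle.partitions buckets, so several rows can share a file; partitionBy at write time is the usual way to get one object per id. A hedged sketch on stand-in data:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.range(100).withColumn("payload", F.lit("x"))  # stand-in

    # partitionBy writes one directory per id; with one row per id that
    # is one small object per id. Fine for modest id counts, painful for
    # millions of S3 prefixes.
    df.write.mode("overwrite").partitionBy("id").json("s3://bucket/out/")
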
0 votes
1 answer
307 views

emr-dynamodb-connector: control/throttle the writes to DDB (Throughput exceeds the current throughput limit for your account)

I am trying to write 1 TB / 30 million documents to a DDB table. The DDB table is set to on-demand capacity. For that I am using the emr-dynamodb-connector, running a Spark job on an EMR cluster. The code looks like ...
Sunny Gupta
0 votes
1 answer
163 views

emr-dynamodb-connector: don't save if the primary key is already present in DynamoDB

We are using a Spark job with the emr-dynamodb-connector to load data from S3 files into DynamoDB (https://github.com/awslabs/emr-dynamodb-connector). But if a document is already present in DynamoDB, my ...
Sunny Gupta
1 vote
0 answers
1k views

Scala - java.lang.IllegalArgumentException: requirement failed: All input types must be the same except

Here I'm trying to add a date to the data frame (line 232), but I'm getting an exception on line 234: 232 - val df5Final = df5Phone.withColumn(colName, regexp_replace(col(colName), date_pattern, "...
mohit • 13
1 vote
1 answer
1k views

EMR 6.x cross-account access to Glue catalog

We're using SparkSQL on EMR version 6.2.0. To run the SparkSQL scripts, we're using Zeppelin notebooks on EMR. We've been required to access the Glue catalog cross-account (both read and write). I ...
Miky Schreiber
0 votes
0 answers
174 views

Spark optimisation not working as expected

I am trying to run a set of different SQL operations in PySpark. However, the optimization is not happening as expected. Ideally, Spark optimizes the whole execution plan before doing the actual work....
fried_fish
1 vote
1 answer
687 views

Best way to automate AWS EMR creation, termination and PySpark jobs

I have PySpark code in S3; I will execute it, write test cases in PySpark, and load the results to Snowflake. My job runs for 1 min on a daily basis, and I also need a log if it has failed. I am new to ...
Xi12 • 1,191
1 vote
0 answers
369 views

PySpark read from multiple sources and load into multiple tables

I have the architecture below: Source 1 --> EMR (Spark transformation) --> Snowflake tables A, B, C; Source 2 --> EMR (Spark transformation) --> Snowflake tables D, E, F; Source 3 --> ...
Xi12 • 1,191
2 votes
1 answer
3k views

Spark SQL queries against Delta Lake Tables using Symlink Format Manifest

I'm running Spark 3.1.1 on an AWS emr-6.3.0 cluster with the following Hive/metastore configurations: spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog spark.delta....
nate • 81
3 votes
0 answers
254 views

PySpark indexing dataframe: No space left on device

The source table has many columns. It is partitioned on year, year_month and tran_datetime; the table size is 590,000,000 rows, and each partition has roughly 50,000,000 rows. The data is being written to S3. ...
Andy_101 • 1,306
0 votes
1 answer
2k views

How to fix 'Failed to convert the JSON string 'varchar(2)' to a data type.'

We want to move from Spark 3.0.1 to 3.1.2. According to the migration guide, varchar data types are now supported in table schemas. Unfortunately, data onboarded with the new version can't be queried by the old Spark ...
user3813314
1 vote
0 answers
858 views

Connect Glue catalog as Spark SQL metastore on EMR

We are trying to connect to the Glue catalog using Spark on EMR 5.30.2 with applications: Spark 2.4.5. I am able to connect to it with a small program. import org.apache.spark.sql.SparkSession; ...
Ashish Mishra
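
For reference, a sketch of the Glue metastore factory setting documented for EMR; on EMR it is normally set through cluster configuration classifications (hive-site / spark-hive-site) rather than in code:

    from pyspark.sql import SparkSession

    # Normally set via the cluster's configuration classifications;
    # shown in code here only for clarity.
    spark = (
        SparkSession.builder
        .enableHiveSupport()
        .config(
            "hive.metastore.client.factory.class",
            "com.amazonaws.glue.catalog.metastore."
            "AWSGlueDataCatalogHiveClientFactory",
        )
        .getOrCreate()
    )
    spark.sql("SHOW DATABASES").show()
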
0 votes
1 answer
736 views

How can I force a Spark executor to spawn more threads per task?

I am running an EMR Spark cluster with this setup: Master: 1x m5.xlarge; Core: 4x m5.xlarge; spark.executor.instances 4; spark.executor.cores 4; spark.driver.memory ...
pure • 1,135
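
For context: Spark runs one thread per task, so the lever here is spark.task.cpus, which reserves extra cores per task for code that spawns its own threads. A minimal sketch:

    from pyspark.sql import SparkSession

    # Each task reserves 2 of the executor's 4 cores, so at most
    # 4 / 2 = 2 tasks run concurrently per executor, leaving headroom
    # for in-task threads (e.g. a multithreaded native library).
    spark = (
        SparkSession.builder
        .config("spark.executor.cores", "4")
        .config("spark.task.cpus", "2")
        .getOrCreate()
    )
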
3 votes
0 answers
2k views

Spark buffer holder size limit issue

I am running an aggregate function at column level, like df.groupby("a").agg(collect_set(b)). The column value grows beyond the default size of 2 GB. Error details: the Spark job fails with an ...
lsha • 31
-1 votes
1 answer
87 views

Best practice to read data from EMR to a physical server

I am using PySpark to read data from EMR. But if the EMR cluster is fully occupied, and I can see in the cluster manager that all the memory is occupied by some ETL job, can I still run this script on ...
dipan arya
0 votes
0 answers
139 views

Spark job taking a long time to append data to S3

I'm running a Spark job on EMR, trying to convert a large zipped file (15 GB) to Parquet, but it is taking too long to write to S3. I'm using r5 instances for the master (1 instance) and core nodes (3 instances). ...
John • 17
