7.spark SQL EAE
7.spark SQL EAE
7.spark SQL EAE
it happen
Apache Spark
Spark SQL
eae.es
Spark SQL
Índice
01. Introducción
02. Conceptos básicos de Spark SQL
03. Carga de datos en Data Frames
04. Principales funciones Data Frames
05. Persistencia de Data Frames
06. Funciones avanzadas y UDF
07. Técnicas de Optimización
08. Modo SQL e integración con Apache Hive
09. Casos de uso
2
Spark SQL
1. Introducción
eae.es
3
Spark SQL
1. Introducción
4
Spark SQL
1. Introducción
Spark Spark
Spark Sql Graphx
Streaming Mlib
SPARK CORE
5
Spark SQL
1. Introducción
6
Spark SQL
eae.es
7
Spark SQL
Arquitectura
• SparkSession (incluye un SparkContext)
• Estructuras de datos
• Data Frames y Data Sets
• Almacenamiento columnar y distribuido
• Optimizador de consultas (Catalyst) y de generación de código
• DataFrames es un link DataSets de tipo Row.
8
Spark SQL
Arquitectura
9
Spark SQL
10
Spark SQL
2. Conceptos básicos de Spark
11
Spark SQL
12
Spark SQL
13
Spark SQL
Spark Session
• Cuando trabajamos con la API SQL tenemos que iniciar la sesión de
Spark SQL con un objeto Spark Session.
• Un Spark Session contiene siempre un Spark Context.
• Importar las librerias
• from pyspark.sql import SparkSession
• Creación del Spark Session
spark = SparkSession \
.builder \
.appName(‘MyApp') \
.master('local[*]') \
.getOrCreate()
14
Spark SQL
Spark Session
• Método .getOrCreate()
• Si existe una sesión previa podemos recuperarla.
• Si no existe se crea una nueva.
• Es recomendable cerrar la sesión:
• spark.stop()
15
Spark SQL
16
Spark SQL
17
Spark SQL
eae.es
18
Spark SQL
2. Carga de datos en un DF
Carga de datos
• A partir de un RDD previamente existente
• person = [('Matt','Paris'),('Gill','Tokio'),('Mark','Casablanca')]
• personDF = spark.SparkContext.parallelize(person).toDF()
• A partir de ficheros o bases de datos
• Usando el método .read del Spark Session
• Varias formas de usarlo
1. spark.read.csv('sales_info.csv',inferSchema=True,header=True)
2. df = spark.read.format("csv"). \
option("header", "true"). \
option("inferSchema", "true"). \
load("sales_info.csv")
19
Spark SQL
2. Carga de datos en un DF
20
Spark SQL
2. Carga de datos en un DF
21
Spark SQL
2. Carga de datos en un DF
• final_struc = StructType(fields=data_schema)
• df = spark.read.json('people.json', schema=final_struc)
22
Spark SQL
eae.es
23
Spark SQL
24
Spark SQL
25
Spark SQL
Transformaciones básicas
• Acceso a las columnas
• Opción 1 : Por nombre de la columna, df[‘Company’]
• Opción 2: Por posición, df[0]
• Opción 3: Por nombre de la columna, ‘Company’
• Otras opciones: Mediante las funciones expr, col, column
• from pyspark.sql.functions import expr, col, column
• col("Sales")
• select(): retorna un nuevo DF con las columnas seleccionadas
1. df.select(df['Company'],df['Person'])
2. df.select(df[0],df[1])
3. df.select('Company','Person’)
4. df.select(col("Company"),col("Person"))
5. df.select(df.Company, df.Person)
26
Spark SQL
Transformaciones básicas
• Usando expr tengo la posibilidad de aplicar operaciones a las columnas
• from pyspark.sql.functions import expr
• df.select(expr("Company as compania"),expr("Sales * 2 as ventasPor2"))
• selectExpr(): Como es frecuente usar select con expresiones, esta función permite
hacerlo sin tener que usar la función expr.
• df.selectExpr("Company as compania","Sales * 2 as ventasPor2")
• Pero también así con ayuda de la función alias:
• df.select(df['Company'].alias('compania'),(df['Sales']*2).alias(
'ventasPor2’))
• Múltiples formas de trabajar con el DF para
obtener el mismo resultado.
27
Spark SQL
Transformaciones básicas
• withColumn(): Permite añadir una nueva columna al Data Frame.
• from pyspark.sql.functions import lit
• df.withColumn('A number',lit(1))
• df.withColumn('SalesOver200',expr("Sales >=200")).show()
28
Spark SQL
Transformaciones básicas
• where: Permite aplicar una expresión de filtrado al Data Frame.
• df.where(df['Company'] == 'GOOG')
Transformaciones básicas
• orderBy: Permite ordenar un DF por orden ascendente o descendente de una o más
columnas.
• df.orderBy(df['Sales’])
30
Spark SQL
Transformaciones básicas
• distinct: Permite generar un Data Frame sin filas duplicadas.
• from pyspark.sql.functions import *
• df.select('Company').distinct()
• union: Permite concatenar dos DF en un nuevo DF. Ambos DF deben tener el mismo
esquema.
• dfSales2.show()
• dfSales2.show()
• df3union = df.union(dfSales2)
31
Spark SQL
Transformaciones de agregación
• count: Permite hacer un recuento de los valores que tiene una columna, devolviendo el
resultado como una columna de un nuevo DF. No tiene en cuenta los valores NULL.
• df.select(count(df['Company']))
• countDistinct : Similar a la anterior, pero solo tiene recuenta los valores que son
distintos en la columna indicada.
• df.select(countDistinct(df['Company’]))
• df.select(countDistinct(df['Company']).alias('Companias
Distintas'))
• min and max: Calculo de valores máximos y mínimos para un columna indicada.
• df.select(min(df['Sales']),max(df['Sales']))
32
Spark SQL
Transformaciones de agregación
• sum: Cálculo de la suma total de todos los valores de una columna.
• df.select(sum(df['Sales’]))
• avg: Cálculo de la media de los valores de una columna. Mean() es equivalente.
• df.select(sum(df['Sales']).alias('ventasTotales'),avg(df[
'Sales']).alias('mediaVentas’))
• varianza y desviación estándar: Cálculo de la varianza y desviación estándar de
una columna.
• df.select(variance('Sales'),stddev('Sales’))
• describe: Devuelve un nuevo DF con el recuento, media, stdev, mínimo y máximo de
cada una de las columnas del DF sobre el que se aplica. Es útil para la exploración
estadística de un DF, por ejemplo en un proceso de ML.
• df.select(variance('Sales'),stddev('Sales’))
33
Spark SQL
Transformaciones de agregación
• Hasta ahora agregaciones a nivel de DF
• Aplicamos la función de agregación sobre todos los valores de una columna de
DF.
• Es más común la agregación de valores sobre grupos de columnas del DF.
• Esto una operación Group By de SQL.
• En Spark SQL una operación Group By se implementa como:
1. Especificamos con el método GroupBy las columnas sobre las que queremos
agrupar.
2. Aplicamos una de las transformaciones de agregación que hemos visto
previamente.
• Ejemplo:
• df.groupBy(df['Company']).max()
34
Spark SQL
Transformaciones de agregación
• agg: Una alternativa para aplicar una función de agregación sobre los grupos creados
en el DF con groupBy, es usar una o más expresiones de agregación mediante agg().
• df.groupBy(df['Company']). \
•
agg(sum(df['Sales']).alias('SumaVentas'),count(df['Sales'
]).alias('Recuento'))
35
Spark SQL
Transformaciones de unión
• Tipos de join soportados por Spark
• Inner joins: mantener solo las filas con las claves que existen al mismo
tiempo en los conjuntos de datos de la izquierda y la derecha.
• Outer joins: mantener filas con claves en los conjuntos de datos de la
izquierda o la derecha.
• Left outer joins: mantener las filas con las teclas en el conjunto de datos de
la izquierda.
• Right outer joins: mantener las filas con las claves en el conjunto de datos
de la derecha.
• Left semi joins: mantener las filas en la izquierda, y sólo la izquierda,
conjunto de datos donde la clave aparece en el conjunto de datos de la
derecha.
• Left anti joins: mantener las filas en la izquierda, y sólo la izquierda,
conjunto de datos donde no aparecen en el conjunto de datos de la derecha.
• Cross: producto Cartesiano.
36
Spark SQL
Transformaciones de unión
• Inner join.
• condition = dfA['colName'] == dfB['colName’]
• jointype = “inner”
• dfJoined = dfA.join(dfB, condition, jointype)
• Ejemplo:
• dfJoined = df.join(dfDesc,(df['Company'] ==
dfDesc['Company']),"inner")
37
Spark SQL
5. Persistencia
eae.es
38
Spark SQL
5. Persistencia
Destinos de datos
• Spark SQL incorpora integración “core” con 6 tipos de fuentes de datos
• CSV
• JSON
• Parquet
• ORC
• JDBC/ODBC
• Archivos de texto planos
• Y soporta muchas otras externas desarrolladas por la comunidad
• Cassandra
• MongoDB
• XML
• AWS Redshift
• Y muchas otras ...
39
Spark SQL
5. Persistencia
Método write
• Todas las fuentes anteriores implementan los métodos read() y write()
• Ya estudiamos previamente el método read() para cargar datos desde una
fuente a un DF
• Método write
• Permite guardar los datos de un DF en un sistema de almacenamiento.
• En la mayoría de casos, posible escritura en paralelo, de forma distribuida.
• Los parámetros esenciales:
• Formato (ej. CSV, JSON)
• Modo de escritura (ej. overwrite)
• Ruta de la carpeta.
• Pero cada formato puede añadir parámetros específicos
40
Spark SQL
5. Persistencia
Método write
• Los parámetros esenciales suelen ser:
• Formato (ej. CSV, JSON)
• Modo de escritura
• errorIfExists (por defecto): Falla si ya existen datos en la ruta.
• append: Añade archivos (datos) a la ruta.
• overwrite: Sobrescribe la ruta completa con los nuevos archivos (borra lo que había).
• ignore: Como errorIfExists pero sin generar error, no realiza la escritura simplemente
ignora la orden.
• Ruta de la carpeta
• La salida es una carpeta, dónde pueden haber uno o más archivos.
• También pueden haber subcarpetas en el caso de que apliquemos particionamiento.
• La carpeta padre representa al Dataset o tabla persistida.
41
Spark SQL
5. Persistencia
Escritura en CSV
• Ejemplo
42
Spark SQL
5. Persistencia
Escritura en Parquet
• Almacenar los datos en CSV está bien para escenarios en los que:
• Datos fuente solo disponibles en este formato
• Es un requisito para la salida final de los procesos.
• Sin embargo, se recomienda hacer uso de formatos de ficheros más
optimizados, como ORC o Parquet.
• Parquet es
• Un formato de archivo columnar Optimo para selección de columnas y permite
añadir nuevas.
• Guarda el esquema de datos No nos hace falta guardar los datos con cabeceras, ni
inferir los tipos de datos en lectura.
• Es Splittable Optimiza la lectura distribuida y por tanto el procesamiento.
43
Spark SQL
5. Persistencia
Escritura en Parquet
• Parquet
44
Spark SQL
5. Persistencia
Escritura de datos en paralelo
• El número de archivos de datos escritos es dependiente del número
de particiones que el DF tiene en el momento de ejecutar la escritura.
• Por defecto, se escribe 1 fichero por partición
• Podemos alterar el número de particiones de un DF con método repartition(num
partitions)
45
Spark SQL
5. Persistencia
Escritura de datos en paralelo
• Particionado
• Técnica que permite crear particiones de los datos para decidir como se
almacenan los datos.
• Puede optimizar la lectura/escritura de datos:
• Leer/escribir datos solo para una partición o conjunto de ellas (filtro por
partición)
• Tamaño de archivos Problema de los archivos pequeños en Hadoop,
también es un problema para Spark.
46
Spark SQL
5. Persistencia
Escritura de datos en paralelo
• Particionado
• Ejemplo de particionamiento por
columna compañía.
47
Spark SQL
eae.es
48
Spark SQL
6. Funciones avanzadas y UDF
Funciones avanzadas
• Spark SQL cuenta con muchas funciones avanzadas
• https://spark.apache.org/docs/latest/api/sql/index.html
• Tratamiento de fechas
• to_date, datediff, dayofmonth, from_utc_timestamp,…
• Funciones matemáticas y estadísticas
• approx_count_distinct, percentile, dense_rank, stdev_pop,
kurtosis, skewness, levenshtein,…
• Agregación
• rollup, grouping,...
• Tratamiento de tipos de datos complejos
• explode, named_struct, struct,…
• regexp_extract, xpath,…
• Otras
• monotonically_increasing_id, md5,…
49
Spark SQL
6. Funciones avanzadas y UDF
Funciones de ventana
• Nos permiten realizar operaciones avanzadas sobre agrupaciones de un
DF (groupBy)
• from pyspark.sql.window import Window, max, col, rank
• maxSales = max(dfCS["Sales"]).over(windowsSpec)
• rankSales = rank().over(windowsSpec)
• dfCS.select(df['Company'],df['Sales'], \
maxSales.alias('maxSales'), \
rankSales.alias('rankingCompania')).show()
50
Spark SQL
6. Funciones avanzadas y UDF
Funciones UDF
• Además de las funciones propias de Spark, el usuario puede crearse
sus propias funciones para aplicar sobre los elementos de un DF
• UDF: User Defined Functions
• Característica muy potente que permite aumentar la funcionalidad de Spark.
• Esta funciones puede construirse usando las librerías de Python o incluso librerías
externas.
• Por defecto, se registran como funciones temporales en el objeto Spark Session para
poder usarlas en los Worker Nodes.
51
Spark SQL
6. Funciones avanzadas y UDF
Funciones UDF
• Ejemplo:
• from pyspark.sql.functions import udf
• def potencia3(x):
return x ** 3
• potencia3udf = udf(potencia3)
• df.select(df['Company'],df['Person'],potencia3udf(df['Sales’]). \
alias('sales3power'))
52
Spark SQL
7. Técnicas de optimización
eae.es
53
Spark SQL
7. Técnicas de optimización
Algunas técnicas
• Algunas técnicas que nos permitirán optimizar nuestros programas Spark
• Lectura/escritura de datos en paralelo
• Uso de compresión en formatos de entrada salida
• Ajuste del particionado
• Evitar la recolección de datos excesivos al programa driver
• Cacheado de Data Frames
54
Spark SQL
7. Técnicas de optimización
Caching
• Funciona como una acción que computa las transformaciones previas y
guarda en resultado en memoria (por defecto) o disco
• df.cache()
• Se aplican los mismos principios que para los RDD
• Permite mejorar el rendimiento cuando un mismo DF se utiliza
• Como base para la creación de múltiples DF’s: distintas ramas del DAG que describe
el proceso a partir de ese nodo.
• En repetidas iteraciones de un algoritmo (ej. Machine Learning)
• Si solo se usa una vez ese DF en todo el programa, cachear no aporta mejoras.
55
Spark SQL
eae.es
56
Spark SQL
8. Modo SQL e integración con Hive
Modos SQL
• Hasta ahora hemos visto como trabajar con Data Frames usando las
funciones de la API de Spark SQL.
• Sin embargo, existen dos formas más de trabajar que permiten ejecutar
sentencias en lenguaje SQL (sin funciones, ANSI SQL:2003)
• Interfaz programática de SQL
• Spark SQL CLI
• Además, podemos levantar Spark como base de datos en memoria y
soportar conectividad J/ODBC
• Usando el servicio Thrift Server
• Gracias a el Catálogo (Catalog) podemos crear bases de datos y tablas con
persistencia aún cuando el proceso Spark esté apagado.
57
Spark SQL
8. Modo SQL e integración con Hive
Interfaz programática de SQL
• Permite ejecutar sentencias SQL
sobre un Data Frame.
58
Spark SQL
8. Modo SQL e integración con Hive
Sparl SQL CLI
• Útil para hacer consultas SQL en modo local desde línea de comandos
• No se puede comunicar con Thrift J/OBDC Server.
• Para iniciarlo ejecutar:
• ./bin/spark-sql
• Para configurar la integración con Hive es necesario copiar los siguientes
archivos en /conf
• hive-site.xml, core-site.xml, and hdfs-site.xml
59
Spark SQL
8. Modo SQL e integración con Hive
Thrift Server
• Permite levantar Spark como base de datos en memoria y soportar
conectividad J/ODBC
• Por ejemplo, puede ser útil para conectar Spark con Tableau.
• Para iniciar Thrift Server ejecutar el script
• ./sbin/start-thriftserver.sh
• Por defecto, el servidor escucha en el Puerto 10000
• localhost:10000
60
Spark SQL
8. Modo SQL e integración con Hive
Integración con Hive
• Antes del auge de Apache Spark, Hive era la principal herramienta para el
procesamiento de Big Data con SQL.
• Hasta Facebook, creador de Apache Hive, hace uso de Spark para ejecutar cargas de
trabajo previamente implementadas en Hive.
• Apache Spark permite integrarse perfectamente con Hive:
• Acceso de lectura/escritura a las misma base de datos (Hive Metastore)
• Es necesario configurar spark.sql.hive.metastore.versión para indica la versión de
Hive a la que se accede.
• Soporte para queries con sintaxis HiveQL
61
Spark SQL
9. Casos de uso
eae.es
62
Spark SQL
9. Casos de uso
Algunos casos de uso
• Actualmente Spark se está usando como parte fundamental de las
arquitecturas Big Data en muchas empresas.
• Procesos de calidad de datos en comunicaciones direccionales
• Plataforma Big Data de Uber
• Empresas de Logística (ej. optimización de rutas, consumo de combustible,…)
• Sector Turismo (ej. análisis de datos de transporte)
• Análisis de datos de campañas de marketing digital
• Plataformas de empresas del sector energético
• Y muchas otras más,….
63
Spark SQL
9. Casos de uso
Calidad de datos
• https://aws.amazon.com/es/solutions/case-studies/buildfax/
• BuildFax entrega todos sus datos por dirección postal.
• La información de la dirección a veces es bastante escasa: Número de la calle, el nombre de la
calle y el condado, pero no la ciudad, el estado o el código postal.
64
Spark SQL
9. Casos de uso
Calidad de datos
• Ejemplo de arquitectura
65
Spark SQL
9. Casos de uso
Calidad de datos
• Ejemplo de código
• Comparación de las direcciones contra callejeros de Correos,…
• Fuzzy Matching: Definición de algoritmo de de-duplicación basado en técnicas
estadísticas (Cadenas de levensthein ponderadas).
66
Spark SQL
9. Casos de uso
Plataforma Big Data de Uber
• Spark se usa en Uber para los procesos ETL y Machine Learning.
• https://eng.uber.com/uber-big-data-platform/
67
We make
it happen
FIN UNIDAD
eae.es