Optimizar joins: Tabla de hechos y tabla de dimensión cruzadas
Vamos a simular un caso en el que se nos solicita realizar un desarrollo y debemos hacerlo lo más eficiente posible.
La tabla grande tiene el siguiente esquema:
Tercera propuesta: "forzando" BroadcastHashJoin

Un total de 7.2 minutos, una mejora del 49% respecto a la propuesta 1 y una mejora del 24% respecto a la propuesta 2.
Respecto a las 3 implementaciones de join cuando se hace un join entre una tabla muy grande y una muy pequeña:
Requerimientos solicitados
Se nos ha solicitado, dada una tabla de empleados y una tabla de categorías de empleados, recuperar el id de empleado y el codigo especial que le corresponde a cada uno.La tabla grande tiene el siguiente esquema:
- id (bigint): Id del empleado
- id_categoria (int): Id de la categoría en la que se encuentra el empleado
-
salario (int): Sueldo del empleado
-
dimension1 (string): Otra dimensión no necesaria
-
- id (bigint): id de la dimensión
- datos_extra1 (int): Información no necesaria
- datos_extra2 (int): Información no necesaria
- datos_extra3 (int): Información no necesaria
- codigo_especial (string): El código que se nos ha requerido recuperar
Generación de la tablas
Introducción
Para la generación de los datos se ha emplado la técnica descrita en la entrada del blog Generación de datos aleatorios con Dataframe. Las funciones empleadas en el siguiente código y la descripción del funcionamiento se encuentran en la entrada del blog.
Tabla de hechos
El código para la generación de la tabla de hechos es el siguiente, en este caso tiene 2 mil millones de registros:
def main(args: Array[String]): Unit = { val spark = SparkSession .builder .appName("GeneradorAleatorio") .getOrCreate() val df_1M = spark.range(2000000000L) val df_datos = df_1M .withColumn("id_categoria", func_randomNum(0, 1000000)()) .withColumn("salario", func_randomNum(15000, 50000)()) .withColumn("dimension1", func_randomEnum(Seq("A", "B", "C", "D"))()) .withColumn("dimension2", func_randomEnumProb( Map( "Hombre" -> 0.5F, "Mujer" -> 0.4F, "Otro" -> 0.1F ))()) df_datos.write.format("parquet").saveAsTable("default.empleados") } }
El comando de lanzamiento de spark-submit es en este caso:
spark-submit --master yarn --deploy-mode cluster --executor-cores 2 --executor-memory 1G --num-executors 2 --class pruebas.GeneradorAleatorio SparkTests-1.0-SNAPSHOT.jar
Y el resultado en hdfs es el siguiente:
4 preciosos ficheros parquet-snappy de 5GB cada uno, no ha sido a proposito pero ha salido un número muy redondo
Tabla de dimensión
En la tabla de dimensión tenemos 1 millon de registros, una volumetría 2000 veces inferior a la tabla de hechos.def main(args: Array[String]): Unit = { val spark = SparkSession .builder .appName("GeneradorDimensiones2") .getOrCreate() val df_dimension = spark.range(1000000) val df_dimension2 = df_dimension .withColumn("datos_extra1", func_randomNum(-100000000, 100000000)()) .withColumn("datos_extra2", func_randomNum(-100000000, 100000000)()) .withColumn("datos_extra3", func_randomNum(-100000000, 100000000)()) .withColumn("codigo_especial", func_randomEnumProb( Map( "AFKSX23" -> 0.2F, "FISNCI83" -> 0.2F, "CNSKJ34" -> 0.1F, "DHFJSN73" -> 0.1F, "SDJKFGB31" -> 0.1F, "DCVNMKLSDFG332" -> 0.1F, "ZDFKNJGNF423" -> 0.1F, "DFGSDFJK2334" -> 0.1F ))()) df_dimension2.write.format("parquet").saveAsTable("default.categorias") }
El comando de lanzamiento:
spark-submit --master yarn --deploy-mode client --executor-cores 1 --executor-memory 500m --num-executors 1 --class pruebas.GeneradorDimensiones2 SparkTests-1.0-SNAPSHOT.jarY el resultado en hdfs es de 2 ficheros de 7.8MB cada uno, un total de 15.6MB
Primera propuesta
Nuesta primera propuesta en este caso, sería lo más sencillo posible, cargar las 2 tablas, cruzarlas, quitar las columnas que sobran y guardarlo como una tabla nueva:import org.apache.spark.sql.SparkSession object TestJoin { def main(args: Array[String]): Unit = { val spark = SparkSession .builder .appName("TestJoin") .getOrCreate() val df_empleados_original = spark.table("default.empleados") val df_categorias = spark.table("default.categorias") val df_empleados = df_empleados_original.withColumnRenamed("id", "id_empleado") val df_final = df_empleados.join(df_categorias, df_empleados("id_categoria") === df_categorias("id")) df_final.select("id_empleado", "codigo_especial").write.saveAsTable("default.empleado_codigoEspecial") } }
Simples operaciones como renombrar una columna para evitar problemas en el join, realizar un join y guardar el resultado. En este código solo hay una acción, por lo que para spark será facil de optimizar correctamente.
El comando empleado para ejecutar es:
spark-submit --master yarn --deploy-mode client --executor-cores 2 --executor-memory 500m --num-executors 2 --class pruebas.TestJoin SparkTests-1.0-SNAPSHOT.jar
La ejecución ha tardado 14 minutos, haciendo uso de 4 cores de procesador, no es mucho tiempo pero puede optimizarse mucho aún.
Esta única acción podemos verla en Spark History de esta forma, incluye unas marcas sobre los principales cuellos de botella de este join:
Donde tenemos 2 puntos marcados:
- (Verde): Un shuffle de 44.7GB
- (Naranja): El join se hace con la implementación de join SortMergeJoin
Esto no es malo, simplemente es lo que el optimizador de spark, con la configuración que tenemos de forma predeterminada ha considerado que es lo mejor.
Pero hagamos un pequeño cambio, vamos a cambiar una de las configuraciones para indicarle a Spark que preferimos un ShuffledHashJoin antes que un SortMergeJoin.
Segunda propuesta: "forzando" ShuffledHashJoin
HashJoin es otra de las implementaciones que tiene Spark para realizar un join, normalmente ShuffledHashJoin es mejor que SortMergeJoin si el coste de calcular el hash de la columna de cruce de una de las tablas es bajo. En este caso, debido a la poca volumetría de la segunda tabla, creo que así será, pero siempre es aconsejable hacer la prueba y ver el resultado real.
El único cambio necesario en este caso sería desactivar el parámetro "spark.sql.join.preferSortMergeJoin", lo cual lo haremos simplemente añadiendo una opción en la linea de lanzamiento, sin ningun cambio en el código:
spark-submit --master yarn --deploy-mode client --executor-cores 2 --executor-memory 500m --num-executors 2 --conf spark.sql.join.preferSortMergeJoin=false --class pruebas.TestJoin SparkTests-1.0-SNAPSHOT.jar
El resultado es el siguiente:
Con un tiempo de 9.5 minutos, el ganador es el ShuffleHashJoin, una mejora del 32%.
Pero... Aún tenemos mucho shuffle, 44.7GB... Aún podemos optimizar esto más.
Tercera propuesta: "forzando" BroadcastHashJoin
En este caso, el tamaño de la tabla de dimensión es muy pequeño, especialmente si descartas las columnas no necesarias, por lo tanto es posible hacer uso de la implementación de join "BroadcastHashJoin", esto se puede hacer de distintas formas, pero por seguír con el patrón de no modificar el código, lo haremos ajustando el auto-broadcast de spark.
El cambio lo haremos en el parámetro "spark.sql.autoBroadcastJoinThreshold" el cual de forma predeterminada tiene el valor de 10MB (10485760 Bytes). En este caso sé que la tabla es muy pequeña por lo tanto lo subiré solo un poco (mucho más de lo necesario realmente, pero no importa) y pondré 50MB (52428800 Bytes).
El comando de lanzamiento sería en este caso:
spark-submit --master yarn --deploy-mode client --executor-cores 2 --executor-memory 750m --num-executors 2 --conf spark.sql.autoBroadcastJoinThreshold=52428800 --class pruebas.TestJoin SparkTests-1.0-SNAPSHOT.jar
Nótese que he aumentado la ram de cada ejecutor, esto se debe a que hacer uso de Broadcast (en este caso, automático) aumenta el uso de RAM por contenedor.
Y el resultado es:
Un total de 7.2 minutos, una mejora del 49% respecto a la propuesta 1 y una mejora del 24% respecto a la propuesta 2.
Conclusión
Con solo añadir 1 parámetro de configuración en la linea de lanzamiento y sin tocar el código, hemos conseguido reducir a la mitad el tiempo de ejecución gracias a reducir casi 50GB de shuffle y cambiar el tipo de join.
Esta mejora en un cluster de verdad será más notoria, pues en estas pruebas se está ejecutando sobre 1 solo nodo y el coste del shuffle es mucho más inferior al coste real cuando los 2 ejecutores se encuentren en distintos servidores y deban comunicarse por red.
Por desgracia no dispongo de un cluster sobre el que ejecutar este test, pero sobre 1 solo nodo también hay diferencia de rendimiento.
Respecto a las 3 implementaciones de join cuando se hace un join entre una tabla muy grande y una muy pequeña:
- BroadcastHashJoin: Mayor rendimiento, mayor coste de RAM, reduce el shuffle al minimo posible.
- ShuffledHashJoin: Rendimiento intermedio, shuffle alto
- SortMergeJoin: Rendimiento bajo, suffle alto
¿Se puede optimizar más?
Sí, hay muchas más optimizaciones que podemos tener en cuenta, pero no las voy a tratar en este momento. Cuando intentamos optimizar algo, es muy importante conocer el dato y el entorno, pero especialmente el dato.
En este punto hemos visto una mejora para un caso concreto que incluso reduce a la mitad los tiempos de ejecución y no requiere recompilar o cambiar el código. En otra ocasión trataremos otras optimizaciones.
Comentarios
Publicar un comentario