Agregaciones básicas en Spark
Introducción
Una de las tareas más común trabajando con Spark son las agregaciones.
Por ejemplo, en SQL haríamos lo siguiente:
Select avg(salario) as SalarioMedia from empleados group by genero
Esto nos calcula la media de salario para cada genero.
¿Cómo se hace esto en Spark? Pues curiosamente hay muchas formas, podemos recurrir a spark-sql y ejecutar exactamente esa query, pero personalmente no me gusta NADA hacer uso de spark-sql, por lo tanto, eliminamos una de las maneras de realizarlo 😄.
La forma más correcta (desde mi punto de vista, claro) de escribir eso sería:
empleados.groupBy($"genero").agg(functions.avg($"salario") as "SalarioMedia")
DataFrame.groupBy
"DataFrame. groupBy" (o Dataset, no importa, recordemos que DataFrame = Dataset[Row]) es una funcion que tienen todos los dataframe para poder realizar agregaciones, es importante entender que esta función NO devuelve un dataframe, sino un RelationalGroupedDataset, que sería un dataframe pero agrupado por una clave.
Es decir, una vez hagamos groupBy, temporalmente dejamos de tener un dataframe y las funciones que podemos utilizar cambian, por ejemplo, no podemos hacer select o join con un RelationalGroupedDataset, debemos antes convertirlo a Dataframe.
RelationalGroupedDataset
Esta clase, como hemos dicho representa un dataframe agregado, nota que practicamente tenemos 3 o 4 funciones disponibles en esta clase.
"RelationalGroupedDataset.agg" es la clave, esta funcion nos permite ejecutar funciones de agregación sobre los datos agregados y lo mejor de todo, devuelve un dataframe, por lo que después recuperaremos nuestras funcionalidades de antes.
Funciones de agregación de Spark
En Spark tenemos muchas funciones de agregación, quizás las más comunes sean estas:
- sum -> Devuelve la suma de todos los valores agregados por cada clave de agregación de una columna
- min -> Devuelve el valor más pequeño por cada clave de agregación de una columna
- max -> Devuelve el valor más grande por cada clave de agregación de una columna
- avg -> Devuelve el valor medio por cada clave de agregación de una columna
- count -> Devuelve la cantidad de elementos por cada clave de agregación de una column
Pero estas no son todas, tenemos otras funciones muy interesantes como por ejemplo:
- collect_list -> Devuelve todos los elementos como una lista por cada clave de agregación de una columna
Teneis una lista completa (en inglés) en The Internals of Spark SQL.
¿Y cómo se utiliza una función de agregación? Bueno, las funciones de agregación se pueden usar dentro de "agg()", pero devuelven columnas, por lo que se le puede aplicar renombrado/alias u otras operaciones que realices con una columna. También podemos utilizaras en otras situaciones especiales como en una agregación window o en un select directamente.
Código base para practicas
Como siempre, os dejo el código base para generar el dataframe con el que realizaremos las siguientes operaciones, en este caso es un dataframe pequeñito, pero hacemos uso de las funciones de generación de datos de siempre, que podéis encontrar aquí.
import org.apache.spark.sql.SparkSession import pruebas.GeneradorAleatorio.{func_randomEnum, func_randomEnumProb, func_randomNum} object TestAgg { def main(args: Array[String]): Unit = { val spark = SparkSession .builder .appName("TestJoin") .getOrCreate() import spark.implicits._ val df_1M = spark.range(1000000) val empleados = df_1M .withColumn("id_categoria", func_randomNum(0, 100)()) .withColumn("salario", func_randomNum(15000, 50000)()) .withColumn("dimension1", func_randomEnum(Seq("A", "B", "C", "D"))()) .withColumn("genero", func_randomEnumProb( Map( "Hombre" -> 0.6F, "Mujer" -> 0.3F, "Otro" -> 0.1F ))()).cache() } }
Calcular salario más bajo, más alto y de media para las mujeres y hombres
Una operación clásica, tiene esta forma (ejecutado en Spark-Shell):
scala> :paste // Entering paste mode (ctrl-D to finish) empleados .filter($"genero" isin ("Mujer", "Hombre")) .groupBy($"genero").agg( functions.count($"genero") as "CantidadEmpleados", functions.max($"salario") as "salarioMaximo", functions.min($"salario") as "salarioMinimo" ).show() // Exiting paste mode, now interpreting. +------+-----------------+-------------+-------------+ |genero|CantidadEmpleados|salarioMaximo|salarioMinimo| +------+-----------------+-------------+-------------+ |Hombre| 600726| 49999| 15000| | Mujer| 299159| 49999| 15000| +------+-----------------+-------------+-------------+
Nota que hay un filter antes de realizar la agregación, es MUY IMPORTANTE reducir el flujo antes de realizar determinadas operaciones como agregaciones si este contiene datos que no necesitamos.
En este caso, no importa si hacemos el filter antes o despues de la agregación, pues Spark lo va a optimizar y realmente ejecutará lo mismo, pero es buena practica en Spark escribir los filtros de la forma adecuada, pues no siempre se van a poder realizar estas optimizaciones de forma automática. Es mejor no confiar.
También he añadido la columna adicional contando la cantidad de empleados de cada género para demostrar que las funciones de agregación se pueden mezclar.
Calcular cuantas personas de cada genero cobran el salario máximo
scala> :paste // Entering paste mode (ctrl-D to finish) val df_maxSalario = empleados .agg(functions.max($"salario") as "maxSalario") val broadcastedMaxSalario = functions.broadcast(df_maxSalario) broadcastedMaxSalario .join(empleados, functions.col("maxSalario") === empleados("salario")) .groupBy($"genero") .agg(functions.count(functions.lit(1)) as "cantidad_de_empleados", functions.first("salario") as "salario") .show() // Exiting paste mode, now interpreting. +------+---------------------+-------+ |genero|cantidad_de_empleados|salario| +------+---------------------+-------+ | Otro| 4| 49999| |Hombre| 20| 49999| | Mujer| 9| 49999| +------+---------------------+-------+
Nota como en este caso se ha hecho uso de Broadcast, pero tambien de la función ".agg" directamente sobre el dataframe.
Al aplicar directamente ".agg" sobre dataframe hacemos uso de una abreviatura de "df.groupBy().agg()", donde estamos agregando todo el dataframe, es decir, calculamos el salario máximo.
Despues, como sabemos que este dataframe siempre tendrá 1 solo registro, realizamos un broadcast para asegurarnos de que el join actuará como un filter y será muy rápido para finalmente agregar por genero y hacer un count. Lo más probable es que no sea necesario un broadcast explicito, pero siempre creo que lo mejor es indicarlo todo.
También añadimos la función first("salario") para recuperar el salario, otra opción sería añadirlo al groupBy.
Otras formas de agregar
Finalmente, indicar que existen otras agregaciones como las funciones window (lo mismo que en SQL) o la posibilidad de indicar la operación de agregación con un Map, siendo:
empleados.groupBy($"genero").agg(functions.avg($"salario"))Igual a
val mapAgg = Map("salario" -> "avg") empleados.groupBy($"genero").agg(mapAgg)
Lo cual puede resultar muy útil en ciertos casos
Comentarios
Publicar un comentario