Generación de datos aleatorios con Dataframe

Para hacer pruebas, especialmente cuando de habla de optimización, es muy útil poder generar una tanda de datos aleatorios con cierto control.

Lo primero que se nos viene a la cabeza es crear una lista muy grande y distribuirla, pero... Esto es un problema si quieres crear 10 millones de registros.

Función Spark.range

Esta función de spark nos permite crear un dataframe con una columna id, tendrá tantos registros como le indiquemos.


import org.apache.spark.sql.SparkSession

object GeneradorAleatorio {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .appName("GeneradorAleatorio")
      .getOrCreate()

    val df_1M = spark.range(1000000)
  }
}

Con esto, ya tenemos un dataframe con 1 millon de registros con un id incremental, desde 0 hasta 999.999.

La creación de las columnas la haremos mediante UDFs (o cualquier otra funcionalidad de los dataframe).

Veamos algunos ejemplos de columnas útiles, qué columnas crear dependerá del uso que queramos hacer.

Números aleatorios

La siguiente función nos creará una udf con la que podemos generar números aleatorios dentro de un rango

  def func_randomNum(min: Int, max: Int): UserDefinedFunction ={

    val aleatorio: () => Int = () => {
      Math.floor(min + (Math.random()*(max-min))).toInt
    }

    functions.udf(aleatorio)
  }

Que la podemos utilizar para construir una columna de la siguiente forma:

  val df_numero1 = df_1M.withColumn("salario", func_randomNum(15000, 50000)())

Dimension aleatoria

Otra funcionalidad muy importante es poder generar dimensiones de forma semi-aleatoria, por ejemplo, podemos partir de una lista de elementos y que se tome cada uno de los elementos de forma aleatoria, esto lo lograremos con la siguiente función:

  def func_randomEnum(seq: Seq[String]): UserDefinedFunction = {
    val min = 0
    val max = seq.size

    val aleatorio: () => String = () => {
      val randomIndex = Math.floor(min + (Math.random()*(max-min))).toInt

      seq(randomIndex)
    }

    functions.udf(aleatorio)
  }

Que se usaría de la siguiente manera:

  val df_datos = df_1M.withColumn("dimension1", func_randomEnum(Seq("A", "B", "C", "D"))())

Dimensión aleatoria con distinta probabilidad

Esta función generadora sería la misma que la anterior, pero permitiendo distinta probabilidad para cada uno de los elementos:

  def func_randomEnumProb(values: Map[String, Float]): UserDefinedFunction = {

    //Validar probabilidad
    if (values.values.sum != 1) throw new Error("La suma total de las probabilidades no es 100%")

    //convierte los numeros de porcentaje a rangos de 0 a 1.
    var acum = 0F
    def generarRangos(mapa: (String, Float)): (String, (Float, Float)) ={
      val min = acum
      val max = acum + mapa._2

      acum = max

      (mapa._1, (min, max))
    }

    val rangos = values.map(generarRangos)

    val selectRandomVal: () => String = () => {
      val randomNum = Math.random()

      def filterFun(rangeElement: (String, (Float, Float))) = rangeElement._2._1 < randomNum && rangeElement._2._2 > randomNum

      rangos.filter(filterFun).head._1
    }

    functions.udf(selectRandomVal)
  }

Para hacer uso de ella, debemos definir un Map donde la clave es el valor de la dimensión y el valor es la probabilidad de que aparezca, es decir:

  Map(
    "Hombre" -> 0.5F,
    "Mujer" -> 0.4F,
    "Otro" -> 0.1F
  )

Es necesario que la suma total de las probabilidades sea 1, esto es así porque el codigo era más facil de hacer (ya sabeis, el principio KISS).

Y se puede utilizar de la siguiente manera:
    val df_datos = df_1M.withColumn("dimension2", func_randomEnumProb(
      Map(
        "Hombre" -> 0.5F,
        "Mujer" -> 0.4F,
        "Otro" -> 0.1F
      ))())

Código resultante

Una vez escrito todo el código, el resultado sería algo tal que así:
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{SparkSession, functions}

object GeneradorAleatorio {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .appName("GeneradorAleatorio")
      .getOrCreate()

    val df_1M = spark.range(1000000)

    val df_datos = df_1M
      .withColumn("salario", func_randomNum(15000, 50000)())
      .withColumn("dimension1", func_randomEnum(Seq("A", "B", "C", "D"))())
      .withColumn("dimension2", func_randomEnumProb(
        Map(
          "Hombre" -> 0.5F,
          "Mujer" -> 0.4F,
          "Otro" -> 0.1F
        ))())

    df_datos.write.format("parquet").saveAsTable("default.GeneracionAleatoria")
  }

  def func_randomNum(min: Int, max: Int): UserDefinedFunction ={

    val aleatorio: () => Int = () => {
      Math.floor(min + (Math.random()*(max-min))).toInt
    }

    functions.udf(aleatorio)
  }

  def func_randomEnum(seq: Seq[String]): UserDefinedFunction = {
    val min = 0
    val max = seq.size

    val aleatorio: () => String = () => {
      val randomIndex = Math.floor(min + (Math.random()*(max-min))).toInt

      seq(randomIndex)
    }

    functions.udf(aleatorio)
  }

  def func_randomEnumProb(values: Map[String, Float]): UserDefinedFunction = {

    //Validar probabilidad
    if (values.values.sum != 1) throw new Error("La suma total de las probabilidades no es 100%")

    //convierte los numeros de porcentaje a rangos de 0 a 1.
    var acum = 0F
    def generarRangos(mapa: (String, Float)): (String, (Float, Float)) ={
      val min = acum
      val max = acum + mapa._2

      acum = max

      (mapa._1, (min, max))
    }

    val rangos = values.map(generarRangos)

    val selectRandomVal: () => String = () => {
      val randomNum = Math.random()

      def filterFun(rangeElement: (String, (Float, Float))) = rangeElement._2._1 < randomNum && rangeElement._2._2 > randomNum

      rangos.filter(filterFun).head._1
    }

    functions.udf(selectRandomVal)
  }
}

Una vez generados los datos, los guardaremos en una tabla Hive y podremos comprobar facilmente el resultado.

Generando datos

Una vez creado el código, toca ponerlo a prueba, una vez compilado, toca desplegarlo en un servidor, para ejecutar usaré la siguiente configuración de spark-submit:
  spark-submit --master yarn --deploy-mode cluster --executor-cores 2 --executor-memory 500m --num-executors 2 --class pruebas.GeneradorAleatorio SparkTests-1.0-SNAPSHOT.jar 

Ejecucución de la generación de 1 millon de registros
La ejecución ha sido bastante rápida, analicemos rápidamente las columnas creadas con impala.

Un primer select muy prometedor:


Veamos que tal ha salido la distribución de dimension1:

Bastante uniforme, hay más D que de los otros, pero muy ligeramente más, es un aleatorio bastante uniforme.

Ahora lo más importante y complejo, dimension2:


Se ha distribuido perfectamente, 10% Otro, 50% hombre y 40% mujer.

Finalmente, los números, veamos cuantos valores distintos se han creado:

 Como se podía esperar, hay 35.000 valores distintos (50.000 - 15.000 = 35.000).

Vaya, parece que los 2 salarios más comunes son uno cerca del limite superior y otro cerca del limite inferior, es como la vida misma:

Generación grande: 1.000 millones de registros

Veamos cuanto tarda en generar 1.000 millones de registros utilizando esta tecnica:

El tiempo de ejecución ha subido una barbaridad, de 13s a 5.9min, unas 30 veces más tiempo, generando una volumetría 1000 veces mayor, no está mal.

Veamos cuanto ocupa en hdfs esto:
Se han creado 4 ficheros (1 por core empleado) de 1.5GB, un total de 6GB generados en 6 minutos.

 Conclusión

La posibilidad de hacer uso de las UDFs combinado con la función spark.range, permite generar datos de prueba simulando casos reales utilizando el paralelismo de spark. Es decir, podriamos generar terabytes de datos en minutos si lo ejecutamos en un cluster con muchos recursos.

Por otro lado, también es una herramienta increiblemente potente en la fase de aprendizaje de spark, pues nos permite generar datos muy rápidamente con el formato exacto que deseamos para poder hacer pruebas.

Comentarios

Entradas populares de este blog

Agregaciones básicas en Spark

Optimizar joins: Tabla de hechos y tabla de dimensión cruzadas