sparklyr
1.4 ya está disponible en grua! Para instalar sparklyr
1.4 de CRAN, ejecutar
En esta publicación de weblog, mostraremos las siguientes nuevas funcionalidades muy esperadas del sparklyr
Versión 1.4:
Muestreo ponderado paralelo
Lectores familiarizados con dplyr::sample_n()
y dplyr::sample_frac()
Las funciones pueden haber notado que ambas admiten casos de uso de muestreo ponderado en marcos de datos R, por ejemplo,
dplyr::sample_n(mtcars, dimension = 3, weight = mpg, change = FALSE)
mpg cyl disp hp drat wt qsec vs am gear carb
Fiat 128 32.4 4 78.7 66 4.08 2.200 19.47 1 1 4 1
Merc 280C 17.8 6 167.6 123 3.92 3.440 18.90 1 0 4 4
Mazda RX4 Wag 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4
y
dplyr::sample_frac(mtcars, dimension = 0.1, weight = mpg, change = FALSE)
mpg cyl disp hp drat wt qsec vs am gear carb
Honda Civic 30.4 4 75.7 52 4.93 1.615 18.52 1 1 4 2
Merc 450SE 16.4 8 275.8 180 3.07 4.070 17.40 0 0 3 3
Fiat X1-9 27.3 4 79.0 66 4.08 1.935 18.90 1 1 4 1
seleccionará algún subconjunto aleatorio de mtcars
usando el mpg
atributo como peso de muestreo para cada fila. Si change = FALSE
se establece, entonces se elimina una fila de la población de muestreo una vez que se selecciona, mientras que cuando se establece change = TRUE
cada fila siempre permanecerá en la población de muestreo y se puede seleccionar varias veces.
Ahora se admiten exactamente los mismos casos de uso para los marcos de datos Spark en sparklyr
1.4! Por ejemplo:
devolverá un subconjunto aleatorio de tamaño 5 del marco de datos de Spark mtcars_sdf
.
Más importante aún, el algoritmo de muestreo implementado en sparklyr
1.4 es algo que encaja perfectamente en el paradigma de MapReduce: como hemos dividido nuestro mtcars
datos en 4 particiones de mtcars_sdf
especificando repartition = 4L
el algoritmo primero procesará cada partición de forma independiente y en paralelo, seleccionando un conjunto de muestras de tamaño hasta 5 de cada una, y luego reducirá los 4 conjuntos de muestras a un conjunto de muestras remaining de tamaño 5 eligiendo los registros que tengan las 5 prioridades de muestreo más altas. entre todos.
¿Cómo es posible tal paralelización, especialmente para el escenario de muestreo sin reemplazo, donde el resultado deseado se outline como el resultado de un proceso secuencial? Una respuesta detallada a esta pregunta se encuentra en esta publicación de weblogque incluye una definición del problema (en explicit, el significado exacto de los pesos de muestreo en términos de probabilidades), una explicación de alto nivel de la solución precise y la motivación detrás de ella, y también algunos detalles matemáticos, todo oculto en un enlace. a un archivo PDF, de modo que los lectores no orientados a las matemáticas puedan entender la esencia de todo lo demás sin asustarse, mientras que los lectores orientados a las matemáticas puedan disfrutar resolviendo todas las integrales ellos mismos antes de echar un vistazo a la respuesta.
Verbos ordenados
Las implementaciones especializadas de los siguientes tidyr
Los verbos que funcionan eficientemente con marcos de datos de Spark se incluyeron como parte de sparklyr
1.4:
Podemos demostrar cómo esos verbos son útiles para ordenar datos a través de algunos ejemplos.
Digamos que nos dan mtcars_sdf
un marco de datos de Spark que contiene todas las filas de mtcars
más el nombre de cada fila:
# Supply: spark<?> [?? x 12]
mannequin mpg cyl disp hp drat wt qsec vs am gear carb
<chr> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1 Mazda RX4 21 6 160 110 3.9 2.62 16.5 0 1 4 4
2 Mazda RX4 W… 21 6 160 110 3.9 2.88 17.0 0 1 4 4
3 Datsun 710 22.8 4 108 93 3.85 2.32 18.6 1 1 4 1
4 Hornet 4 Dr… 21.4 6 258 110 3.08 3.22 19.4 1 0 3 1
5 Hornet Spor… 18.7 8 360 175 3.15 3.44 17.0 0 0 3 2
# … with extra rows
y nos gustaría convertir todos los atributos numéricos en mtcar_sdf
(en otras palabras, todas las columnas excepto la mannequin
columna) en pares clave-valor almacenados en 2 columnas, con el key
columna que almacena el nombre de cada atributo, y el worth
columna que almacena el valor numérico de cada atributo. Una forma de lograrlo con tidyr
es utilizando el tidyr::pivot_longer
funcionalidad:
mtcars_kv_sdf <- mtcars_sdf %>%
tidyr::pivot_longer(cols = -mannequin, names_to = "key", values_to = "worth")
print(mtcars_kv_sdf, n = 5)
# Supply: spark<?> [?? x 3]
mannequin key worth
<chr> <chr> <dbl>
1 Mazda RX4 am 1
2 Mazda RX4 carb 4
3 Mazda RX4 cyl 6
4 Mazda RX4 disp 160
5 Mazda RX4 drat 3.9
# … with extra rows
Para deshacer el efecto de tidyr::pivot_longer
podemos aplicar tidyr::pivot_wider
a nuestro mtcars_kv_sdf
Spark marco de datos y recuperar los datos originales que estaban presentes en mtcars_sdf
:
tbl <- mtcars_kv_sdf %>%
tidyr::pivot_wider(names_from = key, values_from = worth)
print(tbl, n = 5)
# Supply: spark<?> [?? x 12]
mannequin carb cyl drat hp mpg vs wt am disp gear qsec
<chr> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1 Mazda RX4 4 6 3.9 110 21 0 2.62 1 160 4 16.5
2 Hornet 4 Dr… 1 6 3.08 110 21.4 1 3.22 0 258 3 19.4
3 Hornet Spor… 2 8 3.15 175 18.7 0 3.44 0 360 3 17.0
4 Merc 280C 4 6 3.92 123 17.8 1 3.44 0 168. 4 18.9
5 Merc 450SLC 3 8 3.07 180 15.2 0 3.78 0 276. 3 18
# … with extra rows
Otra forma de reducir muchas columnas en menos es mediante el uso tidyr::nest
para mover algunas columnas a tablas anidadas. Por ejemplo, podemos crear una tabla anidada. perf
encapsulando todos los atributos relacionados con el rendimiento de mtcars
(a saber, hp
, mpg
, disp
y qsec
). Sin embargo, a diferencia de los marcos de datos de R, los marcos de datos Spark no tienen el concepto de tablas anidadas, y lo más parecido a las tablas anidadas que podemos conseguir es una perf
columna que contiene estructuras con nombre con hp
, mpg
, disp
y qsec
atributos:
mtcars_nested_sdf <- mtcars_sdf %>%
tidyr::nest(perf = c(hp, mpg, disp, qsec))
Luego podemos inspeccionar el tipo de perf
columna en mtcars_nested_sdf
:
sdf_schema(mtcars_nested_sdf)$perf$sort
[1] "ArrayType(StructType(StructField(hp,DoubleType,true), StructField(mpg,DoubleType,true), StructField(disp,DoubleType,true), StructField(qsec,DoubleType,true)),true)"
e inspeccionar elementos de estructura individuales dentro perf
:
hp mpg disp qsec
110.00 21.00 160.00 16.46
Finalmente, también podemos utilizar tidyr::unnest
para deshacer los efectos de tidyr::nest
:
mtcars_unnested_sdf <- mtcars_nested_sdf %>%
tidyr::unnest(col = perf)
print(mtcars_unnested_sdf, n = 5)
# Supply: spark<?> [?? x 12]
mannequin cyl drat wt vs am gear carb hp mpg disp qsec
<chr> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1 Mazda RX4 6 3.9 2.62 0 1 4 4 110 21 160 16.5
2 Hornet 4 Dr… 6 3.08 3.22 1 0 3 1 110 21.4 258 19.4
3 Duster 360 8 3.21 3.57 0 0 3 4 245 14.3 360 15.8
4 Merc 280 6 3.92 3.44 1 0 4 4 123 19.2 168. 18.3
5 Lincoln Con… 8 3 5.42 0 0 3 4 215 10.4 460 17.8
# … with extra rows
Escalador robusto
Escalador robusto es una nueva funcionalidad introducida en Spark 3.0 (CHISPA-28399). gracias a un solicitud de extracción por @cero323una interfaz R para RobustScaler
a saber, el ft_robust_scaler()
función, ahora es parte de sparklyr
.
A menudo se observa que muchos algoritmos de aprendizaje automático funcionan mejor con entradas numéricas estandarizadas. Muchos de nosotros hemos aprendido en estadísticas 101 que dada una variable aleatoria (INCÓGNITA)podemos calcular su media (mu = E[X])desviación estándar (sigma = sqrt{E[X^2] – (E[X])^2})y luego obtener una puntuación estándar (z = frac{X – mu}{sigma}) que tiene media 0 y desviación estándar 1.
Sin embargo, observe ambos (MI[X]) y (MI[X^2]) desde arriba son cantidades que pueden ser fácilmente sesgadas por valores atípicos extremos en (INCÓGNITA)provocando distorsiones en (z). Un caso particularmente grave sería si todos los valores no atípicos entre (INCÓGNITA) están muy cerca de (0)por lo tanto haciendo (MI[X]) cerca de (0)mientras que los valores atípicos extremos están muy en la dirección negativa, lo que arrastra hacia abajo (MI[X]) mientras se inclina (MI[X^2]) hacia arriba.
Una forma alternativa de estandarizar (INCÓGNITA) Con base en su mediana, los valores del 1er cuartil y del 3er cuartil, todos los cuales son robustos frente a valores atípicos, sería el siguiente:
(displaystyle z = frac{X – textual content{Mediana}(X)}{textual content{P75}(X) – textual content{P25}(X)})
y esto es precisamente lo que Escalador robusto ofertas.
para ver ft_robust_scaler()
en acción y demostrar su utilidad, podemos repasar un ejemplo synthetic que consta de los siguientes pasos:
- Extraiga 500 muestras aleatorias de la distribución regular estándar
[1] -0.626453811 0.183643324 -0.835628612 1.595280802 0.329507772
[6] -0.820468384 0.487429052 0.738324705 0.575781352 -0.305388387
...
- Inspeccione los valores mínimos y máximos entre los (500) muestras aleatorias:
[1] -3.008049
[1] 3.810277
- Ahora crea (10) otros valores que son valores atípicos extremos en comparación con los (500) muestras aleatorias arriba. Dado que sabemos todo (500) Las muestras están dentro del rango de ((-4, 4))podemos elegir (-501, -502, ldots, -509, -510) como nuestro (10) valores atípicos:
outliers <- -500L - seq(10)
- Copiar todo (510) valores en un marco de datos de Spark llamado
sdf
library(sparklyr)
sc <- spark_connect(grasp = "native", model = "3.0.0")
sdf <- copy_to(sc, information.body(worth = c(sample_values, outliers)))
- Entonces podemos aplicar
ft_robust_scaler()
para obtener el valor estandarizado para cada entrada:
- Al trazar el resultado se muestra que los puntos de datos no atípicos se escalan a valores que aún forman más o menos una distribución en forma de campana centrada alrededor (0)como se esperaba, por lo que la escala es sólida frente a la influencia de los valores atípicos:
- Finalmente, podemos comparar la distribución de los valores escalados anteriores con la distribución de las puntuaciones z de todos los valores de entrada, y notar cómo escalar la entrada solo con la media y la desviación estándar habría causado una asimetría notable, que el escalador robusto ha evitado con éxito:
all_values <- c(sample_values, outliers)
z_scores <- (all_values - imply(all_values)) / sd(all_values)
ggplot(information.body(scaled = z_scores), aes(x = scaled)) +
xlim(-0.05, 0.2) +
geom_histogram(binwidth = 0.005)
- De los 2 gráficos anteriores, se puede observar que ambos procesos de estandarización produjeron algunas distribuciones que todavía tenían forma de campana, la producida por
ft_robust_scaler()
está centrado alrededor (0)indicando correctamente el promedio entre todos los valores no atípicos, mientras que la distribución de puntuación z claramente no se centra alrededor (0) ya que su centro ha sido notablemente desplazado por el (10) valores atípicos.
RÁPIDOS
Los lectores que siguen de cerca los lanzamientos de Apache Spark probablemente hayan notado la reciente incorporación de RÁPIDOS Soporte de aceleración de GPU en Spark 3.0. Para ponerse al día con este desarrollo reciente, también se creó una opción para habilitar RAPIDS en conexiones Spark en sparklyr
y enviado en sparklyr
1.4. En un host con {hardware} suitable con RAPIDS (por ejemplo, una instancia Amazon EC2 de tipo ‘p3.2xlarge’), se puede instalar sparklyr
1.4 y observe que la aceleración de {hardware} de RAPIDS se refleja en los planes de consultas físicas de Spark SQL:
library(sparklyr)
sc <- spark_connect(grasp = "native", model = "3.0.0", packages = "rapids")
dplyr::db_explain(sc, "SELECT 4")
== Bodily Plan ==
*(2) GpuColumnarToRow false
+- GpuProject [4 AS 4#45]
+- GpuRowToColumnar TargetSize(2147483647)
+- *(1) Scan OneRowRelation[]
Todas las funciones de orden superior recientemente introducidas desde Spark 3.0, como array_sort()
con comparador personalizado, transform_keys()
, transform_values()
y map_zip_with()
son apoyados por sparklyr
1.4.
Además, ahora se puede acceder a todas las funciones de orden superior directamente a través de dplyr
en lugar de sus hof_*
homólogos en sparklyr
. Esto significa, por ejemplo, que podemos ejecutar lo siguiente dplyr
consultas para calcular el cuadrado de todos los elementos de la matriz en la columna x
de sdf
y luego ordenarlos en orden descendente:
library(sparklyr)
sc <- spark_connect(grasp = "native", model = "3.0.0")
sdf <- copy_to(sc, tibble::tibble(x = record(c(-3, -2, 1, 5), c(6, -7, 5, 8))))
sq_desc <- sdf %>%
dplyr::mutate(x = remodel(x, ~ .x * .x)) %>%
dplyr::mutate(x = array_sort(x, ~ as.integer(signal(.y - .x)))) %>%
dplyr::pull(x)
print(sq_desc)
[[1]]
[1] 25 9 4 1
[[2]]
[1] 64 49 36 25
Reconocimiento
En orden cronológico, nos gustaría agradecer a las siguientes personas por sus contribuciones a sparklyr
1.4:
También apreciamos los informes de errores, las solicitudes de funciones y otros comentarios valiosos sobre sparklyr
de nuestra increíble comunidad de código abierto (por ejemplo, la función de muestreo ponderado en sparklyr
1.4 fue motivado en gran medida por esto problema de github presentado por @ajingy algunos dplyr
Las correcciones de errores relacionados en esta versión se iniciaron en #2648 y completado con esto solicitud de extracción por @wkdavis).
Por último, pero no menos importante, el autor de esta publicación de weblog está extremadamente agradecido por las fantásticas sugerencias editoriales de @javierluraschi, @batpigandmey @skeydan.
Si deseas aprender más sobre sparklyr
recomendamos consultar sparklyr.ai, spark.rstudio.comy también algunas de las publicaciones de versiones anteriores, como brillante 1.3 y brillante 1.2.
¡Gracias por leer!