Sparklyr
1.6 ya está disponible en grua!
Para instalar sparklyr
1.6 de CRAN, ejecutar
En esta publicación de weblog, destacaremos las siguientes características y mejoras de sparklyr
1.6:
Resúmenes ponderados de cuantiles
chispa apache es bien conocido por admitir algoritmos aproximados que intercambian cantidades marginales de precisión por mayor velocidad y paralelismo. Dichos algoritmos son particularmente beneficiosos para realizar exploraciones de datos preliminares a escala, ya que permiten a los usuarios consultar rápidamente ciertas estadísticas estimadas dentro de un margen de error predefinido, evitando al mismo tiempo el alto costo de los cálculos exactos. Un ejemplo es el algoritmo de Greenwald-Khanna para el cálculo en línea de resúmenes de cuantiles, como se describe en Greenwald y Khanna (2001). Este algoritmo fue diseñado originalmente para eficiente (épsilon)– aproximación de cuantiles dentro de un gran conjunto de datos sin la noción de puntos de datos que tienen pesos diferentes, y la versión no ponderada de la misma se ha implementado como
approxQuantile()
desde chispa 2.0. Sin embargo, el mismo algoritmo se puede generalizar para manejar entradas ponderadas y, como sparklyr
usuario @Zhuk66 mencionado en este problemaa
versión ponderada
de este algoritmo es útil sparklyr
característica.
Para explicar adecuadamente qué significa cuantil ponderado, debemos aclarar qué significa el peso de cada punto de datos. Por ejemplo, si tenemos una secuencia de observaciones ((1, 1, 1, 1, 0, 2, -1, -1))y nos gustaría aproximarnos a la mediana de todos los puntos de datos, entonces tenemos las dos opciones siguientes:
-
Ejecute la versión no ponderada de
approxQuantile()
en Spark para escanear los 8 puntos de datos -
O alternativamente, “comprimir” los datos en 4 tuplas de (valor, peso):
((1, 0,5), (0, 0,125), (2, 0,125), (-1, 0,25))donde el segundo componente de cada tupla representa la frecuencia con la que ocurre un valor en relación con el resto de los valores observados, y luego encuentre la mediana escaneando las 4 tuplas usando la versión ponderada del algoritmo de Greenwald-Khanna.
También podemos utilizar un ejemplo synthetic que involucre la distribución regular estándar para ilustrar el poder de la estimación cuantil ponderada en
sparklyr
1.6. Supongamos que no podemos simplemente ejecutar qnorm()
en R para evaluar la
función cuantil
de la distribución regular estándar en (p = 0,25) y (p = 0,75)¿cómo podemos tener una thought vaga sobre el primer y tercer cuantiles de esta distribución? Una forma es muestrear una gran cantidad de puntos de datos de esta distribución y luego aplicar el algoritmo de Greenwald-Khanna a nuestras muestras no ponderadas, como se muestra a continuación:
## 25% 75%
## -0.6629242 0.6874939
Tenga en cuenta que debido a que estamos trabajando con un algoritmo aproximado y hemos especificado
relative.error = 0.01
el valor estimado de (-0,6629242) desde arriba podría estar entre el percentil 24 y el 26 de todas las muestras. De hecho, cae en el (25.36896)-ésimo percentil:
## [1] 0.2536896
Ahora, ¿cómo podemos hacer uso de la estimación cuantil ponderada a partir de sparklyr
1.6 para obtener resultados similares? ¡Easy! Podemos muestrear una gran cantidad de (incógnita) valores uniformemente aleatorios de ((-infty, infty)) (o alternativamente, simplemente seleccione una gran cantidad de valores espaciados uniformemente entre ((-M, M)) dónde (METRO) es aproximadamente (infty)), y asignar cada (incógnita) valorar un peso de
(displaystyle frac{1}{sqrt{2 pi}}e^{-frac{x^2}{2}})la densidad de probabilidad de la distribución regular estándar en (incógnita). Finalmente, ejecutamos la versión ponderada de sdf_quantile()
de sparklyr
1.6, como se muestra a continuación:
library(sparklyr)
sc <- spark_connect(grasp = "native")
num_samples <- 1e6
M <- 1000
samples <- tibble::tibble(
x = M * seq(-num_samples / 2 + 1, num_samples / 2) / num_samples,
weight = dnorm(x)
)
samples_sdf <- copy_to(sc, samples, title = random_string())
samples_sdf %>%
sdf_quantile(
column = "x",
weight.column = "weight",
chances = c(0.25, 0.75),
relative.error = 0.01
) %>%
print()
## 25% 75%
## -0.696 0.662
¡Voilá! Las estimaciones no están muy lejos de los percentiles 25 y 75 (en relación con nuestro error máximo permitido de (0,01)):
## [1] 0.2432144
## [1] 0.7460144
Agrupación de iteraciones de energía
Agrupación de iteraciones de energía (PIC), un método de agrupación de gráficos easy y escalable presentado en Lin y Cohen (2010)primero encuentra una incrustación de baja dimensión de un conjunto de datos, usando una iteración de potencia truncada en una matriz de similitud por pares normalizada de todos los puntos de datos, y luego usa esta incrustación como el “indicador de conglomerado”, una representación intermedia del conjunto de datos que conduce a una integración rápida. convergencia cuando se utiliza como entrada para la agrupación de k-medias. Este proceso está muy bien ilustrado en la figura 1 de Lin y Cohen (2010) (reproducido a continuación)
en la que la imagen más a la izquierda es la visualización de un conjunto de datos que consta de 3 círculos, con puntos coloreados en rojo, verde y azul que indican resultados de agrupación, y las imágenes posteriores muestran el proceso de iteración de potencia que transforma gradualmente el conjunto unique de puntos en lo que parece ser tres segmentos de línea separados, una representación intermedia que se puede separar rápidamente en 3 grupos utilizando la agrupación de k-medias con (ok = 3).
En sparklyr
1.6, ml_power_iteration()
se implementó para hacer
funcionalidad PIC
en Spark accesible desde R. Espera como entrada un marco de datos Spark de 3 columnas que represente una matriz de similitud por pares de todos los puntos de datos. Dos de las columnas de este marco de datos deben contener índices de filas y columnas basados en 0, y la tercera columna debe contener la medida de similitud correspondiente. En el siguiente ejemplo, veremos un conjunto de datos que consta de dos círculos que se separan fácilmente en dos grupos por ml_power_iteration()
utilizándose el núcleo gaussiano como medida de similitud entre 2 puntos cualesquiera:
gen_similarity_matrix <- operate() {
# Guassian similarity measure
guassian_similarity <- operate(pt1, pt2) {
exp(-sum((pt2 - pt1) ^ 2) / 2)
}
# generate evenly distributed factors on a circle centered on the origin
gen_circle <- operate(radius, num_pts) {
seq(0, num_pts - 1) %>%
purrr::map_dfr(
operate(idx) {
theta <- 2 * pi * idx / num_pts
radius * c(x = cos(theta), y = sin(theta))
})
}
# generate factors on each circles
pts <- rbind(
gen_circle(radius = 1, num_pts = 80),
gen_circle(radius = 4, num_pts = 80)
)
# populate the pairwise similarity matrix (saved as a 3-column dataframe)
similarity_matrix <- information.body()
for (i in seq(2, nrow(pts)))
similarity_matrix <- similarity_matrix %>%
rbind(seq(i - 1L) %>%
purrr::map_dfr(~ record(
src = i - 1L, dst = .x - 1L,
similarity = guassian_similarity(pts[i,], pts[.x,])
))
)
similarity_matrix
}
library(sparklyr)
sc <- spark_connect(grasp = "native")
sdf <- copy_to(sc, gen_similarity_matrix())
clusters <- ml_power_iteration(
sdf, ok = 2, max_iter = 10, init_mode = "diploma",
src_col = "src", dst_col = "dst", weight_col = "similarity"
)
clusters %>% print(n = 160)
## # A tibble: 160 x 2
## id cluster
## <dbl> <int>
## 1 0 1
## 2 1 1
## 3 2 1
## 4 3 1
## 5 4 1
## ...
## 157 156 0
## 158 157 0
## 159 158 0
## 160 159 0
El resultado muestra que los puntos de los dos círculos se asignan a grupos separados, como se esperaba, después de solo una pequeña cantidad de iteraciones PIC.
spark_write_rds()
+ collect_from_rds()
spark_write_rds()
y collect_from_rds()
se implementan como una alternativa que eat menos memoria que gather()
. A diferencia de gather()
que recupera todos los elementos de un marco de datos de Spark a través del nodo del controlador de Spark, lo que puede causar lentitud o fallas por falta de memoria al recopilar grandes cantidades de datos.
spark_write_rds()
cuando se utiliza junto con collect_from_rds()
puede recuperar todas las particiones de un marco de datos de Spark directamente de los trabajadores de Spark, en lugar de hacerlo a través del nodo del controlador de Spark. Primero, spark_write_rds()
distribuirá las tareas de serializar particiones de marcos de datos de Spark en formato RDS versión 2 entre los trabajadores de Spark. Luego, los trabajadores de Spark pueden procesar múltiples particiones en paralelo, cada una manejando una partición a la vez y persistiendo la salida de RDS directamente en el disco, en lugar de enviar particiones de marco de datos al nodo del controlador Spark. Finalmente, las salidas RDS se pueden volver a ensamblar en marcos de datos R usando
collect_from_rds()
.
A continuación se muestra un ejemplo de spark_write_rds()
+ collect_from_rds()
uso, donde las salidas RDS se guardan primero en HDFS y luego se descargan al sistema de archivos native con hadoop fs -get
y finalmente, postprocesado con
collect_from_rds()
:
library(sparklyr)
library(nycflights13)
num_partitions <- 10L
sc <- spark_connect(grasp = "yarn", spark_home = "/usr/lib/spark")
flights_sdf <- copy_to(sc, flights, repartition = num_partitions)
# Spark employees serialize all partition in RDS format in parallel and write RDS
# outputs to HDFS
spark_write_rds(
flights_sdf,
dest_uri = "hdfs://<namenode>:8020/flights-part-{partitionId}.rds"
)
# Run `hadoop fs -get` to obtain RDS recordsdata from HDFS to native file system
for (partition in seq(num_partitions) - 1)
system2(
"hadoop",
c("fs", "-get", sprintf("hdfs://<namenode>:8020/flights-part-%d.rds", partition))
)
# Put up-process RDS outputs
partitions <- seq(num_partitions) - 1 %>%
lapply(operate(partition) collect_from_rds(sprintf("flights-part-%d.rds", partition)))
# Optionally, name `rbind()` to mix information from all partitions right into a single R dataframe
flights_df <- do.name(rbind, partitions)
Related a otros recientes sparklyr
lanzamientos, sparklyr
1.6 viene con una serie de mejoras relacionadas con dplyr, como
- Soporte para
the place()
predicado dentrochoose()
ysummarize(throughout(...))
operaciones en marcos de datos Spark - Adición de
if_all()
yif_any()
funciones - Compatibilidad whole con
dbplyr
API de back-end 2.0
choose(the place(...))
y summarize(throughout(the place(...)))
el dplyr the place(...)
La construcción es útil para aplicar una función de selección o agregación a múltiples columnas que satisfacen algún predicado booleano. Por ejemplo,
devuelve todas las columnas numéricas del iris
conjunto de datos, y
calcula el promedio de cada columna numérica.
En Sparklyr 1.6, ambos tipos de operaciones se pueden aplicar a los marcos de datos de Spark, por ejemplo,
if_all()
y if_any()
if_all()
y if_any()
son dos funciones de conveniencia de dplyr
1.0.4 (ver
aquí para más detalles) que combinan efectivamente los resultados de aplicar un predicado booleano a una selección ordenada de columnas usando la lógica and
/or
operadores.
A partir de Sparklyr 1.6, if_all()
y if_any()
También se puede aplicar a marcos de datos Spark, .por ejemplo,
Compatibilidad con dbplyr
API de back-end 2.0
Sparklyr
1.6 es totalmente appropriate con el más nuevo dbplyr
API de backend 2.0 (mediante la implementación de todos los cambios de interfaz recomendados en
aquí), manteniendo la compatibilidad con versiones anteriores de la edición anterior de dbplyr
API, para que sparklyr
los usuarios no se verán obligados a cambiar a ninguna versión explicit de
dbplyr
.
Este debería ser un cambio prácticamente no seen para el usuario a partir de ahora. De hecho, el único cambio de comportamiento discernible será el siguiente código
salida
[1] 2
si sparklyr
está trabajando con dbplyr
2.0+, y
[1] 1
si no.
Expresiones de gratitud
En orden cronológico, nos gustaría agradecer a los siguientes colaboradores por hacer sparklyr
1.6 impresionante:
También nos gustaría agradecer a la maravillosa comunidad de código abierto detrás sparklyr
sin el cual no nos habríamos beneficiado de numerosos
sparklyr
-Informes de errores relacionados y sugerencias de funciones.
Finalmente, el autor de esta publicación de weblog también agradece mucho las valiosas sugerencias editoriales de @skeydan.
Si deseas aprender más sobre sparklyr
recomendamos consultar
sparklyr.ai, spark.rstudio.comy también algunos anteriores sparklyr
publicar publicaciones como
brillante 1.5
y brillante 1.4.
Eso es todo. ¡Gracias por leer!
Greenwald, Michael y Sanjeev Khanna. 2001. “Computación en línea de resúmenes de cuantiles con eficiencia espacial”. Rec. SIGMOD. 30 (2): 58–66. https://doi.org/10.1145/376284.375670.
Lin, Frank y William Cohen. 2010. “Agrupación de iteraciones de energía”. En, 655–62.