@@ -328,23 +328,25 @@ The output of function should be a `data.frame`. Schema specifies the row format
328328{% highlight r %}
329329
330330# Determine six waiting times with the largest eruption time in minutes.
331- schema <- structType(structField("waiting", "double"), structField(" max_eruption", "double"))
331+ schema <- structType(structField("max_eruption", "double"))
332332result <- gapply(
333333 df,
334334 "waiting",
335335 function(key, x) {
336- y <- data.frame(key, max(x$eruptions))
336+ y <- data.frame(max(x$eruptions))
337337 },
338338 schema)
339+ colnames(result) <- c("waiting", "max_eruption")
340+
339341head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
340342
341343## waiting max_eruption
342- ##1 64 5.100
343- ##2 69 5.067
344- ##3 71 5.033
345- ##4 87 5.000
346- ##5 63 4.933
347- ##6 89 4.900
344+ ##1 96 5.100
345+ ##2 76 5.067
346+ ##3 77 5.033
347+ ##4 88 5.000
348+ ##5 86 4.933
349+ ##6 82 4.900
348350{% endhighlight %}
349351</div >
350352
@@ -359,19 +361,19 @@ result <- gapplyCollect(
359361 df,
360362 "waiting",
361363 function(key, x) {
362- y <- data.frame(key, max(x$eruptions))
363- colnames(y) <- c("waiting", "max_eruption")
364- y
364+ y <- data.frame(max(x$eruptions))
365365 })
366+ colnames(result) <- c("waiting", "max_eruption")
367+
366368head(result[ order(result$max_eruption, decreasing = TRUE), ] )
367369
368370## waiting max_eruption
369- ##1 64 5.100
370- ##2 69 5.067
371- ##3 71 5.033
372- ##4 87 5.000
373- ##5 63 4.933
374- ##6 89 4.900
371+ ##1 96 5.100
372+ ##2 76 5.067
373+ ##3 77 5.033
374+ ##4 88 5.000
375+ ##5 86 4.933
376+ ##6 82 4.900
375377
376378{% endhighlight %}
377379</div >
@@ -445,61 +447,6 @@ head(result[order(result$max_eruption, decreasing = TRUE), ])
445447</tr >
446448</table >
447449
448- <<<<<<< HEAD
449- <div data-lang =" r " markdown =" 1 " >
450- {% highlight r %}
451-
452- # Determine six waiting times with the largest eruption time in minutes.
453- schema <- structType(structField("max_eruption", "double"))
454- result <- gapply(
455- df,
456- "waiting",
457- function(key, x) {
458- y <- data.frame(max(x$eruptions))
459- },
460- schema)
461- colnames(result) <- c("waiting", "max_eruption")
462-
463- head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
464-
465- ## waiting max_eruption
466- ##1 96 5.100
467- ##2 76 5.067
468- ##3 77 5.033
469- ##4 88 5.000
470- ##5 86 4.933
471- ##6 82 4.900
472- {% endhighlight %}
473- </div >
474-
475- ##### gapplyCollect
476- Like ` gapply ` , applies a function to each partition of a ` SparkDataFrame ` and collect the result back to R data.frame. The output of the function should be a ` data.frame ` . But, the schema is not required to be passed. Note that ` gapplyCollect ` can fail if the output of UDF run on all the partition cannot be pulled to the driver and fit in driver memory.
477-
478- <div data-lang =" r " markdown =" 1 " >
479- {% highlight r %}
480-
481- # Determine six waiting times with the largest eruption time in minutes.
482- result <- gapplyCollect(
483- df,
484- "waiting",
485- function(key, x) {
486- y <- data.frame(max(x$eruptions))
487- })
488- colnames(result) <- c("waiting", "max_eruption")
489-
490- head(result[ order(result$max_eruption, decreasing = TRUE), ] )
491-
492- ## waiting max_eruption
493- ##1 96 5.100
494- ##2 76 5.067
495- ##3 77 5.033
496- ##4 88 5.000
497- ##5 86 4.933
498- ##6 82 4.900
499-
500- {% endhighlight %}
501- </div >
502-
503450#### Run local R functions distributed using ` spark.lapply `
504451
505452##### spark.lapply
0 commit comments