今週も特にありません

進捗どうですか?

Spark 集約関数 collect_list, collect_set

配列に集約する操作であるので、aggの中で関数を適用することになる。

scala> val df = Seq(
  ("p00001", "food", 200),
  ("p00002", "food", 500),
  ("p00003", "food", 800),
  ("p00004", "food", 500),
  ("p00005", "food", 700),
  ("p00006", "book", 900),
  ("p00007", "book", 1500),
  ("p00008", "book", 1200),
  ("p00009", "book", 2200),
  ("p00010", "book", 3000),
  ("p00011", "tool", 20000),
  ("p00012", "tool", 50000),
  ("p00013", "tool", 15000),
  ("p00014", "tool", 60000),
  ("p00015", "tool", 35000)
).toDF("id", "category", "sales")

scala > val idArrayDF = DF.
  group_by('category).
  agg(collect_list('id).as("id_array"), sum('sales).as("sales_sum"))

scala > idArrayDF.show()
+--------+--------------------+---------+
|category|            id_array|sales_sum|
+--------+--------------------+---------+
|    food|[p00001, p00002, ...|     2700|
|    tool|[p00011, p00012, ...|   180000|
|    book|[p00006, p00007, ...|     8800|
+--------+--------------------+---------+

重複を削除する場合には、collect_setを使うことになる。

さらに、配列になったものを分割して抽出する場合には、explodeを適用すればよい。

scala > idArrayDF.
  select(
    'category,
    explode('id_array).as("id")
  ).show()
+--------+------+
|category|    id|
+--------+------+
|    food|p00003|
|    food|p00005|
|    food|p00002|
|    food|p00004|
|    food|p00001|
|    tool|p00014|
|    tool|p00012|
|    tool|p00015|
|    tool|p00011|
|    tool|p00013|
|    book|p00010|
|    book|p00009|
|    book|p00007|
|    book|p00008|
|    book|p00006|
+--------+------+

jaceklaskowski.gitbooks.io