Spark 集約関数 collect_list, collect_set
配列に集約する操作であるので、aggの中で関数を適用することになる。
scala> val df = Seq( ("p00001", "food", 200), ("p00002", "food", 500), ("p00003", "food", 800), ("p00004", "food", 500), ("p00005", "food", 700), ("p00006", "book", 900), ("p00007", "book", 1500), ("p00008", "book", 1200), ("p00009", "book", 2200), ("p00010", "book", 3000), ("p00011", "tool", 20000), ("p00012", "tool", 50000), ("p00013", "tool", 15000), ("p00014", "tool", 60000), ("p00015", "tool", 35000) ).toDF("id", "category", "sales") scala > val idArrayDF = DF. group_by('category). agg(collect_list('id).as("id_array"), sum('sales).as("sales_sum")) scala > idArrayDF.show() +--------+--------------------+---------+ |category| id_array|sales_sum| +--------+--------------------+---------+ | food|[p00001, p00002, ...| 2700| | tool|[p00011, p00012, ...| 180000| | book|[p00006, p00007, ...| 8800| +--------+--------------------+---------+
重複を削除する場合には、collect_setを使うことになる。
さらに、配列になったものを分割して抽出する場合には、explodeを適用すればよい。
scala > idArrayDF.
select(
'category,
explode('id_array).as("id")
).show()
+--------+------+
|category| id|
+--------+------+
| food|p00003|
| food|p00005|
| food|p00002|
| food|p00004|
| food|p00001|
| tool|p00014|
| tool|p00012|
| tool|p00015|
| tool|p00011|
| tool|p00013|
| book|p00010|
| book|p00009|
| book|p00007|
| book|p00008|
| book|p00006|
+--------+------+