今週も特にありません

進捗どうですか?

Spark ランク関数 rank, dense_rank, percent_rank

Sparkのランク関数メモ。商品IDとその商品カテゴリ、売り上げのようなデータがあったとする。

scala> val df = Seq(
  ("p00001", "food", 200),
  ("p00002", "food", 500),
  ("p00003", "food", 800),
  ("p00004", "food", 500),
  ("p00005", "food", 700),
  ("p00006", "book", 900),
  ("p00007", "book", 1500),
  ("p00008", "book", 1200),
  ("p00009", "book", 2200),
  ("p00010", "book", 3000),
  ("p00011", "tool", 20000),
  ("p00012", "tool", 50000),
  ("p00013", "tool", 15000),
  ("p00014", "tool", 60000),
  ("p00015", "tool", 35000)
).toDF("id", "category", "sales")

商品カテゴリごとに売上の降順でランク付けする。

scala> import org.apache.spark.sql.expressions.Window

scala> df.
  withColumn("rnk", rank().over(Window.partitionBy('category).orderBy('sales.desc))).
  withColumn("dense_rnk", dense_rank().over(Window.partitionBy('category).orderBy('sales.desc))).
  withColumn("percent_rnk", percent_rank().over(Window.partitionBy('category).orderBy('sales.desc))).
  show()
+------+--------+-----+---+---------+-----------+
|    id|category|sales|rnk|dense_rnk|percent_rnk|
+------+--------+-----+---+---------+-----------+
|p00003|    food|  800|  1|        1|        0.0|
|p00005|    food|  700|  2|        2|       0.25|
|p00002|    food|  500|  3|        3|        0.5|
|p00004|    food|  500|  3|        3|        0.5|
|p00001|    food|  200|  5|        4|        1.0|
|p00014|    tool|60000|  1|        1|        0.0|
|p00012|    tool|50000|  2|        2|       0.25|
|p00015|    tool|35000|  3|        3|        0.5|
|p00011|    tool|20000|  4|        4|       0.75|
|p00013|    tool|15000|  5|        5|        1.0|
|p00010|    book| 3000|  1|        1|        0.0|
|p00009|    book| 2200|  2|        2|       0.25|
|p00007|    book| 1500|  3|        3|        0.5|
|p00008|    book| 1200|  4|        4|       0.75|
|p00006|    book|  900|  5|        5|        1.0|
+------+--------+-----+---+---------+-----------+

ここから、.filter('percent_rnk <= 0.3)で絞り込むことで、売り上げ上位30%のレコードが抽出できる。

spark.apache.org