Merging rows elegantly in Spark based on multiple conditions

Heyo StackOverflow,

I'm currently looking for an elegant way to perform a specific transformation.

So I have a DataFrame of actions that looks like this:

+---------+----------+----------+---------+
|timestamp|   user_id|    action|    value|
+---------+----------+----------+---------+
|      100|         1|     click|     null|
|      101|         2|     click|     null|
|      103|         1|      drag|      AAA|
|      101|         1|     click|     null|
|      108|         1|     click|     null|
|      100|         2|     click|     null|
|      106|         1|      drag|      BBB|
+---------+----------+----------+---------+
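
(For reference, a minimal sketch that reproduces this sample DataFrame, assuming a SparkSession named spark:)

// Build the sample data above; requires spark.implicits._ for toDF
import spark.implicits._

val myDF = Seq(
  (100L, 1, "click", None),
  (101L, 2, "click", None),
  (103L, 1, "drag",  Some("AAA")),
  (101L, 1, "click", None),
  (108L, 1, "click", None),
  (100L, 2, "click", None),
  (106L, 1, "drag",  Some("BBB"))
).toDF("timestamp", "user_id", "action", "value")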

Context: a user can perform two actions: click and drag. A click has no value, a drag has a value. A drag implies that a click happened, but not the other way around. Let's also assume that the drag event can be recorded before or after the corresponding click event. Thus, for each drag, I have a corresponding click action. What I would like to do is merge each drag/click pair into one action, i.e. give the drag's value to the click action and then delete the drag.

To know which click corresponds to which drag, I select the click whose timestamp is closest to the drag's timestamp. I also want to make sure that a drag cannot be linked to a click if their timestamp difference is over 5 (which means some drags may not be linked at all, and that's fine). And of course, I don't want a drag from user 1 to be linked to a click from user 2.

Here, the result should look like this:

+---------+----------+----------+---------+
|timestamp|   user_id|    action|    value|
+---------+----------+----------+---------+
|      100|         1|     click|     null|
|      101|         2|     click|     null|
|      101|         1|     click|      AAA|
|      108|         1|     click|      BBB|
|      100|         2|     click|     null|
+---------+----------+----------+---------+

The drag with value AAA (timestamp = 103) is linked to the click at timestamp 101, since |103 - 101| = 2 is smaller than |103 - 100| = 3. Same logic for BBB.

So I would like to perform these operations in a smooth/efficient way. So far, I have something like this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, lead, when}

val window = Window.partitionBy($"user_id").orderBy($"timestamp".asc)

myDF
  // Pull the neighbouring rows' value and timestamp into the current row
  .withColumn("previous_value", lag("value", 1, null) over window)
  .withColumn("previous_timestamp", lag("timestamp", 1, null) over window)
  .withColumn("next_value", lead("value", 1, null) over window)
  .withColumn("next_timestamp", lead("timestamp", 1, null) over window)
  .withColumn("value",
    when(
      $"previous_value".isNotNull and
        // If there is more than 5 sec. difference, it shouldn't be joined
        $"timestamp" - $"previous_timestamp" < 5 and
        (
          $"next_timestamp".isNull or
            $"next_timestamp" - $"timestamp" > $"timestamp" - $"previous_timestamp"
        ),
      $"previous_value")
      .otherwise(
        when($"next_timestamp" - $"timestamp" < 5, $"next_value")
          .otherwise(null)))
  .filter($"action" === "click")
  .drop("previous_value")
  .drop("previous_timestamp")
  .drop("next_value")
  .drop("next_timestamp")

But I feel like this is pretty inefficient. Is there a better way to do it? (Something that doesn't require creating 4 temporary columns...) For example, is there a way to address both the -1 and +1 offset rows in the same expression?

Thanks in advance!

Here is my attempt at this using Spark SQL rather than the DataFrame API, but it should be possible to convert:

myDF.registerTempTable("mydf") spark.sql(""" with clicks_table as (select * from mydf where action='click') ,drags_table as (select * from mydf where action='drag' ) ,one_click_many_drags as ( select c.timestamp as c_timestamp , d.timestamp as d_timestamp , c.user_id as c_user_id , d.user_id as d_user_id , c.action as c_action , d.action as d_action , c.value as c_value , d.value as d_value from clicks_table c inner join drags_table d on c.user_id = d.user_id and abs(c.timestamp - d.timestamp) <= 5 --a drag cannot be linked to a click if there timestamp difference is over 5 ) ,one_click_one_drag as ( select c_timestamp as timestamp, c_user_id as user_id, c_action as action, d_value as value from ( select *, row_number() over ( partition by d_user_id, d_timestamp --for each drag timestamp with multiple possible click timestamps, we rank the click timestamps by nearness order by abs(c_timestamp - d_timestamp) asc --prefer nearest , c_timestamp asc --prefer next_value if tied ) as rn from one_click_many_drags ) where rn=1 --take only the best match for each drag timestamp ) --now we start from the clicks_table and add in the desired drag values! select c.timestamp, c.user_id, c.action, m.value from clicks_table c left join one_click_one_drag m on c.user_id = m.user_id and c.timestamp = m.timestamp """)

Tested to produce the desired output.
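
For comparison, here is a rough, untested sketch of the same join-and-rank logic expressed with the DataFrame API (it assumes the same myDF and spark.implicits._ in scope; the names candidates and bestMatch are illustrative only):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{abs, row_number}

val clicks = myDF.filter($"action" === "click")
val drags  = myDF.filter($"action" === "drag")

// Pair each drag with every click of the same user within 5 time units.
val candidates = clicks.as("c").join(
  drags.as("d"),
  $"c.user_id" === $"d.user_id" &&
    abs($"c.timestamp" - $"d.timestamp") <= 5)

// For each drag, keep only the nearest click (earlier click wins ties here).
val byDrag = Window
  .partitionBy($"d.user_id", $"d.timestamp")
  .orderBy(abs($"c.timestamp" - $"d.timestamp").asc, $"c.timestamp".asc)

val bestMatch = candidates
  .withColumn("rn", row_number().over(byDrag))
  .filter($"rn" === 1)
  .select(
    $"c.timestamp".as("timestamp"),
    $"c.user_id".as("user_id"),
    $"d.value".as("value"))

// Re-attach the drag values to the full set of clicks.
val result = clicks
  .drop("value")
  .join(bestMatch, Seq("user_id", "timestamp"), "left")
  .select($"timestamp", $"user_id", $"action", $"value")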