Heyo,
I'm currently looking for an elegant way to do a specific transformation.
So I have a DataFrame of actions that looks like this:
+---------+----------+----------+---------+
|timestamp| user_id| action| value|
+---------+----------+----------+---------+
| 100| 1| click| null|
| 101| 2| click| null|
| 103| 1| drag| AAA|
| 101| 1| click| null|
| 108| 1| click| null|
| 100| 2| click| null|
| 106| 1| drag| BBB|
+---------+----------+----------+---------+
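To reproduce this locally, the sample frame above can be built as shown here. This is only a sketch: the local `SparkSession`, its settings, the `Long`/`Int` column types and the `SampleActions` wrapper are assumptions and not part of the question. `Option[String]` gives the click rows a real `null` in `value`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SampleActions {
  // Assumption: a local session is enough to experiment with the transformation.
  val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("click-drag-sample")
    .getOrCreate()

  import spark.implicits._

  // Same rows as the table above; None becomes null in the `value` column.
  val myDF: DataFrame = Seq[(Long, Int, String, Option[String])](
    (100L, 1, "click", None),
    (101L, 2, "click", None),
    (103L, 1, "drag", Some("AAA")),
    (101L, 1, "click", None),
    (108L, 1, "click", None),
    (100L, 2, "click", None),
    (106L, 1, "drag", Some("BBB"))
  ).toDF("timestamp", "user_id", "action", "value")

  def main(args: Array[String]): Unit = myDF.show()
}
```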
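Context: users can perform two actions, click and drag. Clicks have no value; drags have a value. A drag implies there was a click, but not the other way around. Let's also assume that a drag event can be recorded either after or before its click event. So for every drag I have a corresponding click action. What I'd like to do is merge each drag and its click into one row, i.e. give the drag's value to the click action and then delete the drag.
To decide which click a drag belongs to, I want the click whose timestamp is closest to the drag's timestamp. I also want to make sure that a drag cannot be linked to a click if the timestamp difference is over 5 (which means some drags might not be linked, and that's fine). Of course, I don't want a drag by user 1 to be matched with a click by user 2. Here, the result would look like this: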
+---------+----------+----------+---------+
|timestamp| user_id| action| value|
+---------+----------+----------+---------+
| 100| 1| click| null|
| 101| 2| click| null|
| 101| 1| click| AAA|
| 108| 1| click| BBB|
| 100| 2| click| null|
+---------+----------+----------+---------+
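The drag AAA (timestamp = 103) is linked to the click at 101 because 101 is the closest to 103. The same logic applies to BBB.
So I'd like to perform these operations in a smooth, efficient way. So far, I have something like this: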
```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"col" syntax; assumes a SparkSession named `spark`

val window = Window.partitionBy($"user_id").orderBy($"timestamp".asc)

val result = myDF
  .withColumn("previous_value", lag("value", 1, null).over(window))
  .withColumn("previous_timestamp", lag("timestamp", 1, null).over(window))
  .withColumn("next_value", lead("value", 1, null).over(window))
  .withColumn("next_timestamp", lead("timestamp", 1, null).over(window))
  .withColumn("value",
    when(
      $"previous_value".isNotNull and
        // If there is more than 5 sec. difference, it shouldn't be joined
        $"timestamp" - $"previous_timestamp" <= 5 and
        ($"next_timestamp".isNull or
          $"next_timestamp" - $"timestamp" > $"timestamp" - $"previous_timestamp"),
      $"previous_value")
    .otherwise(
      when($"next_timestamp" - $"timestamp" <= 5, $"next_value")
        .otherwise(null)
    )
  )
  .filter($"action" === "click")
  .drop("previous_value", "previous_timestamp", "next_value", "next_timestamp")
```
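But I feel like this is quite inefficient. Is there a better way? (Ideally without creating 4 temporary columns...) For example, is there a way to handle the rows at offsets -1 and +1 in the same expression? Thanks in advance!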
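Before optimizing, it can help to pin down the intended matching rule. The following is a plain-Scala sketch with no Spark involved; the `Action` case class, `NearestClick` and `mergeDrags` are hypothetical names. Each drag goes to the same-user click with the nearest timestamp, provided the gap is at most 5. Two details are assumptions because the question leaves them open: ties go to the earlier click (as in the answer's SQL), and if several drags land on the same click, the nearest drag wins. It compares every drag with every click, so it is meant as a reference for the semantics, not for large data.

```scala
// Hypothetical model of one row of the question's DataFrame.
final case class Action(timestamp: Long, userId: Int, action: String, value: Option[String])

object NearestClick {
  // "Over 5" cannot be linked, so a gap of exactly 5 is still allowed.
  val MaxGap: Long = 5L

  private def gap(a: Action, b: Action): Long = math.abs(a.timestamp - b.timestamp)

  def mergeDrags(actions: Seq[Action]): Seq[Action] = {
    val clicks = actions.filter(_.action == "click")
    val drags  = actions.filter(_.action == "drag")

    // For every drag, choose the closest click of the same user within MaxGap;
    // ties go to the earlier click (assumption).
    val matches: Seq[(Action, Action)] = drags.flatMap { d =>
      clicks
        .filter(c => c.userId == d.userId && gap(c, d) <= MaxGap)
        .sortBy(c => (gap(c, d), c.timestamp))
        .headOption
        .map(c => (c, d))
    }

    // If several drags pick the same click, keep the nearest drag (assumption).
    val valueFor: Map[Action, Option[String]] = matches
      .groupBy { case (click, _) => click }
      .map { case (click, pairs) =>
        val (_, drag) = pairs.minBy { case (c, d) => (gap(c, d), d.timestamp) }
        click -> drag.value
      }

    // Keep only the clicks, in input order, carrying the matched drag's value.
    clicks.map(c => valueFor.get(c).fold(c)(v => c.copy(value = v)))
  }

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      Action(100, 1, "click", None),
      Action(101, 2, "click", None),
      Action(103, 1, "drag", Some("AAA")),
      Action(101, 1, "click", None),
      Action(108, 1, "click", None),
      Action(100, 2, "click", None),
      Action(106, 1, "drag", Some("BBB"))
    )
    mergeDrags(sample).foreach(println)
  }
}
```

On the sample data this yields the five click rows of the expected result, with AAA on user 1's click at 101 and BBB on user 1's click at 108 (the click at 101 is 5 away from BBB, the one at 108 only 2).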
```scala
myDF.createOrReplaceTempView("mydf") // registerTempTable is deprecated since Spark 2.0

spark.sql("""
  with clicks_table as (select * from mydf where action = 'click')
  , drags_table as (select * from mydf where action = 'drag')
  , one_click_many_drags as (
    select c.timestamp as c_timestamp
         , d.timestamp as d_timestamp
         , c.user_id as c_user_id
         , d.user_id as d_user_id
         , c.action as c_action
         , d.action as d_action
         , c.value as c_value
         , d.value as d_value
    from clicks_table c
    inner join drags_table d
      on c.user_id = d.user_id
      and abs(c.timestamp - d.timestamp) <= 5 -- a drag cannot be linked to a click if their timestamp difference is over 5
  )
  , one_click_one_drag as (
    select c_timestamp as timestamp, c_user_id as user_id, c_action as action, d_value as value
    from (
      select *, row_number() over (
          partition by d_user_id, d_timestamp -- for each drag with several possible clicks, rank the clicks by nearness
          order by abs(c_timestamp - d_timestamp) asc -- prefer the nearest
                 , c_timestamp asc -- prefer next_value if tied
        ) as rn
      from one_click_many_drags
    ) ranked
    where rn = 1 -- take only the best match for each drag
  )
  -- now start from clicks_table and add in the desired drag values
  select c.timestamp, c.user_id, c.action, m.value
  from clicks_table c
  left join one_click_one_drag m
    on c.user_id = m.user_id
    and c.timestamp = m.timestamp
""")
```
Tested to produce the desired output.
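The same join-then-rank idea can also be written with the DataFrame API instead of a SQL string. This is a sketch, assuming `myDF` has the columns shown in the question; `LinkDrags` and `linkDrags` are hypothetical names. Like the SQL, it pairs every click with every drag of the same user within 5 time units, keeps the closest click per drag with `row_number` (earlier click on ties), and left-joins the chosen drag values back onto all clicks so unmatched clicks keep `null`.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object LinkDrags {
  // A gap of exactly 5 is still allowed; "over 5" is not.
  val MaxGap = 5

  def linkDrags(myDF: DataFrame): DataFrame = {
    val clicks = myDF.filter(col("action") === "click").alias("c")
    val drags  = myDF.filter(col("action") === "drag").alias("d")

    // Every (click, drag) pair of the same user within MaxGap.
    val candidates = clicks
      .join(drags,
        col("c.user_id") === col("d.user_id") &&
          abs(col("c.timestamp") - col("d.timestamp")) <= MaxGap)
      .select(
        col("c.timestamp").as("c_timestamp"),
        col("d.timestamp").as("d_timestamp"),
        col("d.user_id").as("user_id"),
        col("d.value").as("d_value"))

    // Rank the candidate clicks of each drag: nearest first, earlier click on ties.
    val byDrag = Window
      .partitionBy(col("user_id"), col("d_timestamp"))
      .orderBy(abs(col("c_timestamp") - col("d_timestamp")), col("c_timestamp"))

    val best = candidates
      .withColumn("rn", row_number().over(byDrag))
      .filter(col("rn") === 1)
      .select(col("c_timestamp").as("timestamp"), col("user_id"), col("d_value").as("value"))

    // Left-join back onto all clicks so unmatched clicks keep a null value.
    myDF.filter(col("action") === "click")
      .drop("value")
      .join(best, Seq("timestamp", "user_id"), "left")
      .select("timestamp", "user_id", "action", "value")
  }
}
```

As with the SQL version, if two drags end up matched to the same click, that click appears twice in the output; the question does not say how that case should be resolved.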