How to filter a Spark DataFrame based on the occurrences of a value in one column, with a condition on a date column?
I'm working with a DataFrame df that looks like this:
client | date
C1 |08-NOV-18 11.29.43
C2 |09-NOV-18 13.29.43
C2 |09-NOV-18 18.29.43
C3 |11-NOV-18 19.29.43
C1 |12-NOV-18 10.29.43
C2 |13-NOV-18 09.29.43
C4 |14-NOV-18 20.29.43
C1 |15-NOV-18 11.29.43
C5 |16-NOV-18 15.29.43
C10 |17-NOV-18 19.29.43
C1 |18-NOV-18 12.29.43
C2 |18-NOV-18 10.29.43
C2 |19-NOV-18 09.29.43
C6 |20-NOV-18 13.29.43
C6 |21-NOV-18 14.29.43
C1 |21-NOV-18 18.29.43
C1 |22-NOV-18 11.29.43
My goal is to filter this DataFrame and get a new DataFrame that contains the last two occurrences of each client, but only if those two occurrences are less than 24 hours apart. For this example, the result must be:
client |date
C2 |18-NOV-18 10.29.43
C2 |19-NOV-18 09.29.43
C1 |21-NOV-18 18.29.43
C1 |22-NOV-18 11.29.43
Any help, please!
scala apache-spark apache-spark-sql
Your question is not clear. What exactly do you want as output?
– Balaji Reddy
Nov 22 '18 at 12:06
What is the type of the date column?
– anuj saxena
Nov 22 '18 at 12:56
Hello @BalajiReddy, I edited the question. I want a DataFrame that contains, for each client, the last two observations, provided the difference between their dates is less than 24 hours.
– Chaouki
Nov 22 '18 at 13:13
@anujsaxena the date column is a timestamp.
– Chaouki
Nov 22 '18 at 13:13
asked Nov 22 '18 at 11:13 by Chaouki (edited Nov 22 '18 at 12:58)
3 Answers
You can do this with window functions:
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named spark, as in spark-shell

val df = Seq(("C1","08-NOV-18 11.29.43"),
("C2","09-NOV-18 13.29.43"),
("C2","09-NOV-18 18.29.43"),
("C3","11-NOV-18 19.29.43"),
("C1","12-NOV-18 10.29.43"),
("C2","13-NOV-18 09.29.43"),
("C4","14-NOV-18 20.29.43"),
("C1","15-NOV-18 11.29.43"),
("C5","16-NOV-18 15.29.43"),
("C10","17-NOV-18 19.29.43"),
("C1","18-NOV-18 12.29.43"),
("C2","18-NOV-18 10.29.43"),
("C2","19-NOV-18 09.29.43"),
("C6","20-NOV-18 13.29.43"),
("C6","21-NOV-18 14.29.43"),
("C1","21-NOV-18 18.29.43"),
("C1","22-NOV-18 11.29.43"))
  .toDF("client","dt")
  .withColumn("dt", from_unixtime(unix_timestamp($"dt","dd-MMM-yy HH.mm.ss"),"yyyy-MM-dd HH:mm:ss"))
df.createOrReplaceTempView("tbl")
// keep the two most recent rows of every client that has more than one row
val df2 = spark.sql("""
  select * from (
    select client, dt, count(*) over(partition by client) cnt,
           rank() over(partition by client order by dt desc) rk1
    from tbl ) t
  where cnt > 1 and rk1 in (1,2) """)
// pair each of those rows with the other one and keep the pairs that are less than 24 hours apart
df2.alias("t1")
  .join(df2.alias("t2"), $"t1.client" === $"t2.client" and $"t1.rk1" =!= $"t2.rk1", "inner")
  .withColumn("dt24", (unix_timestamp($"t1.dt") - unix_timestamp($"t2.dt")) / 3600)
  .where("dt24 > -24 and dt24 < 24")
  .select($"t1.client", $"t1.dt")
  .show(false)
Results:
+------+-------------------+
|client|dt                 |
+------+-------------------+
|C1    |2018-11-22 11:29:43|
|C1    |2018-11-21 18:29:43|
|C2    |2018-11-19 09:29:43|
|C2    |2018-11-18 10:29:43|
+------+-------------------+
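The same idea can probably be written with the DataFrame API alone, without the temporary view or the self-join. The following is only a sketch under assumptions: the object LastTwoWithinDay, the helper lastTwoWithinDay and the columns rn, n and gapSeconds are hypothetical names, the input is assumed to have a string column client and a timestamp column dt, and the sample rows are a subset of the question's data already written as yyyy-MM-dd HH:mm:ss.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object LastTwoWithinDay {
  // Hypothetical helper: expects columns `client` (string) and `dt` (timestamp).
  def lastTwoWithinDay(df: DataFrame): DataFrame = {
    val newestFirst = Window.partitionBy("client").orderBy(col("dt").desc)
    val perClient = Window.partitionBy("client")
    df.withColumn("rn", row_number().over(newestFirst))
      .where(col("rn") <= 2) // two most recent rows per client
      .withColumn("n", count(lit(1)).over(perClient))
      .withColumn("gapSeconds",
        max(col("dt").cast("long")).over(perClient) - min(col("dt").cast("long")).over(perClient))
      .where(col("n") === 2 && col("gapSeconds") < 24 * 60 * 60)
      .select("client", "dt")
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; in spark-shell `spark` already exists.
    val spark = SparkSession.builder().master("local[*]").appName("last-two").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("C1", "2018-11-21 18:29:43"), ("C1", "2018-11-22 11:29:43"),
      ("C2", "2018-11-09 13:29:43"), ("C2", "2018-11-09 18:29:43"),
      ("C2", "2018-11-18 10:29:43"), ("C2", "2018-11-19 09:29:43"),
      ("C6", "2018-11-20 13:29:43"), ("C6", "2018-11-21 14:29:43"),
      ("C3", "2018-11-11 19:29:43")
    ).toDF("client", "dt").withColumn("dt", col("dt").cast("timestamp"))

    // C1 and C2 keep their last two rows; C6 (25 hours apart) and C3 (single row) are dropped.
    lastTwoWithinDay(df).orderBy("client", "dt").show(false)
    spark.stop()
  }
}
```

Using row_number() rather than rank() means ties on dt cannot produce more than two rows per client, which may or may not be what you want.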
answered Nov 22 '18 at 14:16 by stack0114106
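I have one solution for this scenario:
import java.sql.Timestamp
import org.apache.spark.sql.functions._

val milliSecForADay = 24L * 60 * 60 * 1000
// Seq[Timestamp] is more general than WrappedArray and is the usual parameter type for array columns
val filterDatesUDF = udf { arr: Seq[Timestamp] =>
  arr.sortWith(_ after _).toList match { // newest first
    case last :: secondLast :: _ if (last.getTime - secondLast.getTime) < milliSecForADay => Array(secondLast, last)
    case _ => Array.empty[Timestamp]
  }
}
val finalDF = df.groupBy("client")
  .agg(collect_list("date").as("dates"))
  .select(col("client"), explode(filterDatesUDF(col("dates"))).as("date"))
finalDF.show() // show() returns Unit, so it is called separately from the assignment
This solution first groups the rows by client and collects each client's timestamps into a list. A user-defined function (UDF) then sorts that list and returns the last two timestamps if they are less than 24 hours apart, or an empty array otherwise; explode turns the returned array back into rows.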
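The rule inside the UDF can be tried without Spark. Here is a minimal sketch in plain Scala; the object LastTwoRule and the helper lastTwoWithinDay are hypothetical names used only for illustration, and the timestamps are taken from the question's C1, C6 and C3 rows.

```scala
import java.sql.Timestamp

object LastTwoRule {
  val millisPerDay: Long = 24L * 60 * 60 * 1000

  // Returns the two latest timestamps (older one first) when they are less
  // than 24 hours apart, otherwise an empty sequence.
  def lastTwoWithinDay(dates: Seq[Timestamp]): Seq[Timestamp] =
    dates.sortWith(_ after _).toList match { // newest first
      case last :: secondLast :: _ if last.getTime - secondLast.getTime < millisPerDay =>
        Seq(secondLast, last)
      case _ => Seq.empty
    }

  def main(args: Array[String]): Unit = {
    def ts(s: String): Timestamp = Timestamp.valueOf(s)
    val c1 = Seq(ts("2018-11-08 11:29:43"), ts("2018-11-22 11:29:43"), ts("2018-11-21 18:29:43"))
    val c6 = Seq(ts("2018-11-20 13:29:43"), ts("2018-11-21 14:29:43"))
    val c3 = Seq(ts("2018-11-11 19:29:43"))

    println(lastTwoWithinDay(c1)) // last two are 17 hours apart: both are returned
    println(lastTwoWithinDay(c6)) // last two are 25 hours apart: empty
    println(lastTwoWithinDay(c3)) // only one timestamp: empty
  }
}
```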
This assumes that the date column is already of type Timestamp, which may not be the case. If date arrives as a String, run the following code before the solution above to convert the column from String to Timestamp:
import java.sql.Timestamp
import java.util.Locale
import org.apache.spark.sql.functions._

val stringToTimestampUDF = udf { str: String =>
  // "HH" is the 24-hour clock ("hh" would read 12.29.43 as 00:29:43);
  // Locale.ENGLISH so that "NOV" parses regardless of the JVM's default locale
  val format = new java.text.SimpleDateFormat("dd-MMM-yy HH.mm.ss", Locale.ENGLISH) // format for "08-NOV-18 11.29.43"
  new Timestamp(format.parse(str).getTime)
}
val df = originDF.select(col("client"), to_utc_timestamp(stringToTimestampUDF(col("date")), "utc").as("date"))
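One detail worth checking in any SimpleDateFormat pattern for this data is the hour letter: hh is the 12-hour clock and HH the 24-hour clock. The sketch below, in plain Scala with a hypothetical object HourPatternCheck and helper roundTrip, parses one of the question's timestamps with both patterns and prints it back on a 24-hour clock. UTC is fixed only to keep the output independent of the machine's time zone.

```scala
import java.text.SimpleDateFormat
import java.util.{Locale, TimeZone}

object HourPatternCheck {
  private val utc = TimeZone.getTimeZone("UTC")

  // Parses `input` with `pattern` and renders the result on a 24-hour clock.
  def roundTrip(pattern: String, input: String): String = {
    val in = new SimpleDateFormat(pattern, Locale.ENGLISH)
    in.setTimeZone(utc)
    val out = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ENGLISH)
    out.setTimeZone(utc)
    out.format(in.parse(input))
  }

  def main(args: Array[String]): Unit = {
    val sample = "18-NOV-18 12.29.43"
    println(roundTrip("dd-MMM-yy hh.mm.ss", sample)) // 2018-11-18 00:29:43
    println(roundTrip("dd-MMM-yy HH.mm.ss", sample)) // 2018-11-18 12:29:43
  }
}
```

With hh and no AM/PM marker, an hour of 12 is read as midnight, so a row stamped at half past noon would silently move twelve hours earlier.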
answered Nov 22 '18 at 13:55 by anuj saxena (edited Nov 22 '18 at 14:03)
With a window function you can find each row's previous and next dates, and then keep only the rows whose gap to a neighbouring row is less than 24 hours.
Data preparation
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.Column
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType
import spark.implicits._ // assumes a SparkSession named spark, as in spark-shell

val df = Seq(("C1", "08-NOV-18 11.29.43"),
("C2", "09-NOV-18 13.29.43"),
("C2", "09-NOV-18 18.29.43"),
("C3", "11-NOV-18 19.29.43"),
("C1", "12-NOV-18 10.29.43"),
("C2", "13-NOV-18 09.29.43"),
("C4", "14-NOV-18 20.29.43"),
("C1", "15-NOV-18 11.29.43"),
("C5", "16-NOV-18 15.29.43"),
("C10", "17-NOV-18 19.29.43"),
("C1", "18-NOV-18 12.29.43"),
("C2", "18-NOV-18 10.29.43"),
("C2", "19-NOV-18 09.29.43"),
("C6", "20-NOV-18 13.29.43"),
("C6", "21-NOV-18 14.29.43"),
("C1", "21-NOV-18 18.29.43"),
("C1", "22-NOV-18 11.29.43"))
.toDF("client", "dt")
.withColumn("dt", to_timestamp($"dt", "dd-MMM-yy HH.mm.ss"))
Filtering code
// get next/prev dates
val dateWindow = Window.partitionBy("client").orderBy("dt")
val withNextPrevDates = df
.withColumn("previousDate", lag($"dt", 1).over(dateWindow))
.withColumn("nextDate", lead($"dt", 1).over(dateWindow))
// function for filter: less than 24 hours apart AND on consecutive calendar days.
// The datediff condition is what excludes the same-day C2 pair on 09-NOV; note that
// every adjacent pair is tested, not only each client's last two rows.
val secondsInDay = TimeUnit.DAYS.toSeconds(1)
val dateDiffLessThanDay = (startTimeStamp: Column, endTimeStamp: Column) =>
(endTimeStamp.cast(LongType) - startTimeStamp.cast(LongType) < secondsInDay) && (datediff(endTimeStamp, startTimeStamp) === 1)
// filter
val result = withNextPrevDates
.where(dateDiffLessThanDay($"previousDate", $"dt") || dateDiffLessThanDay($"dt", $"nextDate"))
.drop("previousDate", "nextDate")
result.show(false)
Result
+------+-------------------+
|client|dt                 |
+------+-------------------+
|C1    |2018-11-21 18:29:43|
|C1    |2018-11-22 11:29:43|
|C2    |2018-11-18 10:29:43|
|C2    |2018-11-19 09:29:43|
+------+-------------------+
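To see how the two parts of the filter predicate interact, they can be modelled outside Spark. This is only a plain-Scala sketch: the object PredicateCheck and the helper keepsPair are hypothetical names, and java.time stands in for Spark's timestamp arithmetic and datediff. It evaluates three adjacent pairs from the question's data.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.temporal.ChronoUnit

object PredicateCheck {
  // Mirrors the two conditions of the filter: elapsed time under 24 hours
  // AND the two rows fall on consecutive calendar days.
  def keepsPair(start: LocalDateTime, end: LocalDateTime): Boolean = {
    val underOneDay = Duration.between(start, end).getSeconds < 24L * 60 * 60
    val calendarDays = ChronoUnit.DAYS.between(start.toLocalDate, end.toLocalDate)
    underOneDay && calendarDays == 1
  }

  def main(args: Array[String]): Unit = {
    def t(s: String): LocalDateTime = LocalDateTime.parse(s)
    println(keepsPair(t("2018-11-18T10:29:43"), t("2018-11-19T09:29:43"))) // true: 23 hours, next day
    println(keepsPair(t("2018-11-09T13:29:43"), t("2018-11-09T18:29:43"))) // false: 5 hours, but same day
    println(keepsPair(t("2018-11-20T13:29:43"), t("2018-11-21T14:29:43"))) // false: 25 hours
  }
}
```

The second case shows that the calendar-day condition, not the 24-hour limit, is what removes the earlier C2 pair. If the requirement is strictly "the last two rows per client", ranking the rows per client first may be the more direct approach.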
answered Nov 22 '18 at 19:27 by pasha701