How to filter a Spark DataFrame based on the occurrence of a value in a column, with a condition on a date column?












I'm working with a DataFrame that looks like this:

    df
    client | date
    C1     | 08-NOV-18 11.29.43
    C2     | 09-NOV-18 13.29.43
    C2     | 09-NOV-18 18.29.43
    C3     | 11-NOV-18 19.29.43
    C1     | 12-NOV-18 10.29.43
    C2     | 13-NOV-18 09.29.43
    C4     | 14-NOV-18 20.29.43
    C1     | 15-NOV-18 11.29.43
    C5     | 16-NOV-18 15.29.43
    C10    | 17-NOV-18 19.29.43
    C1     | 18-NOV-18 12.29.43
    C2     | 18-NOV-18 10.29.43
    C2     | 19-NOV-18 09.29.43
    C6     | 20-NOV-18 13.29.43
    C6     | 21-NOV-18 14.29.43
    C1     | 21-NOV-18 18.29.43
    C1     | 22-NOV-18 11.29.43

My goal is to filter this DataFrame and get a new DataFrame that contains the last two occurrences of each client, but only if those two occurrences are less than 24 hours apart. For this example the result must be:

    client | date
    C2     | 18-NOV-18 10.29.43
    C2     | 19-NOV-18 09.29.43
    C1     | 21-NOV-18 18.29.43
    C1     | 22-NOV-18 11.29.43

Any help, please!







scala apache-spark apache-spark-sql














edited Nov 22 '18 at 12:58
asked Nov 22 '18 at 11:13
Chaouki













  • Your question is not clear. What exactly do you want as output?

    – Balaji Reddy
    Nov 22 '18 at 12:06











  • What is the type of date column?

    – anuj saxena
    Nov 22 '18 at 12:56













  • Hello @BalajiReddy, I edited the question. I want to get a DataFrame that contains, for each client, the last two observations, provided the difference between their dates is less than 24 hours.

    – Chaouki
    Nov 22 '18 at 13:13











  • @anujsaxena the date column is a timestamp.

    – Chaouki
    Nov 22 '18 at 13:13



















3 Answers
Using window functions. Check this out:

import org.apache.spark.sql.functions._
import spark.implicits._ // `spark` is the active SparkSession (predefined in spark-shell)

val df = Seq(
  ("C1", "08-NOV-18 11.29.43"),
  ("C2", "09-NOV-18 13.29.43"),
  ("C2", "09-NOV-18 18.29.43"),
  ("C3", "11-NOV-18 19.29.43"),
  ("C1", "12-NOV-18 10.29.43"),
  ("C2", "13-NOV-18 09.29.43"),
  ("C4", "14-NOV-18 20.29.43"),
  ("C1", "15-NOV-18 11.29.43"),
  ("C5", "16-NOV-18 15.29.43"),
  ("C10", "17-NOV-18 19.29.43"),
  ("C1", "18-NOV-18 12.29.43"),
  ("C2", "18-NOV-18 10.29.43"),
  ("C2", "19-NOV-18 09.29.43"),
  ("C6", "20-NOV-18 13.29.43"),
  ("C6", "21-NOV-18 14.29.43"),
  ("C1", "21-NOV-18 18.29.43"),
  ("C1", "22-NOV-18 11.29.43")
).toDF("client", "dt")
  .withColumn("dt", from_unixtime(unix_timestamp($"dt", "dd-MMM-yy HH.mm.ss"), "yyyy-MM-dd HH:mm:ss"))

df.createOrReplaceTempView("tbl")

// keep the two most recent rows of every client that has more than one row
val df2 = spark.sql("""
  select * from (
    select client, dt, count(*) over (partition by client) cnt,
           rank() over (partition by client order by dt desc) rk1
    from tbl ) t
  where cnt > 1 and rk1 in (1, 2) """)

// pair the two rows of each client and keep both when they are less than 24 hours apart
df2.alias("t1")
  .join(df2.alias("t2"), $"t1.client" === $"t2.client" and $"t1.rk1" =!= $"t2.rk1", "inner")
  .withColumn("dt24", (unix_timestamp($"t1.dt") - unix_timestamp($"t2.dt")) / 3600)
  .where("dt24 > -24 and dt24 < 24")
  .select($"t1.client", $"t1.dt")
  .show(false)

Results:

+------+-------------------+
|client|dt                 |
+------+-------------------+
|C1    |2018-11-22 11:29:43|
|C1    |2018-11-21 18:29:43|
|C2    |2018-11-19 09:29:43|
|C2    |2018-11-18 10:29:43|
+------+-------------------+
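For comparison, the same "last two rows, less than 24 hours apart" rule can probably be expressed without the self-join. The following is only a minimal sketch under some assumptions: `dt` is already a TimestampType column, and the names `LastTwoSketch`, `lastTwoWithinDay`, `rn`, `n` and `gapSeconds` are made up for illustration and do not come from the answer above. It ranks each client's rows newest first, keeps the top two, and then compares the gap between them using window aggregates.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object LastTwoSketch {
  // Keeps, per client, the two most recent rows when they are less than 24 hours apart.
  // Assumption: `df` has a string column `client` and a timestamp column `dt`.
  def lastTwoWithinDay(df: DataFrame): DataFrame = {
    val newestFirst = Window.partitionBy("client").orderBy(col("dt").desc)
    val perClient = Window.partitionBy("client")

    df.withColumn("rn", row_number().over(newestFirst))
      .where(col("rn") <= 2) // at most the two latest rows per client survive
      .withColumn("n", count(lit(1)).over(perClient))
      .withColumn(
        "gapSeconds",
        // casting a timestamp to long yields epoch seconds
        max(col("dt").cast("long")).over(perClient) - min(col("dt").cast("long")).over(perClient))
      .where((col("n") === 2) && (col("gapSeconds") < 24 * 60 * 60))
      .drop("rn", "n", "gapSeconds")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("last-two-sketch").getOrCreate()
    import spark.implicits._

    // A reduced version of the sample data from the question.
    val df = Seq(
      ("C1", "2018-11-18 12:29:43"),
      ("C1", "2018-11-21 18:29:43"),
      ("C1", "2018-11-22 11:29:43"),
      ("C2", "2018-11-13 09:29:43"),
      ("C2", "2018-11-18 10:29:43"),
      ("C2", "2018-11-19 09:29:43"),
      ("C3", "2018-11-11 19:29:43"),
      ("C6", "2018-11-20 13:29:43"),
      ("C6", "2018-11-21 14:29:43")
    ).map { case (client, s) => (client, Timestamp.valueOf(s)) }.toDF("client", "dt")

    lastTwoWithinDay(df).orderBy("client", "dt").show(false)
    spark.stop()
  }
}
```

On this reduced sample only the last two rows of C1 and C2 should remain: C3 has a single row and the two C6 rows are 25 hours apart. Using `row_number` rather than `rank` means ties on `dt` cannot let more than two rows through.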
















answered Nov 22 '18 at 14:16
stack0114106

























I have one solution for this scenario:

import java.sql.Timestamp
import org.apache.spark.sql.functions._

val milliSecForADay = 24 * 60 * 60 * 1000

// Spark passes an array column to a Scala UDF as a Seq.
// Sort newest first, then keep the last two timestamps only if they are less than 24 hours apart.
val filterDatesUDF = udf { arr: Seq[Timestamp] =>
  arr.sortWith(_ after _).toList match {
    case last :: secondLast :: _ if (last.getTime - secondLast.getTime) < milliSecForADay => Array(secondLast, last)
    case _ => Array.empty[Timestamp]
  }
}

val finalDF = df.groupBy("client")
  .agg(collect_list("date").as("dates"))
  .select(col("client"), explode(filterDatesUDF(col("dates"))).as("date"))

// show() returns Unit, so call it separately to keep finalDF a DataFrame
finalDF.show()

In this solution, I first group the data by client and collect each client's dates into a list. A user-defined function (UDF) then keeps the last two timestamps of that list when they are less than 24 hours apart, and explode turns them back into one row per date.

This assumes that the date column is already of Timestamp type, which may not be the case. If you are getting the date column as a String, add the following code before the solution above to convert the column from String to Timestamp.

val stringToTimestampUDF = udf { str: String =>
  // format for "08-NOV-18 11.29.43"; "HH" is the 24-hour clock, needed for values such as "13.29.43"
  val format = new java.text.SimpleDateFormat("dd-MMM-yy HH.mm.ss")
  new Timestamp(format.parse(str).getTime)
}

val df = originDF.select(col("client"), to_utc_timestamp(stringToTimestampUDF(col("date")), "utc").as("date"))
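The selection rule inside the UDF can be checked without a Spark session. Below is a small sketch of that rule on plain Scala collections; the object and method names (`LastTwoRule`, `lastTwoWithinDay`) are hypothetical and only mirror the logic described above, they are not part of the answer's code.

```scala
import java.sql.Timestamp

object LastTwoRule {
  private val DayMillis: Long = 24L * 60 * 60 * 1000

  // Returns the two most recent timestamps (oldest first) when they are
  // less than 24 hours apart; otherwise returns an empty sequence.
  def lastTwoWithinDay(times: Seq[Timestamp]): Seq[Timestamp] =
    times.sortBy(_.getTime).takeRight(2) match {
      case Seq(secondLast, last) if last.getTime - secondLast.getTime < DayMillis =>
        Seq(secondLast, last)
      case _ => Seq.empty
    }

  def main(args: Array[String]): Unit = {
    // The C2 and C6 rows from the question, written in JDBC timestamp format.
    val c2 = Seq(
      "2018-11-09 13:29:43",
      "2018-11-09 18:29:43",
      "2018-11-13 09:29:43",
      "2018-11-18 10:29:43",
      "2018-11-19 09:29:43"
    ).map(s => Timestamp.valueOf(s))
    val c6 = Seq("2018-11-20 13:29:43", "2018-11-21 14:29:43").map(s => Timestamp.valueOf(s))

    println(lastTwoWithinDay(c2)) // the 18-NOV and 19-NOV entries, 23 hours apart
    println(lastTwoWithinDay(c6)) // empty: the two entries are 25 hours apart
  }
}
```

Only the two latest timestamps are compared, so the earlier same-day C2 pair on 09-NOV does not affect the result.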














edited Nov 22 '18 at 14:03
answered Nov 22 '18 at 13:55
anuj saxena





































With window functions, the previous and next dates can be found for each row; then only the rows whose adjacent date is less than 24 hours away are kept.

Data preparation

import org.apache.spark.sql.functions._
import spark.implicits._ // `spark` is the active SparkSession

val df = Seq(("C1", "08-NOV-18 11.29.43"),
  ("C2", "09-NOV-18 13.29.43"),
  ("C2", "09-NOV-18 18.29.43"),
  ("C3", "11-NOV-18 19.29.43"),
  ("C1", "12-NOV-18 10.29.43"),
  ("C2", "13-NOV-18 09.29.43"),
  ("C4", "14-NOV-18 20.29.43"),
  ("C1", "15-NOV-18 11.29.43"),
  ("C5", "16-NOV-18 15.29.43"),
  ("C10", "17-NOV-18 19.29.43"),
  ("C1", "18-NOV-18 12.29.43"),
  ("C2", "18-NOV-18 10.29.43"),
  ("C2", "19-NOV-18 09.29.43"),
  ("C6", "20-NOV-18 13.29.43"),
  ("C6", "21-NOV-18 14.29.43"),
  ("C1", "21-NOV-18 18.29.43"),
  ("C1", "22-NOV-18 11.29.43"))
  .toDF("client", "dt")
  .withColumn("dt", to_timestamp($"dt", "dd-MMM-yy HH.mm.ss"))

Acting code

import java.util.concurrent.TimeUnit
import org.apache.spark.sql.Column
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.LongType

// get next/prev dates
val dateWindow = Window.partitionBy("client").orderBy("dt")
val withNextPrevDates = df
  .withColumn("previousDate", lag($"dt", 1).over(dateWindow))
  .withColumn("nextDate", lead($"dt", 1).over(dateWindow))

// function for filter: less than 24 hours apart and on consecutive calendar days
// (the datediff clause is what drops the same-day C2 pair of 09-NOV from the sample)
val secondsInDay = TimeUnit.DAYS.toSeconds(1)
val dateDiffLessThanDay = (startTimeStamp: Column, endTimeStamp: Column) =>
  endTimeStamp.cast(LongType) - startTimeStamp.cast(LongType) < secondsInDay && datediff(endTimeStamp, startTimeStamp) === 1

// filter: every adjacent pair that satisfies the condition is kept, not only a client's last two rows
val result = withNextPrevDates
  .where(dateDiffLessThanDay($"previousDate", $"dt") || dateDiffLessThanDay($"dt", $"nextDate"))
  .drop("previousDate", "nextDate")

Result

+------+-------------------+
|client|dt                 |
+------+-------------------+
|C1    |2018-11-21 18:29:43|
|C1    |2018-11-22 11:29:43|
|C2    |2018-11-18 10:29:43|
|C2    |2018-11-19 09:29:43|
+------+-------------------+





                            answered Nov 22 '18 at 19:27 by pasha701





























