Why does Dataset.unpersist cascade to all dependent cached Datasets?



























I am using Spark 2.3.2. For my use case, I cache a first DataFrame and then a second DataFrame derived from it.



Here is a minimal reproduction in the spark-shell:



scala> val df = spark.range(1, 1000000).withColumn("rand", (rand * 100).cast("int")).cache
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, rand: int]

scala> df.count
res0: Long = 999999

scala> val aggDf = df.groupBy("rand").agg(count("id") as "count").cache
aggDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [rand: int, count: bigint]

scala> aggDf.count
res1: Long = 100


As you can see in the image below, there are two cached RDDs, one for each DataFrame.



[Image: Spark UI Storage tab showing the two cached RDDs]



Now, when I unpersist the first DataFrame, Spark unpersists both:



df.unpersist()
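
One way to confirm that both caches were dropped is Dataset.storageLevel (available since Spark 2.1), which returns StorageLevel.NONE for a Dataset that is not persisted. A minimal check (the exact res numbers are illustrative):

scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel

scala> df.storageLevel == StorageLevel.NONE
res2: Boolean = true

scala> aggDf.storageLevel == StorageLevel.NONE
res3: Boolean = true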


I'm trying to understand this behaviour: why does Spark unpersist both DataFrames instead of only the first?

Am I missing something?










      apache-spark apache-spark-sql






asked Jan 1 at 13:02 by Kaushal; edited Jan 1 at 17:53 by Jacek Laskowski

          1 Answer
          Quoting SPARK-21478 Unpersist a DF also unpersists related DFs:




          This is by design. We do not want to use the invalid cached data.



          The current cache design need to ensure the query correctness. If you want to keep the cached data, even if the data is stale. You need to materialize it by saving it as a table.




That, however, changed in 2.4.0 with SPARK-24596 Non-cascading Cache Invalidation:




          When invalidating a cache, we invalid other caches dependent on this cache to ensure cached data is up to date. For example, when the underlying table has been modified or the table has been dropped itself, all caches that use this table should be invalidated or refreshed.



          However, in other cases, like when user simply want to drop a cache to free up memory, we do not need to invalidate dependent caches since no underlying data has been changed. For this reason, we would like to introduce a new cache invalidation mode: the non-cascading cache invalidation.
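
On 2.4.0, as I read SPARK-24596, Dataset.unpersist becomes non-cascading by default, so the same session from the question would keep the dependent cache. A sketch of the expected behaviour, reusing df and aggDf from above:

// Spark 2.4.0+: Dataset.unpersist drops only this Dataset's cache
// (non-cascading mode per SPARK-24596)
df.unpersist()
aggDf.storageLevel  // still the default MEMORY_AND_DISK, i.e. still cached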




Since you're using 2.3.2, you have to follow that recommendation and save to a table, or upgrade to 2.4.0.
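
A minimal sketch of the save-to-a-table workaround on 2.3.2 (the table name agg_counts is made up for illustration, and this assumes a session catalog you can write to):

// Materialize the aggregate so it no longer shares lineage with df.
// "agg_counts" is a hypothetical table name.
aggDf.write.mode("overwrite").saveAsTable("agg_counts")

// Read it back and cache it; this Dataset's plan does not reference
// df's plan, so df.unpersist() cannot cascade into it.
val aggFromTable = spark.table("agg_counts").cache
aggFromTable.count  // materializes the cache

df.unpersist()      // aggFromTable stays cached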






answered Jan 1 at 17:52 by Jacek Laskowski