Spark Scala: checkpointing a Dataset shows .isCheckpointed = false after an action, but checkpoint directories are written

There seem to be a few posts on this topic, but none of them answer the question as I understand it.



The following code was run on Databricks:



spark.sparkContext.setCheckpointDir("/dbfs/FileStore/checkpoint/cp1/loc7")
val checkpointDir = spark.sparkContext.getCheckpointDir.get
val ds = spark.range(10).repartition(2)
ds.cache()
ds.checkpoint()        // the Dataset returned here is not captured
ds.count()             // action, so evaluation is forced at this point
ds.rdd.isCheckpointed  // returns false


Added an improvement of sorts:



...
val ds2 = ds.checkpoint(eager=true)
println(ds2.queryExecution.toRdd.toDebugString)
...


returns:



(2) MapPartitionsRDD[307] at toRdd at command-1835760423149753:13 
| MapPartitionsRDD[305] at checkpoint at command-1835760423149753:12
| ReliableCheckpointRDD[306] at checkpoint at command-1835760423149753:12
checkpointDir: String = dbfs:/dbfs/FileStore/checkpoint/cp1/loc10/86cc77b5-27c3-4049-9136-503ddcab0fa9
ds: org.apache.spark.sql.Dataset[Long] = [id: bigint]
ds2: org.apache.spark.sql.Dataset[Long] = [id: bigint]
res53: Boolean = false


Question 1:



ds.rdd.isCheckpointed and ds2.rdd.isCheckpointed both return false, even though the count gives me a non-lazy situation. Why, when in particular the .../loc7 and .../loc10 directories are written with (part) files? We can also see the ReliableCheckpointRDD in the debug output!



The whole concept does not seem to be well explained anywhere; I am trying to sort it out.



Question 2 - secondary question:



Is the cache really necessary with the latest versions of Spark 2.4? If a new branch off ds is not cached, will it cause recomputation, or is that handled better now? It seems odd that the checkpoint data would not be used; or could we say that Spark does not really know which is better?



From High Performance Spark I get the mixed impression that checkpointing is not really recommended, but then again it is.

apache-spark apache-spark-sql checkpointing






edited Jan 2 at 14:42 by Community

asked Jan 2 at 11:08 by thebluephantom

  • Possible duplicate of pyspark rdd isCheckPointed() is false – OmG Jan 2 at 11:13

1 Answer






TL;DR: You are not inspecting the object that is actually checkpointed:



ds2.queryExecution.toRdd.dependencies(0).rdd.isCheckpointed
// Boolean = true



ds.rdd.isCheckpointed and ds2.rdd.isCheckpointed both return false




That is expected behavior. The object being checkpointed is not the converted RDD that you reference (which is the result of the additional transformations required to convert to the external representation), but the internal RDD object. In fact, as you can see above, it is not even the latest internal RDD, but its parent.



Additionally, in the first case you inspect the wrong Dataset object altogether: as explained in the linked answer, Dataset.checkpoint returns a new Dataset.
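
To make the distinction concrete, here is a minimal sketch (assuming a live spark session on Spark 2.4; the names mirror the question):

// Minimal sketch; `spark` is the usual session handle.
val ds = spark.range(10).repartition(2)
val ds2 = ds.checkpoint(eager = true)  // keep the RETURNED Dataset

ds.rdd.isCheckpointed   // false: the original Dataset is untouched
ds2.rdd.isCheckpointed  // false: .rdd is the converted, external-facing RDD

// The reliably checkpointed RDD is the parent of the internal RDD:
ds2.queryExecution.toRdd.dependencies(0).rdd.isCheckpointed  // true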




even though the count gives me a non-lazy situation




That doesn't make much sense. The default checkpoint implementation is eager, so it forces evaluation by itself. And even if it weren't, Dataset.count is not the right way to force evaluation, since its query plan can be optimized down to an aggregate that skips much of the actual work.
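
If you do need to force evaluation yourself (for a non-eager checkpoint, say), a commonly used alternative is an action that touches every row, sketched here under the same assumptions as above:

// Unlike count, this cannot be collapsed into a cheap aggregate plan.
ds2.foreach(_ => ())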




Is the cache really necessary with the latest versions of Spark 2.4?




As you can see in the linked source, Dataset.checkpoint uses RDD.checkpoint internally, so the same rules apply. However, you already execute a separate action to force the checkpoint, so additional caching, especially considering the cost of Dataset persistence, could be overkill.



Of course, if in doubt, you might consider benchmarking in a specific context.
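
For instance, a rough timing harness could look like the sketch below; timed is a hypothetical helper, not a Spark API, and the data size is arbitrary:

// Hypothetical wall-clock timer; not part of Spark.
def timed[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  println(f"$label: ${(System.nanoTime() - start) / 1e6}%.1f ms")
  result
}

val base = spark.range(10000000L).repartition(20)

// Variant 1: eager checkpoint only; reuse the returned Dataset.
val cp = timed("checkpoint") { base.checkpoint(eager = true) }
timed("reuse checkpointed data") { cp.distinct().count() }

// Variant 2: cache as well, paying the persistence cost up front.
base.cache()
timed("materialize cache") { base.count() }
timed("reuse cached data") { base.distinct().count() }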






answered Jan 2 at 13:46 by user10465355