sparklyr - Including null values in an Apache Spark Join

The question "Including null values in an Apache Spark Join" has answers for Scala, PySpark, and SparkR, but not for sparklyr. I've been unable to figure out how to make inner_join in sparklyr treat null values in a join column as equal. Does anyone know how this can be done in sparklyr?
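For illustration, here is a minimal sketch of the default behavior (the connection and data are hypothetical): inner_join simply drops rows whose keys are NULL on both sides, because NULL = NULL does not evaluate to true in Spark SQL.

library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")  # hypothetical local connection

df1 <- copy_to(sc, tibble(id = c(NA, "foo"), x = 1:2), name = "df1")
df2 <- copy_to(sc, tibble(id = c(NA, "foo"), y = 3:4), name = "df2")

# Returns only the "foo" row; the NULL ids do not match each other
df1 %>% inner_join(df2, by = "id")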










Tags: r apache-spark join dplyr sparklyr






asked Jan 3 at 0:49 by Dave Kincaid, edited Jan 10 at 16:09 by user6910411
1 Answer
You can invoke an implicit cross join:

library(dplyr)     # rename_at, funs, filter, %>%
library(sparklyr)  # spark_dataframe, sdf_register, invoke

#' Return a Cartesian product of Spark tables
#'
#' @param df1 tbl_spark
#' @param df2 tbl_spark
#' @param explicit logical; if TRUE use crossJoin, otherwise
#'   join without an expression
#' @param suffix character suffixes to be used on duplicate names
cross_join <- function(df1, df2,
                       explicit = FALSE, suffix = c("_x", "_y")) {

  # Disambiguate column names that appear in both tables
  common_cols <- intersect(colnames(df1), colnames(df2))

  if (length(common_cols) > 0) {
    df1 <- df1 %>% rename_at(common_cols, funs(paste0(., suffix[1])))
    df2 <- df2 %>% rename_at(common_cols, funs(paste0(., suffix[2])))
  }

  # Calling join with no condition yields an implicit cross join;
  # crossJoin makes it explicit
  sparklyr::invoke(
    spark_dataframe(df1),
    if (explicit) "crossJoin" else "join",
    spark_dataframe(df2)) %>% sdf_register()
}


and filter the result with IS NOT DISTINCT FROM:

# Enable cross joins
sc %>%
  spark_session() %>%
  sparklyr::invoke("conf") %>%
  sparklyr::invoke("set", "spark.sql.crossJoin.enabled", "true")

df1 <- copy_to(sc, tibble(id1 = c(NA, "foo", "bar"), val = 1:3))
df2 <- copy_to(sc, tibble(id2 = c(NA, "foo", "baz"), val = 4:6))

df1 %>%
  cross_join(df2) %>%
  filter(id1 %IS NOT DISTINCT FROM% id2)


# Source: spark<?> [?? x 4]
  id1   val_x id2   val_y
* <chr> <int> <chr> <int>
1 NA        1 NA        4
2 foo       2 foo       5


The optimized execution plan:

<jobj[62]>
  org.apache.spark.sql.catalyst.plans.logical.Join

Join Inner, (id1#10 <=> id2#76)
:- Project [id1#10, val#11 AS val_x#129]
:  +- InMemoryRelation [id1#10, val#11], StorageLevel(disk, memory, deserialized, 1 replicas)
:     +- Scan ExistingRDD[id1#10,val#11]
+- Project [id2#76, val#77 AS val_y#132]
   +- InMemoryRelation [id2#76, val#77], StorageLevel(disk, memory, deserialized, 1 replicas)
      +- Scan ExistingRDD[id2#76,val#77]


The <=> operator should work the same way:

df1 %>%
  cross_join(df2) %>%
  filter(id1 %<=>% id2)
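Alternatively, the null-safe condition can be written as an ordinary join predicate in raw Spark SQL through sparklyr's DBI interface, which avoids the cross join (and the crossJoin.enabled flag) entirely. A minimal sketch, assuming the tables were registered under the names df1 and df2 (for example with copy_to(sc, ..., name = "df1")):

# Null-safe inner join written directly in Spark SQL
DBI::dbGetQuery(sc, "
  SELECT df1.id1, df1.val AS val_x, df2.id2, df2.val AS val_y
  FROM df1
  JOIN df2
    ON df1.id1 <=> df2.id2
")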


Please note that:

• An implicit cross join will fail unless it is followed by a selection that promotes the result to a hash join / sort-merge join, or cross joins are enabled (see the connection-time sketch after this list).

• An explicit cross join shouldn't be used in this case, as it would take precedence over subsequent selections.


• It is also possible to use a dplyr-style cross join:

  mutate(df1, `_const` = TRUE) %>%
    inner_join(mutate(df2, `_const` = TRUE), by = c("_const")) %>%
    select(-`_const`) %>%
    filter(id1 %IS NOT DISTINCT FROM% id2)

  but I'd advise against that, as it is less robust (depending on the context, the optimizer might be unable to recognize that _const is constant).
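As referenced in the first note above, the cross-join flag can also be set when the connection is created, instead of mutating a live session's conf. A minimal sketch, assuming a local master (the connection details are hypothetical):

library(sparklyr)

# spark.* entries in spark_config() are passed through to the Spark
# session, so the flag is in effect from the start
config <- spark_config()
config$spark.sql.crossJoin.enabled <- "true"

sc <- spark_connect(master = "local", config = config)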








answered Jan 3 at 13:04 by user6910411, edited Feb 6 at 14:58































