sparklyr - Including null values in an Apache Spark Join
The question Including null values in an Apache Spark Join has answers for Scala, PySpark and SparkR, but not for sparklyr. I've been unable to figure out how to have inner_join in sparklyr treat null values in a join column as equal. Does anyone know how this can be done in sparklyr?
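For reference, a minimal sketch of the default behaviour (it assumes an active sparklyr connection sc): a plain inner_join drops the rows whose keys are NA on both sides, because NULL join keys never compare equal.

library(sparklyr)
library(dplyr)

df1 <- copy_to(sc, tibble::tibble(id = c(NA, "foo", "bar"), x = 1:3))
df2 <- copy_to(sc, tibble::tibble(id = c(NA, "foo", "baz"), y = 4:6))

# Only the "foo" row is returned; the rows where id is NA on both sides
# do not match, because NULL keys never compare equal in a standard join.
inner_join(df1, df2, by = "id")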
r apache-spark join dplyr sparklyr
asked Jan 3 at 0:49 by Dave Kincaid, edited Jan 10 at 16:09 by user6910411
1 Answer
You can invoke an implicit cross join:
#' Return a Cartesian product of Spark tables
#'
#' @param df1 tbl_spark
#' @param df2 tbl_spark
#' @param explicit logical If TRUE use crossJoin, otherwise
#'   join without an expression
#' @param suffix character suffixes to be used on duplicate names
cross_join <- function(df1, df2,
                       explicit = FALSE, suffix = c("_x", "_y")) {
  common_cols <- intersect(colnames(df1), colnames(df2))
  if (length(common_cols) > 0) {
    df1 <- df1 %>% rename_at(common_cols, funs(paste0(., suffix[1])))
    df2 <- df2 %>% rename_at(common_cols, funs(paste0(., suffix[2])))
  }
  sparklyr::invoke(
    spark_dataframe(df1),
    if (explicit) "crossJoin" else "join",
    spark_dataframe(df2)) %>% sdf_register()
}
and filter the result with IS NOT DISTINCT FROM:
# Enable cross joins
sc %>%
  spark_session() %>%
  sparklyr::invoke("conf") %>%
  sparklyr::invoke("set", "spark.sql.crossJoin.enabled", "true")

df1 <- copy_to(sc, tibble(id1 = c(NA, "foo", "bar"), val = 1:3))
df2 <- copy_to(sc, tibble(id2 = c(NA, "foo", "baz"), val = 4:6))

df1 %>%
  cross_join(df2) %>%
  filter(id1 %IS NOT DISTINCT FROM% id2)
# Source: spark<?> [?? x 4]
  id1   val_x id2   val_y
* <chr> <int> <chr> <int>
1 NA        1 NA        4
2 foo       2 foo       5
The optimized execution plan:
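One way to retrieve such a plan (a sketch, not necessarily how it was produced here; it assumes the filtered result above is stored in res) is to reach into the underlying JVM objects:

res <- df1 %>%
  cross_join(df2) %>%
  filter(id1 %IS NOT DISTINCT FROM% id2)

# Dataset -> QueryExecution -> optimized logical plan
res %>%
  spark_dataframe() %>%
  sparklyr::invoke("queryExecution") %>%
  sparklyr::invoke("optimizedPlan")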
<jobj[62]>
org.apache.spark.sql.catalyst.plans.logical.Join
Join Inner, (id1#10 <=> id2#76)
:- Project [id1#10, val#11 AS val_x#129]
:  +- InMemoryRelation [id1#10, val#11], StorageLevel(disk, memory, deserialized, 1 replicas)
:        +- Scan ExistingRDD[id1#10,val#11]
+- Project [id2#76, val#77 AS val_y#132]
   +- InMemoryRelation [id2#76, val#77], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- Scan ExistingRDD[id2#76,val#77]
The <=> operator should work the same way:
df1 %>%
  cross_join(df2) %>%
  filter(id1 %<=>% id2)
Please note that:
- An implicit cross join will fail unless it is followed by a selection that promotes the result to a hash join / sort-merge join, or cross joins are enabled.
- An explicit cross join shouldn't be used in this case, as it will take precedence over subsequent selections (see the sketch after this list).
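To make the distinction concrete, here is a minimal sketch of the two variants using the cross_join() helper defined above:

# Implicit variant ("join" with no condition): Spark only accepts the plan if a
# later equality filter turns it into an equi-join, or if
# spark.sql.crossJoin.enabled is set to "true".
joined_implicit <- cross_join(df1, df2, explicit = FALSE)

# Explicit variant ("crossJoin"): as noted above, it takes precedence over
# subsequent selections, so it is not the right tool for this null-safe join.
joined_explicit <- cross_join(df1, df2, explicit = TRUE)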
It is possible to use a dplyr-style cross join:
mutate(df1, `_const` = TRUE) %>%
  inner_join(mutate(df2, `_const` = TRUE), by = c("_const")) %>%
  select(-`_const`) %>%
  filter(id1 %IS NOT DISTINCT FROM% id2)
but I'd advise against that, as it is less robust (depending on the context, the optimizer might be unable to recognize that _const is constant).
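Another option is to express the null-safe join directly in Spark SQL through sparklyr's DBI interface. This is a sketch rather than part of the answer above; it assumes the tables were registered under explicit view names, e.g. copy_to(sc, ..., name = "df1").

library(DBI)

# <=> is Spark SQL's null-safe equality operator, the same one that appears in
# the optimized plan above.
dbGetQuery(sc, "
  SELECT d1.id1, d1.val AS val_x, d2.id2, d2.val AS val_y
  FROM df1 AS d1
  JOIN df2 AS d2
    ON d1.id1 <=> d2.id2
")

Note that dbGetQuery collects the result into R; dplyr::tbl(sc, dplyr::sql(...)) with the same statement would keep it in Spark.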
answered Jan 3 at 13:04 by user6910411, edited Feb 6 at 14:58