How to load dataframe on all dask workers
I have a few thousand CSV files in S3, and I want to load them, concatenate them together into a single pandas dataframe, and share that entire dataframe with all dask workers on a cluster. All of the files are approximately the same size (~1MB). I am using 8 processes per machine (one per core) and one thread per process. The entire dataframe fits comfortably into each worker process's memory. What is the most efficient and scalable way to accomplish this?
I implemented this workflow using MPI4py as follows: use a thread pool in one worker process to read all of the files into pandas dataframes, concatenate the dataframes together, and use MPI4py's broadcast function to send the complete dataframe to all of the other worker processes.
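For reference, the MPI4py version looked roughly like this (the paths, bucket name, and pool size below are placeholders, not my real values):

```python
# Rough sketch of the MPI4py workflow described above; only the mpi4py,
# pandas, and ThreadPoolExecutor calls are the real APIs in use.
from concurrent.futures import ThreadPoolExecutor

import pandas as pd
from mpi4py import MPI

comm = MPI.COMM_WORLD

def load_all(paths):
    # Read every CSV on rank 0 with a thread pool, then concatenate.
    # Reading s3:// URLs with pandas requires s3fs to be installed.
    with ThreadPoolExecutor(max_workers=32) as pool:
        frames = list(pool.map(pd.read_csv, paths))
    return pd.concat(frames, ignore_index=True)

paths = ["s3://my-bucket/data/part-0001.csv"]  # placeholder listing
df = load_all(paths) if comm.rank == 0 else None
# bcast pickles the dataframe on rank 0 and sends it to every other rank.
df = comm.bcast(df, root=0)
```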
I've thought of five ways to accomplish this in dask:
- Each worker reads all of the files using pandas.read_csv, and then concatenates them together using pandas.concat.
- Use dask.dataframe.from_delayed to read all files into a distributed dask dataframe, use dask.distributed.worker_client to get a client on each worker process, and then use dask.dataframe.compute on each worker to get the pandas dataframe.
- Load the distributed dataframe as in solution 2, use dask.distributed.Client.replicate to distribute all of the partitions to all of the workers, use dask.distributed.worker_client to get a client on each worker process, and use dask.dataframe.compute to get the pandas dataframe in each worker process.
- Load the distributed dataframe as in solution 2, use dask.dataframe.compute to bring the dataframe into the local process, delete the distributed dataframe from the cluster (by cancelling the futures), and use dask.distributed.Client.scatter(broadcast=True, direct=True) to send the local pandas dataframe to all workers (a rough sketch of this variant appears after this list).
- Load the distributed dataframe and gather it to the local process as in solution 4, use dask.distributed.Client.scatter(broadcast=False) to send it to a worker process, use dask.distributed.Client.replicate to send it to all other workers.
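For concreteness, here is roughly what I mean by solution 4. The scheduler address, S3 paths, and the delayed loader are placeholders; nothing is persisted in this sketch, so the futures-cancelling step from my description is omitted:

```python
# Hedged sketch of solution 4: build a distributed dataframe, gather it
# locally, then broadcast one full copy to every worker via scatter.
import dask
import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client

client = Client("tcp://scheduler:8786")  # placeholder address

@dask.delayed
def load_one(path):
    # Reading s3:// URLs with pandas requires s3fs to be installed.
    return pd.read_csv(path)

paths = ["s3://my-bucket/data/part-0001.csv"]  # placeholder listing
ddf = dd.from_delayed([load_one(p) for p in paths])

# Gather the full dataframe into the local (client) process.
local_df = ddf.compute()

# Push one full copy to every worker; direct=True sends the data from
# the client straight to the workers instead of through the scheduler.
[df_future] = client.scatter([local_df], broadcast=True, direct=True)
```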
Solutions 2-5 have a huge advantage over the MPI4py version in that they leverage dask's ability to load the dataframe in parallel. However, none of those solutions get anywhere close to the performance of MPI4py's broadcast function when it's time to distribute the data around the cluster. In addition, I'm having trouble predicting their memory usage, and I see many messages from the workers complaining that the event loop was unresponsive for multiple seconds.
At this stage, I'm inclined to use the first solution: even though the data-loading is inefficient, it's not that slow, and in my experience it's the most robust. Surely I will be leaving a lot of performance potential on the table if I go this route. Is there any way to improve one of the dask solutions? Or is there another solution that I haven't considered?
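For reference, solution 1 would look something like the sketch below. The paths and scheduler address are placeholders, and stashing the frame on the Worker object is just one possible hand-off, not an official pattern:

```python
# Minimal sketch of solution 1: every worker process reads and
# concatenates all of the files itself, with no inter-worker transfer.
import pandas as pd
from dask.distributed import Client

client = Client("tcp://scheduler:8786")  # placeholder address
paths = ["s3://my-bucket/data/part-0001.csv"]  # placeholder listing

def load_everything(paths, dask_worker=None):
    # client.run passes the Worker object via the special `dask_worker`
    # keyword argument; attach the concatenated frame to it so later
    # tasks on the same worker can reach it.
    df = pd.concat((pd.read_csv(p) for p in paths), ignore_index=True)
    dask_worker.shared_df = df
    return len(df)

# client.run executes the function once in every worker process.
client.run(load_everything, paths)
```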
python performance amazon-s3 dask dask-distributed
asked Oct 23 '17 at 19:09 by Peter Lubans