How to load dataframe on all dask workers

I have a few thousand CSV files in S3, each roughly the same size (~1MB). I want to load them, concatenate them into a single pandas dataframe, and share that entire dataframe with every dask worker on a cluster. I am running 8 worker processes per machine (one per core) with one thread per process, and the full dataframe fits comfortably in each worker process's memory. What is the most efficient and scalable way to accomplish this?



I previously implemented this workflow with MPI4py: a thread pool in one worker process reads all of the files into pandas dataframes, those dataframes are concatenated, and MPI4py's broadcast function sends the complete dataframe to every other worker process.
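
For reference, the MPI4py version looks roughly like the sketch below (the file paths and thread-pool size are placeholders, and reading s3:// URLs directly with pandas assumes s3fs is installed):

    from concurrent.futures import ThreadPoolExecutor

    import pandas as pd
    from mpi4py import MPI

    comm = MPI.COMM_WORLD

    # Placeholder file list; pandas reads s3:// URLs via s3fs.
    PATHS = ["s3://my-bucket/data/part-0001.csv",
             "s3://my-bucket/data/part-0002.csv"]

    def load_all(paths):
        # Read every CSV on a thread pool, then concatenate into one dataframe.
        with ThreadPoolExecutor(max_workers=32) as pool:
            frames = list(pool.map(pd.read_csv, paths))
        return pd.concat(frames, ignore_index=True)

    if comm.rank == 0:
        df = load_all(PATHS)
    else:
        df = None

    # bcast pickles the dataframe on rank 0 and sends a copy to every other rank.
    df = comm.bcast(df, root=0)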



I've thought of five ways to accomplish this in dask:




  1. Each worker reads all of the files itself with pandas.read_csv and concatenates them with pandas.concat (sketched at the end of this question).

  2. Use dask.dataframe.from_delayed to read all files into a distributed dask dataframe, use dask.distributed.worker_client to get a client on each worker process, and then use dask.dataframe.compute on each worker to get the pandas dataframe.

  3. Load the distributed dataframe as in solution 2, use dask.distributed.Client.replicate to distribute all of the partitions to all of the workers, use dask.distributed.worker_client to get a client on each worker process, and use dask.dataframe.compute to get the pandas dataframe in each worker process.

  4. Load the distributed dataframe as in solution 2, use dask.dataframe.compute to bring the dataframe into the local process, delete the distributed dataframe from the cluster (by cancelling its futures), and use dask.distributed.Client.scatter(broadcast=True, direct=True) to send the local pandas dataframe to all workers (see the sketch just after this list).

  5. Load the distributed dataframe and gather it to the local process as in solution 4, use dask.distributed.Client.scatter(broadcast=False) to send it to a single worker process, and then use dask.distributed.Client.replicate to copy it to all of the other workers.
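
To make solution 4 concrete, here is roughly what I mean (a sketch only: the scheduler address and file paths are placeholders, and I gather the partitions through to_delayed/compute futures):

    import dask
    import dask.dataframe as dd
    import pandas as pd
    from dask.distributed import Client

    client = Client("scheduler-address:8786")       # placeholder address
    paths = ["s3://my-bucket/data/part-0001.csv"]   # placeholder file list

    # Solution 2's loading step: one delayed pandas.read_csv per file.
    ddf = dd.from_delayed([dask.delayed(pd.read_csv)(p) for p in paths])

    # Run the reads on the cluster, gather the partitions locally, and
    # release them from the workers once the local concat is done.
    futures = client.compute(ddf.to_delayed())
    df = pd.concat(client.gather(futures), ignore_index=True)
    client.cancel(futures)

    # broadcast=True puts a copy of df on every worker; direct=True sends it
    # straight from the client to the workers instead of via the scheduler.
    [df_future] = client.scatter([df], broadcast=True, direct=True)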


Solutions 2-5 have a huge advantage over the MPI4py version in that they leverage dask's ability to load the dataframe in parallel. However, none of them comes anywhere close to the performance of MPI4py's broadcast when it's time to distribute the data around the cluster. In addition, I'm having trouble predicting their memory usage, and I see many messages from the workers complaining that the event loop was unresponsive for multiple seconds.



At this stage I'm inclined to go with the first solution: even though the data loading is redundant (every worker reads every file), it's not that slow, and in my experience it's the most robust option. But surely that leaves a lot of performance on the table. Is there any way to improve one of the dask solutions, or is there another approach I haven't considered?
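
For completeness, solution 1 would look something like this (a sketch: the paths and the shared_df attribute name are placeholders, and I'm assuming the per-worker copy is built with Client.run and stashed on the worker object):

    import pandas as pd
    from dask.distributed import Client

    client = Client("scheduler-address:8786")       # placeholder address
    paths = ["s3://my-bucket/data/part-0001.csv"]   # placeholder; needs s3fs

    def load_on_worker(paths, dask_worker=None):
        # Client.run passes the Worker object via the special dask_worker
        # keyword.  Every worker reads every file itself and keeps the
        # concatenated result on the worker so later tasks can use it
        # without any further network transfer.
        dask_worker.shared_df = pd.concat(
            (pd.read_csv(p) for p in paths), ignore_index=True
        )
        return len(dask_worker.shared_df)

    # Runs the function once in every worker process and returns
    # {worker_address: row_count}.
    row_counts = client.run(load_on_worker, paths)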










      python performance amazon-s3 dask dask-distributed






asked Oct 23 '17 at 19:09 by Peter Lubans























