return control to the transport












2















I'm trying to simulate a situation where data is received by a server periodically. In my set up, I run one process that sets up the server and another process that sets up a bunch of clients (suffices to think of a single client). I have set up some of the code by putting together bits and pieces mostly from here. The server/clients communicate by sending messages using transport.write. First, the server tells the clients to start (this works fine AFAIK). The clients report back to the server as they make progress. What has me confused is that I only get these intermittent messages at the very end when the client is done. This could be a problem with buffer flush and I tried (unsuccessfully) things like This. Also, each message is pretty large and I tried sending the same message multiple times so the buffer would get cleared.



I suspect what I am seeing is a problem with returning the control to the transport but I can't figure out how to do away with it.



Any help with this or any other issues that jump up to you is much appreciated.



Server:



from twisted.internet import reactor, protocol

import time
import serverSideAnalysis
import pdb
#import bson, json, msgpack
import _pickle as pickle # I expect the users to authenticate and not
# do anything malicious.


PORT = 9000
NUM = 1
local_scratch="/local/scratch"


class Hub(protocol.Protocol):
def __init__(self,factory, clients, nclients):
self.clients = clients
self.nclients = nclients
self.factory = factory
self.dispatcher = serverSideAnalysis.ServerTalker(NUM, self,
local_scratch)

def connectionMade(self):
print("connected to user" , (self))
if len(self.clients) < self.nclients:
self.factory.clients.append(self)
else:
self.factory.clients[self.nclients] = self
if len(self.clients) == NUM:
val = input("Looks like everyone is here, shall we start? (Y/N)")
while (val.upper() != "Y"):
time.sleep(20)
val = input("Looks like everyone is here, shall we start??? (Y/N)")
message = pickle.dumps({"TASK": "INIT", "SUBTASK":"STORE"})
self.message(message) # This reaches the client as I had expected

def message(self, command):
for c in self.factory.clients:
c.transport.write(command)

def connectionLost(self, reason):
self.factory.clients.remove(self)
self.nclients -= 1

def dataReceived(self, data):
if len(self.clients) == NUM:
self.dispatcher.dispatch(data)

class PauseTransport(protocol.Protocol):
def makeConnection(self, transport):
transport.pauseProducing()

class HubFactory(protocol.Factory):
def __init__(self, num):
self.clients = set()
self.nclients = 0
self.totConnections = num

def buildProtocol(self, addr):
print(self.nclients)
if self.nclients < self.totConnections:
self.nclients += 1
return Hub(self, self.clients, self.nclients)
protocol = PauseTransport()
protocol.factory = self
return protocol

factory = HubFactory(NUM)
reactor.listenTCP(PORT, factory)
factory.clients =
reactor.run()


Client:



from twisted.internet import reactor, protocol
import time
import clientSideAnalysis
import sys


HOST = 'localhost'
PORT = 9000
local_scratch="/local/scratch"

class MyClient(protocol.Protocol):

def connectionMade(self):
print("connected!")
self.factory.clients.append(self)
print ("clients are ", self.factory.clients)

self.cdispatcher = clientSideAnalysis.ServerTalker(analysis_file_name, local_scratch, self)

def clientConnectionLost(self, reason):
#TODO send warning
self.factory.clients.remove(self)

def dataReceived(self, data): #This is the problematic part I think
self.cdispatcher.dispatch(data)
print("1 sent")
time.sleep(10)
self.cdispatcher.dispatch(data)
print("2 sent")
time.sleep(10)
self.cdispatcher.dispatch(data)
time.sleep(10)


def message(self, data):
self.transport.write(data)

class MyClientFactory(protocol.ClientFactory):
protocol = MyClient

if __name__=="__main__":
analysis_file_name = sys.argv[1]

factory = MyClientFactory()
reactor.connectTCP(HOST, PORT, factory)
factory.clients =
reactor.run()


The last bit of relevant information about what the dispatchers do.



In both cases, they load the message that has arrived (a dictionary) and do a few computations based on the content. Every once in a while, they use the message method to communicate with thier current values.



Finally, I'm using python 3.6. and twisted 18.9.0










share|improve this question





























    2















    I'm trying to simulate a situation where data is received by a server periodically. In my set up, I run one process that sets up the server and another process that sets up a bunch of clients (suffices to think of a single client). I have set up some of the code by putting together bits and pieces mostly from here. The server/clients communicate by sending messages using transport.write. First, the server tells the clients to start (this works fine AFAIK). The clients report back to the server as they make progress. What has me confused is that I only get these intermittent messages at the very end when the client is done. This could be a problem with buffer flush and I tried (unsuccessfully) things like This. Also, each message is pretty large and I tried sending the same message multiple times so the buffer would get cleared.



    I suspect what I am seeing is a problem with returning the control to the transport but I can't figure out how to do away with it.



    Any help with this or any other issues that jump up to you is much appreciated.



    Server:



    from twisted.internet import reactor, protocol

    import time
    import serverSideAnalysis
    import pdb
    #import bson, json, msgpack
    import _pickle as pickle # I expect the users to authenticate and not
    # do anything malicious.


    PORT = 9000
    NUM = 1
    local_scratch="/local/scratch"


    class Hub(protocol.Protocol):
    def __init__(self,factory, clients, nclients):
    self.clients = clients
    self.nclients = nclients
    self.factory = factory
    self.dispatcher = serverSideAnalysis.ServerTalker(NUM, self,
    local_scratch)

    def connectionMade(self):
    print("connected to user" , (self))
    if len(self.clients) < self.nclients:
    self.factory.clients.append(self)
    else:
    self.factory.clients[self.nclients] = self
    if len(self.clients) == NUM:
    val = input("Looks like everyone is here, shall we start? (Y/N)")
    while (val.upper() != "Y"):
    time.sleep(20)
    val = input("Looks like everyone is here, shall we start??? (Y/N)")
    message = pickle.dumps({"TASK": "INIT", "SUBTASK":"STORE"})
    self.message(message) # This reaches the client as I had expected

    def message(self, command):
    for c in self.factory.clients:
    c.transport.write(command)

    def connectionLost(self, reason):
    self.factory.clients.remove(self)
    self.nclients -= 1

    def dataReceived(self, data):
    if len(self.clients) == NUM:
    self.dispatcher.dispatch(data)

    class PauseTransport(protocol.Protocol):
    def makeConnection(self, transport):
    transport.pauseProducing()

    class HubFactory(protocol.Factory):
    def __init__(self, num):
    self.clients = set()
    self.nclients = 0
    self.totConnections = num

    def buildProtocol(self, addr):
    print(self.nclients)
    if self.nclients < self.totConnections:
    self.nclients += 1
    return Hub(self, self.clients, self.nclients)
    protocol = PauseTransport()
    protocol.factory = self
    return protocol

    factory = HubFactory(NUM)
    reactor.listenTCP(PORT, factory)
    factory.clients =
    reactor.run()


    Client:



    from twisted.internet import reactor, protocol
    import time
    import clientSideAnalysis
    import sys


    HOST = 'localhost'
    PORT = 9000
    local_scratch="/local/scratch"

    class MyClient(protocol.Protocol):

    def connectionMade(self):
    print("connected!")
    self.factory.clients.append(self)
    print ("clients are ", self.factory.clients)

    self.cdispatcher = clientSideAnalysis.ServerTalker(analysis_file_name, local_scratch, self)

    def clientConnectionLost(self, reason):
    #TODO send warning
    self.factory.clients.remove(self)

    def dataReceived(self, data): #This is the problematic part I think
    self.cdispatcher.dispatch(data)
    print("1 sent")
    time.sleep(10)
    self.cdispatcher.dispatch(data)
    print("2 sent")
    time.sleep(10)
    self.cdispatcher.dispatch(data)
    time.sleep(10)


    def message(self, data):
    self.transport.write(data)

    class MyClientFactory(protocol.ClientFactory):
    protocol = MyClient

    if __name__=="__main__":
    analysis_file_name = sys.argv[1]

    factory = MyClientFactory()
    reactor.connectTCP(HOST, PORT, factory)
    factory.clients =
    reactor.run()


    The last bit of relevant information about what the dispatchers do.



    In both cases, they load the message that has arrived (a dictionary) and do a few computations based on the content. Every once in a while, they use the message method to communicate with thier current values.



    Finally, I'm using python 3.6. and twisted 18.9.0










    share|improve this question



























      2












      2








      2








      I'm trying to simulate a situation where data is received by a server periodically. In my set up, I run one process that sets up the server and another process that sets up a bunch of clients (suffices to think of a single client). I have set up some of the code by putting together bits and pieces mostly from here. The server/clients communicate by sending messages using transport.write. First, the server tells the clients to start (this works fine AFAIK). The clients report back to the server as they make progress. What has me confused is that I only get these intermittent messages at the very end when the client is done. This could be a problem with buffer flush and I tried (unsuccessfully) things like This. Also, each message is pretty large and I tried sending the same message multiple times so the buffer would get cleared.



      I suspect what I am seeing is a problem with returning the control to the transport but I can't figure out how to do away with it.



      Any help with this or any other issues that jump up to you is much appreciated.



      Server:



      from twisted.internet import reactor, protocol

      import time
      import serverSideAnalysis
      import pdb
      #import bson, json, msgpack
      import _pickle as pickle # I expect the users to authenticate and not
      # do anything malicious.


      PORT = 9000
      NUM = 1
      local_scratch="/local/scratch"


      class Hub(protocol.Protocol):
      def __init__(self,factory, clients, nclients):
      self.clients = clients
      self.nclients = nclients
      self.factory = factory
      self.dispatcher = serverSideAnalysis.ServerTalker(NUM, self,
      local_scratch)

      def connectionMade(self):
      print("connected to user" , (self))
      if len(self.clients) < self.nclients:
      self.factory.clients.append(self)
      else:
      self.factory.clients[self.nclients] = self
      if len(self.clients) == NUM:
      val = input("Looks like everyone is here, shall we start? (Y/N)")
      while (val.upper() != "Y"):
      time.sleep(20)
      val = input("Looks like everyone is here, shall we start??? (Y/N)")
      message = pickle.dumps({"TASK": "INIT", "SUBTASK":"STORE"})
      self.message(message) # This reaches the client as I had expected

      def message(self, command):
      for c in self.factory.clients:
      c.transport.write(command)

      def connectionLost(self, reason):
      self.factory.clients.remove(self)
      self.nclients -= 1

      def dataReceived(self, data):
      if len(self.clients) == NUM:
      self.dispatcher.dispatch(data)

      class PauseTransport(protocol.Protocol):
      def makeConnection(self, transport):
      transport.pauseProducing()

      class HubFactory(protocol.Factory):
      def __init__(self, num):
      self.clients = set()
      self.nclients = 0
      self.totConnections = num

      def buildProtocol(self, addr):
      print(self.nclients)
      if self.nclients < self.totConnections:
      self.nclients += 1
      return Hub(self, self.clients, self.nclients)
      protocol = PauseTransport()
      protocol.factory = self
      return protocol

      factory = HubFactory(NUM)
      reactor.listenTCP(PORT, factory)
      factory.clients =
      reactor.run()


      Client:



      from twisted.internet import reactor, protocol
      import time
      import clientSideAnalysis
      import sys


      HOST = 'localhost'
      PORT = 9000
      local_scratch="/local/scratch"

      class MyClient(protocol.Protocol):

      def connectionMade(self):
      print("connected!")
      self.factory.clients.append(self)
      print ("clients are ", self.factory.clients)

      self.cdispatcher = clientSideAnalysis.ServerTalker(analysis_file_name, local_scratch, self)

      def clientConnectionLost(self, reason):
      #TODO send warning
      self.factory.clients.remove(self)

      def dataReceived(self, data): #This is the problematic part I think
      self.cdispatcher.dispatch(data)
      print("1 sent")
      time.sleep(10)
      self.cdispatcher.dispatch(data)
      print("2 sent")
      time.sleep(10)
      self.cdispatcher.dispatch(data)
      time.sleep(10)


      def message(self, data):
      self.transport.write(data)

      class MyClientFactory(protocol.ClientFactory):
      protocol = MyClient

      if __name__=="__main__":
      analysis_file_name = sys.argv[1]

      factory = MyClientFactory()
      reactor.connectTCP(HOST, PORT, factory)
      factory.clients =
      reactor.run()


      The last bit of relevant information about what the dispatchers do.



      In both cases, they load the message that has arrived (a dictionary) and do a few computations based on the content. Every once in a while, they use the message method to communicate with thier current values.



      Finally, I'm using python 3.6. and twisted 18.9.0










      share|improve this question
















      I'm trying to simulate a situation where data is received by a server periodically. In my set up, I run one process that sets up the server and another process that sets up a bunch of clients (suffices to think of a single client). I have set up some of the code by putting together bits and pieces mostly from here. The server/clients communicate by sending messages using transport.write. First, the server tells the clients to start (this works fine AFAIK). The clients report back to the server as they make progress. What has me confused is that I only get these intermittent messages at the very end when the client is done. This could be a problem with buffer flush and I tried (unsuccessfully) things like This. Also, each message is pretty large and I tried sending the same message multiple times so the buffer would get cleared.



      I suspect what I am seeing is a problem with returning the control to the transport but I can't figure out how to do away with it.



      Any help with this or any other issues that jump up to you is much appreciated.



      Server:



      from twisted.internet import reactor, protocol

      import time
      import serverSideAnalysis
      import pdb
      #import bson, json, msgpack
      import _pickle as pickle # I expect the users to authenticate and not
      # do anything malicious.


      PORT = 9000
      NUM = 1
      local_scratch="/local/scratch"


      class Hub(protocol.Protocol):
      def __init__(self,factory, clients, nclients):
      self.clients = clients
      self.nclients = nclients
      self.factory = factory
      self.dispatcher = serverSideAnalysis.ServerTalker(NUM, self,
      local_scratch)

      def connectionMade(self):
      print("connected to user" , (self))
      if len(self.clients) < self.nclients:
      self.factory.clients.append(self)
      else:
      self.factory.clients[self.nclients] = self
      if len(self.clients) == NUM:
      val = input("Looks like everyone is here, shall we start? (Y/N)")
      while (val.upper() != "Y"):
      time.sleep(20)
      val = input("Looks like everyone is here, shall we start??? (Y/N)")
      message = pickle.dumps({"TASK": "INIT", "SUBTASK":"STORE"})
      self.message(message) # This reaches the client as I had expected

      def message(self, command):
      for c in self.factory.clients:
      c.transport.write(command)

      def connectionLost(self, reason):
      self.factory.clients.remove(self)
      self.nclients -= 1

      def dataReceived(self, data):
      if len(self.clients) == NUM:
      self.dispatcher.dispatch(data)

      class PauseTransport(protocol.Protocol):
      def makeConnection(self, transport):
      transport.pauseProducing()

      class HubFactory(protocol.Factory):
      def __init__(self, num):
      self.clients = set()
      self.nclients = 0
      self.totConnections = num

      def buildProtocol(self, addr):
      print(self.nclients)
      if self.nclients < self.totConnections:
      self.nclients += 1
      return Hub(self, self.clients, self.nclients)
      protocol = PauseTransport()
      protocol.factory = self
      return protocol

      factory = HubFactory(NUM)
      reactor.listenTCP(PORT, factory)
      factory.clients =
      reactor.run()


      Client:



      from twisted.internet import reactor, protocol
      import time
      import clientSideAnalysis
      import sys


      HOST = 'localhost'
      PORT = 9000
      local_scratch="/local/scratch"

      class MyClient(protocol.Protocol):

      def connectionMade(self):
      print("connected!")
      self.factory.clients.append(self)
      print ("clients are ", self.factory.clients)

      self.cdispatcher = clientSideAnalysis.ServerTalker(analysis_file_name, local_scratch, self)

      def clientConnectionLost(self, reason):
      #TODO send warning
      self.factory.clients.remove(self)

      def dataReceived(self, data): #This is the problematic part I think
      self.cdispatcher.dispatch(data)
      print("1 sent")
      time.sleep(10)
      self.cdispatcher.dispatch(data)
      print("2 sent")
      time.sleep(10)
      self.cdispatcher.dispatch(data)
      time.sleep(10)


      def message(self, data):
      self.transport.write(data)

      class MyClientFactory(protocol.ClientFactory):
      protocol = MyClient

      if __name__=="__main__":
      analysis_file_name = sys.argv[1]

      factory = MyClientFactory()
      reactor.connectTCP(HOST, PORT, factory)
      factory.clients =
      reactor.run()


      The last bit of relevant information about what the dispatchers do.



      In both cases, they load the message that has arrived (a dictionary) and do a few computations based on the content. Every once in a while, they use the message method to communicate with thier current values.



      Finally, I'm using python 3.6. and twisted 18.9.0







      python twisted twisted.internet






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Nov 20 '18 at 4:34







      apoursh

















      asked Nov 20 '18 at 2:47









      apourshapoursh

      134




      134
























          1 Answer
          1






          active

          oldest

          votes


















          0














          The way you return control to the reactor from a Protocol.dataReceived method is you return from that method. For example:



          def dataReceived(self, data):
          self.cdispatcher.dispatch(data)
          print("1 sent")


          If you want more work to happen after this, you have some options. If you want the work to happen after some amount of time has passed, use reactor.callLater. If you want the work to happen after it is dispatched to another thread, use twisted.internet.threads.deferToThread. If you want the work to happen in response to some other event (for example, data being received), put it in the callback that handles that event (for example, dataReceived).






          share|improve this answer
























          • Thank you for the suggestions. I ended up dispatching the task using deferToThread on the client side, and on the server side, I set up a deferredQueue to catch the incoming messages and deal with them (as suggested here stackoverflow.com/a/32403309/5068615).

            – apoursh
            Nov 21 '18 at 19:48











          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














          draft saved

          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53385494%2freturn-control-to-the-transport%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes









          0














          The way you return control to the reactor from a Protocol.dataReceived method is you return from that method. For example:



          def dataReceived(self, data):
          self.cdispatcher.dispatch(data)
          print("1 sent")


          If you want more work to happen after this, you have some options. If you want the work to happen after some amount of time has passed, use reactor.callLater. If you want the work to happen after it is dispatched to another thread, use twisted.internet.threads.deferToThread. If you want the work to happen in response to some other event (for example, data being received), put it in the callback that handles that event (for example, dataReceived).






          share|improve this answer
























          • Thank you for the suggestions. I ended up dispatching the task using deferToThread on the client side, and on the server side, I set up a deferredQueue to catch the incoming messages and deal with them (as suggested here stackoverflow.com/a/32403309/5068615).

            – apoursh
            Nov 21 '18 at 19:48
















          0














          The way you return control to the reactor from a Protocol.dataReceived method is you return from that method. For example:



          def dataReceived(self, data):
          self.cdispatcher.dispatch(data)
          print("1 sent")


          If you want more work to happen after this, you have some options. If you want the work to happen after some amount of time has passed, use reactor.callLater. If you want the work to happen after it is dispatched to another thread, use twisted.internet.threads.deferToThread. If you want the work to happen in response to some other event (for example, data being received), put it in the callback that handles that event (for example, dataReceived).






          share|improve this answer
























          • Thank you for the suggestions. I ended up dispatching the task using deferToThread on the client side, and on the server side, I set up a deferredQueue to catch the incoming messages and deal with them (as suggested here stackoverflow.com/a/32403309/5068615).

            – apoursh
            Nov 21 '18 at 19:48














          0












          0








          0







          The way you return control to the reactor from a Protocol.dataReceived method is you return from that method. For example:



          def dataReceived(self, data):
          self.cdispatcher.dispatch(data)
          print("1 sent")


          If you want more work to happen after this, you have some options. If you want the work to happen after some amount of time has passed, use reactor.callLater. If you want the work to happen after it is dispatched to another thread, use twisted.internet.threads.deferToThread. If you want the work to happen in response to some other event (for example, data being received), put it in the callback that handles that event (for example, dataReceived).






          share|improve this answer













          The way you return control to the reactor from a Protocol.dataReceived method is you return from that method. For example:



          def dataReceived(self, data):
          self.cdispatcher.dispatch(data)
          print("1 sent")


          If you want more work to happen after this, you have some options. If you want the work to happen after some amount of time has passed, use reactor.callLater. If you want the work to happen after it is dispatched to another thread, use twisted.internet.threads.deferToThread. If you want the work to happen in response to some other event (for example, data being received), put it in the callback that handles that event (for example, dataReceived).







          share|improve this answer












          share|improve this answer



          share|improve this answer










          answered Nov 20 '18 at 17:49









          Jean-Paul CalderoneJean-Paul Calderone

          41.2k571101




          41.2k571101













          • Thank you for the suggestions. I ended up dispatching the task using deferToThread on the client side, and on the server side, I set up a deferredQueue to catch the incoming messages and deal with them (as suggested here stackoverflow.com/a/32403309/5068615).

            – apoursh
            Nov 21 '18 at 19:48



















          • Thank you for the suggestions. I ended up dispatching the task using deferToThread on the client side, and on the server side, I set up a deferredQueue to catch the incoming messages and deal with them (as suggested here stackoverflow.com/a/32403309/5068615).

            – apoursh
            Nov 21 '18 at 19:48

















          Thank you for the suggestions. I ended up dispatching the task using deferToThread on the client side, and on the server side, I set up a deferredQueue to catch the incoming messages and deal with them (as suggested here stackoverflow.com/a/32403309/5068615).

          – apoursh
          Nov 21 '18 at 19:48





          Thank you for the suggestions. I ended up dispatching the task using deferToThread on the client side, and on the server side, I set up a deferredQueue to catch the incoming messages and deal with them (as suggested here stackoverflow.com/a/32403309/5068615).

          – apoursh
          Nov 21 '18 at 19:48


















          draft saved

          draft discarded




















































          Thanks for contributing an answer to Stack Overflow!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53385494%2freturn-control-to-the-transport%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          Popular posts from this blog

          MongoDB - Not Authorized To Execute Command

          How to fix TextFormField cause rebuild widget in Flutter

          in spring boot 2.1 many test slices are not allowed anymore due to multiple @BootstrapWith