RabbitMQ - start_consuming may not be called from the scope of another BlockingConnection or BlockingChannel...












I have two Python processes: one consumer process and one producer process. Each process opens a single RabbitMQ connection and spawns multiple consumer or producer threads. Each thread creates its own channel on that shared connection and handles the sending or receiving of messages.



This is my consumer thread:



    from threading import current_thread

    def consumer_thread(connection, routing_key):
        # The connection is passed in from the main thread and shared by all consumer threads
        channel = connection.channel()
        result = channel.queue_declare(exclusive=True)
        queue_name = result.method.queue
        channel.queue_bind(exchange="test", routing_key=routing_key, queue=queue_name)
        thread_name = current_thread().name

        def process(ch, method, properties, body):
            print(f"{thread_name} received {body}")

        channel.basic_consume(process, queue=queue_name, no_ack=True)
        channel.start_consuming()


This is my producer thread:



    import time
    from threading import current_thread

    def producer_thread(connection, routing_key, sleep_time):
        # The connection is passed in from the main thread and shared by all producer threads
        channel = connection.channel()
        thread_name = current_thread().name
        count = 0

        while True:
            count += 1
            channel.basic_publish("test", routing_key=routing_key,
                                  body=f"msg {count} from {thread_name}")
            time.sleep(sleep_time)


I open the RabbitMQ connection with:



    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))



However, when I run my code, the consumer thread raises this error on the first message it receives:



    Traceback (most recent call last):
      File "D:appcortex-binPython36libthreading.py", line 916, in _bootstrap_inner
        self.run()
      File "D:appcortex-binPython36libthreading.py", line 864, in run
        self._target(*self._args, **self._kwargs)
      File "D:appcortexbackgroundcorescratchtest.py", line 18, in consumer_thread
        channel.start_consuming()
      File "D:appcortex-binPython36libsite-packagespikaadaptersblocking_connection.py", line 1817, in start_consuming
        'start_consuming may not be called from the scope of '
    pika.exceptions.RecursionError: start_consuming may not be called from the scope of another BlockingConnection or BlockingChannel callback


All subsequent messages are received by the consumer threads just fine.



May I know what's causing this exception? Thanks.










  • The bug is described here: github.com/pika/pika/issues/927

    – Gabriele
    Nov 21 '18 at 7:39
















python multithreading rabbitmq pika






asked Nov 21 '18 at 4:03









user2396041

1 Answer
You can't access a Pika connection from multiple threads (comment). Your threads must start their own connection and channels.
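As a rough illustration of "each thread opens its own connection", here is a minimal sketch of the consumer side. The names `open_connection`, `start_consumers` and the `connect` parameter are hypothetical helpers introduced for this example, not part of Pika. The channel calls are copied from the question as written; newer Pika releases changed some of these signatures, so check the documentation for the version you have installed.

```python
import threading


def open_connection(host="localhost"):
    """Open a fresh BlockingConnection; call this from the thread that will use it."""
    import pika  # imported lazily: only needed when talking to a real broker
    return pika.BlockingConnection(pika.ConnectionParameters(host=host))


def consumer_thread(routing_key, connect=open_connection):
    # The connection is created inside this thread and never handed to another one.
    connection = connect()
    channel = connection.channel()
    # The calls below mirror the question's code (assumption: same Pika version).
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange="test", routing_key=routing_key, queue=queue_name)
    thread_name = threading.current_thread().name

    def process(ch, method, properties, body):
        print(f"{thread_name} received {body}")

    channel.basic_consume(process, queue=queue_name, no_ack=True)
    channel.start_consuming()


def start_consumers(routing_keys, connect=open_connection):
    """Start one consumer thread per routing key, each with its own connection."""
    threads = [
        threading.Thread(target=consumer_thread, args=(key, connect), daemon=True)
        for key in routing_keys
    ]
    for thread in threads:
        thread.start()
    return threads
```

The only structural change from the question's code is that no connection object crosses a thread boundary: `consumer_thread` receives a routing key and builds its connection itself. The producer threads could presumably be restructured the same way.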





NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.






  • I have another question. I have several producer threads that publish messages only infrequently (e.g. every 6 hours, or every 24 hours). The consumer threads do not take long to process each message (well within the heartbeat interval), but the connection is idle most of the time because no messages are being published. In both cases, do I have to keep sending heartbeat messages myself to keep the connection alive, or will the RabbitMQ server take care of sending heartbeats?

    – user2396041
    Nov 23 '18 at 8:20











  • Rather than tack on a question to an existing one (that nobody will learn from), please either ask a new question here at StackOverflow or on the pika-python mailing list.

    – Luke Bakken
    Nov 23 '18 at 15:47











answered Nov 21 '18 at 16:08









Luke Bakken
