RabbitMQ Pika connection reset: (-1, ConnectionResetError(104, 'Connection reset by peer'))












I searched through Stack Overflow and am posting this question because none of the solutions I found worked for me, and my problem may be different from the existing questions.



I am writing a script that takes an article from a RabbitMQ queue, processes it to count words and extract keywords, and dumps the result into a database. The script works fine, but after running for some time it raises this exception:
(-1, "ConnectionResetError(104, 'Connection reset by peer')")
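For readers unfamiliar with the numbers in that tuple, here is a minimal standard-library sketch of what the inner error means. It assumes nothing about Pika: the 104 is the operating system's errno for ECONNRESET (its value on Linux; other platforms use a different number), and Python maps an OSError carrying that errno onto ConnectionResetError. The names `err` and `is_reset` are only illustrative.

```python
import errno
import os

# ECONNRESET is the symbolic name; its number is platform-specific
# (104 on Linux, which matches the exception quoted in the question).
print(errno.ECONNRESET, os.strerror(errno.ECONNRESET))

# Constructing an OSError with that errno yields the ConnectionResetError subclass.
err = OSError(errno.ECONNRESET, os.strerror(errno.ECONNRESET))
is_reset = isinstance(err, ConnectionResetError)
print(type(err).__name__, is_reset)
```

In other words, the exception only says that the other end of the TCP connection closed it abruptly; it does not by itself say why.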



I have no idea why I am getting this. I have tried many of the solutions available on Stack Overflow, but none works for me. I have written the script in two different ways; both work fine at first, but after some time the same exception occurs.



Here is my first code:



def app_main():

    global channel, results, speedvars
    Logger.log_message('Starting app main')

    # Edit 4
    def pika_connect():
        connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=Config.AMQ_DAEMONS['base']['amq-host']))
        channel = connection.channel()
        print("In pika connect")
        Logger.log_message('Setting up input queue consumer')
        channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
        channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

        Logger.log_message('Starting loop')
        channel.start_consuming()

    #########

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()

    # Edit 5 starting 10 threads to listen to pika

    for th in range(qthreads):
        Logger.log_message('Starting thread: '+str(th))
        try:
            t = Thread(target=pika_connect, args=())
            t.start()
        except Exception as e:
            Logger.error_message("Exception in starting threads " + str(e))


try:
    app_main()
except Exception as e:
    Logger.error_message("Exception in APP MAIN " + str(e))


Here is my second code:



def app_main():

    global channel, results, speedvars
    Logger.log_message('Starting app main')

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()

    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=Config.AMQ_DAEMONS['base']['amq-host']))
    channel = connection.channel()
    print("In app main")
    Logger.log_message('Setting up input queue consumer')
    channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
    channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

    Logger.log_message('Starting loop')

    try:
        channel.start_consuming()
    except Exception as e:
        Logger.error_message("Exception in start_consuming in main " + str(e))
        raise e


try:
    app_main()
except Exception as e:
    Logger.error_message("Exception in APP MAIN " + str(e))




In my first code I used threading because I want to speed up the processing of articles.


This is my callback function:

def on_message(ch, method, properties, message):
    Logger.log_message("Starting parsing new msg ")
    handle_message(message)



EDIT: Full Code



import os
abspath = os.path.abspath(__file__)
dname = os.path.dirname(abspath)
os.chdir(dname)

from Modules import Logger
import pika
import Config
import json
import pickle
import Pipeline
import sys
import time
import datetime
import threading
import queue
import functools

from pid.decorator import pidfile

Logger.log_init(Config.AMQ_DAEMONS['consumer']['log-ident'])
#qthreads = Config.AMQ_DAEMONS['consumer']['threads']
results = queue.Queue()
channel = None
speedvars = None

SPD_RECEIVED = 'received'
SPD_DISCARDED = 'discarded'
SPD_SENT = 'sent'

class SpeedVars(object):
    vars = {}
    lock = None

    def __init__(self):
        self.lock = threading.Lock()

    def inc(self, var):
        self.lock.acquire()
        try:
            if var in self.vars:
                self.vars[var] += 1
            else:
                self.vars[var] = 1
        finally:
            self.lock.release()

    def dec(self, var):
        self.lock.acquire()
        try:
            if var in self.vars:
                self.vars[var] -= 1
            else:
                Logger.error_message('Cannot decrement ' + var + ', not tracked')
        finally:
            self.lock.release()

    def get(self, var):
        out = None
        self.lock.acquire()
        try:
            if var in self.vars:
                out = self.vars[var]
            else:
                Logger.error_message('Cannot get ' + var + ', not tracked')
        finally:
            self.lock.release()
        return out

    def get_all(self):
        out = None
        self.lock.acquire()
        try:
            out = self.vars.copy()
        finally:
            self.lock.release()
        return out


class SpeedTracker(threading.Thread):
    speedvars = None
    start_ts = None
    last_vars = {}

    def __init__(self, speedvars):
        super(SpeedTracker, self).__init__()
        self.start_ts = time.time()
        self.speedvars = speedvars
        Logger.log_message('Setting up speed tracker')

    def run(self):
        while True:
            time.sleep(Config.AMQ_DAEMONS['consumer']['speed-tracking-interval'])
            prev = self.last_vars
            cur = self.speedvars.get_all()
            now = time.time()
            if len(prev) > 0:
                q = {}
                for key in cur:
                    qty = cur[key] - prev[key]
                    avg = qty / Config.AMQ_DAEMONS['consumer']['speed-tracking-interval']
                    overall_avg = cur[key] / (now - self.start_ts)
                    Logger.log_message('Speed-tracking (' + key + '): total ' + str(cur[key])
                                       + ', delta ' + str(qty) + ', speed ' + '%0.2f' % avg + '/sec, '
                                       + ', overall speed ' + '%0.2f' % overall_avg + '/sec')
                pending = cur[SPD_RECEIVED] - cur[SPD_DISCARDED] - cur[SPD_SENT]
                pending_avg = pending / (now - self.start_ts)
                Logger.log_message('Speed-tracking (pending): total ' + str(pending)
                                   + ', overall speed ' + '%0.2f' % pending_avg + '/sec')
            self.last_vars = cur


class ResultsSender(threading.Thread):
    channel = None
    results = None
    speedvars = None

    def __init__(self, results, speedvars):
        super(ResultsSender, self).__init__()
        connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=Config.AMQ_DAEMONS['base']['amq-host']))
        self.channel = connection.channel()
        Logger.log_message('Setting up output exchange')
        self.channel.exchange_declare(exchange=Config.AMQ_DAEMONS['consumer']['output'], exchange_type='direct')
        self.results = results
        self.speedvars = speedvars

    def run(self):
        while True:
            item = self.results.get()
            self.channel.basic_publish(
                exchange=Config.AMQ_DAEMONS['consumer']['output'],
                routing_key='',
                body=item)
            self.speedvars.inc(SPD_SENT)

def parse_message(message):
    try:
        bodytxt = message.decode('UTF-8')
        body = json.loads(bodytxt)
        return body
    except Exception as e:
        Logger.error_message("Cannot parse message - " + str(e))
        raise e

def get_body_elements(body):
    try:
        artid = str(body.get('article_id'))
        article_dt = datetime.datetime.fromtimestamp(body.get('pubTime'))
        date = article_dt.strftime(Config.DATE_FORMAT)
        article = "\n".join([body.get('title', ''), body.get('subheading', ''), body.get('content', '')])
        return (artid, date, article)
    except Exception as e:
        Logger.error_message("Cannot retrieve article attributes " + str(e))
        raise e

def process_article(id, date, text):
    global results, speedvars
    try:
        Logger.log_message('Processing article ' + id)
        keywords = Pipeline.extract_keywords(text)
        send_data = {"id": id, "date": date, "keywords": keywords}
        results.put(pickle.dumps(send_data))
        # print('Queue Size:',results.qsize())
    except Exception as e:
        Logger.error_message("Problem processing article " + str(e))
        raise e

def ack_message(ch, delivery_tag):
    """Note that `ch` must be the same pika channel instance via which
    the message being ACKed was retrieved (AMQP protocol constraint).
    """
    if ch.is_open:
        ch.basic_ack(delivery_tag)
    else:
        # Channel is already closed, so we can't ACK this message;
        # log and/or do something that makes sense for your app in this case.
        Logger.error_message("Channel is already closed, so we can't ACK this message")

def handle_message(connection, ch, delivery_tag, message):
    global speedvars
    start = time.time()
    thread_id = threading.get_ident()

    try:
        speedvars.inc(SPD_RECEIVED)
        body = parse_message(message)
        (id, date, text) = get_body_elements(body)
        words = len(text.split())
        if words <= Config.AMQ_DAEMONS['consumer']['word-count-limit']:
            process_article(id, date, text)
        else:
            Logger.log_message('Ignoring article, over word count limit')
            speedvars.inc(SPD_DISCARDED)

    except Exception as e:
        Logger.error_message("Could not process message - " + str(e))

    cb = functools.partial(ack_message, ch, delivery_tag)
    connection.add_callback_threadsafe(cb)

    Logger.log_message("Thread id: "+str(thread_id)+" Delivery tag: "+str(delivery_tag))
    Logger.log_message("TOtal time taken to handle message : "+ str(time.time()-start))

# CALL BACK
## def on_message(ch, method, properties, message):
##     global executor
##     executor.submit(handle_message, message)

def on_message(ch, method, header_frame, message, args):
    (connection, threads) = args
    delivery_tag = method.delivery_tag
    t = threading.Thread(target=handle_message, args=(connection, ch, delivery_tag, message))
    t.start()
    threads.append(t)


####################################################
@pidfile(piddir=Config.AMQ_DAEMONS['base']['pid-dir'], pidname=Config.AMQ_DAEMONS['consumer']['pid-file'])
def app_main():
    global channel, results, speedvars

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()

    # Pika Connection
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=Config.AMQ_DAEMONS['base']['amq-host']))
    channel = connection.channel()

    Logger.log_message('Setting up input queue consumer')
    channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)

    #channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)
    channel.basic_qos(prefetch_count=1)
    threads = []
    on_message_callback = functools.partial(on_message, args=(connection, threads))
    channel.basic_consume(on_message_callback, Config.AMQ_DAEMONS['consumer']['input'])

    Logger.log_message('Starting loop')
    ## channel.start_consuming()
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()

    # Wait for all to complete
    for thread in threads:
        thread.join()

    connection.close()


app_main()
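The hand-off that handle_message performs in the full code (do the work on a worker thread, then schedule the ack on the thread that owns the connection) can be pictured with a standard-library-only sketch. Nothing here is Pika code: `callbacks`, `ack`, `worker`, and `owner` are hypothetical names, and the queue only stands in for the idea behind `add_callback_threadsafe`, not for its actual implementation.

```python
import queue
import threading

# Hypothetical stand-in for the thread that owns the connection: it is the
# only thread that runs callbacks, much like the consumer loop in app_main.
callbacks = queue.Queue()
acked = []
owner = threading.current_thread().name

def ack(delivery_tag):
    # Runs on the owner thread only; records which thread executed it.
    acked.append((delivery_tag, threading.current_thread().name))

def worker(delivery_tag):
    # Do the (simulated) work off the owner thread, then hand the ack back
    # as a callback instead of acking from the worker thread.
    callbacks.put(lambda: ack(delivery_tag))

workers = [threading.Thread(target=worker, args=(tag,)) for tag in range(1, 4)]
for t in workers:
    t.start()
for t in workers:
    t.join()

# Owner loop: drain and run the scheduled callbacks on the owner thread.
while not callbacks.empty():
    callbacks.get()()

print(sorted(tag for tag, _ in acked))
```

The point of the pattern is that worker threads never touch the channel themselves; they only enqueue a callback that the owning thread runs later.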


Processing a message does not take much time, yet I am still facing the connection reset issue.

**TOtal time taken to handle message : 0.0005991458892822266**










  • What does the RabbitMQ log contain?

    – Luke Bakken
    Jan 2 at 14:39











  • @LukeBakken This is what my log file says:
    =ERROR REPORT==== 1-Jan-2019::12:45:17 === closing AMQP connection <0.13654.58> ([::1]:44022 -> [::1]:5672): {writer,send_failed,{error,timeout}}
    =ERROR REPORT==== 1-Jan-2019::12:48:19 === closing AMQP connection <0.13560.58> ([::1]:44006 -> [::1]:5672): missed heartbeats from client, timeout: 60s
    But I don't understand why it is missing heartbeats; my script dequeues an article from the RabbitMQ queue almost every 2-3 seconds.

    – irum zahra
    Jan 3 at 7:47
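To make the two log entries in the comment above easier to compare, here is a small sketch that pulls out the timestamp, client address, and close reason from each. The regular expression is a hypothetical pattern written for exactly these two lines (reflowed one per line); it is not a general RabbitMQ log parser.

```python
import re

# The two entries quoted in the comment, one per line.
LOG = """\
=ERROR REPORT==== 1-Jan-2019::12:45:17 === closing AMQP connection <0.13654.58> ([::1]:44022 -> [::1]:5672): {writer,send_failed,{error,timeout}}
=ERROR REPORT==== 1-Jan-2019::12:48:19 === closing AMQP connection <0.13560.58> ([::1]:44006 -> [::1]:5672): missed heartbeats from client, timeout: 60s
"""

# Hypothetical pattern for these lines only: timestamp, client, server, reason.
ENTRY = re.compile(
    r"=ERROR REPORT==== (?P<when>\S+) === closing AMQP connection "
    r"<[\d.]+> \((?P<client>\S+) -> (?P<server>\S+)\): (?P<reason>.+)"
)

entries = [m.groupdict() for m in map(ENTRY.match, LOG.splitlines()) if m]
for entry in entries:
    print(entry["when"], entry["client"], entry["reason"])
```

The two entries come from different client ports (44022 and 44006), so they describe two separate connections closed for different reasons. The full code opens two connections (one in app_main, one in ResultsSender), but the excerpt alone does not show which entry belongs to which.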


















1















searched through stackoverflow and posting this question because no solution worked for me and my question might be different from other question.



I am writing a script which gets an article from rabbitMQ queue and process the article to count words and extract key words from it and dump it in db. my script is working fine but after some time of execution i get this exception
(-1, "ConnectionResetError(104, 'Connection reset by peer')")



I have no idea why am I getting this. I have tried a lot of solutions available on stackover flow none is working for me. I havr written my script and tried it in two different ways. both work fine but after some time same exception occurs.



here is my first code:



def app_main():

global channel, results, speedvars
Logger.log_message('Starting app main')

# Edit 4
def pika_connect():
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=Config.AMQ_DAEMONS['base']['amq-host']))
channel = connection.channel()
print ("In pika connect")
Logger.log_message('Setting up input queue consumer')
channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

Logger.log_message('Starting loop')
channel.start_consuming()

#########

speedvars = SpeedVars()
speedtracker = SpeedTracker(speedvars)
speedtracker.start()

sender = ResultsSender(results, speedvars)
sender.start()


# Edit 5 starting 10 threads to listen to pika

for th in range(qthreads):
Logger.log_message('Starting thread: '+str(th))
try:
t = Thread(target=pika_connect, args=())
t.start()
except Exception as e:
Logger.error_message("Exception in starting threads " + str(e))



try:
app_main()
except Exception as e:
Logger.error_message("Exception in APP MAIN " + str(e))


here is my second code:



def app_main():

global channel, results, speedvars
Logger.log_message('Starting app main')

speedvars = SpeedVars()
speedtracker = SpeedTracker(speedvars)
speedtracker.start()

sender = ResultsSender(results, speedvars)
sender.start()

connection = pika.BlockingConnection(pika.ConnectionParameters(
host=Config.AMQ_DAEMONS['base']['amq-host']))
channel = connection.channel()
print ("In app main")
Logger.log_message('Setting up input queue consumer')
channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

Logger.log_message('Starting loop')

try:
channel.start_consuming()
except Exception as e:
Logger.error_message("Exception in start_consuming in main " + str(e))
raise e


try:
app_main()
except Exception as e:
Logger.error_message("Exception in APP MAIN " + str(e))




in my first code i used threading because i want to speed up the process of processing articles.


this is my call back fuction
def on_message(ch, method, properties, message):
Logger.log_message("Starting parsing new msg ")
handle_message(message)



EDIT: Full Code



import os
abspath = os.path.abspath(__file__)
dname = os.path.dirname(abspath)
os.chdir(dname)

from Modules import Logger
import pika
import Config
import json
import pickle
import Pipeline
import sys
import time
import datetime
import threading
import queue
import functools

from pid.decorator import pidfile

Logger.log_init(Config.AMQ_DAEMONS['consumer']['log-ident'])
#qthreads = Config.AMQ_DAEMONS['consumer']['threads']
results = queue.Queue()
channel = None
speedvars = None

SPD_RECEIVED = 'received'
SPD_DISCARDED = 'discarded'
SPD_SENT = 'sent'

class SpeedVars(object):
vars = {}
lock = None

def __init__(self):
self.lock = threading.Lock()

def inc(self, var):

self.lock.acquire()
try:
if var in self.vars:
self.vars[var] += 1
else:
self.vars[var] = 1
finally:
self.lock.release()


def dec(self, var):

self.lock.acquire()
try:
if var in self.vars:
self.vars[var] -= 1
else:
Logger.error_message('Cannot decrement ' + var + ', not tracked')
finally:
self.lock.release()

def get(self, var):

out = None
self.lock.acquire()
try:
if var in self.vars:
out = self.vars[var]
else:
Logger.error_message('Cannot get ' + var + ', not tracked')
finally:
self.lock.release()


return out

def get_all(self):

out = None
self.lock.acquire()
try:
out = self.vars.copy()
finally:
self.lock.release()


return out


class SpeedTracker(threading.Thread):
speedvars = None
start_ts = None
last_vars = {}

def __init__(self, speedvars):
super(SpeedTracker, self).__init__()
self.start_ts = time.time()
self.speedvars = speedvars
Logger.log_message('Setting up speed tracker')

def run(self):
while True:
time.sleep(Config.AMQ_DAEMONS['consumer']['speed-tracking-interval'])
prev = self.last_vars
cur = self.speedvars.get_all()
now = time.time()
if len(prev) > 0:
q = {}
for key in cur:
qty = cur[key] - prev[key]
avg = qty / Config.AMQ_DAEMONS['consumer']['speed-tracking-interval']
overall_avg = cur[key] / (now - self.start_ts)
Logger.log_message('Speed-tracking (' + key + '): total ' + str(cur[key])
+ ', delta ' + str(qty) + ', speed ' + '%0.2f' % avg + '/sec, '
+ ', overall speed ' + '%0.2f' % overall_avg + '/sec')
pending = cur[SPD_RECEIVED] - cur[SPD_DISCARDED] - cur[SPD_SENT]
pending_avg = pending / (now - self.start_ts)
Logger.log_message('Speed-tracking (pending): total ' + str(pending)
+ ', overall speed ' + '%0.2f' % pending_avg + '/sec')
self.last_vars = cur


class ResultsSender(threading.Thread):
channel = None
results = None
speedvars = None

def __init__(self, results, speedvars):
super(ResultsSender, self).__init__()
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=Config.AMQ_DAEMONS['base']['amq-host']))
self.channel = connection.channel()
Logger.log_message('Setting up output exchange')
self.channel.exchange_declare(exchange=Config.AMQ_DAEMONS['consumer']['output'], exchange_type='direct')
self.results = results
self.speedvars = speedvars

def run(self):
while True:
item = self.results.get()
self.channel.basic_publish(
exchange=Config.AMQ_DAEMONS['consumer']['output'],
routing_key='',
body=item)
self.speedvars.inc(SPD_SENT)

def parse_message(message):
try:
bodytxt = message.decode('UTF-8')
body = json.loads(bodytxt)
return body
except Exception as e:
Logger.error_message("Cannot parse message - " + str(e))
raise e

def get_body_elements(body):
try:
artid = str(body.get('article_id'))
article_dt = datetime.datetime.fromtimestamp(body.get('pubTime'))
date = article_dt.strftime(Config.DATE_FORMAT)
article = "n".join([body.get('title', ''), body.get('subheading', ''), body.get('content', '')])
return (artid, date, article)
except Exception as e:
Logger.error_message("Cannot retrieve article attributes " + str(e))
raise e

def process_article(id, date, text):
global results, speedvars
try:
Logger.log_message('Processing article ' + id)
keywords = Pipeline.extract_keywords(text)
send_data = {"id": id, "date": date, "keywords": keywords}
results.put(pickle.dumps(send_data))
# print('Queue Size:',results.qsize())
except Exception as e:
Logger.error_message("Problem processing article " + str(e))
raise e

def ack_message(ch, delivery_tag):
"""Note that `channel` must be the same pika channel instance via which
the message being ACKed was retrieved (AMQP protocol constraint).
"""
if channel.is_open:
channel.basic_ack(delivery_tag)
else:
Logger.error_message("Channel is already closed, so we can't ACK this message" + str(e))
# Channel is already closed, so we can't ACK this message;
# log and/or do something that makes sense for your app in this case.
#pass

def handle_message(connection, ch, delivery_tag, message):
global speedvars
start = time.time()
thread_id = threading.get_ident()

try:
speedvars.inc(SPD_RECEIVED)
body = parse_message(message)
(id, date, text) = get_body_elements(body)
words = len(text.split())
if words <= Config.AMQ_DAEMONS['consumer']['word-count-limit']:
process_article(id, date, text)
else:
Logger.log_message('Ignoring article, over word count limit')
speedvars.inc(SPD_DISCARDED)

except Exception as e:
Logger.error_message("Could not process message - " + str(e))

cb = functools.partial(ack_message, ch, delivery_tag)
connection.add_callback_threadsafe(cb)

Logger.log_message("Thread id: "+str(thread_id)+" Delivery tag: "+str(delivery_tag))
Logger.log_message("TOtal time taken to handle message : "+ str(time.time()-start))

# CALL BACK
## def on_message(ch, method, properties, message):
## global executor
## executor.submit(handle_message, message)

def on_message(ch, method, header_frame, message, args):
(connection, threads) = args
delivery_tag = method.delivery_tag
t = threading.Thread(target=handle_message, args=(connection, ch, delivery_tag, message))
t.start()
threads.append(t)


####################################################
@pidfile(piddir=Config.AMQ_DAEMONS['base']['pid-dir'], pidname=Config.AMQ_DAEMONS['consumer']['pid-file'])
def app_main():
global channel, results, speedvars

speedvars = SpeedVars()
speedtracker = SpeedTracker(speedvars)
speedtracker.start()

sender = ResultsSender(results, speedvars)
sender.start()


# Pika Connection
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=Config.AMQ_DAEMONS['base']['amq-host']))
channel = connection.channel()

Logger.log_message('Setting up input queue consumer')
channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)

#channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)
channel.basic_qos(prefetch_count=1)
threads =
on_message_callback = functools.partial(on_message, args=(connection, threads))
channel.basic_consume(on_message_callback, Config.AMQ_DAEMONS['consumer']['input'])

Logger.log_message('Starting loop')
## channel.start_consuming()
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()

Wait for all to complete
for thread in threads:
thread.join()

connection.close()


app_main()


pika is not taking a lot of time to process message still i am facing connection reset issue.

**TOtal time taken to handle message : 0.0005991458892822266
**










share|improve this question

























  • What does the RabbitMQ log contain?

    – Luke Bakken
    Jan 2 at 14:39











  • @LukeBakken =ERROR REPORT==== 1-Jan-2019::12:45:17 === closing AMQP connection <0.13654.58> ([::1]:44022 -> [::1]:5672): {writer,send_failed,{error,timeout}} =ERROR REPORT==== 1-Jan-2019::12:48:19 === closing AMQP connection <0.13560.58> ([::1]:44006 -> [::1]:5672): missed heartbeats from client, timeout: 60s this is what my log file says. but i don't get it why is it missing heartbeats my script is dequeuing articles from rabbitmq queue almost every 2 3 seconds

    – irum zahra
    Jan 3 at 7:47
















1












1








1








searched through stackoverflow and posting this question because no solution worked for me and my question might be different from other question.



I am writing a script which gets an article from rabbitMQ queue and process the article to count words and extract key words from it and dump it in db. my script is working fine but after some time of execution i get this exception
(-1, "ConnectionResetError(104, 'Connection reset by peer')")



I have no idea why am I getting this. I have tried a lot of solutions available on stackover flow none is working for me. I havr written my script and tried it in two different ways. both work fine but after some time same exception occurs.



here is my first code:



def app_main():

global channel, results, speedvars
Logger.log_message('Starting app main')

# Edit 4
def pika_connect():
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=Config.AMQ_DAEMONS['base']['amq-host']))
channel = connection.channel()
print ("In pika connect")
Logger.log_message('Setting up input queue consumer')
channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

Logger.log_message('Starting loop')
channel.start_consuming()

#########

speedvars = SpeedVars()
speedtracker = SpeedTracker(speedvars)
speedtracker.start()

sender = ResultsSender(results, speedvars)
sender.start()


# Edit 5 starting 10 threads to listen to pika

for th in range(qthreads):
Logger.log_message('Starting thread: '+str(th))
try:
t = Thread(target=pika_connect, args=())
t.start()
except Exception as e:
Logger.error_message("Exception in starting threads " + str(e))



try:
app_main()
except Exception as e:
Logger.error_message("Exception in APP MAIN " + str(e))


here is my second code:



def app_main():

global channel, results, speedvars
Logger.log_message('Starting app main')

speedvars = SpeedVars()
speedtracker = SpeedTracker(speedvars)
speedtracker.start()

sender = ResultsSender(results, speedvars)
sender.start()

connection = pika.BlockingConnection(pika.ConnectionParameters(
host=Config.AMQ_DAEMONS['base']['amq-host']))
channel = connection.channel()
print ("In app main")
Logger.log_message('Setting up input queue consumer')
channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

Logger.log_message('Starting loop')

try:
channel.start_consuming()
except Exception as e:
Logger.error_message("Exception in start_consuming in main " + str(e))
raise e


try:
app_main()
except Exception as e:
Logger.error_message("Exception in APP MAIN " + str(e))




in my first code i used threading because i want to speed up the process of processing articles.


this is my call back fuction
def on_message(ch, method, properties, message):
Logger.log_message("Starting parsing new msg ")
handle_message(message)



EDIT: Full Code



import os
abspath = os.path.abspath(__file__)
dname = os.path.dirname(abspath)
os.chdir(dname)

from Modules import Logger
import pika
import Config
import json
import pickle
import Pipeline
import sys
import time
import datetime
import threading
import queue
import functools

from pid.decorator import pidfile

Logger.log_init(Config.AMQ_DAEMONS['consumer']['log-ident'])
#qthreads = Config.AMQ_DAEMONS['consumer']['threads']
results = queue.Queue()
channel = None
speedvars = None

SPD_RECEIVED = 'received'
SPD_DISCARDED = 'discarded'
SPD_SENT = 'sent'

class SpeedVars(object):
vars = {}
lock = None

def __init__(self):
self.lock = threading.Lock()

def inc(self, var):

self.lock.acquire()
try:
if var in self.vars:
self.vars[var] += 1
else:
self.vars[var] = 1
finally:
self.lock.release()


def dec(self, var):

self.lock.acquire()
try:
if var in self.vars:
self.vars[var] -= 1
else:
Logger.error_message('Cannot decrement ' + var + ', not tracked')
finally:
self.lock.release()

def get(self, var):

out = None
self.lock.acquire()
try:
if var in self.vars:
out = self.vars[var]
else:
Logger.error_message('Cannot get ' + var + ', not tracked')
finally:
self.lock.release()


return out

def get_all(self):

out = None
self.lock.acquire()
try:
out = self.vars.copy()
finally:
self.lock.release()


return out


class SpeedTracker(threading.Thread):
speedvars = None
start_ts = None
last_vars = {}

def __init__(self, speedvars):
super(SpeedTracker, self).__init__()
self.start_ts = time.time()
self.speedvars = speedvars
Logger.log_message('Setting up speed tracker')

def run(self):
while True:
time.sleep(Config.AMQ_DAEMONS['consumer']['speed-tracking-interval'])
prev = self.last_vars
cur = self.speedvars.get_all()
now = time.time()
if len(prev) > 0:
q = {}
for key in cur:
qty = cur[key] - prev[key]
avg = qty / Config.AMQ_DAEMONS['consumer']['speed-tracking-interval']
overall_avg = cur[key] / (now - self.start_ts)
Logger.log_message('Speed-tracking (' + key + '): total ' + str(cur[key])
+ ', delta ' + str(qty) + ', speed ' + '%0.2f' % avg + '/sec, '
+ ', overall speed ' + '%0.2f' % overall_avg + '/sec')
pending = cur[SPD_RECEIVED] - cur[SPD_DISCARDED] - cur[SPD_SENT]
pending_avg = pending / (now - self.start_ts)
Logger.log_message('Speed-tracking (pending): total ' + str(pending)
+ ', overall speed ' + '%0.2f' % pending_avg + '/sec')
self.last_vars = cur


class ResultsSender(threading.Thread):
channel = None
results = None
speedvars = None

def __init__(self, results, speedvars):
super(ResultsSender, self).__init__()
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=Config.AMQ_DAEMONS['base']['amq-host']))
self.channel = connection.channel()
Logger.log_message('Setting up output exchange')
self.channel.exchange_declare(exchange=Config.AMQ_DAEMONS['consumer']['output'], exchange_type='direct')
self.results = results
self.speedvars = speedvars

def run(self):
while True:
item = self.results.get()
self.channel.basic_publish(
exchange=Config.AMQ_DAEMONS['consumer']['output'],
routing_key='',
body=item)
self.speedvars.inc(SPD_SENT)

def parse_message(message):
try:
bodytxt = message.decode('UTF-8')
body = json.loads(bodytxt)
return body
except Exception as e:
Logger.error_message("Cannot parse message - " + str(e))
raise e

def get_body_elements(body):
try:
artid = str(body.get('article_id'))
article_dt = datetime.datetime.fromtimestamp(body.get('pubTime'))
date = article_dt.strftime(Config.DATE_FORMAT)
article = "n".join([body.get('title', ''), body.get('subheading', ''), body.get('content', '')])
return (artid, date, article)
except Exception as e:
Logger.error_message("Cannot retrieve article attributes " + str(e))
raise e

def process_article(id, date, text):
global results, speedvars
try:
Logger.log_message('Processing article ' + id)
keywords = Pipeline.extract_keywords(text)
send_data = {"id": id, "date": date, "keywords": keywords}
results.put(pickle.dumps(send_data))
# print('Queue Size:',results.qsize())
except Exception as e:
Logger.error_message("Problem processing article " + str(e))
raise e

def ack_message(ch, delivery_tag):
"""Note that `channel` must be the same pika channel instance via which
the message being ACKed was retrieved (AMQP protocol constraint).
"""
if channel.is_open:
channel.basic_ack(delivery_tag)
else:
Logger.error_message("Channel is already closed, so we can't ACK this message" + str(e))
# Channel is already closed, so we can't ACK this message;
# log and/or do something that makes sense for your app in this case.
#pass

def handle_message(connection, ch, delivery_tag, message):
global speedvars
start = time.time()
thread_id = threading.get_ident()

try:
speedvars.inc(SPD_RECEIVED)
body = parse_message(message)
(id, date, text) = get_body_elements(body)
words = len(text.split())
if words <= Config.AMQ_DAEMONS['consumer']['word-count-limit']:
process_article(id, date, text)
else:
Logger.log_message('Ignoring article, over word count limit')
speedvars.inc(SPD_DISCARDED)

except Exception as e:
Logger.error_message("Could not process message - " + str(e))

cb = functools.partial(ack_message, ch, delivery_tag)
connection.add_callback_threadsafe(cb)

Logger.log_message("Thread id: "+str(thread_id)+" Delivery tag: "+str(delivery_tag))
Logger.log_message("TOtal time taken to handle message : "+ str(time.time()-start))

# CALL BACK
## def on_message(ch, method, properties, message):
## global executor
## executor.submit(handle_message, message)

def on_message(ch, method, header_frame, message, args):
(connection, threads) = args
delivery_tag = method.delivery_tag
t = threading.Thread(target=handle_message, args=(connection, ch, delivery_tag, message))
t.start()
threads.append(t)


####################################################
@pidfile(piddir=Config.AMQ_DAEMONS['base']['pid-dir'], pidname=Config.AMQ_DAEMONS['consumer']['pid-file'])
def app_main():
global channel, results, speedvars

speedvars = SpeedVars()
speedtracker = SpeedTracker(speedvars)
speedtracker.start()

sender = ResultsSender(results, speedvars)
sender.start()


# Pika Connection
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=Config.AMQ_DAEMONS['base']['amq-host']))
channel = connection.channel()

Logger.log_message('Setting up input queue consumer')
channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)

#channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)
channel.basic_qos(prefetch_count=1)
threads =
on_message_callback = functools.partial(on_message, args=(connection, threads))
channel.basic_consume(on_message_callback, Config.AMQ_DAEMONS['consumer']['input'])

Logger.log_message('Starting loop')
## channel.start_consuming()
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()

Wait for all to complete
for thread in threads:
thread.join()

connection.close()


app_main()


pika is not taking a lot of time to process message still i am facing connection reset issue.

**TOtal time taken to handle message : 0.0005991458892822266
**










python-3.x multithreading rabbitmq pika python-pika






asked Jan 2 at 8:46, edited Jan 9 at 6:34
irum zahra













  • What does the RabbitMQ log contain?

    – Luke Bakken
    Jan 2 at 14:39











  • @LukeBakken This is what my log file says:

    =ERROR REPORT==== 1-Jan-2019::12:45:17 ===
    closing AMQP connection <0.13654.58> ([::1]:44022 -> [::1]:5672):
    {writer,send_failed,{error,timeout}}

    =ERROR REPORT==== 1-Jan-2019::12:48:19 ===
    closing AMQP connection <0.13560.58> ([::1]:44006 -> [::1]:5672):
    missed heartbeats from client, timeout: 60s

    I don't understand why it is missing heartbeats; my script dequeues an article from the RabbitMQ queue roughly every 2 to 3 seconds.

    – irum zahra
    Jan 3 at 7:47
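
The second log entry reports missed heartbeats with a 60 s timeout. One rough way to reason about it is sketched below, assuming the consumer callback runs on the same thread as Pika's I/O loop, so that no frames (heartbeats included) leave the client while a callback is busy. The function name `first_missed_heartbeat` and the sample durations are hypothetical, and the model is simplified: it ignores exactly when the broker performs its checks.

```python
def first_missed_heartbeat(busy_periods, timeout=60.0):
    """Return the index of the first busy period longer than `timeout`, or None.

    busy_periods: seconds the client's single thread spends inside each
    callback without returning to the I/O loop. Simplified model: while the
    thread is busy, nothing is sent to the broker, so one stall longer than
    the timeout is enough for the broker to close the connection.
    """
    for index, seconds in enumerate(busy_periods):
        if seconds > timeout:
            return index
    return None


# Typical messages take 2 to 3 seconds, but one long stall is enough.
durations = [2.0, 3.0, 90.0, 2.5]
print(first_missed_heartbeat(durations))
```

Under this model, an average of one message every 2 to 3 seconds does not rule out a timeout; only the longest single stall matters.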





















1 Answer
Your handle_message method is blocking heartbeats because all of your code, including the Pika I/O loop, runs on the same thread. Check out this example of how to run your work (handle_message) on a separate thread from Pika's I/O loop and then acknowledge messages correctly.
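
The linked example is not reproduced here. A minimal sketch of the same idea follows, assuming a Pika `BlockingConnection` (which provides `add_callback_threadsafe`) and a channel exposing `is_open` and `basic_ack`. The helper names `do_work` and `work` are hypothetical, and the message is acknowledged even when `work` fails, mirroring the `handle_message` in the question.

```python
import functools
import threading


def ack_message(ch, delivery_tag):
    """Runs on the connection's own thread; ack only if the channel is open."""
    if ch.is_open:
        ch.basic_ack(delivery_tag)


def do_work(connection, ch, delivery_tag, body, work):
    """Runs on a worker thread: do the slow part, then hand the ack back."""
    try:
        work(body)
    finally:
        # Channels are not thread-safe, so the ack is scheduled on the
        # connection's I/O thread instead of being sent from this thread.
        connection.add_callback_threadsafe(
            functools.partial(ack_message, ch, delivery_tag))


def on_message(ch, method, properties, body, connection, work):
    """Consumer callback: returns quickly so the I/O loop keeps running."""
    worker = threading.Thread(
        target=do_work,
        args=(connection, ch, method.delivery_tag, body, work),
        daemon=True)
    worker.start()
    return worker
```

To use it, bind the extra arguments before registering the callback, for example `functools.partial(on_message, connection=connection, work=my_handler)`, where `my_handler` stands for whatever function processes one message body.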





NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.







answered Jan 3 at 13:13
Luke Bakken













  • Using this example, the connection error has been solved, but now memory usage slowly and steadily increases. I don't know what is causing this.

    – irum zahra
    Jan 4 at 10:28











  • It's probably an issue in your code, but since you don't provide your code, it is impossible for anyone to continue to help.

    – Luke Bakken
    Jan 4 at 13:36











  • I have added the full code of my consumer; can you check the new edit? Do let me know if you can help. Thanks in advance :) My code processes articles continuously and I never want to stop it.

    – irum zahra
    Jan 7 at 7:33






  • The example code for using threads is just that, an example, and is not meant to be considered "production code". What do you think happens in your application as you keep appending to the threads list?

    – Luke Bakken
    Jan 7 at 13:40











  • I tried it without appending as well, because I thought that might be the issue: my consumer never stops, so it never reaches the join() part. But that is not the problem. Something somewhere is eating the memory, and I still do not know what is wrong with my threads.

    – irum zahra
    Jan 8 at 6:45
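
The comment about the threads list can be made concrete with a small sketch: a consumer that never stops appends one `Thread` object per message and never reaches `join()`, so the list grows without bound. One possible approach is to prune finished threads on every new message. The class name `WorkerTracker` is hypothetical, and it assumes `start` is only ever called from the single thread that runs the consumer callback, so no lock is used. As the last comment notes, removing the append did not stop the memory growth, so this addresses only the list itself, not necessarily the leak.

```python
import threading


class WorkerTracker:
    """Keeps references only to worker threads that are still running."""

    def __init__(self):
        self._threads = []

    def start(self, target, *args):
        # Drop finished threads first, so the list stays proportional to
        # in-flight work rather than to the total number of messages seen.
        self._threads = [t for t in self._threads if t.is_alive()]
        thread = threading.Thread(target=target, args=args)
        thread.start()
        self._threads.append(thread)
        return thread

    def join_all(self):
        # Wait for the remaining workers, e.g. during shutdown.
        for thread in self._threads:
            thread.join()
        self._threads = []

    def __len__(self):
        return len(self._threads)
```

In the consumer callback, `tracker.start(handle_message, connection, ch, delivery_tag, message)` would take the place of creating the thread and appending it by hand.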


















