Spring AMQP: reject a message outside a listener












The application uses Java 10, Spring AMQP and RabbitMQ.

The system has a dead letter queue where we send messages that couldn't be processed as expected because the database was unavailable.

For now, database availability is checked every X seconds and, only if it is available, we re-queue messages to their original queue. Otherwise we do nothing and the messages stay in the dead letter queue.

When re-queued to the original queue, messages can go back to the dead letter queue again, and the count in their x-death header grows.
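For reference, x-death arrives as a list of maps, one per dead-lettering event type, each carrying a count field. A minimal sketch of summing those counts from a plain headers map, assuming the counts are numeric as delivered by the RabbitMQ Java client; the class and method names are hypothetical and not part of Spring AMQP:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class XDeathCount {

    /** Sums the "count" field of every x-death entry; returns 0 when the header is absent. */
    public static long totalDeaths(Map<String, Object> headers) {
        if (headers == null) {
            return 0L;
        }
        Object raw = headers.get("x-death");
        if (!(raw instanceof List)) {
            return 0L;
        }
        long total = 0L;
        for (Object entry : (List<?>) raw) {
            if (entry instanceof Map) {
                Object count = ((Map<?, ?>) entry).get("count");
                if (count instanceof Number) {
                    total += ((Number) count).longValue();
                }
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Hand-built header that mimics a message dead-lettered three times.
        Map<String, Object> death = new HashMap<>();
        death.put("queue", "queueName");
        death.put("reason", "rejected");
        death.put("count", 3L);
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-death", Collections.singletonList(death));
        System.out.println(totalDeaths(headers)); // prints 3
    }
}
```

Checking types with instanceof avoids the unchecked casts and the exception that get(0) would throw on an empty list.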



For some reasons, we would like to process dead-lettered messages that have count >= 5 (for example) and re-queue the others to the dead letter queue.

I need to basic get the message first to check the x-death count header, then send it to the original queue if the count is big enough, else re-queue it in the dead letter queue.

I can't manage to re-queue to the dead letter queue because the basic get is not inside a listener: throwing AmqpRejectAndDontRequeueException doesn't work, as the exception is not thrown inside a RabbitMQ listener object.

I tried throwing the exception inside a receiveAndReply() callback, but that seems no better:



    rabbitTemplate.receiveAndReply(queueName, new ReceiveAndReplyCallback<Message, Object>() {

        @Override
        public Object handle(Message message) {
            Long messageXdeathCount = null;
            if (null != message.getMessageProperties() && null != message.getMessageProperties().getHeaders()) {
                List<Map<String, ?>> xdeathHeader =
                        (List<Map<String, ?>>) message.getMessageProperties().getHeaders().get("x-death");
                if (null != xdeathHeader && null != xdeathHeader.get(0)) {
                    messageXdeathCount = (Long) xdeathHeader.get(0).get("count");
                }
            }
            if (messageXdeathCount == null) {
                messageXdeathCount = 0L;
            }
            if (messageXdeathCount >= 5) {
                resendsMessage(message);
            } else {
                // this does not reject the message
                throw new AmqpRejectAndDontRequeueException("rejected");
            }
            return null;
        }
    });


After this method executes, the message is not rejected as I expected; it is gone from the queue (it has been acked).

Here are the exchange and queue declarations:



    @Bean
    public Exchange exchange() {
        TopicExchange exchange = new TopicExchange(EXCHANGE, true, false);
        admin().declareExchange(exchange);
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-dead-letter-exchange", EXCHANGE);
        Queue queue = new Queue("queueName", true, false, false, args);
        admin().declareQueue(queue);
        Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
        admin().declareBinding(binding);
        return exchange;
    }


How can I reject the messages in the dead letter queue without using AmqpRejectAndDontRequeueException?
Is it possible for an exchange to have x-dead-letter-exchange set to itself?



Thanks for your help



UPDATE



I tried another way, with channel get and reject:



    // exchange creation
    @Bean
    public Exchange exchange() throws IOException {
        Connection connection = connectionFactory().createConnection();
        Channel channel = channel();
        channel.exchangeDeclare(EXCHANGE, ExchangeTypes.TOPIC, true, false, null);
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-dead-letter-exchange", EXCHANGE);
        channel.queueDeclare("queueName", true, false, false, args);
        channel.queueBind("queueName", EXCHANGE, routingKey);
        return exchange;
    }


Message get and ack or reject:



    GetResponse response = channel.basicGet(queueName, false);
    Long messageXdeathCount = null;
    if (null != response.getProps() && null != response.getProps().getHeaders()) {
        List<Map<String, ?>> xdeathHeader =
                (List<Map<String, ?>>) response.getProps().getHeaders().get("x-death");
        if (null != xdeathHeader && null != xdeathHeader.get(0)) {
            messageXdeathCount = (Long) xdeathHeader.get(0).get("count");
        }
    }
    if (messageXdeathCount == null) {
        messageXdeathCount = 0L;
    }
    if (messageXdeathCount >= 5) {
        MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
        MessageProperties messageProps =
                messagePropertiesConverter.toMessageProperties(response.getProps(),
                        response.getEnvelope(), "UTF-8");
        resendsMessage(new Message(response.getBody(), messageProps));
        channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
    } else {
        if (response.getProps().getHeaders().get("x-death") == null) {
            response.getProps().getHeaders().put("x-death", new ArrayList<>());
        }
        if (((List<Map<String, Object>>) response.getProps().getHeaders().get("x-death")).get(0) == null) {
            ((List<Map<String, Object>>) response.getProps().getHeaders().get("x-death")).add(new HashMap<>());
        }
        ((List<Map<String, Object>>) response.getProps().getHeaders().get("x-death")).get(0).put(
                "count", messageXdeathCount + 1);
        channel.basicReject(response.getEnvelope().getDeliveryTag(), true);
    }


First I realized that this was quite ugly, then that a message cannot be modified between the get and the reject. Is there a way to use channel.basicReject and update the x-death count header?










      java rabbitmq spring-amqp






asked Nov 22 '18 at 11:17 by marcesso, edited Nov 23 '18 at 10:23
























          2 Answers
I could use the channel basic methods:

    // autoAck=false: the message stays unacknowledged until we ack or reject it
    GetResponse response = channel.basicGet(queueName, false);
    // basicGet returns null when the queue is empty
    if (response != null) {
        long messageXdeathCount = 0L;
        if (response.getProps() != null && response.getProps().getHeaders() != null) {
            List<Map<String, ?>> xdeathHeader =
                    (List<Map<String, ?>>) response.getProps().getHeaders().get("x-death");
            if (xdeathHeader != null) {
                // sum the counts of all x-death entries
                for (Map<String, ?> map : xdeathHeader) {
                    Long count = (Long) map.get("count");
                    messageXdeathCount += count == null ? 0L : count;
                }
            }
        }
        if (messageXdeathCount >= 5) {
            MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
            MessageProperties messageProps = messagePropertiesConverter.toMessageProperties(
                    response.getProps(), response.getEnvelope(), "UTF-8");
            resendsMessage(new Message(response.getBody(), messageProps));
            channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
        } else {
            // requeue=false: the broker dead-letters the message
            channel.basicReject(response.getEnvelope().getDeliveryTag(), false);
        }
    }

The issue in the update part of my question was in the last line:

    channel.basicReject(response.getEnvelope().getDeliveryTag(), true);

The boolean indicates whether the message should be requeued: if it is not requeued, it goes to the dead letter exchange and the count in the x-death header is incremented, as expected. Changing the boolean to false fixed the issue.
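To make the role of the requeue flag concrete, here is a minimal sketch of the same decision written against a small stand-in interface, so it runs without a broker. AckChannel, Resender, triage and the threshold parameter are assumptions for illustration only; the basicAck(deliveryTag, multiple) and basicReject(deliveryTag, requeue) parameter lists mirror the RabbitMQ Java client, whose real methods also throw IOException.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DeadLetterTriage {

    /** Stand-in for the two com.rabbitmq.client.Channel methods used here. */
    public interface AckChannel {
        void basicAck(long deliveryTag, boolean multiple);
        void basicReject(long deliveryTag, boolean requeue);
    }

    /** Hypothetical hook that republishes the body to the original queue. */
    public interface Resender {
        void resend(byte[] body);
    }

    /** Sums the "count" field of every x-death entry; 0 when the header is absent. */
    public static long totalDeaths(Map<String, Object> headers) {
        Object raw = headers == null ? null : headers.get("x-death");
        long total = 0L;
        if (raw instanceof List) {
            for (Object entry : (List<?>) raw) {
                Object count = entry instanceof Map ? ((Map<?, ?>) entry).get("count") : null;
                if (count instanceof Number) {
                    total += ((Number) count).longValue();
                }
            }
        }
        return total;
    }

    /** Returns true when the message was resent and acked, false when it was rejected. */
    public static boolean triage(AckChannel channel, long deliveryTag, Map<String, Object> headers,
                                 byte[] body, long threshold, Resender resender) {
        if (totalDeaths(headers) >= threshold) {
            resender.resend(body);
            channel.basicAck(deliveryTag, false);
            return true;
        }
        // requeue=false: the broker dead-letters the message instead of putting
        // it back on the same queue, and updates the x-death header itself.
        channel.basicReject(deliveryTag, false);
        return false;
    }

    public static void main(String[] args) {
        AckChannel printing = new AckChannel() {
            @Override
            public void basicAck(long tag, boolean multiple) {
                System.out.println("ack " + tag);
            }

            @Override
            public void basicReject(long tag, boolean requeue) {
                System.out.println("reject " + tag + " requeue=" + requeue);
            }
        };
        Map<String, Object> death = new HashMap<>();
        death.put("count", 2L);
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-death", Collections.singletonList(death));
        // Count 2 is below the threshold 5, so this prints "reject 1 requeue=false".
        triage(printing, 1L, headers, new byte[0], 5L, b -> System.out.println("resend"));
    }
}
```

With a real channel, the two calls map directly onto channel.basicAck and channel.basicReject, using the delivery tag from response.getEnvelope().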






The receiveAndReply() methods currently do not provide control over acknowledgment of the received message. Feel free to open a New Feature Request.

You can use a listener container instead to get the flexibility you need.

EDIT

You can drop down to the RabbitMQ API...

    rabbitTemplate.execute(channel -> {
        // basicGet, basicPublish, ack/nack etc. here
        return null; // the callback must return a value
    });






answered Nov 22 '18 at 15:44 by Gary Russell, edited Nov 23 '18 at 13:54













• I created jira.spring.io/browse/AMQP-843 – marcesso, Nov 23 '18 at 8:19

• "You can use a listener container instead to get the flexibility you need." Does that imply creating a listener? I can't let a listener take control of the basic.get, since I do it manually. – marcesso, Nov 23 '18 at 8:28

• In that case you can drop down to the RabbitMQ API within a template.execute(). – Gary Russell, Nov 23 '18 at 13:55

• Thanks for the edit: I could keep the admin() declarations in the Rabbit configuration and use the channel where I needed it. – marcesso, Nov 23 '18 at 14:53



















            I could use the channel basic methods:



            // autoAck = false, so the message can be acked or rejected explicitly below
            GetResponse response = channel.basicGet(queueName, false);
            if (response != null) { // basicGet returns null when the queue is empty
                long messageXdeathCount = 0L;
                if (response.getProps() != null && response.getProps().getHeaders() != null) {
                    @SuppressWarnings("unchecked")
                    List<Map<String, ?>> xdeathHeader =
                            (List<Map<String, ?>>) response.getProps().getHeaders().get("x-death");
                    if (xdeathHeader != null) {
                        // sum the count of every x-death entry
                        for (Map<String, ?> map : xdeathHeader) {
                            Long count = (Long) map.get("count");
                            messageXdeathCount += count == null ? 0L : count;
                        }
                    }
                }
                long deliveryTag = response.getEnvelope().getDeliveryTag();
                if (messageXdeathCount >= 5) {
                    MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
                    MessageProperties messageProps = messagePropertiesConverter.toMessageProperties(response.getProps(), response.getEnvelope(), "UTF-8");
                    resendsMessage(new Message(response.getBody(), messageProps));
                    channel.basicAck(deliveryTag, false);
                } else {
                    // requeue = false: the message is not put back on this queue
                    channel.basicReject(deliveryTag, false);
                }
            }


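            The issue in the update part of my question was in the last line, where the requeue flag was set to true:

            channel.basicReject(response.getEnvelope().getDeliveryTag(), true);

            The boolean indicates whether the message should be requeued: if it is not requeued, it goes to the dead letter exchange and the count in the x-death header is incremented, as expected. Changing the boolean to false fixed the issue. (The second argument of basicGet is autoAck, not requeue; it stays false so that the message can be acked or rejected manually.)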
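
The counting and threshold decision described above can be separated from the channel calls, which makes it easy to check on its own. The following is only a minimal sketch using plain Java collections: the class name XDeathPolicy, the Action enum and the threshold parameter are hypothetical names chosen for illustration, not part of Spring AMQP or the RabbitMQ client. It assumes the x-death header is a list of maps that each carry a numeric "count" entry, as in the code above.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper: sums the "count" entries of an x-death header and picks an action. */
final class XDeathPolicy {

    /** What the caller should do with the message it fetched. */
    enum Action { RESEND_TO_ORIGINAL_QUEUE, REJECT_WITHOUT_REQUEUE }

    private XDeathPolicy() { }

    /** Returns the sum of all "count" values, or 0 when the header is absent or has another shape. */
    static long totalDeathCount(Map<String, Object> headers) {
        if (headers == null) {
            return 0L;
        }
        Object raw = headers.get("x-death");
        if (!(raw instanceof List)) {
            return 0L;
        }
        long total = 0L;
        for (Object entry : (List<?>) raw) {
            if (entry instanceof Map) {
                Object count = ((Map<?, ?>) entry).get("count");
                if (count instanceof Number) {
                    total += ((Number) count).longValue();
                }
            }
        }
        return total;
    }

    /** Resend once the total reaches the threshold, otherwise reject without requeueing. */
    static Action decide(Map<String, Object> headers, long threshold) {
        return totalDeathCount(headers) >= threshold
                ? Action.RESEND_TO_ORIGINAL_QUEUE
                : Action.REJECT_WITHOUT_REQUEUE;
    }
}
```

With such a helper, the channel code would only need to map RESEND_TO_ORIGINAL_QUEUE to the resend plus basicAck, and REJECT_WITHOUT_REQUEUE to basicReject with requeue set to false. Using instanceof checks instead of casts avoids a ClassCastException if the header ever has an unexpected shape.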






            answered Nov 23 '18 at 13:43
            marcesso
