Spring data couchbase, unable to delete the document by id using reactive programming












1















Recently we decided to use spring-webflux with couchbase in our project, and we need help on how to solve below use case in reactive programming




  1. Validate and save the request in Bucket1 couchbase ,(we used javax.validation, and spring ReactiveCouchbaseRepository.


  2. Call external service (we used webclient to call the API.





    • Upon successful,




      • write AUDIT document to Bucket2.

      • Fetch the document that is inserted in Bucket1 and send the same in response.

      • write Audit document to Bucket2




    • Upon failure,




      • write AUDIT document to Bucket2.

      • delete the document that is inserted in BUCKET1 and throw the exception.

      • write Audit document to Bucket2






We have written a service class, and are using two repository classes to save the documents to couchbase, and a webclient to call external service.



Our service class method business logic looks like below.



    {

//1. Validate the request and throw the error
List<String> validationMessages = handler.validate(customerRequest);
if (validationMessages != null && !validationMessages.isEmpty()) {
return Mono.error(new InvalidRequestException("Invalid Request", validationMessages, null));
}

//generate the id, set it to the request and save it to BUCKET1
String customerRequestId = sequenceGenerator.nextId(Sequence.CUSTOMER_ACCOUNT_ID);
customerRequest.setcustomerRequestId(customerRequestId);
customerRequestMono = bucket1Repository.save(customerRequest);


//2. Call the external service using webclient
externalServiceResponse = customerRequestWebClient.createCFEEnrollment(customerRequest);

//2. Subscribe to the response and and on Success write audit to BUCKET2 , and onerror write audit to BUCKET2 , and delete the inserted documet from BUCKET1
externalServiceResponse.subscribe(response -> {
//Initialise the success audit bean and save
//2.1 a) Write Audt to BUCKET2
Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
}, errorResp -> {
//2.2 a) Write Audt to BUCKET2
//Initialise the error audit bean and save
Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);

//2.2 b)Delete the inserted
Mono<CustomerRequest> delCustomer = bucket1Repository.deleteByLoanAccountId(loanAccountId);
});

//Get the loan account id and return the same
finalResponse = bucket1Repository.findByCustomerId(customerId);
return Mono.when(externalServiceResponse,customerRequestMono,finalResponse).then(finalResponse)
.doOnSuccess(resp -> {
try {
finalMasterAudit.setServiceResponse(new ObjectMapper().writeValueAsString(resp));
Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(finalMasterAudit);
} catch (JsonProcessingException e) {
e.printStackTrace();
}

})
.doOnError(error -> {
try {
finalMasterAudit.setServiceResponse(new ObjectMapper().writeValueAsString(error.getMessage()));
Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(finalMasterAudit);
} catch (JsonProcessingException e) {
e.printStackTrace();
}

});
}


Couple issues we observerd are




  1. Document is not persisted until we subscribe it in some cases. Is this the expected behavior ? Do we need to subscribe for the document to be saved ?

  2. Unable to delete the document in case of error.

  3. Also I know I didnt follow pure reactive programming above. Help me with any pointers to write the code effectively in reactive.


Please help us with any pointers










share|improve this question





























    1















    Recently we decided to use spring-webflux with couchbase in our project, and we need help on how to solve below use case in reactive programming




    1. Validate and save the request in Bucket1 couchbase ,(we used javax.validation, and spring ReactiveCouchbaseRepository.


    2. Call external service (we used webclient to call the API.





      • Upon successful,




        • write AUDIT document to Bucket2.

        • Fetch the document that is inserted in Bucket1 and send the same in response.

        • write Audit document to Bucket2




      • Upon failure,




        • write AUDIT document to Bucket2.

        • delete the document that is inserted in BUCKET1 and throw the exception.

        • write Audit document to Bucket2






    We have written a service class, and are using two repository classes to save the documents to couchbase, and a webclient to call external service.



    Our service class method business logic looks like below.



        {

    //1. Validate the request and throw the error
    List<String> validationMessages = handler.validate(customerRequest);
    if (validationMessages != null && !validationMessages.isEmpty()) {
    return Mono.error(new InvalidRequestException("Invalid Request", validationMessages, null));
    }

    //generate the id, set it to the request and save it to BUCKET1
    String customerRequestId = sequenceGenerator.nextId(Sequence.CUSTOMER_ACCOUNT_ID);
    customerRequest.setcustomerRequestId(customerRequestId);
    customerRequestMono = bucket1Repository.save(customerRequest);


    //2. Call the external service using webclient
    externalServiceResponse = customerRequestWebClient.createCFEEnrollment(customerRequest);

    //2. Subscribe to the response and and on Success write audit to BUCKET2 , and onerror write audit to BUCKET2 , and delete the inserted documet from BUCKET1
    externalServiceResponse.subscribe(response -> {
    //Initialise the success audit bean and save
    //2.1 a) Write Audt to BUCKET2
    Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
    }, errorResp -> {
    //2.2 a) Write Audt to BUCKET2
    //Initialise the error audit bean and save
    Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);

    //2.2 b)Delete the inserted
    Mono<CustomerRequest> delCustomer = bucket1Repository.deleteByLoanAccountId(loanAccountId);
    });

    //Get the loan account id and return the same
    finalResponse = bucket1Repository.findByCustomerId(customerId);
    return Mono.when(externalServiceResponse,customerRequestMono,finalResponse).then(finalResponse)
    .doOnSuccess(resp -> {
    try {
    finalMasterAudit.setServiceResponse(new ObjectMapper().writeValueAsString(resp));
    Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(finalMasterAudit);
    } catch (JsonProcessingException e) {
    e.printStackTrace();
    }

    })
    .doOnError(error -> {
    try {
    finalMasterAudit.setServiceResponse(new ObjectMapper().writeValueAsString(error.getMessage()));
    Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(finalMasterAudit);
    } catch (JsonProcessingException e) {
    e.printStackTrace();
    }

    });
    }


    Couple issues we observerd are




    1. Document is not persisted until we subscribe it in some cases. Is this the expected behavior ? Do we need to subscribe for the document to be saved ?

    2. Unable to delete the document in case of error.

    3. Also I know I didnt follow pure reactive programming above. Help me with any pointers to write the code effectively in reactive.


    Please help us with any pointers










    share|improve this question



























      1












      1








      1








      Recently we decided to use spring-webflux with couchbase in our project, and we need help on how to solve below use case in reactive programming




      1. Validate and save the request in Bucket1 couchbase ,(we used javax.validation, and spring ReactiveCouchbaseRepository.


      2. Call external service (we used webclient to call the API.





        • Upon successful,




          • write AUDIT document to Bucket2.

          • Fetch the document that is inserted in Bucket1 and send the same in response.

          • write Audit document to Bucket2




        • Upon failure,




          • write AUDIT document to Bucket2.

          • delete the document that is inserted in BUCKET1 and throw the exception.

          • write Audit document to Bucket2






      We have written a service class, and are using two repository classes to save the documents to couchbase, and a webclient to call external service.



      Our service class method business logic looks like below.



          {

      //1. Validate the request and throw the error
      List<String> validationMessages = handler.validate(customerRequest);
      if (validationMessages != null && !validationMessages.isEmpty()) {
      return Mono.error(new InvalidRequestException("Invalid Request", validationMessages, null));
      }

      //generate the id, set it to the request and save it to BUCKET1
      String customerRequestId = sequenceGenerator.nextId(Sequence.CUSTOMER_ACCOUNT_ID);
      customerRequest.setcustomerRequestId(customerRequestId);
      customerRequestMono = bucket1Repository.save(customerRequest);


      //2. Call the external service using webclient
      externalServiceResponse = customerRequestWebClient.createCFEEnrollment(customerRequest);

      //2. Subscribe to the response and and on Success write audit to BUCKET2 , and onerror write audit to BUCKET2 , and delete the inserted documet from BUCKET1
      externalServiceResponse.subscribe(response -> {
      //Initialise the success audit bean and save
      //2.1 a) Write Audt to BUCKET2
      Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
      }, errorResp -> {
      //2.2 a) Write Audt to BUCKET2
      //Initialise the error audit bean and save
      Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);

      //2.2 b)Delete the inserted
      Mono<CustomerRequest> delCustomer = bucket1Repository.deleteByLoanAccountId(loanAccountId);
      });

      //Get the loan account id and return the same
      finalResponse = bucket1Repository.findByCustomerId(customerId);
      return Mono.when(externalServiceResponse,customerRequestMono,finalResponse).then(finalResponse)
      .doOnSuccess(resp -> {
      try {
      finalMasterAudit.setServiceResponse(new ObjectMapper().writeValueAsString(resp));
      Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(finalMasterAudit);
      } catch (JsonProcessingException e) {
      e.printStackTrace();
      }

      })
      .doOnError(error -> {
      try {
      finalMasterAudit.setServiceResponse(new ObjectMapper().writeValueAsString(error.getMessage()));
      Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(finalMasterAudit);
      } catch (JsonProcessingException e) {
      e.printStackTrace();
      }

      });
      }


      Couple issues we observerd are




      1. Document is not persisted until we subscribe it in some cases. Is this the expected behavior ? Do we need to subscribe for the document to be saved ?

      2. Unable to delete the document in case of error.

      3. Also I know I didnt follow pure reactive programming above. Help me with any pointers to write the code effectively in reactive.


      Please help us with any pointers










      share|improve this question
















      Recently we decided to use spring-webflux with couchbase in our project, and we need help on how to solve below use case in reactive programming




      1. Validate and save the request in Bucket1 couchbase ,(we used javax.validation, and spring ReactiveCouchbaseRepository.


      2. Call external service (we used webclient to call the API.





        • Upon successful,




          • write AUDIT document to Bucket2.

          • Fetch the document that is inserted in Bucket1 and send the same in response.

          • write Audit document to Bucket2




        • Upon failure,




          • write AUDIT document to Bucket2.

          • delete the document that is inserted in BUCKET1 and throw the exception.

          • write Audit document to Bucket2






      We have written a service class, and are using two repository classes to save the documents to couchbase, and a webclient to call external service.



      Our service class method business logic looks like below.



          {

      //1. Validate the request and throw the error
      List<String> validationMessages = handler.validate(customerRequest);
      if (validationMessages != null && !validationMessages.isEmpty()) {
      return Mono.error(new InvalidRequestException("Invalid Request", validationMessages, null));
      }

      //generate the id, set it to the request and save it to BUCKET1
      String customerRequestId = sequenceGenerator.nextId(Sequence.CUSTOMER_ACCOUNT_ID);
      customerRequest.setcustomerRequestId(customerRequestId);
      customerRequestMono = bucket1Repository.save(customerRequest);


      //2. Call the external service using webclient
      externalServiceResponse = customerRequestWebClient.createCFEEnrollment(customerRequest);

      //2. Subscribe to the response and and on Success write audit to BUCKET2 , and onerror write audit to BUCKET2 , and delete the inserted documet from BUCKET1
      externalServiceResponse.subscribe(response -> {
      //Initialise the success audit bean and save
      //2.1 a) Write Audt to BUCKET2
      Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
      }, errorResp -> {
      //2.2 a) Write Audt to BUCKET2
      //Initialise the error audit bean and save
      Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);

      //2.2 b)Delete the inserted
      Mono<CustomerRequest> delCustomer = bucket1Repository.deleteByLoanAccountId(loanAccountId);
      });

      //Get the loan account id and return the same
      finalResponse = bucket1Repository.findByCustomerId(customerId);
      return Mono.when(externalServiceResponse,customerRequestMono,finalResponse).then(finalResponse)
      .doOnSuccess(resp -> {
      try {
      finalMasterAudit.setServiceResponse(new ObjectMapper().writeValueAsString(resp));
      Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(finalMasterAudit);
      } catch (JsonProcessingException e) {
      e.printStackTrace();
      }

      })
      .doOnError(error -> {
      try {
      finalMasterAudit.setServiceResponse(new ObjectMapper().writeValueAsString(error.getMessage()));
      Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(finalMasterAudit);
      } catch (JsonProcessingException e) {
      e.printStackTrace();
      }

      });
      }


      Couple issues we observerd are




      1. Document is not persisted until we subscribe it in some cases. Is this the expected behavior ? Do we need to subscribe for the document to be saved ?

      2. Unable to delete the document in case of error.

      3. Also I know I didnt follow pure reactive programming above. Help me with any pointers to write the code effectively in reactive.


      Please help us with any pointers







      reactive-programming couchbase spring-webflux






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Jan 3 at 0:58







      ytram99

















      asked Jan 3 at 0:47









      ytram99ytram99

      215




      215
























          1 Answer
          1






          active

          oldest

          votes


















          3














          Taking a chunk of the code above:



          externalServiceResponse.subscribe(response -> {
          Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
          }, errorResp -> {
          Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
          Mono<CustomerRequest> delCustomer = bucket1Repository.deleteByLoanAccountId(loanAccountId);
          });


          There are two reactive programming issues with it:




          1. You are creating Monos that you don't subscribe to, so they will never execute.

          2. You shouldn't be creating them in subscribe anyway, but instead using flatMap or onErrorResume to chain them, pre-subscribe.


          Something like this should do the trick (forgive me, I haven't tested it so you may need to make some tweaks):



          externalServiceResponse
          // If something goes wrong then delete the inserted doc
          .onErrorResume(err -> bucket1Repository.deleteByLoanAccountId(loanAccountId))

          // Always want to save the audit regardless
          .then(bucket2Repository.save(cfeAudit))

          .subscribe();


          There are other problems to fix in the code, e.g. it looks like you want to flatMap multiple Monos together before subscribing to the final Mono, but hopefully this gets you started.






          share|improve this answer


























            Your Answer






            StackExchange.ifUsing("editor", function () {
            StackExchange.using("externalEditor", function () {
            StackExchange.using("snippets", function () {
            StackExchange.snippets.init();
            });
            });
            }, "code-snippets");

            StackExchange.ready(function() {
            var channelOptions = {
            tags: "".split(" "),
            id: "1"
            };
            initTagRenderer("".split(" "), "".split(" "), channelOptions);

            StackExchange.using("externalEditor", function() {
            // Have to fire editor after snippets, if snippets enabled
            if (StackExchange.settings.snippets.snippetsEnabled) {
            StackExchange.using("snippets", function() {
            createEditor();
            });
            }
            else {
            createEditor();
            }
            });

            function createEditor() {
            StackExchange.prepareEditor({
            heartbeatType: 'answer',
            autoActivateHeartbeat: false,
            convertImagesToLinks: true,
            noModals: true,
            showLowRepImageUploadWarning: true,
            reputationToPostImages: 10,
            bindNavPrevention: true,
            postfix: "",
            imageUploader: {
            brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
            contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
            allowUrls: true
            },
            onDemand: true,
            discardSelector: ".discard-answer"
            ,immediatelyShowMarkdownHelp:true
            });


            }
            });














            draft saved

            draft discarded


















            StackExchange.ready(
            function () {
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f54015026%2fspring-data-couchbase-unable-to-delete-the-document-by-id-using-reactive-progra%23new-answer', 'question_page');
            }
            );

            Post as a guest















            Required, but never shown

























            1 Answer
            1






            active

            oldest

            votes








            1 Answer
            1






            active

            oldest

            votes









            active

            oldest

            votes






            active

            oldest

            votes









            3














            Taking a chunk of the code above:



            externalServiceResponse.subscribe(response -> {
            Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
            }, errorResp -> {
            Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
            Mono<CustomerRequest> delCustomer = bucket1Repository.deleteByLoanAccountId(loanAccountId);
            });


            There are two reactive programming issues with it:




            1. You are creating Monos that you don't subscribe to, so they will never execute.

            2. You shouldn't be creating them in subscribe anyway, but instead using flatMap or onErrorResume to chain them, pre-subscribe.


            Something like this should do the trick (forgive me, I haven't tested it so you may need to make some tweaks):



            externalServiceResponse
            // If something goes wrong then delete the inserted doc
            .onErrorResume(err -> bucket1Repository.deleteByLoanAccountId(loanAccountId))

            // Always want to save the audit regardless
            .then(bucket2Repository.save(cfeAudit))

            .subscribe();


            There are other problems to fix in the code, e.g. it looks like you want to flatMap multiple Monos together before subscribing to the final Mono, but hopefully this gets you started.






            share|improve this answer






























              3














              Taking a chunk of the code above:



              externalServiceResponse.subscribe(response -> {
              Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
              }, errorResp -> {
              Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
              Mono<CustomerRequest> delCustomer = bucket1Repository.deleteByLoanAccountId(loanAccountId);
              });


              There are two reactive programming issues with it:




              1. You are creating Monos that you don't subscribe to, so they will never execute.

              2. You shouldn't be creating them in subscribe anyway, but instead using flatMap or onErrorResume to chain them, pre-subscribe.


              Something like this should do the trick (forgive me, I haven't tested it so you may need to make some tweaks):



              externalServiceResponse
              // If something goes wrong then delete the inserted doc
              .onErrorResume(err -> bucket1Repository.deleteByLoanAccountId(loanAccountId))

              // Always want to save the audit regardless
              .then(bucket2Repository.save(cfeAudit))

              .subscribe();


              There are other problems to fix in the code, e.g. it looks like you want to flatMap multiple Monos together before subscribing to the final Mono, but hopefully this gets you started.






              share|improve this answer




























                3












                3








                3







                Taking a chunk of the code above:



                externalServiceResponse.subscribe(response -> {
                Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
                }, errorResp -> {
                Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
                Mono<CustomerRequest> delCustomer = bucket1Repository.deleteByLoanAccountId(loanAccountId);
                });


                There are two reactive programming issues with it:




                1. You are creating Monos that you don't subscribe to, so they will never execute.

                2. You shouldn't be creating them in subscribe anyway, but instead using flatMap or onErrorResume to chain them, pre-subscribe.


                Something like this should do the trick (forgive me, I haven't tested it so you may need to make some tweaks):



                externalServiceResponse
                // If something goes wrong then delete the inserted doc
                .onErrorResume(err -> bucket1Repository.deleteByLoanAccountId(loanAccountId))

                // Always want to save the audit regardless
                .then(bucket2Repository.save(cfeAudit))

                .subscribe();


                There are other problems to fix in the code, e.g. it looks like you want to flatMap multiple Monos together before subscribing to the final Mono, but hopefully this gets you started.






                share|improve this answer















                Taking a chunk of the code above:



                externalServiceResponse.subscribe(response -> {
                Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
                }, errorResp -> {
                Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
                Mono<CustomerRequest> delCustomer = bucket1Repository.deleteByLoanAccountId(loanAccountId);
                });


                There are two reactive programming issues with it:




                1. You are creating Monos that you don't subscribe to, so they will never execute.

                2. You shouldn't be creating them in subscribe anyway, but instead using flatMap or onErrorResume to chain them, pre-subscribe.


                Something like this should do the trick (forgive me, I haven't tested it so you may need to make some tweaks):



                externalServiceResponse
                // If something goes wrong then delete the inserted doc
                .onErrorResume(err -> bucket1Repository.deleteByLoanAccountId(loanAccountId))

                // Always want to save the audit regardless
                .then(bucket2Repository.save(cfeAudit))

                .subscribe();


                There are other problems to fix in the code, e.g. it looks like you want to flatMap multiple Monos together before subscribing to the final Mono, but hopefully this gets you started.







                share|improve this answer














                share|improve this answer



                share|improve this answer








                edited Jan 3 at 15:57

























                answered Jan 3 at 12:37









                Graham PopleGraham Pople

                15615




                15615
































                    draft saved

                    draft discarded




















































                    Thanks for contributing an answer to Stack Overflow!


                    • Please be sure to answer the question. Provide details and share your research!

                    But avoid



                    • Asking for help, clarification, or responding to other answers.

                    • Making statements based on opinion; back them up with references or personal experience.


                    To learn more, see our tips on writing great answers.




                    draft saved


                    draft discarded














                    StackExchange.ready(
                    function () {
                    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f54015026%2fspring-data-couchbase-unable-to-delete-the-document-by-id-using-reactive-progra%23new-answer', 'question_page');
                    }
                    );

                    Post as a guest















                    Required, but never shown





















































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown

































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown







                    Popular posts from this blog

                    android studio warns about leanback feature tag usage required on manifest while using Unity exported app?

                    SQL update select statement

                    WPF add header to Image with URL pettitions [duplicate]