Spring Data Couchbase: unable to delete a document by ID using reactive programming
We recently decided to use spring-webflux with Couchbase in our project, and we need help implementing the use case below in a reactive style:
- Validate the request and save it in Bucket1 (we used javax.validation and Spring's ReactiveCouchbaseRepository).
- Call an external service (we used WebClient to call the API).
Upon success:
- Write an AUDIT document to Bucket2.
- Fetch the document that was inserted into Bucket1 and return it in the response.
- Write an AUDIT document to Bucket2.
Upon failure:
- Write an AUDIT document to Bucket2.
- Delete the document that was inserted into Bucket1 and throw the exception.
- Write an AUDIT document to Bucket2.
We have written a service class that uses two repository classes to save documents to Couchbase and a WebClient to call the external service.
The business logic of our service method looks like this:
    {
        // 1. Validate the request and return an error if it is invalid
        List<String> validationMessages = handler.validate(customerRequest);
        if (validationMessages != null && !validationMessages.isEmpty()) {
            return Mono.error(new InvalidRequestException("Invalid Request", validationMessages, null));
        }
        // Generate the id, set it on the request and save it to BUCKET1
        String customerRequestId = sequenceGenerator.nextId(Sequence.CUSTOMER_ACCOUNT_ID);
        customerRequest.setcustomerRequestId(customerRequestId);
        customerRequestMono = bucket1Repository.save(customerRequest);
        // 2. Call the external service using WebClient
        externalServiceResponse = customerRequestWebClient.createCFEEnrollment(customerRequest);
        // 2. Subscribe to the response; on success write an audit to BUCKET2,
        //    on error write an audit to BUCKET2 and delete the inserted document from BUCKET1
        externalServiceResponse.subscribe(response -> {
            // Initialise the success audit bean and save
            // 2.1 a) Write audit to BUCKET2
            Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
        }, errorResp -> {
            // 2.2 a) Write audit to BUCKET2
            // Initialise the error audit bean and save
            Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
            // 2.2 b) Delete the inserted document
            Mono<CustomerRequest> delCustomer = bucket1Repository.deleteByLoanAccountId(loanAccountId);
        });
        // Get the loan account id and return the same
        finalResponse = bucket1Repository.findByCustomerId(customerId);
        return Mono.when(externalServiceResponse, customerRequestMono, finalResponse).then(finalResponse)
            .doOnSuccess(resp -> {
                try {
                    finalMasterAudit.setServiceResponse(new ObjectMapper().writeValueAsString(resp));
                    Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(finalMasterAudit);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            })
            .doOnError(error -> {
                try {
                    finalMasterAudit.setServiceResponse(new ObjectMapper().writeValueAsString(error.getMessage()));
                    Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(finalMasterAudit);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            });
    }
A couple of issues we observed:
- In some cases the document is not persisted until we subscribe. Is this the expected behavior? Do we need to subscribe for the document to be saved?
- We are unable to delete the document in case of an error.
- I also know the code above is not purely reactive. Any pointers on how to write it effectively in a reactive style would be appreciated.
reactive-programming couchbase spring-webflux
asked Jan 3 at 0:47, edited Jan 3 at 0:58
ytram99
1 Answer
Taking a chunk of the code above:
    externalServiceResponse.subscribe(response -> {
        Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
    }, errorResp -> {
        Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
        Mono<CustomerRequest> delCustomer = bucket1Repository.deleteByLoanAccountId(loanAccountId);
    });
There are two reactive programming issues with it:
- You are creating Monos that you never subscribe to, so they will never execute.
- You shouldn't be creating them inside subscribe anyway; instead, chain them with flatMap or onErrorResume before subscribing.
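To see the first point in isolation, here is a minimal sketch using only Reactor. The `save` method is a hypothetical stand-in for a repository call (it is not the Spring Data Couchbase API); it assumes, as is typical for reactive repositories, that the write happens only when the returned Mono is subscribed to.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class LazyMonoDemo {
    // Counts how many times the simulated write actually ran
    static final AtomicInteger saves = new AtomicInteger();

    // Hypothetical stand-in for repository.save(...): the side effect is
    // deferred until someone subscribes to the returned Mono
    static Mono<String> save(String doc) {
        return Mono.fromCallable(() -> {
            saves.incrementAndGet();
            return doc;
        });
    }

    public static void main(String[] args) {
        Mono<String> pending = save("audit-1"); // assembled only, nothing has run yet
        System.out.println(saves.get());        // prints 0
        pending.block();                        // block() subscribes, which triggers the write
        System.out.println(saves.get());        // prints 1
    }
}
```

`block()` is used here only to keep the demo short. In a WebFlux handler you would return the Mono and let the framework subscribe to it.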
Something like this should do the trick (forgive me, I haven't tested it, so you may need to make some tweaks):
    externalServiceResponse
        // If something goes wrong, delete the inserted doc
        .onErrorResume(err -> bucket1Repository.deleteByLoanAccountId(loanAccountId))
        // Always save the audit, regardless of the outcome
        .then(bucket2Repository.save(cfeAudit))
        .subscribe();
There are other problems to fix in the code (for example, it looks like you want to flatMap multiple Monos together before subscribing to the final Mono), but hopefully this gets you started.
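As a rough illustration of chaining the whole flow into a single Mono, the sketch below saves the request, calls the external service, and on failure writes an audit, deletes the saved document and re-emits the error. Everything in it is an assumption made for illustration: the in-memory lists stand in for the two buckets, and `saveRequest`, `deleteRequest`, `saveAudit` and `enroll` are hypothetical names, not the asker's repositories or the Spring Data Couchbase API.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Mono;

public class EnrollmentFlowSketch {
    // Hypothetical in-memory stand-ins for Bucket1 and Bucket2
    static final List<String> bucket1 = new CopyOnWriteArrayList<>();
    static final List<String> bucket2 = new CopyOnWriteArrayList<>();

    static Mono<String> saveRequest(String id) {
        return Mono.fromCallable(() -> { bucket1.add(id); return id; });
    }

    static Mono<Void> deleteRequest(String id) {
        return Mono.fromRunnable(() -> bucket1.remove(id));
    }

    static Mono<String> saveAudit(String entry) {
        return Mono.fromCallable(() -> { bucket2.add(entry); return entry; });
    }

    // One chain, no inner subscribe(): each step runs only when the previous one completes
    static Mono<String> enroll(String id, Mono<String> externalCall) {
        return saveRequest(id)
            .flatMap(saved -> externalCall
                // Failure path: audit, compensate by deleting, then re-emit the original error
                .onErrorResume(err -> saveAudit("FAILURE:" + saved)
                    .then(deleteRequest(saved))
                    .then(Mono.<String>error(err)))
                // Success path: audit, then return the saved document
                .flatMap(resp -> saveAudit("SUCCESS:" + saved).thenReturn(saved)));
    }

    public static void main(String[] args) {
        System.out.println(enroll("req-1", Mono.just("ok")).block());
        enroll("req-2", Mono.error(new IllegalStateException("boom")))
            .onErrorResume(e -> Mono.just("failed: " + e.getMessage()))
            .block();
        System.out.println(bucket1 + " " + bucket2);
    }
}
```

Because `onErrorResume` sits directly on the external call, the compensation runs only when that call fails, not when the success audit fails. The method returns the assembled Mono and leaves subscribing to the caller, which in WebFlux is the framework itself.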
edited Jan 3 at 15:57
answered Jan 3 at 12:37
Graham Pople