Package com.veeva.vault.sdk.api.queue
package com.veeva.vault.sdk.api.queue
This package provides interfaces to send and receive Spark messages.
Using
The Using
A Using
A
To use Spark messages, you must first set up a Connection and Queue in Vault Admin for each Vault that needs to send or receive a message. Connections can be set up for the local Vault, with another Vault, or with an external application. Learn more about Spark Messaging in the Developer Portal.
Overview of Core Services
This package contains three primary developer entry points:QueueServicefor creating and sending messages to a queue.MessageProcessorfor implementing logic to process inbound messages.MessageDeliveryEventHandlerfor implementing custom error handling when a message fails to deliver.
Using QueueService
The QueueService is the entry point for sending messages. You use it to construct a Message,
populate it with identifiers or attributes, and place it on a pre-configured outbound queue.
Example: Sending a Document Processing Task
The following example sends a message containing a document ID and a parameter
QueueService queueService = ServiceLocator.locate(QueueService.class);
String documentId = "12345";
// Create a new message for a specific outbound queue.
// The queue name 'doc_processing_outbound__c' must be configured by an admin.
Message docTaskMessage = queueService.newMessage("doc_processing_outbound__c");
// Use attributes for key-value data.
docTaskMessage.setAttribute("documentId", documentId);
docTaskMessage.setAttribute("action", "RUN_OCR");
// Send the message asynchronously
PutMessageResponse response = queueService.putMessage(docTaskMessage);
if (response.getError() != null) {
// Handle potential errors during the send operation
LogService logService = ServiceLocator.locate(LogService.class);
logService.error("Failed to send message: {}", response.getError().getMessage());
}
Using MessageProcessor
A MessageProcessor is a class you create to handle incoming messages. It is automatically invoked by the
framework when a message arrives on an inbound queue that it's configured to listen to.
Example: Basic Usage
The following example creates a CrossLink and starts a workflow with information from a Spark message.
@MessageProcessorInfo()
public class ProcessStudy implements MessageProcessor {
public void execute(MessageContext context) {
String docId = context.getMessage().getAttribute("docId",MessageAttributeValueType.STRING);
String docName = context.getMessage().getAttribute("docName",MessageAttributeValueType.STRING);
String remoteVaultId = context.getRemoteVaultId();
HttpService httpService = ServiceLocator.locate(HttpService.class);
// When a message is received, create a CrossLink to the document in the source Vault.
HttpRequest httpRequest = httpService.newLocalHttpRequest()
.setMethod(HttpMethod.POST)
.setBodyParam("source_document_id__v",docId)
.setBodyParam("source_vault_id__v", remoteVaultId)
.setBodyParam("source_binding_rule__v", "Latest version")
.setBodyParam("name__v",docName)
.setBodyParam("type__v", "TypeX")
.setBodyParam("lifecycle__v", "General Lifecycle")
.appendPath("/api/v18.3/objects/documents");
httpService.send(httpRequest,HttpResponseBodyValueType.JSONDATA)
.onSuccess(response -> {
// After CrossLink is created, start the Approval workflow
JsonData body = response.getResponse();
String crossLinkDocId = body.getJsonObject().getValue("id", JsonValueType.NUMBER).toString();
HttpRequest startWorkflowRequest = httpService.newLocalHttpRequest()
.setMethod(HttpMethod.PUT)
.setBodyParam("Approver", RequestContext.get().getCurrentUserId())
.setBodyParam("dueDate", LocalDate.now().plusDays(30).toString())
.appendPath("/api/v18.3/objects/documents/" + crossLinkDocId + "/versions/0/1/lifecycle_actions/startApproval");
httpService.send(startWorkflowRequest,HttpResponseBodyValueType.STRING)
.onError(starkWorkflowResponse -> {
int startWorkflowResponseCode = starkWorkflowResponse.getHttpResponse().getHttpStatusCode();
ServiceLocator.locate(LogService.class).info("RESPONSE: " + responseCode);
})
.execute();
})
.onError(response -> {
int responseCode = response.getHttpResponse().getHttpStatusCode();
ServiceLocator.locate(LogService.class).info("RESPONSE: " + responseCode);
})
.execute();
}
}
Example: Set Error
The following example creates a new Product record, sets a User Exception Message record, and throws aRollbackException when the operation fails.
@MessageProcessorInfo()
public class ProcessProduct implements MessageProcessor {
@Override
public void execute(MessageContext context) {
RecordService recordService = ServiceLocator.locate(RecordService.class);
Record record = recordService.newRecord("product__v");
record.setValue("name__v", "productName");
List<BatchOperationError> errors = new ArrayList<>();
recordService.batchSaveRecords(VaultCollections.asList(record)).
onErrors(le -> errors.addAll(le)).
execute();
if (! errors.isEmpty()) {
// Create a new error record and set to the ErrorProvider
Record errRecord = recordService.newRecord("exception_message__sys");
// Below are 5 required fields
errRecord.setValue("name__v", "productCreationError");
errRecord.setValue("error_message__sys", "Product batch operation error");
errRecord.setValue("integration__sys", "V2T000000001002");
errRecord.setValue("integration_point__sys", "V2U000000001001");
errRecord.setValue("error_type__sys", Collections.singletonList("item_processing_error__sys"));
messageContext.getErrorProvider().setError(errRecord);
// Rollback the entire transaction
throw new RollbackException("TRACKED_AND_PROPAGATED_ERRORS", "Product batch operation error");
}
}
}
Using MessageDeliveryEventHandler
A MessageDeliveryEventHandler allows you to define custom logic for when a message fails to be delivered
after all system retries have been exhausted.
Example: Handle Undelivered Message
The following example resends an undelivered message, creates a new User Exception Message record, gets the error type, and sends a notification to the user or users specified in theNotificationParameters.
@MessageDeliveryEventHandlerInfo()
public class MessageDeliveryHandler implements MessageDeliveryEventHandler {
public void onError(MessageDeliveryEventHandlerContext messageDeliveryEventHandlerContext) {
// Instantiate an instance of the QueueService which is used to put messages back on the queue
QueueService queueService = ServiceLocator.locate(QueueService.class);
// Resend Message
// Get Message Context
MessageContext messageContext = messageDeliveryEventHandlerContext.getMessageContext();
// Get Queue Name and original Message
String queueName = messageContext.getRemoteQueueName();
Message originalMessage = messageContext.getMessage();
PutMessageResponse response = queueService.putMessage(originalMessage);
// Instantiate RecordService
RecordService recordService = ServiceLocator.locate(RecordService.class);
MessageDeliveryError messageDeliveryError = messageDeliveryEventHandlerContext.getMessageDeliveryError();
MessageDeliveryErrorType messageDeliveryErrorType = messageDeliveryError.getType();
if(messageDeliveryErrorType == MessageDeliveryErrorType.CONFIG_ERROR) {
// Create User Exception Record
Record record = recordService.newRecord("exception_message__sys");
// Below are 5 required fields
record.setValue("name__v", "Spark Integration Configuration Error");
record.setValue("error_message__sys", "Integration Config error");
record.setValue("integration__sys", "V1P000000001001");
record.setValue("integration_point__sys", "V1Q000000001001");
record.setValue("error_type__sys", VaultCollections.asList("item_processing_error__sys"));
recordService.batchSaveRecords(VaultCollections.asList(record)).rollbackOnErrors().execute();
} else if (messageDeliveryErrorType == MessageDeliveryErrorType.CONNECTION_FAILED) {
// Notify Sysadmins
GroupService groupService = ServiceLocator.locate(GroupService.class);
List groupNames = VaultCollections.newList();
groupNames.add("system_administrators__v");
GetGroupsResponse groupResponse = groupService.getGroupsByNames(groupNames);
Group group = groupResponse.getGroupByName("system_administrators__v");
List groups = VaultCollections.newList();
groups.add(group);
//NotificationService
NotificationService notificationService = ServiceLocator.locate(NotificationService.class);
//NotificationParameters
NotificationParameters notificationParameters = notificationService.newNotificationParameters();
notificationParameters.setRecipientsByGroups(groups);
//NotificationTemplate
NotificationTemplate template = notificationService.newNotificationTemplate()
.setTemplateName("spark_integration_error_notification_template__c")
.setTokenValue("errorType", "Connection Error");
//Send Notification with a NotificationTemplate
notificationService.send(notificationParameters, template);
}
}
}
-
ClassDescriptionProvides a method to set an error record in Vault.Provides methods to update the content of a Spark message.Type of data supported by the Vault Java SDK for
Messageattributes.Contains contextual information about a Message.Provides methods to retrieve the failure type and error message of the failed Spark message delivery.Error types that may occur when sending a Spark message.Provides methods to override default behavior for handling failed Spark message deliveries.Retrieves the context and delivery error of the failed Spark message delivery.Indicates a class is aMessageDeliveryEventHandler.Retrieves the context and sets tokens for the outbound Spark message.Invoked to process a message received by an inbound queue.Indicates a class is aMessageProcessor.The response returned byQueueService.putMessage(Message).Represents the result for a specific connection duringQueueService.putMessage(Message).Provides methods to create and put messages in a queue.