Package com.veeva.vault.sdk.api.queue


This package provides interfaces to send and receive Spark messages.

To use Spark messages, you must first set up a Connection and Queue in Vault Admin for each Vault that needs to send or receive a message. Connections can be set up for the local Vault, with another Vault, or with an external application. Learn more about Spark Messaging in the Developer Portal.

Overview of Core Services

This package contains three primary developer entry points: QueueService, MessageProcessor, and MessageDeliveryEventHandler.

Using QueueService

The QueueService is the entry point for sending messages. You use it to construct a Message, populate it with identifiers or attributes, and place it on a pre-configured outbound queue.

Example: Sending a Document Processing Task

The following example sends a message containing a document ID and an action parameter.
 
 QueueService queueService = ServiceLocator.locate(QueueService.class);
 String documentId = "12345";

 // Create a new message for a specific outbound queue.
 // The queue name 'doc_processing_outbound__c' must be configured by an admin.
 Message docTaskMessage = queueService.newMessage("doc_processing_outbound__c");

 // Use attributes for key-value data.
 docTaskMessage.setAttribute("documentId", documentId);
 docTaskMessage.setAttribute("action", "RUN_OCR");

 // Send the message asynchronously
 PutMessageResponse response = queueService.putMessage(docTaskMessage);

 if (response.getError() != null) {
     // Handle potential errors during the send operation
     LogService logService = ServiceLocator.locate(LogService.class);
     logService.error("Failed to send message: {}", response.getError().getMessage());
 }
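
The attribute step above can be factored into a small helper that validates the values before any Message is created. The following is a minimal sketch in plain Java, not part of the SDK: the class name DocTaskAttributes and the method build are hypothetical, and it uses java.util collections so that it runs outside Vault. Inside Vault code, a map created through VaultCollections would be the likely replacement.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the Vault SDK.
final class DocTaskAttributes {

    private DocTaskAttributes() {}

    /** Returns the two attributes used in the example above, rejecting null or blank values. */
    static Map<String, String> build(String documentId, String action) {
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("documentId", requireText("documentId", documentId));
        attributes.put("action", requireText("action", action));
        return attributes;
    }

    /** Trims the value and fails fast when nothing is left. */
    private static String requireText(String name, String value) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(name + " must not be blank");
        }
        return value.trim();
    }
}
```

Each entry of the returned map could then be copied onto the message with setAttribute before putMessage is called, so that a malformed message never reaches the queue.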
 
 

Using MessageProcessor

A MessageProcessor is a class you create to handle incoming messages. It is automatically invoked by the framework when a message arrives on an inbound queue that it's configured to listen to.

Example: Basic Usage

The following example creates a CrossLink and starts a workflow with information from a Spark message.
 @MessageProcessorInfo()
 public class ProcessStudy implements MessageProcessor {
          public void execute(MessageContext context) {
              String docId = context.getMessage().getAttribute("docId",MessageAttributeValueType.STRING);
              String docName = context.getMessage().getAttribute("docName",MessageAttributeValueType.STRING);
              String remoteVaultId = context.getRemoteVaultId();

              HttpService httpService = ServiceLocator.locate(HttpService.class);

              // When a message is received, create a CrossLink to the document in the source Vault.
              HttpRequest httpRequest = httpService.newLocalHttpRequest()
                  .setMethod(HttpMethod.POST)
                  .setBodyParam("source_document_id__v",docId)
                  .setBodyParam("source_vault_id__v", remoteVaultId)
                  .setBodyParam("source_binding_rule__v", "Latest version")
                  .setBodyParam("name__v",docName)
                  .setBodyParam("type__v", "TypeX")
                  .setBodyParam("lifecycle__v", "General Lifecycle")
                  .appendPath("/api/v18.3/objects/documents");

              httpService.send(httpRequest,HttpResponseBodyValueType.JSONDATA)
                  .onSuccess(response -> {
                      // After CrossLink is created, start the Approval workflow
                      JsonData body = response.getResponse();
                      String crossLinkDocId = body.getJsonObject().getValue("id", JsonValueType.NUMBER).toString();
                      HttpRequest startWorkflowRequest = httpService.newLocalHttpRequest()
                          .setMethod(HttpMethod.PUT)
                          .setBodyParam("Approver", RequestContext.get().getCurrentUserId())
                          .setBodyParam("dueDate", LocalDate.now().plusDays(30).toString())
                          .appendPath("/api/v18.3/objects/documents/" + crossLinkDocId + "/versions/0/1/lifecycle_actions/startApproval");
                      httpService.send(startWorkflowRequest,HttpResponseBodyValueType.STRING)
                          .onError(startWorkflowResponse -> {
                              // Log the status code returned by the failed workflow request
                              int startWorkflowResponseCode = startWorkflowResponse.getHttpResponse().getHttpStatusCode();
                              ServiceLocator.locate(LogService.class).info("RESPONSE: " + startWorkflowResponseCode);
                          })
                          .execute();
                 })
                 .onError(response -> {
                      int responseCode = response.getHttpResponse().getHttpStatusCode();
                      ServiceLocator.locate(LogService.class).info("RESPONSE: " + responseCode);
                 })
                 .execute();

          }
     }
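
The workflow request above builds its path by string concatenation. As a minimal sketch, that step could be isolated in a helper that also checks the document ID before it reaches the URL. The class DocumentPaths and the method startApprovalPath are hypothetical names, not SDK APIs; the path layout and the v18.3 version are taken from the example above.

```java
// Hypothetical helper, not part of the Vault SDK.
final class DocumentPaths {

    // API version used in the example above.
    private static final String API_VERSION = "v18.3";

    private DocumentPaths() {}

    /** Builds the startApproval lifecycle-action path for version 0.1 of the given document. */
    static String startApprovalPath(String docId) {
        // Reject anything that is not a plain numeric ID so it cannot alter the path.
        if (docId == null || !docId.matches("\\d+")) {
            throw new IllegalArgumentException("docId must be numeric: " + docId);
        }
        return "/api/" + API_VERSION + "/objects/documents/" + docId
                + "/versions/0/1/lifecycle_actions/startApproval";
    }
}
```

In the processor, the result would be passed to appendPath in place of the inline concatenation.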
 
 

Example: Set Error

The following example creates a new Product record. If the save fails, it sets a User Exception Message record on the ErrorProvider and throws a RollbackException.
    @MessageProcessorInfo()
    public class ProcessProduct implements MessageProcessor {
      @Override
      public void execute(MessageContext context) {
         RecordService recordService = ServiceLocator.locate(RecordService.class);
         Record record = recordService.newRecord("product__v");
         record.setValue("name__v", "productName");
         List<BatchOperationError> errors = new ArrayList<>();
         recordService.batchSaveRecords(VaultCollections.asList(record)).
             onErrors(le -> errors.addAll(le)).
             execute();

         if (! errors.isEmpty()) {
             // Create a new error record and set it on the ErrorProvider
             Record errRecord = recordService.newRecord("exception_message__sys");
             // Below are 5 required fields
             errRecord.setValue("name__v", "productCreationError");
             errRecord.setValue("error_message__sys", "Product batch operation error");
             errRecord.setValue("integration__sys", "V2T000000001002");
             errRecord.setValue("integration_point__sys", "V2U000000001001");
             errRecord.setValue("error_type__sys", Collections.singletonList("item_processing_error__sys"));
             context.getErrorProvider().setError(errRecord);

             // Rollback the entire transaction
             throw new RollbackException("TRACKED_AND_PROPAGATED_ERRORS", "Product batch operation error");
         }
      }
    }
  
  

Using MessageDeliveryEventHandler

A MessageDeliveryEventHandler allows you to define custom logic for when a message fails to be delivered after all system retries have been exhausted.

Example: Handle Undelivered Message

The following example resends an undelivered message and gets the error type. Depending on that type, it either creates a new User Exception Message record or sends a notification to the group specified in the NotificationParameters.
    @MessageDeliveryEventHandlerInfo()
    public class MessageDeliveryHandler implements MessageDeliveryEventHandler {
      public void onError(MessageDeliveryEventHandlerContext messageDeliveryEventHandlerContext) {

         // Locate the QueueService, which is used to put messages back on the queue
         QueueService queueService = ServiceLocator.locate(QueueService.class);

         // Resend Message

         // Get Message Context
         MessageContext messageContext = messageDeliveryEventHandlerContext.getMessageContext();

         // Get Queue Name and original Message
         String queueName = messageContext.getRemoteQueueName();
         Message originalMessage =  messageContext.getMessage();

         PutMessageResponse response = queueService.putMessage(originalMessage);

         // Instantiate RecordService
         RecordService recordService = ServiceLocator.locate(RecordService.class);

         MessageDeliveryError messageDeliveryError = messageDeliveryEventHandlerContext.getMessageDeliveryError();
         MessageDeliveryErrorType messageDeliveryErrorType = messageDeliveryError.getType();

         if(messageDeliveryErrorType == MessageDeliveryErrorType.CONFIG_ERROR) {

             // Create User Exception Record
             Record record = recordService.newRecord("exception_message__sys");

             // Below are 5 required fields
             record.setValue("name__v", "Spark Integration Configuration Error");
             record.setValue("error_message__sys", "Integration Config error");
             record.setValue("integration__sys", "V1P000000001001");
             record.setValue("integration_point__sys", "V1Q000000001001");
             record.setValue("error_type__sys", VaultCollections.asList("item_processing_error__sys"));

             recordService.batchSaveRecords(VaultCollections.asList(record)).rollbackOnErrors().execute();

         } else if (messageDeliveryErrorType == MessageDeliveryErrorType.CONNECTION_FAILED) {
             // Notify Sysadmins
             GroupService groupService = ServiceLocator.locate(GroupService.class);
             List<String> groupNames = VaultCollections.newList();
             groupNames.add("system_administrators__v");
             GetGroupsResponse groupResponse = groupService.getGroupsByNames(groupNames);
             Group group = groupResponse.getGroupByName("system_administrators__v");
             List<Group> groups = VaultCollections.newList();
             groups.add(group);

             //NotificationService
             NotificationService notificationService = ServiceLocator.locate(NotificationService.class);

             //NotificationParameters
             NotificationParameters notificationParameters = notificationService.newNotificationParameters();
             notificationParameters.setRecipientsByGroups(groups);

             //NotificationTemplate
             NotificationTemplate template = notificationService.newNotificationTemplate()
                     .setTemplateName("spark_integration_error_notification_template__c")
                     .setTokenValue("errorType", "Connection Error");

             //Send Notification with a NotificationTemplate
             notificationService.send(notificationParameters, template);
         }
      }
   }
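
The handler above branches on the error type and performs side effects in each branch. One way to keep that decision testable outside Vault is to separate it from the side effects. The sketch below is plain Java with local stand-ins: ErrorKind mirrors only the two MessageDeliveryErrorType constants used above plus a catch-all, and Action and route are hypothetical names.

```java
// Local stand-ins for illustration; none of these types are part of the Vault SDK.
final class DeliveryErrorRouting {

    /** Mirrors the two error types handled above; OTHER covers anything else. */
    enum ErrorKind { CONFIG_ERROR, CONNECTION_FAILED, OTHER }

    /** The follow-up each branch of the handler performs. */
    enum Action { CREATE_EXCEPTION_RECORD, NOTIFY_ADMINS, LOG_ONLY }

    private DeliveryErrorRouting() {}

    /** Chooses the follow-up action without touching any Vault service. */
    static Action route(ErrorKind kind) {
        switch (kind) {
            case CONFIG_ERROR:
                return Action.CREATE_EXCEPTION_RECORD;
            case CONNECTION_FAILED:
                return Action.NOTIFY_ADMINS;
            default:
                return Action.LOG_ONLY;
        }
    }
}
```

A handler could map the SDK error type to ErrorKind, call route, and then run the record or notification code from the example for the returned action.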