Package com.veeva.vault.sdk.api.job
Overview
The Jobs SDK allows developers to define and run long-running or resource-intensive operations asynchronously.
The primary entry point for all interactions is the JobService.
Job interface, which follows a
three-phase lifecycle:
init(): A lightweight phase to gather and prepare all items for processing. The work is divided into smaller chunks, or tasks.process(): This phase executes once for each task, often in parallel, to perform the main work.completeWithSuccess():completeWithError(): A final phase that runs after all tasks are finished. completeWithError will be called if any job tasks have failed
You can also use a job processor as logic for a new Job Definition. For example, you can create a custom job definition available to Vault Admins to execute your custom code at scheduled or recurring intervals.
Using the JobService
Best Practices
- Idempotency: Set
@JobInfo(idempotent = true)for jobs that can be safely run multiple times with the same input, such as operations involving record updates or deletions. This allows the system to automatically retry interrupted tasks, or retry tasks in-code viaTaskOutput#setRetry. - Single Instance States: Can be defined on the
JobMetadataof a Job, and can be populated with any of the following states: Scheduled, Queued, Running. If a single instance state is set, only one Job of thisJobMetadatacan be in one of those states at a time. This is useful for Jobs that do not pass in any parameters, and behave the same across all calls. Example: If 'Scheduled' is set as a single instance state, and 2 Jobs of the same metadata are 'Scheduled' at once, only the first will begin processing. - Limits: The framework will enforce limits on certain aspect of Jobs functionality, such as a maximum chunk size of 500
JobItem's, a maximumJobTasksize of 128 KB, a maximum number of JobTask instances at 5000, and a maximum size ofJobQueueParametersas 8KB. Please see https://developer.veevavault.com/sdk/#sdk-job-limits for a full list of up to date limits.Example: Defining and Running a Custom Job Processor
The following illustrates how to create a placeholder custom SDK job.First, create the job processor code by implementing the
Jobinterface.
After deploying the code, configure the job metadata for your job processor in the Vault UI, including the job_code to associate with your job processor, and chunk_size to specify how to split up the job items passed in via init. Learn more in Vault Help.@JobInfo(adminConfigurable = true) public class UpdateManagersJob implements Job { public JobInputSupplier init(JobInitContext jobInitContext) { List<String> recordIdsToProcess = jobInitContext.getJobParameter("recordIdsToProcess", JobParamValueType.LIST_STRING); List<JobItem> jobItems = VaultCollections.newList(); recordIdsToProcess.forEach(recordId -> { JobItem singleItem = jobInitContext.newJobItem(); singleItem.setValue("recordId", recordId); jobItems.add(singleItem); }); return jobInitContext.newJobInput(jobItems); } public void process(JobProcessContext jobProcessContext) { List<String> recordIdsInBatch = jobProcessContext.getCurrentTask().getItems().stream() .map(jobItem -> jobItem.getValue("recordId", JobValueType.STRING) .collect(Collectors.toList()); //perform processing } public void completeWithSuccess(JobCompletionContext jobCompletionContext) { JobLogger logger = jobCompletionContext.getJobLogger(); List<JobTask> tasks = jobCompletionContext.getSuccessTasks(); for (JobTask task : tasks) { TaskOutput taskOutput = task.getTaskOutput(); logger.log(task.getTaskId() + " completed successfully, items: " + task.getItems().size()); } } public void completeWithError(JobCompletionContext jobCompletionContext) { //at least one job task completed with failure or threw an exception, perform error handling and send notifications JobResult result = jobCompletionContext.getJobResult(); JobLogger logger = jobCompletionContext.getJobLogger(); logger.log("completeWithError: " + result.getNumberFailedTasks() + "tasks failed out of " + result.getNumberTasks()); List<JobTask> tasks = jobCompletionContext.getErrorTasks(); for (JobTask task : tasks) { TaskOutput taskOutput = task.getTaskOutput(); if (TaskState.ERRORS_ENCOUNTERED.equals(taskOutput.getState())) { logger.log(task.getTaskId() + " failed with error message " + taskOutput.getValue("firstError", JobValueType.STRING)); } else if (TaskState.FAILED_TO_RUN.equals(taskOutput.getState())) { logger.log(task.getTaskId() + " failed to run " + taskOutput.getValue("firstError", JobValueType.STRING)); } } } }Finally, deploy the entrypoint (RecordTrigger, WebApi, etc) that schedules the job based on its jobMetadata api name like below
JobService jobService = ServiceLocator.locate(JobService.class); List<String> recordIdsToProcess = queryForRecordIdsToProcess(); // Create job queue parameters JobQueueParameters queueParameters = jobService.newJobQueueParametersBuilder() .appendParameter("recordIdsToProcess", recordIdsToProcess) .build(); // Build the job queue request JobQueueRequest jobQueueRequest = jobService.newJobQueueRequestBuilder() .withJobName("job_metadata_api_name__c") .withJobQueueParameters(queueParameters) .build(); // Queue the job for execution jobService.jobQueueOperation(jobQueueRequest).execute();Example: Custom Job Processor, Update Managers
Some data types are supported innately byJobQueueParameters(String, List, Number, etc) based on the JobParamValueTypeenum. If a more complicated structure/data type is required (ie, passing in an object with several fields), a custom dataType can be defined by implementing theJobParamValueinterface. The below example illustrates how to define a JobParamValue to store an object mapping an "oldManagerId" string to a "newManagerId" string, in order to pass to a Job Processor.This Job is a custom SDK job to update the manager for all Persons after a manager becomes inactive, replacing all references to the inactive manager with the 2nd-level manager.
First, create a new class implementing
JobParamValue
When dispatching the Job, the trigger will need to pass in the parameter into thepublic class CustomJobParamValue implements JobParamValue { private Map<String, String> oldManagerToNewManagerMap public CustomJobParamValue() {} public CustomJobParamValue(Map<String, String> oldManagerToNewManagerMap) { this.oldManagerToNewManagerMap = oldManagerToNewManagerMap; } public Map<String, String> getOldManagerToNewManagerMap() { return oldManagerToNewManagerMap; } public void setOldManagerToNewManagerMap(Map<String, String> oldManagerToNewManagerMap) { this.oldManagerToNewManagerMap = oldManagerToNewManagerMap; } }JobQueueParametersThe below SDK trigger fires when a manager becomes inactive, and calls the job to update managers.Note: Cancelling a Job
If a job is marked as@JobInfo(adminCancellable = true), it can be cancelled while in a scheduled or queued state. If the job to be cancelled is currently running, only tasks that have not started processing will be cancelled - in-flight tasks will remain running. The above example shows how to programmatically cancel a job instance that was scheduled for the future.Finally, create the job processor code by implementing the@RecordTriggerInfo(object = "person__sys", events = {RecordEvent.AFTER_UPDATE}) public class RunUpdateManagersJob implements RecordTrigger { public void execute(RecordTriggerContext recordTriggerContext) { JobService jobService = ServiceLocator.locate(JobService.class); List<RecordChange> recordChanges = recordTriggerContext.getRecordChanges(); Map<String, String> oldToNewManagers = VaultCollections.newMap(); for (RecordChange recordChange : recordChanges) { Record newRecord = recordChange.getNew(); // When a Person becomes inactive, update the manager of all related Person objects List<String> statusL = newRecord.getValue("status__v", ValueType.PICKLIST_VALUES); if (statusL != null && "inactive__v".equals(statusL.get(0))) { String oldManagerId = newRecord.getValue("id", ValueType.STRING); String newManagerId = newRecord.getValue("manager__sys", ValueType.STRING); oldToNewManagers.put(oldManagerId, newManagerId); } } CustomJobParamValue oldManagerToNewManagerParamValue = new CustomJobParamValue(); oldManagerToNewManagerParamValue.setOldManagerToNewManagerMap(oldToNewManagers); JobQueueParameters jobQueueParameters = jobService.newJobQueueParametersBuilder() .appendParameter("managersMapContainer", oldManagerToNewManagerParamValue) .build(); JobQueueRequest jobQueueRequest = jobService.newJobQueueRequestBuilder() .withJobName("update_managers__c") .withJobRunTime(ZonedDateTime.now().plusMinutes(15)) //schedules the job to run 15 minutes from now .withJobQueueParameters(jobQueueParameters) .build(); jobService.jobQueueOperation(jobQueueRequest) .onSuccess(jobQueueResult -> { if (somethingBadHappened()) { JobCancelRequest request = jobService.newCancelJobRequestBuilder() .withId(jobQueueResult.getJobId()) .build(); jobService.cancel(request) .withSuccessHandler(response -> { // Log success }) .withErrorHandler(error -> { // Log failure message and type }) .execute(); } }) .execute(); } }Jobinterface. The job processor accesses the passed in parameter viaJobContext.getJobParameter(String, Class), and passes the values intoJobItem's to be split up for processing in job task's based on the chunk_size on the associated JobMetadata (chunk_size # of job items processed per job task).@JobInfo(adminConfigurable = true) public class UpdateManagersJob implements Job { public JobInputSupplier init(JobInitContext jobInitContext) { QueryService queryService = ServiceLocator.locate(QueryService.class); JobLogger logger = jobInitContext.getJobLogger(); List<JobItem> jobItems = VaultCollections.newList(); CustomJobParamValue managersMapContainer = jobInitContext.getJobParameter("managersMapContainer", CustomJobParamValue.class); Map<String, String> oldManagerToNewManagerMap = managersMapContainer.getOldManagerToNewManagerMap(); // Get IDs of all Persons who have the old manager String query = "SELECT id, (SELECT id FROM persons__sysr) from person__sys WHERE id CONTAINS (${Custom.oldManagerIds})"; List<String> oldManagerIds = VaultCollections.newList(); oldManagerIds.addAll(oldManagerToNewManagerMap.keySet()); QueryExecutionRequest queryExecutionRequest = queryService.newQueryExecutionRequestBuilder() .withQueryString(query) .withTokenRequest(tokenService.newTokenRequestBuilder() .withValue("Custom.oldManagerIds", oldManagerIds) .build()) .build(); QueryOperation<QueryExecutionResponse> operation = queryService.query(request) .onSuccess(queryExecutionResponse -> { logger.log("VQL query " + queryExecutionResponse.getQueryString() + " returned " + queryExecutionResponse.getResultCount() + " results"); queryExecutionResponse.streamResults().forEach(queryResult -> { QueryResponse subResults = queryResult.getSubqueryResponse("persons__sysr"); subResults.streamResults().forEach(subResult -> { JobItem employeeItem = jobInitContext.newJobItem(); String employeeId = subResult.getValue("id", ValueType.STRING); employeeItem.setValue("employeeId", employeeId); employeeItem.setValue("newManagerId", newManagerId); jobItems.add(employeeItem); logger.log("Added new job item, employeeId = " + employeeId + ", newManagerId = " + newManagerId); }); }); }); return jobInitContext.newJobInput(jobItems); } public void process(JobProcessContext jobProcessContext) { RecordService recordService = ServiceLocator.locate(RecordService.class); JobLogger logger = jobProcessContext.getJobLogger(); List<JobItem> items = jobProcessContext.getCurrentTask().getItems(); List<Record> newRecords = VaultCollections.newList(); for (JobItem employeeItem : items) { String employeeId = employeeItem.getValue("employeeId", JobValueType.STRING); String newManagerId = employeeItem.getValue("newManagerId", JobValueType.STRING); Record employeeRecord = recordService.newRecordWithId("person__sys", employeeId); employeeRecord.setValue("manager__sys", newManagerId); newRecords.add(employeeRecord); logger.log("Updated employee record, employeeId = " + employeeId + ", newManagerId = " + newManagerId); } JobTask task = jobProcessContext.getCurrentTask(); TaskOutput taskOutput = task.getTaskOutput(); BatchOperation<PositionalRecordId, BatchOperationError> batchResult = recordService.batchSaveRecords(newRecords); batchResult.onSuccesses(positionalRecordIds -> { taskOutput.setState(TaskState.SUCCESS); logger.log("Task successful"); }); batchResult.onErrors( batchOperationErrors -> { taskOutput.setState(TaskState.ERRORS_ENCOUNTERED); taskOutput.setValue("firstError", batchOperationErrors.get(0).getError().getMessage()); logger.log("Task unsuccessful"); }); batchResult.execute(); } public void completeWithSuccess(JobCompletionContext jobCompletionContext) { JobLogger logger = jobCompletionContext.getJobLogger(); List<JobTask> tasks = jobCompletionContext.getSuccessTasks(); for (JobTask task : tasks) { TaskOutput taskOutput = task.getTaskOutput(); logger.log(task.getTaskId() + " completed successfully, items: " + task.getItems().size()); } } public void completeWithError(JobCompletionContext jobCompletionContext) { JobResult result = jobCompletionContext.getJobResult(); JobLogger logger = jobCompletionContext.getJobLogger(); logger.log("completeWithError: " + result.getNumberFailedTasks() + "tasks failed out of " + result.getNumberTasks()); List<JobTask> tasks = jobCompletionContext.getErrorTasks(); for (JobTask task : tasks) { TaskOutput taskOutput = task.getTaskOutput(); if (TaskState.ERRORS_ENCOUNTERED.equals(taskOutput.getState())) { logger.log(task.getTaskId() + " failed with error message " + taskOutput.getValue("firstError", JobValueType.STRING)); } else if (TaskState.FAILED_TO_RUN.equals(taskOutput.getState())) { logger.log(task.getTaskId() + " failed to run " + taskOutput.getValue("firstError", JobValueType.STRING)); } } } }}Example: Handling Job Queueing Success or Failure
You can handle errors that occur during job queueing by queueing a job with
JobService.jobQueueOperation(com.veeva.vault.sdk.api.job.JobQueueRequest)and defining its success and error handlers. This is useful for reacting immediately when either a job was successfully queued or an error prevented it from being queued.-
JobQueueParameters: Defines the custom parameters passed to a job during the asynchronous queueing process. -
JobQueueRequest: Used to specify the details of the job to be queued, such as its title, run time, and owner. -
JobQueueOperation: Represents an asynchronous operation for queueing a job, allowing for success and error handling.
JobService jobService = ServiceLocator.locate(JobService.class); // Create job queue parameters JobQueueParameters queueParameters = jobService.newJobQueueParametersBuilder() .appendParameter("recordIdsToProcess", recordIdsToProcess) .build(); // Build request to queue job JobQueueRequest jobQueueRequest = jobService.newJobQueueRequestBuilder() .withJobName("job_name__c") .withJobOwner(RequestContextUserType.REQUEST_OWNER) .withJobRunTime(ZonedDateTime.now().plusMinutes(30)) // Schedule to run 30 minutes from now .withJobTitle("Test Job") .withJobQueueParameters(queueParameters) .build(); JobQueueOperation jobQueueOperation = jobService.jobQueueOperation(jobQueueRequest); jobQueueOperation .onSuccess(jobQueueResult -> { // Job successfully queued. System.out.println("Job successfully queued with ID: " + jobQueueResult.getJobId()); }) .onError(jobOperationError -> { // An error occurred during job queueing. Handle the error. System.err.println("Failed to queue job: " + jobOperationError.getErrorMessage()); }) .execute(); -
-
ClassDescriptionImplement this interface to create a job that can be executed on a schedule or invoked by other SDK code via the
JobService.A sequence of instructions that can be chained together, building a job cancellation operation which can be executed withJobCancelOperation.execute().Represents an unsuccessful attempt to cancel a job.Valid values for error types encountered during job cancellation.Represents a job cancellation request which can be submitted throughJobService.cancel(JobCancelRequest).Creates a new instance ofJobCancelRequest.Represents a successful attempt to cancel a job.Provides the final results of a job to theJob.completeWithSuccess(JobCompletionContext)orJob.completeWithError(JobCompletionContext)methods.Contains contextual information about the current job.Annotates a class that implementsJob, Indicating a class is a job processor.Context given toJob.init(JobInitContext).An object returned byJob.init(com.veeva.vault.sdk.api.job.JobInitContext)which allows the job to be chunked and processed correctly.Contains data about a single item in aJob.Allows logging to the job log file.Represents an error that occurred during a job operation.Valid error types forJobOperationError.Contains information about a job's owner.Valid values for the job owner type, either a single user or a group.Deprecated.An interface to create complex data objects passed as input parameters to a job.Type of data supported by the Vault Java SDK for job parameters set onJobQueueParameterswhen invoking a job throughJobService.Provides the context for a single execution of theJob.process(JobProcessContext)method.Provides methods for configuring and executing a job submission.Contains parameters to be passed to aJobwhen it is executed.The Builder forJobQueueParameters.Represents a request to queue a job for asynchronous execution.Builder interface for creating aJobQueueRequestinstance.Contains information about a job started throughJobQueueOperation.execute().Contains information about aJob's task results.Contains information about a job started throughJobQueueOperation.execute().Service interface to queue jobs with specified parameters and perform cancel operations.Provides methods to get task information.An interface to create complex data objects passed between job phases.JobValueType<T>Type of data supported by the Vault Java SDK for parameters passed between job phases.Methods to get and set output parameters and results for a job task.Contains valid values to define the final state of aTaskOutput..
JobQueueRequest