Workers

An External Task is implemented by a Worker.

For each External Task type there is a DSL for the corresponding Worker.

More Worker types than the ones described here may be added in the future.

General

Let's start with functions that are provided for all Workers.

validate

In every Worker, the input In is validated automatically (it is decoded from JSON to an In object). However, you can override the validate method to add more sophisticated validation logic.

Example:

  override def validate(in: In): Either[ValidatorError, In] =
    for
      _ <- Try(in.accountTypeFilterAsSeq).toEither.left.map: _ =>
          ValidatorError(
            "accountTypeFilter must be a single Int or a comma separated String of Ints"
          )
      //_ <- moreValidation(in)
    yield in    
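The example above relies on a helper, accountTypeFilterAsSeq, that throws when the filter cannot be parsed. The following is a minimal, self-contained sketch of that pattern so it can be tried outside a Worker. ValidatorError and In are simplified stand-ins here, not Camundala's actual types, and the shape of accountTypeFilter is an assumption.

```scala
import scala.util.Try

// Stand-in for Camundala's ValidatorError (assumption: it carries a message).
final case class ValidatorError(errorMsg: String)

// Hypothetical input class: the filter arrives as a comma separated String.
final case class In(accountTypeFilter: Option[String]):
  // Throws a NumberFormatException if any part is not an Int.
  def accountTypeFilterAsSeq: Seq[Int] =
    accountTypeFilter.toSeq
      .flatMap(_.split(",").toSeq)
      .map(_.trim.toInt)

// Turns the exception into a Left; a valid input is passed through unchanged.
def validate(in: In): Either[ValidatorError, In] =
  Try(in.accountTypeFilterAsSeq).toEither
    .left.map(_ =>
      ValidatorError(
        "accountTypeFilter must be a single Int or a comma separated String of Ints"
      )
    )
    .map(_ => in)
```

With these definitions, validate(In(Some("1, 2,3"))) returns a Right with the unchanged input, while validate(In(Some("1,x"))) returns a Left with the ValidatorError.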

Custom Worker

The Custom Worker is a general Worker that can be used for any business logic or integration.

You can create:

Use this Generator to create a new Custom Worker

import mycompany.myproject.bpmn.myprocess.v1.MyCustomTask.*

@SpringConfiguration
class MyCustomTaskWorker extends CompanyCustomWorkerDsl[In, Out]:

  lazy val customTask = example

  override def runWork(in: In): Either[CamundalaWorkerError.CustomError, Out] =
    // your business logic
    ???

Example:

def runWork(in: In): Either[CamundalaWorkerError.CustomError, Out] =
  doSomethingThatCanFail(in)
    .left.map: e => 
      CamundalaWorkerError.CustomError("Problem in Worker: " + e.getMessage)
    
private def doSomethingThatCanFail(in: In): Either[Throwable, Out] = ???

doSomethingThatCanFail does some mapping or business logic that can fail. If it fails, it returns a Left with the error, which you wrap in a CamundalaWorkerError.CustomError.
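To try the error wrapping without a running Worker, here is a small sketch that uses only the standard library. CustomError is a stand-in for CamundalaWorkerError.CustomError, and In, Out and the parsing logic are hypothetical.

```scala
import scala.util.Try

// Stand-in for CamundalaWorkerError.CustomError (assumption: it carries a message).
final case class CustomError(errorMsg: String)

// Hypothetical input and output classes.
final case class In(quantity: String)
final case class Out(quantity: Int)

// Business logic that can fail: toInt throws on a non-numeric String.
def doSomethingThatCanFail(in: In): Either[Throwable, Out] =
  Try(Out(in.quantity.trim.toInt)).toEither

// Only the left (error) side is mapped; a successful result passes through.
def runWork(in: In): Either[CustomError, Out] =
  doSomethingThatCanFail(in)
    .left.map(e => CustomError("Problem in Worker: " + e.getMessage))
```

Here runWork(In("42")) yields Right(Out(42)), and a non-numeric quantity yields a Left whose message starts with "Problem in Worker: ".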

Advanced Custom Workers

It is possible to orchestrate Workers in a Custom Worker.

This is advanced in the sense that you work with the ZIO library.

Here are the main differences from a simple Custom Worker:

import mycompany.myproject.bpmn.myprocess.v1.MyCustomTask.*

@SpringConfiguration
class MyCustomTaskWorker extends CompanyCustomWorkerDsl[In, Out]:

  lazy val customTask = example

  override def runWorkZIO(in: In): EngineRunContext ?=> IO[CustomError, Out] =
    // your business logic
    ???

Why ZIO?

ZIO is a library for asynchronous and concurrent programming in Scala. It is used to handle errors and side effects in a functional way. It is also used to handle the asynchronous nature of the Camunda 8 API. This means that composing Workers is easier and more readable.

Let's look at an example:

    @Autowired
    var processInstanceService: GetProcessInstanceWorker         = uninitialized
    @Autowired
    var createSetModulesOtherWorker: CreateSetModulesOtherWorker = uninitialized
    
    override def runWorkZIO(in: In): EngineRunContext ?=> IO[CustomError, Out] =
      for
        client <- getProcessInstances(in)
        result <- createSetModulesOther(in, client)
      yield result

Running in parallel can also be achieved easily:

    for
      partnerClientKeys     <- getPartnerClientKeys(in, client)
      partners              <- getPartners(partnerClientKeys) // wait for the above result
      eBankingContractsFork <- getEBankingContracts(partners).fork // run in parallel
      cardsFork             <- getCards(in).fork // run in parallel
      // Join all forks
      eBankingContracts     <- eBankingContractsFork.join
      allCards              <- cardsFork.join
      output                <- createOutput(eBankingContracts, allCards)
    yield output
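The start-both-then-wait shape can be explored without Camundala or ZIO. The sketch below swaps in the standard library's Future as an analogy only: a Future starts running as soon as it is created, whereas a ZIO effect is a description that needs .fork to run in the background. All function names and data are hypothetical.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.*

// Hypothetical lookups standing in for getEBankingContracts and getCards.
def getEBankingContracts(partners: Seq[String]): Future[Seq[String]] =
  Future(partners.map(p => s"contract-of-$p"))

def getCards(clientKey: String): Future[Seq[String]] =
  Future(Seq(s"card-of-$clientKey"))

def createOutput(
    clientKey: String,
    partners: Seq[String]
): Future[(Seq[String], Seq[String])] =
  // Create both Futures before the for-comprehension, so both are already running
  // (the role .fork plays in the ZIO version).
  val contractsF = getEBankingContracts(partners)
  val cardsF     = getCards(clientKey)
  for
    contracts <- contractsF // wait for both results (the role .join plays)
    cards     <- cardsF
  yield (contracts, cards)

// Blocks until the combined result is available; only meant for trying it out.
def runBlocking(clientKey: String, partners: Seq[String]): (Seq[String], Seq[String]) =
  Await.result(createOutput(clientKey, partners), 5.seconds)
```

If the two Futures were created inside the for-comprehension instead, the second lookup would only start after the first had finished.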

Error Handling:

    private[v3] def getAccounts(in: In): EngineRunContext ?=> IO[CustomError, Seq[Account]] =
      getAccountsWorker
        .runWorkFromWorkerUnsafe(GetAccounts.In.minimalExample.copy(
          clientKey = in.clientKey
        ))
        .mapError: err =>
          CustomError(
            s"Error while get Accounts: ${err.errorMsg}"
          )

Simply use mapError to map the error to a CustomError.

For more information on ZIO see the ZIO documentation.

Mocking

You can also mock a composed Worker like this:

  private[v3] def checkClient(in: In)(using engineContext: EngineRunContext): IO[CustomError, GetClientClientKey.Out] =
    given EngineRunContext = engineContext
      .copy(generalVariables =
        engineContext.generalVariables.copy(
          outputServiceMock = in.loadCustomerMock.map(_.asJson)
        )
      )
    getClientClientKeyWorker
      .runWorkFromWorkerUnsafe(GetClientClientKey.In(in.clientKey))

Handled Errors (BpmnError)

There are 2 scenarios where you can handle errors in a Worker:

  1. The worker logic should go on:
    .catchAll:
      case err: ServiceError if err.errorCode == 404 =>
        ZIO.succeed(Seq.empty[Card])
      case err =>
        ZIO.fail:
          CustomError(
            s"Error while loading cards.",
            causeError = Some(err)
          )

    Just handle the error and switch the error to a successful result.

  2. The worker logic should stop:
    ..
    .mapError: err =>
        CustomError(
          s"Error while checking client: ${err.errorMsg}",
          generalVariables =
            Some(GeneralVariables(handledErrors = engineContext.generalVariables.handledErrors)),
          causeError = Some(err)
        )

    Just extend the CustomError with the handledErrors and causeError to handle the error. The rest is done by Camundala.
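The decision between going on and stopping does not depend on ZIO itself. As a rough sketch, the same logic is written below with a plain Either and a pattern match instead of catchAll. ServiceError, CustomError and Card are simplified stand-ins with assumed fields, not Camundala's real classes.

```scala
// Stand-ins for the error types (assumption: simplified fields).
final case class ServiceError(errorCode: Int, errorMsg: String)
final case class CustomError(errorMsg: String, causeError: Option[ServiceError] = None)

// Hypothetical domain class.
final case class Card(number: String)

def handleCards(result: Either[ServiceError, Seq[Card]]): Either[CustomError, Seq[Card]] =
  result match
    // Success: pass the cards through.
    case Right(cards) => Right(cards)
    // Go on: a 404 means "no cards", so the error becomes an empty result.
    case Left(err) if err.errorCode == 404 => Right(Seq.empty[Card])
    // Stop: any other error fails the worker and keeps the original error as cause.
    case Left(err) =>
      Left(CustomError("Error while loading cards.", causeError = Some(err)))
```

With this, a ServiceError with errorCode 404 results in an empty Seq, while a 500 results in a CustomError that carries the ServiceError in causeError.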

Init Process Worker

The Init Process Worker is a special Worker that is used to start a process.

It automatically does:

You can:

import mycompany.myproject.bpmn.myprocess.v1.MyProcess.*

@SpringConfiguration
class MyProcessWorker extends CompanyInitWorkerDsl[In, Out, InitIn, InConfig]:

  lazy val inOutExample = example
    
  def customInit(in: In): InitIn =
    ??? // init logic here

Examples:

customInit

def customInit(in: In): InitIn =
  InitIn(
    currency = in.currency.getOrElse(Currency.EUR), // set optional value with default value
    requestCounter = 0, // init process variables used to control the process flow
    iban = in.person.iban, // simplify process variables
    //...
  )
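To see the mapping in isolation, the following sketch defines the classes the example needs. Currency, Person, In and InitIn are hypothetical and only chosen to match the field names used above.

```scala
// Hypothetical domain classes, only for illustration.
enum Currency:
  case EUR, USD, CHF

final case class Person(name: String, iban: String)
final case class In(person: Person, currency: Option[Currency] = None)
final case class InitIn(currency: Currency, requestCounter: Int, iban: String)

def customInit(in: In): InitIn =
  InitIn(
    currency = in.currency.getOrElse(Currency.EUR), // default for a missing optional value
    requestCounter = 0,                             // counter used to control the process flow
    iban = in.person.iban                           // flatten a nested value
  )
```

For an In without a currency, customInit fills in Currency.EUR. A provided currency is kept as it is.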

Service Worker

The Service Worker is a special Worker that is used to call a REST API service.

You can provide:

import mycompany.myproject.bpmn.myprocess.v1.MyServiceTask.*

@SpringConfiguration
class MyServiceTaskWorker extends CompanyServiceWorkerDsl[In, Out]:

  lazy val serviceTask = example

  def apiUri(in: In) = uri"$serviceBasePath/myService"
  override lazy val method: Method = Method.POST
 
  override def inputHeaders(in: In): Map[String, String] =
    ??? // map the input variables to the headers
  override def querySegments(in: In) =
    ??? // map the input variables to the query parameters  
  override def inputMapper(in: In): Option[ServiceIn] =
    ??? // map the input variables to the service request body
  override def outputMapper(
                           out: ServiceResponse[ServiceOut],
                           in: In
                         ): Either[ServiceMappingError, Out] =
    ??? // map the service response body and header to the output variables

Examples:

apiUri

def apiUri(in: In) = uri"$serviceBasePath/myService/account/${in.accountId}"

apiUri is the only required function. It returns the URI of the service, including any path parameters taken from the in object.

method

override lazy val method: Method = Method.POST

Override the HTTP method. Default is Method.GET.

inputHeaders

override def inputHeaders(in: In): Map[String, String] =
  Map("Correlation-ID" -> in.userId)

querySegments

We support three ways to provide query parameters:

queryKeys

override def querySegments(in: In) =
  queryKeys("limitSelection", "accountType")

A list of optional In fields that are mapped to query parameters. So in this example you need to have limitSelection and accountType in your In object.

case class In(limitSelection: Option[Int], accountType: Option[String])

queryKeyValues

override def querySegments(in: In) =
  queryKeyValues(
    "limitSelection" -> in.limitSelection,
    "accountType" -> adjust(in.accountType)
  )

If you need to adjust an In value, you can list the key-value pairs explicitly.

queryValues

override def querySegments(in: In) =
  queryValues(
    s"eq(username,string:${in.user})"
  )

If the service uses a query language, you can provide the query expressions this way.

a combination of the above

override def querySegments(in: In) =
  queryKeys("limitSelection") ++
    queryKeyValues("accountType" -> adjust(in.accountType))

And you can combine them as you like.
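The following toy model may help to picture how such segments combine. It is not Camundala's implementation: the QuerySegment type, the render function, and the rule that empty Options are dropped are all assumptions made for this sketch. queryKeys is left out because it reads the fields from the In object, and no URL encoding is applied.

```scala
// Toy model, NOT Camundala's implementation: a segment is either a
// key-value pair or a bare value.
enum QuerySegment:
  case KeyValue(key: String, value: String)
  case Value(value: String)

import QuerySegment.*

// Explicit key-value pairs; pairs with an empty Option are dropped (assumption).
def queryKeyValues(pairs: (String, Option[Any])*): Seq[QuerySegment] =
  pairs.collect { case (k, Some(v)) => KeyValue(k, v.toString) }

// Bare values, for example expressions of a query language.
def queryValues(values: String*): Seq[QuerySegment] =
  values.map(Value(_))

// Hypothetical rendering into a query string, without URL encoding.
def render(segments: Seq[QuerySegment]): String =
  segments.map {
    case KeyValue(k, v) => s"$k=$v"
    case Value(v)       => v
  }.mkString("&")
```

Because each function returns a Seq, combining them is plain concatenation with ++, which mirrors the combined example above.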

inputMapper

override def inputMapper(in: In): Option[ServiceIn] =
  Some(ServiceIn(in.accountType, in.accountId))

Maps the input variables to the request body.

outputMapper

override def outputMapper(
                           out: ServiceResponse[ServiceOut],
                           in: In
                         ): Either[ServiceMappingError, Out] =
  out.outputBody
    .collect:
      case b if b.nonEmpty =>
        Right(Out(
          creditCardDetail = b.head,
          creditCardDetails = b
        ))
    .getOrElse(Left(ServiceMappingError("There is at least one CreditCardDetail expected.")))

Maps the response body and headers to the output variables.

As you can see, only two methods can fail:

  • validate -> For the input, use this method to validate the input and return any possible error.
  • outputMapper -> For the output, we do not have an extra 'validate' method. So if the service response is not as expected, you can return an error here.
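The outputMapper example can be tried on its own with stand-in types. In this sketch ServiceResponse and ServiceMappingError are simplified stand-ins for Camundala's classes, the response body is assumed to be an optional list, and the in parameter is omitted. It uses Option.toRight as a shorter alternative to wrapping the result in Right and calling getOrElse.

```scala
// Stand-ins for Camundala's types (assumption: simplified shapes).
final case class ServiceResponse[+T](outputBody: T, headers: Map[String, String] = Map.empty)
final case class ServiceMappingError(errorMsg: String)

// Hypothetical service and output classes.
final case class CreditCardDetail(number: String)
final case class Out(
    creditCardDetail: CreditCardDetail,
    creditCardDetails: Seq[CreditCardDetail]
)

def outputMapper(
    out: ServiceResponse[Option[Seq[CreditCardDetail]]]
): Either[ServiceMappingError, Out] =
  out.outputBody
    // Keep the body only if it contains at least one element.
    .collect { case b if b.nonEmpty => Out(creditCardDetail = b.head, creditCardDetails = b) }
    // A missing or empty body becomes a mapping error.
    .toRight(ServiceMappingError("There is at least one CreditCardDetail expected."))
```

A response with one or more details maps to a Right. A missing body and an empty list both map to the ServiceMappingError.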