Workers

The implementation of an External Task is done by a Worker.

For each External Task type there is a DSL for the corresponding Worker.

More Worker types than the ones described here may be added in the future.

General

Let's start with functions that are provided for all Workers.

validate

In every Worker, the input In is validated automatically (it is decoded from JSON to an In object). However, you can override the validate method to add more sophisticated validation logic.

Example:

  override def validate(in: In): Either[ValidatorError, In] =
    for
      _ <- Try(in.accountTypeFilterAsSeq).toEither.left.map: _ =>
          ValidatorError(
            "accountTypeFilter must be a single Int or a comma separated String of Ints"
          )
      //_ <- moreValidation(in)
    yield in    
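
The example above relies on accountTypeFilterAsSeq throwing when the filter is malformed. The following is a minimal, self-contained sketch of how such a field could look. The shape of In and the parsing rule are assumptions for illustration, and ValidatorError is a stand-in for the library type so that the snippet compiles on its own.

```scala
import scala.util.Try

// Stand-in for the library's ValidatorError, only so this sketch compiles on its own.
final case class ValidatorError(errorMsg: String)

// Hypothetical input: the filter arrives as a comma-separated String of Ints.
final case class In(accountTypeFilter: Option[String]):
  // Throws NumberFormatException if any element is not an Int.
  def accountTypeFilterAsSeq: Seq[Int] =
    accountTypeFilter.toSeq
      .flatMap(_.split(",").toSeq)
      .map(_.trim.toInt)

// Same idea as the override above, written as a standalone function.
def validate(in: In): Either[ValidatorError, In] =
  Try(in.accountTypeFilterAsSeq).toEither
    .left.map(_ =>
      ValidatorError("accountTypeFilter must be a single Int or a comma separated String of Ints")
    )
    .map(_ => in)
```

With these assumptions, a missing filter is valid, while a value such as "1,x" yields a Left with the ValidatorError.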

Custom Worker

The Custom Worker is a general Worker that can be used for any business logic or integration.

You can create:

Use this Generator to create a new Custom Worker

import mycompany.myproject.bpmn.myprocess.v1.MyCustomTask.*

@Configuration
class MyCustomTaskWorker extends CompanyCustomWorkerDsl[In, Out]:

  lazy val customTask = example

  def runWork(in: In): Either[CamundalaWorkerError.CustomError, Out] =
    // your business logic
    ???

Example:

def runWork(in: In): Either[CamundalaWorkerError.CustomError, Out] =
  doSomethingThatCanFail(in)
    .left.map: e => 
      CamundalaWorkerError.CustomError("Problem in Worker: " + e.getMessage)
    
private def doSomethingThatCanFail(in: In): Either[Throwable, In] = ???    

doSomethingThatCanFail does some mapping or business logic that can fail. If it fails, it returns a Left containing the Throwable, whose message you wrap in a CamundalaWorkerError.CustomError.
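
To make the pattern concrete, here is a self-contained sketch in which the failing step is the parsing of an amount. The In and Out shapes and the parseAmount helper are hypothetical, and CamundalaWorkerError.CustomError is replaced by a stand-in so that the snippet runs without the library.

```scala
import scala.util.Try

// Stand-in for the library error type; the real one comes with the Worker DSL.
object CamundalaWorkerError:
  final case class CustomError(errorMsg: String)

final case class In(amount: String)       // hypothetical input
final case class Out(amountInCents: Long) // hypothetical output

// Business logic that can fail: parsing and exact conversion both throw on bad input,
// and Try captures the exception as a Left.
def parseAmount(in: In): Either[Throwable, Out] =
  Try(Out((BigDecimal(in.amount) * 100).toLongExact)).toEither

// Translate the technical failure into the error type the Worker expects.
def runWork(in: In): Either[CamundalaWorkerError.CustomError, Out] =
  parseAmount(in).left.map(e =>
    CamundalaWorkerError.CustomError("Problem in Worker: " + e.getMessage)
  )
```

Keeping the failing step in its own function leaves runWork responsible only for translating the error.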

Init Process Worker

The Init Process Worker is a special Worker that is used to start a process.

It automatically does:

You can:

import mycompany.myproject.bpmn.myprocess.v1.MyProcess.*

@Configuration
class MyProcessWorker extends CompanyInitWorkerDsl[In, Out, InitIn, InConfig]:

  lazy val inOutExample = example
    
  def customInit(in: In): InitIn =
    ??? // init logic here

Examples:

customInit

def customInit(in: In): InitIn =
  InitIn(
    currency = in.currency.getOrElse(Currency.EUR), // set optional value with default value
    requestCounter = 0, // init process variables used to control the process flow
    iban = in.person.iban, // simplify process variables
    //...
  )
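
For orientation, the types behind this example could look like the following sketch. Currency, Person, In and InitIn are assumed shapes that are not taken from the library; they only illustrate how customInit turns optional and nested input into flat process variables.

```scala
// Hypothetical domain types, for illustration only.
enum Currency:
  case EUR, CHF, USD

final case class Person(name: String, iban: String)

// Input as received when the process is started.
final case class In(person: Person, currency: Option[Currency] = None)

// Process variables after initialization: no Option, no nesting.
final case class InitIn(currency: Currency, requestCounter: Int, iban: String)

def customInit(in: In): InitIn =
  InitIn(
    currency = in.currency.getOrElse(Currency.EUR), // default for the optional value
    requestCounter = 0,                             // counter used to control the process flow
    iban = in.person.iban                           // flattened from the nested Person
  )
```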

Service Worker

The Service Worker is a special Worker that is used to call a REST API service.

You can provide:

import mycompany.myproject.bpmn.myprocess.v1.MyServiceTask.*

@Configuration
class MyServiceTaskWorker extends CompanyServiceWorkerDsl[In, Out]:

  lazy val serviceTask = example

  def apiUri(in: In) = uri"$serviceBasePath/myService"
  override lazy val method: Method = Method.POST
 
  override def inputHeaders(in: In): Map[String, String] =
    ??? // map the input variables to the headers
  override def querySegments(in: In) =
    ??? // map the input variables to the query parameters  
  override def inputMapper(in: In): Option[ServiceIn] =
    ??? // map the input variables to the service request body
  override def outputMapper(
                           out: ServiceResponse[ServiceOut],
                           in: In
                         ): Either[ServiceMappingError, Out] =
    ??? // map the service response body and header to the output variables

Examples:

apiUri

def apiUri(in: In) = uri"$serviceBasePath/myService/account/${in.accountId}"

apiUri is the only required function. It returns the URI of the service, including any path parameters taken from the in object.

method

override lazy val method: Method = Method.POST

Override the HTTP method. Default is Method.GET.

inputHeaders

override def inputHeaders(in: In): Map[String, String] =
  Map("Correlation-ID" -> in.userId)

querySegments

We support three ways to provide query parameters:

queryKeys

override def querySegments(in: In) =
  queryKeys("limitSelection", "accountType")

A list of optional In fields that are mapped to query parameters. In this example, your In object must have the fields limitSelection and accountType.

case class In(limitSelection: Option[Int], accountType: Option[String])

queryKeyValues

override def querySegments(in: In) =
  queryKeyValues(
    "limitSelection" -> in.limitSelection,
    "accountType" -> adjust(in.accountType)
  )

If you need to adjust an In value, you can list the key-value pairs explicitly.
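
The adjust function is not defined in this document. As a sketch, it could be a small helper that normalizes the optional value before it is sent; the rule used here (trim, upper-case, drop blank values) is purely an assumption.

```scala
// Hypothetical helper: normalize the account type for the service.
// A blank value is treated like a missing one.
def adjust(accountType: Option[String]): Option[String] =
  accountType
    .map(_.trim.toUpperCase)
    .filter(_.nonEmpty)
```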

queryValues

override def querySegments(in: In) =
  queryValues(
    s"eq(username,string:${in.user})"
  )

If the service uses a query language, you can pass the query expressions directly as values.

a combination of the above

override def querySegments(in: In) =
  queryKeys("limitSelection") ++
    queryKeyValues("accountType" -> adjust(in.accountType))

You can combine them as you like.

inputMapper

override def inputMapper(in: In): Option[ServiceIn] =
  Some(ServiceIn(in.accountType, in.accountId))

Maps the input variables to the request body.

outputMapper

override def outputMapper(
                           out: ServiceResponse[ServiceOut],
                           in: In
                         ): Either[ServiceMappingError, Out] =
  out.outputBody
    .collect:
      case b if b.nonEmpty =>
        Right(Out(
          creditCardDetail = b.head,
          creditCardDetails = b
        ))
    .getOrElse(Left(ServiceMappingError("There is at least one CreditCardDetail expected.")))

Maps the response body and headers to the output variables.
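
The following self-contained sketch shows the same mapping with pattern matching. ServiceResponse and ServiceMappingError are reduced stand-ins for the library types, and CreditCardDetail, ServiceOut, In and Out are assumed shapes chosen to match the example above.

```scala
// Stand-ins for the library types, reduced to what this sketch needs.
final case class ServiceMappingError(errorMsg: String)
final case class ServiceResponse[T](outputBody: T, headers: Map[String, String] = Map.empty)

// Hypothetical domain types.
final case class CreditCardDetail(number: String)
type ServiceOut = Option[Seq[CreditCardDetail]]
final case class In(accountId: String)
final case class Out(creditCardDetail: CreditCardDetail, creditCardDetails: Seq[CreditCardDetail])

def outputMapper(
    out: ServiceResponse[ServiceOut],
    in: In
): Either[ServiceMappingError, Out] =
  out.outputBody match
    // At least one detail: expose the first one and the full list.
    case Some(details) if details.nonEmpty =>
      Right(Out(creditCardDetail = details.head, creditCardDetails = details))
    // Missing or empty body: report a mapping error instead of throwing.
    case _ =>
      Left(ServiceMappingError("There is at least one CreditCardDetail expected."))
```

Both a missing body and an empty list end up in the same Left, so the caller only has to handle one error case.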

As you can see, only two methods can fail:

  • validate -> For the input: use this method to validate the input and return any error.
  • outputMapper -> For the output: there is no separate 'validate' method, so if the service response is not as expected, return an error here.