Workers
An External Task is implemented by a Worker.
So for each External Task type there is a DSL for the corresponding Worker.
More Worker types than the ones described here may be added in the future.
General
Let's start with functions that are provided for all Workers.
validate
In every Worker, the input In is validated automatically (it is decoded from JSON to an In object).
However, you can override the validate method to add more sophisticated validation logic.
Example:
override def validate(in: In): Either[ValidatorError, In] =
  for
    _ <- Try(in.accountTypeFilterAsSeq).toEither.left.map: _ =>
      ValidatorError(
        "accountTypeFilter must be a single Int or a comma separated String of Ints"
      )
    //_ <- moreValidation(in)
  yield in
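The validation pattern above can be tried out in isolation. The following is a minimal sketch that uses only the Scala standard library. ValidatorError, In and accountTypeFilterAsSeq are hypothetical stand-ins here, not the types of the Worker DSL, and validate is a plain function instead of an override.

```scala
import scala.util.Try

// Hypothetical stand-ins, so the sketch compiles without the Worker DSL.
final case class ValidatorError(errorMsg: String)

final case class In(accountTypeFilter: Option[String] = None):
  // Throws a NumberFormatException if an element is not an Int.
  def accountTypeFilterAsSeq: Seq[Int] =
    accountTypeFilter.toSeq.flatMap(_.split(",").toSeq).map(_.trim.toInt)

def validate(in: In): Either[ValidatorError, In] =
  Try(in.accountTypeFilterAsSeq).toEither   // Either[Throwable, Seq[Int]]
    .left.map(_ =>                           // replace the Throwable with a readable error
      ValidatorError(
        "accountTypeFilter must be a single Int or a comma separated String of Ints"
      )
    )
    .map(_ => in)                            // on success, hand back the unchanged input
```

With these stand-ins, validate(In(Some("1,2,3"))) returns a Right with the input, and validate(In(Some("1,x"))) returns a Left with the ValidatorError.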
Custom Worker
The Custom Worker is a general Worker that can be used for any business logic or integration.
You can create:
- business logic that can't be handled by the expression language in the BPMN itself
- complex mappings
- a service integration that is not covered by the Service Worker
- whatever you want
Use this Generator to create a new Custom Worker
import mycompany.myproject.bpmn.myprocess.v1.MyCustomTask.*
@SpringConfiguration
class MyCustomTaskWorker extends CompanyCustomWorkerDsl[In, Out]:
  lazy val customTask = example
  override def runWork(in: In): Either[CamundalaWorkerError.CustomError, Out] =
    // your business logic
    ???
- lazy val customTask is just needed for the compiler to check the correctness of the types (Prototype pattern).
- runWork is the method that is called by the Worker to execute the business logic.
- The code can either:
  - complete the task successfully with a result -> Right[Out]
  - fail the task with an error -> Left[CamundalaWorkerError.CustomError]
Example:
def runWork(in: In): Either[CamundalaWorkerError.CustomError, Out] =
  doSomethingThatCanFail(in)
    .left.map: e =>
      CamundalaWorkerError.CustomError("Problem in Worker: " + e.getMessage)
// returns Out, so that the result matches the return type of runWork
private def doSomethingThatCanFail(in: In): Either[Throwable, Out] = ???
doSomethingThatCanFail does some mapping or business logic that can fail.
If it fails, it returns a Left with an error message, that you wrap with a CamundalaWorkerError.CustomError.
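To make the Right / Left contract of runWork concrete, here is a small self-contained sketch. CustomError, In, Out and the doubling logic are made up for illustration. In a real Worker the types come from the DSL and your BPMN definition.

```scala
// Hypothetical stand-ins for the DSL types.
final case class CustomError(errorMsg: String)
final case class In(amount: Int)
final case class Out(doubled: Int)

// Some business logic that can fail with a Throwable.
def doSomethingThatCanFail(in: In): Either[Throwable, Out] =
  if in.amount >= 0 then Right(Out(in.amount * 2))
  else Left(IllegalArgumentException("amount must not be negative"))

// Success passes through unchanged; a failure is wrapped in a CustomError.
def runWork(in: In): Either[CustomError, Out] =
  doSomethingThatCanFail(in)
    .left.map(e => CustomError("Problem in Worker: " + e.getMessage))
```

Only the left side is touched by .left.map, so a successful result needs no extra handling.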
Advanced Custom Workers
It is possible to orchestrate Workers in a Custom Worker.
This is advanced in the sense that you work with the ZIO library.
Here are the main differences to a simple Custom Worker:
import mycompany.myproject.bpmn.myprocess.v1.MyCustomTask.*
@SpringConfiguration
class MyCustomTaskWorker extends CompanyCustomWorkerDsl[In, Out]:
  lazy val customTask = example
  override def runWorkZIO(in: In): EngineRunContext ?=> IO[CustomError, Out] =
    // your business logic
    ???
- runWorkZIO instead of runWork is the method that is called by the Worker to execute the business logic.
- The code can either:
  - complete the task successfully with a result -> ZIO.succeed[Out]
  - fail the task with an error -> ZIO.fail[CamundalaWorkerError.CustomError]
- What about the strange return value EngineRunContext ?=> IO[CustomError, Seq[Account]]?
  - No worries, EngineRunContext ?=> is just a way to pass the EngineRunContext to the function implicitly.
  - As you have this already in the runWorkZIO method, you can use it in the getAccounts method.
  - This is needed to call the runWorkFromWorkerUnsafe method of an orchestrated Worker and gets you access to the GeneralVariables for example.
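The ?=> syntax is a Scala 3 context function type. The sketch below shows the mechanism without ZIO or the Worker DSL: Either replaces IO so that it runs with the standard library only, and EngineRunContext is a hypothetical one-field stand-in.

```scala
// Hypothetical stand-in for the library's context type.
final case class EngineRunContext(processName: String)

// Context function type: the EngineRunContext is supplied implicitly.
def getAccounts(clientKey: String): EngineRunContext ?=> Either[String, Seq[String]] =
  // summon retrieves the context that the caller has in scope.
  val ctx = summon[EngineRunContext]
  Right(Seq(s"${ctx.processName}:$clientKey"))

def runWork(clientKey: String): EngineRunContext ?=> Either[String, Int] =
  // No explicit argument: the context of runWork is passed on to getAccounts.
  getAccounts(clientKey).map(_.size)

// At the outer edge the context has to be provided once.
def run(clientKey: String): Either[String, Int] =
  given EngineRunContext = EngineRunContext("myProcess")
  runWork(clientKey)
```

In a Worker you do not write the equivalent of run yourself; as described above, the context is already available in runWorkZIO.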
Why ZIO?
ZIO is a library for asynchronous and concurrent programming in Scala. It is used to handle errors and side effects in a functional way. It is also used to handle the asynchronous nature of the Camunda 8 API. This means that composing Workers is easier and more readable.
Let's look at an Example:
@Autowired
var processInstanceService: GetProcessInstanceWorker = uninitialized
@Autowired
var createSetModulesOtherWorker: CreateSetModulesOtherWorker = uninitialized
override def runWorkZIO(in: In): EngineRunContext ?=> IO[CustomError, Out] =
  for
    client <- getProcessInstances(in)
    result <- createSetModulesOther(in, client)
  yield result
- GetProcessInstanceWorker: you can simply add Workers as dependencies.
- getProcessInstances(in): you can simply compose the Workers.
Running in parallel can also be achieved easily:
for
  partnerClientKeys <- getPartnerClientKeys(in, client)
  partners <- getPartners(partnerClientKeys) // wait for the above result
  eBankingContractsFork <- getEBankingContracts(partners).fork // run in parallel
  cardsFork <- getCards(in).fork // run in parallel
  // Join all forks
  eBankingContracts <- eBankingContractsFork.join
  allCards <- cardsFork.join
  output <- createOutput(eBankingContracts, allCards)
yield output
Error Handling:
private[v3] def getAccounts(in: In): EngineRunContext ?=> IO[CustomError, Seq[Account]] =
  getAccountsWorker
    .runWorkFromWorkerUnsafe(GetAccounts.In.minimalExample.copy(
      clientKey = in.clientKey
    ))
    .mapError: err =>
      CustomError(
        s"Error while get Accounts: ${err.errorMsg}"
      )
Simply use mapError to map the error to a CustomError.
For more information on ZIO see the ZIO documentation.
Mocking
You can also mock a composed Worker like:
private[v3] def checkClient(in: In)(using engineContext: EngineRunContext): IO[CustomError, GetClientClientKey.Out] =
  given EngineRunContext = engineContext
    .copy(generalVariables =
      engineContext.generalVariables.copy(
        outputServiceMock = in.loadCustomerMock.map(_.asJson)
      )
    )
  getClientClientKeyWorker
    .runWorkFromWorkerUnsafe(GetClientClientKey.In(in.clientKey))
- You see, (using engineContext: EngineRunContext) is an alternative way to pass the EngineRunContext to the function implicitly.
- For mocking you can use the outputServiceMock or outputMock in the GeneralVariables.
- The mock itself must be in the In object of the Worker as there is no InConfig for a Worker.
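The trick in the mocking example is that a local given replaces the context that was passed in. A stripped-down sketch of just this mechanism, with hypothetical stand-ins for EngineRunContext and GeneralVariables and a plain String instead of JSON, could look like this:

```scala
// Hypothetical, heavily reduced stand-ins for the library types.
final case class GeneralVariables(outputServiceMock: Option[String] = None)
final case class EngineRunContext(generalVariables: GeneralVariables)

// Stand-in for an orchestrated Worker: returns the mock if one is set.
def getClient(clientKey: String)(using ctx: EngineRunContext): String =
  ctx.generalVariables.outputServiceMock.getOrElse(s"client-from-service:$clientKey")

def checkClient(clientKey: String, mock: Option[String])(using
    engineContext: EngineRunContext
): String =
  // The local given wins over the using parameter, because it is nested more deeply.
  given EngineRunContext = engineContext.copy(
    generalVariables = engineContext.generalVariables.copy(outputServiceMock = mock)
  )
  getClient(clientKey) // receives the adjusted context
```

The caller's context is not modified; only the call inside checkClient sees the copy with the mock.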
Handled Errors (BpmnError)
There are 2 scenarios where you can handle Errors in a Worker:
1. The worker logic should go on:
.catchAll:
  case err: ServiceError if err.errorCode == 404 =>
    ZIO.succeed(Seq.empty[Card])
  case err =>
    ZIO.fail:
      CustomError(
        s"Error while loading cards.",
        causeError = Some(err)
      )
Just handle the error and switch the error to a successful result.
2. The worker logic should stop:
..
.mapError: err =>
  CustomError(
    s"Error while checking client: ${err.errorMsg}",
    generalVariables = Some(GeneralVariables(handledErrors = engineContext.generalVariables.handledErrors)),
    causeError = Some(err)
  )
Just extend the CustomError with the handledErrors and causeError to handle the error. The rest is done by Camundala.
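The decision made in the first scenario (turn a 404 into an empty result, keep every other error) can also be written with a plain Either. This is not the ZIO catchAll used above, only the same logic with the standard library so that it runs without dependencies. ServiceError, CustomError and Card are hypothetical stand-ins.

```scala
// Hypothetical stand-ins for the library types.
final case class ServiceError(errorCode: Int, errorMsg: String)
final case class CustomError(errorMsg: String, causeError: Option[ServiceError] = None)
final case class Card(id: String)

def recoverNotFound(
    result: Either[ServiceError, Seq[Card]]
): Either[CustomError, Seq[Card]] =
  result match
    case Right(cards) => Right(cards)
    // "Not found" is an expected case: go on with an empty result.
    case Left(err) if err.errorCode == 404 => Right(Seq.empty)
    // Everything else stays an error and keeps the original cause.
    case Left(err) => Left(CustomError("Error while loading cards.", causeError = Some(err)))
```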
Init Process Worker
The Init Process Worker is a special Worker that is used to start a process.
It automatically does:
- validate the input variables
- merge the inConfig variables with manual overrides
You can:
- init input process variables with default values.
- init process variables used to control the process flow, like counters, variables that may not be on the process.
- create simplified process variables to simplify the process flow.
import mycompany.myproject.bpmn.myprocess.v1.MyProcess.*
@SpringConfiguration
class MyProcessWorker extends CompanyInitWorkerDsl[In, Out, InitIn, InConfig]:
  lazy val inOutExample = example
  def customInit(in: In): InitIn =
    ??? // init logic here
- lazy val inOutExample is just needed for the compiler to check the correctness of the types.
- customInit is the method that is called by the Worker to execute the init logic.
- The method sets the process variables according to the InitIn object. Be aware that this cannot fail, as the input variables are already validated.
Examples:
customInit
def customInit(in: In): InitIn =
  InitIn(
    currency = in.currency.getOrElse(Currency.EUR), // set optional value with default value
    requestCounter = 0, // init process variables used to control the process flow
    iban = in.person.iban, // simplify process variables
    //...
  )
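For reference, the customInit example becomes runnable once the types are spelled out. The case classes and the Currency enum below are assumptions that mirror the field names used above. Your real In and InitIn come from the process definition.

```scala
// Assumed shapes of the types used in the example.
enum Currency:
  case EUR, CHF

final case class Person(iban: String)
final case class In(person: Person, currency: Option[Currency] = None)
final case class InitIn(currency: Currency, requestCounter: Int, iban: String)

def customInit(in: In): InitIn =
  InitIn(
    currency = in.currency.getOrElse(Currency.EUR), // default for a missing optional value
    requestCounter = 0,                             // variable that controls the process flow
    iban = in.person.iban                           // flattened for simpler expressions
  )
```

customInit is a total function here: every possible In yields an InitIn, which matches the note that the init logic cannot fail.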
Service Worker
The Service Worker is a special Worker that is used to call a REST API service.
You can provide:
- the method and the path of the service
- the query parameters
- the headers
- the mapping of the request body (input) and of the response body (output)
import mycompany.myproject.bpmn.myprocess.v1.MyServiceTask.*
@SpringConfiguration
class MyServiceTaskWorker extends CompanyServiceWorkerDsl[In, Out]:
  lazy val serviceTask = example
  def apiUri(in: In) = uri"$serviceBasePath/myService"
  override lazy val method: Method = Method.POST
  override def inputHeaders(in: In): Map[String, String] =
    ??? // map the input variables to the headers
  override def querySegments(in: In) =
    ??? // map the input variables to the query parameters
  override def inputMapper(in: In): Option[ServiceIn] =
    ??? // map the input variables to the service request body
  override def outputMapper(
      out: ServiceResponse[ServiceOut],
      in: In
  ): Either[ServiceMappingError, Out] =
    ??? // map the service response body and header to the output variables
- lazy val serviceTask is just needed for the compiler to check the correctness of the types.
- def apiUri(in: In) is the path of the service, with the path parameters from the in object. The only required function.
- override protected lazy val method: Method is the HTTP method. Default is Method.GET.
- override def querySegments(in: In) is optional and can be used to add query parameters to the request.
- override def inputHeaders(in: In) is optional and can be used to add headers to the request.
- override def inputMapper(in: In) is optional and can be used to map the input variables to the request body.
- override def outputMapper(out: ServiceResponse[ServiceOut], in: In) is optional and can be used to map the response body and headers to the output variables.
Examples:
apiUri
def apiUri(in: In) = uri"$serviceBasePath/myService/account/${in.accountId}"
The only required function. It returns the path of the service, with the path parameters from the in object.
method
override lazy val method: Method = Method.POST
Override the HTTP method. Default is Method.GET.
inputHeaders
override def inputHeaders(in: In): Map[String, String] =
  Map("Correlation-ID" -> in.userId)
querySegments
We support three ways to provide query parameters:
queryKeys
override def querySegments(in: In) =
  queryKeys("limitSelection", "accountType")
A list of optional In fields that are mapped to query parameters.
So in this example you need to have limitSelection and accountType in your In object.
case class In(limitSelection: Option[Int], accountType: Option[String])
queryKeyValues
override def querySegments(in: In) =
  queryKeyValues(
    "limitSelection" -> in.limitSelection,
    "accountType" -> adjust(in.accountType)
  )
If you need to adjust an In value, you can list the key-value pairs explicitly like this.
queryValues
override def querySegments(in: In) =
  queryValues(
    s"eq(username,string:${in.user})"
  )
If you have a query language, you can use this way to provide the query parameters.
a combination of the above
override def querySegments(in: In) =
  queryKeys("limitSelection") ++
    queryKeyValues("accountType" -> adjust(in.accountType))
And you can combine them as you like.
inputMapper
override def inputMapper(in: In): Option[ServiceIn] =
  Some(ServiceIn(in.accountType, in.accountId))
Mapping the input variables to the request body.
outputMapper
override def outputMapper(
    out: ServiceResponse[ServiceOut],
    in: In
): Either[ServiceMappingError, Out] =
  out.outputBody
    .collect:
      case b if b.nonEmpty =>
        Right(Out(
          creditCardDetail = b.head,
          creditCardDetails = b
        ))
    .getOrElse(Left(ServiceMappingError("There is at least one CreditCardDetail expected.")))
Mapping the response body and headers to the output variables.
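The mapping logic can be tested on its own with a few stand-in types. In this sketch ServiceResponse, ServiceMappingError, CreditCardDetail and Out are hypothetical, the in parameter is left out, and outputBody is assumed to be an Option of a sequence, which is what the collect / getOrElse chain above suggests.

```scala
// Hypothetical stand-ins; the optional body is an assumption.
final case class ServiceMappingError(errorMsg: String)
final case class CreditCardDetail(number: String)
final case class ServiceResponse[A](
    outputBody: Option[A],
    headers: Map[String, String] = Map.empty
)
final case class Out(
    creditCardDetail: CreditCardDetail,
    creditCardDetails: Seq[CreditCardDetail]
)

def outputMapper(
    out: ServiceResponse[Seq[CreditCardDetail]]
): Either[ServiceMappingError, Out] =
  out.outputBody
    // keep the body only if it contains at least one element
    .collect { case details if details.nonEmpty => Out(details.head, details) }
    // a missing or empty body becomes a mapping error
    .toRight(ServiceMappingError("There is at least one CreditCardDetail expected."))
```

toRight is used here as a shorter equivalent of wrapping the value in Right and calling getOrElse with a Left.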
As you can see there are only two methods that can fail:
- validate -> For the input, use this method to validate the input and return any possible error.
- outputMapper -> For the output, we do not have an extra 'validate' method. So if the service response is not as expected, you can return an error here.