-
Notifications
You must be signed in to change notification settings - Fork 33
OrderProcessing sample
This example demonstrates a simple flow for imaginary Order Processing system You can see the flow diagram below.
User submits the order via MessagingGateway which needs to be:
- validated (proceed or fail)
- split if validation is successful (basically separate 'bikes' order from 'books' order)
- route order to specific processors ('bikeProcessor' vs 'bookProcessor')
- aggregate processing results and send reply back to the user
We begin from defining a MessagingGateway. Just like in Java this MessagingGateway is identified by a strategy interface (or 'trait' in Scala terms) giving you simple POSO (Plain Old Scala Object) access to the Messaging system without direct dependency on Spring Integration API Note that there is no implementation for such trait since it will be represented as Proxy.
val orderGateway = gateway.withErrorChannel("errorFlowChannel").using(classOf[OrderProcessingGateway])
As you can see from above we use Spring Integration Scala fluent API to configure the gateway with errorChannel and user defined trait. We also define an aggregationChannel since its going to be reused in teh flow configuration and we want to make sure we have a shared reference instead of relying on channel name (e.g., String) Than we create an SpringIntegrationContext passing the flow configuration as a constructor argument while configuring the rest of the components in-line and composing the Message flow using '->' operator defined in Spring Integration Scala DSL.
val integrationContext = SpringIntegrationContext(
{
orderGateway ->
filter.withName("orderValidator").andErrorOnRejection(true).using{p:PurchaseOrder => !p.items.isEmpty} ->
split.using{p:PurchaseOrder => JavaConversions.asList(p.items)} ->
channel.withExecutor ->
route.withChannelMappings(Map("books" -> "booksChannel", "bikes" -> "bikesChannel")).using{pi:PurchaseOrderItem => pi.itemType}
},
{
channel("errorFlowChannel") ->
service.using{m:Message[] => println("Received ERROR: " + m); "ERROR processing order"}
},
{
channel("bikesChannel") ->
service.using{m:Message[] => println("Processing bikes order: " + m); m} ->
aggregationChannel
},
{
channel("booksChannel") ->
service.using{m:Message[_] => println("Processing books order: " + m); Thread.sleep(new Random().nextInt(2000)); m} ->
aggregationChannel
},
{
aggregationChannel ->
aggregate()
}
)