This repository was archived by the owner on Jul 9, 2022. It is now read-only.
olegz edited this page Jul 20, 2011 · 114 revisions

Welcome to Spring Integration Scala DSL wiki!

This page shows a few working samples that should give you an idea of what the EIP and Spring Integration (SI) DSL may look like.

For more on Spring Integration follow these links:

Spring Integration Home

Spring Integration Reference manual

At this point the project exists mainly to gather feedback and ideas, so the state of the code and the DSL may change rather quickly, but we'll try to keep this page as up to date as we can. You can always file a JIRA issue with your suggestions and comments here: Spring Integration Scala JIRA

The style of DSL you'll see below was inspired by the simple text-based way of documenting pipes-and-filters messaging architectures:

 pipe -> filter -> pipe -> filter -> ...

Always picture a Message travelling through the pipes and filters. At the moment we are experimenting with verb forms of the EIP pattern names; for example, we use transform instead of transformer.

 channel("A") >=> transform() >=> channel("B")

The above should be read as: transform the Message received from channel "A" and send it to channel "B". We're using the '>=>' Kleisli composition operator, which relies on Responder (a Scala continuation monad) to help with message flow composition.

For those familiar with the XML style of SI configuration, the above configuration would look like this:

 <int:channel id="A"/>

 <int:transformer input-channel="A" output-channel="B" .../>

 <int:channel id="B"/>
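To make the composition idea more concrete, here is a minimal plain-Scala sketch of a '>=>' operator. It is not the DSL's implementation: Option stands in for the Responder-based monad, and the names KleisliSketch, Step, trim, nonEmpty, upper and flow are made up for this illustration.

```scala
// Illustrative only: Option replaces the Responder-based monad used by the DSL.
object KleisliSketch {
  // A step takes a value and returns its result wrapped in Option (None = message dropped).
  type Step[A, B] = A => Option[B]

  // Adds '>=>' to any step: run f, then feed its result (if any) into g.
  implicit class StepOps[A, B](f: A => Option[B]) {
    def >=>[C](g: B => Option[C]): A => Option[C] = a => f(a).flatMap(g)
  }

  // Three hypothetical steps.
  val trim: Step[String, String] = s => Some(s.trim)
  val nonEmpty: Step[String, String] = s => if (s.isEmpty) None else Some(s)
  val upper: Step[String, String] = s => Some(s.toUpperCase)

  // The composed flow reads left to right, like pipe -> filter -> pipe.
  val flow: Step[String, String] = trim >=> nonEmpty >=> upper
}
```

With these definitions, a padded string passes through all three steps, while a blank string is dropped by nonEmpty and the later steps never run.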

Check out the project

git clone git://github.com/SpringSource/spring-integration-scala.git
  • Build at command-line via Maven.
  • To work in Eclipse:
    • It is recommended that you have the Scala IDE plugin (http://www.scala-ide.org/) installed into Eclipse. The project as checked-in has the Scala nature set, as recognized by the Scala plugin.
    • Your Eclipse install should also include m2Eclipse, the Maven plugin for Eclipse.
    • Import the project into Eclipse as Maven project (Import | Maven | Existing Maven Project), not as a normal Eclipse project. You
    • If the Scala IDE plugin offers to add the Scala libraries to the project, do not accept; Maven already brings those libraries in.

See demo.DslDemo for the samples described on this page.

You can also check out some sample Applications available below (more to come later)

Order Processing (https://github.com/SpringSource/spring-integration-scala/wiki/OrderProcessing-sample)

Sample Usage

Simple Message exchange between Point-to-Point channel (DirectChannel) and Service Activator

val inputChannel = channel()

val integrationContext = IntegrationContext(
    inputChannel >=>
    service.using { m: Message[String] => println(m.getPayload) }
)

inputChannel.send(new GenericMessage("==> Hello from Scala"))

In the above example the activated service is implemented as a Scala function that simply prints the payload of the Message. You can also use Spring's Expression Language (SpEL). The equivalent SpEL-based Service Activator configuration would look like this:

service.using("T(java.lang.System).out.println(payload)")

Compare with XML

Simple Message exchange between asynchronous Point-to-Point channel (ExecutorChannel) and Service Activator

val inputChannel = channel.withExecutor(Executors.newFixedThreadPool(10))

val integrationContext = IntegrationContext(
    inputChannel >=>
    service.withName("myService").using { m: Message[String] => { println(m.getPayload) } }
)

inputChannel.send(new GenericMessage("==> Hello from Scala"))

The only difference between this sample and the previous one is that the Message Dispatcher used by the inputChannel is injected with an Executor.
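The effect of injecting an Executor can be sketched in plain Scala without the DSL. The names DispatchSketch, SketchChannel, direct, withExecutor and handlingThread are hypothetical; the sketch only shows which thread ends up running the handler, not how Spring Integration dispatches internally.

```scala
import java.util.concurrent.{CountDownLatch, Executor, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

object DispatchSketch {
  // Hypothetical stand-in for a channel: all it decides is how a handler gets invoked.
  final class SketchChannel(dispatch: Runnable => Unit) {
    def send[A](message: A)(handler: A => Unit): Unit =
      dispatch(new Runnable { def run(): Unit = handler(message) })
  }

  // Direct dispatch: the handler runs on the sender's thread.
  def direct: SketchChannel = new SketchChannel(task => task.run())

  // Executor dispatch: the handler is handed over to the executor's threads.
  def withExecutor(executor: Executor): SketchChannel =
    new SketchChannel(task => executor.execute(task))

  // Sends one message and reports the name of the thread that handled it.
  def handlingThread(channel: SketchChannel): String = {
    val done = new CountDownLatch(1)
    val name = new AtomicReference[String]("")
    channel.send("hello") { _ =>
      name.set(Thread.currentThread().getName)
      done.countDown()
    }
    done.await(5, TimeUnit.SECONDS) // needed for the asynchronous case
    name.get()
  }
}
```

With direct dispatch the reported thread is the caller's own; with an executor it is one of the pool's threads, so send returns without waiting for the handler.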

Simple Message exchange with Queue Channel (a channel that buffers messages until they are retrieved via an explicit receive call)

val inputChannel = channel.withName("inputChannel")
val outputChannel = channel.withName("outputChannel").andQueue(5)

val integrationContext = IntegrationContext(
    inputChannel >=>
    service.withName("myService").using { m: Message[String] => m.getPayload.toUpperCase() } >=>
    outputChannel
)

inputChannel.send(new GenericMessage("==> Hello from Scala"))
val outputMessage = outputChannel.receive
println("Output Message: " + outputMessage)

In the above sample outputChannel is configured as a QueueChannel with a queue capacity of 5. The Service Activator produces an output Message and sends it to outputChannel. The Message is then retrieved from outputChannel by calling the receive method.
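The buffering behaviour can be pictured with a bounded queue from the JDK. This is a rough sketch, not the QueueChannel implementation: QueueChannelSketch and BoundedChannel are hypothetical names, and the choice to reject a message when the buffer is full (rather than block) is an assumption made to keep the example short.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object QueueChannelSketch {
  // Hypothetical bounded channel backed by a JDK blocking queue.
  final class BoundedChannel[A <: AnyRef](capacity: Int) {
    private val queue = new LinkedBlockingQueue[A](capacity)

    // Returns false instead of blocking when the buffer is full (sketch-only choice).
    def send(message: A): Boolean = queue.offer(message)

    // Waits up to timeoutMillis for a buffered message; None if nothing arrived.
    def receive(timeoutMillis: Long): Option[A] =
      Option(queue.poll(timeoutMillis, TimeUnit.MILLISECONDS))
  }
}
```

Messages stay in the buffer until receive is called, and they come out in the order they were sent.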

Simple Message exchange with Queue Channel and Polling Consumer

val inputChannel = channel.withExecutor().andName("inputChannel")
val middleChannel = channel.withQueue(5).andName("middleChannel")
val resultChannel = channel.withQueue.andName("resultChannel")

val integrationContext = IntegrationContext(
    inputChannel >=>
    service.withName("myService").using { m: Message[String] => m.getPayload.toUpperCase() } >=>
    middleChannel >=>
    transform.withName("myTransformer").andPoller(1000, 5).using{ "'### ' + payload.toLowerCase() + ' ###'" } >=>
    resultChannel
)

inputChannel.send(new GenericMessage("==> Hello from Scala"))
val outputMessage = resultChannel.receive
println("Output Message: " + outputMessage)

In the above sample the Message Transformer is a Polling Consumer, since its input channel is a QueueChannel. It is configured with a poller that polls every second and receives up to 5 messages per poll. If a poller configuration is not provided explicitly, a default poller is created.
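A single poll cycle with a per-poll message limit could look roughly like the following. PollerSketch and pollOnce are hypothetical names; the sketch covers only the "up to N messages per poll" part and leaves the once-per-second scheduling to whatever timer calls it.

```scala
import java.util.concurrent.LinkedBlockingQueue

object PollerSketch {
  // One poll cycle: handle at most maxPerPoll buffered messages, return how many were handled.
  def pollOnce[A <: AnyRef](queue: LinkedBlockingQueue[A], maxPerPoll: Int)(handler: A => Unit): Int = {
    var handled = 0
    var more = true
    while (more && handled < maxPerPoll) {
      val next = queue.poll() // non-blocking; null when the queue is empty
      if (next == null) more = false
      else {
        handler(next)
        handled += 1
      }
    }
    handled
  }
}
```

If seven messages are waiting and the limit is 5, the first cycle handles five, the second handles the remaining two, and later cycles find nothing to do.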

Compare with XML

Simple Message exchange with Publish-Subscribe-Channel

val inputChannel = pub_sub_channel.withName("inputChannel")
val middleChannel = channel.withName("middleChannel").andQueue(5)
val resultChannel = channel.withName("resultChannel").andQueue

val integrationContext = IntegrationContext(
    inputChannel >=> (
        // subscriber 1
        {
            transform.withName("xfmrA").using{ "'From Transformer: ' + payload.toUpperCase()" } >=>
            middleChannel >=>
            transform.withName("xfmrB").using{ m: Message[String] => m.getPayload().asInstanceOf[String].toUpperCase() } >=>
            resultChannel
        },
        // subscriber 2
        {
            service.using{m: Message[String] => println("From Service Activator: " + m) }
        }
    )
)

inputChannel.send(new GenericMessage("==> Hello from Scala"))
val outputMessage = resultChannel.receive
println("Output Message: " + outputMessage)

In the above sample inputChannel is a publish-subscribe channel that takes an array of subscriber configurations. The first subscriber configuration begins with transform.withName("xfmrA")... and the second with service.using .... A Message sent to inputChannel is distributed to both subscribers.
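The distribution to every subscriber can be sketched in a few lines of plain Scala. PubSubSketch and PubSubChannel are hypothetical names, and the sketch delivers synchronously in subscription order, which is an assumption of this example rather than a statement about the DSL.

```scala
object PubSubSketch {
  // Hypothetical publish-subscribe channel: every subscriber sees every message.
  final class PubSubChannel[A] {
    private var subscribers = Vector.empty[A => Unit]

    // Registers one more subscriber.
    def subscribe(handler: A => Unit): Unit = subscribers = subscribers :+ handler

    // Delivers the same message to each subscriber in turn.
    def send(message: A): Unit = subscribers.foreach(handler => handler(message))
  }
}
```

Unlike a point-to-point channel, where one consumer takes each message, both subscribers here receive their own delivery of the same message.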

Compare with XML

Implicit Message channels

In EIP, filters are connected via pipes; in Spring Integration, endpoints are connected with channels. However, when endpoints are connected with the default synchronous point-to-point channel (i.e., DirectChannel), defining that channel explicitly seems like overkill. For example:

    inputChannel >=> 
    service.using{m:Message[_] => m.getPayload + "_activator1"} >=>
    channel() >=>
    transform.using{m:Message[_] => m.getPayload + "_transformer1"} >=>
    channel() >=>
    service.using{m:Message[_] => m.getPayload + "_activator2"} >=>
    channel() >=>
    transform.using{m:Message[_] => m.getPayload + "_transformer2"} >=>
    channel() >=>
    service.using{m:Message[_] => println(m)}

In the above configuration, each channel that is explicitly defined to connect activators and transformers is a DirectChannel with default configuration. To preserve the semantics of pipes-and-filters while simplifying configuration, we currently support implicit channels: if the channel connecting two endpoints is a DirectChannel with default configuration, it no longer has to be defined explicitly. Here is an example that simplifies the above configuration:

val inputChannel = channel.withName("inputChannel")

val integrationContext = IntegrationContext(
    inputChannel >=> 
    service.using{m:Message[_] => m.getPayload + "_activator1"} >=>
    transform.using{m:Message[_] => m.getPayload + "_transformer1"} >=>
    service.using{m:Message[_] => m.getPayload + "_activator2"} >=>
    transform.using{m:Message[_] => m.getPayload + "_transformer2"} >=>
    service.using{m:Message[_] => println(m)}
)

inputChannel.send(new GenericMessage("==> Hello from Scala"))

We simply auto-create DirectChannels to connect endpoints defined in the above configuration.
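One way to convince yourself that leaving out default channels does not change the result is to model endpoints as plain functions and a default direct channel as a pass-through. ImplicitChannelSketch, Endpoint, passThrough, explicitFlow and implicitFlow are hypothetical names for this illustration; the real DSL still creates channel objects behind the scenes.

```scala
object ImplicitChannelSketch {
  // An endpoint is modelled as a function from payload to payload.
  type Endpoint = String => String

  // A default direct channel hands the payload over unchanged on the caller's thread.
  val passThrough: Endpoint = s => s

  // Explicit wiring: a pass-through channel follows every endpoint.
  def explicitFlow(endpoints: List[Endpoint]): Endpoint =
    endpoints.foldLeft(passThrough)((flow, e) => flow andThen e andThen passThrough)

  // Implicit wiring: endpoints are chained directly.
  def implicitFlow(endpoints: List[Endpoint]): Endpoint =
    endpoints.foldLeft(passThrough)((flow, e) => flow andThen e)
}
```

Both wirings produce the same output for the same endpoints, which is why the shorter form loses nothing when the channel is a plain default one.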

Message exchange via Message Router

val inputChannel = channel.withName("inputChannel")

val integrationContext = IntegrationContext(
    {
      channel("foo") >=>
      service.using{ m: Message[String] => println("FROM FOO channel: " + m.getPayload)}
    },
    {
      channel("bar") >=>
      service.using{ m: Message[String] => println("FROM BAR channel: " + m.getPayload)}
    },
    {
      inputChannel >=>
      route.using{m: Message[String] => m.getPayload }
    }
)

inputChannel.send(new GenericMessage("foo"))
inputChannel.send(new GenericMessage("bar"))

In the above configuration we pass the name of the channel to route to in the payload of the Message.
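The routing rule, where the payload names the target channel, can be sketched as a lookup by name. RouterSketch, Handler and Router are hypothetical names; returning false for an unknown channel name is a choice made for this sketch, since this page does not say what the DSL does in that case.

```scala
object RouterSketch {
  // A channel is reduced here to the handler subscribed to it.
  type Handler = String => Unit

  // Hypothetical router: the payload itself is the name of the target channel.
  final class Router(channels: Map[String, Handler]) {
    // Returns true when a channel with that name exists and received the message.
    def route(payload: String): Boolean = channels.get(payload) match {
      case Some(handler) =>
        handler(payload)
        true
      case None => false
    }
  }
}
```

A payload of "foo" reaches only the handler registered under "foo", and likewise for "bar".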
