This repository was archived by the owner on Jul 9, 2022. It is now read-only.
olegz edited this page Jan 31, 2012 · 114 revisions

Welcome to the Spring Integration Scala DSL wiki!

This page shows a few working samples that should give you an idea of what the EIP and Spring Integration (SI) DSL may look like.

For more on Spring Integration follow these links:

Spring Integration Home

Spring Integration Reference manual

At this point the main goal is to gather feedback and ideas, so the state of the code and the DSL may change rather quickly, but we'll try to keep this page as up to date as we can. You can always file a JIRA issue with your suggestions and comments here: Spring Integration Scala JIRA

The style of DSL you'll see below was inspired by the simple text-based way of documenting pipes-and-filters Messaging architectures:

 pipe -> filter -> pipe -> filter -> ...

One should always picture a Message navigating through the pipes and filters. At the moment we are experimenting with using the verb form of EIP pattern names; for example, instead of transformer we use transform.

 Channel("A") --> transform() --> Channel("B")

The above should read: transform the Message received from channel "A" and send it to channel "B". The '-->' operator depicts the direction of the flow and identifies the next EIP component in the Message flow composition.

For those familiar with the XML style of SI configuration, the above configuration would look like this:

 <int:channel id="A"/>

 <int:transformer input-channel="A" output-channel="B" .../>

 <int:channel id="B"/>
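One way to picture how a '-->' operator can compose a flow is the toy model below. It is a minimal sketch in plain Scala, not the DSL's actual implementation: `Stage`, `channel` and `transform` are hypothetical names invented for illustration, and payloads are assumed to be plain Strings rather than SI Messages.

```scala
// Toy model only: NOT the Spring Integration Scala DSL implementation.
object ArrowSketch {
  // A stage is a named function from payload to payload.
  final case class Stage(name: String, f: String => String) {
    // `-->` builds a new stage that runs this stage first, then the next one.
    def -->(next: Stage): Stage =
      Stage(name + " --> " + next.name, f.andThen(next.f))
  }

  // Hypothetical helpers named after the verbs used on this page.
  // A channel passes the payload through unchanged.
  def channel(id: String): Stage = Stage("Channel(" + id + ")", (s: String) => s)
  def transform(f: String => String): Stage = Stage("transform", f)

  // Reads left to right, like the pipes-and-filters notation above.
  val flow: Stage =
    channel("A") --> transform((s: String) => s.toUpperCase) --> channel("B")
}
```

Because '-->' does not end in a colon, Scala treats it as left-associative, so the composition is built in the same order in which the flow is read.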

Check out the project

git clone git://github.com/SpringSource/spring-integration-scala.git

The project uses SBT as its build system (see https://github.com/harrah/xsbt/wiki/Getting-Started-Setup)

  • To work in IntelliJ:

    • First generate the IDEA artifacts via the 'sbt gen-idea' command, then simply import the project as a regular project

Look at demo.DslDemo for the samples described on this page.

You can also check out the sample applications listed below (more to come later):

[Order Processing](https://github.com/SpringSource/spring-integration-scala/wiki/OrderProcessing-sample)

Sample Usage

Simple Message exchange between Point-to-Point channel (DirectChannel) and Service Activator

val integrationFlow = 
    Channel("inputChannel") -->
    handle.using { m: Message[String] => println(m.getPayload) }

In the above example the activated service is implemented as a Scala function which simply prints the payload of the Message. Later you'll also see that, unless your channel has a complex configuration, defining it explicitly is not required. You can also use Spring's Expression Language (SpEL). The equivalent SpEL-based Service Activator configuration would look like this:

    handle.using("T(java.lang.System).out.println(payload)")

Compare with XML

To send a Message, simply use one of the convenience methods provided on the Message flow configuration:

    // You create the SI Message yourself
    integrationFlow.send(new GenericMessage("Hello from Scala"))

    // The input parameter becomes the payload of the Message
    integrationFlow.send("Hello from Scala")

    // The Message is constructed from the metadata provided (e.g., payload, headers etc.)
    integrationFlow.send("Hello from Scala", headers=Map("foo"->"foo", "bar"->"bar"))

Simple Message exchange between asynchronous Point-to-Point channel (ExecutorChannel) and Service Activator

    val integrationFlow = 
        Channel.withDispatcher(taskExecutor = Executors.newCachedThreadPool) -->
        handle.using { m: Message[String] => println(m.getPayload) }

The only difference between this sample and the previous one is that the Message Dispatcher used by the input channel is injected with an Executor. In this case the channel must be defined explicitly, since it manages how Messages are dispatched downstream.
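The effect of injecting an Executor can be pictured with plain `java.util.concurrent` classes. The sketch below is an illustration only and does not use the DSL: `ExecutorDispatchSketch` and `dispatch` are hypothetical names, and the method blocks on the result purely so the worker thread's name can be inspected.

```scala
import java.util.concurrent.{Callable, Executors}

// Illustration only: shows a handler running on a pool thread
// instead of the sender's thread. Not part of the DSL.
object ExecutorDispatchSketch {
  // Hands the payload to the handler on a pool thread and
  // returns the name of the thread that ran the handler.
  def dispatch(payload: String)(handler: String => Unit): String = {
    val pool = Executors.newCachedThreadPool()
    try {
      val task = new Callable[String] {
        def call(): String = {
          handler(payload)
          Thread.currentThread().getName
        }
      }
      // Waiting on the future here is only for demonstration purposes.
      pool.submit(task).get()
    } finally {
      pool.shutdown()
    }
  }
}
```

With a synchronous point-to-point channel the handler would instead run on the thread that called send.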

Simple Message exchange with a Queue Channel (a channel that buffers Messages until they are retrieved via an explicit receive call or by a poller):

    val integrationFlow = 
        transform.using { m: Message[String] => m.getPayload.toUpperCase() } -->
        Channel.withQueue(5) --> poll.usingFixedRate(1000).withMaxMessagesPerPoll(5) -->
        handle.using { m: Message[String] => println(m.getPayload) }

In the above example a QueueChannel with a queue capacity of 5 sits between the Transformer and the ServiceActivator. Since a QueueChannel is a PollableChannel, a PollingConsumer is needed to poll Messages from it, hence the Poller configuration. The Poller polls every second (a fixed rate of 1000 ms), retrieves up to 5 Messages per poll and sends each Message to the ServiceActivator downstream.
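The buffering and the per-poll limit can be sketched with a bounded queue from the JDK. This is a simplified model, not the DSL or the SI QueueChannel: `QueueChannelSketch`, `send` and `pollOnce` are hypothetical names, only a single poll cycle is modelled (no fixed-rate scheduling), and rejecting a Message when the buffer is full is a choice made for this sketch.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Simplified model of a bounded buffer plus one poll cycle.
final class QueueChannelSketch(capacity: Int) {
  private val queue = new ArrayBlockingQueue[String](capacity)

  // Returns false when the buffer is full (sketch-specific behaviour).
  def send(payload: String): Boolean = queue.offer(payload)

  // One poll cycle: hands at most maxMessagesPerPoll buffered
  // payloads to the handler and returns how many were handled.
  def pollOnce(maxMessagesPerPoll: Int)(handler: String => Unit): Int = {
    var handled = 0
    var more = true
    while (more && handled < maxMessagesPerPoll) {
      val next = queue.poll() // null when the queue is empty
      if (next == null) more = false
      else {
        handler(next)
        handled += 1
      }
    }
    handled
  }
}
```

A scheduler calling `pollOnce(5)` once per second would approximate the poller configuration shown above.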

### Implicit Message channels

The EIP pipes-and-filters pattern states that filters are connected via pipes; thus in Spring Integration endpoints (filters) are connected with channels (pipes). However, when endpoints are connected with the default synchronous point-to-point channel (i.e., DirectChannel), defining such a channel explicitly seems like overkill considering its default nature. For example:

        Channel("A") --> 
        handle.using{m:Message[_] => m.getPayload + "_activator1"} -->
        Channel("B") -->
        transform.using{m:Message[_] => m.getPayload + "_transformer1"} -->
        Channel("C") -->
        handle.using{m:Message[_] => m.getPayload + "_activator2"} -->
        Channel("D") -->
        transform.using{m:Message[_] => m.getPayload + "_transformer2"} -->
        Channel("E") -->
        handle.using{m:Message[_] => println(m)}

In the above configuration the channels that connect the Service Activators and Transformers are DirectChannels with default configuration. To preserve the semantics of pipes-and-filters while simplifying configuration, we currently support implicit channel configuration: if the channel connecting two endpoints is a DirectChannel with default configuration, it no longer has to be explicitly defined. Here is an example that simplifies the above configuration:

    val integrationFlow = 
        handle.using{m:Message[_] => m.getPayload + "_activator1"} -->
        transform.using{m:Message[_] => m.getPayload + "_transformer1"} -->
        handle.using{m:Message[_] => m.getPayload + "_activator2"} -->
        transform.using{m:Message[_] => m.getPayload + "_transformer2"} -->
        handle.using{m:Message[_] => println(m)}

We simply auto-create DirectChannels to connect endpoints defined in the above configuration.
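To see what the last endpoint of such a chain would receive, the stages can be modelled as plain functions applied in order. This is a sketch under the assumption that each endpoint's return value becomes the payload handed to the next endpoint; `ImplicitChannelSketch` is a hypothetical name and no DSL classes are used.

```scala
// Plain-function model of the endpoint chain; not DSL code.
object ImplicitChannelSketch {
  // One function per endpoint, in flow order.
  val stages: List[String => String] = List(
    (p: String) => p + "_activator1",
    (p: String) => p + "_transformer1",
    (p: String) => p + "_activator2",
    (p: String) => p + "_transformer2"
  )

  // Function.chain applies the functions from left to right.
  val flow: String => String = Function.chain(stages)
}
```

In this model a payload of `"order"` would reach the final handler as `"order_activator1_transformer1_activator2_transformer2"`.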

### Message exchange via Message Router

    val integrationFlow = 
        route.using{pi:PurchaseOrderItem => pi.itemType}(
            when("bikes") {
                transform.using{m:Message[_] => println("Processing bikes order: " + m); m} -->
                handle.using("payload")
            },
            when("books") {
                transform.using{m:Message[_] => println("Processing books order: " + m); m}
            }
        ) -->
        handle.using{m:Message[_] => println("Received order after route: " + m.getPayload)}

In the above configuration the router routes based on the result of the function passed in the first parameter list of the curried "using" method. If the result is a String with the value 'books', the Message goes to the second route; otherwise it goes to the first one. Each 'when' method defines the message flow composition specific to a particular route.
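The key-based selection can be sketched as a lookup from route key to sub-flow. The code below is a plain Scala illustration, not the DSL: `RouterSketch`, its `PurchaseOrderItem` case class and the `name` field are hypothetical, and returning `None` for an unmatched key is a choice made for this sketch rather than a statement about how the DSL treats unmatched values.

```scala
// Plain Scala illustration of key-based routing; not DSL code.
object RouterSketch {
  // Hypothetical stand-in for the PurchaseOrderItem used above.
  final case class PurchaseOrderItem(itemType: String, name: String)

  // Route key -> sub-flow, mirroring the role of the `when` blocks.
  val routes: Map[String, PurchaseOrderItem => String] = Map(
    "bikes" -> ((item: PurchaseOrderItem) => "Processing bikes order: " + item.name),
    "books" -> ((item: PurchaseOrderItem) => "Processing books order: " + item.name)
  )

  // Computes the key from the item, then runs the matching sub-flow if any.
  def route(item: PurchaseOrderItem): Option[String] =
    routes.get(item.itemType).map(flow => flow(item))
}
```

Keeping the key function separate from the route table matches the two parameter lists of the curried "using" call: the first computes the key, the second lists the routes.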
