RabbitMQ as Source and Sink
Reading from and writing to RabbitMQ is a very common use case. When the source and sink applications also use Spring Cloud Stream's RabbitMQ binder implementation, however, the configuration can be confusing, because each application then carries two sets of RabbitMQ connection settings. The goal of this recipe is to unpack that complexity step by step.
Before we begin, let's describe the use-case requirements.
As a user, I'd like to:
- Consume a String payload from a queue running in an external RabbitMQ cluster.
- Transform every payload by converting the received String to uppercase.
- Publish the transformed payload to a different queue, again running in the external RabbitMQ cluster.
To make it more interesting, we will also use Spring Cloud Stream's RabbitMQ binder implementation in the source, processor, and sink applications.
Configuration
Two levels of RabbitMQ configuration are required for this use case:
- Configuration for the RabbitMQ source and sink applications to connect to the external RabbitMQ cluster.
- Configuration of the RabbitMQ binder properties in the source, processor, and sink applications. We will use a locally running RabbitMQ at 127.0.0.1 (aka localhost) for the binder.
Prerequisite
- Download the rabbit-source, transform-processor, and rabbit-sink applications.
wget https://repo.spring.io/release/org/springframework/cloud/stream/app/rabbit-source-rabbit/2.1.0.RELEASE/rabbit-source-rabbit-2.1.0.RELEASE.jar
wget https://repo.spring.io/release/org/springframework/cloud/stream/app/transform-processor-rabbit/2.1.0.RELEASE/transform-processor-rabbit-2.1.0.RELEASE.jar
wget https://repo.spring.io/release/org/springframework/cloud/stream/app/rabbit-sink-rabbit/2.1.0.RELEASE/rabbit-sink-rabbit-2.1.0.RELEASE.jar
- Start RabbitMQ locally at 127.0.0.1.
- Set up the external RabbitMQ cluster and prepare the cluster connection credentials.
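Before moving on, it can help to confirm that the prerequisites are actually in place. The following is a minimal preflight sketch, not part of the original recipe: the function names missing_jars and port_open are hypothetical, port 5672 is assumed to be the AMQP port of the local broker, and the port check relies on bash's /dev/tcp feature, so it needs bash rather than a plain POSIX shell.

```shell
#!/usr/bin/env bash
# Preflight sketch. LOCAL_HOST and LOCAL_PORT are assumptions:
# 127.0.0.1 comes from the recipe, 5672 is RabbitMQ's default AMQP port.
LOCAL_HOST="${LOCAL_HOST:-127.0.0.1}"
LOCAL_PORT="${LOCAL_PORT:-5672}"

# Print the name of every expected jar that is missing from the given directory.
missing_jars() {
  dir="${1:-.}"
  for jar in \
    rabbit-source-rabbit-2.1.0.RELEASE.jar \
    transform-processor-rabbit-2.1.0.RELEASE.jar \
    rabbit-sink-rabbit-2.1.0.RELEASE.jar
  do
    [ -f "$dir/$jar" ] || echo "$jar"
  done
}

# Return 0 if a TCP connection to host:port succeeds (bash /dev/tcp).
port_open() {
  (exec 3<>"/dev/tcp/$1/$2") 2>/dev/null
}

missing="$(missing_jars .)"
if [ -n "$missing" ]; then
  echo "Missing jars:"
  echo "$missing"
fi

if port_open "$LOCAL_HOST" "$LOCAL_PORT"; then
  echo "A service is listening on $LOCAL_HOST:$LOCAL_PORT"
else
  echo "Nothing is listening on $LOCAL_HOST:$LOCAL_PORT"
fi
```

An open port only shows that something is listening there; it does not prove that the listener is RabbitMQ or that the credentials are valid.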
Deployment
With all the prerequisites from the previous step complete, we can now start the three applications.
Source
java -jar rabbit-source-rabbit-2.1.0.RELEASE.jar --server.port=9001 --rabbit.queues=sabbyfooz --spring.rabbitmq.addresses=amqp://<USER>:<PASSWORD>@<HOST>:<PORT> --spring.rabbitmq.username=<USER> --spring.rabbitmq.password=<PASSWORD> --spring.cloud.stream.binders.rabbitBinder.type=rabbit --spring.cloud.stream.binders.rabbitBinder.environment.spring.rabbitmq.addresses=amqp://guest:guest@127.0.0.1:5672 --spring.cloud.stream.bindings.output.destination=rabzysrc
External RabbitMQ cluster credentials are supplied via --spring.rabbitmq.* properties.
The binder configurations are supplied via --spring.cloud.stream.binders.rabbitBinder.environment.spring.rabbitmq.* properties.
The prefix spring.cloud.stream.binders refers to the binder configuration properties, while rabbitBinder is the configuration name chosen for this binder configuration.
You'd have to replace <USER>, <PASSWORD>, <HOST>, and <PORT> with the external cluster credentials.
That's how two different sets of RabbitMQ credentials are passed to the same application: one for the actual data and the other for the binder configuration.
sabbyfooz is the queue from which we will be polling for new data. rabzysrc is the destination to which the polled data will be published.
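Because the source command mixes two kinds of connection settings on one long line, it can be easier to review when the arguments are grouped by purpose. The sketch below only prints the arguments and starts nothing. The function name source_args and the EXT_* and BINDER_ADDRESS variables are hypothetical, the default values are placeholders, and guest/guest is assumed to be the login of the local broker.

```shell
#!/bin/sh
# Placeholders for the external cluster; replace them with real values.
EXT_USER="${EXT_USER:-user}"
EXT_PASSWORD="${EXT_PASSWORD:-secret}"
EXT_HOST="${EXT_HOST:-external-rabbit.example.com}"
EXT_PORT="${EXT_PORT:-5672}"
# Local broker used by the binder (guest/guest is an assumption).
BINDER_ADDRESS="${BINDER_ADDRESS:-amqp://guest:guest@127.0.0.1:5672}"

# Print the source application's arguments, one per line, grouped by purpose.
source_args() {
  # Group 1: the application itself, reading sabbyfooz on the external cluster.
  printf '%s\n' \
    "--server.port=9001" \
    "--rabbit.queues=sabbyfooz" \
    "--spring.rabbitmq.addresses=amqp://${EXT_USER}:${EXT_PASSWORD}@${EXT_HOST}:${EXT_PORT}" \
    "--spring.rabbitmq.username=${EXT_USER}" \
    "--spring.rabbitmq.password=${EXT_PASSWORD}"
  # Group 2: the binder, publishing to rabzysrc on the local broker.
  printf '%s\n' \
    "--spring.cloud.stream.binders.rabbitBinder.type=rabbit" \
    "--spring.cloud.stream.binders.rabbitBinder.environment.spring.rabbitmq.addresses=${BINDER_ADDRESS}" \
    "--spring.cloud.stream.bindings.output.destination=rabzysrc"
}

source_args
```

Everything in the first group targets the external cluster, and everything in the second group targets the binder's broker, which makes a misplaced host or password easier to spot before launching the jar.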
Processor
java -jar transform-processor-rabbit-2.1.0.RELEASE.jar --server.port=9002 --spring.cloud.stream.binders.rabbitBinder.type=rabbit --spring.cloud.stream.binders.rabbitBinder.environment.spring.rabbitmq.addresses=amqp://guest:guest@127.0.0.1:5672 --spring.cloud.stream.bindings.input.destination=rabzysrc --spring.cloud.stream.bindings.output.destination=rabzysink --transformer.expression='''payload.toUpperCase()'''
rabzysrc is the destination from which we will be receiving new data from the source application. rabzysink is the destination to which the transformed data will be published.
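To get a feel for what the payload.toUpperCase() expression does to each message, the transformation can be imitated locally. This is only a stand-in written with tr, not the processor itself: the function name to_upper is hypothetical, and tr may treat non-ASCII characters differently from Java's toUpperCase().

```shell
#!/bin/sh
# Local stand-in for the processor's payload.toUpperCase() expression.
to_upper() {
  tr '[:lower:]' '[:upper:]'
}

printf '%s\n' 'hello, rabbit!' | to_upper
# prints: HELLO, RABBIT!
```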
Sink
java -jar rabbit-sink-rabbit-2.1.0.RELEASE.jar --server.port=9003 --rabbit.exchange=sabbyexchange --rabbit.routing-key=foo --spring.rabbitmq.addresses=amqp://<USER>:<PASSWORD>@<HOST>:<PORT> --spring.rabbitmq.username=<USER> --spring.rabbitmq.password=<PASSWORD> --spring.cloud.stream.binders.rabbitBinder.type=rabbit --spring.cloud.stream.binders.rabbitBinder.environment.spring.rabbitmq.addresses=amqp://guest:guest@127.0.0.1:5672 --spring.cloud.stream.bindings.input.destination=rabzysink
External RabbitMQ cluster credentials are supplied via --spring.rabbitmq.* properties.
The binder configurations are supplied via --spring.cloud.stream.binders.rabbitBinder.environment.spring.rabbitmq.* properties.
The prefix spring.cloud.stream.binders refers to the binder configuration properties, while rabbitBinder is the configuration name chosen for this binder configuration.
You'd have to replace <USER>, <PASSWORD>, <HOST>, and <PORT> with the external cluster credentials.
That's how two different sets of RabbitMQ credentials are passed to the same application: one for the actual data and the other for the binder configuration.
rabzysink is the destination from which the transformed data will be received. sabbyexchange with the foo routing key is where the data will finally reach.
Testing
Publish Test Data
- Go to the management console of the external RabbitMQ cluster.
- Navigate to the sabbyfooz queue from the queues list.
- Click Publish message to publish the test message (for example, hello, rabbit!).
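If you prefer the command line to the management console, the same test message can probably be published with rabbitmqadmin, the Python-based CLI that ships with the management plugin. The sketch below is a dry run by default and only prints the command. The function name publish_test_message and the variables are hypothetical, the connection values are placeholders, and 15672 is assumed to be the management port. Publishing to amq.default with the queue name as the routing key delivers the message straight to that queue.

```shell
#!/bin/sh
# Placeholders for the external cluster's management endpoint (assumptions).
EXT_HOST="${EXT_HOST:-external-rabbit.example.com}"
MGMT_PORT="${MGMT_PORT:-15672}"
EXT_USER="${EXT_USER:-user}"
EXT_PASSWORD="${EXT_PASSWORD:-secret}"
# DRY_RUN=1 prints the command; set DRY_RUN=0 to actually run it.
DRY_RUN="${DRY_RUN:-1}"

# Publish one message to the sabbyfooz queue through the default exchange.
publish_test_message() {
  set -- rabbitmqadmin -H "$EXT_HOST" -P "$MGMT_PORT" -u "$EXT_USER" -p "$EXT_PASSWORD" \
    publish exchange=amq.default routing_key=sabbyfooz "payload=$1"
  if [ "$DRY_RUN" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

publish_test_message 'hello, rabbit!'
```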
Verify Results
- Go to the management console of the external RabbitMQ cluster.
- In this sample, the sabbyexchange exchange with the foo routing key is bound to the sabbybaaz queue, so navigate to that queue from the queues list.
- Click Get Message(s) to receive the incoming messages.
- Confirm that the payload is transformed from lowercase to uppercase (i.e., HELLO, RABBIT!).
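The binding between sabbyexchange and sabbybaaz has to exist before the sink publishes, otherwise there is no queue to inspect. A minimal sketch of creating it and fetching the result with rabbitmqadmin follows. It is a dry run by default, the helper name rmq and the variables are hypothetical, the connection values are placeholders, and the direct exchange type is an assumption because the recipe does not state one.

```shell
#!/bin/sh
# Placeholders for the external cluster's management endpoint (assumptions).
EXT_HOST="${EXT_HOST:-external-rabbit.example.com}"
MGMT_PORT="${MGMT_PORT:-15672}"
EXT_USER="${EXT_USER:-user}"
EXT_PASSWORD="${EXT_PASSWORD:-secret}"
# DRY_RUN=1 prints each command; set DRY_RUN=0 to actually run them.
DRY_RUN="${DRY_RUN:-1}"

# Run (or just print) a rabbitmqadmin command against the external cluster.
rmq() {
  set -- rabbitmqadmin -H "$EXT_HOST" -P "$MGMT_PORT" -u "$EXT_USER" -p "$EXT_PASSWORD" "$@"
  if [ "$DRY_RUN" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# One-time setup; the exchange type "direct" is an assumption.
rmq declare exchange name=sabbyexchange type=direct
rmq declare queue name=sabbybaaz
rmq declare binding source=sabbyexchange destination=sabbybaaz routing_key=foo

# After publishing the test message, fetch what the sink delivered.
rmq get queue=sabbybaaz
```

If sabbyexchange already exists with a different type, the declare step is expected to fail, so adjust the type to match the existing exchange rather than redeclaring it.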
That's all! This concludes the demonstration.