Stream Processing with Apache Kafka
In this guide, we develop three Spring Boot applications that use Spring Cloud Stream's support for Apache Kafka and deploy them to Cloud Foundry, Kubernetes, and your local machine. In another guide, we deploy these applications by using Spring Cloud Data Flow. By deploying the applications manually, you get a better understanding of the steps that Data Flow can automate for you.
The following sections describe how to build these applications from scratch. If you prefer, you can download a zip file that contains the sources for these applications, unzip it, and proceed to the deployment section.
You can download the zip file that contains all three applications from your browser, or you can download it from the command line by using the following command:
wget https://github.com/spring-cloud/spring-cloud-dataflow-samples/blob/master/dataflow-website/stream-developer-guides/streams/standalone-stream-sample/dist/usage-cost-stream-sample.zip?raw=true -O usage-cost-stream-sample.zip
Development
We create three Spring Cloud Stream applications that communicate using Kafka.
The scenario is a cell phone company creating bills for its customers.
Each call made by a user has a `duration` and an amount of `data` used during the call. As part of the process to generate a bill, the raw call data needs to be converted to a cost for the duration of the call and a cost for the amount of data used.
The call is modeled by using the `UsageDetail` class, which contains the `duration` of the call and the amount of `data` used during the call. The bill is modeled by using the `UsageCostDetail` class, which contains the cost of the call (`callCost`) and the cost of the data (`dataCost`). Each class contains an ID (`userId`) to identify the person making the call.
The three streaming applications are as follows:
- The `Source` application (named `UsageDetailSender`) generates the user's call `duration` and amount of `data` used per `userId` and sends a message containing the `UsageDetail` object as JSON.
- The `Processor` application (named `UsageCostProcessor`) consumes the `UsageDetail` and computes the cost of the call and the cost of the data per `userId`. It sends the `UsageCostDetail` object as JSON.
- The `Sink` application (named `UsageCostLogger`) consumes the `UsageCostDetail` object and logs the cost of the call and the cost of the data.
UsageDetailSender source
Either download the Spring Initializr generated project directly or visit the Spring Initializr site and follow these instructions (a command-line alternative is sketched after the list):
- Create a new Maven project with a Group name of `io.spring.dataflow.sample` and an Artifact name of `usage-detail-sender-kafka`.
- In the Dependencies text box, type `Kafka` to select the Kafka binder dependency.
- In the Dependencies text box, type `Cloud Stream` to select the Spring Cloud Stream dependency.
- In the Dependencies text box, type `Actuator` to select the Spring Boot actuator dependency.
- If your target platform is Cloud Foundry, type `Cloud Connectors` to select the Spring Cloud Connector dependency.
- Click the Generate Project button.
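Alternatively, you can generate the same project from the command line by using the Spring Initializr REST API. The following is a sketch; the dependency identifiers are assumptions, so check start.spring.io for the current values:
curl https://start.spring.io/starter.zip \
  -d type=maven-project \
  -d groupId=io.spring.dataflow.sample \
  -d artifactId=usage-detail-sender-kafka \
  -d dependencies=cloud-stream,kafka,actuator \
  -o usage-detail-sender-kafka.zip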
Now you should unzip the `usage-detail-sender-kafka.zip` file and import the project into your favorite IDE.
Business Logic
Now we can create the code required for this application. To do so:
- Create a `UsageDetail` class in the `io.spring.dataflow.sample.usagedetailsender` package with content that resembles UsageDetail.java. This `UsageDetail` model contains `userId`, `data`, and `duration` properties (a minimal sketch appears after the sender listing below).
- Create the `UsageDetailSender` class in the `io.spring.dataflow.sample.usagedetailsender` package with content that resembles the following:
package io.spring.dataflow.sample.usagedetailsender;
import java.util.Random;
import java.util.function.Supplier;
import io.spring.dataflow.sample.UsageDetail;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class UsageDetailSender {
private String[] users = {"user1", "user2", "user3", "user4", "user5"};
@Bean
public Supplier<UsageDetail> sendEvents() {
return () -> {
UsageDetail usageDetail = new UsageDetail();
usageDetail.setUserId(this.users[new Random().nextInt(5)]);
usageDetail.setDuration(new Random().nextInt(300));
usageDetail.setData(new Random().nextInt(700));
return usageDetail;
};
}
}
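For reference, the following is a minimal sketch of the `UsageDetail` model, assuming plain fields with getters and setters. The package shown matches the `io.spring.dataflow.sample` import in the listing above; the sample's actual UsageDetail.java may differ in detail.
package io.spring.dataflow.sample;
// Minimal sketch of the UsageDetail model (assumed fields and accessors).
public class UsageDetail {
    private String userId;
    private long duration;
    private long data;
    public String getUserId() { return this.userId; }
    public void setUserId(String userId) { this.userId = userId; }
    public long getDuration() { return this.duration; }
    public void setDuration(long duration) { this.duration = duration; }
    public long getData() { return this.data; }
    public void setData(long data) { this.data = data; }
}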
This is a simple `Configuration` class with a single bean that returns a `java.util.function.Supplier`. Behind the scenes, Spring Cloud Stream turns this `Supplier` into a producer. By default, the supplier is invoked every second. On each invocation, the `sendEvents` supplier method constructs a `UsageDetail` object.
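If you want the supplier to be invoked at a different rate, you can override the default poller interval in `src/main/resources/application.properties`. The following is a sketch, assuming the standard Spring Cloud Stream poller property:
# Invoke the supplier every five seconds instead of the default one second
spring.cloud.stream.poller.fixed-delay=5000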
Configuring the UsageDetailSender Application
When configuring the producer application, we need to set the producer binding destination (the Kafka topic) to which the producer publishes data.
The default output binding for the preceding method is `sendEvents-out-0` (the method name followed by the literal `-out-0`, where `0` is the index).
If the application does not set a destination, Spring Cloud Stream uses this binding name as the output destination (the Kafka topic).
However, in our case, we want neither the default binding name nor the default destination name.
We want to rename the binding to `output` and provide a custom destination.
In `src/main/resources/application.properties`, you can add the following properties to override these defaults:
spring.cloud.stream.function.bindings.sendEvents-out-0=output
spring.cloud.stream.bindings.output.destination=usage-detail
The first property renames the binding to `output`, and the second sets that binding's destination to the `usage-detail` Kafka topic.
Building
Now we can build the Usage Detail Sender application.
In the `usage-detail-sender` directory, use the following command to build the project with Maven:
./mvnw clean package -Pkafka
Testing
Spring Cloud Stream provides a test binder for testing an application. The following Maven coordinates identify this artifact:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<classifier>test-binder</classifier>
<scope>test</scope>
</dependency>
Instead of the Kafka binder, the tests use the test binder to trace and test your application's outbound and inbound messages.
The test binder provides abstractions for output and input destinations as `OutputDestination` and `InputDestination`.
By using them, you can simulate the behavior of actual middleware-based binders.
To unit test this `UsageDetailSender` application, add the following code in the `UsageDetailSenderApplicationTests` class:
package io.spring.dataflow.sample.usagedetailsender;
import io.spring.dataflow.sample.UsageDetail;
import org.junit.jupiter.api.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import static org.assertj.core.api.Assertions.assertThat;
public class UsageDetailSenderApplicationTests {
@Test
public void contextLoads() {
}
@Test
public void testUsageDetailSender() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(UsageDetailSenderApplication.class))
.web(WebApplicationType.NONE)
.run()) {
OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> sourceMessage = target.receive(10000);
final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
UsageDetail usageDetail = (UsageDetail) converter
.fromMessage(sourceMessage, UsageDetail.class);
assertThat(usageDetail.getUserId()).isBetween("user1", "user5");
assertThat(usageDetail.getData()).isBetween(0L, 700L);
assertThat(usageDetail.getDuration()).isBetween(0L, 300L);
}
}
}
- The `contextLoads` test case verifies that the application starts successfully.
- The `testUsageDetailSender` test case uses the test binder to receive messages from the output destination to which the supplier publishes.
UsageCostProcessor Processor
Either download the Spring Initializr generated project directly or visit the Spring Initializr site and follow these instructions:
- Create a new Maven project with a Group name of `io.spring.dataflow.sample` and an Artifact name of `usage-cost-processor-kafka`.
- In the Dependencies text box, type `kafka` to select the Kafka binder dependency.
- In the Dependencies text box, type `cloud stream` to select the Spring Cloud Stream dependency.
- In the Dependencies text box, type `Actuator` to select the Spring Boot actuator dependency.
- If your target platform is Cloud Foundry, type `Cloud Connectors` to select the Spring Cloud Connector dependency.
- Click the Generate Project button.
Now you should unzip the `usage-cost-processor-kafka.zip` file and import the project into your favorite IDE.
Business Logic
Now we can create the code required for this application.
- Create the `UsageDetail` class in the `io.spring.dataflow.sample.usagecostprocessor` package with content that resembles UsageDetail.java. The `UsageDetail` class contains `userId`, `data`, and `duration` properties.
- Create the `UsageCostDetail` class in the `io.spring.dataflow.sample.usagecostprocessor` package with content that resembles UsageCostDetail.java. This `UsageCostDetail` class contains `userId`, `callCost`, and `dataCost` properties (a minimal sketch appears after the processor listing below).
- Create the `UsageCostProcessor` class in the `io.spring.dataflow.sample.usagecostprocessor` package that receives the `UsageDetail` message, computes the call and data cost, and sends a `UsageCostDetail` message. The following listing shows the source code:
package io.spring.dataflow.sample.usagecostprocessor;
import java.util.function.Function;
import io.spring.dataflow.sample.UsageCostDetail;
import io.spring.dataflow.sample.UsageDetail;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class UsageCostProcessor {
private double ratePerSecond = 0.1;
private double ratePerMB = 0.05;
@Bean
public Function<UsageDetail, UsageCostDetail> processUsageCost() {
return usageDetail -> {
UsageCostDetail usageCostDetail = new UsageCostDetail();
usageCostDetail.setUserId(usageDetail.getUserId());
usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond);
usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB);
return usageCostDetail;
};
}
}
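For reference, the following is a minimal sketch of the `UsageCostDetail` model mentioned in the list above. The fields follow the properties named earlier; the `toString()` shown is an assumption chosen to match the JSON-style log lines that appear later in this guide:
package io.spring.dataflow.sample;
// Minimal sketch of the UsageCostDetail model (assumed fields and accessors).
public class UsageCostDetail {
    private String userId;
    private double callCost;
    private double dataCost;
    public String getUserId() { return this.userId; }
    public void setUserId(String userId) { this.userId = userId; }
    public double getCallCost() { return this.callCost; }
    public void setCallCost(double callCost) { this.callCost = callCost; }
    public double getDataCost() { return this.dataCost; }
    public void setDataCost(double dataCost) { this.dataCost = dataCost; }
    @Override
    public String toString() {
        // Assumed format, matching log output such as:
        // {"userId": "user1", "callCost": "3.0", "dataCost": "5.0" }
        return "{\"userId\": \"" + this.userId + "\", \"callCost\": \"" + this.callCost
                + "\", \"dataCost\": \"" + this.dataCost + "\" }";
    }
}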
In the preceding application, we provide a bean that returns a `java.util.function.Function` that consumes a `UsageDetail` as input and publishes a `UsageCostDetail` as output. With the rates defined above, a 30-second call that uses 100 MB of data costs 30 × 0.1 = 3.0 for the call and 100 × 0.05 = 5.0 for the data (the same values that the test below asserts).
Configuring the UsageCostProcessor Application
When configuring this processor application, we need to set both the input and output destinations (Kafka topics).
By default, Spring Cloud Stream uses `processUsageCost-in-0` and `processUsageCost-out-0` as the binding names, and these become the topic names unless the application overrides them.
However, in our case, as with the producer above, we do not want these defaults; we want more descriptive names.
We want to rename the bindings to `input` and `output` and provide custom destinations for them.
In `src/main/resources/application.properties`, you can add the following properties:
spring.cloud.stream.function.bindings.processUsageCost-in-0=input
spring.cloud.stream.function.bindings.processUsageCost-out-0=output
spring.cloud.stream.bindings.input.destination=usage-detail
spring.cloud.stream.bindings.output.destination=usage-cost
- The `spring.cloud.stream.function.bindings.processUsageCost-in-0` property overrides the binding name to `input`.
- The `spring.cloud.stream.function.bindings.processUsageCost-out-0` property overrides the binding name to `output`.
- The `spring.cloud.stream.bindings.input.destination` property sets the destination to the `usage-detail` Kafka topic.
- The `spring.cloud.stream.bindings.output.destination` property sets the destination to the `usage-cost` Kafka topic.
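Optionally, you can also assign a consumer group to the input binding so that multiple instances of the processor share the messages from the topic instead of each instance receiving all of them. The following is a sketch, using the standard Spring Cloud Stream group property (the group name is illustrative):
# Instances that share this group divide the usage-detail messages among themselves
spring.cloud.stream.bindings.input.group=usage-cost-processor
The Kubernetes deployment later in this guide sets the equivalent SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP environment variable.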
Building
Now we can build the Usage Cost Processor application.
In the `usage-cost-processor` directory, use the following command to build the project with Maven:
./mvnw clean package -Pkafka
Testing
We can use the same test binder that we used earlier to test the supplier. To unit test the `UsageCostProcessor`, add the following code in the `UsageCostProcessorApplicationTests` class:
package io.spring.dataflow.sample.usagecostprocessor;
import java.util.HashMap;
import java.util.Map;
import io.spring.dataflow.sample.UsageCostDetail;
import io.spring.dataflow.sample.UsageDetail;
import org.junit.jupiter.api.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import static org.assertj.core.api.Assertions.assertThat;
public class UsageCostProcessorApplicationTests {
@Test
public void contextLoads() {
}
@Test
public void testUsageCostProcessor() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
UsageCostProcessorApplication.class)).web(WebApplicationType.NONE)
.run()) {
InputDestination source = context.getBean(InputDestination.class);
UsageDetail usageDetail = new UsageDetail();
usageDetail.setUserId("user1");
usageDetail.setDuration(30L);
usageDetail.setData(100L);
final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
Map<String, Object> headers = new HashMap<>();
headers.put("contentType", "application/json");
MessageHeaders messageHeaders = new MessageHeaders(headers);
final Message<?> message = converter.toMessage(usageDetail, messageHeaders);
source.send(message);
OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> sourceMessage = target.receive(10000);
final UsageCostDetail usageCostDetail = (UsageCostDetail) converter
.fromMessage(sourceMessage, UsageCostDetail.class);
assertThat(usageCostDetail.getCallCost()).isEqualTo(3.0);
assertThat(usageCostDetail.getDataCost()).isEqualTo(5.0);
}
}
}
- The `contextLoads` test case verifies that the application starts successfully.
- The `testUsageCostProcessor` test case uses the test binder's `InputDestination` to publish a message, which is consumed by the function in the processor. It then uses the `OutputDestination` to verify that the `UsageDetail` is properly transformed into a `UsageCostDetail`.
UsageCostLogger Sink
Either download the Spring Initializr generated project directly or visit the Spring Initializr site and follow these instructions:
- Create a new Maven project with a Group name of `io.spring.dataflow.sample` and an Artifact name of `usage-cost-logger-kafka`.
- In the Dependencies text box, type `kafka` to select the Kafka binder dependency.
- In the Dependencies text box, type `cloud stream` to select the Spring Cloud Stream dependency.
- In the Dependencies text box, type `Actuator` to select the Spring Boot actuator dependency.
- If your target platform is Cloud Foundry, type `Cloud Connectors` to select the Spring Cloud Connector dependency.
- Click the Generate Project button.
Now you should unzip the `usage-cost-logger-kafka.zip` file and import the project into your favorite IDE.
Business Logic
Now we can create the business logic for the sink application. To do so:
- Create a `UsageCostDetail` class in the `io.spring.dataflow.sample.usagecostlogger` package with content that resembles UsageCostDetail.java. The `UsageCostDetail` class contains `userId`, `callCost`, and `dataCost` properties.
- Create the `UsageCostLogger` class in the `io.spring.dataflow.sample.usagecostlogger` package to receive the `UsageCostDetail` message and log it. The following listing shows the source code:
package io.spring.dataflow.sample.usagecostlogger;
import java.util.function.Consumer;
import io.spring.dataflow.sample.UsageCostDetail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class UsageCostLogger {
private static final Logger logger = LoggerFactory.getLogger(UsageCostLoggerApplication.class);
@Bean
public Consumer<UsageCostDetail> process() {
return usageCostDetail -> {
logger.info(usageCostDetail.toString());
};
}
}
Here, we have a `java.util.function.Consumer` bean that consumes a `UsageCostDetail` and logs it (by calling its `toString()` method).
Configuring the UsageCostLogger Application
When configuring the consumer application, we need to set the input binding destination (a Kafka topic).
By default, Spring Cloud Stream uses `process-in-0` as the input binding name (and as the destination name, if the application does not override it).
We want to override these so that the sink application works with the two preceding applications (the source and the processor).
In `src/main/resources/application.properties`, you can add the following properties:
spring.cloud.stream.function.bindings.process-in-0=input
spring.cloud.stream.bindings.input.destination=usage-cost
The `spring.cloud.stream.function.bindings.process-in-0` property overrides the binding name to `input`, and the `spring.cloud.stream.bindings.input.destination` property sets the destination to the `usage-cost` Kafka topic.
There are many configuration options that you can extend or override to achieve the desired runtime behavior when using Apache Kafka as the message broker. The Apache Kafka-specific binder configuration properties are listed in the Apache Kafka binder documentation.
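For example, to point an application at Kafka brokers other than the default `localhost:9092`, you can set the broker list in `application.properties`. The host names below are placeholders:
spring.cloud.stream.kafka.binder.brokers=kafka-broker-1:9092,kafka-broker-2:9092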
Building
Now we can build the Usage Cost Logger application.
In the `usage-cost-logger` directory, run the following command to build the project with Maven:
./mvnw clean package
Testing
To unit test the `UsageCostLogger`, add the following code in the `UsageCostLoggerApplicationTests` class:
package io.spring.dataflow.sample.usagecostlogger;
import java.util.HashMap;
import java.util.Map;
import io.spring.dataflow.sample.UsageCostDetail;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
@ExtendWith(OutputCaptureExtension.class)
public class UsageCostLoggerApplicationTests {
@Test
public void contextLoads() {
}
@Test
public void testUsageCostLogger(CapturedOutput output) {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(UsageCostLoggerApplication.class))
.web(WebApplicationType.NONE)
.run()) {
InputDestination source = context.getBean(InputDestination.class);
UsageCostDetail usageCostDetail = new UsageCostDetail();
usageCostDetail.setUserId("user1");
usageCostDetail.setCallCost(3.0);
usageCostDetail.setDataCost(5.0);
final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
Map<String, Object> headers = new HashMap<>();
headers.put("contentType", "application/json");
MessageHeaders messageHeaders = new MessageHeaders(headers);
final Message<?> message = converter.toMessage(usageCostDetail, messageHeaders);
source.send(message);
Awaitility.await().until(output::getOut, value -> value.contains("{\"userId\": \"user1\", \"callCost\": \"3.0\", \"dataCost\": \"5.0\" }"));
}
}
}
- The `contextLoads` test case verifies that the application starts successfully.
- The `testUsageCostLogger` test case verifies that the `process` method of `UsageCostLogger` is invoked. We use the `OutputCaptureExtension` facility provided by the Spring Boot testing infrastructure to verify that the message is logged to the console.
Deployment
In this section, we deploy the applications we created earlier to the local machine, to Cloud Foundry, and to Kubernetes.
When you deploy these three applications (`UsageDetailSender`, `UsageCostProcessor`, and `UsageCostLogger`), the flow of messages is as follows:
UsageDetailSender -> UsageCostProcessor -> UsageCostLogger
The `UsageDetailSender` source application's output is connected to the `UsageCostProcessor` processor application's input.
The `UsageCostProcessor` application's output is connected to the `UsageCostLogger` sink application's input.
When these applications run, the Kafka binder binds the applications' output and input bindings to the corresponding topics in Kafka.
Local
This section shows how to run the three applications as standalone applications in your local environment.
If you have not already done so, you must download and set up Kafka in your local environment.
After unpacking the downloaded archive, you can start the ZooKeeper and Kafka servers by running the following commands:
./bin/zookeeper-server-start.sh config/zookeeper.properties &
./bin/kafka-server-start.sh config/server.properties &
Running the Source
By using the pre-defined configuration properties (along with a unique server port) for `UsageDetailSender`, you can run the application, as follows:
java -jar target/usage-detail-sender-kafka-0.0.1-SNAPSHOT.jar --server.port=9001 &
Now you can see the messages being sent to the `usage-detail` Kafka topic by using the Kafka console consumer, as follows:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic usage-detail
To list the topics, run the following command:
./bin/kafka-topics.sh --zookeeper localhost:2181 --list
Running the Processor
By using the pre-defined configuration properties (along with a unique server port) for `UsageCostProcessor`, you can run the application, as follows:
java -jar target/usage-cost-processor-kafka-0.0.1-SNAPSHOT.jar --server.port=9002 &
With the `UsageDetail` data in the `usage-detail` Kafka topic from the `UsageDetailSender` source application, you can see the `UsageCostDetail` data on the `usage-cost` Kafka topic, as follows:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic usage-cost
Running the Sink
By using the pre-defined configuration properties (along with a unique server port) for `UsageCostLogger`, you can run the application, as follows:
java -jar target/usage-cost-logger-kafka-0.0.1-SNAPSHOT.jar --server.port=9003 &
Now you can see that this application logs the usage cost detail.
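Because each application was started in the background (note the trailing `&`), you can stop them from the shell session that started them by using standard job control. The job numbers depend on your session, so check the output of `jobs` first:
jobs
# Stop a background job, replacing 1 with the job number reported by jobs
kill %1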
Cloud Foundry
This section walks you through how to deploy the `UsageDetailSender`, `UsageCostProcessor`, and `UsageCostLogger` applications on Cloud Foundry.
Create a CF Manifest for the UsageDetailSender
You need to create a CF manifest YAML file called `usage-detail-sender.yml` for the `UsageDetailSender` to define its configuration properties, as follows:
applications:
- name: usage-detail-sender
timeout: 120
path: ./target/usage-detail-sender-kafka-0.0.1-SNAPSHOT.jar
memory: 1G
buildpack: java_buildpack
env:
SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS: [Kafka_Service_IP_Address:Kafka_Service_Port]
SPRING_CLOUD_STREAM_KAFKA_BINDER_ZKNODES: [ZooKeeper_Service_IP_Address:ZooKeeper_Service_Port]
Then you need to push the `UsageDetailSender` application by using its manifest YAML file, as follows:
cf push -f usage-detail-sender.yml
You need to create a CF manifest YAML file called `usage-cost-processor.yml` for the `UsageCostProcessor` to define its configuration properties, as follows:
applications:
- name: usage-cost-processor
timeout: 120
path: ./target/usage-cost-processor-kafka-0.0.1-SNAPSHOT.jar
memory: 1G
buildpack: java_buildpack
env:
SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS: [Kafka_Service_IP_Address:Kafka_Service_Port]
SPRING_CLOUD_STREAM_KAFKA_BINDER_ZKNODES: [ZooKeeper_Service_IP_Address:ZooKeeper_Service_Port]
Then you need to push the `UsageCostProcessor` application by using its manifest YAML file, as follows:
cf push -f usage-cost-processor.yml
You need to create a CF manifest YAML file called `usage-cost-logger.yml` for the `UsageCostLogger` to define its configuration properties, as follows:
applications:
- name: usage-cost-logger
timeout: 120
path: ./target/usage-cost-logger-kafka-0.0.1-SNAPSHOT.jar
memory: 1G
buildpack: java_buildpack
env:
SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS: [Kafka_Service_IP_Address:Kafka_Service_Port]
SPRING_CLOUD_STREAM_KAFKA_BINDER_ZKNODES: [ZooKeeper_Service_IP_Address:ZooKeeper_Service_Port]
Then you need to push the `UsageCostLogger` application by using its manifest YAML file, as follows:
cf push -f usage-cost-logger.yml
You can see the applications by running the `cf apps` command, as the following example (with output) shows:
cf apps
name requested state instances memory disk urls
usage-cost-logger started 1/1 1G 1G usage-cost-logger.cfapps.io
usage-cost-processor started 1/1 1G 1G usage-cost-processor.cfapps.io
usage-detail-sender started 1/1 1G 1G usage-detail-sender.cfapps.io
To verify that the usage cost detail is logged, you can tail the logs of the `usage-cost-logger` application (for example, by running `cf logs usage-cost-logger`). You should see output similar to the following:
2019-05-13T23:23:33.36+0530 [APP/PROC/WEB/0] OUT 2019-05-13 17:53:33.362 INFO 15 --- [e-cost.logger-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "user5", "callCost": "1.0", "dataCost": "12.350000000000001" }
2019-05-13T23:23:33.46+0530 [APP/PROC/WEB/0] OUT 2019-05-13 17:53:33.467 INFO 15 --- [e-cost.logger-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "user1", "callCost": "19.0", "dataCost": "10.0" }
2019-05-13T23:23:34.46+0530 [APP/PROC/WEB/0] OUT 2019-05-13 17:53:34.466 INFO 15 --- [e-cost.logger-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "user4", "callCost": "2.2", "dataCost": "5.15" }
2019-05-13T23:23:35.46+0530 [APP/PROC/WEB/0] OUT 2019-05-13 17:53:35.469 INFO 15 --- [e-cost.logger-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "user3", "callCost": "21.0", "dataCost": "17.3" }
Kubernetes
This section walks you through how to deploy the three Spring Cloud Stream applications on Kubernetes.
Setting up the Kubernetes Cluster
For this, we need a running Kubernetes cluster. For this example, we deploy to `minikube`.
Verifying Minikube is Running
To verify that Minikube is running, run the following command (shown with typical output if Minikube is running):
$ minikube status
host: Running
kubelet: Running
apiserver: Running
kubectl: Correctly Configured: pointing to minikube-vm at 192.168.99.100
Installing Apache Kafka
Now we can install the Kafka message broker by using the default configuration from Spring Cloud Data Flow. To do so, run the following command:
kubectl apply -f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.3/src/kubernetes/kafka/kafka-deployment.yaml \
-f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.3/src/kubernetes/kafka/kafka-svc.yaml \
-f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.3/src/kubernetes/kafka/kafka-zk-deployment.yaml \
-f https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.3/src/kubernetes/kafka/kafka-zk-svc.yaml
Building Docker Images
To build Docker images, we use the Jib Maven plugin.
If you downloaded the source distribution, the Jib plugin is already configured.
If you built the apps from scratch, add the following under `plugins` in each `pom.xml` file:
<plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<version>0.10.1</version>
<configuration>
<from>
<image>springcloud/openjdk</image>
</from>
<to>
<image>${docker.org}/${project.artifactId}:${docker.version}</image>
</to>
<container>
<useCurrentTimestamp>true</useCurrentTimestamp>
</container>
</configuration>
</plugin>
Then add the following properties under the `properties` section of each `pom.xml` file. For this example, we use the following properties:
<docker.org>springcloudstream</docker.org>
<docker.version>${project.version}</docker.version>
Now you can run the Maven build to create the Docker images in the `minikube` Docker registry. To do so, run the following commands:
$ eval $(minikube docker-env)
$ ./mvnw package jib:dockerBuild
If you downloaded the project source, the project includes a parent pom to let you build all the modules with a single command.
Otherwise, run the build for the source, processor, and sink individually.
You need only run `eval $(minikube docker-env)` once for each terminal session.
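If you are building the three projects individually, the sequence might look like the following sketch. The directory names are assumed to match the artifact IDs chosen earlier, and `eval $(minikube docker-env)` is assumed to have already been run in this terminal:
(cd usage-detail-sender-kafka && ./mvnw package jib:dockerBuild)
(cd usage-cost-processor-kafka && ./mvnw package jib:dockerBuild)
(cd usage-cost-logger-kafka && ./mvnw package jib:dockerBuild)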
Deploying the Stream
To deploy the stream, you must first copy and paste the following YAML and save it to `usage-cost-stream.yaml`:
kind: Pod
apiVersion: v1
metadata:
name: usage-detail-sender
labels:
app: usage-cost-stream
spec:
containers:
- name: usage-detail-sender
image: springcloudstream/usage-detail-sender-kafka:0.0.1-SNAPSHOT
ports:
- containerPort: 80
protocol: TCP
env:
- name: SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS
value: kafka
- name: SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION
value: user-details
- name: SERVER_PORT
value: '80'
restartPolicy: Always
---
kind: Pod
apiVersion: v1
metadata:
name: usage-cost-processor
labels:
app: usage-cost-stream
spec:
containers:
- name: usage-cost-processor
image: springcloudstream/usage-cost-processor-kafka:0.0.1-SNAPSHOT
ports:
- containerPort: 80
protocol: TCP
env:
- name: SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS
value: kafka
- name: SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP
value: usage-cost-stream
- name: SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION
value: user-details
- name: SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION
value: user-cost
- name: SERVER_PORT
value: '80'
restartPolicy: Always
---
kind: Pod
apiVersion: v1
metadata:
name: usage-cost-logger
labels:
app: usage-cost-stream
spec:
containers:
- name: usage-cost-logger
image: springcloudstream/usage-cost-logger-kafka:0.0.1-SNAPSHOT
ports:
- containerPort: 80
protocol: TCP
env:
- name: SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS
value: kafka
- name: SPRING_CLOUD_STREAM_BINDINGS_INPUT_GROUP
value: usage-cost-stream
- name: SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION
value: user-cost
- name: SERVER_PORT
value: '80'
restartPolicy: Always
Then you need to deploy the apps by running the following command:
kubectl apply -f usage-cost-stream.yaml
If all is well, you should see the following output:
pod/usage-detail-sender created
pod/usage-cost-processor created
pod/usage-cost-logger created
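To confirm that all three pods are running, you can list them by the `app` label set in the YAML:
kubectl get pods -l app=usage-cost-stream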
The preceding YAML specifies three pod resources, for the source, processor, and sink applications. Each pod has a single container that references the corresponding Docker image.
We set the Kafka binding parameters as environment variables. The input and output destination names have to be correct to wire the stream. Specifically, the output of the source must be the same as the input of the processor, and the output of the processor must be the same as the input of the sink.
We also set the logical hostname for the Kafka broker so that each application can connect to it. Here, we use the Kafka service name (`kafka`, in this case).
We set the `app: usage-cost-stream` label to logically group our apps.
We set the inputs and outputs as follows:
- Usage Detail Sender: `SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=user-details`
- Usage Cost Processor: `SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=user-details` and `SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION=user-cost`
- Usage Cost Logger: `SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION=user-cost`
Verifying the Deployment
You can use the following command to tail the log for the `usage-cost-logger` sink:
kubectl logs -f usage-cost-logger
You should see messages similar to the following messages:
2019-05-02 15:48:18.550 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Mark", "callCost": "21.1", "dataCost": "26.05" }
2019-05-02 15:48:19.553 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Ilaya", "callCost": "4.2", "dataCost": "15.75" }
2019-05-02 15:48:20.549 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Mark", "callCost": "28.400000000000002", "dataCost": "15.0" }
2019-05-02 15:48:21.553 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Ilaya", "callCost": "16.8", "dataCost": "28.5" }
2019-05-02 15:48:22.551 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Mark", "callCost": "22.700000000000003", "dataCost": "20.3" }
2019-05-02 15:48:23.556 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Janne", "callCost": "16.6", "dataCost": "2.6" }
2019-05-02 15:48:24.557 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Janne", "callCost": "6.7", "dataCost": "1.0" }
2019-05-02 15:48:25.555 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Glenn", "callCost": "3.7", "dataCost": "2.6500000000000004" }
2019-05-02 15:48:26.557 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Janne", "callCost": "24.200000000000003", "dataCost": "32.9" }
2019-05-02 15:48:27.556 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Glenn", "callCost": "19.200000000000003", "dataCost": "7.4" }
2019-05-02 15:48:28.559 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Sabby", "callCost": "17.7", "dataCost": "27.35" }
2019-05-02 15:48:29.562 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Ilaya", "callCost": "26.8", "dataCost": "32.45" }
2019-05-02 15:48:30.561 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Janne", "callCost": "26.5", "dataCost": "33.300000000000004" }
2019-05-02 15:48:31.562 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Sabby", "callCost": "16.1", "dataCost": "5.0" }
2019-05-02 15:48:32.564 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Janne", "callCost": "16.3", "dataCost": "23.6" }
2019-05-02 15:48:33.567 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Ilaya", "callCost": "29.400000000000002", "dataCost": "2.1" }
2019-05-02 15:48:34.567 INFO 1 --- [container-0-C-1] i.s.d.s.u.UsageCostLoggerApplication : {"userId": "Janne", "callCost": "5.2", "dataCost": "20.200000000000003" }
Cleaning up
To delete the stream, we can use the label we created earlier. The following command shows how to do so:
kubectl delete pod -l app=usage-cost-stream
To uninstall Kafka, run the following command:
kubectl delete all -l app=kafka
What's Next
The RabbitMQ version of this guide shows you how to create the same three applications with RabbitMQ instead of Kafka. Alternatively, you can use Spring Cloud Data Flow to deploy the three applications, as described in Create and Deploy a Stream Processing Pipeline using Spring Cloud Data Flow.