Python Stream Processor
The example code in this section shows how to run a Python script as a processor within a Data Flow Stream.
In this guide, we package the Python script as a Docker image and deploy it to Kubernetes. We use Apache Kafka as the messaging middleware.
We register the Docker image in Data Flow as an application of type Processor.
The guide demonstrates a text-processing streaming data pipeline. It receives text messages over HTTP, delegates the text processing to a Python script registered as a Data Flow processor, and prints the result to the logs. The Python script reverses the input text if the reversestring property is set to true. Otherwise, the message passes through unchanged.
The following diagram shows the text-reversing processing pipeline:
Development
You can find the source code in the samples GitHub repository and download it as a zipped archive from polyglot-python-processor.zip.
The processor uses the kafka-python library to create consumer and producer connections.
The main loop of execution resides in python_processor.py.
For each message received on the inbound Kafka topic, the script either sends the message to the output Kafka topic as-is or, if --reversestring=true is passed to the processor as part of the stream definition, reverses the string and then sends it to the output topic. The following listing shows python_processor.py:
#!/usr/bin/env python
import os
import sys

from kafka import KafkaConsumer, KafkaProducer
from util.http_status_server import HttpHealthServer
from util.task_args import get_kafka_binder_brokers, get_input_channel, get_output_channel, get_reverse_string

# Connect using the broker and topic names resolved by the task_args helpers.
consumer = KafkaConsumer(get_input_channel(), bootstrap_servers=[get_kafka_binder_brokers()])
producer = KafkaProducer(bootstrap_servers=[get_kafka_binder_brokers()])

# Serve the health check endpoints on a background thread.
HttpHealthServer.run_thread()

while True:
    for message in consumer:
        output_message = message.value
        reverse_string = get_reverse_string()
        if reverse_string is not None and reverse_string.lower() == "true":
            # Slicing reverses the payload whether it is a str or a bytes value.
            output_message = message.value[::-1]
        producer.send(get_output_channel(), output_message)
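The per-message decision in the loop above can be isolated into a small function, which makes it easy to check without a running Kafka broker. The following is a minimal sketch, not part of the sample project: the function name transform is hypothetical, and the payload is assumed to be a bytes value, as delivered by a Kafka consumer that has no deserializer configured.

```python
def transform(value, reverse_flag):
    """Return value reversed when reverse_flag is "true" (case-insensitive).

    Hypothetical helper for illustration; the sample inlines this logic
    in its consumer loop.
    """
    if reverse_flag is not None and reverse_flag.lower() == "true":
        return value[::-1]  # slicing works for both str and bytes
    return value


print(transform(b"hello world", "true"))   # b'dlrow olleh'
print(transform(b"hello world", None))     # b'hello world'
```

Keeping the transformation free of Kafka calls is a design choice for testability only; the behavior matches the listing.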
Helper methods are defined in a utility file called task_args.py. They aid in extracting common environment and command line values.
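To illustrate the kind of work such helpers do, here is a minimal sketch of reading --name=value arguments with an environment-variable fallback. It is not the content of task_args.py: the functions get_cmd_arg and get_env_or_arg, the fallback order, and the sample argument values are assumptions made for this example.

```python
import os
import sys


def get_cmd_arg(name, argv=None):
    """Return the value of a --name=value argument, or None if it is absent."""
    args = sys.argv[1:] if argv is None else argv
    prefix = "--" + name + "="
    for arg in args:
        if arg.startswith(prefix):
            return arg[len(prefix):]
    return None


def get_env_or_arg(name, argv=None, environ=None):
    """Prefer the command-line value, then fall back to an environment variable."""
    value = get_cmd_arg(name, argv)
    if value is not None:
        return value
    env = os.environ if environ is None else environ
    # Map a dotted property name to the UPPER_SNAKE_CASE form
    # commonly used for environment variables.
    return env.get(name.upper().replace(".", "_").replace("-", "_"))


# Illustrative arguments only; the values are hypothetical.
sample_args = ["--reversestring=true", "--example.input.topic=demo-in"]
print(get_cmd_arg("reversestring", sample_args))        # true
print(get_cmd_arg("example.input.topic", sample_args))  # demo-in
```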
An HTTPServer implementation runs as a thread and responds to the Spring Boot health check paths (/actuator/health and /actuator/info) by always returning HTTP 200. A Dockerfile creates the image.
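The health check thread described above can be approximated with the Python 3 standard library. The following is a sketch under stated assumptions, not the sample's http_status_server module: the class name HealthHandler, the function start_health_server, the default port 8080, and the empty JSON body are all choices made for this example. Only the two paths and the HTTP 200 status come from the text.

```python
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

HEALTH_PATHS = ("/actuator/health", "/actuator/info")


class HealthHandler(BaseHTTPRequestHandler):
    """Answer the health check paths with HTTP 200 and anything else with 404."""

    def do_GET(self):
        if self.path in HEALTH_PATHS:
            body = b"{}"  # assumed placeholder body
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        else:
            self.send_error(404)

    def log_message(self, format, *args):
        pass  # keep per-request logging out of the processor output


def start_health_server(port=8080):
    """Start the server on a daemon thread and return the server object."""
    server = HTTPServer(("0.0.0.0", port), HealthHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    return server
```

Running the server on a daemon thread means it does not keep the process alive once the main consumer loop exits.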
For python_processor.py to act as a Data Flow processor, it needs to be bundled in a Docker image and uploaded to DockerHub. The following Dockerfile shows how to bundle a Python script into a Docker image:
FROM springcloud/openjdk:latest
RUN apt-get update && apt-get install --no-install-recommends -y \
    python-pip \
 && rm -rf /var/lib/apt/lists/*
RUN pip install kafka-python
COPY python_processor.py /processor/
COPY util/*.py /processor/util/
ENTRYPOINT ["python", "/processor/python_processor.py", "$@", "--"]
The Dockerfile installs the required dependencies, adds the python_processor.py script and utilities (under the util folder), and sets the command entry.
Build
We can now build the Docker image and push it to the DockerHub registry. To do so:
- Check out the sample project and navigate to the polyglot-python-processor folder:
git clone https://github.com/spring-cloud/spring-cloud-dataflow-samples
cd ./spring-cloud-dataflow-samples/dataflow-website/recipes/polyglot/polyglot-python-processor/
- From within the polyglot-python-processor/ folder, build and push the polyglot-python-processor Docker image to DockerHub:
docker build -t springcloud/polyglot-python-processor:0.1 .
docker push springcloud/polyglot-python-processor:0.1
Replace springcloud with your Docker Hub prefix.
Once published in Docker Hub, you can register the image in Data Flow and deploy it.
Deployment
To deploy the processor:
- Follow the installation instructions to set up Data Flow on Kubernetes.
- Retrieve the Data Flow URL from Minikube by running the following command:
minikube service --url scdf-server
- Configure your Data Flow shell by running the following command:
dataflow config server --uri <Your Data Flow URL>
- Import the SCDF app starters by running the following command:
app import --uri https://dataflow.spring.io/kafka-docker-latest
- Register the polyglot-python-processor as python-processor of type processor:
app register --type processor --name python-processor --uri docker://springcloud/polyglot-python-processor:0.1
docker://springcloud/polyglot-python-processor:0.1 is resolved from the DockerHub repository.
- Create the Data Flow text-reversal stream by running the following command:
stream create --name text-reversal --definition "http --server.port=32123 | python-processor --reversestring=true | log"
The http source listens for incoming HTTP messages on port 32123 and forwards them to the python-processor. The processor is configured to reverse the input messages (if reversestring=true) and sends them downstream to the log sink.
- Deploy the stream by using the kubernetes.createNodePort property to expose the HTTP port to the local host by running the following command:
stream deploy text-reversal --properties "deployer.http.kubernetes.createNodePort=32123"
- Retrieve the http-source URL from Minikube to publish the test data by running the following command:
minikube service --url text-reversal-http-v1
http://192.168.99.104:32123
- Post a sample message against the http-source application by running the following command:
http post --target http://192.168.99.104:32123 --data "hello world"
If the post is successful, you should see a confirmation message like this:
> POST (text/plain) http://192.168.99.104:32123 hello world
> 202 ACCEPTED
- Inspect the logs for the posted message by running the following command:
kubectl logs -f <log pod name>
You should see output similar to the following:
INFO 1 --- [container-0-C-1] log-sink : dlrow olleh
The posted message appears in reversed order (in this case, dlrow olleh).
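The test message can also be posted from a script instead of the Data Flow shell. The following is a minimal sketch using the Python 3 standard library; the function name build_post_request is hypothetical, and the URL is the example address returned by Minikube above, which will differ in your environment. The sketch builds the request without sending it, so it can be inspected without a running stream.

```python
import urllib.request


def build_post_request(url, text):
    """Build (but do not send) a text/plain POST request that carries text."""
    return urllib.request.Request(
        url,
        data=text.encode("utf-8"),
        headers={"Content-Type": "text/plain"},
        method="POST",
    )


request = build_post_request("http://192.168.99.104:32123", "hello world")
print(request.get_method(), request.full_url)

# To send it against a deployed stream, uncomment the following lines:
# with urllib.request.urlopen(request) as response:
#     print(response.status)

# With reversestring=true, the log sink should show the reversed payload:
print("hello world"[::-1])  # dlrow olleh
```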