Create and Deploy a Python Application
This recipe illustrates how to deploy a Python script as an Data Flow application.
Unlike the other applications types (e.g. source
, processor
or sink
), Data Flow does not set deployment properties that wire up producers and consumers when deploying the app
application type.
It is the developer’s responsibility to 'wire up' the multiple applications when deploying in order for them to communicate by using deployment properties.
The recipe creates a data processing pipelines that dispatches input
stream of timestamps to either even
or odd
downstream channels.
Technically the recipe implements the Dynamic Router integration pattern.
The Pipeline takes time's source timestamps
messages from an timeDest
input channel, depending on the timestamp value it routes the message to dedicated evenDest
or oddDest
downstream channels.
The following diagram shows the architecture of the cafe processing pipelines.
As timestamp source will use the prebuilt Time Source application but registered as Data Flow App
type.
It continuously emits timestamps to a downstream Kafka topic called timeDest
.
The Router
app, implemented by the Python script and packaged as a Docker image, consumes the incoming timestamps from the timeDest
Kafka topic and according to the timestamp value routes the messages downstream to either the evenDest
or oddDest
Kafka topics.
The Even Logger
and Odd Logger
components are the prebuilt Log Sink applications but registered as Data Flow App
type.
Loggers consume the evenDest
or oddDest
topics and prints the incoming message in on the console.
Apache Kafka will be used as the messaging middleware.
Development
The source code can be found in the samples GitHub repository and downloaded as a zipped archive: polyglot-python-app.zip.
The pythonrouterapp.py implements the timestamp Router application logic.
from kafka import KafkaConsumer, KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError
from util.actuator import Actuator
from util.arguments import get_kafka_brokers, get_env_info, get_channel_topic
class Router:
def __init__(self, info, kafka_brokers, input_topic, even_topic, odd_topic):
self.kafka_brokers = kafka_brokers
self.input_topic = input_topic
self.even_topic = even_topic
self.odd_topic = odd_topic
# Serve the liveliness and readiness probes via http server in a separate thread.
Actuator.start(port=8080, info=info)
# Ensure the output topics exist.
self.__create_topics_if_missing([self.input_topic, self.even_topic, self.odd_topic])
self.consumer = KafkaConsumer(self.input_topic, bootstrap_servers=self.kafka_brokers)
self.producer = KafkaProducer(bootstrap_servers=self.kafka_brokers)
def __create_topics_if_missing(self, topic_names):
admin_client = KafkaAdminClient(bootstrap_servers=self.kafka_brokers, client_id='test')
for topic in topic_names:
try:
new_topic = NewTopic(name=topic, num_partitions=1, replication_factor=1)
admin_client.create_topics(new_topics=[new_topic], validate_only=False)
except TopicAlreadyExistsError:
print ('Topic: {} already exists!')
def process_timestamps(self):
while True:
for message in self.consumer:
if message.value is not None:
if self.is_even_timestamp(message.value):
self.producer.send(self.even_topic, b'Even timestamp: ' + message.value)
else:
self.producer.send(self.odd_topic, b'Odd timestamp:' + message.value)
@staticmethod
def is_even_timestamp(value):
return int(value[-1:]) % 2 == 0
Router(
get_env_info(),
get_kafka_brokers(),
get_channel_topic('input'),
get_channel_topic('even'),
get_channel_topic('odd')
).process_timestamps()
If the print
command is used inside the Python script, later must be flushed with sys.stdout.flush()
to prevent the output buffer being filled up, causing disruption to the Kafka’s consumer/producer flow!
- The kafka-python library is used to consume and produce Kafka messages. The process_timestamps method continuously consumes timestamps from the input channel and routs the even or odd values to the output channels.
- The Actuator class inside actuator.py utility is used to expose operational information about the running application, such as health, liveliness, info, etc.
It runs an embedded HTTP server in a separate thread and exposes the
/actuator/health
and/actuator/info
entry-points handles the Kubernetes liveness and readiness probes requests. - The arguments.py utility helps to retrieve the required input parameters from the command line arguments and environment variables. The utility assumes default (e.g. exec) entry point style. Note that Data Flow passes the Kafka broker connection properties as environment variables.
For the python_router_app.py
to act as a Data Flow app
it needs to be bundled in a docker image and uploaded to DockerHub
. Following Dockerfile illustrates how to bundle a Python script into docker image:
FROM python:3.7.3-slim
RUN pip install kafka-python
RUN pip install flask
ADD /util/* /util/
ADD python_router_app.py /
ENTRYPOINT ["python","/python_router_app.py"]
CMD []
The Dockerfile installs the required dependencies, adds the python script (e.g. ADD python_router_app.py
) and utilities (under the util
folder above) and sets the command entry.
Build
We will now build the docker image and push it to the DockerHub registry.
Checkout the sample project and navigate to the polyglot-python-app
folder:
git clone https://github.com/spring-cloud/spring-cloud-dataflow-samples
cd ./spring-cloud-dataflow-samples/dataflow-website/recipes/polyglot/polyglot-python-app/
From within the polyglot-python-app
, build and push the polyglot-python-app Docker image to DockerHub:
docker build -t springcloud/polyglot-python-app:0.2 .
docker push springcloud/polyglot-python-app:0.2
Replace springcloud
with your docker hub prefix.
Once published in Docker Hub, the image can be registered in Data Flow and deployed.
Deployment
Follow the installation instructions to set up Data Flow on Kubernetes.
Retrieve the Data Flow url from minikube (minikube service --url scdf-server
) and configure your Data Flow shell:
dataflow config server --uri http://192.168.99.100:30868
Import the SCDF time
and log
app starters and register the polyglot-python-app as python-router
of type app
app register --name time --type app --uri docker:springcloudstream/time-source-kafka:2.1.0.RELEASE --metadata-uri maven://org.springframework.cloud.stream.app:time-source-kafka:jar:metadata:2.1.0.RELEASE
app register --name log --type app --uri docker:springcloudstream/log-sink-kafka:2.1.1.RELEASE --metadata-uri maven://org.springframework.cloud.stream.app:log-sink-kafka:jar:metadata:2.1.1.RELEASE
app register --type app --name python-router --uri docker://springcloud/polyglot-python-app:0.2
The docker://springcloud/polyglot-python-app:0.2
is resolved from the DockerHub repository.
Create the timestamp routing Stream pipeline:
stream create --name timeStampStream --definition "time || python-router || evenLogger: log || oddLogger: log"
The stream definitions above make use of the label feature in the DSL.
As result the following stream pipeline is created:
The time
, log
and python-router
are registered as App type applications and therefore can have multiple input and output bindings (e.g. channels). Data Flow does not make any assumptions about the flow of data from one application to another. It is the developer’s responsibility to 'wire up' the multiple applications when deploying in order for them to communicate.
Keeping this in mind we deploy the timestamp Stream pipeline with the polyglot-python-app-deployment.properties deployment properties:
stream deploy --name timeStampStream --propertiesFile <polyglot-python-app folder>/polyglot-python-app-deployment.properties
The deployment properties defines the Kafka topics used to wire the time, python-router and logger applications:
app.time.spring.cloud.stream.bindings.output.destination=timeDest
app.python-router.spring.cloud.stream.bindings.input.destination=timeDest
app.python-router.spring.cloud.stream.bindings.even.destination=evenDest
app.python-router.spring.cloud.stream.bindings.odd.destination=oddDest
app.evenLogger.spring.cloud.stream.bindings.input.destination=evenDest
app.oddLogger.spring.cloud.stream.bindings.input.destination=oddDest
the app.python-router.xxx prefix is a Data Flow convention to map the properties specified after the prefix to the python-router app in the timeStampStream stream.
The timestamp channel is bound to the timeDest
Kafka topic, the router's even output channel is bound to the evenDest
topic and the odd channel is bound to the oddDest
topic.
After the deployment the data flow looks like this:
-
Use
kubectl get all
command to list the statuses of the deployed k8s containers. Usekubectl logs -f xxx
to observe the even and odd pipeline output.For example the
kubectl logs -f po/timestampstream-evenlogger-xxx
should output:2019-05-17 17:56:36.241 INFO 1 --- log-sink : Even timestamp:05/17/19 17:56:36 2019-05-17 17:56:38.301 INFO 1 --- log-sink : Even timestamp:05/17/19 17:56:38 2019-05-17 17:56:40.402 INFO 1 --- log-sink : Even timestamp:05/17/19 17:56:40 ...
and the
kubectl logs -f po/timestampstream-oddlogger-xxx
should output:2019-05-17 17:56:37.447 INFO 1 --- log-sink : Odd timestamp:05/17/19 17:56:37 2019-05-17 17:56:39.358 INFO 1 --- log-sink : Odd timestamp:05/17/19 17:56:39 2019-05-17 17:56:41.452 INFO 1 --- log-sink : Odd timestamp:05/17/19 17:56:41 ...