SFTP to JDBC File Ingest
This recipe provides step-by-step instructions to build a Data Flow pipeline that ingests files from an SFTP source and saves the contents to a JDBC data store. The pipeline is designed to launch a task whenever a new file is detected by the SFTP source. In this case, the task is a Spring Batch job that processes the file, converting the contents of each line to uppercase and inserting it into a table.
The file ingest batch job reads from a CSV text file with lines formatted as first_name,last_name and writes each entry to a database table by using a JdbcBatchItemWriter that performs INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName) for each line.
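To make the transformation concrete, the following shell sketch mimics its effect on input of the same shape. It is not the job's code (the job is a Spring Batch application); the function name uppercase_names and the sample names are assumptions made for illustration, and skipping lines that do not have exactly two fields is a choice of this sketch, not documented job behavior.

```shell
# Hypothetical helper (not part of the sample project): read
# "first_name,last_name" lines on stdin and print each field in uppercase.
# Lines that do not contain exactly two comma-separated fields are skipped.
uppercase_names() {
  awk -F',' 'NF == 2 { print toupper($1) "," toupper($2) }'
}

# Illustrative input in the same shape as the sample CSV.
printf 'Aaron,Aaberg\nAbbey,Aadland\n' | uppercase_names
# prints:
# AARON,AABERG
# ABBEY,AADLAND
```

Each printed pair corresponds to the values that the INSERT statement above would bind to :firstName and :lastName.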
You can download the project that contains the source code and sample data from your browser or from the command line:
wget "https://github.com/spring-cloud/spring-cloud-dataflow-samples/blob/master/dataflow-website/recipes/file-ingest/file-to-jdbc/file-to-jdbc.zip?raw=true" -O file-to-jdbc.zip
If you choose not to build the task application yourself, the executable jar is published to the Spring Maven repository and to the springcloud/ingest Docker repository.
The pipeline is built by using the following pre-packaged Spring Cloud Stream applications:
- sftp-dataflow-source is an SFTP source configured to emit a Task Launch Request whenever it detects a new file in one or more polled SFTP directories.
- dataflow-task-launcher-sink is a sink that acts as a REST client to the Data Flow server to launch a Data Flow task.
This pipeline runs on all supported Data Flow platforms.
The SFTP source downloads each file from the SFTP server to a local directory before sending the task launch request.
The request sets localFilePath as a command line argument for the task. When running on a cloud platform, we need to mount a shared directory available to the SFTP source container and the task container.
For this example, we set up an NFS mounted directory.
Configuring the environment and containers for NFS is platform-specific and is described here for Cloud Foundry v2.3+ and minikube.
Prerequisites
This section covers the setup and configuration steps you need to complete before starting the batch application.
Data Flow Installation
Make sure you have installed Spring Cloud Data Flow on the platform of your choice.
NOTE: For Kubernetes, the sample task application is configured to use MySQL. The Data Flow server must also be configured for MySQL.
Using Data Flow
This example assumes that you know how to use Spring Cloud Data Flow to register and deploy applications by using the Spring Cloud Data Flow dashboard or the Spring Cloud Data Flow shell. If you need further instructions on using Data Flow, see Stream Processing by Using Spring Cloud Data Flow and Register and Launch a Batch Application by Using Spring Cloud Data Flow.
SFTP server
This example requires access to an SFTP server. For running on a local machine and minikube, we use the host machine as the SFTP server. For Cloud Foundry, and Kubernetes in general, an external SFTP server is required.
On the SFTP server, create a /remote-files directory. This is where we drop files to trigger the pipeline.
NFS configuration
NFS is not required when running locally.
Cloud Foundry NFS configuration
This feature is provided in Pivotal Cloud Foundry by NFS Volume Services.
To run this example, we need:
- A Cloud Foundry instance (v2.3+) with NFS Volume Services enabled
- An NFS server accessible from the Cloud Foundry instance
- A properly configured nfs service instance
NOTE: For simplicity, this example assumes that the nfs service is created with a common configuration, as follows, with a common mount point (/var/scdf) for all bound apps. You can also set these parameters when binding the NFS service to an application by using deployment properties:
cf create-service nfs Existing nfs -c '{"share":"<nfs-host>/staging","uid":<uid>,"gid":<gid>, "mount":"/var/scdf"}'
Kubernetes NFS configuration
Kubernetes provides many options for configuring and sharing persistent volumes. For this example, we use minikube and use the host machine as the NFS server. The following instructions work for macOS and should be similar for Linux hosts:
Make sure minikube is started. The commands in these instructions provide NFS access to the minikube VM. The minikube IP is subject to change each time it is started, so you should perform these steps after each start.
Expose a shared directory called /staging.
sudo mkdir /staging
sudo chmod 777 /staging
echo "/staging -alldirs -mapall=$(id -u):$(id -g) $(minikube ip)" | sudo tee -a /etc/exports
sudo nfsd restart
Verify the NFS mounts:
showmount -e 127.0.0.1
Exports list on 127.0.0.1:
/staging 192.168.99.105
Configure persistent volume and persistent volume claim resources. Copy the following and save it to a file named nfs-config.yml:
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: nfs-volume
spec:
  capacity:
    storage: 4Gi
  accessModes:
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  storageClassName: standard
  nfs:
    # The address 192.168.99.1 is the Minikube gateway to the host for VirtualBox. This way
    # not the container IP will be visible by the NFS server on the host machine,
    # but the IP address of the `minikube ip` command. You will need to
    # grant access to the `minikube ip` IP address.
    server: 192.168.99.1
    path: '/staging'
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: nfs-volume-claim
  namespace: default
spec:
  storageClassName: standard
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 4Gi
Create the resources:
kubectl apply -f nfs-config.yml
Deployment
This section addresses how to deploy to the following environments:
- Local
- Cloud Foundry
- Kubernetes
Local
For local deployment, this example uses Kafka as the message broker. Create directories for the remote and local files:
mkdir -p /tmp/remote-files /tmp/local-files
Register the Applications
If you downloaded and built the sample project, you can register it by using a file:// URL, for example, file://<path-to-project>/target/ingest-1.0.0-SNAPSHOT.jar
Otherwise, you can use the published Maven jar:
app register --name fileIngest --type task --uri maven://io.spring.cloud.dataflow.ingest:ingest:1.0.0.BUILD-SNAPSHOT
Register the prepackaged sftp source and task-launcher sink applications:
app register --name sftp --type source --uri maven://org.springframework.cloud.stream.app:sftp-dataflow-source-kafka:2.1.0.RELEASE
app register --name task-launcher --type sink --uri maven://org.springframework.cloud.stream.app:task-launcher-dataflow-sink-kafka:1.0.1.RELEASE
Create the Task
To create the task, run the following command:
task create fileIngestTask --definition fileIngest
Create and Deploy the Stream
To create and deploy the stream, run the following command:
NOTE: Replace <user> and <pass>. The username and password are the credentials for the local (or remote) user. If you do not use a local SFTP server, specify the host by setting the host parameter (and, optionally, the port parameter). If not defined, host defaults to 127.0.0.1 and port defaults to 22.
stream create --name inboundSftp --definition "sftp --username=<user> --password=<pass> --allow-unknown-keys=true --task.launch.request.taskName=fileIngestTask --remote-dir=/tmp/remote-files/ --local-dir=/tmp/local-files/ | task-launcher" --deploy
The dataflow-task-launcher-sink uses a PollableMessageSource controlled by a dynamic trigger with exponential backoff. By default, the sink polls its input destination every second. If there are no task launch requests, the polling period continues to double, up to a maximum of 30 seconds. If a task launch request is present, the trigger resets to one second. You can configure the trigger parameters by setting the task-launcher sink properties, trigger.period and trigger.max-period, in the stream definition.
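The backoff arithmetic described above can be sketched in a few lines of shell. This only illustrates the doubling and the 30-second cap with the default values from the text; the function name next_period is hypothetical, and clamping a doubled value directly to the maximum is an assumption about how the cap is applied, not the sink's actual implementation.

```shell
# Hypothetical helper: given the current polling period and the maximum
# period (both in seconds), print the period to use after an idle poll.
next_period() {
  current=$1
  max=$2
  doubled=$((current * 2))
  if [ "$doubled" -gt "$max" ]; then
    echo "$max"
  else
    echo "$doubled"
  fi
}

# Simulate six consecutive idle polls, starting from the default of 1 second.
period=1
for poll in 1 2 3 4 5 6; do
  echo "idle poll $poll: wait ${period}s"
  period=$(next_period "$period" 30)
done
# The waits printed are 1, 2, 4, 8, 16, and 30 seconds.
```

Under these assumptions, an idle sink reaches the 30-second ceiling after five idle polls, which is why a request that arrives after a quiet period can take up to 30 seconds to be picked up.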
Verify Stream Deployment
We can see the status of the streams to be deployed with stream list, as the following example shows:
dataflow:>stream list
╔═══════════╤════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╤════════════════════════════╗
║Stream Name│ Stream Definition │ Status ║
╠═══════════╪════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╪════════════════════════════╣
║inboundSftp│sftp --password='******' --remote-dir=/tmp/remote-files/ --local-dir=/tmp/local-files/ --task.launch.request.taskName=fileIngestTask│The stream has been ║
║ │--allow-unknown-keys=true --username=<user> | task-launcher │successfully deployed ║
╚═══════════╧════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╧════════════════════════════╝
Inspect the Application Logs
If the stream fails to deploy, or if you would like to inspect the logs for any other reason, you can get the location of the logs for the applications created for the inboundSftp stream by using the runtime apps command, as follows:
dataflow:>runtime apps
╔═══════════════════════════╤═══════════╤════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║ App Id / Instance Id │Unit Status│ No. of Instances / Attributes ║
╠═══════════════════════════╪═══════════╪════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╣
║inboundSftp.sftp │ deployed │ 1 ║
║ │ │ guid = 23057 ║
║ │ │ pid = 71927 ║
║ │ │ port = 23057 ║
╟───────────────────────────┼───────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║inboundSftp.sftp-0 │ deployed │ stderr = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540821009913/inboundSftp.sftp/stderr_0.log ║
║ │ │ stdout = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540821009913/inboundSftp.sftp/stdout_0.log ║
║ │ │ url = http://192.168.64.1:23057 ║
║ │ │working.dir = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540821009913/inboundSftp.sftp ║
╟───────────────────────────┼───────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║inboundSftp.task-launcher │ deployed │ 1 ║
╟───────────────────────────┼───────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║ │ │ guid = 60081 ║
║ │ │ pid = 71926 ║
║ │ │ port = 60081 ║
║inboundSftp.task-launcher-0│ deployed │ stderr = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540820991695/inboundSftp.task-launcher/stderr_0.log║
║ │ │ stdout = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540820991695/inboundSftp.task-launcher/stdout_0.log║
║ │ │ url = http://192.168.64.1:60081 ║
║ │ │working.dir = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540820991695/inboundSftp.task-launcher ║
╚═══════════════════════════╧═══════════╧════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝
Copying a File into the Remote Directory
Normally, data would be uploaded to an SFTP server.
We simulate this by copying a file into the directory specified by --remote-dir.
You can find sample data in the data/ directory of the sample project.
Copy data/name-list.csv into the /tmp/remote-files directory, which the SFTP source is monitoring.
When this file is detected, the sftp source downloads it to the /tmp/local-files directory specified by --local-dir, and emits a task launch request.
The task launch request includes the name of the task to launch along with the local file path, given as a command line argument.
Spring Batch binds each command line argument to a corresponding JobParameter.
The FileIngestTask job processes the file given by the JobParameter named localFilePath.
Since there have not been any recent requests, the task launches within 30 seconds after the request is published (see the earlier tip about configuring the launch trigger).
cp data/name-list.csv /tmp/remote-files
When the batch job launches, you see something like this in the SCDF console log:
2018-10-26 16:47:24.879 INFO 86034 --- [nio-9393-exec-7] o.s.c.d.spi.local.LocalTaskLauncher : Command to be executed: /Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/bin/java -jar <path-to>/batch/file-ingest/target/ingest-1.0.0.jar localFilePath=/tmp/local-files/name-list.csv --spring.cloud.task.executionid=1
2018-10-26 16:47:25.100 INFO 86034 --- [nio-9393-exec-7] o.s.c.d.spi.local.LocalTaskLauncher : launching task fileIngestTask-8852d94d-9dd8-4760-b0e4-90f75ee028de
Logs will be in /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/fileIngestTask3100511340216074735/1540586844871/fileIngestTask-8852d94d-9dd8-4760-b0e4-90f75ee028de
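The logged command shows that localFilePath reaches the task as a plain name=value argument. As a rough illustration of how such an argument maps to a named parameter, the following shell sketch extracts the value for a given name from an argument list. The helper arg_value is hypothetical and only mimics the idea; Spring Batch performs the real conversion to a JobParameter.

```shell
# Hypothetical helper: print the value of the first NAME=value argument
# whose name matches $1. Returns non-zero if no such argument is present.
arg_value() {
  name=$1
  shift
  for arg in "$@"; do
    case "$arg" in
      "$name"=*)
        # Strip everything up to and including the first "=".
        echo "${arg#*=}"
        return 0
        ;;
    esac
  done
  return 1
}

# Arguments taken from the log line above.
arg_value localFilePath localFilePath=/tmp/local-files/name-list.csv --spring.cloud.task.executionid=1
# prints: /tmp/local-files/name-list.csv
```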
Inspect Job Executions
After data is received and the batch job runs, it is recorded as a job execution. We can view job executions by, for example, issuing the following command in the Spring Cloud Data Flow shell:
dataflow:>job execution list
╔═══╤═══════╤═════════╤════════════════════════════╤═════════════════════╤══════════════════╗
║ID │Task ID│Job Name │ Start Time │Step Execution Count │Definition Status ║
╠═══╪═══════╪═════════╪════════════════════════════╪═════════════════════╪══════════════════╣
║1 │1 │ingestJob│Tue May 01 23:34:05 EDT 2018│1 │Created ║
╚═══╧═══════╧═════════╧════════════════════════════╧═════════════════════╧══════════════════╝
We can also list more details about that specific job execution:
dataflow:>job execution display --id 1
╔═══════════════════════════════════════╤══════════════════════════════╗
║ Key │ Value ║
╠═══════════════════════════════════════╪══════════════════════════════╣
║Job Execution Id │1 ║
║Task Execution Id │1 ║
║Task Instance Id │1 ║
║Job Name │ingestJob ║
║Create Time │Fri Oct 26 16:57:51 EDT 2018 ║
║Start Time │Fri Oct 26 16:57:51 EDT 2018 ║
║End Time │Fri Oct 26 16:57:53 EDT 2018 ║
║Running │false ║
║Stopping │false ║
║Step Execution Count │1 ║
║Execution Status │COMPLETED ║
║Exit Status │COMPLETED ║
║Exit Message │ ║
║Definition Status │Created ║
║Job Parameters │ ║
║-spring.cloud.task.executionid(STRING) │1 ║
║run.id(LONG) │1 ║
║localFilePath(STRING) │/tmp/local-files/name-list.csv║
╚═══════════════════════════════════════╧══════════════════════════════╝
Verify Data
When the batch job runs, it processes the file in the local directory (/tmp/local-files), transforms each item to uppercase names, and inserts it into the database.
You can use any database tool that supports the H2 database to inspect the data. In this example, we use the DBeaver database tool. We can inspect the table to ensure our data was processed correctly.
Within DBeaver, create a connection to the database by using the JDBC URL jdbc:h2:tcp://localhost:19092/mem:dataflow and user sa with no password.
When connected, expand the PUBLIC schema, expand Tables, and double-click the PEOPLE table.
When the table data loads, click the Data tab to view the data.
Cloud Foundry
This section describes how to set up Spring Batch and Spring Cloud Data Flow on Cloud Foundry and then create our example batch process.
Prerequisites
Running this example on Cloud Foundry requires:
- Configuring an NFS server and creating an nfs service to access it, as described in the Cloud Foundry NFS Configuration section.
- An external SFTP server with a /remote-files directory.
- A mysql service instance
- A rabbit service instance
- PivotalMySQLWeb or another database tool to view the data
Register the Applications
To register the applications, run the following command:
app register --name fileIngest --type task --uri maven://io.spring.cloud.dataflow.ingest:ingest:1.0.0.BUILD-SNAPSHOT
Then register the prepackaged sftp source and task-launcher sink applications:
app register --name sftp --type source --uri maven://org.springframework.cloud.stream.app:sftp-dataflow-source-rabbit:2.1.0.RELEASE
app register --name task-launcher --type sink --uri maven://org.springframework.cloud.stream.app:task-launcher-dataflow-sink-rabbit:1.0.1.RELEASE
Create the Task
To create the task, run the following command:
task create fileIngestTask --definition fileIngest
Create the Stream
The sftp source is configured to publish a task launch request that launches the fileIngestTask task.
The launch request binds the nfs service to the task container by using deployment properties task.launch.request.deployment-properties=deployer.*.cloudfoundry.services=nfs.
NOTE: Replace <user>, <pass>, <host>, and <data-flow-server-uri> in the following stream definition.
stream create --name inboundSftp --definition "sftp --username=<user> --password=<pass> --host=<host> --allow-unknown-keys=true --remote-dir=/remote-files/ --local-dir=/var/scdf/shared-files/ --task.launch.request.taskName=fileIngestTask --task.launch.request.deployment-properties=deployer.*.cloudfoundry.services=nfs | task-launcher --spring.cloud.dataflow.client.server-uri=<data-flow-server-uri>"
The dataflow-task-launcher-sink uses a PollableMessageSource controlled by a dynamic trigger with exponential backoff. By default, the sink polls its input destination every second. If there are no task launch requests, the polling period continues to double, to a maximum of 30 seconds. If a task launch request is present, the trigger resets to one second. You can configure the trigger parameters by setting the task-launcher sink properties, trigger.period and trigger.max-period, in the stream definition.
Deploy the Stream
When we deploy the stream, we must also bind the nfs service to the sftp source application by running the following command:
stream deploy inboundSftp --properties "deployer.sftp.cloudfoundry.services=nfs"
Verify Stream Deployment
We can see the status of the streams to be deployed with stream list, as the following example shows:
dataflow:>stream list
╔═══════════╤═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╤═══════════════════╗
║Stream Name│ Stream Definition │ Status ║
╠═══════════╪═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╪═══════════════════╣
║inboundSftp│sftp --task.launch.request.deployment-properties='deployer.*.cloudfoundry.services=nfs' --sftp.factory.password='******' --sftp.local-dir=/var/scdf/shared-files/ │The stream has been║
║ │--sftp.factory.allow-unknown-keys=true --sftp.factory.username='******' --sftp.remote-dir=/remote-files/ --sftp.factory.host=<host> --task.launch.request.taskName=fileIngestTask | │successfully ║
║ │task-launcher --spring.cloud.dataflow.client.server-uri=<data-flow-server-uri │deployed ║
╚═══════════╧═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╧═══════════════════╝
Inspect the Application Logs
Use the Cloud Foundry CLI to list the apps. The source and sink applications should be in a started state.
cf apps
Getting apps in org myorg / space myspace as someuser...
OK
name requested state instances memory disk urls
...
Ky7Uk6q-inboundSftp-sftp-v1 started 1/1 2G 1G Ky7Uk6q-inboundSftp-sftp-v1.apps.hayward.cf-app.com
Ky7Uk6q-inboundSftp-task-launcher-v1 started 1/1 2G 1G Ky7Uk6q-inboundSftp-task-launcher-v1.apps.hayward.cf-app.com
...
The log files of the sftp source are useful for debugging issues such as SFTP connection failures and for verifying SFTP downloads. To view the logs, run the following command:
cf logs Ky7Uk6q-inboundSftp-sftp-v1 --recent
The logs for the task-launcher application are also useful for debugging Data Flow connection issues and for verifying task launch requests:
cf logs Ky7Uk6q-inboundSftp-task-launcher-v1 --recent
Copy a File into the Remote Directory
You can find sample data in the data/ directory of the sample project.
Connect to the SFTP server and upload data/name-list.csv into the remote-files directory.
When this file is detected, the sftp source downloads it to the /var/scdf/shared-files directory specified by --local-dir. We use the /var/scdf shared mount path that we configured for the nfs service. When the file is downloaded, the source emits a task launch request.
The task launch request includes the name of the task to launch along with the local file path, given as a command line argument.
Spring Batch binds each command line argument to a corresponding JobParameter.
The FileIngestTask job processes the file given by the JobParameter named localFilePath.
Since there have not been any recent requests, the task launches within 30 seconds after the request is published (see the earlier tip about configuring the launch trigger).
Inspect Job Executions
To inspect job executions, run the following command (shown with its output):
dataflow:>job execution list
╔═══╤═══════╤═════════╤════════════════════════════╤═════════════════════╤══════════════════╗
║ID │Task ID│Job Name │ Start Time │Step Execution Count │Definition Status ║
╠═══╪═══════╪═════════╪════════════════════════════╪═════════════════════╪══════════════════╣
║1 │1 │ingestJob│Tue Jun 11 15:56:27 EDT 2019│1 │Created ║
╚═══╧═══════╧═════════╧════════════════════════════╧═════════════════════╧══════════════════╝
We can also list more details about that specific job execution:
dataflow:>job execution display --id 1
╔═══════════════════════════════════════╤════════════════════════════════════╗
║ Key │ Value ║
╠═══════════════════════════════════════╪════════════════════════════════════╣
║Job Execution Id │6 ║
║Task Execution Id │6 ║
║Task Instance Id │6 ║
║Job Name │ingestJob ║
║Create Time │Thu Jun 13 17:06:28 EDT 2019 ║
║Start Time │Thu Jun 13 17:06:29 EDT 2019 ║
║End Time │Thu Jun 13 17:06:57 EDT 2019 ║
║Running │false ║
║Stopping │false ║
║Step Execution Count │1 ║
║Execution Status │COMPLETED ║
║Exit Status │COMPLETED ║
║Exit Message │ ║
║Definition Status │Created ║
║Job Parameters │ ║
║-spring.cloud.task.executionid(STRING) │1 ║
║run.id(LONG) │1 ║
║localFilePath(STRING) │/var/scdf/shared-files/name-list.csv║
╚═══════════════════════════════════════╧════════════════════════════════════╝
Verify Data
When the batch job runs, it processes the file in the local directory (/var/scdf/shared-files), transforms each item to uppercase names, and inserts it into the database.
Use PivotalMySQLWeb to inspect the data.
Kubernetes
This section describes how to set up Spring Batch and Spring Cloud Data Flow on Kubernetes and then create our example batch process.
Prerequisites
This example assumes Data Flow is installed on Minikube with Kafka and MySQL. We recommend using the Helm chart. To get started, run the following command:
helm install --name my-release --set kafka.enabled=true,rabbitmq.enabled=false,server.service.type=NodePort stable/spring-cloud-data-flow
Running this example on Kubernetes requires configuring an NFS server and creating the corresponding persistent volume and persistent volume claim resources, as described in the Kubernetes NFS Configuration section.
We also require an external SFTP server with a /remote-files directory.
Register the Applications
If you downloaded the sample project, you can build and publish the Docker image to the Minikube registry:
eval $(minikube docker-env)
./mvnw clean package docker:build -Pkubernetes
Otherwise, you can skip this step and pull the image from Docker Hub:
app register --name fileIngest --type task --uri docker://springcloud/ingest
Register the prepackaged sftp source and task-launcher sink applications:
app register --name sftp --type source --uri docker://springcloudstream/sftp-dataflow-source-kafka:2.1.0.RELEASE --metadata-uri maven://org.springframework.cloud.stream.app:sftp-dataflow-source-kafka:jar:metadata:2.1.0.RELEASE
app register --name task-launcher --type sink --uri docker://springcloudstream/task-launcher-dataflow-sink-kafka:1.0.1.RELEASE --metadata-uri maven://org.springframework.cloud.stream.app:task-launcher-dataflow-sink-kafka:jar:metadata:1.0.1.RELEASE
Create the Task
To create the task, run the following command:
task create fileIngestTask --definition fileIngest
Create the Stream
The sftp source is configured to publish a task launch request to launch the fileIngestTask task.
The launch request mounts the NFS share to the task pod by using the deployer.*.kubernetes.volumes=[{'name':'staging','persistentVolumeClaim':{'claimName':'nfs-volume-claim'}}] and deployer.*.kubernetes.volumeMounts=[{'mountPath':'/staging/shared-files','name':'staging'}] deployment properties.
NOTE: Replace <user>, <pass>, and <data-flow-server-uri> in the following stream definition. The --host value used here (192.168.99.1) is the default Minikube gateway for VirtualBox.
To get the <data-flow-server-uri>, find the name of the service and use the minikube service command:
kubectl get svc
...
my-release-data-flow-server NodePort 10.97.74.123 <none> 80:30826/TCP
minikube service my-release-data-flow-server --url
http://192.168.99.105:30826
stream create --name inboundSftp --definition "sftp --host=192.168.99.1 --username=<user> --password=<pass> --allow-unknown-keys=true --remote-dir=/remote-files --local-dir=/staging/shared-files --task.launch.request.taskName=fileIngestTask --task.launch.request.deployment-properties=deployer.*.kubernetes.volumes=[{'name':'staging','persistentVolumeClaim':{'claimName':'nfs-volume-claim'}}],deployer.*.kubernetes.volumeMounts=[{'mountPath':'/staging/shared-files','name':'staging'}] | task-launcher --spring.cloud.dataflow.client.server-uri=<data-flow-server-uri>"
The dataflow-task-launcher-sink uses a PollableMessageSource controlled by a dynamic trigger with exponential backoff. By default, the sink polls its input destination every second. If there are no task launch requests, the polling period continues to double, to a maximum of 30 seconds. If a task launch request is present, the trigger resets to one second. You can configure the trigger parameters by setting the task-launcher sink properties, trigger.period and trigger.max-period, in the stream definition.
Deploy the Stream
When we deploy the stream, we must also configure a volume mount for the sftp source.
stream deploy inboundSftp --properties "deployer.sftp.kubernetes.volumes=[{'name':'staging','persistentVolumeClaim':{'claimName':'nfs-volume-claim'}}],deployer.sftp.kubernetes.volumeMounts=[{'mountPath':'/staging/shared-files','name':'staging'}]"
Verify Stream Deployment
We can see the status of the streams to be deployed with stream list, as the following example shows:
dataflow:>stream list
╔═══════════╤═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╤════════════╗
║Stream Name│ Stream Definition │ Status ║
╠═══════════╪═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╪════════════╣
║inboundSftp│sftp │The stream ║
║ │--task.launch.request.deployment-properties="deployer.*.kubernetes.volumes=[{'name':'staging','persistentVolumeClaim':{'claimName':'nfs-volume-claim'}}],deployer.*.kubernetes.volumeMounts=[{'mountPath':'/staging/shared-files','name':'staging'}]"│has been ║
║ │--sftp.factory.password='******' --sftp.local-dir=/staging/shared-files --sftp.factory.allow-unknown-keys=true --sftp.factory.username='******' --sftp.remote-dir=/remote-files --sftp.factory.host=192.168.99.1 │successfully║
║ │--task.launch.request.taskName=fileIngestTask | task-launcher --spring.cloud.dataflow.client.server-uri=http://192.168.99.105:30826 │deployed ║
╚═══════════╧═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╧════════════╝
Inspect the Application Logs
Use kubectl to list the pods. The source and sink applications should be in the Running state.
kubectl get pods
NAME READY STATUS RESTARTS AGE
...
inboundsftp-sftp-v12-6d55d469bd-t8znd 1/1 Running 0 6m24s
inboundsftp-task-launcher-v12-555d4785c5-zjr6b 1/1 Running 0 6m24s
...
The log files of the sftp source are useful for debugging issues such as SFTP connection failures and for verifying SFTP downloads:
kubectl logs inboundsftp-sftp-v12-6d55d469bd-t8znd
The logs for the task-launcher application are useful for debugging Data Flow connection issues and for verifying task launch requests:
kubectl logs inboundsftp-task-launcher-v12-555d4785c5-zjr6b
Copy a File into the Remote Directory
You can find sample data in the data/ directory of the sample project.
Connect to the SFTP server and upload data/name-list.csv into the remote-files directory.
When this file is detected, the sftp source downloads it to the /staging/shared-files directory specified by --local-dir. This is the mount path that we configured for the NFS persistent volume claim. When the file is downloaded, the source emits a task launch request.
The task launch request includes the name of the task to launch, along with the local file path, given as a command line argument.
Spring Batch binds each command line argument to a corresponding JobParameter.
The FileIngestTask job processes the file given by the JobParameter named localFilePath.
Since there have not been any recent requests, the task launches within 30 seconds after the request is published (see the earlier tip about configuring the launch trigger).
Inspect Job Executions
To inspect job executions, run the following command (shown with its output):
dataflow:>job execution list
╔═══╤═══════╤═════════╤════════════════════════════╤═════════════════════╤══════════════════╗
║ID │Task ID│Job Name │ Start Time │Step Execution Count │Definition Status ║
╠═══╪═══════╪═════════╪════════════════════════════╪═════════════════════╪══════════════════╣
║1 │1 │ingestJob│Thu Jun 13 08:39:59 EDT 2019│1 │Created ║
╚═══╧═══════╧═════════╧════════════════════════════╧═════════════════════╧══════════════════╝
We can also list more details about that specific job execution:
dataflow:>job execution display --id 1
╔═══════════════════════════════════════════╤═══════════════════════════════════╗
║ Key │ Value ║
╠═══════════════════════════════════════════╪═══════════════════════════════════╣
║Job Execution Id │1 ║
║Task Execution Id │424 ║
║Task Instance Id │1 ║
║Job Name │ingestJob ║
║Create Time │Thu Jun 13 08:39:59 EDT 2019 ║
║Start Time │Thu Jun 13 08:39:59 EDT 2019 ║
║End Time │Thu Jun 13 08:40:07 EDT 2019 ║
║Running │false ║
║Stopping │false ║
║Step Execution Count │1 ║
║Execution Status │COMPLETED ║
║Exit Status │COMPLETED ║
║Exit Message │ ║
║Definition Status │Created ║
║Job Parameters │ ║
║-spring.cloud.task.executionid(STRING) │424 ║
║run.id(LONG) │1 ║
║-spring.datasource.username(STRING) │****** ║
║-spring.cloud.task.name(STRING) │fileIngestTask ║
║-spring.datasource.password(STRING) │****** ║
║-spring.datasource.driverClassName(STRING) │org.mariadb.jdbc.Driver ║
║localFilePath(STRING) │/staging/shared-files/name-list.csv║
║-spring.datasource.url(STRING) │****** ║
╚═══════════════════════════════════════════╧═══════════════════════════════════╝
Verify Data
When the batch job runs, it processes the file in the local directory (/staging/shared-files), transforms each item to uppercase names, and inserts it into the database.
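The transformation step can be pictured with a small plain-Java sketch. It assumes each line has the first_name,last_name format described for the input file. The class UppercaseLineSketch, its process method, and the mixed-case sample input are hypothetical stand-ins, not the sample project's actual item processor.

```java
import java.util.Locale;

// Hypothetical stand-in for the sample's item processor; names are illustrative.
public class UppercaseLineSketch {

    // Splits a "first_name,last_name" line and uppercases both fields.
    static String[] process(String line) {
        String[] fields = line.split(",", 2);
        if (fields.length != 2) {
            throw new IllegalArgumentException("Expected first_name,last_name but got: " + line);
        }
        return new String[] {
            fields[0].trim().toUpperCase(Locale.ROOT),
            fields[1].trim().toUpperCase(Locale.ROOT)
        };
    }

    public static void main(String[] args) {
        String[] person = process("Aaron,Aaberg");
        System.out.println(person[0] + " " + person[1]); // prints "AARON AABERG"
    }
}
```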
Open a shell in the mysql container to query the people table:
kubectl get pods
...
my-release-mysql-56f988dd6c-qlm8q 1/1 Running
...
kubectl exec -it my-release-mysql-56f988dd6c-qlm8q -- /bin/bash
# mysql -u root -p$MYSQL_ROOT_PASSWORD
mysql> select * from dataflow.people;
+-----------+------------+-----------+
| person_id | first_name | last_name |
+-----------+------------+-----------+
| 1 | AARON | AABERG |
| 2 | AARON | AABY |
| 3 | ABBEY | AADLAND |
| 4 | ABBIE | AAGAARD |
| 5 | ABBY | AAKRE |
| 6 | ABDUL | AALAND |
| 7 | ABE | AALBERS |
| 8 | ABEL | AALDERINK |
| 9 | ABIGAIL | AALUND |
| 10 | ABRAHAM | AAMODT |
| ... |
+-----------+------------+-----------+
Limiting Concurrent Task Executions
This recipe processes a single file with 5000+ items. What if we copy 100 files into the remote directory?
The sftp source processes them immediately, generating 100 task launch requests. The Data Flow server launches tasks asynchronously, so this could potentially overwhelm the resources of the runtime platform.
For example, when running the Data Flow server on your local machine, each launched task creates a new JVM. In Cloud Foundry, each task creates a new container instance, and, in Kubernetes, a pod.
Fortunately, Spring Cloud Data Flow provides configuration settings to limit the number of concurrently running tasks.
We can use this sample to see how this works.
Lower the Maximum Concurrent Task Executions
The sample project includes twenty files in the data/split directory. To observe the limit in action, we can set the maximum concurrent tasks to 3.
For running tasks on a local server, restart the server and add the following command line argument: --spring.cloud.dataflow.task.platform.local.accounts[default].maximum-concurrent-tasks=3
If your process runs on Cloud Foundry, run the following command:
cf set-env <dataflow-server> SPRING_CLOUD_DATAFLOW_TASK_PLATFORM_CLOUDFOUNDRY_ACCOUNTS[DEFAULT]_DEPLOYMENT_MAXIMUMCONCURRENTTASKS 3
If your process runs on Kubernetes, edit the Data Flow server configmap by running the following command:
kubectl edit configmap my-release-data-flow-server
Add the maximum-concurrent-tasks property, as follows:
apiVersion: v1
data:
  application.yaml: |-
    spring:
      cloud:
        dataflow:
          task:
            platform:
              kubernetes:
                accounts:
                  default:
                    maximum-concurrent-tasks: 3
                    limits:
                      memory: 1024Mi
                      cpu: 500m
After editing the configmap, delete the Data Flow server pod to force it to restart. Then wait for it to restart.
Verify that the Maximum Concurrent Task Executions is Enforced
The task launcher sink polls the input destination. The polling period adjusts according to the presence of task launch requests and also to the number of currently running tasks reported through the Data Flow server's tasks/executions/current REST endpoint.
The sink queries this endpoint and pauses polling the input for new requests if the number of concurrent tasks for the task platform is at its limit.
This introduces a lag of 1 to 30 seconds between the creation of a task launch request and its execution, sacrificing some performance for resilience.
In return, task launch requests are never sent to a dead letter queue merely because the server is busy or unavailable.
The exponential backoff of the polling period also prevents the app from querying the server excessively when there are no task launch requests.
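The backoff can be sketched in a few lines of plain Java. This is a simplified illustration, not the task launcher sink's code: the class PollingBackoffSketch and its afterPoll method are hypothetical, the initial period follows the 1-second lower bound of the lag, and the 30-second cap is assumed from its upper bound.

```java
// Hypothetical sketch of the polling backoff; not the task launcher sink's code.
public class PollingBackoffSketch {

    static final long INITIAL_PERIOD_MS = 1_000; // lower bound of the 1 to 30 second lag
    static final long MAX_PERIOD_MS = 30_000;    // assumed cap, from the upper bound of the lag

    private long periodMs = INITIAL_PERIOD_MS;

    // Called after each poll: double the period when nothing was launched
    // (no request, or the platform is at its limit); reset it after a launch.
    long afterPoll(boolean launched) {
        periodMs = launched ? INITIAL_PERIOD_MS : Math.min(periodMs * 2, MAX_PERIOD_MS);
        return periodMs;
    }

    public static void main(String[] args) {
        PollingBackoffSketch backoff = new PollingBackoffSketch();
        for (int i = 0; i < 6; i++) {
            // prints 2000, 4000, 8000, 16000, 30000, 30000
            System.out.println(backoff.afterPoll(false));
        }
        System.out.println(backoff.afterPoll(true)); // prints 1000
    }
}
```

Doubling the period keeps the load on the server low while the platform is saturated, and resetting it after a launch lets a burst of queued requests drain quickly.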
Monitor the Task Executions
Tail the task-launcher container logs.
You can also monitor the Data Flow server for current task executions:
watch curl <dataflow-server-url>/tasks/executions/current
Every 2.0s: curl http://192.168.99.105:30826/tasks/executions/current
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100    92    0    92    0     0   1202      0 --:--:-- --:--:-- --:--:--  1210
[{"name":"default","type":"Kubernetes","maximumTaskExecutions":3,"runningExecutionCount":0}]
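The response reports, for each platform, the configured limit and the number of running executions. As a minimal sketch of how a client might interpret it, the following plain-Java helper compares the two counters for the first platform entry. The class ExecutionLimitSketch and its atLimit method are hypothetical, and the regular expressions are a shortcut that assumes the compact JSON layout shown above. A real client would use a JSON library.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; a real client would use a JSON library instead of a regex.
public class ExecutionLimitSketch {

    private static final Pattern MAX = Pattern.compile("\"maximumTaskExecutions\":(\\d+)");
    private static final Pattern RUNNING = Pattern.compile("\"runningExecutionCount\":(\\d+)");

    // Returns true when the first platform entry in the response is at its limit.
    static boolean atLimit(String json) {
        return extract(RUNNING, json) >= extract(MAX, json);
    }

    private static int extract(Pattern pattern, String json) {
        Matcher matcher = pattern.matcher(json);
        if (!matcher.find()) {
            throw new IllegalArgumentException("Field not found in: " + json);
        }
        return Integer.parseInt(matcher.group(1));
    }

    public static void main(String[] args) {
        String idle = "[{\"name\":\"default\",\"type\":\"Kubernetes\","
                + "\"maximumTaskExecutions\":3,\"runningExecutionCount\":0}]";
        System.out.println(atLimit(idle)); // prints "false"
    }
}
```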
Run the Sample with Multiple Files
With the sample stream deployed, upload the twenty files in data/split to /remote-files. In the task-launcher logs, you should see the exponential backoff working:
2019-06-14 15:00:48.247 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling period reset to 1000 ms.
2019-06-14 15:00:49.265 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Launching Task fileIngestTask on platform default
2019-06-14 15:00:50.433 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Launching Task fileIngestTask on platform default
2019-06-14 15:00:51.686 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Launching Task fileIngestTask on platform default
2019-06-14 15:00:52.929 WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:00:52.929 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling paused- increasing polling period to 2 seconds.
2019-06-14 15:00:55.008 WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:00:55.008 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling paused- increasing polling period to 4 seconds.
2019-06-14 15:00:59.039 WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:00:59.040 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling paused- increasing polling period to 8 seconds.
2019-06-14 15:01:07.104 WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:01:07.104 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling paused- increasing polling period to 16 seconds.
2019-06-14 15:01:23.127 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling resumed
2019-06-14 15:01:23.128 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Launching Task fileIngestTask on platform default
2019-06-14 15:01:23.232 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling period reset to 1000 ms.
2019-06-14 15:01:24.277 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Launching Task fileIngestTask on platform default
2019-06-14 15:01:25.483 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Launching Task fileIngestTask on platform default
2019-06-14 15:01:26.743 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Launching Task fileIngestTask on platform default
2019-06-14 15:01:28.035 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Launching Task fileIngestTask on platform default
2019-06-14 15:01:29.324 WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:01:29.325 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling paused- increasing polling period to 2 seconds.
2019-06-14 15:01:31.435 WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:01:31.436 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling paused- increasing polling period to 4 seconds.
2019-06-14 15:01:35.531 WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:01:35.532 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling paused- increasing polling period to 8 seconds.
2019-06-14 15:01:43.615 WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:01:43.615 INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer : Polling paused- increasing polling period to 16 seconds.
Avoiding Duplicate Processing
The sftp source does not process files that it has already seen.
It uses a Metadata Store to keep track of files by extracting content from messages at runtime.
Out of the box, it uses an in-memory metadata store, so if we redeploy the stream or restart the sftp source, this state is lost and files are reprocessed.
The store is pluggable, however, and a persistent store is useful for production deployments.
Thanks to the magic of Spring, we can auto-configure one of the available persistent Metadata Stores to prevent duplicate processing.
In this example, we auto-configure the JDBC metadata store, since we are already using a JDBC database.
Configure and Build the SFTP Source
For this purpose, we add some JDBC dependencies to the sftp-dataflow source.
Clone the sftp stream app starter (https://github.com/spring-cloud-stream-app-starters/sftp).
Change to the sftp directory.
In the following commands, replace <binder> with kafka or rabbit, as appropriate for your configuration, and run them:
./mvnw clean install -DskipTests -PgenerateApps
cd apps/sftp-dataflow-source-<binder>
Add the following dependencies to pom.xml:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jdbc</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
</dependency>
If you run on Kubernetes, use the mariadb driver instead of H2:
<dependency>
    <groupId>org.mariadb.jdbc</groupId>
    <artifactId>mariadb-java-client</artifactId>
    <version>2.3.0</version>
</dependency>
If you run on a local server with the in-memory H2 database, set the JDBC URL in src/main/resources/application.properties to use the Data Flow server's database:
spring.datasource.url=jdbc:h2:tcp://localhost:19092/mem:dataflow
If you run on Kubernetes, set the datasource to use the internal IP of the mysql service, as the following example shows:
spring.datasource.url=jdbc:mysql://10.98.214.235:3306/dataflow
If you run in Cloud Foundry or Kubernetes, add the following property to src/main/resources/application.properties:
spring.integration.jdbc.initialize-schema=always
Build the sftp source and register it with Data Flow.
Run the Sample App
Follow the instructions for running the sample on your preferred platform, up to the Copy file... step.
If you have already completed the main exercise, restore the data to its initial state and redeploy the stream:
- Clean the local and remote data directories.
- Run the DROP TABLE PEOPLE; SQL command in the database.
- Undeploy the stream and deploy it again to run the updated sftp source.
If you run in Cloud Foundry, set the deployment properties to bind sftp to the mysql service, as the following example shows:
dataflow:>stream deploy inboundSftp --properties "deployer.sftp.cloudfoundry.services=nfs,mysql"
Copy a File into the Remote Directory
We use one small file for this.
The data/split directory in the sample project contains the contents of data/name-list.csv split into 20 files. Upload names_aa.csv.
Inspect the Database
By using a database tool, as described earlier, view the contents of the INT_METADATA_STORE table. The following image shows the result:
Note that there is a single key-value pair, where the key identifies the file name (the sftpSource/ prefix provides a namespace for the sftp source app). The value is a timestamp indicating when the message was received.
The metadata store tracks files that have already been processed.
This prevents the same files from being pulled from the remote directory on every polling cycle.
Only new files or files that have been updated are processed.
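A minimal sketch of this accept-once behavior follows. It assumes the store maps a namespaced file name to a timestamp and that a changed timestamp marks the file as updated. The class SeenFilesSketch and its accept method are hypothetical and use an in-memory map in place of the real metadata store, so this is an illustration rather than the Spring Integration implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a metadata store; not Spring Integration code.
public class SeenFilesSketch {

    private final Map<String, Long> store = new ConcurrentHashMap<>();

    // Accepts a file if it is new or if its timestamp differs from the stored one.
    boolean accept(String fileName, long timestamp) {
        String key = "sftpSource/" + fileName; // namespace prefix, as in the table above
        Long previous = store.put(key, timestamp);
        return previous == null || previous.longValue() != timestamp;
    }

    public static void main(String[] args) {
        SeenFilesSketch seen = new SeenFilesSketch();
        System.out.println(seen.accept("names_aa.csv", 100L)); // prints "true": new file
        System.out.println(seen.accept("names_aa.csv", 100L)); // prints "false": already seen
        System.out.println(seen.accept("names_aa.csv", 200L)); // prints "true": timestamp changed
    }
}
```

Because this map lives in memory, its contents vanish on restart, which is why a persistent store is needed to prevent reprocessing across restarts.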
Since there are no uniqueness constraints on the PEOPLE table, a file being processed multiple times by our batch job results in duplicate table rows. Since we have configured a persistent metadata store, duplicate processing is prevented across container restarts. You can verify this by undeploying and redeploying the stream or by restarting the sftp source.
If we view the PEOPLE table, it should look something like this:
Now we can upload the same file to the SFTP server again. Alternatively, if you are logged in to the server, you can update the file's timestamp, as follows:
touch /remote-files/names_aa.csv
Now the file is reprocessed and the PEOPLE table contains duplicate data. If you ORDER BY FIRST_NAME, you see something like this:
If we drop another one of the files into the remote directory, that file is processed, and we see another entry in the metadata store.