SFTP to JDBC File Ingest

This recipe provides step by step instructions to build a Data Flow pipeline to ingest files from an SFTP source and save the contents to a JDBC data store. The pipeline is designed to launch a task whenever a new file is detected by the SFTP source. In this case, the task is a Spring Batch job that processes the file, converting the contents of each line to uppercase, and inserting it into a table.

The file ingest batch job reads from a CSV text file with lines formatted as first_name,last_name and writes each entry to a database table using a JdbcBatchItemWriter] that executes INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName) for each line.

You can download the project that contains the source code and sample data from your browser, or from the command line:

wget https://github.com/spring-cloud/spring-cloud-dataflow-samples/blob/master/dataflow-website/recipes/file-ingest/file-to-jdbc/file-to-jdbc.zip?raw=true -O file-to-jdbc.zip

If you choose not to build the task application yourself, the executable jar is published to the Spring Maven repository and to the springcloud/ingest Docker repository.

The pipeline is built using the following pre-packaged Spring Cloud Stream applications:

  • sftp-dataflow-source an SFTP source configured to emit a Task Launch Request whenever it detects a new file in one or more polled SFTP directories.
  • dataflow-task-launcher-sink a sink that acts as a REST client to the Data Flow server to launch a Data Flow task.

This pipeline runs on all supported Data Flow platforms. The SFTP source downloads each file from the SFTP server to a local directory before sending the task launch request. The request sets localFilePath as a command line argument for the task. When running on a cloud platform, we need to mount a shared directory available to the SFTP source container and the task container. For this example, we will set up an NFS mounted directory. Configuring the environment and containers for NFS is platform specific and is described here for Cloud Foundry v2.3+ and minikube.

Prerequisites

Data Flow Installation

Make sure you have installed Spring Cloud Data Flow to the platform of your choice:

NOTE: For kubernetes, the sample task application is configured to use mysql. The Data Flow server must also be configured for mysql.

Using Data Flow

This example assumes that you know how to use Spring Cloud Data Flow to register and deploy applications using the Spring Cloud Data Flow dashboard or the Spring Cloud Data Flow shell. If you need further instructions on using Data Flow please refer to Stream Processing using Spring Cloud Data Flow and Register and launch a batch application using Spring Cloud Data Flow.

SFTP server

This example requires access to an SFTP server. For running on a local machine and minikube, we will use the host machine as the SFTP server. For Cloud Foundry, and Kubernetes in general, an external SFTP server is required. On the SFTP server, create a /remote-files directory. This is where we will drop files to trigger the pipeline.

NFS configuration

NFS is not required when running locally.

Cloud Foundry NFS configuration

This feature is provided in Pivotal Cloud Foundry by NFS Volume Services

To run this example, we will need:

  • a Cloud Foundry instance v2.3+ with NFS Volume Services enabled
  • An NFS server accessible from the Cloud Foundry instance
  • An nfs service instance properly configured

NOTE: For simplicity, this example assumes the nfs service is created with common configuration as follows with a common mount point /var/scdf for all bound apps. It is also possible to set these parameters when binding the nfs service to an application using deployment propterties:

cf create-service nfs Existing nfs -c '{"share":<nfs-host>/staging","uid":<uid>,"gid":<gid>, "mount":"/var/scdf"}'

Kubernetes NFS configuration

Kubernetes provides many options for configuring and sharing persistent volumes. For this example, we will use minikube and use the host machine as the NFS server. The following instructions works for OS/X and should be similar for Linux hosts:

Make sure minikube is started. The commands below provide NFS access to the minikube VM. The minikube IP is subject to change each time it is started, so these steps should be performed after each start.

Here we will expose a shared directory called /staging.

sudo mkdir /staging
sudo chmod 777 /staging
sudo echo "/staging -alldirs -mapall="$(id -u)":"$(id -g)" $(minikube ip)" >> /etc/exports
sudo nfsd restart

Verify the nfs mounts:

showmount -e 127.0.0.1
Exports list on 127.0.0.1:
/staging 192.168.99.105

Configure persistent volume and persistent volume claim resources. Copy the following and save it to a file named nfs-config.yml:

---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: nfs-volume
spec:
  capacity:
    storage: 4Gi
  accessModes:
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  storageClassName: standard
  nfs:
    # The address 192.168.99.1 is the Minikube gateway to the host for VirtualBox. This way
    # not the container IP will be visible by the NFS server on the host machine,
    # but the IP address of the `minikube ip` command. You will need to
    # grant access to the `minikube ip` IP address.
    server: 192.168.99.1
    path: '/staging'

---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: nfs-volume-claim
  namespace: default
spec:
  storageClassName: standard
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 4Gi

Create the resources:

kubectl apply -f nfs-config.yml

Deployment

Local

For local deployment, this example uses Kafka as the message broker. Create directories for the remote and local files:

mkdir -p /tmp/remote-files /tmp/local-files

Register the applications

If you downloaded and built the sample project, you can register it using a file:// url, e.g. file://<path-to-project>/target/ingest-1.0.0-SNAPSHOT.jar Otherwise use the published maven jar:

app register --name fileIngest --type task --uri maven://io.spring.cloud.dataflow.ingest:ingest:1.0.0.BUILD-SNAPSHOT

Register the prepackaged sftp source and task-launcher sink applications:

app register --name sftp --type source  --uri maven://org.springframework.cloud.stream.app:sftp-dataflow-source-kafka:2.1.0.RELEASE
app register --name task-launcher --type sink --uri maven://org.springframework.cloud.stream.app:task-launcher-dataflow-sink-kafka:1.0.1.RELEASE

Create the task

task create fileIngestTask --definition fileIngest

Create and deploy the stream

NOTE: Replace <user> and <pass> below. The username and password are the credentials for the local (or remote) user. If you are not using a local SFTP server, specify the host using the host, and optionally port, parameters. If not defined, host defaults to 127.0.0.1 and port defaults to 22.

stream create --name inboundSftp --definition "sftp --username=<user> --password=<pass> --allow-unknown-keys=true --task.launch.request.taskName=fileIngestTask --remote-dir=/tmp/remote-files/ --local-dir=/tmp/local-files/ | task-launcher" --deploy

The dataflow-task-launcher-sink uses a PollableMessageSource controlled by a dynamic trigger with exponential backoff. By default, the sink polls its input destination every 1 second. If there are no task launch requests, the polling period will continue to double up to a maximum of 30 seconds. If a task launch request is present, the trigger resets to 1 second. The trigger parameters may be configured by setting the task-launcher sink properties trigger.period and trigger.max-period in the stream definition.

Verify Stream deployment

We can see the status of the streams to be deployed with stream list, for example:

dataflow:>stream list
╔═══════════╤════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╤════════════════════════════╗
║Stream Name│                                                         Stream Definition                                                          │           Status           ║
╠═══════════╪════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╪════════════════════════════╣
║inboundSftp│sftp --password='******' --remote-dir=/tmp/remote-files/ --local-dir=/tmp/local-files/ --task.launch.request.taskName=fileIngestTask│The stream has been         ║
║           │--allow-unknown-keys=true --username=<user> | task-launcher                                                                         │successfully deployed       ║
╚═══════════╧════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╧════════════════════════════╝

Inspect the application logs

In the event the stream failed to deploy, or you would like to inspect the logs for any reason, you can get the location of the logs to applications created for the inboundSftp stream using the runtime apps command:

dataflow:>runtime apps
╔═══════════════════════════╤═══════════╤════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║   App Id / Instance Id    │Unit Status│                                                                     No. of Instances / Attributes                                                                      ║
╠═══════════════════════════╪═══════════╪════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╣
║inboundSftp.sftp           │ deployed  │                                                                                   1                                                                                    ║
║                           │           │       guid = 23057                                                                                                                                                     ║
║                           │           │        pid = 71927                                                                                                                                                     ║
║                           │           │       port = 23057                                                                                                                                                     ║
╟───────────────────────────┼───────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║inboundSftp.sftp-0         │ deployed  │     stderr = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540821009913/inboundSftp.sftp/stderr_0.log         ║
║                           │           │     stdout = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540821009913/inboundSftp.sftp/stdout_0.log         ║
║                           │           │        url = http://192.168.64.1:23057                                                                                                                                 ║
║                           │           │working.dir = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540821009913/inboundSftp.sftp                      ║
╟───────────────────────────┼───────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║inboundSftp.task-launcher  │ deployed  │                                                                                   1                                                                                    ║
╟───────────────────────────┼───────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║                           │           │       guid = 60081                                                                                                                                                     ║
║                           │           │        pid = 71926                                                                                                                                                     ║
║                           │           │       port = 60081                                                                                                                                                     ║
║inboundSftp.task-launcher-0│ deployed  │     stderr = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540820991695/inboundSftp.task-launcher/stderr_0.log║
║                           │           │     stdout = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540820991695/inboundSftp.task-launcher/stdout_0.log║
║                           │           │        url = http://192.168.64.1:60081                                                                                                                                 ║
║                           │           │working.dir = /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/spring-cloud-deployer-120915912946760306/inboundSftp-1540820991695/inboundSftp.task-launcher             ║
╚═══════════════════════════╧═══════════╧════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝

Drop a file into the remote directory

Normally data would be uploaded to an SFTP server. We will simulate this by copying a file into the directory specified by --remote-dir. Sample data can be found in the data/ directory of the sample project.

Copy data/name-list.csv into the /tmp/remote-files directory which the SFTP source is monitoring. When this file is detected, the sftp source will download it to the /tmp/local-files directory specified by --local-dir, and emit a Task Launch Request. The Task Launch Request includes the name of the task to launch along with the local file path, given as a command line argument. Spring Batch binds each command line argument to a corresponding JobParameter. The FileIngestTask job processes the file given by the JobParameter named localFilePath. Since there have not been any recent requests, the task will launch within 30 seconds after the request is published (see tip above about configuring the launch trigger).

cp data/name-list.csv /tmp/remote-files

When the batch job launches, you will see something like this in the SCDF console log:

2018-10-26 16:47:24.879  INFO 86034 --- [nio-9393-exec-7] o.s.c.d.spi.local.LocalTaskLauncher      : Command to be executed: /Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/jre/bin/java -jar <path-to>/batch/file-ingest/target/ingest-1.0.0.jar localFilePath=/tmp/local-files/name-list.csv --spring.cloud.task.executionid=1
2018-10-26 16:47:25.100  INFO 86034 --- [nio-9393-exec-7] o.s.c.d.spi.local.LocalTaskLauncher      : launching task fileIngestTask-8852d94d-9dd8-4760-b0e4-90f75ee028de
   Logs will be in /var/folders/hd/5yqz2v2d3sxd3n879f4sg4gr0000gn/T/fileIngestTask3100511340216074735/1540586844871/fileIngestTask-8852d94d-9dd8-4760-b0e4-90f75ee028de

Inspect Job Executions

After data is received and the batch job runs, it will be recorded as a Job Execution. We can view job executions by for example issuing the following command in the Spring Cloud Data Flow shell:

dataflow:>job execution list
╔═══╤═══════╤═════════╤════════════════════════════╤═════════════════════╤══════════════════╗
║ID │Task ID│Job Name │         Start Time         │Step Execution Count │Definition Status ║
╠═══╪═══════╪═════════╪════════════════════════════╪═════════════════════╪══════════════════╣
║1  │1      │ingestJob│Tue May 01 23:34:05 EDT 2018│1                    │Created           ║
╚═══╧═══════╧═════════╧════════════════════════════╧═════════════════════╧══════════════════╝

As well as list more details about that specific job execution:

dataflow:>job execution display --id 1
╔═══════════════════════════════════════╤══════════════════════════════╗
║                  Key                  │            Value             ║
╠═══════════════════════════════════════╪══════════════════════════════╣
║Job Execution Id                       │1                             ║
║Task Execution Id                      │1                             ║
║Task Instance Id                       │1                             ║
║Job Name                               │ingestJob                     ║
║Create Time                            │Fri Oct 26 16:57:51 EDT 2018  ║
║Start Time                             │Fri Oct 26 16:57:51 EDT 2018  ║
║End Time                               │Fri Oct 26 16:57:53 EDT 2018  ║
║Running                                │false                         ║
║Stopping                               │false                         ║
║Step Execution Count                   │1                             ║
║Execution Status                       │COMPLETED                     ║
║Exit Status                            │COMPLETED                     ║
║Exit Message                           │                              ║
║Definition Status                      │Created                       ║
║Job Parameters                         │                              ║
║-spring.cloud.task.executionid(STRING) │1                             ║
║run.id(LONG)                           │1                             ║
║localFilePath(STRING)                  │/tmp/local-files/name-list.csv║
╚═══════════════════════════════════════╧══════════════════════════════╝

Verify data

When the the batch job runs, it processes the file in the local directory /tmp/local-files and transforms each item to uppercase names and inserts it into the database.

You may use any database tool that supports the H2 database to inspect the data. In this example we use the database tool DBeaver. Lets inspect the table to ensure our data was processed correctly.

Within DBeaver, create a connection to the database using the JDBC URL jdbc:h2:tcp://localhost:19092/mem:dataflow, and user sa with no password. When connected, expand the PUBLIC schema, then expand Tables and then double click on the table PEOPLE. When the table data loads, click the "Data" tab to view the data.

Cloud Foundry

Prerequisites

Running this example on Cloud Foundry requires configuring an NFS server and creating an nfs service to access it as discribed in the Cloud Foundry NFS Configuration section. We also require an external SFTP server with a /remote-files directory.

This also requires:

  • A mysql service instance
  • A rabbit service instance
  • PivotalMySQLWeb or another database tool to view the data

Register the applications

app register --name fileIngest --type task --uri maven://io.spring.cloud.dataflow.ingest:ingest:1.0.0.BUILD-SNAPSHOT

Register the prepackaged sftp source and task-launcher sink applications:

app register --name sftp --type source  --uri maven://org.springframework.cloud.stream.app:sftp-dataflow-source-kafka:2.1.0.RELEASE
app register --name task-launcher --type sink --uri maven://org.springframework.cloud.stream.app:task-launcher-dataflow-sink-kafka:1.0.1.RELEASE

Create the task

task create fileIngestTask --definition fileIngest

Create the stream

The sftp source is configured to publish a task launch request to launch the fileIngestTask task. The launch request binds the nfs service to the task container using deployment properties task.launch.request.deployment-properties=deployer.*.cloudfoundry.services=nfs.

NOTE: Replace <user>, <pass>,<host> and <data-flow-server-uri> in the stream definition below.

stream create --name inboundSftp --definition "sftp --username=<user> --password=<pass> --host=<host>  --allow-unknown-keys=true --remote-dir=/remote-files/ --local-dir=/var/scdf/shared-files/ --task.launch.request.taskName=fileIngestTask --task.launch.request.deployment-properties=deployer.*.cloudfoundry.services=nfs | task-launcher --spring.cloud.dataflow.client.server-uri=<data-flow-server-uri>"

The dataflow-task-launcher-sink uses a PollableMessageSource controlled by a dynamic trigger with exponential backoff. By default, the sink polls its input destination every 1 second. If there are no task launch requests, the polling period will continue to double up to a maximum of 30 seconds. If a task launch request is present, the trigger resets to 1 second. The trigger parameters may be configured by setting the task-launcher sink properties trigger.period and trigger.max-period in the stream definition.

Deploy the stream

When we deploy the stream we must also configure the sftp pod with the

stream deploy inboundSftp --properties "deployer.sftp.cloudfoundry.services=nfs"

Verify Stream deployment

We can see the status of the streams to be deployed with stream list, for example:

dataflow:>stream list
╔═══════════╤═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╤═══════════════════╗
║Stream Name│                                                                                     Stream Definition                                                                                     │      Status       ║
╠═══════════╪═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╪═══════════════════╣
║inboundSftp│sftp --task.launch.request.deployment-properties='deployer.*.cloudfoundry.services=nfs' --sftp.factory.password='******' --sftp.local-dir=/var/scdf/shared-files/                          │The stream has been║
║           │--sftp.factory.allow-unknown-keys=true --sftp.factory.username='******' --sftp.remote-dir=/remote-files/ --sftp.factory.host=<host> --task.launch.request.taskName=fileIngestTask |        │successfully       ║
║           │task-launcher --spring.cloud.dataflow.client.server-uri=<data-flow-server-uri                                                                                                              │deployed           ║
╚═══════════╧═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╧═══════════════════╝

Inspect the application logs

Use the Cloud Foundry CLI to list the apps. The source and sink applications should be in a started state.

cf apps
Getting apps in org myorg / space myspace as someuser...
OK

name                                   requested state   instances   memory   disk   urls
...
Ky7Uk6q-inboundSftp-sftp-v1            started           1/1         2G       1G     Ky7Uk6q-inboundSftp-sftp-v1.apps.hayward.cf-app.com
Ky7Uk6q-inboundSftp-task-launcher-v1   started           1/1         2G       1G     Ky7Uk6q-inboundSftp-task-launcher-v1.apps.hayward.cf-app.com
...

The log files of the sftp source would be useful to debug issues such as SFTP connection failures and to verify SFTP downloads.

cf logs Ky7Uk6q-inboundSftp-sftp-v1 --recent

The logs for the task-launcher application would be useful to debug data flow connection issues and verify task launch requests:

Drop a file into the remote directory

Sample data can be found in the data/ directory of the sample project.

Connect to the SFTP server and upload data/name-list.csv into the remote-files directory:

When this file is detected, the sftp source will download it to the /var/scdf/shared-files directory specified by --local-dir. Here we are using the shared mount path /var/scdf that we configured for the nfs service. When the file is downloaded, the source emits a Task Launch Request. The Task Launch Request includes the name of the task to launch along with the local file path, given as a command line argument. Spring Batch binds each command line argument to a corresponding JobParameter. The FileIngestTask job processes the file given by the JobParameter named localFilePath. Since there have not been any recent requests, the task will launch within 30 seconds after the request is published (see tip above about configuring the launch trigger).

Inspect Job Executions

dataflow:>job execution list
╔═══╤═══════╤═════════╤════════════════════════════╤═════════════════════╤══════════════════╗
║ID │Task ID│Job Name │         Start Time         │Step Execution Count │Definition Status ║
╠═══╪═══════╪═════════╪════════════════════════════╪═════════════════════╪══════════════════╣
║1  │1      │ingestJob│Tue Jun 11 15:56:27 EDT 2019│1                    │Created           ║
╚═══╧═══════╧═════════╧════════════════════════════╧═════════════════════╧══════════════════╝

As well as list more details about that specific job execution:

dataflow:>job execution display --id 1
╔═══════════════════════════════════════╤════════════════════════════════════╗
║                  Key                  │               Value                ║
╠═══════════════════════════════════════╪════════════════════════════════════╣
║Job Execution Id                       │6                                   ║
║Task Execution Id                      │6                                   ║
║Task Instance Id                       │6                                   ║
║Job Name                               │ingestJob                           ║
║Create Time                            │Thu Jun 13 17:06:28 EDT 2019        ║
║Start Time                             │Thu Jun 13 17:06:29 EDT 2019        ║
║End Time                               │Thu Jun 13 17:06:57 EDT 2019        ║
║Running                                │false                               ║
║Stopping                               │false                               ║
║Step Execution Count                   │1                                   ║
║Execution Status                       │COMPLETED                           ║
║Exit Status                            │COMPLETED                           ║
║Exit Message                           │                                    ║
║Definition Status                      │Created                             ║
║Job Parameters                         │                                    ║
║-spring.cloud.task.executionid(STRING) │1                                   ║
║run.id(LONG)                           │1                                   ║
║localFilePath(STRING)                  │/var/scdf/shared-files/name-list.csv║
╚═══════════════════════════════════════╧════════════════════════════════════╝

Verify data

When the the batch job runs, it processes the file in the local directory /var/scdf/shared-files and transforms each item to uppercase names and inserts it into the database.

Use PivotalMySQLWeb to inspect the data.

Kubernetes

Prerequisites

This example assumes Data Flow is installed on minikube with kafka and mysql. It is recommended to use the helm chart.

helm install --name my-release --set kafka.enabled=true,rabbitmq.enabled=false,server.service.type=NodePort stable/spring-cloud-data-flow

Running this example on Kubernetes requires configuring an NFS server and creating an corresponding persistent volume and persistent volume claim resources as described in the Kubernetes NFS Configuration section. We also require an external SFTP server with a /remote-files directory.

Register the applications

If you downloaded the sample project you can build and publish the docker image to the minikube registry:

eval $(minikube docker-env)
./mvnw clean package docker:build -Pkubernetes

Otherwise, you can skip this step to pull the image from dockerhub.

app register --name fileIngest --type task --uri docker://springcloud/ingest

Register the prepackaged sftp source and task-launcher sink applications:

app register --name sftp --type source  --uri docker://springcloudstream/sftp-dataflow-source-kafka:2.1.0.RELEASE --metadata-uri maven://org.springframework.cloud.stream.app:sftp-dataflow-source-kafka:jar:metadata:2.1.0.RELEASE
app register --name task-launcher --type sink --uri docker://springcloudstream/task-launcher-dataflow-sink-kafka:1.0.1.RELEASE --metadata-uri maven://org.springframework.cloud.stream.app:task-launcher-dataflow-sink-kafka:jar:metadata:1.0.1.RELEASE

Create the task

task create fileIngestTask --definition fileIngest

Create the stream

The sftp source is configured to publish a task launch request to launch the fileIngestTask task. The launch request mounts the nfs share to the task pod using deployment properties deployer.*.kubernetes.volumes=[{'name':'staging','persistentVolumeClaim':{'claimName':'nfs-volume-claim'}}] and deployer.*.kubernetes.volumeMounts=[{'mountPath':'/staging/shared-files','name':'staging'}].

NOTE: Replace <user>, <pass> and <data-flow-server-uri> in the stream definition below. The <host> value here is the default minikube gateway for VirtualBox.

To get the <data-flow-server-uri> find the name of the service and use the minikube service command:

kubectl get svc
...
my-release-data-flow-server     NodePort       10.97.74.123     <none>        80:30826/TCP

minikube service my-release-data-flow-server --url
http://192.168.99.105:30826
stream create inboundSftp --definition "sftp --host=192.168.99.1 --username=<user> --password=<pass> --allow-unknown-keys=true --remote-dir=/remote-files --local-dir=/staging/shared-files --task.launch.request.taskName=fileIngestTask --task.launch.request.deployment-properties=deployer.*.kubernetes.volumes=[{'name':'staging','persistentVolumeClaim':{'claimName':'nfs-volume-claim'}}],deployer.*.kubernetes.volumeMounts=[{'mountPath':'/staging/shared-files','name':'staging'}] | task-launcher --spring.cloud.dataflow.client.server-uri=<dataflow-uri>"

The dataflow-task-launcher-sink uses a PollableMessageSource controlled by a dynamic trigger with exponential backoff. By default, the sink polls its input destination every 1 second. If there are no task launch requests, the polling period will continue to double up to a maximum of 30 seconds. If a task launch request is present, the trigger resets to 1 second. The trigger parameters may be configured by setting the task-launcher sink properties trigger.period and trigger.max-period in the stream definition.

Deploy the stream

When we deploy the stream we must also configure a volume mount for the sftp source.

stream deploy inboundSftp --properties "deployer.sftp.kubernetes.volumes=[{'name':'staging','persistentVolumeClaim':{'claimName':'nfs-volume-claim'}}],deployer.sftp.kubernetes.volumeMounts=[{'mountPath':'/staging/shared-files','name':'staging'}]"

Verify Stream deployment

We can see the status of the streams to be deployed with stream list, for example:

dataflow:>stream list
╔═══════════╤═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╤════════════╗
║Stream Name│                                                                                                                  Stream Definition                                                                                                                  │   Status   ║
╠═══════════╪═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╪════════════╣
║inboundSftp│sftp                                                                                                                                                                                                                                                 │The stream  ║
║           │--task.launch.request.deployment-properties="deployer.*.kubernetes.volumes=[{'name':'staging','persistentVolumeClaim':{'claimName':'nfs-volume-claim'}}],deployer.*.kubernetes.volumeMounts=[{'mountPath':'/staging/shared-files','name':'staging'}]"│has been    ║
║           │--sftp.factory.password='******' --sftp.local-dir=/staging/shared-files --sftp.factory.allow-unknown-keys=true --sftp.factory.username='******' --sftp.remote-dir=/remote-files --sftp.factory.host=192.168.99.1                                     │successfully║
║           │--task.launch.request.taskName=fileIngestTask | task-launcher --spring.cloud.dataflow.client.server-uri=http://192.168.99.105:30826                                                                                                                  │deployed    ║
╚═══════════╧═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╧════════════╝

Inspect the application logs

Use kubectl to list the apps. The source and sink applications should be in a started state.

kubectl get pods
NAME                                             READY   STATUS    RESTARTS   AGE
...
inboundsftp-sftp-v12-6d55d469bd-t8znd            1/1     Running   0          6m24s
inboundsftp-task-launcher-v12-555d4785c5-zjr6b   1/1     Running   0          6m24s
...

The log files of the sftp source would be useful to debug issues such as SFTP connection failures and to verify SFTP downloads.

kubectl logs inboundsftp-sftp-v12-6d55d469bd-t8znd

The logs for the task-launcher application would be useful to debug data flow connection issues and verify task launch requests:

Drop a file into the remote directory

Sample data can be found in the data/ directory of the sample project.

Connect to the SFTP server and upload data/name-list.csv into the remote-files directory:

When this file is detected, the sftp source will download it to the /var/scdf/shared-files directory specified by --local-dir. Here we are using the shared mount path /var/scdf that we configured for the nfs service. When the file is downloaded, the source emits a Task Launch Request. The Task Launch Request includes the name of the task to launch along with the local file path, given as a command line argument. Spring Batch binds each command line argument to a corresponding JobParameter. The FileIngestTask job processes the file given by the JobParameter named localFilePath. Since there have not been any recent requests, the task will launch within 30 seconds after the request is published (see tip above about configuring the launch trigger).

Inspect Job Executions

dataflow:>job execution list
╔═══╤═══════╤═════════╤════════════════════════════╤═════════════════════╤══════════════════╗
║ID │Task ID│Job Name │         Start Time         │Step Execution Count │Definition Status ║
╠═══╪═══════╪═════════╪════════════════════════════╪═════════════════════╪══════════════════╣
║1  │1      │ingestJob│Thu Jun 13 08:39:59 EDT 2019│1                    │Created           ║
╚═══╧═══════╧═════════╧════════════════════════════╧═════════════════════╧══════════════════╝

As well as list more details about that specific job execution:

dataflow:>job execution display --id 1
╔═══════════════════════════════════════════╤═══════════════════════════════════╗
║                    Key                    │               Value               ║
╠═══════════════════════════════════════════╪═══════════════════════════════════╣
║Job Execution Id                           │1                                  ║
║Task Execution Id                          │424                                ║
║Task Instance Id                           │1                                  ║
║Job Name                                   │ingestJob                          ║
║Create Time                                │Thu Jun 13 08:39:59 EDT 2019       ║
║Start Time                                 │Thu Jun 13 08:39:59 EDT 2019       ║
║End Time                                   │Thu Jun 13 08:40:07 EDT 2019       ║
║Running                                    │false                              ║
║Stopping                                   │false                              ║
║Step Execution Count                       │1                                  ║
║Execution Status                           │COMPLETED                          ║
║Exit Status                                │COMPLETED                          ║
║Exit Message                               │                                   ║
║Definition Status                          │Created                            ║
║Job Parameters                             │                                   ║
║-spring.cloud.task.executionid(STRING)     │424                                ║
║run.id(LONG)                               │1                                  ║
║-spring.datasource.username(STRING)        │******                             ║
║-spring.cloud.task.name(STRING)            │fileIngestTask                     ║
║-spring.datasource.password(STRING)        │******                             ║
║-spring.datasource.driverClassName(STRING) │org.mariadb.jdbc.Driver            ║
║localFilePath(STRING)                      │/staging/shared-files/name-list.csv║
║-spring.datasource.url(STRING)             │******                             ║
╚═══════════════════════════════════════════╧═══════════════════════════════════╝

Verify data

When the the batch job runs, it processes the file in the local directory /staging/shared-files and transforms each item to uppercase names and inserts it into the database.

Open a shell in the mysql container to query the people table.:

kubectl get pods
...
my-release-mysql-56f988dd6c-qlm8q                1/1     Running
...
kubectl exec -it my-release-mysql-56f988dd6c-qlm8q -- /bin/bash
# mysql -u root -p$MYSQL_ROOT_PASSWORD
mysql> select * from dataflow.people;
+-----------+------------+-----------+
| person_id | first_name | last_name |
+-----------+------------+-----------+
|         1 | AARON      | AABERG    |
|         2 | AARON      | AABY      |
|         3 | ABBEY      | AADLAND   |
|         4 | ABBIE      | AAGAARD   |
|         5 | ABBY       | AAKRE     |
|         6 | ABDUL      | AALAND    |
|         7 | ABE        | AALBERS   |
|         8 | ABEL       | AALDERINK |
|         9 | ABIGAIL    | AALUND    |
|        10 | ABRAHAM    | AAMODT    |
|      ...                           |
+-----------+------------+-----------+

Limiting concurrent task executions

This recipe processes a single file with 5000+ items. What if we drop 100 files to the remote directory? The sftp source will process them immediately, generating 100 task launch requests. The Dataflow Server launches tasks asynchronously so this could potentially overwhelm the resources of the runtime platform. For example, when running the Data Flow server on your local machine, each launched task creates a new JVM. In Cloud Foundry, each task creates a new container instance, and in Kubernetes a pod.

Fortunately, Spring Cloud Data Flow provides configuration settings to limit the number of concurrently running tasks

We can use this sample to see how this works.

Lower the maximum concurrent task executions

The sample project includes 20 files in the data/spilt directory. To observe the limit in action we can set the maximum concurrent tasks to 3.

For running tasks on a local server, restart the server, adding a command line argument spring.cloud.dataflow.task.platform.local.accounts[default].maximum-concurrent-tasks=3.

If running on Cloud Foundry:

cf set-env <dataflow-server> SPRING_CLOUD_DATAFLOW_TASK_PLATFORM_CLOUDFOUNDRY_ACCOUNTS[DEFAULT]_DEPLOYMENT_MAXIMUMCONCURRENTTASKS 3

If running on Kubernetes, edit the Data Flow server configmap, for example:

kubectl edit configmap my-release-data-flow-server

Add the 'maximum-concurrent-tasks` property as shown below:

apiVersion: v1
data:
  application.yaml: |-
    spring:
      cloud:
        dataflow:
          task:
            platform:
              kubernetes:
                accounts:
                  default:
                    maximum-concurrent-tasks: 3
                    limits:
                      memory: 1024Mi
                      cpu: 500m

After editing the configmap, delete the Data Flow server pod to force it to restart then wait for it to restart.

Verify maximum concurrent task executions is enforced.

The task launcher sink polls the input destination. The polling period adjusts according to the presence of task launch requests and also to the number of currently running tasks reported via the Data Flow server's tasks/executions/current REST endpoint. The sink queries this endpoint and will pause polling the input for new requests if the number of concurrent tasks for the task platform is at its limit. This introduces a 1-30 second lag between the creation of the task launch request and the execution of the request, sacrificing some performance for resilience. Task launch requests will never be sent to a dead letter queue because the server is busy or unavailable. The exponential backoff also prevents the app from querying the server excessively when there are no task launch requests.

Monitor the task executions

Tail the task-launcher container logs.

You can also monitor the Data Flow server for current task executions:

watch curl <dataflow-server-url>/tasks/executions/current
Every 2.0s: curl http://192.168.99.105:30826/tasks/executions/current

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100    92    0    92    0     0   1202      0 --:--:-- --:--:-- --:--:
--  1210
[{"name":"default","type":"Kubernetes","maximumTaskExecutions":3,"runningExecutionCount":0}]

Run the sample with multiple files

With the sample stream deployed, upload the 20 files in data/spilt to /remote-files files. In the task-launcher logs, you should see the exponential backoff working:

2019-06-14 15:00:48.247  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling period reset to 1000 ms.
2019-06-14 15:00:49.265  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Launching Task fileIngestTask on platform default
2019-06-14 15:00:50.433  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Launching Task fileIngestTask on platform default
2019-06-14 15:00:51.686  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Launching Task fileIngestTask on platform default
2019-06-14 15:00:52.929  WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:00:52.929  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling paused- increasing polling period to 2 seconds.
2019-06-14 15:00:55.008  WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:00:55.008  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling paused- increasing polling period to 4 seconds.
2019-06-14 15:00:59.039  WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:00:59.040  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling paused- increasing polling period to 8 seconds.
2019-06-14 15:01:07.104  WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:01:07.104  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling paused- increasing polling period to 16 seconds.
2019-06-14 15:01:23.127  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling resumed
2019-06-14 15:01:23.128  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Launching Task fileIngestTask on platform default
2019-06-14 15:01:23.232  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling period reset to 1000 ms.
2019-06-14 15:01:24.277  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Launching Task fileIngestTask on platform default
2019-06-14 15:01:25.483  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Launching Task fileIngestTask on platform default
2019-06-14 15:01:26.743  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Launching Task fileIngestTask on platform default
2019-06-14 15:01:28.035  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Launching Task fileIngestTask on platform default
2019-06-14 15:01:29.324  WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:01:29.325  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling paused- increasing polling period to 2 seconds.
2019-06-14 15:01:31.435  WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:01:31.436  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling paused- increasing polling period to 4 seconds.
2019-06-14 15:01:35.531  WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:01:35.532  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling paused- increasing polling period to 8 seconds.
2019-06-14 15:01:43.615  WARN 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : The data Flow task platform default has reached its concurrent task execution limit: (3)
2019-06-14 15:01:43.615  INFO 1 --- [pool-2-thread-1] o.s.c.s.a.t.l.d.s.LaunchRequestConsumer  : Polling paused- increasing polling period to 16 seconds.

Avoiding duplicate processing

The sftp source will not process files that it has already seen. It uses a Metadata Store to keep track of files by extracting content from messages at runtime. Out of the box, it uses an in-memory Metadata Store, but it is pluggable to a persistent store used for production deployments Thus, if we re-deploy the stream, or restart the sftp source, this state is lost and files will be reprocessed.

Thanks to the magic of Spring, we can auto-configure one of the available persistent Metadata Stores to prevent duplicate processing.

In this example, we will auto configure the JDBC Metadata Store since we are already using a JDBC database.

Configure and Build the SFTP source

For this we add some JDBC dependencies to the sftp-dataflow source.

Clone the [sftp]https://github.com/spring-cloud-stream-app-starters/sftp stream app starter. From the sftp directory. Replace <binder> below with kafka or rabbit as appropriate for your configuration:

./mvnw clean install -DskipTests -PgenerateApps
cd apps/sftp-dataflow-source-<binder>

Add the following dependencies to pom.xml:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jdbc</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
</dependency>

If you are running on Kubernetes use the mariadb driver instead of H2:

<dependency>
    <groupId>org.mariadb.jdbc</groupId>
    <artifactId>mariadb-java-client</artifactId>
    <version>2.3.0</version>
</dependency>

If you are running on a local server with the in memory H2 database, set the JDBC url in src/main/resources/application.properties to use the Data Flow server's database:

spring.datasource.url=jdbc:h2:tcp://localhost:19092/mem:dataflow

If running on Kubernetes, set the datasource to use the internal IP of the mysql service, e.g.:

spring.datasource.url=jdbc:mysql://10.98.214.235:3306/dataflow

If you are running in Cloud Foundry or Kubernetes, add the following property to src/main/resources/application.properties:

spring.integration.jdbc.initialize-schema=always

Build the sftp source and register it with Data Flow.

Run the sample app

Follow the instructions for running the sample on your preferred platform, up to the Drop file... Step`.

If you have already completed the main exercise, restore the data to its initial state, and redeploy the stream:

  • Clean the local and remote data directories
  • Execute the SQL command DROP TABLE PEOPLE; in the database
  • Undeploy the stream, and deploy it again to run the updated sftp source

If you are running in Cloud Foundry, set the deployment properties to bind sftp to the mysql service. For example:

dataflow>stream deploy inboundSftp --properties "deployer.sftp.cloudfoundry.services=nfs,mysql"

Drop a file into the remote directory

Let's use one small file for this. The directory data/split in the sample project contains the contents of data/name-list.csv split into 20 files. Upload names_aa.csv:

Inspect the database

Using a Database tool, as described above, view the contents of the INT_METADATA_STORE table.

JDBC Metadata Store

Note that there is a single key-value pair, where the key identies the file name (the prefix sftpSource/ provides a namespace for the sftp source app) and the value is a timestamp indicating when the message was received. The metadata store tracks files that have already been processed. This prevents the same files from being pulled from the remote directory on every polling cycle. Only new files, or files that have been updated will be processed.

Since there are no uniqueness constraints on the PEOPLE table, a file processed multiple times by our batch job will result in duplicate table rows. Since we have configured a persistent metadata store, duplicate processing will be prevented across container restarts. You can verify this by undeploying and redeploying the stream, or simply restarting the sftp source.

If we view the PEOPLE table, it should look something like this:

People table

Now let's upload the same file to the SFTP server, or if you are logged into it, you can just update the timestamp:

touch /remote-files/names_aa.csv

Now the file will be reprocessed and the PEOPLE table will contain duplicate data. If you ORDER BY FIRST_NAME, you will see something like this:

People table with duplicates

Of course, if we drop another one of files into the remote directory, that will processed and we will see another entry in the Metadata Store.