How to use Kafka Connect for ETL workloads

Kafka Connect is a framework for easy deployment of Apache Kafka clients for common ETL tasks on different data sources and sinks, managed through multiple jobs running on a distributed cluster of workers.

The Kafka Connect charm automates operations management from day 0 to day 2, which significantly simplifies deploying and administering Kafka Connect clusters.

This operator is available on Charmhub and comes with production-ready features such as automated and manual plugin management, replication and scalability, authentication, TLS support, and seamless integration with the Charmed Apache Kafka set of operators.

This How-to guide covers deploying Kafka Connect, integrating it with Charmed Apache Kafka, and running a connector—either manually or using an integrator charm.

Prerequisites

For this guide, we will need an active Charmed Apache Kafka application, either using Apache ZooKeeper or in KRaft mode. Follow the How to deploy Charmed Apache Kafka guide to set up the environment.

Deploy and set up

To deploy the Kafka Connect charm and integrate it with Charmed Apache Kafka, run the following commands:

juju deploy kafka-connect --channel latest/edge
juju integrate kafka-connect kafka

Use REST API

Kafka Connect uses a RESTful API for common administrative tasks. By default, Charmed Kafka Connect enforces authentication on the Kafka Connect REST API.

To configure the password of the built-in admin user via Juju secrets, first, create a secret in Juju containing your password:

juju add-secret mysecret admin=<secure-password>

You will receive a secret-id in response. Make sure to note it down, as you will need to configure it for the Kafka Connect charm shortly. It looks like this:

secret:cvh7kruupa1s46bqvuig

Now, grant the secret to the Kafka Connect charm using the juju grant-secret command:

juju grant-secret mysecret kafka-connect

Finally, configure the Kafka Connect charm to use the new secret by running the juju config command with the secret-id obtained above:

juju config kafka-connect system-users=secret:cvh7kruupa1s46bqvuig
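If you prefer to script this setup, the three steps above can be combined into a single shell function. This is a minimal sketch, assuming a Juju 3.x client where juju add-secret prints the new secret URI on standard output (as shown above) and the secret name mysecret used in this guide; the function name configure_admin_password is hypothetical.

```shell
# Hypothetical helper: create the admin password secret, grant it to the
# kafka-connect application and point the charm's system-users option at it.
# Assumes `juju add-secret` prints the new secret URI (e.g. secret:cvh7kruupa1s46bqvuig).
configure_admin_password() {
  local password secret_id
  password="$1"
  # Capture the secret URI printed by juju add-secret
  secret_id=$(juju add-secret mysecret "admin=${password}") || return 1
  # Allow the kafka-connect application to read the secret
  juju grant-secret mysecret kafka-connect || return 1
  # Tell the charm which secret holds the system users' passwords
  juju config kafka-connect "system-users=${secret_id}"
}
```

For example, configure_admin_password '<secure-password>'. Passing the password as an argument may leave it in your shell history, so adapt this to however you handle secrets locally.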

To verify that Kafka Connect is properly configured and functioning, send a request to the REST interface to list the installed connector plugins, using the password stored in the Juju secret:

curl -u admin:<secure-password> -X GET http://<kafka-connect-unit-ip>:8083/connector-plugins

You should get a response like the following:

[
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "3.9.0-ubuntu1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "3.9.0-ubuntu1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "3.9.0-ubuntu1"
  }
]
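When scripting against the REST API, it can help to separate two questions: whether the credentials are accepted at all, and which plugin classes are installed. The sketch below assumes two hypothetical environment variables, KC_HOST (a Kafka Connect unit IP) and KC_PASSWORD (the admin password stored in the Juju secret). It extracts class names with grep and sed rather than a JSON parser, so it relies on the "class" fields having the shape shown above.

```shell
# Hypothetical helpers; KC_HOST and KC_PASSWORD are placeholders you export yourself.

# Print only the HTTP status code of GET /connector-plugins.
# 200 means the credentials were accepted; 401 usually means a wrong password.
rest_http_code() {
  curl -s -o /dev/null -w '%{http_code}' \
       -u "admin:${KC_PASSWORD:?set KC_PASSWORD}" \
       "http://${KC_HOST:?set KC_HOST}:8083/connector-plugins"
}

# Print one installed plugin class name per line.
list_plugin_classes() {
  curl -s -u "admin:${KC_PASSWORD:?set KC_PASSWORD}" \
       "http://${KC_HOST:?set KC_HOST}:8083/connector-plugins" \
    | grep -o '"class" *: *"[^"]*"' \
    | sed 's/.*: *"\([^"]*\)"/\1/'
}
```

After exporting KC_HOST and KC_PASSWORD, running list_plugin_classes should print the three MirrorMaker connector classes from the response above.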

Add a connector plugin

To add a custom connector plugin to Charmed Kafka Connect, use the juju attach-resource command. For example, let’s add Aiven’s open source S3 source connector to Charmed Kafka Connect.

First, download the connector plugin v3.2.0 from the respective repository:

wget https://github.com/Aiven-Open/cloud-storage-connectors-for-apache-kafka/releases/download/v3.2.0/s3-source-connector-for-apache-kafka-3.2.0.tar
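Before attaching the archive, you may want to confirm that the download is a readable tar file that actually contains JAR files. The following is a minimal sketch using only tar and grep. The function name check_plugin_archive is hypothetical, and counting .jar entries is only a sanity check, not a guarantee that the charm will accept the plugin.

```shell
# Hypothetical sanity check for a connector plugin archive before
# passing it to `juju attach-resource`.
check_plugin_archive() {
  local archive jars
  archive="$1"
  if [ ! -f "$archive" ]; then
    echo "missing: $archive" >&2
    return 1
  fi
  # List the archive contents and count entries ending in .jar.
  # grep -c exits non-zero when the count is 0, hence `|| true`;
  # an unreadable archive also yields a count of 0.
  jars=$(tar -tf "$archive" 2>/dev/null | grep -c '\.jar$' || true)
  if [ "$jars" -eq 0 ]; then
    echo "no .jar files found in $archive" >&2
    return 1
  fi
  echo "$archive contains $jars JAR file(s)"
}
```

For example, check_plugin_archive ./s3-source-connector-for-apache-kafka-3.2.0.tar reports how many JAR files the archive contains, or fails if it contains none.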

Once downloaded, attach the connector to the charm using the juju attach-resource command.

juju attach-resource kafka-connect connect-plugin=./s3-source-connector-for-apache-kafka-3.2.0.tar

This triggers a restart of the Charmed Kafka Connect application. Once all units show active|idle status, the plugin is ready to use. To verify, query the Kafka Connect REST API:

curl -u admin:<secure-password> -X GET http://<kafka-connect-unit-ip>:8083/connector-plugins

The output should now include the following entry:

{"class":"io.aiven.kafka.connect.s3.source.S3SourceConnector","type":"source","version":"3.2.0"}
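Because the restart takes a while, a script may need to poll the REST API until the new plugin class appears. The sketch below assumes hypothetical KC_HOST and KC_PASSWORD variables as placeholders for the unit IP and admin password, plus an optional POLL_INTERVAL in seconds. It matches the class name as a fixed string and treats connection errors during the restart as "not yet available".

```shell
# Hypothetical helper: poll GET /connector-plugins until a plugin class appears.
# Usage: wait_for_plugin <class-name> [max-attempts]
wait_for_plugin() {
  local class attempts i
  class="$1"
  attempts="${2:-30}"
  i=0
  while [ "$i" -lt "$attempts" ]; do
    # -F: treat the quoted class name as a fixed string, not a regex
    if curl -s -u "admin:${KC_PASSWORD:?set KC_PASSWORD}" \
         "http://${KC_HOST:?set KC_HOST}:8083/connector-plugins" \
         | grep -qF "\"${class}\""; then
      return 0
    fi
    i=$((i + 1))
    sleep "${POLL_INTERVAL:-10}"
  done
  echo "plugin ${class} not found after ${attempts} attempts" >&2
  return 1
}
```

For example, wait_for_plugin io.aiven.kafka.connect.s3.source.S3SourceConnector returns 0 as soon as the S3 source connector is listed.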

Start connector/task

Once the desired plugin is available, use the Kafka Connect REST API to manually start a connector and its tasks. To do so, send a POST request with a JSON body containing the connector configuration to the /connectors endpoint.

For example, to load data from JSONL files in an AWS S3 bucket named testbucket into an Apache Kafka topic named s3topic, send the following request to the Kafka Connect REST endpoint (refer to Aiven’s S3 source connector docs for more details on the connector configuration):

curl -u admin:<secure-password> \
     -H "Content-Type: application/json" \
     -d '{
          "name": "test-s3-source",
          "config": {
               "connector.class": "io.aiven.kafka.connect.s3.source.S3SourceConnector",
               "tasks.max": 1,
               "key.converter": "org.apache.kafka.connect.storage.StringConverter",
               "input.type": "jsonl",
               "topic": "s3topic",
               "aws.access.key.id": "<YOUR_AWS_KEY_ID>",
               "aws.secret.access.key": "<YOUR_AWS_SECRET_ACCESS_KEY>",
               "aws.s3.region": "us-east-1",
               "aws.s3.bucket.name": "testbucket"
          }
     }' -X POST http://<kafka-connect-unit-ip>:8083/connectors

Each connector has its own configuration options. Refer to the connector’s documentation for more information on available options and supported use cases.
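If you apply the same configuration repeatedly, the Kafka Connect REST API also offers PUT /connectors/<connector-name>/config, which creates the connector if it does not exist and updates its configuration otherwise. This endpoint takes only the contents of the "config" object as its body, not the name/config wrapper used with POST above. The sketch below assumes hypothetical KC_HOST and KC_PASSWORD variables and a hypothetical local file (for example s3-source.json) holding that "config" object.

```shell
# Hypothetical helper: create or update a connector from a JSON file.
# The file must contain only the "config" map, for example
#   {"connector.class": "io.aiven.kafka.connect.s3.source.S3SourceConnector",
#    "tasks.max": "1", "topic": "s3topic"}
# plus the remaining keys from the POST example.
# Usage: put_connector_config <connector-name> <config-file>
put_connector_config() {
  local name config_file
  name="$1"
  config_file="$2"
  [ -f "$config_file" ] || { echo "missing: $config_file" >&2; return 1; }
  curl -s -u "admin:${KC_PASSWORD:?set KC_PASSWORD}" \
       -H "Content-Type: application/json" \
       -X PUT --data-binary "@${config_file}" \
       "http://${KC_HOST:?set KC_HOST}:8083/connectors/${name}/config"
}
```

Running put_connector_config test-s3-source s3-source.json twice is safe: the second call only updates the existing connector.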

Check connector/task status

Once the connector is submitted, query the /connectors REST endpoint to find out the status of the connector and its tasks:

curl -u admin:<secure-password> -X GET "http://<kafka-connect-unit-ip>:8083/connectors?expand=status"

The response is a JSON object showing the status of each connector and its associated tasks:

{
  "test-s3-source": {
    "status": {
      "name": "test-s3-source",
      "connector": {
        "state": "RUNNING",
        "worker_id": "10.150.221.240:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "10.150.221.240:8083"
        }
      ],
      "type": "source"
    }
  }
}
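For scripts, the per-connector endpoint /connectors/<connector-name>/status returns the same status object for a single connector. The sketch below, using hypothetical KC_HOST and KC_PASSWORD placeholders, reads the connector-level state (the first "state" field, as in the output above) and polls until it matches an expected value. It deliberately ignores task states, so a RUNNING connector with FAILED tasks would still pass; check the tasks array separately if that matters.

```shell
# Hypothetical helpers for reading connector state via
# GET /connectors/<connector-name>/status.

# Print the connector-level state (e.g. RUNNING, PAUSED, STOPPED, FAILED).
# Assumes the "connector" object precedes "tasks", as in the output above.
connector_state() {
  curl -s -u "admin:${KC_PASSWORD:?set KC_PASSWORD}" \
       "http://${KC_HOST:?set KC_HOST}:8083/connectors/$1/status" \
    | grep -o '"state" *: *"[A-Z]*"' \
    | head -n 1 \
    | sed 's/.*: *"\([A-Z]*\)"/\1/'
}

# Poll until the connector reaches the wanted state.
# Usage: wait_for_state <connector-name> <state> [max-attempts]
wait_for_state() {
  local name want attempts i state
  name="$1"
  want="$2"
  attempts="${3:-30}"
  i=0
  state=""
  while [ "$i" -lt "$attempts" ]; do
    state=$(connector_state "$name")
    if [ "$state" = "$want" ]; then
      return 0
    fi
    i=$((i + 1))
    sleep "${POLL_INTERVAL:-5}"
  done
  echo "connector ${name} is '${state}', expected '${want}'" >&2
  return 1
}
```

For example, wait_for_state test-s3-source RUNNING waits for the connector submitted above to start.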

Stop connector

A connector keeps running as long as the cluster is up. To stop a connector, send a PUT request to the /connectors/<connector-name>/stop endpoint:

curl -u admin:<secure-password> -X PUT http://<kafka-connect-unit-ip>:8083/connectors/test-s3-source/stop

Querying the /connectors?expand=status endpoint again shows the connector in the STOPPED state, with no running tasks:

{
  "test-s3-source": {
    "status": {
      "name": "test-s3-source",
      "connector": {
        "state": "STOPPED",
        "worker_id": "10.150.221.240:8083"
      },
      "tasks": [],
      "type": "source"
    }
  }
}
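A stopped connector keeps its configuration, so it can be brought back or removed later. Since Apache Kafka 3.5, the existing PUT /connectors/<connector-name>/resume endpoint also resumes connectors in the STOPPED state, and DELETE /connectors/<connector-name> removes a connector entirely. The helpers below are a hypothetical sketch using KC_HOST and KC_PASSWORD as placeholders for the unit IP and admin password.

```shell
# Hypothetical helpers for the remaining lifecycle steps of a stopped connector.
# -f makes curl exit non-zero on HTTP errors (for example 404 for an unknown name).

# Resume a STOPPED (or PAUSED) connector so it returns to RUNNING.
resume_connector() {
  curl -sf -u "admin:${KC_PASSWORD:?set KC_PASSWORD}" -X PUT \
       "http://${KC_HOST:?set KC_HOST}:8083/connectors/$1/resume"
}

# Delete a connector and its configuration from the Kafka Connect cluster.
delete_connector() {
  curl -sf -u "admin:${KC_PASSWORD:?set KC_PASSWORD}" -X DELETE \
       "http://${KC_HOST:?set KC_HOST}:8083/connectors/$1"
}
```

For example, resume_connector test-s3-source brings the connector stopped above back, while delete_connector test-s3-source removes it for good.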

Use Kafka Connect integrator charms

While connector lifecycle management can be done manually using the Kafka Connect REST endpoint, the recommended way for common use cases, such as moving data from or to popular databases and storage services, is to use the Kafka Connect integrator family of charms.

Each integrator charm is designed for a general ETL use case and streamlines the entire process, from loading connector plugins to configuring connectors, managing task execution, and reporting status, which significantly reduces administrative overhead.

A curated set of integrators for common ETL use cases on the Canonical Data Platform line of products is provided in the Template Connect Integrator repository. These charmed operators support use cases such as loading data to and from MySQL, PostgreSQL, OpenSearch, and S3-compatible storage services, as well as active/passive replication of Apache Kafka topics using MirrorMaker. To learn more about integrator charms, refer to the tutorial, which covers a practical use case of moving data from MySQL to OpenSearch using integrator charms.
