Postgres To Kafka: Load Data In Minutes With 2 Easy Methods

Luke Smith
Enterprise Solutions Architect
July 14, 2023
Matt Tanner
Developer Relations Lead
July 14, 2023

In the modern data-driven era, organizations of all sizes and across various sectors produce a vast amount of data. This can create a landscape rich in information and potential insights. Yet, abundant data also introduces the challenge of effective data collection and implementing comprehensive analytics to extract meaningful, actionable insights. The crux of the matter lies in the real-time detection of changes in datasets and the subsequent, efficient application of these updates from one system or application to another.

Consider the scenario where applications and disparate data sources continuously publish data into a single, central database. This often results in a volatile and complex data landscape that is constantly being altered. In these circumstances, a sophisticated messaging system becomes a pivotal component of a comprehensive data management strategy. This type of software is engineered to respond to data alterations and perform different operations, including reading the latest data, updating records, or deleting obsolete data. It provides an automated, reliable mechanism to react to data changes, ensuring the system stays up-to-date.

Enter the world of data streaming platforms like Kafka. Kafka is a powerful, open-source stream-processing software platform that handles real-time data feeds. It's a versatile tool capable of handling many tasks ranging from seemingly simple data processing operations to complex ETL (Extract, Transform, Load) systems. It is particularly suited for high-velocity environments that demand robust systems with high throughput to swiftly process large volumes of data. Kafka has a unique ability to detect and track changes in your database as they occur, making it a valuable asset for any organization wishing to stay ahead of the curve with real-time data analytics.

This blog post will explore how you can harness the power of Kafka to continuously monitor changes in your PostgreSQL database. It will guide you in transmitting these changes to any chosen destination, allowing you to use the data for use cases such as real-time analytics. We'll walk you through two distinct methods to provide you with a comprehensive overview and give you a few options for replicating data. The first method involves crafting custom scripts designed to track and transmit changes. In contrast, the second method shown will leverage Arcion, simplifying the process by automating many steps. By the end of this article, you will have gained a deeper understanding of the role Kafka can play in real-time database monitoring and be equipped with practical knowledge to apply in your data management endeavors.


What is Postgres?

PostgreSQL is a popular and powerful open-source Relational Database Management System (RDBMS). It uses standard Structured Query Language (SQL), and the companion pgAdmin tool provides both a GUI and a SQL interface. Postgres is known for its strict SQL compliance.

It has been widely used in production systems for many years and has been in active development since its initial release in 1996. It supports all major operating systems, including Linux, Unix, macOS, and Windows. It is highly reliable and offers strong data integrity and correctness, backed by years of development by its open-source community.

PostgreSQL is a robust database frequently used as the database layer for web applications, data warehouses, and analytics/business intelligence applications. It is a multi-use database with a track record of being the database of choice for some of the largest and most well-known applications and services. 

PostgreSQL is commonly seen being used:

  • By large corporations and startups for transactional database purposes.
  • As the back-end database in the Linux, Apache, PostgreSQL, and PHP (or Python and Perl) (LAPP) stack to power dynamic websites and web applications.
  • As a geospatial database used for geographic information systems (GIS) with the PostGIS extension found on PostgreSQL.

What is Kafka?

Apache Kafka is an open-source distributed event streaming service used to build high-performance data pipelines. Many organizations that adopt Kafka use it for streaming analytics, data integration, and to assist with database migrations. Kafka is one of the most well-known and frequently used data-streaming platforms. According to the Kafka website, it meets the stream-processing needs of over 80% of the Fortune 100.

Streaming data is data that is continuously generated from various data sources. Each source's data is processed in real time, and the data records created are typically sent for storage. A stream processing tool is required to handle this processing and routing of data. Each of the sources will need to be connected to one or multiple destinations through a data pipeline. This is exactly where Kafka comes in to complete the solution.

Kafka is a distinguished tool that can handle large amounts of data at scale. Kafka was built to process data sequentially and incrementally, as it is primarily used to build real-time streaming data pipelines. The Apache Kafka framework provides two messaging models: queuing and publish-subscribe. These messaging models offer different approaches to processing and moving data along the data pipelines created with Kafka. Both are highly scalable and can transfer millions of messages per second.
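To make the difference between the two models concrete, here is a toy in-memory sketch in Python (illustrative only, not the Kafka API): in publish-subscribe, every consumer group receives every published message, while within a single group (the queuing model) each message goes to just one consumer.

```python
from collections import defaultdict

class ToyTopic:
    """A toy in-memory stand-in for a Kafka topic (illustration only)."""

    def __init__(self):
        self.groups = {}                    # group name -> list of consumers
        self.next_idx = {}                  # group name -> next consumer index
        self.delivered = defaultdict(list)  # consumer name -> messages received

    def subscribe(self, group, consumer):
        self.groups.setdefault(group, []).append(consumer)
        self.next_idx.setdefault(group, 0)

    def publish(self, message):
        # Publish-subscribe: every group sees every message.
        for group, consumers in self.groups.items():
            # Queuing within a group: each message goes to one consumer
            # (round-robin here; real Kafka assigns partitions instead).
            i = self.next_idx[group]
            self.delivered[consumers[i]].append(message)
            self.next_idx[group] = (i + 1) % len(consumers)

topic = ToyTopic()
topic.subscribe("analytics", "a1")
topic.subscribe("analytics", "a2")
topic.subscribe("audit", "b1")
for msg in ["m1", "m2", "m3", "m4"]:
    topic.publish(msg)

print(topic.delivered["b1"])                         # ['m1', 'm2', 'm3', 'm4']
print(topic.delivered["a1"], topic.delivered["a2"])  # ['m1', 'm3'] ['m2', 'm4']
```

The audit group receives all four messages, while the two analytics consumers split them between themselves, which is how Kafka lets the same topic serve both messaging patterns.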

How to Connect Postgres to Kafka? (Methods)

This section will look at two methods to connect PostgreSQL and Kafka. With a connection established, you can begin to receive updates from your database through the streaming services offered by Kafka. The two methods we will look at are custom scripts and using the modern data pipeline tool Arcion.

Method 1: Custom Scripts 

One approach to connecting PostgreSQL to Kafka is through custom scripts, primarily utilizing Debezium as a PostgreSQL Kafka connector. To understand this process, we must familiarize ourselves with the technologies used to establish the connection between the two platforms. Let’s briefly take a look at each component below.

  • Kafka: As previously explained, Kafka is a real-time event streaming platform capable of processing massive amounts of data. Its capacity to manage a fast data pipeline makes it an indispensable tool for projects requiring swift data transfer.
  • Kafka Connect: Kafka Connect acts as a bridge for streaming data in and out of Kafka, connecting it with various data sources and destinations through configurable connectors. For this guide, Kafka Connect will be used to interface with Debezium and then PostgreSQL.
  • Debezium: This open-source tool employs Change Data Capture (CDC) features, converting Write-Ahead Logs (WALs) into data streams using the mechanisms provided by database systems. Debezium intercepts changes from one system and transports them to another, acting as a link between PostgreSQL and Kafka via the Kafka Connect API.
  • Zookeeper: Zookeeper, an Apache tool, maintains Kafka's configuration and manages the status of the Kafka cluster nodes, topics, partitions, etc. Zookeeper also supports concurrent read and write operations, functioning as a shared configuration service within the system.
  • Docker: Docker facilitates the setup of PostgreSQL, Kafka, and Debezium by defining and running the software in lightweight, standalone containers.

Now that we’ve briefly examined the technologies we will use to connect PostgreSQL and Kafka, let's dive into the step-by-step process. We'll start by running the PostgreSQL database, Kafka, Zookeeper, and Debezium instances using Docker.

Step 1: Create a Postgres Container with Docker

This script initializes a basic PostgreSQL container setup with Docker. The "db" service specifies the image to pull from Docker Hub (`postgres:latest`) and the ports mapping for the container. The `POSTGRES_PASSWORD` environment variable sets the password for the database.

Before executing any Docker command, ensure Docker is installed and running on your machine. Open a terminal/command prompt to perform the next actions.

You must create a Docker Compose file, usually named docker-compose.yml, where you define all the services (in this case, PostgreSQL). This file can be located anywhere, though it's usually placed in the root directory of your project.

version: '3.9'
services:
  db:
    image: postgres:latest
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_PASSWORD=postgres


This script creates a new service named "db," based on the latest PostgreSQL image. It maps port 5432 of your machine to port 5432 of the Docker container.

Run this Docker Compose file by executing `docker-compose up` in the terminal, from the same directory where your docker-compose.yml is located.

Step 2: Running Multiple Instances for PostgreSQL Kafka Connector

After creating the PostgreSQL container, the database system is ready to accept connections. The next step is to add other images needed for the connection, such as Kafka Connect, Debezium, and Zookeeper, before connecting to PostgreSQL. The Docker Compose file should add these services and set up their necessary configurations, including network ports and environment variables. `depends_on` controls the startup order.

Update the docker-compose.yml as follows:

version: '3.9'
services:
  db:
    image: postgres:latest
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_PASSWORD=postgres
  zookeeper:
    image: debezium/zookeeper
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"
  kafka:
    image: debezium/kafka
    ports:
      - "9092:9092"
      - "29092:29092"
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=LISTENER_EXT://localhost:29092,LISTENER_INT://kafka:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT
      - KAFKA_LISTENERS=LISTENER_INT://0.0.0.0:9092,LISTENER_EXT://0.0.0.0:29092
      - KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT
  connect:
    image: debezium/connect
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    depends_on:
      - zookeeper
      - kafka


Run the updated Docker Compose file with the command `docker-compose up`.

Step 3: Create a Postgres Database

Now, you will need to create a table in the PostgreSQL server. For that, access the running PostgreSQL container and use the PostgreSQL command line interface.

In your terminal, execute `docker exec -it <container_id> psql -U postgres` to connect to your PostgreSQL server. <container_id> refers to the ID of your running PostgreSQL container, which can be found using the `docker ps` command.

Once you're connected, create your table using the command:

create table test (
  id serial primary key,
  name varchar
);
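To have some change events to observe once the connector is running in Step 4, you can also run a few DML statements against this table from the same psql session (the sample values are illustrative):

```sql
INSERT INTO test (name) VALUES ('alice');
INSERT INTO test (name) VALUES ('bob');
UPDATE test SET name = 'carol' WHERE id = 1;
DELETE FROM test WHERE id = 2;
```

Each of these statements will later surface as a separate message on the Kafka topic associated with the table.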


Step 4: Set up Kafka Connect (Debezium) to Connect to the Database (Postgres) 

Before Debezium can start streaming data from PostgreSQL, it requires a connector. Here, we'll create a configuration file for the Kafka connector (Debezium), enabling it to connect to PostgreSQL and detect changes. You communicate with Debezium by making HTTP requests; here, a POST request containing a JSON-formatted configuration will be used.

Debezium requires a connector configuration to start streaming data from PostgreSQL. This is a JSON file named debezium.json which you can create with the following command in your terminal:

$ echo '{
    "name": "arctype-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "plugin.name": "wal2json",
        "database.hostname": "db",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "postgres",
        "database.server.name": "ARCTYPE",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "snapshot.mode": "always"
    }
}' > debezium.json


Next, use the `curl` command to send the configuration to Debezium:

$ curl -i -X POST \
    -H "Accept:application/json" \
    -H "Content-Type:application/json" \
    127.0.0.1:8083/connectors/ \
    --data "@debezium.json"


This command sends a POST request to Debezium, passing the configuration file as data.
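Because Kafka Connect exposes a plain HTTP REST interface, the same registration can be scripted in any language. Below is a sketch using only the Python standard library; it builds the same POST request as the curl command above (the endpoint and headers mirror that step, and the send is commented out since it needs the Docker stack to be running):

```python
import json
import urllib.request

CONNECT_URL = "http://127.0.0.1:8083/connectors"  # Kafka Connect REST endpoint

def register_connector(config: dict, url: str = CONNECT_URL) -> urllib.request.Request:
    """Build the POST request that registers a connector with Kafka Connect."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )

# A trimmed-down config for illustration; use the full debezium.json in practice.
req = register_connector({"name": "arctype-connector", "config": {"tasks.max": "1"}})
print(req.get_method(), req.full_url)  # POST http://127.0.0.1:8083/connectors

# To actually send it (requires the stack from this guide to be up):
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, resp.read())
```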

The command should return a response similar to the JSON object sent to the connector. This signifies a successful connection between PostgreSQL and Kafka, resulting in a running data pipeline using Debezium. Any changes made to the table in the PostgreSQL database, such as INSERT, DELETE, or UPDATE, will be captured and streamed to Kafka as a message in the Kafka topics associated with the table.
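Each of those messages is a JSON envelope carrying before and after images of the row plus an op code. The sketch below parses a simplified INSERT event; the field layout follows Debezium's before/after/op convention, but real events carry additional metadata and vary with your converter settings:

```python
import json

# A simplified Debezium-style change event for the "test" table
# (field layout is illustrative; real events include more metadata).
event = json.loads("""
{
  "before": null,
  "after": {"id": 1, "name": "alice"},
  "source": {"table": "test", "db": "postgres"},
  "op": "c",
  "ts_ms": 1689340000000
}
""")

# Debezium op codes: c = create, u = update, d = delete, r = snapshot read
OPS = {"c": "INSERT", "u": "UPDATE", "d": "DELETE", "r": "SNAPSHOT READ"}

def describe(evt):
    """Render a Debezium-style envelope as a human-readable change."""
    kind = OPS.get(evt["op"], evt["op"])
    return f"{kind} on {evt['source']['table']}: {evt['before']} -> {evt['after']}"

print(describe(event))
# INSERT on test: None -> {'id': 1, 'name': 'alice'}
```

A consumer subscribed to the table's topic would apply exactly this kind of parsing before forwarding the change to its destination.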

Disadvantages of using Custom Scripts to Connect Postgres to Kafka

While custom scripts offer a hands-on approach to connecting PostgreSQL to Kafka, they also come with some disadvantages:

  • Complexity: The connection process can be challenging due to the variety of tools and services that must be understood and coordinated.
  • Troubleshooting Difficulty: If the connection isn't established correctly, it can be difficult to identify the source of the error, and there may not be ready access to assistance.
  • Maintenance Effort: Scripts might require frequent updates to stay in sync with the evolution of the different components being used (PostgreSQL, Kafka, Debezium, etc.).
  • Limited Features: Custom scripts might not support advanced features like fault tolerance, exactly-once processing, load balancing, etc., that come out of the box with more sophisticated tools or services.
  • Scalability: Scaling up might be challenging as it would require manual management of multiple Kafka and PostgreSQL nodes.

Method 2: Using Arcion to Move Data from Postgres to Kafka

The second method for creating a connection between PostgreSQL and Kafka will be explored in this section. It will involve using the Arcion CLI tool, known as Replicant. Arcion is a real-time in-memory Change Data Capture (CDC) solution that guarantees you a high level of scalability and sub-second latency. It also ensures that your data is consistent at all times. One of the leading solutions for data migration and data replication, Arcion has integrations with various enterprise databases and data warehouses.

To use the Arcion Replicant CLI, you will first need to download and install Arcion Self-hosted before setting up PostgreSQL as the source database and Kafka as the destination. Below are the steps needed to achieve this.

Step 1: Download And Install Arcion Self-hosted

The first thing we will need to do is configure Arcion. For this, we will need to download and install the self-hosted version of Arcion. To gain access, follow the steps outlined on the Arcion Self-hosted webpage. Once you have downloaded Arcion’s CLI tool, Replicant, you will need to create a home directory for it. By default, the home directory will be where Replicant was downloaded; from now on, it will be referred to as $REPLICANT_HOME in the examples below. The last step is adding your credentials to your Arcion instance, which is covered in the quick start guide that you can reference for more details.

Having downloaded and installed Arcion Self-hosted, we can move on to the next steps where we will configure and enable Arcion to connect with PostgreSQL and Kafka. 

Step 2: Create a user in PostgreSQL

After the installation of Arcion, the next step is to create a user in PostgreSQL by executing the following steps.

  • Connect to your database server and log into the PostgreSQL client
psql -U $POSTGRESQL_ROOT_USER
  • Create a user that will be used for replication
CREATE USER <username> PASSWORD '<password>';
  • Grant the permissions listed below.
GRANT USAGE ON SCHEMA "<schema>" TO <username>;
GRANT SELECT ON ALL TABLES IN SCHEMA "<schema>" TO <username>;
ALTER ROLE <username> WITH REPLICATION;

Step 3: Setup PostgreSQL for Replication

For Arcion to capture changes from Postgres, you’ll need to configure the instance for logical replication. Below is an example of the commands that need to be run on your Postgres instance to do this.

First, you’ll need to open the postgresql.conf file so it can be edited. Below is the command you can use to edit it with vi.


vi $PGDATA/postgresql.conf


Next, set up the parameters in postgresql.conf as shown below:

wal_level = logical
max_replication_slots = 1 #Can be increased if more slots need to be created


Lastly, to enable log consumption for CDC replication on the PostgreSQL server, you can use the test_decoding plugin, which is installed in PostgreSQL by default. Alternatively, you can install the logical decoding plugin wal2json.
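If you want to verify that logical decoding works before involving Arcion, you can exercise the plugin directly in psql using PostgreSQL's built-in replication-slot functions (the slot name arcion_check is arbitrary; remember to drop the slot so it does not retain WAL):

```sql
-- Create a temporary slot using the built-in test_decoding plugin
SELECT * FROM pg_create_logical_replication_slot('arcion_check', 'test_decoding');

-- Peek at pending changes without consuming them
SELECT * FROM pg_logical_slot_peek_changes('arcion_check', NULL, NULL);

-- Clean up the slot when finished
SELECT pg_drop_replication_slot('arcion_check');
```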

For tables that are part of the replication process but do not have a primary key, set the REPLICA IDENTITY to FULL.

ALTER TABLE <table_name> REPLICA IDENTITY FULL;
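If you're unsure which tables this applies to, a standard catalog query lists the tables in a schema that lack a primary key (replace public with your schema name):

```sql
-- Tables in the "public" schema that have no primary key constraint
SELECT c.relname AS table_name
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
  AND n.nspname = 'public'
  AND NOT EXISTS (
    SELECT 1 FROM pg_constraint con
    WHERE con.conrelid = c.oid AND con.contype = 'p'
  );
```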

Step 4: Setup Connection Configuration

From $REPLICANT_HOME, go to the connection configuration file.

vi conf/conn/postgresql.yaml

Next, you can instruct Replicant to retrieve your connection credentials from AWS Secrets Manager if they are stored there. If not, simply enter them as shown in the sample below.

type: POSTGRESQL

host: localhost #Replace localhost with your PostgreSQL hostname
port: 5432 #Replace the default port number 5432 if needed

database: "postgres" #Replace postgres with your database name
username: "replicant" #Replace replicant with your postgresql username
password: "Replicant#123" #Replace Replicant#123 with your user's password

max-connections: 30 #Maximum number of connections replicant can open in postgresql
socket-timeout-s: 60 #The timeout value for socket read operations. The timeout is in seconds and a value of zero means that it is disabled.
max-retries: 10 #Number of times any operation on the source system will be re-attempted on failures.
retry-wait-duration-ms: 1000 #Duration in milliseconds Replicant should wait before performing the next retry of a failed operation.

#List your replication slots (slots that hold the real-time changes of the source database) as follows
replication-slots:
  io_replicate: #Replace "io_replicate" with your replication slot name
    - wal2json #plugin used to create replication slot (wal2json | test_decoding)
  io_replicate1: #Replace "io_replicate1" with your replication slot name
    - wal2json

log-reader-type: {STREAM|SQL}


You can also enable SSL for the connection by including the SSL field and specifying the needed parameters.

ssl:
  ssl-cert: <full_path_to_SSL_certificate_file>
  root-cert: <full_path_to_SSL_root_certificate_file>
  ssl-key: <full_path_to_SSL_key_file>

Step 5: Setup Filter Configuration

From $REPLICANT_HOME, open the filter configuration file.

vi filter/postgresql_filter.yaml

Specify the data to be replicated based on your replication needs using the template format shown below.

allow:
  catalog: <your_catalog_name>
  schema: <your_schema_name>
  types: <your_object_type>

  #If no collections are specified, all the data tables in the provided catalog and schema will be replicated
  allow:
    <your_table_name>:
      allow: ["your_column_name"]
      conditions: "your_condition"

    <your_table_name>:
      allow: ["your_column_name"]
      conditions: "your_condition"

    <your_table_name>:

Step 6: Setup Extractor Configuration

To set up the extractor configuration, you will have to specify the configuration in the Extractor configuration file according to your requirements. A sample Extractor configuration file, postgresql.yaml, is found in the $REPLICANT_HOME/conf/src directory. In this file, you can configure either snapshot, realtime, or delta-snapshot replication modes based on your needs by specifying parameters within each respective mode.

To configure snapshot replication

Below is a sample for the snapshot configuration mode.

snapshot:
  threads: 16
  fetch-size-rows: 5_000

  _traceDBTasks: true
  min-job-size-rows: 1_000_000
  max-jobs-per-chunk: 32

  per-table-config:
  - catalog: tpch
    schema: public
    tables:
      lineitem:
        row-identifier-key: [l_orderkey, l_linenumber]
        split-key: l_orderkey
        split-hints:
          row-count-estimate: 15000
          split-key-min-value: 1
          split-key-max-value: 60_00


To configure realtime replication

For the realtime replication mode, you must create a heartbeat table in the source PostgreSQL database. The steps below are used to create the table, grant privileges, and configure the Extractor configuration for realtime replication. 

To meet the heartbeat table requirement, create a heartbeat table in the schema of the database to be replicated using the DDL below.

CREATE TABLE "<user_database>"."public"."replicate_io_cdc_heartbeat" (
  "timestamp" INT8 NOT NULL,
  PRIMARY KEY("timestamp")
);

Next, grant INSERT, UPDATE, and DELETE privileges to the user configured for replication.

Lastly, under the realtime section of the extractor configuration file, specify your configuration, including your heartbeat table details, as seen in the example below.

realtime:
  heartbeat:
    enable: true
    catalog: "postgres"
    schema: "public"
    table-name: replicate_io_cdc_heartbeat
    column-name: timestamp


To configure delta-snapshot replication

The delta-snapshot replication mode can be used when you are unable to create replication slots in PostgreSQL using either wal2json or test_decoding. It carries out its replication without replication slots and uses PostgreSQL’s internal column to identify changes. The example below shows how this can be done in the delta-snapshot section of the Extractor configuration file.

delta-snapshot:
  row-identifier-key: [orderkey, suppkey]
  update-key: [partkey]
  replicate-deletes: true|false

  per-table-config:
  - catalog: tpch
    schema: public
    tables:
      lineitem1:
        row-identifier-key: [l_orderkey, l_linenumber]
        split-key: l_orderkey
        replicate-deletes: false

Step 7: Setup Connection Configuration for Destination Apache Kafka

From $REPLICANT_HOME (the directory of the extracted replicant-cli), specify your Kafka connection details to Replicant in a connection configuration file. The sample connection configuration file, kafka.yaml, is found in the $REPLICANT_HOME/conf/conn directory.

Arcion supports four different methods of creating a connection configuration for Kafka. They include: connecting with a username and password without data encryption, connecting with a username and password with SSL for data encryption, connecting without a username and password and with no data encryption, and using SSL for both client authentication and data encryption. Let’s look at each method below in more detail.

Connecting with a username and password without any data encryption

This method connects to Kafka with a username and password and without data encryption by specifying the connection details in the manner shown below. To use this method, you will have to enable username- and password-based authentication on the Kafka broker.

type: KAFKA

username: USERNAME
password: PASSWORD
auth-type: SASL

brokers:
  broker1:
    host: HOSTNAME
    port: PORT_NUMBER
  broker2:
    host: HOSTNAME
    port: PORT_NUMBER
  broker3:
    host: HOSTNAME
    port: PORT_NUMBER

From the sample above, we have the following:

USERNAME: This is the username that will be used to connect to Kafka.

PASSWORD: This is the password related to the USERNAME.

HOSTNAME: This is the hostname of the Kafka broker.

PORT_NUMBER: This is the port number of the Kafka broker.

Connecting with a username and password with SSL for data encryption

This second method allows the user to connect with a username and password while using SSL for data encryption. You will have to enable username- and password-based authentication and data encryption on the Kafka broker. The sample below shows how you can use this method by specifying the connection details. 

type: KAFKA

username: USERNAME
password: PASSWORD
auth-type: SASL

ssl:
  enable: true
  trust-store:
    path: "PATH_TO_TRUSTSTORE"
    password: "TRUSTSTORE_PASSWORD"

brokers:
  broker1:
    host: HOSTNAME
    port: PORT_NUMBER
  broker2:
    host: HOSTNAME
    port: PORT_NUMBER
  broker3:
    host: HOSTNAME
    port: PORT_NUMBER


From the sample above, we have the following:

USERNAME: This is the username that will be used to connect to the Kafka server.

PASSWORD: This is the password related to the USERNAME.

HOSTNAME: This is the hostname of the Kafka broker.

PORT_NUMBER: This is the port number of the Kafka broker.

PATH_TO_TRUSTSTORE: This is the path to the TrustStore with JKS type.

TRUSTSTORE_PASSWORD: This is the TrustStore password.

Connecting without a username and password with no data encryption

The third method allows you to connect to Kafka without a username and password, and with no data encryption. The sample below shows how to do this.

type: KAFKA

auth-type: NONE

brokers:
  broker1:
    host: HOSTNAME
    port: PORT_NUMBER
  broker2:
    host: HOSTNAME
    port: PORT_NUMBER
  broker3:
    host: HOSTNAME
    port: PORT_NUMBER


From the sample above, we have the following:

HOSTNAME: This is the hostname of the Kafka broker server.

PORT_NUMBER: This is the port number of the Kafka broker server.

Using SSL for both connection and data encryption

In the fourth and final method of setting up the connection configuration, the user provides both client authentication and data encryption using SSL. You must enable SSL-based client authentication and data encryption on the Kafka broker before specifying your connection details, as shown below. 

type: KAFKA

auth-type: SSL

ssl:
  enable: true
  trust-store:
    path: "PATH_TO_TRUSTSTORE"
    password: "TRUSTSTORE_PASSWORD"
  key-store:
    path: "PATH_TO_KEYSTORE"
    password: "KEYSTORE_PASSWORD"

brokers:
  broker1:
    host: HOSTNAME
    port: PORT_NUMBER
  broker2:
    host: HOSTNAME
    port: PORT_NUMBER
  broker3:
    host: HOSTNAME
    port: PORT_NUMBER

From the sample above, we have the following:

HOSTNAME: This is the hostname of the Kafka broker.

PORT_NUMBER: This is the port number of the Kafka broker.

PATH_TO_TRUSTSTORE: This is the path to the TrustStore with JKS type.

TRUSTSTORE_PASSWORD: This is the TrustStore password.

PATH_TO_KEYSTORE: This is the path to the KeyStore with JKS type.

KEYSTORE_PASSWORD: This is the KeyStore password.

Step 8: Configure Mapper File

This optional step can be used to define data mapping from your source PostgreSQL to Kafka. This is done by specifying the mapping rules in the mapper file.

Step 9: Setup Applier Configuration

  • From $REPLICANT_HOME, go to the sample Kafka Applier configuration file.
  • The configuration file contains parameters related to three modes, namely, Global, snapshot, and realtime.

Global Configuration Parameters

Global configuration parameters sit at the topmost level of the Applier configuration file. Because they are defined globally, they can affect both snapshot and realtime replication.

The replication-format parameter, for example, is set globally and accepts the values NATIVE and JSON, determining the format in which the Applier writes data to Kafka.

Snapshot mode parameters

The snapshot mode has various Kafka-specific parameters; a few of these are shown in the sample below.

snapshot:
  threads: 16
  txn-size-rows: 10000

  replication-factor: 1
  schema-dictionary: SCHEMA_DUMP  # Allowed values: POJO | SCHEMA_DUMP | NONE
  kafka-compression-type: lz4
  kafka-batch-size-in-bytes: 100000
  kafka-buffer-memory-size-in-bytes: 67108864
  kafka-linger-ms: 10
  skip-tables-on-failures: false

  kafka-interceptor-classes: ["KafkaInterceptors.SampleInterceptor"]

  producer-max-block-ms: 60_000
  create-topic-timeout-ms: 100_000

From the sample above, we have the following parameters:

  • replication-factor: Used for topics and Kafka cluster setup; defines the factor by which Kafka topic partitions are replicated across different brokers.
  • kafka-compression-type: Sets the compression type. Allowed values are lz4 (the default), gzip, snappy, and none.
  • kafka-batch-size-in-bytes: The batch size for the Kafka producer. The default is 100000.
  • kafka-buffer-memory-size-in-bytes: The memory allocated to the Kafka client to store unsent messages. The default is 67108864.
  • kafka-linger-ms: Gives Kafka batches more time to fill, in milliseconds. The default is 10.
  • kafka-interceptor-classes: The list of interceptor classes; corresponds to ProducerConfig.INTERCEPTOR_CLASSES_CONFIG.
  • producer-max-block-ms: Corresponds to the max.block.ms parameter of the Kafka producer. The default is 60_000.
  • create-topic-timeout-ms: The timeout for topic creation. The default is 60_000.
  • per-table-config: Lets you specify properties for target tables on a per-table basis, such as replication-factor, num-shards, shard-key, and shard-function.

Realtime mode parameters

The realtime section configures real-time replication using the Kafka-specific parameters available for this mode. A sample configuration is given below.

realtime:
  txn-size-rows: 1000
  before-image-format: ALL  # Allowed values: KEY, ALL
  after-image-format: ALL   # Allowed values: UPDATED, ALL
  kafka-compression-type: lz4

  shard-key: id
  num-shards: 1
  shard-function: MOD  # Allowed values: MOD, NONE. NONE means storage will use its default sharding
  skip-tables-on-failures: false

  producer-max-block-ms: 60_000
  create-topic-timeout-ms: 100_000

  per-table-config:
  - tables:
      io_blitzz_nation:
        shard-key: id
        num-shards: 16 #default: 1
        shard-function: NONE
      io_blitzz_region:
        shard-key: id
      io_blitzz_customer:
        shard-key: custkey
        num-shards: 16

From the sample above, we have the following parameters:

  • split-topic: A global parameter for realtime mode that can be set to true or false. When true, separate topics are created for snapshot and CDC data; when false, a single topic contains both.
  • replication-factor: Used when creating CDC topics on the Kafka cluster; it defines how many brokers each topic partition is replicated across.
  • kafka-compression-type: Sets the compression type; the allowed values are lz4 (the default), gzip, snappy, and none.
  • kafka-batch-size-in-bytes: The batch size for the Kafka producer; the default is 100000 bytes.
  • kafka-buffer-memory-size-in-bytes: The memory allocated to the Kafka client for buffering unsent messages; the default is 67108864 bytes (64 MB).
  • kafka-linger-ms: The time, in milliseconds, the producer waits to give batches more time to fill before sending; the default is 10.
  • kafka-interceptor-classes: Specifies the list of producer interceptor classes. It corresponds to ProducerConfig.INTERCEPTOR_CLASSES_CONFIG.
  • producer-max-block-ms: Corresponds to the max.block.ms parameter of the Kafka producer; its default value is 60_000.
  • create-topic-timeout-ms: Specifies the timeout for topic creation; its default is 60_000.
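Several of these settings are thin wrappers around standard Kafka producer properties. The sketch below (an illustration, not Arcion's internals) shows how the defaults listed above line up with Kafka's own producer configuration keys:

```python
# Arcion realtime defaults as listed in this section.
replicant_defaults = {
    "kafka-compression-type": "lz4",
    "kafka-batch-size-in-bytes": 100_000,
    "kafka-buffer-memory-size-in-bytes": 67_108_864,
    "kafka-linger-ms": 10,
    "producer-max-block-ms": 60_000,
}

# The corresponding standard Kafka producer property names.
kafka_producer_config = {
    "compression.type": replicant_defaults["kafka-compression-type"],
    "batch.size": replicant_defaults["kafka-batch-size-in-bytes"],
    "buffer.memory": replicant_defaults["kafka-buffer-memory-size-in-bytes"],
    "linger.ms": replicant_defaults["kafka-linger-ms"],
    "max.block.ms": replicant_defaults["producer-max-block-ms"],
}

print(kafka_producer_config["compression.type"])  # lz4
```

Knowing this mapping is useful when tuning throughput: raising batch.size and linger.ms trades a little latency for larger, better-compressed batches.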

Advantages of Using Arcion

  1. Sub-second latency from distributed & highly scalable architecture: Arcion is the only CDC solution with an underlying end-to-end multi-threaded architecture, which supports auto-scaling both vertically and horizontally. Its patent-pending technology parallelizes every single Arcion CDC process for maximum throughput. So users get ultra-low latency and maximum throughput even as data volume grows.
  2. Arcion allows smooth schema management of your data, thereby ensuring data integrity, reliability, and consistency. As the schema evolves on the PostgreSQL instance, these changes are applied to the Kafka target automatically to keep both in sync.
  3. Arcion is the only CDC vendor in the market that offers 100% agentless CDC for all of its 20+ supported connectors. Arcion's agentless CDC connectors apply to all the complex enterprise databases modern enterprises use, like Microsoft SQL Server, MongoDB, and all versions of Oracle. Arcion reads directly from the transaction logs, never reading from the database itself. Previously, data teams faced administrative nightmares and security risks associated with running agent-based software in production environments. You can now replicate data in real time, at scale, with guaranteed delivery, but without the inherent performance issues or the security concerns of agent-based connectors.
  4. Arcion provides transactional integrity and data consistency through its CDC technology. To further this effort, Arcion also has built-in data validation support that works automatically and efficiently to ensure data integrity is always maintained. It offers a solution for both scalable data migration and replication while making sure that zero data loss has occurred.
  5. Effortless setup & maintenance: Arcion's no-code platform removes DevOps dependencies; you do not need to incorporate Kafka, Spark Streaming, Kinesis, or other streaming tools. So you can simplify the data architecture, saving both time and cost.
  6. Arcion is SOC 2 Type 1 & Type 2, HIPAA, PCI compliant. The enterprise-grade security and compliance standards ensure data governance.

Conclusion

This article has taken you through the process of connecting Postgres with Kafka via custom scripts and the Arcion CLI tool, Replicant. After introducing both Postgres and Kafka, we explored the custom-script approach in depth, covering how to use tools such as Debezium, Kafka Connect, Zookeeper, and Docker to implement a data pipeline. We also looked at some of the disadvantages of this approach.

Lastly, we looked at Arcion, an enterprise-ready Change Data Capture (CDC) solution that is highly scalable and integrates efficiently with some of the most popular enterprise databases and data warehouses. Arcion is the preferred platform for companies striving to modernize their data infrastructure, with a real-time, in-memory CDC solution that ensures data consistency. When it comes to connecting PostgreSQL to Kafka, Arcion is an efficient and trusted solution. You can get started today by connecting with our team at Arcion to link your Postgres instance to Kafka in a matter of minutes.

Matt is a developer at heart with a passion for data, software architecture, and writing technical content. In the past, Matt worked at some of the largest finance and insurance companies in Canada before pivoting to working for fast-growing startups.
Luke has two decades of experience working with database technologies and has worked for companies like Oracle, AWS, and MariaDB. He is experienced in C++, Python, and JavaScript. He now works at Arcion as an Enterprise Solutions Architect to help companies simplify their data replication process.