initial commit

This commit is contained in:
Dursun KOC
2021-07-16 01:15:03 +03:00
commit cacb1cb37b
8 changed files with 271 additions and 0 deletions

View File

@@ -0,0 +1,28 @@
FROM debezium/connect:1.0
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc \
KAFKA_CONNECT_ES_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-elasticsearch
ARG POSTGRES_VERSION=42.2.8
ARG KAFKA_JDBC_VERSION=5.3.2
ARG KAFKA_ELASTICSEARCH_VERSION=5.3.2
# Deploy PostgreSQL JDBC Driver
RUN cd /kafka/libs && curl -sO https://jdbc.postgresql.org/download/postgresql-$POSTGRES_VERSION.jar
# Deploy Kafka Connect JDBC
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR &&\
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar
# Deploy Confluent Elasticsearch sink connector
RUN mkdir $KAFKA_CONNECT_ES_DIR && cd $KAFKA_CONNECT_ES_DIR &&\
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-elasticsearch/$KAFKA_ELASTICSEARCH_VERSION/kafka-connect-elasticsearch-$KAFKA_ELASTICSEARCH_VERSION.jar && \
curl -sO https://repo1.maven.org/maven2/io/searchbox/jest/6.3.1/jest-6.3.1.jar && \
curl -sO https://repo1.maven.org/maven2/org/apache/httpcomponents/httpcore-nio/4.4.4/httpcore-nio-4.4.4.jar && \
curl -sO https://repo1.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.1/httpclient-4.5.1.jar && \
curl -sO https://repo1.maven.org/maven2/org/apache/httpcomponents/httpasyncclient/4.1.1/httpasyncclient-4.1.1.jar && \
curl -sO https://repo1.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.4/httpcore-4.4.4.jar && \
curl -sO https://repo1.maven.org/maven2/commons-logging/commons-logging/1.2/commons-logging-1.2.jar && \
curl -sO https://repo1.maven.org/maven2/commons-codec/commons-codec/1.9/commons-codec-1.9.jar && \
curl -sO https://repo1.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.4/httpcore-4.4.4.jar && \
curl -sO https://repo1.maven.org/maven2/io/searchbox/jest-common/6.3.1/jest-common-6.3.1.jar && \
curl -sO https://repo1.maven.org/maven2/com/google/code/gson/gson/2.8.6/gson-2.8.6.jar

View File

@@ -0,0 +1,49 @@
version: '2'
services:
zookeeper:
image: debezium/zookeeper:${DEBEZIUM_VERSION}
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: debezium/kafka:${DEBEZIUM_VERSION}
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
mysql:
image: debezium/example-mysql:${DEBEZIUM_VERSION}
ports:
- 3306:3306
environment:
- MYSQL_ROOT_PASSWORD=debezium
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
postgres:
image: debezium/postgres:9.6
ports:
- "5432:5432"
environment:
- POSTGRES_USER=postgresuser
- POSTGRES_PASSWORD=postgrespw
- POSTGRES_DB=inventory
connect:
image: debezium/connect-jdbc-es:${DEBEZIUM_VERSION}
build:
context: debezium-jdbc-es
ports:
- 8083:8083
- 5005:5005
links:
- kafka
- mysql
- postgres
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_source_connect_statuses

View File

@@ -0,0 +1,36 @@
version: '2'
services:
zookeeper:
image: debezium/zookeeper:${DEBEZIUM_VERSION}
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: debezium/kafka:${DEBEZIUM_VERSION}
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
postgres:
image: debezium/example-postgres:${DEBEZIUM_VERSION}
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
connect:
image: debezium/connect:${DEBEZIUM_VERSION}
ports:
- 8083:8083
links:
- kafka
- postgres
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses

93
readme.md Normal file
View File

@@ -0,0 +1,93 @@
# Debezium Sample
## Debezium From PostgreSql
+----------------+
| |
| PostgreSQL |
| |
+-------+--------+
|
|
|
+---------------v------------------+
| |
| Kafka Connect |
| (Debezium, JDBC connectors) |
| Kafka Messages |
+----------------------------------+
```shell
# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.6
docker-compose -f docker-compose-postgres.yaml up
# Start Postgres connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-source.json
# Consume messages from a Debezium topic
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers
# Shut down the cluster
docker-compose -f docker-compose-postgres.yaml down
```
## Debezium From Mysql To PostgreSql
+-------------+
| |
| MySQL |
| |
+------+------+
|
|
|
+---------------v------------------+
| |
| Kafka Connect |
| (Debezium, JDBC connectors) |
| |
+---------------+------------------+
|
|
|
|
+-------v--------+
| |
| PostgreSQL |
| |
+----------------+
```shell
# Start the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.6
docker-compose -f docker-compose-mysql-to-postgres.yaml up
# Start Postgres sink connector for customers
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-sink-customers.json
# Start Mysql source connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql-source.json
# Start Postgres sink connector for products
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-sink-products.json
curl -i http://localhost:8083/connectors
# List topics
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-topics.sh --list --bootstrap-server kafka:9092
# Inspect a topic
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic customers
# Shut down the cluster
docker-compose -f docker-compose-mysql-to-postgres.yaml down
```

View File

@@ -0,0 +1,20 @@
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}

View File

@@ -0,0 +1,15 @@
{
"name": "jdbc-sink-customers",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "customers",
"connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"auto.create": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value"
}
}

View File

@@ -0,0 +1,16 @@
{
"name": "jdbc-sink-products",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "products",
"connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"auto.create": "true",
"insert.mode": "upsert",
"delete.enabled":"true",
"pk.fields": "id",
"pk.mode": "record_key"
}
}

View File

@@ -0,0 +1,14 @@
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include": "inventory"
}
}