initial commit

This commit is contained in:
Dursun KOC
2022-02-05 23:55:30 +03:00
commit a9ef8f0e81
18 changed files with 890 additions and 0 deletions

6
.gitignore vendored Normal file
View File

@@ -0,0 +1,6 @@
.DS_Store
oracle_instantclient
visual-connector
visual-connector-be
nginx
docker-compose-ws.yaml

166
README.md Normal file
View File

@@ -0,0 +1,166 @@
# Using Debezium From Oracle To Oracle & Postgresql
You must download the [Oracle instant client for Linux](http://www.oracle.com/technetwork/topics/linuxx86-64soft-092277.html)
and put it under the directory _debezium-with-oracle-jdbc/oracle_instantclient_.
```shell
+-------------+
| |
| Oracle |
| |
+-------------+
+
|
|
|
v
+----------------------------------+
| |
| Kafka Connect |
| (Debezium, JDBC connectors) |
| |
+----------------------------------+
+
|
_____________|_____________
| |
v v
+-----------------+ +-----------------+
| | ~~~~ | TARGET |
| TARGET ORACLE | ~~~~ | POSTGRESQL |
| | ~~~~ | |
+-----------------+ +-----------------+
```
- Start the topology as defined in <https://debezium.io/docs/tutorial/>
```shell
export DEBEZIUM_VERSION=1.7
export PROJECT_PATH=$(pwd -P)
docker-compose -f docker-compose.yaml up --build --no-start
docker-compose -f docker-compose.yaml start
```
- Start Oracle sink connector for Customers table.
```shell
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-oracle-sink-customers.json
```
- Start Postgres sink connector for Customers table.
```shell
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres-sink-customers.json
```
- Start Oracle source connector
```shell
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-source-oracle.json
```
- Connect to Source Oracle DB
- Host: localhost
- Port: 1521
- Service Name: XE
- user: SYS
- pass: oracle
- Connect to Target Oracle DB
- Host: localhost
- Port: 3042
- Service Name: XE
- user: SYS
- pass: oracle
- Connect to Target Postgresql DB
- Host: localhost
- Port: 5432
- user: postgres
- pass: postgres
- database: inventory
- Make changes on Source DB, see results on kafka topic, and on the target database.
```sql
--SOURCE DB
SELECT * FROM INVENTORY.CUSTOMERS c ;
UPDATE INVENTORY.CUSTOMERS c SET c.FIRST_NAME = CASE WHEN c.FIRST_NAME = 'Anne' THEN 'Marie Anne' ELSE 'Anne' END
WHERE c.id = 1004;
UPDATE INVENTORY.CUSTOMERS c SET c.EMAIL = c.EMAIL || '.tr';
--TARGET DB
SELECT * FROM ALL_TABLES at2 WHERE OWNER = 'INVENTORY';
SELECT * FROM INVENTORY.CUSTOMERS c;
--TARGET DB - POSTGRESQL
SELECT * FROM information_schema.tables where table_schema = 'public';
SELECT * FROM public."CUSTOMERS" c;
```
- See the kafka topics
```shell
docker exec -it kafka /kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list
```
- Inpsect a kafka topic
```shell
export DEBEZIUM_VERSION=1.7
export PROJECT_PATH=$(pwd -P)
docker-compose -f docker-compose.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic oracle-db-source.INVENTORY.CUSTOMERS
```
- See the connectors
```shell
curl -i -X GET http://localhost:8083/connectors
```
- Manage Connectors
- See the connector status
```shell
curl -s "http://localhost:8083/connectors?expand=info&expand=status"
```
```shell
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
```
- Restart a connector
```shell
curl -i -X POST http://localhost:8083/connectors/inventory-source-connector/restart
#OR
curl -i -X POST http://localhost:8083/connectors/jdbc-sink-customers/restart
```
- Remove a connector
```shell
curl -i -X DELETE http://localhost:8083/connectors/inventory-source-connector
#OR
curl -i -X DELETE http://localhost:8083/connectors/jdbc-sink-customers
```
- Stop the topology
```shell
export DEBEZIUM_VERSION=1.7
export PROJECT_PATH=$(pwd -P)
docker-compose -f docker-compose.yaml down
```

View File

@@ -0,0 +1,38 @@
ARG DEBEZIUM_VERSION
FROM debezium/connect:$DEBEZIUM_VERSION
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc \
INSTANT_CLIENT_DIR=/instant_client/ \
DEBEZIUM_CONNECTOR_ORACLE_DIR=$KAFKA_CONNECT_PLUGINS_DIR/debezium-connector-oracle
ARG POSTGRES_VERSION=42.2.8
ARG KAFKA_JDBC_VERSION=10.2.0
ARG KAFKA_ELASTICSEARCH_VERSION=5.3.2
ARG JMX_AGENT_VERSION
# Deploy PostgreSQL JDBC Driver
RUN cd /kafka/libs && curl -sO https://jdbc.postgresql.org/download/postgresql-$POSTGRES_VERSION.jar
# Deploy Kafka Connect JDBC
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR &&\
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar &&\
cp $DEBEZIUM_CONNECTOR_ORACLE_DIR/* .
RUN mkdir /kafka/etc && cd /kafka/etc &&\
curl -so jmx_prometheus_javaagent.jar \
https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/$JMX_AGENT_VERSION/jmx_prometheus_javaagent-$JMX_AGENT_VERSION.jar
COPY config.yml /kafka/etc/config.yml
COPY connect-distributed.properties /kafka/config/connect-distributed.properties
RUN mkdir /kafka/connect/multiple-field-timestamp-converter/
COPY multiple-field-timestamp-converter-1.0.0-jar-with-dependencies.jar /kafka/connect/multiple-field-timestamp-converter/multiple-field-timestamp-converter-1.0.0-jar-with-dependencies.jar
USER root
RUN microdnf -y install libaio && microdnf clean all
USER kafka
# Deploy Oracle client and drivers
COPY oracle_instantclient/* $INSTANT_CLIENT_DIR
COPY oracle_instantclient/xstreams.jar /kafka/libs
COPY oracle_instantclient/ojdbc8.jar /kafka/libs

View File

@@ -0,0 +1,31 @@
startDelaySeconds: 0
ssl: false
lowercaseOutputName: false
lowercaseOutputLabelNames: false
rules:
- pattern : "kafka.connect<type=connect-worker-metrics>([^:]+):"
name: "kafka_connect_worker_metrics_$1"
- pattern : "kafka.connect<type=connect-metrics, client-id=([^:]+)><>([^:]+)"
name: "kafka_connect_metrics_$2"
labels:
client: "$1"
- pattern: kafka.connect<type=(.+)-metrics, connector=(.+), task=(.+)><>(.+-total|.+-count|.+-ms|.+-ratio|.+-avg|.+-failures|.+-requests|.+-timestamp|.+-logged|.+-errors|.+-retries|.+-skipped)
name: kafka_connect_$1_$4
labels:
connector: "$2"
task: "$3"
help: "Kafka Connect JMX metric type $1"
type: GAUGE
- pattern: "debezium.([^:]+)<type=connector-metrics, context=([^,]+), server=([^,]+), key=([^>]+)><>RowsScanned"
name: "debezium_metrics_RowsScanned"
labels:
plugin: "$1"
name: "$3"
context: "$2"
table: "$4"
- pattern: "debezium.([^:]+)<type=connector-metrics, context=([^,]+), server=([^>]+)>([^:]+)"
name: "debezium_metrics_$4"
labels:
plugin: "$1"
name: "$3"
context: "$2"

View File

@@ -0,0 +1,90 @@
##
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
# This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
# to be used with the examples, and some settings may differ from those used in a production system, especially
# the `bootstrap.servers` and those specifying replication factors.
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=kafka:9092
# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=1
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=my_connect_offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25
# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=my_connect_configs
config.storage.replication.factor=1
# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=my_connect_statuses
status.storage.replication.factor=1
#status.storage.partitions=5
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=60000
# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
rest.host.name=172.18.0.7
rest.port=8083
# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
rest.advertised.host.name=172.18.0.7
rest.advertised.port=8083
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/kafka/connect
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
task.shutdown.graceful.timeout.ms=10000
offset.flush.timeout.ms=5000
internal.key.converter=org.apache.kafka.connect.json.JsonConverter

View File

@@ -0,0 +1,87 @@
CREATE TABLE products (
id NUMBER(4) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 101) NOT NULL PRIMARY KEY,
name VARCHAR2(255) NOT NULL,
description VARCHAR2(512),
weight FLOAT
);
GRANT SELECT ON products to c##dbzuser;
ALTER TABLE products ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
INSERT INTO products
VALUES (NULL,'scooter','Small 2-wheel scooter',3.14);
INSERT INTO products
VALUES (NULL,'car battery','12V car battery',8.1);
INSERT INTO products
VALUES (NULL,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8);
INSERT INTO products
VALUES (NULL,'hammer','12oz carpenter''s hammer',0.75);
INSERT INTO products
VALUES (NULL,'hammer','14oz carpenter''s hammer',0.875);
INSERT INTO products
VALUES (NULL,'hammer','16oz carpenter''s hammer',1.0);
INSERT INTO products
VALUES (NULL,'rocks','box of assorted rocks',5.3);
INSERT INTO products
VALUES (NULL,'jacket','water resistent black wind breaker',0.1);
INSERT INTO products
VALUES (NULL,'spare tire','24 inch spare tire',22.2);
-- Create and populate the products on hand using multiple inserts
CREATE TABLE products_on_hand (
product_id NUMBER(4) NOT NULL PRIMARY KEY,
quantity NUMBER(4) NOT NULL,
FOREIGN KEY (product_id) REFERENCES products(id)
);
GRANT SELECT ON products_on_hand to c##dbzuser;
ALTER TABLE products_on_hand ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
INSERT INTO products_on_hand VALUES (101,3);
INSERT INTO products_on_hand VALUES (102,8);
INSERT INTO products_on_hand VALUES (103,18);
INSERT INTO products_on_hand VALUES (104,4);
INSERT INTO products_on_hand VALUES (105,5);
INSERT INTO products_on_hand VALUES (106,0);
INSERT INTO products_on_hand VALUES (107,44);
INSERT INTO products_on_hand VALUES (108,2);
INSERT INTO products_on_hand VALUES (109,5);
-- Create some customers ...
CREATE TABLE customers (
id NUMBER(4) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY,
first_name VARCHAR2(255) NOT NULL,
last_name VARCHAR2(255) NOT NULL,
email VARCHAR2(255) NOT NULL UNIQUE
);
GRANT SELECT ON customers to c##dbzuser;
ALTER TABLE customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
INSERT INTO customers
VALUES (NULL,'Sally','Thomas','sally.thomas@acme.com');
INSERT INTO customers
VALUES (NULL,'George','Bailey','gbailey@foobar.com');
INSERT INTO customers
VALUES (NULL,'Edward','Walker','ed@walker.com');
INSERT INTO customers
VALUES (NULL,'Anne','Kretchmar','annek@noanswer.org');
-- Create some very simple orders
CREATE TABLE debezium.orders (
id NUMBER(6) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 10001) NOT NULL PRIMARY KEY,
order_date DATE NOT NULL,
purchaser NUMBER(4) NOT NULL,
quantity NUMBER(4) NOT NULL,
product_id NUMBER(4) NOT NULL,
FOREIGN KEY (purchaser) REFERENCES customers(id),
FOREIGN KEY (product_id) REFERENCES products(id)
);
GRANT SELECT ON orders to c##dbzuser;
ALTER TABLE orders ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
INSERT INTO orders
VALUES (NULL, '16-JAN-2016', 1001, 1, 102);
INSERT INTO orders
VALUES (NULL, '17-JAN-2016', 1002, 2, 105);
INSERT INTO orders
VALUES (NULL, '19-FEB-2016', 1002, 2, 106);
INSERT INTO orders
VALUES (NULL, '21-FEB-2016', 1003, 1, 107);

79
docker-compose.yaml Normal file
View File

@@ -0,0 +1,79 @@
version: '3'
services:
zookeeper:
container_name: zookeeper
image: debezium/zookeeper:${DEBEZIUM_VERSION}
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
container_name: kafka
image: debezium/kafka:${DEBEZIUM_VERSION}
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
connect:
container_name: connect
image: debezium/connect-with-oracle-jdbc:${DEBEZIUM_VERSION}
build:
context: debezium-with-oracle-jdbc
args:
DEBEZIUM_VERSION: ${DEBEZIUM_VERSION}
JMX_AGENT_VERSION: 0.15.0
ports:
- 5005:5005
- 1976:1976
- 8083:8083
links:
- kafka
- oracle-db-source
- oracle-db-target
- postgres
volumes:
- ${PROJECT_PATH}/debezium-with-oracle-jdbc/xstreams.jar /kafka/libs
- ${PROJECT_PATH}/debezium-with-oracle-jdbc/oracle_instantclient/ojdbc8.jar /kafka/libs
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- LD_LIBRARY_PATH=/instant_client
- KAFKA_OPTS=-javaagent:/kafka/etc/jmx_prometheus_javaagent.jar=8080:/kafka/etc/config.yml
- JMXHOST=localhost
- JMXPORT=1976
oracle-db-source:
container_name: oracle-db-source
image: oracleinanutshell/oracle-xe-11g:latest
ports:
- 1521:1521
- 5500:5500
volumes:
- ${PROJECT_PATH}/ora-init:/docker-entrypoint-initdb.d
environment:
- ORACLE_ALLOW_REMOTE=YES
- ORACLE_HOME=/u01/app/oracle/product/11.2.0/xe
oracle-db-target:
container_name: oracle-db-target
image: oracleinanutshell/oracle-xe-11g:latest
ports:
- 3042:1521
- 3300:5500
volumes:
- ${PROJECT_PATH}/ora-target-init:/docker-entrypoint-initdb.d
environment:
- ORACLE_ALLOW_REMOTE=YES
- ORACLE_HOME=/u01/app/oracle/product/11.2.0/xe
postgres:
container_name: postgres
image: postgres
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=inventory

View File

@@ -0,0 +1,110 @@
-- Create and populate our products using a single insert with many rows
create table inventory.products (
id NUMBER(4) NOT NULL PRIMARY KEY,
name VARCHAR2(255)NOT NULL,
description VARCHAR2(512),
weight FLOAT
);
create sequence inventory.t1_seq
increment by 1
start with 101;
GRANT SELECT ON inventory.products to c##logminer;
ALTER TABLE inventory.products ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
INSERT INTO inventory.products
VALUES (inventory.t1_seq.nextval,'scooter','Small 2-wheel scooter',3.14);
INSERT INTO inventory.products
VALUES (inventory.t1_seq.nextval,'car battery','12V car battery',8.1);
INSERT INTO inventory.products
VALUES (inventory.t1_seq.nextval,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8);
INSERT INTO inventory.products
VALUES (inventory.t1_seq.nextval,'hammer','12oz carpenter''s hammer',0.75);
INSERT INTO inventory.products
VALUES (inventory.t1_seq.nextval,'hammer','14oz carpenter''s hammer',0.875);
INSERT INTO inventory.products
VALUES (inventory.t1_seq.nextval,'hammer','16oz carpenter''s hammer',1.0);
INSERT INTO inventory.products
VALUES (inventory.t1_seq.nextval,'rocks','box of assorted rocks',5.3);
INSERT INTO inventory.products
VALUES (inventory.t1_seq.nextval,'jacket','water resistent black wind breaker',0.1);
INSERT INTO inventory.products
VALUES (inventory.t1_seq.nextval,'spare tire','24 inch spare tire',22.2);
-- Create and populate the products on hand using multiple inserts
CREATE TABLE inventory.products_on_hand (
product_id NUMBER(4) NOT NULL PRIMARY KEY,
quantity NUMBER(4) NOT NULL,
FOREIGN KEY (product_id) REFERENCES inventory.products(id)
);
GRANT SELECT ON inventory.products_on_hand to c##logminer;
ALTER TABLE inventory.products_on_hand ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
INSERT INTO inventory.products_on_hand VALUES (101,3);
INSERT INTO inventory.products_on_hand VALUES (102,8);
INSERT INTO inventory.products_on_hand VALUES (103,18);
INSERT INTO inventory.products_on_hand VALUES (104,4);
INSERT INTO inventory.products_on_hand VALUES (105,5);
INSERT INTO inventory.products_on_hand VALUES (106,0);
INSERT INTO inventory.products_on_hand VALUES (107,44);
INSERT INTO inventory.products_on_hand VALUES (108,2);
INSERT INTO inventory.products_on_hand VALUES (109,5);
-- Create some inventory.customers ...
CREATE TABLE inventory.customers (
id NUMBER(4) NOT NULL PRIMARY KEY,
first_name VARCHAR2(255) NOT NULL,
last_name VARCHAR2(255) NOT NULL,
email VARCHAR2(255) NOT NULL UNIQUE
);
create sequence inventory.t2_seq
increment by 1
start with 1001;
GRANT SELECT ON inventory.customers to c##logminer;
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
INSERT INTO inventory.customers
VALUES (inventory.t2_seq.nextval,'Sally','Thomas','sally.thomas@acme.com');
INSERT INTO inventory.customers
VALUES (inventory.t2_seq.nextval,'George','Bailey','gbailey@foobar.com');
INSERT INTO inventory.customers
VALUES (inventory.t2_seq.nextval,'Edward','Walker','ed@walker.com');
INSERT INTO inventory.customers
VALUES (inventory.t2_seq.nextval,'Anne','Kretchmar','annek@noanswer.org');
-- Create some very simple inventory.orders
CREATE TABLE inventory.orders (
id NUMBER(6) NOT NULL PRIMARY KEY,
order_date DATE NOT NULL,
purchase_date DATE NOT NULL,
purchaser NUMBER(4) NOT NULL,
quantity NUMBER(4) NOT NULL,
product_id NUMBER(4) NOT NULL,
FOREIGN KEY (purchaser) REFERENCES inventory.customers(id),
FOREIGN KEY (product_id) REFERENCES inventory.products(id)
);
create sequence inventory.t3_seq
increment by 1
start with 10001;
GRANT SELECT ON inventory.orders to c##logminer;
ALTER TABLE inventory.orders ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
INSERT INTO inventory.orders
VALUES (inventory.t3_seq.nextval, '16-JAN-2016', '16-JAN-2016', 1001, 1, 102);
INSERT INTO inventory.orders
VALUES (inventory.t3_seq.nextval, '17-JAN-2016', '17-JAN-2016', 1002, 2, 105);
INSERT INTO inventory.orders
VALUES (inventory.t3_seq.nextval, '19-FEB-2016', '19-FEB-2016', 1002, 2, 106);
INSERT INTO inventory.orders
VALUES (inventory.t3_seq.nextval, '21-FEB-2016', '21-FEB-2016', 1003, 1, 107);

View File

@@ -0,0 +1,150 @@
-- Create App User
CREATE USER INVENTORY IDENTIFIED BY dbz;
GRANT CONNECT TO INVENTORY;
GRANT DBA TO INVENTORY;
-- Create LogMiner Users/TBS
CREATE TABLESPACE LOGMINER_TBS DATAFILE '/u01/app/oracle/oradata/XE/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER c##dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
CREATE USER c##logminer IDENTIFIED BY dbz DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO c##dbzuser ;
GRANT CREATE SESSION TO c##logminer;
GRANT SELECT ON V_$DATABASE TO c##dbzuser;
GRANT FLASHBACK ANY TABLE TO c##dbzuser ;
GRANT SELECT ANY TABLE TO c##dbzuser ;
GRANT SELECT_CATALOG_ROLE TO c##dbzuser ;
GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser ;
GRANT SELECT ANY TRANSACTION TO c##dbzuser ;
GRANT SELECT ANY DICTIONARY TO c##dbzuser ;
-- GRANT LOGMINING TO c##dbzuser ;
GRANT CREATE TABLE TO c##dbzuser ;
GRANT ALTER ANY TABLE TO c##dbzuser ;
GRANT LOCK ANY TABLE TO c##dbzuser ;
GRANT CREATE SEQUENCE TO c##dbzuser ;
-- GRANT LOGMINING TO c##logminer
GRANT CREATE SESSION TO c##logminer;
GRANT SELECT ON V_$DATABASE to c##logminer;
GRANT FLASHBACK ANY TABLE TO c##logminer;
GRANT LOCK ANY TABLE TO c##logminer;
GRANT CREATE TABLE TO c##logminer;
GRANT CREATE SEQUENCE TO c##logminer;
GRANT SELECT ON V_$LOG TO c##logminer;
-- GRANT LOGMINING TO c##logminer;
GRANT EXECUTE ON DBMS_LOGMNR TO c##logminer;
GRANT EXECUTE ON DBMS_LOGMNR_D TO c##logminer;
GRANT EXECUTE_CATALOG_ROLE TO c##logminer;
GRANT SELECT_CATALOG_ROLE TO c##logminer;
GRANT SELECT ANY TRANSACTION TO c##logminer;
GRANT SELECT ANY DICTIONARY TO c##logminer;
GRANT SELECT ON V_$LOGMNR_LOGS TO c##logminer;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##logminer;
GRANT SELECT ON V_$LOGFILE TO c##logminer;
GRANT SELECT ON V_$ARCHIVED_LOG TO c##logminer;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##logminer;
GRANT INSERT ANY TABLE TO c##logminer;
GRANT SELECT ANY TABLE TO c##logminer;
GRANT UPDATE ANY TABLE TO c##logminer;
GRANT DELETE ANY TABLE TO c##logminer;
CREATE USER debezium IDENTIFIED BY dbz;
GRANT CONNECT TO debezium;
GRANT CREATE SESSION TO debezium;
GRANT CREATE TABLE TO debezium;
GRANT CREATE SEQUENCE to debezium;
ALTER USER debezium QUOTA 100M on users;
-- add REDO size and number
SET PAGES 100
COL STATUS FORMAT a8
SELECT a.group#, b.bytes/1024/1024, b.status
FROM v$logfile a, v$log b
WHERE a.group#=b.group#;
ALTER DATABASE ADD LOGFILE GROUP 3 SIZE 200M;
ALTER DATABASE ADD LOGFILE GROUP 4 SIZE 200M;
ALTER DATABASE ADD LOGFILE GROUP 5 SIZE 200M;
ALTER DATABASE ADD LOGFILE MEMBER '/u01/app/oracle/fast_recovery_area/XE/onlinelog/02_mf_3_j1t13w5g_.log' TO GROUP 3;
ALTER DATABASE ADD LOGFILE MEMBER '/u01/app/oracle/fast_recovery_area/XE/onlinelog/02_mf_4_j1t13wk0_.log' TO GROUP 4;
ALTER DATABASE ADD LOGFILE MEMBER '/u01/app/oracle/fast_recovery_area/XE/onlinelog/02_mf_5_j1t13x3f_.log' TO GROUP 5;
SELECT a.group#, b.bytes/1024/1024, b.status
FROM v$logfile a, v$log b
WHERE a.group#=b.group#;
ALTER SYSTEM SWITCH LOGFILE;
ALTER SYSTEM SWITCH LOGFILE;
ALTER SYSTEM SWITCH LOGFILE;
ALTER SYSTEM SWITCH LOGFILE;
SELECT a.group#, b.bytes/1024/1024, b.status
FROM v$logfile a, v$log b
WHERE a.group#=b.group#;
ALTER SYSTEM SWITCH LOGFILE;
ALTER SYSTEM SWITCH LOGFILE;
SELECT a.group#, b.bytes/1024/1024, b.status
FROM v$logfile a, v$log b
WHERE a.group#=b.group#;
-- Enable ARCHIVELOG mode
SHUTDOWN IMMEDIATE;
STARTUP MOUNT
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;
ARCHIVE LOG LIST;
-- Supplemental Logging
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER PROFILE DEFAULT LIMIT FAILED_LOGIN_ATTEMPTS UNLIMITED;
set pages 100
col member format a69
col status format a8
select a.group#, b.bytes/1024/1024, b.status
from v$logfile a, v$log b
where a.group#=b.group#;
-- Temporary DBA to forget about this in test
GRANT DBA TO c##logminer;
GRANT DBA TO DEBEZIUM;
GRANT DBA TO c##dbzuser;
-- Recreate REDO GROUPS 1 and 2
ALTER DATABASE DROP LOGFILE GROUP 1;
ALTER DATABASE DROP LOGFILE GROUP 2;
ALTER DATABASE ADD LOGFILE GROUP 1 SIZE 200M;
ALTER DATABASE ADD LOGFILE GROUP 2 SIZE 200M;
ALTER DATABASE ADD LOGFILE MEMBER '/u01/app/oracle/fast_recovery_area/XE/onlinelog/02_mf_1_.log' TO GROUP 1;
ALTER DATABASE ADD LOGFILE MEMBER '/u01/app/oracle/fast_recovery_area/XE/onlinelog/02_mf_2_.log' TO GROUP 2;
exit;

9
ora-init/launch.sh Executable file
View File

@@ -0,0 +1,9 @@
##setup database
echo "Entering Launch"
export ORACLE_SID=XE
echo "Generating database."
$ORACLE_HOME/bin/sqlplus sys/oracle as sysdba @/docker-entrypoint-initdb.d/db-init/setup_database.sql
echo "Generating inventory schema."
$ORACLE_HOME/bin/sqlplus sys/oracle as sysdba @/docker-entrypoint-initdb.d/db-init/inventory.sql

View File

@@ -0,0 +1,5 @@
-- Create App User
CREATE USER INVENTORY IDENTIFIED BY dbz;
GRANT CONNECT TO INVENTORY;
GRANT DBA TO INVENTORY;
exit;

6
ora-target-init/launch.sh Executable file
View File

@@ -0,0 +1,6 @@
##setup database
echo "Entering Launch"
export ORACLE_SID=XE
echo "Generating database."
$ORACLE_HOME/bin/sqlplus sys/oracle as sysdba @/docker-entrypoint-initdb.d/db-init/setup_database.sql

View File

@@ -0,0 +1,23 @@
{
"name": "jdbc-sink-customers",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"dialect.name": "OracleDatabaseDialect",
"table.name.format": "CUSTOMERS",
"topics": "oracle-db-source.INVENTORY.CUSTOMERS",
"connection.url": "jdbc:oracle:thin:@oracle-db-target:1521:XE",
"connection.user": "INVENTORY",
"connection.password": "dbz",
"transforms": "unwrap,ettdate",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.ettdate.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.ettdate.timestamp.field": "ETT_DATE",
"auto.create": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.fields": "ID",
"pk.mode": "record_key"
}
}

View File

@@ -0,0 +1,25 @@
{
"name": "jdbc-sink-orders",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"dialect.name": "OracleDatabaseDialect",
"table.name.format": "ORDERS",
"topics": "dbz_oracle.INVENTORY.ORDERS",
"connection.url": "jdbc:oracle:thin:@trg_oracle:1521:XE",
"connection.user": "INVENTORY",
"connection.password": "dbz",
"transforms": "unwrap,timestampConverter",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.timestampConverter.type": "io.github.dursunkoc.multiplefieldtimestampconverter.TimestampConverterMultifields$Value",
"transforms.timestampConverter.target.type": "Date",
"transforms.timestampConverter.fields": "ORDER_DATE,PURCHASE_DATE",
"transforms.timestampConverter.format": "yyyy-MM-dd HH:mm:ss",
"auto.create": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.fields": "ID",
"pk.mode": "record_key"
}
}

View File

@@ -0,0 +1,21 @@
{
"name": "jdbc-sink-products",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"dialect.name": "OracleDatabaseDialect",
"table.name.format": "PRODUCTS",
"topics": "dbz_oracle.INVENTORY.PRODUCTS",
"connection.url": "jdbc:oracle:thin:@trg_oracle:1521:XE",
"connection.user": "INVENTORY",
"connection.password": "dbz",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.fields": "ID",
"pk.mode": "record_key"
}
}

View File

@@ -0,0 +1,19 @@
{
"name": "jdbc-sink-customers-postgress",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"dialect.name": "PostgreSqlDatabaseDialect",
"table.name.format": "CUSTOMERS",
"topics": "oracle-db-source.INVENTORY.CUSTOMERS",
"connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgres&password=postgres",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.fields": "ID",
"pk.mode": "record_key"
}
}

View File

@@ -0,0 +1,25 @@
{
"name": "inventory-source-connector",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"tasks.max" : "1",
"database.server.name" : "oracle-db-source",
"database.hostname" : "oracle-db-source",
"database.port" : "1521",
"database.user" : "c##logminer",
"database.password" : "dbz",
"database.dbname" : "XE",
"database.out.server.name":"dbzxout",
"database.oracle.version": "11",
"database.history.kafka.bootstrap.servers" : "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"database.connection.adapter": "logminer",
"table.include.list" : "INVENTORY.CUSTOMERS, INVENTORY.PRODUCTS, INVENTORY.ORDERS",
"database.schema": "inventory",
"errors.log.enable": "true",
"snapshot.lock.timeout.ms":"5000",
"include.schema.changes": "true",
"snapshot.mode":"initial",
"decimal.handling.mode": "double"
}
}