Debezium Connector for Oracle

Table of Contents

Overview

Debezium’s Oracle connector captures and records row-level changes that occur in databases on an Oracle server, including tables that are added while the connector is running. You can configure the connector to emit change events for specific subsets of schemas and tables, or to ignore, mask, or truncate values in specific columns.

For information about the Oracle Database versions that are compatible with this connector, see the Debezium release overview.

Debezium can ingest change events from Oracle by using the native LogMiner database package, the XStream API, or OpenLogReplicator.

How the Oracle connector works

To optimally configure and run a Debezium Oracle connector, it is helpful to understand how the connector works.

Adapter Modes

The Debezium Oracle connector supports multiple adapters for capturing changes from the Oracle database transaction logs. You can configure the connector to use a specific adapter by setting the database.connection.adapter configuration property.

The connector supports the following adapters:

LogMiner

By default, the Debezium Oracle connector uses the native Oracle LogMiner API to read and stream changes from database transaction logs. You can configure Debezium to use LogMiner in either of the following distinct modes:

Uncommitted changes mode

In uncommitted changes mode, the Debezium Oracle connector receives a continuous stream of change events for both committed and in-flight uncommitted transactions. Because Oracle interleaves operations from multiple transactions, the connector must buffer events until a corresponding commit or rollback is detected.

Because this the default mode for the connector, it’s important to allocate enough memory to the connector to handle the largest transactions and highest concurrency levels across all captured tables.

Uncommitted changes mode minimizes the load on the source database by delegating most processing to the connector. Oracle LogMiner is responsible only for reading redo and archive logs from disk and streaming the raw changes.

Enable this mode by setting the database.connection.adapter property to logminer (default).

Committed changes mode

In committed changes mode, the Debezium Oracle connector instructs Oracle LogMiner to stream only committed changes. Because the connector receives only committed events, it can immediately forward them to destination topics without maintaining an internal transaction buffer.

This mode requires significantly less connector memory, but shifts a much greater processing and memory burden to the source database. For large transactions, this can easily exceed the database’s available PGA memory, potentially leading to PGA_AGGREGATE_LIMIT errors.

This mode is recommended only if you can guarantee that all transactions fit within the database’s PGA memory space. Otherwise, use the uncommitted changes mode.

Enable this mode by setting the database.connection.adapter property to logminer_unbuffered.

This feature is currently incubating and might change in future releases.

OpenLogReplicator

You can configure the Debezium Oracle connector to use OpenLogReplicator, an open-source application that reads Oracle changes directly from the redo and archive logs with minimal impact on the database.

Enable OpenLogReplicator by setting the database.connection.adapter property to olr. For more information, see the OpenLogReplicator support section.

Oracle XStream

The Debezium Oracle connector can also use Oracle XStream, a commercial component of Oracle GoldenGate.

Enable Oracle XStream by setting the database.connection.adapter property to xstream. For more details, see the XStream section.

Snapshots

Typically, the redo logs on an Oracle server are configured to not retain the complete history of the database. As a result, the Debezium Oracle connector cannot retrieve the entire history of the database from the logs. To enable the connector to establish a baseline for the current state of the database, the first time that the connector starts, it performs an initial consistent snapshot of the database.

If the time needed to complete the initial snapshot exceeds the UNDO_RETENTION time that is set for the database (fifteen minutes, by default), an ORA-01555 exception can occur. For more information about the error, and about the steps that you can take to recover from it, see the Frequently asked questions.

During a table’s snapshot, it’s possible for Oracle to raise an ORA-01466 exception. This happens when a user modifies the schema of the table or adds, changes, or drops an index or related object associated with the table being snapshot. In the event this happens, the connector will stop and the initial snapshot will need to be taken from the beginning.

To remediate the problem, you can configure the snapshot.database.errors.max.retries property with a value greater than 0 so that the specific table’s snapshot will restart. While the entire snapshot will not start from the beginning when retrying, the specific table in question will be re-read from the beginning and the table’s topic will contain duplicate snapshot events.

Default workflow that the Oracle connector uses to perform an initial snapshot

The following workflow lists the steps that Debezium takes to create a snapshot. These steps describe the process for a snapshot when the snapshot.mode configuration property is set to its default value, which is initial. You can customize the way that the connector creates snapshots by changing the value of the snapshot.mode property. If you configure a different snapshot mode, the connector completes the snapshot by using a modified version of this workflow.

When the snapshot mode is set to the default, the connector completes the following tasks to create a snapshot:

  1. Establish a connection to the database.

  2. Determine the tables to be captured. By default, the connector captures all tables except those with schemas that exclude them from capture. After the snapshot completes, the connector continues to stream data for the specified tables. If you want the connector to capture data only from specific tables you can direct the connector to capture the data for only a subset of tables or table elements by setting properties such as table.include.list or table.exclude.list.

  3. Obtain a ROW SHARE MODE lock on each of the captured tables to prevent structural changes from occurring during creation of the snapshot. Debezium holds the locks for only a short time.

  4. Read the current system change number (SCN) position from the server’s redo log.

  5. Capture the structure of all database tables, or all tables that are designated for capture. The connector persists schema information in its internal database schema history topic. The schema history provides information about the structure that is in effect when a change event occurs.

    By default, the connector captures the schema of every table in the database that is in capture mode, including tables that are not configured for capture. If tables are not configured for capture, the initial snapshot captures only their structure; it does not capture any table data. For more information about why snapshots persist schema information for tables that you did not include in the initial snapshot, see Understanding why initial snapshots capture the schema for all tables.

  6. Release the locks obtained in Step 3. Other database clients can now write to any previously locked tables.

  7. At the SCN position that was read in Step 4, the connector scans the tables that are designated for capture (SELECT * FROM …​ AS OF SCN 123). During the scan, the connector completes the following tasks:

    1. Confirms that the table was created before the snapshot began. If the table was created after the snapshot began, the connector skips the table. After the snapshot is complete, and the connector transitions to streaming, it emits change events for any tables that were created after the snapshot began.

    2. Produces a read event for each row that is captured from a table. All read events contain the same SCN position, which is the SCN position that was obtained in step 4.

    3. Emits each read event to the Kafka topic for the source table.

    4. Releases data table locks, if applicable.

  8. Record the successful completion of the snapshot in the connector offsets.

The resulting initial snapshot captures the current state of each row in the captured tables. From this baseline state, the connector captures subsequent changes as they occur.

After the snapshot process begins, if the process is interrupted due to connector failure, rebalancing, or other reasons, the process restarts after the connector restarts. After the connector completes the initial snapshot, it continues streaming from the position that it read in Step 3 so that it does not miss any updates. If the connector stops again for any reason, after it restarts, it resumes streaming changes from where it previously left off.

Table 1. Settings for snapshot.mode connector configuration property
Setting Description

always

Perform snapshot on each connector start. After the snapshot completes, the connector begins to stream event records for subsequent database changes.

initial

The connector performs a database snapshot as described in the default workflow for creating an initial snapshot. After the snapshot completes, the connector begins to stream event records for subsequent database changes.

initial_only

The connector performs a database snapshot and stops before streaming any change event records, not allowing any subsequent change events to be captured.

schema_only

Deprecated, see no_data.

no_data

The connector captures the structure of all relevant tables, performing all of the steps described in the default snapshot workflow, except that it does not create READ events to represent the data set at the point of the connector’s start-up (Step 6).

schema_only_recovery

Deprecated, see recovery.

recovery

Set this option to restore a database schema history topic that is lost or corrupted. After a restart, the connector runs a snapshot that rebuilds the topic from the source tables. You can also set the property to periodically prune a database schema history topic that experiences unexpected growth.

WARNING: Do not use this mode to perform a snapshot if schema changes were committed to the database after the last connector shutdown.

when_needed

After the connector starts, it performs a snapshot only if it detects one of the following circumstances:

  • It cannot detect any topic offsets.

  • A previously recorded offset specifies a log position that is not available on the server.

configuration_based

Set the snapshot mode to configuration_based to control snapshot behavior through the set of connector properties that have the prefix 'snapshot.mode.configuration.based'.

custom

The custom snapshot mode lets you inject your own implementation of the io.debezium.spi.snapshot.Snapshotter interface. Set the snapshot.mode.custom.name configuration property to the name provided by the name() method of your implementation. The name is specified on the classpath of your Kafka Connect cluster. If you use the Debezium DebeziumEngine, the name is included in the connector JAR file. For more information, see custom snapshotter SPI.

For more information, see snapshot.mode in the table of connector configuration properties.

Understanding why initial snapshots capture the schema history for all tables

The initial snapshot that a connector runs captures two types of information:

Table data

Information about INSERT, UPDATE, and DELETE operations in tables that are named in the connector’s table.include.list property.

Schema data

DDL statements that describe the structural changes that are applied to tables. Schema data is persisted to both the internal schema history topic, and to the connector’s schema change topic, if one is configured.

After you run an initial snapshot, you might notice that the snapshot captures schema information for tables that are not designated for capture. By default, initial snapshots are designed to capture schema information for every table that is present in the database, not only from tables that are designated for capture. Connectors require that the table’s schema is present in the schema history topic before they can capture a table. By enabling the initial snapshot to capture schema data for tables that are not part of the original capture set, Debezium prepares the connector to readily capture event data from these tables should that later become necessary. If the initial snapshot does not capture a table’s schema, you must add the schema to the history topic before the connector can capture data from the table.

In some cases, you might want to limit schema capture in the initial snapshot. This can be useful when you want to reduce the time required to complete a snapshot. Or when Debezium connects to the database instance through a user account that has access to multiple logical databases, but you want the connector to capture changes only from tables in a specific logic database.

Additional information

Capturing data from tables not captured by the initial snapshot (no schema change)

In some cases, you might want the connector to capture data from a table whose schema was not captured by the initial snapshot. Depending on the connector configuration, the initial snapshot might capture the table schema only for specific tables in the database. If the table schema is not present in the history topic, the connector fails to capture the table, and reports a missing schema error.

You might still be able to capture data from the table, but you must perform additional steps to add the table schema.

Prerequisites
Procedure
  1. Stop the connector.

  2. Remove the internal database schema history topic that is specified by the schema.history.internal.kafka.topic property.

  3. In the connector configuration:

    1. Set the snapshot.mode to recovery.

    2. (Optional) Set the value of schema.history.internal.store.only.captured.tables.ddl to false to ensure that in the future the connector can readily capture data for tables that are not currently designated for capture. Connectors can capture data from a table only if the table’s schema history is present in the history topic.

    3. Add the tables that you want the connector to capture to table.include.list.

  4. Restart the connector. The snapshot recovery process rebuilds the schema history based on the current structure of the tables.

  5. (Optional) After the snapshot completes, initiate an incremental snapshot on the newly added tables. The incremental snapshot first streams the historical data of the newly added tables, and then resumes reading changes from the redo and archive logs for previously configured tables, including changes that occur while that connector was off-line.

  6. (Optional) Reset the snapshot.mode back to no_data to prevent the connector from initiating recovery after a future restart.

Capturing data from tables not captured by the initial snapshot (schema change)

If a schema change is applied to a table, records that are committed before the schema change have different structures than those that were committed after the change. When Debezium captures data from a table, it reads the schema history to ensure that it applies the correct schema to each event. If the schema is not present in the schema history topic, the connector is unable to capture the table, and an error results.

If you want to capture data from a table that was not captured by the initial snapshot, and the schema of the table was modified, you must add the schema to the history topic, if it is not already available. You can add the schema by running a new schema snapshot, or by running an initial snapshot for the table.

Prerequisites
  • You want to capture data from a table with a schema that the connector did not capture during the initial snapshot.

  • A schema change was applied to the table so that the records to be captured do not have a uniform structure.

Procedure
Initial snapshot captured the schema for all tables (store.only.captured.tables.ddl was set to false)
  1. Edit the table.include.list property to specify the tables that you want to capture.

  2. Restart the connector.

  3. Initiate an incremental snapshot if you want to capture existing data from the newly added tables.

Initial snapshot did not capture the schema for all tables (store.only.captured.tables.ddl was set to true)

If the initial snapshot did not save the schema of the table that you want to capture, complete one of the following procedures:

Procedure 1: Schema snapshot, followed by incremental snapshot

In this procedure, the connector first performs a schema snapshot. You can then initiate an incremental snapshot to enable the connector to synchronize data.

  1. Stop the connector.

  2. Remove the internal database schema history topic that is specified by the schema.history.internal.kafka.topic property.

  3. Clear the offsets in the configured Kafka Connect offset.storage.topic. For more information about how to remove offsets, see the Debezium community FAQ.

    Removing offsets should be performed only by advanced users who have experience in manipulating internal Kafka Connect data. This operation is potentially destructive, and should be performed only as a last resort.

  4. Set values for properties in the connector configuration as described in the following steps:

    1. Set the value of the snapshot.mode property to no_data.

    2. Edit the table.include.list to add the tables that you want to capture.

  5. Restart the connector.

  6. Wait for Debezium to capture the schema of the new and existing tables. Data changes that occurred any tables after the connector stopped are not captured.

  7. To ensure that no data is lost, initiate an incremental snapshot.

Procedure 2: Initial snapshot, followed by optional incremental snapshot

In this procedure the connector performs a full initial snapshot of the database. As with any initial snapshot, in a database with many large tables, running an initial snapshot can be a time-consuming operation. After the snapshot completes, you can optionally trigger an incremental snapshot to capture any changes that occur while the connector is off-line.

  1. Stop the connector.

  2. Remove the internal database schema history topic that is specified by the schema.history.internal.kafka.topic property.

  3. Clear the offsets in the configured Kafka Connect offset.storage.topic. For more information about how to remove offsets, see the Debezium community FAQ.

    Removing offsets should be performed only by advanced users who have experience in manipulating internal Kafka Connect data. This operation is potentially destructive, and should be performed only as a last resort.

  4. Edit the table.include.list to add the tables that you want to capture.

  5. Set values for properties in the connector configuration as described in the following steps:

    1. Set the value of the snapshot.mode property to initial.

    2. (Optional) Set schema.history.internal.store.only.captured.tables.ddl to false.

  6. Restart the connector. The connector takes a full database snapshot. After the snapshot completes, the connector transitions to streaming.

  7. (Optional) To capture any data that changed while the connector was off-line, initiate an incremental snapshot.

Ad hoc snapshots

By default, a connector runs an initial snapshot operation only after it starts for the first time. Following this initial snapshot, under normal circumstances, the connector does not repeat the snapshot process. Any future change event data that the connector captures comes in through the streaming process only.

However, in some situations the data that the connector obtained during the initial snapshot might become stale, lost, or incomplete. To provide a mechanism for recapturing table data, Debezium includes an option to perform ad hoc snapshots. You might want to perform an ad hoc snapshot after any of the following changes occur in your Debezium environment:

  • The connector configuration is modified to capture a different set of tables.

  • Kafka topics are deleted and must be rebuilt.

  • Data corruption occurs due to a configuration error or some other problem.

You can re-run a snapshot for a table for which you previously captured a snapshot by initiating a so-called ad-hoc snapshot. Ad hoc snapshots require the use of signaling tables. You initiate an ad hoc snapshot by sending a signal request to the Debezium signaling table.

When you initiate an ad hoc snapshot of an existing table, the connector appends content to the topic that already exists for the table. If a previously existing topic was removed, Debezium can create a topic automatically if automatic topic creation is enabled.

Ad hoc snapshot signals specify the tables to include in the snapshot. The snapshot can capture the entire contents of the database, or capture only a subset of the tables in the database. Also, the snapshot can capture a subset of the contents of the table(s) in the database.

You specify the tables to capture by sending an execute-snapshot message to the signaling table. Set the type of the execute-snapshot signal to incremental or blocking, and provide the names of the tables to include in the snapshot, as described in the following table:

Table 2. Example of an ad hoc execute-snapshot signal record
Field Default Value

type

incremental

Specifies the type of snapshot that you want to run.
Currently, you can request incremental or blocking snapshots.

data-collections

N/A

An array that contains regular expressions matching the fully-qualified names of the tables to include in the snapshot.
For the Oracle connector, use the following format to specify the fully qualified name of a table: database.schema.table.

additional-conditions

N/A

An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot.
Each additional condition is an object that specifies the criteria for filtering the data that an ad hoc snapshot captures. You can set the following parameters for each additional condition:

data-collection

The fully-qualified name of the table that the filter applies to. You can apply different filters to each table.

filter

Specifies column values that must be present in a database record for the snapshot to include it, for example, "color='blue'".

The values that you assign to the filter parameter are the same types of values that you might specify in the WHERE clause of SELECT statements when you set the snapshot.select.statement.overrides property for a blocking snapshot.

surrogate-key

N/A

An optional string that specifies the column name that the connector uses as the primary key of a table during the snapshot process.

Triggering an ad hoc incremental snapshot

You initiate an ad hoc incremental snapshot by adding an entry with the execute-snapshot signal type to the signaling table, or by sending a signal message to a Kafka signaling topic. After the connector processes the message, it begins the snapshot operation. The snapshot process reads the first and last primary key values and uses those values as the start and end point for each table. Based on the number of entries in the table, and the configured chunk size, Debezium divides the table into chunks, and proceeds to snapshot each chunk, in succession, one at a time.

For more information, see Incremental snapshots.

Triggering an ad hoc blocking snapshot

You initiate an ad hoc blocking snapshot by adding an entry with the execute-snapshot signal type to the signaling table or signaling topic. After the connector processes the message, it begins the snapshot operation. The connector temporarily stops streaming, and then initiates a snapshot of the specified table, following the same process that it uses during an initial snapshot. After the snapshot completes, the connector resumes streaming.

For more information, see Blocking snapshots.

Incremental snapshots

To provide flexibility in managing snapshots, Debezium includes a supplementary snapshot mechanism, known as incremental snapshotting. Incremental snapshots rely on the Debezium mechanism for sending signals to a Debezium connector. Incremental snapshots are based on the DDD-3 design document.

In an incremental snapshot, instead of capturing the full state of a database all at once, as in an initial snapshot, Debezium captures each table in phases, in a series of configurable chunks. You can specify the tables that you want the snapshot to capture and the size of each chunk. The chunk size determines the number of rows that the snapshot collects during each fetch operation on the database. The default chunk size for incremental snapshots is 1024 rows.

As an incremental snapshot proceeds, Debezium uses watermarks to track its progress, maintaining a record of each table row that it captures. This phased approach to capturing data provides the following advantages over the standard initial snapshot process:

  • You can run incremental snapshots in parallel with streamed data capture, instead of postponing streaming until the snapshot completes. The connector continues to capture near real-time events from the change log throughout the snapshot process, and neither operation blocks the other.

  • If the progress of an incremental snapshot is interrupted, you can resume it without losing any data. After the process resumes, the snapshot begins at the point where it stopped, rather than recapturing the table from the beginning.

  • You can run an incremental snapshot on demand at any time, and repeat the process as needed to adapt to database updates. For example, you might re-run a snapshot after you modify the connector configuration to add a table to its table.include.list property.

Incremental snapshot process

When you run an incremental snapshot, Debezium sorts each table by primary key and then splits the table into chunks based on the configured chunk size. Working chunk by chunk, it then captures each table row in a chunk. For each row that it captures, the snapshot emits a READ event. That event represents the value of the row when the snapshot for the chunk began.

As a snapshot proceeds, it’s likely that other processes continue to access the database, potentially modifying table records. To reflect such changes, INSERT, UPDATE, or DELETE operations are committed to the transaction log as per usual. Similarly, the ongoing Debezium streaming process continues to detect these change events and emits corresponding change event records to Kafka.

How Debezium resolves collisions among records with the same primary key

In some cases, the UPDATE or DELETE events that the streaming process emits are received out of sequence. That is, the streaming process might emit an event that modifies a table row before the snapshot captures the chunk that contains the READ event for that row. When the snapshot eventually emits the corresponding READ event for the row, its value is already superseded. To ensure that incremental snapshot events that arrive out of sequence are processed in the correct logical order, Debezium employs a buffering scheme for resolving collisions. Only after collisions between the snapshot events and the streamed events are resolved does Debezium emit an event record to Kafka.

Snapshot window

To assist in resolving collisions between late-arriving READ events and streamed events that modify the same table row, Debezium employs a so-called snapshot window. The snapshot window demarcates the interval during which an incremental snapshot captures data for a specified table chunk. Before the snapshot window for a chunk opens, Debezium follows its usual behavior and emits events from the transaction log directly downstream to the target Kafka topic. But from the moment that the snapshot for a particular chunk opens, until it closes, Debezium performs a de-duplication step to resolve collisions between events that have the same primary key..

For each data collection, the Debezium emits two types of events, and stores the records for them both in a single destination Kafka topic. The snapshot records that it captures directly from a table are emitted as READ operations. Meanwhile, as users continue to update records in the data collection, and the transaction log is updated to reflect each commit, Debezium emits UPDATE or DELETE operations for each change.

As the snapshot window opens, and Debezium begins processing a snapshot chunk, it delivers snapshot records to a memory buffer. During the snapshot windows, the primary keys of the READ events in the buffer are compared to the primary keys of the incoming streamed events. If no match is found, the streamed event record is sent directly to Kafka. If Debezium detects a match, it discards the buffered READ event, and writes the streamed record to the destination topic, because the streamed event logically supersede the static snapshot event. After the snapshot window for the chunk closes, the buffer contains only READ events for which no related transaction log events exist. Debezium emits these remaining READ events to the table’s Kafka topic.

The connector repeats the process for each snapshot chunk.

To enable Debezium to perform incremental snapshots, you must grant the connector permission to write to the signaling table.

Write permission is unnecessary only for connectors that can be configured to perform read-only incrementals snapshots (MariaDB, MySQL, or PostgreSQL).

Currently, you can use either of the following methods to initiate an incremental snapshot:

The Debezium connector for Oracle does not support schema changes while an incremental snapshot is running.