# Introduction to CDC (Change Data Capture)

Change Data Capture (CDC) is a technique for tracking row-level changes in a database (inserts, updates, and deletes) and notifying other systems of those changes in the order they occurred. In disaster-recovery scenarios, CDC is mainly used to synchronize data between a primary and a backup database, enabling real-time replication from the primary to the secondary.

```
source ----------> CDC ----------> sink
```

## Apache SeaTunnel CDC

SeaTunnel CDC offers two kinds of data synchronization:

- **Snapshot read**: reads a table's historical data.
- **Incremental tracking**: reads incremental log changes for a table.

### Lock-Free Snapshot Synchronization

The lock-free property of the snapshot phase is worth emphasizing, because many existing CDC platforms, such as Debezium, may lock tables while synchronizing historical data. Snapshot reading is the process of synchronizing a database's historical data. Its basic flow is:

```
storage -------------> splitEnumerator ---- split ----> reader
                             ^                            |
                             |                            |
                             \---------- report ----------/
```

- **Split partitioning**: the `splitEnumerator` (split distributor) partitions the table data into multiple splits based on a chosen field (such as the table ID or a unique key) and a configured step size.
- **Parallel processing**: each split is assigned to a different reader for parallel reading; a single reader occupies one connection.
- **Event feedback**: after completing the read for a split, each reader reports its progress back to the `splitEnumerator`.

The metadata for a split is as follows:

```
String splitId                 # Split ID
TableId tableId                # Table ID
SeatunnelRowType splitKeyType  # Type of the field used for partitioning
Object splitStart              # Start of the split's key range
Object splitEnd                # End of the split's key range
```

Once a reader receives its split, it generates the appropriate SQL statement. Before it starts reading, it records the split's current position in the database log.
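The split metadata above is enough for a reader to derive its range query. Here is a minimal sketch of that derivation, with illustrative names and simplified types rather than SeaTunnel's actual classes:

```java
// Minimal sketch (illustrative names, not SeaTunnel's actual API):
// a snapshot split and the range scan query a reader could derive from it.
public class SnapshotSplit {
    final String splitId;
    final String tableId;
    final String splitKey;   // field used for partitioning, e.g. a primary key
    final Object splitStart; // null means the range is unbounded below
    final Object splitEnd;   // null means the range is unbounded above

    SnapshotSplit(String splitId, String tableId, String splitKey,
                  Object splitStart, Object splitEnd) {
        this.splitId = splitId;
        this.tableId = tableId;
        this.splitKey = splitKey;
        this.splitStart = splitStart;
        this.splitEnd = splitEnd;
    }

    /** Builds the scan query for this split's key range. */
    String toScanQuery() {
        StringBuilder sql = new StringBuilder("SELECT * FROM " + tableId);
        if (splitStart != null || splitEnd != null) {
            sql.append(" WHERE ");
            if (splitStart != null) sql.append(splitKey).append(" >= ").append(splitStart);
            if (splitStart != null && splitEnd != null) sql.append(" AND ");
            if (splitEnd != null) sql.append(splitKey).append(" < ").append(splitEnd);
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        SnapshotSplit split = new SnapshotSplit("orders-0", "db.orders", "id", 0, 1000);
        System.out.println(split.toScanQuery());
        // SELECT * FROM db.orders WHERE id >= 0 AND id < 1000
    }
}
```

Note the half-open range `[start, end)`: adjacent splits share a boundary value without ever reading the same row twice.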
After completing the current split, the reader reports its progress to the `splitEnumerator` with the following data:

```
String splitId         # Split ID
Offset highWatermark   # Log position at the end of the split read, kept for later validation
```

## Incremental Synchronization

The incremental synchronization phase begins after the snapshot read phase. In this stage, any changes occurring in the source database are captured and synchronized to the backup database in real time by listening to the database log (e.g., the MySQL binlog). Incremental tracking is usually single-threaded, to avoid pulling the binlog more than once and to reduce the load on the database; therefore only one reader is used, occupying a single connection.

```
data log -------------> splitEnumerator ---- split ----> reader
                              ^                            |
                              |                            |
                              \---------- report ----------/
```

In the incremental phase, all splits and tables from the snapshot phase are combined into a single split. The split metadata during this phase is as follows:

```
String splitId
Offset startingOffset   # Lowest log start position among all snapshot splits
Offset endingOffset     # Log end position, or continuous if ongoing, as in the incremental phase
List<TableId> tableIds
Map tableWatermarks     # Watermarks for all splits
List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos  # Snapshot-phase split details
```

The fields of `CompletedSnapshotSplitInfo` are as follows:

```
String splitId
TableId tableId
SeatunnelRowType splitKeyType
Object splitStart
Object splitEnd
Offset watermark   # Corresponds to the highWatermark in the split report
```

The incremental split therefore carries the watermarks of all snapshot-phase splits, and the minimal watermark among them is selected as the starting point for incremental synchronization.

## Exactly-Once Semantics

In both the snapshot read phase and the incremental phase, the source database may continue to change while synchronization is in progress.
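The choice of `startingOffset` described in the incremental-synchronization section above, taking the lowest watermark among all snapshot-phase reports, can be sketched as follows (illustrative names, not SeaTunnel's actual API):

```java
import java.util.List;

// Minimal sketch (illustrative names, not SeaTunnel's actual API):
// the incremental phase starts from the lowest watermark reported
// by the snapshot-phase splits.
public class IncrementalStartPlanner {

    /** Snapshot-phase report: split id plus the log offset recorded for it. */
    record CompletedSplitInfo(String splitId, long watermark) {}

    /** Returns the lowest reported watermark, used as the incremental starting offset. */
    static long startingOffset(List<CompletedSplitInfo> reports) {
        return reports.stream()
                .mapToLong(CompletedSplitInfo::watermark)
                .min()
                .orElseThrow(() -> new IllegalStateException("no snapshot splits reported"));
    }

    public static void main(String[] args) {
        List<CompletedSplitInfo> reports = List.of(
                new CompletedSplitInfo("split-1", 120L),
                new CompletedSplitInfo("split-2", 95L),
                new CompletedSplitInfo("split-3", 180L));
        System.out.println(startingOffset(reports)); // 95
    }
}
```

Starting from the minimum rather than, say, the maximum is what makes the later between-split validation possible: every log entry since the earliest split read is seen at least once.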
How do we guarantee exactly-once delivery?

### Snapshot Read Phase

Consider a split that is being synchronized while changes are happening, say, a row `k3` is inserted, `k2` is updated, and `k1` is deleted. If the read takes no account of these concurrent changes, the updates can be lost. SeaTunnel handles this as follows:

1. Before reading the split, record the current binlog position (the low watermark).
2. Read the data in the range `split{start, end}`.
3. After reading, record the binlog position again (the high watermark).

If `high == low`, the split's data did not change during the read. If `high - low > 0`, changes occurred while the split was being processed. In that case, SeaTunnel will:

1. Cache the split's data in memory as an in-memory table.
2. Replay the log changes from the low watermark to the high watermark in order, using the primary keys to apply each operation to the in-memory table.
3. Report the high watermark.

```
            insert k3    update k2    delete k1
                |            |            |
                v            v            v
bin log --|-----------------------------------------|-- log offset
     low watermark                            high watermark

CDC reads:  k1  k2  k4
                |
             replays
                v
Real data:  k2' k3  k4
```

### Incremental Phase

Before starting the incremental phase, SeaTunnel first validates all splits from the previous step, because data may have changed between splits: for instance, records inserted between `split1` and `split2` would be missed by the snapshot phase. To recover the data that falls between splits, SeaTunnel proceeds as follows:

1. From all split reports, take the smallest watermark as the start watermark and begin reading the log from there.
2. For each log entry read, check `completedSnapshotSplitInfos` to see whether the data has already been processed in some split.
If it has not, it is data that falls between splits and is emitted as a correction. Once all splits have been validated, the process moves to the purely incremental phase.

```
          |--------------filter split2----------------|
          |----filter split1------|
data log -|-----------------------|-------------------|---------------------|- log offset
    min watermark        split1 watermark      split2 watermark       max watermark
```

## Checkpoint and Resume

What about pausing and resuming CDC? SeaTunnel uses a distributed snapshot algorithm (Chandy–Lamport).

Assume the system has two processes, `p1` and `p2`, where `p1` holds three variables `X1 Y1 Z1` and `p2` holds three variables `X2 Y2 Z2`. The initial states are as follows:

```
p1      p2
X1:0    X2:4
Y1:0    Y2:2
Z1:0    Z2:3
```

At this point, `p1` initiates a global snapshot: `p1` first records its own process state, then sends a marker to `p2`. Before the marker reaches `p2`, `p2` sends a message `M` to `p1`.

```
p1                       p2
X1:0 -------marker-----> X2:4
Y1:0
```
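Returning to the snapshot read phase described earlier: the watermark-based replay that merges concurrent changes into the split's in-memory table can be sketched as follows (illustrative names and a simplified event model, not SeaTunnel's actual API):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch (illustrative names, not SeaTunnel's actual API):
// replay the log changes captured between the low and high watermark
// onto the in-memory copy of a snapshot split, keyed by primary key.
public class SplitReplayer {

    enum Op { INSERT, UPDATE, DELETE }

    record LogEvent(Op op, String key, String value) {}

    /**
     * Applies the change events captured between the low and high watermark
     * to the rows read for the split, so the emitted snapshot matches the
     * table state at the high watermark.
     */
    static Map<String, String> replay(Map<String, String> splitRows, List<LogEvent> events) {
        Map<String, String> table = new LinkedHashMap<>(splitRows);
        for (LogEvent e : events) {
            switch (e.op()) {
                case INSERT, UPDATE -> table.put(e.key(), e.value()); // upsert by primary key
                case DELETE -> table.remove(e.key());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // Rows the reader saw while the changes were happening.
        Map<String, String> read = new LinkedHashMap<>();
        read.put("k1", "v1");
        read.put("k2", "v2");
        read.put("k4", "v4");

        // Changes between low and high watermark: insert k3, update k2, delete k1.
        List<LogEvent> events = List.of(
                new LogEvent(Op.INSERT, "k3", "v3"),
                new LogEvent(Op.UPDATE, "k2", "v2'"),
                new LogEvent(Op.DELETE, "k1", null));

        System.out.println(replay(read, events));
        // {k2=v2', k4=v4, k3=v3}
    }
}
```

Because every event carries a primary key, replaying the log segment in order is idempotent with respect to the final state: the in-memory table ends up identical to the real table at the high watermark.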