
sink: Add schema/table routing support for mysql and redo sinks #3704

Open

a-cong wants to merge 17 commits into pingcap:master from a-cong:sink-routing

Conversation


@a-cong a-cong commented Dec 18, 2025

What problem does this PR solve?

Issue Number: close #3700

What is changed and how it works?

Porting changes from pingcap/tiflow#12435 to new ticdc architecture.

The dispatchers config is the same as in tiflow -

[[sink.dispatchers]]
matcher = ["uds_000.*"]
schema = "tidb_failover_test"
table = "{table}"

[[sink.dispatchers]]
matcher = ["oyster_production.*"]
schema = "oyster_production"
table = "{table}"

We also add TargetSchema and TargetTable fields to the TableName struct. Because there is now a shared TableInfo object that can be accessed by multiple changefeeds, and a new concept of Dispatcher objects, each dispatcher clones the TableInfo with the routing information for that table in handleHandshakeEvent (which runs when the dispatcher starts), and then stores that cloned/routed TableInfo for the dispatcher. Subsequent DMLs look up the stored info; subsequent DDLs also explicitly update routing in handleSingleDataEvents.
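A minimal sketch of the cloning idea (field names are assumed from the description; the real TableInfo is far richer and shares refcounted column schemas):

```go
package main

import "fmt"

// TableName carries both the source names and, once routing is applied,
// the routed target names. Field names here follow the PR description.
type TableName struct {
	Schema, Table             string
	TargetSchema, TargetTable string
}

type TableInfo struct {
	TableName TableName
}

// cloneWithRouting returns a copy of ti with the routed target names set,
// leaving the shared original untouched, since other changefeeds may use
// the same TableInfo with different routing rules.
func cloneWithRouting(ti *TableInfo, targetSchema, targetTable string) *TableInfo {
	cloned := *ti // a shallow copy is enough for this sketch
	cloned.TableName.TargetSchema = targetSchema
	cloned.TableName.TargetTable = targetTable
	return &cloned
}

func main() {
	shared := &TableInfo{TableName: TableName{Schema: "uds_000", Table: "users"}}
	routed := cloneWithRouting(shared, "tidb_failover_test", "users")
	fmt.Println(shared.TableName.TargetSchema, routed.TableName.TargetSchema)
}
```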

Another difference between ticdc and tiflow is that DDLs are now handled per sink instead of at a single entry point. Because of this, we need to apply routing in the mysql and redo sinks separately; the shared rewriting functionality is in pkg/sink/util/ddl_routing.go.

We also copied FetchDDLTables and RenameDDLTable (and the corresponding tests) from https://github.com/pingcap/tiflow/blob/master/dm/pkg/parser/common.go so we could use them to rewrite DDLs.

Check List

Tests

  • Unit tests
  • Integration tests
  • Manual test - deployed to internal dev cluster and bootstrapped a migration shadow with schema mapping, verified changefeed routes DDLs/DMLs correctly

Questions

Do you need to update user documentation, design documentation or monitoring documentation?

Yes - will do so in separate PR

Release note

Added support for schema and table routing for mysql-compatible sinks by extending the `dispatchers` config

Summary by CodeRabbit

Release Notes

  • New Features

    • Added schema and table routing capabilities to dispatch rules with {schema} and {table} placeholder support
    • Routing rules are applied to DML operations and DDL statements, redirecting data to target schemas and tables
    • Redo log replay now preserves routing information during recovery
    • Routing configuration validation for schema and table expressions
  • Testing

    • Added comprehensive integration tests for sink routing scenarios and redo log schema routing


ti-chi-bot bot commented Dec 18, 2025

Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected. Please follow our release note process to remove it.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@ti-chi-bot ti-chi-bot bot added the do-not-merge/release-note-label-needed Indicates that a PR should not merge because it's missing one of the release note labels. label Dec 18, 2025

ti-chi-bot bot commented Dec 18, 2025

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign flowbehappy for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added contribution This PR is from a community contributor. first-time-contributor Indicates that the PR was contributed by an external member and is a first-time contributor. needs-ok-to-test Indicates a PR created by contributors and need ORG member send '/ok-to-test' to start testing. labels Dec 18, 2025

ti-chi-bot bot commented Dec 18, 2025

Hi @a-cong. Thanks for your PR.

I'm waiting for a pingcap member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work. Regular contributors should join the org to skip this step.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.


pingcap-cla-assistant bot commented Dec 18, 2025

CLA assistant check
All committers have signed the CLA.

@ti-chi-bot ti-chi-bot bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Dec 18, 2025
@gemini-code-assist

Summary of Changes

Hello @a-cong, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the data replication capabilities by introducing schema and table routing for MySQL-compatible and Redo sinks. It allows users to dynamically remap database and table names from the source to the target, providing greater flexibility for data integration and consolidation. The changes involve updating configuration structures, implementing DDL query rewriting logic, and ensuring concurrency safety through event cloning, ultimately enabling seamless replication to custom target schemas and tables.

Highlights

  • Schema and Table Routing for MySQL and Redo Sinks: Introduces comprehensive support for schema and table routing, allowing users to define rules to map source database and table names to different target names in MySQL-compatible and Redo sinks. This enables flexible data migration and consolidation scenarios.
  • DDL Query Rewriting: Implements a mechanism to parse and rewrite DDL (Data Definition Language) queries based on configured routing rules. This ensures that DDL statements like CREATE TABLE, ALTER TABLE, and RENAME TABLE are executed against the correct target schema and table names in the downstream.
  • Event Cloning for Concurrency Safety: Adds cloning logic for BatchDMLEvent and DDLEvent objects before applying routing. This prevents race conditions and unintended mutations when the same event is processed by multiple dispatchers (e.g., the main event dispatcher and the redo log dispatcher), ensuring each can apply routing independently.
  • Enhanced TableName and TableInfo: Extends the TableName struct with TargetSchema and TargetTable fields to store routed names. The TableInfo struct now includes a CloneWithRouting method to create a routed copy, and DML SQL generation functions (buildInsert, buildDelete, buildUpdate) are updated to use these target names.
  • Integration with Redo Log Replay: Ensures that DDL routing is applied before writing events to the redo log. This means that when redo logs are replayed (e.g., during disaster recovery), the DDLs will correctly target the routed schema and table names, maintaining data consistency.
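The DML generation change in the highlights can be illustrated as follows (a sketch assuming hypothetical field names and a quoting helper, not the actual buildInsert/buildDelete/buildUpdate signatures):

```go
package main

import "fmt"

// TableName carries both source and (optionally) routed target names,
// as the highlights describe. Field names here are assumptions.
type TableName struct {
	Schema, Table             string
	TargetSchema, TargetTable string
}

// quotedTarget picks the routed name when set and falls back to the
// source name, producing the identifier DML statements would use.
func quotedTarget(t TableName) string {
	schema, table := t.Schema, t.Table
	if t.TargetSchema != "" {
		schema = t.TargetSchema
	}
	if t.TargetTable != "" {
		table = t.TargetTable
	}
	return fmt.Sprintf("`%s`.`%s`", schema, table)
}

func main() {
	routed := TableName{Schema: "uds_000", Table: "users", TargetSchema: "tidb_failover_test", TargetTable: "users"}
	fmt.Println("INSERT INTO " + quotedTarget(routed) + " (...) VALUES (...)")
}
```

With no routing applied, the target fields stay empty and DML continues to use the source names.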



@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a significant new feature: schema and table routing for MySQL and redo log sinks. The changes are extensive, touching upon the API, configuration, dispatchers, and sinks to support this functionality. The design thoughtfully addresses potential concurrency issues by cloning event objects to prevent race conditions, especially when events are shared between the main sink and the redo log sink. A comprehensive set of unit and integration tests has been added to validate the new routing logic. Overall, this is a well-structured implementation of a complex feature. I've identified one critical issue in BatchDMLEvent.CloneForRouting that could lead to data corruption or mis-routing due to improper cloning of nested event objects. Please address this to ensure the correctness of the feature.

@wk989898
Collaborator

/ok-to-test

@ti-chi-bot ti-chi-bot bot added ok-to-test Indicates a PR is ready to be tested. and removed needs-ok-to-test Indicates a PR created by contributors and need ORG member send '/ok-to-test' to start testing. labels Dec 26, 2025
@a-cong force-pushed the sink-routing branch 2 times, most recently from d9fa289 to beac037 on January 2, 2026 at 23:14
@a-cong
Author

a-cong commented Jan 3, 2026

/test all

1 similar comment
@3AceShowHand
Collaborator

/test all

@a-cong
Author

a-cong commented Jan 5, 2026

/test all

@a-cong
Author

a-cong commented Jan 6, 2026

/test all

a-cong and others added 5 commits February 4, 2026 14:01
Address reviewer concern about potential race conditions if callbacks
are appended to both original and clone sharing the same backing array.

Currently DDL events arrive with nil PostTxnFlushed (callbacks are added
downstream), so this is a defensive/future-proofing change. If any code
path later adds callbacks before cloning, sharing the backing array
could cause nondeterministic callback visibility or data races.
CloneWithRouting shares the refcounted columnSchema pointer. Without
incrementing the refcount and setting a finalizer, the clone could
reference a recycled columnSchema if the original is GC'd first.

This follows the same pattern used in UnmarshalJSONToTableInfo and
WrapTableInfo.
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
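The refcount-plus-finalizer pattern that the commit message describes can be sketched like this (simplified; the real code goes through GetSharedColumnSchemaStorage and tryReleaseColumnSchema, which are not reproduced here):

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

// columnSchema is a stand-in for the shared, refcounted schema object.
type columnSchema struct{ refs int64 }

func (c *columnSchema) retain()  { atomic.AddInt64(&c.refs, 1) }
func (c *columnSchema) release() { atomic.AddInt64(&c.refs, -1) }

type tableInfo struct{ cols *columnSchema }

// cloneSharingSchema copies the table info while sharing the columnSchema
// pointer. It bumps the refcount and sets a finalizer on the clone so the
// shared schema is not released while either copy is still reachable.
func cloneSharingSchema(ti *tableInfo) *tableInfo {
	clone := &tableInfo{cols: ti.cols}
	clone.cols.retain()
	runtime.SetFinalizer(clone, func(t *tableInfo) { t.cols.release() })
	return clone
}

func main() {
	ti := &tableInfo{cols: &columnSchema{refs: 1}}
	clone := cloneSharingSchema(ti)
	fmt.Println(atomic.LoadInt64(&clone.cols.refs))
}
```

Without the retain, the clone could observe a recycled columnSchema if the original were collected first, which is exactly the hazard the commit guards against.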
@a-cong
Author

a-cong commented Feb 4, 2026

/test pull-cdc-mysql-integration-light


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🤖 Fix all issues with AI agents
In `@downstreamadapter/eventcollector/dispatcher_stat.go`:
- Around line 750-787: Update the debug log messages inside
dispatcherStat.applyRoutingToTableInfo to remove the function name and hyphens
and rephrase them to plain descriptive text (e.g., "router is nil" -> "no router
configured", "no routing change" -> "routing unchanged", "routing applied" ->
"routing result differs from source"); keep the same structured fields
(changefeedID, sourceSchema/sourceTable, targetSchema/targetTable) and ensure
messages use spaces instead of hyphens and do not embed the function name so
they conform to logging guidelines.
- Around line 482-500: Update the log messages inside applyRoutingToTableInfo to
remove the function name prefix: replace any occurrences of log messages that
read "applyRoutingToTableInfo: router is nil", "applyRoutingToTableInfo: no
routing change", and "applyRoutingToTableInfo: routing applied" with "router is
nil", "no routing change", and "routing applied" respectively; locate these
strings in the applyRoutingToTableInfo function and update the processLogger/log
calls (or whichever logger is used) to use the shorter messages while keeping
the same log level and context.

In `@downstreamadapter/sink/mysql/sink_test.go`:
- Around line 444-456: The NewMySQLSink call in getMysqlSinkWithRouter omits
additional parameters required by the current constructor signature; update the
call to match the same argument order/values used in other test helpers (e.g.,
the other getMysqlSink helper) by adding the missing parameters (such as the
owner flag, additional options, or logger/router position) so NewMySQLSink(ctx,
changefeedID, cfg, db, ...) matches the project's canonical call site; use the
existing tests that call NewMySQLSink as the reference to copy the exact extra
arguments and order.

In `@pkg/common/table_info.go`:
- Around line 150-179: CloneWithRouting currently constructs cloned :=
&TableInfo{...} but omits copying the ActiveActiveTable and SoftDeleteTable
flags, causing routed TableInfo to lose those semantics; update CloneWithRouting
(the cloned TableInfo initializer in pkg/common/table_info.go) to include
ActiveActiveTable: ti.ActiveActiveTable and SoftDeleteTable: ti.SoftDeleteTable
so the clone preserves those flags, and retain the existing refcount/finalizer
logic for columnSchema (GetSharedColumnSchemaStorage().incColumnSchemaCount,
runtime.SetFinalizer, tryReleaseColumnSchema).

In `@pkg/sink/util/ddl_routing.go`:
- Around line 83-135: The error returns from parser.ParseOneStmt,
FetchDDLTables, and RenameDDLTable should preserve stack traces: replace the
existing errors.Errorf(...) wrappers with errors.Annotatef(errors.Trace(err),
...) so the original error is traced and annotated with the same contextual
message (e.g., "failed to parse DDL query for routing: %s", "failed to fetch
tables from DDL for routing: %s", "failed to rewrite DDL query with routing:
%s"). Update the three return paths that reference parser.ParseOneStmt,
FetchDDLTables, and RenameDDLTable accordingly while keeping the original
query/context text and formatting.

Comment on lines 482 to 500
originalDDL := events[0].Event.(*commonEvent.DDLEvent)
// Clone the DDL event before applying routing. This is defensive to ensure
// we don't mutate the original event which may be referenced elsewhere.
// The clone allows us to safely modify Query and TargetSchemaName fields.
ddl := originalDDL.CloneForRouting()
events[0].Event = ddl
d.tableInfoVersion.Store(ddl.FinishedTs)
if ddl.TableInfo != nil {
d.tableInfo.Store(ddl.TableInfo)
// Apply routing to get a TableInfo with TargetSchema/TargetTable set.
// This returns a cloned TableInfo to avoid mutating the shared original,
// which may be used by other changefeeds with different routing rules.
routedTableInfo := d.applyRoutingToTableInfo(ddl.TableInfo)
ddl.TableInfo = routedTableInfo
d.tableInfo.Store(routedTableInfo)
}
// Also apply routing to MultipleTableInfos for multi-table DDLs (e.g., RENAME TABLE)
for i, tableInfo := range ddl.MultipleTableInfos {
ddl.MultipleTableInfos[i] = d.applyRoutingToTableInfo(tableInfo)
}

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Inspect CloneForRouting and PostTxnFlushed handling
rg -n "CloneForRouting" -g '*.go' -C 3
rg -n "PostTxnFlushed" -g '*.go' -C 2



🏁 Script executed:

#!/bin/bash
# Find the applyRoutingToTableInfo implementation and check log messages
rg -n "applyRoutingToTableInfo" downstreamadapter/eventcollector/dispatcher_stat.go -C 5 -A 40



Fix log message style in applyRoutingToTableInfo.

The DDL cloning logic properly deep-copies PostTxnFlushed with an explicit make and copy (verified in tests), so the shallow-copy concern is not an issue. However, the log messages at lines 760, 772, and 779 violate the coding guideline: "log message strings should not include function names." Change:

  • "applyRoutingToTableInfo: router is nil""router is nil"
  • "applyRoutingToTableInfo: no routing change""no routing change"
  • "applyRoutingToTableInfo: routing applied""routing applied"

Comment on lines +83 to +111
// Parse the DDL query using TiDB parser
p := parser.New()
stmt, err := p.ParseOneStmt(ddl.Query, "", "")
if err != nil {
return nil, errors.Errorf("failed to parse DDL query for routing: %v (query: %s)", err, ddl.Query)
}

// Fetch source tables from the DDL
sourceTables, err := FetchDDLTables(defaultSchema, stmt)
if err != nil {
return nil, errors.Errorf("failed to fetch tables from DDL for routing: %v (query: %s)", err, ddl.Query)
}

if len(sourceTables) == 0 {
return result, nil
}

// Build target tables by applying routing rules
targetTables := make([]*filter.Table, 0, len(sourceTables))
hasRouting := false
for _, srcTable := range sourceTables {
targetSchema, targetTable := router.Route(srcTable.Schema, srcTable.Name)
if targetSchema != srcTable.Schema || targetTable != srcTable.Name {
hasRouting = true
}
targetTables = append(targetTables, &filter.Table{
Schema: targetSchema,
Name: targetTable,
})
}

if !hasRouting {
return result, nil
}

// Routing rules matched - mark as applied regardless of whether query changes
result.RoutingApplied = true

// Set the target schema for the USE command when executing the DDL.
// We use the first target table's schema because:
// 1. For single-table DDLs, this is the correct target schema
// 2. For multi-table DDLs (e.g., RENAME TABLE), all table references in the
// rewritten query are fully qualified, so the USE command just needs to
// switch to any database the user has access to - the first target schema
// is guaranteed to be accessible since routing was configured for it.
if len(targetTables) > 0 {
result.TargetSchemaName = targetTables[0].Schema
}

// Rewrite the DDL with target tables
newQuery, err := RenameDDLTable(stmt, targetTables)
if err != nil {
return nil, errors.Errorf("failed to rewrite DDL query with routing: %v (query: %s)", err, ddl.Query)

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

cat -n pkg/sink/util/ddl_routing.go | head -150



🏁 Script executed:

rg -n "func FetchDDLTables|func RenameDDLTable" pkg/sink/util/



🏁 Script executed:

sed -n '62,90p;129,160p' pkg/sink/util/ddl_table_utils.go



Wrap parser and DDL rewrite errors with errors.Trace to preserve stack traces. Errors from parser.ParseOneStmt (line 85), FetchDDLTables (line 91), and RenameDDLTable (line 133) should be wrapped with errors.Trace instead of errors.Errorf. The third-party parser call especially requires errors.Trace per coding guidelines; the internal functions should also preserve stack traces. Use errors.Annotatef(errors.Trace(err), ...) to wrap each error while adding context.

Proposed fix
-	stmt, err := p.ParseOneStmt(ddl.Query, "", "")
-	if err != nil {
-		return nil, errors.Errorf("failed to parse DDL query for routing: %v (query: %s)", err, ddl.Query)
-	}
+	stmt, err := p.ParseOneStmt(ddl.Query, "", "")
+	if err != nil {
+		return nil, errors.Annotatef(errors.Trace(err), "failed to parse DDL query for routing (query: %s)", ddl.Query)
+	}

-	sourceTables, err := FetchDDLTables(defaultSchema, stmt)
-	if err != nil {
-		return nil, errors.Errorf("failed to fetch tables from DDL for routing: %v (query: %s)", err, ddl.Query)
-	}
+	sourceTables, err := FetchDDLTables(defaultSchema, stmt)
+	if err != nil {
+		return nil, errors.Annotatef(errors.Trace(err), "failed to fetch tables from DDL for routing (query: %s)", ddl.Query)
+	}

-	newQuery, err := RenameDDLTable(stmt, targetTables)
-	if err != nil {
-		return nil, errors.Errorf("failed to rewrite DDL query with routing: %v (query: %s)", err, ddl.Query)
-	}
+	newQuery, err := RenameDDLTable(stmt, targetTables)
+	if err != nil {
+		return nil, errors.Annotatef(errors.Trace(err), "failed to rewrite DDL query with routing (query: %s)", ddl.Query)
+	}

@a-cong
Author

a-cong commented Feb 4, 2026

/test pull-cdc-mysql-integration-light

@a-cong
Author

a-cong commented Feb 4, 2026

/test pull-cdc-mysql-integration-light

1 similar comment
@a-cong
Author

a-cong commented Feb 4, 2026

/test pull-cdc-mysql-integration-light

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
@a-cong
Author

a-cong commented Feb 4, 2026

/test pull-cdc-mysql-integration-light

@a-cong
Author

a-cong commented Feb 6, 2026

/test pull-cdc-mysql-integration-light


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🤖 Fix all issues with AI agents
In `@downstreamadapter/eventcollector/dispatcher_stat.go`:
- Around line 492-497: The log message in the log.Info call for the DDL event
includes the function-name prefix "handleSingleDataEvents:" and a hyphen; update
the first argument to remove the function name and hyphen (e.g., change
"handleSingleDataEvents: processing DDL event" to "processing DDL event") while
keeping the structured fields (originalDDL, d.target.GetChangefeedID(),
d.getDispatcherID(), originalDDL.GetDDLType(), originalDDL.Query) unchanged so
identifiers like originalDDL and methods getDispatcherID/GetChangefeedID remain
as-is.
- Around line 669-678: The log message passed to log.Info inside the block that
calls d.applyRoutingToTableInfo and d.tableInfo.Store currently includes the
function name ("handleHandshakeEvent: storing routed tableInfo") and a
hyphenated style; update that log.Info call to use a plain, function-name-free
message such as "storing routed tableInfo" (or "storing routed table info") with
spaces instead of hyphens, leaving the existing structured fields (changefeedID,
dispatcher, originalSchema, originalTable, routedTargetSchema,
routedTargetTable) unchanged so the call site around applyRoutingToTableInfo and
d.tableInfo.Store remains otherwise identical.
- Around line 392-402: Remove the function-name prefix and hyphenated
punctuation from the two log messages: instead of starting the message with
"handleBatchDataEvents: DROPPED - stale epoch" and "handleBatchDataEvents:
DROPPED - sequence verification failed, resetting dispatcher", change them to
plain space-separated messages (e.g. "DROPPED stale epoch" and "DROPPED sequence
verification failed resetting dispatcher") in the log.Info calls used near
verifyEventSequence and the epoch check; keep the existing zap fields
(d.target.GetChangefeedID(), d.getDispatcherID(),
commonEvent.TypeToString(event.GetType())) intact so only the log message string
is modified.
- Around line 419-425: The log in handleBatchDataEvents that reports loaded
tableInfo for DML (currently using log.Info with fields like
d.target.GetChangefeedID(), d.getDispatcherID(), tableInfo.TableName.Schema/
Table/ GetTargetSchema/GetTargetTable) should be changed to log.Debug so it
doesn't flood production logs; update the call site in handleBatchDataEvents to
use log.Debug (preserving the same structured fields and message) so the
information remains available for debugging but is suppressed at Info level.

In `@downstreamadapter/eventcollector/helper.go`:
- Around line 75-81: Replace the noisy log.Info call with a debug-level message
and remove the function-name prefix: change log.Info(...) to log.Debug(...) and
update the message string from "EventsHandler.Handle: incoming events" to a
plain phrase like "incoming events" (keep the existing zap fields such as
zap.Stringer("dispatcher", stat.target.GetId()), zap.Int("eventLen",
len(events)), zap.String("firstEventType",
commonEvent.TypeToString(events[0].GetType()))). If you prefer to eliminate
redundancy with the existing debug at line 74, simply remove this log statement
entirely.
🧹 Nitpick comments (6)
tests/integration_tests/create_table_like_bug/run.sh (1)

43-48: sleep 2 is a fragile sync mechanism — consider a polling check instead.

If CDC startup or the initial snapshot takes longer than 2 seconds, the assertion on line 48 will fail, making this test flaky. The script already uses check_table_exists with a timeout for the finish marker (line 74). A similar polling/retry approach for the initial data count would be more robust.

pkg/sink/util/ddl_table_utils.go (2)

92-124: Partial AST mutation on error leaves stmt in an inconsistent state.

When tableRenameVisitor encounters more TableName nodes than targetTables, it sets hasErr and aborts. However, some table nodes have already been renamed in-place on the caller's stmt. If a caller ignores the error or reuses the stmt, the AST will contain a mix of original and routed names.

This is mitigated by the fact that RenameDDLTable returns an error, but the caller's stmt is still silently mutated. Consider documenting that stmt is modified in-place and must not be reused on error, or working on a clone.


166-167: Nit: prefer new(bytes.Buffer) over declaring an unused nil slice.

The var b []byte declaration serves no purpose; bytes.NewBuffer(b) with a nil slice behaves identically to bytes.NewBuffer(nil). A more idiomatic approach:

Proposed simplification
-	var b []byte
-	bf := bytes.NewBuffer(b)
+	bf := new(bytes.Buffer)
tests/integration_tests/sink_routing/run.sh (3)

10-11: Quote shell variables to prevent word splitting.

While unlikely to cause issues in a controlled test environment, unquoted variables throughout the script (e.g., $CUR, $WORK_DIR, $OUT_DIR) are technically vulnerable to word splitting. This pattern appears consistently throughout the file (Lines 10, 11, 22, 24, 26, 38, 41, etc.).

Example fix for common occurrences
-source $CUR/../_utils/test_prepare
-WORK_DIR=$OUT_DIR/$TEST_NAME
+source "$CUR/../_utils/test_prepare"
+WORK_DIR="$OUT_DIR/$TEST_NAME"
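To see why quoting matters, a tiny standalone demonstration of word splitting (the path value here is made up for illustration):

```shell
#!/bin/bash
# A directory name containing a space (illustrative value, not a real path).
WORK_DIR="/tmp/out dir/test"

set -- $WORK_DIR # unquoted: the shell splits the value on whitespace
echo "unquoted: $# words"

set -- "$WORK_DIR" # quoted: the value stays a single word
echo "quoted: $# words"
```

With the default IFS this prints `unquoted: 2 words` then `quoted: 1 words`, which is exactly how an unquoted `$WORK_DIR` would silently break a path containing spaces.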

105-108: Consider adding check_table_not_exists for source_db.products as well.

Lines 106-107 verify that source_db.users and source_db.orders don't exist downstream, but source_db.products is not checked despite target_db.products_routed being verified at Line 103. This is a minor test coverage gap.

Suggested addition
 	check_table_not_exists source_db.users ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
 	check_table_not_exists source_db.orders ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+	check_table_not_exists source_db.products ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

84-96: Diagnostic grep for "error|panic|fail" will produce many false positives.

The pattern grep -i "error\|panic\|fail" will match field names, log context, and unrelated strings (e.g., "failover", "errorCode" in structured log fields). Since this is purely diagnostic output and not a test assertion, it's not harmful — just noisy.
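If the noise ever becomes a problem, word-boundary matching narrows the pattern considerably. A small standalone sketch (the log lines are fabricated, and `\b` is a GNU grep extension):

```shell
#!/bin/bash
# Fabricated sample log lines: the first contains only substring hits
# ("failover", "errorCode"), the second a standalone "error" and "panic".
log='level=info msg="failover complete" errorCode=0
level=error msg="write failed" cause="panic recovered"'

# \b word boundaries skip "failover"/"errorCode" but match "error"/"panic".
printf '%s\n' "$log" | grep -icE '\b(error|panic|fail)\b' # prints 1
```

Only the second line counts as a match, so the diagnostic grep stops flagging structured-field names like `errorCode`.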

Comment on lines +392 to +402
log.Info("handleBatchDataEvents: DROPPED - stale epoch",
zap.Stringer("changefeedID", d.target.GetChangefeedID()),
zap.Stringer("dispatcher", d.getDispatcherID()),
zap.String("eventType", commonEvent.TypeToString(event.GetType())),
zap.Any("event", event.Event))
zap.String("eventType", commonEvent.TypeToString(event.GetType())))
continue
}
if !d.verifyEventSequence(event) {
log.Info("handleBatchDataEvents: DROPPED - sequence verification failed, resetting dispatcher",
zap.Stringer("changefeedID", d.target.GetChangefeedID()),
zap.Stringer("dispatcher", d.getDispatcherID()),
zap.String("eventType", commonEvent.TypeToString(event.GetType())))

🛠️ Refactor suggestion | 🟠 Major

Log messages include function names, violating coding guidelines.

Lines 392 and 399 prefix messages with "handleBatchDataEvents: ..." and use hyphens. Per guidelines, log messages should not include function names and should use spaces instead of hyphens.

Suggested fix
-		log.Info("handleBatchDataEvents: DROPPED - stale epoch",
+		log.Info("dropped event due to stale epoch",
 			zap.Stringer("changefeedID", d.target.GetChangefeedID()),
 			zap.Stringer("dispatcher", d.getDispatcherID()),
 			zap.String("eventType", commonEvent.TypeToString(event.GetType())))

-		log.Info("handleBatchDataEvents: DROPPED - sequence verification failed, resetting dispatcher",
+		log.Info("dropped event due to sequence verification failure, resetting dispatcher",
 			zap.Stringer("changefeedID", d.target.GetChangefeedID()),
 			zap.Stringer("dispatcher", d.getDispatcherID()),
 			zap.String("eventType", commonEvent.TypeToString(event.GetType())))

As per coding guidelines, log message strings should not include function names and should avoid hyphens (use spaces instead).

🤖 Prompt for AI Agents
In `@downstreamadapter/eventcollector/dispatcher_stat.go` around lines 392 - 402,
Remove the function-name prefix and hyphenated punctuation from the two log
messages: instead of starting the message with "handleBatchDataEvents: DROPPED -
stale epoch" and "handleBatchDataEvents: DROPPED - sequence verification failed,
resetting dispatcher", change them to plain space-separated messages (e.g.
"DROPPED stale epoch" and "DROPPED sequence verification failed resetting
dispatcher") in the log.Info calls used near verifyEventSequence and the epoch
check; keep the existing zap fields (d.target.GetChangefeedID(),
d.getDispatcherID(), commonEvent.TypeToString(event.GetType())) intact so only
the log message string is modified.

Comment on lines 419 to 425
log.Info("handleBatchDataEvents: loaded tableInfo for DML",
zap.Stringer("changefeedID", d.target.GetChangefeedID()),
zap.Stringer("dispatcher", d.getDispatcherID()),
zap.String("schema", tableInfo.TableName.Schema),
zap.String("table", tableInfo.TableName.Table),
zap.String("targetSchema", tableInfo.TableName.GetTargetSchema()),
zap.String("targetTable", tableInfo.TableName.GetTargetTable()))

⚠️ Potential issue | 🟠 Major

Log level should be Debug, not Info, for per-batch DML tableInfo loading.

This log fires on every batch DML event. In high-throughput workloads this will flood the logs. The information is useful for debugging routing but not for routine production monitoring.

Suggested fix
-		log.Info("handleBatchDataEvents: loaded tableInfo for DML",
+		log.Debug("loaded tableInfo for DML",
 			zap.Stringer("changefeedID", d.target.GetChangefeedID()),
 			zap.Stringer("dispatcher", d.getDispatcherID()),
 			zap.String("schema", tableInfo.TableName.Schema),
 			zap.String("table", tableInfo.TableName.Table),
 			zap.String("targetSchema", tableInfo.TableName.GetTargetSchema()),
 			zap.String("targetTable", tableInfo.TableName.GetTargetTable()))
🤖 Prompt for AI Agents
In `@downstreamadapter/eventcollector/dispatcher_stat.go` around lines 419 - 425,
The log in handleBatchDataEvents that reports loaded tableInfo for DML
(currently using log.Info with fields like d.target.GetChangefeedID(),
d.getDispatcherID(), tableInfo.TableName.Schema/ Table/
GetTargetSchema/GetTargetTable) should be changed to log.Debug so it doesn't
flood production logs; update the call site in handleBatchDataEvents to use
log.Debug (preserving the same structured fields and message) so the information
remains available for debugging but is suppressed at Info level.

Comment on lines 669 to 674
// Apply routing and store the (possibly cloned) TableInfo
routedTableInfo := d.applyRoutingToTableInfo(tableInfo)
log.Info("handleHandshakeEvent: storing routed tableInfo",
zap.Stringer("changefeedID", d.target.GetChangefeedID()),
zap.Stringer("dispatcher", d.getDispatcherID()),
zap.String("originalSchema", tableInfo.TableName.Schema),
zap.String("originalTable", tableInfo.TableName.Table),
zap.String("routedTargetSchema", routedTableInfo.TableName.GetTargetSchema()),
zap.String("routedTargetTable", routedTableInfo.TableName.GetTargetTable()))
d.tableInfo.Store(routedTableInfo)

🛠️ Refactor suggestion | 🟠 Major

Log message includes function name prefix.

"handleHandshakeEvent: storing routed tableInfo" should not contain the function name.

Suggested fix
-		log.Info("handleHandshakeEvent: storing routed tableInfo",
+		log.Info("storing routed table info from handshake",

As per coding guidelines, log message strings should not include function names and should avoid hyphens (use spaces instead).

🤖 Prompt for AI Agents
In `@downstreamadapter/eventcollector/dispatcher_stat.go` around lines 669 - 678,
The log message passed to log.Info inside the block that calls
d.applyRoutingToTableInfo and d.tableInfo.Store currently includes the function
name ("handleHandshakeEvent: storing routed tableInfo") and a hyphenated style;
update that log.Info call to use a plain, function-name-free message such as
"storing routed tableInfo" (or "storing routed table info") with spaces instead
of hyphens, leaving the existing structured fields (changefeedID, dispatcher,
originalSchema, originalTable, routedTargetSchema, routedTargetTable) unchanged
so the call site around applyRoutingToTableInfo and d.tableInfo.Store remains
otherwise identical.

@a-cong
Copy link
Author

a-cong commented Feb 6, 2026

/test pull-cdc-mysql-integration-light

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@tests/integration_tests/create_table_like_bug/run.sh`:
- Around line 50-58: The marker-sync waiting loop (while in run.sh calling
run_sql "SELECT val FROM test.t WHERE id = 99" and checking grep for "val: 9999"
in "$WORK_DIR/sql_res.$TEST_NAME.log") can time out silently; after the loop
completes without breaking, explicitly fail the test instead of continuing.
Modify the script so that immediately after the loop you detect the timeout
(e.g. check whether the grep condition was never met or whether the loop
completed the full 30 iterations) and then print a clear error mentioning the
marker sync timeout and TEST_NAME/WORK_DIR, and exit non-zero (or call the
suite's fail helper such as fail_test if available) so the test fails with a
helpful message.
🧹 Nitpick comments (1)
tests/integration_tests/create_table_like_bug/run.sh (1)

49-49: Unnecessary sleep 3 before the polling loop.

The polling loop on Lines 51–58 already retries with 1-second intervals, so this initial 3-second sleep just adds dead time without benefit.

Proposed fix
-	sleep 3
 	i=0

Comment on lines 50 to 59
i=0
while [ $i -lt 30 ]; do
run_sql "SELECT val FROM test.t WHERE id = 99" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
if grep -q "val: 9999" "$WORK_DIR/sql_res.$TEST_NAME.log"; then
break
fi
sleep 1
i=$((i + 1))
done

⚠️ Potential issue | 🟡 Minor

Marker-sync loop silently falls through on timeout.

If the downstream never receives the marker row within 30 seconds, the loop exits without error and the test proceeds. With set -e, the later check_contains "cnt: 3" on Line 65 would eventually catch it, but the resulting failure message would be misleading (missing count vs. sync timeout). Other patterns in TiCDC tests typically fail explicitly on timeout.

Proposed fix: add explicit failure after the loop
 	i=0
 	while [ $i -lt 30 ]; do
 		run_sql "SELECT val FROM test.t WHERE id = 99" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
 		if grep -q "val: 9999" "$WORK_DIR/sql_res.$TEST_NAME.log"; then
 			break
 		fi
 		sleep 1
 		i=$((i + 1))
 	done
+	if [ $i -ge 30 ]; then
+		echo "ERROR: Timed out waiting for initial data sync (marker row id=99)"
+		exit 1
+	fi
 	# Clean up marker row
🤖 Prompt for AI Agents
In `@tests/integration_tests/create_table_like_bug/run.sh` around lines 50 - 58,
The marker-sync waiting loop (while in run.sh calling run_sql "SELECT val FROM
test.t WHERE id = 99" and checking grep for "val: 9999" in
"$WORK_DIR/sql_res.$TEST_NAME.log") can time out silently; after the loop
completes without breaking, explicitly fail the test instead of continuing.
Modify the script so that immediately after the loop you detect the timeout
(e.g. check whether the grep condition was never met or whether the loop
completed the full 30 iterations) and then print a clear error mentioning the
marker sync timeout and TEST_NAME/WORK_DIR, and exit non-zero (or call the
suite's fail helper such as fail_test if available) so the test fails with a
helpful message.

@a-cong
Copy link
Author

a-cong commented Feb 6, 2026

/test pull-cdc-mysql-integration-light

@a-cong
Copy link
Author

a-cong commented Feb 6, 2026

/test pull-cdc-mysql-integration-light

… with routing

When `CREATE TABLE t_like LIKE t` is executed, the DDL is added to table
t's DDL history for blocking purposes. When t's dispatcher processes this
DDL, it was incorrectly storing t_like's tableInfo, overwriting t's.

With routing enabled, `AssembleRows` replaces the event's tableInfo with
the stored one (when TargetSchema/TargetTable are set), causing DMLs to
be written to the wrong table.

The fix: Check if the DDL's TableInfo.TableName.TableID matches the
dispatcher's TableSpan.TableID before storing. If they don't match
(as in CREATE TABLE LIKE), skip the tableInfo update.

Also fix test cases that had mismatched TableIDs and add a unit test
verifying the CREATE TABLE LIKE scenario.

Co-Authored-By: Claude <noreply@anthropic.com>
@a-cong
Copy link
Author

a-cong commented Feb 6, 2026

/test pull-cdc-mysql-integration-light

@ti-chi-bot
Copy link

ti-chi-bot bot commented Feb 6, 2026

@a-cong: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
pull-cdc-pulsar-integration-heavy 5eea991 link false /test pull-cdc-pulsar-integration-heavy
pull-error-log-review fd5cb48 link true /test pull-error-log-review

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.


Labels

contribution This PR is from a community contributor. do-not-merge/release-note-label-needed Indicates that a PR should not merge because it's missing one of the release note labels. first-time-contributor Indicates that the PR was contributed by an external member and is a first-time contributor. ok-to-test Indicates a PR is ready to be tested. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.

Development

Successfully merging this pull request may close these issues.

Schema/table routing support for ticdc

3 participants