sink: Add schema/table routing support for mysql and redo sinks #3704
a-cong wants to merge 17 commits into pingcap:master
Conversation
Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected. Please follow our release note process to remove it.
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull-request has not yet been approved; the full list of commands accepted by this bot can be found here. Needs approval from an approver in each of these files. Approvers can indicate their approval by writing `/approve` in a comment.
Hi @a-cong. Thanks for your PR. I'm waiting for a pingcap member to verify that this patch is reasonable to test. If it is, they should reply with `/ok-to-test`. Once the patch is verified, the new status will be reflected by the `ok-to-test` label.
Summary of Changes

Hello @a-cong, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the data replication capabilities by introducing schema and table routing for MySQL-compatible and Redo sinks. It allows users to dynamically remap database and table names from the source to the target, providing greater flexibility for data integration and consolidation. The changes involve updating configuration structures, implementing DDL query rewriting logic, and ensuring concurrency safety through event cloning, ultimately enabling seamless replication to custom target schemas and tables.
Code Review
This pull request introduces a significant new feature: schema and table routing for MySQL and redo log sinks. The changes are extensive, touching upon the API, configuration, dispatchers, and sinks to support this functionality. The design thoughtfully addresses potential concurrency issues by cloning event objects to prevent race conditions, especially when events are shared between the main sink and the redo log sink. A comprehensive set of unit and integration tests has been added to validate the new routing logic. Overall, this is a well-structured implementation of a complex feature. I've identified one critical issue in BatchDMLEvent.CloneForRouting that could lead to data corruption or mis-routing due to improper cloning of nested event objects. Please address this to ensure the correctness of the feature.
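To make the cloning concern concrete, here is a minimal sketch of the hazard under assumed type shapes; the `Events` field and the per-event `CloneForRouting` method are illustrative stand-ins, not the actual ticdc definitions:

```go
package event

// Illustrative stand-ins for the real ticdc event types.
type DMLEvent struct{ TargetSchema, TargetTable string }

func (e *DMLEvent) CloneForRouting() *DMLEvent {
	c := *e // a value copy suffices for this flat stand-in
	return &c
}

type BatchDMLEvent struct{ Events []*DMLEvent }

// Copying only the slice header would leave the clone pointing at the same
// *DMLEvent values, so routing edits made through the clone would mutate
// events shared with the main sink or redo sink. Deep-copying each nested
// event gives the clone exclusive ownership.
func (b *BatchDMLEvent) CloneForRouting() *BatchDMLEvent {
	clone := &BatchDMLEvent{Events: make([]*DMLEvent, len(b.Events))}
	for i, e := range b.Events {
		clone.Events[i] = e.CloneForRouting()
	}
	return clone
}
```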
/ok-to-test
Force-pushed from d9fa289 to beac037 (Compare)
/test all
1 similar comment
/test all
Force-pushed from 2bce70e to f4a078c (Compare)
/test all
/test all
Address reviewer concern about potential race conditions if callbacks are appended to both original and clone sharing the same backing array. Currently DDL events arrive with nil PostTxnFlushed (callbacks are added downstream), so this is a defensive/future-proofing change. If any code path later adds callbacks before cloning, sharing the backing array could cause nondeterministic callback visibility or data races.
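A runnable illustration of that backing-array hazard, using a stand-in event type (the real DDLEvent has more fields):

```go
package main

type ddlEvent struct {
	PostTxnFlushed []func() // callbacks run after the transaction is flushed
}

func main() {
	orig := ddlEvent{PostTxnFlushed: make([]func(), 0, 4)}

	// A plain struct copy duplicates the slice header, not the backing array.
	clone := orig

	// Both appends write to index 0 of the shared backing array (len 0, cap 4),
	// so the second append silently overwrites the callback the clone added.
	clone.PostTxnFlushed = append(clone.PostTxnFlushed, func() { println("clone callback") })
	orig.PostTxnFlushed = append(orig.PostTxnFlushed, func() { println("orig callback") })
	clone.PostTxnFlushed[0]() // prints "orig callback"

	// Defensive fix, in the spirit of the commit: copy into a fresh array.
	safe := make([]func(), len(orig.PostTxnFlushed))
	copy(safe, orig.PostTxnFlushed)
	clone.PostTxnFlushed = safe
}
```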
CloneWithRouting shares the refcounted columnSchema pointer. Without incrementing the refcount and setting a finalizer, the clone could reference a recycled columnSchema if the original is GC'd first. This follows the same pattern used in UnmarshalJSONToTableInfo and WrapTableInfo.
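A sketch of that refcount-plus-finalizer pattern, using the helper names the commit cites (incColumnSchemaCount, tryReleaseColumnSchema); the `WithTarget` helper and the exact signatures are assumptions:

```go
func (ti *TableInfo) CloneWithRouting(targetSchema, targetTable string) *TableInfo {
	cloned := &TableInfo{
		// WithTarget is a hypothetical helper that sets TargetSchema/TargetTable.
		TableName:    ti.TableName.WithTarget(targetSchema, targetTable),
		columnSchema: ti.columnSchema, // shared, refcounted pointer
	}
	// Register the extra reference so the shared columnSchema cannot be
	// recycled while the clone is still alive, mirroring the pattern in
	// UnmarshalJSONToTableInfo and WrapTableInfo.
	GetSharedColumnSchemaStorage().incColumnSchemaCount(cloned.columnSchema)
	// When the clone is garbage collected, release its reference.
	runtime.SetFinalizer(cloned, func(t *TableInfo) {
		t.tryReleaseColumnSchema()
	})
	return cloned
}
```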
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
/test pull-cdc-mysql-integration-light
Actionable comments posted: 5
🤖 Fix all issues with AI agents
In `@downstreamadapter/eventcollector/dispatcher_stat.go`:
- Around line 750-787: Update the debug log messages inside
dispatcherStat.applyRoutingToTableInfo to remove the function name and hyphens
and rephrase them to plain descriptive text (e.g., "router is nil" -> "no router
configured", "no routing change" -> "routing unchanged", "routing applied" ->
"routing result differs from source"); keep the same structured fields
(changefeedID, sourceSchema/sourceTable, targetSchema/targetTable) and ensure
messages use spaces instead of hyphens and do not embed the function name so
they conform to logging guidelines.
- Around line 482-500: Update the log messages inside applyRoutingToTableInfo to
remove the function name prefix: replace any occurrences of log messages that
read "applyRoutingToTableInfo: router is nil", "applyRoutingToTableInfo: no
routing change", and "applyRoutingToTableInfo: routing applied" with "router is
nil", "no routing change", and "routing applied" respectively; locate these
strings in the applyRoutingToTableInfo function and update the processLogger/log
calls (or whichever logger is used) to use the shorter messages while keeping
the same log level and context.
In `@downstreamadapter/sink/mysql/sink_test.go`:
- Around line 444-456: The NewMySQLSink call in getMysqlSinkWithRouter omits
additional parameters required by the current constructor signature; update the
call to match the same argument order/values used in other test helpers (e.g.,
the other getMysqlSink helper) by adding the missing parameters (such as the
owner flag, additional options, or logger/router position) so NewMySQLSink(ctx,
changefeedID, cfg, db, ...) matches the project's canonical call site; use the
existing tests that call NewMySQLSink as the reference to copy the exact extra
arguments and order.
In `@pkg/common/table_info.go`:
- Around line 150-179: CloneWithRouting currently constructs cloned :=
&TableInfo{...} but omits copying the ActiveActiveTable and SoftDeleteTable
flags, causing routed TableInfo to lose those semantics; update CloneWithRouting
(the cloned TableInfo initializer in pkg/common/table_info.go) to include
ActiveActiveTable: ti.ActiveActiveTable and SoftDeleteTable: ti.SoftDeleteTable
so the clone preserves those flags, and retain the existing refcount/finalizer
logic for columnSchema (GetSharedColumnSchemaStorage().incColumnSchemaCount,
runtime.SetFinalizer, tryReleaseColumnSchema).
In `@pkg/sink/util/ddl_routing.go`:
- Around line 83-135: The error returns from parser.ParseOneStmt,
FetchDDLTables, and RenameDDLTable should preserve stack traces: replace the
existing errors.Errorf(...) wrappers with errors.Annotatef(errors.Trace(err),
...) so the original error is traced and annotated with the same contextual
message (e.g., "failed to parse DDL query for routing: %s", "failed to fetch
tables from DDL for routing: %s", "failed to rewrite DDL query with routing:
%s"). Update the three return paths that reference parser.ParseOneStmt,
FetchDDLTables, and RenameDDLTable accordingly while keeping the original
query/context text and formatting.
```diff
 originalDDL := events[0].Event.(*commonEvent.DDLEvent)
 // Clone the DDL event before applying routing. This is defensive to ensure
 // we don't mutate the original event which may be referenced elsewhere.
 // The clone allows us to safely modify Query and TargetSchemaName fields.
 ddl := originalDDL.CloneForRouting()
 events[0].Event = ddl
 d.tableInfoVersion.Store(ddl.FinishedTs)
 if ddl.TableInfo != nil {
-	d.tableInfo.Store(ddl.TableInfo)
+	// Apply routing to get a TableInfo with TargetSchema/TargetTable set.
+	// This returns a cloned TableInfo to avoid mutating the shared original,
+	// which may be used by other changefeeds with different routing rules.
+	routedTableInfo := d.applyRoutingToTableInfo(ddl.TableInfo)
+	ddl.TableInfo = routedTableInfo
+	d.tableInfo.Store(routedTableInfo)
 }
 // Also apply routing to MultipleTableInfos for multi-table DDLs (e.g., RENAME TABLE)
 for i, tableInfo := range ddl.MultipleTableInfos {
 	ddl.MultipleTableInfos[i] = d.applyRoutingToTableInfo(tableInfo)
 }
```
🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
# Inspect CloneForRouting and PostTxnFlushed handling
rg -n "CloneForRouting" -g '*.go' -C 3
rg -n "PostTxnFlushed" -g '*.go' -C 2
```

Repository: pingcap/ticdc
Length of output: 22567
🏁 Script executed:

```bash
#!/bin/bash
# Find the applyRoutingToTableInfo implementation and check log messages
rg -n "applyRoutingToTableInfo" downstreamadapter/eventcollector/dispatcher_stat.go -C 5 -A 40
```

Repository: pingcap/ticdc
Length of output: 5887
Fix log message style in applyRoutingToTableInfo.
The DDL cloning logic properly deep-copies PostTxnFlushed with an explicit make and copy (verified in tests), so the shallow-copy concern is not an issue. However, the log messages at lines 760, 772, and 779 violate the coding guideline: "log message strings should not include function names." Change:
"applyRoutingToTableInfo: router is nil"→"router is nil""applyRoutingToTableInfo: no routing change"→"no routing change""applyRoutingToTableInfo: routing applied"→"routing applied"
```go
// Parse the DDL query using TiDB parser
p := parser.New()
stmt, err := p.ParseOneStmt(ddl.Query, "", "")
if err != nil {
	return nil, errors.Errorf("failed to parse DDL query for routing: %v (query: %s)", err, ddl.Query)
}

// Fetch source tables from the DDL
sourceTables, err := FetchDDLTables(defaultSchema, stmt)
if err != nil {
	return nil, errors.Errorf("failed to fetch tables from DDL for routing: %v (query: %s)", err, ddl.Query)
}

if len(sourceTables) == 0 {
	return result, nil
}

// Build target tables by applying routing rules
targetTables := make([]*filter.Table, 0, len(sourceTables))
hasRouting := false
for _, srcTable := range sourceTables {
	targetSchema, targetTable := router.Route(srcTable.Schema, srcTable.Name)
	if targetSchema != srcTable.Schema || targetTable != srcTable.Name {
		hasRouting = true
	}
	targetTables = append(targetTables, &filter.Table{
		Schema: targetSchema,
		Name:   targetTable,
	})
}

if !hasRouting {
	return result, nil
}

// Routing rules matched - mark as applied regardless of whether query changes
result.RoutingApplied = true

// Set the target schema for the USE command when executing the DDL.
// We use the first target table's schema because:
// 1. For single-table DDLs, this is the correct target schema
// 2. For multi-table DDLs (e.g., RENAME TABLE), all table references in the
//    rewritten query are fully qualified, so the USE command just needs to
//    switch to any database the user has access to - the first target schema
//    is guaranteed to be accessible since routing was configured for it.
if len(targetTables) > 0 {
	result.TargetSchemaName = targetTables[0].Schema
}

// Rewrite the DDL with target tables
newQuery, err := RenameDDLTable(stmt, targetTables)
if err != nil {
	return nil, errors.Errorf("failed to rewrite DDL query with routing: %v (query: %s)", err, ddl.Query)
}
```
🧩 Analysis chain

🏁 Script executed:

```bash
cat -n pkg/sink/util/ddl_routing.go | head -150
```

Repository: pingcap/ticdc
Length of output: 6610

🏁 Script executed:

```bash
rg -n "func FetchDDLTables|func RenameDDLTable" pkg/sink/util/
```

Repository: pingcap/ticdc
Length of output: 297

🏁 Script executed:

```bash
sed -n '62,90p;129,160p' pkg/sink/util/ddl_table_utils.go
```

Repository: pingcap/ticdc
Length of output: 2004
Wrap parser and DDL rewrite errors with errors.Trace to preserve stack traces. Errors from parser.ParseOneStmt (line 85), FetchDDLTables (line 91), and RenameDDLTable (line 133) should be wrapped with errors.Trace instead of errors.Errorf. The third-party parser call especially requires errors.Trace per coding guidelines; the internal functions should also preserve stack traces. Use errors.Annotatef(errors.Trace(err), ...) to wrap each error while adding context.
Proposed fix

```diff
-	stmt, err := p.ParseOneStmt(ddl.Query, "", "")
-	if err != nil {
-		return nil, errors.Errorf("failed to parse DDL query for routing: %v (query: %s)", err, ddl.Query)
-	}
+	stmt, err := p.ParseOneStmt(ddl.Query, "", "")
+	if err != nil {
+		return nil, errors.Annotatef(errors.Trace(err), "failed to parse DDL query for routing (query: %s)", ddl.Query)
+	}

-	sourceTables, err := FetchDDLTables(defaultSchema, stmt)
-	if err != nil {
-		return nil, errors.Errorf("failed to fetch tables from DDL for routing: %v (query: %s)", err, ddl.Query)
-	}
+	sourceTables, err := FetchDDLTables(defaultSchema, stmt)
+	if err != nil {
+		return nil, errors.Annotatef(errors.Trace(err), "failed to fetch tables from DDL for routing (query: %s)", ddl.Query)
+	}

-	newQuery, err := RenameDDLTable(stmt, targetTables)
-	if err != nil {
-		return nil, errors.Errorf("failed to rewrite DDL query with routing: %v (query: %s)", err, ddl.Query)
-	}
+	newQuery, err := RenameDDLTable(stmt, targetTables)
+	if err != nil {
+		return nil, errors.Annotatef(errors.Trace(err), "failed to rewrite DDL query with routing (query: %s)", ddl.Query)
+	}
```
/test pull-cdc-mysql-integration-light

/test pull-cdc-mysql-integration-light
1 similar comment
/test pull-cdc-mysql-integration-light
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
/test pull-cdc-mysql-integration-light

/test pull-cdc-mysql-integration-light
Actionable comments posted: 5
🤖 Fix all issues with AI agents
In `@downstreamadapter/eventcollector/dispatcher_stat.go`:
- Around line 492-497: The log message in the log.Info call for the DDL event
includes the function-name prefix "handleSingleDataEvents:" and a hyphen; update
the first argument to remove the function name and hyphen (e.g., change
"handleSingleDataEvents: processing DDL event" to "processing DDL event") while
keeping the structured fields (originalDDL, d.target.GetChangefeedID(),
d.getDispatcherID(), originalDDL.GetDDLType(), originalDDL.Query) unchanged so
identifiers like originalDDL and methods getDispatcherID/GetChangefeedID remain
as-is.
- Around line 669-678: The log message passed to log.Info inside the block that
calls d.applyRoutingToTableInfo and d.tableInfo.Store currently includes the
function name ("handleHandshakeEvent: storing routed tableInfo") and a
hyphenated style; update that log.Info call to use a plain, function-name-free
message such as "storing routed tableInfo" (or "storing routed table info") with
spaces instead of hyphens, leaving the existing structured fields (changefeedID,
dispatcher, originalSchema, originalTable, routedTargetSchema,
routedTargetTable) unchanged so the call site around applyRoutingToTableInfo and
d.tableInfo.Store remains otherwise identical.
- Around line 392-402: Remove the function-name prefix and hyphenated
punctuation from the two log messages: instead of starting the message with
"handleBatchDataEvents: DROPPED - stale epoch" and "handleBatchDataEvents:
DROPPED - sequence verification failed, resetting dispatcher", change them to
plain space-separated messages (e.g. "DROPPED stale epoch" and "DROPPED sequence
verification failed resetting dispatcher") in the log.Info calls used near
verifyEventSequence and the epoch check; keep the existing zap fields
(d.target.GetChangefeedID(), d.getDispatcherID(),
commonEvent.TypeToString(event.GetType())) intact so only the log message string
is modified.
- Around line 419-425: The log in handleBatchDataEvents that reports loaded
tableInfo for DML (currently using log.Info with fields like
d.target.GetChangefeedID(), d.getDispatcherID(), tableInfo.TableName.Schema/
Table/ GetTargetSchema/GetTargetTable) should be changed to log.Debug so it
doesn't flood production logs; update the call site in handleBatchDataEvents to
use log.Debug (preserving the same structured fields and message) so the
information remains available for debugging but is suppressed at Info level.
In `@downstreamadapter/eventcollector/helper.go`:
- Around line 75-81: Replace the noisy log.Info call with a debug-level message
and remove the function-name prefix: change log.Info(...) to log.Debug(...) and
update the message string from "EventsHandler.Handle: incoming events" to a
plain phrase like "incoming events" (keep the existing zap fields such as
zap.Stringer("dispatcher", stat.target.GetId()), zap.Int("eventLen",
len(events)), zap.String("firstEventType",
commonEvent.TypeToString(events[0].GetType()))). If you prefer to eliminate
redundancy with the existing debug at line 74, simply remove this log statement
entirely.
🧹 Nitpick comments (6)

tests/integration_tests/create_table_like_bug/run.sh (1)

43-48: `sleep 2` is a fragile sync mechanism — consider a polling check instead. If CDC startup or the initial snapshot takes longer than 2 seconds, the assertion on line 48 will fail, making this test flaky. The script already uses `check_table_exists` with a timeout for the finish marker (line 74). A similar polling/retry approach for the initial data count would be more robust.

pkg/sink/util/ddl_table_utils.go (2)

92-124: Partial AST mutation on error leaves `stmt` in an inconsistent state. When `tableRenameVisitor` encounters more `TableName` nodes than `targetTables`, it sets `hasErr` and aborts. However, some table nodes have already been renamed in-place on the caller's `stmt`. If a caller ignores the error or reuses the `stmt`, the AST will contain a mix of original and routed names. This is mitigated by the fact that `RenameDDLTable` returns an error, but the caller's `stmt` is still silently mutated. Consider documenting that `stmt` is modified in-place and must not be reused on error, or working on a clone. (A sketch of the visitor in question follows this list.)

166-167: Nit: prefer `new(bytes.Buffer)` over declaring an unused nil slice. `var b []byte` is unused as a meaningful initial value — `bytes.NewBuffer(b)` behaves identically to `bytes.NewBuffer(nil)`. A more idiomatic approach:

Proposed simplification

```diff
-	var b []byte
-	bf := bytes.NewBuffer(b)
+	bf := new(bytes.Buffer)
```

tests/integration_tests/sink_routing/run.sh (3)

10-11: Quote shell variables to prevent word splitting. While unlikely to cause issues in a controlled test environment, unquoted variables throughout the script (e.g., `$CUR`, `$WORK_DIR`, `$OUT_DIR`) are technically vulnerable to word splitting. This pattern appears consistently throughout the file (lines 10, 11, 22, 24, 26, 38, 41, etc.).

Example fix for common occurrences

```diff
-source $CUR/../_utils/test_prepare
-WORK_DIR=$OUT_DIR/$TEST_NAME
+source "$CUR/../_utils/test_prepare"
+WORK_DIR="$OUT_DIR/$TEST_NAME"
```

105-108: Consider adding `check_table_not_exists` for `source_db.products` as well. Lines 106-107 verify that `source_db.users` and `source_db.orders` don't exist downstream, but `source_db.products` is not checked despite `target_db.products_routed` being verified at line 103. This is a minor test coverage gap.

Suggested addition

```diff
 check_table_not_exists source_db.users ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
 check_table_not_exists source_db.orders ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+check_table_not_exists source_db.products ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
```

84-96: Diagnostic grep for "error|panic|fail" will produce many false positives. The pattern `grep -i "error\|panic\|fail"` will match field names, log context, and unrelated strings (e.g., "failover", "errorCode" in structured log fields). Since this is purely diagnostic output and not a test assertion, it's not harmful — just noisy.
| log.Info("handleBatchDataEvents: DROPPED - stale epoch", | ||
| zap.Stringer("changefeedID", d.target.GetChangefeedID()), | ||
| zap.Stringer("dispatcher", d.getDispatcherID()), | ||
| zap.String("eventType", commonEvent.TypeToString(event.GetType())), | ||
| zap.Any("event", event.Event)) | ||
| zap.String("eventType", commonEvent.TypeToString(event.GetType()))) | ||
| continue | ||
| } | ||
| if !d.verifyEventSequence(event) { | ||
| log.Info("handleBatchDataEvents: DROPPED - sequence verification failed, resetting dispatcher", | ||
| zap.Stringer("changefeedID", d.target.GetChangefeedID()), | ||
| zap.Stringer("dispatcher", d.getDispatcherID()), | ||
| zap.String("eventType", commonEvent.TypeToString(event.GetType()))) |
🛠️ Refactor suggestion | 🟠 Major
Log messages include function names, violating coding guidelines.
Lines 392 and 399 prefix messages with "handleBatchDataEvents: ..." and use hyphens. Per guidelines, log messages should not include function names and should use spaces instead of hyphens.
Suggested fix

```diff
-	log.Info("handleBatchDataEvents: DROPPED - stale epoch",
+	log.Info("dropped event due to stale epoch",
 		zap.Stringer("changefeedID", d.target.GetChangefeedID()),
 		zap.Stringer("dispatcher", d.getDispatcherID()),
 		zap.String("eventType", commonEvent.TypeToString(event.GetType())))

-	log.Info("handleBatchDataEvents: DROPPED - sequence verification failed, resetting dispatcher",
+	log.Info("dropped event due to sequence verification failure, resetting dispatcher",
 		zap.Stringer("changefeedID", d.target.GetChangefeedID()),
 		zap.Stringer("dispatcher", d.getDispatcherID()),
 		zap.String("eventType", commonEvent.TypeToString(event.GetType())))
```

As per coding guidelines, log message strings should not include function names and should avoid hyphens (use spaces instead).
| log.Info("handleBatchDataEvents: loaded tableInfo for DML", | ||
| zap.Stringer("changefeedID", d.target.GetChangefeedID()), | ||
| zap.Stringer("dispatcher", d.getDispatcherID()), | ||
| zap.String("schema", tableInfo.TableName.Schema), | ||
| zap.String("table", tableInfo.TableName.Table), | ||
| zap.String("targetSchema", tableInfo.TableName.GetTargetSchema()), | ||
| zap.String("targetTable", tableInfo.TableName.GetTargetTable())) |
Log level should be Debug, not Info, for per-batch DML tableInfo loading.
This log fires on every batch DML event. In high-throughput workloads this will flood the logs. The information is useful for debugging routing but not for routine production monitoring.
Suggested fix

```diff
-	log.Info("handleBatchDataEvents: loaded tableInfo for DML",
+	log.Debug("loaded tableInfo for DML",
 		zap.Stringer("changefeedID", d.target.GetChangefeedID()),
 		zap.Stringer("dispatcher", d.getDispatcherID()),
 		zap.String("schema", tableInfo.TableName.Schema),
 		zap.String("table", tableInfo.TableName.Table),
 		zap.String("targetSchema", tableInfo.TableName.GetTargetSchema()),
 		zap.String("targetTable", tableInfo.TableName.GetTargetTable()))
```
```go
// Apply routing and store the (possibly cloned) TableInfo
routedTableInfo := d.applyRoutingToTableInfo(tableInfo)
log.Info("handleHandshakeEvent: storing routed tableInfo",
	zap.Stringer("changefeedID", d.target.GetChangefeedID()),
	zap.Stringer("dispatcher", d.getDispatcherID()),
	zap.String("originalSchema", tableInfo.TableName.Schema),
	zap.String("originalTable", tableInfo.TableName.Table),
	zap.String("routedTargetSchema", routedTableInfo.TableName.GetTargetSchema()),
	zap.String("routedTargetTable", routedTableInfo.TableName.GetTargetTable()))
d.tableInfo.Store(routedTableInfo)
```
🛠️ Refactor suggestion | 🟠 Major
Log message includes function name prefix.
"handleHandshakeEvent: storing routed tableInfo" should not contain the function name.
Suggested fix

```diff
-	log.Info("handleHandshakeEvent: storing routed tableInfo",
+	log.Info("storing routed table info from handshake",
```

As per coding guidelines, log message strings should not include function names and should avoid hyphens (use spaces instead).
/test pull-cdc-mysql-integration-light
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@tests/integration_tests/create_table_like_bug/run.sh`:
- Around line 50-58: The marker-sync waiting loop (while in run.sh calling
run_sql "SELECT val FROM test.t WHERE id = 99" and checking grep for "val: 9999"
in "$WORK_DIR/sql_res.$TEST_NAME.log") can time out silently; after the loop
completes without breaking, explicitly fail the test instead of continuing.
Modify the script so that immediately after the loop you detect the timeout
(e.g. check whether the grep condition was never met or whether the loop
completed the full 30 iterations) and then print a clear error mentioning the
marker sync timeout and TEST_NAME/WORK_DIR, and exit non-zero (or call the
suite's fail helper such as fail_test if available) so the test fails with a
helpful message.
🧹 Nitpick comments (1)

tests/integration_tests/create_table_like_bug/run.sh (1)

49-49: Unnecessary `sleep 3` before the polling loop. The polling loop on lines 51–58 already retries with 1-second intervals, so this initial 3-second sleep just adds dead time without benefit.

Proposed fix

```diff
-sleep 3
```
```sh
i=0
while [ $i -lt 30 ]; do
	run_sql "SELECT val FROM test.t WHERE id = 99" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
	if grep -q "val: 9999" "$WORK_DIR/sql_res.$TEST_NAME.log"; then
		break
	fi
	sleep 1
	i=$((i + 1))
done
```
Marker-sync loop silently falls through on timeout.
If the downstream never receives the marker row within 30 seconds, the loop exits without error and the test proceeds. With set -e, the later check_contains "cnt: 3" on Line 65 would eventually catch it, but the resulting failure message would be misleading (missing count vs. sync timeout). Other patterns in TiCDC tests typically fail explicitly on timeout.
Proposed fix: add explicit failure after the loop

```diff
 i=0
 while [ $i -lt 30 ]; do
 	run_sql "SELECT val FROM test.t WHERE id = 99" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
 	if grep -q "val: 9999" "$WORK_DIR/sql_res.$TEST_NAME.log"; then
 		break
 	fi
 	sleep 1
 	i=$((i + 1))
 done
+if [ $i -ge 30 ]; then
+	echo "ERROR: Timed out waiting for initial data sync (marker row id=99)"
+	exit 1
+fi
 # Clean up marker row
```
/test pull-cdc-mysql-integration-light

/test pull-cdc-mysql-integration-light
… with routing

When `CREATE TABLE t_like LIKE t` is executed, the DDL is added to table t's DDL history for blocking purposes. When t's dispatcher processes this DDL, it was incorrectly storing t_like's tableInfo, overwriting t's. With routing enabled, `AssembleRows` replaces the event's tableInfo with the stored one (when TargetSchema/TargetTable are set), causing DMLs to be written to the wrong table.

The fix: check whether the DDL's TableInfo.TableName.TableID matches the dispatcher's TableSpan.TableID before storing. If they don't match (as in CREATE TABLE LIKE), skip the tableInfo update.

Also fix test cases that had mismatched TableIDs and add a unit test verifying the CREATE TABLE LIKE scenario.

Co-Authored-By: Claude <noreply@anthropic.com>
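A hedged sketch of the guard the commit describes; identifiers follow the commit message (GetTableSpan in particular is assumed) and may not match the actual dispatcher code:

```go
if ddl.TableInfo != nil {
	// CREATE TABLE ... LIKE puts the new table's DDL into the source table's
	// history for blocking, so this event's TableInfo may describe t_like
	// rather than the table this dispatcher owns. Store it only when the IDs
	// match; otherwise t's stored info would be overwritten and AssembleRows
	// would later route t's DMLs to the wrong table.
	if ddl.TableInfo.TableName.TableID == d.target.GetTableSpan().TableID {
		d.tableInfo.Store(d.applyRoutingToTableInfo(ddl.TableInfo))
	}
}
```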
/test pull-cdc-mysql-integration-light
@a-cong: The following tests failed, say /retest to rerun all failed tests.

Full PR test history. Your PR dashboard.
What problem does this PR solve?
Issue Number: close #3700
What is changed and how it works?
Porting changes from pingcap/tiflow#12435 to the new ticdc architecture.
The `dispatchers` config is the same as in tiflow.

We also add the `TargetSchema` and `TargetTable` fields to the `TableName` struct. Since there is now a shared `TableInfo` object that can be accessed by multiple changefeeds, and a new concept of `Dispatcher` objects, we clone the `TableInfo` in each dispatcher with the routing information for that table in `handleHandshakeEvent` (which is executed when the dispatcher starts), and then store that cloned/routed `TableInfo` for the dispatcher. Subsequent DMLs then look up the stored info. Subsequent DDLs also explicitly update routing in `handleSingleDataEvents`.

Another difference between ticdc and tiflow is that DDLs are now handled per-sink instead of in one single entry point. Because of this, we need to apply routing in both the mysql and redo sinks separately; the shared rewriting functionality is in `pkg/sink/util/ddl_routing.go`.

We also copied `FetchDDLTables` and `RenameDDLTable` (and the corresponding tests) from https://github.com/pingcap/tiflow/blob/master/dm/pkg/parser/common.go so we could use them to rewrite DDLs.
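As a rough illustration of how a sink consumes the shared helper, here is a sketch; `ApplyDDLRouting` and the result's `Query` field are placeholder names, while `RoutingApplied` and `TargetSchemaName` appear in the snippet quoted earlier in this thread:

```go
// Hypothetical call into the shared rewriting entry point in
// pkg/sink/util/ddl_routing.go; the real function name may differ.
result, err := sinkutil.ApplyDDLRouting(router, ddl, defaultSchema)
if err != nil {
	return errors.Trace(err)
}
if result.RoutingApplied {
	// The rewritten query is fully qualified; TargetSchemaName drives the
	// USE statement so unqualified references resolve in the right database.
	ddl.Query = result.Query
	ddl.TargetSchemaName = result.TargetSchemaName
}
```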
Tests
Questions
Do you need to update user documentation, design documentation or monitoring documentation?
Yes, will do so in a separate PR.
Release note
Summary by CodeRabbit
Release Notes
New Features

- `{schema}` and `{table}` placeholder support

Testing