81 changes: 81 additions & 0 deletions downstreamadapter/sink/redo/metrics.go
@@ -0,0 +1,81 @@
// Copyright 2026 PingCAP, Inc.


medium

The copyright year is set to 2026, which is in the future. Please correct it to the current year.

Suggested change
// Copyright 2026 PingCAP, Inc.
// Copyright 2024 PingCAP, Inc.

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package redo

import (
"time"

"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/redo"
"github.com/prometheus/client_golang/prometheus"
)

type redoSinkMetrics struct {
changefeedID common.ChangeFeedID

rowWriteLogDuration prometheus.Observer
ddlWriteLogDuration prometheus.Observer

rowTotalCount prometheus.Counter
ddlTotalCount prometheus.Counter

rowWorkerBusyRatio prometheus.Counter
ddlWorkerBusyRatio prometheus.Counter
}

func newRedoSinkMetrics(changefeedID common.ChangeFeedID) *redoSinkMetrics {
keyspace := changefeedID.Keyspace()
changefeed := changefeedID.Name()
return &redoSinkMetrics{
changefeedID: changefeedID,
rowWriteLogDuration: metrics.RedoWriteLogDurationHistogram.
WithLabelValues(keyspace, changefeed, redo.RedoRowLogFileType),
ddlWriteLogDuration: metrics.RedoWriteLogDurationHistogram.
WithLabelValues(keyspace, changefeed, redo.RedoDDLLogFileType),
rowTotalCount: metrics.RedoTotalRowsCountGauge.
WithLabelValues(keyspace, changefeed, redo.RedoRowLogFileType),
ddlTotalCount: metrics.RedoTotalRowsCountGauge.
WithLabelValues(keyspace, changefeed, redo.RedoDDLLogFileType),
rowWorkerBusyRatio: metrics.RedoWorkerBusyRatio.
WithLabelValues(keyspace, changefeed, redo.RedoRowLogFileType),
ddlWorkerBusyRatio: metrics.RedoWorkerBusyRatio.
WithLabelValues(keyspace, changefeed, redo.RedoDDLLogFileType),
}
}

func (m *redoSinkMetrics) observeRowWrite(rows int, duration time.Duration) {
if rows > 0 {
m.rowTotalCount.Add(float64(rows))
}
m.rowWriteLogDuration.Observe(duration.Seconds())
m.rowWorkerBusyRatio.Add(duration.Seconds())
}

func (m *redoSinkMetrics) observeDDLWrite(duration time.Duration) {
m.ddlTotalCount.Inc()
m.ddlWriteLogDuration.Observe(duration.Seconds())
m.ddlWorkerBusyRatio.Add(duration.Seconds())
}

func (m *redoSinkMetrics) close() {
keyspace := m.changefeedID.Keyspace()
changefeed := m.changefeedID.Name()

for _, logType := range []string{redo.RedoRowLogFileType, redo.RedoDDLLogFileType} {
metrics.RedoWriteLogDurationHistogram.DeleteLabelValues(keyspace, changefeed, logType)
metrics.RedoTotalRowsCountGauge.DeleteLabelValues(keyspace, changefeed, logType)
metrics.RedoWorkerBusyRatio.DeleteLabelValues(keyspace, changefeed, logType)
}
}
105 changes: 53 additions & 52 deletions downstreamadapter/sink/redo/sink.go
@@ -22,7 +22,6 @@ import (
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/redo"
"github.com/pingcap/ticdc/pkg/redo/writer"
"github.com/pingcap/ticdc/pkg/redo/writer/factory"
@@ -44,9 +43,10 @@ type Sink struct {
logBuffer *chann.UnlimitedChannel[writer.RedoEvent, any]

// isNormal indicates whether the sink is in the normal state.
isNormal *atomic.Bool
isClosed *atomic.Bool
statistics *metrics.Statistics
isNormal *atomic.Bool
isClosed *atomic.Bool

metric *redoSinkMetrics
}

func Verify(ctx context.Context, changefeedID common.ChangeFeedID, cfg *config.ConsistentConfig) error {
@@ -68,10 +68,9 @@ func New(ctx context.Context, changefeedID common.ChangeFeedID,
ChangeFeedID: changefeedID,
MaxLogSizeInBytes: util.GetOrZero(cfg.MaxLogSize) * redo.Megabyte,
},
logBuffer: chann.NewUnlimitedChannelDefault[writer.RedoEvent](),
isNormal: atomic.NewBool(true),
isClosed: atomic.NewBool(false),
statistics: metrics.NewStatistics(changefeedID, "redo"),
logBuffer: chann.NewUnlimitedChannelDefault[writer.RedoEvent](),
isNormal: atomic.NewBool(true),
isClosed: atomic.NewBool(false),
}
start := time.Now()
ddlWriter, err := factory.NewRedoLogWriter(s.ctx, s.cfg, redo.RedoDDLLogFileType)
@@ -94,6 +93,7 @@ }
}
s.ddlWriter = ddlWriter
s.dmlWriter = dmlWriter
s.metric = newRedoSinkMetrics(changefeedID)
return s
}

@@ -114,51 +114,49 @@ func (s *Sink) Run(ctx context.Context) error {
func (s *Sink) WriteBlockEvent(event commonEvent.BlockEvent) error {
switch e := event.(type) {
case *commonEvent.DDLEvent:
err := s.statistics.RecordDDLExecution(func() (string, error) {
ddlType := e.GetDDLType().String()
return ddlType, s.ddlWriter.WriteEvents(s.ctx, e)
})
start := time.Now()
err := s.ddlWriter.WriteEvents(s.ctx, e)
if err != nil {
s.isNormal.Store(false)
return errors.Trace(err)
}
if s.metric != nil {
s.metric.observeDDLWrite(time.Since(start))
}
}
return nil
}

func (s *Sink) AddDMLEvent(event *commonEvent.DMLEvent) {


high

By removing statistics.RecordBatchExecution, we've lost valuable batch-level metrics such as ExecBatchHistogram (batch size in rows) and ExecBatchWriteBytesHistogram (batch size in bytes). While new per-row metrics are added, these batch metrics are important for monitoring and performance tuning. Was this intentional? If not, please consider reintroducing them or providing an equivalent.
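
One possible way to keep them, sketched against the removed code below (RecordBatchExecution and the returned row count and byte size are taken from that code; pushRows is a hypothetical stand-in for the row-push loop in the new AddDMLEvent body, not part of this PR):

	_ = s.statistics.RecordBatchExecution(func() (int, int64, error) {
		// Push every row of the event to s.logBuffer, exactly as the new
		// AddDMLEvent body does (hypothetical helper, see note above).
		pushRows(event)
		// Report the batch size in rows and bytes so that ExecBatchHistogram
		// and ExecBatchWriteBytesHistogram keep receiving data.
		return int(event.Len()), event.GetSize(), nil
	})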

_ = s.statistics.RecordBatchExecution(func() (int, int64, error) {
toRowCallback := func(postTxnFlushed []func(), totalCount uint64) func() {
var calledCount atomic.Uint64
// The callback of the last row will trigger the callback of the txn.
return func() {
if calledCount.Inc() == totalCount {
for _, callback := range postTxnFlushed {
callback()
}
toRowCallback := func(postTxnFlushed []func(), totalCount uint64) func() {
var calledCount atomic.Uint64
// The callback of the last row will trigger the callback of the txn.
return func() {
if calledCount.Inc() == totalCount {
for _, callback := range postTxnFlushed {
callback()
}
}
}
rowsCount := uint64(event.Len())
rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount)

for {
row, ok := event.GetNextRow()
if !ok {
event.Rewind()
break
}
s.logBuffer.Push(&commonEvent.RedoRowEvent{
StartTs: event.StartTs,
CommitTs: event.CommitTs,
Event: row,
PhysicalTableID: event.PhysicalTableID,
TableInfo: event.TableInfo,
Callback: rowCallback,
})
}
rowsCount := uint64(event.Len())
rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount)

for {
row, ok := event.GetNextRow()
if !ok {
event.Rewind()
break
}
return int(event.Len()), event.GetSize(), nil
})
s.logBuffer.Push(&commonEvent.RedoRowEvent{
StartTs: event.StartTs,
CommitTs: event.CommitTs,
Event: row,
PhysicalTableID: event.PhysicalTableID,
TableInfo: event.TableInfo,
Callback: rowCallback,
})
}
}

func (s *Sink) IsNormal() bool {
@@ -194,8 +192,8 @@ func (s *Sink) Close(_ bool) {
zap.Error(err))
}
}
if s.statistics != nil {
s.statistics.Close()
if s.metric != nil {
s.metric.close()
}
log.Info("redo sink closed",
zap.String("keyspace", s.cfg.ChangeFeedID.Keyspace()),
@@ -204,16 +202,19 @@

func (s *Sink) sendMessages(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
if e, ok := s.logBuffer.Get(); ok {
err := s.dmlWriter.WriteEvents(ctx, e)
if err != nil {
return err
}
}
e, ok := s.logBuffer.Get()
if !ok {
return nil
}

start := time.Now()
err := s.dmlWriter.WriteEvents(ctx, e)
if err != nil {
return err
}

if s.metric != nil {
s.metric.observeRowWrite(1, time.Since(start))
}
}
}
6 changes: 3 additions & 3 deletions metrics/grafana/ticdc_new_arch.json
@@ -22311,7 +22311,7 @@
"targets": [
{
"exemplar": true,
"expr": "max(rate(ticdc_redo_flushall_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (le)",
"expr": "max(rate(ticdc_redo_flush_all_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (le)",
"format": "heatmap",
"interval": "",
"intervalFactor": 2,
@@ -22751,7 +22751,7 @@
"targets": [
{
"exemplar": true,
"expr": "sum(rate(ticdc_redo_worker_busy_ratio{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])/10) by (changefeed,instance)",
"expr": "sum(rate(ticdc_redo_worker_busy_ratio{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])*100) by (changefeed,instance)",
"interval": "",
"legendFormat": "{{changefeed}}-{{instance}}",
"queryType": "randomWalk",
@@ -25515,4 +25515,4 @@
"title": "${DS_TEST-CLUSTER}-TiCDC-New-Arch",
"uid": "YiGL8hBZ0aac",
"version": 38
}
}
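
For context on the second expression change: the new RedoWorkerBusyRatio counter accumulates busy time in seconds (observeRowWrite and observeDDLWrite call Add(duration.Seconds())), so rate() already yields a busy fraction between 0 and 1, and multiplying by 100 turns it into a percentage; for example, a worker busy 0.42 s out of each second reads as 42%. The old /10 presumably matched a counter that accumulated milliseconds, where rate() returns the fraction times 1000.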
6 changes: 3 additions & 3 deletions metrics/nextgengrafana/ticdc_new_arch_next_gen.json
@@ -22311,7 +22311,7 @@
"targets": [
{
"exemplar": true,
"expr": "max(rate(ticdc_redo_flushall_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (le)",
"expr": "max(rate(ticdc_redo_flush_all_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (le)",
"format": "heatmap",
"interval": "",
"intervalFactor": 2,
@@ -22751,7 +22751,7 @@
"targets": [
{
"exemplar": true,
"expr": "sum(rate(ticdc_redo_worker_busy_ratio{k8s_cluster=\"$k8s_cluster\",sharedpool_id=\"$tidb_cluster\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])/10) by (changefeed,instance)",
"expr": "sum(rate(ticdc_redo_worker_busy_ratio{k8s_cluster=\"$k8s_cluster\",sharedpool_id=\"$tidb_cluster\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])*100) by (changefeed,instance)",
"interval": "",
"legendFormat": "{{changefeed}}-{{instance}}",
"queryType": "randomWalk",
@@ -25515,4 +25515,4 @@
"title": "${DS_TEST-CLUSTER}-TiCDC-New-Arch",
"uid": "YiGL8hBZ0aac",
"version": 38
}
}
29 changes: 29 additions & 0 deletions pkg/metrics/redo.go
@@ -50,6 +50,23 @@
Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 16),
}, []string{getKeyspaceLabel(), "changefeed", "type"})

// RedoTotalRowsCountGauge records the total number of rows written to redo log.
RedoTotalRowsCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "total_rows_count",
Help: "The total count of rows that are processed by redo writer",
}, []string{getKeyspaceLabel(), "changefeed", "type"})

// RedoWriteLogDurationHistogram records the latency distributions of writeLog.
RedoWriteLogDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "write_log_duration_seconds",
Help: "The latency distributions of writeLog called by redo sink",
Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 16),
}, []string{getKeyspaceLabel(), "changefeed", "type"})

// RedoFlushLogDurationHistogram records the latency distributions of flushLog.
RedoFlushLogDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
@@ -58,11 +75,23 @@ var (
Help: "The latency distributions of flushLog called by redo sink",
Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 16),
}, []string{getKeyspaceLabel(), "changefeed", "type"})

// RedoWorkerBusyRatio records the busy ratio of redo sink worker.
RedoWorkerBusyRatio = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "worker_busy_ratio",
Help: "Busy ratio for redo sink worker.",
}, []string{getKeyspaceLabel(), "changefeed", "type"})
)

func initRedoMetrics(registry *prometheus.Registry) {
registry.MustRegister(RedoFsyncDurationHistogram)
registry.MustRegister(RedoTotalRowsCountGauge)
registry.MustRegister(RedoWriteBytesGauge)
registry.MustRegister(RedoFlushAllDurationHistogram)
registry.MustRegister(RedoWriteLogDurationHistogram)
registry.MustRegister(RedoFlushLogDurationHistogram)
registry.MustRegister(RedoWorkerBusyRatio)
}