diff --git a/downstreamadapter/sink/redo/metrics.go b/downstreamadapter/sink/redo/metrics.go new file mode 100644 index 0000000000..1afb47ab05 --- /dev/null +++ b/downstreamadapter/sink/redo/metrics.go @@ -0,0 +1,82 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package redo + +import ( + "time" + + "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/metrics" + "github.com/pingcap/ticdc/pkg/redo" + "github.com/prometheus/client_golang/prometheus" +) + +type redoSinkMetrics struct { + changefeedID common.ChangeFeedID + + rowWriteLogDuration prometheus.Observer + ddlWriteLogDuration prometheus.Observer + + rowTotalCount prometheus.Counter + ddlTotalCount prometheus.Counter + + rowWorkerBusyRatio prometheus.Counter + ddlWorkerBusyRatio prometheus.Counter +} + +func newRedoSinkMetrics(changefeedID common.ChangeFeedID) *redoSinkMetrics { + keyspace := changefeedID.Keyspace() + changefeed := changefeedID.Name() + return &redoSinkMetrics{ + changefeedID: changefeedID, + rowWriteLogDuration: metrics.RedoWriteLogDurationHistogram. + WithLabelValues(keyspace, changefeed, redo.RedoRowLogFileType), + ddlWriteLogDuration: metrics.RedoWriteLogDurationHistogram. + WithLabelValues(keyspace, changefeed, redo.RedoDDLLogFileType), + rowTotalCount: metrics.RedoTotalRowsCountGauge. + WithLabelValues(keyspace, changefeed, redo.RedoRowLogFileType), + ddlTotalCount: metrics.RedoTotalRowsCountGauge. 
+ WithLabelValues(keyspace, changefeed, redo.RedoDDLLogFileType), + rowWorkerBusyRatio: metrics.RedoWorkerBusyRatio. + WithLabelValues(keyspace, changefeed, redo.RedoRowLogFileType), + ddlWorkerBusyRatio: metrics.RedoWorkerBusyRatio. + WithLabelValues(keyspace, changefeed, redo.RedoDDLLogFileType), + } +} + +func (m *redoSinkMetrics) observeRowWrite(rows int, duration time.Duration) { + if rows > 0 { + m.rowTotalCount.Add(float64(rows)) + } + m.rowWriteLogDuration.Observe(duration.Seconds()) + m.rowWorkerBusyRatio.Add(duration.Seconds()) +} + +func (m *redoSinkMetrics) observeDDLWrite(duration time.Duration) { + m.ddlTotalCount.Inc() + m.ddlWriteLogDuration.Observe(duration.Seconds()) + m.ddlWorkerBusyRatio.Add(duration.Seconds()) +} + +func (m *redoSinkMetrics) close() { + keyspace := m.changefeedID.Keyspace() + changefeed := m.changefeedID.Name() + +for _, logType := range []string{redo.RedoRowLogFileType, redo.RedoDDLLogFileType} { + metrics.RedoWriteLogDurationHistogram.DeleteLabelValues(keyspace, changefeed, logType) + metrics.RedoTotalRowsCountGauge.DeleteLabelValues(keyspace, changefeed, logType) + metrics.RedoWorkerBusyRatio.DeleteLabelValues(keyspace, changefeed, logType) +} +} diff --git a/downstreamadapter/sink/redo/sink.go b/downstreamadapter/sink/redo/sink.go index 70ce2c12a2..fe01ca46ab 100644 --- a/downstreamadapter/sink/redo/sink.go +++ b/downstreamadapter/sink/redo/sink.go @@ -22,7 +22,6 @@ import ( commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/redo" "github.com/pingcap/ticdc/pkg/redo/writer" "github.com/pingcap/ticdc/pkg/redo/writer/factory" @@ -44,9 +43,10 @@ type Sink struct { logBuffer *chann.UnlimitedChannel[writer.RedoEvent, any] // isNormal indicate whether the sink is in the normal state. 
- isNormal *atomic.Bool - isClosed *atomic.Bool - statistics *metrics.Statistics + isNormal *atomic.Bool + isClosed *atomic.Bool + + metric *redoSinkMetrics } func Verify(ctx context.Context, changefeedID common.ChangeFeedID, cfg *config.ConsistentConfig) error { @@ -68,10 +68,9 @@ func New(ctx context.Context, changefeedID common.ChangeFeedID, ChangeFeedID: changefeedID, MaxLogSizeInBytes: util.GetOrZero(cfg.MaxLogSize) * redo.Megabyte, }, - logBuffer: chann.NewUnlimitedChannelDefault[writer.RedoEvent](), - isNormal: atomic.NewBool(true), - isClosed: atomic.NewBool(false), - statistics: metrics.NewStatistics(changefeedID, "redo"), + logBuffer: chann.NewUnlimitedChannelDefault[writer.RedoEvent](), + isNormal: atomic.NewBool(true), + isClosed: atomic.NewBool(false), } start := time.Now() ddlWriter, err := factory.NewRedoLogWriter(s.ctx, s.cfg, redo.RedoDDLLogFileType) @@ -94,6 +93,7 @@ func New(ctx context.Context, changefeedID common.ChangeFeedID, } s.ddlWriter = ddlWriter s.dmlWriter = dmlWriter + s.metric = newRedoSinkMetrics(changefeedID) return s } @@ -114,51 +114,49 @@ func (s *Sink) Run(ctx context.Context) error { func (s *Sink) WriteBlockEvent(event commonEvent.BlockEvent) error { switch e := event.(type) { case *commonEvent.DDLEvent: - err := s.statistics.RecordDDLExecution(func() (string, error) { - ddlType := e.GetDDLType().String() - return ddlType, s.ddlWriter.WriteEvents(s.ctx, e) - }) + start := time.Now() + err := s.ddlWriter.WriteEvents(s.ctx, e) if err != nil { s.isNormal.Store(false) return errors.Trace(err) } + if s.metric != nil { + s.metric.observeDDLWrite(time.Since(start)) + } } return nil } func (s *Sink) AddDMLEvent(event *commonEvent.DMLEvent) { - _ = s.statistics.RecordBatchExecution(func() (int, int64, error) { - toRowCallback := func(postTxnFlushed []func(), totalCount uint64) func() { - var calledCount atomic.Uint64 - // The callback of the last row will trigger the callback of the txn. 
- return func() { - if calledCount.Inc() == totalCount { - for _, callback := range postTxnFlushed { - callback() - } + toRowCallback := func(postTxnFlushed []func(), totalCount uint64) func() { + var calledCount atomic.Uint64 + // The callback of the last row will trigger the callback of the txn. + return func() { + if calledCount.Inc() == totalCount { + for _, callback := range postTxnFlushed { + callback() } } } - rowsCount := uint64(event.Len()) - rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount) - - for { - row, ok := event.GetNextRow() - if !ok { - event.Rewind() - break - } - s.logBuffer.Push(&commonEvent.RedoRowEvent{ - StartTs: event.StartTs, - CommitTs: event.CommitTs, - Event: row, - PhysicalTableID: event.PhysicalTableID, - TableInfo: event.TableInfo, - Callback: rowCallback, - }) + } + rowsCount := uint64(event.Len()) + rowCallback := toRowCallback(event.PostTxnFlushed, rowsCount) + + for { + row, ok := event.GetNextRow() + if !ok { + event.Rewind() + break } - return int(event.Len()), event.GetSize(), nil - }) + s.logBuffer.Push(&commonEvent.RedoRowEvent{ + StartTs: event.StartTs, + CommitTs: event.CommitTs, + Event: row, + PhysicalTableID: event.PhysicalTableID, + TableInfo: event.TableInfo, + Callback: rowCallback, + }) + } } func (s *Sink) IsNormal() bool { @@ -194,8 +192,8 @@ func (s *Sink) Close(_ bool) { zap.Error(err)) } } - if s.statistics != nil { - s.statistics.Close() + if s.metric != nil { + s.metric.close() } log.Info("redo sink closed", zap.String("keyspace", s.cfg.ChangeFeedID.Keyspace()), @@ -204,16 +202,19 @@ func (s *Sink) Close(_ bool) { func (s *Sink) sendMessages(ctx context.Context) error { for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - if e, ok := s.logBuffer.Get(); ok { - err := s.dmlWriter.WriteEvents(ctx, e) - if err != nil { - return err - } - } + e, ok := s.logBuffer.Get() + if !ok { + return nil + } + + start := time.Now() + err := s.dmlWriter.WriteEvents(ctx, e) + if err != nil { + 
return err + } + + if s.metric != nil { + s.metric.observeRowWrite(1, time.Since(start)) } } } diff --git a/metrics/grafana/ticdc_new_arch.json b/metrics/grafana/ticdc_new_arch.json index 41412f8a31..922bca1eb7 100644 --- a/metrics/grafana/ticdc_new_arch.json +++ b/metrics/grafana/ticdc_new_arch.json @@ -22311,7 +22311,7 @@ "targets": [ { "exemplar": true, - "expr": "max(rate(ticdc_redo_flushall_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (le)", + "expr": "max(rate(ticdc_redo_flush_all_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (le)", "format": "heatmap", "interval": "", "intervalFactor": 2, @@ -22751,7 +22751,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(ticdc_redo_worker_busy_ratio{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])/10) by (changefeed,instance)", + "expr": "sum(rate(ticdc_redo_worker_busy_ratio{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])*100) by (changefeed,instance)", "interval": "", "legendFormat": "{{changefeed}}-{{instance}}", "queryType": "randomWalk", @@ -25515,4 +25515,4 @@ "title": "${DS_TEST-CLUSTER}-TiCDC-New-Arch", "uid": "YiGL8hBZ0aac", "version": 38 -} \ No newline at end of file +} diff --git a/metrics/nextgengrafana/ticdc_new_arch_next_gen.json b/metrics/nextgengrafana/ticdc_new_arch_next_gen.json index 6c342519c1..ba06b0c674 100644 --- a/metrics/nextgengrafana/ticdc_new_arch_next_gen.json +++ b/metrics/nextgengrafana/ticdc_new_arch_next_gen.json @@ -22311,7 +22311,7 @@ "targets": [ { "exemplar": true, - "expr": "max(rate(ticdc_redo_flushall_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", 
changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (le)", + "expr": "max(rate(ticdc_redo_flush_all_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", sharedpool_id=\"$tidb_cluster\", changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (le)", "format": "heatmap", "interval": "", "intervalFactor": 2, @@ -22751,7 +22751,7 @@ "targets": [ { "exemplar": true, - "expr": "sum(rate(ticdc_redo_worker_busy_ratio{k8s_cluster=\"$k8s_cluster\",sharedpool_id=\"$tidb_cluster\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])/10) by (changefeed,instance)", + "expr": "sum(rate(ticdc_redo_worker_busy_ratio{k8s_cluster=\"$k8s_cluster\",sharedpool_id=\"$tidb_cluster\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])*100) by (changefeed,instance)", "interval": "", "legendFormat": "{{changefeed}}-{{instance}}", "queryType": "randomWalk", @@ -25515,4 +25515,4 @@ "title": "${DS_TEST-CLUSTER}-TiCDC-New-Arch", "uid": "YiGL8hBZ0aac", "version": 38 -} \ No newline at end of file +} diff --git a/pkg/metrics/redo.go b/pkg/metrics/redo.go index a8e33851fa..48042ef461 100644 --- a/pkg/metrics/redo.go +++ b/pkg/metrics/redo.go @@ -50,6 +50,23 @@ var ( Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 16), }, []string{getKeyspaceLabel(), "changefeed", "type"}) + // RedoTotalRowsCountGauge records the total number of rows written to redo log. + RedoTotalRowsCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "total_rows_count", + Help: "The total count of rows that are processed by redo writer", + }, []string{getKeyspaceLabel(), "changefeed", "type"}) + + // RedoWriteLogDurationHistogram records the latency distributions of writeLog. 
+ RedoWriteLogDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "write_log_duration_seconds", + Help: "The latency distributions of writeLog called by redo sink", + Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 16), + }, []string{getKeyspaceLabel(), "changefeed", "type"}) + // RedoFlushLogDurationHistogram records the latency distributions of flushLog. RedoFlushLogDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: namespace, @@ -58,11 +75,23 @@ var ( Help: "The latency distributions of flushLog called by redo sink", Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 16), }, []string{getKeyspaceLabel(), "changefeed", "type"}) + + // RedoWorkerBusyRatio records the busy ratio of redo sink worker. + RedoWorkerBusyRatio = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "worker_busy_ratio", + Help: "Busy ratio for redo sink worker.", + }, []string{getKeyspaceLabel(), "changefeed", "type"}) ) func initRedoMetrics(registry *prometheus.Registry) { registry.MustRegister(RedoFsyncDurationHistogram) + registry.MustRegister(RedoTotalRowsCountGauge) registry.MustRegister(RedoWriteBytesGauge) registry.MustRegister(RedoFlushAllDurationHistogram) + registry.MustRegister(RedoWriteLogDurationHistogram) registry.MustRegister(RedoFlushLogDurationHistogram) + registry.MustRegister(RedoWorkerBusyRatio) }