diff --git a/pkg/common/const.go b/pkg/common/const.go
index d439be48ab974..c6e885e16dd8b 100644
--- a/pkg/common/const.go
+++ b/pkg/common/const.go
@@ -48,3 +48,42 @@ func ConvertBytesToHumanReadable(bytes int64) string {
     }
     return fmt.Sprintf("%.2f TiB", num/TiB)
 }
+
+// FormatBytes formats bytes to a human-readable string using decimal units (B, KB, MB, GB, TB).
+// Examples: 0 -> "0B", 1386624 -> "1.39MB", 1024 -> "1.02KB"
+// This uses decimal units (1000-based) instead of binary units (1024-based).
+func FormatBytes(bytes int64) string {
+    if bytes == 0 {
+        return "0B"
+    }
+    num := float64(bytes)
+    if bytes < THOUSAND {
+        return fmt.Sprintf("%dB", bytes)
+    }
+    if bytes < MILLION {
+        return fmt.Sprintf("%.2fKB", num/THOUSAND)
+    }
+    if bytes < BILLION {
+        return fmt.Sprintf("%.2fMB", num/MILLION)
+    }
+    if bytes < TRILLION {
+        return fmt.Sprintf("%.2fGB", num/BILLION)
+    }
+    return fmt.Sprintf("%.2fTB", num/TRILLION)
+}
+
+// FormatDuration formats nanoseconds to a human-readable string (ms or s).
+// Examples: 0 -> "0ns", 21625539 -> "21.63ms", 1000000000 -> "1.00s"
+func FormatDuration(ns int64) string {
+    if ns == 0 {
+        return "0ns"
+    }
+    const (
+        nanosPerMilli = 1000000
+        nanosPerSec   = 1000000000
+    )
+    if ns < nanosPerSec {
+        return fmt.Sprintf("%.2fms", float64(ns)/nanosPerMilli)
+    }
+    return fmt.Sprintf("%.2fs", float64(ns)/nanosPerSec)
+}
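A quick sanity check of the two helpers above, using the example values from their doc comments (a sketch; it assumes the existing THOUSAND/MILLION/BILLION/TRILLION constants in pkg/common):

```go
package main

import (
	"fmt"

	"github.com/matrixorigin/matrixone/pkg/common"
)

func main() {
	// Decimal-unit byte formatting: 1024/1000 = 1.024 -> "1.02KB".
	fmt.Println(common.FormatBytes(0))       // 0B
	fmt.Println(common.FormatBytes(1024))    // 1.02KB
	fmt.Println(common.FormatBytes(1386624)) // 1.39MB

	// Sub-second durations print as ms, longer ones as s; zero prints as "0ns".
	fmt.Println(common.FormatDuration(0))          // 0ns
	fmt.Println(common.FormatDuration(21625539))   // 21.63ms
	fmt.Println(common.FormatDuration(1000000000)) // 1.00s
}
```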
diff --git a/pkg/frontend/mysql_cmd_executor.go b/pkg/frontend/mysql_cmd_executor.go
index dbd9631b5fbb7..3f44cf5f01448 100644
--- a/pkg/frontend/mysql_cmd_executor.go
+++ b/pkg/frontend/mysql_cmd_executor.go
@@ -3621,10 +3621,11 @@ func buildErrorJsonPlan(buffer *bytes.Buffer, uuid uuid.UUID, errcode uint16, ms
 }
 
 type jsonPlanHandler struct {
-    jsonBytes  []byte
-    statsBytes statistic.StatsArray
-    stats      motrace.Statistic
-    buffer     *bytes.Buffer
+    jsonBytes   []byte
+    statsBytes  statistic.StatsArray
+    stats       motrace.Statistic
+    buffer      *bytes.Buffer
+    marshalPlan *models.ExplainData // Store ExplainData reference to generate text output
 }
 
 func NewJsonPlanHandler(ctx context.Context, stmt *motrace.StatementInfo, ses FeSession, plan *plan2.Plan, phyPlan *models.PhyPlan, opts ...marshalPlanOptions) *jsonPlanHandler {
@@ -3632,10 +3633,11 @@ func NewJsonPlanHandler(ctx context.Context, stmt *motrace.StatementInfo, ses Fe
     jsonBytes := h.Marshal(ctx)
     statsBytes, stats := h.Stats(ctx, ses)
     return &jsonPlanHandler{
-        jsonBytes:  jsonBytes,
-        statsBytes: statsBytes,
-        stats:      stats,
-        buffer:     h.handoverBuffer(),
+        jsonBytes:   jsonBytes,
+        statsBytes:  statsBytes,
+        stats:       stats,
+        buffer:      h.handoverBuffer(),
+        marshalPlan: h.marshalPlan, // Store ExplainData reference
     }
 }
 
@@ -3644,6 +3646,28 @@ func (h *jsonPlanHandler) Stats(ctx context.Context) (statistic.StatsArray, motr
 }
 
 func (h *jsonPlanHandler) Marshal(ctx context.Context) []byte {
+    // If marshalPlan is available and has a physical plan, generate analyze-mode text output.
+    // Otherwise, return the original JSON (for error cases or when the plan is not available).
+    if h.marshalPlan != nil && (len(h.marshalPlan.PhyPlan.LocalScope) > 0 || len(h.marshalPlan.PhyPlan.RemoteScope) > 0) {
+        // Check execution time to determine the output level.
+        // Get total execution time from ExecutionDuration.
+        totalExecTime := h.marshalPlan.NewPlanStats.ExecuteStage.ExecutionDuration
+        longQueryThreshold := motrace.GetLongQueryTime()
+
+        // If execution time > (5s + longQueryThreshold) or > 3x longQueryThreshold, include the Physical Plan.
+        includePhysicalPlan := totalExecTime > (5*time.Second+longQueryThreshold) || totalExecTime > 3*longQueryThreshold
+
+        var phyplanText string
+        if includePhysicalPlan {
+            // Generate full analyze-mode text output including the Physical Plan.
+            phyplanText = models.ExplainPhyPlan(&h.marshalPlan.PhyPlan, &h.marshalPlan.NewPlanStats, models.AnalyzeOption)
+        } else {
+            // Generate only the Overview section (without the Physical Plan).
+            phyplanText = models.ExplainPhyPlanOverview(&h.marshalPlan.PhyPlan, &h.marshalPlan.NewPlanStats, models.AnalyzeOption)
+        }
+        return []byte(phyplanText)
+    }
+    // Fall back to the original JSON for error cases or when the physical plan is not available.
     return h.jsonBytes
 }
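The gating condition above decides between the Overview-only text and the full analyze output. A worked sketch of the cutoff, assuming an illustrative long-query threshold of 1s (the real value comes from motrace.GetLongQueryTime()):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	longQueryThreshold := 1 * time.Second // assumed value, for illustration only

	// Mirrors the includePhysicalPlan condition in jsonPlanHandler.Marshal above.
	decide := func(totalExecTime time.Duration) bool {
		return totalExecTime > (5*time.Second+longQueryThreshold) ||
			totalExecTime > 3*longQueryThreshold
	}

	fmt.Println(decide(500 * time.Millisecond)) // false: Overview only
	fmt.Println(decide(4 * time.Second))        // true: exceeds 3x threshold
	fmt.Println(decide(7 * time.Second))        // true: exceeds 5s + threshold
}
```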
diff --git a/pkg/sql/compile/analyze_module.go b/pkg/sql/compile/analyze_module.go
index b8373f0c8c1a9..3659b40295ddd 100644
--- a/pkg/sql/compile/analyze_module.go
+++ b/pkg/sql/compile/analyze_module.go
@@ -767,7 +767,7 @@ func explainPipeline(node vm.Operator, prefix string, isRoot bool, isTail bool,
 
     analyzeStr := ""
     if option.Verbose {
-        analyzeStr = fmt.Sprintf("(idx:%v, isFirst:%v, isLast:%v)",
+        analyzeStr = fmt.Sprintf("(%v,%v,%v)",
             node.GetOperatorBase().Idx,
             node.GetOperatorBase().IsFirst,
             node.GetOperatorBase().IsLast)
diff --git a/pkg/sql/compile/analyze_module_test.go b/pkg/sql/compile/analyze_module_test.go
index 2c4e830c9fde5..84a3dc9011a84 100644
--- a/pkg/sql/compile/analyze_module_test.go
+++ b/pkg/sql/compile/analyze_module_test.go
@@ -54,31 +54,31 @@ func Test_processPhyScope(t *testing.T) {
 Version: 1.0, S3IOInputCount: 0, S3IOOutputCount: 0
 LOCAL SCOPES:
 Scope 1 (Magic: Merge, mcpu: 1, Receiver: [4])
-  Pipeline: └── Output (idx:-1, isFirst:false, isLast:false)
-                └── Projection (idx:2, isFirst:true, isLast:true)
-                    └── Projection (idx:1, isFirst:false, isLast:true)
-                        └── MergeGroup (idx:1, isFirst:false, isLast:false)
-                            └── Merge (idx:1, isFirst:false, isLast:false)
+  Pipeline: └── Output (-1,false,false)
+                └── Projection (2,true,true)
+                    └── Projection (1,false,true)
+                        └── MergeGroup (1,false,false)
+                            └── Merge (1,false,false)
   PreScopes: {
     Scope 2 (Magic: Normal, mcpu: 4, Receiver: [0, 1, 2, 3])
-      Pipeline: └── Connector (idx:1, isFirst:false, isLast:false) to MergeReceiver 4
-                    └── MergeGroup (idx:1, isFirst:false, isLast:false)
-                        └── Merge (idx:1, isFirst:false, isLast:false)
+      Pipeline: └── Connector (1,false,false) to MergeReceiver 4
+                    └── MergeGroup (1,false,false)
+                        └── Merge (1,false,false)
       PreScopes: {
-        Scope 3 (Magic: Normal, mcpu: 1, Receiver: [])
+        Scope 3 (Magic: Normal, mcpu: 1)
           DataSource: cloud_device.real_time_position[time_stamp distance]
-          Pipeline: └── Connector (idx:0, isFirst:false, isLast:false) to MergeReceiver 0
-                        └── Group (idx:1, isFirst:true, isLast:false)
-                            └── Projection (idx:0, isFirst:false, isLast:true)
-                                └── Filter (idx:0, isFirst:false, isLast:false)
-                                    └── TableScan (idx:0, isFirst:true, isLast:false)
-        Scope 4 (Magic: Normal, mcpu: 1, Receiver: [])
+          Pipeline: └── Connector (0,false,false) to MergeReceiver 0
+                        └── Group (1,true,false)
+                            └── Projection (0,false,true)
+                                └── Filter (0,false,false)
+                                    └── TableScan (0,true,false)
+        Scope 4 (Magic: Normal, mcpu: 1)
           DataSource: cloud_device.real_time_position[time_stamp distance]
-          Pipeline: └── Connector (idx:0, isFirst:false, isLast:false) to MergeReceiver 1
-                        └── Group (idx:1, isFirst:true, isLast:false)
-                            └── Projection (idx:0, isFirst:false, isLast:true)
-                                └── Filter (idx:0, isFirst:false, isLast:false)
-                                    └── TableScan (idx:0, isFirst:true, isLast:false)
+          Pipeline: └── Connector (0,false,false) to MergeReceiver 1
+                        └── Group (1,true,false)
+                            └── Projection (0,false,true)
+                                └── Filter (0,false,false)
+                                    └── TableScan (0,true,false)
       }
   }
 */
diff --git a/pkg/sql/compile/debugTools.go b/pkg/sql/compile/debugTools.go
index 9624f05adb585..991b6c97304b1 100644
--- a/pkg/sql/compile/debugTools.go
+++ b/pkg/sql/compile/debugTools.go
@@ -213,7 +213,7 @@ func ShowPipelineTree(
 
     analyzeStr := ""
     if level == VerboseLevel || level == AnalyzeLevel {
-        analyzeStr = fmt.Sprintf("(idx:%v, isFirst:%v, isLast:%v)",
+        analyzeStr = fmt.Sprintf("(%v,%v,%v)",
             node.GetOperatorBase().Idx,
             node.GetOperatorBase().IsFirst,
             node.GetOperatorBase().IsLast)
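The verbose operator annotation keeps the same three fields but drops the labels; the positional tuple still reads as (Idx, IsFirst, IsLast). Before and after, taken from the updated test expectations:

```
before: └── Projection (idx:2, isFirst:true, isLast:true)
after:  └── Projection (2,true,true)
```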
diff --git a/pkg/sql/models/show_phyplan.go b/pkg/sql/models/show_phyplan.go
index 91167131281dd..6e2e20aa13ecb 100644
--- a/pkg/sql/models/show_phyplan.go
+++ b/pkg/sql/models/show_phyplan.go
@@ -19,6 +19,7 @@ import (
     "fmt"
     "strings"
 
+    "github.com/matrixorigin/matrixone/pkg/common"
     "github.com/matrixorigin/matrixone/pkg/objectio"
     "github.com/matrixorigin/matrixone/pkg/util/trace/impl/motrace/statistic"
     "github.com/matrixorigin/matrixone/pkg/vm"
@@ -68,13 +69,27 @@ func explainResourceOverview(phy *PhyPlan, statsInfo *statistic.StatsInfo, optio
     if option == VerboseOption || option == AnalyzeOption {
         gblStats := ExtractPhyPlanGlbStats(phy)
         buffer.WriteString("Overview:\n")
-        buffer.WriteString(fmt.Sprintf("\tMemoryUsage:%dB, SpillSize:%dB, DiskI/O:%dB, NewWorkI/O:%dB, RetryTime: %v",
-            gblStats.MemorySize,
-            gblStats.SpillSize,
-            gblStats.DiskIOSize,
-            gblStats.NetWorkSize,
-            phy.RetryTime,
-        ))
+        // Format MemoryUsage, SpillSize, DiskI/O, NewWorkI/O; only include non-zero values
+        overviewParts := []string{}
+        if gblStats.MemorySize > 0 {
+            overviewParts = append(overviewParts, fmt.Sprintf("MemoryUsage:%s", common.FormatBytes(gblStats.MemorySize)))
+        }
+        if gblStats.SpillSize > 0 {
+            overviewParts = append(overviewParts, fmt.Sprintf("SpillSize:%s", common.FormatBytes(gblStats.SpillSize)))
+        }
+        if gblStats.DiskIOSize > 0 {
+            overviewParts = append(overviewParts, fmt.Sprintf("DiskI/O:%s", common.FormatBytes(gblStats.DiskIOSize)))
+        }
+        if gblStats.NetWorkSize > 0 {
+            overviewParts = append(overviewParts, fmt.Sprintf("NewWorkI/O:%s", common.FormatBytes(gblStats.NetWorkSize)))
+        }
+        // Always include RetryTime
+        overviewStr := strings.Join(overviewParts, ", ")
+        if len(overviewStr) > 0 {
+            buffer.WriteString(fmt.Sprintf("\t%s, RetryTime: %v", overviewStr, phy.RetryTime))
+        } else {
+            buffer.WriteString(fmt.Sprintf("\tRetryTime: %v", phy.RetryTime))
+        }
 
     if statsInfo != nil {
         buffer.WriteString("\n")
@@ -82,9 +97,33 @@ func explainResourceOverview(phy *PhyPlan, statsInfo *statistic.StatsInfo, optio
         list, head, put, get, delete, deleteMul, writtenRows, deletedRows := CalcTotalS3Requests(gblStats, statsInfo)
         s3InputEstByRows := objectio.EstimateS3Input(writtenRows)
 
-        buffer.WriteString(fmt.Sprintf("\tS3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d, S3InputEstByRows((%d+%d)/8192):%.4f \n",
-            list, head, put, get, delete, deleteMul, writtenRows, deletedRows, s3InputEstByRows,
-        ))
+        // Format S3 stats; only include non-zero values
+        s3OverviewParts := []string{}
+        if list > 0 {
+            s3OverviewParts = append(s3OverviewParts, fmt.Sprintf("S3List:%d", list))
+        }
+        if head > 0 {
+            s3OverviewParts = append(s3OverviewParts, fmt.Sprintf("S3Head:%d", head))
+        }
+        if put > 0 {
+            s3OverviewParts = append(s3OverviewParts, fmt.Sprintf("S3Put:%d", put))
+        }
+        if get > 0 {
+            s3OverviewParts = append(s3OverviewParts, fmt.Sprintf("S3Get:%d", get))
+        }
+        if delete > 0 {
+            s3OverviewParts = append(s3OverviewParts, fmt.Sprintf("S3Delete:%d", delete))
+        }
+        if deleteMul > 0 {
+            s3OverviewParts = append(s3OverviewParts, fmt.Sprintf("S3DeleteMul:%d", deleteMul))
+        }
+        // Always include S3InputEstByRows if writtenRows or deletedRows > 0, or if there are S3 requests
+        if len(s3OverviewParts) > 0 || writtenRows > 0 || deletedRows > 0 {
+            if len(s3OverviewParts) > 0 {
+                buffer.WriteString(fmt.Sprintf("\t%s, ", strings.Join(s3OverviewParts, ", ")))
+            }
+            buffer.WriteString(fmt.Sprintf("S3InputEstByRows((%d+%d)/8192):%.4f \n", writtenRows, deletedRows, s3InputEstByRows))
+        }
 
         cpuTimeVal := gblStats.OperatorTimeConsumed + int64(statsInfo.ParseStage.ParseDuration+statsInfo.PlanStage.PlanDuration+statsInfo.CompileStage.CompileDuration) +
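The same append-if-non-zero pattern repeats for every stat group in this function. A generic helper along these lines could collapse the repetition (a hypothetical sketch, not part of this PR):

```go
// appendIfPositive is a hypothetical helper, not in this PR: it collects
// "Label:value" pairs and skips zero values, in the same spirit as the
// s3OverviewParts / overviewParts blocks above.
func appendIfPositive(parts []string, label string, v int64) []string {
	if v > 0 {
		parts = append(parts, fmt.Sprintf("%s:%d", label, v))
	}
	return parts
}

// Usage, mirroring the S3 overview block:
//   parts := []string{}
//   parts = appendIfPositive(parts, "S3List", list)
//   parts = appendIfPositive(parts, "S3Head", head)
//   ...
//   if len(parts) > 0 {
//       buffer.WriteString("\t" + strings.Join(parts, ", "))
//   }
```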
@@ -93,91 +132,189 @@ func explainResourceOverview(phy *PhyPlan, statsInfo *statistic.StatsInfo, optio
             (statsInfo.IOAccessTimeConsumption + statsInfo.S3FSPrefetchFileIOMergerTimeConsumption)
 
         buffer.WriteString("\tCPU Usage: \n")
-        buffer.WriteString(fmt.Sprintf("\t\t- Total CPU Time: %dns \n", cpuTimeVal))
-        buffer.WriteString(fmt.Sprintf("\t\t- CPU Time Detail: Parse(%d)+BuildPlan(%d)+Compile(%d)+PhyExec(%d)+PrepareRun(%d)-PreRunWaitLock(%d)-PlanStatsIO(%d)-IOAccess(%d)-IOMerge(%d)\n",
-            statsInfo.ParseStage.ParseDuration,
-            statsInfo.PlanStage.PlanDuration,
-            statsInfo.CompileStage.CompileDuration,
-            gblStats.OperatorTimeConsumed,
-            gblStats.ScopePrepareTimeConsumed+statsInfo.PrepareRunStage.CompilePreRunOnceDuration,
-            statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock,
-            statsInfo.PlanStage.BuildPlanStatsIOConsumption,
-            statsInfo.IOAccessTimeConsumption,
-            statsInfo.S3FSPrefetchFileIOMergerTimeConsumption))
+        buffer.WriteString(fmt.Sprintf("\t\t- Total CPU Time: %s \n", common.FormatDuration(cpuTimeVal)))
+        buffer.WriteString(fmt.Sprintf("\t\t- CPU Time Detail: Parse(%s)+BuildPlan(%s)+Compile(%s)+PhyExec(%s)+PrepareRun(%s)-PreRunWaitLock(%s)-PlanStatsIO(%s)-IOAccess(%s)-IOMerge(%s)\n",
+            common.FormatDuration(int64(statsInfo.ParseStage.ParseDuration)),
+            common.FormatDuration(int64(statsInfo.PlanStage.PlanDuration)),
+            common.FormatDuration(int64(statsInfo.CompileStage.CompileDuration)),
+            common.FormatDuration(gblStats.OperatorTimeConsumed),
+            common.FormatDuration(gblStats.ScopePrepareTimeConsumed+statsInfo.PrepareRunStage.CompilePreRunOnceDuration),
+            common.FormatDuration(int64(statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock)),
+            common.FormatDuration(int64(statsInfo.PlanStage.BuildPlanStatsIOConsumption)),
+            common.FormatDuration(int64(statsInfo.IOAccessTimeConsumption)),
+            common.FormatDuration(int64(statsInfo.S3FSPrefetchFileIOMergerTimeConsumption))))
 
         buffer.WriteString(fmt.Sprintf("\t\t- Permission Authentication Stats Array: %v \n", statsInfo.PermissionAuth))
         //-------------------------------------------------------------------------------------------------------
         if option == AnalyzeOption {
             buffer.WriteString("\tQuery Build Plan Stage:\n")
-            buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", int64(statsInfo.PlanStage.PlanDuration)-statsInfo.PlanStage.BuildPlanStatsIOConsumption))
-            buffer.WriteString(fmt.Sprintf("\t\t- S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n",
-                statsInfo.PlanStage.BuildPlanS3Request.List,
-                statsInfo.PlanStage.BuildPlanS3Request.Head,
-                statsInfo.PlanStage.BuildPlanS3Request.Put,
-                statsInfo.PlanStage.BuildPlanS3Request.Get,
-                statsInfo.PlanStage.BuildPlanS3Request.Delete,
-                statsInfo.PlanStage.BuildPlanS3Request.DeleteMul,
-            ))
-            buffer.WriteString(fmt.Sprintf("\t\t- Build Plan Duration: %dns \n", int64(statsInfo.PlanStage.PlanDuration)))
-            buffer.WriteString(fmt.Sprintf("\t\t- Call Stats Duration: %dns \n", statsInfo.PlanStage.BuildPlanStatsDuration))
-            buffer.WriteString(fmt.Sprintf("\t\t- Call StatsInCache Duration: %dns \n", statsInfo.PlanStage.BuildPlanStatsInCacheDuration))
-            buffer.WriteString(fmt.Sprintf("\t\t- Call Stats IO Consumption: %dns \n", statsInfo.PlanStage.BuildPlanStatsIOConsumption))
-            buffer.WriteString(fmt.Sprintf("\t\t- Call Stats S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n",
-                statsInfo.PlanStage.BuildPlanStatsS3.List,
-                statsInfo.PlanStage.BuildPlanStatsS3.Head,
-                statsInfo.PlanStage.BuildPlanStatsS3.Put,
-                statsInfo.PlanStage.BuildPlanStatsS3.Get,
-                statsInfo.PlanStage.BuildPlanStatsS3.Delete,
-                statsInfo.PlanStage.BuildPlanStatsS3.DeleteMul,
-            ))
+            buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %s \n", common.FormatDuration(int64(statsInfo.PlanStage.PlanDuration)-statsInfo.PlanStage.BuildPlanStatsIOConsumption)))
+
+            // Format S3 request stats; only include non-zero values
+            s3ReqParts := []string{}
+            if statsInfo.PlanStage.BuildPlanS3Request.List > 0 {
+                s3ReqParts = append(s3ReqParts, fmt.Sprintf("S3List:%d", statsInfo.PlanStage.BuildPlanS3Request.List))
+            }
+            if statsInfo.PlanStage.BuildPlanS3Request.Head > 0 {
+                s3ReqParts = append(s3ReqParts, fmt.Sprintf("S3Head:%d", statsInfo.PlanStage.BuildPlanS3Request.Head))
+            }
+            if statsInfo.PlanStage.BuildPlanS3Request.Put > 0 {
+                s3ReqParts = append(s3ReqParts, fmt.Sprintf("S3Put:%d", statsInfo.PlanStage.BuildPlanS3Request.Put))
+            }
+            if statsInfo.PlanStage.BuildPlanS3Request.Get > 0 {
+                s3ReqParts = append(s3ReqParts, fmt.Sprintf("S3Get:%d", statsInfo.PlanStage.BuildPlanS3Request.Get))
+            }
+            if statsInfo.PlanStage.BuildPlanS3Request.Delete > 0 {
+                s3ReqParts = append(s3ReqParts, fmt.Sprintf("S3Delete:%d", statsInfo.PlanStage.BuildPlanS3Request.Delete))
+            }
+            if statsInfo.PlanStage.BuildPlanS3Request.DeleteMul > 0 {
+                s3ReqParts = append(s3ReqParts, fmt.Sprintf("S3DeleteMul:%d", statsInfo.PlanStage.BuildPlanS3Request.DeleteMul))
+            }
+            // Only print the S3 request line if at least one value is non-zero
+            if len(s3ReqParts) > 0 {
+                buffer.WriteString(fmt.Sprintf("\t\t- %s\n", strings.Join(s3ReqParts, ", ")))
+            }
+
+            buffer.WriteString(fmt.Sprintf("\t\t- Build Plan Duration: %s \n", common.FormatDuration(int64(statsInfo.PlanStage.PlanDuration))))
+            buffer.WriteString(fmt.Sprintf("\t\t- Call Stats Duration: %s \n", common.FormatDuration(int64(statsInfo.PlanStage.BuildPlanStatsDuration))))
+            buffer.WriteString(fmt.Sprintf("\t\t- Call StatsInCache Duration: %s \n", common.FormatDuration(int64(statsInfo.PlanStage.BuildPlanStatsInCacheDuration))))
+            buffer.WriteString(fmt.Sprintf("\t\t- Call Stats IO Consumption: %s \n", common.FormatDuration(int64(statsInfo.PlanStage.BuildPlanStatsIOConsumption))))
+
+            // Format Call Stats S3; only include non-zero values
+            s3StatsParts := []string{}
+            if statsInfo.PlanStage.BuildPlanStatsS3.List > 0 {
+                s3StatsParts = append(s3StatsParts, fmt.Sprintf("S3List:%d", statsInfo.PlanStage.BuildPlanStatsS3.List))
+            }
+            if statsInfo.PlanStage.BuildPlanStatsS3.Head > 0 {
+                s3StatsParts = append(s3StatsParts, fmt.Sprintf("S3Head:%d", statsInfo.PlanStage.BuildPlanStatsS3.Head))
+            }
+            if statsInfo.PlanStage.BuildPlanStatsS3.Put > 0 {
+                s3StatsParts = append(s3StatsParts, fmt.Sprintf("S3Put:%d", statsInfo.PlanStage.BuildPlanStatsS3.Put))
+            }
+            if statsInfo.PlanStage.BuildPlanStatsS3.Get > 0 {
+                s3StatsParts = append(s3StatsParts, fmt.Sprintf("S3Get:%d", statsInfo.PlanStage.BuildPlanStatsS3.Get))
+            }
+            if statsInfo.PlanStage.BuildPlanStatsS3.Delete > 0 {
+                s3StatsParts = append(s3StatsParts, fmt.Sprintf("S3Delete:%d", statsInfo.PlanStage.BuildPlanStatsS3.Delete))
+            }
+            if statsInfo.PlanStage.BuildPlanStatsS3.DeleteMul > 0 {
+                s3StatsParts = append(s3StatsParts, fmt.Sprintf("S3DeleteMul:%d", statsInfo.PlanStage.BuildPlanStatsS3.DeleteMul))
+            }
+            // Only print the Call Stats S3 line if at least one value is non-zero
+            if len(s3StatsParts) > 0 {
+                buffer.WriteString(fmt.Sprintf("\t\t- Call Stats %s\n", strings.Join(s3StatsParts, ", ")))
+            }
 
             //-------------------------------------------------------------------------------------------------------
             buffer.WriteString("\tQuery Compile Stage:\n")
-            buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", statsInfo.CompileStage.CompileDuration))
-            buffer.WriteString(fmt.Sprintf("\t\t- S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n",
-                statsInfo.CompileStage.CompileS3Request.List,
-                statsInfo.CompileStage.CompileS3Request.Head,
-                statsInfo.CompileStage.CompileS3Request.Put,
-                statsInfo.CompileStage.CompileS3Request.Get,
-                statsInfo.CompileStage.CompileS3Request.Delete,
-                statsInfo.CompileStage.CompileS3Request.DeleteMul,
-            ))
-            buffer.WriteString(fmt.Sprintf("\t\t- Compile TableScan Duration: %dns \n", statsInfo.CompileStage.CompileTableScanDuration))
+            buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %s \n", common.FormatDuration(int64(statsInfo.CompileStage.CompileDuration))))
+
+            // Format S3 request stats; only include non-zero values
+            s3CompileParts := []string{}
+            if statsInfo.CompileStage.CompileS3Request.List > 0 {
+                s3CompileParts = append(s3CompileParts, fmt.Sprintf("S3List:%d", statsInfo.CompileStage.CompileS3Request.List))
+            }
+            if statsInfo.CompileStage.CompileS3Request.Head > 0 {
+                s3CompileParts = append(s3CompileParts, fmt.Sprintf("S3Head:%d", statsInfo.CompileStage.CompileS3Request.Head))
+            }
+            if statsInfo.CompileStage.CompileS3Request.Put > 0 {
+                s3CompileParts = append(s3CompileParts, fmt.Sprintf("S3Put:%d", statsInfo.CompileStage.CompileS3Request.Put))
+            }
+            if statsInfo.CompileStage.CompileS3Request.Get > 0 {
+                s3CompileParts = append(s3CompileParts, fmt.Sprintf("S3Get:%d", statsInfo.CompileStage.CompileS3Request.Get))
+            }
+            if statsInfo.CompileStage.CompileS3Request.Delete > 0 {
+                s3CompileParts = append(s3CompileParts, fmt.Sprintf("S3Delete:%d", statsInfo.CompileStage.CompileS3Request.Delete))
+            }
+            if statsInfo.CompileStage.CompileS3Request.DeleteMul > 0 {
+                s3CompileParts = append(s3CompileParts, fmt.Sprintf("S3DeleteMul:%d", statsInfo.CompileStage.CompileS3Request.DeleteMul))
+            }
+            // Only print the S3 request line if at least one value is non-zero
+            if len(s3CompileParts) > 0 {
+                buffer.WriteString(fmt.Sprintf("\t\t- %s\n", strings.Join(s3CompileParts, ", ")))
+            }
+            buffer.WriteString(fmt.Sprintf("\t\t- Compile TableScan Duration: %s \n", common.FormatDuration(int64(statsInfo.CompileStage.CompileTableScanDuration))))
 
             //-------------------------------------------------------------------------------------------------------
             buffer.WriteString("\tQuery Prepare Exec Stage:\n")
-            buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", gblStats.ScopePrepareTimeConsumed+statsInfo.PrepareRunStage.CompilePreRunOnceDuration-statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock))
-            buffer.WriteString(fmt.Sprintf("\t\t- CompilePreRunOnce Duration: %dns \n", statsInfo.PrepareRunStage.CompilePreRunOnceDuration))
-            buffer.WriteString(fmt.Sprintf("\t\t- PreRunOnce WaitLock: %dns \n", statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock))
-            buffer.WriteString(fmt.Sprintf("\t\t- ScopePrepareTimeConsumed: %dns \n", gblStats.ScopePrepareTimeConsumed))
-            buffer.WriteString(fmt.Sprintf("\t\t- BuildReader Duration: %dns \n", statsInfo.PrepareRunStage.BuildReaderDuration))
-            buffer.WriteString(fmt.Sprintf("\t\t- S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n",
-                statsInfo.PrepareRunStage.ScopePrepareS3Request.List,
-                statsInfo.PrepareRunStage.ScopePrepareS3Request.Head,
-                statsInfo.PrepareRunStage.ScopePrepareS3Request.Put,
-                statsInfo.PrepareRunStage.ScopePrepareS3Request.Get,
-                statsInfo.PrepareRunStage.ScopePrepareS3Request.Delete,
-                statsInfo.PrepareRunStage.ScopePrepareS3Request.DeleteMul,
-            ))
+            buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %s \n", common.FormatDuration(gblStats.ScopePrepareTimeConsumed+statsInfo.PrepareRunStage.CompilePreRunOnceDuration-statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock)))
+            buffer.WriteString(fmt.Sprintf("\t\t- CompilePreRunOnce Duration: %s \n", common.FormatDuration(int64(statsInfo.PrepareRunStage.CompilePreRunOnceDuration))))
+            buffer.WriteString(fmt.Sprintf("\t\t- PreRunOnce WaitLock: %s \n", common.FormatDuration(int64(statsInfo.PrepareRunStage.CompilePreRunOnceWaitLock))))
+            buffer.WriteString(fmt.Sprintf("\t\t- ScopePrepareTimeConsumed: %s \n", common.FormatDuration(gblStats.ScopePrepareTimeConsumed)))
+            buffer.WriteString(fmt.Sprintf("\t\t- BuildReader Duration: %s \n", common.FormatDuration(int64(statsInfo.PrepareRunStage.BuildReaderDuration))))
+
+            // Format S3 request stats; only include non-zero values
+            s3PrepareParts := []string{}
+            if statsInfo.PrepareRunStage.ScopePrepareS3Request.List > 0 {
+                s3PrepareParts = append(s3PrepareParts, fmt.Sprintf("S3List:%d", statsInfo.PrepareRunStage.ScopePrepareS3Request.List))
+            }
+            if statsInfo.PrepareRunStage.ScopePrepareS3Request.Head > 0 {
+                s3PrepareParts = append(s3PrepareParts, fmt.Sprintf("S3Head:%d", statsInfo.PrepareRunStage.ScopePrepareS3Request.Head))
+            }
+            if statsInfo.PrepareRunStage.ScopePrepareS3Request.Put > 0 {
+                s3PrepareParts = append(s3PrepareParts, fmt.Sprintf("S3Put:%d", statsInfo.PrepareRunStage.ScopePrepareS3Request.Put))
+            }
+            if statsInfo.PrepareRunStage.ScopePrepareS3Request.Get > 0 {
+                s3PrepareParts = append(s3PrepareParts, fmt.Sprintf("S3Get:%d", statsInfo.PrepareRunStage.ScopePrepareS3Request.Get))
+            }
+            if statsInfo.PrepareRunStage.ScopePrepareS3Request.Delete > 0 {
+                s3PrepareParts = append(s3PrepareParts, fmt.Sprintf("S3Delete:%d", statsInfo.PrepareRunStage.ScopePrepareS3Request.Delete))
+            }
+            if statsInfo.PrepareRunStage.ScopePrepareS3Request.DeleteMul > 0 {
+                s3PrepareParts = append(s3PrepareParts, fmt.Sprintf("S3DeleteMul:%d", statsInfo.PrepareRunStage.ScopePrepareS3Request.DeleteMul))
+            }
+            // Only print the S3 request line if at least one value is non-zero
+            if len(s3PrepareParts) > 0 {
+                buffer.WriteString(fmt.Sprintf("\t\t- %s\n", strings.Join(s3PrepareParts, ", ")))
+            }
 
             //-------------------------------------------------------------------------------------------------------
             buffer.WriteString("\tQuery Execution Stage:\n")
-            buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %dns \n", gblStats.OperatorTimeConsumed))
-            buffer.WriteString(fmt.Sprintf("\t\t- S3List:%d, S3Head:%d, S3Put:%d, S3Get:%d, S3Delete:%d, S3DeleteMul:%d\n",
-                gblStats.S3ListRequest,
-                gblStats.S3HeadRequest,
-                gblStats.S3PutRequest,
-                gblStats.S3GetRequest,
-                gblStats.S3DeleteRequest,
-                gblStats.S3DeleteMultiRequest,
-            ))
-
-            buffer.WriteString(fmt.Sprintf("\t\t- MemoryUsage: %dB, SpillSize: %dB, DiskI/O: %dB, NewWorkI/O:%dB\n",
-                gblStats.MemorySize,
-                gblStats.SpillSize,
-                gblStats.DiskIOSize,
-                gblStats.NetWorkSize,
-            ))
+            buffer.WriteString(fmt.Sprintf("\t\t- CPU Time: %s \n", common.FormatDuration(gblStats.OperatorTimeConsumed)))
+
+            // Format S3 request stats; only include non-zero values
+            s3ExecParts := []string{}
+            if gblStats.S3ListRequest > 0 {
+                s3ExecParts = append(s3ExecParts, fmt.Sprintf("S3List:%d", gblStats.S3ListRequest))
+            }
+            if gblStats.S3HeadRequest > 0 {
+                s3ExecParts = append(s3ExecParts, fmt.Sprintf("S3Head:%d", gblStats.S3HeadRequest))
+            }
+            if gblStats.S3PutRequest > 0 {
+                s3ExecParts = append(s3ExecParts, fmt.Sprintf("S3Put:%d", gblStats.S3PutRequest))
+            }
+            if gblStats.S3GetRequest > 0 {
+                s3ExecParts = append(s3ExecParts, fmt.Sprintf("S3Get:%d", gblStats.S3GetRequest))
+            }
+            if gblStats.S3DeleteRequest > 0 {
+                s3ExecParts = append(s3ExecParts, fmt.Sprintf("S3Delete:%d", gblStats.S3DeleteRequest))
+            }
+            if gblStats.S3DeleteMultiRequest > 0 {
+                s3ExecParts = append(s3ExecParts, fmt.Sprintf("S3DeleteMul:%d", gblStats.S3DeleteMultiRequest))
+            }
+            // Only print the S3 request line if at least one value is non-zero
+            if len(s3ExecParts) > 0 {
+                buffer.WriteString(fmt.Sprintf("\t\t- %s\n", strings.Join(s3ExecParts, ", ")))
+            }
+
+            // Format MemoryUsage, SpillSize, DiskI/O, NewWorkI/O; only include non-zero values
+            resourceParts := []string{}
+            if gblStats.MemorySize > 0 {
+                resourceParts = append(resourceParts, fmt.Sprintf("MemoryUsage: %s", common.FormatBytes(gblStats.MemorySize)))
+            }
+            if gblStats.SpillSize > 0 {
+                resourceParts = append(resourceParts, fmt.Sprintf("SpillSize: %s", common.FormatBytes(gblStats.SpillSize)))
+            }
+            if gblStats.DiskIOSize > 0 {
+                resourceParts = append(resourceParts, fmt.Sprintf("DiskI/O: %s", common.FormatBytes(gblStats.DiskIOSize)))
+            }
+            if gblStats.NetWorkSize > 0 {
+                resourceParts = append(resourceParts, fmt.Sprintf("NewWorkI/O: %s", common.FormatBytes(gblStats.NetWorkSize)))
+            }
+            // Only print the resource line if at least one value is non-zero
+            if len(resourceParts) > 0 {
+                buffer.WriteString(fmt.Sprintf("\t\t- %s\n", strings.Join(resourceParts, ", ")))
+            }
         }
     //-------------------------------------------------------------------------------------------------------
     buffer.WriteString("Physical Plan Deployment:")
@@ -191,8 +328,12 @@ func explainPhyScope(scope PhyScope, index int, gap int, option ExplainOption, b
     gapNextLine(gap, buffer)
 
     // Scope Header
-    receiverStr := getReceiverStr(scope.Receiver)
-    buffer.WriteString(fmt.Sprintf("Scope %d (Magic: %s, mcpu: %v, Receiver: %s)", index+1, scope.Magic, scope.Mcpu, receiverStr))
+    if len(scope.Receiver) > 0 {
+        receiverStr := getReceiverStr(scope.Receiver)
+        buffer.WriteString(fmt.Sprintf("Scope %d (Magic: %s, mcpu: %v, Receiver: %s)", index+1, scope.Magic, scope.Mcpu, receiverStr))
+    } else {
+        buffer.WriteString(fmt.Sprintf("Scope %d (Magic: %s, mcpu: %v)", index+1, scope.Magic, scope.Mcpu))
+    }
 
     // Scope DataSource
     if scope.DataSource != nil {
@@ -212,6 +353,7 @@ func explainPhyScope(scope PhyScope, index int, gap int, option ExplainOption, b
     for i := range scope.PreScopes {
         explainPhyScope(scope.PreScopes[i], i, gap+4, option, buffer)
     }
+    // Always output the closing brace on a new line with proper indentation
     gapNextLine(gap, buffer)
     buffer.WriteString(" }")
 }
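The net effect on the rendered text is easiest to see in the updated test expectations further below: zero-valued stats disappear, sizes and durations become unit-scaled, and empty receiver lists are dropped from scope headers. Before/after samples taken from those expectations:

```
Overview (before): MemoryUsage:5623960B, SpillSize:0B, DiskI/O:66715122B, NewWorkI/O:0B, RetryTime: 0
Overview (after):  MemoryUsage:5.62MB, DiskI/O:66.72MB, RetryTime: 0

Scope header (before): Scope 3 (Magic: Normal, mcpu: 1, Receiver: [])
Scope header (after):  Scope 3 (Magic: Normal, mcpu: 1)
```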
analyzeStr = fmt.Sprintf(" (idx:%v, isFirst:%v, isLast:%v)", node.NodeIdx, isFirst, isLast) + analyzeStr = fmt.Sprintf(" (%v,%v,%v)", node.NodeIdx, isFirst, isLast) } if option == AnalyzeOption && node.OpStats != nil { analyzeStr += node.OpStats.String() @@ -432,3 +574,123 @@ func ExtractPhyPlanGlbStats(plan *PhyPlan) GblStats { return stats } + +// ExplainPhyPlanOverview generates only the Overview section of the physical plan +func ExplainPhyPlanOverview(phy *PhyPlan, statsInfo *statistic.StatsInfo, option ExplainOption) string { + buffer := bytes.NewBuffer(make([]byte, 0, 300)) + if len(phy.LocalScope) > 0 || len(phy.RemoteScope) > 0 { + explainResourceOverview(phy, statsInfo, option, buffer) + } + return buffer.String() +} + +// ExplainPhyPlanCompressed generates a compressed version with key information but reduced verbosity +func ExplainPhyPlanCompressed(phy *PhyPlan, statsInfo *statistic.StatsInfo, option ExplainOption) string { + buffer := bytes.NewBuffer(make([]byte, 0, 1000)) + + // Always include Overview + if len(phy.LocalScope) > 0 || len(phy.RemoteScope) > 0 { + explainResourceOverview(phy, statsInfo, option, buffer) + } + + buffer.WriteString("\nPhysical Plan Deployment (Compressed):\n") + + if len(phy.LocalScope) > 0 { + buffer.WriteString("LOCAL SCOPES:\n") + for i, scope := range phy.LocalScope { + explainPhyScopeCompressed(scope, i, 0, buffer) + } + } + + if len(phy.RemoteScope) > 0 { + buffer.WriteString("REMOTE SCOPES:\n") + for i, scope := range phy.RemoteScope { + explainPhyScopeCompressed(scope, i, 0, buffer) + } + } + + return buffer.String() +} + +// explainPhyScopeCompressed generates compressed scope information with key metrics only +func explainPhyScopeCompressed(scope PhyScope, scopeIdx int, depth int, buffer *bytes.Buffer) { + indent := strings.Repeat(" ", depth) + + // Scope header with key info + buffer.WriteString(fmt.Sprintf("%sScope %d (%s, mcpu: %d", indent, scopeIdx+1, scope.Magic, scope.Mcpu)) + if len(scope.Receiver) > 0 { + buffer.WriteString(fmt.Sprintf(", Receiver: %v", scope.Receiver)) + } + buffer.WriteString(")\n") + + // DataSource if present + if scope.DataSource != nil && scope.DataSource.SchemaName != "" { + buffer.WriteString(fmt.Sprintf("%s DataSource: %s.%s%s\n", + indent, scope.DataSource.SchemaName, scope.DataSource.RelationName, + formatColumns(scope.DataSource.Attributes))) + } + + // Pipeline - only show operators with significant metrics + if scope.RootOperator != nil { + buffer.WriteString(fmt.Sprintf("%s Pipeline: ", indent)) + explainOperatorChainCompressed(scope.RootOperator, buffer) + buffer.WriteString("\n") + } + + // PreScopes - recursively but more compact + if len(scope.PreScopes) > 0 { + buffer.WriteString(fmt.Sprintf("%s PreScopes: %d scope(s)\n", indent, len(scope.PreScopes))) + for i, preScope := range scope.PreScopes { + explainPhyScopeCompressed(preScope, i, depth+2, buffer) + } + } +} + +// explainOperatorChainCompressed shows operator chain with only key metrics +func explainOperatorChainCompressed(op *PhyOperator, buffer *bytes.Buffer) { + if op == nil { + return + } + + // Show operator with key metrics only + metrics := "" + if op.OpStats != nil { + stats := op.OpStats + // Only show non-zero significant metrics + parts := []string{} + if stats.CallNum > 0 { + parts = append(parts, fmt.Sprintf("Calls:%d", stats.CallNum)) + } + if stats.TimeConsumed > 1000000 { // > 1ms + parts = append(parts, fmt.Sprintf("Time:%s", common.FormatDuration(stats.TimeConsumed))) + } + if stats.InputRows > 0 { + parts = 
diff --git a/pkg/sql/models/show_phyplan_compressed_test.go b/pkg/sql/models/show_phyplan_compressed_test.go
new file mode 100644
index 0000000000000..319d897447140
--- /dev/null
+++ b/pkg/sql/models/show_phyplan_compressed_test.go
@@ -0,0 +1,969 @@
+// Copyright 2024 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package models
+
+import (
+    "bytes"
+    "strings"
+    "testing"
+
+    "github.com/matrixorigin/matrixone/pkg/util/trace/impl/motrace/statistic"
+    "github.com/matrixorigin/matrixone/pkg/vm/process"
+)
+
+// TestFormatColumns tests the formatColumns function with various scenarios
+func TestFormatColumns(t *testing.T) {
+    tests := []struct {
+        name     string
+        columns  []string
+        expected string
+    }{
+        {
+            name:     "empty columns",
+            columns:  []string{},
+            expected: "",
+        },
+        {
+            name:     "nil columns",
+            columns:  nil,
+            expected: "",
+        },
+        {
+            name:     "single column",
+            columns:  []string{"col1"},
+            expected: "[col1]",
+        },
+        {
+            name:     "two columns",
+            columns:  []string{"col1", "col2"},
+            expected: "[col1 col2]",
+        },
+        {
+            name:     "three columns",
+            columns:  []string{"col1", "col2", "col3"},
+            expected: "[col1 col2 col3]",
+        },
+        {
+            name:     "four columns - should truncate",
+            columns:  []string{"col1", "col2", "col3", "col4"},
+            expected: "[col1 col2...+2]",
+        },
+        {
+            name:     "many columns - should truncate",
+            columns:  []string{"col1", "col2", "col3", "col4", "col5", "col6", "col7"},
+            expected: "[col1 col2...+5]",
+        },
+        {
+            name:     "columns with special characters",
+            columns:  []string{"col_1", "col-2", "col.3"},
+            expected: "[col_1 col-2 col.3]",
+        },
+        {
+            name:     "columns with spaces",
+            columns:  []string{"col 1", "col 2"},
+            expected: "[col 1 col 2]",
+        },
+        {
+            name:     "empty string columns",
+            columns:  []string{"", ""},
+            expected: "[ ]",
+        },
+        {
+            name:     "single empty string column",
+            columns:  []string{""},
+            expected: "[]",
+        },
+        {
+            name:     "columns with unicode characters",
+            columns:  []string{"列1", "列2"},
+            expected: "[列1 列2]",
+        },
+        {
+            name:     "very long column names",
+            columns:  []string{"very_long_column_name_1", "very_long_column_name_2", "very_long_column_name_3", "very_long_column_name_4"},
+            expected: "[very_long_column_name_1 very_long_column_name_2...+2]",
+        },
} + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := formatColumns(tt.columns) + if result != tt.expected { + t.Errorf("formatColumns() = %q, want %q", result, tt.expected) + } + }) + } +} + +// TestExplainOperatorChainCompressed tests the explainOperatorChainCompressed function +func TestExplainOperatorChainCompressed(t *testing.T) { + tests := []struct { + name string + operator *PhyOperator + contains []string // strings that should be present in output + notContains []string // strings that should NOT be present in output + }{ + { + name: "nil operator", + operator: nil, + contains: []string{}, + notContains: []string{}, + }, + { + name: "operator without stats", + operator: &PhyOperator{OpName: "TableScan", NodeIdx: 0}, + contains: []string{"TableScan"}, + notContains: []string{}, + }, + { + name: "operator with stats - all metrics below threshold", + operator: &PhyOperator{ + OpName: "Filter", + NodeIdx: 0, + OpStats: &process.OperatorStats{ + CallNum: 0, + TimeConsumed: 500000, // < 1ms threshold + InputRows: 0, + InputSize: 512, // < 1KB threshold + }, + }, + contains: []string{"Filter"}, + notContains: []string{"Calls:", "Time:", "Rows:", "Size:"}, + }, + { + name: "operator with significant stats", + operator: &PhyOperator{ + OpName: "TableScan", + NodeIdx: 0, + OpStats: &process.OperatorStats{ + CallNum: 10, + TimeConsumed: 2000000, // > 1ms + InputRows: 1000, + OutputRows: 950, + InputSize: 2048, // > 1KB + }, + }, + contains: []string{"TableScan", "Calls:10", "Time:", "Rows:1000→950", "Size:"}, + notContains: []string{}, + }, + { + name: "operator chain with children", + operator: &PhyOperator{ + OpName: "Projection", + NodeIdx: 0, + OpStats: &process.OperatorStats{ + CallNum: 5, + TimeConsumed: 1500000, + InputRows: 500, + OutputRows: 500, + }, + Children: []*PhyOperator{ + { + OpName: "Filter", + NodeIdx: 0, + OpStats: &process.OperatorStats{ + CallNum: 5, + TimeConsumed: 1000000, + InputRows: 1000, + OutputRows: 500, + }, + }, + }, + }, + contains: []string{"Projection", "Filter", "→"}, + notContains: []string{}, + }, + { + name: "operator with only CallNum significant", + operator: &PhyOperator{ + OpName: "Group", + NodeIdx: 1, + OpStats: &process.OperatorStats{ + CallNum: 20, + TimeConsumed: 500000, // below threshold + InputRows: 0, + InputSize: 512, // below threshold + }, + }, + contains: []string{"Group", "Calls:20"}, + notContains: []string{"Time:", "Rows:", "Size:"}, + }, + { + name: "operator with only Time significant", + operator: &PhyOperator{ + OpName: "Join", + NodeIdx: 2, + OpStats: &process.OperatorStats{ + CallNum: 0, + TimeConsumed: 5000000, // > 1ms + InputRows: 0, + InputSize: 512, // below threshold + }, + }, + contains: []string{"Join", "Time:"}, + notContains: []string{"Calls:", "Rows:", "Size:"}, + }, + { + name: "operator with only Rows significant", + operator: &PhyOperator{ + OpName: "Sort", + NodeIdx: 3, + OpStats: &process.OperatorStats{ + CallNum: 0, + TimeConsumed: 500000, // below threshold + InputRows: 5000, + OutputRows: 5000, + InputSize: 512, // below threshold + }, + }, + contains: []string{"Sort", "Rows:5000→5000"}, + notContains: []string{"Calls:", "Time:", "Size:"}, + }, + { + name: "operator with only Size significant", + operator: &PhyOperator{ + OpName: "Merge", + NodeIdx: 4, + OpStats: &process.OperatorStats{ + CallNum: 0, + TimeConsumed: 500000, // below threshold + InputRows: 0, + InputSize: 2048, // > 1KB + }, + }, + contains: []string{"Merge", "Size:"}, + notContains: []string{"Calls:", "Time:", 
"Rows:"}, + }, + { + name: "deep operator chain", + operator: &PhyOperator{ + OpName: "Output", + NodeIdx: -1, + Children: []*PhyOperator{ + { + OpName: "Projection", + NodeIdx: 0, + Children: []*PhyOperator{ + { + OpName: "Filter", + NodeIdx: 0, + }, + }, + }, + }, + }, + contains: []string{"Output", "→", "Projection", "→", "Filter"}, + notContains: []string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + buffer := bytes.NewBuffer(make([]byte, 0, 100)) + explainOperatorChainCompressed(tt.operator, buffer) + result := buffer.String() + + // Check for expected strings + for _, expected := range tt.contains { + if !strings.Contains(result, expected) { + t.Errorf("explainOperatorChainCompressed() output should contain %q, got: %q", expected, result) + } + } + + // Check for strings that should NOT be present + for _, notExpected := range tt.notContains { + if strings.Contains(result, notExpected) { + t.Errorf("explainOperatorChainCompressed() output should NOT contain %q, got: %q", notExpected, result) + } + } + }) + } +} + +// TestExplainPhyScopeCompressed tests the explainPhyScopeCompressed function +func TestExplainPhyScopeCompressed(t *testing.T) { + tests := []struct { + name string + scope PhyScope + scopeIdx int + depth int + contains []string + notContains []string + }{ + { + name: "basic scope without datasource", + scope: PhyScope{ + Magic: "Normal", + Mcpu: 4, + }, + scopeIdx: 0, + depth: 0, + contains: []string{"Scope 1", "Normal", "mcpu: 4"}, + notContains: []string{}, + }, + { + name: "scope with receiver", + scope: PhyScope{ + Magic: "Merge", + Mcpu: 8, + Receiver: []PhyReceiver{ + {Idx: 0, RemoteUuid: ""}, + {Idx: 1, RemoteUuid: "uuid-123"}, + }, + }, + scopeIdx: 1, + depth: 0, + contains: []string{"Scope 2", "Merge", "mcpu: 8", "Receiver:"}, + notContains: []string{}, + }, + { + name: "scope with datasource", + scope: PhyScope{ + Magic: "Normal", + Mcpu: 2, + DataSource: &PhySource{ + SchemaName: "test_schema", + RelationName: "test_table", + Attributes: []string{"col1", "col2"}, + }, + }, + scopeIdx: 0, + depth: 0, + contains: []string{"Scope 1", "DataSource:", "test_schema", "test_table", "[col1 col2]"}, + notContains: []string{}, + }, + { + name: "scope with datasource - many columns", + scope: PhyScope{ + Magic: "Normal", + Mcpu: 2, + DataSource: &PhySource{ + SchemaName: "schema", + RelationName: "table", + Attributes: []string{"col1", "col2", "col3", "col4", "col5"}, + }, + }, + scopeIdx: 0, + depth: 0, + contains: []string{"DataSource:", "[col1 col2...+3]"}, + notContains: []string{}, + }, + { + name: "scope with root operator", + scope: PhyScope{ + Magic: "Normal", + Mcpu: 4, + RootOperator: &PhyOperator{ + OpName: "TableScan", + NodeIdx: 0, + OpStats: &process.OperatorStats{ + CallNum: 10, + TimeConsumed: 2000000, + InputRows: 1000, + OutputRows: 950, + }, + }, + }, + scopeIdx: 0, + depth: 0, + contains: []string{"Scope 1", "Pipeline:", "TableScan"}, + notContains: []string{}, + }, + { + name: "scope with prescopes", + scope: PhyScope{ + Magic: "Normal", + Mcpu: 4, + PreScopes: []PhyScope{ + { + Magic: "Merge", + Mcpu: 2, + }, + { + Magic: "Normal", + Mcpu: 2, + }, + }, + }, + scopeIdx: 0, + depth: 0, + contains: []string{"Scope 1", "PreScopes: 2 scope(s)", "Scope 1", "Scope 2"}, + notContains: []string{}, + }, + { + name: "scope with nested prescopes", + scope: PhyScope{ + Magic: "Normal", + Mcpu: 4, + PreScopes: []PhyScope{ + { + Magic: "Merge", + Mcpu: 2, + PreScopes: []PhyScope{ + { + Magic: "Normal", + Mcpu: 1, + }, + }, + }, + 
}, + }, + scopeIdx: 0, + depth: 0, + contains: []string{"PreScopes: 1 scope(s)", "Scope 1"}, + notContains: []string{}, + }, + { + name: "scope with empty datasource schema", + scope: PhyScope{ + Magic: "Normal", + Mcpu: 4, + DataSource: &PhySource{ + SchemaName: "", + RelationName: "table", + Attributes: []string{"col1"}, + }, + }, + scopeIdx: 0, + depth: 0, + notContains: []string{"DataSource:"}, + }, + { + name: "scope with indentation at depth 2", + scope: PhyScope{ + Magic: "Normal", + Mcpu: 4, + }, + scopeIdx: 0, + depth: 2, + contains: []string{"Scope 1"}, + notContains: []string{}, + }, + { + name: "complete scope with all fields", + scope: PhyScope{ + Magic: "Merge", + Mcpu: 8, + Receiver: []PhyReceiver{ + {Idx: 0, RemoteUuid: ""}, + }, + DataSource: &PhySource{ + SchemaName: "db", + RelationName: "users", + Attributes: []string{"id", "name", "email"}, + }, + RootOperator: &PhyOperator{ + OpName: "TableScan", + NodeIdx: 0, + OpStats: &process.OperatorStats{ + CallNum: 5, + TimeConsumed: 1500000, + InputRows: 100, + OutputRows: 100, + }, + }, + PreScopes: []PhyScope{ + { + Magic: "Normal", + Mcpu: 4, + }, + }, + }, + scopeIdx: 0, + depth: 0, + contains: []string{"Scope 1", "Merge", "mcpu: 8", "Receiver:", "DataSource:", "db", "users", "[id name email]", "Pipeline:", "TableScan", "PreScopes: 1 scope(s)"}, + notContains: []string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + buffer := bytes.NewBuffer(make([]byte, 0, 200)) + explainPhyScopeCompressed(tt.scope, tt.scopeIdx, tt.depth, buffer) + result := buffer.String() + + // Check for expected strings + for _, expected := range tt.contains { + if !strings.Contains(result, expected) { + t.Errorf("explainPhyScopeCompressed() output should contain %q, got: %q", expected, result) + } + } + + // Check for strings that should NOT be present + for _, notExpected := range tt.notContains { + if strings.Contains(result, notExpected) { + t.Errorf("explainPhyScopeCompressed() output should NOT contain %q, got: %q", notExpected, result) + } + } + }) + } +} + +// TestExplainPhyPlanCompressed tests the ExplainPhyPlanCompressed function +func TestExplainPhyPlanCompressed(t *testing.T) { + // Create test data + operatorStats := &process.OperatorStats{ + CallNum: 10, + TimeConsumed: 2000000, + InputRows: 1000, + OutputRows: 950, + InputSize: 2048, + } + + rootOp := &PhyOperator{ + OpName: "TableScan", + NodeIdx: 0, + OpStats: operatorStats, + } + + scopeWithDataSource := PhyScope{ + Magic: "Normal", + Mcpu: 4, + DataSource: &PhySource{ + SchemaName: "test_schema", + RelationName: "test_table", + Attributes: []string{"col1", "col2", "col3"}, + }, + RootOperator: rootOp, + } + + scopeWithReceiver := PhyScope{ + Magic: "Merge", + Mcpu: 8, + Receiver: []PhyReceiver{ + {Idx: 0, RemoteUuid: ""}, + }, + RootOperator: &PhyOperator{ + OpName: "Projection", + NodeIdx: 1, + OpStats: operatorStats, + Children: []*PhyOperator{rootOp}, + }, + } + + scopeWithPreScopes := PhyScope{ + Magic: "Normal", + Mcpu: 4, + PreScopes: []PhyScope{ + scopeWithDataSource, + }, + RootOperator: &PhyOperator{ + OpName: "Output", + NodeIdx: -1, + }, + } + + tests := []struct { + name string + plan *PhyPlan + statsInfo *statistic.StatsInfo + option ExplainOption + contains []string + notContains []string + }{ + { + name: "empty plan", + plan: NewPhyPlan(), + statsInfo: nil, + option: NormalOption, + contains: []string{"Physical Plan Deployment (Compressed):"}, + notContains: []string{"LOCAL SCOPES:", "REMOTE SCOPES:"}, + }, + { + name: "plan with 
local scope only", + plan: &PhyPlan{ + Version: "1.0", + RetryTime: 0, + LocalScope: []PhyScope{scopeWithDataSource}, + }, + statsInfo: nil, + option: NormalOption, + contains: []string{"LOCAL SCOPES:", "Scope 1", "test_schema", "test_table", "TableScan"}, + notContains: []string{}, + }, + { + name: "plan with remote scope only", + plan: &PhyPlan{ + Version: "1.0", + RetryTime: 0, + RemoteScope: []PhyScope{scopeWithReceiver}, + }, + statsInfo: nil, + option: NormalOption, + contains: []string{"REMOTE SCOPES:", "Scope 1", "Merge", "Projection"}, + notContains: []string{}, + }, + { + name: "plan with both local and remote scopes", + plan: &PhyPlan{ + Version: "1.0", + RetryTime: 0, + LocalScope: []PhyScope{scopeWithDataSource}, + RemoteScope: []PhyScope{scopeWithReceiver}, + }, + statsInfo: nil, + option: NormalOption, + contains: []string{"LOCAL SCOPES:", "REMOTE SCOPES:", "Scope 1"}, + notContains: []string{}, + }, + { + name: "plan with prescopes", + plan: &PhyPlan{ + Version: "1.0", + RetryTime: 0, + LocalScope: []PhyScope{scopeWithPreScopes}, + }, + statsInfo: nil, + option: NormalOption, + contains: []string{"LOCAL SCOPES:", "PreScopes: 1 scope(s)", "test_schema", "test_table"}, + notContains: []string{}, + }, + { + name: "plan with multiple local scopes", + plan: &PhyPlan{ + Version: "1.0", + RetryTime: 0, + LocalScope: []PhyScope{scopeWithDataSource, scopeWithReceiver}, + }, + statsInfo: nil, + option: NormalOption, + contains: []string{"LOCAL SCOPES:", "Scope 1", "Scope 2"}, + notContains: []string{}, + }, + { + name: "plan with stats info and verbose option", + plan: &PhyPlan{ + Version: "1.0", + RetryTime: 1, + LocalScope: []PhyScope{scopeWithDataSource}, + }, + statsInfo: func() *statistic.StatsInfo { + s := &statistic.StatsInfo{} + s.ParseStage.ParseDuration = 1000000 + s.PlanStage.PlanDuration = 2000000 + s.CompileStage.CompileDuration = 500000 + return s + }(), + option: VerboseOption, + contains: []string{"Overview:", "LOCAL SCOPES:", "Physical Plan Deployment"}, + notContains: []string{}, + }, + { + name: "plan with stats info and analyze option", + plan: &PhyPlan{ + Version: "1.0", + RetryTime: 2, + LocalScope: []PhyScope{scopeWithDataSource}, + }, + statsInfo: func() *statistic.StatsInfo { + s := &statistic.StatsInfo{} + s.ParseStage.ParseDuration = 1000000 + s.PlanStage.PlanDuration = 2000000 + s.CompileStage.CompileDuration = 500000 + return s + }(), + option: AnalyzeOption, + contains: []string{"Overview:", "LOCAL SCOPES:", "Physical Plan Deployment"}, + notContains: []string{}, + }, + { + name: "plan with scope containing many columns", + plan: &PhyPlan{ + Version: "1.0", + RetryTime: 0, + LocalScope: []PhyScope{ + { + Magic: "Normal", + Mcpu: 4, + DataSource: &PhySource{ + SchemaName: "schema", + RelationName: "table", + Attributes: []string{"col1", "col2", "col3", "col4", "col5", "col6"}, + }, + }, + }, + }, + statsInfo: nil, + option: NormalOption, + contains: []string{"[col1 col2...+4]"}, + notContains: []string{}, + }, + { + name: "plan with operator chain", + plan: &PhyPlan{ + Version: "1.0", + RetryTime: 0, + LocalScope: []PhyScope{ + { + Magic: "Normal", + Mcpu: 4, + RootOperator: &PhyOperator{ + OpName: "Output", + NodeIdx: -1, + Children: []*PhyOperator{ + { + OpName: "Projection", + NodeIdx: 0, + OpStats: &process.OperatorStats{ + CallNum: 5, + TimeConsumed: 1500000, + InputRows: 100, + OutputRows: 100, + }, + Children: []*PhyOperator{ + { + OpName: "Filter", + NodeIdx: 0, + OpStats: &process.OperatorStats{ + CallNum: 5, + TimeConsumed: 1000000, + InputRows: 
200, + OutputRows: 100, + }, + }, + }, + }, + }, + }, + }, + }, + }, + statsInfo: nil, + option: NormalOption, + contains: []string{"Output", "→", "Projection", "→", "Filter"}, + notContains: []string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := ExplainPhyPlanCompressed(tt.plan, tt.statsInfo, tt.option) + + // Check for expected strings + for _, expected := range tt.contains { + if !strings.Contains(result, expected) { + t.Errorf("ExplainPhyPlanCompressed() output should contain %q, got: %q", expected, result) + } + } + + // Check for strings that should NOT be present + for _, notExpected := range tt.notContains { + if strings.Contains(result, notExpected) { + t.Errorf("ExplainPhyPlanCompressed() output should NOT contain %q, got: %q", notExpected, result) + } + } + + // Verify that the result always contains the compressed header + if !strings.Contains(result, "Physical Plan Deployment (Compressed):") { + t.Error("ExplainPhyPlanCompressed() output should always contain 'Physical Plan Deployment (Compressed):'") + } + }) + } +} + +// TestExplainPhyPlanCompressedEdgeCases tests edge cases and boundary conditions +func TestExplainPhyPlanCompressedEdgeCases(t *testing.T) { + t.Run("nil plan", func(t *testing.T) { + // Note: The function doesn't handle nil plan, so we skip this test + // or test that it panics as expected + defer func() { + if r := recover(); r == nil { + t.Error("ExplainPhyPlanCompressed() should panic on nil plan") + } + }() + ExplainPhyPlanCompressed(nil, nil, NormalOption) + }) + + t.Run("plan with empty scopes", func(t *testing.T) { + plan := &PhyPlan{ + Version: "1.0", + LocalScope: []PhyScope{}, + RemoteScope: []PhyScope{}, + } + result := ExplainPhyPlanCompressed(plan, nil, NormalOption) + if !strings.Contains(result, "Physical Plan Deployment (Compressed):") { + t.Error("ExplainPhyPlanCompressed() should output header even with empty scopes") + } + }) + + t.Run("scope with nil root operator", func(t *testing.T) { + plan := &PhyPlan{ + Version: "1.0", + LocalScope: []PhyScope{ + { + Magic: "Normal", + Mcpu: 4, + RootOperator: nil, + }, + }, + } + result := ExplainPhyPlanCompressed(plan, nil, NormalOption) + if !strings.Contains(result, "Scope 1") { + t.Error("ExplainPhyPlanCompressed() should handle nil root operator") + } + if strings.Contains(result, "Pipeline:") { + t.Error("ExplainPhyPlanCompressed() should not output Pipeline when root operator is nil") + } + }) + + t.Run("scope with empty prescopes", func(t *testing.T) { + plan := &PhyPlan{ + Version: "1.0", + LocalScope: []PhyScope{ + { + Magic: "Normal", + Mcpu: 4, + PreScopes: []PhyScope{}, + }, + }, + } + result := ExplainPhyPlanCompressed(plan, nil, NormalOption) + if strings.Contains(result, "PreScopes:") { + t.Error("ExplainPhyPlanCompressed() should not output PreScopes when empty") + } + }) + + t.Run("scope with nil datasource", func(t *testing.T) { + plan := &PhyPlan{ + Version: "1.0", + LocalScope: []PhyScope{ + { + Magic: "Normal", + Mcpu: 4, + DataSource: nil, + }, + }, + } + result := ExplainPhyPlanCompressed(plan, nil, NormalOption) + if !strings.Contains(result, "Scope 1") { + t.Error("ExplainPhyPlanCompressed() should handle nil datasource") + } + if strings.Contains(result, "DataSource:") { + t.Error("ExplainPhyPlanCompressed() should not output DataSource when nil") + } + }) + + t.Run("scope with empty receiver", func(t *testing.T) { + plan := &PhyPlan{ + Version: "1.0", + LocalScope: []PhyScope{ + { + Magic: "Normal", + Mcpu: 4, + Receiver: 
[]PhyReceiver{}, + }, + }, + } + result := ExplainPhyPlanCompressed(plan, nil, NormalOption) + if !strings.Contains(result, "Scope 1") { + t.Error("ExplainPhyPlanCompressed() should handle empty receiver") + } + }) + + t.Run("operator with multiple children - only first child shown", func(t *testing.T) { + plan := &PhyPlan{ + Version: "1.0", + LocalScope: []PhyScope{ + { + Magic: "Normal", + Mcpu: 4, + RootOperator: &PhyOperator{ + OpName: "Union", + NodeIdx: 0, + Children: []*PhyOperator{ + { + OpName: "TableScan1", + NodeIdx: 0, + }, + { + OpName: "TableScan2", + NodeIdx: 1, + }, + }, + }, + }, + }, + } + result := ExplainPhyPlanCompressed(plan, nil, NormalOption) + if !strings.Contains(result, "Union") { + t.Error("ExplainPhyPlanCompressed() should show root operator") + } + if !strings.Contains(result, "TableScan1") { + t.Error("ExplainPhyPlanCompressed() should show first child") + } + // Note: The compressed version only shows the first child, so TableScan2 might not appear + }) + + t.Run("operator with stats at exact threshold values", func(t *testing.T) { + plan := &PhyPlan{ + Version: "1.0", + LocalScope: []PhyScope{ + { + Magic: "Normal", + Mcpu: 4, + RootOperator: &PhyOperator{ + OpName: "TableScan", + NodeIdx: 0, + OpStats: &process.OperatorStats{ + TimeConsumed: 1000000, // exactly 1ms threshold + InputSize: 1024, // exactly 1KB threshold + }, + }, + }, + }, + } + result := ExplainPhyPlanCompressed(plan, nil, NormalOption) + if !strings.Contains(result, "TableScan") { + t.Error("ExplainPhyPlanCompressed() should show operator name") + } + // Time and Size should not appear as they are at threshold (not > threshold) + if strings.Contains(result, "Time:") { + t.Error("ExplainPhyPlanCompressed() should not show Time when exactly at threshold") + } + if strings.Contains(result, "Size:") { + t.Error("ExplainPhyPlanCompressed() should not show Size when exactly at threshold") + } + }) + + t.Run("operator with stats just above threshold", func(t *testing.T) { + plan := &PhyPlan{ + Version: "1.0", + LocalScope: []PhyScope{ + { + Magic: "Normal", + Mcpu: 4, + RootOperator: &PhyOperator{ + OpName: "TableScan", + NodeIdx: 0, + OpStats: &process.OperatorStats{ + TimeConsumed: 1000001, // just above 1ms threshold + InputSize: 1025, // just above 1KB threshold + }, + }, + }, + }, + } + result := ExplainPhyPlanCompressed(plan, nil, NormalOption) + if !strings.Contains(result, "TableScan") { + t.Error("ExplainPhyPlanCompressed() should show operator name") + } + if !strings.Contains(result, "Time:") { + t.Error("ExplainPhyPlanCompressed() should show Time when just above threshold") + } + if !strings.Contains(result, "Size:") { + t.Error("ExplainPhyPlanCompressed() should show Size when just above threshold") + } + }) +} diff --git a/pkg/sql/plan/function/func_mo_explain_phy_test.go b/pkg/sql/plan/function/func_mo_explain_phy_test.go index 94ff8d88ce095..8b03dc50de4bb 100644 --- a/pkg/sql/plan/function/func_mo_explain_phy_test.go +++ b/pkg/sql/plan/function/func_mo_explain_phy_test.go @@ -875,7 +875,7 @@ Scope 1 (Magic: Merge, mcpu: 1, Receiver: [0]) └── MergeGroup └── Merge PreScopes: { - Scope 1 (Magic: Remote, mcpu: 4, Receiver: []) + Scope 1 (Magic: Remote, mcpu: 4) DataSource: cloud_device.real_time_position[time_stamp distance] Pipeline: └── Connector to MergeReceiver 0 └── Group @@ -883,30 +883,30 @@ Scope 1 (Magic: Merge, mcpu: 1, Receiver: [0]) └── Filter └── TableScan PreScopes: { - Scope 1 (Magic: Normal, mcpu: 0, Receiver: []) + Scope 1 (Magic: Normal, mcpu: 0) PreScopes: { - 
Scope 1 (Magic: Normal, mcpu: 1, Receiver: []) + Scope 1 (Magic: Normal, mcpu: 1) DataSource: cloud_device.real_time_position[time_stamp distance] Pipeline: └── Connector to MergeReceiver 0 └── Group └── Projection └── Filter └── TableScan - Scope 2 (Magic: Normal, mcpu: 1, Receiver: []) + Scope 2 (Magic: Normal, mcpu: 1) DataSource: cloud_device.real_time_position[time_stamp distance] Pipeline: └── Connector to MergeReceiver 0 └── Group └── Projection └── Filter └── TableScan - Scope 3 (Magic: Normal, mcpu: 1, Receiver: []) + Scope 3 (Magic: Normal, mcpu: 1) DataSource: cloud_device.real_time_position[time_stamp distance] Pipeline: └── Connector to MergeReceiver 0 └── Group └── Projection └── Filter └── TableScan - Scope 4 (Magic: Normal, mcpu: 1, Receiver: []) + Scope 4 (Magic: Normal, mcpu: 1) DataSource: cloud_device.real_time_position[time_stamp distance] Pipeline: └── Connector to MergeReceiver 0 └── Group @@ -917,140 +917,133 @@ Scope 1 (Magic: Merge, mcpu: 1, Receiver: [0]) } }` explain_verbose_res := `Overview: - MemoryUsage:5623960B, SpillSize:0B, DiskI/O:66715122B, NewWorkI/O:0B, RetryTime: 0 - S3List:0, S3Head:0, S3Put:0, S3Get:0, S3Delete:0, S3DeleteMul:0, S3InputEstByRows((0+0)/8192):0.0000 + MemoryUsage:5.62MB, DiskI/O:66.72MB, RetryTime: 0 CPU Usage: - - Total CPU Time: 91033157ns - - CPU Time Detail: Parse(162026)+BuildPlan(649910)+Compile(299370)+PhyExec(304651393)+PrepareRun(178265)-PreRunWaitLock(0)-PlanStatsIO(0)-IOAccess(214917833)-IOMerge(0) + - Total CPU Time: 91.03ms + - CPU Time Detail: Parse(0.16ms)+BuildPlan(0.65ms)+Compile(0.30ms)+PhyExec(304.65ms)+PrepareRun(0.18ms)-PreRunWaitLock(0ns)-PlanStatsIO(0ns)-IOAccess(214.92ms)-IOMerge(0ns) - Permission Authentication Stats Array: [0 0 0 0 0 0 0 0 0 0 0] Physical Plan Deployment: LOCAL SCOPES: Scope 1 (Magic: Merge, mcpu: 1, Receiver: [0]) - Pipeline: └── Output (idx:-1, isFirst:false, isLast:false) - └── Projection (idx:2, isFirst:true, isLast:true) - └── Projection (idx:1, isFirst:false, isLast:true) - └── MergeGroup (idx:1, isFirst:false, isLast:false) - └── Merge (idx:1, isFirst:false, isLast:false) + Pipeline: └── Output (-1,false,false) + └── Projection (2,true,true) + └── Projection (1,false,true) + └── MergeGroup (1,false,false) + └── Merge (1,false,false) PreScopes: { - Scope 1 (Magic: Remote, mcpu: 4, Receiver: []) + Scope 1 (Magic: Remote, mcpu: 4) DataSource: cloud_device.real_time_position[time_stamp distance] - Pipeline: └── Connector (idx:1, isFirst:false, isLast:false) to MergeReceiver 0 - └── Group (idx:1, isFirst:true, isLast:false) - └── Projection (idx:0, isFirst:false, isLast:true) - └── Filter (idx:0, isFirst:false, isLast:false) - └── TableScan (idx:0, isFirst:true, isLast:false) + Pipeline: └── Connector (1,false,false) to MergeReceiver 0 + └── Group (1,true,false) + └── Projection (0,false,true) + └── Filter (0,false,false) + └── TableScan (0,true,false) PreScopes: { - Scope 1 (Magic: Normal, mcpu: 0, Receiver: []) + Scope 1 (Magic: Normal, mcpu: 0) PreScopes: { - Scope 1 (Magic: Normal, mcpu: 1, Receiver: []) + Scope 1 (Magic: Normal, mcpu: 1) DataSource: cloud_device.real_time_position[time_stamp distance] - Pipeline: └── Connector (idx:1, isFirst:false, isLast:false) to MergeReceiver 0 - └── Group (idx:1, isFirst:true, isLast:false) - └── Projection (idx:0, isFirst:false, isLast:true) - └── Filter (idx:0, isFirst:false, isLast:false) - └── TableScan (idx:0, isFirst:true, isLast:false) - Scope 2 (Magic: Normal, mcpu: 1, Receiver: []) + Pipeline: └── Connector (1,false,false) to 
MergeReceiver 0 + └── Group (1,true,false) + └── Projection (0,false,true) + └── Filter (0,false,false) + └── TableScan (0,true,false) + Scope 2 (Magic: Normal, mcpu: 1) DataSource: cloud_device.real_time_position[time_stamp distance] - Pipeline: └── Connector (idx:1, isFirst:false, isLast:false) to MergeReceiver 0 - └── Group (idx:1, isFirst:true, isLast:false) - └── Projection (idx:0, isFirst:false, isLast:true) - └── Filter (idx:0, isFirst:false, isLast:false) - └── TableScan (idx:0, isFirst:true, isLast:false) - Scope 3 (Magic: Normal, mcpu: 1, Receiver: []) + Pipeline: └── Connector (1,false,false) to MergeReceiver 0 + └── Group (1,true,false) + └── Projection (0,false,true) + └── Filter (0,false,false) + └── TableScan (0,true,false) + Scope 3 (Magic: Normal, mcpu: 1) DataSource: cloud_device.real_time_position[time_stamp distance] - Pipeline: └── Connector (idx:1, isFirst:false, isLast:false) to MergeReceiver 0 - └── Group (idx:1, isFirst:true, isLast:false) - └── Projection (idx:0, isFirst:false, isLast:true) - └── Filter (idx:0, isFirst:false, isLast:false) - └── TableScan (idx:0, isFirst:true, isLast:false) - Scope 4 (Magic: Normal, mcpu: 1, Receiver: []) + Pipeline: └── Connector (1,false,false) to MergeReceiver 0 + └── Group (1,true,false) + └── Projection (0,false,true) + └── Filter (0,false,false) + └── TableScan (0,true,false) + Scope 4 (Magic: Normal, mcpu: 1) DataSource: cloud_device.real_time_position[time_stamp distance] - Pipeline: └── Connector (idx:1, isFirst:false, isLast:false) to MergeReceiver 0 - └── Group (idx:1, isFirst:true, isLast:false) - └── Projection (idx:0, isFirst:false, isLast:true) - └── Filter (idx:0, isFirst:false, isLast:false) - └── TableScan (idx:0, isFirst:true, isLast:false) + Pipeline: └── Connector (1,false,false) to MergeReceiver 0 + └── Group (1,true,false) + └── Projection (0,false,true) + └── Filter (0,false,false) + └── TableScan (0,true,false) } } }` explain_analyze_res := `Overview: - MemoryUsage:5623960B, SpillSize:0B, DiskI/O:66715122B, NewWorkI/O:0B, RetryTime: 0 - S3List:0, S3Head:0, S3Put:0, S3Get:0, S3Delete:0, S3DeleteMul:0, S3InputEstByRows((0+0)/8192):0.0000 + MemoryUsage:5.62MB, DiskI/O:66.72MB, RetryTime: 0 CPU Usage: - - Total CPU Time: 91033157ns - - CPU Time Detail: Parse(162026)+BuildPlan(649910)+Compile(299370)+PhyExec(304651393)+PrepareRun(178265)-PreRunWaitLock(0)-PlanStatsIO(0)-IOAccess(214917833)-IOMerge(0) + - Total CPU Time: 91.03ms + - CPU Time Detail: Parse(0.16ms)+BuildPlan(0.65ms)+Compile(0.30ms)+PhyExec(304.65ms)+PrepareRun(0.18ms)-PreRunWaitLock(0ns)-PlanStatsIO(0ns)-IOAccess(214.92ms)-IOMerge(0ns) - Permission Authentication Stats Array: [0 0 0 0 0 0 0 0 0 0 0] Query Build Plan Stage: - - CPU Time: 649910ns - - S3List:0, S3Head:0, S3Put:0, S3Get:0, S3Delete:0, S3DeleteMul:0 - - Build Plan Duration: 649910ns - - Call Stats Duration: 3457758ns + - CPU Time: 0.65ms + - Build Plan Duration: 0.65ms + - Call Stats Duration: 3.46ms - Call StatsInCache Duration: 0ns - Call Stats IO Consumption: 0ns - - Call Stats S3List:0, S3Head:0, S3Put:0, S3Get:0, S3Delete:0, S3DeleteMul:0 Query Compile Stage: - - CPU Time: 299370ns - - S3List:0, S3Head:0, S3Put:0, S3Get:0, S3Delete:0, S3DeleteMul:0 - - Compile TableScan Duration: 33791ns + - CPU Time: 0.30ms + - Compile TableScan Duration: 0.03ms Query Prepare Exec Stage: - - CPU Time: 178265ns - - CompilePreRunOnce Duration: 12456ns + - CPU Time: 0.18ms + - CompilePreRunOnce Duration: 0.01ms - PreRunOnce WaitLock: 0ns - - ScopePrepareTimeConsumed: 165809ns - - BuildReader 
Duration: 144702ns - - S3List:0, S3Head:0, S3Put:0, S3Get:0, S3Delete:0, S3DeleteMul:0 + - ScopePrepareTimeConsumed: 0.17ms + - BuildReader Duration: 0.14ms Query Execution Stage: - - CPU Time: 304651393ns - - S3List:0, S3Head:0, S3Put:0, S3Get:0, S3Delete:0, S3DeleteMul:0 - - MemoryUsage: 5623960B, SpillSize: 0B, DiskI/O: 66715122B, NewWorkI/O:0B + - CPU Time: 304.65ms + - MemoryUsage: 5.62MB, DiskI/O: 66.72MB Physical Plan Deployment: LOCAL SCOPES: Scope 1 (Magic: Merge, mcpu: 1, Receiver: [0]) - Pipeline: └── Output (idx:-1, isFirst:false, isLast:false) CallNum:2 TimeCost:21883ns WaitTime:0ns InRows:0 OutRows:0 InSize:0bytes InBlock:0 OutSize:0bytes MemSize:0bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes - └── Projection (idx:2, isFirst:true, isLast:true) CallNum:2 TimeCost:15439ns WaitTime:0ns InRows:1 OutRows:1 InSize:8bytes InBlock:0 OutSize:8bytes MemSize:8bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes - └── Projection (idx:1, isFirst:false, isLast:true) CallNum:2 TimeCost:1094ns WaitTime:0ns InRows:0 OutRows:1 InSize:0bytes InBlock:0 OutSize:8bytes MemSize:8bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes - └── MergeGroup (idx:1, isFirst:false, isLast:false) CallNum:2 TimeCost:15712ns WaitTime:0ns InRows:0 OutRows:0 InSize:0bytes InBlock:0 OutSize:0bytes MemSize:8bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes - └── Merge (idx:1, isFirst:false, isLast:false) CallNum:5 TimeCost:29182ns WaitTime:81917495ns InRows:0 OutRows:0 InSize:0bytes InBlock:0 OutSize:0bytes MemSize:0bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes + Pipeline: └── Output (-1,false,false) CallNum:2 TimeCost:0.02ms WaitTime:0ns InRows:0 OutRows:0 InSize:0B InBlock:0 + └── Projection (2,true,true) CallNum:2 TimeCost:0.02ms WaitTime:0ns InRows:1 OutRows:1 InSize:8B InBlock:0 OutSize:8B MemSize:8B + └── Projection (1,false,true) CallNum:2 TimeCost:0.00ms WaitTime:0ns InRows:0 OutRows:1 InSize:0B InBlock:0 OutSize:8B MemSize:8B + └── MergeGroup (1,false,false) CallNum:2 TimeCost:0.02ms WaitTime:0ns InRows:0 OutRows:0 InSize:0B InBlock:0 MemSize:8B + └── Merge (1,false,false) CallNum:5 TimeCost:0.03ms WaitTime:81.92ms InRows:0 OutRows:0 InSize:0B InBlock:0 PreScopes: { - Scope 1 (Magic: Remote, mcpu: 4, Receiver: []) + Scope 1 (Magic: Remote, mcpu: 4) DataSource: cloud_device.real_time_position[time_stamp distance] - Pipeline: └── Connector (idx:1, isFirst:false, isLast:false) to MergeReceiver 0 - └── Group (idx:1, isFirst:true, isLast:false) - └── Projection (idx:0, isFirst:false, isLast:true) - └── Filter (idx:0, isFirst:false, isLast:false) - └── TableScan (idx:0, isFirst:true, isLast:false) + Pipeline: └── Connector (1,false,false) to MergeReceiver 0 + └── Group (1,true,false) + └── Projection (0,false,true) + └── Filter (0,false,false) + └── TableScan (0,true,false) PreScopes: { - Scope 1 (Magic: Normal, mcpu: 0, Receiver: []) + Scope 1 (Magic: Normal, mcpu: 0) PreScopes: { - Scope 1 (Magic: Normal, mcpu: 1, Receiver: []) + Scope 1 (Magic: Normal, mcpu: 1) DataSource: cloud_device.real_time_position[time_stamp distance] - Pipeline: └── Connector (idx:1, isFirst:false, isLast:false) CallNum:2 TimeCost:22677ns WaitTime:0ns InRows:0 OutRows:0 InSize:0bytes InBlock:0 OutSize:0bytes MemSize:0bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes to MergeReceiver 0 - └── Group (idx:1, isFirst:true, isLast:false) CallNum:2 TimeCost:338784ns WaitTime:0ns InRows:15593 OutRows:0 
InSize:124744bytes InBlock:0 OutSize:0bytes MemSize:0bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes - └── Projection (idx:0, isFirst:false, isLast:true) CallNum:153 TimeCost:61454ns WaitTime:0ns InRows:0 OutRows:15593 InSize:0bytes InBlock:0 OutSize:124744bytes MemSize:29800bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes - └── Filter (idx:0, isFirst:false, isLast:false) CallNum:153 TimeCost:16747009ns WaitTime:0ns InRows:0 OutRows:0 InSize:0bytes InBlock:0 OutSize:0bytes MemSize:1245184bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes - └── TableScan (idx:0, isFirst:true, isLast:false) CallNum:153 TimeCost:58472333ns WaitTime:0ns InRows:1245184 OutRows:0 InSize:19922944bytes InBlock:0 OutSize:0bytes MemSize:131072bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:16491709bytes - Scope 2 (Magic: Normal, mcpu: 1, Receiver: []) + Pipeline: └── Connector (1,false,false) CallNum:2 TimeCost:0.02ms WaitTime:0ns InRows:0 OutRows:0 InSize:0B InBlock:0 to MergeReceiver 0 + └── Group (1,true,false) CallNum:2 TimeCost:0.34ms WaitTime:0ns InRows:15593 OutRows:0 InSize:124.74KB InBlock:0 + └── Projection (0,false,true) CallNum:153 TimeCost:0.06ms WaitTime:0ns InRows:0 OutRows:15593 InSize:0B InBlock:0 OutSize:124.74KB MemSize:29.80KB + └── Filter (0,false,false) CallNum:153 TimeCost:16.75ms WaitTime:0ns InRows:0 OutRows:0 InSize:0B InBlock:0 MemSize:1.25MB + └── TableScan (0,true,false) CallNum:153 TimeCost:58.47ms WaitTime:0ns InRows:1245184 OutRows:0 InSize:19.92MB InBlock:0 MemSize:131.07KB + Scope 2 (Magic: Normal, mcpu: 1) DataSource: cloud_device.real_time_position[time_stamp distance] - Pipeline: └── Connector (idx:1, isFirst:false, isLast:false) CallNum:2 TimeCost:9887ns WaitTime:0ns InRows:0 OutRows:0 InSize:0bytes InBlock:0 OutSize:0bytes MemSize:0bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes to MergeReceiver 0 - └── Group (idx:1, isFirst:true, isLast:false) CallNum:2 TimeCost:319777ns WaitTime:0ns InRows:15405 OutRows:0 InSize:123240bytes InBlock:0 OutSize:0bytes MemSize:0bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes - └── Projection (idx:0, isFirst:false, isLast:true) CallNum:154 TimeCost:66538ns WaitTime:0ns InRows:0 OutRows:15405 InSize:0bytes InBlock:0 OutSize:123240bytes MemSize:18864bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes - └── Filter (idx:0, isFirst:false, isLast:false) CallNum:154 TimeCost:18563940ns WaitTime:0ns InRows:0 OutRows:0 InSize:0bytes InBlock:0 OutSize:0bytes MemSize:1253376bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes - └── TableScan (idx:0, isFirst:true, isLast:false) CallNum:154 TimeCost:61209774ns WaitTime:0ns InRows:1253376 OutRows:0 InSize:20054016bytes InBlock:0 OutSize:0bytes MemSize:131072bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:16797879bytes - Scope 3 (Magic: Normal, mcpu: 1, Receiver: []) + Pipeline: └── Connector (1,false,false) CallNum:2 TimeCost:0.01ms WaitTime:0ns InRows:0 OutRows:0 InSize:0B InBlock:0 to MergeReceiver 0 + └── Group (1,true,false) CallNum:2 TimeCost:0.32ms WaitTime:0ns InRows:15405 OutRows:0 InSize:123.24KB InBlock:0 + └── Projection (0,false,true) CallNum:154 TimeCost:0.07ms WaitTime:0ns InRows:0 OutRows:15405 InSize:0B InBlock:0 OutSize:123.24KB MemSize:18.86KB + └── Filter (0,false,false) CallNum:154 TimeCost:18.56ms WaitTime:0ns InRows:0 OutRows:0 InSize:0B InBlock:0 MemSize:1.25MB + └── TableScan (0,true,false) CallNum:154 
TimeCost:61.21ms WaitTime:0ns InRows:1253376 OutRows:0 InSize:20.05MB InBlock:0 MemSize:131.07KB + Scope 3 (Magic: Normal, mcpu: 1) DataSource: cloud_device.real_time_position[time_stamp distance] - Pipeline: └── Connector (idx:1, isFirst:false, isLast:false) CallNum:2 TimeCost:2950ns WaitTime:0ns InRows:0 OutRows:0 InSize:0bytes InBlock:0 OutSize:0bytes MemSize:0bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes to MergeReceiver 0 - └── Group (idx:1, isFirst:true, isLast:false) CallNum:2 TimeCost:396141ns WaitTime:0ns InRows:19167 OutRows:0 InSize:153336bytes InBlock:0 OutSize:0bytes MemSize:0bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes - └── Projection (idx:0, isFirst:false, isLast:true) CallNum:154 TimeCost:82273ns WaitTime:0ns InRows:0 OutRows:19167 InSize:0bytes InBlock:0 OutSize:153336bytes MemSize:24624bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes - └── Filter (idx:0, isFirst:false, isLast:false) CallNum:154 TimeCost:17764543ns WaitTime:0ns InRows:0 OutRows:0 InSize:0bytes InBlock:0 OutSize:0bytes MemSize:1253376bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes - └── TableScan (idx:0, isFirst:true, isLast:false) CallNum:154 TimeCost:58087865ns WaitTime:0ns InRows:1253376 OutRows:0 InSize:20054016bytes InBlock:0 OutSize:0bytes MemSize:131072bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:16717018bytes - Scope 4 (Magic: Normal, mcpu: 1, Receiver: []) + Pipeline: └── Connector (1,false,false) CallNum:2 TimeCost:0.00ms WaitTime:0ns InRows:0 OutRows:0 InSize:0B InBlock:0 to MergeReceiver 0 + └── Group (1,true,false) CallNum:2 TimeCost:0.40ms WaitTime:0ns InRows:19167 OutRows:0 InSize:153.34KB InBlock:0 + └── Projection (0,false,true) CallNum:154 TimeCost:0.08ms WaitTime:0ns InRows:0 OutRows:19167 InSize:0B InBlock:0 OutSize:153.34KB MemSize:24.62KB + └── Filter (0,false,false) CallNum:154 TimeCost:17.76ms WaitTime:0ns InRows:0 OutRows:0 InSize:0B InBlock:0 MemSize:1.25MB + └── TableScan (0,true,false) CallNum:154 TimeCost:58.09ms WaitTime:0ns InRows:1253376 OutRows:0 InSize:20.05MB InBlock:0 MemSize:131.07KB + Scope 4 (Magic: Normal, mcpu: 1) DataSource: cloud_device.real_time_position[time_stamp distance] - Pipeline: └── Connector (idx:1, isFirst:false, isLast:false) CallNum:2 TimeCost:6821ns WaitTime:0ns InRows:0 OutRows:0 InSize:0bytes InBlock:0 OutSize:0bytes MemSize:0bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes to MergeReceiver 0 - └── Group (idx:1, isFirst:true, isLast:false) CallNum:2 TimeCost:374224ns WaitTime:0ns InRows:19366 OutRows:0 InSize:154928bytes InBlock:0 OutSize:0bytes MemSize:0bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes - └── Projection (idx:0, isFirst:false, isLast:true) CallNum:154 TimeCost:59683ns WaitTime:0ns InRows:0 OutRows:19366 InSize:0bytes InBlock:0 OutSize:154928bytes MemSize:26360bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes - └── Filter (idx:0, isFirst:false, isLast:false) CallNum:154 TimeCost:17421140ns WaitTime:0ns InRows:0 OutRows:0 InSize:0bytes InBlock:0 OutSize:0bytes MemSize:1248064bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:0bytes - └── TableScan (idx:0, isFirst:true, isLast:false) CallNum:154 TimeCost:54582153ns WaitTime:0ns InRows:1248064 OutRows:0 InSize:19969024bytes InBlock:0 OutSize:0bytes MemSize:131072bytes SpillSize:0bytes ScanBytes:0bytes NetworkIO:0bytes DiskIO:16708516bytes + Pipeline: └── Connector (1,false,false) CallNum:2 
TimeCost:0.01ms WaitTime:0ns InRows:0 OutRows:0 InSize:0B InBlock:0 to MergeReceiver 0 + └── Group (1,true,false) CallNum:2 TimeCost:0.37ms WaitTime:0ns InRows:19366 OutRows:0 InSize:154.93KB InBlock:0 + └── Projection (0,false,true) CallNum:154 TimeCost:0.06ms WaitTime:0ns InRows:0 OutRows:19366 InSize:0B InBlock:0 OutSize:154.93KB MemSize:26.36KB + └── Filter (0,false,false) CallNum:154 TimeCost:17.42ms WaitTime:0ns InRows:0 OutRows:0 InSize:0B InBlock:0 MemSize:1.25MB + └── TableScan (0,true,false) CallNum:154 TimeCost:54.58ms WaitTime:0ns InRows:1248064 OutRows:0 InSize:19.97MB InBlock:0 MemSize:131.07KB } } }` diff --git a/pkg/vectorindex/metric/gpu.go b/pkg/vectorindex/metric/gpu.go index a061c563ad1cd..d0ad025c1f3f0 100644 --- a/pkg/vectorindex/metric/gpu.go +++ b/pkg/vectorindex/metric/gpu.go @@ -17,16 +17,15 @@ package metric import ( - cuvs "github.com/rapidsai/cuvs/go" + cuvs "github.com/rapidsai/cuvs/go" ) var ( - MetricTypeToCuvsMetric = map[MetricType]cuvs.Distance{ - Metric_L2sqDistance: cuvs.DistanceSQEuclidean, - Metric_L2Distance: cuvs.DistanceSQEuclidean, - Metric_InnerProduct: cuvs.DistanceInnerProduct, - Metric_CosineDistance: cuvs.DistanceCosine, - Metric_L1Distance: cuvs.DistanceL1, - } + MetricTypeToCuvsMetric = map[MetricType]cuvs.Distance{ + Metric_L2sqDistance: cuvs.DistanceSQEuclidean, + Metric_L2Distance: cuvs.DistanceSQEuclidean, + Metric_InnerProduct: cuvs.DistanceInnerProduct, + Metric_CosineDistance: cuvs.DistanceCosine, + Metric_L1Distance: cuvs.DistanceL1, + } ) - diff --git a/pkg/vm/process/operator_analyzer.go b/pkg/vm/process/operator_analyzer.go index c112bd4cec71e..40d6c554dd702 100644 --- a/pkg/vm/process/operator_analyzer.go +++ b/pkg/vm/process/operator_analyzer.go @@ -19,6 +19,7 @@ import ( "strings" "time" + "github.com/matrixorigin/matrixone/pkg/common" "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/perfcounter" @@ -401,31 +402,33 @@ func (ps *OperatorStats) String() string { // Use strings.Builder for efficient string concatenation var sb strings.Builder sb.WriteString(fmt.Sprintf(" CallNum:%d "+ - "TimeCost:%dns "+ - "WaitTime:%dns "+ + "TimeCost:%s "+ + "WaitTime:%s "+ "InRows:%d "+ "OutRows:%d "+ - "InSize:%dbytes "+ - "InBlock:%d "+ - "OutSize:%dbytes "+ - "MemSize:%dbytes "+ - "SpillSize:%dbytes "+ - "ScanBytes:%dbytes "+ - "NetworkIO:%dbytes "+ - "DiskIO:%dbytes ", + "InSize:%s "+ + "InBlock:%d ", ps.CallNum, - ps.TimeConsumed, - ps.WaitTimeConsumed, + common.FormatDuration(ps.TimeConsumed), + common.FormatDuration(ps.WaitTimeConsumed), ps.InputRows, ps.OutputRows, - ps.InputSize, - ps.InputBlocks, - ps.OutputSize, - ps.MemorySize, - ps.SpillSize, - ps.ScanBytes, - ps.NetworkIO, - ps.DiskIO)) + common.FormatBytes(ps.InputSize), + ps.InputBlocks)) + + // Only include non-zero values for OutSize, MemSize, SpillSize, ScanBytes + if ps.OutputSize > 0 { + sb.WriteString(fmt.Sprintf("OutSize:%s ", common.FormatBytes(ps.OutputSize))) + } + if ps.MemorySize > 0 { + sb.WriteString(fmt.Sprintf("MemSize:%s ", common.FormatBytes(ps.MemorySize))) + } + if ps.SpillSize > 0 { + sb.WriteString(fmt.Sprintf("SpillSize:%s ", common.FormatBytes(ps.SpillSize))) + } + if ps.ScanBytes > 0 { + sb.WriteString(fmt.Sprintf("ScanBytes:%s ", common.FormatBytes(ps.ScanBytes))) + } // Collect S3 stats in a slice for efficient concatenation dynamicAttrs := []string{} @@ -454,38 +457,14 @@ func (ps *OperatorStats) String() string { dynamicAttrs = append(dynamicAttrs, 
fmt.Sprintf("S3DeleteMul:%d ", ps.S3DeleteMul)) } //--------------------------------------------------------------------------------------------- - if ps.ReadSize > 0 { - dynamicAttrs = append(dynamicAttrs, fmt.Sprintf("ReadSize:%dbytes ", ps.ReadSize)) - } - if ps.S3ReadSize > 0 { - dynamicAttrs = append(dynamicAttrs, fmt.Sprintf("S3ReadSize:%dbytes ", ps.S3ReadSize)) - } - if ps.DiskReadSize > 0 { - dynamicAttrs = append(dynamicAttrs, fmt.Sprintf("DiskReadSize:%dbytes ", ps.DiskReadSize)) - } - if ps.CacheRead > 0 { - dynamicAttrs = append(dynamicAttrs, fmt.Sprintf("CacheRead:%d ", ps.CacheRead)) - } - if ps.CacheHit > 0 { - dynamicAttrs = append(dynamicAttrs, fmt.Sprintf("CacheHit:%d ", ps.CacheHit)) - } - if ps.CacheMemoryRead > 0 { - dynamicAttrs = append(dynamicAttrs, fmt.Sprintf("CacheMemoryRead:%d ", ps.CacheMemoryRead)) - } - if ps.CacheMemoryHit > 0 { - dynamicAttrs = append(dynamicAttrs, fmt.Sprintf("CacheMemoryHit:%d ", ps.CacheMemoryHit)) - } - if ps.CacheDiskRead > 0 { - dynamicAttrs = append(dynamicAttrs, fmt.Sprintf("CacheDiskRead:%d ", ps.CacheDiskRead)) - } - if ps.CacheDiskHit > 0 { - dynamicAttrs = append(dynamicAttrs, fmt.Sprintf("CacheDiskHit:%d ", ps.CacheDiskHit)) - } - if ps.CacheRemoteRead > 0 { - dynamicAttrs = append(dynamicAttrs, fmt.Sprintf("CacheRemoteRead:%d ", ps.CacheRemoteRead)) - } - if ps.CacheRemoteHit > 0 { - dynamicAttrs = append(dynamicAttrs, fmt.Sprintf("CacheRemoteHit:%d ", ps.CacheRemoteHit)) + // ReadSize format: ReadSize=total|s3|disk (same as explain analyze) + // Only show ReadSize if at least one value is non-zero + // Use FormatBytes instead of ConvertBytesToHumanReadable to get "B" instead of "bytes" + if ps.ReadSize > 0 || ps.S3ReadSize > 0 || ps.DiskReadSize > 0 { + dynamicAttrs = append(dynamicAttrs, fmt.Sprintf("ReadSize=%s|%s|%s ", + common.FormatBytes(ps.ReadSize), + common.FormatBytes(ps.S3ReadSize), + common.FormatBytes(ps.DiskReadSize))) } // Join and append S3 stats if any @@ -508,7 +487,7 @@ func (ps *OperatorStats) String() string { case OpWaitLockTime: metricName = "WaitLockTime" } - metricsStr += fmt.Sprintf("%s:%dns ", metricName, v) + metricsStr += fmt.Sprintf("%s:%s ", metricName, common.FormatDuration(v)) } } diff --git a/pkg/vm/process/operator_analyzer_test.go b/pkg/vm/process/operator_analyzer_test.go index 0629e69dbd816..9ac84c6798946 100644 --- a/pkg/vm/process/operator_analyzer_test.go +++ b/pkg/vm/process/operator_analyzer_test.go @@ -221,8 +221,7 @@ func TestOperatorStats_String(t *testing.T) { want string }{ { - // CallNum:154 TimeCost:54449492ns WaitTime:0ns InRows:1248064 OutRows:0 InSize:19969024bytes InBlock:153 OutSize:0bytes MemSize:131072bytes SpillSize:131072bytes ScanBytes:19969024bytes NetworkIO:0bytes DiskIO:7888601bytes CacheRead:428 CacheMemoryRead:428 - name: "test01", + name: "test01 - with ReadSize, S3ReadSize, DiskReadSize, no Cache stats", fields: fields{ OperatorName: "testOp", CallNum: 154, @@ -238,9 +237,9 @@ func TestOperatorStats_String(t *testing.T) { DiskIO: 7888601, InputBlocks: 153, ScanBytes: 19969024, - ReadSize: 16000000, - S3ReadSize: 15000000, - DiskReadSize: 1000000, + ReadSize: 16000000, // 15.26 MiB + S3ReadSize: 15000000, // 14.31 MiB + DiskReadSize: 1000000, // 976.56 KiB WrittenRows: 12, DeletedRows: 12, S3List: 2, @@ -249,22 +248,30 @@ func TestOperatorStats_String(t *testing.T) { S3Get: 2, S3Delete: 2, S3DeleteMul: 2, - CacheRead: 428, - CacheHit: 428, - CacheMemoryRead: 428, - CacheMemoryHit: 428, - CacheDiskRead: 428, - CacheDiskHit: 428, - CacheRemoteRead: 428, - 
CacheRemoteHit: 428, + CacheRead: 428, // Should not appear in output + CacheHit: 428, // Should not appear in output + CacheMemoryRead: 428, // Should not appear in output + CacheMemoryHit: 428, // Should not appear in output + CacheDiskRead: 428, // Should not appear in output + CacheDiskHit: 428, // Should not appear in output + CacheRemoteRead: 428, // Should not appear in output + CacheRemoteHit: 428, // Should not appear in output OperatorMetrics: map[MetricType]int64{ OpScanTime: 452, }, }, - want: " CallNum:154 TimeCost:54449492ns WaitTime:0ns InRows:1248064 OutRows:0 InSize:19969024bytes InBlock:153 OutSize:0bytes MemSize:131072bytes SpillSize:131072bytes ScanBytes:19969024bytes NetworkIO:0bytes DiskIO:7888601bytes WrittenRows:12 DeletedRows:12 S3List:2 S3Head:2 S3Put:2 S3Get:2 S3Delete:2 S3DeleteMul:2 ReadSize:16000000bytes S3ReadSize:15000000bytes DiskReadSize:1000000bytes CacheRead:428 CacheHit:428 CacheMemoryRead:428 CacheMemoryHit:428 CacheDiskRead:428 CacheDiskHit:428 CacheRemoteRead:428 CacheRemoteHit:428 ScanTime:452ns ", + // Format: ReadSize=total|s3|disk (same as explain analyze) + // Cache stats should NOT appear + // OutSize is 0, so it should not appear + // NetworkIO and DiskIO are removed to avoid duplication with ReadSize + // Bytes and time are formatted: 0bytes -> 0B, 1386624bytes -> 1.39MB, 21625539ns -> 21.63ms + // Note: ScanTime 452ns = 0.000452ms, rounded to 0.00ms + // ReadSize uses FormatBytes (decimal units: MB, KB) instead of ConvertBytesToHumanReadable (binary units: MiB, KiB) + // 16000000 bytes = 16.00MB (decimal), 15000000 bytes = 15.00MB (decimal), 1000000 bytes = 1.00MB (decimal) + want: " CallNum:154 TimeCost:54.45ms WaitTime:0ns InRows:1248064 OutRows:0 InSize:19.97MB InBlock:153 MemSize:131.07KB SpillSize:131.07KB ScanBytes:19.97MB WrittenRows:12 DeletedRows:12 S3List:2 S3Head:2 S3Put:2 S3Get:2 S3Delete:2 S3DeleteMul:2 ReadSize=16.00MB|15.00MB|1.00MB ScanTime:0.00ms ", }, { - name: "test02 - ReadSize, S3ReadSize, DiskReadSize are zero, should not appear in output", + name: "test02 - SpillSize and ReadSize are zero, so they should not appear", fields: fields{ OperatorName: "testOp", CallNum: 10, @@ -301,7 +308,238 @@ func TestOperatorStats_String(t *testing.T) { CacheRemoteHit: 0, OperatorMetrics: nil, }, - want: " CallNum:10 TimeCost:1000000ns WaitTime:0ns InRows:100 OutRows:50 InSize:1024bytes InBlock:1 OutSize:512bytes MemSize:1024bytes SpillSize:0bytes ScanBytes:1024bytes NetworkIO:0bytes DiskIO:0bytes ", + // SpillSize is 0, so it should not appear + // ReadSize all values are 0, so it should not appear + // NetworkIO and DiskIO are removed to avoid duplication with ReadSize + // Bytes and time are formatted: 0bytes -> 0B, 1386624bytes -> 1.39MB, 21625539ns -> 21.63ms + want: " CallNum:10 TimeCost:1.00ms WaitTime:0ns InRows:100 OutRows:50 InSize:1.02KB InBlock:1 OutSize:512B MemSize:1.02KB ScanBytes:1.02KB ", + }, + { + name: "test03 - ReadSize with KiB format (real-world example)", + fields: fields{ + OperatorName: "TableScan", + CallNum: 77, + TimeConsumed: 1353592, + WaitTimeConsumed: 0, + MemorySize: 196608, + SpillSize: 0, + InputRows: 622592, + InputSize: 7471104, + OutputRows: 622592, + OutputSize: 7471104, + NetworkIO: 0, + DiskIO: 0, + InputBlocks: 76, + ScanBytes: 7471104, + ReadSize: 343050, // 335.01 KiB + S3ReadSize: 0, + DiskReadSize: 0, + WrittenRows: 0, + DeletedRows: 0, + S3List: 0, + S3Head: 0, + S3Put: 0, + S3Get: 0, + S3Delete: 0, + S3DeleteMul: 0, + CacheRead: 228, // Should not appear + CacheHit: 228, // Should not 
appear + CacheMemoryRead: 228, // Should not appear + CacheMemoryHit: 228, // Should not appear + CacheDiskRead: 0, + CacheDiskHit: 0, + CacheRemoteRead: 0, + CacheRemoteHit: 0, + OperatorMetrics: nil, + }, + // Real-world example: ReadSize=343.05KB|0B|0B + // SpillSize is 0, so it should not appear + // NetworkIO and DiskIO are removed to avoid duplication with ReadSize + // Bytes and time are formatted: 0bytes -> 0B, 1386624bytes -> 1.39MB, 21625539ns -> 21.63ms + // ReadSize uses FormatBytes (0B) instead of ConvertBytesToHumanReadable (0 bytes) + // 343050 bytes = 343.05KB (decimal), not 335.01 KiB (binary) + want: " CallNum:77 TimeCost:1.35ms WaitTime:0ns InRows:622592 OutRows:622592 InSize:7.47MB InBlock:76 OutSize:7.47MB MemSize:196.61KB ScanBytes:7.47MB ReadSize=343.05KB|0B|0B ", + }, + { + name: "test04 - ReadSize with MiB format", + fields: fields{ + OperatorName: "TableScan", + CallNum: 1, + TimeConsumed: 1000000, + WaitTimeConsumed: 0, + MemorySize: 1024 * 1024, + SpillSize: 0, + InputRows: 1000000, + InputSize: 100 * 1024 * 1024, + OutputRows: 1000000, + OutputSize: 100 * 1024 * 1024, + NetworkIO: 0, + DiskIO: 0, + InputBlocks: 100, + ScanBytes: 100 * 1024 * 1024, + ReadSize: 50 * 1024 * 1024, // 50 MiB + S3ReadSize: 30 * 1024 * 1024, // 30 MiB + DiskReadSize: 20 * 1024 * 1024, // 20 MiB + WrittenRows: 0, + DeletedRows: 0, + S3List: 0, + S3Head: 0, + S3Put: 0, + S3Get: 0, + S3Delete: 0, + S3DeleteMul: 0, + CacheRead: 1000, // Should not appear + CacheHit: 1000, // Should not appear + CacheMemoryRead: 500, // Should not appear + CacheMemoryHit: 500, // Should not appear + CacheDiskRead: 300, // Should not appear + CacheDiskHit: 300, // Should not appear + CacheRemoteRead: 200, // Should not appear + CacheRemoteHit: 200, // Should not appear + OperatorMetrics: nil, + }, + // SpillSize is 0, so it should not appear + // NetworkIO and DiskIO are removed to avoid duplication with ReadSize + // Bytes and time are formatted: 0bytes -> 0B, 1386624bytes -> 1.39MB, 21625539ns -> 21.63ms + // ReadSize uses FormatBytes (decimal units: MB) instead of ConvertBytesToHumanReadable (binary units: MiB) + // 50*1024*1024 bytes = 52.43MB (decimal), 30*1024*1024 = 31.46MB, 20*1024*1024 = 20.97MB + want: " CallNum:1 TimeCost:1.00ms WaitTime:0ns InRows:1000000 OutRows:1000000 InSize:104.86MB InBlock:100 OutSize:104.86MB MemSize:1.05MB ScanBytes:104.86MB ReadSize=52.43MB|31.46MB|20.97MB ", + }, + { + name: "test05 - ReadSize with GiB format", + fields: fields{ + OperatorName: "TableScan", + CallNum: 1, + TimeConsumed: 1000000, + WaitTimeConsumed: 0, + MemorySize: 1024 * 1024 * 1024, + SpillSize: 0, + InputRows: 10000000, + InputSize: 10 * 1024 * 1024 * 1024, + OutputRows: 10000000, + OutputSize: 10 * 1024 * 1024 * 1024, + NetworkIO: 0, + DiskIO: 0, + InputBlocks: 1000, + ScanBytes: 10 * 1024 * 1024 * 1024, + ReadSize: 5 * 1024 * 1024 * 1024, // 5 GiB + S3ReadSize: 3 * 1024 * 1024 * 1024, // 3 GiB + DiskReadSize: 2 * 1024 * 1024 * 1024, // 2 GiB + WrittenRows: 0, + DeletedRows: 0, + S3List: 0, + S3Head: 0, + S3Put: 0, + S3Get: 0, + S3Delete: 0, + S3DeleteMul: 0, + CacheRead: 0, + CacheHit: 0, + CacheMemoryRead: 0, + CacheMemoryHit: 0, + CacheDiskRead: 0, + CacheDiskHit: 0, + CacheRemoteRead: 0, + CacheRemoteHit: 0, + OperatorMetrics: nil, + }, + // SpillSize is 0, so it should not appear + // NetworkIO and DiskIO are removed to avoid duplication with ReadSize + // Bytes and time are formatted: 0bytes -> 0B, 1386624bytes -> 1.39MB, 21625539ns -> 21.63ms + // ReadSize uses FormatBytes (decimal units: GB) 
instead of ConvertBytesToHumanReadable (binary units: GiB) + // 5*1024*1024*1024 bytes = 5.37GB (decimal), 3*1024*1024*1024 = 3.22GB, 2*1024*1024*1024 = 2.15GB + want: " CallNum:1 TimeCost:1.00ms WaitTime:0ns InRows:10000000 OutRows:10000000 InSize:10.74GB InBlock:1000 OutSize:10.74GB MemSize:1.07GB ScanBytes:10.74GB ReadSize=5.37GB|3.22GB|2.15GB ", + }, + { + name: "test06 - ReadSize with mixed formats (bytes, KiB, MiB)", + fields: fields{ + OperatorName: "TableScan", + CallNum: 1, + TimeConsumed: 1000000, + WaitTimeConsumed: 0, + MemorySize: 1024, + SpillSize: 0, + InputRows: 1000, + InputSize: 1024 * 1024, + OutputRows: 1000, + OutputSize: 1024 * 1024, + NetworkIO: 0, + DiskIO: 0, + InputBlocks: 1, + ScanBytes: 1024 * 1024, + ReadSize: 512, // 512 bytes + S3ReadSize: 10 * 1024, // 10 KiB + DiskReadSize: 1024 * 1024, // 1 MiB + WrittenRows: 0, + DeletedRows: 0, + S3List: 0, + S3Head: 0, + S3Put: 0, + S3Get: 0, + S3Delete: 0, + S3DeleteMul: 0, + CacheRead: 0, + CacheHit: 0, + CacheMemoryRead: 0, + CacheMemoryHit: 0, + CacheDiskRead: 0, + CacheDiskHit: 0, + CacheRemoteRead: 0, + CacheRemoteHit: 0, + OperatorMetrics: nil, + }, + // SpillSize is 0, so it should not appear + // NetworkIO and DiskIO are removed to avoid duplication with ReadSize + // Bytes and time are formatted: 0bytes -> 0B, 1386624bytes -> 1.39MB, 21625539ns -> 21.63ms + // ReadSize uses FormatBytes (decimal units: B, KB, MB) instead of ConvertBytesToHumanReadable (binary units: bytes, KiB, MiB) + // 512 bytes = 512B, 10*1024 = 10.24KB, 1024*1024 = 1.05MB + want: " CallNum:1 TimeCost:1.00ms WaitTime:0ns InRows:1000 OutRows:1000 InSize:1.05MB InBlock:1 OutSize:1.05MB MemSize:1.02KB ScanBytes:1.05MB ReadSize=512B|10.24KB|1.05MB ", + }, + { + name: "test07 - Cache stats should NOT appear even with non-zero values", + fields: fields{ + OperatorName: "TableScan", + CallNum: 1, + TimeConsumed: 1000000, + WaitTimeConsumed: 0, + MemorySize: 1024, + SpillSize: 0, + InputRows: 100, + InputSize: 1024, + OutputRows: 100, + OutputSize: 1024, + NetworkIO: 0, + DiskIO: 0, + InputBlocks: 1, + ScanBytes: 1024, + ReadSize: 1024, + S3ReadSize: 512, + DiskReadSize: 256, + WrittenRows: 0, + DeletedRows: 0, + S3List: 0, + S3Head: 0, + S3Put: 0, + S3Get: 0, + S3Delete: 0, + S3DeleteMul: 0, + CacheRead: 1000, // Non-zero, but should NOT appear + CacheHit: 800, // Non-zero, but should NOT appear + CacheMemoryRead: 500, // Non-zero, but should NOT appear + CacheMemoryHit: 400, // Non-zero, but should NOT appear + CacheDiskRead: 300, // Non-zero, but should NOT appear + CacheDiskHit: 200, // Non-zero, but should NOT appear + CacheRemoteRead: 100, // Non-zero, but should NOT appear + CacheRemoteHit: 50, // Non-zero, but should NOT appear + OperatorMetrics: nil, + }, + // Verify that no Cache* fields appear in output + // SpillSize is 0, so it should not appear + // NetworkIO and DiskIO are removed to avoid duplication with ReadSize + // Bytes and time are formatted: 0bytes -> 0B, 1386624bytes -> 1.39MB, 21625539ns -> 21.63ms + // ReadSize uses FormatBytes (decimal units: B, KB) instead of ConvertBytesToHumanReadable (binary units: bytes, KiB) + // 1024 bytes = 1.02KB (decimal), 512 bytes = 512B, 256 bytes = 256B + want: " CallNum:1 TimeCost:1.00ms WaitTime:0ns InRows:100 OutRows:100 InSize:1.02KB InBlock:1 OutSize:1.02KB MemSize:1.02KB ScanBytes:1.02KB ReadSize=1.02KB|512B|256B ", }, } for _, tt := range tests { diff --git a/test/distributed/cases/analyze/explain_phyplan.result b/test/distributed/cases/analyze/explain_phyplan.result index 
38897bff1f92e..74144a3374af2 100644 --- a/test/distributed/cases/analyze/explain_phyplan.result +++ b/test/distributed/cases/analyze/explain_phyplan.result @@ -63,7 +63,7 @@ Scope 1 (Magic: Remote, mcpu: 1, Receiver: []) 𝄀 └── tablescan explain phyplan verbose select a.ename,b.dname from emp a left join dept b on a.deptno = b.deptno; -- @regex("(?s)MemoryUsage.*CPU", true) --- @regex("(?s)left\(idx:(\d+).*hash build\(idx:\1", true) +-- @regex("(?s)left\\((\\d+),true,true\\).*hash build\\(\\1,true,false\\)", true) tp query phyplan 𝄀 Overview: 𝄀 MemoryUsage:34674B, SpillSize:0B, DiskI/O:0B, NewWorkI/O:0B, AffectedRows: 0 𝄀 @@ -75,15 +75,15 @@ Overview: 𝄀 Physical Plan Deployment: 𝄀 Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: []) PrepareTimeConsumed: 57246ns 𝄀 DataSource: db1.emp[ename deptno] 𝄀 - Pipeline: └── output(idx:-1, isFirst:false, isLast:false) 𝄀 - └── projection(idx:3, isFirst:true, isLast:true) 𝄀 - └── left(idx:2, isFirst:true, isLast:true) 𝄀 - └── tablescan(idx:0, isFirst:true, isLast:true) 𝄀 + Pipeline: └── output(-1,false,false) 𝄀 + └── projection(3,true,true) 𝄀 + └── left(2,true,true) 𝄀 + └── tablescan(0,true,true) 𝄀 PreScopes: { 𝄀 Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: []) PrepareTimeConsumed: 5572ns 𝄀 DataSource: db1.dept[deptno dname] 𝄀 - Pipeline: └── hash build(idx:2, isFirst:true, isLast:false) 𝄀 - └── tablescan(idx:1, isFirst:true, isLast:true) 𝄀 + Pipeline: └── hash build(2,true,false) 𝄀 + └── tablescan(1,true,true) 𝄀 } drop table emp; drop table dept; diff --git a/test/distributed/cases/analyze/explain_phyplan.sql b/test/distributed/cases/analyze/explain_phyplan.sql index 8f8656f0a41ed..3e152573bcb88 100644 --- a/test/distributed/cases/analyze/explain_phyplan.sql +++ b/test/distributed/cases/analyze/explain_phyplan.sql @@ -51,9 +51,7 @@ explain phyplan select * from emp; explain phyplan select empno, ename, job from emp where sal > 2000; -- check verbose headers, (?s) make . match any character, including newlines --- @regex("(?s)MemoryUsage.*CPU",true) -- check left join index keeps consistent in probe and build phase --- @regex("(?s)left\(idx:(\d+).*hash build\(idx:\1",true) explain phyplan verbose select a.ename,b.dname from emp a left join dept b on a.deptno = b.deptno; drop table emp;