Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use paimon;

create database if not exists test_paimon_ro_read_db;

use test_paimon_ro_read_db;

CREATE TABLE if not exists paimon_pk_for_ro (
id INT,
name STRING,
age INT
) USING paimon
TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '1');

INSERT INTO paimon_pk_for_ro (id, name, age) VALUES (1, 'Alice', 30),(2, 'Bob', 25),(3, 'Charlie', 28);

CALL sys.compact(table => 'test_paimon_ro_read_db.paimon_pk_for_ro');

INSERT INTO paimon_pk_for_ro (id, name, age) VALUES (1, 'AliceNew', 33);
Original file line number Diff line number Diff line change
Expand Up @@ -1404,7 +1404,7 @@ identifierSeq
;

optScanParams
: ATSIGN funcName=identifier LEFT_PAREN (mapParams=propertyItemList | listParams=identifierSeq)? RIGHT_PAREN
: ATSIGN funcName=identifier (LEFT_PAREN (mapParams=propertyItemList | listParams=identifierSeq)? RIGHT_PAREN)?
;

relationPrimary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,33 @@ public class TableScanParams {
public static final String INCREMENTAL_READ = "incr";
public static final String BRANCH = "branch";
public static final String TAG = "tag";
// for paimon read-optimized system table
public static final String RO = "ro";
private static final ImmutableSet<String> VALID_PARAM_TYPES = ImmutableSet.of(
INCREMENTAL_READ,
BRANCH,
TAG);
TAG,
RO);

private final String paramType;
// There are two ways to pass parameters to a function.
// There are three ways to pass parameters to a function.
// - One is in map form, where the data is stored in `mapParams`.
// such as: @func_name('param1'='value1', 'param2'='value2', 'param3'='value3')
// - The other is in list form, where the data is stored in `listParams`.
// - Another one is in list form, where the data is stored in `listParams`.
// such as: `listParams` is used for @func_name('value1', 'value2', 'value3')
// - The third one is in short form, now only used for paimon 'ro' table.
// such as: @ro
private final Map<String, String> mapParams;
private final List<String> listParams;

private void validate() {
if (!VALID_PARAM_TYPES.contains(paramType)) {
throw new IllegalArgumentException("Invalid param type: " + paramType);
}
if (isRo() && (!mapParams.isEmpty() || !listParams.isEmpty())) {
throw new IllegalArgumentException(
"The '@ro' parameter for Paimon read-optimized tables must be used without arguments");
}
// TODO: validate mapParams and listParams for different param types
}

Expand Down Expand Up @@ -80,4 +89,8 @@ public boolean isBranch() {
public boolean isTag() {
return TAG.equals(paramType);
}

public boolean isRo() {
return RO.equals(paramType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,8 @@ private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional<TableSnaps
try {
String branch = PaimonUtil.resolvePaimonBranch(scanParams.get(), paimonTable);
Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(getOrBuildNameMapping(), branch, null);
Optional<Snapshot> latestSnapshot = table.latestSnapshot();
long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID;
if (latestSnapshot.isPresent()) {
latestSnapshotId = latestSnapshot.get().id();
}
long latestSnapshotId = table.latestSnapshot().map(Snapshot::id)
.orElse(PaimonSnapshot.INVALID_SNAPSHOT_ID);
// Branches in Paimon can have independent schemas and snapshots.
// TODO: Add time travel support for paimon branch tables.
DataTable dataTable = (DataTable) table;
Expand All @@ -141,6 +138,21 @@ private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue(Optional<TableSnaps
"Failed to get Paimon branch: " + (e.getMessage() == null ? "unknown cause" : e.getMessage()),
e);
}
} else if (scanParams.isPresent() && scanParams.get().isRo()) {
try {
Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(getOrBuildNameMapping(), null, "ro");
long latestSnapshotId = table.latestSnapshot().map(Snapshot::id)
.orElse(PaimonSnapshot.INVALID_SNAPSHOT_ID);
DataTable dataTable = (DataTable) table;
Long schemaId = dataTable.schemaManager().latest().map(TableSchema::id).orElse(0L);
return new PaimonSnapshotCacheValue(PaimonPartitionInfo.EMPTY,
new PaimonSnapshot(latestSnapshotId, schemaId, dataTable));
} catch (Exception e) {
LOG.warn("Failed to get Paimon ro for table {}", paimonTable.name(), e);
throw new RuntimeException(
"Failed to get Paimon ro: " + (e.getMessage() == null ? "unknown cause" : e.getMessage()),
e);
}
} else {
// Otherwise, use the latest snapshot and the latest schema.
return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
sb.append(prefix).append(prefix).append(predicate).append("\n");
}
}
sb.append(prefix).append("PaimonTable: ").append(source.getPaimonTable().name()).append("\n");

if (detailLevel == TExplainLevel.VERBOSE) {
sb.append(prefix).append("PaimonSplitStats: \n");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !normal --
1 AliceNew 33
2 Bob 25
3 Charlie 28

-- !ro --
1 Alice 30
2 Bob 25
3 Charlie 28

Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_paimon_ro_read", "p0,external,doris,external_docker,external_docker_doris") {
logger.info("start paimon test")
String enabled = context.config.otherConfigs.get("enablePaimonTest")
if (enabled == null || !enabled.equalsIgnoreCase("true")) {
logger.info("disable paimon test.")
return
}
String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
String catalog_name = "test_paimon_ro_read_catalog"
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
try {
sql """drop catalog if exists ${catalog_name}"""

sql """
CREATE CATALOG ${catalog_name} PROPERTIES (
'type' = 'paimon',
'warehouse' = 's3://warehouse/wh',
's3.endpoint' = 'http://${externalEnvIp}:${minio_port}',
's3.access_key' = 'admin',
's3.secret_key' = 'password',
's3.path.style.access' = 'true'
);
"""
sql """switch `${catalog_name}`"""
sql """use test_paimon_ro_read_db"""

// normal read can get latest data
order_qt_normal """select * from paimon_pk_for_ro"""

// ro read can only get data in full compaction view
order_qt_ro """select * from paimon_pk_for_ro@ro"""

test {
sql """select * from paimon_incr@ro('startSnapshotId'='0')"""
exception "Paimon read-optimized tables must be used without arguments"
}

test {
sql """select * from paimon_incr@ro('name'='branch1')"""
exception "Paimon read-optimized tables must be used without arguments"
}

test {
sql """select * from paimon_incr@ro(tag1)"""
exception "Paimon read-optimized tables must be used without arguments"
}
} finally {
sql """drop catalog if exists ${catalog_name}"""
}
}