Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.SegmentsCache;

import org.slf4j.Logger;
Expand Down Expand Up @@ -508,21 +509,28 @@ public InnerTableScan dropStats() {
* <p>Partitions that exist in the main branch (based on partition predicates only) are
* treated as complete and are read from the main branch with the full predicate. Partitions
* that exist only in the fallback branch are read from the fallback branch.
*
* <p>When main and fallback branches have different partition schemas (e.g. main has (dt,
* a) and fallback has (dt)), main partition rows are projected to the fallback key layout
* before deduplication so that fallback-only partitions are correctly identified.
*/
@Override
public TableScan.Plan plan() {
List<Split> splits = new ArrayList<>();
Set<BinaryRow> completePartitions =
new HashSet<>(
newPartitionListingScan(true, partitionPredicate).listPartitions());
List<BinaryRow> mainPartitions =
newPartitionListingScan(true, partitionPredicate).listPartitions();
List<BinaryRow> fallbackPartitions =
newPartitionListingScan(false, null).listPartitions();
Set<BinaryRow> coveredFallbackPartitions =
findCoveredFallbackPartitions(mainPartitions, fallbackPartitions);
for (Split split : mainScan.plan().splits()) {
DataSplit dataSplit = (DataSplit) split;
splits.add(toFallbackSplit(dataSplit, false));
}

List<BinaryRow> remainingPartitions =
newPartitionListingScan(false, partitionPredicate).listPartitions().stream()
.filter(p -> !completePartitions.contains(p))
fallbackPartitions.stream()
.filter(p -> !coveredFallbackPartitions.contains(p))
.collect(Collectors.toList());
if (!remainingPartitions.isEmpty()) {
fallbackScan.withPartitionFilter(remainingPartitions);
Expand All @@ -536,7 +544,7 @@ public TableScan.Plan plan() {
@Override
public List<PartitionEntry> listPartitionEntries() {
DataTableScan mainListingScan = newPartitionListingScan(true, partitionPredicate);
DataTableScan fallbackListingScan = newPartitionListingScan(false, partitionPredicate);
DataTableScan fallbackListingScan = newPartitionListingScan(false, null);
List<PartitionEntry> partitionEntries =
new ArrayList<>(mainListingScan.listPartitionEntries());
Set<BinaryRow> partitions =
Expand All @@ -559,6 +567,73 @@ protected PartitionPredicate getPartitionPredicate() {
return partitionPredicate;
}

/**
 * Returns the fallback partitions that the main branch already owns.
 *
 * <p>If both tables declare the same partition keys, or the fallback table declares none, the
 * main partition rows are returned as a set so callers can do a direct {@link BinaryRow}
 * lookup.
 *
 * <p>Otherwise every fallback partition key is located in the main key list. If one of them is
 * missing from main, nothing can be projected and an empty set is returned. If all are found,
 * each main partition row is converted to an object array and reduced to the fallback key
 * columns; a fallback partition is reported as covered when its own key matches one of those
 * reduced main keys (e.g. main (dt, a) covers fallback (dt) when any main row has the same dt).
 *
 * @param mainPartitions partition rows listed from the main branch
 * @param fallbackPartitions partition rows listed from the fallback branch
 * @return a new set holding the covered partitions
 */
private Set<BinaryRow> findCoveredFallbackPartitions(
        List<BinaryRow> mainPartitions, List<BinaryRow> fallbackPartitions) {
    List<String> mainKeys = wrappedTable.schema().partitionKeys();
    List<String> fallbackKeys = fallbackTable.schema().partitionKeys();
    if (mainKeys.equals(fallbackKeys) || fallbackKeys.isEmpty()) {
        // Identical layout (or unpartitioned fallback): compare the rows themselves.
        return new HashSet<>(mainPartitions);
    }

    // projection[i] = position in mainKeys of the i-th fallback key;
    // identity[i] = i, used to key the fallback rows with the same encoder.
    final int keyCount = fallbackKeys.size();
    int[] projection = new int[keyCount];
    int[] identity = new int[keyCount];
    for (int pos = 0; pos < keyCount; pos++) {
        int posInMain = mainKeys.indexOf(fallbackKeys.get(pos));
        if (posInMain < 0) {
            // A fallback key is unknown to main, so no main row can be projected onto it.
            return new HashSet<>();
        }
        projection[pos] = posInMain;
        identity[pos] = pos;
    }

    RowType mainPartitionType = wrappedTable.schema().logicalPartitionType();
    RowType fallbackPartitionType = fallbackTable.schema().logicalPartitionType();

    // Keys of all main partitions, reduced to the fallback key columns.
    RowDataToObjectArrayConverter mainConverter =
            new RowDataToObjectArrayConverter(mainPartitionType);
    Set<String> ownedByMain =
            mainPartitions.stream()
                    .map(row -> buildKeyString(mainConverter.convert(row), projection))
                    .collect(Collectors.toCollection(HashSet::new));

    // Keep only the fallback rows whose key was produced by some main partition.
    RowDataToObjectArrayConverter fallbackConverter =
            new RowDataToObjectArrayConverter(fallbackPartitionType);
    return fallbackPartitions.stream()
            .filter(
                    row ->
                            ownedByMain.contains(
                                    buildKeyString(fallbackConverter.convert(row), identity)))
            .collect(Collectors.toCollection(HashSet::new));
}

/**
 * Encodes the selected fields of a converted partition row as a string usable as a set key.
 *
 * <p>Two calls return equal strings only when the selected fields have pairwise equal textual
 * content. Each field is written either as a null marker or as a length-prefixed value, so:
 *
 * <ul>
 *   <li>a null field never collides with the literal text {@code "null"};
 *   <li>a value that happens to contain the delimiter characters cannot shift field
 *       boundaries;
 *   <li>{@code byte[]} fields are compared by content rather than by array identity.
 * </ul>
 *
 * <p>The exact format is an implementation detail; the result is only meant to be compared
 * with other results of this method.
 *
 * @param objects field values of one row
 * @param indices positions in {@code objects} to include, in key order
 * @return the encoded key; empty when {@code indices} is empty
 */
private static String buildKeyString(Object[] objects, int[] indices) {
    StringBuilder sb = new StringBuilder();
    for (int index : indices) {
        Object value = objects[index];
        if (value == null) {
            // Dedicated marker: must differ from every encoded non-null value.
            sb.append('N');
            continue;
        }
        // Arrays do not override toString(); render binary values by content instead.
        String text =
                value instanceof byte[]
                        ? java.util.Arrays.toString((byte[]) value)
                        : value.toString();
        // The length prefix makes the field boundary independent of the field content.
        sb.append('V').append(text.length()).append(':').append(text);
    }
    return sb.toString();
}

private DataTableScan newPartitionListingScan(
boolean isMain, PartitionPredicate scanPartitionPredicate) {
DataTableScan scan = isMain ? wrappedTable.newScan() : fallbackTable.newScan();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.table;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileIOFinder;
Expand All @@ -45,6 +46,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
Expand Down Expand Up @@ -202,6 +204,132 @@ public void testPlanWithDataFilter() throws Exception {
.isTrue();
}

/**
 * Regression test for issue #7503: the main table declares more partition keys than the
 * fallback table, and planning must still decide correctly which branch serves a partition.
 *
 * <p>Fixture:
 *
 * <ul>
 *   <li>Both tables share the row type (dt, a, val), all STRING.
 *   <li>Main table is partitioned by (dt, a) and holds one row: (20250811, aaa, x_new).
 *   <li>Fallback table is partitioned by (dt) and holds two rows: (20250810, aaa, x) and
 *       (20250810, bbb, y).
 * </ul>
 *
 * <p>Checks:
 *
 * <ul>
 *   <li>Filter dt=20250811: the plan is non-empty and contains at least one split that is not
 *       marked as fallback.
 *   <li>Filter dt=20250810: the plan is non-empty and contains at least one split marked as
 *       fallback, i.e. the differing partition layouts do not hide the fallback partition.
 * </ul>
 */
@Test
public void testFallbackWithSubsetPartitionKeys() throws Exception {
    // Columns common to both tables.
    DataType[] columnTypes = {DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()};
    String[] columnNames = {"dt", "a", "val"};
    RowType rowType = RowType.of(columnTypes, columnNames);

    // Main table, partitioned by (dt, a).
    Path mainPath = new Path(TraceableFileIO.SCHEME + "://" + tempDir.toString() + "/main");
    FileIO mainFileIO = FileIOFinder.find(mainPath);
    Schema mainDefinition =
            new Schema(
                    rowType.getFields(),
                    Arrays.asList("dt", "a"),
                    Collections.emptyList(),
                    Collections.emptyMap(),
                    "");
    TableSchema mainSchema =
            SchemaUtils.forceCommit(
                    new SchemaManager(LocalFileIO.create(), mainPath), mainDefinition);
    AppendOnlyFileStoreTable mainTable =
            new AppendOnlyFileStoreTable(mainFileIO, mainPath, mainSchema);

    // Fallback table, partitioned by (dt) only.
    Path fallbackPath =
            new Path(TraceableFileIO.SCHEME + "://" + tempDir.toString() + "/fallback");
    FileIO fallbackFileIO = FileIOFinder.find(fallbackPath);
    Schema fallbackDefinition =
            new Schema(
                    rowType.getFields(),
                    Collections.singletonList("dt"),
                    Collections.emptyList(),
                    Collections.emptyMap(),
                    "");
    TableSchema fallbackSchema =
            SchemaUtils.forceCommit(
                    new SchemaManager(LocalFileIO.create(), fallbackPath), fallbackDefinition);
    AppendOnlyFileStoreTable fallbackTable =
            new AppendOnlyFileStoreTable(fallbackFileIO, fallbackPath, fallbackSchema);

    // Main holds a single row in partition dt=20250811, a=aaa.
    InternalRow mainRow =
            GenericRow.of(
                    org.apache.paimon.data.BinaryString.fromString("20250811"),
                    org.apache.paimon.data.BinaryString.fromString("aaa"),
                    org.apache.paimon.data.BinaryString.fromString("x_new"));
    writeDataIntoTable(mainTable, 0, mainRow);

    // Fallback holds two rows, both in partition dt=20250810.
    InternalRow fallbackRowA =
            GenericRow.of(
                    org.apache.paimon.data.BinaryString.fromString("20250810"),
                    org.apache.paimon.data.BinaryString.fromString("aaa"),
                    org.apache.paimon.data.BinaryString.fromString("x"));
    InternalRow fallbackRowB =
            GenericRow.of(
                    org.apache.paimon.data.BinaryString.fromString("20250810"),
                    org.apache.paimon.data.BinaryString.fromString("bbb"),
                    org.apache.paimon.data.BinaryString.fromString("y"));
    writeDataIntoTable(fallbackTable, 0, fallbackRowA, fallbackRowB);

    FallbackReadFileStoreTable combined =
            new FallbackReadFileStoreTable(mainTable, fallbackTable);
    PredicateBuilder builder = new PredicateBuilder(rowType);

    // Case 1: dt=20250811 lives in main, so a non-fallback split is expected.
    DataTableScan mainDateScan = combined.newScan();
    mainDateScan.withFilter(
            builder.equal(0, org.apache.paimon.data.BinaryString.fromString("20250811")));
    List<Split> mainDateSplits = mainDateScan.plan().splits();
    assertThat(mainDateSplits).isNotEmpty();
    boolean sawMainSplit = false;
    for (Split candidate : mainDateSplits) {
        FallbackReadFileStoreTable.FallbackSplit typed =
                (FallbackReadFileStoreTable.FallbackSplit) candidate;
        sawMainSplit |= !typed.isFallback();
    }
    assertThat(sawMainSplit)
            .as("dt=20250811 owned by main; must have at least one non-fallback split")
            .isTrue();

    // Case 2: dt=20250810 exists only in fallback, so a fallback split is expected.
    DataTableScan fallbackDateScan = combined.newScan();
    fallbackDateScan.withFilter(
            builder.equal(0, org.apache.paimon.data.BinaryString.fromString("20250810")));
    List<Split> fallbackDateSplits = fallbackDateScan.plan().splits();
    assertThat(fallbackDateSplits).isNotEmpty();
    boolean sawFallbackSplit = false;
    for (Split candidate : fallbackDateSplits) {
        FallbackReadFileStoreTable.FallbackSplit typed =
                (FallbackReadFileStoreTable.FallbackSplit) candidate;
        sawFallbackSplit |= typed.isFallback();
    }
    assertThat(sawFallbackSplit)
            .as(
                    "dt=20250810 exists only in fallback; extra key 'a' must not prevent "
                            + "fallback from being found (Bug #7503)")
            .isTrue();
}

private void writeDataIntoTable(
FileStoreTable table, long commitIdentifier, InternalRow... allData) throws Exception {
StreamTableWrite write = table.newWrite(commitUser);
Expand Down
Loading