From 3cbd70553be72b6d0c9145cc9392be7ad735d9e2 Mon Sep 17 00:00:00 2001 From: Toby Cole Date: Fri, 30 Jan 2026 15:42:22 +0000 Subject: [PATCH 1/2] Initial implementation of S3 atomic move for snapshots --- .../java/org/apache/paimon/fs/FileIO.java | 29 +++++++++++++++ .../catalog/RenamingSnapshotCommit.java | 14 +++++++- .../java/org/apache/paimon/s3/S3FileIO.java | 36 +++++++++++++++++++ paimon-filesystems/pom.xml | 2 +- 4 files changed, 79 insertions(+), 2 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index 5e5fe9fbfef1..2e238f287f37 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -342,6 +342,35 @@ default boolean tryToWriteAtomic(Path path, String content) throws IOException { return success; } + /** + * Write content atomically, failing if target file already exists. Uses native conditional + * writes on supported object stores. Falls back to {@link #tryToWriteAtomic} on filesystems + * without native conditional write support. + * + * @param path the target file path + * @param content the content to write + * @return true if write succeeded, false if file already exists + * @throws IOException on I/O errors (not including "file exists" condition) + */ + default boolean tryToWriteAtomicIfAbsent(Path path, String content) throws IOException { + // Default implementation uses tryToWriteAtomic which is safe for HDFS + // but requires external locking for object stores without native conditional writes + return tryToWriteAtomic(path, content); + } + + /** + * Returns whether this FileIO supports native conditional writes (write-if-absent semantics). + * + *

<p>When true, {@link #tryToWriteAtomicIfAbsent} uses native conditional write operations that + * atomically fail if the target file exists. This eliminates the need for external locking on + * object stores. + * + * @return true if native conditional writes are supported + */ + default boolean supportsConditionalWrite() { + return false; + } + default void writeFile(Path path, String content, boolean overwrite) throws IOException { try (PositionOutputStream out = newOutputStream(path, overwrite)) { OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java b/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java index 5cc9d2a0199a..801d4cbf0af6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/RenamingSnapshotCommit.java @@ -32,7 +32,8 @@ * A {@link SnapshotCommit} using file renaming to commit. * * <p>

Note that when the file system is local or HDFS, rename is atomic. But if the file system is - * object storage, we need additional lock protection. + * object storage, we need additional lock protection unless the storage supports native conditional + * writes. */ public class RenamingSnapshotCommit implements SnapshotCommit { @@ -54,6 +55,17 @@ public boolean commit(Snapshot snapshot, String branch, List callable = () -> { boolean committed = fileIO.tryToWriteAtomic(newSnapshotPath, snapshot.toJson()); diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java index 827251837342..8d50e48d1a51 100644 --- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java +++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3FileIO.java @@ -25,6 +25,8 @@ import org.apache.paimon.options.Options; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.slf4j.Logger; @@ -33,6 +35,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -85,6 +88,39 @@ public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite new S3MultiPartUpload(fs, fs.getConf()), hadoopPath, path); } + @Override + public boolean supportsConditionalWrite() { + return true; + } + + /** + * Write content atomically using S3 conditional writes via Hadoop 3.4+ native API. 
+ * + * @param path the target file path + * @param content the content to write + * @return true if write succeeded, false if file already exists + * @throws IOException on I/O errors + */ + @Override + public boolean tryToWriteAtomicIfAbsent(Path path, String content) throws IOException { + org.apache.hadoop.fs.Path hadoopPath = path(path); + S3AFileSystem fs = (S3AFileSystem) getFileSystem(hadoopPath); + + byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8); + + try (FSDataOutputStream out = + fs.createFile(hadoopPath) + .create() + .overwrite(false) // Fails if file exists (uses If-None-Match: * on S3) + .build()) { + out.write(contentBytes); + return true; + } catch (FileAlreadyExistsException e) { + LOG.debug("Conditional write failed, file already exists: {}", path); + return false; + } + } + // add additional config entries from the IO config to the Hadoop config private Options loadHadoopConfigFromContext(CatalogContext context) { Options hadoopConfig = new Options(); diff --git a/paimon-filesystems/pom.xml b/paimon-filesystems/pom.xml index 1550b9d46aa6..d8e20940985a 100644 --- a/paimon-filesystems/pom.xml +++ b/paimon-filesystems/pom.xml @@ -51,7 +51,7 @@ - 3.3.4 + 3.4.2 3.4.2 1.12.319 1.9.4 From 59a895934f7758ee63b4575f03b8d2aa916e1f3a Mon Sep 17 00:00:00 2001 From: Toby Cole Date: Fri, 30 Jan 2026 18:02:45 +0000 Subject: [PATCH 2/2] Add tests for S3 conditional write support - Add FileIOTest.testConditionalWriteDefaults for tryToWriteAtomicIfAbsent - Add RenamingSnapshotCommitTest for conditional vs lock-based commit paths Co-Authored-By: Claude --- .../java/org/apache/paimon/fs/FileIOTest.java | 11 ++ .../catalog/RenamingSnapshotCommitTest.java | 128 ++++++++++++++++++ 2 files changed, 139 insertions(+) create mode 100644 paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java 
b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java index 367bce383719..6ac60eb01dfa 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java @@ -47,6 +47,17 @@ /** Test static methods and methods with default implementations of {@link FileIO}. */ public class FileIOTest { + @Test + public void testConditionalWriteDefaults(@TempDir java.nio.file.Path tempDir) throws Exception { + FileIO fileIO = new DummyFileIO(); + Path file = new Path(tempDir.resolve("test.txt").toUri()); + + assertThat(fileIO.supportsConditionalWrite()).isFalse(); + assertThat(fileIO.tryToWriteAtomicIfAbsent(file, "first")).isTrue(); + assertThat(fileIO.tryToWriteAtomicIfAbsent(file, "second")).isFalse(); + assertThat(fileIO.readFileUtf8(file)).isEqualTo("first"); + } + @TempDir java.nio.file.Path tempDir; @Test diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java new file mode 100644 index 000000000000..9cadaf2ba996 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/RenamingSnapshotCommitTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.operation.Lock; +import org.apache.paimon.utils.SnapshotManager; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link RenamingSnapshotCommit}. */ +public class RenamingSnapshotCommitTest { + + @TempDir java.nio.file.Path tempDir; + + private Path tablePath; + + @BeforeEach + void setUp() throws IOException { + tablePath = new Path(tempDir.toString()); + LocalFileIO.create().mkdirs(new Path(tablePath, "snapshot")); + } + + @Test + public void testConditionalWritePathSkipsLock() throws Exception { + ConditionalWriteFileIO fileIO = new ConditionalWriteFileIO(); + RenamingSnapshotCommit commit = newCommit(fileIO, Lock.empty()); + + assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isTrue(); + assertThat(fileIO.conditionalWriteCalls.get()).isEqualTo(1); + } + + @Test + public void testConditionalWriteFailsOnConflict() throws Exception { + ConditionalWriteFileIO fileIO = new ConditionalWriteFileIO(); + RenamingSnapshotCommit commit = newCommit(fileIO, Lock.empty()); + + assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isTrue(); + assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isFalse(); + } + + @Test + public void testFallbackPathUsesLock() throws Exception { + AtomicInteger lockCalls = new AtomicInteger(); + RenamingSnapshotCommit commit = 
newCommit(LocalFileIO.create(), trackingLock(lockCalls)); + + assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isTrue(); + assertThat(lockCalls.get()).isEqualTo(1); + } + + @Test + public void testFallbackPathFailsOnConflict() throws Exception { + AtomicInteger lockCalls = new AtomicInteger(); + RenamingSnapshotCommit commit = newCommit(LocalFileIO.create(), trackingLock(lockCalls)); + + assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isTrue(); + assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isFalse(); + assertThat(lockCalls.get()).isEqualTo(2); + } + + private RenamingSnapshotCommit newCommit(LocalFileIO fileIO, Lock lock) { + return new RenamingSnapshotCommit( + new SnapshotManager(fileIO, tablePath, "main", null, null), lock); + } + + private static Snapshot newSnapshot(long id) { + return createSnapshotWithMillis(id, System.currentTimeMillis()); + } + + private static Lock trackingLock(AtomicInteger counter) { + return new Lock() { + @Override + public <T> T runWithLock(java.util.concurrent.Callable<T> callable) throws Exception { + counter.incrementAndGet(); + return callable.call(); + } + + @Override + public void close() {} + }; + } + + private static class ConditionalWriteFileIO extends LocalFileIO { + final AtomicInteger conditionalWriteCalls = new AtomicInteger(); + + @Override + public boolean supportsConditionalWrite() { + return true; + } + + @Override + public boolean tryToWriteAtomicIfAbsent(Path path, String content) throws IOException { + conditionalWriteCalls.incrementAndGet(); + if (exists(path)) { + return false; + } + writeFile(path, content, false); + return true; + } + } +}