Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,35 @@ default boolean tryToWriteAtomic(Path path, String content) throws IOException {
return success;
}

/**
 * Atomically creates {@code path} with {@code content}, refusing to replace an existing file.
 *
 * <p>This default simply delegates to {@link #tryToWriteAtomic}. Implementations backed by a
 * store with native write-if-absent support are expected to override it.
 *
 * @param path the target file path
 * @param content the content to write
 * @return {@code true} if the write succeeded, {@code false} if the file already exists
 * @throws IOException on I/O errors (the "file exists" condition is reported via the return
 *     value, not as an exception)
 */
default boolean tryToWriteAtomicIfAbsent(Path path, String content) throws IOException {
    // The delegate is safe on HDFS; on object stores without native conditional writes the
    // caller still has to provide external locking.
    final boolean written = tryToWriteAtomic(path, content);
    return written;
}

/**
 * Reports whether this FileIO offers native conditional writes, i.e. write-if-absent semantics.
 *
 * <p>An implementation that returns {@code true} promises that {@link
 * #tryToWriteAtomicIfAbsent} is backed by a native operation which atomically fails when the
 * target file exists, so callers do not need external locking on object stores.
 *
 * @return {@code true} if native conditional writes are supported; this default returns
 *     {@code false}
 */
default boolean supportsConditionalWrite() {
    return false;
}

default void writeFile(Path path, String content, boolean overwrite) throws IOException {
try (PositionOutputStream out = newOutputStream(path, overwrite)) {
OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8);
Expand Down
11 changes: 11 additions & 0 deletions paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@
/** Test static methods and methods with default implementations of {@link FileIO}. */
public class FileIOTest {

/**
 * Checks the default conditional-write contract: no native support is advertised, the first
 * write-if-absent succeeds, and a second one is rejected without touching the stored content.
 */
@Test
public void testConditionalWriteDefaults(@TempDir java.nio.file.Path tempDir) throws Exception {
    FileIO defaults = new DummyFileIO();
    Path target = new Path(tempDir.resolve("test.txt").toUri());

    // Defaults must not claim native conditional-write support.
    boolean nativeSupport = defaults.supportsConditionalWrite();
    assertThat(nativeSupport).isFalse();

    // First writer wins ...
    boolean firstAttempt = defaults.tryToWriteAtomicIfAbsent(target, "first");
    assertThat(firstAttempt).isTrue();

    // ... and the second writer is turned away.
    boolean secondAttempt = defaults.tryToWriteAtomicIfAbsent(target, "second");
    assertThat(secondAttempt).isFalse();

    // The rejected write must not have replaced the original content.
    String stored = defaults.readFileUtf8(target);
    assertThat(stored).isEqualTo("first");
}

@TempDir java.nio.file.Path tempDir;

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
* A {@link SnapshotCommit} using file renaming to commit.
*
* <p>Note that when the file system is local or HDFS, rename is atomic. But if the file system is
* object storage, we need additional lock protection.
* object storage, we need additional lock protection unless the storage supports native conditional
* writes.
*/
public class RenamingSnapshotCommit implements SnapshotCommit {

Expand All @@ -54,6 +55,17 @@ public boolean commit(Snapshot snapshot, String branch, List<PartitionStatistics
? snapshotManager.snapshotPath(snapshot.id())
: snapshotManager.copyWithBranch(branch).snapshotPath(snapshot.id());

// Use native conditional writes if supported
// This eliminates the need for external locking on object stores like S3
if (fileIO.supportsConditionalWrite()) {
boolean committed = fileIO.tryToWriteAtomicIfAbsent(newSnapshotPath, snapshot.toJson());
if (committed) {
snapshotManager.commitLatestHint(snapshot.id());
}
return committed;
}

// Fall back to lock-based approach for filesystems without conditional write support
Callable<Boolean> callable =
() -> {
boolean committed = fileIO.tryToWriteAtomic(newSnapshotPath, snapshot.toJson());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.utils.SnapshotManager;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link RenamingSnapshotCommit}. */
public class RenamingSnapshotCommitTest {

    @TempDir java.nio.file.Path tempDir;

    private Path tablePath;

    /** Creates the table's {@code snapshot} directory so snapshot files can be written. */
    @BeforeEach
    void setUp() throws IOException {
        tablePath = new Path(tempDir.toString());
        LocalFileIO.create().mkdirs(new Path(tablePath, "snapshot"));
    }

    /**
     * With a FileIO that advertises conditional writes, the commit must go through {@code
     * tryToWriteAtomicIfAbsent} exactly once and must never enter the lock.
     */
    @Test
    public void testConditionalWritePathSkipsLock() throws Exception {
        AtomicInteger lockCalls = new AtomicInteger();
        ConditionalWriteFileIO fileIO = new ConditionalWriteFileIO();
        // A counting lock (rather than Lock.empty()) is required to actually observe the bypass.
        RenamingSnapshotCommit commit = newCommit(fileIO, trackingLock(lockCalls));

        assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isTrue();
        assertThat(fileIO.conditionalWriteCalls.get()).isEqualTo(1);
        assertThat(lockCalls.get()).isEqualTo(0);
    }

    /**
     * Committing the same snapshot id twice on the conditional-write path must fail the second
     * time, still without touching the lock.
     */
    @Test
    public void testConditionalWriteFailsOnConflict() throws Exception {
        AtomicInteger lockCalls = new AtomicInteger();
        ConditionalWriteFileIO fileIO = new ConditionalWriteFileIO();
        RenamingSnapshotCommit commit = newCommit(fileIO, trackingLock(lockCalls));

        assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isTrue();
        assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isFalse();
        // Both attempts must have been routed through the conditional write, none via the lock.
        assertThat(fileIO.conditionalWriteCalls.get()).isEqualTo(2);
        assertThat(lockCalls.get()).isEqualTo(0);
    }

    /** Without conditional-write support the commit must run inside the lock. */
    @Test
    public void testFallbackPathUsesLock() throws Exception {
        AtomicInteger lockCalls = new AtomicInteger();
        RenamingSnapshotCommit commit = newCommit(LocalFileIO.create(), trackingLock(lockCalls));

        assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isTrue();
        assertThat(lockCalls.get()).isEqualTo(1);
    }

    /** On the fallback path a duplicate snapshot id fails, and each attempt takes the lock. */
    @Test
    public void testFallbackPathFailsOnConflict() throws Exception {
        AtomicInteger lockCalls = new AtomicInteger();
        RenamingSnapshotCommit commit = newCommit(LocalFileIO.create(), trackingLock(lockCalls));

        assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isTrue();
        assertThat(commit.commit(newSnapshot(1), "main", Collections.emptyList())).isFalse();
        assertThat(lockCalls.get()).isEqualTo(2);
    }

    /** Builds a commit over a snapshot manager rooted at the temporary table path. */
    private RenamingSnapshotCommit newCommit(LocalFileIO fileIO, Lock lock) {
        return new RenamingSnapshotCommit(
                new SnapshotManager(fileIO, tablePath, "main", null, null), lock);
    }

    /** Creates a snapshot with the given id, timestamped with the current wall-clock time. */
    private static Snapshot newSnapshot(long id) {
        return createSnapshotWithMillis(id, System.currentTimeMillis());
    }

    /** Returns a lock that counts every {@code runWithLock} invocation and then runs the task. */
    private static Lock trackingLock(AtomicInteger counter) {
        return new Lock() {
            @Override
            public <T> T runWithLock(java.util.concurrent.Callable<T> callable) throws Exception {
                counter.incrementAndGet();
                return callable.call();
            }

            @Override
            public void close() {}
        };
    }

    /**
     * Local file IO that claims conditional-write support and counts how often the conditional
     * write is attempted. The exists-then-write sequence is a test stand-in only; it is not
     * atomic.
     */
    private static class ConditionalWriteFileIO extends LocalFileIO {
        final AtomicInteger conditionalWriteCalls = new AtomicInteger();

        @Override
        public boolean supportsConditionalWrite() {
            return true;
        }

        @Override
        public boolean tryToWriteAtomicIfAbsent(Path path, String content) throws IOException {
            conditionalWriteCalls.incrementAndGet();
            if (exists(path)) {
                return false;
            }
            writeFile(path, content, false);
            return true;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.paimon.options.Options;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.slf4j.Logger;
Expand All @@ -33,6 +35,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -85,6 +88,39 @@ public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite
new S3MultiPartUpload(fs, fs.getConf()), hadoopPath, path);
}

/**
 * Declares that this S3-backed FileIO supports native conditional writes.
 *
 * <p>NOTE(review): the answer is unconditionally {@code true}; confirm that every endpoint and
 * Hadoop configuration served by this class really honours write-if-absent.
 *
 * @return always {@code true}
 */
@Override
public boolean supportsConditionalWrite() {
    return true;
}

/**
 * Writes {@code content} (UTF-8 encoded) to {@code path} through the Hadoop {@code createFile}
 * builder with overwrite disabled, reporting an already-existing target as {@code false}.
 *
 * @param path the target file path
 * @param content the content to write
 * @return {@code true} if the write succeeded, {@code false} if the file already exists
 * @throws IOException on any I/O error other than {@link FileAlreadyExistsException}
 */
@Override
public boolean tryToWriteAtomicIfAbsent(Path path, String content) throws IOException {
    org.apache.hadoop.fs.Path target = path(path);
    S3AFileSystem s3a = (S3AFileSystem) getFileSystem(target);
    byte[] payload = content.getBytes(StandardCharsets.UTF_8);

    boolean written;
    // overwrite(false) asks the builder to fail when the target already exists.
    // NOTE(review): confirm that S3A turns this into an If-None-Match: * conditional PUT and
    // not merely an existence probe before the upload.
    try (FSDataOutputStream stream = s3a.createFile(target).create().overwrite(false).build()) {
        stream.write(payload);
        written = true;
    } catch (FileAlreadyExistsException e) {
        // Also reached when the exception is raised while the stream is being closed.
        LOG.debug("Conditional write failed, file already exists: {}", path);
        written = false;
    }
    return written;
}

// add additional config entries from the IO config to the Hadoop config
private Options loadHadoopConfigFromContext(CatalogContext context) {
Options hadoopConfig = new Options();
Expand Down
2 changes: 1 addition & 1 deletion paimon-filesystems/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
</modules>

<properties>
<fs.hadoopshaded.version>3.3.4</fs.hadoopshaded.version>
<fs.hadoopshaded.version>3.4.2</fs.hadoopshaded.version>
<fs.hadoopshaded-3.4.version>3.4.2</fs.hadoopshaded-3.4.version>
<fs.s3.aws.version>1.12.319</fs.s3.aws.version>
<commons.beanutils.version>1.9.4</commons.beanutils.version>
Expand Down
Loading