Description
Search before asking
- I searched in the issues and found nothing similar.
Paimon version
1.3.1
Compute Engine
Hadoop: 2.7.5
Flink: 1.19.1
Minimal reproduce step
- Create a Paimon catalog without Kerberos configuration:

```sql
CREATE CATALOG paimon_catalog WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    'uri' = 'thrift://xxx:9083',
    'warehouse' = 'hdfs://nameservice1/user/hive/warehouse/paimon.db'
    -- No security.kerberos.login.keytab or security.kerberos.login.principal
);
```

- Write data using Flink (with checkpoint enabled):

```sql
INSERT INTO paimon_catalog.database.table SELECT * FROM source_table;
```

- Wait for checkpoint to trigger (or manually trigger checkpoint)
- Observe errors:
  - TaskManager logs show `InterruptedIOException` during checkpoint
  - Flink Web UI shows "Unable to close file because the last block does not have enough number of replicas"
  - Checkpoint fails
Code to reproduce:
```java
Options options = new Options(); // No security configuration provided
SecurityConfiguration config = new SecurityConfiguration(options);
boolean isLegal = config.isLegal(); // Returns true (incorrect)
```
What doesn't meet your expectations?
- `isLegal()` should return `false` when no security configuration is provided (see the sketch after this list)
- Security wrappers should only be created when Kerberos is actually configured
- Non-Kerberos environments should use the native `FileSystem` directly without `doAs()` overhead
- Checkpoints should succeed without `InterruptedIOException` errors
- HDFS files should close properly without block replica issues
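For contrast with the reproduce snippet above, a minimal sketch of the expected behavior (hypothetical fragment, reusing the same `Options`/`SecurityConfiguration` API from that snippet):

```java
// No Kerberos settings at all: isLegal() is expected to report "no usable
// security configuration", so HadoopFileIO keeps the plain FileSystem.
Options options = new Options();
SecurityConfiguration config = new SecurityConfiguration(options);
boolean isLegal = config.isLegal(); // Expected: false (currently returns true)
```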
Relevant stack trace:

```
at org.apache.paimon.fs.hadoop.HadoopSecuredFileSystem.lambda$delete$5(HadoopSecuredFileSystem.java:109) ~[zrc_2635_1.0.59.jar:?]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_201]
at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_201]
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1937) ~[flink-shaded-hadoop-2-uber-3.3.5-infra-v2-10.0.jar:3.3.5-infra-v2-10.0]
at org.apache.paimon.fs.hadoop.HadoopSecuredFileSystem.runSecuredWithIOException(HadoopSecuredFileSystem.java:155) ~[zrc_2635_1.0.59.jar:?]
at org.apache.paimon.fs.hadoop.HadoopSecuredFileSystem.delete(HadoopSecuredFileSystem.java:109) ~[zrc_2635_1.0.59.jar:?]
at org.apache.paimon.fs.hadoop.HadoopFileIO.delete(HadoopFileIO.java:209) ~[zrc_2635_1.0.59.jar:?]
at org.apache.paimon.fs.FileIO.deleteQuietly(FileIO.java:246) ~[zrc_2635_1.0.59.jar:?]
at org.apache.paimon.io.SingleFileWriter.abort(SingleFileWriter.java:161) ~[zrc_2635_1.0.59.jar:?]
at org.apache.paimon.io.RollingFileWriter.abort(RollingFileWriter.java:147) ~[zrc_2635_1.0.59.jar:?]
at org.apache.paimon.io.RollingFileWriter.close(RollingFileWriter.java:172) ~[zrc_2635_1.0.59.jar:?]
at org.apache.paimon.mergetree.MergeTreeWriter.flushWriteBuffer(MergeTreeWriter.java:234) ~[zrc_2635_1.0.59.jar:?]
at org.apache.paimon.mergetree.MergeTreeWriter.prepareCommit(MergeTreeWriter.java:253) ~[zrc_2635_1.0.59.jar:?]
at org.apache.paimon.operation.AbstractFileStoreWrite.prepareCommit(AbstractFileStoreWrite.java:215) ~[zrc_2635_1.0.59.jar:?]
at org.apache.paimon.operation.MemoryFileStoreWrite.prepareCommit(MemoryFileStoreWrite.java:152) ~[zrc_2635_1.0.59.jar:?]
at org.apache.paimon.table.sink.TableWriteImpl.prepareCommit(TableWriteImpl.java:262) ~[zrc_2635_1.0.59.jar:?]
at org.apache.paimon.flink.sink.StoreSinkWriteImpl.prepareCommit(StoreSinkWriteImpl.java:150) ~[zrc_2635_1.0.59.jar:?]
at org.apache.paimon.flink.sink.TableWriteOperator.prepareCommit(TableWriteOperator.java:151) ~[zrc_2635_1.0.59.jar:?]
at org.apache.paimon.flink.sink.RowDataStoreWriteOperator.prepareCommit(RowDataStoreWriteOperator.java:205) ~[zrc_2635_1.0.59.jar:?]
at org.apache.paimon.flink.sink.PrepareCommitOperator.emitCommittables(PrepareCommitOperator.java:115) ~[zrc_2635_1.0.59.jar:?]
at org.apache.paimon.flink.sink.PrepareCommitOperator.prepareSnapshotPreBarrier(PrepareCommitOperator.java:95) ~[zrc_2635_1.0.59.jar:?]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89) ~[flink-dist-1.19.1.jar:1.19.1]
```
Anything else?
My Understanding:
- The `isLegal()` method should indicate whether a valid security configuration exists
- When no configuration is provided, it should return `false` to avoid unnecessary security wrappers
- The `ugi.doAs()` wrapper in `HadoopSecuredFileSystem` causes thread-interruption handling issues in Hadoop 2.7.5 (sketched after this list)
- During Flink checkpoints, thread interruption combined with `doAs()` leads to:
  - HDFS I/O operations timing out
  - Files not being closed properly
  - HDFS reporting insufficient block replicas
  - Checkpoint failures
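A minimal sketch of the wrapping pattern visible in the stack trace above (simplified and hypothetical, not the actual Paimon source): every filesystem call is funneled through `UserGroupInformation.doAs()`, so the interrupt Flink sends around checkpoint time surfaces at the `doAs()` boundary as an `InterruptedIOException` instead of reaching the plain `FileSystem` call.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

// Simplified sketch of a doAs()-based secured FileSystem wrapper; class and method
// bodies are illustrative, not copied from HadoopSecuredFileSystem.
class SecuredFileSystemSketch {

    private final FileSystem fs;
    private final UserGroupInformation ugi;

    SecuredFileSystemSketch(FileSystem fs, UserGroupInformation ugi) {
        this.fs = fs;
        this.ugi = ugi;
    }

    boolean delete(Path path, boolean recursive) throws IOException {
        // Every call runs inside ugi.doAs(), even when no Kerberos login exists.
        return runSecuredWithIOException(() -> fs.delete(path, recursive));
    }

    private <T> T runSecuredWithIOException(Callable<T> action) throws IOException {
        try {
            return ugi.doAs((PrivilegedExceptionAction<T>) action::call);
        } catch (InterruptedException e) {
            // A checkpoint-time interrupt lands here and is rethrown as an
            // InterruptedIOException, which aborts the in-flight file writer.
            Thread.currentThread().interrupt();
            throw new InterruptedIOException(e.getMessage());
        } catch (IOException e) {
            throw e;
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}
```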
My Design:
- Change the return value from `true` to `false` when no keytab/principal is configured
- Add a comment explaining why we return `false`
- This ensures backward compatibility (Kerberos environments still work correctly)
- This is a minimal, safe change that fixes the root cause
POC Code:
I have tested this fix locally and confirmed:
- Non-Kerberos environments no longer create unnecessary wrappers
- Kerberos environments continue to work correctly
- Checkpoint operations succeed in Hadoop 2.7.5
- No more `InterruptedIOException` errors
- HDFS files close properly without block replica issues (a hypothetical test sketch follows this list)
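A hypothetical test sketch of those POC checks (not from the report; the package of `SecurityConfiguration`, the `security.kerberos.login.*` option keys, and the `Options.setString(...)` setter are assumptions based on the snippets in this issue):

```java
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.paimon.options.Options;
import org.junit.jupiter.api.Test;

// Import for SecurityConfiguration omitted on purpose: its package is assumed here.
class SecurityConfigurationIsLegalSketch {

    @Test
    void keytabWithoutPrincipalIsRejected() {
        Options options = new Options();
        options.setString("security.kerberos.login.keytab", "/path/to/user.keytab");
        // Only one half of the keytab/principal pair: illegal before and after the fix.
        assertFalse(new SecurityConfiguration(options).isLegal());
    }

    @Test
    void readableKeytabAndPrincipalStayLegal() throws Exception {
        Path keytab = Files.createTempFile("user", ".keytab");
        Options options = new Options();
        options.setString("security.kerberos.login.keytab", keytab.toString());
        options.setString("security.kerberos.login.principal", "user@EXAMPLE.COM");
        // Kerberos environments keep working: keytab exists, is a file, and is readable.
        assertTrue(new SecurityConfiguration(options).isLegal());
    }
}
```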
Proposed Solution:
Change line 96 in `SecurityConfiguration.java` from `return true;` to `return false;`:
```java
public boolean isLegal() {
    if (StringUtils.isNullOrWhitespaceOnly(keytab)
            != StringUtils.isNullOrWhitespaceOnly(principal)) {
        return false;
    }
    if (!StringUtils.isNullOrWhitespaceOnly(keytab)) {
        File keytabFile = new File(keytab);
        return keytabFile.exists() && keytabFile.isFile() && keytabFile.canRead();
    }
    // Return false when no security configuration is provided.
    // This prevents unnecessary security wrapper creation.
    return false; // ✅ Fixed
}
```
Workaround: As a temporary workaround, I rolled back to the Paimon 1.0 behavior by modifying `HadoopFileIO.createFileSystem()`:
```java
protected FileSystem createFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
    return path.getFileSystem(hadoopConf.get());
    // Removed: HadoopSecuredFileSystem.trySecureFileSystem() call
}
```
This workaround resolves the issue but loses the security features. The proper fix is to correct the `isLegal()` logic.
Understanding: a `SecurityConfiguration.isLegal()` logic flaw causes unnecessary security wrapper creation, leading to checkpoint failures on Hadoop 2.7.5.
Solution: change line 96 from `return true;` to `return false;` when no keytab/principal is configured.
POC: tested locally; checkpoints succeed, no more errors.
Are you willing to submit a PR?
- I'm willing to submit a PR!