diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala
index df39f2fff51..0464e12ad7b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
 import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterBy, ClusterBySpec}
 import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterByTransform => TempClusterByTransform}
 import org.apache.spark.sql.delta.{ColumnWithDefaultExprUtils, DeltaConfigs, DeltaErrors, DeltaTableUtils}
-import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions, IdentityColumn}
+import org.apache.spark.sql.delta.{DeltaOptions, IdentityColumn}
 import org.apache.spark.sql.delta.DeltaTableIdentifier.gluePermissionError
 import org.apache.spark.sql.delta.commands._
 import org.apache.spark.sql.delta.constraints.{AddConstraint, DropConstraint}
@@ -39,6 +39,7 @@ import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.redirect.RedirectFeature
 import org.apache.spark.sql.delta.schema.SchemaUtils
+import org.apache.spark.sql.delta.serverSidePlanning.ServerSidePlannedTable
 import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSourceUtils, DeltaSQLConf}
 import org.apache.spark.sql.delta.stats.StatisticsCollection
 import org.apache.spark.sql.delta.tablefeatures.DropFeature
@@ -47,7 +48,7 @@ import org.apache.spark.sql.delta.util.PartitionUtils
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedAttribute, UnresolvedFieldName, UnresolvedFieldPosition}
@@ -236,7 +237,13 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
   override def loadTable(ident: Identifier): Table = recordFrameProfile(
       "DeltaCatalog", "loadTable") {
     try {
-      super.loadTable(ident) match {
+      val table = super.loadTable(ident)
+
+      ServerSidePlannedTable.tryCreate(spark, ident, table, isUnityCatalog).foreach { sspt =>
+        return sspt
+      }
+
+      table match {
         case v1: V1Table if DeltaTableUtils.isDeltaTable(v1.catalogTable) =>
           loadCatalogTable(ident, v1.catalogTable)
         case o => o
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala
index c67aaab58b4..6c2e6cc4303 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala
@@ -24,7 +24,9 @@ import scala.collection.JavaConverters._
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.delta.serverSidePlanning.ServerSidePlanningClient
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability}
 import org.apache.spark.sql.connector.read._
 import org.apache.spark.sql.execution.datasources.{FileFormat, PartitionedFile}
@@ -32,6 +34,123 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
+/**
+ * Companion object for ServerSidePlannedTable with factory methods.
+ */
+object ServerSidePlannedTable extends DeltaLogging {
+  /**
+   * Property keys that indicate table credentials are available.
+   * Unity Catalog tables may expose temporary credentials via these properties.
+   */
+  private val CREDENTIAL_PROPERTY_KEYS = Seq(
+    "storage.credential",
+    "aws.temporary.credentials",
+    "azure.temporary.credentials",
+    "gcs.temporary.credentials",
+    "credential"
+  )
+
+  /**
+   * Determine if server-side planning should be used based on catalog type,
+   * credential availability, and configuration.
+   *
+   * Decision logic:
+   * - Use server-side planning if forceServerSidePlanning is true (config override)
+   * - Use server-side planning if a Unity Catalog table lacks credentials
+   * - Otherwise use the normal table loading path
+   *
+   * @param isUnityCatalog Whether this is a Unity Catalog instance
+   * @param hasCredentials Whether the table has credentials available
+   * @param forceServerSidePlanning Whether to force server-side planning (config flag)
+   * @return true if server-side planning should be used
+   */
+  private[serverSidePlanning] def shouldUseServerSidePlanning(
+      isUnityCatalog: Boolean,
+      hasCredentials: Boolean,
+      forceServerSidePlanning: Boolean): Boolean = {
+    (isUnityCatalog && !hasCredentials) || forceServerSidePlanning
+  }
+
+  /**
+   * Try to create a ServerSidePlannedTable if server-side planning is needed.
+   * Returns None if it is not needed or if the planning client factory is not available.
+   *
+   * This method encapsulates all the logic to decide whether to use server-side planning:
+   * - Checks if the Unity Catalog table lacks credentials
+   * - Checks if server-side planning is forced via config (for testing)
+   * - Extracts the catalog name and table identifiers
+   * - Attempts to create the planning client
+   *
+   * Test coverage: ServerSidePlannedTableSuite verifies the decision logic through the
+   * shouldUseServerSidePlanning() method with different input combinations.
+   *
+   * @param spark The SparkSession
+   * @param ident The table identifier
+   * @param table The loaded table from the delegate catalog
+   * @param isUnityCatalog Whether this is a Unity Catalog instance
+   * @return Some(ServerSidePlannedTable) if server-side planning should be used, None otherwise
+   */
+  def tryCreate(
+      spark: SparkSession,
+      ident: Identifier,
+      table: Table,
+      isUnityCatalog: Boolean): Option[ServerSidePlannedTable] = {
+    // Check if we should force server-side planning (for testing)
+    val forceServerSidePlanning =
+      spark.conf.get(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "false").toBoolean
+    val hasTableCredentials = hasCredentials(table)
+
+    // Check if we should use server-side planning
+    if (shouldUseServerSidePlanning(isUnityCatalog, hasTableCredentials, forceServerSidePlanning)) {
+      val namespace = ident.namespace().mkString(".")
+      val tableName = ident.name()
+
+      // Extract catalog name from identifier namespace, or default to spark_catalog
+      //
+      // Spark Identifier structure:
+      // - For "catalog.database.table": namespace() = ["catalog", "database"], name() = "table"
+      // - For "database.table": namespace() = ["database"], name() = "table"
+      // - For "table": namespace() = [], name() = "table"
+      //
+      // Note: We check namespace().length > 1 (not >= 1) because a single-element namespace
+      // represents just the database name without an explicit catalog, so we use the default.
+      // See Spark's LookupCatalog, CatalogAndIdentifier and ResolveSessionCatalog.
+      val catalogName = if (ident.namespace().length > 1) {
+        ident.namespace().head
+      } else {
+        "spark_catalog"
+      }
+
+      // Try to create ServerSidePlannedTable with server-side planning
+      try {
+        val client = ServerSidePlanningClientFactory.getClient(spark, catalogName)
+        Some(new ServerSidePlannedTable(spark, namespace, tableName, table.schema(), client))
+      } catch {
+        case _: IllegalStateException =>
+          // Factory not registered - fall through to normal path
+          logWarning(s"Server-side planning not available for catalog $catalogName. " +
+            "Falling back to normal table loading.")
+          None
+      }
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Check if a table has credentials available.
+   * Unity Catalog tables may lack credentials when accessed without proper permissions.
+   * UC injects credentials as table properties, see:
+   * https://github.com/unitycatalog/unitycatalog/blob/main/connectors/spark/src/main/scala/
+   * io/unitycatalog/spark/UCSingleCatalog.scala#L260
+   */
+  private def hasCredentials(table: Table): Boolean = {
+    // Check table properties for credential information
+    val properties = table.properties()
+    CREDENTIAL_PROPERTY_KEYS.exists(key => properties.containsKey(key))
+  }
+}
+
 /**
  * A Spark Table implementation that uses server-side scan planning
  * to get the list of files to read. Used as a fallback when Unity Catalog
@@ -45,7 +164,8 @@ class ServerSidePlannedTable(
     databaseName: String,
     tableName: String,
     tableSchema: StructType,
-    planningClient: ServerSidePlanningClient) extends Table with SupportsRead {
+    planningClient: ServerSidePlanningClient)
+  extends Table with SupportsRead with DeltaLogging {
 
   // Returns fully qualified name (e.g., "catalog.database.table").
   // The databaseName parameter receives ident.namespace().mkString(".") from DeltaCatalog,
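Reviewer note: the ServerSidePlanningClient and ServerSidePlanningClientFactory abstractions that tryCreate depends on live elsewhere in the serverSidePlanning package and are not touched by this patch. For orientation, a client only has to map a (database, table) pair to a ScanPlan of ScanFiles. The sketch below is a hypothetical minimal client, assuming planScan is the only member a client must implement (the same shape exercised by TestServerSidePlanningClient later in this diff); the path and size are placeholders.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical client that always returns a fixed, single-file scan plan.
class StaticPlanningClient(spark: SparkSession) extends ServerSidePlanningClient {
  override def planScan(databaseName: String, table: String): ScanPlan = {
    // A real client would ask the catalog server to plan the scan for
    // `databaseName`.`table` and translate its response into ScanFiles.
    ScanPlan(files = Seq(
      ScanFile(
        filePath = "file:/tmp/example/part-00000.parquet", // placeholder
        fileSizeInBytes = 1024L, // placeholder
        fileFormat = "parquet")))
  }
}
```

A factory registered through ServerSidePlanningClientFactory.setFactory(...) would hand such a client to tryCreate; when no factory is registered, getClient throws IllegalStateException and tryCreate falls back to the normal loading path, as handled above.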
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index c52cf55423f..81e9322b18a 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -2905,6 +2905,15 @@ trait DeltaSQLConfBase extends DeltaSQLConfUtils {
         |When enabled, it's decided by a per-command flag.""".stripMargin)
       .booleanConf
       .createWithDefault(false)
+
+  val ENABLE_SERVER_SIDE_PLANNING =
+    buildConf("catalog.enableServerSidePlanning")
+      .internal()
+      .doc(
+        """When enabled, DeltaCatalog will use the server-side scan planning path
+          |instead of the normal table loading path.""".stripMargin)
+      .booleanConf
+      .createWithDefault(false)
 }
 
 object DeltaSQLConf extends DeltaSQLConfBase
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala
index a31ed56c909..d064fb89486 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala
@@ -16,89 +16,164 @@
 package org.apache.spark.sql.delta.serverSidePlanning
 
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 
-class ServerSidePlannedTableSuite extends QueryTest with SharedSparkSession {
+/**
+ * Tests for server-side planning with a mock client.
+ */
+class ServerSidePlannedTableSuite extends QueryTest with DeltaSQLCommandTest {
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    // Create test database and shared table once for all tests
+    sql("CREATE DATABASE IF NOT EXISTS test_db")
+    sql("""
+      CREATE TABLE test_db.shared_test (
+        id INT,
+        name STRING,
+        value INT
+      ) USING parquet
+    """)
+    sql("""
+      INSERT INTO test_db.shared_test (id, name, value) VALUES
+      (1, 'alpha', 10),
+      (2, 'beta', 20),
+      (3, 'gamma', 30)
+    """)
+  }
+
+  /**
+   * Helper method to run tests with server-side planning enabled.
+   * Automatically sets up the test factory and config, then cleans up afterwards.
+   * This prevents test pollution from leaked configuration.
+   */
+  private def withServerSidePlanningEnabled(f: => Unit): Unit = {
+    val originalConfig = spark.conf.getOption(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key)
+    ServerSidePlanningClientFactory.setFactory(new TestServerSidePlanningClientFactory())
+    spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "true")
+    try {
+      f
+    } finally {
+      // Reset factory
+      ServerSidePlanningClientFactory.clearFactory()
+      // Restore original config
+      originalConfig match {
+        case Some(value) => spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, value)
+        case None => spark.conf.unset(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key)
+      }
+    }
+  }
+
+  test("full query through DeltaCatalog with server-side planning") {
+    // This test verifies server-side planning works end-to-end by checking:
+    // (1) DeltaCatalog returns ServerSidePlannedTable (not normal table)
+    // (2) Query execution returns correct results
+    // If both are true, the server-side planning client worked correctly - that's the only way
+    // ServerSidePlannedTable can read data.
+
+    withServerSidePlanningEnabled {
+      // (1) Verify that DeltaCatalog actually returns ServerSidePlannedTable
+      val catalog = spark.sessionState.catalogManager.catalog("spark_catalog")
+        .asInstanceOf[org.apache.spark.sql.connector.catalog.TableCatalog]
+      val loadedTable = catalog.loadTable(
+        org.apache.spark.sql.connector.catalog.Identifier.of(
+          Array("test_db"), "shared_test"))
+      assert(loadedTable.isInstanceOf[ServerSidePlannedTable],
+        s"Expected ServerSidePlannedTable but got ${loadedTable.getClass.getName}")
+
+      // (2) Execute query - should go through full server-side planning stack
+      checkAnswer(
+        sql("SELECT id, name, value FROM test_db.shared_test ORDER BY id"),
+        Seq(
+          Row(1, "alpha", 10),
+          Row(2, "beta", 20),
+          Row(3, "gamma", 30)
+        )
+      )
+    }
+  }
 
-  test("end-to-end: ServerSidePlannedTable with test client") {
-    withTable("test_table") {
-      // Create a Parquet table with data
+  test("verify normal path unchanged when feature disabled") {
+    // Explicitly disable server-side planning
+    spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "false")
+
+    // Verify that DeltaCatalog returns normal table, not ServerSidePlannedTable
+    val catalog = spark.sessionState.catalogManager.catalog("spark_catalog")
+      .asInstanceOf[org.apache.spark.sql.connector.catalog.TableCatalog]
+    val loadedTable = catalog.loadTable(
+      org.apache.spark.sql.connector.catalog.Identifier.of(
+        Array("test_db"), "shared_test"))
+    assert(!loadedTable.isInstanceOf[ServerSidePlannedTable],
+      s"Expected normal table but got ServerSidePlannedTable when config is disabled")
+  }
+
+  test("shouldUseServerSidePlanning() decision logic") {
+    // Case 1: Force flag enabled -> should always use server-side planning
+    assert(ServerSidePlannedTable.shouldUseServerSidePlanning(
+      isUnityCatalog = false,
+      hasCredentials = true,
+      forceServerSidePlanning = true),
+      "Should use server-side planning when force flag is true")
+
+    // Case 2: Unity Catalog without credentials -> should use server-side planning
+    assert(ServerSidePlannedTable.shouldUseServerSidePlanning(
+      isUnityCatalog = true,
+      hasCredentials = false,
+      forceServerSidePlanning = false),
+      "Should use server-side planning for UC table without credentials")
+
+    // Case 3: Unity Catalog with credentials -> should NOT use server-side planning
+    assert(!ServerSidePlannedTable.shouldUseServerSidePlanning(
+      isUnityCatalog = true,
+      hasCredentials = true,
+      forceServerSidePlanning = false),
+      "Should NOT use server-side planning for UC table with credentials")
+
+    // Case 4: Non-UC catalog -> should NOT use server-side planning
+    assert(!ServerSidePlannedTable.shouldUseServerSidePlanning(
+      isUnityCatalog = false,
+      hasCredentials = true,
+      forceServerSidePlanning = false),
+      "Should NOT use server-side planning for non-UC catalog")
+
+    assert(!ServerSidePlannedTable.shouldUseServerSidePlanning(
+      isUnityCatalog = false,
+      hasCredentials = false,
+      forceServerSidePlanning = false),
+      "Should NOT use server-side planning for non-UC catalog (even without credentials)")
+  }
+
+  test("ServerSidePlannedTable is read-only") {
+    withTable("readonly_test") {
       sql("""
-        CREATE TABLE test_table (
+        CREATE TABLE readonly_test (
           id INT,
-          name STRING,
-          category STRING
+          data STRING
         ) USING parquet
       """)
-      sql("""
-        INSERT INTO test_table (id, name, category) VALUES
-        (1, 'Alice', 'A'),
-        (2, 'Bob', 'B'),
-        (3, 'Charlie', 'A'),
-        (4, 'David', 'B')
-      """)
-
-      // Configure factory to use test client
-      val testFactory = new TestServerSidePlanningClientFactory()
-      ServerSidePlanningClientFactory.setFactory(testFactory)
-
-      try {
-        // Create client and verify it's the test client
-        val client = ServerSidePlanningClientFactory.getClient(spark, "spark_catalog")
-        assert(client.isInstanceOf[TestServerSidePlanningClient],
-          "Client should be TestServerSidePlanningClient")
-
-        // Get scan plan and verify file discovery
-        val scanPlan = client.planScan("default", "test_table")
-        assert(scanPlan.files.nonEmpty, "Should discover data files")
-        assert(scanPlan.files.forall(_.fileFormat == "parquet"),
-          "Parquet tables should have parquet file format")
-        assert(scanPlan.files.forall(_.fileSizeInBytes > 0),
-          "All files should have positive size")
-
-        // Get the table schema from the actual table
-        val tableSchema = spark.table("test_table").schema
-
-        // Create ServerSidePlannedTable using schema from the table
-        val table = new ServerSidePlannedTable(
-          spark = spark,
-          databaseName = "default",
-          tableName = "test_table",
-          tableSchema = tableSchema,
-          planningClient = client
-        )
-
-        // Verify table metadata
-        assert(table.name() == "default.test_table",
-          "Table name should be fully qualified")
-        assert(table.schema() == tableSchema,
-          "Table schema should match")
-
-        // Verify scan produces correct number of partitions
-        val scan = table.newScanBuilder(
-          new org.apache.spark.sql.util.CaseInsensitiveStringMap(
-            java.util.Collections.emptyMap()
-          )
-        ).build()
-
-        val partitions = scan.toBatch.planInputPartitions()
-        assert(partitions.length == scanPlan.files.length,
-          s"Should have ${scanPlan.files.length} partitions, one per file")
-
-        // Verify reader factory can be created
-        val readerFactory = scan.toBatch.createReaderFactory()
-        assert(readerFactory != null, "Reader factory should be created")
-
-        // Verify we can create a reader for the first partition
-        val reader = readerFactory.createReader(partitions(0))
-        assert(reader != null, "Reader should be created for partition")
-
-      } finally {
-        // Clean up factory
-        ServerSidePlanningClientFactory.clearFactory()
+      // First insert WITHOUT server-side planning should succeed
+      sql("INSERT INTO readonly_test VALUES (1, 'initial')")
+      checkAnswer(
+        sql("SELECT * FROM readonly_test"),
+        Seq(Row(1, "initial"))
+      )
+
+      // Try to insert WITH server-side planning enabled - should fail
+      withServerSidePlanningEnabled {
+        val exception = intercept[AnalysisException] {
+          sql("INSERT INTO readonly_test VALUES (2, 'should_fail')")
+        }
+        assert(exception.getMessage.contains("does not support append"))
append")) } + + // Verify data unchanged - second insert didn't happen + checkAnswer( + sql("SELECT * FROM readonly_test"), + Seq(Row(1, "initial")) + ) } } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/TestServerSidePlanningClient.scala b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/TestServerSidePlanningClient.scala index 328e3f2c045..2aa0b9e5454 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/TestServerSidePlanningClient.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/TestServerSidePlanningClient.scala @@ -20,6 +20,7 @@ import java.util.Locale import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.functions.input_file_name /** @@ -32,34 +33,49 @@ class TestServerSidePlanningClient(spark: SparkSession) extends ServerSidePlanni override def planScan(databaseName: String, table: String): ScanPlan = { val fullTableName = s"$databaseName.$table" - // Use input_file_name() to get the list of files - // Query: SELECT DISTINCT input_file_name() FROM table - val filesDF = spark.table(fullTableName) - .select(input_file_name().as("file_path")) - .distinct() + // Temporarily disable server-side planning to avoid infinite recursion + // when this test client internally loads the table + val originalConfigValue = spark.conf.getOption(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key) + spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "false") - // Collect file paths - val filePaths = filesDF.collect().map(_.getString(0)) + try { + // Use input_file_name() to get the list of files + // Query: SELECT DISTINCT input_file_name() FROM table + val filesDF = spark.table(fullTableName) + .select(input_file_name().as("file_path")) + .distinct() - // Get file metadata (size, format) from filesystem - // scalastyle:off deltahadoopconfiguration - val hadoopConf = spark.sessionState.newHadoopConf() - // scalastyle:on deltahadoopconfiguration - val files = filePaths.map { filePath => - // input_file_name() returns URL-encoded paths, decode them - val decodedPath = java.net.URLDecoder.decode(filePath, "UTF-8") - val path = new Path(decodedPath) - val fs = path.getFileSystem(hadoopConf) - val fileStatus = fs.getFileStatus(path) + // Collect file paths + val filePaths = filesDF.collect().map(_.getString(0)) - ScanFile( - filePath = decodedPath, - fileSizeInBytes = fileStatus.getLen, - fileFormat = getFileFormat(path) - ) - }.toSeq + // Get file metadata (size, format) from filesystem + // scalastyle:off deltahadoopconfiguration + // The rule prevents accessing Hadoop conf on executors where it could use wrong credentials + // for multi-catalog scenarios. Safe here: test-only code simulating server filesystem access. 
+      val hadoopConf = spark.sessionState.newHadoopConf()
+      // scalastyle:on deltahadoopconfiguration
+      val files = filePaths.map { filePath =>
+        // input_file_name() returns URL-encoded paths, decode them
+        val decodedPath = java.net.URLDecoder.decode(filePath, "UTF-8")
+        val path = new Path(decodedPath)
+        val fs = path.getFileSystem(hadoopConf)
+        val fileStatus = fs.getFileStatus(path)
 
-    ScanPlan(files = files)
+        ScanFile(
+          filePath = decodedPath,
+          fileSizeInBytes = fileStatus.getLen,
+          fileFormat = getFileFormat(path)
+        )
+      }.toSeq
+
+      ScanPlan(files = files)
+    } finally {
+      // Restore original config value
+      originalConfigValue match {
+        case Some(value) => spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, value)
+        case None => spark.conf.unset(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key)
+      }
+    }
   }
 
   private def getFileFormat(path: Path): String = "parquet"
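Reviewer note: for anyone who wants to poke at this locally, the pieces above compose into a short interactive session. This is a sketch only: it assumes the test factory and client added in this patch are on the classpath, that the DeltaSQLConf key carries the usual spark.databricks.delta. prefix, and that the session is wired up with the standard Delta catalog and extension settings (nothing here is introduced by this PR).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.serverSidePlanning.{ServerSidePlanningClientFactory, TestServerSidePlanningClientFactory}
import org.apache.spark.sql.delta.sources.DeltaSQLConf

val spark = SparkSession.builder()
  .master("local[*]")
  // Standard Delta session wiring; assumed, not part of this patch.
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

// Mirror withServerSidePlanningEnabled from the suite: register the test planning
// client factory and force the internal flag so loadTable() takes the new path.
ServerSidePlanningClientFactory.setFactory(new TestServerSidePlanningClientFactory())
spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "true")
try {
  // Any parquet-backed table reachable through the session catalog will do here.
  spark.sql("SELECT * FROM test_db.shared_test").show()
} finally {
  // Clean up exactly like the suite does so the flag and factory do not leak.
  ServerSidePlanningClientFactory.clearFactory()
  spark.conf.unset(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key)
}
```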