@@ -31,14 +31,15 @@ import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterBy, ClusterBySpec}
import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterByTransform => TempClusterByTransform}
import org.apache.spark.sql.delta.{ColumnWithDefaultExprUtils, DeltaConfigs, DeltaErrors, DeltaTableUtils}
import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions, IdentityColumn}
import org.apache.spark.sql.delta.{DeltaOptions, IdentityColumn}
import org.apache.spark.sql.delta.DeltaTableIdentifier.gluePermissionError
import org.apache.spark.sql.delta.commands._
import org.apache.spark.sql.delta.constraints.{AddConstraint, DropConstraint}
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.redirect.RedirectFeature
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.serverSidePlanning.ServerSidePlannedTable
import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSourceUtils, DeltaSQLConf}
import org.apache.spark.sql.delta.stats.StatisticsCollection
import org.apache.spark.sql.delta.tablefeatures.DropFeature
@@ -47,7 +48,7 @@ import org.apache.spark.sql.delta.util.PartitionUtils
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.MDC
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedAttribute, UnresolvedFieldName, UnresolvedFieldPosition}
@@ -236,7 +237,13 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
override def loadTable(ident: Identifier): Table = recordFrameProfile(
"DeltaCatalog", "loadTable") {
try {
super.loadTable(ident) match {
val table = super.loadTable(ident)
Contributor (Author): DeltaCatalog.scala was refactored to AbstractDeltaCatalog.scala (commit 156e41f), so the loadTable changes are in this file.

Contributor: gotcha

ServerSidePlannedTable.tryCreate(spark, ident, table, isUnityCatalog).foreach { sspt =>
return sspt
}

table match {
case v1: V1Table if DeltaTableUtils.isDeltaTable(v1.catalogTable) =>
loadCatalogTable(ident, v1.catalogTable)
case o => o
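A note on the control flow added above: Option.foreach combined with return performs a non-local return from the enclosing method, so loadTable returns the ServerSidePlannedTable as soon as tryCreate yields one and otherwise falls through to the existing match on the loaded table. A minimal standalone sketch of the same pattern (hypothetical names, not part of this change):

def load(planned: Option[String]): String = {
  planned.foreach { p => return p }  // non-local return: exits load() immediately
  "normal path"                      // reached only when planned is None
}

load(Some("server-side"))  // "server-side"
load(None)                 // "normal path"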
@@ -24,14 +24,133 @@ import scala.collection.JavaConverters._
import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.delta.serverSidePlanning.ServerSidePlanningClient
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability}
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.execution.datasources.{FileFormat, PartitionedFile}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* Companion object for ServerSidePlannedTable with factory methods.
*/
object ServerSidePlannedTable extends DeltaLogging {
/**
* Property keys that indicate table credentials are available.
* Unity Catalog tables may expose temporary credentials via these properties.
*/
private val CREDENTIAL_PROPERTY_KEYS = Seq(
"storage.credential",
"aws.temporary.credentials",
"azure.temporary.credentials",
"gcs.temporary.credentials",
"credential"
)

/**
* Determine if server-side planning should be used based on catalog type,
* credential availability, and configuration.
*
* Decision logic:
* - Use server-side planning if forceServerSidePlanning is true (config override)
* - Use server-side planning if Unity Catalog table lacks credentials
* - Otherwise use normal table loading path
*
* @param isUnityCatalog Whether this is a Unity Catalog instance
* @param hasCredentials Whether the table has credentials available
* @param forceServerSidePlanning Whether to force server-side planning (config flag)
* @return true if server-side planning should be used
*/
private[serverSidePlanning] def shouldUseServerSidePlanning(
isUnityCatalog: Boolean,
hasCredentials: Boolean,
forceServerSidePlanning: Boolean): Boolean = {
(isUnityCatalog && !hasCredentials) || forceServerSidePlanning
}

/**
* Try to create a ServerSidePlannedTable if server-side planning is needed.
* Returns None if not needed or if the planning client factory is not available.
*
* This method encapsulates all the logic to decide whether to use server-side planning:
* - Checks if Unity Catalog table lacks credentials
* - Checks if server-side planning is forced via config (for testing)
* - Extracts catalog name and table identifiers
* - Attempts to create the planning client
*
* Test coverage: ServerSidePlanningSuite verifies the decision logic through the
* shouldUseServerSidePlanning() method with different input combinations.
*
* @param spark The SparkSession
* @param ident The table identifier
* @param table The loaded table from the delegate catalog
* @param isUnityCatalog Whether this is a Unity Catalog instance
* @return Some(ServerSidePlannedTable) if server-side planning should be used, None otherwise
*/
def tryCreate(
spark: SparkSession,
ident: Identifier,
table: Table,
isUnityCatalog: Boolean): Option[ServerSidePlannedTable] = {
// Check if we should force server-side planning (for testing)
val forceServerSidePlanning =
spark.conf.get(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "false").toBoolean
val hasTableCredentials = hasCredentials(table)

// Check if we should use server-side planning
if (shouldUseServerSidePlanning(isUnityCatalog, hasTableCredentials, forceServerSidePlanning)) {
val namespace = ident.namespace().mkString(".")
val tableName = ident.name()

// Extract catalog name from identifier namespace, or default to spark_catalog
//
// Spark Identifier structure:
// - For "catalog.database.table": namespace() = ["catalog", "database"], name() = "table"
// - For "database.table": namespace() = ["database"], name() = "table"
// - For "table": namespace() = [], name() = "table"
//
// Note: We check namespace().length > 1 (not >= 1) because a single-element namespace
// represents just the database name without an explicit catalog, so we use the default.
// See Spark's LookupCatalog, CatalogAndIdentifier and ResolveSessionCatalog.
val catalogName = if (ident.namespace().length > 1) {
ident.namespace().head
} else {
"spark_catalog"
}
Comment on lines +118 to +122
Contributor (@tdas), Dec 10, 2025: note for future PR: we should test for table/catalog/schema names that have special chars like hyphens. We found out the hard way in the uc-spark connector when a user complained.


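To illustrate the concern raised in the comment above (hypothetical names, not from this change): joining namespace parts with "." drops any quoting, which matters once names contain characters like hyphens.

val ident = Identifier.of(Array("my-catalog", "sales-db"), "daily-orders")
ident.namespace().mkString(".")  // "my-catalog.sales-db" -- quoting is lost
// In SQL such names must be backtick-quoted (`my-catalog`.`sales-db`.`daily-orders`),
// so a planning client may need the parts passed separately or re-quoted.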
// Try to create ServerSidePlannedTable with server-side planning
try {
val client = ServerSidePlanningClientFactory.getClient(spark, catalogName)
Some(new ServerSidePlannedTable(spark, namespace, tableName, table.schema(), client))
} catch {
case _: IllegalStateException =>
// Factory not registered - fall through to normal path
logWarning(s"Server-side planning not available for catalog $catalogName. " +
"Falling back to normal table loading.")
None
}
} else {
None
}
}

/**
* Check if a table has credentials available.
* Unity Catalog tables may lack credentials when accessed without proper permissions.
* UC injects credentials as table properties, see:
* https://github.com/unitycatalog/unitycatalog/blob/main/connectors/spark/src/main/scala/
* io/unitycatalog/spark/UCSingleCatalog.scala#L260
*/
private def hasCredentials(table: Table): Boolean = {
// Check table properties for credential information
val properties = table.properties()
CREDENTIAL_PROPERTY_KEYS.exists(key => properties.containsKey(key))
}
}

/**
* A Spark Table implementation that uses server-side scan planning
* to get the list of files to read. Used as a fallback when Unity Catalog
@@ -45,7 +164,8 @@ class ServerSidePlannedTable(
databaseName: String,
tableName: String,
tableSchema: StructType,
planningClient: ServerSidePlanningClient) extends Table with SupportsRead {
planningClient: ServerSidePlanningClient)
extends Table with SupportsRead with DeltaLogging {

// Returns fully qualified name (e.g., "catalog.database.table").
// The databaseName parameter receives ident.namespace().mkString(".") from DeltaCatalog,
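The scaladoc on tryCreate notes that ServerSidePlanningSuite exercises the decision logic through shouldUseServerSidePlanning() with different input combinations. A minimal sketch of such checks (hypothetical test body; the suite's actual assertions may differ), callable from the serverSidePlanning package since the method is private[serverSidePlanning]:

// Unity Catalog table without credentials -> use server-side planning
assert(ServerSidePlannedTable.shouldUseServerSidePlanning(
  isUnityCatalog = true, hasCredentials = false, forceServerSidePlanning = false))
// Credentials available and no override -> normal loading path
assert(!ServerSidePlannedTable.shouldUseServerSidePlanning(
  isUnityCatalog = true, hasCredentials = true, forceServerSidePlanning = false))
// Config override forces the planning path regardless of catalog type or credentials
assert(ServerSidePlannedTable.shouldUseServerSidePlanning(
  isUnityCatalog = false, hasCredentials = true, forceServerSidePlanning = true))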
@@ -2905,6 +2905,15 @@ trait DeltaSQLConfBase extends DeltaSQLConfUtils {
|When enabled, it's decided by a per-command flag.""".stripMargin)
.booleanConf
.createWithDefault(false)

val ENABLE_SERVER_SIDE_PLANNING =
buildConf("catalog.enableServerSidePlanning")
.internal()
.doc(
"""When enabled, DeltaCatalog will use server-side scan planning path
|instead of normal table loading.""".stripMargin)
.booleanConf
.createWithDefault(false)
}

object DeltaSQLConf extends DeltaSQLConfBase
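
A sketch of how a test might exercise the new flag (hypothetical table name; the full key would be spark.databricks.delta.catalog.enableServerSidePlanning, assuming the usual prefix applied by DeltaSQLConf.buildConf):

// Force the server-side planning path even when credentials are available
spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "true")
try {
  // Catalog tables loaded through DeltaCatalog.loadTable should now route
  // through ServerSidePlannedTable.tryCreate(...)
  spark.sql("SELECT * FROM some_catalog.some_db.some_table").collect()
} finally {
  spark.conf.unset(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key)
}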