
Commit c94cf91

murali-db and claude committed
[Server-Side Planning] Integrate DeltaCatalog with ServerSidePlannedTable and add E2E tests (#9)
* [Server-Side Planning] Integrate DeltaCatalog with ServerSidePlannedTable and add E2E tests

  Implements DeltaCatalog integration to use ServerSidePlannedTable when Unity Catalog tables lack credentials, with comprehensive end-to-end testing and improvements.

  Key changes:
  - Add loadTable() logic in DeltaCatalog to detect UC tables without credentials
  - Implement hasCredentials() to check for credential properties in table metadata
  - Add ENABLE_SERVER_SIDE_PLANNING config flag for testing
  - Add comprehensive integration tests with reflection-based credential testing
  - Add Spark source code references for Identifier namespace structure
  - Improve test suite by removing redundant aggregation test
  - Revert verbose documentation comments in ServerSidePlannedTable

  Test coverage:
  - E2E: Full stack integration with DeltaCatalog
  - E2E: Verify normal path unchanged when feature disabled
  - loadTable() decision logic with ENABLE_SERVER_SIDE_PLANNING config (tests 3 scenarios, including UC without credentials via reflection)

  See Spark's LookupCatalog, CatalogAndIdentifier, and ResolveSessionCatalog for Identifier namespace structure references.

* Refactor PR9: Extract decision logic and improve test quality

  Changes:
  - Extracted shouldUseServerSidePlanning() method with boolean inputs
  - Replaced reflection-based test with a clean unit test
  - Tests all input combinations without brittle reflection code
  - Improved testability and maintainability

* Clean up ServerSidePlannedTable: Remove unnecessary helper function

  - Remove misleading "Fallback" comment that didn't apply to all cases
  - Inline create() function into tryCreate() to reduce indirection
  - Simplify logic: directly handle client creation in try-catch

* Remove unnecessary logging and unused imports from ServerSidePlannedTable

  - Remove conditional logging that differentiated between forced and fallback paths
  - Remove unused imports: MDC and DeltaLogKeys

* Remove useless AutoCloseable implementation from ServerSidePlannedTable

  The close() method was never called because:
  - Spark's Table interface has no lifecycle hooks
  - No code explicitly called close() on ServerSidePlannedTable instances
  - No try-with-resources or Using() blocks wrapped it

  HTTP connection cleanup happens via connection timeouts (30s) and JVM finalization, making AutoCloseable purely ceremonial dead code.

* Simplify verbose test suite comments

  Reduced 20+ line formatted comments to simple 2-line descriptions. The bullet-pointed lists were over-documenting obvious test structure.

* Remove useless forTesting() wrapper method

  The forTesting() method was just a wrapper around 'new' that added no value. Tests now instantiate ServerSidePlannedTable directly with the constructor.

* Fix brittle test assertion for table capabilities

  Removed the assertion that the table has exactly one capability, which would break if we later add streaming support (MICRO_BATCH_READ, CONTINUOUS_READ) or other non-write capabilities. Now tests what actually matters: supports BATCH_READ, does NOT support BATCH_WRITE.

* Remove redundant SupportsWrite interface check in test

  Testing !isInstanceOf[SupportsWrite] is redundant with checking !capabilities.contains(BATCH_WRITE) because:
  - the BATCH_WRITE capability requires the SupportsWrite interface
  - having SupportsWrite without BATCH_WRITE would violate the Spark contract

  The capability check tests the public API contract, which is sufficient.

* Remove e2e/integration terminology from ServerSidePlanningSuite

  Changed:
  - Test names: removed "E2E:" prefix
  - Database name: integration_db → test_db
  - Table name: e2e_test → planning_test

  These tests use mock clients, not external systems, so e2e/integration terminology was misleading.

* Merge test suites: keep existing ServerSidePlannedTableSuite, delete new ServerSidePlanningSuite

  ServerSidePlanningSuite was added in this PR, while ServerSidePlannedTableSuite existed before. Merged them by keeping the existing file and deleting the new one, so the PR shows a modification rather than a deletion plus an addition.

* Refactor tests and improve documentation

  - Refactor ServerSidePlannedTableSuite: create database/table once in beforeAll()
  - Use minimal early-return pattern in DeltaCatalog with oss-only markers
  - Move current snapshot limitation docs to the ServerSidePlanningClient interface
  - Add UC credential injection link to hasCredentials() documentation
  - Lowercase test names and remove redundant client verification

* Simplify current snapshot limitation documentation

  Shorten the documentation from 4 lines to 1 concise line.

* Address PR9 review feedback

  Changes:
  1. Remove unnecessary afterEach() cleanup - no resource leaks to prevent
  2. Make test 2 explicit by setting config=false instead of relying on cleanup
  3. Remove redundant test "loadTable() decision logic" - already covered by other tests
  4. Add explanation for the deltahadoopconfiguration scalastyle suppression

  Tests: 4/4 passing, scalastyle clean

* Address PR9 review comments: improve test quality and cleanup

  - Add withServerSidePlanningEnabled helper method to prevent test pollution
    - Encapsulates factory and config setup/teardown in one place
    - Guarantees cleanup in a finally block
    - Prevents tests from interfering with each other
  - Replace white-box capability check with a black-box insert test
    - Tests actual insert behavior instead of inspecting capabilities
    - Verifies inserts succeed without SSP and fail with SSP enabled
    - More realistic end-to-end test of read-only behavior
  - Remove OSS-only marker comments from DeltaCatalog
    - Clean up // oss-only-start and // oss-only-end comments
  - Remove unused import (DeltaCatalog)

  All tests passing (4/4):
  - full query through DeltaCatalog with server-side planning
  - verify normal path unchanged when feature disabled
  - shouldUseServerSidePlanning() decision logic
  - ServerSidePlannedTable is read-only

---------

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude <[email protected]>
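The refactor described above turns the decision into a pure function of three booleans, so the "all input combinations" test it mentions can be written without reflection. A minimal sketch of such a test, assuming it lives in the `org.apache.spark.sql.delta.serverSidePlanning` package (so the `private[serverSidePlanning]` method is visible); the suite name and base class are illustrative, not the actual ServerSidePlannedTableSuite:

```scala
package org.apache.spark.sql.delta.serverSidePlanning

import org.scalatest.funsuite.AnyFunSuite

// Sketch only: exhaustively checks shouldUseServerSidePlanning() against the documented rule
// (Unity Catalog table without credentials, or the config override, selects server-side planning).
class ServerSidePlanningDecisionSketch extends AnyFunSuite {
  test("shouldUseServerSidePlanning handles all input combinations") {
    for {
      isUnityCatalog <- Seq(true, false)
      hasCredentials <- Seq(true, false)
      forced <- Seq(true, false)
    } {
      val expected = (isUnityCatalog && !hasCredentials) || forced
      assert(ServerSidePlannedTable.shouldUseServerSidePlanning(
        isUnityCatalog, hasCredentials, forced) === expected)
    }
  }
}
```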
1 parent da0c2ae commit c94cf91

File tree

5 files changed: +331 -104 lines changed

spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala

Lines changed: 10 additions & 3 deletions
```diff
@@ -31,14 +31,15 @@ import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
 import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterBy, ClusterBySpec}
 import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterByTransform => TempClusterByTransform}
 import org.apache.spark.sql.delta.{ColumnWithDefaultExprUtils, DeltaConfigs, DeltaErrors, DeltaTableUtils}
-import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions, IdentityColumn}
+import org.apache.spark.sql.delta.{DeltaOptions, IdentityColumn}
 import org.apache.spark.sql.delta.DeltaTableIdentifier.gluePermissionError
 import org.apache.spark.sql.delta.commands._
 import org.apache.spark.sql.delta.constraints.{AddConstraint, DropConstraint}
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.redirect.RedirectFeature
 import org.apache.spark.sql.delta.schema.SchemaUtils
+import org.apache.spark.sql.delta.serverSidePlanning.ServerSidePlannedTable
 import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSourceUtils, DeltaSQLConf}
 import org.apache.spark.sql.delta.stats.StatisticsCollection
 import org.apache.spark.sql.delta.tablefeatures.DropFeature
@@ -47,7 +48,7 @@ import org.apache.spark.sql.delta.util.PartitionUtils
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.MDC
 import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedAttribute, UnresolvedFieldName, UnresolvedFieldPosition}
@@ -236,7 +237,13 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
   override def loadTable(ident: Identifier): Table = recordFrameProfile(
     "DeltaCatalog", "loadTable") {
     try {
-      super.loadTable(ident) match {
+      val table = super.loadTable(ident)
+
+      ServerSidePlannedTable.tryCreate(spark, ident, table, isUnityCatalog).foreach { sspt =>
+        return sspt
+      }
+
+      table match {
         case v1: V1Table if DeltaTableUtils.isDeltaTable(v1.catalogTable) =>
           loadCatalogTable(ident, v1.catalogTable)
         case o => o
```
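The inserted block returns early via Scala's non-local return inside `Option.foreach`: when `tryCreate` yields `Some`, `loadTable` exits immediately with the ServerSidePlannedTable; when it yields `None`, the closure never runs and control falls through to the existing `match`. A standalone sketch of that pattern; the names here are made up for illustration:

```scala
// Hypothetical illustration of the early-return-on-Some pattern used in loadTable above.
object EarlyReturnSketch {
  def resolve(maybeOverride: Option[String]): String = {
    maybeOverride.foreach { value =>
      return value // non-local return: exits resolve() as soon as an override is present
    }
    "normal-path" // reached only when maybeOverride is None
  }

  def main(args: Array[String]): Unit = {
    assert(resolve(Some("server-side")) == "server-side")
    assert(resolve(None) == "normal-path")
  }
}
```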

spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala

Lines changed: 122 additions & 2 deletions
```diff
@@ -24,14 +24,133 @@ import scala.collection.JavaConverters._
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.delta.serverSidePlanning.ServerSidePlanningClient
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability}
 import org.apache.spark.sql.connector.read._
 import org.apache.spark.sql.execution.datasources.{FileFormat, PartitionedFile}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
+/**
+ * Companion object for ServerSidePlannedTable with factory methods.
+ */
+object ServerSidePlannedTable extends DeltaLogging {
+  /**
+   * Property keys that indicate table credentials are available.
+   * Unity Catalog tables may expose temporary credentials via these properties.
+   */
+  private val CREDENTIAL_PROPERTY_KEYS = Seq(
+    "storage.credential",
+    "aws.temporary.credentials",
+    "azure.temporary.credentials",
+    "gcs.temporary.credentials",
+    "credential"
+  )
+
+  /**
+   * Determine if server-side planning should be used based on catalog type,
+   * credential availability, and configuration.
+   *
+   * Decision logic:
+   * - Use server-side planning if forceServerSidePlanning is true (config override)
+   * - Use server-side planning if Unity Catalog table lacks credentials
+   * - Otherwise use normal table loading path
+   *
+   * @param isUnityCatalog Whether this is a Unity Catalog instance
+   * @param hasCredentials Whether the table has credentials available
+   * @param forceServerSidePlanning Whether to force server-side planning (config flag)
+   * @return true if server-side planning should be used
+   */
+  private[serverSidePlanning] def shouldUseServerSidePlanning(
+      isUnityCatalog: Boolean,
+      hasCredentials: Boolean,
+      forceServerSidePlanning: Boolean): Boolean = {
+    (isUnityCatalog && !hasCredentials) || forceServerSidePlanning
+  }
+
+  /**
+   * Try to create a ServerSidePlannedTable if server-side planning is needed.
+   * Returns None if not needed or if the planning client factory is not available.
+   *
+   * This method encapsulates all the logic to decide whether to use server-side planning:
+   * - Checks if Unity Catalog table lacks credentials
+   * - Checks if server-side planning is forced via config (for testing)
+   * - Extracts catalog name and table identifiers
+   * - Attempts to create the planning client
+   *
+   * Test coverage: ServerSidePlanningSuite tests verify the decision logic through
+   * shouldUseServerSidePlanning() method with different input combinations.
+   *
+   * @param spark The SparkSession
+   * @param ident The table identifier
+   * @param table The loaded table from the delegate catalog
+   * @param isUnityCatalog Whether this is a Unity Catalog instance
+   * @return Some(ServerSidePlannedTable) if server-side planning should be used, None otherwise
+   */
+  def tryCreate(
+      spark: SparkSession,
+      ident: Identifier,
+      table: Table,
+      isUnityCatalog: Boolean): Option[ServerSidePlannedTable] = {
+    // Check if we should force server-side planning (for testing)
+    val forceServerSidePlanning =
+      spark.conf.get(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "false").toBoolean
+    val hasTableCredentials = hasCredentials(table)
+
+    // Check if we should use server-side planning
+    if (shouldUseServerSidePlanning(isUnityCatalog, hasTableCredentials, forceServerSidePlanning)) {
+      val namespace = ident.namespace().mkString(".")
+      val tableName = ident.name()
+
+      // Extract catalog name from identifier namespace, or default to spark_catalog
+      //
+      // Spark Identifier structure:
+      // - For "catalog.database.table": namespace() = ["catalog", "database"], name() = "table"
+      // - For "database.table": namespace() = ["database"], name() = "table"
+      // - For "table": namespace() = [], name() = "table"
+      //
+      // Note: We check namespace().length > 1 (not >= 1) because a single-element namespace
+      // represents just the database name without an explicit catalog, so we use the default.
+      // See Spark's LookupCatalog, CatalogAndIdentifier and ResolveSessionCatalog.
+      val catalogName = if (ident.namespace().length > 1) {
+        ident.namespace().head
+      } else {
+        "spark_catalog"
+      }
+
+      // Try to create ServerSidePlannedTable with server-side planning
+      try {
+        val client = ServerSidePlanningClientFactory.getClient(spark, catalogName)
+        Some(new ServerSidePlannedTable(spark, namespace, tableName, table.schema(), client))
+      } catch {
+        case _: IllegalStateException =>
+          // Factory not registered - fall through to normal path
+          logWarning(s"Server-side planning not available for catalog $catalogName. " +
+            "Falling back to normal table loading.")
+          None
+      }
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Check if a table has credentials available.
+   * Unity Catalog tables may lack credentials when accessed without proper permissions.
+   * UC injects credentials as table properties, see:
+   * https://github.com/unitycatalog/unitycatalog/blob/main/connectors/spark/src/main/scala/
+   * io/unitycatalog/spark/UCSingleCatalog.scala#L260
+   */
+  private def hasCredentials(table: Table): Boolean = {
+    // Check table properties for credential information
+    val properties = table.properties()
+    CREDENTIAL_PROPERTY_KEYS.exists(key => properties.containsKey(key))
+  }
+}
+
 /**
  * A Spark Table implementation that uses server-side scan planning
  * to get the list of files to read. Used as a fallback when Unity Catalog
@@ -45,7 +164,8 @@ class ServerSidePlannedTable(
     databaseName: String,
     tableName: String,
     tableSchema: StructType,
-    planningClient: ServerSidePlanningClient) extends Table with SupportsRead {
+    planningClient: ServerSidePlanningClient)
+  extends Table with SupportsRead with DeltaLogging {
 
   // Returns fully qualified name (e.g., "catalog.database.table").
   // The databaseName parameter receives ident.namespace().mkString(".") from DeltaCatalog,
```
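The catalog-name extraction in tryCreate() can be exercised directly against Spark's `Identifier.of` factory. A small sketch of the three namespace shapes documented in the comment above; the catalog, database, and table names are made up:

```scala
import org.apache.spark.sql.connector.catalog.Identifier

// Sketch of the Identifier namespace shapes described in tryCreate()'s comment.
object IdentifierShapesSketch {
  // Mirrors the catalogName logic in tryCreate(): only a multi-element namespace
  // carries an explicit catalog; otherwise fall back to spark_catalog.
  def catalogNameOf(ident: Identifier): String =
    if (ident.namespace().length > 1) ident.namespace().head else "spark_catalog"

  def main(args: Array[String]): Unit = {
    val threePart = Identifier.of(Array("main", "sales"), "orders") // catalog.database.table
    val twoPart = Identifier.of(Array("sales"), "orders")           // database.table
    val onePart = Identifier.of(Array.empty[String], "orders")      // table

    assert(catalogNameOf(threePart) == "main")
    assert(catalogNameOf(twoPart) == "spark_catalog")
    assert(catalogNameOf(onePart) == "spark_catalog")
  }
}
```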

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala

Lines changed: 9 additions & 0 deletions
```diff
@@ -2905,6 +2905,15 @@ trait DeltaSQLConfBase extends DeltaSQLConfUtils {
         |When enabled, it's decided by a per-command flag.""".stripMargin)
       .booleanConf
       .createWithDefault(false)
+
+  val ENABLE_SERVER_SIDE_PLANNING =
+    buildConf("catalog.enableServerSidePlanning")
+      .internal()
+      .doc(
+        """When enabled, DeltaCatalog will use server-side scan planning path
+          |instead of normal table loading.""".stripMargin)
+      .booleanConf
+      .createWithDefault(false)
 }
 
 object DeltaSQLConf extends DeltaSQLConfBase
```
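tryCreate() reads this flag through `spark.conf` with a `"false"` default, so a test can force the server-side planning path by setting the key and restoring the previous value afterwards, which is roughly the shape the withServerSidePlanningEnabled helper described in the commit message would take. A minimal sketch of such a helper, assuming an existing SparkSession; the object and method names here are illustrative, not the actual test code:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.sources.DeltaSQLConf

// Sketch: temporarily force the server-side planning path for a block of test code,
// restoring the previous configuration in a finally block to avoid test pollution.
object ServerSidePlanningTestSupportSketch {
  def withServerSidePlanningForced[T](spark: SparkSession)(body: => T): T = {
    val key = DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key
    val previous = spark.conf.getOption(key)
    spark.conf.set(key, "true")
    try {
      body
    } finally {
      previous match {
        case Some(value) => spark.conf.set(key, value)
        case None => spark.conf.unset(key)
      }
    }
  }
}
```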
