Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,48 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

#### **StatefulSet Reconciliation Improvements** (2025-12-03, Issue #43)

Implemented intelligent StatefulSet update detection and validation to improve reconciliation efficiency and safety:

- **Diff Detection**: Added `statefulset_needs_update()` method to detect actual changes
- Compares existing vs desired StatefulSet specs semantically
- Avoids unnecessary API calls when no changes are needed
- Checks: replicas, image, env vars, resources, scheduling, pod management policy, etc.

- **Immutable Field Validation**: Added `validate_statefulset_update()` method
- Prevents modifications to immutable StatefulSet fields (selector, volumeClaimTemplates, serviceName)
- Provides clear error messages for invalid updates (e.g., changing volumesPerServer)
- Protects against API rejections during reconciliation

- **Enhanced Reconciliation Logic**: Refactored StatefulSet reconciliation loop
- Checks if StatefulSet exists before attempting update
- Validates update safety before applying changes
- Only applies updates when actual changes are detected
- Records Kubernetes events for update lifecycle (Created, UpdateStarted, UpdateValidationFailed)

- **Error Handling**: Extended error policy
- Added 60-second requeue for immutable field modification errors (user-fixable)
- Consistent error handling across credential and validation failures

- **New Error Types**: Added to `types::error::Error`
- `InternalError` - For unexpected internal conditions
- `ImmutableFieldModified` - For attempted modifications to immutable fields
- `SerdeJson` - For JSON serialization errors during comparisons

- **Comprehensive Test Coverage**: Added 9 new unit tests (35 tests total)
- Tests for diff detection (no changes, image, replicas, env vars, resources)
- Tests for validation (selector, serviceName, volumesPerServer changes rejected)
- Test for safe updates (image changes allowed)

**Benefits**:
- Reduces unnecessary API calls and reconciliation overhead
- Prevents reconciliation failures from invalid updates
- Provides better error messages for users
- Foundation for rollout monitoring (Phase 2)

### Changed

#### **Code Refactoring**: Credential Validation Simplification (2025-11-15)
Expand Down
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ homepage = "https://rustfs.com"


[dependencies]
chrono = "0.4"
const-str = "0.7.0"
serde = { version = "1.0.228", features = ["derive"] }
tokio = { version = "1.48.0", features = ["rt", "rt-multi-thread", "macros", "fs", "io-std", "io-util"] }
futures = "0.3.31"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.20" }
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
serde_json = "1.0.145"
serde_yaml_ng = "0.10.0"
strum = { version = "0.27.2", features = ["derive"] }
Expand Down
75 changes: 72 additions & 3 deletions deploy/rustfs-operator/crds/tenant.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ spec:
- jsonPath: .status.currentState
name: State
type: string
- jsonPath: .status.healthStatus
name: Health
type: string
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
Expand Down Expand Up @@ -1157,15 +1154,87 @@ spec:
availableReplicas:
format: int32
type: integer
conditions:
description: Kubernetes standard conditions
items:
description: Kubernetes standard condition for Tenant resources
properties:
lastTransitionTime:
description: Last time the condition transitioned from one status to another
nullable: true
type: string
message:
description: Human-readable message indicating details about the transition
type: string
observedGeneration:
description: The generation of the Tenant resource that this condition reflects
format: int64
nullable: true
type: integer
reason:
description: One-word CamelCase reason for the condition's last transition
type: string
status:
description: Status of the condition (True, False, Unknown)
type: string
type:
description: Type of condition (Ready, Progressing, Degraded)
type: string
required:
- message
- reason
- status
- type
type: object
type: array
currentState:
type: string
observedGeneration:
description: The generation observed by the operator
format: int64
nullable: true
type: integer
pools:
items:
properties:
currentReplicas:
description: Number of pods with current revision
format: int32
nullable: true
type: integer
currentRevision:
description: Current revision hash of the StatefulSet
nullable: true
type: string
lastUpdateTime:
description: Last time the pool status was updated
nullable: true
type: string
readyReplicas:
description: Number of pods with Ready condition
format: int32
nullable: true
type: integer
replicas:
description: Total number of non-terminated pods targeted by this pool's StatefulSet
format: int32
nullable: true
type: integer
ssName:
description: Name of the StatefulSet for this pool
type: string
state:
description: Current state of the pool
type: string
updateRevision:
description: Update revision hash of the StatefulSet (different from current during rollout)
nullable: true
type: string
updatedReplicas:
description: Number of pods with updated revision
format: int32
nullable: true
type: integer
required:
- ssName
- state
Expand Down
126 changes: 105 additions & 21 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,38 +106,41 @@ impl Context {
.await
}

pub async fn update_status<S>(
pub async fn update_status(
&self,
resource: &Tenant,
current_status: S,
replica: i32,
) -> Result<Tenant, Error>
where
S: ToString,
{
status: crate::types::v1alpha1::status::Status,
) -> Result<Tenant, Error> {
use kube::api::{Patch, PatchParams};

let api: Api<Tenant> = Api::namespaced(self.client.clone(), &resource.namespace()?);
let name = &resource.name();
let name = resource.name();

let update_func = async |tenant: &Tenant| {
let mut status = tenant.status.clone().unwrap_or_default();
status.available_replicas = replica;
status.current_state = current_status.to_string();
let status_body = serde_json::to_vec(&status)?;
// Create a JSON merge patch for the status
let status_patch = serde_json::json!({
"status": status
});

api.replace_status(name, &PostParams::default(), status_body)
.context(KubeSnafu)
.await
};

match update_func(resource).await {
// Try to patch the status
match api
.patch_status(
&name,
&PatchParams::default(),
&Patch::Merge(status_patch.clone()),
)
.context(KubeSnafu)
.await
{
Ok(t) => return Ok(t),
_ => {}
}

info!("status update failed due to conflict, retrieve the latest resource and retry.");

let new_one = api.get(name).context(KubeSnafu).await?;
update_func(&new_one).await
// Retry with the same patch
api.patch_status(&name, &PatchParams::default(), &Patch::Merge(status_patch))
.context(KubeSnafu)
.await
}

pub async fn delete<T>(&self, name: &str, namespace: &str) -> Result<(), Error>
Expand Down Expand Up @@ -287,4 +290,85 @@ impl Context {

Ok(())
}

/// Gets the status of a StatefulSet including rollout progress
///
/// # Returns
/// The StatefulSet status with replica counts and revision information
pub async fn get_statefulset_status(
&self,
name: &str,
namespace: &str,
) -> Result<k8s_openapi::api::apps::v1::StatefulSetStatus, Error> {
let ss: k8s_openapi::api::apps::v1::StatefulSet = self.get(name, namespace).await?;

ss.status.ok_or_else(|| Error::Types {
source: types::error::Error::InternalError {
msg: format!("StatefulSet {} has no status", name),
},
})
}

/// Checks if a StatefulSet rollout is complete
///
/// A rollout is considered complete when:
/// - observedGeneration matches metadata.generation (controller has seen latest spec)
/// - replicas == readyReplicas (all pods are ready)
/// - currentRevision == updateRevision (all pods are on the new revision)
/// - updatedReplicas == replicas (all pods have been updated)
///
/// # Returns
/// - `Ok(true)` if rollout is complete
/// - `Ok(false)` if rollout is still in progress
/// - `Err` if there's an error fetching the StatefulSet
pub async fn is_rollout_complete(&self, name: &str, namespace: &str) -> Result<bool, Error> {
let ss: k8s_openapi::api::apps::v1::StatefulSet = self.get(name, namespace).await?;

let metadata = &ss.metadata;
let spec = ss.spec.as_ref().ok_or_else(|| Error::Types {
source: types::error::Error::InternalError {
msg: format!("StatefulSet {} missing spec", name),
},
})?;

let status = ss.status.as_ref().ok_or_else(|| Error::Types {
source: types::error::Error::InternalError {
msg: format!("StatefulSet {} missing status", name),
},
})?;

let desired_replicas = spec.replicas.unwrap_or(1);

// Check if controller has observed the latest generation
let generation_current = metadata.generation.is_some()
&& status.observed_generation.is_some()
&& metadata.generation == status.observed_generation;

// Check if all replicas are ready
let replicas_ready = status.replicas == desired_replicas
&& status.ready_replicas.unwrap_or(0) == desired_replicas
&& status.updated_replicas.unwrap_or(0) == desired_replicas;

// Check if all pods are on the same revision
let revisions_match = status.current_revision.is_some()
&& status.update_revision.is_some()
&& status.current_revision == status.update_revision;

Ok(generation_current && replicas_ready && revisions_match)
}

/// Gets the current and update revision of a StatefulSet
///
/// # Returns
/// A tuple of (current_revision, update_revision)
/// Returns None for either value if not available
pub async fn get_statefulset_revisions(
&self,
name: &str,
namespace: &str,
) -> Result<(Option<String>, Option<String>), Error> {
let status = self.get_statefulset_status(name, namespace).await?;

Ok((status.current_revision, status.update_revision))
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub mod tests;

pub async fn run() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_level(true)
.with_file(true)
.with_line_number(true)
Expand Down
Loading