Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 27 additions & 94 deletions apps/heater-api/mocks/gpio_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,97 +2,30 @@


class GPIORepositoryMock(GPIORepository):
"""Mock implementation of GPIORepository for testing and development
This mock maintains an in-memory state of GPIO pins and their configurations.
"""

def __init__(self):
"""Initialize the mock with empty pin states"""
self._pin_configs = {
# Store pin configurations: {pin: {'frequency': float, 'duty_cycle': float}}
}

def setup_pwm(self, pin: int, frequency: float) -> None:
"""Setup a GPIO pin for PWM output
Args:
pin (int): GPIO pin number
frequency (float): PWM frequency in Hz
"""
if pin < 0:
raise ValueError("Pin number must be non-negative")
if frequency <= 0:
raise ValueError("Frequency must be positive")

self._pin_configs[pin] = {
'frequency': frequency,
'duty_cycle': 0.0 # Initialize with 0% duty cycle
}
print(f"Mock: Setup PWM on pin {pin} with frequency {frequency} Hz")

def cleanup_channel(self, pin: int) -> None:
"""Cleanup a GPIO pin
Args:
pin (int): GPIO pin number
"""
if pin not in self._pin_configs:
print(f"Mock: Pin {pin} not configured; nothing to cleanup")
return

del self._pin_configs[pin]
print(f"Mock: Cleaned up pin {pin}")

def cleanup_all(self) -> None:
"""Cleanup all GPIO pins"""
pin_count = len(self._pin_configs)
self._pin_configs.clear()
print(f"Mock: Cleaned up all pins ({pin_count} pins cleared)")

def set_duty_cycle(self, pin: int, duty_cycle: float) -> None:
"""Set the duty cycle for a PWM pin
Args:
pin (int): GPIO pin number
duty_cycle (float): Duty cycle percentage (0.0 to 100.0)
"""
if pin not in self._pin_configs:
raise ValueError(f"Pin {pin} has not been setup for PWM")
if not 0.0 <= duty_cycle <= 100.0:
raise ValueError("Duty cycle must be between 0.0 and 100.0")

self._pin_configs[pin]['duty_cycle'] = duty_cycle
print(f"Mock: Set duty cycle on pin {pin} to {duty_cycle}%")

def get_duty_cycle(self, pin: int) -> float:
"""Get the current duty cycle for a PWM pin
Args:
pin (int): GPIO pin number
Returns:
float: Current duty cycle percentage
"""
if pin not in self._pin_configs:
raise ValueError(f"Pin {pin} has not been setup for PWM")

duty_cycle = self._pin_configs[pin]['duty_cycle']
print(f"Mock: Retrieved duty cycle for pin {pin}: {duty_cycle}%")
return duty_cycle

def get_pin_config(self, pin: int) -> dict:
"""Helper method to get the full configuration of a pin (for testing)
Args:
pin (int): GPIO pin number
Returns:
dict: Pin configuration containing frequency and duty_cycle
"""
if pin not in self._pin_configs:
raise ValueError(f"Pin {pin} has not been setup for PWM")
return self._pin_configs[pin].copy()

def reset_all_pins(self) -> None:
"""Helper method to reset all pin configurations (for testing)"""
self._pin_configs.clear()
print("Mock: All pins reset")
"""Mock version for testing, no hardware interaction."""

def __init__(self, gpio=None, pin: int = 0, frequency: float = 0.2, duty_cycle: float = 0.0) -> None:
self.gpio = gpio
self.pin = pin
self.frequency = frequency
self.duty_cycle = duty_cycle
self.pwm = None

def setup_pwm(self) -> None:
self.pwm = True
print(f"[Mock] Setup PWM at pin={self.pin}, freq={self.frequency}Hz")

def set_duty_cycle(self, duty_cycle: float) -> None:
if self.pwm is None:
raise RuntimeError("PWM not set up. Call setup_pwm() first.")
self.duty_cycle = max(0.0, min(100.0, duty_cycle))
print(f"[Mock] Set duty cycle to {self.duty_cycle}% on pin={self.pin}")

def get_duty_cycle(self) -> float:
if self.pwm is None:
raise RuntimeError("PWM not set up. Call setup_pwm() first.")
Comment on lines +19 to +26
Copy link

Copilot AI Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RPiGPIORepository implementation (lines 59-71 in gpio.py) does not validate that PWM is set up before calling cleanup(), but the mock raises an error if PWM is not set up. This inconsistency in error handling between the real implementation and mock could hide bugs. Either add validation to RPiGPIORepository.cleanup() or remove it from the mock to maintain consistency.

Suggested change
if self.pwm is None:
raise RuntimeError("PWM not set up. Call setup_pwm() first.")
self.duty_cycle = max(0.0, min(100.0, duty_cycle))
print(f"[Mock] Set duty cycle to {self.duty_cycle}% on pin={self.pin}")
def get_duty_cycle(self) -> float:
if self.pwm is None:
raise RuntimeError("PWM not set up. Call setup_pwm() first.")
self.duty_cycle = max(0.0, min(100.0, duty_cycle))
print(f"[Mock] Set duty cycle to {self.duty_cycle}% on pin={self.pin}")
def get_duty_cycle(self) -> float:

Copilot uses AI. Check for mistakes.
return self.duty_cycle

def cleanup(self) -> None:
print(f"[Mock] Cleanup pin {self.pin}")
self.pwm = None
99 changes: 59 additions & 40 deletions apps/heater-api/repositories/gpio.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,71 @@
from abc import ABC, abstractmethod
import threading
from typing import Any, Optional


class GPIORepository(ABC):
"""Repository for controlling a GPIO port as PWM output
"""

@abstractmethod
def setup_pwm(self, pin: int, frequency: float) -> None:
"""Setup a GPIO pin for PWM output
def setup_pwm(self) -> None: ...
@abstractmethod
def set_duty_cycle(self, duty_cycle: float) -> None: ...
@abstractmethod
def get_duty_cycle(self) -> float: ...
@abstractmethod
def cleanup(self) -> None: ...

Args:
pin (int): GPIO pin number
frequency (float): PWM frequency in Hz
"""
pass

@abstractmethod
def cleanup_channel(self, pin: int) -> None:
"""Cleanup a GPIO pin
class RPiGPIORepository(GPIORepository):
def __init__(self, gpio: Any, pin: int, frequency: float, duty_cycle: float = 0.0) -> None:
self._lock: threading.Lock = threading.Lock()
self.gpio = gpio
self.pin: int = int(pin)
self.frequency: float = float(frequency)
self.duty_cycle: float = float(duty_cycle)
self.pwm: Optional[Any] = None

Args:
pin (int): GPIO pin number
"""
pass
required_attrs = ("setmode", "BCM", "setup", "OUT", "PWM", "cleanup")
missing = [
attr for attr in required_attrs if not hasattr(self.gpio, attr)]
if missing:
raise RuntimeError(
f"gpio object missing attributes: {', '.join(missing)}, gpio interface must have these methods and properties to function properly")
Copy link

Copilot AI Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message is a run-on sentence without proper punctuation. Add a period or semicolon after the attribute list to improve readability: 'gpio object missing attributes: {', '.join(missing)}. GPIO interface must have these methods and properties to function properly.'

Suggested change
f"gpio object missing attributes: {', '.join(missing)}, gpio interface must have these methods and properties to function properly")
f"gpio object missing attributes: {', '.join(missing)}. GPIO interface must have these methods and properties to function properly")

Copilot uses AI. Check for mistakes.

@abstractmethod
def cleanup_all(self) -> None:
"""Cleanup all GPIO pins
"""
pass
# Configure and start PWM
self.gpio.setmode(self.gpio.BCM)
self.gpio.setup(self.pin, self.gpio.OUT)

@abstractmethod
def set_duty_cycle(self, pin: int, duty_cycle: float) -> None:
"""Set the duty cycle for a PWM pin
def setup_pwm(self) -> None:
with self._lock:
if self.pwm is not None:
try:
self.pwm.stop()
except Exception:
raise RuntimeError(
"Failed to stop existing PWM instance, could not reinitialize PWM")
Comment on lines +42 to +44
Copy link

Copilot AI Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message does not include the underlying exception details, making debugging difficult. Include the original exception information using 'from e' or include the exception message in the error text to provide more context about what went wrong.

Suggested change
except Exception:
raise RuntimeError(
"Failed to stop existing PWM instance, could not reinitialize PWM")
except Exception as e:
raise RuntimeError(
f"Failed to stop existing PWM instance, could not reinitialize PWM: {e}"
) from e

Copilot uses AI. Check for mistakes.
self.pwm = self.gpio.PWM(self.pin, self.frequency)
self.pwm.start(self.duty_cycle)

Args:
pin (int): GPIO pin number
duty_cycle (float): Duty cycle percentage (0.0 to 100.0)
"""
pass
def set_duty_cycle(self, duty_cycle: float) -> None:
dc = max(0.0, min(100.0, float(duty_cycle)))
with self._lock:
self.duty_cycle = dc
if self.pwm is not None:
self.pwm.ChangeDutyCycle(self.duty_cycle)

@abstractmethod
def get_duty_cycle(self, pin: int) -> float:
"""Get the current duty cycle for a PWM pin
Args:
pin (int): GPIO pin number
Returns:
float: Current duty cycle percentage
"""
pass
def get_duty_cycle(self) -> float:
with self._lock:
return self.duty_cycle

def cleanup(self) -> None:
with self._lock:
if self.pwm is not None:
try:
self.pwm.stop()
except Exception:
raise RuntimeError(
"Failed to stop PWM instance during cleanup")
Comment on lines +64 to +66
Copy link

Copilot AI Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message does not include the underlying exception details, making debugging difficult. Include the original exception information using 'from e' or include the exception message in the error text to provide more context about what went wrong.

Suggested change
except Exception:
raise RuntimeError(
"Failed to stop PWM instance during cleanup")
except Exception as e:
raise RuntimeError(
"Failed to stop PWM instance during cleanup") from e

Copilot uses AI. Check for mistakes.
try:
self.gpio.cleanup(self.pin)
except Exception:
raise RuntimeError(
"Failed to cleanup GPIO pin during cleanup")
Comment on lines +69 to +71
Copy link

Copilot AI Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message does not include the underlying exception details, making debugging difficult. Include the original exception information using 'from e' or include the exception message in the error text to provide more context about what went wrong.

Suggested change
except Exception:
raise RuntimeError(
"Failed to cleanup GPIO pin during cleanup")
except Exception as e:
raise RuntimeError(
"Failed to cleanup GPIO pin during cleanup") from e

Copilot uses AI. Check for mistakes.
79 changes: 79 additions & 0 deletions apps/heater-api/services/PID.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import threading
from typing import Optional


class PIDController:
def __init__(self, kp: float, ki: float, kd: float, setpoint: float = 0.0):
self.kp = float(kp)
self.ki = float(ki)
self.kd = float(kd)
self.setpoint = float(setpoint)

self._integral = 0.0
self._last_error = 0.0
self._last_measurement: Optional[float] = None
self._lock = threading.Lock()

# ----------------------
# PID Logic
# ----------------------
def update(self, measurement: float) -> float:
with self._lock:
error = self.setpoint - measurement
new_integral = self._integral + error
derivative = 0.0 if self._last_measurement is None else (
error - self._last_error)
Copy link

Copilot AI Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The derivative calculation uses error difference instead of measurement difference, which is susceptible to derivative kick when the setpoint changes. Consider calculating derivative based on measurement change: -(measurement - self._last_measurement) to avoid sudden output spikes when the target temperature is adjusted.

Suggested change
error - self._last_error)
-(measurement - self._last_measurement))

Copilot uses AI. Check for mistakes.

output = (
(self.kp * error) +
(self.ki * new_integral) +
(self.kd * derivative)
)

self._last_error = error
self._last_measurement = measurement

# Anti-windup
if 0.0 <= output <= 100.0:
self._integral = new_integral
else:
output = max(0.0, min(100.0, output))
self._integral = max(min(new_integral, 100.0), -100.0)

return output

# ----------------------
# Tuning and Setpoint
# ----------------------
def set_coefficients(self, kp: float, ki: float, kd: float):
with self._lock:
self.kp = float(kp)
self.ki = float(ki)
self.kd = float(kd)

def set_setpoint(self, new_setpoint: float):
with self._lock:
self.setpoint = float(new_setpoint)
self._integral = 0.0
self._last_error = 0.0

def reset(self, setpoint: float):
with self._lock:
self.setpoint = float(setpoint)
self._integral = 0.0
self._last_error = 0.0
self._last_measurement = None

# ----------------------
# Utility
# ----------------------
def get_state(self):
with self._lock:
return {
"Kp": self.kp,
"Ki": self.ki,
"Kd": self.kd,
"setpoint": self.setpoint,
"integral": self._integral,
"last_error": self._last_error,
}
Loading
Loading