From cc0006fb6dbc25b28389e4553fcc036edeeae684 Mon Sep 17 00:00:00 2001 From: YeYe Date: Fri, 7 Nov 2025 17:56:33 +0700 Subject: [PATCH] finished gpio repositories, PID logic sservice and temp_service --- apps/heater-api/mocks/gpio_mock.py | 121 +++++------------------ apps/heater-api/repositories/gpio.py | 99 +++++++++++-------- apps/heater-api/services/PID.py | 79 +++++++++++++++ apps/heater-api/services/temp_service.py | 73 ++++++++++++-- 4 files changed, 231 insertions(+), 141 deletions(-) create mode 100644 apps/heater-api/services/PID.py diff --git a/apps/heater-api/mocks/gpio_mock.py b/apps/heater-api/mocks/gpio_mock.py index 9dcd331..3968392 100644 --- a/apps/heater-api/mocks/gpio_mock.py +++ b/apps/heater-api/mocks/gpio_mock.py @@ -2,97 +2,30 @@ class GPIORepositoryMock(GPIORepository): - """Mock implementation of GPIORepository for testing and development - - This mock maintains an in-memory state of GPIO pins and their configurations. - """ - - def __init__(self): - """Initialize the mock with empty pin states""" - self._pin_configs = { - # Store pin configurations: {pin: {'frequency': float, 'duty_cycle': float}} - } - - def setup_pwm(self, pin: int, frequency: float) -> None: - """Setup a GPIO pin for PWM output - - Args: - pin (int): GPIO pin number - frequency (float): PWM frequency in Hz - """ - if pin < 0: - raise ValueError("Pin number must be non-negative") - if frequency <= 0: - raise ValueError("Frequency must be positive") - - self._pin_configs[pin] = { - 'frequency': frequency, - 'duty_cycle': 0.0 # Initialize with 0% duty cycle - } - print(f"Mock: Setup PWM on pin {pin} with frequency {frequency} Hz") - - def cleanup_channel(self, pin: int) -> None: - """Cleanup a GPIO pin - - Args: - pin (int): GPIO pin number - """ - if pin not in self._pin_configs: - print(f"Mock: Pin {pin} not configured; nothing to cleanup") - return - - del self._pin_configs[pin] - print(f"Mock: Cleaned up pin {pin}") - - def cleanup_all(self) -> None: - """Cleanup all GPIO pins""" - pin_count = len(self._pin_configs) - self._pin_configs.clear() - print(f"Mock: Cleaned up all pins ({pin_count} pins cleared)") - - def set_duty_cycle(self, pin: int, duty_cycle: float) -> None: - """Set the duty cycle for a PWM pin - - Args: - pin (int): GPIO pin number - duty_cycle (float): Duty cycle percentage (0.0 to 100.0) - """ - if pin not in self._pin_configs: - raise ValueError(f"Pin {pin} has not been setup for PWM") - if not 0.0 <= duty_cycle <= 100.0: - raise ValueError("Duty cycle must be between 0.0 and 100.0") - - self._pin_configs[pin]['duty_cycle'] = duty_cycle - print(f"Mock: Set duty cycle on pin {pin} to {duty_cycle}%") - - def get_duty_cycle(self, pin: int) -> float: - """Get the current duty cycle for a PWM pin - - Args: - pin (int): GPIO pin number - Returns: - float: Current duty cycle percentage - """ - if pin not in self._pin_configs: - raise ValueError(f"Pin {pin} has not been setup for PWM") - - duty_cycle = self._pin_configs[pin]['duty_cycle'] - print(f"Mock: Retrieved duty cycle for pin {pin}: {duty_cycle}%") - return duty_cycle - - def get_pin_config(self, pin: int) -> dict: - """Helper method to get the full configuration of a pin (for testing) - - Args: - pin (int): GPIO pin number - Returns: - dict: Pin configuration containing frequency and duty_cycle - """ - if pin not in self._pin_configs: - raise ValueError(f"Pin {pin} has not been setup for PWM") - return self._pin_configs[pin].copy() - - def reset_all_pins(self) -> None: - """Helper method to reset all pin configurations (for testing)""" - self._pin_configs.clear() - print("Mock: All pins reset") + """Mock version for testing, no hardware interaction.""" + + def __init__(self, gpio=None, pin: int = 0, frequency: float = 0.2, duty_cycle: float = 0.0) -> None: + self.gpio = gpio + self.pin = pin + self.frequency = frequency + self.duty_cycle = duty_cycle + self.pwm = None + + def setup_pwm(self) -> None: + self.pwm = True + print(f"[Mock] Setup PWM at pin={self.pin}, freq={self.frequency}Hz") + + def set_duty_cycle(self, duty_cycle: float) -> None: + if self.pwm is None: + raise RuntimeError("PWM not set up. Call setup_pwm() first.") + self.duty_cycle = max(0.0, min(100.0, duty_cycle)) + print(f"[Mock] Set duty cycle to {self.duty_cycle}% on pin={self.pin}") + + def get_duty_cycle(self) -> float: + if self.pwm is None: + raise RuntimeError("PWM not set up. Call setup_pwm() first.") + return self.duty_cycle + + def cleanup(self) -> None: + print(f"[Mock] Cleanup pin {self.pin}") + self.pwm = None diff --git a/apps/heater-api/repositories/gpio.py b/apps/heater-api/repositories/gpio.py index 1286664..37999b6 100644 --- a/apps/heater-api/repositories/gpio.py +++ b/apps/heater-api/repositories/gpio.py @@ -1,52 +1,71 @@ from abc import ABC, abstractmethod +import threading +from typing import Any, Optional class GPIORepository(ABC): - """Repository for controlling a GPIO port as PWM output - """ - @abstractmethod - def setup_pwm(self, pin: int, frequency: float) -> None: - """Setup a GPIO pin for PWM output + def setup_pwm(self) -> None: ... + @abstractmethod + def set_duty_cycle(self, duty_cycle: float) -> None: ... + @abstractmethod + def get_duty_cycle(self) -> float: ... + @abstractmethod + def cleanup(self) -> None: ... - Args: - pin (int): GPIO pin number - frequency (float): PWM frequency in Hz - """ - pass - @abstractmethod - def cleanup_channel(self, pin: int) -> None: - """Cleanup a GPIO pin +class RPiGPIORepository(GPIORepository): + def __init__(self, gpio: Any, pin: int, frequency: float, duty_cycle: float = 0.0) -> None: + self._lock: threading.Lock = threading.Lock() + self.gpio = gpio + self.pin: int = int(pin) + self.frequency: float = float(frequency) + self.duty_cycle: float = float(duty_cycle) + self.pwm: Optional[Any] = None - Args: - pin (int): GPIO pin number - """ - pass + required_attrs = ("setmode", "BCM", "setup", "OUT", "PWM", "cleanup") + missing = [ + attr for attr in required_attrs if not hasattr(self.gpio, attr)] + if missing: + raise RuntimeError( + f"gpio object missing attributes: {', '.join(missing)}, gpio interface must have these methods and properties to function properly") - @abstractmethod - def cleanup_all(self) -> None: - """Cleanup all GPIO pins - """ - pass + # Configure and start PWM + self.gpio.setmode(self.gpio.BCM) + self.gpio.setup(self.pin, self.gpio.OUT) - @abstractmethod - def set_duty_cycle(self, pin: int, duty_cycle: float) -> None: - """Set the duty cycle for a PWM pin + def setup_pwm(self) -> None: + with self._lock: + if self.pwm is not None: + try: + self.pwm.stop() + except Exception: + raise RuntimeError( + "Failed to stop existing PWM instance, could not reinitialize PWM") + self.pwm = self.gpio.PWM(self.pin, self.frequency) + self.pwm.start(self.duty_cycle) - Args: - pin (int): GPIO pin number - duty_cycle (float): Duty cycle percentage (0.0 to 100.0) - """ - pass + def set_duty_cycle(self, duty_cycle: float) -> None: + dc = max(0.0, min(100.0, float(duty_cycle))) + with self._lock: + self.duty_cycle = dc + if self.pwm is not None: + self.pwm.ChangeDutyCycle(self.duty_cycle) - @abstractmethod - def get_duty_cycle(self, pin: int) -> float: - """Get the current duty cycle for a PWM pin - - Args: - pin (int): GPIO pin number - Returns: - float: Current duty cycle percentage - """ - pass + def get_duty_cycle(self) -> float: + with self._lock: + return self.duty_cycle + + def cleanup(self) -> None: + with self._lock: + if self.pwm is not None: + try: + self.pwm.stop() + except Exception: + raise RuntimeError( + "Failed to stop PWM instance during cleanup") + try: + self.gpio.cleanup(self.pin) + except Exception: + raise RuntimeError( + "Failed to cleanup GPIO pin during cleanup") diff --git a/apps/heater-api/services/PID.py b/apps/heater-api/services/PID.py new file mode 100644 index 0000000..388e9cf --- /dev/null +++ b/apps/heater-api/services/PID.py @@ -0,0 +1,79 @@ +import threading +from typing import Optional + + +class PIDController: + def __init__(self, kp: float, ki: float, kd: float, setpoint: float = 0.0): + self.kp = float(kp) + self.ki = float(ki) + self.kd = float(kd) + self.setpoint = float(setpoint) + + self._integral = 0.0 + self._last_error = 0.0 + self._last_measurement: Optional[float] = None + self._lock = threading.Lock() + + # ---------------------- + # PID Logic + # ---------------------- + def update(self, measurement: float) -> float: + with self._lock: + error = self.setpoint - measurement + new_integral = self._integral + error + derivative = 0.0 if self._last_measurement is None else ( + error - self._last_error) + + output = ( + (self.kp * error) + + (self.ki * new_integral) + + (self.kd * derivative) + ) + + self._last_error = error + self._last_measurement = measurement + + # Anti-windup + if 0.0 <= output <= 100.0: + self._integral = new_integral + else: + output = max(0.0, min(100.0, output)) + self._integral = max(min(new_integral, 100.0), -100.0) + + return output + + # ---------------------- + # Tuning and Setpoint + # ---------------------- + def set_coefficients(self, kp: float, ki: float, kd: float): + with self._lock: + self.kp = float(kp) + self.ki = float(ki) + self.kd = float(kd) + + def set_setpoint(self, new_setpoint: float): + with self._lock: + self.setpoint = float(new_setpoint) + self._integral = 0.0 + self._last_error = 0.0 + + def reset(self, setpoint: float): + with self._lock: + self.setpoint = float(setpoint) + self._integral = 0.0 + self._last_error = 0.0 + self._last_measurement = None + + # ---------------------- + # Utility + # ---------------------- + def get_state(self): + with self._lock: + return { + "Kp": self.kp, + "Ki": self.ki, + "Kd": self.kd, + "setpoint": self.setpoint, + "integral": self._integral, + "last_error": self._last_error, + } diff --git a/apps/heater-api/services/temp_service.py b/apps/heater-api/services/temp_service.py index f5a639e..5fbe57a 100644 --- a/apps/heater-api/services/temp_service.py +++ b/apps/heater-api/services/temp_service.py @@ -1,25 +1,51 @@ +import threading +import time from mocks.gpio_mock import GPIORepositoryMock from repositories.gpio import GPIORepository from schemas.temp_control import Parameters, StatusOut +from PID import PIDController +from lgg_client import ReadingApi, ApiClient, Configuration + +config = Configuration(host="http://localhost:8000") +api_client = ApiClient(config) + +readingApi = ReadingApi(api_client=api_client) class TempService: def __init__(self): - self.gpio: GPIORepository = GPIORepositoryMock() - self.gpio.setup_pwm(pin=18, frequency=0.2) - self._target = 30.0 self._params = Parameters() self.pin = 18 + self._pid = PIDController( + kp=self._params.kp, + ki=self._params.ki, + kd=self._params.kd, + setpoint=self._target, + ) + + # Mock GPIO for development + self.gpio: GPIORepository = GPIORepositoryMock() + self.gpio.setup_pwm() + + # Real GPIO + # import RPi.GPIO as GPIO + # self.gpio: GPIORepository = GPIORepository(GPIO, pin=self.pin, frequency=0.2, duty_cycle=0.0) + # self.gpio.setup_pwm() + + self._running = False + self._thread: threading.Thread | None = None + def get_target(self): return self._target def set_target(self, value: float): self._target = value + self._pid.set_setpoint(value) def get_status(self): - duty_cycle = self.gpio.get_duty_cycle(pin=self.pin) + duty_cycle = self.gpio.get_duty_cycle() return StatusOut(target=self._target, duty_cycle=duty_cycle) def get_parameters(self): @@ -27,10 +53,43 @@ def get_parameters(self): def set_parameters(self, params: Parameters): self._params = params + self._pid.set_coefficients( + kp=params.kp, + ki=params.ki, + kd=params.kd, + ) return self._params + def start(self): + """Start the temperature control loop in a background thread.""" + if self._running: + print("Control loop already running.") + return + + self._running = True + self._thread = threading.Thread(target=self._control_loop, daemon=True) + self._thread.start() + print("Temperature control loop started.") + def stop(self): - self.gpio.set_duty_cycle(pin=self.pin, duty_cycle=0.0) + """Stop the temperature control loop and set duty cycle to zero.""" + self._running = False + if self._thread: + self._thread.join(timeout=2.0) + self.gpio.set_duty_cycle(duty_cycle=0.0) + print("Temperature control loop stopped and heater turned off.") - def start(self): - raise NotImplementedError("Start method not implemented yet") + def _control_loop(self): + """Internal method that runs in background to control temperature.""" + while self._running: + current_temp = readingApi.get_monitoring(1).kelvin - 273.15 + duty = self._pid.update(current_temp) + self.gpio.set_duty_cycle(duty_cycle=duty) + + print( + f"T={current_temp:.2f}°C | Target={self._target:.1f}°C | Duty={duty:.1f}%") + + time.sleep(5) + + def cleanup(self): + self.gpio.cleanup()