From a0759dcfd7b3f1defb942933351b06dfa94ddc93 Mon Sep 17 00:00:00 2001 From: Xoffroad <65235705+Xoffroad@users.noreply.github.com> Date: Tue, 14 Apr 2026 11:03:20 +0200 Subject: [PATCH 1/6] Qcells active battery control (#3244) * adding active battery control for qcells and new field for overload security * remove field for max_power and use max_charge_power and max_discharge_power instead * Apply suggestion from @seaspotter Co-authored-by: SeaSpotter * Apply suggestions from code review Co-authored-by: SeaSpotter * Update packages/modules/devices/qcells/qcells/bat.py Co-authored-by: SeaSpotter * Indentation corrected * remove whitespaces --------- Co-authored-by: SeaSpotter --- packages/modules/devices/qcells/qcells/bat.py | 70 ++++++++++++++++++- 1 file changed, 69 insertions(+), 1 deletion(-) diff --git a/packages/modules/devices/qcells/qcells/bat.py b/packages/modules/devices/qcells/qcells/bat.py index 3a68bd58cd..2697076f35 100644 --- a/packages/modules/devices/qcells/qcells/bat.py +++ b/packages/modules/devices/qcells/qcells/bat.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 -from typing import TypedDict, Any +import logging +from typing import TypedDict, Any, Optional from modules.common.abstract_device import AbstractBat from modules.common.component_state import BatState @@ -11,6 +12,25 @@ from modules.common.utils.peak_filter import PeakFilter from modules.common.component_type import ComponentType +log = logging.getLogger(__name__) + +# Solax/QCells Mode 8 Remote Control Registers (Holding Registers) +# Speichersteuerung via "Individual Setting - Duration Mode" +# Unterstuetzte Hardware: QCells Q.VOLT HYB-G3-3P (Solax Gen4), +# Solax Gen4/Gen5/Gen6 Hybrid und AC Wechselrichter. +REMOTE_CONTROL_MODE_REG = 0xA0 # U16: 0=Disabled, 8=Individual Duration +REMOTE_CONTROL_SET_TYPE_REG = 0xA1 # U16: 1=Set +REMOTE_CONTROL_PV_LIMIT_REG = 0xA2 # U32: PV Power Limit in Watt (keine Begrenzung = 30000) +REMOTE_CONTROL_PUSH_POWER_REG = 0xA4 # S32: Battery Push Power (+Entladung, -Ladung) +REMOTE_CONTROL_DURATION_REG = 0xA6 # U16: Dauer in Sekunden +REMOTE_CONTROL_TIMEOUT_REG = 0xA7 # U16: Timeout in Sekunden + +MODE_8_INDIVIDUAL_DURATION = 8 +SET_TYPE_SET = 1 +PV_LIMIT_NO_CURTAILMENT = 30000 +REMOTE_CONTROL_DURATION = 300 +REMOTE_CONTROL_TIMEOUT = 300 + class KwargsDict(TypedDict): modbus_id: int @@ -28,6 +48,7 @@ def initialize(self) -> None: self.store = get_bat_value_store(self.component_config.id) self.fault_state = FaultState(ComponentInfo.from_component_config(self.component_config)) self.peak_filter = PeakFilter(ComponentType.BAT, self.component_config.id, self.fault_state) + self.last_mode: Optional[str] = 'Undefined' def update(self) -> None: power = self.client.read_input_registers(0x0016, ModbusDataType.INT_16, unit=self.__modbus_id) @@ -46,5 +67,52 @@ def update(self) -> None: ) self.store.set(bat_state) + def set_power_limit(self, power_limit: Optional[int]) -> None: + unit = self.__modbus_id + log.debug(f"QCells set_power_limit: power_limit={power_limit}, " + f"last_mode={self.last_mode}") + + if power_limit is None: + log.debug("Keine Batteriesteuerung, Selbstregelung durch Wechselrichter") + if self.last_mode is not None: + with self.client: + self.client.write_register( + REMOTE_CONTROL_MODE_REG, 0, data_type=ModbusDataType.UINT_16, unit=unit) + self.last_mode = None + else: + log.debug("Aktive Batteriesteuerung aktiv") + if self.last_mode != 'limited': + self.last_mode = 'limited' + + # Berechne power value: 0 = stop, != 0 = multipliziere mit -1 + power_value = 0 if power_limit == 0 else int(power_limit) * -1 + + self._write_mode8(power_value, unit=unit) + + def _write_mode8(self, power_value: int, unit: int) -> None: + """Schreibt die Mode 8 Remote Control Register (0xA0-0xA7).""" + with self.client: + self.client.write_register( + REMOTE_CONTROL_MODE_REG, MODE_8_INDIVIDUAL_DURATION, + data_type=ModbusDataType.UINT_16, unit=unit) + self.client.write_register( + REMOTE_CONTROL_SET_TYPE_REG, SET_TYPE_SET, + data_type=ModbusDataType.UINT_16, unit=unit) + self.client.write_register( + REMOTE_CONTROL_PV_LIMIT_REG, PV_LIMIT_NO_CURTAILMENT, + data_type=ModbusDataType.UINT_32, unit=unit) + self.client.write_register( + REMOTE_CONTROL_PUSH_POWER_REG, power_value, + data_type=ModbusDataType.INT_32, unit=unit) + self.client.write_register( + REMOTE_CONTROL_DURATION_REG, REMOTE_CONTROL_DURATION, + data_type=ModbusDataType.UINT_16, unit=unit) + self.client.write_register( + REMOTE_CONTROL_TIMEOUT_REG, REMOTE_CONTROL_TIMEOUT, + data_type=ModbusDataType.UINT_16, unit=unit) + + def power_limit_controllable(self) -> bool: + return True + component_descriptor = ComponentDescriptor(configuration_factory=QCellsBatSetup) From dc69a35362306b39397b4a37a2315c243dc00d7d Mon Sep 17 00:00:00 2001 From: Xoffroad <65235705+Xoffroad@users.noreply.github.com> Date: Mon, 20 Apr 2026 14:58:09 +0200 Subject: [PATCH 2/6] Qcells active battery control (#3306) * adding active battery control for qcells and new field for overload security * remove field for max_power and use max_charge_power and max_discharge_power instead * fix: switch qcells battery control to mode 1 with dynamic timing; mode 8 was not stable --- packages/modules/devices/qcells/qcells/bat.py | 104 ++++++++++++------ 1 file changed, 69 insertions(+), 35 deletions(-) diff --git a/packages/modules/devices/qcells/qcells/bat.py b/packages/modules/devices/qcells/qcells/bat.py index 2697076f35..0db941fd47 100644 --- a/packages/modules/devices/qcells/qcells/bat.py +++ b/packages/modules/devices/qcells/qcells/bat.py @@ -1,7 +1,9 @@ #!/usr/bin/env python3 import logging from typing import TypedDict, Any, Optional +from pymodbus.constants import Endian +from control import data from modules.common.abstract_device import AbstractBat from modules.common.component_state import BatState from modules.common.component_type import ComponentDescriptor @@ -14,22 +16,19 @@ log = logging.getLogger(__name__) -# Solax/QCells Mode 8 Remote Control Registers (Holding Registers) -# Speichersteuerung via "Individual Setting - Duration Mode" -# Unterstuetzte Hardware: QCells Q.VOLT HYB-G3-3P (Solax Gen4), -# Solax Gen4/Gen5/Gen6 Hybrid und AC Wechselrichter. -REMOTE_CONTROL_MODE_REG = 0xA0 # U16: 0=Disabled, 8=Individual Duration -REMOTE_CONTROL_SET_TYPE_REG = 0xA1 # U16: 1=Set -REMOTE_CONTROL_PV_LIMIT_REG = 0xA2 # U32: PV Power Limit in Watt (keine Begrenzung = 30000) -REMOTE_CONTROL_PUSH_POWER_REG = 0xA4 # S32: Battery Push Power (+Entladung, -Ladung) -REMOTE_CONTROL_DURATION_REG = 0xA6 # U16: Dauer in Sekunden -REMOTE_CONTROL_TIMEOUT_REG = 0xA7 # U16: Timeout in Sekunden - -MODE_8_INDIVIDUAL_DURATION = 8 +# Solax/QCells Mode 1 Remote Control Registers (Holding Registers) +# Speichersteuerung via Active Power Sollwert. +REMOTE_CONTROL_MODE_REG = 0x7C # U16: 0=Disabled, 1=Enabled Power Control +REMOTE_CONTROL_SET_TYPE_REG = 0x7D # U16: 1=Set +REMOTE_CONTROL_ACTIVE_POWER_REG = 0x7E # S32: Active Power Sollwert in Watt +REMOTE_CONTROL_DURATION_REG = 0x82 # U16: Dauer in Sekunden +REMOTE_CONTROL_TIMEOUT_REG = 0x88 # U16: Timeout in Sekunden + +MODE_1_DISABLED = 0 +MODE_1_ENABLED_POWER_CONTROL = 1 SET_TYPE_SET = 1 -PV_LIMIT_NO_CURTAILMENT = 30000 -REMOTE_CONTROL_DURATION = 300 -REMOTE_CONTROL_TIMEOUT = 300 +MIN_REMOTE_CONTROL_DURATION = 20 +MIN_REMOTE_CONTROL_TIMEOUT = 60 class KwargsDict(TypedDict): @@ -77,40 +76,75 @@ def set_power_limit(self, power_limit: Optional[int]) -> None: if self.last_mode is not None: with self.client: self.client.write_register( - REMOTE_CONTROL_MODE_REG, 0, data_type=ModbusDataType.UINT_16, unit=unit) + REMOTE_CONTROL_MODE_REG, MODE_1_DISABLED, + data_type=ModbusDataType.UINT_16, unit=unit) self.last_mode = None else: - log.debug("Aktive Batteriesteuerung aktiv") - if self.last_mode != 'limited': - self.last_mode = 'limited' - - # Berechne power value: 0 = stop, != 0 = multipliziere mit -1 - power_value = 0 if power_limit == 0 else int(power_limit) * -1 - - self._write_mode8(power_value, unit=unit) - - def _write_mode8(self, power_value: int, unit: int) -> None: - """Schreibt die Mode 8 Remote Control Register (0xA0-0xA7).""" + if power_limit < 0: + self.last_mode = 'discharge' + elif power_limit > 0: + self.last_mode = 'charge' + else: + self.last_mode = 'stop' + + ap_target = self._get_active_power_target(int(power_limit)) + self._write_mode1(ap_target, unit=unit) + + def _get_active_power_target(self, power_limit: int) -> int: + # openWB-Werte verwenden (nicht WR-Berechnungen): + # power_limit < 0 = Entladen, 0 = Stop, > 0 = Laden + home_consumption = int(data.data.counter_all_data.data.set.home_consumption) + cp_power = int(data.data.cp_all_data.data.get.power) + house_load = max(0, home_consumption + cp_power) + pv_generation = max(0, int(data.data.pv_all_data.data.get.power * -1)) + ap_target = power_limit + house_load - pv_generation + + try: + evu_counter = data.data.counter_all_data.get_evu_counter() + import_limit = int(evu_counter.data.config.max_total_power) + except Exception: + import_limit = 0 + + if import_limit > 0: + ap_target = min(ap_target, import_limit) + + log.debug(( + f"QCells Mode1 target: power_limit={power_limit}W, home_consumption={home_consumption}W, " + f"cp_power={cp_power}W, house_load={house_load}W, " + f"pv_generation={pv_generation}W, import_limit={import_limit}W -> ap_target={ap_target}W" + )) + return int(ap_target) + + def _write_mode1(self, ap_target: int, unit: int) -> None: + """Schreibt die Mode 1 Remote Control Register (0x7C-0x88).""" + duration, timeout = self._get_mode1_timing() with self.client: self.client.write_register( - REMOTE_CONTROL_MODE_REG, MODE_8_INDIVIDUAL_DURATION, + REMOTE_CONTROL_MODE_REG, MODE_1_ENABLED_POWER_CONTROL, data_type=ModbusDataType.UINT_16, unit=unit) self.client.write_register( REMOTE_CONTROL_SET_TYPE_REG, SET_TYPE_SET, data_type=ModbusDataType.UINT_16, unit=unit) self.client.write_register( - REMOTE_CONTROL_PV_LIMIT_REG, PV_LIMIT_NO_CURTAILMENT, - data_type=ModbusDataType.UINT_32, unit=unit) + REMOTE_CONTROL_ACTIVE_POWER_REG, ap_target, + data_type=ModbusDataType.INT_32, wordorder=Endian.Little, unit=unit) self.client.write_register( - REMOTE_CONTROL_PUSH_POWER_REG, power_value, - data_type=ModbusDataType.INT_32, unit=unit) - self.client.write_register( - REMOTE_CONTROL_DURATION_REG, REMOTE_CONTROL_DURATION, + REMOTE_CONTROL_DURATION_REG, duration, data_type=ModbusDataType.UINT_16, unit=unit) self.client.write_register( - REMOTE_CONTROL_TIMEOUT_REG, REMOTE_CONTROL_TIMEOUT, + REMOTE_CONTROL_TIMEOUT_REG, timeout, data_type=ModbusDataType.UINT_16, unit=unit) + def _get_mode1_timing(self) -> tuple[int, int]: + try: + control_interval = int(data.data.general_data.data.control_interval) + except Exception: + control_interval = 10 + + duration = max(MIN_REMOTE_CONTROL_DURATION, control_interval * 2) + timeout = max(MIN_REMOTE_CONTROL_TIMEOUT, control_interval * 3) + return duration, timeout + def power_limit_controllable(self) -> bool: return True From 4f3b90e19da4c188eca00579488fd156361b62a1 Mon Sep 17 00:00:00 2001 From: Xoffroad <65235705+Xoffroad@users.noreply.github.com> Date: Wed, 22 Apr 2026 08:50:38 +0200 Subject: [PATCH 3/6] Qcells active battery control - some fixes (#3314) * adding active battery control for qcells and new field for overload security * remove field for max_power and use max_charge_power and max_discharge_power instead * fix: stabilize QCells active battery control in mode 1 Reworked QCells battery control to a stable Mode 1 workflow aligned with the validated HA behavior. The target calculation was corrected to avoid unintended forced charging, import limiting was applied consistently for net import control, and the full Mode 1 control payload is now written each cycle with dynamic timing based on the configured openWB control interval. --- packages/modules/devices/qcells/qcells/bat.py | 33 ++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/packages/modules/devices/qcells/qcells/bat.py b/packages/modules/devices/qcells/qcells/bat.py index 0db941fd47..13b1cbb9f7 100644 --- a/packages/modules/devices/qcells/qcells/bat.py +++ b/packages/modules/devices/qcells/qcells/bat.py @@ -21,7 +21,11 @@ REMOTE_CONTROL_MODE_REG = 0x7C # U16: 0=Disabled, 1=Enabled Power Control REMOTE_CONTROL_SET_TYPE_REG = 0x7D # U16: 1=Set REMOTE_CONTROL_ACTIVE_POWER_REG = 0x7E # S32: Active Power Sollwert in Watt +REMOTE_CONTROL_REACTIVE_POWER_REG = 0x80 # S32: Reactive Power Sollwert (0) REMOTE_CONTROL_DURATION_REG = 0x82 # U16: Dauer in Sekunden +REMOTE_CONTROL_TARGET_SOC_REG = 0x83 # U16: Target SoC (Dummy 0) +REMOTE_CONTROL_TARGET_ENERGY_REG = 0x84 # U32: Target Energy (Dummy 0) +REMOTE_CONTROL_TARGET_POWER_REG = 0x86 # S32: Target Charge/Discharge Power (Dummy 0) REMOTE_CONTROL_TIMEOUT_REG = 0x88 # U16: Timeout in Sekunden MODE_1_DISABLED = 0 @@ -97,7 +101,8 @@ def _get_active_power_target(self, power_limit: int) -> int: cp_power = int(data.data.cp_all_data.data.get.power) house_load = max(0, home_consumption + cp_power) pv_generation = max(0, int(data.data.pv_all_data.data.get.power * -1)) - ap_target = power_limit + house_load - pv_generation + # Mode 1 / Enabled Battery Control: house_load wird inverter-intern bereits berücksichtigt. + ap_target = power_limit - pv_generation try: evu_counter = data.data.counter_all_data.get_evu_counter() @@ -105,19 +110,27 @@ def _get_active_power_target(self, power_limit: int) -> int: except Exception: import_limit = 0 - if import_limit > 0: - ap_target = min(ap_target, import_limit) + import_bound = None + if import_limit > 0 and ap_target > 0: + import_bound = import_limit - house_load + ap_target = min(ap_target, import_bound) log.debug(( f"QCells Mode1 target: power_limit={power_limit}W, home_consumption={home_consumption}W, " f"cp_power={cp_power}W, house_load={house_load}W, " - f"pv_generation={pv_generation}W, import_limit={import_limit}W -> ap_target={ap_target}W" + f"pv_generation={pv_generation}W, import_limit={import_limit}W, " + f"import_bound={import_bound}W -> ap_target={ap_target}W" )) return int(ap_target) def _write_mode1(self, ap_target: int, unit: int) -> None: """Schreibt die Mode 1 Remote Control Register (0x7C-0x88).""" duration, timeout = self._get_mode1_timing() + log.debug(( + f"QCells Mode1 write: mode={MODE_1_ENABLED_POWER_CONTROL}, set_type={SET_TYPE_SET}, " + f"active_power={ap_target}W, reactive_power=0var, duration={duration}s, " + f"target_soc=0, target_energy=0Wh, target_power=0W, timeout={timeout}s" + )) with self.client: self.client.write_register( REMOTE_CONTROL_MODE_REG, MODE_1_ENABLED_POWER_CONTROL, @@ -128,9 +141,21 @@ def _write_mode1(self, ap_target: int, unit: int) -> None: self.client.write_register( REMOTE_CONTROL_ACTIVE_POWER_REG, ap_target, data_type=ModbusDataType.INT_32, wordorder=Endian.Little, unit=unit) + self.client.write_register( + REMOTE_CONTROL_REACTIVE_POWER_REG, 0, + data_type=ModbusDataType.INT_32, wordorder=Endian.Little, unit=unit) self.client.write_register( REMOTE_CONTROL_DURATION_REG, duration, data_type=ModbusDataType.UINT_16, unit=unit) + self.client.write_register( + REMOTE_CONTROL_TARGET_SOC_REG, 0, + data_type=ModbusDataType.UINT_16, unit=unit) + self.client.write_register( + REMOTE_CONTROL_TARGET_ENERGY_REG, 0, + data_type=ModbusDataType.UINT_32, wordorder=Endian.Little, unit=unit) + self.client.write_register( + REMOTE_CONTROL_TARGET_POWER_REG, 0, + data_type=ModbusDataType.INT_32, wordorder=Endian.Little, unit=unit) self.client.write_register( REMOTE_CONTROL_TIMEOUT_REG, timeout, data_type=ModbusDataType.UINT_16, unit=unit) From 00d593ddd98e930f22456c6f8b9e810b3b1d4b4a Mon Sep 17 00:00:00 2001 From: Xoffroad <65235705+Xoffroad@users.noreply.github.com> Date: Thu, 23 Apr 2026 15:23:34 +0200 Subject: [PATCH 4/6] fix: align QCells Mode 1 battery control with openWB semantics and timing (#3318) * adding active battery control for qcells and new field for overload security * remove field for max_power and use max_charge_power and max_discharge_power instead * fix: align QCells Mode 1 battery control with openWB semantics and timing --- packages/modules/devices/qcells/qcells/bat.py | 38 +++--- .../modules/devices/qcells/qcells/bat_test.py | 109 ++++++++++++++++++ 2 files changed, 131 insertions(+), 16 deletions(-) create mode 100644 packages/modules/devices/qcells/qcells/bat_test.py diff --git a/packages/modules/devices/qcells/qcells/bat.py b/packages/modules/devices/qcells/qcells/bat.py index 13b1cbb9f7..d40dcb69a9 100644 --- a/packages/modules/devices/qcells/qcells/bat.py +++ b/packages/modules/devices/qcells/qcells/bat.py @@ -26,13 +26,14 @@ REMOTE_CONTROL_TARGET_SOC_REG = 0x83 # U16: Target SoC (Dummy 0) REMOTE_CONTROL_TARGET_ENERGY_REG = 0x84 # U32: Target Energy (Dummy 0) REMOTE_CONTROL_TARGET_POWER_REG = 0x86 # S32: Target Charge/Discharge Power (Dummy 0) -REMOTE_CONTROL_TIMEOUT_REG = 0x88 # U16: Timeout in Sekunden +REMOTE_CONTROL_TIMEOUT_REG = 0x88 # U16: Timeout in Sekunden (0 = deaktiviert) MODE_1_DISABLED = 0 MODE_1_ENABLED_POWER_CONTROL = 1 SET_TYPE_SET = 1 MIN_REMOTE_CONTROL_DURATION = 20 -MIN_REMOTE_CONTROL_TIMEOUT = 60 +DURATION_SAFETY_BUFFER = 10 +MODE_1_TIMEOUT_DISABLED = 0 class KwargsDict(TypedDict): @@ -101,35 +102,42 @@ def _get_active_power_target(self, power_limit: int) -> int: cp_power = int(data.data.cp_all_data.data.get.power) house_load = max(0, home_consumption + cp_power) pv_generation = max(0, int(data.data.pv_all_data.data.get.power * -1)) - # Mode 1 / Enabled Battery Control: house_load wird inverter-intern bereits berücksichtigt. - ap_target = power_limit - pv_generation + bat_power = int(data.data.bat_all_data.data.get.power) + ap_target_raw = power_limit try: evu_counter = data.data.counter_all_data.get_evu_counter() + evu_power = int(evu_counter.data.get.power) import_limit = int(evu_counter.data.config.max_total_power) except Exception: + evu_power = 0 import_limit = 0 import_bound = None - if import_limit > 0 and ap_target > 0: - import_bound = import_limit - house_load - ap_target = min(ap_target, import_bound) + if power_limit == 0: + ap_target = 0 + elif power_limit > 0 and import_limit > 0: + import_bound = max(0, import_limit - house_load) + ap_target = min(ap_target_raw, import_bound) + else: + ap_target = ap_target_raw log.debug(( - f"QCells Mode1 target: power_limit={power_limit}W, home_consumption={home_consumption}W, " + f"QCells Mode1 target: power_limit={power_limit}W, bat_power={bat_power}W, evu_power={evu_power}W, " + f"home_consumption={home_consumption}W, " f"cp_power={cp_power}W, house_load={house_load}W, " f"pv_generation={pv_generation}W, import_limit={import_limit}W, " - f"import_bound={import_bound}W -> ap_target={ap_target}W" + f"import_bound={import_bound}W, ap_target_raw={ap_target_raw}W -> ap_target={ap_target}W" )) return int(ap_target) def _write_mode1(self, ap_target: int, unit: int) -> None: """Schreibt die Mode 1 Remote Control Register (0x7C-0x88).""" - duration, timeout = self._get_mode1_timing() + duration = self._get_mode1_duration() log.debug(( f"QCells Mode1 write: mode={MODE_1_ENABLED_POWER_CONTROL}, set_type={SET_TYPE_SET}, " f"active_power={ap_target}W, reactive_power=0var, duration={duration}s, " - f"target_soc=0, target_energy=0Wh, target_power=0W, timeout={timeout}s" + f"target_soc=0, target_energy=0Wh, target_power=0W, timeout={MODE_1_TIMEOUT_DISABLED}s" )) with self.client: self.client.write_register( @@ -157,18 +165,16 @@ def _write_mode1(self, ap_target: int, unit: int) -> None: REMOTE_CONTROL_TARGET_POWER_REG, 0, data_type=ModbusDataType.INT_32, wordorder=Endian.Little, unit=unit) self.client.write_register( - REMOTE_CONTROL_TIMEOUT_REG, timeout, + REMOTE_CONTROL_TIMEOUT_REG, MODE_1_TIMEOUT_DISABLED, data_type=ModbusDataType.UINT_16, unit=unit) - def _get_mode1_timing(self) -> tuple[int, int]: + def _get_mode1_duration(self) -> int: try: control_interval = int(data.data.general_data.data.control_interval) except Exception: control_interval = 10 - duration = max(MIN_REMOTE_CONTROL_DURATION, control_interval * 2) - timeout = max(MIN_REMOTE_CONTROL_TIMEOUT, control_interval * 3) - return duration, timeout + return max(MIN_REMOTE_CONTROL_DURATION, control_interval + DURATION_SAFETY_BUFFER) def power_limit_controllable(self) -> bool: return True diff --git a/packages/modules/devices/qcells/qcells/bat_test.py b/packages/modules/devices/qcells/qcells/bat_test.py new file mode 100644 index 0000000000..b1016810b1 --- /dev/null +++ b/packages/modules/devices/qcells/qcells/bat_test.py @@ -0,0 +1,109 @@ +from types import SimpleNamespace + +from modules.devices.qcells.qcells import bat +from modules.devices.qcells.qcells.config import QCellsBatSetup + + +def _fake_data( + home_consumption: int, + cp_power: int, + pv_power: int, + bat_power: int, + evu_power: int, + import_limit: int, +) -> SimpleNamespace: + evu_counter = SimpleNamespace( + data=SimpleNamespace( + get=SimpleNamespace(power=evu_power), + config=SimpleNamespace(max_total_power=import_limit), + ) + ) + return SimpleNamespace( + counter_all_data=SimpleNamespace( + data=SimpleNamespace(set=SimpleNamespace(home_consumption=home_consumption)), + get_evu_counter=lambda: evu_counter, + ), + cp_all_data=SimpleNamespace(data=SimpleNamespace(get=SimpleNamespace(power=cp_power))), + pv_all_data=SimpleNamespace(data=SimpleNamespace(get=SimpleNamespace(power=pv_power))), + bat_all_data=SimpleNamespace(data=SimpleNamespace(get=SimpleNamespace(power=bat_power))), + ) + + +def _create_qcells_bat() -> bat.QCellsBat: + return bat.QCellsBat(QCellsBatSetup(), modbus_id=1, client=SimpleNamespace()) + + +def test_get_active_power_target_stop_is_zero(monkeypatch) -> None: + qcells_bat = _create_qcells_bat() + monkeypatch.setattr( + bat.data, + "data", + _fake_data( + home_consumption=450, + cp_power=5500, + pv_power=-5600, + bat_power=-200, + evu_power=100, + import_limit=24000, + ), + raising=False, + ) + + assert qcells_bat._get_active_power_target(0) == 0 + + +def test_get_active_power_target_discharge_keeps_limit(monkeypatch) -> None: + qcells_bat = _create_qcells_bat() + monkeypatch.setattr( + bat.data, + "data", + _fake_data( + home_consumption=500, + cp_power=4800, + pv_power=-5200, + bat_power=-900, + evu_power=-300, + import_limit=24000, + ), + raising=False, + ) + + assert qcells_bat._get_active_power_target(-700) == -700 + + +def test_get_active_power_target_charge_clamped_by_import_limit(monkeypatch) -> None: + qcells_bat = _create_qcells_bat() + monkeypatch.setattr( + bat.data, + "data", + _fake_data( + home_consumption=600, + cp_power=5400, + pv_power=-4000, + bat_power=300, + evu_power=1800, + import_limit=6200, + ), + raising=False, + ) + + assert qcells_bat._get_active_power_target(1200) == 200 + + +def test_get_active_power_target_charge_clamped_to_zero_without_headroom(monkeypatch) -> None: + qcells_bat = _create_qcells_bat() + monkeypatch.setattr( + bat.data, + "data", + _fake_data( + home_consumption=800, + cp_power=5200, + pv_power=-4500, + bat_power=100, + evu_power=5800, + import_limit=5000, + ), + raising=False, + ) + + assert qcells_bat._get_active_power_target(1000) == 0 From 9aecbdded7129bb144a388c44c8019e0455465cd Mon Sep 17 00:00:00 2001 From: Xoffroad <65235705+Xoffroad@users.noreply.github.com> Date: Fri, 24 Apr 2026 10:01:21 +0200 Subject: [PATCH 5/6] fix: switch QCells remote control to Enabled Battery Control (mode 12) (#3320) * adding active battery control for qcells and new field for overload security * remove field for max_power and use max_charge_power and max_discharge_power instead * fix: switch QCells remote control to Enabled Battery Control (mode 12) * fix: switch QCells battery control to SolaX mode 4 push-power with atomic Modbus block writes --- packages/modules/conftest.py | 1 + packages/modules/devices/qcells/qcells/bat.py | 202 +++++++----------- .../modules/devices/qcells/qcells/bat_test.py | 100 +-------- 3 files changed, 88 insertions(+), 215 deletions(-) diff --git a/packages/modules/conftest.py b/packages/modules/conftest.py index 9733696c81..481004bc61 100644 --- a/packages/modules/conftest.py +++ b/packages/modules/conftest.py @@ -31,6 +31,7 @@ sys.modules['pymodbus.constants'] = module module = type(sys)('pymodbus.payload') +module.BinaryPayloadBuilder = Mock() module.BinaryPayloadDecoder = Mock() sys.modules['pymodbus.payload'] = module diff --git a/packages/modules/devices/qcells/qcells/bat.py b/packages/modules/devices/qcells/qcells/bat.py index d40dcb69a9..822f0a24f1 100644 --- a/packages/modules/devices/qcells/qcells/bat.py +++ b/packages/modules/devices/qcells/qcells/bat.py @@ -1,39 +1,38 @@ #!/usr/bin/env python3 import logging -from typing import TypedDict, Any, Optional +from typing import Any, Optional, TypedDict + from pymodbus.constants import Endian +from pymodbus.payload import BinaryPayloadBuilder -from control import data from modules.common.abstract_device import AbstractBat from modules.common.component_state import BatState -from modules.common.component_type import ComponentDescriptor +from modules.common.component_type import ComponentDescriptor, ComponentType from modules.common.fault_state import ComponentInfo, FaultState from modules.common.modbus import ModbusDataType, ModbusTcpClient_ from modules.common.store import get_bat_value_store -from modules.devices.qcells.qcells.config import QCellsBatSetup from modules.common.utils.peak_filter import PeakFilter -from modules.common.component_type import ComponentType +from modules.devices.qcells.qcells.config import QCellsBatSetup log = logging.getLogger(__name__) -# Solax/QCells Mode 1 Remote Control Registers (Holding Registers) -# Speichersteuerung via Active Power Sollwert. -REMOTE_CONTROL_MODE_REG = 0x7C # U16: 0=Disabled, 1=Enabled Power Control -REMOTE_CONTROL_SET_TYPE_REG = 0x7D # U16: 1=Set -REMOTE_CONTROL_ACTIVE_POWER_REG = 0x7E # S32: Active Power Sollwert in Watt -REMOTE_CONTROL_REACTIVE_POWER_REG = 0x80 # S32: Reactive Power Sollwert (0) -REMOTE_CONTROL_DURATION_REG = 0x82 # U16: Dauer in Sekunden -REMOTE_CONTROL_TARGET_SOC_REG = 0x83 # U16: Target SoC (Dummy 0) -REMOTE_CONTROL_TARGET_ENERGY_REG = 0x84 # U32: Target Energy (Dummy 0) -REMOTE_CONTROL_TARGET_POWER_REG = 0x86 # S32: Target Charge/Discharge Power (Dummy 0) -REMOTE_CONTROL_TIMEOUT_REG = 0x88 # U16: Timeout in Sekunden (0 = deaktiviert) - -MODE_1_DISABLED = 0 -MODE_1_ENABLED_POWER_CONTROL = 1 +# Solax/QCells Remote Control Registers (Holding Registers) +REMOTE_CONTROL_MODE_REG = 0x7C +REMOTE_CONTROL_SET_TYPE_REG = 0x7D +REMOTE_CONTROL_ACTIVE_POWER_REG = 0x7E +REMOTE_CONTROL_REACTIVE_POWER_REG = 0x80 +REMOTE_CONTROL_DURATION_REG = 0x82 +REMOTE_CONTROL_TARGET_SOC_REG = 0x83 +REMOTE_CONTROL_TARGET_ENERGY_REG = 0x84 +REMOTE_CONTROL_TARGET_POWER_REG = 0x86 +REMOTE_CONTROL_TIMEOUT_REG = 0x88 +REMOTE_CONTROL_PUSH_POWER_MODE4_REG = 0x89 + +MODE_DISABLED = 0 +MODE_4_PUSH_POWER = 4 SET_TYPE_SET = 1 -MIN_REMOTE_CONTROL_DURATION = 20 -DURATION_SAFETY_BUFFER = 10 -MODE_1_TIMEOUT_DISABLED = 0 +MODE_4_TIMEOUT_DISABLED = 0 +MODE4_BLOCK_REG_COUNT = 15 class KwargsDict(TypedDict): @@ -47,134 +46,91 @@ def __init__(self, component_config: QCellsBatSetup, **kwargs: Any) -> None: self.kwargs: KwargsDict = kwargs def initialize(self) -> None: - self.__modbus_id: int = self.kwargs['modbus_id'] - self.client: ModbusTcpClient_ = self.kwargs['client'] + self.__modbus_id: int = self.kwargs["modbus_id"] + self.client: ModbusTcpClient_ = self.kwargs["client"] self.store = get_bat_value_store(self.component_config.id) self.fault_state = FaultState(ComponentInfo.from_component_config(self.component_config)) self.peak_filter = PeakFilter(ComponentType.BAT, self.component_config.id, self.fault_state) - self.last_mode: Optional[str] = 'Undefined' + self.last_mode: Optional[str] = "Undefined" def update(self) -> None: power = self.client.read_input_registers(0x0016, ModbusDataType.INT_16, unit=self.__modbus_id) soc = self.client.read_input_registers(0x001C, ModbusDataType.UINT_16, unit=self.__modbus_id) - imported = self.client.read_input_registers( - 0x0021, ModbusDataType.UINT_16, unit=self.__modbus_id) * 100 - exported = self.client.read_input_registers( - 0x001D, ModbusDataType.UINT_16, unit=self.__modbus_id) * 100 + imported = self.client.read_input_registers(0x0021, ModbusDataType.UINT_16, unit=self.__modbus_id) * 100 + exported = self.client.read_input_registers(0x001D, ModbusDataType.UINT_16, unit=self.__modbus_id) * 100 imported, exported = self.peak_filter.check_values(power, imported, exported) bat_state = BatState( power=power, soc=soc, imported=imported, - exported=exported + exported=exported, ) self.store.set(bat_state) def set_power_limit(self, power_limit: Optional[int]) -> None: unit = self.__modbus_id - log.debug(f"QCells set_power_limit: power_limit={power_limit}, " - f"last_mode={self.last_mode}") + log.debug(f"QCells set_power_limit: power_limit={power_limit}, last_mode={self.last_mode}") if power_limit is None: log.debug("Keine Batteriesteuerung, Selbstregelung durch Wechselrichter") if self.last_mode is not None: with self.client: self.client.write_register( - REMOTE_CONTROL_MODE_REG, MODE_1_DISABLED, - data_type=ModbusDataType.UINT_16, unit=unit) + REMOTE_CONTROL_MODE_REG, + MODE_DISABLED, + data_type=ModbusDataType.UINT_16, + unit=unit, + ) self.last_mode = None + return + + if power_limit < 0: + self.last_mode = "discharge" + elif power_limit > 0: + self.last_mode = "charge" else: - if power_limit < 0: - self.last_mode = 'discharge' - elif power_limit > 0: - self.last_mode = 'charge' - else: - self.last_mode = 'stop' - - ap_target = self._get_active_power_target(int(power_limit)) - self._write_mode1(ap_target, unit=unit) - - def _get_active_power_target(self, power_limit: int) -> int: - # openWB-Werte verwenden (nicht WR-Berechnungen): - # power_limit < 0 = Entladen, 0 = Stop, > 0 = Laden - home_consumption = int(data.data.counter_all_data.data.set.home_consumption) - cp_power = int(data.data.cp_all_data.data.get.power) - house_load = max(0, home_consumption + cp_power) - pv_generation = max(0, int(data.data.pv_all_data.data.get.power * -1)) - bat_power = int(data.data.bat_all_data.data.get.power) - ap_target_raw = power_limit - - try: - evu_counter = data.data.counter_all_data.get_evu_counter() - evu_power = int(evu_counter.data.get.power) - import_limit = int(evu_counter.data.config.max_total_power) - except Exception: - evu_power = 0 - import_limit = 0 - - import_bound = None - if power_limit == 0: - ap_target = 0 - elif power_limit > 0 and import_limit > 0: - import_bound = max(0, import_limit - house_load) - ap_target = min(ap_target_raw, import_bound) - else: - ap_target = ap_target_raw - - log.debug(( - f"QCells Mode1 target: power_limit={power_limit}W, bat_power={bat_power}W, evu_power={evu_power}W, " - f"home_consumption={home_consumption}W, " - f"cp_power={cp_power}W, house_load={house_load}W, " - f"pv_generation={pv_generation}W, import_limit={import_limit}W, " - f"import_bound={import_bound}W, ap_target_raw={ap_target_raw}W -> ap_target={ap_target}W" - )) - return int(ap_target) - - def _write_mode1(self, ap_target: int, unit: int) -> None: - """Schreibt die Mode 1 Remote Control Register (0x7C-0x88).""" - duration = self._get_mode1_duration() - log.debug(( - f"QCells Mode1 write: mode={MODE_1_ENABLED_POWER_CONTROL}, set_type={SET_TYPE_SET}, " - f"active_power={ap_target}W, reactive_power=0var, duration={duration}s, " - f"target_soc=0, target_energy=0Wh, target_power=0W, timeout={MODE_1_TIMEOUT_DISABLED}s" - )) + self.last_mode = "stop" + + push_power = self._get_mode4_push_power(int(power_limit)) + self._write_mode4(push_power, unit) + + def _get_mode4_push_power(self, power_limit: int) -> int: + # openWB power_limit semantics: + # <0 discharge, 0 stop, >0 charge + # Mode 4 push_power semantics: + # >0 discharge, 0 stop, <0 charge + push_power = int(power_limit * -1) + log.debug(f"QCells Mode4 target: power_limit={power_limit}W -> push_power={push_power}W") + return push_power + + def _write_mode4(self, push_power: int, unit: int) -> None: + log.debug( + ( + f"QCells Mode4 write: mode={MODE_4_PUSH_POWER}, set_type={SET_TYPE_SET}, " + f"timeout={MODE_4_TIMEOUT_DISABLED}s, push_power={push_power}W" + ) + ) + builder = BinaryPayloadBuilder(byteorder=Endian.Big, wordorder=Endian.Little) + builder.add_16bit_uint(MODE_4_PUSH_POWER) + builder.add_16bit_uint(SET_TYPE_SET) + builder.add_32bit_int(0) + builder.add_32bit_int(0) + builder.add_16bit_uint(0) + builder.add_16bit_uint(0) + builder.add_32bit_uint(0) + builder.add_32bit_int(0) + builder.add_16bit_uint(MODE_4_TIMEOUT_DISABLED) + builder.add_32bit_int(push_power) + payload = builder.to_registers() + if len(payload) != MODE4_BLOCK_REG_COUNT: + raise RuntimeError( + f"Unexpected mode4 payload size {len(payload)}, expected {MODE4_BLOCK_REG_COUNT}" + ) + with self.client: - self.client.write_register( - REMOTE_CONTROL_MODE_REG, MODE_1_ENABLED_POWER_CONTROL, - data_type=ModbusDataType.UINT_16, unit=unit) - self.client.write_register( - REMOTE_CONTROL_SET_TYPE_REG, SET_TYPE_SET, - data_type=ModbusDataType.UINT_16, unit=unit) - self.client.write_register( - REMOTE_CONTROL_ACTIVE_POWER_REG, ap_target, - data_type=ModbusDataType.INT_32, wordorder=Endian.Little, unit=unit) - self.client.write_register( - REMOTE_CONTROL_REACTIVE_POWER_REG, 0, - data_type=ModbusDataType.INT_32, wordorder=Endian.Little, unit=unit) - self.client.write_register( - REMOTE_CONTROL_DURATION_REG, duration, - data_type=ModbusDataType.UINT_16, unit=unit) - self.client.write_register( - REMOTE_CONTROL_TARGET_SOC_REG, 0, - data_type=ModbusDataType.UINT_16, unit=unit) - self.client.write_register( - REMOTE_CONTROL_TARGET_ENERGY_REG, 0, - data_type=ModbusDataType.UINT_32, wordorder=Endian.Little, unit=unit) - self.client.write_register( - REMOTE_CONTROL_TARGET_POWER_REG, 0, - data_type=ModbusDataType.INT_32, wordorder=Endian.Little, unit=unit) - self.client.write_register( - REMOTE_CONTROL_TIMEOUT_REG, MODE_1_TIMEOUT_DISABLED, - data_type=ModbusDataType.UINT_16, unit=unit) - - def _get_mode1_duration(self) -> int: - try: - control_interval = int(data.data.general_data.data.control_interval) - except Exception: - control_interval = 10 - - return max(MIN_REMOTE_CONTROL_DURATION, control_interval + DURATION_SAFETY_BUFFER) + # data_type=None with list payload writes a contiguous FC16 block. + self.client.write_register(REMOTE_CONTROL_MODE_REG, payload, unit=unit) def power_limit_controllable(self) -> bool: return True diff --git a/packages/modules/devices/qcells/qcells/bat_test.py b/packages/modules/devices/qcells/qcells/bat_test.py index b1016810b1..8563c4b399 100644 --- a/packages/modules/devices/qcells/qcells/bat_test.py +++ b/packages/modules/devices/qcells/qcells/bat_test.py @@ -4,106 +4,22 @@ from modules.devices.qcells.qcells.config import QCellsBatSetup -def _fake_data( - home_consumption: int, - cp_power: int, - pv_power: int, - bat_power: int, - evu_power: int, - import_limit: int, -) -> SimpleNamespace: - evu_counter = SimpleNamespace( - data=SimpleNamespace( - get=SimpleNamespace(power=evu_power), - config=SimpleNamespace(max_total_power=import_limit), - ) - ) - return SimpleNamespace( - counter_all_data=SimpleNamespace( - data=SimpleNamespace(set=SimpleNamespace(home_consumption=home_consumption)), - get_evu_counter=lambda: evu_counter, - ), - cp_all_data=SimpleNamespace(data=SimpleNamespace(get=SimpleNamespace(power=cp_power))), - pv_all_data=SimpleNamespace(data=SimpleNamespace(get=SimpleNamespace(power=pv_power))), - bat_all_data=SimpleNamespace(data=SimpleNamespace(get=SimpleNamespace(power=bat_power))), - ) - - def _create_qcells_bat() -> bat.QCellsBat: return bat.QCellsBat(QCellsBatSetup(), modbus_id=1, client=SimpleNamespace()) -def test_get_active_power_target_stop_is_zero(monkeypatch) -> None: +def test_get_mode4_push_power_stop_is_zero() -> None: qcells_bat = _create_qcells_bat() - monkeypatch.setattr( - bat.data, - "data", - _fake_data( - home_consumption=450, - cp_power=5500, - pv_power=-5600, - bat_power=-200, - evu_power=100, - import_limit=24000, - ), - raising=False, - ) - - assert qcells_bat._get_active_power_target(0) == 0 + assert qcells_bat._get_mode4_push_power(0) == 0 -def test_get_active_power_target_discharge_keeps_limit(monkeypatch) -> None: +def test_get_mode4_push_power_discharge_is_positive() -> None: qcells_bat = _create_qcells_bat() - monkeypatch.setattr( - bat.data, - "data", - _fake_data( - home_consumption=500, - cp_power=4800, - pv_power=-5200, - bat_power=-900, - evu_power=-300, - import_limit=24000, - ), - raising=False, - ) + # openWB discharge limit is negative -> mode4 push power must be positive + assert qcells_bat._get_mode4_push_power(-700) == 700 - assert qcells_bat._get_active_power_target(-700) == -700 - -def test_get_active_power_target_charge_clamped_by_import_limit(monkeypatch) -> None: +def test_get_mode4_push_power_charge_is_negative() -> None: qcells_bat = _create_qcells_bat() - monkeypatch.setattr( - bat.data, - "data", - _fake_data( - home_consumption=600, - cp_power=5400, - pv_power=-4000, - bat_power=300, - evu_power=1800, - import_limit=6200, - ), - raising=False, - ) - - assert qcells_bat._get_active_power_target(1200) == 200 - - -def test_get_active_power_target_charge_clamped_to_zero_without_headroom(monkeypatch) -> None: - qcells_bat = _create_qcells_bat() - monkeypatch.setattr( - bat.data, - "data", - _fake_data( - home_consumption=800, - cp_power=5200, - pv_power=-4500, - bat_power=100, - evu_power=5800, - import_limit=5000, - ), - raising=False, - ) - - assert qcells_bat._get_active_power_target(1000) == 0 + # openWB charge limit is positive -> mode4 push power must be negative + assert qcells_bat._get_mode4_push_power(1000) == -1000 From 0a4553dbdfd76f2bf7a2e7f3bc426f2022665bcd Mon Sep 17 00:00:00 2001 From: Xoffroad <65235705+Xoffroad@users.noreply.github.com> Date: Mon, 11 May 2026 12:54:53 +0200 Subject: [PATCH 6/6] Implement active battery control via SolaX Remote Control Mode 4 in the SolaX battery module. (#3351) * adding active battery control for qcells and new field for overload security * remove field for max_power and use max_charge_power and max_discharge_power instead * fix(solax): enable mode4 battery control for g3 hybrids only Implement active battery control via SolaX Remote Control Mode 4 in the SolaX battery module. Gate controllability to version g3 (Gen3/Gen4 Hybrid) and keep g2/g4 non-controllable; add unit tests for sign mapping and version gating. --- packages/modules/devices/solax/solax/bat.py | 96 ++++++++++++++++++- .../modules/devices/solax/solax/bat_test.py | 41 ++++++++ 2 files changed, 135 insertions(+), 2 deletions(-) create mode 100644 packages/modules/devices/solax/solax/bat_test.py diff --git a/packages/modules/devices/solax/solax/bat.py b/packages/modules/devices/solax/solax/bat.py index c1cd62a371..d2aa4b023b 100644 --- a/packages/modules/devices/solax/solax/bat.py +++ b/packages/modules/devices/solax/solax/bat.py @@ -1,5 +1,9 @@ #!/usr/bin/env python3 -from typing import Any, TypedDict +import logging +from typing import Any, Optional, TypedDict + +from pymodbus.constants import Endian +from pymodbus.payload import BinaryPayloadBuilder from modules.common import modbus from modules.common.abstract_device import AbstractBat @@ -10,9 +14,20 @@ from modules.common.simcount import SimCounter from modules.common.store import get_bat_value_store from modules.devices.solax.solax.config import SolaxBatSetup, Solax +from modules.devices.solax.solax.version import SolaxVersion from modules.common.utils.peak_filter import PeakFilter from modules.common.component_type import ComponentType +log = logging.getLogger(__name__) + +# Solax Remote Control Registers (Holding Registers) +REMOTE_CONTROL_MODE_REG = 0x7C +MODE_DISABLED = 0 +MODE_4_PUSH_POWER = 4 +SET_TYPE_SET = 1 +MODE_4_TIMEOUT_DISABLED = 0 +MODE4_BLOCK_REG_COUNT = 15 + class KwargsDict(TypedDict): client: modbus.ModbusTcpClient_ @@ -31,11 +46,12 @@ def initialize(self) -> None: self.store = get_bat_value_store(self.component_config.id) self.fault_state = FaultState(ComponentInfo.from_component_config(self.component_config)) self.peak_filter = PeakFilter(ComponentType.BAT, self.component_config.id, self.fault_state) + self.last_mode: Optional[str] = 'Undefined' def update(self) -> None: unit = self.device_config.configuration.modbus_id - # kein Speicher für Versionen G2 und G4 + # Basiswerte aus dem Batterie-Registersatz lesen power = self.__tcp_client.read_input_registers(0x0016, ModbusDataType.INT_16, unit=unit) soc = self.__tcp_client.read_input_registers(0x001C, ModbusDataType.UINT_16, unit=unit) self.peak_filter.check_values(power) @@ -48,5 +64,81 @@ def update(self) -> None: ) self.store.set(bat_state) + def set_power_limit(self, power_limit: Optional[int]) -> None: + if self.power_limit_controllable() is False: + log.debug("SolaX set_power_limit: aktive Speichersteuerung für diese Version nicht unterstützt") + return + + unit = self.device_config.configuration.modbus_id + log.debug(f"SolaX set_power_limit: power_limit={power_limit}, last_mode={self.last_mode}") + + if power_limit is None: + log.debug("Keine Batteriesteuerung, Selbstregelung durch Wechselrichter") + if self.last_mode is not None: + with self.__tcp_client: + self.__tcp_client.write_register( + REMOTE_CONTROL_MODE_REG, + MODE_DISABLED, + data_type=ModbusDataType.UINT_16, + unit=unit, + ) + self.last_mode = None + return + + if power_limit < 0: + self.last_mode = 'discharge' + elif power_limit > 0: + self.last_mode = 'charge' + else: + self.last_mode = 'stop' + + push_power = self._get_mode4_push_power(int(power_limit)) + self._write_mode4(push_power, unit) + + def _get_mode4_push_power(self, power_limit: int) -> int: + # openWB power_limit semantics: + # <0 discharge, 0 stop, >0 charge + # Mode 4 push_power semantics: + # >0 discharge, 0 stop, <0 charge + push_power = int(power_limit * -1) + log.debug(f"SolaX Mode4 target: power_limit={power_limit}W -> push_power={push_power}W") + return push_power + + def _write_mode4(self, push_power: int, unit: int) -> None: + log.debug( + ( + f"SolaX Mode4 write: mode={MODE_4_PUSH_POWER}, set_type={SET_TYPE_SET}, " + f"timeout={MODE_4_TIMEOUT_DISABLED}s, push_power={push_power}W" + ) + ) + builder = BinaryPayloadBuilder(byteorder=Endian.Big, wordorder=Endian.Little) + builder.add_16bit_uint(MODE_4_PUSH_POWER) + builder.add_16bit_uint(SET_TYPE_SET) + builder.add_32bit_int(0) + builder.add_32bit_int(0) + builder.add_16bit_uint(0) + builder.add_16bit_uint(0) + builder.add_32bit_uint(0) + builder.add_32bit_int(0) + builder.add_16bit_uint(MODE_4_TIMEOUT_DISABLED) + builder.add_32bit_int(push_power) + payload = builder.to_registers() + if len(payload) != MODE4_BLOCK_REG_COUNT: + raise RuntimeError( + f"Unexpected mode4 payload size {len(payload)}, expected {MODE4_BLOCK_REG_COUNT}" + ) + + with self.__tcp_client: + self.__tcp_client.write_register(REMOTE_CONTROL_MODE_REG, payload, unit=unit) + + def power_limit_controllable(self) -> bool: + device_config = getattr(self, 'device_config', self.kwargs.get('device_config')) + if device_config is None: + return False + try: + return SolaxVersion(device_config.configuration.version) == SolaxVersion.G3 + except ValueError: + return False + component_descriptor = ComponentDescriptor(configuration_factory=SolaxBatSetup) diff --git a/packages/modules/devices/solax/solax/bat_test.py b/packages/modules/devices/solax/solax/bat_test.py new file mode 100644 index 0000000000..fdfc6d2d41 --- /dev/null +++ b/packages/modules/devices/solax/solax/bat_test.py @@ -0,0 +1,41 @@ +from types import SimpleNamespace + +from modules.devices.solax.solax import bat +from modules.devices.solax.solax.config import Solax, SolaxBatSetup, SolaxConfiguration +from modules.devices.solax.solax.version import SolaxVersion + + +def _create_solax_bat(version: SolaxVersion) -> bat.SolaxBat: + config = SolaxConfiguration(version=version) + device_config = Solax(configuration=config) + return bat.SolaxBat(SolaxBatSetup(), device_config=device_config, client=SimpleNamespace()) + + +def test_get_mode4_push_power_stop_is_zero() -> None: + solax_bat = _create_solax_bat(SolaxVersion.G3) + assert solax_bat._get_mode4_push_power(0) == 0 + + +def test_get_mode4_push_power_discharge_is_positive() -> None: + solax_bat = _create_solax_bat(SolaxVersion.G3) + assert solax_bat._get_mode4_push_power(-700) == 700 + + +def test_get_mode4_push_power_charge_is_negative() -> None: + solax_bat = _create_solax_bat(SolaxVersion.G3) + assert solax_bat._get_mode4_push_power(1000) == -1000 + + +def test_power_limit_controllable_true_for_g3() -> None: + solax_bat = _create_solax_bat(SolaxVersion.G3) + assert solax_bat.power_limit_controllable() is True + + +def test_power_limit_controllable_false_for_g2() -> None: + solax_bat = _create_solax_bat(SolaxVersion.G2) + assert solax_bat.power_limit_controllable() is False + + +def test_power_limit_controllable_false_for_g4() -> None: + solax_bat = _create_solax_bat(SolaxVersion.G4) + assert solax_bat.power_limit_controllable() is False