Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/batcontrol_config_dummy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ battery_control:
always_allow_discharge_limit: 0.90 # 0.00 to 1.00 above this SOC limit using energy from the battery is always allowed
max_charging_from_grid_limit: 0.89 # 0.00 to 1.00 charging from the grid is only allowed until this SOC limit
# min_grid_charge_soc: 0.55 # optional 0.00 to 1.00 target to preserve/charge before expensive slots
# grid_charge_target_strategy: fixed # fixed = use min_grid_charge_soc unchanged; forecast = raise it from forecasted expensive-slot need
# grid_charge_forecast_pv_factor: 1.0 # forecast strategy only: 0.0 to 1.0 multiplier for PV forecast trust
min_recharge_amount: 100 # in Wh, start & minimum amount of energy to recharge the battery

#--------------------------
Expand Down
80 changes: 79 additions & 1 deletion src/batcontrol/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
from .logic import CalculationInput, CalculationParameters
from .logic import CommonLogic
from .logic import PeakShavingConfig
from .logic.grid_charge_target import (
GRID_CHARGE_TARGET_STRATEGIES,
GRID_CHARGE_TARGET_STRATEGY_FIXED,
calculate_effective_grid_charge_soc,
)

from .dynamictariff import DynamicTariff as tariff_factory
from .inverter import Inverter as inverter_factory
Expand Down Expand Up @@ -80,6 +85,27 @@ def _parse_optional_ratio(value, config_key: str) -> Optional[float]:
return ratio


def _parse_ratio(value, config_key: str) -> float:
"""Parse a required 0..1 ratio config value."""
ratio = _parse_optional_ratio(value, config_key)
if ratio is None:
raise ValueError(
f"{config_key} must be numeric between 0 and 1, got None"
)
return ratio


def _parse_grid_charge_target_strategy(value) -> str:
"""Parse the grid-charge target strategy config value."""
strategy = str(value).strip().lower()
if strategy not in GRID_CHARGE_TARGET_STRATEGIES:
raise ValueError(
f"battery_control.grid_charge_target_strategy must be one of "
f"{GRID_CHARGE_TARGET_STRATEGIES}, got {value!r}"
)
return strategy
Comment on lines +98 to +106
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in f5261de.



class Batcontrol:
""" Main class for Batcontrol, handles the logic and control of the battery system """
general_logic = None # type: CommonLogic
Expand Down Expand Up @@ -235,6 +261,16 @@ def __init__(self, configdict: dict):
self.batconfig.get('min_grid_charge_soc', None),
'battery_control.min_grid_charge_soc'
)
self.grid_charge_target_strategy = _parse_grid_charge_target_strategy(
self.batconfig.get(
'grid_charge_target_strategy',
GRID_CHARGE_TARGET_STRATEGY_FIXED,
)
)
self.grid_charge_forecast_pv_factor = _parse_ratio(
self.batconfig.get('grid_charge_forecast_pv_factor', 1.0),
'battery_control.grid_charge_forecast_pv_factor'
)
self.preserve_min_grid_charge_soc = False
if (self.min_grid_charge_soc is not None
and self.min_grid_charge_soc > self.max_charging_from_grid_limit):
Expand Down Expand Up @@ -616,12 +652,19 @@ def run(self):
self.peak_shaving_config,
enabled=peak_shaving_config_enabled and not evcc_disable_peak_shaving,
)
effective_min_grid_charge_soc = self.__calculate_effective_min_grid_charge_soc(
calc_input,
production,
consumption,
prices,
)

calc_parameters = CalculationParameters(
self.max_charging_from_grid_limit,
self.min_price_difference,
self.min_price_difference_rel,
self.get_max_capacity(),
min_grid_charge_soc=self.min_grid_charge_soc,
min_grid_charge_soc=effective_min_grid_charge_soc,
preserve_min_grid_charge_soc=self.preserve_min_grid_charge_soc,
peak_shaving=ps_runtime,
)
Expand Down Expand Up @@ -666,6 +709,41 @@ def run(self):
else:
self.avoid_discharging()

def __calculate_effective_min_grid_charge_soc(
self,
calc_input: CalculationInput,
production,
consumption,
prices) -> Optional[float]:
effective_min_grid_charge_soc = calculate_effective_grid_charge_soc(
strategy=self.grid_charge_target_strategy,
configured_min_grid_charge_soc=self.min_grid_charge_soc,
max_charging_from_grid_limit=self.max_charging_from_grid_limit,
max_capacity=self.get_max_capacity(),
min_soc_energy=max(
0.0,
calc_input.stored_energy - calc_input.stored_usable_energy,
),
production=production,
consumption=consumption,
prices=prices,
min_price_difference=self.min_price_difference,
min_price_difference_rel=self.min_price_difference_rel,
pv_forecast_factor=self.grid_charge_forecast_pv_factor,
)
if effective_min_grid_charge_soc != self.min_grid_charge_soc:
logger.info(
'Forecast grid-charge target raised min_grid_charge_soc '
'from %.1f%% to %.1f%%',
self.min_grid_charge_soc * 100,
effective_min_grid_charge_soc * 100,
)
if (self.mqtt_api is not None
and effective_min_grid_charge_soc is not None):
self.mqtt_api.publish_effective_min_grid_charge_soc(
effective_min_grid_charge_soc)
return effective_min_grid_charge_soc

def __set_charge_rate(self, charge_rate: int):
""" Set charge rate and publish to mqtt """
self.last_charge_rate = charge_rate
Expand Down
106 changes: 106 additions & 0 deletions src/batcontrol/logic/grid_charge_target.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""Effective grid-charge SoC target calculation."""

from typing import Optional, Sequence

GRID_CHARGE_TARGET_STRATEGY_FIXED = 'fixed'
GRID_CHARGE_TARGET_STRATEGY_FORECAST = 'forecast'
GRID_CHARGE_TARGET_STRATEGIES = (
GRID_CHARGE_TARGET_STRATEGY_FIXED,
GRID_CHARGE_TARGET_STRATEGY_FORECAST,
)


def _ordered_values(values) -> Sequence[float]:
if isinstance(values, dict):
if not values:
return []
keys = set(values.keys())
error_message = (
"forecast dict values must use consecutive integer "
"indices starting at 0"
)
if not all(isinstance(index, int) for index in keys):
raise ValueError(error_message)
expected_keys = set(range(max(keys) + 1))
if keys != expected_keys:
raise ValueError(error_message)
return [values[index] for index in range(max(keys) + 1)]
return list(values)


def _calculate_min_dynamic_price_difference(
current_price: float,
min_price_difference: float,
min_price_difference_rel: float) -> float:
return max(min_price_difference,
min_price_difference_rel * abs(current_price))


def calculate_effective_grid_charge_soc(
strategy: str,
configured_min_grid_charge_soc: Optional[float],
max_charging_from_grid_limit: float,
max_capacity: float,
min_soc_energy: float,
production,
consumption,
prices,
min_price_difference: float,
min_price_difference_rel: float = 0.0,
pv_forecast_factor: float = 1.0) -> Optional[float]:
"""Calculate the effective minimum grid-charge SoC for this evaluation.

``fixed`` returns the configured target unchanged. ``forecast`` treats the
configured target as a floor and raises it when future expensive-slot net
demand implies a higher target. Forecast PV can be discounted with
``pv_forecast_factor`` to account for uncertain PV ramps.
"""
if configured_min_grid_charge_soc is None:
return None
if strategy not in GRID_CHARGE_TARGET_STRATEGIES:
raise ValueError(
f"grid_charge_target_strategy must be one of "
f"{GRID_CHARGE_TARGET_STRATEGIES}, got '{strategy}'"
)
if strategy == GRID_CHARGE_TARGET_STRATEGY_FIXED:
return configured_min_grid_charge_soc
if max_capacity <= 0:
raise ValueError("max_capacity must be greater than 0")
if not 0 <= pv_forecast_factor <= 1:
raise ValueError(
"grid_charge_forecast_pv_factor must be between 0 and 1, "
f"got {pv_forecast_factor}"
)

production_values = _ordered_values(production)
consumption_values = _ordered_values(consumption)
price_values = _ordered_values(prices)
max_slot = min(len(production_values), len(consumption_values), len(price_values))
if max_slot < 2:
return configured_min_grid_charge_soc

current_price = price_values[0]
min_dynamic_price_difference = _calculate_min_dynamic_price_difference(
current_price,
min_price_difference,
min_price_difference_rel,
)

# Evaluate until the next price slot that is no more expensive than the
# current slot. This keeps the target tied to the current cheap/economical
# charging window rather than charging for the whole forecast horizon.
for slot in range(1, max_slot):
if price_values[slot] <= current_price:
max_slot = slot
break

forecast_need = 0.0
for slot in range(1, max_slot):
if price_values[slot] <= current_price + min_dynamic_price_difference:
continue
discounted_pv = production_values[slot] * pv_forecast_factor
forecast_need += max(0.0, consumption_values[slot] - discounted_pv)

forecast_target = (min_soc_energy + forecast_need) / max_capacity
forecast_target = min(forecast_target, max_charging_from_grid_limit)
return max(configured_min_grid_charge_soc, forecast_target)
34 changes: 31 additions & 3 deletions src/batcontrol/mqtt_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
- /mode: operational mode (-1 = charge from grid, 0 = avoid discharge, 8 = limit battery charge, 10 = discharge allowed)
- /max_charging_from_grid_limit: charge limit in 0.1-1
- /max_charging_from_grid_limit_percent: charge limit in %
- /min_grid_charge_soc: optional minimum grid-charge target in 0.0-1.0
- /min_grid_charge_soc_percent: optional minimum grid-charge target in %
- /min_grid_charge_soc: configured optional minimum grid-charge target in 0.0-1.0
- /min_grid_charge_soc_percent: configured optional minimum grid-charge target in %
- /effective_min_grid_charge_soc: runtime effective minimum grid-charge target in 0.0-1.0
- /effective_min_grid_charge_soc_percent: runtime effective minimum grid-charge target in %
- /always_allow_discharge_limit: always discharge limit in 0.1-1
- /always_allow_discharge_limit_percent: always discharge limit in %
- /always_allow_discharge_limit_capacity: always discharge limit in Wh
Expand Down Expand Up @@ -390,7 +392,7 @@ def publish_max_charging_from_grid_limit(
)

def publish_min_grid_charge_soc(self, min_grid_charge_soc: float) -> None:
""" Publish the optional minimum grid-charge SoC target to MQTT
""" Publish the configured optional minimum grid-charge SoC target to MQTT
/min_grid_charge_soc_percent
/min_grid_charge_soc as digit.
"""
Expand All @@ -404,6 +406,22 @@ def publish_min_grid_charge_soc(self, min_grid_charge_soc: float) -> None:
f'{min_grid_charge_soc:.2f}'
)

def publish_effective_min_grid_charge_soc(
self, effective_min_grid_charge_soc: float) -> None:
""" Publish the runtime effective minimum grid-charge SoC target to MQTT
/effective_min_grid_charge_soc_percent
/effective_min_grid_charge_soc as digit.
"""
if self.client.is_connected():
self.client.publish(
self.base_topic + '/effective_min_grid_charge_soc_percent',
f'{effective_min_grid_charge_soc * 100:.0f}'
)
self.client.publish(
self.base_topic + '/effective_min_grid_charge_soc',
f'{effective_min_grid_charge_soc:.2f}'
)

def publish_min_price_difference(
self, min_price_difference: float) -> None:
""" Publish the minimum price difference to MQTT found in config
Expand Down Expand Up @@ -652,6 +670,16 @@ def send_mqtt_discovery_messages(self) -> None:
"/min_grid_charge_soc_percent",
entity_category="diagnostic")

self.publish_mqtt_discovery_message(
"Effective Minimum Grid Charge SOC",
"batcontrol_effective_min_grid_charge_soc",
"sensor",
"battery",
"%",
self.base_topic +
"/effective_min_grid_charge_soc_percent",
entity_category="diagnostic")

self.publish_mqtt_discovery_message(
"Min Price Difference",
"batcontrol_min_price_difference",
Expand Down
Loading
Loading