-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
129 lines (105 loc) · 4.3 KB
/
utils.py
File metadata and controls
129 lines (105 loc) · 4.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import logging
import numpy as np
import pandas as pd
import torch
from typing import Dict, List, Tuple
from abc import ABC, abstractmethod
from enum import Enum
from dataclasses import dataclass
from scipy.optimize import minimize
from scipy.spatial import distance
from scipy.stats import norm
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Constants and configuration
CONFIG = {
'VELOCITY_THRESHOLD': 0.1,
'FLOW_THEORY_THRESHOLD': 0.5,
'MAX_ITERATIONS': 1000,
'TOLERANCE': 1e-6
}
# Exception classes
class UtilityError(Exception):
pass
class InvalidInputError(UtilityError):
pass
class OptimizationError(UtilityError):
pass
# Data structures/models
@dataclass
class State:
x: float
y: float
vx: float
vy: float
@dataclass
class ControlInput:
u: float
v: float
# Validation functions
def validate_state(state: State) -> None:
if not isinstance(state, State):
raise InvalidInputError('Invalid state input')
if not (isinstance(state.x, (int, float)) and isinstance(state.y, (int, float))):
raise InvalidInputError('Invalid state coordinates')
if not (isinstance(state.vx, (int, float)) and isinstance(state.vy, (int, float))):
raise InvalidInputError('Invalid state velocities')
def validate_control_input(control_input: ControlInput) -> None:
if not isinstance(control_input, ControlInput):
raise InvalidInputError('Invalid control input')
if not (isinstance(control_input.u, (int, float)) and isinstance(control_input.v, (int, float))):
raise InvalidInputError('Invalid control input values')
# Utility methods
def calculate_distance(state1: State, state2: State) -> float:
return distance.euclidean((state1.x, state1.y), (state2.x, state2.y))
def calculate_velocity(state1: State, state2: State) -> float:
return np.sqrt((state2.vx - state1.vx)**2 + (state2.vy - state1.vy)**2)
def calculate_flow_theory(state: State, control_input: ControlInput) -> float:
return np.sqrt((control_input.u - state.vx)**2 + (control_input.v - state.vy)**2)
def calculate_velocity_threshold(state: State, threshold: float) -> float:
return np.sqrt((state.vx**2 + state.vy**2))
def calculate_barrier_function(state: State, control_input: ControlInput, threshold: float) -> float:
return calculate_flow_theory(state, control_input) - threshold
def optimize_control_input(state: State, threshold: float) -> ControlInput:
def objective(control_input: ControlInput) -> float:
return calculate_barrier_function(state, control_input, threshold)
def constraint(control_input: ControlInput) -> float:
return calculate_flow_theory(state, control_input) - threshold
result = minimize(objective, [0, 0], method='SLSQP', constraints={'type': 'ineq', 'fun': constraint})
return ControlInput(result.x[0], result.x[1])
# Integration interfaces
class UtilityInterface(ABC):
@abstractmethod
def get_control_input(self, state: State) -> ControlInput:
pass
class VelocityThresholdUtility(UtilityInterface):
def get_control_input(self, state: State) -> ControlInput:
validate_state(state)
threshold = CONFIG['VELOCITY_THRESHOLD']
return optimize_control_input(state, threshold)
class FlowTheoryUtility(UtilityInterface):
def get_control_input(self, state: State) -> ControlInput:
validate_state(state)
threshold = CONFIG['FLOW_THEORY_THRESHOLD']
return optimize_control_input(state, threshold)
# Main class
class Utility:
def __init__(self, utility_type: str):
self.utility_type = utility_type
self.utility_interface = self.get_utility_interface()
def get_utility_interface(self) -> UtilityInterface:
if self.utility_type == 'velocity_threshold':
return VelocityThresholdUtility()
elif self.utility_type == 'flow_theory':
return FlowTheoryUtility()
else:
raise InvalidInputError('Invalid utility type')
def get_control_input(self, state: State) -> ControlInput:
return self.utility_interface.get_control_input(state)
# Example usage
if __name__ == '__main__':
state = State(0, 0, 1, 1)
utility = Utility('velocity_threshold')
control_input = utility.get_control_input(state)
logger.info(f'Control input: {control_input}')