Skip to content

Commit 3bb4d4e

Browse files
committed
Split functionalily into separate files
Signed-off-by: Nicolai Buchwitz <nb@tipi-net.de>
1 parent 91a62e5 commit 3bb4d4e

3 files changed

Lines changed: 188 additions & 177 deletions

File tree

uhubctl/__init__.py

Lines changed: 3 additions & 177 deletions
Original file line numberDiff line numberDiff line change
@@ -1,179 +1,5 @@
11
"""Wrapper module for uhubctl"""
2+
from .usb import discover_hubs, Hub, Port
3+
from . import utils
24

3-
import re
4-
import subprocess
5-
from typing import List, Optional
6-
7-
UHUBCTL_BINARY = "uhubctl"
8-
9-
10-
def _uhubctl(args: list = None) -> list:
11-
cmd = UHUBCTL_BINARY.split(" ")
12-
13-
if args is not None:
14-
cmd += args
15-
16-
try:
17-
result = subprocess.run(cmd, capture_output=True, check=True)
18-
stdout = result.stdout.decode()
19-
20-
return stdout.split('\n')
21-
except subprocess.CalledProcessError as exc:
22-
stderr = exc.stderr.decode()
23-
24-
if stderr.startswith("No compatible devices detected"):
25-
return []
26-
27-
raise Exception(f"uhubctl failed: {stderr}") from exc
28-
29-
30-
def discover_hubs():
31-
"""
32-
Return list of all by uhubctl supported USB hubs with their ports
33-
34-
Returns:
35-
List of hubs
36-
37-
"""
38-
hubs = []
39-
40-
pattern = re.compile(r"Current status for hub ([\.\d-]+)")
41-
42-
for line in _uhubctl():
43-
regex = pattern.match(line)
44-
45-
if regex:
46-
hub = Hub(regex.group(1), enumerate_ports=True)
47-
hubs.append(hub)
48-
49-
return hubs
50-
51-
52-
class Hub:
53-
"""
54-
USB hub representation from uhubctl
55-
"""
56-
def __init__(self, path: str, enumerate_ports: bool = False) -> None:
57-
"""
58-
Create new hub instance
59-
60-
Arguments:
61-
path: USB hub path identifier
62-
enumerate_ports: Automatically enumerate ports
63-
"""
64-
self.path: str = path
65-
self.ports: List[Port] = []
66-
67-
if enumerate_ports:
68-
self.discover_ports()
69-
70-
def add_port(self, port_number: int) -> 'Port':
71-
"""
72-
Add port to hub by port number
73-
74-
Arguments:
75-
port_number: Indentification number of port
76-
77-
Returns:
78-
Port
79-
80-
"""
81-
port = Port(self, port_number)
82-
self.ports.append(port)
83-
84-
return port
85-
86-
def add_ports(self, port_start: int, port_end: int):
87-
"""
88-
Add multiple ports to hub
89-
90-
Arguments:
91-
port_start: First port's indentification number
92-
port_end: Last port's ndentification number
93-
"""
94-
for port_number in range(port_start, port_end):
95-
self.add_port(port_number)
96-
97-
def find_port(self, port_number: int) -> Optional['Port']:
98-
"""
99-
Find port by port number
100-
101-
Arguments:
102-
port_number: Identification number of port to find
103-
104-
Returns:
105-
Port or None
106-
"""
107-
108-
for port in self.ports:
109-
if port.port_number == int(port_number):
110-
return port
111-
112-
return None
113-
114-
def discover_ports(self) -> None:
115-
"""
116-
Discover ports for this hub instance
117-
"""
118-
pattern = re.compile(r" Port (\d+): \d{4} (power|off)")
119-
120-
for line in _uhubctl(["-l", self.path]):
121-
regex = pattern.match(line)
122-
123-
if regex:
124-
port = Port(self, regex.group(1))
125-
self.ports.append(port)
126-
127-
def __str__(self) -> str:
128-
return f"USB Hub {self.path}"
129-
130-
131-
class Port:
132-
"""
133-
USB port representation from uhubctl
134-
"""
135-
def __init__(self, hub: Hub, port_number: int):
136-
"""
137-
Create new port instance
138-
139-
Arguments:
140-
hub: Hub to attach port to
141-
port_number: Number of port to create
142-
143-
"""
144-
self.hub = hub
145-
self.port_number = int(port_number)
146-
147-
@property
148-
def status(self) -> bool:
149-
"""
150-
Port power status
151-
"""
152-
status = None
153-
pattern = re.compile(fr" Port {self.port_number}: \d{{4}} (power|off)")
154-
155-
args = ["-l", self.hub.path, "-p", str(self.port_number)]
156-
for line in _uhubctl(args):
157-
reg = pattern.match(line)
158-
159-
if reg:
160-
status = (reg.group(1) == "power")
161-
162-
if status is None:
163-
raise Exception()
164-
165-
return status
166-
167-
@status.setter
168-
def status(self, status: bool) -> None:
169-
args = ["-l", self.hub.path, "-p", str(self.port_number), "-a"]
170-
171-
if status:
172-
args.append("on")
173-
else:
174-
args.append("off")
175-
176-
_uhubctl(args)
177-
178-
def __str__(self) -> str:
179-
return f"USB Port {self.hub.path}.{self.port_number}"
5+
__all__ = ['discover_hubs', 'Hub', 'Port', 'utils']

uhubctl/usb.py

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
"""Representation classes and helper functions"""
2+
import re
3+
from typing import List, Optional
4+
5+
from .utils import _uhubctl
6+
7+
8+
def discover_hubs() -> List['Hub']:
9+
"""
10+
Return list of all by uhubctl supported USB hubs with their ports
11+
12+
Returns:
13+
List of hubs
14+
15+
"""
16+
hubs = []
17+
18+
pattern = re.compile(r"Current status for hub ([\.\d-]+)")
19+
20+
for line in _uhubctl():
21+
regex = pattern.match(line)
22+
23+
if regex:
24+
hub = Hub(regex.group(1), enumerate_ports=True)
25+
hubs.append(hub)
26+
27+
return hubs
28+
29+
30+
class Hub:
31+
"""
32+
USB hub representation from uhubctl
33+
"""
34+
35+
def __init__(self, path: str, enumerate_ports: bool = False) -> None:
36+
"""
37+
Create new hub instance
38+
39+
Arguments:
40+
path: USB hub path identifier
41+
enumerate_ports: Automatically enumerate ports
42+
"""
43+
self.path: str = path
44+
self.ports: List[Port] = []
45+
46+
if enumerate_ports:
47+
self.discover_ports()
48+
49+
def add_port(self, port_number: int) -> 'Port':
50+
"""
51+
Add port to hub by port number
52+
53+
Arguments:
54+
port_number: Indentification number of port
55+
56+
Returns:
57+
Port
58+
59+
"""
60+
port = Port(self, port_number)
61+
self.ports.append(port)
62+
63+
return port
64+
65+
def add_ports(self, port_start: int, port_end: int):
66+
"""
67+
Add multiple ports to hub
68+
69+
Arguments:
70+
port_start: First port's indentification number
71+
port_end: Last port's ndentification number
72+
"""
73+
for port_number in range(port_start, port_end):
74+
self.add_port(port_number)
75+
76+
def find_port(self, port_number: int) -> Optional['Port']:
77+
"""
78+
Find port by port number
79+
80+
Arguments:
81+
port_number: Identification number of port to find
82+
83+
Returns:
84+
Port or None
85+
"""
86+
87+
for port in self.ports:
88+
if port.port_number == int(port_number):
89+
return port
90+
91+
return None
92+
93+
def discover_ports(self) -> None:
94+
"""
95+
Discover ports for this hub instance
96+
"""
97+
pattern = re.compile(r" Port (\d+): \d{4} (power|off)")
98+
99+
for line in _uhubctl(["-l", self.path]):
100+
regex = pattern.match(line)
101+
102+
if regex:
103+
port = Port(self, regex.group(1))
104+
self.ports.append(port)
105+
106+
def __str__(self) -> str:
107+
return f"USB Hub {self.path}"
108+
109+
110+
class Port:
111+
"""
112+
USB port representation from uhubctl
113+
"""
114+
115+
def __init__(self, hub: Hub, port_number: int):
116+
"""
117+
Create new port instance
118+
119+
Arguments:
120+
hub: Hub to attach port to
121+
port_number: Number of port to create
122+
123+
"""
124+
self.hub = hub
125+
self.port_number = int(port_number)
126+
127+
@property
128+
def status(self) -> bool:
129+
"""
130+
Port power status
131+
"""
132+
status = None
133+
pattern = re.compile(
134+
fr" Port {self.port_number}: \d{{4}} (power|off)")
135+
136+
args = ["-l", self.hub.path, "-p", str(self.port_number)]
137+
for line in _uhubctl(args):
138+
reg = pattern.match(line)
139+
140+
if reg:
141+
status = (reg.group(1) == "power")
142+
143+
if status is None:
144+
raise Exception()
145+
146+
return status
147+
148+
@status.setter
149+
def status(self, status: bool) -> None:
150+
args = ["-l", self.hub.path, "-p", str(self.port_number), "-a"]
151+
152+
if status:
153+
args.append("on")
154+
else:
155+
args.append("off")
156+
157+
_uhubctl(args)
158+
159+
def __str__(self) -> str:
160+
return f"USB Port {self.hub.path}.{self.port_number}"

uhubctl/utils.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
"""Utilities around uhubctl binary"""
2+
import subprocess
3+
4+
UHUBCTL_BINARY = "uhubctl"
5+
6+
7+
def _uhubctl(args: list = None) -> list:
8+
cmd = UHUBCTL_BINARY.split(" ")
9+
cmd.append('-N')
10+
11+
if args is not None:
12+
cmd += args
13+
14+
try:
15+
result = subprocess.run(cmd, capture_output=True, check=True)
16+
stdout = result.stdout.decode()
17+
18+
return stdout.split('\n')
19+
except subprocess.CalledProcessError as exc:
20+
stderr = exc.stderr.decode()
21+
22+
if stderr.startswith("No compatible devices detected"):
23+
return []
24+
25+
raise Exception(f"uhubctl failed: {stderr}") from exc

0 commit comments

Comments
 (0)