1- import os
21import re
2+ import subprocess
33
4+ UHUBCTL_BINARY = "uhubctl"
45
5- def discover_hubs (uhubctl_binary : str = "uhubctl" ):
6- cmd = f"{ uhubctl_binary } "
7- stream = os .popen (cmd )
86
7+ def __uhubctl (args : list = []) -> list :
8+ cmd = UHUBCTL_BINARY .split (" " ) + args
9+ result = subprocess .run (cmd , capture_output = True )
10+ stdout = result .stdout .decode ()
11+
12+ if result .returncode != 0 :
13+ stderr = result .stderr .decode ()
14+
15+ raise Exception (f"uhubctl failed: { stderr } " )
16+
17+ return stdout .split ('\n ' )
18+
19+
20+ def discover_hubs ():
921 hubs = []
1022
1123 pattern_hub = re .compile ("Current status for hub ([\.\d-]+)" )
1224 pattern_port = re .compile (" Port (\d+): \d{4} (power|off)" )
1325
14- for line in stream . readlines ():
26+ for line in __uhubctl ():
1527 reg_hub = pattern_hub .match (line )
1628 reg_port = pattern_port .match (line )
1729
1830 if reg_hub :
19- hub = Hub (reg_hub .group (1 ), uhubctl_binary )
31+ hub = Hub (reg_hub .group (1 ))
2032 hubs .append (hub )
2133 elif reg_port :
2234 # assume that the port is the last detected one
@@ -32,10 +44,9 @@ def discover_hubs(uhubctl_binary: str = "uhubctl"):
3244
3345
3446class Hub :
35- def __init__ (self , path : str , uhubctl_binary : str = "uhubctl" ) -> None :
47+ def __init__ (self , path : str ) -> None :
3648 self .path = path
3749 self .ports = []
38- self .uhubctl = uhubctl_binary
3950
4051 def add_port (self , port_number : int ) -> 'Port' :
4152 port = Port (self , port_number )
@@ -61,10 +72,8 @@ def status(self) -> bool:
6172 status = None
6273 pattern = re .compile (f" Port { self .port_number } : \d{{4}} (power|off)" )
6374
64- cmd = f"{ self .hub .uhubctl } -l { self .hub .path } -p { self .port_number } "
65- stream = os .popen (cmd )
66-
67- for line in stream .readlines ():
75+ args = ["-l" , self .hub .path , "-p" , self .port_number ]
76+ for line in __uhubctl (args ):
6877 reg = pattern .match (line )
6978
7079 if reg :
@@ -77,14 +86,14 @@ def status(self) -> bool:
7786
7887 @status .setter
7988 def status (self , status : bool ) -> None :
80- cmd = f"{ self .hub .uhubctl } -l { self .hub .path } -p { self .port_number } -a "
89+ args = ["-l" , self .hub .path , "-p" , self .port_number , "-a" ]
90+
8191 if status :
82- cmd += "on"
92+ args . append ( "on" )
8393 else :
84- cmd += "off"
94+ args . append ( "off" )
8595
86- stream = os .popen (cmd )
87- stream .readlines ()
96+ __uhubctl (args )
8897
8998 def __str__ (self ) -> str :
9099 return f"USB Port { self .hub .path } .{ self .port_number } "
0 commit comments