-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathusb_device.py
More file actions
451 lines (373 loc) · 16.8 KB
/
usb_device.py
File metadata and controls
451 lines (373 loc) · 16.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
# SPDX-FileCopyrightText: 2025 Alexander Brinkman
# SPDX-License-Identifier: GPL-3.0-or-later
"""USB device wrapper and manager module for USB/IP server.
This module provides classes for accessing and managing USB devices:
- USBDevice: Wrapper class for individual USB device access
- USBDeviceManager: Manager class for device enumeration and lookup
"""
import errno
import logging
import re

import usb.core
import usb.util
class USBDevice:
    """Wrapper class for USB device access via pyusb.

    Provides convenient access to USB device properties and descriptors.
    The string descriptors (manufacturer, product, serial number) are read
    from the device on first access and cached afterwards.

    Attributes:
        device: The underlying pyusb device object.
        bus_id: The bus identifier string (e.g., "20-4.2.1").
    """

    # "<bus>-<port>[.<port>...]" where every component is plain decimal digits.
    _BUS_ID_PATTERN = re.compile(r"([0-9]+)-([0-9]+(?:\.[0-9]+)*)")

    # bInterfaceClass value treated as HID (mice, keyboards).
    _HID_INTERFACE_CLASS = 3

    # Names for the low two bits of an endpoint's bmAttributes.
    _TRANSFER_TYPES = {
        0: "Control",
        1: "Isochronous",
        2: "Bulk",
        3: "Interrupt",
    }

    _logger = logging.getLogger(__name__)

    def __init__(self, device: usb.core.Device) -> None:
        """Initialize USBDevice with a pyusb device.

        Args:
            device: A pyusb Device object.
        """
        self.device = device
        self.bus_id = self.build_bus_id(device)
        self._manufacturer: str | None = None
        self._product: str | None = None
        self._serial_number: str | None = None
        self._strings_loaded = False

    @staticmethod
    def clean_usb_string(value: str | None) -> str | None:
        """Clean a USB string by removing null characters and whitespace.

        USB strings sometimes contain garbage data after null terminators.
        Only the text before the first null character is kept, and
        surrounding whitespace is stripped.

        Args:
            value: The USB string to clean, or None.

        Returns:
            The cleaned string, or None if the input was None or nothing
            remains after cleaning.
        """
        if value is None:
            return None
        cleaned = value.split("\x00")[0].strip()
        return cleaned or None

    @staticmethod
    def build_bus_id(device: usb.core.Device) -> str:
        """Build a bus ID string from a pyusb device.

        The bus ID format is "bus-port.port.port" (e.g., "20-4.2.1"). When
        the device reports no port numbers, the device address is used in
        place of the port path ("bus-address").

        Args:
            device: A pyusb Device object.

        Returns:
            The bus ID string.
        """
        port_numbers = device.port_numbers
        if port_numbers:
            port_path = ".".join(str(port) for port in port_numbers)
            return f"{device.bus}-{port_path}"
        return f"{device.bus}-{device.address}"

    @staticmethod
    def parse_bus_id(bus_id: str) -> tuple[int, tuple[int, ...]]:
        """Parse a bus ID string into bus number and port numbers.

        The whole string must have the form "<bus>-<port>[.<port>...]" with
        every component written as decimal digits. Signs, whitespace,
        underscores, empty components and trailing newlines are rejected.

        Args:
            bus_id: The bus ID string (e.g., "20-4.2.1").

        Returns:
            A tuple of (bus_number, port_numbers_tuple).

        Raises:
            ValueError: If the bus ID format is invalid.
        """
        match = USBDevice._BUS_ID_PATTERN.fullmatch(bus_id)
        if match is None:
            raise ValueError(f"Invalid bus ID format: {bus_id}")
        bus_text, port_path = match.groups()
        port_numbers = tuple(int(port) for port in port_path.split("."))
        return int(bus_text), port_numbers

    def _read_string_descriptor(self, index_attribute: str, label: str) -> str | None:
        """Read and clean one string descriptor, or return None on failure.

        Args:
            index_attribute: Name of the device attribute holding the string
                descriptor index (e.g., "iProduct").
            label: Human-readable descriptor name used in the debug log.

        Returns:
            The cleaned string, or None if the index is zero/unset, the
            string is empty, or reading it failed.
        """
        try:
            index = getattr(self.device, index_attribute)
            if not index:
                return None
            return self.clean_usb_string(usb.util.get_string(self.device, index))
        except (usb.core.USBError, ValueError) as error:
            self._logger.debug("Could not read %s string: %s", label, error)
            return None

    def _load_strings(self) -> None:
        """Load USB string descriptors from the device.

        This is done lazily to avoid claiming devices unnecessarily.
        Errors are logged but don't raise exceptions.
        """
        if self._strings_loaded:
            return
        # Mark as loaded up front so a failing read is not retried on every access.
        self._strings_loaded = True
        self._manufacturer = self._read_string_descriptor("iManufacturer", "manufacturer")
        self._product = self._read_string_descriptor("iProduct", "product")
        self._serial_number = self._read_string_descriptor("iSerialNumber", "serial number")

    @property
    def vendor_id(self) -> int:
        """Get the vendor ID (VID) of the device."""
        return int(self.device.idVendor)

    @property
    def product_id(self) -> int:
        """Get the product ID (PID) of the device."""
        return int(self.device.idProduct)

    @property
    def manufacturer(self) -> str | None:
        """Get the manufacturer string of the device."""
        self._load_strings()
        return self._manufacturer

    @property
    def product(self) -> str | None:
        """Get the product string of the device."""
        self._load_strings()
        return self._product

    @property
    def serial_number(self) -> str | None:
        """Get the serial number string of the device."""
        self._load_strings()
        return self._serial_number

    @property
    def device_id(self) -> str:
        """Get the device identity string (VID:PID:serial or VID:PID)."""
        identity = f"{self.vendor_id:04x}:{self.product_id:04x}"
        serial = self.serial_number
        return f"{identity}:{serial}" if serial else identity

    def to_dict(self) -> dict[str, str | None]:
        """Get basic device information as a dictionary.

        Returns:
            Dictionary with bus_id, vid, pid, manufacturer, product, and serial.
            vid and pid are four-digit lowercase hexadecimal strings.
        """
        return {
            "bus_id": self.bus_id,
            "vid": f"{self.vendor_id:04x}",
            "pid": f"{self.product_id:04x}",
            "manufacturer": self.manufacturer,
            "product": self.product,
            "serial": self.serial_number,
        }

    def claim(self) -> bool:
        """Claim the device for exclusive access.

        Detaches kernel drivers (on Linux) and claims all interfaces.
        On macOS, kernel driver detachment is not supported by libusb, so HID
        devices (mice, keyboards) may not work correctly.

        Returns:
            False if no configuration could be obtained or set, or if any
            step was refused with an access-denied error. True otherwise;
            note that other USB errors while detaching drivers or claiming
            interfaces are only logged and do not change the result.
        """
        logger = self._logger
        access_denied = False
        kernel_driver_warning_shown = False

        try:
            # Try to set configuration if not already set
            try:
                config = self.device.get_active_configuration()
            except usb.core.USBError:
                try:
                    self.device.set_configuration()
                    config = self.device.get_active_configuration()
                except usb.core.USBError as error:
                    if error.errno == errno.EACCES:
                        logger.error(
                            "Access denied when setting device configuration. "
                            "Try running with sudo."
                        )
                    else:
                        logger.warning("Could not set configuration: %s", error)
                    return False

            # Detach kernel drivers and claim all interfaces
            for interface in config:
                interface_number = interface.bInterfaceNumber

                try:
                    if self.device.is_kernel_driver_active(interface_number):
                        try:
                            self.device.detach_kernel_driver(interface_number)
                            logger.info(
                                "Detached kernel driver from interface %d",
                                interface_number,
                            )
                        except usb.core.USBError as error:
                            # Warn once per claim() call, not once per interface.
                            if not kernel_driver_warning_shown:
                                logger.warning(
                                    "Could not detach kernel driver from interface "
                                    "%d: %s. HID devices may not work correctly.",
                                    interface_number,
                                    error,
                                )
                                kernel_driver_warning_shown = True
                except NotImplementedError:
                    # macOS doesn't support is_kernel_driver_active/detach_kernel_driver
                    if (
                        not kernel_driver_warning_shown
                        and interface.bInterfaceClass == self._HID_INTERFACE_CLASS
                    ):
                        logger.warning(
                            "Cannot detach kernel driver on macOS. "
                            "HID devices (mice, keyboards) may not work correctly "
                            "as macOS will still consume input events."
                        )
                        kernel_driver_warning_shown = True

                try:
                    usb.util.claim_interface(self.device, interface_number)
                    logger.debug("Claimed interface %d", interface_number)
                except usb.core.USBError as error:
                    if error.errno == errno.EACCES:
                        # Reported once, after the loop, as a single error.
                        access_denied = True
                        logger.debug("Access denied for interface %d", interface_number)
                    else:
                        logger.warning(
                            "Could not claim interface %d: %s", interface_number, error
                        )
        except usb.core.USBError as error:
            # Reached for USB errors not handled above (e.g. raised while
            # querying the kernel driver state); remaining interfaces are skipped.
            if error.errno == errno.EACCES:
                access_denied = True
            logger.warning("Error claiming device: %s", error)

        if access_denied:
            logger.error("Insufficient permissions to access USB device. Try running with sudo.")
            return False
        return True

    def release(self) -> None:
        """Release all interfaces of the device.

        Should be called when done using the device to allow other
        processes to access it. Failures are logged at debug level and
        never raised.
        """
        logger = self._logger
        try:
            config = self.device.get_active_configuration()
            for interface in config:
                interface_number = interface.bInterfaceNumber
                try:
                    usb.util.release_interface(self.device, interface_number)
                    logger.debug("Released interface %d", interface_number)
                except usb.core.USBError as error:
                    logger.debug("Could not release interface %d: %s", interface_number, error)
        except usb.core.USBError as error:
            logger.debug("Could not get configuration for release: %s", error)

    def get_detailed_info(self) -> str:
        """Get detailed device information as a formatted string.

        Returns:
            Multi-line string with device details including configurations,
            interfaces and endpoints.
        """
        device = self.device
        lines = [
            f"Bus ID: {self.bus_id}",
            f"Vendor ID: 0x{self.vendor_id:04x}",
            f"Product ID: 0x{self.product_id:04x}",
            f"Manufacturer: {self.manufacturer or 'N/A'}",
            f"Product: {self.product or 'N/A'}",
            f"Serial Number: {self.serial_number or 'N/A'}",
            f"Device Class: 0x{device.bDeviceClass:02x}",
            f"Device Subclass: 0x{device.bDeviceSubClass:02x}",
            f"Device Protocol: 0x{device.bDeviceProtocol:02x}",
            f"Max Packet Size: {device.bMaxPacketSize0}",
            f"Number of Configurations: {device.bNumConfigurations}",
        ]

        # Iterating the device yields its configurations; each configuration
        # yields interfaces, and each interface yields endpoints.
        for config in device:
            lines.extend(
                [
                    f"\nConfiguration {config.bConfigurationValue}:",
                    f" Total Length: {config.wTotalLength}",
                    f" Number of Interfaces: {config.bNumInterfaces}",
                ]
            )
            for interface in config:
                lines.extend(
                    [
                        f"\n Interface {interface.bInterfaceNumber}, "
                        f"Alt Setting {interface.bAlternateSetting}:",
                        f" Class: 0x{interface.bInterfaceClass:02x}",
                        f" Subclass: 0x{interface.bInterfaceSubClass:02x}",
                        f" Protocol: 0x{interface.bInterfaceProtocol:02x}",
                        f" Number of Endpoints: {interface.bNumEndpoints}",
                    ]
                )
                for endpoint in interface:
                    # Bit 7 of the endpoint address selects the direction.
                    direction = "IN" if endpoint.bEndpointAddress & 0x80 else "OUT"
                    transfer_type = self._TRANSFER_TYPES.get(
                        endpoint.bmAttributes & 0x03, "Unknown"
                    )
                    lines.extend(
                        [
                            f"\n Endpoint 0x{endpoint.bEndpointAddress:02x} "
                            f"({direction}, {transfer_type}):",
                            f" Max Packet Size: {endpoint.wMaxPacketSize}",
                            f" Interval: {endpoint.bInterval}",
                        ]
                    )
        return "\n".join(lines)
class USBDeviceManager:
    """Manager class for USB device enumeration and lookup.

    Provides methods to list devices, find devices by various criteria,
    and resolve bindings to current devices. Every lookup enumerates the
    currently connected devices anew; nothing is cached between calls.
    """

    def __init__(self) -> None:
        """Initialize the USBDeviceManager."""
        self._logger = logging.getLogger(__name__)

    def list_devices(self) -> list[USBDevice]:
        """List all available USB devices.

        Returns:
            List of USBDevice objects for all connected devices.
        """
        return [USBDevice(device) for device in usb.core.find(find_all=True)]

    def find_by_bus_id(self, bus_id: str) -> USBDevice | None:
        """Find a device by its bus ID.

        Args:
            bus_id: The bus ID string (e.g., "20-4.2.1" or "0-1").

        Returns:
            The USBDevice if found, None otherwise. An unparsable bus ID is
            logged as an error and also yields None.
        """
        try:
            target_bus, target_ports = USBDevice.parse_bus_id(bus_id)
        except ValueError as error:
            self._logger.error("Invalid bus ID: %s", error)
            return None

        for device in usb.core.find(find_all=True):
            if device.bus != target_bus:
                continue
            port_numbers = device.port_numbers
            if port_numbers:
                # Device has port numbers - match against port path
                if tuple(port_numbers) == target_ports:
                    return USBDevice(device)
            elif len(target_ports) == 1 and device.address == target_ports[0]:
                # Device has no port numbers - match against address.
                # This handles the fallback case in USBDevice.build_bus_id.
                return USBDevice(device)
        return None

    def find_by_identity(
        self,
        vendor_id: int,
        product_id: int,
        serial_number: str | None = None,
    ) -> USBDevice | None:
        """Find a device by VID, PID, and optionally serial number.

        A missing serial number (None or "") only matches devices that
        report no serial number themselves.

        Args:
            vendor_id: The vendor ID to match.
            product_id: The product ID to match.
            serial_number: The serial number to match (optional).

        Returns:
            The first matching USBDevice, or None if there is none.
        """
        # Normalize both sides to an empty string for comparison
        # (binding stores "" for no serial, USBDevice returns None).
        wanted_serial = serial_number or ""
        candidates = usb.core.find(
            find_all=True,
            idVendor=vendor_id,
            idProduct=product_id,
        )
        for device in candidates:
            usb_device = USBDevice(device)
            if (usb_device.serial_number or "") == wanted_serial:
                return usb_device
        return None

    def find_by_binding(self, binding: dict[str, str]) -> USBDevice | None:
        """Find a device that matches a binding configuration.

        The binding dictionary should contain 'vendor_id', 'product_id',
        and optionally 'serial_number'. The IDs are parsed as hexadecimal
        strings.

        Args:
            binding: Dictionary with device identity information.

        Returns:
            The USBDevice if found, None otherwise. A binding with missing,
            non-string or non-hexadecimal IDs is logged as an error and
            yields None.
        """
        try:
            vendor_id = int(binding["vendor_id"], 16)
            product_id = int(binding["product_id"], 16)
        except (KeyError, TypeError, ValueError) as error:
            # TypeError: int(value, 16) rejects values that are not strings.
            self._logger.error("Invalid binding format: %s", error)
            return None
        return self.find_by_identity(vendor_id, product_id, binding.get("serial_number"))