-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpython_util.py
More file actions
393 lines (330 loc) · 12.4 KB
/
python_util.py
File metadata and controls
393 lines (330 loc) · 12.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
"""
Include useful Python utilities that are not part of the standard library.
"""
import os, sys
import hashlib
import typing
import time
import datetime
import fcntl
import re
import heapq
import linecache
import tracemalloc
StrBytesT = typing.TypeVar("StrBytesT", str, bytes)
# NOTE [Python typing on SupportsRead and SupportsWrite]:
# The typing module does not provide SupportsRead and SupportsWrite instances for runtime type
# checking, a custom implementation is provided below. The str version for SupportsRead and
# SupportsWrite is provided below.
@typing.runtime_checkable
class SupportsReadStr(typing.Protocol):
def read(self, size: typing.Optional[int] = -1, /) -> str: ...
@typing.runtime_checkable
class SupportsWriteStr(typing.Protocol):
def write(self, data: str, /) -> None: ...
def get_script_path(file: str) -> str:
"""
Get the absolute path of the script file, resolving symlinks.
:param file: The script file name or path.
:return: The absolute path of the script file.
"""
return os.path.realpath(file)
def get_script_dir(file: str) -> str:
"""
Get the directory of the script file.
:param file: The script file name or path.
:return: The directory of the script file.
"""
return os.path.dirname(get_script_path(file))
def printerr(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
def safeval(val, default):
if val:
return val
return default
def hash_file(algo, path, bufsize=131072):
assert algo in hashlib.algorithms_available, f"Hash algorithm {algo} is not supported"
assert os.path.isfile(path), f"File \"{path}\" does not exist (Working dir \"{os.getcwd()}\")"
hs = hashlib.new(algo)
buf = bytearray(bufsize)
mv = memoryview(buf)
with open(path, "rb", buffering=0) as fin:
while nbytes := fin.readinto(mv):
hs.update(mv[:nbytes])
return hs.hexdigest()
dependency_check_funcs = {
"f": lambda s: os.path.isfile(s),
"d": lambda s: os.path.isdir(s),
"e": lambda s: os.access(s, os.F_OK),
"r": lambda s: os.access(s, os.R_OK),
"w": lambda s: os.access(s, os.W_OK),
"x": lambda s: os.access(s, os.X_OK),
}
def check_dependency(test_type, dependency) -> typing.Optional[str]:
func = dependency_check_funcs.get(test_type)
assert func is not None, f"Invalid type string {test_type}"
if func(dependency):
return os.path.realpath(dependency)
def find_device_for_path(path: str, device_name_only: bool = True) -> typing.Optional[str]:
path = os.path.realpath(path)
if not os.path.exists(path):
return None
dev = os.stat(path).st_dev
try:
with open("/proc/mounts") as f:
for line in f:
line_split = line.split()
target_dev, mount_point, *_ = line_split
try:
if os.stat(mount_point).st_dev == dev:
if device_name_only:
return os.path.basename(target_dev)
return target_dev
except Exception:
continue
except FileNotFoundError:
return None
return None
def display_options(
option_list: dict[str, str],
default_option: str,
print_long_descriptions: bool = False,
message: str = "",
*args: list[str],
) -> str:
default_mark_str = "(default) "
option_list_copy = option_list.copy()
message = message.format(*args) if len(message) != 0 else ""
is_short_format = all(len(option) == 1 for option in option_list.keys())
assert default_option in option_list, "Default option must be in the option list."
assert all(
option.islower() for option in option_list.keys()
), "All options must be in lowercase letters."
print_delimiter = "" if is_short_format else "/"
max_opt_len = max(len(option) for option in option_list.keys())
display_option_list = [
(
option
if option != default_option
else (option.upper() if is_short_format else f"[{option}]")
)
for option in option_list.keys()
]
if print_long_descriptions:
for option, description in option_list_copy.items():
is_default_option = option == default_option
default_str = ""
if is_default_option:
default_str = default_mark_str
option = option.upper() if is_short_format else f"[{option}]"
message += (
f"\n{default_str:{len(default_mark_str)}s}{option:<{max_opt_len}s}: {description}"
)
display_options_str = print_delimiter.join(display_option_list)
message += f"\n Selection" if print_long_descriptions else ""
message += f" ({display_options_str}) ? "
repeat_message = f"Invalid selection ({display_options_str}) ? "
while True:
selection = input(message).strip().lower()
if len(selection) == 0:
return default_option
if selection in option_list.keys():
return selection
message = repeat_message
def display_yn_option(message: str = "", *args: list[str]) -> bool:
yn_options = {"y": "Confirm", "n": "Deny"}
selection = display_options(
yn_options, default_option="n", print_long_descriptions=False, message=message, *args
)
return selection == "y"
def acquireLockFile(lock_file: str):
"""
Acquire an exclusive lock on a file.
@param lock_file: The path to the lock file.
@return: A file descriptor for the locked file.
@raises OSError: If the lock cannot be acquired.
"""
locked_file_descriptor = open(lock_file, "w+")
try:
# non-blocking and exclusive lock
fcntl.lockf(locked_file_descriptor, fcntl.LOCK_NB | fcntl.LOCK_EX)
except OSError as e:
printerr(
f"Could not get lock {lock_file} - {e.errno}: {e.strerror}.\n"
f"Unable to acquire the lock, is another process using it? "
)
raise e
return locked_file_descriptor
def releaseLockFile(locked_file_descriptor):
"""
Release the lock on a file and close the file descriptor.
@param locked_file_descriptor: The file descriptor for the locked file.
"""
locked_file_descriptor.close()
# handle time formats like "2025-10-04T11:49:20.880Z"
def parse_time(time_str: str, format: re.Pattern) -> typing.Optional[time.struct_time]:
"""
Parse a time string into a struct_time object.
@param time_str The time string to parse.
@param format The format string to use for parsing.
@return A struct_time object if parsing is successful, None otherwise.
"""
match = format.match(time_str)
total_struct_len = 6
if match and len(match.groups()) <= total_struct_len:
return time.struct_time(
(
*map(int, match.groups()),
*(0 for _ in range(total_struct_len - len(match.groups()))),
0, # ignore tm_wday
0, # ignore tm_yday
-1, # ignore daylight savings time
)
)
return None
def parse_time_datetime(
time_str: StrBytesT,
format: re.Pattern[StrBytesT],
is_millisecond: bool = True,
tzinfo: datetime.tzinfo = datetime.timezone.utc,
) -> datetime.datetime | None:
match = format.match(time_str)
total_struct_len = 7
if match and len(match.groups()) <= total_struct_len:
matched_groups = list(map(int, match.groups()))
if len(match.groups()) == 7 and is_millisecond:
matched_groups[6] *= 1000 # convert milliseconds to microseconds
return datetime.datetime(*matched_groups, tzinfo=tzinfo)
return None
def parse_iso_time_format(time_str: str) -> typing.Optional[datetime.datetime]:
try:
return datetime.datetime.fromisoformat(time_str[:-1])
except ValueError:
return None
# this is faster than get_first_word_re in practice for line with date info at the beginning of line
def get_first_word(line: str) -> str:
"""
Get the first word from a line.
The first word is defined as the first sequence of non-whitespace characters.
@param line The input line.
@return The first word in the line.
"""
# raise NotImplementedError()
leading_space_end = -1
for i, c in enumerate(line):
if c.isspace():
if leading_space_end < -1:
# the space is a leading space, go on
continue
# the space first seen, perform substring
return line[leading_space_end + 1 : i]
elif leading_space_end < -1:
# encountered first non-space character
leading_space_end = i - 1
return line[leading_space_end + 1 :]
first_word_re = re.compile(r"^\s*(\S+)")
def get_first_word_re(line: str) -> str:
"""
Get the first word from a line using a regular expression.
The first word is defined as the first sequence of non-whitespace characters.
@param line The input line.
@return The first word in the line.
"""
match = first_word_re.match(line)
return match.group(1) if match else line
def get_line_without_first_word(line: str) -> str:
"""
Get the line without the first word.
The first word is defined as the first sequence of non-whitespace characters.
@param line The input line.
@return The line without the first word.
"""
leading_space_end = -2
leading_word_end = -1
for i, c in enumerate(line):
if c.isspace():
if leading_space_end >= -1:
leading_word_end = i - 1
else:
if leading_space_end < -1:
leading_space_end = i - 1
elif leading_word_end >= 0:
return line[i:]
return ""
# This is faster than get_line_without_first_word in practice for long lines
line_without_first_word_re = re.compile(r"^\s*\S+\s(.*\r?\n)")
def get_line_without_first_word_re(line: str) -> str:
"""
Get the line without the first word using a regular expression.
The first word is defined as the first sequence of non-whitespace characters.
@param line The input line.
@return The line without the first word.
"""
match = line_without_first_word_re.match(line)
return match.group(1) if match else line
def common_prefix_two(s1: str, s2: str):
for i, c in enumerate(s1):
if c != s2[i]:
return s1[:i]
return s1
def common_prefix(m: list[StrBytesT]) -> typing.Optional[StrBytesT]:
"""
Reference: https://stackoverflow.com/questions/6718196/determine-the-common-prefix-of-multiple-strings
"""
m = [s for s in m if len(s) > 0]
if len(m) == 0:
return None
if len(m) == 1:
return type(m[0])()
s1 = min(m)
s2 = max(m)
for i, c in enumerate(s1):
if c != s2[i]:
return s1[:i]
return s1
def sublist_creator(
lst: typing.Sequence,
n: int,
value_func: typing.Callable[[typing.Any], int]
) -> list[list]:
lists = [[] for _ in range(n)]
totals = [(0, i) for i in range(n)]
heapq.heapify(totals)
for item in lst:
value = value_func(item)
total, index = heapq.heappop(totals)
lists[index].append(item)
heapq.heappush(totals, (total + value, index))
return lists
def get_dir_total_size(path: str):
return sum([
os.stat(os.path.join(path, file)).st_size
for file in os.listdir(path)
])
def get_dir_content_size(path: str):
return {
os.path.join(path, file): os.stat(os.path.join(path, file)).st_size
for file in os.listdir(path)
}
def display_top(snapshot, key_type='lineno', limit=3):
snapshot = snapshot.filter_traces((
tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
tracemalloc.Filter(False, "<unknown>"),
))
top_stats = snapshot.statistics(key_type)
print("Top %s lines" % limit)
for index, stat in enumerate(top_stats[:limit], 1):
frame = stat.traceback[0]
# replace "/path/to/module/file.py" with "module/file.py"
filename = os.sep.join(frame.filename.split(os.sep)[-2:])
print("#%s: %s:%s: %.1f KiB"
% (index, filename, frame.lineno, stat.size / 1024))
line = linecache.getline(frame.filename, frame.lineno).strip()
if line:
print(' %s' % line)
other = top_stats[limit:]
if other:
size = sum(stat.size for stat in other)
print("%s other: %.1f KiB" % (len(other), size / 1024))
total = sum(stat.size for stat in top_stats)
print("Total allocated size: %.1f KiB" % (total / 1024))