-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathutils.py
More file actions
206 lines (162 loc) · 6.14 KB
/
utils.py
File metadata and controls
206 lines (162 loc) · 6.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
"""
TrendScan Utilities
Shared utility functions for logging, file operations, and data processing.
"""
import json
import logging
import re
from pathlib import Path
from datetime import datetime
from typing import Optional, Dict, Any
from logging.handlers import RotatingFileHandler
from config import LoggingConfig
def setup_logging(config: LoggingConfig) -> logging.Logger:
    """Configure root logging with a console handler and an optional rotating file handler.

    Args:
        config: Logging settings (level, format string, log directory,
            file-logging toggle, rotation size and backup count).

    Returns:
        The "TrendScan" application logger.
    """
    log_dir = Path(config.log_directory)
    log_dir.mkdir(parents=True, exist_ok=True)

    level = getattr(logging, config.level.upper())
    formatter = logging.Formatter(config.format)

    root = logging.getLogger()
    root.setLevel(level)
    # Drop any handlers left over from a previous setup call so messages
    # are not emitted twice.
    root.handlers.clear()

    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(level)
    stream_handler.setFormatter(formatter)
    root.addHandler(stream_handler)

    if config.enable_file_logging:
        log_path = log_dir / f"TrendScan_{datetime.now().strftime('%Y%m%d')}.log"
        rotating = RotatingFileHandler(
            log_path,
            maxBytes=config.max_file_size_mb * 1024 * 1024,
            backupCount=config.backup_count,
            encoding='utf-8'
        )
        # File log captures everything; the console respects the configured level.
        rotating.setLevel(logging.DEBUG)
        rotating.setFormatter(formatter)
        root.addHandler(rotating)

    app_logger = logging.getLogger("TrendScan")
    app_logger.info("Logging system initialized")
    return app_logger
def sanitize_filename(filename: str) -> str:
    """Clean a string for safe use as a file or directory name.

    Replaces characters invalid on common file systems with underscores,
    collapses whitespace runs to single underscores, trims leading/trailing
    spaces and dots, lowercases, and caps the length at 100 characters.

    Args:
        filename: Arbitrary display name (e.g. a company name).

    Returns:
        A lowercase, file-system-safe name; "unknown" if nothing remains.
    """
    # Remove characters rejected by Windows/POSIX file systems.
    sanitized = re.sub(r'[<>:"/\\|?*]', '_', filename)
    # Normalize whitespace runs to a single separator.
    sanitized = re.sub(r'\s+', '_', sanitized)
    # Trailing dots/spaces are invalid on Windows.
    sanitized = sanitized.strip(' .')
    sanitized = sanitized.lower()
    if not sanitized:
        return "unknown"
    # Limit length to prevent file system issues.
    if len(sanitized) > 100:
        # Bug fix: truncating could re-expose a trailing dot that the earlier
        # strip removed context for (e.g. 99 chars + "." + more), which is an
        # invalid name on Windows — strip again after slicing.
        sanitized = sanitized[:100].rstrip(' .')
        if not sanitized:
            return "unknown"
    return sanitized
def create_output_structure(company_name: str, base_directory: str,
                            include_timestamp: bool = True) -> Path:
    """Create and return a per-company output directory under *base_directory*.

    Args:
        company_name: Raw company name; sanitized before use as a directory name.
        base_directory: Root directory for all outputs (created if missing).
        include_timestamp: When True, append a YYYYMMDD_HHMMSS suffix so
            repeated runs for the same company never collide.

    Returns:
        Path to the created company directory.
    """
    base_path = Path(base_directory)
    base_path.mkdir(parents=True, exist_ok=True)

    dir_name = sanitize_filename(company_name)
    if include_timestamp:
        # Suffix with the current time to avoid clobbering earlier runs.
        dir_name = f"{dir_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    company_dir = base_path / dir_name
    company_dir.mkdir(parents=True, exist_ok=True)
    return company_dir
def format_duration(seconds: float) -> str:
    """Format a duration in seconds as a short human-readable string.

    Args:
        seconds: Elapsed time in seconds.

    Returns:
        "N.Ns" below one minute, "N.Nm" below one hour, otherwise "N.Nh".
    """
    # Pick the largest unit that keeps the number small and readable.
    for upper_bound, divisor, suffix in ((60, 1, 's'), (3600, 60, 'm')):
        if seconds < upper_bound:
            return f"{seconds / divisor:.1f}{suffix}"
    return f"{seconds / 3600:.1f}h"
def format_file_size(size_bytes: int) -> str:
    """Format a byte count as a human-readable size string.

    Args:
        size_bytes: Size in bytes.

    Returns:
        "N B" below 1 KiB, otherwise "N.N KB"/"MB"/"GB" using 1024-based units.
    """
    if size_bytes < 1024:
        return f"{size_bytes} B"
    value = float(size_bytes)
    # Walk up the units until the value fits below the next threshold.
    for unit in ("KB", "MB"):
        value /= 1024
        if value < 1024:
            return f"{value:.1f} {unit}"
    return f"{value / 1024:.1f} GB"
def get_file_info(file_path: str) -> Dict[str, Any]:
    """Collect metadata for a file or directory path.

    Args:
        file_path: Path to inspect.

    Returns:
        {"exists": False} when the path is missing, {"exists": False,
        "error": ...} on failure, or a metadata dict with name, raw and
        formatted size, ISO created/modified timestamps, lowercased
        extension, directory flag, and absolute path.
    """
    try:
        target = Path(file_path)
        if not target.exists():
            return {"exists": False}
        info = target.stat()
        return {
            "exists": True,
            "name": target.name,
            "size_bytes": info.st_size,
            "size_formatted": format_file_size(info.st_size),
            "created": datetime.fromtimestamp(info.st_ctime).isoformat(),
            "modified": datetime.fromtimestamp(info.st_mtime).isoformat(),
            "extension": target.suffix.lower(),
            "is_directory": target.is_dir(),
            "absolute_path": str(target.absolute()),
        }
    except Exception as e:
        # Best-effort: report the failure (e.g. permission error) rather than raise.
        return {"exists": False, "error": str(e)}
def validate_json_file(file_path: str) -> Dict[str, Any]:
"""Validate JSON file format and return parsing information."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = f.read()
# Parse and validate JSON structure
parsed_data = json.loads(data)
return {
"valid": True,
"size_chars": len(data),
"type": type(parsed_data).__name__,
"items_count": len(parsed_data) if isinstance(parsed_data, (list, dict)) else None
}
except json.JSONDecodeError as e:
return {
"valid": False,
"error": f"JSON decode error: {e}",
"error_type": "json"
}
except Exception as e:
return {
"valid": False,
"error": str(e),
"error_type": "file"
}
if __name__ == "__main__":
    # Smoke-test the utility functions with representative inputs.
    print("Testing TrendScan utilities...")

    sample_companies = [
        "OpenAI Inc.",
        "Anthropic/Claude",
        "Microsoft <Azure>",
        "Google (Alphabet)",
        "Très Spécial Café"
    ]
    print("\nFilename sanitization:")
    for company in sample_companies:
        print(f" {company} -> {sanitize_filename(company)}")

    print("\nDuration formatting:")
    for seconds in [5.5, 75.3, 3661.2, 86401.5]:
        print(f" {seconds}s -> {format_duration(seconds)}")

    print("\nFile size formatting:")
    for size in [512, 1536, 1048576, 1073741824]:
        print(f" {size} bytes -> {format_file_size(size)}")

    print("\nUtilities test completed!")