-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathjob.py
More file actions
157 lines (128 loc) · 5.12 KB
/
job.py
File metadata and controls
157 lines (128 loc) · 5.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import time
from dataclasses import dataclass
from typing import Dict, List, Optional

import requests

from nucleus.constants import (
    JOB_CREATION_TIME_KEY,
    JOB_ID_KEY,
    JOB_LAST_KNOWN_STATUS_KEY,
    JOB_TYPE_KEY,
    STATUS_KEY,
)
from nucleus.utils import replace_double_slashes
# Seconds to wait between successive status polls while blocking on a job.
JOB_POLLING_INTERVAL: int = 5
@dataclass
class AsyncJob:
    """Object used to check the status or errors of a long running asynchronous operation.

    ::

        import nucleus

        client = nucleus.NucleusClient(YOUR_SCALE_API_KEY)
        dataset = client.get_dataset("ds_bwkezj6g5c4g05gqp1eg")

        # When kicking off an asynchronous job, store the return value as a variable
        job = dataset.append(items=YOUR_DATASET_ITEMS, asynchronous=True)

        # Poll for status or errors
        print(job.status())
        print(job.errors())

        # Block until job finishes
        job.sleep_until_complete()
    """

    job_id: str
    job_last_known_status: str
    job_type: str
    job_creation_time: str
    client: "NucleusClient"  # type: ignore # noqa: F821

    def status(self) -> Dict[str, str]:
        """Fetches status of the job and an informative message on job progress.

        Side effect: ``job_last_known_status`` is updated from the response.

        Returns:
            A dict of the job ID, status (one of Running, Completed, or Errored),
            an informative message on the job progress, and number of both completed
            and total steps.

            ::

                {
                    "job_id": "job_c19xcf9mkws46gah0000",
                    "status": "Completed",
                    "message": "Job completed successfully.",
                    "job_progress": "0.33",
                    "completed_steps": "1",
                    "total_steps": "3",
                }
        """
        response = self.client.make_request(
            payload={},
            route=f"job/{self.job_id}",
            requests_command=requests.get,
        )
        # Cache the latest status so callers can inspect it without another request.
        self.job_last_known_status = response[STATUS_KEY]
        return response

    def errors(self) -> List[str]:
        """Fetches a list of the latest errors generated by the asynchronous job.

        Useful for debugging failed or partially successful jobs.

        Returns:
            A list of strings containing the 10,000 most recently generated errors.
            Each string is passed through ``replace_double_slashes`` before being
            returned.

            ::

                [
                    '{"annotation":{"label":"car","type":"box","geometry":{"x":50,"y":60,"width":70,"height":80},"referenceId":"bad_ref_id","annotationId":"attempted_annot_upload","metadata":{}},"error":"Item with id bad_ref_id doesn\'t exist."}'
                ]
        """
        errors = self.client.make_request(
            payload={},
            route=f"job/{self.job_id}/errors",
            requests_command=requests.get,
        )
        return [replace_double_slashes(error) for error in errors]

    def sleep_until_complete(
        self,
        verbose_std_out: bool = True,
        timeout_s: Optional[int] = None,
        polling_interval_s: Optional[float] = None,
    ) -> None:
        """Blocks until the job completes or errors.

        Parameters:
            verbose_std_out (Optional[bool]): Whether or not to verbosely log while
                sleeping. Defaults to True.
            timeout_s: Raise an error if the job is still running after
                ``timeout_s`` seconds. ``None`` (or 0) disables the timeout.
            polling_interval_s: Seconds to sleep between status polls. Defaults
                to ``JOB_POLLING_INTERVAL``.

        Raises:
            JobTimeoutError: If the job is still running once ``timeout_s``
                seconds have elapsed.
            JobError: If the job finishes with status "Errored".
        """
        if polling_interval_s is None:
            polling_interval_s = JOB_POLLING_INTERVAL
        start_time = time.perf_counter()
        while True:
            status = self.status()
            time_elapsed = time.perf_counter() - start_time
            if verbose_std_out:
                print(f"Status at {time_elapsed} s: {status}")
            # Check the freshly fetched status *before* the timeout or sleeping:
            # a job that already finished must never be reported as timed out,
            # and there is no reason to wait another interval once it is done.
            if status["status"] != "Running":
                break
            if timeout_s and time_elapsed > timeout_s:
                raise JobTimeoutError(self, timeout_s)
            time.sleep(polling_interval_s)
        if verbose_std_out:
            print(
                f"Finished at {time.perf_counter() - start_time} s: {status}"
            )
        if status["status"] == "Errored":
            raise JobError(status, self)

    @classmethod
    def from_json(cls, payload: dict, client):
        """Builds an ``AsyncJob`` from a server payload dict and a client."""
        # TODO: make private
        return cls(
            job_id=payload[JOB_ID_KEY],
            job_last_known_status=payload[JOB_LAST_KNOWN_STATUS_KEY],
            job_type=payload[JOB_TYPE_KEY],
            job_creation_time=payload[JOB_CREATION_TIME_KEY],
            client=client,
        )
class JobError(Exception):
    """Raised when an asynchronous job finishes with an "Errored" status.

    Parameters:
        job_status: The final status dict of the job; its "message" and "status"
            keys are read to build the error message.
        job: The job that failed; its string form is embedded in the message so
            users know which object to call ``.errors()`` on.
    """

    def __init__(self, job_status: Dict[str, str], job: "AsyncJob"):
        final_status_message = job_status["message"]
        final_status = job_status["status"]
        message = (
            f"The job reported a final status of {final_status}. "
            "This could, however, mean a partial success with some successes and some failures. "
            f"The final status message was: {final_status_message} \n"
            f"For more detailed error messages you can call {str(job)}.errors()"
        )
        message = replace_double_slashes(message)
        super().__init__(message)
        # Keep the raw inputs available to handlers that want structured data.
        self.job_status = job_status
        self.job = job
class JobTimeoutError(Exception):
    """Raised when a job is still running after the caller's timeout has elapsed.

    Parameters:
        job: The still-running job; its ``job_id`` is included in the message.
        timeout_seconds: The timeout that was exceeded, in seconds.
    """

    def __init__(self, job: "AsyncJob", timeout_seconds):
        # Build a plain string: a trailing comma here would turn the message into
        # a 1-tuple and str(err) would render as "('Refusing ...',)".
        message = (
            f"Refusing to wait longer for job: {job.job_id}. "
            f"It is still running after {timeout_seconds} seconds"
        )
        super().__init__(message)
        self.job = job
        self.timeout_seconds = timeout_seconds