Skip to content

Commit d8def4f

Browse files
authored
Merge pull request #37 from dataiku/bug/sc-133490-stable-output-schema
Bug/sc 133490 stable output schema
2 parents e5f5d41 + 4935aa2 commit d8def4f

6 files changed

Lines changed: 57 additions & 15 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44

55
- Add Brotli compression
66
- Faster recurring calls
7+
- dku_error column kept at all time in API-Connect recipe output schema
78
- Updated code-env descriptor for DSS 12
89

910

1011
## [Version 1.1.4](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.1.4) - Feature and bugfix release - 2023-02-28
1112

1213
- Add Brotli compression
1314
- Faster recurring calls
15+
- dku_error column kept at all time in API-Connect recipe output schema
1416

1517
## [Version 1.1.3](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.1.3) - Bugfix release - 2023-04-18
1618

custom-recipes/api-connect/recipe.json

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,19 @@
237237
"type": "SEPARATOR",
238238
"label": "Advanced"
239239
},
240+
{
241+
"name": "behaviour_when_error",
242+
"label": "Error behaviour",
243+
"description": "Decide how the recipe should react when an input line results in an error",
244+
"type": "SELECT",
245+
"defaultValue": "keep-error-column",
246+
"selectChoices":[
247+
{"value": "ignore", "label": "Ignore the error"},
248+
{"value": "add-error-column", "label": "Add an error column"},
249+
{"value": "keep-error-column", "label": "Error column always in schema"},
250+
{"value": "raise", "label": "Fail the job"}
251+
]
252+
},
240253
{
241254
"name": "ignore_ssl_check",
242255
"label": "Ignore SSL check",

custom-recipes/api-connect/recipe.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def get_partitioning_keys(id_list, dku_flow_variables):
3535
logger.info("config={}".format(logger.filter_secrets(config)))
3636

3737
credential_parameters = config.get("credential", {})
38+
behaviour_when_error = config.get("behaviour_when_error", "add-error-column")
3839
endpoint_parameters = get_endpoint_parameters(config)
3940
extraction_key = endpoint_parameters.get("extraction_key", "")
4041
is_raw_output = endpoint_parameters.get("raw_output", True)
@@ -58,7 +59,8 @@ def get_partitioning_keys(id_list, dku_flow_variables):
5859
parameter_columns,
5960
parameter_renamings,
6061
display_metadata,
61-
maximum_number_rows=maximum_number_rows
62+
maximum_number_rows=maximum_number_rows,
63+
behaviour_when_error=behaviour_when_error
6264
)
6365
results = recipe_session.process_dataframe(input_parameters_dataframe, is_raw_output)
6466

python-lib/dku_constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ class DKUConstants(object):
44
FORM_DATA_BODY_FORMAT = "FORM_DATA"
55
PLUGIN_VERSION = "1.2.0"
66
RAW_BODY_FORMAT = "RAW"
7+
REPONSE_ERROR_KEY = "dku_error"

python-lib/rest_api_client.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ class RestAPIClientError(ValueError):
1717

1818
class RestAPIClient(object):
1919

20-
def __init__(self, credential, endpoint, custom_key_values={}, session=None):
20+
def __init__(self, credential, endpoint, custom_key_values={}, session=None, behaviour_when_error=None):
2121
logger.info("Initialising RestAPIClient, credential={}, endpoint={}".format(logger.filter_secrets(credential), endpoint))
2222

2323
# presets_variables contains all variables available in templates using the {{variable_name}} notation
@@ -57,6 +57,7 @@ def __init__(self, credential, endpoint, custom_key_values={}, session=None):
5757
self.timeout = endpoint.get("timeout", -1)
5858
if self.timeout > 0:
5959
self.requests_kwargs.update({"timeout": self.timeout})
60+
self.behaviour_when_error = behaviour_when_error or "add-error-column"
6061

6162
self.requests_kwargs.update({"params": self.params})
6263
self.pagination = Pagination()
@@ -90,6 +91,8 @@ def __init__(self, credential, endpoint, custom_key_values={}, session=None):
9091
key_value_body = endpoint.get("key_value_body", {})
9192
self.requests_kwargs.update({"json": get_dku_key_values(key_value_body)})
9293
self.metadata = {}
94+
if self.behaviour_when_error == "keep-error-column":
95+
self.metadata = {DKUConstants.REPONSE_ERROR_KEY: None}
9396
self.call_number = 0
9497
self.session = session or requests.Session()
9598

@@ -133,8 +136,12 @@ def request(self, method, url, can_raise_exeption=True, **kwargs):
133136
request_start_time = time.time()
134137
self.time_last_request = request_start_time
135138
error_message = None
139+
status_code = None
140+
response_headers = None
136141
try:
137142
response = self.request_with_redirect_retry(method, url, **kwargs)
143+
status_code = response.status_code
144+
response_headers = response.headers
138145
except Exception as err:
139146
self.pagination.is_last_batch_empty = True
140147
error_message = "Error: {}".format(err)
@@ -143,19 +150,19 @@ def request(self, method, url, can_raise_exeption=True, **kwargs):
143150

144151
request_finish_time = time.time()
145152
self.set_metadata("request_duration", request_finish_time - request_start_time)
146-
self.set_metadata("status_code", response.status_code)
147-
self.set_metadata("response_headers", "{}".format(response.headers))
153+
self.set_metadata("status_code", status_code)
154+
self.set_metadata("response_headers", "{}".format(response_headers))
148155

149156
if error_message:
150-
return {"error": error_message}
157+
return {} if self.behaviour_when_error=="ignore" else {DKUConstants.REPONSE_ERROR_KEY: error_message}
151158

152159
if response.status_code >= 400:
153160
error_message = "Error {}: {}".format(response.status_code, response.content)
154161
self.pagination.is_last_batch_empty = True
155162
if can_raise_exeption:
156163
raise RestAPIClientError(error_message)
157164
else:
158-
return {"error": error_message}
165+
return {} if self.behaviour_when_error=="ignore" else {DKUConstants.REPONSE_ERROR_KEY: error_message}
159166
if response.status_code in [204]:
160167
self.pagination.update_next_page({}, response.links)
161168
return self.empty_json_response()
@@ -168,7 +175,7 @@ def request(self, method, url, can_raise_exeption=True, **kwargs):
168175
logger.error("response.content={}".format(response.content))
169176
if can_raise_exeption:
170177
raise RestAPIClientError("The API did not return JSON as expected. {}".format(error_message))
171-
return {"error": error_message}
178+
return {} if self.behaviour_when_error=="ignore" else {DKUConstants.REPONSE_ERROR_KEY: error_message}
172179

173180
self.pagination.update_next_page(json_response, response.links)
174181
return json_response

python-lib/rest_api_recipe_session.py

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
class RestApiRecipeSession:
1515
def __init__(self, custom_key_values, credential_parameters, endpoint_parameters, extraction_key, parameter_columns, parameter_renamings,
1616
display_metadata=False,
17-
maximum_number_rows=-1):
17+
maximum_number_rows=-1, behaviour_when_error=None):
1818
self.custom_key_values = custom_key_values
1919
self.credential_parameters = credential_parameters
2020
self.endpoint_parameters = endpoint_parameters
@@ -25,7 +25,8 @@ def __init__(self, custom_key_values, credential_parameters, endpoint_parameters
2525
self.display_metadata = display_metadata
2626
self.maximum_number_rows = maximum_number_rows
2727
self.is_row_limit = (self.maximum_number_rows > 0)
28-
self.can_raise = False
28+
self.behaviour_when_error = behaviour_when_error or "add-error-column"
29+
self.can_raise = self.behaviour_when_error == "raise"
2930

3031
@staticmethod
3132
def get_column_to_parameter_dict(parameter_columns, parameter_renamings):
@@ -55,7 +56,13 @@ def process_dataframe(self, input_parameters_dataframe, is_raw_output):
5556
updated_endpoint_parameters,
5657
self.custom_key_values
5758
))
58-
self.client = RestAPIClient(self.credential_parameters, updated_endpoint_parameters, custom_key_values=self.custom_key_values, session=session)
59+
self.client = RestAPIClient(
60+
self.credential_parameters,
61+
updated_endpoint_parameters,
62+
custom_key_values=self.custom_key_values,
63+
session=session,
64+
behaviour_when_error=self.behaviour_when_error
65+
)
5966
self.client.time_last_request = time_last_request
6067
while self.client.has_more_data():
6168
page_results = self.retrieve_next_page(is_raw_output)
@@ -69,17 +76,24 @@ def process_dataframe(self, input_parameters_dataframe, is_raw_output):
6976
def retrieve_next_page(self, is_raw_output):
7077
page_rows = []
7178
logger.info("retrieve_next_page: Calling next page")
72-
json_response = self.client.paginated_api_call(can_raise_exeption=False)
73-
metadata = self.client.get_metadata() if self.display_metadata else {}
79+
json_response = self.client.paginated_api_call(can_raise_exeption=self.can_raise)
80+
default_dict = {
81+
DKUConstants.REPONSE_ERROR_KEY: json_response.get(DKUConstants.REPONSE_ERROR_KEY, None)
82+
} if self.behaviour_when_error == "keep-error-column" else {}
83+
metadata = self.client.get_metadata() if self.display_metadata else default_dict
7484
is_api_returning_dict = True
7585
if self.extraction_key:
7686
data_rows = get_value_from_path(json_response, self.extraction_key.split("."), can_raise=False)
77-
if data_rows is None:
87+
if data_rows is None or type(data_rows) != list:
88+
if self.behaviour_when_error == "ignore":
89+
return []
7890
error_message = "Extraction key '{}' was not found in the incoming data".format(self.extraction_key)
7991
if self.can_raise:
8092
raise DataikuException(error_message)
93+
elif DKUConstants.REPONSE_ERROR_KEY in metadata:
94+
return [metadata]
8195
else:
82-
return self.format_page_rows([{"error": error_message}], is_raw_output, metadata)
96+
return self.format_page_rows([{DKUConstants.REPONSE_ERROR_KEY: error_message}], is_raw_output, metadata)
8397
page_rows.extend(self.format_page_rows(data_rows, is_raw_output, metadata))
8498
else:
8599
# Todo: check api_response key is free and add something overwise
@@ -115,6 +129,9 @@ def format_page_rows(self, data_rows, is_raw_output, metadata=None):
115129
base_row.update(metadata)
116130
if is_raw_output:
117131
if is_error_message(data_row):
132+
base_row.update({
133+
DKUConstants.API_RESPONSE_KEY: None
134+
})
118135
base_row.update(parse_keys_for_json(data_row))
119136
else:
120137
base_row.update({
@@ -129,7 +146,7 @@ def format_page_rows(self, data_rows, is_raw_output, metadata=None):
129146
def is_error_message(jsons_response):
130147
if type(jsons_response) not in [dict, list]:
131148
return False
132-
if "error" in jsons_response and len(jsons_response) == 1:
149+
if DKUConstants.REPONSE_ERROR_KEY in jsons_response and len(jsons_response) == 1:
133150
return True
134151
else:
135152
return False

0 commit comments

Comments
 (0)