Skip to content

Commit cde8a66

Browse files
authored
Merge pull request #35 from dataiku/release/1.2.0
Release/1.2.0
2 parents cc74416 + 44b3d40 commit cde8a66

10 files changed

Lines changed: 96 additions & 30 deletions

File tree

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,18 @@
11
# Changelog
22

3+
## [Version 1.2.0](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.2.0) - Feature and bugfix release - 2023-05-31
4+
5+
- Add Brotli compression
6+
- Faster recurring calls
7+
- dku_error column kept at all time in API-Connect recipe output schema
8+
- Updated code-env descriptor for DSS 12
9+
10+
## [Version 1.1.4](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.1.4) - Feature and bugfix release - 2023-02-28
11+
12+
- Add Brotli compression
13+
- Faster recurring calls
14+
- dku_error column kept at all time in API-Connect recipe output schema
15+
316
## [Version 1.1.3](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.1.3) - Bugfix release - 2023-04-18
417

518
- Updated code-env descriptor for DSS 12
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
jsonpath-ng==1.5.3
22
requests_ntlm==1.1.0
3+
requests==2.26.0
4+
Brotli==1.0.9

custom-recipes/api-connect/recipe.json

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,19 @@
237237
"type": "SEPARATOR",
238238
"label": "Advanced"
239239
},
240+
{
241+
"name": "behaviour_when_error",
242+
"label": "Error behaviour",
243+
"description": "Decide how the recipe should react when an input line results in an error",
244+
"type": "SELECT",
245+
"defaultValue": "keep-error-column",
246+
"selectChoices":[
247+
{"value": "ignore", "label": "Ignore the error"},
248+
{"value": "add-error-column", "label": "Add an error column"},
249+
{"value": "keep-error-column", "label": "Error column always in schema"},
250+
{"value": "raise", "label": "Fail the job"}
251+
]
252+
},
240253
{
241254
"name": "ignore_ssl_check",
242255
"label": "Ignore SSL check",

custom-recipes/api-connect/recipe.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55
from safe_logger import SafeLogger
66
from dku_utils import get_dku_key_values, get_endpoint_parameters
77
from rest_api_recipe_session import RestApiRecipeSession
8+
from dku_constants import DKUConstants
89

910

10-
logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"])
11+
logger = SafeLogger("api-connect plugin", forbidden_keys=DKUConstants.FORBIDDEN_KEYS)
1112

1213

1314
def get_partitioning_keys(id_list, dku_flow_variables):
@@ -25,7 +26,7 @@ def get_partitioning_keys(id_list, dku_flow_variables):
2526
return partitioning_keys
2627

2728

28-
logger.info('API-Connect plugin recipe v1.1.3')
29+
logger.info('API-Connect plugin recipe v{}'.format(DKUConstants.PLUGIN_VERSION))
2930

3031
input_A_names = get_input_names_for_role('input_A_role')
3132
config = get_recipe_config()
@@ -34,6 +35,7 @@ def get_partitioning_keys(id_list, dku_flow_variables):
3435
logger.info("config={}".format(logger.filter_secrets(config)))
3536

3637
credential_parameters = config.get("credential", {})
38+
behaviour_when_error = config.get("behaviour_when_error", "add-error-column")
3739
endpoint_parameters = get_endpoint_parameters(config)
3840
extraction_key = endpoint_parameters.get("extraction_key", "")
3941
is_raw_output = endpoint_parameters.get("raw_output", True)
@@ -57,7 +59,8 @@ def get_partitioning_keys(id_list, dku_flow_variables):
5759
parameter_columns,
5860
parameter_renamings,
5961
display_metadata,
60-
maximum_number_rows=maximum_number_rows
62+
maximum_number_rows=maximum_number_rows,
63+
behaviour_when_error=behaviour_when_error
6164
)
6265
results = recipe_session.process_dataframe(input_parameters_dataframe, is_raw_output)
6366

plugin.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"id": "api-connect",
3-
"version": "1.1.3",
3+
"version": "1.2.0",
44
"meta": {
55
"label": "API Connect",
66
"description": "Retrieve data from any REST API",

python-connectors/api-connect_dataset/connector.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@
77
import json
88

99

10-
logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"])
10+
logger = SafeLogger("api-connect plugin", forbidden_keys=DKUConstants.FORBIDDEN_KEYS)
1111

1212

1313
class RestAPIConnector(Connector):
1414

1515
def __init__(self, config, plugin_config):
1616
Connector.__init__(self, config, plugin_config) # pass the parameters to the base class
17-
logger.info('API-Connect plugin connector v1.1.3')
17+
logger.info('API-Connect plugin connector v{}'.format(DKUConstants.PLUGIN_VERSION))
1818
logger.info("config={}".format(logger.filter_secrets(config)))
1919
endpoint_parameters = get_endpoint_parameters(config)
2020
credential = config.get("credential", {})

python-lib/dku_constants.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
class DKUConstants(object):
22
API_RESPONSE_KEY = "api_response"
3-
RAW_BODY_FORMAT = "RAW"
3+
FORBIDDEN_KEYS = ["token", "password", "api_key_value"]
44
FORM_DATA_BODY_FORMAT = "FORM_DATA"
5+
PLUGIN_VERSION = "1.2.0"
6+
RAW_BODY_FORMAT = "RAW"
7+
REPONSE_ERROR_KEY = "dku_error"

python-lib/rest_api_client.py

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from dku_constants import DKUConstants
99

1010

11-
logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"])
11+
logger = SafeLogger("api-connect plugin", forbidden_keys=DKUConstants.FORBIDDEN_KEYS)
1212

1313

1414
class RestAPIClientError(ValueError):
@@ -17,7 +17,7 @@ class RestAPIClientError(ValueError):
1717

1818
class RestAPIClient(object):
1919

20-
def __init__(self, credential, endpoint, custom_key_values={}):
20+
def __init__(self, credential, endpoint, custom_key_values={}, session=None, behaviour_when_error=None):
2121
logger.info("Initialising RestAPIClient, credential={}, endpoint={}".format(logger.filter_secrets(credential), endpoint))
2222

2323
# presets_variables contains all variables available in templates using the {{variable_name}} notation
@@ -57,6 +57,7 @@ def __init__(self, credential, endpoint, custom_key_values={}):
5757
self.timeout = endpoint.get("timeout", -1)
5858
if self.timeout > 0:
5959
self.requests_kwargs.update({"timeout": self.timeout})
60+
self.behaviour_when_error = behaviour_when_error or "add-error-column"
6061

6162
self.requests_kwargs.update({"params": self.params})
6263
self.pagination = Pagination()
@@ -90,7 +91,10 @@ def __init__(self, credential, endpoint, custom_key_values={}):
9091
key_value_body = endpoint.get("key_value_body", {})
9192
self.requests_kwargs.update({"json": get_dku_key_values(key_value_body)})
9293
self.metadata = {}
94+
if self.behaviour_when_error == "keep-error-column":
95+
self.metadata = {DKUConstants.REPONSE_ERROR_KEY: None}
9396
self.call_number = 0
97+
self.session = session or requests.Session()
9498

9599
def set_login(self, credential):
96100
login_type = credential.get("login_type", "no_auth")
@@ -131,26 +135,34 @@ def request(self, method, url, can_raise_exeption=True, **kwargs):
131135
raise RestAPIClientError("The api-connect plugin is stuck in a loop. Please check the pagination parameters.")
132136
request_start_time = time.time()
133137
self.time_last_request = request_start_time
138+
error_message = None
139+
status_code = None
140+
response_headers = None
134141
try:
135142
response = self.request_with_redirect_retry(method, url, **kwargs)
136-
request_finish_time = time.time()
143+
status_code = response.status_code
144+
response_headers = response.headers
137145
except Exception as err:
138146
self.pagination.is_last_batch_empty = True
139147
error_message = "Error: {}".format(err)
140148
if can_raise_exeption:
141149
raise RestAPIClientError(error_message)
142-
else:
143-
return {"error": error_message}
150+
151+
request_finish_time = time.time()
144152
self.set_metadata("request_duration", request_finish_time - request_start_time)
145-
self.set_metadata("status_code", response.status_code)
146-
self.set_metadata("response_headers", "{}".format(response.headers))
153+
self.set_metadata("status_code", status_code)
154+
self.set_metadata("response_headers", "{}".format(response_headers))
155+
156+
if error_message:
157+
return {} if self.behaviour_when_error=="ignore" else {DKUConstants.REPONSE_ERROR_KEY: error_message}
158+
147159
if response.status_code >= 400:
148160
error_message = "Error {}: {}".format(response.status_code, response.content)
149161
self.pagination.is_last_batch_empty = True
150162
if can_raise_exeption:
151163
raise RestAPIClientError(error_message)
152164
else:
153-
return {"error": error_message}
165+
return {} if self.behaviour_when_error=="ignore" else {DKUConstants.REPONSE_ERROR_KEY: error_message}
154166
if response.status_code in [204]:
155167
self.pagination.update_next_page({}, response.links)
156168
return self.empty_json_response()
@@ -163,20 +175,20 @@ def request(self, method, url, can_raise_exeption=True, **kwargs):
163175
logger.error("response.content={}".format(response.content))
164176
if can_raise_exeption:
165177
raise RestAPIClientError("The API did not return JSON as expected. {}".format(error_message))
166-
return {"error": error_message}
178+
return {} if self.behaviour_when_error=="ignore" else {DKUConstants.REPONSE_ERROR_KEY: error_message}
167179

168180
self.pagination.update_next_page(json_response, response.links)
169181
return json_response
170182

171183
def request_with_redirect_retry(self, method, url, **kwargs):
172184
# In case of redirection to another domain, the authorization header is not kept
173185
# If redirect_auth_header is true, another attempt is made with initial headers to the redirected url
174-
response = requests.request(method, url, **kwargs)
186+
response = self.session.request(method, url, **kwargs)
175187
if self.redirect_auth_header and not response.url.startswith(url):
176188
redirection_kwargs = copy.deepcopy(kwargs)
177189
redirection_kwargs.pop("params", None) # params are contained in the redirected url
178190
logger.warning("Redirection ! Accessing endpoint {} with initial authorization headers".format(response.url))
179-
response = requests.request(method, response.url, **redirection_kwargs)
191+
response = self.session.request(method, response.url, **redirection_kwargs)
180192
return response
181193

182194
def paginated_api_call(self, can_raise_exeption=True):

python-lib/rest_api_recipe_session.py

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,16 @@
55
from dku_constants import DKUConstants
66
import copy
77
import json
8+
import requests
89

9-
logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"])
10+
11+
logger = SafeLogger("api-connect plugin", forbidden_keys=DKUConstants.FORBIDDEN_KEYS)
1012

1113

1214
class RestApiRecipeSession:
1315
def __init__(self, custom_key_values, credential_parameters, endpoint_parameters, extraction_key, parameter_columns, parameter_renamings,
1416
display_metadata=False,
15-
maximum_number_rows=-1):
17+
maximum_number_rows=-1, behaviour_when_error=None):
1618
self.custom_key_values = custom_key_values
1719
self.credential_parameters = credential_parameters
1820
self.endpoint_parameters = endpoint_parameters
@@ -23,7 +25,8 @@ def __init__(self, custom_key_values, credential_parameters, endpoint_parameters
2325
self.display_metadata = display_metadata
2426
self.maximum_number_rows = maximum_number_rows
2527
self.is_row_limit = (self.maximum_number_rows > 0)
26-
self.can_raise = False
28+
self.behaviour_when_error = behaviour_when_error or "add-error-column"
29+
self.can_raise = self.behaviour_when_error == "raise"
2730

2831
@staticmethod
2932
def get_column_to_parameter_dict(parameter_columns, parameter_renamings):
@@ -38,6 +41,7 @@ def get_column_to_parameter_dict(parameter_columns, parameter_renamings):
3841
def process_dataframe(self, input_parameters_dataframe, is_raw_output):
3942
results = []
4043
time_last_request = None
44+
session = requests.Session()
4145
for index, input_parameters_row in input_parameters_dataframe.iterrows():
4246
rows_count = 0
4347
self.initial_parameter_columns = {}
@@ -52,7 +56,13 @@ def process_dataframe(self, input_parameters_dataframe, is_raw_output):
5256
updated_endpoint_parameters,
5357
self.custom_key_values
5458
))
55-
self.client = RestAPIClient(self.credential_parameters, updated_endpoint_parameters, custom_key_values=self.custom_key_values)
59+
self.client = RestAPIClient(
60+
self.credential_parameters,
61+
updated_endpoint_parameters,
62+
custom_key_values=self.custom_key_values,
63+
session=session,
64+
behaviour_when_error=self.behaviour_when_error
65+
)
5666
self.client.time_last_request = time_last_request
5767
while self.client.has_more_data():
5868
page_results = self.retrieve_next_page(is_raw_output)
@@ -66,17 +76,24 @@ def process_dataframe(self, input_parameters_dataframe, is_raw_output):
6676
def retrieve_next_page(self, is_raw_output):
6777
page_rows = []
6878
logger.info("retrieve_next_page: Calling next page")
69-
json_response = self.client.paginated_api_call(can_raise_exeption=False)
70-
metadata = self.client.get_metadata() if self.display_metadata else {}
79+
json_response = self.client.paginated_api_call(can_raise_exeption=self.can_raise)
80+
default_dict = {
81+
DKUConstants.REPONSE_ERROR_KEY: json_response.get(DKUConstants.REPONSE_ERROR_KEY, None)
82+
} if self.behaviour_when_error == "keep-error-column" else {}
83+
metadata = self.client.get_metadata() if self.display_metadata else default_dict
7184
is_api_returning_dict = True
7285
if self.extraction_key:
7386
data_rows = get_value_from_path(json_response, self.extraction_key.split("."), can_raise=False)
74-
if data_rows is None:
87+
if data_rows is None or type(data_rows) != list:
88+
if self.behaviour_when_error == "ignore":
89+
return []
7590
error_message = "Extraction key '{}' was not found in the incoming data".format(self.extraction_key)
7691
if self.can_raise:
7792
raise DataikuException(error_message)
93+
elif DKUConstants.REPONSE_ERROR_KEY in metadata:
94+
return [metadata]
7895
else:
79-
return [{"error": error_message}]
96+
return self.format_page_rows([{DKUConstants.REPONSE_ERROR_KEY: error_message}], is_raw_output, metadata)
8097
page_rows.extend(self.format_page_rows(data_rows, is_raw_output, metadata))
8198
else:
8299
# Todo: check api_response key is free and add something overwise
@@ -112,6 +129,9 @@ def format_page_rows(self, data_rows, is_raw_output, metadata=None):
112129
base_row.update(metadata)
113130
if is_raw_output:
114131
if is_error_message(data_row):
132+
base_row.update({
133+
DKUConstants.API_RESPONSE_KEY: None
134+
})
115135
base_row.update(parse_keys_for_json(data_row))
116136
else:
117137
base_row.update({
@@ -126,7 +146,7 @@ def format_page_rows(self, data_rows, is_raw_output, metadata=None):
126146
def is_error_message(jsons_response):
127147
if type(jsons_response) not in [dict, list]:
128148
return False
129-
if "error" in jsons_response and len(jsons_response) == 1:
149+
if DKUConstants.REPONSE_ERROR_KEY in jsons_response and len(jsons_response) == 1:
130150
return True
131151
else:
132152
return False

python-lib/safe_logger.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33

44

55
class SafeLogger(object):
6-
def __init__(self, name, forbiden_keys=None):
6+
def __init__(self, name, forbidden_keys=None):
77
self.name = name
88
self.logger = logging.getLogger(self.name)
99
logging.basicConfig(
1010
level=logging.INFO,
1111
format='{} %(levelname)s - %(message)s'.format(self.name)
1212
)
13-
self.forbiden_keys = forbiden_keys
13+
self.forbidden_keys = forbidden_keys
1414

1515
def info(self, message):
1616
self.logger.info(message)
@@ -33,7 +33,7 @@ def dig_secrets(self, dictionary):
3333
for key in dictionary:
3434
if isinstance(dictionary[key], dict):
3535
dictionary[key] = self.filter_secrets(dictionary[key])
36-
if key in self.forbiden_keys:
36+
if key in self.forbidden_keys:
3737
dictionary[key] = hash(dictionary[key])
3838
return dictionary
3939

0 commit comments

Comments
 (0)