2222import io .cdap .cdap .api .data .format .StructuredRecord ;
2323import io .cdap .cdap .api .data .schema .Schema ;
2424import io .cdap .plugin .http .common .RetryPolicy ;
25+ import io .cdap .plugin .http .common .error .ErrorHandling ;
2526import io .cdap .plugin .http .common .error .HttpErrorHandler ;
2627import io .cdap .plugin .http .common .error .RetryableErrorHandling ;
2728import io .cdap .plugin .http .common .http .HttpRequest ;
29+ import io .cdap .plugin .http .common .http .HttpResponse ;
2830import io .cdap .plugin .http .common .http .OAuthUtil ;
2931
3032import org .apache .hadoop .mapreduce .RecordWriter ;
@@ -83,6 +85,7 @@ public class HTTPRecordWriter extends RecordWriter<StructuredRecord, StructuredR
8385 public static final String REQUEST_METHOD_POST = "POST" ;
8486 public static final String REQUEST_METHOD_PUT = "PUT" ;
8587 public static final String REQUEST_METHOD_DELETE = "DELETE" ;
88+ public static final String REQUEST_METHOD_PATCH = "PATCH" ;
8689
8790 private final HTTPSinkConfig config ;
8891 private final MessageBuffer messageBuffer ;
@@ -96,6 +99,7 @@ public class HTTPRecordWriter extends RecordWriter<StructuredRecord, StructuredR
9699 private final HttpErrorHandler httpErrorHandler ;
97100 private final PollInterval pollInterval ;
98101 private int httpStatusCode ;
102+ private String httpResponseBody ;
99103 private static int retryCount ;
100104
101105 HTTPRecordWriter (HTTPSinkConfig config , Schema inputSchema ) {
@@ -120,11 +124,13 @@ public class HTTPRecordWriter extends RecordWriter<StructuredRecord, StructuredR
120124 @ Override
121125 public void write (StructuredRecord input , StructuredRecord unused ) throws IOException {
122126 configURL = url ;
123- if (config .getMethod ().equals (REQUEST_METHOD_POST ) || config .getMethod ().equals (REQUEST_METHOD_PUT )) {
127+ if (config .getMethod ().equals (REQUEST_METHOD_POST ) || config .getMethod ().equals (REQUEST_METHOD_PUT ) ||
128+ config .getMethod ().equals (REQUEST_METHOD_PATCH )) {
124129 messageBuffer .add (input );
125130 }
126131
127- if (config .getMethod ().equals (REQUEST_METHOD_PUT ) || config .getMethod ().equals (REQUEST_METHOD_DELETE )
132+ if (config .getMethod ().equals (REQUEST_METHOD_PUT ) || config .getMethod ().equals (REQUEST_METHOD_PATCH ) ||
133+ config .getMethod ().equals (REQUEST_METHOD_DELETE )
128134 && !placeHolderList .isEmpty ()) {
129135 configURL = updateURLWithPlaceholderValue (input );
130136 }
@@ -200,9 +206,9 @@ private boolean executeHTTPServiceAndCheckStatusCode() throws IOException {
200206 request .setHeaders (getRequestHeaders ());
201207
202208 response = httpClient .execute (request );
203-
204209 httpStatusCode = response .getStatusLine ().getStatusCode ();
205210 LOG .debug ("Response HTTP Status code: {}" , httpStatusCode );
211+ httpResponseBody = new HttpResponse (response ).getBody ();
206212
207213 } catch (MalformedURLException | ProtocolException e ) {
208214 throw new IllegalStateException ("Error opening url connection. Reason: " + e .getMessage (), e );
@@ -277,7 +283,9 @@ private Header[] getRequestHeaders() throws IOException {
277283 headers .put ("Instance-Follow-Redirects" , String .valueOf (config .getFollowRedirects ()));
278284 headers .put ("charset" , config .getCharset ());
279285
280- if (config .getMethod ().equals (REQUEST_METHOD_POST ) || config .getMethod ().equals (REQUEST_METHOD_PUT )) {
286+ if (config .getMethod ().equals (REQUEST_METHOD_POST )
287+ || config .getMethod ().equals (REQUEST_METHOD_PATCH )
288+ || config .getMethod ().equals (REQUEST_METHOD_PUT )) {
281289 if (!headers .containsKey ("Content-Type" )) {
282290 headers .put ("Content-Type" , contentType );
283291 }
@@ -302,7 +310,8 @@ private Header getAuthorizationHeader(AccessToken accessToken) {
302310 */
303311 private List <PlaceholderBean > getPlaceholderListFromURL () {
304312 List <PlaceholderBean > placeholderList = new ArrayList <>();
305- if (!(config .getMethod ().equals (REQUEST_METHOD_PUT ) || config .getMethod ().equals (REQUEST_METHOD_DELETE ))) {
313+ if (!(config .getMethod ().equals (REQUEST_METHOD_PUT ) || config .getMethod ().equals (REQUEST_METHOD_PATCH ) ||
314+ config .getMethod ().equals (REQUEST_METHOD_DELETE ))) {
306315 return placeholderList ;
307316 }
308317 Pattern pattern = Pattern .compile (REGEX_HASHED_VAR );
@@ -351,6 +360,25 @@ private void flushMessageBuffer() {
351360 "after the batch execution. " + e );
352361 }
353362 messageBuffer .clear ();
363+
364+ ErrorHandling postRetryStrategy = httpErrorHandler .getErrorHandlingStrategy (httpStatusCode )
365+ .getAfterRetryStrategy ();
366+
367+ switch (postRetryStrategy ) {
368+ case SUCCESS :
369+ break ;
370+ case STOP :
371+ throw new IllegalStateException (String .format ("Fetching from url '%s' returned status code '%d' and body '%s'" ,
372+ config .getUrl (), httpStatusCode , httpResponseBody ));
373+ case SKIP :
374+ case SEND :
375+ LOG .warn (String .format ("Fetching from url '%s' returned status code '%d' and body '%s'" ,
376+ config .getUrl (), httpStatusCode , httpResponseBody ));
377+ break ;
378+ default :
379+ throw new IllegalArgumentException (String .format ("Unexpected http error handling: '%s'" , postRetryStrategy ));
380+ }
381+
354382 }
355383
356384}
0 commit comments