Skip to content

Commit 387bb1c

Browse files
Merge pull request #1344 from cloudsufi/patch-BQ-Execute-label-support
[PLUGIN-1729] Added BQ Execute Action job label support
2 parents 8e43f42 + 4ed1e8c commit 387bb1c

8 files changed

Lines changed: 350 additions & 289 deletions

File tree

docs/BigQueryExecute-action.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,14 @@ write BigQuery data to this project.
3030

3131
**SQL**: SQL command to execute.
3232

33+
**BQ Job Labels**: Key-value pairs to be added as labels to the BigQuery job. Keys must be unique. (Macro Enabled)
34+
35+
`job_source` and `type` are system-defined labels used by CDAP for internal purposes and cannot be used as label keys.
36+
Macro format is supported. Example: `key1:val1,key2:val2`
37+
38+
Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes.
39+
For more information about labels, see [Docs](https://cloud.google.com/bigquery/docs/labels-intro#requirements).
40+
3341
**Dialect**: Dialect of the SQL command. The value must be 'legacy' or 'standard'. If set to 'standard',
3442
the query will use BigQuery's standard SQL: https://cloud.google.com/bigquery/sql-reference/.
3543
If set to 'legacy', BigQuery's legacy SQL dialect will be used for this query.

src/main/java/io/cdap/plugin/gcp/bigquery/action/BigQueryExecute.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public void run(ActionContext context) throws Exception {
125125
}
126126

127127
// Add labels for the BigQuery Execute job.
128-
builder.setLabels(BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_EXECUTE_TAG));
128+
builder.setLabels(BigQueryUtil.getJobLabels(BigQueryUtil.BQ_JOB_TYPE_EXECUTE_TAG, config.getJobLabelKeyValue()));
129129

130130
QueryJobConfiguration queryConfig = builder.build();
131131

@@ -205,6 +205,7 @@ public static final class Config extends AbstractBigQueryActionConfig {
205205
private static final String DATASET = "dataset";
206206
private static final String TABLE = "table";
207207
private static final String NAME_LOCATION = "location";
208+
public static final String NAME_BQ_JOB_LABELS = "jobLabels";
208209
private static final int ERROR_CODE_NOT_FOUND = 404;
209210
private static final String STORE_RESULTS = "storeResults";
210211

@@ -272,10 +273,17 @@ public static final class Config extends AbstractBigQueryActionConfig {
272273
@Description("Whether to store results in a BigQuery Table.")
273274
private Boolean storeResults;
274275

276+
@Name(NAME_BQ_JOB_LABELS)
277+
@Macro
278+
@Nullable
279+
@Description("Key value pairs to be added as labels to the BigQuery job. Keys must be unique. [job_source, type] " +
280+
"are reserved keys and cannot be used as label keys.")
281+
protected String jobLabelKeyValue;
282+
275283
private Config(@Nullable String project, @Nullable String serviceAccountType, @Nullable String serviceFilePath,
276284
@Nullable String serviceAccountJson, @Nullable String dataset, @Nullable String table,
277285
@Nullable String location, @Nullable String cmekKey, @Nullable String dialect, @Nullable String sql,
278-
@Nullable String mode, @Nullable Boolean storeResults) {
286+
@Nullable String mode, @Nullable Boolean storeResults, @Nullable String jobLabelKeyValue) {
279287
this.project = project;
280288
this.serviceAccountType = serviceAccountType;
281289
this.serviceFilePath = serviceFilePath;
@@ -288,6 +296,7 @@ private Config(@Nullable String project, @Nullable String serviceAccountType, @N
288296
this.sql = sql;
289297
this.mode = mode;
290298
this.storeResults = storeResults;
299+
this.jobLabelKeyValue = jobLabelKeyValue;
291300
}
292301

293302
public boolean isLegacySQL() {
@@ -328,6 +337,11 @@ public String getTable() {
328337
return table;
329338
}
330339

340+
@Nullable
341+
public String getJobLabelKeyValue() {
342+
return jobLabelKeyValue;
343+
}
344+
331345
@Override
332346
public void validate(FailureCollector failureCollector) {
333347
validate(failureCollector, Collections.emptyMap());
@@ -376,9 +390,17 @@ public void validate(FailureCollector failureCollector, Map<String, String> argu
376390
validateCmekKey(failureCollector, arguments);
377391
}
378392

393+
if (!containsMacro(NAME_BQ_JOB_LABELS)) {
394+
validateJobLabelKeyValue(failureCollector);
395+
}
396+
379397
failureCollector.getOrThrowException();
380398
}
381399

400+
void validateJobLabelKeyValue(FailureCollector failureCollector) {
401+
BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, failureCollector, NAME_BQ_JOB_LABELS);
402+
}
403+
382404
void validateCmekKey(FailureCollector failureCollector, Map<String, String> arguments) {
383405
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(cmekKey, arguments, failureCollector);
384406
//these fields are needed to check if bucket exists or not and for location validation
@@ -449,6 +471,7 @@ public static class Builder {
449471
private String sql;
450472
private String mode;
451473
private Boolean storeResults;
474+
private String jobLabelKeyValue;
452475

453476
public Builder setProject(@Nullable String project) {
454477
this.project = project;
@@ -505,6 +528,11 @@ public Builder setSql(@Nullable String sql) {
505528
return this;
506529
}
507530

531+
public Builder setJobLabelKeyValue(@Nullable String jobLabelKeyValue) {
532+
this.jobLabelKeyValue = jobLabelKeyValue;
533+
return this;
534+
}
535+
508536
public Config build() {
509537
return new Config(
510538
project,
@@ -518,7 +546,8 @@ public Config build() {
518546
dialect,
519547
sql,
520548
mode,
521-
storeResults
549+
storeResults,
550+
jobLabelKeyValue
522551
);
523552
}
524553

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java

Lines changed: 1 addition & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -187,106 +187,8 @@ void validateCmekKey(FailureCollector failureCollector, Map<String, String> argu
187187
validateCmekKeyLocation(cmekKeyName, null, location, failureCollector);
188188
}
189189

190-
/**
191-
* Validates job label key value pairs, as per the following rules:
192-
* Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes.
193-
* Defined in the following link:
194-
* <a href="https://cloud.google.com/bigquery/docs/labels-intro#requirements">Docs</a>
195-
* @param failureCollector failure collector
196-
*/
197190
void validateJobLabelKeyValue(FailureCollector failureCollector) {
198-
Set<String> reservedKeys = BigQueryUtil.BQ_JOB_LABEL_SYSTEM_KEYS;
199-
int maxLabels = 64 - reservedKeys.size();
200-
int maxKeyLength = 63;
201-
int maxValueLength = 63;
202-
203-
String validLabelKeyRegex = "^[\\p{L}][a-z0-9-_\\p{L}]+$";
204-
String validLabelValueRegex = "^[a-z0-9-_\\p{L}]+$";
205-
String capitalLetterRegex = ".*[A-Z].*";
206-
String labelKeyValue = getJobLabelKeyValue();
207-
208-
if (Strings.isNullOrEmpty(labelKeyValue)) {
209-
return;
210-
}
211-
212-
String[] keyValuePairs = labelKeyValue.split(",");
213-
Set<String> uniqueKeys = new HashSet<>();
214-
215-
for (String keyValuePair : keyValuePairs) {
216-
217-
// Adding a label without a value is valid behavior
218-
// Read more here: https://cloud.google.com/bigquery/docs/adding-labels#adding_a_label_without_a_value
219-
String[] keyValue = keyValuePair.trim().split(":");
220-
boolean isKeyPresent = keyValue.length == 1 || keyValue.length == 2;
221-
boolean isValuePresent = keyValue.length == 2;
222-
223-
224-
if (!isKeyPresent) {
225-
failureCollector.addFailure(String.format("Invalid job label key value pair '%s'.", keyValuePair),
226-
"Job label key value pair should be in the format 'key:value'.")
227-
.withConfigProperty(NAME_BQ_JOB_LABELS);
228-
continue;
229-
}
230-
231-
// Check if key is reserved
232-
if (reservedKeys.contains(keyValue[0])) {
233-
failureCollector.addFailure(String.format("Invalid job label key '%s'.", keyValue[0]),
234-
"A system label already exists with same name.").withConfigProperty(NAME_BQ_JOB_LABELS);
235-
continue;
236-
}
237-
238-
String key = keyValue[0];
239-
String value = isValuePresent ? keyValue[1] : "";
240-
boolean isKeyValid = true;
241-
boolean isValueValid = true;
242-
243-
// Key cannot be empty
244-
if (Strings.isNullOrEmpty(key)) {
245-
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
246-
"Job label key cannot be empty.").withConfigProperty(NAME_BQ_JOB_LABELS);
247-
isKeyValid = false;
248-
}
249-
250-
// Key cannot be longer than 63 characters
251-
if (key.length() > maxKeyLength) {
252-
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
253-
"Job label key cannot be longer than 63 characters.").withConfigProperty(NAME_BQ_JOB_LABELS);
254-
isKeyValid = false;
255-
}
256-
257-
// Value cannot be longer than 63 characters
258-
if (value.length() > maxValueLength) {
259-
failureCollector.addFailure(String.format("Invalid job label value '%s'.", value),
260-
"Job label value cannot be longer than 63 characters.").withConfigProperty(NAME_BQ_JOB_LABELS);
261-
isValueValid = false;
262-
}
263-
264-
if (isKeyValid && (!key.matches(validLabelKeyRegex) || key.matches(capitalLetterRegex))) {
265-
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
266-
"Job label key can only contain lowercase letters, numeric characters, " +
267-
"underscores, and dashes. Check docs for more details.")
268-
.withConfigProperty(NAME_BQ_JOB_LABELS);
269-
isKeyValid = false;
270-
}
271-
272-
if (isValuePresent && isValueValid &&
273-
(!value.matches(validLabelValueRegex) || value.matches(capitalLetterRegex))) {
274-
failureCollector.addFailure(String.format("Invalid job label value '%s'.", value),
275-
"Job label value can only contain lowercase letters, numeric characters, " +
276-
"underscores, and dashes.").withConfigProperty(NAME_BQ_JOB_LABELS);
277-
}
278-
279-
if (isKeyValid && !uniqueKeys.add(key)) {
280-
failureCollector.addFailure(String.format("Duplicate job label key '%s'.", key),
281-
"Job label key should be unique.").withConfigProperty(NAME_BQ_JOB_LABELS);
282-
}
283-
}
284-
// Check if number of labels is greater than 64 - reserved keys
285-
if (uniqueKeys.size() > maxLabels) {
286-
failureCollector.addFailure("Number of job labels exceeds the limit.",
287-
String.format("Number of job labels cannot be greater than %d.", maxLabels))
288-
.withConfigProperty(NAME_BQ_JOB_LABELS);
289-
}
191+
BigQueryUtil.validateJobLabelKeyValue(jobLabelKeyValue, failureCollector, NAME_BQ_JOB_LABELS);
290192
}
291193

292194
public String getDatasetProject() {

src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
4040
import io.cdap.cdap.etl.api.validation.InvalidStageException;
4141
import io.cdap.cdap.etl.api.validation.ValidationFailure;
42+
import io.cdap.plugin.gcp.bigquery.sink.AbstractBigQuerySinkConfig;
4243
import io.cdap.plugin.gcp.bigquery.sink.BigQuerySink;
4344
import io.cdap.plugin.gcp.bigquery.source.BigQuerySource;
4445
import io.cdap.plugin.gcp.bigquery.source.BigQuerySourceConfig;
@@ -60,6 +61,7 @@
6061
import java.io.IOException;
6162
import java.util.ArrayList;
6263
import java.util.HashMap;
64+
import java.util.HashSet;
6365
import java.util.List;
6466
import java.util.Map;
6567
import java.util.Set;
@@ -923,4 +925,106 @@ public static String getStagingBucketName(Map<String, String> arguments, @Nullab
923925
}
924926
return bucket;
925927
}
928+
929+
/**
930+
* Validates job label key value pairs, as per the following rules:
931+
* Keys and values can contain only lowercase letters, numeric characters, underscores, and dashes.
932+
* Defined in the following link:
933+
* <a href="https://cloud.google.com/bigquery/docs/labels-intro#requirements">Docs</a>
934+
* @param failureCollector failure collector
935+
*/
936+
public static void validateJobLabelKeyValue(String labelKeyValue, FailureCollector failureCollector,
937+
String stageConfigProperty) {
938+
Set<String> reservedKeys = BQ_JOB_LABEL_SYSTEM_KEYS;
939+
int maxLabels = 64 - reservedKeys.size();
940+
int maxKeyLength = 63;
941+
int maxValueLength = 63;
942+
943+
String validLabelKeyRegex = "^[\\p{L}][a-z0-9-_\\p{L}]+$";
944+
String validLabelValueRegex = "^[a-z0-9-_\\p{L}]+$";
945+
String capitalLetterRegex = ".*[A-Z].*";
946+
947+
if (com.google.api.client.util.Strings.isNullOrEmpty(labelKeyValue)) {
948+
return;
949+
}
950+
951+
String[] keyValuePairs = labelKeyValue.split(",");
952+
Set<String> uniqueKeys = new HashSet<>();
953+
954+
for (String keyValuePair : keyValuePairs) {
955+
956+
// Adding a label without a value is valid behavior
957+
// Read more here: https://cloud.google.com/bigquery/docs/adding-labels#adding_a_label_without_a_value
958+
String[] keyValue = keyValuePair.trim().split(":");
959+
boolean isKeyPresent = keyValue.length == 1 || keyValue.length == 2;
960+
boolean isValuePresent = keyValue.length == 2;
961+
962+
963+
if (!isKeyPresent) {
964+
failureCollector.addFailure(String.format("Invalid job label key value pair '%s'.", keyValuePair),
965+
"Job label key value pair should be in the format 'key:value'.")
966+
.withConfigProperty(stageConfigProperty);
967+
continue;
968+
}
969+
970+
// Check if key is reserved
971+
if (reservedKeys.contains(keyValue[0])) {
972+
failureCollector.addFailure(String.format("Invalid job label key '%s'.", keyValue[0]),
973+
"A system label already exists with same name.").withConfigProperty(stageConfigProperty);
974+
continue;
975+
}
976+
977+
String key = keyValue[0];
978+
String value = isValuePresent ? keyValue[1] : "";
979+
boolean isKeyValid = true;
980+
boolean isValueValid = true;
981+
982+
// Key cannot be empty
983+
if (com.google.api.client.util.Strings.isNullOrEmpty(key)) {
984+
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
985+
"Job label key cannot be empty.").withConfigProperty(stageConfigProperty);
986+
isKeyValid = false;
987+
}
988+
989+
// Key cannot be longer than 63 characters
990+
if (key.length() > maxKeyLength) {
991+
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
992+
"Job label key cannot be longer than 63 characters.").withConfigProperty(stageConfigProperty);
993+
isKeyValid = false;
994+
}
995+
996+
// Value cannot be longer than 63 characters
997+
if (value.length() > maxValueLength) {
998+
failureCollector.addFailure(String.format("Invalid job label value '%s'.", value),
999+
"Job label value cannot be longer than 63 characters.").withConfigProperty(stageConfigProperty);
1000+
isValueValid = false;
1001+
}
1002+
1003+
if (isKeyValid && (!key.matches(validLabelKeyRegex) || key.matches(capitalLetterRegex))) {
1004+
failureCollector.addFailure(String.format("Invalid job label key '%s'.", key),
1005+
"Job label key can only contain lowercase letters, numeric characters, " +
1006+
"underscores, and dashes. Check docs for more details.")
1007+
.withConfigProperty(stageConfigProperty);
1008+
isKeyValid = false;
1009+
}
1010+
1011+
if (isValuePresent && isValueValid &&
1012+
(!value.matches(validLabelValueRegex) || value.matches(capitalLetterRegex))) {
1013+
failureCollector.addFailure(String.format("Invalid job label value '%s'.", value),
1014+
"Job label value can only contain lowercase letters, numeric characters, " +
1015+
"underscores, and dashes.").withConfigProperty(stageConfigProperty);
1016+
}
1017+
1018+
if (isKeyValid && !uniqueKeys.add(key)) {
1019+
failureCollector.addFailure(String.format("Duplicate job label key '%s'.", key),
1020+
"Job label key should be unique.").withConfigProperty(stageConfigProperty);
1021+
}
1022+
}
1023+
// Check if number of labels is greater than 64 - reserved keys
1024+
if (uniqueKeys.size() > maxLabels) {
1025+
failureCollector.addFailure("Number of job labels exceeds the limit.",
1026+
String.format("Number of job labels cannot be greater than %d.", maxLabels))
1027+
.withConfigProperty(stageConfigProperty);
1028+
}
1029+
}
9261030
}

0 commit comments

Comments
 (0)