@@ -232,6 +232,7 @@ public void commitJob(JobContext jobContext) throws IOException {
232232 allowSchemaRelaxationOnEmptyOutput =
233233 conf .getBoolean (BigQueryConstants .CONFIG_ALLOW_SCHEMA_RELAXATION_ON_EMPTY_OUTPUT , false );
234234 LOG .debug ("Allow schema relaxation: '{}'" , allowSchemaRelaxation );
235+ String jobLabelKeyValue = conf .get (BigQueryConstants .CONFIG_JOB_LABEL_KEY_VALUE , null );
235236 PartitionType partitionType = conf .getEnum (BigQueryConstants .CONFIG_PARTITION_TYPE , PartitionType .NONE );
236237 LOG .debug ("Create Partitioned Table type: '{}'" , partitionType );
237238 com .google .cloud .bigquery .TimePartitioning .Type timePartitioningType = conf .getEnum (
@@ -263,7 +264,7 @@ public void commitJob(JobContext jobContext) throws IOException {
263264 try {
264265 importFromGcs (destProjectId , destTable , destSchema .orElse (null ), kmsKeyName , outputFileFormat ,
265266 writeDisposition , sourceUris , partitionType , timePartitioningType , range , partitionByField ,
266- requirePartitionFilter , clusteringOrderList , tableExists , conf );
267+ requirePartitionFilter , clusteringOrderList , tableExists , jobLabelKeyValue , conf );
267268 } catch (Exception e ) {
268269 throw new IOException ("Failed to import GCS into BigQuery. " , e );
269270 }
@@ -309,7 +310,8 @@ private void importFromGcs(String projectId, TableReference tableRef, @Nullable
309310 List <String > gcsPaths , PartitionType partitionType ,
310311 com .google .cloud .bigquery .TimePartitioning .Type timePartitioningType ,
311312 @ Nullable Range range , @ Nullable String partitionByField , boolean requirePartitionFilter ,
312- List <String > clusteringOrderList , boolean tableExists , Configuration conf )
313+ List <String > clusteringOrderList , boolean tableExists , String jobLabelKeyValue ,
314+ Configuration conf )
313315 throws IOException , InterruptedException {
314316 LOG .info ("Importing into table '{}' from {} paths; path[0] is '{}'; awaitCompletion: {}" ,
315317 BigQueryStrings .toString (tableRef ), gcsPaths .size (), gcsPaths .isEmpty () ? "(empty)" : gcsPaths .get (0 ),
@@ -431,18 +433,18 @@ private void importFromGcs(String projectId, TableReference tableRef, @Nullable
431433
432434 JobConfiguration config = new JobConfiguration ();
433435 config .setLoad (loadConfig );
434- config .setLabels (BigQueryUtil .getJobTags (BigQueryUtil .BQ_JOB_TYPE_SINK_TAG ));
436+ config .setLabels (BigQueryUtil .getJobLabels (BigQueryUtil .BQ_JOB_TYPE_SINK_TAG , jobLabelKeyValue ));
435437 triggerBigqueryJob (projectId , jobId , dataset , config , tableRef );
436438 } else {
437439 // First load the data in a temp table.
438- loadInBatchesInTempTable (tableRef , loadConfig , gcsPaths , projectId , jobId , dataset );
440+ loadInBatchesInTempTable (tableRef , loadConfig , gcsPaths , projectId , jobId , dataset , jobLabelKeyValue );
439441
440442 if (operation .equals (Operation .INSERT )) { // For the case when gcs paths is more than 10000
441443 handleInsertOperation (tableRef , writeDisposition , loadConfig .getDestinationEncryptionConfiguration (),
442- projectId , jobId , dataset , tableExists );
444+ projectId , jobId , dataset , tableExists , jobLabelKeyValue );
443445 } else {
444446 handleUpdateUpsertOperation (tableRef , tableExists , kmsKeyName , getJobIdForUpdateUpsert (conf ),
445- projectId , dataset );
447+ projectId , dataset , jobLabelKeyValue );
446448 }
447449 }
448450
@@ -471,7 +473,8 @@ private void triggerBigqueryJob(String projectId, String jobId, Dataset dataset,
471473 }
472474
473475 private void loadInBatchesInTempTable (TableReference tableRef , JobConfigurationLoad loadConfig ,
474- List <String > gcsPaths , String projectId , String jobId , Dataset dataset )
476+ List <String > gcsPaths , String projectId , String jobId , Dataset dataset ,
477+ String jobLabelKeyValue )
475478 throws IOException , InterruptedException {
476479
477480 LOG .info (" Importing into a temporary table first in batches of 10000" );
@@ -495,7 +498,7 @@ private void loadInBatchesInTempTable(TableReference tableRef, JobConfigurationL
495498 loadConfig .setSourceUris (gcsPathBatch );
496499 JobConfiguration config = new JobConfiguration ();
497500 config .setLoad (loadConfig );
498- config .setLabels (BigQueryUtil .getJobTags (BigQueryUtil .BQ_JOB_TYPE_SINK_TAG ));
501+ config .setLabels (BigQueryUtil .getJobLabels (BigQueryUtil .BQ_JOB_TYPE_SINK_TAG , jobLabelKeyValue ));
499502
500503 triggerBigqueryJob (projectId , jobId + "_" + jobcount , dataset , config , tableRef );
501504 jobcount ++;
@@ -627,7 +630,8 @@ private static Optional<TableSchema> getTableSchema(Configuration conf) throws I
627630
628631 private void handleInsertOperation (TableReference tableRef , String writeDisposition ,
629632 EncryptionConfiguration encryptionConfiguration , String projectId , String jobId ,
630- Dataset dataset , boolean tableExists ) throws IOException , InterruptedException {
633+ Dataset dataset , boolean tableExists ,
634+ String jobLabelKeyValue ) throws IOException , InterruptedException {
631635 if (allowSchemaRelaxation && tableExists ) {
632636 updateTableSchema (tableRef );
633637 }
@@ -639,7 +643,7 @@ private void handleInsertOperation(TableReference tableRef, String writeDisposit
639643
640644 JobConfiguration config = new JobConfiguration ();
641645 config .setCopy (tableCopyConfig );
642- config .setLabels (BigQueryUtil .getJobTags (BigQueryUtil .BQ_JOB_TYPE_SINK_TAG ));
646+ config .setLabels (BigQueryUtil .getJobLabels (BigQueryUtil .BQ_JOB_TYPE_SINK_TAG , jobLabelKeyValue ));
643647 triggerBigqueryJob (projectId , jobId , dataset , config , tableRef );
644648 }
645649
@@ -648,7 +652,8 @@ private void handleUpdateUpsertOperation(TableReference tableRef,
648652 @ Nullable String cmekKey ,
649653 JobId jobId ,
650654 String projectId ,
651- Dataset dataset ) throws IOException , InterruptedException {
655+ Dataset dataset ,
656+ String jobLabelKeyValue ) throws IOException , InterruptedException {
652657 if (allowSchemaRelaxation && tableExists ) {
653658 updateTableSchema (tableRef );
654659 }
@@ -677,7 +682,7 @@ private void handleUpdateUpsertOperation(TableReference tableRef,
677682
678683 // Create Job Configuration and add job labels
679684 JobConfiguration jobConfiguration = new JobConfiguration ();
680- jobConfiguration .setLabels (BigQueryUtil .getJobTags (BigQueryUtil .BQ_JOB_TYPE_SINK_TAG ));
685+ jobConfiguration .setLabels (BigQueryUtil .getJobLabels (BigQueryUtil .BQ_JOB_TYPE_SINK_TAG , jobLabelKeyValue ));
681686 jobConfiguration .setQuery (jobConfigurationQuery );
682687
683688 // Trigger job execution
0 commit comments