@@ -58,7 +58,8 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig {
5858 private static final String WHERE = "WHERE" ;
5959 public static final Set <Schema .Type > SUPPORTED_CLUSTERING_TYPES =
6060 ImmutableSet .of (Schema .Type .INT , Schema .Type .LONG , Schema .Type .STRING , Schema .Type .BOOLEAN , Schema .Type .BYTES );
61- private static final Pattern FIELD_PATTERN = Pattern .compile ("[a-zA-Z0-9_]+" );
61+ // Read More : https://cloud.google.com/bigquery/docs/schemas#flexible-column-names
62+ private static final Pattern FIELD_PATTERN = Pattern .compile ("[\\ p{L}\\ p{M}\\ p{N}\\ p{Pc}\\ p{Pd}&%+=:'<>#| ]+" );
6263
6364 public static final String NAME_TABLE = "table" ;
6465 public static final String NAME_SCHEMA = "schema" ;
@@ -75,6 +76,8 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig {
7576 public static final String NAME_RANGE_INTERVAL = "rangeInterval" ;
7677
7778 public static final int MAX_NUMBER_OF_COLUMNS = 4 ;
79+ // As defined in https://cloud.google.com/bigquery/docs/schemas#column_names
80+ private static final int MAX_LENGTH_OF_COLUMN_NAME = 300 ;
7881
7982 @ Name (NAME_TABLE )
8083 @ Macro
@@ -345,9 +348,18 @@ public void validate(@Nullable Schema inputSchema, @Nullable Schema outputSchema
345348 String name = field .getName ();
346349 // BigQuery column names only allow alphanumeric characters and _
347350 // https://cloud.google.com/bigquery/docs/schemas#column_names
351+ // Allow support for Flexible column names
352+ // https://cloud.google.com/bigquery/docs/schemas#flexible-column-names
348353 if (!FIELD_PATTERN .matcher (name ).matches ()) {
349- collector .addFailure (String .format ("Output field '%s' must only contain alphanumeric characters and '_'." ,
350- name ), null ).withOutputSchemaField (name );
354+ collector .addFailure (String .format ("Output field '%s' contains invalid characters. " +
355+ "Check column names docs for more details." ,
356+ name ), null ).withOutputSchemaField (name );
357+ }
358+
359+ // Check if the field name exceeds the maximum length of 300 characters.
360+ if (name .length () > MAX_LENGTH_OF_COLUMN_NAME ) {
361+ collector .addFailure (String .format ("Output field '%s' exceeds the maximum length of 300 characters." ,
362+ name ), null ).withOutputSchemaField (name );
351363 }
352364
353365 // check if the required fields are present in the input schema.
0 commit comments