
Commit 9256513

Update to 0.15.1, update JSON schema to include oneOf for count options and bigquery and rabbitmq options, use Gradle 8.12, update memory settings
1 parent 3551e3f · commit 9256513

9 files changed: 232 additions & 25 deletions
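
The headline addition is a weighted `oneOf` form for per-field record counts. A minimal sketch of the syntax, taken from the task file added below; reading each entry as "<records-per-value>-><weight>" is an assumption based on the value->weight pattern this commit adds to the JSON schema:

    count:
      records: 1000000
      perField:
        fieldNames:
          - "customer_id"
        options:
          # assumed meaning: weight 0.6 of customers get 1 account record, 0.2 get 2, ...
          oneOf: ["1->0.6", "2->0.2", "3->0.1", "4->0.1", "5->0.1"]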


docker/data/custom/application.conf

Lines changed: 26 additions & 16 deletions
@@ -13,7 +13,7 @@ flags {
   enableFailOnError = ${?ENABLE_FAIL_ON_ERROR}
   enableUniqueCheck = true
   enableUniqueCheck = ${?ENABLE_UNIQUE_CHECK}
-  enableSinkMetadata = true
+  enableSinkMetadata = false
   enableSinkMetadata = ${?ENABLE_SINK_METADATA}
   enableSaveReports = true
   enableSaveReports = ${?ENABLE_SAVE_REPORTS}
@@ -23,6 +23,8 @@ flags {
   enableGenerateValidations = ${?ENABLE_GENERATE_VALIDATIONS}
   enableAlerts = false
   enableAlerts = ${?ENABLE_ALERTS}
+  enableUniqueCheckOnlyInBatch = false
+  enableUniqueCheckOnlyInBatch = ${?ENABLE_UNIQUE_CHECK_ONLY_IN_BATCH}
 }
 
 folders {
@@ -32,6 +34,8 @@ folders {
   planFilePath = ${?PLAN_FILE_PATH}
   taskFolderPath = "/opt/app/custom/task"
   taskFolderPath = ${?TASK_FOLDER_PATH}
+  validationFolderPath = "/opt/app/custom/validation/csv"
+  validationFolderPath = ${?VALIDATION_FOLDER_PATH}
   recordTrackingFolderPath = "/opt/app/custom/recordTracking"
   recordTrackingFolderPath = ${?RECORD_TRACKING_FOLDER_PATH}
   recordTrackingForValidationFolderPath = "/opt/app/custom/validation/recordTracking"
@@ -80,22 +84,28 @@ runtime {
   master = "local[*]"
   master = ${?DATA_CATERER_MASTER}
   config {
+    "spark.driver.memory" = "6g",
+    "spark.executor.memory" = "6g",
+    "spark.executor.memoryOverhead" = "512m",
+    "spark.memory.fraction" = "0.6",
+    "spark.memory.storageFraction" = "0.5",
+    "spark.memory.offHeap.size" = "1g",
+    "spark.sql.shuffle.partitions" = "10",
     "spark.sql.cbo.enabled": "true",
-    "spark.sql.adaptive.enabled": "true",
-    "spark.sql.cbo.planStats.enabled": "true",
-    "spark.sql.legacy.allowUntypedScalaUDF": "true",
-    "spark.sql.legacy.allowParameterlessCount": "true",
-    "spark.sql.statistics.histogram.enabled": "true",
-    "spark.sql.shuffle.partitions": "10",
-    "spark.sql.catalog.postgres": "",
-    "spark.sql.catalog.cassandra": "com.datastax.spark.connector.datasource.CassandraCatalog",
-    "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog",
-    "spark.sql.catalog.iceberg.type": "hadoop",
-    "spark.hadoop.fs.s3a.directory.marker.retention": "keep",
-    "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled": "true",
-    "spark.hadoop.fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem",
-    "spark.hadoop.fs.file.impl": "com.globalmentor.apache.hadoop.fs.BareLocalFileSystem",
-    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
+    "spark.sql.adaptive.enabled" = "true",
+    "spark.sql.cbo.planStats.enabled" = "true",
+    "spark.sql.legacy.allowUntypedScalaUDF" = "true",
+    "spark.sql.legacy.allowParameterlessCount" = "true",
+    "spark.sql.statistics.histogram.enabled" = "true",
+    "spark.sql.catalog.postgres" = "",
+    "spark.sql.catalog.cassandra" = "com.datastax.spark.connector.datasource.CassandraCatalog",
+    "spark.sql.catalog.iceberg" = "org.apache.iceberg.spark.SparkCatalog",
+    "spark.sql.catalog.iceberg.type" = "hadoop",
+    "spark.hadoop.fs.s3a.directory.marker.retention" = "keep",
+    "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled" = "true",
+    "spark.hadoop.fs.hdfs.impl" = "org.apache.hadoop.hdfs.DistributedFileSystem",
+    "spark.hadoop.fs.file.impl" = "com.globalmentor.apache.hadoop.fs.BareLocalFileSystem",
+    "spark.sql.extensions" = "io.delta.sql.DeltaSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
   }
 }

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+name: "csv_multiple_relationships_plan"
+tasks:
+  - name: "csv_customer_files"
+    dataSourceName: "csv"
Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
+name: "csv_customer_files"
+steps:
+  - name: "products"
+    type: "csv"
+    options:
+      path: "/opt/app/custom/csv/multi-relationship/products"
+      header: true
+      partitions: 1
+      saveMode: "Overwrite"
+    count:
+      records: 100
+    fields:
+      - name: "product_id"
+        options:
+          uuid: ""
+          incremental: 1
+      - name: "product_name"
+        options:
+          regex: "product_[0-9]{8}"
+  - name: "customers"
+    type: "csv"
+    options:
+      path: "/opt/app/custom/csv/multi-relationship/customers"
+      header: true
+      saveMode: "Overwrite"
+    count:
+      records: 1000000
+    fields:
+      - name: "customer_id"
+        options:
+          uuid: ""
+          incremental: 1000000
+      - name: "first_name"
+        options:
+          expression: "#{Name.firstName}"
+      - name: "last_name"
+        options:
+          expression: "#{Name.lastName}"
+  - name: "accounts"
+    type: "csv"
+    options:
+      path: "/opt/app/custom/csv/multi-relationship/accounts"
+      header: true
+      saveMode: "Overwrite"
+    count:
+      records: 1000000
+      perField:
+        fieldNames:
+          - "customer_id"
+        options:
+          oneOf: ["1->0.6", "2->0.2", "3->0.1", "4->0.1", "5->0.1"]
+    fields:
+      - name: "customer_id"
+        options:
+          uuid: ""
+          incremental: 1000000
+          isPrimaryKey: true
+      - name: "product_id_int"
+        type: "int"
+        options:
+          min: 1
+          max: 100
+          omit: true
+      - name: "product_id"
+        options:
+          uuid: "product_id_int"
+          isPrimaryKey: true
+      - name: "country_code"
+        options:
+          expression: "#{Address.countryCode}"
+  - name: "customer_access"
+    type: "csv"
+    options:
+      path: "/opt/app/custom/csv/multi-relationship/customer-access"
+      header: true
+      saveMode: "Overwrite"
+    count:
+      records: 1000000
+      perField:
+        fieldNames:
+          - "customer_product_id"
+        options:
+          min: 1
+          max: 5
+    fields:
+      - name: "customer_product_id"
+        options:
+          uuid: ""
+          incremental: 1000000
+          isPrimaryKey: true
+      - name: "product_id_int"
+        type: "int"
+        options:
+          min: 1
+          max: 100
+          omit: true
+      - name: "product_id"
+        options:
+          uuid: "product_id_int"
+          isPrimaryKey: true
+      - name: "party_id"
+        options:
+          uuid: ""
Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+---
+name: "csv_customer_checks"
+description: "Check customer related fields have gone through system correctly"
+dataSources:
+  csv:
+    - options:
+        path: "/opt/app/custom/csv/multi-relationship/products"
+      validations:
+        - aggType: count
+          aggExpr: count == 100
+    - options:
+        path: "/opt/app/custom/csv/multi-relationship/customers"
+      validations:
+        - aggType: count
+          aggExpr: count == 1000000
+    - options:
+        path: "/opt/app/custom/csv/multi-relationship/accounts"
+      validations:
+        - aggType: count
+          aggExpr: count > 2000000 and count < 3000000
+    - options:
+        path: "/opt/app/custom/csv/multi-relationship/customer-access"
+      validations:
+        - aggType: count
+          aggExpr: count > 2000000 and count < 3000000

docker/data/custom/validation/json/json-validation.yaml

Lines changed: 1 addition & 1 deletion
@@ -127,7 +127,7 @@ dataSources:
       negate: true
     - type: "quantileValuesBetween"
       quantileRanges:
-        "0.1":
+        0.1:
          - - 1.0
            - 10.0
       negate: true

gradle.properties

Lines changed: 1 addition & 1 deletion
@@ -8,5 +8,5 @@ version=0.1.0
 
 scalaVersion=2.12
 scalaSpecificVersion=2.12.19
-dataCatererVersion=0.15.0
+dataCatererVersion=0.15.1
 sparkMajorVersion=3.5

gradle/wrapper/gradle-wrapper.properties

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-8.8-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.12-bin.zip
 networkTimeout=10000
 validateDistributionUrl=true
 zipStoreBase=GRADLE_USER_HOME

run.sh

Lines changed: 0 additions & 2 deletions
@@ -64,8 +64,6 @@ DOCKER_CMD=(
   -e "APPLICATION_CONFIG_PATH=/opt/app/custom/application.conf"
   -e "$full_class_name"
   -e "DEPLOY_MODE=client"
-  -e "DRIVER_MEMORY=2g"
-  -e "EXECUTOR_MEMORY=2g"
   -e "DATA_CATERER_API_USER=$DATA_CATERER_API_USER"
   -e "DATA_CATERER_API_TOKEN=$DATA_CATERER_API_TOKEN"
   --network "insta-infra_default"

schema/data-caterer-latest.json

Lines changed: 71 additions & 4 deletions
@@ -125,7 +125,7 @@
         "type": {
           "type": "string",
           "description": "Type of data source.",
-          "enum": ["cassandra", "postgres", "mysql", "kafka", "solace", "csv", "delta", "json", "orc", "parquet", "iceberg", ""]
+          "enum": ["bigquery", "cassandra", "postgres", "mysql", "kafka", "solace", "csv", "delta", "json", "orc", "parquet", "iceberg", "rabbitmq"]
         },
         "options": {
           "type": "object",
@@ -220,6 +220,31 @@
          "required": ["options"]
        }
      },
+      {
+        "if": {
+          "properties": {
+            "type": {
+              "enum": ["bigquery"]
+            }
+          },
+          "required": ["type"]
+        },
+        "then": {
+          "properties": {
+            "options": {
+              "type": "object",
+              "properties": {
+                "table": {
+                  "type": "string",
+                  "description": "BigQuery table name. Follows format '<project>.<dataset>.<table>'."
+                }
+              },
+              "required": ["table"]
+            }
+          },
+          "required": ["options"]
+        }
+      },
      {
        "if": {
          "properties": {
@@ -359,7 +384,7 @@
        "if": {
          "properties": {
            "type": {
-              "enum": ["solace"]
+              "enum": ["solace", "rabbitmq"]
            }
          },
          "required": ["type"]
@@ -633,8 +658,36 @@
    "CountGeneratorOptions": {
      "type": "object",
      "propertyNames": {
-        "enum": ["sql", "min", "max", "stddev", "mean"]
-      }
+        "enum": ["sql", "min", "max", "stddev", "mean", "oneOf"]
+      },
+      "allOf": [
+        {
+          "if": {
+            "required": ["oneOf"]
+          },
+          "then": {
+            "properties": {
+              "oneOf": {
+                "oneOf": [
+                  {
+                    "type": "array",
+                    "items": {
+                      "type": "integer"
+                    }
+                  },
+                  {
+                    "type": "array",
+                    "items": {
+                      "type": "string",
+                      "pattern": "^0|[1-9]\\d*->(0|[1-9]\\d*|\\d+\\.\\d+)$"
+                    }
+                  }
+                ]
+              }
+            }
+          }
+        }
+      ]
    },
    "SchemaFieldDataTypes": {
      "enum": [
@@ -733,6 +786,11 @@
        "distributionRateParam": {
          "type": "number",
          "description": "If distribution is `exponential`, rate parameter to adjust exponential distribution."
+        },
+        "incremental": {
+          "type": "number",
+          "description": "Generate incremental numbers. By default, starts at 1. You can define a starting number.",
+          "default": 1
        }
      },
      "allOf": [
@@ -763,6 +821,15 @@
          "type": "string",
          "description": "Regex for generating values.",
          "examples": ["ACC[0-9]{8}"]
+        },
+        "uuid": {
+          "type": "string",
+          "description": "Generate UUID values. If non-empty, you can define another column name to generate UUID values from another column's value."
+        },
+        "incremental": {
+          "type": "number",
+          "description": "Generate incremental numbers. By default, starts at 1. You can define a starting number.",
+          "default": 1
        }
      },
      "allOf": [
