Skip to content

Commit 0846a1a

Browse files
authored
Merge pull request JanKaul#174 from JanKaul/fix/leaking-file-handle
create test for parquet files with empty input stream
2 parents fbb5f62 + d2a87c7 commit 0846a1a

1 file changed

Lines changed: 161 additions & 0 deletions

File tree

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
use std::sync::Arc;
2+
3+
use datafusion::{
4+
common::tree_node::{TransformedResult, TreeNode},
5+
execution::SessionStateBuilder,
6+
prelude::SessionContext,
7+
};
8+
use datafusion_expr::ScalarUDF;
9+
use datafusion_iceberg::{
10+
catalog::catalog_list::IcebergCatalogList,
11+
planner::{iceberg_transform, IcebergQueryPlanner, RefreshMaterializedView},
12+
};
13+
use futures::TryStreamExt;
14+
use iceberg_rust::object_store::{Bucket, ObjectStoreBuilder};
15+
use iceberg_sql_catalog::SqlCatalogList;
16+
use object_store::ObjectMeta;
17+
use testcontainers::{core::ExecCommand, runners::AsyncRunner, ImageExt};
18+
use testcontainers_modules::localstack::LocalStack;
19+
20+
#[tokio::test]
21+
pub async fn test_empty_insert() {
22+
let localstack = LocalStack::default()
23+
.with_env_var("SERVICES", "s3")
24+
.with_env_var("AWS_ACCESS_KEY_ID", "user")
25+
.with_env_var("AWS_SECRET_ACCESS_KEY", "password")
26+
.start()
27+
.await
28+
.unwrap();
29+
30+
let mut command = localstack
31+
.exec(ExecCommand::new(vec![
32+
"awslocal",
33+
"s3api",
34+
"create-bucket",
35+
"--bucket",
36+
"warehouse",
37+
]))
38+
.await
39+
.unwrap();
40+
41+
command.stdout_to_vec().await.unwrap();
42+
43+
let localstack_host = localstack.get_host().await.unwrap();
44+
let localstack_port = localstack.get_host_port_ipv4(4566).await.unwrap();
45+
46+
let object_store = ObjectStoreBuilder::s3()
47+
.with_config("aws_access_key_id".parse().unwrap(), "user")
48+
.with_config("aws_secret_access_key".parse().unwrap(), "password")
49+
.with_config(
50+
"endpoint".parse().unwrap(),
51+
format!("http://{}:{}", localstack_host, localstack_port),
52+
)
53+
.with_config("region".parse().unwrap(), "us-east-1")
54+
.with_config("allow_http".parse().unwrap(), "true");
55+
56+
let iceberg_catalog_list = Arc::new(
57+
SqlCatalogList::new("sqlite://", object_store.clone())
58+
.await
59+
.unwrap(),
60+
);
61+
62+
let catalog_list = {
63+
Arc::new(
64+
IcebergCatalogList::new(iceberg_catalog_list.clone())
65+
.await
66+
.unwrap(),
67+
)
68+
};
69+
70+
let state = SessionStateBuilder::new()
71+
.with_default_features()
72+
.with_catalog_list(catalog_list)
73+
.with_query_planner(Arc::new(IcebergQueryPlanner::new()))
74+
.build();
75+
76+
let ctx = SessionContext::new_with_state(state);
77+
78+
ctx.register_udf(ScalarUDF::from(RefreshMaterializedView::new(
79+
iceberg_catalog_list,
80+
)));
81+
82+
let sql = "CREATE EXTERNAL TABLE lineitem (
83+
L_ORDERKEY BIGINT NOT NULL,
84+
L_PARTKEY BIGINT NOT NULL,
85+
L_SUPPKEY BIGINT NOT NULL,
86+
L_LINENUMBER INT NOT NULL,
87+
L_QUANTITY DOUBLE NOT NULL,
88+
L_EXTENDED_PRICE DOUBLE NOT NULL,
89+
L_DISCOUNT DOUBLE NOT NULL,
90+
L_TAX DOUBLE NOT NULL,
91+
L_RETURNFLAG CHAR NOT NULL,
92+
L_LINESTATUS CHAR NOT NULL,
93+
L_SHIPDATE DATE NOT NULL,
94+
L_COMMITDATE DATE NOT NULL,
95+
L_RECEIPTDATE DATE NOT NULL,
96+
L_SHIPINSTRUCT VARCHAR NOT NULL,
97+
L_SHIPMODE VARCHAR NOT NULL,
98+
L_COMMENT VARCHAR NOT NULL ) STORED AS CSV LOCATION 'testdata/tpch/lineitem.csv' OPTIONS ('has_header' 'false');";
99+
100+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
101+
102+
let transformed = plan.transform(iceberg_transform).data().unwrap();
103+
104+
ctx.execute_logical_plan(transformed)
105+
.await
106+
.unwrap()
107+
.collect()
108+
.await
109+
.expect("Failed to execute query plan.");
110+
111+
let sql = "CREATE EXTERNAL TABLE warehouse.tpch.lineitem (
112+
L_ORDERKEY BIGINT NOT NULL,
113+
L_PARTKEY BIGINT NOT NULL,
114+
L_SUPPKEY BIGINT NOT NULL,
115+
L_LINENUMBER INT NOT NULL,
116+
L_QUANTITY DOUBLE NOT NULL,
117+
L_EXTENDED_PRICE DOUBLE NOT NULL,
118+
L_DISCOUNT DOUBLE NOT NULL,
119+
L_TAX DOUBLE NOT NULL,
120+
L_RETURNFLAG CHAR NOT NULL,
121+
L_LINESTATUS CHAR NOT NULL,
122+
L_SHIPDATE DATE NOT NULL,
123+
L_COMMITDATE DATE NOT NULL,
124+
L_RECEIPTDATE DATE NOT NULL,
125+
L_SHIPINSTRUCT VARCHAR NOT NULL,
126+
L_SHIPMODE VARCHAR NOT NULL,
127+
L_COMMENT VARCHAR NOT NULL ) STORED AS ICEBERG LOCATION 's3://warehouse/tpch/lineitem' PARTITIONED BY ( \"month(L_SHIPDATE)\" );";
128+
129+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
130+
131+
let transformed = plan.transform(iceberg_transform).data().unwrap();
132+
133+
ctx.execute_logical_plan(transformed)
134+
.await
135+
.unwrap()
136+
.collect()
137+
.await
138+
.expect("Failed to execute query plan.");
139+
140+
let sql = "insert into warehouse.tpch.lineitem select * from lineitem where l_quantity > 9999;";
141+
142+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
143+
144+
let transformed = plan.transform(iceberg_transform).data().unwrap();
145+
146+
ctx.execute_logical_plan(transformed)
147+
.await
148+
.unwrap()
149+
.collect()
150+
.await
151+
.expect("Failed to execute query plan.");
152+
153+
let object_store = object_store.build(Bucket::S3("warehouse")).unwrap();
154+
let datafiles: Vec<ObjectMeta> = object_store
155+
.list(Some(&"s3://warehouse/tpch/lineitem/data".into()))
156+
.try_collect()
157+
.await
158+
.unwrap();
159+
160+
assert!(datafiles.is_empty());
161+
}

0 commit comments

Comments
 (0)