From 2baf0bc920f51043df17c49294f5acfac7344c13 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Tue, 5 May 2026 16:28:53 +0200 Subject: [PATCH 1/2] refactor(cubestore): Introduce QueryResult enum for SqlService execute (#10817) --- .../cubestore-sql-tests/src/benches.rs | 16 +- rust/cubestore/cubestore-sql-tests/src/lib.rs | 9 +- .../cubestore-sql-tests/tests/cluster.rs | 2 + .../cubestore-sql-tests/tests/in_process.rs | 1 + .../cubestore-sql-tests/tests/migration.rs | 7 +- .../tests/multi_process.rs | 1 + rust/cubestore/cubestore/src/config/mod.rs | 28 +- rust/cubestore/cubestore/src/http/mod.rs | 14 +- rust/cubestore/cubestore/src/mysql/mod.rs | 4 + .../cubestore/src/remotefs/cleanup.rs | 26 +- .../cubestore/cubestore/src/sql/cachestore.rs | 28 +- rust/cubestore/cubestore/src/sql/mod.rs | 1287 ++++++++++------- .../cubestore/src/store/compaction.rs | 65 +- rust/cubestore/cubestore/src/store/mod.rs | 7 + rust/cubestore/cubestore/src/streaming/mod.rs | 203 ++- 15 files changed, 998 insertions(+), 700 deletions(-) diff --git a/rust/cubestore/cubestore-sql-tests/src/benches.rs b/rust/cubestore/cubestore-sql-tests/src/benches.rs index 90ed40ba3c902..25c81fdf3690c 100644 --- a/rust/cubestore/cubestore-sql-tests/src/benches.rs +++ b/rust/cubestore/cubestore-sql-tests/src/benches.rs @@ -67,6 +67,8 @@ impl Bench for SimpleBench { let r = services .sql_service .exec_query(state.query.as_str()) + .await? + .collect() .await?; let rows = to_rows(&r); assert_eq!(rows, vec![vec![TableValue::Int(23)]]); @@ -105,11 +107,13 @@ impl Bench for ParquetMetadataCacheBench { let _ = services .sql_service .exec_query("CREATE SCHEMA IF NOT EXISTS test") + .await? + .collect() .await?; let _ = services.sql_service .exec_query(format!("CREATE TABLE test.table (`repo` text, `email` text, `commit_count` int) WITH (input_format = 'csv') LOCATION '{}'", path_to_string(path)?).as_str()) - .await?; + .await?.collect().await?; // Wait for all pending (compaction) jobs to finish. wait_for_all_jobs(services).await?; @@ -137,6 +141,8 @@ impl Bench for ParquetMetadataCacheBench { ) .as_str(), ) + .await? + .collect() .await?; let rows = to_rows(&r); assert_eq!(rows, vec![vec![TableValue::Int(6)]]); @@ -157,6 +163,8 @@ impl Bench for CacheSetGetBench { services .sql_service .exec_query("CACHE SET TTL 600 'my_key' 'my_value'") + .await? + .collect() .await?; let state = Arc::new(()); @@ -171,6 +179,8 @@ impl Bench for CacheSetGetBench { let r = services .sql_service .exec_query("CACHE GET 'my_key'") + .await? + .collect() .await?; let rows = to_rows(&r); @@ -208,6 +218,8 @@ impl Bench for crate::benches::QueueListBench { i, "a".repeat(self.payload_size) )) + .await? + .collect() .await?; } @@ -223,6 +235,8 @@ impl Bench for crate::benches::QueueListBench { let r = services .sql_service .exec_query(r#"QUEUE PENDING "STANDALONE#queue""#) + .await? + .collect() .await?; assert_eq!( diff --git a/rust/cubestore/cubestore-sql-tests/src/lib.rs b/rust/cubestore/cubestore-sql-tests/src/lib.rs index 5e9ca55ad1184..97106943f431d 100644 --- a/rust/cubestore/cubestore-sql-tests/src/lib.rs +++ b/rust/cubestore/cubestore-sql-tests/src/lib.rs @@ -116,7 +116,12 @@ pub struct BasicSqlClient { #[async_trait] impl SqlClient for BasicSqlClient { async fn exec_query(&self, query: &str) -> Result, CubeError> { - self.service.as_ref().exec_query(query).await + self.service + .as_ref() + .exec_query(query) + .await? 
+            .collect()
+            .await
     }
 
     async fn exec_query_with_context(
         &self,
@@ -127,6 +132,8 @@ impl SqlClient for BasicSqlClient {
         self.service
             .as_ref()
             .exec_query_with_context(context, query)
+            .await?
+            .collect()
             .await
     }
 
diff --git a/rust/cubestore/cubestore-sql-tests/tests/cluster.rs b/rust/cubestore/cubestore-sql-tests/tests/cluster.rs
index f26b91201eb25..f9b062acf5014 100644
--- a/rust/cubestore/cubestore-sql-tests/tests/cluster.rs
+++ b/rust/cubestore/cubestore-sql-tests/tests/cluster.rs
@@ -84,6 +84,7 @@ impl MultiProcTest for ClusterSqlTest {
                 }))
                 .await
                 .unwrap();
+                Ok(())
             })
             .await;
     }
@@ -123,6 +124,7 @@ impl WorkerProc for WorkerFn {
             .start_test_worker(|_| async move {
                 init.signal().await;
                 done.wait_completion().await;
+                Ok(())
             })
             .await
     }
diff --git a/rust/cubestore/cubestore-sql-tests/tests/in_process.rs b/rust/cubestore/cubestore-sql-tests/tests/in_process.rs
index f50500886cbaf..46f4d8ef8a87b 100644
--- a/rust/cubestore/cubestore-sql-tests/tests/in_process.rs
+++ b/rust/cubestore/cubestore-sql-tests/tests/in_process.rs
@@ -22,6 +22,7 @@ fn main() {
                 }))
                 .await
                 .unwrap();
+                Ok(())
             }));
     });
 }
diff --git a/rust/cubestore/cubestore-sql-tests/tests/migration.rs b/rust/cubestore/cubestore-sql-tests/tests/migration.rs
index 090de7630c777..6bc812e40bf42 100644
--- a/rust/cubestore/cubestore-sql-tests/tests/migration.rs
+++ b/rust/cubestore/cubestore-sql-tests/tests/migration.rs
@@ -63,6 +63,7 @@ fn main() {
                 test_fn(Box::new(FilterWritesSqlClient::new(services.sql_service)))
                     .await
                     .unwrap();
+                Ok(())
             },
         ));
     });
@@ -147,7 +148,9 @@ impl FilterWritesSqlClient {
 impl SqlClient for FilterWritesSqlClient {
     async fn exec_query(&self, query: &str) -> Result<Arc<DataFrame>, CubeError> {
         match self.compute_filter_flag(query) {
-            FilterQueryResult::RunQuery => self.sql_service.exec_query(query).await,
+            FilterQueryResult::RunQuery => {
+                self.sql_service.exec_query(query).await?.collect().await
+            }
             FilterQueryResult::Hardcoded(result) => result,
             FilterQueryResult::UnrecognizedQueryType => unimplemented!(
                 "FilterWritesSqlClient does not support query prefix for '{}'",
@@ -164,6 +167,8 @@ impl SqlClient for FilterWritesSqlClient {
             FilterQueryResult::RunQuery => {
                 self.sql_service
                     .exec_query_with_context(context, query)
+                    .await?
+                    .collect()
                     .await
             }
             FilterQueryResult::Hardcoded(result) => result,
diff --git a/rust/cubestore/cubestore-sql-tests/tests/multi_process.rs b/rust/cubestore/cubestore-sql-tests/tests/multi_process.rs
index 688fdd8b7bc61..eb21aaf3eb3b0 100644
--- a/rust/cubestore/cubestore-sql-tests/tests/multi_process.rs
+++ b/rust/cubestore/cubestore-sql-tests/tests/multi_process.rs
@@ -30,6 +30,7 @@ fn main() {
                 }))
                 .await
                 .unwrap();
+                Ok(())
             }),
         );
     });
diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs
index f8d1f38959b1d..c0bba2edc7109 100644
--- a/rust/cubestore/cubestore/src/config/mod.rs
+++ b/rust/cubestore/cubestore/src/config/mod.rs
@@ -1804,7 +1804,7 @@ impl Config {
 
     pub async fn start_test<T>(&self, test_fn: impl FnOnce(CubeServices) -> T)
     where
-        T: Future + Send,
+        T: Future<Output = Result<(), CubeError>> + Send,
     {
         self.start_test_with_options::<_, T, _, _>(
             true,
@@ -1822,7 +1822,7 @@ impl Config {
 
     pub async fn start_migration_test<T>(&self, test_fn: impl FnOnce(CubeServices) -> T)
     where
-        T: Future + Send,
+        T: Future<Output = Result<(), CubeError>> + Send,
     {
         self.start_migration_test_with_options::<_, T, _, _>(
            Option::<
@@ -1839,7 +1839,7 @@ impl Config {
 
     pub async fn start_test_worker<T>(&self, test_fn: impl FnOnce(CubeServices) -> T)
     where
-        T: Future + Send,
+        T: Future<Output = Result<(), CubeError>> + Send,
     {
         self.start_test_with_options::<_, T, _, _>(
             false,
@@ -1861,7 +1861,7 @@ impl Config {
         test_fn: impl FnOnce(CubeServices) -> T2,
     ) where
         T1: Future + Send,
-        T2: Future + Send,
+        T2: Future<Output = Result<(), CubeError>> + Send,
     {
         self.start_test_with_options(true, Some(configure_injector), test_fn)
             .await
@@ -1874,7 +1874,7 @@ impl Config {
         test_fn: F,
     ) where
         T1: Future + Send,
-        T2: Future + Send,
+        T2: Future<Output = Result<(), CubeError>> + Send,
         I: FnOnce(Arc<Injector>) -> T1,
         F: FnOnce(CubeServices) -> T2,
     {
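+        // Illustrative only: with the new `Output = Result<(), CubeError>` bound,
+        // a test body can use `?` instead of `.unwrap()` chains. A hypothetical
+        // caller would look like:
+        //
+        //     Config::run_test("example", async move |services| {
+        //         let _ = services.sql_service.exec_query("CREATE SCHEMA foo").await?;
+        //         Ok(())
+        //     })
+        //     .await;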
@@ -1900,8 +1900,10 @@ impl Config {
 
         // Should be long enough even for CI.
         let timeout = Duration::from_secs(600);
-        if let Err(_) = timeout_at(Instant::now() + timeout, test_fn(services.clone())).await {
-            panic!("Test timed out after {} seconds", timeout.as_secs());
+        match timeout_at(Instant::now() + timeout, test_fn(services.clone())).await {
+            Err(_) => panic!("Test timed out after {} seconds", timeout.as_secs()),
+            Ok(Err(e)) => panic!("Test failed: {}", e.display_with_backtrace()),
+            Ok(Ok(())) => {}
         }
 
         services.stop_processing_loops().await.unwrap();
@@ -1924,7 +1926,7 @@ impl Config {
         test_fn: F,
     ) where
         T1: Future + Send,
-        T2: Future + Send,
+        T2: Future<Output = Result<(), CubeError>> + Send,
         I: FnOnce(Arc<Injector>) -> T1,
         F: FnOnce(CubeServices) -> T2,
     {
@@ -1943,8 +1945,10 @@ impl Config {
 
         // Should be long enough even for CI.
         let timeout = Duration::from_secs(600);
-        if let Err(_) = timeout_at(Instant::now() + timeout, test_fn(services.clone())).await {
-            panic!("Test timed out after {} seconds", timeout.as_secs());
+        match timeout_at(Instant::now() + timeout, test_fn(services.clone())).await {
+            Err(_) => panic!("Test timed out after {} seconds", timeout.as_secs()),
+            Ok(Err(e)) => panic!("Test failed: {}", e.display_with_backtrace()),
+            Ok(Ok(())) => {}
         }
 
         services.stop_processing_loops().await.unwrap();
@@ -1962,14 +1966,14 @@ impl Config {
 
    pub async fn run_test<T>(name: &str, test_fn: impl FnOnce(CubeServices) -> T)
    where
-        T: Future + Send,
+        T: Future<Output = Result<(), CubeError>> + Send,
    {
        Self::test(name).start_test(test_fn).await;
    }
 
    pub async fn run_migration_test<T>(name: &str, test_fn: impl FnOnce(CubeServices) -> T)
    where
-        T: Future + Send,
+        T: Future<Output = Result<(), CubeError>> + Send,
    {
        Self::migration_test(name)
            .start_migration_test(test_fn)
diff --git a/rust/cubestore/cubestore/src/http/mod.rs b/rust/cubestore/cubestore/src/http/mod.rs
index c081d5fba0f2e..38b49c6c6e79c 100644
--- a/rust/cubestore/cubestore/src/http/mod.rs
+++ b/rust/cubestore/cubestore/src/http/mod.rs
@@ -589,6 +589,8 @@ impl HttpServer {
                             .with_parameters(&parameters),
                         &query,
                     )
+                    .await?
+                    .collect()
                     .await?,
             }),
             x => Err(CubeError::user(format!("Unexpected command: {:?}", x))),
@@ -982,7 +984,9 @@ mod tests {
     use crate::http::{HttpCommand, HttpMessage, HttpServer};
     use crate::metastore::{Column, ColumnType};
     use crate::mysql::MockSqlAuthService;
-    use crate::sql::{timestamp_from_string, InlineTable, QueryPlans, SqlQueryContext, SqlService};
+    use crate::sql::{
+        timestamp_from_string, InlineTable, QueryPlans, QueryResult, SqlQueryContext, SqlService,
+    };
     use crate::store::DataFrame;
     use crate::table::{Row, TableValue};
     use crate::CubeError;
@@ -1137,7 +1141,7 @@ mod tests {
 
     #[async_trait]
     impl SqlService for SqlServiceMock {
-        async fn exec_query(&self, _query: &str) -> Result<Arc<DataFrame>, CubeError> {
+        async fn exec_query(&self, _query: &str) -> Result<QueryResult, CubeError> {
             todo!()
         }
 
@@ -1145,7 +1149,7 @@ mod tests {
             &self,
             _context: SqlQueryContext,
             query: &str,
-        ) -> Result<Arc<DataFrame>, CubeError> {
+        ) -> Result<QueryResult, CubeError> {
             tokio::time::sleep(Duration::from_secs(2)).await;
             let counter = self.message_counter.fetch_add(1, Ordering::Relaxed);
             if query == "close_connection" {
@@ -1153,10 +1157,10 @@ mod tests {
             } else if query == "error" {
                 Err(CubeError::internal("error".to_string()))
             } else {
-                Ok(Arc::new(DataFrame::new(
+                Ok(QueryResult::Frame(Arc::new(DataFrame::new(
                     vec![Column::new("foo".to_string(), ColumnType::String, 0)],
                     vec![Row::new(vec![TableValue::String(format!("{}", counter))])],
-                )))
+                ))))
             }
         }
 
diff --git a/rust/cubestore/cubestore/src/mysql/mod.rs b/rust/cubestore/cubestore/src/mysql/mod.rs
index 2b2e054208c73..16e500740191c 100644
--- a/rust/cubestore/cubestore/src/mysql/mod.rs
+++ b/rust/cubestore/cubestore/src/mysql/mod.rs
@@ -69,6 +69,10 @@ impl AsyncMysqlShim for Backend {
                 query,
             )
             .await;
+        let res = match res {
+            Ok(qr) => qr.collect().await,
+            Err(e) => Err(e),
+        };
         if let Err(e) = res {
             error!(
                 "Error during processing {}: {}",
diff --git a/rust/cubestore/cubestore/src/remotefs/cleanup.rs b/rust/cubestore/cubestore/src/remotefs/cleanup.rs
index 4b3d5746c46a5..f35f0cced2d07 100644
--- a/rust/cubestore/cubestore/src/remotefs/cleanup.rs
+++ b/rust/cubestore/cubestore/src/remotefs/cleanup.rs
@@ -315,18 +315,33 @@ mod test {
         let service = services.sql_service;
         let meta_store = services.meta_store;
         let remote_fs = services.injector.get_service_typed::<dyn RemoteFs>().await;
-        let _ = service.exec_query("CREATE SCHEMA test").await.unwrap();
test").await.unwrap(); + let _ = service + .exec_query("CREATE SCHEMA test") + .await + .unwrap() + .collect() + .await + .unwrap(); let _ = service .exec_query("CREATE TABLE test.tst (a int, b int)") .await + .unwrap() + .collect() + .await .unwrap(); let _ = service .exec_query("INSERT INTO test.tst (a, b) VALUES (10, 10), (20 , 20)") .await + .unwrap() + .collect() + .await .unwrap(); let _ = service .exec_query("INSERT INTO test.tst (a, b) VALUES (20, 20), (40 , 40)") .await + .unwrap() + .collect() + .await .unwrap(); let files = remove_root_paritition(meta_store.get_all_filenames().await.unwrap()); assert_eq!(files.len(), 2); @@ -361,7 +376,14 @@ mod test { let path = remote_fs.local_file("metastore".to_string()).await.unwrap(); assert!(Path::new(&path).exists()); - let _ = service.exec_query("SELECT * FROM test.tst").await.unwrap(); + let _ = service + .exec_query("SELECT * FROM test.tst") + .await + .unwrap() + .collect() + .await + .unwrap(); + Ok::<(), CubeError>(()) }) .await; } diff --git a/rust/cubestore/cubestore/src/sql/cachestore.rs b/rust/cubestore/cubestore/src/sql/cachestore.rs index 5b66b924f6095..af78d45e5aeb7 100644 --- a/rust/cubestore/cubestore/src/sql/cachestore.rs +++ b/rust/cubestore/cubestore/src/sql/cachestore.rs @@ -7,7 +7,7 @@ use crate::sql::parser::{ CacheCommand, CacheStoreCommand, CubeStoreParser, QueueCommand, Statement as CubeStoreStatement, SystemCommand, }; -use crate::sql::{QueryPlans, SqlQueryContext, SqlService}; +use crate::sql::{QueryPlans, QueryResult, SqlQueryContext, SqlService}; use crate::store::DataFrame; use crate::table::{Row, TableValue}; use crate::util::metrics; @@ -606,7 +606,7 @@ impl CacheStoreSqlService { #[async_trait] impl SqlService for CacheStoreSqlService { - async fn exec_query(&self, q: &str) -> Result, CubeError> { + async fn exec_query(&self, q: &str) -> Result { self.exec_query_with_context(SqlQueryContext::default(), q) .await } @@ -615,7 +615,7 @@ impl SqlService for CacheStoreSqlService { &self, mut ctx: SqlQueryContext, query: &str, - ) -> Result, CubeError> { + ) -> Result { let stmt = { let mut parser = CubeStoreParser::new(query, ctx.parameters.take())?; parser.parse_statement()? 
@@ -635,25 +635,25 @@ impl SqlService for CacheStoreSqlService {
                 match logical_plan {
                     QueryPlan::Meta(logical_plan) => {
                         app_metrics::META_QUERIES.increment();
-                        Ok(Arc::new(
+                        Ok(QueryResult::Frame(Arc::new(
                             self.query_planner.execute_meta_plan(logical_plan).await?,
-                        ))
+                        )))
                     }
                     _ => Err(CubeError::user(format!("Unsupported SQL: '{}'", query))),
                 }
             }
             CubeStoreStatement::System(command) => match command {
-                SystemCommand::CacheStore(command) => {
-                    self.exec_system_command_with_context(ctx, command).await
-                }
+                SystemCommand::CacheStore(command) => Ok(QueryResult::Frame(
+                    self.exec_system_command_with_context(ctx, command).await?,
+                )),
                 _ => Err(CubeError::user(format!("Unsupported SQL: '{}'", query))),
             },
-            CubeStoreStatement::Queue(command) => {
-                self.exec_queue_command_with_context(ctx, command).await
-            }
-            CubeStoreStatement::Cache(command) => {
-                self.exec_cache_command_with_context(ctx, command).await
-            }
+            CubeStoreStatement::Queue(command) => Ok(QueryResult::Frame(
+                self.exec_queue_command_with_context(ctx, command).await?,
+            )),
+            CubeStoreStatement::Cache(command) => Ok(QueryResult::Frame(
+                self.exec_cache_command_with_context(ctx, command).await?,
+            )),
             _ => Err(CubeError::user(format!("Unsupported SQL: '{}'", query))),
         }
     }
diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs
index 86e0c10514501..60f2f9889a75a 100644
--- a/rust/cubestore/cubestore/src/sql/mod.rs
+++ b/rust/cubestore/cubestore/src/sql/mod.rs
@@ -13,10 +13,14 @@ use chrono::format::Parsed;
 use chrono::{ParseResult, TimeZone, Utc};
 use datafusion::arrow::array::*;
 use datafusion::arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
+use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::cube_ext;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::sql::parser::Statement as DFStatement;
 use futures::future::join_all;
+use futures::stream::BoxStream;
+use futures::TryStreamExt;
 use hex::FromHex;
 use itertools::Itertools;
 use log::trace;
@@ -84,16 +88,83 @@ use mockall::automock;
 use table_creator::{convert_columns_type, TableCreator};
 pub use table_creator::{TableExtensionService, TableExtensionServiceImpl};
 
+pub enum QueryResult {
+    Frame(Arc<DataFrame>),
+    Stream {
+        schema: SchemaRef,
+        batches: BoxStream<'static, Result<RecordBatch, CubeError>>,
+    },
+}
+
+impl QueryResult {
+    pub fn schema(&self) -> SchemaRef {
+        match self {
+            QueryResult::Frame(df) => df.get_schema(),
+            QueryResult::Stream { schema, .. } => schema.clone(),
+        }
+    }
+
+    pub async fn collect(self) -> Result<Arc<DataFrame>, CubeError> {
+        match self {
+            QueryResult::Frame(df) => Ok(df),
+            QueryResult::Stream { batches, .. } => {
+                let acc: Vec<RecordBatch> = batches.try_collect().await?;
+
+                let df = cube_ext::spawn_blocking(move || -> Result<DataFrame, CubeError> {
+                    batches_to_dataframe(acc)
+                })
+                .await??;
+
+                Ok(Arc::new(df))
+            }
+        }
+    }
+}
+
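+// A minimal sketch of the intended consumption pattern (illustrative only;
+// `sql_service` stands for whatever `Arc<dyn SqlService>` the caller holds):
+//
+//     let result = sql_service.exec_query("SELECT 1").await?;
+//     let schema = result.schema();
+//     let frame: Arc<DataFrame> = result.collect().await?;
+//
+// `Frame` results pass through `collect()` unchanged, while `Stream` results
+// are drained and materialized into a single `DataFrame`.
+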
+impl std::fmt::Debug for QueryResult {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            QueryResult::Frame(df) => f.debug_tuple("Frame").field(df).finish(),
+            QueryResult::Stream { schema, .. } => f
+                .debug_struct("Stream")
+                .field("schema", schema)
+                .finish_non_exhaustive(),
+        }
+    }
+}
+
+impl PartialEq for QueryResult {
+    fn eq(&self, other: &Self) -> bool {
+        match (self, other) {
+            (QueryResult::Frame(a), QueryResult::Frame(b)) => a == b,
+            // Streaming results carry a live BoxStream and aren't comparable.
+            _ => false,
+        }
+    }
+}
+
+impl From<DataFrame> for QueryResult {
+    fn from(df: DataFrame) -> Self {
+        QueryResult::Frame(Arc::new(df))
+    }
+}
+
+impl From<Arc<DataFrame>> for QueryResult {
+    fn from(df: Arc<DataFrame>) -> Self {
+        QueryResult::Frame(df)
+    }
+}
+
 #[automock]
 #[async_trait]
 pub trait SqlService: DIService + Send + Sync {
-    async fn exec_query(&self, query: &str) -> Result<Arc<DataFrame>, CubeError>;
+    async fn exec_query(&self, query: &str) -> Result<QueryResult, CubeError>;
 
     async fn exec_query_with_context(
         &self,
         context: SqlQueryContext,
         query: &str,
-    ) -> Result<Arc<DataFrame>, CubeError>;
+    ) -> Result<QueryResult, CubeError>;
 
     /// Exposed only for tests. Worker plan created as if all partitions are on the same worker.
     async fn plan_query(&self, query: &str) -> Result<QueryPlans, CubeError>;
@@ -639,7 +710,7 @@ impl Dialect for MySqlDialectWithBackTicks {
 
 #[async_trait]
 impl SqlService for SqlServiceImpl {
-    async fn exec_query(&self, q: &str) -> Result<Arc<DataFrame>, CubeError> {
+    async fn exec_query(&self, q: &str) -> Result<QueryResult, CubeError> {
         self.exec_query_with_context(SqlQueryContext::default(), q)
             .await
     }
@@ -649,14 +720,14 @@ impl SqlService for SqlServiceImpl {
         &self,
         mut context: SqlQueryContext,
         query: &str,
-    ) -> Result<Arc<DataFrame>, CubeError> {
+    ) -> Result<QueryResult, CubeError> {
         if !query.to_lowercase().starts_with("insert")
             && !query.to_lowercase().contains("password")
         {
             trace!("Query: '{}'", query);
         }
         if let Some(data_frame) = SqlServiceImpl::handle_workbench_queries(query) {
-            return Ok(Arc::new(data_frame));
+            return Ok(data_frame.into());
         }
 
         let ast = {
@@ -673,33 +744,29 @@ impl SqlService for SqlServiceImpl {
                     )));
                 }
                 match variable[0].value.to_lowercase() {
-                    s if s == "schemas" => {
-                        Ok(Arc::new(DataFrame::from(self.db.get_schemas().await?)))
+                    s if s == "schemas" => Ok(DataFrame::from(self.db.get_schemas().await?).into()),
+                    s if s == "tables" => Ok(DataFrame::from(self.db.get_tables().await?).into()),
+                    s if s == "chunks" => {
+                        Ok(DataFrame::from(self.db.chunks_table().all_rows().await?).into())
+                    }
+                    s if s == "indexes" => {
+                        Ok(DataFrame::from(self.db.index_table().all_rows().await?).into())
                     }
-                    s if s == "tables" => {
-                        Ok(Arc::new(DataFrame::from(self.db.get_tables().await?)))
+                    s if s == "partitions" => {
+                        Ok(DataFrame::from(self.db.partition_table().all_rows().await?).into())
                     }
-                    s if s == "chunks" => Ok(Arc::new(DataFrame::from(
-                        self.db.chunks_table().all_rows().await?,
-                    ))),
-                    s if s == "indexes" => Ok(Arc::new(DataFrame::from(
-                        self.db.index_table().all_rows().await?,
-                    ))),
-                    s if s == "partitions" => Ok(Arc::new(DataFrame::from(
-                        self.db.partition_table().all_rows().await?,
-                    ))),
                     x => Err(CubeError::user(format!("Unknown SHOW: {}", x))),
                 }
             }
             CubeStoreStatement::System(command) => match command {
                 SystemCommand::KillAllJobs => {
                     self.db.delete_all_jobs().await?;
-                    Ok(Arc::new(DataFrame::new(vec![], vec![])))
+                    Ok(DataFrame::empty().into())
                 }
                 SystemCommand::Repartition { partition_id } => {
                     let partition = self.db.get_partition(partition_id).await?;
                     self.cluster.schedule_repartition(&partition).await?;
-                    Ok(Arc::new(DataFrame::new(vec![], vec![])))
+                    Ok(DataFrame::empty().into())
                 }
                 SystemCommand::PanicWorker => {
                     let cluster = self.cluster.clone();
@@ -741,36 +808,36 @@ impl SqlService for SqlServiceImpl {
 
                     DropCommand::DropQueryCache => {
                         self.cache.clear().await;
-                        Ok(Arc::new(DataFrame::new(vec![], vec![])))
+                        Ok(DataFrame::empty().into())
                    }
                    DropCommand::DropAllCache => {
                        self.cache.clear().await;
-                        Ok(Arc::new(DataFrame::new(vec![], vec![])))
+                        Ok(DataFrame::empty().into())
                    }
                },
                SystemCommand::MetaStore(command) => match command {
                    MetaStoreCommand::SetCurrent { id } => {
                        self.db.set_current_snapshot(id).await?;
-                        Ok(Arc::new(DataFrame::new(vec![], vec![])))
+                        Ok(DataFrame::empty().into())
                    }
                    MetaStoreCommand::Compaction => {
                        self.db.compaction().await?;
-                        Ok(Arc::new(DataFrame::new(vec![], vec![])))
+                        Ok(DataFrame::empty().into())
                    }
                    MetaStoreCommand::Healthcheck => {
                        self.db.healthcheck().await?;
-                        Ok(Arc::new(DataFrame::new(vec![], vec![])))
+                        Ok(DataFrame::empty().into())
                    }
                },
-                SystemCommand::CacheStore(command) => {
-                    self.cachestore
-                        .exec_system_command_with_context(context, command)
-                        .await
-                }
+                SystemCommand::CacheStore(command) => Ok(self
+                    .cachestore
+                    .exec_system_command_with_context(context, command)
+                    .await?
+                    .into()),
            },
            CubeStoreStatement::Statement(Statement::SetVariable { .. }) => {
-                Ok(Arc::new(DataFrame::new(vec![], vec![])))
+                Ok(DataFrame::empty().into())
            }
            CubeStoreStatement::CreateSchema {
                schema_name,
@@ -783,7 +850,7 @@ impl SqlService for SqlServiceImpl {
                let name = normalize_for_schema_table_or_index_name(&schema_name.0[0]);
 
                let res = self.create_schema(name, if_not_exists).await?;
-                Ok(Arc::new(DataFrame::from(vec![res])))
+                Ok(DataFrame::from(vec![res]).into())
            }
            CubeStoreStatement::CreateTable {
                create_table:
@@ -982,7 +1049,7 @@ impl SqlService for SqlServiceImpl {
                    &context.trace_obj,
                )
                .await?;
-                Ok(Arc::new(DataFrame::from(vec![res])))
+                Ok(DataFrame::from(vec![res]).into())
            }
            CubeStoreStatement::Statement(Statement::CreateIndex(CreateIndex {
                name,
@@ -1027,7 +1094,7 @@ impl SqlService for SqlServiceImpl {
                        .collect::<Result<Vec<_>, _>>()?,
                )
                .await?;
-                Ok(Arc::new(DataFrame::from(vec![res])))
+                Ok(DataFrame::from(vec![res]).into())
            }
            CubeStoreStatement::CreateSource {
                name,
@@ -1074,7 +1141,7 @@ impl SqlService for SqlServiceImpl {
                        .db
                        .create_or_update_source(normalize_for_source_name(&name), creds?)
                        .await?;
-                    Ok(Arc::new(DataFrame::from(vec![source])))
+                    Ok(DataFrame::from(vec![source]).into())
                } else {
                    Err(CubeError::user(
                        "CREATE SOURCE OR UPDATE should be used instead".to_string(),
@@ -1110,7 +1177,7 @@ impl SqlService for SqlServiceImpl {
                        if_not_exists,
                    )
                    .await?;
-                Ok(Arc::new(DataFrame::from(vec![res])))
+                Ok(DataFrame::from(vec![res]).into())
            }
            CubeStoreStatement::Statement(Statement::Drop {
                object_type, names, ..
@@ -1140,7 +1207,7 @@ impl SqlService for SqlServiceImpl {
                app_metrics::DATA_QUERIES
                    .add_with_tags(1, Some(&vec![metrics::format_tag("command", command)]));
 
-                Ok(Arc::new(DataFrame::new(vec![], vec![])))
+                Ok(DataFrame::empty().into())
            }
            CubeStoreStatement::Statement(Statement::Insert(Insert {
                table,
@@ -1180,18 +1247,18 @@ impl SqlService for SqlServiceImpl {
                self.insert_data(schema_name.clone(), table_name.clone(), &columns, data)
                    .await?;
-                Ok(Arc::new(DataFrame::new(vec![], vec![])))
-            }
-            CubeStoreStatement::Queue(command) => {
-                self.cachestore
-                    .exec_queue_command_with_context(context, command)
-                    .await
-            }
-            CubeStoreStatement::Cache(command) => {
-                self.cachestore
-                    .exec_cache_command_with_context(context, command)
-                    .await
+                Ok(DataFrame::empty().into())
            }
+            CubeStoreStatement::Queue(command) => Ok(self
+                .cachestore
+                .exec_queue_command_with_context(context, command)
+                .await?
+                .into()),
+            CubeStoreStatement::Cache(command) => Ok(self
+                .cachestore
+                .exec_cache_command_with_context(context, command)
+                .await?
+ .into()), CubeStoreStatement::Statement(Statement::Query(q)) => { let logical_plan_time_start = SystemTime::now(); let logical_plan = self @@ -1207,7 +1274,7 @@ impl SqlService for SqlServiceImpl { .report(logical_plan_time_start.elapsed()?.as_micros() as i64); // TODO distribute and combine - let res = match logical_plan { + let res: Arc = match logical_plan { QueryPlan::Meta(logical_plan) => { app_metrics::META_QUERIES.increment(); Arc::new(self.query_planner.execute_meta_plan(logical_plan).await?) @@ -1254,7 +1321,7 @@ impl SqlService for SqlServiceImpl { .await?? } }; - Ok(res) + Ok(res.into()) } CubeStoreStatement::Statement(Statement::Explain { analyze, @@ -1262,14 +1329,17 @@ impl SqlService for SqlServiceImpl { statement, .. }) => match *statement { - Statement::Query(q) => self.explain(Statement::Query(q.clone()), analyze).await, + Statement::Query(q) => Ok(self + .explain(Statement::Query(q.clone()), analyze) + .await? + .into()), _ => Err(CubeError::user(format!( "Unsupported explain request: '{}'", query ))), }, - CubeStoreStatement::Dump(q) => self.dump_select_inputs(query, q).await, + CubeStoreStatement::Dump(q) => Ok(self.dump_select_inputs(query, q).await?.into()), _ => Err(CubeError::user(format!("Unsupported SQL: '{}'", query))), } @@ -1830,7 +1900,7 @@ mod tests { use regex::Regex; #[tokio::test] - async fn create_schema_test() { + async fn create_schema_test() -> Result<(), CubeError> { let config = Config::test("create_schema_test"); let path = "/tmp/test_create_schema"; @@ -1891,7 +1961,11 @@ mod tests { )), BasicProcessRateLimiter::new(), ); - let i = service.exec_query("CREATE SCHEMA foo").await.unwrap(); + let i = service + .exec_query("CREATE SCHEMA foo") + .await? + .collect() + .await?; assert_eq!( i.get_rows()[0], Row::new(vec![ @@ -1906,10 +1980,11 @@ mod tests { let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); + Ok(()) } #[tokio::test] - async fn create_table_test() { + async fn create_table_test() -> Result<(), CubeError> { let config = Config::test("create_table_test"); let path = "/tmp/test_create_table"; @@ -1970,7 +2045,11 @@ mod tests { )), BasicProcessRateLimiter::new(), ); - let i = service.exec_query("CREATE SCHEMA `Foo`").await.unwrap(); + let i = service + .exec_query("CREATE SCHEMA `Foo`") + .await? + .collect() + .await?; assert_eq!( i.get_rows()[0], Row::new(vec![ @@ -1985,7 +2064,11 @@ mod tests { `Address` varchar(255), `City` varchar(255) );"; - let i = service.exec_query(&query.to_string()).await.unwrap(); + let i = service + .exec_query(&query.to_string()) + .await? 
+ .collect() + .await?; assert_eq!(i.get_rows()[0], Row::new(vec![ TableValue::Int(1), TableValue::String("Persons".to_string()), @@ -1995,7 +2078,7 @@ mod tests { TableValue::String("NULL".to_string()), TableValue::String("false".to_string()), TableValue::String("true".to_string()), - TableValue::String(meta_store.get_table("Foo".to_string(), "Persons".to_string()).await.unwrap().get_row().created_at().as_ref().unwrap().to_string()), + TableValue::String(meta_store.get_table("Foo".to_string(), "Persons".to_string()).await?.get_row().created_at().as_ref().unwrap().to_string()), TableValue::String("NULL".to_string()), TableValue::String("NULL".to_string()), TableValue::String("false".to_string()), @@ -2016,10 +2099,11 @@ mod tests { let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); + Ok(()) } #[tokio::test] - async fn create_table_test_seal_at() { + async fn create_table_test_seal_at() -> Result<(), CubeError> { let config = Config::test("create_table_test_seal_at"); let path = "/tmp/test_create_table_seal_at"; @@ -2080,7 +2164,11 @@ mod tests { )), BasicProcessRateLimiter::new(), ); - let i = service.exec_query("CREATE SCHEMA `Foo`").await.unwrap(); + let i = service + .exec_query("CREATE SCHEMA `Foo`") + .await? + .collect() + .await?; assert_eq!( i.get_rows()[0], Row::new(vec![ @@ -2095,7 +2183,11 @@ mod tests { `Address` varchar(255), `City` varchar(255) ) WITH (seal_at='2022-10-05T01:00:00.000Z', select_statement='SELECT * FROM test WHERE created_at > ''2022-05-01 00:00:00''');"; - let i = service.exec_query(&query.to_string()).await.unwrap(); + let i = service + .exec_query(&query.to_string()) + .await? + .collect() + .await?; assert_eq!(i.get_rows()[0], Row::new(vec![ TableValue::Int(1), TableValue::String("Persons".to_string()), @@ -2105,7 +2197,7 @@ mod tests { TableValue::String("NULL".to_string()), TableValue::String("false".to_string()), TableValue::String("true".to_string()), - TableValue::String(meta_store.get_table("Foo".to_string(), "Persons".to_string()).await.unwrap().get_row().created_at().as_ref().unwrap().to_string()), + TableValue::String(meta_store.get_table("Foo".to_string(), "Persons".to_string()).await?.get_row().created_at().as_ref().unwrap().to_string()), TableValue::String("NULL".to_string()), TableValue::String("2022-10-05 01:00:00 UTC".to_string()), TableValue::String("false".to_string()), @@ -2126,6 +2218,7 @@ mod tests { let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); + Ok(()) } //#[derive(Debug)] @@ -2195,7 +2288,7 @@ mod tests { impl ExtendedRemoteFs for FailingRemoteFs {} #[tokio::test] - async fn create_table_if_not_exists() { + async fn create_table_if_not_exists() -> Result<(), CubeError> { Config::test("create_table_if_not_exists").start_with_injector_override(async move |injector| { injector.register_typed::(async move |injector| { Arc::new(FailingRemoteFs( @@ -2206,11 +2299,11 @@ mod tests { }, async move |services| { let service = services.sql_service; - let _ = service.exec_query("CREATE SCHEMA foo").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA foo").await?.collect().await?; let created_table = service .exec_query("CREATE TABLE foo.values (id int, dec_value decimal, dec_value_1 decimal(18, 2))") - .await.unwrap(); + .await?.collect().await?; let res = service .exec_query("CREATE TABLE foo.values (id int, dec_value decimal, dec_value_1 decimal(18, 2))") .await; @@ -2218,14 +2311,16 @@ mod tests { let res = service 
.exec_query("CREATE TABLE IF NOT EXISTS foo.values (id int, dec_value decimal, dec_value_1 decimal(18, 2))") .await; - assert_eq!(res.unwrap(), created_table); + assert_eq!(res.unwrap().collect().await?, created_table); + Ok::<(), CubeError>(()) }) .await; + Ok(()) } #[tokio::test] - async fn failed_upload_drop() { + async fn failed_upload_drop() -> Result<(), CubeError> { Config::test("failed_upload_drop").start_with_injector_override(async move |injector| { injector.register_typed::(async move |injector| { Arc::new(FailingRemoteFs( @@ -2236,12 +2331,11 @@ mod tests { }, async move |services| { let service = services.sql_service; - let _ = service.exec_query("CREATE SCHEMA foo").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA foo").await?.collect().await?; let _ = service .exec_query("CREATE TABLE foo.values (id int, dec_value decimal, dec_value_1 decimal(18, 2))") - .await - .unwrap(); + .await?.collect().await?; let res = service .exec_query("INSERT INTO foo.values (id, dec_value, dec_value_1) VALUES (1, -153, 1), (2, 20.01, 3.5), (3, 20.30, 12.3), (4, 120.30, 43.12), (5, NULL, NULL), (6, NULL, NULL), (7, NULL, NULL), (NULL, NULL, NULL)") @@ -2251,51 +2345,48 @@ mod tests { let remote_fs = services.injector.get_service_typed::().await; - let temp_upload = remote_fs.temp_upload_path("".to_string()).await.unwrap(); + let temp_upload = remote_fs.temp_upload_path("".to_string()).await?; let res = fs::read_dir(temp_upload.clone()).unwrap(); assert!(res.into_iter().next().is_none(), "Expected empty uploads directory but found: {:?}", fs::read_dir(temp_upload).unwrap().into_iter().map(|e| e.unwrap().path().to_string_lossy().to_string()).collect::>()); + Ok::<(), CubeError>(()) }) .await; + Ok(()) } #[tokio::test] - async fn decimal() { + async fn decimal() -> Result<(), CubeError> { Config::test("decimal").update_config(|mut c| { c.partition_split_threshold = 2; c }).start_test(async move |services| { let service = services.sql_service; - let _ = service.exec_query("CREATE SCHEMA foo").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA foo").await?.collect().await?; let _ = service .exec_query("CREATE TABLE foo.values (id int, dec_value decimal, dec_value_1 decimal(18, 2))") - .await - .unwrap(); + .await?.collect().await?; service .exec_query("INSERT INTO foo.values (id, dec_value, dec_value_1) VALUES (1, -153, 1), (2, 20.01, 3.5), (3, 20.30, 12.3), (4, 120.30, 43.12), (5, NULL, NULL), (6, NULL, NULL), (7, NULL, NULL), (NULL, NULL, NULL)") - .await - .unwrap(); + .await?.collect().await?; let result = service .exec_query("SELECT sum(dec_value), sum(dec_value_1) from foo.values") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Decimal(Decimal::new(761000)), TableValue::Decimal(Decimal::new(5992))])); let result = service .exec_query("SELECT sum(dec_value), sum(dec_value_1) from foo.values where dec_value > 10") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Decimal(Decimal::new(16061000)), TableValue::Decimal(Decimal::new(5892))])); let result = service .exec_query("SELECT sum(dec_value), sum(dec_value_1) / 10 from foo.values where dec_value > 10") - .await - .unwrap(); + .await?.collect().await?; // For this test's purposes there is no a priori reason to expect (precision, scale) = // (32, 6) -- DF decided that on its own initiative. 
@@ -2305,45 +2396,50 @@ mod tests { let result = service .exec_query("SELECT sum(dec_value), sum(dec_value_1) / 10 from foo.values where dec_value_1 < 10") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_schema().field(1).data_type(), &datafusion::arrow::datatypes::DataType::Decimal128(32, EXPECTED_SCALE)); assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Decimal(Decimal::new(-13299000)), TableValue::Decimal(Decimal::new(450 * 10i128.pow((EXPECTED_SCALE - 3) as u32)))])); let result = service .exec_query("SELECT sum(dec_value), sum(dec_value_1) / 10 from foo.values where dec_value_1 < decimal '10'") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_schema().field(1).data_type(), &datafusion::arrow::datatypes::DataType::Decimal128(32, EXPECTED_SCALE)); assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Decimal(Decimal::new(-13299000)), TableValue::Decimal(Decimal::new(450 * 10i128.pow((EXPECTED_SCALE - 3) as u32)))])); + Ok::<(), CubeError>(()) }) .await; + Ok(()) } /// Runs int96 test with write operations, or runs read-only on an existing store. - async fn int96_helper(services: CubeServices, perform_writes: bool) { + async fn int96_helper(services: CubeServices, perform_writes: bool) -> Result<(), CubeError> { let service = services.sql_service; if perform_writes { - let _ = service.exec_query("CREATE SCHEMA foo").await.unwrap(); + let _ = service + .exec_query("CREATE SCHEMA foo") + .await? + .collect() + .await?; let _ = service .exec_query("CREATE TABLE foo.values (id int, value int96)") - .await - .unwrap(); + .await? + .collect() + .await?; service .exec_query("INSERT INTO foo.values (id, value) VALUES (1, 10000000000000000000000), (2, 20000000000000000000000), (3, 10000000000000220000000), (4, 12000000000000000000024), (5, 123)") - .await - .unwrap(); + .await?.collect().await?; } let result = service .exec_query("SELECT * from foo.values") - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!( result.get_rows()[0], @@ -2383,8 +2479,9 @@ mod tests { let result = service .exec_query("SELECT sum(value) from foo.values") - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!( result.get_rows()[0], @@ -2395,8 +2492,9 @@ mod tests { let result = service .exec_query("SELECT max(value), min(value) from foo.values") - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!( result.get_rows()[0], @@ -2408,8 +2506,7 @@ mod tests { let result = service .exec_query("SELECT value + 103, value + value, value = CAST('12000000000000000000024' AS DECIMAL(38, 0)) from foo.values where value = CAST('12000000000000000000024' AS DECIMAL(38, 0))") - .await - .unwrap(); + .await?.collect().await?; assert_eq!( result.get_rows()[0], @@ -2424,8 +2521,9 @@ mod tests { .exec_query( "SELECT value / 2, value * 2 from foo.values where value > 12000000000000000000024", ) - .await - .unwrap(); + .await? + .collect() + .await?; // This value 4 just describes DataFusion behavior with Decimal. const EXPECTED_SCALE: i8 = 4; @@ -2449,8 +2547,9 @@ mod tests { let result = service .exec_query("SELECT * from foo.values order by value") - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!( result.get_rows()[0], @@ -2491,19 +2590,20 @@ mod tests { if perform_writes { let _ = service .exec_query("CREATE TABLE foo.values2 (id int, value int96)") - .await - .unwrap(); + .await? 
+ .collect() + .await?; service .exec_query("INSERT INTO foo.values2 (id, value) VALUES (1, 10000000000000000000000), (2, 20000000000000000000000), (3, 10000000000000000000000), (4, 20000000000000000000000), (5, 123)") - .await - .unwrap(); + .await?.collect().await?; } let result = service .exec_query("SELECT value, count(*) from foo.values2 group by value order by value") - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!( result.get_rows()[0], @@ -2530,19 +2630,20 @@ mod tests { if perform_writes { let _ = service .exec_query("CREATE TABLE foo.values3 (id int, value int96)") - .await - .unwrap(); + .await? + .collect() + .await?; service .exec_query("INSERT INTO foo.values3 (id, value) VALUES (1, -10000000000000000000000), (2, -20000000000000000000000), (3, -10000000000000220000000), (4, -12000000000000000000024), (5, -123)") - .await - .unwrap(); + .await?.collect().await?; } let result = service .exec_query("SELECT * from foo.values3") - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!( result.get_rows()[0], @@ -2579,10 +2680,11 @@ mod tests { TableValue::Decimal(Decimal::new(-123)) ]) ); + Ok(()) } #[tokio::test] - async fn int96() { + async fn int96() -> Result<(), CubeError> { Config::test("int96") .update_config(|mut c| { c.partition_split_threshold = 2; @@ -2590,10 +2692,11 @@ mod tests { }) .start_test(async move |services| int96_helper(services, true).await) .await; + Ok(()) } #[tokio::test] - async fn int96_read() { + async fn int96_read() -> Result<(), CubeError> { // Copy pre-DF store. let fixtures_path = env::current_dir() .unwrap() @@ -2608,29 +2711,38 @@ mod tests { }) .start_migration_test(async move |services| int96_helper(services, false).await) .await; + Ok(()) } - async fn decimal96_helper(services: CubeServices, perform_writes: bool) { + async fn decimal96_helper( + services: CubeServices, + perform_writes: bool, + ) -> Result<(), CubeError> { let service: Arc = services.sql_service; if perform_writes { - let _ = service.exec_query("CREATE SCHEMA foo").await.unwrap(); + let _ = service + .exec_query("CREATE SCHEMA foo") + .await? + .collect() + .await?; let _ = service .exec_query("CREATE TABLE foo.values (id int, value decimal96)") - .await - .unwrap(); + .await? + .collect() + .await?; service .exec_query("INSERT INTO foo.values (id, value) VALUES (1, 100000000000000000000.10), (2, 200000000000000000000), (3, 100000000000002200000.01), (4, 120000000000000000.10024), (5, 1.23)") - .await - .unwrap(); + .await?.collect().await?; } let result = service .exec_query("SELECT * from foo.values") - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!( result.get_schema().field(1).data_type(), @@ -2675,8 +2787,9 @@ mod tests { let result = service .exec_query("SELECT sum(value) from foo.values") - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!( result.get_rows()[0], @@ -2687,8 +2800,9 @@ mod tests { let result = service .exec_query("SELECT max(value), min(value) from foo.values") - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!( result.get_rows()[0], @@ -2700,8 +2814,7 @@ mod tests { let result = service .exec_query("SELECT value + CAST('10.103' AS DECIMAL(27, 5)), value + value from foo.values where id = 4") - .await - .unwrap(); + .await?.collect().await?; // 27, 5 comes from Cube's convert_columns_type. Precision = 28 here comes from DataFusion behavior. 
assert_eq!( @@ -2724,8 +2837,9 @@ mod tests { .exec_query( "SELECT value / 2, value * 2 from foo.values where value > 100000000000002200000", ) - .await - .unwrap(); + .await? + .collect() + .await?; // 31, 9, and 38, 5 simply describes the DF behavior we see (starting from value being a // decimal(27, 5)). Prior to DF upgrade, this returned a Float. @@ -2747,8 +2861,9 @@ mod tests { let result = service .exec_query("SELECT * from foo.values order by value") - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!( result.get_rows()[0], @@ -2789,19 +2904,20 @@ mod tests { if perform_writes { let _ = service .exec_query("CREATE TABLE foo.values2 (id int, value decimal(27, 2))") - .await - .unwrap(); + .await? + .collect() + .await?; service .exec_query("INSERT INTO foo.values2 (id, value) VALUES (1, 100000000000000000000.10), (2, 20000000000000000000000.1), (3, 100000000000000000000.10), (4, 20000000000000000000000.1), (5, 123)") - .await - .unwrap(); + .await?.collect().await?; } let result = service .exec_query("SELECT value, count(*) from foo.values2 group by value order by value") - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!( result.get_rows()[0], @@ -2828,19 +2944,20 @@ mod tests { if perform_writes { let _ = service .exec_query("CREATE TABLE foo.values3 (id int, value decimal96)") - .await - .unwrap(); + .await? + .collect() + .await?; service .exec_query("INSERT INTO foo.values3 (id, value) VALUES (1, -100000000000000000000.10), (2, -200000000000000000000), (3, -100000000000002200000.01), (4, -120000000000000000.10024), (5, -1.23)") - .await - .unwrap(); + .await?.collect().await?; } let result = service .exec_query("SELECT * from foo.values3") - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!( result.get_rows()[0], @@ -2877,10 +2994,11 @@ mod tests { TableValue::Decimal(Decimal::new(-123000)) ]) ); + Ok(()) } #[tokio::test] - async fn decimal96() { + async fn decimal96() -> Result<(), CubeError> { Config::test("decimal96") .update_config(|mut c| { c.partition_split_threshold = 2; @@ -2888,10 +3006,11 @@ mod tests { }) .start_test(async move |services| decimal96_helper(services, true).await) .await; + Ok(()) } #[tokio::test] - async fn decimal96_read() { + async fn decimal96_read() -> Result<(), CubeError> { // Copy pre-DF store. 
let fixtures_path = env::current_dir() .unwrap() @@ -2906,10 +3025,11 @@ mod tests { }) .start_migration_test(async move |services| decimal96_helper(services, false).await) .await; + Ok(()) } #[tokio::test] - async fn over_2k_booleans() { + async fn over_2k_booleans() -> Result<(), CubeError> { Config::test("over_2k_booleans").update_config(|mut c| { c.partition_split_threshold = 1000000; c.compaction_chunks_count_threshold = 0; @@ -2917,9 +3037,9 @@ mod tests { }).start_test(async move |services| { let service = services.sql_service; - let _ = service.exec_query("CREATE SCHEMA foo").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA foo").await?.collect().await?; - let _ = service.exec_query("CREATE TABLE foo.bool_group (bool_value boolean)").await.unwrap(); + let _ = service.exec_query("CREATE TABLE foo.bool_group (bool_value boolean)").await?.collect().await?; for batch in 0..25 { let mut bools = Vec::new(); @@ -2931,47 +3051,49 @@ mod tests { let values = bools.into_iter().map(|b| format!("({})", b)).join(", "); service.exec_query( &format!("INSERT INTO foo.bool_group (bool_value) VALUES {}", values) - ).await.unwrap(); + ).await?.collect().await?; } - let result = service.exec_query("SELECT count(*) from foo.bool_group").await.unwrap(); + let result = service.exec_query("SELECT count(*) from foo.bool_group").await?.collect().await?; assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Int(25000)])); - let result = service.exec_query("SELECT count(*) from foo.bool_group where bool_value = true").await.unwrap(); + let result = service.exec_query("SELECT count(*) from foo.bool_group where bool_value = true").await?.collect().await?; assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Int(3823)])); - let result = service.exec_query("SELECT g.bool_value, count(*) from foo.bool_group g GROUP BY 1 ORDER BY 2 DESC").await.unwrap(); + let result = service.exec_query("SELECT g.bool_value, count(*) from foo.bool_group g GROUP BY 1 ORDER BY 2 DESC").await?.collect().await?; assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Boolean(false), TableValue::Int(21177)])); assert_eq!(result.get_rows()[1], Row::new(vec![TableValue::Boolean(true), TableValue::Int(3823)])); + Ok::<(), CubeError>(()) }).await; + Ok(()) } #[tokio::test] - async fn flatten_union() { + async fn flatten_union() -> Result<(), CubeError> { Config::test("flatten_union").start_test(async move |services| { let service = services.sql_service; - let _ = service.exec_query("CREATE SCHEMA foo").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA foo").await?.collect().await?; - let _ = service.exec_query("CREATE TABLE foo.a (a int, b int, c int)").await.unwrap(); - let _ = service.exec_query("CREATE TABLE foo.b (a int, b int, c int)").await.unwrap(); + let _ = service.exec_query("CREATE TABLE foo.a (a int, b int, c int)").await?.collect().await?; + let _ = service.exec_query("CREATE TABLE foo.b (a int, b int, c int)").await?.collect().await?; - let _ = service.exec_query("CREATE TABLE foo.a1 (a int, b int, c int)").await.unwrap(); - let _ = service.exec_query("CREATE TABLE foo.b1 (a int, b int, c int)").await.unwrap(); + let _ = service.exec_query("CREATE TABLE foo.a1 (a int, b int, c int)").await?.collect().await?; + let _ = service.exec_query("CREATE TABLE foo.b1 (a int, b int, c int)").await?.collect().await?; service.exec_query( "INSERT INTO foo.a (a, b, c) VALUES (1, 1, 1)" - ).await.unwrap(); + ).await?.collect().await?; service.exec_query( "INSERT INTO foo.b (a, b, c) VALUES (2, 2, 1)" - 
).await.unwrap(); + ).await?.collect().await?; service.exec_query( "INSERT INTO foo.a1 (a, b, c) VALUES (1, 1, 2)" - ).await.unwrap(); + ).await?.collect().await?; service.exec_query( "INSERT INTO foo.b1 (a, b, c) VALUES (2, 2, 2)" - ).await.unwrap(); + ).await?.collect().await?; let result = service.exec_query("EXPLAIN SELECT a `sel__a`, b `sel__b`, sum(c) `sel__c` from ( \ select * from ( \ @@ -2988,7 +3110,7 @@ mod tests { union all \ select * from foo.b \ ) \ - ) AS `lambda` where a = 1 group by 1, 2 order by 3 desc").await.unwrap(); + ) AS `lambda` where a = 1 group by 1, 2 order by 3 desc").await?.collect().await?; match &result.get_rows()[0].values()[0] { TableValue::String(s) => { assert_eq!(s, @@ -3027,7 +3149,7 @@ mod tests { ) \ union all select * from foo.b \ - ) AS `lambda` where a = 1 group by 1, 2 order by 3 desc").await.unwrap(); + ) AS `lambda` where a = 1 group by 1, 2 order by 3 desc").await?.collect().await?; match &result.get_rows()[0].values()[0] { TableValue::String(s) => { assert_eq!(s, @@ -3065,7 +3187,7 @@ mod tests { ) \ union all select * from foo.b \ - ) AS `lambda` where a = 1 group by 1, 2 order by 3 desc").await.unwrap(); + ) AS `lambda` where a = 1 group by 1, 2 order by 3 desc").await?.collect().await?; match &result.get_rows()[0].values()[0] { TableValue::String(s) => { assert_eq!(s, @@ -3105,7 +3227,7 @@ mod tests { ) \ union all select * from foo.b \ - ) AS `lambda` where a = 1 group by 1, 2 order by 3 desc").await.unwrap(); + ) AS `lambda` where a = 1 group by 1, 2 order by 3 desc").await?.collect().await?; match &result.get_rows()[0].values()[0] { TableValue::String(s) => { assert_eq!(s, @@ -3127,11 +3249,13 @@ mod tests { _ => assert!(false), }; + Ok::<(), CubeError>(()) }).await; + Ok(()) } #[tokio::test] - async fn over_10k_join() { + async fn over_10k_join() -> Result<(), CubeError> { Config::test("over_10k_join").update_config(|mut c| { c.partition_split_threshold = 1000000; c.compaction_chunks_count_threshold = 50; @@ -3140,15 +3264,15 @@ mod tests { }).start_test(async move |services| { let service = services.sql_service; - service.exec_query("CREATE SCHEMA foo").await.unwrap(); + service.exec_query("CREATE SCHEMA foo").await?.collect().await?; - service.exec_query("CREATE TABLE foo.orders (amount int, email text)").await.unwrap(); + service.exec_query("CREATE TABLE foo.orders (amount int, email text)").await?.collect().await?; - service.exec_query("CREATE INDEX orders_by_email ON foo.orders (email)").await.unwrap(); + service.exec_query("CREATE INDEX orders_by_email ON foo.orders (email)").await?.collect().await?; - service.exec_query("CREATE TABLE foo.customers (email text, system text, uuid text)").await.unwrap(); + service.exec_query("CREATE TABLE foo.customers (email text, system text, uuid text)").await?.collect().await?; - service.exec_query("CREATE INDEX customers_by_email ON foo.customers (email)").await.unwrap(); + service.exec_query("CREATE INDEX customers_by_email ON foo.customers (email)").await?.collect().await?; let mut join_results = Vec::new(); @@ -3186,18 +3310,18 @@ mod tests { service.exec_query( &format!("INSERT INTO foo.orders (amount, email) VALUES {}", values) - ).await.unwrap(); + ).await?.collect().await?; let values = customers.into_iter().map(|(email, uuid)| format!("('{}', 'system', '{}')", email, uuid)).join(", "); service.exec_query( &format!("INSERT INTO foo.customers (email, system, uuid) VALUES {}", values) - ).await.unwrap(); + ).await?.collect().await?; } join_results.sort_by(|a, b| cmp_row_key_heap(1, 
&a.values(), &b.values())); - let result = service.exec_query("SELECT o.email, c.uuid, sum(o.amount) from foo.orders o LEFT JOIN foo.customers c ON o.email = c.email GROUP BY 1, 2 ORDER BY 1 ASC").await.unwrap(); + let result = service.exec_query("SELECT o.email, c.uuid, sum(o.amount) from foo.orders o LEFT JOIN foo.customers c ON o.email = c.email GROUP BY 1, 2 ORDER BY 1 ASC").await?.collect().await?; assert_eq!(result.get_rows().len(), join_results.len()); for i in 0..result.get_rows().len() { @@ -3205,28 +3329,36 @@ mod tests { // println!("Expected {}: {:?}", i, &join_results[i]); assert_eq!(&result.get_rows()[i], &join_results[i]); } + Ok::<(), CubeError>(()) }).await; + Ok(()) } #[tokio::test] - async fn file_size_consistency() { + async fn file_size_consistency() -> Result<(), CubeError> { Config::test("file_size_consistency") .start_test(async move |services| { let service = services.sql_service; - let _ = service.exec_query("CREATE SCHEMA foo").await.unwrap(); + let _ = service + .exec_query("CREATE SCHEMA foo") + .await? + .collect() + .await?; let _ = service .exec_query("CREATE TABLE foo.ints (value int)") - .await - .unwrap(); + .await? + .collect() + .await?; service .exec_query("INSERT INTO foo.ints (value) VALUES (42)") - .await - .unwrap(); + .await? + .collect() + .await?; - let chunk = services.meta_store.get_chunk(1).await.unwrap(); + let chunk = services.meta_store.get_chunk(1).await?; let path = { let dir = env::temp_dir(); @@ -3243,8 +3375,7 @@ mod tests { path.to_str().unwrap().to_string(), chunk.get_row().get_full_name(chunk.get_id()), ) - .await - .unwrap(); + .await?; let result = service.exec_query("SELECT count(*) from foo.ints").await; println!("Result: {:?}", result); @@ -3252,22 +3383,24 @@ mod tests { let result = service.exec_query("SELECT count(*) from foo.ints").await; println!("Result: {:?}", result); + let err_message = result + .as_ref() + .err() + .map(|e| e.to_string()) + .unwrap_or_default(); assert!( - result - .clone() - .err() - .unwrap() - .to_string() - .contains("not found"), + err_message.contains("not found"), "Expected table not found error but got {:?}", result ); + Ok::<(), CubeError>(()) }) .await; + Ok(()) } #[tokio::test] - async fn high_frequency_inserts() { + async fn high_frequency_inserts() -> Result<(), CubeError> { Config::test("high_frequency_inserts") .update_config(|mut c| { c.partition_split_threshold = 100; @@ -3277,37 +3410,47 @@ mod tests { .start_test(async move |services| { let service = services.sql_service; - service.exec_query("CREATE SCHEMA foo").await.unwrap(); + service + .exec_query("CREATE SCHEMA foo") + .await? + .collect() + .await?; service .exec_query("CREATE TABLE foo.numbers (num int)") - .await - .unwrap(); + .await? + .collect() + .await?; for i in 0..300 { service .exec_query(&format!("INSERT INTO foo.numbers (num) VALUES ({})", i)) - .await - .unwrap(); + .await? + .collect() + .await?; } let result = service .exec_query("SELECT count(*) from foo.numbers") - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Int(300)])); let result = service .exec_query("SELECT sum(num) from foo.numbers") - .await - .unwrap(); + .await? 
+ .collect() + .await?; assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Int(44850)])); + Ok::<(), CubeError>(()) }) .await; + Ok(()) } #[tokio::test] - async fn decimal_partition_pruning() { + async fn decimal_partition_pruning() -> Result<(), CubeError> { Config::test("decimal_partition_pruning") .update_config(|mut c| { c.partition_split_threshold = 1; @@ -3317,32 +3460,28 @@ mod tests { .start_test(async move |services| { let service = services.sql_service; - service.exec_query("CREATE SCHEMA foo").await.unwrap(); + service.exec_query("CREATE SCHEMA foo").await?.collect().await?; service .exec_query("CREATE TABLE foo.numbers (num decimal)") - .await - .unwrap(); + .await?.collect().await?; for i in 0..100 { service .exec_query(&format!("INSERT INTO foo.numbers (num) VALUES ({})", i)) - .await - .unwrap(); + .await?.collect().await?; } Delay::new(Duration::from_millis(10000)).await; let result = service .exec_query("SELECT count(*) from foo.numbers") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Int(100)])); let result = service .exec_query("SELECT sum(num) from foo.numbers where num = 50") - .await - .unwrap(); + .await?.collect().await?; assert_eq!( result.get_rows()[0], Row::new(vec![TableValue::Decimal(Decimal::new(5000000))]) @@ -3350,8 +3489,7 @@ mod tests { let partitions = service .exec_query("SELECT id, min_value, max_value FROM system.partitions") - .await - .unwrap(); + .await?.collect().await?; println!("All partitions: {:#?}", partitions); @@ -3362,8 +3500,7 @@ mod tests { let plans = service .plan_query("SELECT sum(num) from foo.numbers where num = 50") - .await - .unwrap(); + .await?; let worker_plan = pp_phys_plan(plans.worker.as_ref()); let parquet_regex = Regex::new(r"\d+-[a-z0-9]+\.parquet").unwrap(); @@ -3381,12 +3518,14 @@ mod tests { ); } } + Ok::<(), CubeError>(()) }) .await; + Ok(()) } #[tokio::test] - async fn check_memory_test() { + async fn check_memory_test() -> Result<(), CubeError> { Config::test("check_memory_test") .update_config(|mut c| { c.partition_split_threshold = 25; @@ -3396,19 +3535,25 @@ mod tests { .start_test(async move |services| { let service = services.sql_service; - service.exec_query("CREATE SCHEMA foo").await.unwrap(); + service + .exec_query("CREATE SCHEMA foo") + .await? + .collect() + .await?; service .exec_query("CREATE TABLE foo.numbers (num decimal)") - .await - .unwrap(); + .await? + .collect() + .await?; for _ in 0..2 { let t = (0..100).map(|i| format!("({i})")).join(", "); service .exec_query(&format!("INSERT INTO foo.numbers (num) VALUES {}", t)) - .await - .unwrap(); + .await? 
+ .collect() + .await?; } let mut opts = PPOptions::default(); @@ -3416,8 +3561,7 @@ mod tests { let plans = service .plan_query("SELECT sum(num) from foo.numbers where num = 50") - .await - .unwrap(); + .await?; let plan_regexp = Regex::new(r"ParquetScan.*\.parquet").unwrap(); let expected = "LinearFinalAggregate\ @@ -3438,12 +3582,14 @@ mod tests { let p = plan_regexp.replace_all(&plan, "ParquetScan"); println!("pp {}", p); assert_eq!(p, expected); + Ok::<(), CubeError>(()) }) .await; + Ok(()) } #[tokio::test] - async fn delete_middle_main() { + async fn delete_middle_main() -> Result<(), CubeError> { Config::test("delete_middle_main") .update_config(|mut c| { c.partition_split_threshold = 10; @@ -3453,40 +3599,43 @@ mod tests { .start_test(async move |services| { let service = services.sql_service; - service.exec_query("CREATE SCHEMA foo").await.unwrap(); + service + .exec_query("CREATE SCHEMA foo") + .await? + .collect() + .await?; service .exec_query("CREATE TABLE foo.numbers (num int)") - .await - .unwrap(); + .await? + .collect() + .await?; for i in 0..100 { service .exec_query(&format!("INSERT INTO foo.numbers (num) VALUES ({})", i)) - .await - .unwrap(); + .await? + .collect() + .await?; let partitions = services .meta_store .get_partitions_with_chunks_created_seconds_ago(0) - .await - .unwrap(); + .await?; for p in partitions.into_iter() { services .injector .get_service_typed::() .await .schedule_partition_to_compact(&p) - .await - .unwrap() + .await? } } let to_repartition = services .meta_store .all_inactive_partitions_to_repartition() - .await - .unwrap(); + .await?; for p in to_repartition.into_iter() { services @@ -3494,11 +3643,10 @@ mod tests { .get_service_typed::() .await .schedule_repartition_if_needed(&p) - .await - .unwrap(); + .await?; } - let chunks = services.meta_store.chunks_table().all_rows().await.unwrap(); + let chunks = services.meta_store.chunks_table().all_rows().await?; println!("All chunks: {:?}", chunks); @@ -3509,8 +3657,7 @@ mod tests { let all_inactive_partitions = services .meta_store .all_inactive_middle_man_partitions() - .await - .unwrap(); + .await?; println!("Middle man partitions: {:?}", all_inactive_partitions); let mut futures = Vec::new(); for p in all_inactive_partitions.into_iter() { @@ -3524,27 +3671,25 @@ mod tests { println!( "All partitions: {:?}", - services - .meta_store - .partition_table() - .all_rows() - .await - .unwrap() + services.meta_store.partition_table().all_rows().await? ); let result = service .exec_query("SELECT count(*) from foo.numbers") - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Int(100)])); + Ok::<(), CubeError>(()) }) .await; + Ok(()) } #[tokio::test] - async fn high_frequency_inserts_s3() { + async fn high_frequency_inserts_s3() -> Result<(), CubeError> { if env::var("CUBESTORE_AWS_ACCESS_KEY_ID").is_err() { - return; + return Ok(()); } Config::test("high_frequency_inserts_s3") .update_config(|mut c| { @@ -3573,12 +3718,17 @@ mod tests { c }) .start_test_worker(async move |_| { - service.exec_query("CREATE SCHEMA foo").await.unwrap(); + service + .exec_query("CREATE SCHEMA foo") + .await? + .collect() + .await?; service .exec_query("CREATE TABLE foo.numbers (num int)") - .await - .unwrap(); + .await? + .collect() + .await?; for _ in 0..3 { let mut values = Vec::new(); @@ -3592,8 +3742,9 @@ mod tests { "INSERT INTO foo.numbers (num) VALUES {}", values )) - .await - .unwrap(); + .await? 
+ .collect() + .await?; } let (first_query, second_query) = futures::future::join( @@ -3602,29 +3753,32 @@ mod tests { ) .await; - let result = first_query.unwrap(); + let result = first_query.unwrap().collect().await?; assert_eq!( result.get_rows()[0], Row::new(vec![TableValue::Int(300000)]) ); - let result = second_query.unwrap(); + let result = second_query.unwrap().collect().await?; assert_eq!( result.get_rows()[0], Row::new(vec![TableValue::Int(300000 / 2 * 99999)]) ); + Ok::<(), CubeError>(()) }) .await; + Ok::<(), CubeError>(()) }) .await; + Ok(()) } #[tokio::test] - async fn high_frequency_inserts_gcs() { + async fn high_frequency_inserts_gcs() -> Result<(), CubeError> { if env::var("SERVICE_ACCOUNT_JSON").is_err() && env::var("CUBESTORE_GCP_SERVICE_ACCOUNT_JSON").is_err() { - return; + return Ok(()); } Config::test("high_frequency_inserts_gcs") .update_config(|mut c| { @@ -3653,12 +3807,17 @@ mod tests { c }) .start_test_worker(async move |_| { - service.exec_query("CREATE SCHEMA foo").await.unwrap(); + service + .exec_query("CREATE SCHEMA foo") + .await? + .collect() + .await?; service .exec_query("CREATE TABLE foo.numbers (num int)") - .await - .unwrap(); + .await? + .collect() + .await?; for _ in 0..3 { let mut values = Vec::new(); @@ -3672,8 +3831,9 @@ mod tests { "INSERT INTO foo.numbers (num) VALUES {}", values )) - .await - .unwrap(); + .await? + .collect() + .await?; } let (first_query, second_query) = futures::future::join( @@ -3682,25 +3842,28 @@ mod tests { ) .await; - let result = first_query.unwrap(); + let result = first_query.unwrap().collect().await?; assert_eq!( result.get_rows()[0], Row::new(vec![TableValue::Int(300000)]) ); - let result = second_query.unwrap(); + let result = second_query.unwrap().collect().await?; assert_eq!( result.get_rows()[0], Row::new(vec![TableValue::Int(300000 / 2 * 99999)]) ); + Ok::<(), CubeError>(()) }) .await; + Ok::<(), CubeError>(()) }) .await; + Ok(()) } #[tokio::test] - async fn inactive_partitions_cleanup() { + async fn inactive_partitions_cleanup() -> Result<(), CubeError> { Config::test("inactive_partitions_cleanup") .update_config(|mut c| { c.partition_split_threshold = 1000000; @@ -3714,18 +3877,24 @@ mod tests { .start_test(async move |services| { let service = services.sql_service; - service.exec_query("CREATE SCHEMA foo").await.unwrap(); + service + .exec_query("CREATE SCHEMA foo") + .await? + .collect() + .await?; service .exec_query("CREATE TABLE foo.numbers (num int)") - .await - .unwrap(); + .await? + .collect() + .await?; for i in 0..10_u64 { service .exec_query(&format!("INSERT INTO foo.numbers (num) VALUES ({})", i)) - .await - .unwrap(); + .await? + .collect() + .await?; } // let listener = services.cluster.job_result_listener(); @@ -3748,15 +3917,15 @@ mod tests { let result = service .exec_query("SELECT count(*) from foo.numbers") - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Int(10)])); let active_partitions = services .meta_store .get_active_partitions_by_index_id(1) - .await - .unwrap(); + .await?; let last_active_partition = active_partitions.iter().next().unwrap(); // Wait for GC tasks to drop files @@ -3765,8 +3934,7 @@ mod tests { let remote_fs = services.injector.get_service_typed::().await; let files = remote_fs .list("".to_string()) - .await - .unwrap() + .await? 
.into_iter() .filter(|r| r.ends_with(".parquet")) .collect::>(); @@ -3777,13 +3945,15 @@ mod tests { last_active_partition.get_id(), last_active_partition.get_row().suffix().as_ref().unwrap() )] - ) + ); + Ok::<(), CubeError>(()) }) - .await + .await; + Ok(()) } #[tokio::test] - async fn in_memory_compaction() { + async fn in_memory_compaction() -> Result<(), CubeError> { Config::test("inmemory_compaction") .update_config(|mut c| { c.partition_split_threshold = 1000000; @@ -3800,12 +3970,17 @@ mod tests { .get_service_typed::() .await; - service.exec_query("CREATE SCHEMA foo").await.unwrap(); + service + .exec_query("CREATE SCHEMA foo") + .await? + .collect() + .await?; service .exec_query("CREATE TABLE foo.numbers (a int, num int) UNIQUE KEY (a)") - .await - .unwrap(); + .await? + .collect() + .await?; for i in 0..6 { service @@ -3813,28 +3988,24 @@ mod tests { "INSERT INTO foo.numbers (a, num, __seq) VALUES ({}, {}, {})", i, i, i )) - .await - .unwrap(); + .await? + .collect() + .await?; } - compaction_service - .compact_in_memory_chunks(1) - .await - .unwrap(); + compaction_service.compact_in_memory_chunks(1).await?; let active_partitions = services .meta_store .get_active_partitions_by_index_id(1) - .await - .unwrap(); + .await?; assert_eq!(active_partitions.len(), 1); let partition = active_partitions.first().unwrap(); assert_eq!(partition.get_row().main_table_row_count(), 0); let chunks = services .meta_store .get_chunks_by_partition(partition.get_id(), false) - .await - .unwrap(); + .await?; assert_eq!(chunks.len(), 1); assert_eq!(chunks.first().unwrap().get_row().get_row_count(), 6); assert_eq!(chunks.first().unwrap().get_row().in_memory(), true); @@ -3847,28 +4018,27 @@ mod tests { i + 1, i + 1 )) - .await - .unwrap(); + .await? + .collect() + .await?; } - compaction_service - .compact_in_memory_chunks(1) - .await - .unwrap(); + compaction_service.compact_in_memory_chunks(1).await?; Delay::new(Duration::from_millis(2000)).await; let active_partitions = services .meta_store .get_active_partitions_by_index_id(1) - .await - .unwrap(); + .await?; assert_eq!(active_partitions.len(), 1); let partition = active_partitions.first().unwrap(); assert_eq!(partition.get_row().main_table_row_count(), 6); + Ok::<(), CubeError>(()) }) - .await + .await; + Ok(()) } #[tokio::test] - async fn cluster() { + async fn cluster() -> Result<(), CubeError> { Config::test("cluster_router").update_config(|mut config| { config.select_workers = vec!["127.0.0.1:14306".to_string(), "127.0.0.1:14307".to_string()]; config.metastore_bind_address = Some("127.0.0.1:15306".to_string()); @@ -3901,38 +4071,38 @@ mod tests { config.compaction_chunks_count_threshold = 0; config }).start_test_worker(async move |_| { - service.exec_query("CREATE SCHEMA foo").await.unwrap(); + service.exec_query("CREATE SCHEMA foo").await?.collect().await?; - service.exec_query("CREATE TABLE foo.orders_1 (orders_customer_id text, orders_product_id int, amount int)").await.unwrap(); - service.exec_query("CREATE TABLE foo.orders_2 (orders_customer_id text, orders_product_id int, amount int)").await.unwrap(); - service.exec_query("CREATE INDEX orders_by_product_1 ON foo.orders_1 (orders_product_id)").await.unwrap(); - service.exec_query("CREATE INDEX orders_by_product_2 ON foo.orders_2 (orders_product_id)").await.unwrap(); - service.exec_query("CREATE TABLE foo.customers (customer_id text, city text, state text)").await.unwrap(); - service.exec_query("CREATE TABLE foo.products (product_id int, name text)").await.unwrap(); + 
service.exec_query("CREATE TABLE foo.orders_1 (orders_customer_id text, orders_product_id int, amount int)").await?.collect().await?; + service.exec_query("CREATE TABLE foo.orders_2 (orders_customer_id text, orders_product_id int, amount int)").await?.collect().await?; + service.exec_query("CREATE INDEX orders_by_product_1 ON foo.orders_1 (orders_product_id)").await?.collect().await?; + service.exec_query("CREATE INDEX orders_by_product_2 ON foo.orders_2 (orders_product_id)").await?.collect().await?; + service.exec_query("CREATE TABLE foo.customers (customer_id text, city text, state text)").await?.collect().await?; + service.exec_query("CREATE TABLE foo.products (product_id int, name text)").await?.collect().await?; service.exec_query( "INSERT INTO foo.orders_1 (orders_customer_id, orders_product_id, amount) VALUES ('a', 1, 10), ('b', 2, 2), ('b', 2, 3)" - ).await.unwrap(); + ).await?.collect().await?; service.exec_query( "INSERT INTO foo.orders_1 (orders_customer_id, orders_product_id, amount) VALUES ('b', 1, 10), ('c', 2, 2), ('c', 2, 3)" - ).await.unwrap(); + ).await?.collect().await?; service.exec_query( "INSERT INTO foo.orders_2 (orders_customer_id, orders_product_id, amount) VALUES ('c', 1, 10), ('d', 2, 2), ('d', 2, 3)" - ).await.unwrap(); + ).await?.collect().await?; service.exec_query( "INSERT INTO foo.customers (customer_id, city, state) VALUES ('a', 'San Francisco', 'CA'), ('b', 'New York', 'NY')" - ).await.unwrap(); + ).await?.collect().await?; service.exec_query( "INSERT INTO foo.customers (customer_id, city, state) VALUES ('c', 'San Francisco', 'CA'), ('d', 'New York', 'NY')" - ).await.unwrap(); + ).await?.collect().await?; service.exec_query( "INSERT INTO foo.products (product_id, name) VALUES (1, 'Potato'), (2, 'Tomato')" - ).await.unwrap(); + ).await?.collect().await?; let result = service.exec_query( "SELECT city, name, sum(amount) FROM (SELECT * FROM foo.orders_1 UNION ALL SELECT * FROM foo.orders_2) o \ @@ -3940,7 +4110,7 @@ mod tests { LEFT JOIN foo.products p ON orders_product_id = product_id \ WHERE customer_id = 'a' \ GROUP BY 1, 2 ORDER BY 3 DESC, 1 ASC, 2 ASC" - ).await.unwrap(); + ).await?.collect().await?; let expected = vec![ Row::new(vec![TableValue::String("San Francisco".to_string()), TableValue::String("Potato".to_string()), TableValue::Int(10)]), @@ -3950,13 +4120,17 @@ mod tests { result.get_rows(), &expected ); + Ok::<(), CubeError>(()) }).await; + Ok::<(), CubeError>(()) }).await; + Ok::<(), CubeError>(()) }).await; + Ok(()) } #[tokio::test] - async fn table_partition_split_threshold() { + async fn table_partition_split_threshold() -> Result<(), CubeError> { let test_name = "table_partition_split_threshold"; let port_base = 24406; Config::test(test_name).update_config(|mut config| { @@ -3998,17 +4172,15 @@ mod tests { service .exec_query("CREATE SCHEMA IF NOT EXISTS foo") - .await - .unwrap(); + .await?.collect().await?; let create_table_sql = format!("CREATE TABLE foo.bikes (`Response ID` int, `Start Date` text, `End Date` text) LOCATION '{}'", url); - service.exec_query(&create_table_sql).await.unwrap(); + service.exec_query(&create_table_sql).await?.collect().await?; let result = service .exec_query("SELECT count(*) from foo.bikes") - .await - .unwrap(); + .await?.collect().await?; assert_eq!( result.get_rows(), @@ -4017,22 +4189,25 @@ mod tests { let result = service .exec_query("SELECT partition_split_threshold from system.tables") - .await - .unwrap(); + .await?.collect().await?; assert_eq!( result.get_rows(), 
&vec![Row::new(vec![TableValue::Int(200)])] ); + Ok::<(), CubeError>(()) }).await; + Ok::<(), CubeError>(()) }).await; + Ok::<(), CubeError>(()) }).await; + Ok(()) } #[tokio::test] - async fn create_table_with_location_cluster() { + async fn create_table_with_location_cluster() -> Result<(), CubeError> { if env::var("CUBESTORE_AWS_ACCESS_KEY_ID").is_err() { - return; + return Ok(()); } Config::test("create_table_with_location_cluster") .update_config(|mut c| { @@ -4075,38 +4250,41 @@ mod tests { file.write_all("2,\"New York\",\"[\"\"\"\"]\",2021-01-24 19:12:23.123 UTC\n".as_bytes()).unwrap(); file.write_all("3,New York,\"de Comunicación\",2021-01-25 19:12:23 UTC\n".as_bytes()).unwrap(); - let mut file = GzipEncoder::new(BufWriter::new(tokio::fs::File::create(path_2.clone()).await.unwrap())); + let mut file = GzipEncoder::new(BufWriter::new(tokio::fs::File::create(path_2.clone()).await?)); - file.write_all("id,city,arr,t\n".as_bytes()).await.unwrap(); - file.write_all("1,San Francisco,\"[\"\"Foo\"\",\"\"Bar\"\",\"\"FooBar\"\"]\",\"2021-01-24 12:12:23 UTC\"\n".as_bytes()).await.unwrap(); - file.write_all("2,\"New York\",\"[\"\"\"\"]\",2021-01-24 19:12:23 UTC\n".as_bytes()).await.unwrap(); - file.write_all("3,New York,,2021-01-25 19:12:23 UTC\n".as_bytes()).await.unwrap(); - file.write_all("4,New York,\"\",2021-01-25 19:12:23 UTC\n".as_bytes()).await.unwrap(); - file.write_all("5,New York,\"\",2021-01-25 19:12:23 UTC\n".as_bytes()).await.unwrap(); + file.write_all("id,city,arr,t\n".as_bytes()).await?; + file.write_all("1,San Francisco,\"[\"\"Foo\"\",\"\"Bar\"\",\"\"FooBar\"\"]\",\"2021-01-24 12:12:23 UTC\"\n".as_bytes()).await?; + file.write_all("2,\"New York\",\"[\"\"\"\"]\",2021-01-24 19:12:23 UTC\n".as_bytes()).await?; + file.write_all("3,New York,,2021-01-25 19:12:23 UTC\n".as_bytes()).await?; + file.write_all("4,New York,\"\",2021-01-25 19:12:23 UTC\n".as_bytes()).await?; + file.write_all("5,New York,\"\",2021-01-25 19:12:23 UTC\n".as_bytes()).await?; - file.shutdown().await.unwrap(); + file.shutdown().await?; vec![path_1, path_2] }; - let _ = service.exec_query("CREATE SCHEMA IF NOT EXISTS Foo").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA IF NOT EXISTS Foo").await?.collect().await?; let _ = service.exec_query( &format!( "CREATE TABLE Foo.Persons (id int, city text, t timestamp, arr text) INDEX persons_city (`city`, `id`) LOCATION {}", paths.into_iter().map(|p| format!("'{}'", p.to_string_lossy())).join(",") ) - ).await.unwrap(); + ).await?.collect().await?; - let result = service.exec_query("SELECT count(*) as cnt from Foo.Persons").await.unwrap(); + let result = service.exec_query("SELECT count(*) as cnt from Foo.Persons").await?.collect().await?; assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(8)])]); + Ok::<(), CubeError>(()) }) .await; + Ok::<(), CubeError>(()) }) .await; + Ok(()) } #[tokio::test] - async fn disk_space_limit() { + async fn disk_space_limit() -> Result<(), CubeError> { Config::test("disk_space_limit") .update_config(|mut c| { c.partition_split_threshold = 1000000; @@ -4147,26 +4325,26 @@ mod tests { } - let mut file = GzipEncoder::new(BufWriter::new(tokio::fs::File::create(path_2.clone()).await.unwrap())); + let mut file = GzipEncoder::new(BufWriter::new(tokio::fs::File::create(path_2.clone()).await?)); - file.write_all("id,city,arr,t\n".as_bytes()).await.unwrap(); + file.write_all("id,city,arr,t\n".as_bytes()).await?; for i in 0..50 { - file.write_all(format!("{},San 
Francisco,\"[\"\"Foo\"\",\"\"Bar\"\",\"\"FooBar\"\"]\",\"2021-01-24 12:12:23 UTC\"\n", i).as_bytes()).await.unwrap(); + file.write_all(format!("{},San Francisco,\"[\"\"Foo\"\",\"\"Bar\"\",\"\"FooBar\"\"]\",\"2021-01-24 12:12:23 UTC\"\n", i).as_bytes()).await?; } - file.shutdown().await.unwrap(); + file.shutdown().await?; vec![path_1, path_2] }; - let _ = service.exec_query("CREATE SCHEMA IF NOT EXISTS Foo").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA IF NOT EXISTS Foo").await?.collect().await?; let _ = service.exec_query( &format!( "CREATE TABLE Foo.Persons (id int, city text, t timestamp, arr text) INDEX persons_city (`city`, `id`) LOCATION {}", paths.iter().map(|p| format!("'{}'", p.to_string_lossy())).join(",") ) - ).await.unwrap(); + ).await?.collect().await?; let res = service.exec_query( &format!( @@ -4180,14 +4358,17 @@ mod tests { assert!(false); } + Ok::<(), CubeError>(()) }) .await; + Ok::<(), CubeError>(()) }) .await; + Ok(()) } #[tokio::test] - async fn disk_space_limit_per_worker() { + async fn disk_space_limit_per_worker() -> Result<(), CubeError> { Config::test("disk_space_limit_per_worker") .update_config(|mut c| { c.partition_split_threshold = 1000000; @@ -4228,26 +4409,26 @@ mod tests { } - let mut file = GzipEncoder::new(BufWriter::new(tokio::fs::File::create(path_2.clone()).await.unwrap())); + let mut file = GzipEncoder::new(BufWriter::new(tokio::fs::File::create(path_2.clone()).await?)); - file.write_all("id,city,arr,t\n".as_bytes()).await.unwrap(); + file.write_all("id,city,arr,t\n".as_bytes()).await?; for i in 0..50 { - file.write_all(format!("{},San Francisco,\"[\"\"Foo\"\",\"\"Bar\"\",\"\"FooBar\"\"]\",\"2021-01-24 12:12:23 UTC\"\n", i).as_bytes()).await.unwrap(); + file.write_all(format!("{},San Francisco,\"[\"\"Foo\"\",\"\"Bar\"\",\"\"FooBar\"\"]\",\"2021-01-24 12:12:23 UTC\"\n", i).as_bytes()).await?; } - file.shutdown().await.unwrap(); + file.shutdown().await?; vec![path_1, path_2] }; - let _ = service.exec_query("CREATE SCHEMA IF NOT EXISTS Foo").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA IF NOT EXISTS Foo").await?.collect().await?; let _ = service.exec_query( &format!( "CREATE TABLE Foo.Persons (id int, city text, t timestamp, arr text) INDEX persons_city (`city`, `id`) LOCATION {}", paths.iter().map(|p| format!("'{}'", p.to_string_lossy())).join(",") ) - ).await.unwrap(); + ).await?.collect().await?; let res = service.exec_query( &format!( @@ -4261,14 +4442,17 @@ mod tests { assert!(false); } + Ok::<(), CubeError>(()) }) .await; + Ok::<(), CubeError>(()) }) .await; + Ok(()) } #[tokio::test] - async fn csv_import_with_decimals_and_empty_strings() { + async fn csv_import_with_decimals_and_empty_strings() -> Result<(), CubeError> { Config::test("csv_import_with_decimals_and_empty_strings") .update_config(|mut c| { c.partition_split_threshold = 1000000; @@ -4282,7 +4466,7 @@ mod tests { let path = dir.join("csv_import_decimals_test.csv.gz"); let mut file = GzipEncoder::new(BufWriter::new( - tokio::fs::File::create(path.clone()).await.unwrap(), + tokio::fs::File::create(path.clone()).await?, )); let csv_data = r#"id,product_name,order_date,discount,profit,quantity,total_amount @@ -4331,13 +4515,12 @@ mod tests { 8697,HTC One,2020-06-01T00:00:00.000Z,0.200000000,26.99730,3,239.97600 2661,Google Nexus 5,2020-11-01T00:00:00.000Z,0E-9,494.97250,11,1979.89000 "#; - file.write_all(csv_data.as_bytes()).await.unwrap(); - file.shutdown().await.unwrap(); + file.write_all(csv_data.as_bytes()).await?; + file.shutdown().await?; let _ = service 
.exec_query("CREATE SCHEMA IF NOT EXISTS Test") - .await - .unwrap(); + .await?.collect().await?; let _ = service .exec_query(&format!( "CREATE TABLE Test.Orders (\ @@ -4351,13 +4534,11 @@ mod tests { ) LOCATION '{}'", path.to_string_lossy() )) - .await - .unwrap(); + .await?.collect().await?; let result = service .exec_query("SELECT id, product_name, order_date, discount, profit, quantity, total_amount FROM Test.Orders ORDER BY id") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows().len(), 44); @@ -4445,12 +4626,14 @@ mod tests { } } assert_eq!(mismatch_count, 0); + Ok::<(), CubeError>(()) }) .await; + Ok(()) } #[tokio::test] - async fn csv_import_with_decimal_and_empty_strings() { + async fn csv_import_with_decimal_and_empty_strings() -> Result<(), CubeError> { Config::test("csv_import_with_decimal_and_empty_strings") .update_config(|mut c| { c.partition_split_threshold = 1000000; @@ -4464,7 +4647,7 @@ mod tests { let path = dir.join("csv_import_decimal_test.csv.gz"); let mut file = GzipEncoder::new(BufWriter::new( - tokio::fs::File::create(path.clone()).await.unwrap(), + tokio::fs::File::create(path.clone()).await?, )); let csv_data = r#"id,product_name,order_date,discount,profit,quantity,total_amount @@ -4513,13 +4696,12 @@ mod tests { 8697,HTC One,2020-06-01T00:00:00.000Z,0.200000000,26.99730,3,239.97600 2661,Google Nexus 5,2020-11-01T00:00:00.000Z,0E-9,494.97250,11,1979.89000 "#; - file.write_all(csv_data.as_bytes()).await.unwrap(); - file.shutdown().await.unwrap(); + file.write_all(csv_data.as_bytes()).await?; + file.shutdown().await?; let _ = service .exec_query("CREATE SCHEMA IF NOT EXISTS Test") - .await - .unwrap(); + .await?.collect().await?; let _ = service .exec_query(&format!( "CREATE TABLE Test.Orders (\ @@ -4533,13 +4715,11 @@ mod tests { ) LOCATION '{}'", path.to_string_lossy() )) - .await - .unwrap(); + .await?.collect().await?; let result = service .exec_query("SELECT id, product_name, order_date, discount, profit, quantity, total_amount FROM Test.Orders ORDER BY id") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows().len(), 44); @@ -4627,12 +4807,14 @@ mod tests { } } assert_eq!(mismatch_count, 0); + Ok::<(), CubeError>(()) }) .await; + Ok(()) } #[tokio::test] - async fn compaction() { + async fn compaction() -> Result<(), CubeError> { Config::test("compaction").update_config(|mut config| { config.partition_split_threshold = 5; config.compaction_chunks_count_threshold = 0; @@ -4641,22 +4823,22 @@ mod tests { }).start_test(async move |services| { let service = services.sql_service; - service.exec_query("CREATE SCHEMA foo").await.unwrap(); + service.exec_query("CREATE SCHEMA foo").await?.collect().await?; - service.exec_query("CREATE TABLE foo.table (t int)").await.unwrap(); + service.exec_query("CREATE TABLE foo.table (t int)").await?.collect().await?; let listener = services.cluster.job_result_listener(); service.exec_query( "INSERT INTO foo.table (t) VALUES (NULL), (1), (3), (5), (10), (20), (25), (25), (25), (25), (25), (NULL), (NULL), (NULL), (2), (4), (5), (27), (28), (29)" - ).await.unwrap(); + ).await?.collect().await?; let wait = listener.wait_for_job_results(vec![ (RowKey::Table(TableId::Partitions, 1), JobType::PartitionCompaction), ]); - timeout(Duration::from_secs(10), wait).await.unwrap().unwrap(); + timeout(Duration::from_secs(10), wait).await?.unwrap(); - let partitions = services.meta_store.get_active_partitions_by_index_id(1).await.unwrap(); + let partitions = 
services.meta_store.get_active_partitions_by_index_id(1).await?; assert_eq!(partitions.len(), 4); let p_1 = partitions.iter().find(|r| r.get_id() == 2).unwrap(); @@ -4689,10 +4871,12 @@ mod tests { expected.sort_by(|(min_a, _, _, _), (min_b, _, _, _)| cmp_min_rows(1, min_a.as_ref(), min_b.as_ref())); assert_eq!(intervals_set, expected); - let result = service.exec_query("SELECT count(*) from foo.table").await.unwrap(); + let result = service.exec_query("SELECT count(*) from foo.table").await?.collect().await?; assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Int(20)])); + Ok::<(), CubeError>(()) }).await; + Ok(()) } #[test] @@ -4711,36 +4895,37 @@ mod tests { let path_2 = dir.clone().join("foo-3.csv.gz"); - let mut file = GzipEncoder::new(BufWriter::new(tokio::fs::File::create(path_2.clone()).await.unwrap())); + let mut file = GzipEncoder::new(BufWriter::new(tokio::fs::File::create(path_2.clone()).await?)); - file.write_all("id,city,arr,t\n".as_bytes()).await.unwrap(); - file.write_all("1,San Francisco,\"[\"\"Foo\"\",\"\"Bar\"\",\"\"FooBar\"\"]\",\"2021-01-24 12:12:23 UTC\"\n".as_bytes()).await.unwrap(); - file.write_all("2,\"New York\",\"[\"\"\"\"]\",2021-01-24 19:12:23 UTC\n".as_bytes()).await.unwrap(); - file.write_all("3,New York,,2021-01-25 19:12:23 UTC\n".as_bytes()).await.unwrap(); - file.write_all("4,New York,\"\",2021-01-25 19:12:23 UTC\n".as_bytes()).await.unwrap(); - file.write_all("5,New York,\"\",2021-01-25 19:12:23 UTC\n".as_bytes()).await.unwrap(); + file.write_all("id,city,arr,t\n".as_bytes()).await?; + file.write_all("1,San Francisco,\"[\"\"Foo\"\",\"\"Bar\"\",\"\"FooBar\"\"]\",\"2021-01-24 12:12:23 UTC\"\n".as_bytes()).await?; + file.write_all("2,\"New York\",\"[\"\"\"\"]\",2021-01-24 19:12:23 UTC\n".as_bytes()).await?; + file.write_all("3,New York,,2021-01-25 19:12:23 UTC\n".as_bytes()).await?; + file.write_all("4,New York,\"\",2021-01-25 19:12:23 UTC\n".as_bytes()).await?; + file.write_all("5,New York,\"\",2021-01-25 19:12:23 UTC\n".as_bytes()).await?; - file.shutdown().await.unwrap(); + file.shutdown().await?; let remote_fs = services.injector.get_service_typed::().await; - remote_fs.upload_file(path_2.to_str().unwrap().to_string(), "temp-uploads/foo-3.csv.gz".to_string()).await.unwrap(); + remote_fs.upload_file(path_2.to_str().unwrap().to_string(), "temp-uploads/foo-3.csv.gz".to_string()).await?; vec!["temp://foo-3.csv.gz".to_string()] }; - let _ = service.exec_query("CREATE SCHEMA IF NOT EXISTS Foo").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA IF NOT EXISTS Foo").await?.collect().await?; let _ = service.exec_query( &format!( "CREATE TABLE Foo.Persons (id int, city text, t timestamp, arr text) INDEX persons_city (`city`, `id`) LOCATION {}", paths.into_iter().map(|p| format!("'{}'", p)).join(",") ) - ).await.unwrap(); + ).await?.collect().await?; - let result = service.exec_query("SELECT count(*) as cnt from Foo.Persons").await.unwrap(); + let result = service.exec_query("SELECT count(*) as cnt from Foo.Persons").await?.collect().await?; assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(5)])]); - let result = service.exec_query("SELECT count(*) as cnt from Foo.Persons WHERE arr = '[\"Foo\",\"Bar\",\"FooBar\"]' or arr = '[\"\"]' or arr is null").await.unwrap(); + let result = service.exec_query("SELECT count(*) as cnt from Foo.Persons WHERE arr = '[\"Foo\",\"Bar\",\"FooBar\"]' or arr = '[\"\"]' or arr is null").await?.collect().await?; assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(5)])]); + Ok::<(), 
CubeError>(()) }).await; } @@ -4748,14 +4933,14 @@ mod tests { } #[tokio::test] - async fn explain_meta_logical_plan() { + async fn explain_meta_logical_plan() -> Result<(), CubeError> { Config::run_test("explain_meta_logical_plan", async move |services| { let service = services.sql_service; - service.exec_query("CREATE SCHEMA foo").await.unwrap(); + service.exec_query("CREATE SCHEMA foo").await?.collect().await?; let result = service.exec_query( "EXPLAIN SELECT table_name FROM information_schema.tables WHERE table_schema = 'foo'" - ).await.unwrap(); + ).await?.collect().await?; assert_eq!(result.len(), 1); assert_eq!(result.get_columns().len(), 1); @@ -4771,24 +4956,26 @@ mod tests { \n Filter\ \n Scan information_schema.tables, source: InfoSchemaTableProvider(table: Tables), fields: [table_schema, table_name]" ); + Ok::<(), CubeError>(()) }).await; + Ok(()) } #[tokio::test] - async fn explain_logical_plan() { + async fn explain_logical_plan() -> Result<(), CubeError> { Config::run_test("explain_logical_plan", async move |services| { let service = services.sql_service; - service.exec_query("CREATE SCHEMA foo").await.unwrap(); + service.exec_query("CREATE SCHEMA foo").await?.collect().await?; - service.exec_query("CREATE TABLE foo.orders (id int, platform text, age int, amount int)").await.unwrap(); + service.exec_query("CREATE TABLE foo.orders (id int, platform text, age int, amount int)").await?.collect().await?; service.exec_query( "INSERT INTO foo.orders (id, platform, age, amount) VALUES (1, 'android', 18, 4), (2, 'andorid', 17, 4), (3, 'ios', 20, 5)" - ).await.unwrap(); + ).await?.collect().await?; let result = service.exec_query( "EXPLAIN SELECT platform, sum(amount) from foo.orders where age > 15 group by platform" - ).await.unwrap(); + ).await?.collect().await?; assert_eq!(result.len(), 1); assert_eq!(result.get_columns().len(), 1); @@ -4806,11 +4993,13 @@ mod tests { \n Filter\ \n Scan foo.orders, source: CubeTable(index: default:1:[1]), fields: [platform, age, amount]" ); + Ok::<(), CubeError>(()) }).await; + Ok(()) } #[tokio::test] - async fn builtin_functions() { + async fn builtin_functions() -> Result<(), CubeError> { Config::run_test("builtin_functions", async move |services| { let service = services.sql_service; @@ -4818,8 +5007,9 @@ mod tests { { let result = service .exec_query("SELECT round(42.4), round(42.4382, 2), round(1234.56, -1)") - .await - .unwrap(); + .await? 
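+                    // collect the streamed rows before asserting on them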
+ .collect() + .await?; assert_eq!(result.len(), 1); assert_eq!(result.get_columns().len(), 3); @@ -4833,12 +5023,14 @@ mod tests { ]),] ) } + Ok::<(), CubeError>(()) }) .await; + Ok(()) } #[tokio::test] - async fn explain_physical_plan() { + async fn explain_physical_plan() -> Result<(), CubeError> { Config::test("explain_analyze_router").update_config(|mut config| { config.select_workers = vec!["127.0.0.1:14006".to_string()]; config.metastore_bind_address = Some("127.0.0.1:15006".to_string()); @@ -4859,17 +5051,17 @@ mod tests { config.compaction_chunks_count_threshold = 0; config }).start_test_worker(async move |_| { - service.exec_query("CREATE SCHEMA foo").await.unwrap(); + service.exec_query("CREATE SCHEMA foo").await?.collect().await?; - service.exec_query("CREATE TABLE foo.orders (id int, platform text, age int, amount int)").await.unwrap(); + service.exec_query("CREATE TABLE foo.orders (id int, platform text, age int, amount int)").await?.collect().await?; service.exec_query( "INSERT INTO foo.orders (id, platform, age, amount) VALUES (1, 'android', 18, 4), (2, 'andorid', 17, 4), (3, 'ios', 20, 5)" - ).await.unwrap(); + ).await?.collect().await?; let result = service.exec_query( "EXPLAIN ANALYZE SELECT platform, sum(amount) from foo.orders where age > 15 group by platform" - ).await.unwrap(); + ).await?.collect().await?; assert_eq!(result.len(), 2); @@ -4923,11 +5115,14 @@ mod tests { _ => {assert!(false);} }; + Ok::<(), CubeError>(()) }).await; + Ok::<(), CubeError>(()) }).await; + Ok(()) } #[tokio::test] - async fn create_aggr_index() { + async fn create_aggr_index() -> Result<(), CubeError> { assert!(true); Config::test("aggregate_index") .update_config(|mut c| { @@ -4937,7 +5132,11 @@ mod tests { }) .start_test(async move |services| { let service = services.sql_service; - service.exec_query("CREATE SCHEMA foo").await.unwrap(); + service + .exec_query("CREATE SCHEMA foo") + .await? 
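+                // DDL goes through the new QueryResult as well, so it is collected like any query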
+ .collect() + .await?; let paths = { let dir = env::temp_dir(); @@ -4945,36 +5144,28 @@ mod tests { let path_2 = dir.clone().join("orders.csv.gz"); let mut file = GzipEncoder::new(BufWriter::new( - tokio::fs::File::create(path_2.clone()).await.unwrap(), + tokio::fs::File::create(path_2.clone()).await?, )); file.write_all("platform,age,gender,cnt,max_id\n".as_bytes()) - .await - .unwrap(); + .await?; file.write_all("\"ios\",20,\"M\",10,100\n".as_bytes()) - .await - .unwrap(); + .await?; file.write_all("\"android\",20,\"M\",2,10\n".as_bytes()) - .await - .unwrap(); + .await?; file.write_all("\"web\",20,\"M\",20,111\n".as_bytes()) - .await - .unwrap(); + .await?; file.write_all("\"ios\",20,\"F\",10,100\n".as_bytes()) - .await - .unwrap(); + .await?; file.write_all("\"android\",20,\"F\",2,10\n".as_bytes()) - .await - .unwrap(); + .await?; file.write_all("\"web\",22,\"F\",20,115\n".as_bytes()) - .await - .unwrap(); + .await?; file.write_all("\"web\",22,\"F\",20,222\n".as_bytes()) - .await - .unwrap(); + .await?; - file.shutdown().await.unwrap(); + file.shutdown().await?; services .injector @@ -4984,8 +5175,7 @@ mod tests { path_2.to_str().unwrap().to_string(), "temp-uploads/orders.csv.gz".to_string(), ) - .await - .unwrap(); + .await?; vec!["temp://orders.csv.gz".to_string()] }; @@ -5003,9 +5193,9 @@ mod tests { LOCATION {}", paths.into_iter().map(|p| format!("'{}'", p)).join(",") ); - service.exec_query(&query).await.unwrap(); + service.exec_query(&query).await?.collect().await?; - let indices = services.meta_store.get_table_indexes(1).await.unwrap(); + let indices = services.meta_store.get_table_indexes(1).await?; let aggr_index = indices .iter() @@ -5015,13 +5205,11 @@ mod tests { let partitions = services .meta_store .get_active_partitions_by_index_id(aggr_index.get_id()) - .await - .unwrap(); + .await?; let chunks = services .meta_store .get_chunks_by_partition(partitions[0].get_id(), false) - .await - .unwrap(); + .await?; assert_eq!(chunks.len(), 1); assert_eq!(chunks[0].get_row().get_row_count(), 4); @@ -5030,44 +5218,41 @@ mod tests { .plan_query( "SELECT platform, age, sum(cnt) FROM foo.Orders GROUP BY platform, age", ) - .await - .unwrap(); + .await?; let worker_plan = pp_phys_plan(p.worker.as_ref()); assert!(worker_plan.find("aggr_index").is_some()); + Ok::<(), CubeError>(()) }) .await; + Ok(()) } #[tokio::test] - async fn validate_ksql_location() { + async fn validate_ksql_location() -> Result<(), CubeError> { Config::test("validate_ksql_location").update_config(|mut c| { c.partition_split_threshold = 2; c }).start_test(async move |services| { let service = services.sql_service; - let _ = service.exec_query("CREATE SCHEMA test").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA test").await?.collect().await?; service .exec_query("CREATE SOURCE OR UPDATE ksql AS 'ksql' VALUES (user = 'foo', password = 'bar', url = 'http://foo.com')") - .await - .unwrap(); + .await?.collect().await?; let _ = service .exec_query("CREATE TABLE test.events_by_type_1 (`EVENT` text, `KSQL_COL_0` int) WITH (select_statement = 'SELECT * FROM EVENTS_BY_TYPE WHERE time >= ''2022-01-01'' AND time < ''2022-02-01''') unique key (`EVENT`) location 'stream://ksql/EVENTS_BY_TYPE'") - .await - .unwrap(); + .await?.collect().await?; let _ = service .exec_query("CREATE TABLE test.events_by_type_2 (`EVENT` text, `KSQL_COL_0` int) WITH (select_statement = 'SELECT * FROM EVENTS_BY_TYPE') unique key (`EVENT`) location 'stream://ksql/EVENTS_BY_TYPE'") - .await - .unwrap(); + .await?.collect().await?; let _ = 
service .exec_query("CREATE TABLE test.events_by_type_3 (`EVENT` text, `KSQL_COL_0` int) unique key (`EVENT`) location 'stream://ksql/EVENTS_BY_TYPE'") - .await - .unwrap(); + .await?.collect().await?; let _ = service .exec_query("CREATE TABLE test.events_by_type_fail_1 (`EVENT` text, `KSQL_COL_0` int) WITH (select_statement = 'SELECT * EVENTS_BY_TYPE WHERE time >= \\'2022-01-01\\' AND time < \\'2022-02-01\\'') unique key (`EVENT`) location 'stream://ksql/EVENTS_BY_TYPE'") @@ -5078,11 +5263,13 @@ mod tests { .exec_query("CREATE TABLE test.events_by_type_fail_2 (`EVENT` text, `KSQL_COL_0` int) WITH (select_statement = 'SELECT * FROM (SELECT * FROM EVENTS_BY_TYPE WHERE time >= \\'2022-01-01\\' AND time < \\'2022-02-01\\')') unique key (`EVENT`) location 'stream://ksql/EVENTS_BY_TYPE'") .await .expect_err("Validation should fail"); + Ok::<(), CubeError>(()) }).await; + Ok(()) } #[tokio::test] - async fn create_stream_table_with_projection() { + async fn create_stream_table_with_projection() -> Result<(), CubeError> { Config::test("create_stream_table_with_projection").update_config(|mut c| { c.partition_split_threshold = 2; c @@ -5090,21 +5277,19 @@ mod tests { let service = services.sql_service; let metastore = services.meta_store; - let _ = service.exec_query("CREATE SCHEMA test").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA test").await?.collect().await?; service .exec_query("CREATE SOURCE OR UPDATE kafka AS 'kafka' VALUES (user = 'foo', password = 'bar', host = 'localhost:9092')") - .await - .unwrap(); + .await?.collect().await?; let _ = service .exec_query("CREATE TABLE test.events_1 (a int, b int) WITH (\ select_statement = 'SELECT a as a, b + c as b FROM `EVENTS_BY_TYPE` WHERE c > 10',\ source_table = 'CREATE TABLE events1 (a int, b int, c int)' ) unique key (`a`) location 'stream://kafka/EVENTS_BY_TYPE/0'") - .await - .unwrap(); - let table = metastore.get_table("test".to_string(), "events_1".to_string()).await.unwrap(); + .await?.collect().await?; + let table = metastore.get_table("test".to_string(), "events_1".to_string()).await?; assert_eq!( table.get_row().source_columns(), &Some(vec![ @@ -5130,60 +5315,68 @@ mod tests { .expect_err("Validation should fail"); + Ok::<(), CubeError>(()) }).await; + Ok(()) } #[tokio::test] - async fn trace_obj_for_streaming_table() { + async fn trace_obj_for_streaming_table() -> Result<(), CubeError> { Config::test("trace_obj_for_streaming_table").start_test(async move |services| { let service = services.sql_service; let meta_store = services.meta_store; - let _ = service.exec_query("CREATE SCHEMA test").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA test").await?.collect().await?; service - .exec_query("CREATE SOURCE OR UPDATE ksql AS 'ksql' VALUES (user = 'foo', password = 'bar', url = 'http://foo.com')").await.unwrap(); + .exec_query("CREATE SOURCE OR UPDATE ksql AS 'ksql' VALUES (user = 'foo', password = 'bar', url = 'http://foo.com')").await?.collect().await?; let context = SqlQueryContext::default().with_trace_obj(Some("{\"test\":\"context\"}".to_string())); let _ = service .exec_query_with_context(context, "CREATE TABLE test.table_1 (`EVENT` text, `KSQL_COL_0` int) unique key (`EVENT`) location 'stream://ksql/EVENTS_BY_TYPE'") - .await - .unwrap(); + .await?.collect().await?; - let table = meta_store.get_table("test".to_string(), "table_1".to_string()).await.unwrap(); - let trace_obj = meta_store.get_trace_obj_by_table_id(table.get_id()).await.unwrap(); + let table = meta_store.get_table("test".to_string(), 
"table_1".to_string()).await?; + let trace_obj = meta_store.get_trace_obj_by_table_id(table.get_id()).await?; assert!(trace_obj.is_some()); assert_eq!(trace_obj.unwrap(), "{\"test\":\"context\"}".to_string()); let _ = service .exec_query("CREATE TABLE test.table_2 (`EVENT` text, `KSQL_COL_0` int) unique key (`EVENT`)") - .await - .unwrap(); + .await?.collect().await?; - let table = meta_store.get_table("test".to_string(), "table_2".to_string()).await.unwrap(); - let trace_obj = meta_store.get_trace_obj_by_table_id(table.get_id()).await.unwrap(); + let table = meta_store.get_table("test".to_string(), "table_2".to_string()).await?; + let trace_obj = meta_store.get_trace_obj_by_table_id(table.get_id()).await?; println!("tobj {:?}", trace_obj); assert!(trace_obj.is_none()); + Ok::<(), CubeError>(()) }).await; + Ok(()) } #[tokio::test] - async fn total_count_over_groupping() { + async fn total_count_over_groupping() -> Result<(), CubeError> { Config::test("total_count_over_groupping") .start_test(async move |services| { let service = services.sql_service; - let _ = service.exec_query("CREATE SCHEMA test").await.unwrap(); + let _ = service + .exec_query("CREATE SCHEMA test") + .await? + .collect() + .await?; service .exec_query("CREATE TABLE test.test (id int, created timestamp, value int)") - .await - .unwrap(); + .await? + .collect() + .await?; service .exec_query("CREATE TABLE test.test1 (id int, created timestamp, value int)") - .await - .unwrap(); + .await? + .collect() + .await?; service .exec_query( @@ -5195,8 +5388,9 @@ mod tests { (2, '2022-01-02T00:00:00Z', 1)\ ", ) - .await - .unwrap(); + .await? + .collect() + .await?; service .exec_query( "INSERT INTO test.test1 (id, created, value) values \ @@ -5207,8 +5401,9 @@ mod tests { (2, '2022-01-02T00:00:00Z', 1)\ ", ) - .await - .unwrap(); + .await? + .collect() + .await?; let res = service .exec_query( "SELECT count(*) cnt FROM \ @@ -5221,8 +5416,9 @@ mod tests { order by 2 ) tmp", ) - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!(res.get_rows(), &vec![Row::new(vec![TableValue::Int(2)])]); let res = service @@ -5237,8 +5433,9 @@ mod tests { order by 2 ) tmp", ) - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!(res.get_rows(), &vec![Row::new(vec![TableValue::Int(3)])]); let res = service @@ -5257,8 +5454,9 @@ mod tests { group by 1, 2 ) tmp", ) - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!(res.get_rows(), &vec![Row::new(vec![TableValue::Int(4)])]); let res = service @@ -5275,29 +5473,37 @@ mod tests { order by 1, 2 ) tmp", ) - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!(res.get_rows(), &vec![Row::new(vec![TableValue::Int(4)])]); + Ok::<(), CubeError>(()) }) .await; //assert_eq!(res.get_rows(), &vec![Row::new(vec![TableValue::Int(2)])]); + Ok(()) } #[tokio::test] - async fn ungroupped_with_order_and_limit() { + async fn ungroupped_with_order_and_limit() -> Result<(), CubeError> { Config::test("ungroupped_with_order_and_limit") .start_test(async move |services| { let service = services.sql_service; - let _ = service.exec_query("CREATE SCHEMA test").await.unwrap(); + let _ = service + .exec_query("CREATE SCHEMA test") + .await? + .collect() + .await?; service .exec_query( "CREATE TABLE test.topk_test (id int, name varchar, total_sales int)", ) - .await - .unwrap(); + .await? + .collect() + .await?; service .exec_query( @@ -5343,14 +5549,16 @@ mod tests { (159, 'Atticus', 62700), (160, 'Beatrix', 49800)", ) - .await - .unwrap(); + .await? 
+ .collect() + .await?; let res = service .exec_query( "SELECT name, total_sales FROM test.topk_test ORDER BY 1 ASC limit 3", ) - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!( res.get_rows(), &vec![ @@ -5368,24 +5576,31 @@ mod tests { ]), ] ); + Ok::<(), CubeError>(()) }) .await; //assert_eq!(res.get_rows(), &vec![Row::new(vec![TableValue::Int(2)])]); + Ok(()) } #[tokio::test] - async fn total_count_over_single_row() { + async fn total_count_over_single_row() -> Result<(), CubeError> { Config::test("total_count_over_single_row") .start_test(async move |services| { let service = services.sql_service; - let _ = service.exec_query("CREATE SCHEMA test").await.unwrap(); + let _ = service + .exec_query("CREATE SCHEMA test") + .await? + .collect() + .await?; service .exec_query("CREATE TABLE test.test (idd int, value int)") - .await - .unwrap(); + .await? + .collect() + .await?; service .exec_query( @@ -5393,8 +5608,9 @@ mod tests { (1, 10)\ ", ) - .await - .unwrap(); + .await? + .collect() + .await?; let res = service .exec_query( "SELECT count(*) cnt FROM \ @@ -5404,13 +5620,16 @@ mod tests { from test.test ) tmp", ) - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!(res.get_rows(), &vec![Row::new(vec![TableValue::Int(1)])]); + Ok::<(), CubeError>(()) }) .await; //assert_eq!(res.get_rows(), &vec![Row::new(vec![TableValue::Int(2)])]); + Ok(()) } #[test] @@ -5454,8 +5673,9 @@ mod tests { service .exec_query("CREATE SCHEMA IF NOT EXISTS test") - .await - .unwrap(); + .await? + .collect() + .await?; service .exec_query(&format!( "CREATE TABLE test.compaction_ready (`id` int, `value` int) \ @@ -5466,13 +5686,15 @@ mod tests { .collect::>() .join(",") )) - .await - .unwrap(); + .await? + .collect() + .await?; let result = service .exec_query("SELECT count(*) FROM test.compaction_ready") - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!( result.get_rows()[0], Row::new(vec![TableValue::Int(total_rows as i64)]) @@ -5484,19 +5706,16 @@ mod tests { services .meta_store .get_table("test".to_string(), "compaction_ready".to_string()) - .await - .unwrap() + .await? .get_id(), ) - .await - .unwrap(); + .await?; let partitions = services .meta_store .get_active_partitions_and_chunks_by_index_id_for_select( indexes.iter().map(|i| i.get_id()).collect(), ) - .await - .unwrap(); + .await?; for index_partitions in &partitions { for (_partition, chunks) in index_partitions { let active = chunks.iter().filter(|c| c.get_row().active()).count(); @@ -5508,6 +5727,7 @@ mod tests { ); } } + Ok::<(), CubeError>(()) }) .await; }); @@ -5554,8 +5774,9 @@ mod tests { service .exec_query("CREATE SCHEMA IF NOT EXISTS test") - .await - .unwrap(); + .await? + .collect() + .await?; service .exec_query(&format!( "CREATE TABLE test.no_threshold (`id` int, `value` int) \ @@ -5566,13 +5787,15 @@ mod tests { .collect::>() .join(",") )) - .await - .unwrap(); + .await? + .collect() + .await?; let result = service .exec_query("SELECT count(*) FROM test.no_threshold") - .await - .unwrap(); + .await? + .collect() + .await?; assert_eq!( result.get_rows()[0], Row::new(vec![TableValue::Int(total_rows as i64)]) @@ -5584,19 +5807,16 @@ mod tests { services .meta_store .get_table("test".to_string(), "no_threshold".to_string()) - .await - .unwrap() + .await? 
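+                    // metastore getters return Results directly; only SqlService queries are collected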
.get_id(), ) - .await - .unwrap(); + .await?; let partitions = services .meta_store .get_active_partitions_and_chunks_by_index_id_for_select( indexes.iter().map(|i| i.get_id()).collect(), ) - .await - .unwrap(); + .await?; let max_chunks = partitions .iter() .flat_map(|index_partitions| index_partitions.iter()) @@ -5611,6 +5831,7 @@ mod tests { but max active chunks per partition was {}", max_chunks, ); + Ok::<(), CubeError>(()) }) .await; }); @@ -5672,13 +5893,13 @@ impl SqlServiceImpl { )); } if q.to_lowercase() == "set character set utf8" { - return Some(DataFrame::new(vec![], vec![])); + return Some(DataFrame::empty()); } if q.to_lowercase() == "set names utf8" { - return Some(DataFrame::new(vec![], vec![])); + return Some(DataFrame::empty()); } if q.to_lowercase() == "show character set where charset = 'utf8mb4'" { - return Some(DataFrame::new(vec![], vec![])); + return Some(DataFrame::empty()); } None } diff --git a/rust/cubestore/cubestore/src/store/compaction.rs b/rust/cubestore/cubestore/src/store/compaction.rs index 0fb484f7a996b..f07d97692105b 100644 --- a/rust/cubestore/cubestore/src/store/compaction.rs +++ b/rust/cubestore/cubestore/src/store/compaction.rs @@ -2182,7 +2182,13 @@ mod tests { }) .start_test(async move |services| { let service = services.sql_service; - let _ = service.exec_query("CREATE SCHEMA test").await.unwrap(); + let _ = service + .exec_query("CREATE SCHEMA test") + .await + .unwrap() + .collect() + .await + .unwrap(); let compaction_service = services .injector .get_service_typed::() @@ -2190,13 +2196,22 @@ mod tests { service .exec_query("create table test.a (a int, b int96)") .await + .unwrap() + .collect() + .await .unwrap(); let values = (0..15) .map(|i| format!("({}, {})", i, i)) .collect::>() .join(", "); let query = format!("insert into test.a (a, b) values {}", values); - service.exec_query(&query).await.unwrap(); + service + .exec_query(&query) + .await + .unwrap() + .collect() + .await + .unwrap(); compaction_service .compact(1, DataLoadedSize::new()) .await @@ -2214,7 +2229,13 @@ mod tests { let query = format!("insert into test.a (a, b) values {}", values); - service.exec_query(&query).await.unwrap(); + service + .exec_query(&query) + .await + .unwrap() + .collect() + .await + .unwrap(); compaction_service .compact(partitions[0].get_id(), DataLoadedSize::new()) .await @@ -2225,6 +2246,7 @@ mod tests { .await .unwrap(); assert_eq!(partitions.len(), 3); + Ok::<(), CubeError>(()) }) .await; } @@ -2238,7 +2260,7 @@ mod tests { }) .start_test(async move |services| { let service = services.sql_service; - let _ = service.exec_query("CREATE SCHEMA test").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA test").await.unwrap().collect().await.unwrap(); let compaction_service = services .injector .get_service_typed::() @@ -2246,13 +2268,13 @@ mod tests { service .exec_query("create table test.a (a int, d0 decimal(20,0), d1 decimal(20, 1), d2 decimal(20, 2), d3 decimal(20, 3), d4 decimal(20, 4), d5 decimal(20, 5), d10 decimal(20, 10))") .await - .unwrap(); + .unwrap().collect().await.unwrap(); let values = (0..15) .map(|i| format!("({}, {}, {}, {}, {}, {}, {}, {})", i, i, i, i, i, i, i, i)) .collect::>() .join(", "); let query = format!("insert into test.a (a, d0, d1, d2, d3, d4, d5, d10) values {}", values); - service.exec_query(&query).await.unwrap(); + service.exec_query(&query).await.unwrap().collect().await.unwrap(); compaction_service .compact(1, DataLoadedSize::new()) .await @@ -2269,7 +2291,7 @@ mod tests { .join(", "); let query = 
format!("insert into test.a (a, d0, d1, d2, d3, d4, d5, d10) values {}", values); - service.exec_query(&query).await.unwrap(); + service.exec_query(&query).await.unwrap().collect().await.unwrap(); compaction_service .compact(partitions[0].get_id(), DataLoadedSize::new()) .await @@ -2280,6 +2302,7 @@ mod tests { .await .unwrap(); assert_eq!(partitions.len(), 3); + Ok::<(), CubeError>(()) }) .await; } @@ -2294,7 +2317,13 @@ mod tests { }) .start_test(async move |services| { let service = services.sql_service; - let _ = service.exec_query("CREATE SCHEMA test").await.unwrap(); + let _ = service + .exec_query("CREATE SCHEMA test") + .await + .unwrap() + .collect() + .await + .unwrap(); let compaction_service = services .injector .get_service_typed::() @@ -2302,13 +2331,22 @@ mod tests { service .exec_query("create table test.a (a varchar(255), b varchar(255))") .await + .unwrap() + .collect() + .await .unwrap(); let values = (0..1000) .map(|i| format!("('{}{}', '{}{}')", i, "a".repeat(10), i, "b".repeat(10))) .collect::>() .join(", "); let query = format!("insert into test.a (a, b) values {}", values); - service.exec_query(&query).await.unwrap(); + service + .exec_query(&query) + .await + .unwrap() + .collect() + .await + .unwrap(); compaction_service .compact(1, DataLoadedSize::new()) .await @@ -2325,7 +2363,13 @@ mod tests { .join(", "); let query = format!("insert into test.a (a, b) values {}", values); - service.exec_query(&query).await.unwrap(); + service + .exec_query(&query) + .await + .unwrap() + .collect() + .await + .unwrap(); compaction_service .compact(partitions[0].get_id(), DataLoadedSize::new()) .await @@ -2339,6 +2383,7 @@ mod tests { for p in partitions.iter() { assert!(p.get_row().file_size().unwrap() <= 10000); } + Ok::<(), CubeError>(()) }) .await; } diff --git a/rust/cubestore/cubestore/src/store/mod.rs b/rust/cubestore/cubestore/src/store/mod.rs index 2b38810c64658..0535e72de33a1 100644 --- a/rust/cubestore/cubestore/src/store/mod.rs +++ b/rust/cubestore/cubestore/src/store/mod.rs @@ -69,6 +69,13 @@ impl DataFrame { DataFrame { columns, data } } + pub fn empty() -> DataFrame { + DataFrame { + columns: vec![], + data: vec![], + } + } + pub fn len(&self) -> usize { self.data.len() } diff --git a/rust/cubestore/cubestore/src/streaming/mod.rs b/rust/cubestore/cubestore/src/streaming/mod.rs index 3b39d08cb6dc0..1a317d8d4f280 100644 --- a/rust/cubestore/cubestore/src/streaming/mod.rs +++ b/rust/cubestore/cubestore/src/streaming/mod.rs @@ -1164,35 +1164,32 @@ mod tests { let service = services.sql_service; let meta_store = services.meta_store; - let _ = service.exec_query("CREATE SCHEMA test").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA test").await?.collect().await?; service .exec_query("CREATE SOURCE OR UPDATE ksql AS 'ksql' VALUES (user = 'foo', password = 'bar', url = 'http://foo.com')") - .await - .unwrap(); + .await?.collect().await?; let listener = services.cluster.job_result_listener(); let _ = service .exec_query("CREATE TABLE test.events_by_type_1 (`ANONYMOUSID` text, `MESSAGEID` text) WITH (select_statement = 'SELECT * FROM EVENTS_BY_TYPE WHERE time >= ''2022-01-01'' AND time < ''2022-02-01''', stream_offset = 'earliest') unique key (`ANONYMOUSID`, `MESSAGEID`) INDEX by_anonymous(`ANONYMOUSID`) location 'stream://ksql/EVENTS_BY_TYPE/0', 'stream://ksql/EVENTS_BY_TYPE/1'") - .await - .unwrap(); + .await?.collect().await?; let wait = listener.wait_for_job_results(vec![ (RowKey::Table(TableId::Tables, 1), 
JobType::TableImportCSV("stream://ksql/EVENTS_BY_TYPE/0".to_string())), (RowKey::Table(TableId::Tables, 1), JobType::TableImportCSV("stream://ksql/EVENTS_BY_TYPE/1".to_string())), ]); - timeout(Duration::from_secs(15), wait).await.unwrap().unwrap(); + timeout(Duration::from_secs(15), wait).await?.unwrap(); let result = service .exec_query("SELECT COUNT(*) FROM test.events_by_type_1") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(10000)])]); let listener = services.cluster.job_result_listener(); - let chunks = meta_store.chunks_table().all_rows().await.unwrap(); - let replay_handles = meta_store.get_replay_handles_by_ids(chunks.iter().filter_map(|c| c.get_row().replay_handle_id().clone()).collect()).await.unwrap(); + let chunks = meta_store.chunks_table().all_rows().await?; + let replay_handles = meta_store.get_replay_handles_by_ids(chunks.iter().filter_map(|c| c.get_row().replay_handle_id().clone()).collect()).await?; let mut middle_chunk = None; for chunk in chunks.iter() { if chunk.get_row().get_partition_id() != 1 { @@ -1203,7 +1200,7 @@ mod tests { if let Some(seq_pointers) = handle.get_row().seq_pointers_by_location() { if seq_pointers.iter().any(|p| p.as_ref().map(|p| p.start_seq().as_ref().zip(p.end_seq().as_ref()).map(|(a, b)| *a > 0 && *b <= 3276).unwrap_or(false)).unwrap_or(false)) { let chunk_name = chunk_file_name(chunk.get_id(), chunk.get_row().suffix()); - chunk_store.free_memory_chunk(chunk_name).await.unwrap(); + chunk_store.free_memory_chunk(chunk_name).await?; middle_chunk = Some(chunk.clone()); break; } @@ -1211,7 +1208,7 @@ mod tests { } } let partition_id = middle_chunk.unwrap().get_row().get_partition_id(); - let partition = &meta_store.get_partition(partition_id).await.unwrap(); + let partition = &meta_store.get_partition(partition_id).await?; let node = cluster.node_name_by_partition(partition); let job = meta_store @@ -1220,80 +1217,72 @@ mod tests { JobType::InMemoryChunksCompaction, node.to_string(), )) - .await.unwrap(); + .await?; if job.is_some() { - cluster.notify_job_runner(node).await.unwrap(); + cluster.notify_job_runner(node).await?; } let wait = listener.wait_for_job_results(vec![ (RowKey::Table(TableId::Partitions, 1), JobType::InMemoryChunksCompaction), ]); - timeout(Duration::from_secs(10), wait).await.unwrap().unwrap(); + timeout(Duration::from_secs(10), wait).await?.unwrap(); println!("chunks: {:#?}", service .exec_query("SELECT * FROM system.chunks") - .await - .unwrap() + .await?.collect().await? ); println!("replay handles: {:#?}", service .exec_query("SELECT * FROM system.replay_handles") - .await - .unwrap() + .await?.collect().await? 
); let result = service .exec_query("SELECT COUNT(*) FROM test.events_by_type_1") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(10000 - 1638)])]); let listener = services.cluster.job_result_listener(); - scheduler.reconcile_table_imports().await.unwrap(); + scheduler.reconcile_table_imports().await?; let wait = listener.wait_for_job_results(vec![ (RowKey::Table(TableId::Tables, 1), JobType::TableImportCSV("stream://ksql/EVENTS_BY_TYPE/0".to_string())), (RowKey::Table(TableId::Tables, 1), JobType::TableImportCSV("stream://ksql/EVENTS_BY_TYPE/1".to_string())), ]); - timeout(Duration::from_secs(10), wait).await.unwrap().unwrap(); + timeout(Duration::from_secs(10), wait).await?.unwrap(); Delay::new(Duration::from_millis(10000)).await; let result = service .exec_query("SELECT COUNT(*) FROM test.events_by_type_1") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(10000)])]); println!("replay handles pre merge: {:#?}", service .exec_query("SELECT * FROM system.replay_handles") - .await - .unwrap() + .await?.collect().await? ); - scheduler.merge_replay_handles().await.unwrap(); + scheduler.merge_replay_handles().await?; let result = service .exec_query("SELECT * FROM system.replay_handles WHERE has_failed_to_persist_chunks = true") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows().len(), 0); println!("replay handles after merge: {:#?}", service .exec_query("SELECT * FROM system.replay_handles") - .await - .unwrap() + .await?.collect().await? ); service .exec_query("DROP TABLE test.events_by_type_1") - .await - .unwrap(); + .await?.collect().await?; let result = service .exec_query("SELECT * FROM system.replay_handles") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows().len(), 0); + Ok::<(), CubeError>(()) }) .await; } @@ -1323,35 +1312,32 @@ mod tests { let service = services.sql_service; let meta_store = services.meta_store; - let _ = service.exec_query("CREATE SCHEMA test").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA test").await?.collect().await?; service .exec_query("CREATE SOURCE OR UPDATE kafka AS 'kafka' VALUES (user = 'foo', password = 'bar', host = 'localhost:9092')") - .await - .unwrap(); + .await?.collect().await?; let listener = services.cluster.job_result_listener(); let _ = service .exec_query("CREATE TABLE test.events_by_type_1 (`ANONYMOUSID` text, `MESSAGEID` text) WITH (stream_offset = 'earliest') unique key (`ANONYMOUSID`, `MESSAGEID`) INDEX by_anonymous(`ANONYMOUSID`) location 'stream://kafka/EVENTS_BY_TYPE/0', 'stream://kafka/EVENTS_BY_TYPE/1'") - .await - .unwrap(); + .await?.collect().await?; let wait = listener.wait_for_job_results(vec![ (RowKey::Table(TableId::Tables, 1), JobType::TableImportCSV("stream://kafka/EVENTS_BY_TYPE/0".to_string())), (RowKey::Table(TableId::Tables, 1), JobType::TableImportCSV("stream://kafka/EVENTS_BY_TYPE/1".to_string())), ]); - timeout(Duration::from_secs(15), wait).await.unwrap().unwrap(); + timeout(Duration::from_secs(15), wait).await?.unwrap(); let result = service .exec_query("SELECT COUNT(*) FROM test.events_by_type_1") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(10000)])]); let listener = services.cluster.job_result_listener(); - let chunks = meta_store.chunks_table().all_rows().await.unwrap(); - let replay_handles = 
meta_store.get_replay_handles_by_ids(chunks.iter().filter_map(|c| c.get_row().replay_handle_id().clone()).collect()).await.unwrap(); + let chunks = meta_store.chunks_table().all_rows().await?; + let replay_handles = meta_store.get_replay_handles_by_ids(chunks.iter().filter_map(|c| c.get_row().replay_handle_id().clone()).collect()).await?; let mut middle_chunk = None; for chunk in chunks.iter() { if chunk.get_row().get_partition_id() != 1 { @@ -1362,7 +1348,7 @@ mod tests { if let Some(seq_pointers) = handle.get_row().seq_pointers_by_location() { if seq_pointers.iter().any(|p| p.as_ref().map(|p| p.start_seq().as_ref().zip(p.end_seq().as_ref()).map(|(a, b)| *a > 0 && *b <= 3276).unwrap_or(false)).unwrap_or(false)) { let chunk_name = chunk_file_name(chunk.get_id(), chunk.get_row().suffix()); - chunk_store.free_memory_chunk(chunk_name).await.unwrap(); + chunk_store.free_memory_chunk(chunk_name).await?; middle_chunk = Some(chunk.clone()); break; } @@ -1371,7 +1357,7 @@ mod tests { } let partition_id = middle_chunk.unwrap().get_row().get_partition_id(); - let partition = &meta_store.get_partition(partition_id).await.unwrap(); + let partition = &meta_store.get_partition(partition_id).await?; let node = cluster.node_name_by_partition(partition); let job = meta_store @@ -1380,59 +1366,55 @@ mod tests { JobType::InMemoryChunksCompaction, node.to_string(), )) - .await.unwrap(); + .await?; if job.is_some() { - cluster.notify_job_runner(node).await.unwrap(); + cluster.notify_job_runner(node).await?; } let wait = listener.wait_for_job_results(vec![ (RowKey::Table(TableId::Partitions, 1), JobType::InMemoryChunksCompaction), ]); - timeout(Duration::from_secs(10), wait).await.unwrap().unwrap(); + timeout(Duration::from_secs(10), wait).await?.unwrap(); let result = service .exec_query("SELECT COUNT(*) FROM test.events_by_type_1") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(10000 - 1638)])]); let listener = services.cluster.job_result_listener(); - scheduler.reconcile_table_imports().await.unwrap(); + scheduler.reconcile_table_imports().await?; let wait = listener.wait_for_job_results(vec![ (RowKey::Table(TableId::Tables, 1), JobType::TableImportCSV("stream://kafka/EVENTS_BY_TYPE/0".to_string())), (RowKey::Table(TableId::Tables, 1), JobType::TableImportCSV("stream://kafka/EVENTS_BY_TYPE/1".to_string())), ]); - timeout(Duration::from_secs(10), wait).await.unwrap().unwrap(); + timeout(Duration::from_secs(10), wait).await?.unwrap(); Delay::new(Duration::from_millis(10000)).await; let result = service .exec_query("SELECT COUNT(*) FROM test.events_by_type_1") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(10000)])]); - scheduler.merge_replay_handles().await.unwrap(); + scheduler.merge_replay_handles().await?; let result = service .exec_query("SELECT * FROM system.replay_handles WHERE has_failed_to_persist_chunks = true") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows().len(), 0); service .exec_query("DROP TABLE test.events_by_type_1") - .await - .unwrap(); + .await?.collect().await?; let result = service .exec_query("SELECT * FROM system.replay_handles") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows().len(), 0); + Ok::<(), CubeError>(()) }) .await; } @@ -1458,12 +1440,11 @@ mod tests { //PARSE_TIMESTAMP('2023-01-24T23:59:59.999Z', 'yyyy-MM-dd''T''HH:mm:ss.SSSX', 'UTC') let service = services.sql_service; - let _ = 
service.exec_query("CREATE SCHEMA test").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA test").await?.collect().await?; service .exec_query("CREATE SOURCE OR UPDATE kafka AS 'kafka' VALUES (user = 'foo', password = 'bar', host = 'localhost:9092')") - .await - .unwrap(); + .await?.collect().await?; let listener = services.cluster.job_result_listener(); @@ -1471,8 +1452,7 @@ mod tests { .exec_query("CREATE TABLE test.events_by_type_1 (`ANONYMOUSID` text, `MESSAGEID` text, `FILTER_ID` int) \ WITH (stream_offset = 'earliest', select_statement = 'SELECT * FROM `EVENTS_BY_TYPE` WHERE `FILTER_ID` >= 1000 and `FILTER_ID` < 1400') \ unique key (`ANONYMOUSID`, `MESSAGEID`, `FILTER_ID`) INDEX by_anonymous(`ANONYMOUSID`, `FILTER_ID`) location 'stream://kafka/EVENTS_BY_TYPE/0', 'stream://kafka/EVENTS_BY_TYPE/1'") - .await - .unwrap(); + .await?.collect().await?; let wait = listener.wait_for_job_results(vec![ (RowKey::Table(TableId::Tables, 1), JobType::TableImportCSV("stream://kafka/EVENTS_BY_TYPE/0".to_string())), @@ -1482,21 +1462,19 @@ mod tests { let result = service .exec_query("SELECT COUNT(*) FROM test.events_by_type_1") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(800)])]); let result = service .exec_query("SELECT min(`FILTER_ID`) FROM test.events_by_type_1 ") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(1000)])]); let result = service .exec_query("SELECT max(`FILTER_ID`) FROM test.events_by_type_1 ") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(1399)])]); + Ok::<(), CubeError>(()) }) .await; } @@ -1522,12 +1500,11 @@ mod tests { //PARSE_TIMESTAMP('2023-01-24T23:59:59.999Z', 'yyyy-MM-dd''T''HH:mm:ss.SSSX', 'UTC') let service = services.sql_service; - let _ = service.exec_query("CREATE SCHEMA test").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA test").await?.collect().await?; service .exec_query("CREATE SOURCE OR UPDATE kafka AS 'kafka' VALUES (user = 'foo', password = 'bar', host = 'localhost:9092')") - .await - .unwrap(); + .await?.collect().await?; let listener = services.cluster.job_result_listener(); @@ -1535,8 +1512,7 @@ mod tests { .exec_query("CREATE TABLE test.events_by_type_1 (`ANONYMOUSID` text, `MESSAGEID` text, `FILTER_ID` int, `CONCATID` text) \ WITH (stream_offset = 'earliest', select_statement = 'SELECT `ANONYMOUSID`, `MESSAGEID`, `FILTER_ID`, concat(`ANONYMOUSID`, `MESSAGEID`) AS `CONCATID` FROM `EVENTS_BY_TYPE` WHERE `FILTER_ID` >= 1000 and `FILTER_ID` < 1400') \ unique key (`ANONYMOUSID`, `MESSAGEID`, `FILTER_ID`) INDEX by_anonymous(`ANONYMOUSID`, `FILTER_ID`) location 'stream://kafka/EVENTS_BY_TYPE/0', 'stream://kafka/EVENTS_BY_TYPE/1'") - .await - .unwrap(); + .await?.collect().await?; let wait = listener.wait_for_job_results(vec![ (RowKey::Table(TableId::Tables, 1), JobType::TableImportCSV("stream://kafka/EVENTS_BY_TYPE/0".to_string())), @@ -1546,14 +1522,12 @@ mod tests { let result = service .exec_query("SELECT COUNT(*) FROM test.events_by_type_1") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(800)])]); let result = service .exec_query("SELECT concat(`ANONYMOUSID`, `MESSAGEID`), `CONCATID` FROM test.events_by_type_1 ") - .await - .unwrap(); + .await?.collect().await?; let rows = result.get_rows(); assert_eq!(rows.len(), 800); for (i, row) in rows.iter().enumerate() { 
@@ -1561,6 +1535,7 @@ mod tests { assert_eq!(values[0], values[1], "i = {}", i); } + Ok::<(), CubeError>(()) }) .await; } @@ -1586,12 +1561,11 @@ mod tests { //PARSE_TIMESTAMP('2023-01-24T23:59:59.999Z', 'yyyy-MM-dd''T''HH:mm:ss.SSSX', 'UTC') let service = services.sql_service; - let _ = service.exec_query("CREATE SCHEMA test").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA test").await?.collect().await?; service .exec_query("CREATE SOURCE OR UPDATE kafka AS 'kafka' VALUES (user = 'foo', password = 'bar', host = 'localhost:9092')") - .await - .unwrap(); + .await?.collect().await?; let listener = services.cluster.job_result_listener(); @@ -1603,8 +1577,7 @@ mod tests { `TIMESTAMP` < PARSE_TIMESTAMP(''1970-01-01T01:10:00.000Z'', ''yyyy-MM-dd''''T''''HH:mm:ss.SSSX'', ''UTC'') \ ') \ unique key (`ANONYMOUSID`, `MESSAGEID`, `FILTER_ID`, `TIMESTAMP`) INDEX by_anonymous(`ANONYMOUSID`, `TIMESTAMP`) location 'stream://kafka/EVENTS_BY_TYPE/0', 'stream://kafka/EVENTS_BY_TYPE/1'") - .await - .unwrap(); + .await?.collect().await?; let wait = listener.wait_for_job_results(vec![ (RowKey::Table(TableId::Tables, 1), JobType::TableImportCSV("stream://kafka/EVENTS_BY_TYPE/0".to_string())), @@ -1614,21 +1587,19 @@ mod tests { let result = service .exec_query("SELECT COUNT(*) FROM test.events_by_type_1") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(20 * 60)])]); let result = service .exec_query("SELECT min(`FILTER_ID`) FROM test.events_by_type_1 ") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(3600)])]); let result = service .exec_query("SELECT max(`FILTER_ID`) FROM test.events_by_type_1 ") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(3600 + 600 - 1)])]); + Ok::<(), CubeError>(()) }) .await; } @@ -1654,12 +1625,11 @@ mod tests { //PARSE_TIMESTAMP('2023-01-24T23:59:59.999Z', 'yyyy-MM-dd''T''HH:mm:ss.SSSX', 'UTC') let service = services.sql_service; - let _ = service.exec_query("CREATE SCHEMA test").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA test").await?.collect().await?; service .exec_query("CREATE SOURCE OR UPDATE kafka AS 'kafka' VALUES (user = 'foo', password = 'bar', host = 'localhost:9092')") - .await - .unwrap(); + .await?.collect().await?; service .exec_query("CREATE TABLE test.events_by_type_1 (`ANONYMOUSID` text, `MESSAGEID` text, `FILTER_ID` int, `TIMESTAMP` text) \ @@ -1675,8 +1645,7 @@ mod tests { '\ ) \ unique key (`ANONYMOUSID`, `MESSAGEID`, `FILTER_ID`, `TIMESTAMP`) INDEX by_anonymous(`ANONYMOUSID`, `TIMESTAMP`) location 'stream://kafka/EVENTS_BY_TYPE/0', 'stream://kafka/EVENTS_BY_TYPE/1'") - .await - .unwrap(); + .await?.collect().await?; service .exec_query("CREATE TABLE test.events_by_type_2 (`ANONYMOUSID` text, `MESSAGEID` text, `FILTER_ID` int, `TIMESTAMP` text) \ @@ -1692,8 +1661,7 @@ mod tests { '\ ) \ unique key (`ANONYMOUSID`, `MESSAGEID`) INDEX by_anonymous(`ANONYMOUSID`) location 'stream://kafka/EVENTS_BY_TYPE/0', 'stream://kafka/EVENTS_BY_TYPE/1'") - .await - .unwrap(); + .await?.collect().await?; service .exec_query("CREATE TABLE test.events_by_type_3 (`ANONYMOUSID` text, `MESSAGEID` text, `FILTER_ID` int, `TIMESTAMP` text) \ @@ -1741,8 +1709,7 @@ mod tests { source_table='CREATE TABLE `EVENTS_BY_TYPE` (`ANONYMOUSID` text, `MESSAGEID` text, `FILTER_ID` int, `TIMESTAMP` text)'\ ) \ unique key (`message_id`, `an_id`) INDEX 
by_anonymous(`message_id`) location 'stream://kafka/EVENTS_BY_TYPE/0', 'stream://kafka/EVENTS_BY_TYPE/1'") - .await - .unwrap(); + .await?.collect().await?; let _ = service .exec_query("CREATE TABLE test.events_by_type_5 (`an_id` text, `message_id` text, `filter_id` float, `minute_timestamp` timestamp) \ @@ -1791,8 +1758,8 @@ mod tests { '\ ) \ unique key (`ANONYMOUSID`, `MESSAGEID`, `FILTER_ID`, `TIMESTAMP`, `TIMESTAMP_SECOND`) INDEX by_anonymous(`ANONYMOUSID`, `TIMESTAMP_SECOND`,`TIMESTAMP`) location 'stream://kafka/EVENTS_BY_TYPE/0', 'stream://kafka/EVENTS_BY_TYPE/1'") - .await - .unwrap(); + .await?.collect().await?; + Ok::<(), CubeError>(()) }) .await; } @@ -1817,12 +1784,11 @@ mod tests { //PARSE_TIMESTAMP('2023-01-24T23:59:59.999Z', 'yyyy-MM-dd''T''HH:mm:ss.SSSX', 'UTC') let service = services.sql_service; - let _ = service.exec_query("CREATE SCHEMA test").await.unwrap(); + let _ = service.exec_query("CREATE SCHEMA test").await?.collect().await?; service .exec_query("CREATE SOURCE OR UPDATE kafka AS 'kafka' VALUES (user = 'foo', password = 'bar', host = 'localhost:9092')") - .await - .unwrap(); + .await?.collect().await?; let listener = services.cluster.job_result_listener(); @@ -1855,8 +1821,7 @@ mod tests { source_table='CREATE TABLE EVENTS_BY_TYPE (`ANONYMOUSID` text, `MESSAGEID` text, `FILTER_ID` int, `TIMESTAMP` text)'\ ) \ unique key (`message_id`, `an_id`) INDEX by_anonymous(`message_id`) location 'stream://kafka/EVENTS_BY_TYPE/0', 'stream://kafka/EVENTS_BY_TYPE/1'") - .await - .unwrap(); + .await?.collect().await?; let wait = listener.wait_for_job_results(vec![ (RowKey::Table(TableId::Tables, 1), JobType::TableImportCSV("stream://kafka/EVENTS_BY_TYPE/0".to_string())), (RowKey::Table(TableId::Tables, 1), JobType::TableImportCSV("stream://kafka/EVENTS_BY_TYPE/1".to_string())), ]); @@ -1866,31 +1831,27 @@ mod tests { let result = service .exec_query("SELECT COUNT(*) FROM test.events_by_type_1") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(20 * 60)])]); let result = service .exec_query("SELECT COUNT(*) FROM test.events_by_type_1 where minute_timestamp = to_timestamp('1970-01-01T01:06:00')") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(2 * 60)])]); let result = service .exec_query("SELECT minute_timestamp, count(*) FROM test.events_by_type_1 group by 1") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows().len(), 10); let result = service .exec_query("SELECT min(filter_id) FROM test.events_by_type_1 ") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(3600)])]); let result = service .exec_query("SELECT max(filter_id) FROM test.events_by_type_1 ") - .await - .unwrap(); + .await?.collect().await?; assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(3600 + 600 - 1)])]); + Ok::<(), CubeError>(()) }) .await; } From 29287664463fbf72550b8d00291b365201650f3d Mon Sep 17 00:00:00 2001 From: Mike Nitsenko Date: Tue, 5 May 2026 21:17:52 +0500 Subject: [PATCH 2/2] docs: update snowflake semantic views doc following CUB-2368 (#10810) * upd * upd --- .../integrations/snowflake-semantic-views.mdx | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/docs-mintlify/docs/integrations/snowflake-semantic-views.mdx b/docs-mintlify/docs/integrations/snowflake-semantic-views.mdx index a845b6a6ad168..d4eb00f34768b 100644 --- a/docs-mintlify/docs/integrations/snowflake-semantic-views.mdx +++
b/docs-mintlify/docs/integrations/snowflake-semantic-views.mdx @@ -48,6 +48,16 @@ Alternatively, you can push Cube views into Snowflake as native semantic views. This enables you to use Cube-authored views directly in Snowflake, maintaining consistency across both platforms. +### Cubes defined with `sql` + +When a cube uses the `sql` property with a plain SQL string, Cube creates a helper +Snowflake view named `CUBE_SV_SRC_` in a configurable schema (defaults to +`PUBLIC`) and uses that view as the source for the semantic view. For example: +`sql: "SELECT id, status FROM raw.orders"`. + +Note that if you're simply referencing a table, use `sql_table` instead, as it's the +recommended approach for straightforward table access (e.g., `sql_table: MY_SCHEMA.MY_TABLE`). + ### Prerequisites The push integration uses the SQL Runner to execute DDL statements in Snowflake. To @@ -60,6 +70,8 @@ successfully create semantic views, ensure the following: - The Snowflake role configured for your Cube data source (via [`CUBEJS_DB_SNOWFLAKE_ROLE`](/reference/configuration/environment-variables#cubejs_db_snowflake_role)) has privileges to create semantic views in the target database and schema (`CREATE SEMANTIC VIEW` on the schema, plus `USAGE` on the parent database and schema). + If any cube uses a plain SQL string in its `sql` property, the role also needs + `CREATE VIEW` privileges on the schema where helper views are created (which defaults to `PUBLIC`). - The role has `USAGE` on the warehouse specified by [`CUBEJS_DB_SNOWFLAKE_WAREHOUSE`](/reference/configuration/environment-variables#cubejs_db_snowflake_warehouse) and `SELECT` on the underlying tables referenced by the view. - [`CUBEJS_DB_SNOWFLAKE_QUOTED_IDENTIFIERS_IGNORE_CASE`](/reference/configuration/environment-variables#cubejs_db_snowflake_quoted_identifiers_ignore_case) @@ -81,9 +93,10 @@ Cube only. When pushing a Cube view to Snowflake, the following are currently not supported: -- **Cubes without a `sql_table`.** Each cube referenced by the view must be - backed by a physical table. Cubes defined with an arbitrary `sql` block - (subqueries, CTEs, or other inline SQL) can't be pushed. +- **Cubes with templated `sql`.** If a cube's `sql` property uses template + expressions (e.g., Jinja or dbt `{{ source(...) }}` syntax), it can't be + pushed. See [Cubes defined with `sql`](#cubes-defined-with-sql) for details on + what SQL patterns are supported. - **Cubes without a single-column primary key.** Every cube needs a `primary_key` dimension that resolves to a single physical column. Composite primary keys and primary keys defined as SQL expressions aren't supported.
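For illustration, here is a minimal data-model sketch of the two pushable cube shapes the updated doc describes. The cube, schema, and column names (`orders`, `MY_SCHEMA.ORDERS`, `completed_orders`, `raw.orders`) are hypothetical; both cubes carry the single-column `primary_key` dimension the push integration requires:

```yaml
cubes:
  # Recommended shape: a cube backed by a physical table via `sql_table`.
  - name: orders
    sql_table: MY_SCHEMA.ORDERS
    dimensions:
      # Single-column primary key that resolves to one physical column.
      - name: id
        sql: id
        type: number
        primary_key: true
      - name: status
        sql: status
        type: string

  # Also pushable: a plain SQL string (no template expressions), which is
  # materialized through a helper Snowflake view as described above.
  - name: completed_orders
    sql: "SELECT id, status FROM raw.orders WHERE status = 'completed'"
    dimensions:
      - name: id
        sql: id
        type: number
        primary_key: true
```

Swapping the plain string in `completed_orders` for a templated expression such as a dbt-style `{{ source(...) }}` reference would make that cube non-pushable, per the limitations above.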