diff --git a/rust/cube/cubeshared/src/codegen/http_message.fbs b/rust/cube/cubeshared/src/codegen/http_message.fbs index ad9038a55bfea..b0a8b8994d672 100644 --- a/rust/cube/cubeshared/src/codegen/http_message.fbs +++ b/rust/cube/cubeshared/src/codegen/http_message.fbs @@ -1,7 +1,8 @@ union HttpCommand { HttpQuery, HttpResultSet, - HttpError + HttpError, + HttpQueryResult } table HttpMessage { @@ -22,11 +23,18 @@ table HttpParameter { value: HttpParameterValue (required); } +enum QueryResultFormat: ubyte { + // HttpResultSet will be returned. + Legacy = 0, + Arrow = 1, +} + table HttpQuery { query: string; trace_obj: string; inline_tables: [HttpTable]; parameters: [HttpParameter]; + response_format: QueryResultFormat = Legacy; } table HttpTable { @@ -54,4 +62,21 @@ table HttpColumnValue { string_value: string; } +table HttpQueryResultArrow { + // Self-contained Arrow IPC stream payload. Contains a schema header + // followed by one or more RecordBatch messages. Consumers must use a + // streaming IPC reader (not assume a single batch). + data: [ubyte] (required); + // True on the final frame of a streamed result. + is_last: bool; +} + +union HttpQueryResultData { + HttpQueryResultArrow, +} + +table HttpQueryResult { + data: HttpQueryResultData (required); +} + root_type HttpMessage; diff --git a/rust/cube/cubeshared/src/codegen/http_message_generated.rs b/rust/cube/cubeshared/src/codegen/http_message_generated.rs index 5ae37dccc821c..19eac2f5ab526 100644 --- a/rust/cube/cubeshared/src/codegen/http_message_generated.rs +++ b/rust/cube/cubeshared/src/codegen/http_message_generated.rs @@ -11,17 +11,18 @@ pub const ENUM_MIN_HTTP_COMMAND: u8 = 0; since = "2.0.0", note = "Use associated constants instead. This will no longer be generated in 2021." )] -pub const ENUM_MAX_HTTP_COMMAND: u8 = 3; +pub const ENUM_MAX_HTTP_COMMAND: u8 = 4; #[deprecated( since = "2.0.0", note = "Use associated constants instead. This will no longer be generated in 2021." )] #[allow(non_camel_case_types)] -pub const ENUM_VALUES_HTTP_COMMAND: [HttpCommand; 4] = [ +pub const ENUM_VALUES_HTTP_COMMAND: [HttpCommand; 5] = [ HttpCommand::NONE, HttpCommand::HttpQuery, HttpCommand::HttpResultSet, HttpCommand::HttpError, + HttpCommand::HttpQueryResult, ]; #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] @@ -33,14 +34,16 @@ impl HttpCommand { pub const HttpQuery: Self = Self(1); pub const HttpResultSet: Self = Self(2); pub const HttpError: Self = Self(3); + pub const HttpQueryResult: Self = Self(4); pub const ENUM_MIN: u8 = 0; - pub const ENUM_MAX: u8 = 3; + pub const ENUM_MAX: u8 = 4; pub const ENUM_VALUES: &'static [Self] = &[ Self::NONE, Self::HttpQuery, Self::HttpResultSet, Self::HttpError, + Self::HttpQueryResult, ]; /// Returns the variant's name or "" if unknown. pub fn variant_name(self) -> Option<&'static str> { @@ -49,6 +52,7 @@ impl HttpCommand { Self::HttpQuery => Some("HttpQuery"), Self::HttpResultSet => Some("HttpResultSet"), Self::HttpError => Some("HttpError"), + Self::HttpQueryResult => Some("HttpQueryResult"), _ => None, } } @@ -222,6 +226,188 @@ impl<'a> ::flatbuffers::Verifiable for HttpParameterValue { impl ::flatbuffers::SimpleToVerifyInSlice for HttpParameterValue {} pub struct HttpParameterValueUnionTableOffset {} +#[deprecated( + since = "2.0.0", + note = "Use associated constants instead. This will no longer be generated in 2021." +)] +pub const ENUM_MIN_QUERY_RESULT_FORMAT: u8 = 0; +#[deprecated( + since = "2.0.0", + note = "Use associated constants instead. This will no longer be generated in 2021." +)] +pub const ENUM_MAX_QUERY_RESULT_FORMAT: u8 = 1; +#[deprecated( + since = "2.0.0", + note = "Use associated constants instead. This will no longer be generated in 2021." +)] +#[allow(non_camel_case_types)] +pub const ENUM_VALUES_QUERY_RESULT_FORMAT: [QueryResultFormat; 2] = + [QueryResultFormat::Legacy, QueryResultFormat::Arrow]; + +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] +#[repr(transparent)] +pub struct QueryResultFormat(pub u8); +#[allow(non_upper_case_globals)] +impl QueryResultFormat { + pub const Legacy: Self = Self(0); + pub const Arrow: Self = Self(1); + + pub const ENUM_MIN: u8 = 0; + pub const ENUM_MAX: u8 = 1; + pub const ENUM_VALUES: &'static [Self] = &[Self::Legacy, Self::Arrow]; + /// Returns the variant's name or "" if unknown. + pub fn variant_name(self) -> Option<&'static str> { + match self { + Self::Legacy => Some("Legacy"), + Self::Arrow => Some("Arrow"), + _ => None, + } + } +} +impl ::core::fmt::Debug for QueryResultFormat { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + if let Some(name) = self.variant_name() { + f.write_str(name) + } else { + f.write_fmt(format_args!("", self.0)) + } + } +} +impl<'a> ::flatbuffers::Follow<'a> for QueryResultFormat { + type Inner = Self; + #[inline] + unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + let b = unsafe { ::flatbuffers::read_scalar_at::(buf, loc) }; + Self(b) + } +} + +impl ::flatbuffers::Push for QueryResultFormat { + type Output = QueryResultFormat; + #[inline] + unsafe fn push(&self, dst: &mut [u8], _written_len: usize) { + unsafe { ::flatbuffers::emplace_scalar::(dst, self.0) }; + } +} + +impl ::flatbuffers::EndianScalar for QueryResultFormat { + type Scalar = u8; + #[inline] + fn to_little_endian(self) -> u8 { + self.0.to_le() + } + #[inline] + #[allow(clippy::wrong_self_convention)] + fn from_little_endian(v: u8) -> Self { + let b = u8::from_le(v); + Self(b) + } +} + +impl<'a> ::flatbuffers::Verifiable for QueryResultFormat { + #[inline] + fn run_verifier( + v: &mut ::flatbuffers::Verifier, + pos: usize, + ) -> Result<(), ::flatbuffers::InvalidFlatbuffer> { + u8::run_verifier(v, pos) + } +} + +impl ::flatbuffers::SimpleToVerifyInSlice for QueryResultFormat {} +#[deprecated( + since = "2.0.0", + note = "Use associated constants instead. This will no longer be generated in 2021." +)] +pub const ENUM_MIN_HTTP_QUERY_RESULT_DATA: u8 = 0; +#[deprecated( + since = "2.0.0", + note = "Use associated constants instead. This will no longer be generated in 2021." +)] +pub const ENUM_MAX_HTTP_QUERY_RESULT_DATA: u8 = 1; +#[deprecated( + since = "2.0.0", + note = "Use associated constants instead. This will no longer be generated in 2021." +)] +#[allow(non_camel_case_types)] +pub const ENUM_VALUES_HTTP_QUERY_RESULT_DATA: [HttpQueryResultData; 2] = [ + HttpQueryResultData::NONE, + HttpQueryResultData::HttpQueryResultArrow, +]; + +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] +#[repr(transparent)] +pub struct HttpQueryResultData(pub u8); +#[allow(non_upper_case_globals)] +impl HttpQueryResultData { + pub const NONE: Self = Self(0); + pub const HttpQueryResultArrow: Self = Self(1); + + pub const ENUM_MIN: u8 = 0; + pub const ENUM_MAX: u8 = 1; + pub const ENUM_VALUES: &'static [Self] = &[Self::NONE, Self::HttpQueryResultArrow]; + /// Returns the variant's name or "" if unknown. + pub fn variant_name(self) -> Option<&'static str> { + match self { + Self::NONE => Some("NONE"), + Self::HttpQueryResultArrow => Some("HttpQueryResultArrow"), + _ => None, + } + } +} +impl ::core::fmt::Debug for HttpQueryResultData { + fn fmt(&self, f: &mut ::core::fmt::Formatter) -> ::core::fmt::Result { + if let Some(name) = self.variant_name() { + f.write_str(name) + } else { + f.write_fmt(format_args!("", self.0)) + } + } +} +impl<'a> ::flatbuffers::Follow<'a> for HttpQueryResultData { + type Inner = Self; + #[inline] + unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + let b = unsafe { ::flatbuffers::read_scalar_at::(buf, loc) }; + Self(b) + } +} + +impl ::flatbuffers::Push for HttpQueryResultData { + type Output = HttpQueryResultData; + #[inline] + unsafe fn push(&self, dst: &mut [u8], _written_len: usize) { + unsafe { ::flatbuffers::emplace_scalar::(dst, self.0) }; + } +} + +impl ::flatbuffers::EndianScalar for HttpQueryResultData { + type Scalar = u8; + #[inline] + fn to_little_endian(self) -> u8 { + self.0.to_le() + } + #[inline] + #[allow(clippy::wrong_self_convention)] + fn from_little_endian(v: u8) -> Self { + let b = u8::from_le(v); + Self(b) + } +} + +impl<'a> ::flatbuffers::Verifiable for HttpQueryResultData { + #[inline] + fn run_verifier( + v: &mut ::flatbuffers::Verifier, + pos: usize, + ) -> Result<(), ::flatbuffers::InvalidFlatbuffer> { + u8::run_verifier(v, pos) + } +} + +impl ::flatbuffers::SimpleToVerifyInSlice for HttpQueryResultData {} +pub struct HttpQueryResultDataUnionTableOffset {} + pub enum HttpMessageOffset {} #[derive(Copy, Clone, PartialEq)] @@ -360,6 +546,21 @@ impl<'a> HttpMessage<'a> { None } } + + #[inline] + #[allow(non_snake_case)] + pub fn command_as_http_query_result(&self) -> Option> { + if self.command_type() == HttpCommand::HttpQueryResult { + self.command().map(|t| { + // Safety: + // Created from a valid Table for this object + // Which contains a valid union in this slot + unsafe { HttpQueryResult::init_from_table(t) } + }) + } else { + None + } + } } impl ::flatbuffers::Verifiable for HttpMessage<'_> { @@ -392,6 +593,11 @@ impl ::flatbuffers::Verifiable for HttpMessage<'_> { "HttpCommand::HttpError", pos, ), + HttpCommand::HttpQueryResult => v + .verify_union_variant::<::flatbuffers::ForwardsUOffset>( + "HttpCommand::HttpQueryResult", + pos, + ), _ => Ok(()), }, )? @@ -508,6 +714,16 @@ impl ::core::fmt::Debug for HttpMessage<'_> { ) } } + HttpCommand::HttpQueryResult => { + if let Some(x) = self.command_as_http_query_result() { + ds.field("command", &x) + } else { + ds.field( + "command", + &"InvalidFlatbuffer: Union discriminant does not match value.", + ) + } + } _ => { let x: Option<()> = None; ds.field("command", &x) @@ -1494,6 +1710,7 @@ impl<'a> HttpQuery<'a> { pub const VT_TRACE_OBJ: ::flatbuffers::VOffsetT = 6; pub const VT_INLINE_TABLES: ::flatbuffers::VOffsetT = 8; pub const VT_PARAMETERS: ::flatbuffers::VOffsetT = 10; + pub const VT_RESPONSE_FORMAT: ::flatbuffers::VOffsetT = 12; #[inline] pub unsafe fn init_from_table(table: ::flatbuffers::Table<'a>) -> Self { @@ -1522,6 +1739,7 @@ impl<'a> HttpQuery<'a> { if let Some(x) = args.query { builder.add_query(x); } + builder.add_response_format(args.response_format); builder.finish() } @@ -1571,6 +1789,20 @@ impl<'a> HttpQuery<'a> { >>(HttpQuery::VT_PARAMETERS, None) } } + #[inline] + pub fn response_format(&self) -> QueryResultFormat { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { + self._tab + .get::( + HttpQuery::VT_RESPONSE_FORMAT, + Some(QueryResultFormat::Legacy), + ) + .unwrap() + } + } } impl ::flatbuffers::Verifiable for HttpQuery<'_> { @@ -1592,6 +1824,7 @@ impl ::flatbuffers::Verifiable for HttpQuery<'_> { .visit_field::<::flatbuffers::ForwardsUOffset< ::flatbuffers::Vector<'_, ::flatbuffers::ForwardsUOffset>, >>("parameters", Self::VT_PARAMETERS, false)? + .visit_field::("response_format", Self::VT_RESPONSE_FORMAT, false)? .finish(); Ok(()) } @@ -1609,6 +1842,7 @@ pub struct HttpQueryArgs<'a> { ::flatbuffers::Vector<'a, ::flatbuffers::ForwardsUOffset>>, >, >, + pub response_format: QueryResultFormat, } impl<'a> Default for HttpQueryArgs<'a> { #[inline] @@ -1618,6 +1852,7 @@ impl<'a> Default for HttpQueryArgs<'a> { trace_obj: None, inline_tables: None, parameters: None, + response_format: QueryResultFormat::Legacy, } } } @@ -1660,6 +1895,14 @@ impl<'a: 'b, 'b, A: ::flatbuffers::Allocator + 'a> HttpQueryBuilder<'a, 'b, A> { .push_slot_always::<::flatbuffers::WIPOffset<_>>(HttpQuery::VT_PARAMETERS, parameters); } #[inline] + pub fn add_response_format(&mut self, response_format: QueryResultFormat) { + self.fbb_.push_slot::( + HttpQuery::VT_RESPONSE_FORMAT, + response_format, + QueryResultFormat::Legacy, + ); + } + #[inline] pub fn new( _fbb: &'b mut ::flatbuffers::FlatBufferBuilder<'a, A>, ) -> HttpQueryBuilder<'a, 'b, A> { @@ -1683,6 +1926,7 @@ impl ::core::fmt::Debug for HttpQuery<'_> { ds.field("trace_obj", &self.trace_obj()); ds.field("inline_tables", &self.inline_tables()); ds.field("parameters", &self.parameters()); + ds.field("response_format", &self.response_format()); ds.finish() } } @@ -2406,6 +2650,337 @@ impl ::core::fmt::Debug for HttpColumnValue<'_> { ds.finish() } } +pub enum HttpQueryResultArrowOffset {} +#[derive(Copy, Clone, PartialEq)] + +pub struct HttpQueryResultArrow<'a> { + pub _tab: ::flatbuffers::Table<'a>, +} + +impl<'a> ::flatbuffers::Follow<'a> for HttpQueryResultArrow<'a> { + type Inner = HttpQueryResultArrow<'a>; + #[inline] + unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { + _tab: unsafe { ::flatbuffers::Table::new(buf, loc) }, + } + } +} + +impl<'a> HttpQueryResultArrow<'a> { + pub const VT_DATA: ::flatbuffers::VOffsetT = 4; + pub const VT_IS_LAST: ::flatbuffers::VOffsetT = 6; + + #[inline] + pub unsafe fn init_from_table(table: ::flatbuffers::Table<'a>) -> Self { + HttpQueryResultArrow { _tab: table } + } + #[allow(unused_mut)] + pub fn create< + 'bldr: 'args, + 'args: 'mut_bldr, + 'mut_bldr, + A: ::flatbuffers::Allocator + 'bldr, + >( + _fbb: &'mut_bldr mut ::flatbuffers::FlatBufferBuilder<'bldr, A>, + args: &'args HttpQueryResultArrowArgs<'args>, + ) -> ::flatbuffers::WIPOffset> { + let mut builder = HttpQueryResultArrowBuilder::new(_fbb); + if let Some(x) = args.data { + builder.add_data(x); + } + builder.add_is_last(args.is_last); + builder.finish() + } + + #[inline] + pub fn data(&self) -> ::flatbuffers::Vector<'a, u8> { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { + self._tab + .get::<::flatbuffers::ForwardsUOffset<::flatbuffers::Vector<'a, u8>>>( + HttpQueryResultArrow::VT_DATA, + None, + ) + .unwrap() + } + } + #[inline] + pub fn is_last(&self) -> bool { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { + self._tab + .get::(HttpQueryResultArrow::VT_IS_LAST, Some(false)) + .unwrap() + } + } +} + +impl ::flatbuffers::Verifiable for HttpQueryResultArrow<'_> { + #[inline] + fn run_verifier( + v: &mut ::flatbuffers::Verifier, + pos: usize, + ) -> Result<(), ::flatbuffers::InvalidFlatbuffer> { + v.visit_table(pos)? + .visit_field::<::flatbuffers::ForwardsUOffset<::flatbuffers::Vector<'_, u8>>>( + "data", + Self::VT_DATA, + true, + )? + .visit_field::("is_last", Self::VT_IS_LAST, false)? + .finish(); + Ok(()) + } +} +pub struct HttpQueryResultArrowArgs<'a> { + pub data: Option<::flatbuffers::WIPOffset<::flatbuffers::Vector<'a, u8>>>, + pub is_last: bool, +} +impl<'a> Default for HttpQueryResultArrowArgs<'a> { + #[inline] + fn default() -> Self { + HttpQueryResultArrowArgs { + data: None, // required field + is_last: false, + } + } +} + +pub struct HttpQueryResultArrowBuilder<'a: 'b, 'b, A: ::flatbuffers::Allocator + 'a> { + fbb_: &'b mut ::flatbuffers::FlatBufferBuilder<'a, A>, + start_: ::flatbuffers::WIPOffset<::flatbuffers::TableUnfinishedWIPOffset>, +} +impl<'a: 'b, 'b, A: ::flatbuffers::Allocator + 'a> HttpQueryResultArrowBuilder<'a, 'b, A> { + #[inline] + pub fn add_data(&mut self, data: ::flatbuffers::WIPOffset<::flatbuffers::Vector<'b, u8>>) { + self.fbb_ + .push_slot_always::<::flatbuffers::WIPOffset<_>>(HttpQueryResultArrow::VT_DATA, data); + } + #[inline] + pub fn add_is_last(&mut self, is_last: bool) { + self.fbb_ + .push_slot::(HttpQueryResultArrow::VT_IS_LAST, is_last, false); + } + #[inline] + pub fn new( + _fbb: &'b mut ::flatbuffers::FlatBufferBuilder<'a, A>, + ) -> HttpQueryResultArrowBuilder<'a, 'b, A> { + let start = _fbb.start_table(); + HttpQueryResultArrowBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> ::flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + self.fbb_.required(o, HttpQueryResultArrow::VT_DATA, "data"); + ::flatbuffers::WIPOffset::new(o.value()) + } +} + +impl ::core::fmt::Debug for HttpQueryResultArrow<'_> { + fn fmt(&self, f: &mut ::core::fmt::Formatter<'_>) -> ::core::fmt::Result { + let mut ds = f.debug_struct("HttpQueryResultArrow"); + ds.field("data", &self.data()); + ds.field("is_last", &self.is_last()); + ds.finish() + } +} +pub enum HttpQueryResultOffset {} +#[derive(Copy, Clone, PartialEq)] + +pub struct HttpQueryResult<'a> { + pub _tab: ::flatbuffers::Table<'a>, +} + +impl<'a> ::flatbuffers::Follow<'a> for HttpQueryResult<'a> { + type Inner = HttpQueryResult<'a>; + #[inline] + unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { + _tab: unsafe { ::flatbuffers::Table::new(buf, loc) }, + } + } +} + +impl<'a> HttpQueryResult<'a> { + pub const VT_DATA_TYPE: ::flatbuffers::VOffsetT = 4; + pub const VT_DATA: ::flatbuffers::VOffsetT = 6; + + #[inline] + pub unsafe fn init_from_table(table: ::flatbuffers::Table<'a>) -> Self { + HttpQueryResult { _tab: table } + } + #[allow(unused_mut)] + pub fn create< + 'bldr: 'args, + 'args: 'mut_bldr, + 'mut_bldr, + A: ::flatbuffers::Allocator + 'bldr, + >( + _fbb: &'mut_bldr mut ::flatbuffers::FlatBufferBuilder<'bldr, A>, + args: &'args HttpQueryResultArgs, + ) -> ::flatbuffers::WIPOffset> { + let mut builder = HttpQueryResultBuilder::new(_fbb); + if let Some(x) = args.data { + builder.add_data(x); + } + builder.add_data_type(args.data_type); + builder.finish() + } + + #[inline] + pub fn data_type(&self) -> HttpQueryResultData { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { + self._tab + .get::( + HttpQueryResult::VT_DATA_TYPE, + Some(HttpQueryResultData::NONE), + ) + .unwrap() + } + } + #[inline] + pub fn data(&self) -> ::flatbuffers::Table<'a> { + // Safety: + // Created from valid Table for this object + // which contains a valid value in this slot + unsafe { + self._tab + .get::<::flatbuffers::ForwardsUOffset<::flatbuffers::Table<'a>>>( + HttpQueryResult::VT_DATA, + None, + ) + .unwrap() + } + } + #[inline] + #[allow(non_snake_case)] + pub fn data_as_http_query_result_arrow(&self) -> Option> { + if self.data_type() == HttpQueryResultData::HttpQueryResultArrow { + let u = self.data(); + // Safety: + // Created from a valid Table for this object + // Which contains a valid union in this slot + Some(unsafe { HttpQueryResultArrow::init_from_table(u) }) + } else { + None + } + } +} + +impl ::flatbuffers::Verifiable for HttpQueryResult<'_> { + #[inline] + fn run_verifier( + v: &mut ::flatbuffers::Verifier, + pos: usize, + ) -> Result<(), ::flatbuffers::InvalidFlatbuffer> { + v.visit_table(pos)? + .visit_union::( + "data_type", + Self::VT_DATA_TYPE, + "data", + Self::VT_DATA, + true, + |key, v, pos| { + match key { + HttpQueryResultData::HttpQueryResultArrow => v + .verify_union_variant::<::flatbuffers::ForwardsUOffset< + HttpQueryResultArrow, + >>( + "HttpQueryResultData::HttpQueryResultArrow", + pos, + ), + _ => Ok(()), + } + }, + )? + .finish(); + Ok(()) + } +} +pub struct HttpQueryResultArgs { + pub data_type: HttpQueryResultData, + pub data: Option<::flatbuffers::WIPOffset<::flatbuffers::UnionWIPOffset>>, +} +impl<'a> Default for HttpQueryResultArgs { + #[inline] + fn default() -> Self { + HttpQueryResultArgs { + data_type: HttpQueryResultData::NONE, + data: None, // required field + } + } +} + +pub struct HttpQueryResultBuilder<'a: 'b, 'b, A: ::flatbuffers::Allocator + 'a> { + fbb_: &'b mut ::flatbuffers::FlatBufferBuilder<'a, A>, + start_: ::flatbuffers::WIPOffset<::flatbuffers::TableUnfinishedWIPOffset>, +} +impl<'a: 'b, 'b, A: ::flatbuffers::Allocator + 'a> HttpQueryResultBuilder<'a, 'b, A> { + #[inline] + pub fn add_data_type(&mut self, data_type: HttpQueryResultData) { + self.fbb_.push_slot::( + HttpQueryResult::VT_DATA_TYPE, + data_type, + HttpQueryResultData::NONE, + ); + } + #[inline] + pub fn add_data(&mut self, data: ::flatbuffers::WIPOffset<::flatbuffers::UnionWIPOffset>) { + self.fbb_ + .push_slot_always::<::flatbuffers::WIPOffset<_>>(HttpQueryResult::VT_DATA, data); + } + #[inline] + pub fn new( + _fbb: &'b mut ::flatbuffers::FlatBufferBuilder<'a, A>, + ) -> HttpQueryResultBuilder<'a, 'b, A> { + let start = _fbb.start_table(); + HttpQueryResultBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> ::flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + self.fbb_.required(o, HttpQueryResult::VT_DATA, "data"); + ::flatbuffers::WIPOffset::new(o.value()) + } +} + +impl ::core::fmt::Debug for HttpQueryResult<'_> { + fn fmt(&self, f: &mut ::core::fmt::Formatter<'_>) -> ::core::fmt::Result { + let mut ds = f.debug_struct("HttpQueryResult"); + ds.field("data_type", &self.data_type()); + match self.data_type() { + HttpQueryResultData::HttpQueryResultArrow => { + if let Some(x) = self.data_as_http_query_result_arrow() { + ds.field("data", &x) + } else { + ds.field( + "data", + &"InvalidFlatbuffer: Union discriminant does not match value.", + ) + } + } + _ => { + let x: Option<()> = None; + ds.field("data", &x) + } + }; + ds.finish() + } +} #[inline] /// Verifies that a buffer of bytes contains a `HttpMessage` /// and returns it. diff --git a/rust/cubesql/cubesql/src/compile/date_parser.rs b/rust/cubesql/cubesql/src/compile/date_parser.rs index ead20967032cd..1bfa57c493b79 100644 --- a/rust/cubesql/cubesql/src/compile/date_parser.rs +++ b/rust/cubesql/cubesql/src/compile/date_parser.rs @@ -2,20 +2,173 @@ use crate::compile::engine::df::scan::DataFusionError; use chrono::{NaiveDate, NaiveDateTime}; pub fn parse_date_str(s: &str) -> Result { - let parsed = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") - .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f")) - .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S")) - .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f UTC")) - .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f")) - .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.fZ")) - .or_else(|_| { - NaiveDate::parse_from_str(s, "%Y-%m-%d").map(|date| date.and_hms_opt(0, 0, 0).unwrap()) - }); + if let Some(parsed) = parse_fast(s) { + return Ok(parsed); + } - parsed.map_err(|e| { + parse_with_chrono(s).map_err(|e| { DataFusionError::Internal(format!( "Can't parse date/time string literal {:?}: {}", s, e )) }) } + +#[inline] +fn digit(b: u8) -> Option { + let d = b.wrapping_sub(b'0'); + if d <= 9 { + Some(d as u32) + } else { + None + } +} + +#[inline] +fn ascii_u32_2(b: &[u8], at: usize) -> Option { + Some(digit(b[at])? * 10 + digit(b[at + 1])?) +} + +#[inline] +fn ascii_u32_4(b: &[u8], at: usize) -> Option { + Some( + digit(b[at])? * 1000 + digit(b[at + 1])? * 100 + digit(b[at + 2])? * 10 + digit(b[at + 3])?, + ) +} + +/// Recognizes only `YYYY-MM-DDTHH:MM:SS.fff` (23 bytes, `T` separator, 3-digit fraction). +fn parse_fast(s: &str) -> Option { + let b = s.as_bytes(); + if b.len() != 23 + || b[4] != b'-' + || b[7] != b'-' + || b[10] != b'T' + || b[13] != b':' + || b[16] != b':' + || b[19] != b'.' + { + return None; + } + + let year = ascii_u32_4(b, 0)? as i32; + let month = ascii_u32_2(b, 5)?; + let day = ascii_u32_2(b, 8)?; + let hour = ascii_u32_2(b, 11)?; + let minute = ascii_u32_2(b, 14)?; + let second = ascii_u32_2(b, 17)?; + let frac_millis = digit(b[20])? * 100 + digit(b[21])? * 10 + digit(b[22])?; + + NaiveDate::from_ymd_opt(year, month, day)?.and_hms_nano_opt( + hour, + minute, + second, + frac_millis * 1_000_000, + ) +} + +fn parse_with_chrono(s: &str) -> chrono::ParseResult { + NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") + .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f")) + .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S")) + .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")) + .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.fZ")) + .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f UTC")) + .or_else(|_| { + NaiveDate::parse_from_str(s, "%Y-%m-%d").map(|date| date.and_hms_opt(0, 0, 0).unwrap()) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn ymd_hmsn(y: i32, m: u32, d: u32, h: u32, mi: u32, s: u32, n: u32) -> NaiveDateTime { + NaiveDate::from_ymd_opt(y, m, d) + .unwrap() + .and_hms_nano_opt(h, mi, s, n) + .unwrap() + } + + #[test] + fn fast_path_accepts_canonical_shape() { + let cases: &[(&str, NaiveDateTime)] = &[ + ("2022-01-01T00:00:00.000", ymd_hmsn(2022, 1, 1, 0, 0, 0, 0)), + ( + "2024-06-15T13:45:07.123", + ymd_hmsn(2024, 6, 15, 13, 45, 7, 123_000_000), + ), + ( + "9999-12-31T23:59:59.999", + ymd_hmsn(9999, 12, 31, 23, 59, 59, 999_000_000), + ), + ]; + + for (input, expected) in cases { + assert_eq!(parse_fast(input), Some(*expected), "fast path: {}", input); + assert_eq!( + parse_date_str(input).unwrap(), + *expected, + "wrapper: {}", + input + ); + } + } + + #[test] + fn fast_path_rejects_non_canonical() { + // Wrong length / shape — must not be fast-parsed. + let rejects = [ + "2022", + "2022-01-01", + "2022-01-01 00:00:00", + "2022-01-01T00:00:00", + "2022-01-01T00:00:00.000Z", + "2022-01-01 00:00:00.000", + "2022-01-01T00:00:00.123456", + "2022-13-01T00:00:00.000", + "2022-01-32T00:00:00.000", + "2022/01/01T00:00:00.000", + "2022-01-01x00:00:00.000", + "2022-01-01T25:00:00.000", + "2022-01-01T00:60:00.000", + "2022-01-01T00:00:60.000", + ]; + for s in rejects { + assert!(parse_fast(s).is_none(), "unexpectedly fast-parsed: {:?}", s); + } + } + + #[test] + fn wrapper_handles_other_shapes_via_chrono_fallback() { + let cases: &[(&str, NaiveDateTime)] = &[ + ("2022-01-01", ymd_hmsn(2022, 1, 1, 0, 0, 0, 0)), + ("2022-01-01 00:00:00", ymd_hmsn(2022, 1, 1, 0, 0, 0, 0)), + ("2022-01-01T00:00:00", ymd_hmsn(2022, 1, 1, 0, 0, 0, 0)), + ("2022-01-01T00:00:00.000Z", ymd_hmsn(2022, 1, 1, 0, 0, 0, 0)), + ( + "2022-01-01 00:00:00.000 UTC", + ymd_hmsn(2022, 1, 1, 0, 0, 0, 0), + ), + ( + "2024-06-15T13:45:07.123456789", + ymd_hmsn(2024, 6, 15, 13, 45, 7, 123_456_789), + ), + ]; + + for (input, expected) in cases { + assert_eq!( + parse_date_str(input).unwrap(), + *expected, + "wrapper: {}", + input + ); + } + } + + #[test] + fn rejects_propagate_through_wrapper() { + for s in ["", "2022/01/01", "2022-01-01T00:00:00+02:00"] { + assert!(parse_date_str(s).is_err(), "should error: {:?}", s); + } + } +} diff --git a/rust/cubesql/cubesql/src/compile/engine/df/scan.rs b/rust/cubesql/cubesql/src/compile/engine/df/scan.rs index f2406c1070821..377e699189cf3 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/scan.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/scan.rs @@ -1086,14 +1086,12 @@ macro_rules! transform_response_body { (FieldValue::String(s), builder) => { let timestamp = parse_date_str(s.as_ref())?; // TODO switch parsing to microseconds - if timestamp.and_utc().timestamp_millis() > (((1i64) << 62) / 1_000_000) { - builder.append_null()?; - } else if let Some(nanos) = timestamp.and_utc().timestamp_nanos_opt() { + if let Some(nanos) = timestamp.and_utc().timestamp_nanos_opt() { builder.append_value(nanos)?; } else { log::error!( "Unable to cast timestamp value to nanoseconds: {}", - timestamp.to_string() + timestamp ); builder.append_null()?; } @@ -1114,12 +1112,7 @@ macro_rules! transform_response_body { { (FieldValue::String(s), builder) => { let timestamp = parse_date_str(s.as_ref())?; - // TODO switch parsing to microseconds - if timestamp.and_utc().timestamp_millis() > (((1 as i64) << 62) / 1_000_000) { - builder.append_null()?; - } else { - builder.append_value(timestamp.and_utc().timestamp_millis())?; - } + builder.append_value(timestamp.and_utc().timestamp_millis())?; }, }, { @@ -1136,28 +1129,18 @@ macro_rules! transform_response_body { field_name, { (FieldValue::String(s), builder) => { - let date = NaiveDate::parse_from_str(s.as_ref(), "%Y-%m-%d") - // FIXME: temporary solution for cases when expected type is Date32 - // but underlying data is a Timestamp - .or_else(|_| NaiveDate::parse_from_str(s.as_ref(), "%Y-%m-%dT00:00:00.000")) - .map_err(|e| { - DataFusionError::Execution(format!( - "Can't parse date: '{}': {}", - s, e - )) - }); - match date { - Ok(date) => { + match parse_date_str(s.as_ref()) { + Ok(timestamp) => { let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); - let days_since_epoch = date.num_days_from_ce() - epoch.num_days_from_ce(); + let days_since_epoch = timestamp.date().num_days_from_ce() + - epoch.num_days_from_ce(); builder.append_value(days_since_epoch)?; } Err(error) => { log::error!( "Unable to parse value as Date32: {}", - error.to_string() + error ); - builder.append_null()? } } @@ -1375,7 +1358,9 @@ mod tests { use cubeclient::models::V1LoadResponse; use datafusion::{ arrow::{ - array::{BooleanArray, Float64Array, StringArray, TimestampNanosecondArray}, + array::{ + BooleanArray, Date32Array, Float64Array, StringArray, TimestampNanosecondArray, + }, datatypes::{Field, Schema}, }, execution::{ @@ -1441,11 +1426,11 @@ mod tests { "timeDimensions": [] }, "data": [ - {"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": null, "KibanaSampleDataEcommerce.orderDate": null, "KibanaSampleDataEcommerce.city": "City 1"}, - {"KibanaSampleDataEcommerce.count": 5, "KibanaSampleDataEcommerce.maxPrice": 5.05, "KibanaSampleDataEcommerce.isBool": true, "KibanaSampleDataEcommerce.orderDate": "2022-01-01 00:00:00.000", "KibanaSampleDataEcommerce.city": "City 2"}, - {"KibanaSampleDataEcommerce.count": "5", "KibanaSampleDataEcommerce.maxPrice": "5.05", "KibanaSampleDataEcommerce.isBool": false, "KibanaSampleDataEcommerce.orderDate": "2023-01-01 00:00:00.000", "KibanaSampleDataEcommerce.city": "City 3"}, - {"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": "true", "KibanaSampleDataEcommerce.orderDate": "9999-12-31 00:00:00.000", "KibanaSampleDataEcommerce.city": "City 4"}, - {"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": "false", "KibanaSampleDataEcommerce.orderDate": null, "KibanaSampleDataEcommerce.city": null} + {"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": null, "KibanaSampleDataEcommerce.orderTimestamp": null, "KibanaSampleDataEcommerce.orderDate": null, "KibanaSampleDataEcommerce.city": "City 1"}, + {"KibanaSampleDataEcommerce.count": 5, "KibanaSampleDataEcommerce.maxPrice": 5.05, "KibanaSampleDataEcommerce.isBool": true, "KibanaSampleDataEcommerce.orderTimestamp": "2022-01-01 00:00:00.000", "KibanaSampleDataEcommerce.orderDate": "2022-01-01", "KibanaSampleDataEcommerce.city": "City 2"}, + {"KibanaSampleDataEcommerce.count": "5", "KibanaSampleDataEcommerce.maxPrice": "5.05", "KibanaSampleDataEcommerce.isBool": false, "KibanaSampleDataEcommerce.orderTimestamp": "2023-01-01 00:00:00.000", "KibanaSampleDataEcommerce.orderDate": "2023-01-01", "KibanaSampleDataEcommerce.city": "City 3"}, + {"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": "true", "KibanaSampleDataEcommerce.orderTimestamp": "9999-12-31 00:00:00.000", "KibanaSampleDataEcommerce.orderDate": "9999-12-31", "KibanaSampleDataEcommerce.city": "City 4"}, + {"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": "false", "KibanaSampleDataEcommerce.orderTimestamp": null, "KibanaSampleDataEcommerce.orderDate": null, "KibanaSampleDataEcommerce.city": null} ] }] } @@ -1498,7 +1483,7 @@ mod tests { } #[tokio::test] - async fn test_df_cube_scan_execute() { + async fn test_df_cube_scan_execute() -> Result<(), CubeError> { assert_eq!(std::mem::size_of::(), 24); let schema = Arc::new(Schema::new(vec![ @@ -1510,7 +1495,7 @@ mod tests { false, ), Field::new( - "KibanaSampleDataEcommerce.orderDate", + "KibanaSampleDataEcommerce.orderTimestamp", DataType::Timestamp(TimeUnit::Nanosecond, None), false, ), @@ -1521,6 +1506,11 @@ mod tests { false, ), Field::new("KibanaSampleDataEcommerce.city", DataType::Utf8, false), + Field::new( + "KibanaSampleDataEcommerce.orderDate", + DataType::Date32, + false, + ), ])); let scan_node = CubeScanExecutionPlan { @@ -1543,6 +1533,7 @@ mod tests { ]), dimensions: Some(vec![ "KibanaSampleDataEcommerce.isBool".to_string(), + "KibanaSampleDataEcommerce.orderTimestamp".to_string(), "KibanaSampleDataEcommerce.orderDate".to_string(), "KibanaSampleDataEcommerce.city".to_string(), ]), @@ -1565,9 +1556,7 @@ mod tests { config_obj: crate::config::Config::test().config_obj(), }; - let runtime = Arc::new( - RuntimeEnv::new(RuntimeConfig::new()).expect("Unable to create RuntimeEnv for testing"), - ); + let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new())?); let task = Arc::new(TaskContext::new( "test".to_string(), "session".to_string(), @@ -1576,8 +1565,8 @@ mod tests { HashMap::new(), runtime, )); - let stream = scan_node.execute(0, task).await.unwrap(); - let batches = common::collect(stream).await.unwrap(); + let stream = scan_node.execute(0, task).await?; + let batches = common::collect(stream).await?; assert_eq!( batches[0], @@ -1627,9 +1616,17 @@ mod tests { Some("City 4"), None ])) as ArrayRef, + Arc::new(Date32Array::from(vec![ + None, + Some(18993), + Some(19358), + Some(2_932_896), + None, + ])) as ArrayRef, ], - ) - .unwrap() - ) + )? + ); + + Ok(()) } } diff --git a/rust/cubestore/Cargo.lock b/rust/cubestore/Cargo.lock index 8d64c5e44ec70..0c9b149ea8f63 100644 --- a/rust/cubestore/Cargo.lock +++ b/rust/cubestore/Cargo.lock @@ -1090,7 +1090,7 @@ dependencies = [ "bitflags 1.3.2", "strsim", "textwrap", - "unicode-width", + "unicode-width 0.1.8", "vec_map", ] @@ -1166,7 +1166,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e" dependencies = [ "termcolor", - "unicode-width", + "unicode-width 0.1.8", ] [[package]] @@ -1181,13 +1181,13 @@ dependencies = [ [[package]] name = "comfy-table" -version = "7.1.1" +version = "7.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7" +checksum = "958c5d6ecf1f214b4c2bbbbf6ab9523a864bd136dcf71a7e8904799acfe1ad47" dependencies = [ - "strum", - "strum_macros", - "unicode-width", + "crossterm", + "unicode-segmentation", + "unicode-width 0.2.2", ] [[package]] @@ -1199,6 +1199,18 @@ dependencies = [ "cache-padded", ] +[[package]] +name = "console" +version = "0.15.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e1f83fc076bd6dd27517eacdf25fef6c4dfe5f1d7448bafaaf3a26f13b5e4eb" +dependencies = [ + "encode_unicode", + "lazy_static", + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "const-random" version = "0.1.18" @@ -1415,6 +1427,29 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "crossterm" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8b9f2e4c67f833b660cdb0a3523065869fb35570177239812ed4c905aeff87b" +dependencies = [ + "bitflags 2.11.1", + "crossterm_winapi", + "document-features", + "parking_lot", + "rustix 1.1.4", + "winapi 0.3.9", +] + +[[package]] +name = "crossterm_winapi" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acdd7c62a3665c7f6830a51635d9ac9b23ed385797f70a83bb8bafe9c572ab2b" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "crunchy" version = "0.2.2" @@ -1540,6 +1575,7 @@ dependencies = [ "chrono", "chrono-tz 0.8.2", "cloud-storage", + "comfy-table", "criterion", "csv", "ctor", @@ -1566,6 +1602,7 @@ dependencies = [ "humansize", "indexmap", "indoc", + "insta", "ipc-channel", "itertools 0.14.0", "json", @@ -2327,6 +2364,15 @@ dependencies = [ "const-random", ] +[[package]] +name = "document-features" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4b8a88685455ed29a21542a33abd9cb6510b6b129abadabdcef0f4c55bc8f61" +dependencies = [ + "litrs", +] + [[package]] name = "dotenv" version = "0.15.0" @@ -2371,6 +2417,12 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "encode_unicode" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" + [[package]] name = "encoding_rs" version = "0.8.28" @@ -2397,9 +2449,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.8" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", "windows-sys 0.52.0", @@ -3329,6 +3381,18 @@ dependencies = [ "generic-array 0.14.4", ] +[[package]] +name = "insta" +version = "1.41.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e9ffc4d4892617c50a928c52b2961cb5174b6fc6ebf252b2fac9d21955c48b8" +dependencies = [ + "console", + "lazy_static", + "linked-hash-map", + "similar", +] + [[package]] name = "instant" version = "0.1.10" @@ -3601,9 +3665,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.172" +version = "0.2.186" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" +checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" [[package]] name = "libloading" @@ -3657,18 +3721,36 @@ dependencies = [ "cc", ] +[[package]] +name = "linked-hash-map" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" + [[package]] name = "linux-raw-sys" version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" +[[package]] +name = "linux-raw-sys" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" + [[package]] name = "litemap" version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23fb14cb19457329c82206317a5663005a4d404783dc74f4252769b0d5f42856" +[[package]] +name = "litrs" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11d3d7f243d5c5a8b9bb5d6dd2b1602c0cb0b9db1621bafc7ed66e35ff9fe092" + [[package]] name = "lock_api" version = "0.4.12" @@ -5556,7 +5638,20 @@ dependencies = [ "bitflags 2.11.1", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.4.13", + "windows-sys 0.52.0", +] + +[[package]] +name = "rustix" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" +dependencies = [ + "bitflags 2.11.1", + "errno", + "libc", + "linux-raw-sys 0.12.1", "windows-sys 0.52.0", ] @@ -5896,6 +5991,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" +[[package]] +name = "similar" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbbb5d9659141646ae647b42fe094daf6c6192d1620870b449d9557f748b2daa" + [[package]] name = "simple_asn1" version = "0.4.1" @@ -6096,25 +6197,6 @@ dependencies = [ "syn 1.0.107", ] -[[package]] -name = "strum" -version = "0.26.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" - -[[package]] -name = "strum_macros" -version = "0.26.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" -dependencies = [ - "heck 0.5.0", - "proc-macro2", - "quote", - "rustversion", - "syn 2.0.87", -] - [[package]] name = "subtle" version = "2.5.0" @@ -6224,7 +6306,7 @@ checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" dependencies = [ "cfg-if 1.0.0", "fastrand 2.0.2", - "rustix", + "rustix 0.38.32", "windows-sys 0.52.0", ] @@ -6243,7 +6325,7 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" dependencies = [ - "unicode-width", + "unicode-width 0.1.8", ] [[package]] @@ -6777,6 +6859,12 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" +[[package]] +name = "unicode-width" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" + [[package]] name = "unicode-xid" version = "0.2.2" diff --git a/rust/cubestore/cubestore/Cargo.toml b/rust/cubestore/cubestore/Cargo.toml index 5d2e60156e5d1..e20446310c235 100644 --- a/rust/cubestore/cubestore/Cargo.toml +++ b/rust/cubestore/cubestore/Cargo.toml @@ -108,6 +108,7 @@ anyhow = "1.0" arc-swap = "1.7.1" object_store = "0.11.1" prost = "0.13.1" +comfy-table = "7.2.2" [target.'cfg(target_os = "linux")'.dependencies] rdkafka = { version = "0.29.0", features = ["ssl", "gssapi", "cmake-build"] } @@ -125,6 +126,7 @@ sasl2-sys = { version = "0.1.6", features = ["vendored"] } pretty_assertions = "0.7.1" criterion = { version = "0.5.1", features = ["async_tokio", "html_reports"] } md5 = "0.8.0" +insta = "1.34" [[bench]] name = "cachestore_queue" diff --git a/rust/cubestore/cubestore/src/http/mod.rs b/rust/cubestore/cubestore/src/http/mod.rs index 38b49c6c6e79c..5fffbfff33dd2 100644 --- a/rust/cubestore/cubestore/src/http/mod.rs +++ b/rust/cubestore/cubestore/src/http/mod.rs @@ -16,8 +16,9 @@ use crate::{app_metrics, CubeError}; use async_std::fs::File; use cubeshared::codegen::{ root_as_http_message, HttpColumnValue, HttpColumnValueArgs, HttpError, HttpErrorArgs, - HttpMessageArgs, HttpParameterValue, HttpQuery, HttpQueryArgs, HttpResultSet, - HttpResultSetArgs, HttpRow, HttpRowArgs, + HttpMessageArgs, HttpParameterValue, HttpQuery, HttpQueryArgs, HttpQueryResult, + HttpQueryResultArgs, HttpQueryResultArrow, HttpQueryResultArrowArgs, HttpQueryResultData, + HttpResultSet, HttpResultSetArgs, HttpRow, HttpRowArgs, QueryResultFormat, }; use cubeshared::flatbuffers::{FlatBufferBuilder, ForwardsUOffset, Vector, WIPOffset}; use datafusion::cube_ext; @@ -326,6 +327,7 @@ impl HttpServer { HttpCommand::Error { error } => format!("HttpCommand::Error {{ error: {:?} }}", error), HttpCommand::CloseConnection { error } => format!("HttpCommand::CloseConnection {{ error: {:?} }}", error), HttpCommand::ResultSet { .. } => format!("HttpCommand::ResultSet {{}}"), + HttpCommand::QueryResultArrow { .. } => format!("HttpCommand::QueryResultArrow {{}}"), }; log::error!( "Error processing HTTP command (connection_id={}): {}\nThe command: {}", @@ -406,6 +408,7 @@ impl HttpServer { HttpCommand::Error { error } => format!("HttpCommand::Error {{ error: {:?} }}", error), HttpCommand::CloseConnection { error } => format!("HttpCommand::CloseConnection {{ error: {:?} }}", error), HttpCommand::ResultSet { .. } => format!("HttpCommand::ResultSet {{}}"), + HttpCommand::QueryResultArrow { .. } => format!("HttpCommand::QueryResultArrow {{}}"), }; let res = HttpServer::process_command( sql_service.clone(), @@ -580,8 +583,9 @@ impl HttpServer { inline_tables, trace_obj, parameters, - } => Ok(HttpCommand::ResultSet { - data_frame: sql_service + response_format, + } => { + let query_result = sql_service .exec_query_with_context( sql_query_context .with_trace_obj(trace_obj) @@ -589,10 +593,21 @@ impl HttpServer { .with_parameters(¶meters), &query, ) - .await? - .collect() - .await?, - }), + .await?; + match response_format { + QueryResultFormat::Legacy => Ok(HttpCommand::ResultSet { + data_frame: query_result.collect().await?, + }), + QueryResultFormat::Arrow => { + let data = query_result.to_arrow_ipc_stream().await?; + Ok(HttpCommand::QueryResultArrow { data }) + } + other => Err(CubeError::user(format!( + "Unsupported response_format: {:?}", + other + ))), + } + } x => Err(CubeError::user(format!("Unexpected command: {:?}", x))), } } @@ -642,10 +657,17 @@ pub enum HttpCommand { inline_tables: InlineTables, trace_obj: Option, parameters: Option, + response_format: QueryResultFormat, }, ResultSet { data_frame: Arc, }, + QueryResultArrow { + /// Pre-serialized Arrow IPC stream payload. May contain multiple + /// RecordBatch messages following the schema header; consumers must + /// decode it with a streaming IPC reader. + data: Vec, + }, CloseConnection { error: String, }, @@ -663,6 +685,9 @@ impl HttpMessage { command_type: match self.command { HttpCommand::Query { .. } => cubeshared::codegen::HttpCommand::HttpQuery, HttpCommand::ResultSet { .. } => cubeshared::codegen::HttpCommand::HttpResultSet, + HttpCommand::QueryResultArrow { .. } => { + cubeshared::codegen::HttpCommand::HttpQueryResult + } HttpCommand::CloseConnection { .. } | HttpCommand::Error { .. } => { cubeshared::codegen::HttpCommand::HttpError } @@ -673,6 +698,7 @@ impl HttpMessage { inline_tables, trace_obj, parameters, + response_format, } => { let query_offset = builder.create_string(&query); let trace_obj_offset = trace_obj.as_ref().map(|o| builder.create_string(o)); @@ -693,6 +719,29 @@ impl HttpMessage { inline_tables: None, trace_obj: trace_obj_offset, parameters: None, + response_format: *response_format, + }, + ) + .as_union_value(), + ) + } + HttpCommand::QueryResultArrow { data } => { + let payload = builder.create_vector(data); + let arrow_table = HttpQueryResultArrow::create( + &mut builder, + &HttpQueryResultArrowArgs { + data: Some(payload), + // We don't support streaming for now, but clients should implement it + // according to the protocol specification + is_last: true, + }, + ); + Some( + HttpQueryResult::create( + &mut builder, + &HttpQueryResultArgs { + data_type: HttpQueryResultData::HttpQueryResultArrow, + data: Some(arrow_table.as_union_value()), }, ) .as_union_value(), @@ -929,6 +978,7 @@ impl HttpMessage { trace_obj: query.trace_obj().map(|q| q.to_string()), inline_tables, parameters, + response_format: query.response_format(), } } cubeshared::codegen::HttpCommand::HttpResultSet => { @@ -999,6 +1049,7 @@ mod tests { use futures_util::{SinkExt, StreamExt}; use indoc::indoc; use log::trace; + use pretty_assertions::assert_eq; use std::path::Path; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; @@ -1021,7 +1072,7 @@ mod tests { } #[tokio::test] - async fn query_test() { + async fn query_test() -> Result<(), CubeError> { let message = HttpMessage { message_id: 1234, command: HttpCommand::Query { @@ -1029,18 +1080,18 @@ mod tests { inline_tables: vec![], trace_obj: Some("test trace".to_string()), parameters: None, + response_format: QueryResultFormat::Legacy, }, connection_id: Some("foo".to_string()), }; let bytes = message.bytes(); - let output_message = HttpMessage::read(root_as_http_message(&bytes).unwrap()) - .await - .unwrap(); + let output_message = HttpMessage::read(root_as_http_message(&bytes)?).await?; assert_eq!(message, output_message); + Ok(()) } #[tokio::test] - async fn inline_tables_query_test() { + async fn inline_tables_query_test() -> Result<(), CubeError> { let columns = vec![ Column::new("A".to_string(), ColumnType::Int, 0), Column::new("B".to_string(), ColumnType::String, 1), @@ -1100,6 +1151,7 @@ mod tests { inline_tables: Some(inline_tables_offset), trace_obj: None, parameters: None, + response_format: QueryResultFormat::Legacy, }, ); let args = HttpMessageArgs { @@ -1111,9 +1163,7 @@ mod tests { let message = cubeshared::codegen::HttpMessage::create(&mut builder, &args); builder.finish(message, None); let bytes = builder.finished_data().to_vec(); - let message = HttpMessage::read(root_as_http_message(&bytes).unwrap()) - .await - .unwrap(); + let message = HttpMessage::read(root_as_http_message(&bytes)?).await?; assert_eq!( message, HttpMessage { @@ -1126,11 +1176,168 @@ mod tests { Arc::new(DataFrame::new(columns, rows.clone())) )], trace_obj: None, - parameters: None + parameters: None, + response_format: QueryResultFormat::Legacy, }, connection_id: Some("foo".to_string()), } ); + Ok(()) + } + + #[tokio::test] + async fn arrow_response_format_round_trip() -> Result<(), CubeError> { + use crate::queryplanner::query_executor::batches_to_dataframe; + use crate::sql::{timestamp_from_string, QueryResult}; + use crate::util::decimal::{Decimal, Decimal96}; + use crate::util::int96::Int96; + use cubeshared::codegen::{root_as_http_message, HttpQueryResultData}; + use datafusion::arrow::ipc::reader::StreamReader; + use datafusion::arrow::record_batch::RecordBatch; + + // 1. Build a DataFrame with every TableValue variant + nulls. + let columns = vec![ + Column::new("c_string".to_string(), ColumnType::String, 0), + Column::new("c_int".to_string(), ColumnType::Int, 1), + Column::new("c_int96".to_string(), ColumnType::Int96, 2), + Column::new( + "c_decimal".to_string(), + ColumnType::Decimal { + scale: 4, + precision: 18, + }, + 3, + ), + Column::new( + "c_decimal96".to_string(), + ColumnType::Decimal96 { + scale: 6, + precision: 38, + }, + 4, + ), + Column::new("c_float".to_string(), ColumnType::Float, 5), + Column::new("c_bytes".to_string(), ColumnType::Bytes, 6), + Column::new("c_timestamp".to_string(), ColumnType::Timestamp, 7), + Column::new("c_bool".to_string(), ColumnType::Boolean, 8), + ]; + let rows = vec![ + Row::new(vec![ + TableValue::String("hello".to_string()), + TableValue::Int(42), + TableValue::Int96(Int96::new(123_456_789_012_345_i128)), + TableValue::Decimal(Decimal::new(12345)), + TableValue::Decimal96(Decimal96::new(67890)), + TableValue::Float(3.5_f64.into()), + TableValue::Bytes(vec![0x01, 0x02, 0x03]), + TableValue::Timestamp(timestamp_from_string("2024-01-15T10:30:45.123Z")?), + TableValue::Boolean(true), + ]), + Row::new(vec![ + TableValue::Null, + TableValue::Null, + TableValue::Null, + TableValue::Null, + TableValue::Null, + TableValue::Null, + TableValue::Null, + TableValue::Null, + TableValue::Null, + ]), + ]; + let original_df = Arc::new(DataFrame::new(columns.clone(), rows)); + + // 2. Drive process_command with response_format = Arrow. + struct StubService(Arc); + crate::di_service!(StubService, [SqlService]); + #[async_trait] + impl SqlService for StubService { + async fn exec_query(&self, _q: &str) -> Result { + unimplemented!("Mock") + } + async fn exec_query_with_context( + &self, + _ctx: SqlQueryContext, + _q: &str, + ) -> Result { + Ok(QueryResult::Frame(self.0.clone())) + } + async fn plan_query(&self, _q: &str) -> Result { + unimplemented!("Mock") + } + async fn plan_query_with_context( + &self, + _ctx: SqlQueryContext, + _q: &str, + ) -> Result { + unimplemented!("Mock") + } + async fn upload_temp_file( + &self, + _ctx: SqlQueryContext, + _name: String, + _path: &Path, + ) -> Result<(), CubeError> { + unimplemented!("Mock") + } + async fn temp_uploads_dir(&self, _ctx: SqlQueryContext) -> Result { + unimplemented!("Mock") + } + } + + let svc = Arc::new(StubService(original_df.clone())); + let resp = HttpServer::process_command( + svc, + SqlQueryContext::default(), + HttpCommand::Query { + query: "select 1".to_string(), + inline_tables: vec![], + trace_obj: None, + parameters: None, + response_format: QueryResultFormat::Arrow, + }, + ) + .await?; + let arrow_bytes = match resp { + HttpCommand::QueryResultArrow { data } => data, + other => panic!("expected QueryResultArrow, got: {:?}", other), + }; + + // 3. Round-trip through HttpMessage::bytes() and verify the wire shape. + let wire = HttpMessage { + message_id: 99, + command: HttpCommand::QueryResultArrow { + data: arrow_bytes.clone(), + }, + connection_id: None, + } + .bytes(); + let parsed = root_as_http_message(&wire)?; + assert_eq!( + parsed.command_type(), + cubeshared::codegen::HttpCommand::HttpQueryResult + ); + let result = parsed.command_as_http_query_result().unwrap(); + assert_eq!( + result.data_type(), + HttpQueryResultData::HttpQueryResultArrow + ); + let arrow = result.data_as_http_query_result_arrow().unwrap(); + let payload: Vec = arrow.data().iter().collect(); + assert_eq!(payload, arrow_bytes); + + // 4. Decode the Arrow IPC stream, round-trip back to a DataFrame + let reader = StreamReader::try_new(std::io::Cursor::new(payload), None).unwrap(); + let batches: Vec = reader.collect::>().unwrap(); + + let decoded = batches_to_dataframe(batches)?; + // we don't compare directly both dataframes, because there is a difference with decimal96 + assert_eq!(decoded.get_columns().len(), original_df.get_columns().len()); + assert_eq!(decoded.get_rows().len(), original_df.get_rows().len()); + + insta::assert_snapshot!("arrow_response_format_round_trip", decoded.print()); + + Ok(()) } pub struct SqlServiceMock { @@ -1191,7 +1398,7 @@ mod tests { } #[tokio::test] - async fn ws_test() { + async fn ws_test() -> Result<(), CubeError> { init_test_logger().await; let sql_service = SqlServiceMock { @@ -1241,6 +1448,7 @@ mod tests { inline_tables: vec![], trace_obj: None, parameters: None, + response_format: QueryResultFormat::Legacy, }, connection_id, } @@ -1370,5 +1578,6 @@ mod tests { assert_message(&mut socket2, "10").await; http_server.stop_processing().await; + Ok(()) } } diff --git a/rust/cubestore/cubestore/src/http/snapshots/cubestore__http__tests__arrow_response_format_round_trip.snap b/rust/cubestore/cubestore/src/http/snapshots/cubestore__http__tests__arrow_response_format_round_trip.snap new file mode 100644 index 0000000000000..bafc79a2734c5 --- /dev/null +++ b/rust/cubestore/cubestore/src/http/snapshots/cubestore__http__tests__arrow_response_format_round_trip.snap @@ -0,0 +1,12 @@ +--- +source: cubestore/src/http/mod.rs +assertion_line: 1337 +expression: decoded.print() +snapshot_kind: text +--- ++----------+-------+-----------------+-----------+-------------+---------+----------+--------------------------+--------+ +| c_string | c_int | c_int96 | c_decimal | c_decimal96 | c_float | c_bytes | c_timestamp | c_bool | ++----------+-------+-----------------+-----------+-------------+---------+----------+--------------------------+--------+ +| hello | 42 | 123456789012345 | 1.2345 | 0.06789 | 3.5 | 0x010203 | 2024-01-15T10:30:45.123Z | true | +| NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | ++----------+-------+-----------------+-----------+-------------+---------+----------+--------------------------+--------+ diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 60f2f9889a75a..ad21a22acd8db 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -119,6 +119,52 @@ impl QueryResult { } } } + + pub async fn to_arrow_ipc_stream(self) -> Result, CubeError> { + // It's used to handle conversion + write in single spawn_blocking + enum Pending { + // CPU-bound task should be converted on spawn_blocking + Frame(Arc), + Batches { + schema: SchemaRef, + batches: Vec, + }, + } + let pending = match self { + QueryResult::Frame(df) => Pending::Frame(df), + QueryResult::Stream { schema, batches } => Pending::Batches { + schema, + batches: batches.try_collect().await?, + }, + }; + + cube_ext::spawn_blocking(move || -> Result, CubeError> { + use datafusion::arrow::ipc::writer::StreamWriter; + use std::io::Cursor; + + let (schema, batches) = match pending { + Pending::Frame(df) => { + let schema = df.get_schema(); + let arrays = data::rows_to_columns(df.get_columns(), df.get_rows()); + let batch = RecordBatch::try_new(schema.clone(), arrays)?; + (schema, vec![batch]) + } + Pending::Batches { schema, batches } => (schema, batches), + }; + + let mut writer = StreamWriter::try_new(Cursor::new(Vec::new()), schema.as_ref())?; + + // Writes multiple batches, because it's Arrow IPC stream format, client should handle it + for batch in &batches { + writer.write(batch)?; + } + + writer.finish()?; + + Ok(writer.into_inner()?.into_inner()) + }) + .await? + } } impl std::fmt::Debug for QueryResult { diff --git a/rust/cubestore/cubestore/src/store/mod.rs b/rust/cubestore/cubestore/src/store/mod.rs index 0535e72de33a1..0e3325264f2d7 100644 --- a/rust/cubestore/cubestore/src/store/mod.rs +++ b/rust/cubestore/cubestore/src/store/mod.rs @@ -97,6 +97,25 @@ impl DataFrame { &self.data } + pub fn print(&self) -> String { + use comfy_table::{Cell, Table}; + + let mut table = Table::new(); + table.load_preset("||--+-++| ++++++"); + table.set_header(self.columns.iter().map(|c| Cell::new(c.get_name()))); + + for row in &self.data { + table.add_row( + self.columns + .iter() + .zip(row.values().iter()) + .map(|(col, value)| value.format_with(col.get_column_type())), + ); + } + + table.trim_fmt() + } + pub fn to_execution_plan( &self, columns: &Vec, diff --git a/rust/cubestore/cubestore/src/table/mod.rs b/rust/cubestore/cubestore/src/table/mod.rs index 858617804e2db..4ea75d1c550df 100644 --- a/rust/cubestore/cubestore/src/table/mod.rs +++ b/rust/cubestore/cubestore/src/table/mod.rs @@ -1,3 +1,4 @@ +use crate::metastore::ColumnType; use crate::util::decimal::{Decimal, Decimal96}; use crate::util::int96::Int96; @@ -117,6 +118,45 @@ impl TableValue { ), } } + + /// Render the value as a string, using `column_type` for context-dependent + /// variants (currently `Decimal` / `Decimal96`, where scale lives on the + /// column rather than on the value). Falls back to `Display` otherwise. + pub fn format_with(&self, column_type: &ColumnType) -> String { + match (self, column_type) { + (TableValue::Decimal(v), ColumnType::Decimal { scale, .. }) => { + v.to_string(*scale as u8) + } + (TableValue::Decimal96(v), ColumnType::Decimal96 { scale, .. }) => { + v.to_string(*scale as u8) + } + (v, _) => v.to_string(), + } + } +} + +impl fmt::Display for TableValue { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + TableValue::Null => f.write_str("NULL"), + TableValue::String(v) => f.write_str(v), + TableValue::Int(v) => write!(f, "{}", v), + TableValue::Int96(v) => f.write_str(&v.to_string()), + // Scale-unaware fallbacks; `format_with` produces the proper form. + TableValue::Decimal(v) => write!(f, "{}", v.raw_value()), + TableValue::Decimal96(v) => write!(f, "{}", v.raw_value()), + TableValue::Float(v) => write!(f, "{}", v.0), + TableValue::Bytes(v) => { + f.write_str("0x")?; + for b in v { + write!(f, "{:02x}", b)?; + } + Ok(()) + } + TableValue::Timestamp(v) => f.write_str(&v.to_string()), + TableValue::Boolean(v) => write!(f, "{}", v), + } + } } #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Hash)] diff --git a/rust/cubestore/package.json b/rust/cubestore/package.json index 391bf4fa7c429..65facff6d893f 100644 --- a/rust/cubestore/package.json +++ b/rust/cubestore/package.json @@ -51,6 +51,9 @@ "extends": "../../packages/cubejs-linter" }, "jest": { + "roots": [ + "/dist" + ], "testMatch": [ "/dist/test/*.(test|spec).(js)" ],