Skip to content

Commit 27488c8

Browse files
authored
Add TZ support (returnString#304)
1 parent 5509d87 commit 27488c8

4 files changed

Lines changed: 120 additions & 34 deletions

File tree

convergence-arrow/src/table.rs

Lines changed: 56 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
//! Utilities for converting between Arrow and Postgres formats.
22
3+
use std::str::FromStr;
4+
35
use convergence::protocol::{DataTypeOid, ErrorResponse, FieldDescription, SqlState};
46
use convergence::protocol_ext::DataRowBatch;
7+
use datafusion::arrow::array::timezone::Tz;
58
use datafusion::arrow::array::{
6-
BooleanArray, Date32Array, Date64Array, Decimal128Array, Float16Array,
7-
Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, StringArray,
8-
StringViewArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
9-
TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array
9+
BooleanArray, Date32Array, Date64Array, Decimal128Array, Float16Array, Float32Array, Float64Array, Int16Array,
10+
Int32Array, Int64Array, Int8Array, StringArray, StringViewArray, TimestampMicrosecondArray,
11+
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array,
12+
UInt8Array,
1013
};
1114
use datafusion::arrow::datatypes::{DataType, Schema, TimeUnit};
1215
use datafusion::arrow::record_batch::RecordBatch;
@@ -61,23 +64,52 @@ pub fn record_batch_to_rows(arrow_batch: &RecordBatch, pg_batch: &mut DataRowBat
6164
ErrorResponse::error(SqlState::InvalidDatetimeFormat, "unsupported date type")
6265
})?)
6366
}
64-
DataType::Timestamp(unit, None) => row.write_timestamp(
65-
match unit {
66-
TimeUnit::Second => array_val!(TimestampSecondArray, col, row_idx, value_as_datetime),
67-
TimeUnit::Millisecond => {
68-
array_val!(TimestampMillisecondArray, col, row_idx, value_as_datetime)
69-
}
70-
TimeUnit::Microsecond => {
71-
array_val!(TimestampMicrosecondArray, col, row_idx, value_as_datetime)
67+
DataType::Timestamp(unit, tz) => {
68+
match tz {
69+
Some(tz) => {
70+
let tz = Tz::from_str(tz.as_ref()).map_err(|_| {
71+
ErrorResponse::error(SqlState::InvalidDatetimeFormat, "unsupported timezone")
72+
})?;
73+
let dt = match unit {
74+
TimeUnit::Second => array_cast!(TimestampSecondArray, col)
75+
.value_as_datetime_with_tz(row_idx, tz)
76+
.map(|d| d.fixed_offset()),
77+
TimeUnit::Millisecond => array_cast!(TimestampMillisecondArray, col)
78+
.value_as_datetime_with_tz(row_idx, tz)
79+
.map(|d| d.fixed_offset()),
80+
TimeUnit::Microsecond => array_cast!(TimestampMicrosecondArray, col)
81+
.value_as_datetime_with_tz(row_idx, tz)
82+
.map(|d| d.fixed_offset()),
83+
TimeUnit::Nanosecond => array_cast!(TimestampNanosecondArray, col)
84+
.value_as_datetime_with_tz(row_idx, tz)
85+
.map(|d| d.fixed_offset()),
86+
}
87+
.ok_or_else(|| {
88+
ErrorResponse::error(SqlState::InvalidDatetimeFormat, "unsupported timestamp type")
89+
})?;
90+
row.write_timestamp_with_tz(dt)
7291
}
73-
TimeUnit::Nanosecond => {
74-
array_val!(TimestampNanosecondArray, col, row_idx, value_as_datetime)
75-
}
76-
}
77-
.ok_or_else(|| {
78-
ErrorResponse::error(SqlState::InvalidDatetimeFormat, "unsupported timestamp type")
79-
})?,
80-
),
92+
None => row.write_timestamp(
93+
match unit {
94+
TimeUnit::Second => {
95+
array_val!(TimestampSecondArray, col, row_idx, value_as_datetime)
96+
}
97+
TimeUnit::Millisecond => {
98+
array_val!(TimestampMillisecondArray, col, row_idx, value_as_datetime)
99+
}
100+
TimeUnit::Microsecond => {
101+
array_val!(TimestampMicrosecondArray, col, row_idx, value_as_datetime)
102+
}
103+
TimeUnit::Nanosecond => {
104+
array_val!(TimestampNanosecondArray, col, row_idx, value_as_datetime)
105+
}
106+
}
107+
.ok_or_else(|| {
108+
ErrorResponse::error(SqlState::InvalidDatetimeFormat, "unsupported timestamp type")
109+
})?,
110+
),
111+
};
112+
}
81113
other => {
82114
return Err(ErrorResponse::error(
83115
SqlState::FeatureNotSupported,
@@ -108,7 +140,10 @@ pub fn data_type_to_oid(ty: &DataType) -> Result<DataTypeOid, ErrorResponse> {
108140
DataType::Decimal128(_, _) => DataTypeOid::Numeric,
109141
DataType::Utf8 | DataType::Utf8View => DataTypeOid::Text,
110142
DataType::Date32 | DataType::Date64 => DataTypeOid::Date,
111-
DataType::Timestamp(_, None) => DataTypeOid::Timestamp,
143+
DataType::Timestamp(_, tz) => match tz {
144+
Some(_) => DataTypeOid::Timestamptz,
145+
None => DataTypeOid::Timestamp,
146+
},
112147
other => {
113148
return Err(ErrorResponse::error(
114149
SqlState::FeatureNotSupported,

convergence-arrow/tests/test_arrow.rs

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
use async_trait::async_trait;
2-
use chrono::{NaiveDate, NaiveDateTime};
2+
use chrono::{DateTime, NaiveDate, NaiveDateTime};
33
use convergence::engine::{Engine, Portal};
44
use convergence::protocol::{ErrorResponse, FieldDescription};
55
use convergence::protocol_ext::DataRowBatch;
66
use convergence::server::{self, BindOptions};
77
use convergence::sqlparser::ast::Statement;
88
use convergence_arrow::table::{record_batch_to_rows, schema_to_field_desc};
9-
use datafusion::arrow::array::{ArrayRef, Date32Array, Decimal128Array, Float32Array, Int32Array, StringArray, StringViewArray, TimestampSecondArray};
9+
use datafusion::arrow::array::{
10+
ArrayRef, Date32Array, Decimal128Array, Float32Array, Int32Array, StringArray, StringViewArray,
11+
TimestampSecondArray,
12+
};
1013
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
1114
use datafusion::arrow::record_batch::RecordBatch;
12-
use std::sync::Arc;
1315
use rust_decimal::Decimal;
16+
use std::sync::Arc;
1417
use tokio_postgres::{connect, NoTls};
1518

1619
struct ArrowPortal {
@@ -32,10 +35,17 @@ impl ArrowEngine {
3235
fn new() -> Self {
3336
let int_col = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
3437
let float_col = Arc::new(Float32Array::from(vec![1.5, 2.5, 3.5])) as ArrayRef;
35-
let decimal_col = Arc::new(Decimal128Array::from(vec![11, 22, 33]).with_precision_and_scale(2, 0).unwrap()) as ArrayRef;
38+
let decimal_col = Arc::new(
39+
Decimal128Array::from(vec![11, 22, 33])
40+
.with_precision_and_scale(2, 0)
41+
.unwrap(),
42+
) as ArrayRef;
3643
let string_col = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
3744
let string_view_col = Arc::new(StringViewArray::from(vec!["aa", "bb", "cc"])) as ArrayRef;
3845
let ts_col = Arc::new(TimestampSecondArray::from(vec![1577836800, 1580515200, 1583020800])) as ArrayRef;
46+
let ts_tz_col =
47+
Arc::new(TimestampSecondArray::from(vec![1577854800, 1580533200, 1583038800]).with_timezone("+05:00"))
48+
as ArrayRef;
3949
let date_col = Arc::new(Date32Array::from(vec![0, 1, 2])) as ArrayRef;
4050

4151
let schema = Schema::new(vec![
@@ -45,12 +55,29 @@ impl ArrowEngine {
4555
Field::new("string_col", DataType::Utf8, true),
4656
Field::new("string_view_col", DataType::Utf8View, true),
4757
Field::new("ts_col", DataType::Timestamp(TimeUnit::Second, None), true),
58+
Field::new(
59+
"ts_tz_col",
60+
DataType::Timestamp(TimeUnit::Second, Some("+05:00".into())),
61+
true,
62+
),
4863
Field::new("date_col", DataType::Date32, true),
4964
]);
5065

5166
Self {
52-
batch: RecordBatch::try_new(Arc::new(schema), vec![int_col, float_col, decimal_col, string_col, string_view_col, ts_col, date_col])
53-
.expect("failed to create batch"),
67+
batch: RecordBatch::try_new(
68+
Arc::new(schema),
69+
vec![
70+
int_col,
71+
float_col,
72+
decimal_col,
73+
string_col,
74+
string_view_col,
75+
ts_col,
76+
ts_tz_col,
77+
date_col,
78+
],
79+
)
80+
.expect("failed to create batch"),
5481
}
5582
}
5683
}
@@ -94,8 +121,16 @@ async fn basic_data_types() {
94121
let rows = client.query("select 1", &[]).await.unwrap();
95122
let get_row = |idx: usize| {
96123
let row = &rows[idx];
97-
let cols: (i32, f32, Decimal, &str, &str, NaiveDateTime, NaiveDate) =
98-
(row.get(0), row.get(1), row.get(2), row.get(3), row.get(4), row.get(5), row.get(6));
124+
let cols: (i32, f32, Decimal, &str, &str, NaiveDateTime, DateTime<_>, NaiveDate) = (
125+
row.get(0),
126+
row.get(1),
127+
row.get(2),
128+
row.get(3),
129+
row.get(4),
130+
row.get(5),
131+
row.get(6),
132+
row.get(7),
133+
);
99134
cols
100135
};
101136

@@ -111,6 +146,7 @@ async fn basic_data_types() {
111146
.unwrap()
112147
.and_hms_opt(0, 0, 0)
113148
.unwrap(),
149+
DateTime::from_timestamp_millis(1577854800000).unwrap(),
114150
NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(),
115151
)
116152
);
@@ -126,6 +162,7 @@ async fn basic_data_types() {
126162
.unwrap()
127163
.and_hms_opt(0, 0, 0)
128164
.unwrap(),
165+
DateTime::from_timestamp_millis(1580533200000).unwrap(),
129166
NaiveDate::from_ymd_opt(1970, 1, 2).unwrap()
130167
)
131168
);
@@ -141,6 +178,7 @@ async fn basic_data_types() {
141178
.unwrap()
142179
.and_hms_opt(0, 0, 0)
143180
.unwrap(),
181+
DateTime::from_timestamp_millis(1583038800000).unwrap(),
144182
NaiveDate::from_ymd_opt(1970, 1, 3).unwrap()
145183
)
146184
);

convergence/src/protocol.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ data_types! {
7979

8080
Date = 1082, 4
8181
Timestamp = 1114, 8
82+
Timestamptz = 1184, 8
8283

8384
Text = 25, -1
8485
}

convergence/src/protocol_ext.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
33
use crate::protocol::{ConnectionCodec, FormatCode, ProtocolError, RowDescription};
44
use bytes::{BufMut, BytesMut};
5-
use chrono::{NaiveDate, NaiveDateTime};
5+
use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime};
66
use rust_decimal::Decimal;
77
use tokio_postgres::types::{ToSql, Type};
88
use tokio_util::codec::Encoder;
@@ -133,17 +133,29 @@ impl<'a> DataRowWriter<'a> {
133133
}
134134
}
135135

136+
/// Writes a timestamp with timezone value for the next column.
137+
pub fn write_timestamp_with_tz(&mut self, val: DateTime<FixedOffset>) {
138+
match self.parent.format_code {
139+
FormatCode::Binary => {
140+
let ts_tz_type = Type::from_oid(1184).expect("failed to create timestamptz type");
141+
let mut buf = BytesMut::new();
142+
val.to_sql(&ts_tz_type, &mut buf).expect("failed to write timestamptz");
143+
self.write_value(&buf.freeze())
144+
}
145+
FormatCode::Text => self.write_string(&val.to_string()),
146+
}
147+
}
148+
136149
/// Writes a numeric value for the next column.
137150
pub fn write_numeric_16(&mut self, val: i128, _p: &u8, s: &i8) {
138151
let decimal = Decimal::from_i128_with_scale(val, *s as u32);
139152
match self.parent.format_code {
140-
FormatCode::Text => {
141-
self.write_string(&decimal.to_string())
142-
}
153+
FormatCode::Text => self.write_string(&decimal.to_string()),
143154
FormatCode::Binary => {
144155
let numeric_type = Type::from_oid(1700).expect("failed to create numeric type");
145156
let mut buf = BytesMut::new();
146-
decimal.to_sql(&numeric_type, &mut buf)
157+
decimal
158+
.to_sql(&numeric_type, &mut buf)
147159
.expect("failed to write numeric");
148160

149161
self.write_value(&buf.freeze())

0 commit comments

Comments
 (0)