Skip to content

Commit 4347f4f

Browse files
mshauneu authored and returnString committed
Add Utf8 support
1 parent cf77fc7 commit 4347f4f

4 files changed

Lines changed: 15 additions & 7 deletions

File tree

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
/target
22
Cargo.lock
3+
.idea/
4+

convergence-arrow/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ tokio = { version = "1" }
1212
async-trait = "0.1"
1313
datafusion = "47"
1414
convergence = { path = "../convergence", version = "0.16.0" }
15-
chrono = "0.4"
15+
chrono = "=0.4.39"
1616

1717
[dev-dependencies]
1818
tokio-postgres = { version = "0.7", features = [ "with-chrono-0_4" ] }

convergence-arrow/src/table.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use convergence::protocol::{DataTypeOid, ErrorResponse, FieldDescription, SqlSta
44
use convergence::protocol_ext::DataRowBatch;
55
use datafusion::arrow::array::{
66
BooleanArray, Date32Array, Date64Array, Float16Array, Float32Array, Float64Array, Int16Array, Int32Array,
7-
Int64Array, Int8Array, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
7+
Int64Array, Int8Array, StringArray, StringViewArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
88
TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
99
};
1010
use datafusion::arrow::datatypes::{DataType, Schema, TimeUnit};
@@ -48,6 +48,7 @@ pub fn record_batch_to_rows(arrow_batch: &RecordBatch, pg_batch: &mut DataRowBat
4848
DataType::Float32 => row.write_float4(array_val!(Float32Array, col, row_idx)),
4949
DataType::Float64 => row.write_float8(array_val!(Float64Array, col, row_idx)),
5050
DataType::Utf8 => row.write_string(array_val!(StringArray, col, row_idx)),
51+
DataType::Utf8View => row.write_string(array_val!(StringViewArray, col, row_idx)),
5152
DataType::Date32 => {
5253
row.write_date(array_val!(Date32Array, col, row_idx, value_as_date).ok_or_else(|| {
5354
ErrorResponse::error(SqlState::InvalidDatetimeFormat, "unsupported date type")
@@ -102,7 +103,7 @@ pub fn data_type_to_oid(ty: &DataType) -> Result<DataTypeOid, ErrorResponse> {
102103
DataType::UInt64 => DataTypeOid::Int8,
103104
DataType::Float16 | DataType::Float32 => DataTypeOid::Float4,
104105
DataType::Float64 => DataTypeOid::Float8,
105-
DataType::Utf8 => DataTypeOid::Text,
106+
DataType::Utf8 | DataType::Utf8View => DataTypeOid::Text,
106107
DataType::Date32 | DataType::Date64 => DataTypeOid::Date,
107108
DataType::Timestamp(_, None) => DataTypeOid::Timestamp,
108109
other => {

convergence-arrow/tests/test_arrow.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use convergence::protocol_ext::DataRowBatch;
66
use convergence::server::{self, BindOptions};
77
use convergence::sqlparser::ast::Statement;
88
use convergence_arrow::table::{record_batch_to_rows, schema_to_field_desc};
9-
use datafusion::arrow::array::{ArrayRef, Date32Array, Float32Array, Int32Array, StringArray, TimestampSecondArray};
9+
use datafusion::arrow::array::{ArrayRef, Date32Array, Float32Array, Int32Array, StringArray, StringViewArray, TimestampSecondArray};
1010
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
1111
use datafusion::arrow::record_batch::RecordBatch;
1212
use std::sync::Arc;
@@ -32,19 +32,21 @@ impl ArrowEngine {
3232
let int_col = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
3333
let float_col = Arc::new(Float32Array::from(vec![1.5, 2.5, 3.5])) as ArrayRef;
3434
let string_col = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
35+
let string_view_col = Arc::new(StringViewArray::from(vec!["aa", "bb", "cc"])) as ArrayRef;
3536
let ts_col = Arc::new(TimestampSecondArray::from(vec![1577836800, 1580515200, 1583020800])) as ArrayRef;
3637
let date_col = Arc::new(Date32Array::from(vec![0, 1, 2])) as ArrayRef;
3738

3839
let schema = Schema::new(vec![
3940
Field::new("int_col", DataType::Int32, true),
4041
Field::new("float_col", DataType::Float32, true),
4142
Field::new("string_col", DataType::Utf8, true),
43+
Field::new("string_view_col", DataType::Utf8View, true),
4244
Field::new("ts_col", DataType::Timestamp(TimeUnit::Second, None), true),
4345
Field::new("date_col", DataType::Date32, true),
4446
]);
4547

4648
Self {
47-
batch: RecordBatch::try_new(Arc::new(schema), vec![int_col, float_col, string_col, ts_col, date_col])
49+
batch: RecordBatch::try_new(Arc::new(schema), vec![int_col, float_col, string_col, string_view_col, ts_col, date_col])
4850
.expect("failed to create batch"),
4951
}
5052
}
@@ -89,8 +91,8 @@ async fn basic_data_types() {
8991
let rows = client.query("select 1", &[]).await.unwrap();
9092
let get_row = |idx: usize| {
9193
let row = &rows[idx];
92-
let cols: (i32, f32, &str, NaiveDateTime, NaiveDate) =
93-
(row.get(0), row.get(1), row.get(2), row.get(3), row.get(4));
94+
let cols: (i32, f32, &str, &str, NaiveDateTime, NaiveDate) =
95+
(row.get(0), row.get(1), row.get(2), row.get(3), row.get(4), row.get(5));
9496
cols
9597
};
9698

@@ -100,6 +102,7 @@ async fn basic_data_types() {
100102
1,
101103
1.5,
102104
"a",
105+
"aa",
103106
NaiveDate::from_ymd_opt(2020, 1, 1)
104107
.unwrap()
105108
.and_hms_opt(0, 0, 0)
@@ -113,6 +116,7 @@ async fn basic_data_types() {
113116
2,
114117
2.5,
115118
"b",
119+
"bb",
116120
NaiveDate::from_ymd_opt(2020, 2, 1)
117121
.unwrap()
118122
.and_hms_opt(0, 0, 0)
@@ -126,6 +130,7 @@ async fn basic_data_types() {
126130
3,
127131
3.5,
128132
"c",
133+
"cc",
129134
NaiveDate::from_ymd_opt(2020, 3, 1)
130135
.unwrap()
131136
.and_hms_opt(0, 0, 0)

0 commit comments

Comments (0)