Skip to content

Commit 0f852a8

Browse files
committed
[opt](maxcompute) Optimize MaxCompute performance for SELECT queries with small LIMIT values.
Issue Number: None Related PR: None
1 parent 2fda4e0 commit 0f852a8

4 files changed

Lines changed: 460 additions & 1 deletion

File tree

fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java

Lines changed: 214 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,14 @@
5656
import java.time.Instant;
5757
import java.time.LocalDate;
5858
import java.time.LocalDateTime;
59+
import java.time.OffsetDateTime;
5960
import java.time.ZoneId;
6061
import java.time.ZonedDateTime;
62+
import java.time.format.DateTimeParseException;
63+
import java.util.ArrayList;
6164
import java.util.Base64;
6265
import java.util.HashMap;
66+
import java.util.LinkedHashMap;
6367
import java.util.List;
6468
import java.util.Map;
6569
import java.util.Objects;
@@ -634,6 +638,9 @@ public void unpackArray(List<ColumnValue> values) {
634638
ColumnType childType = columnType.getChildTypes().get(0);
635639
Object arrayObject = value;
636640
try {
641+
if (arrayObject instanceof String) {
642+
arrayObject = new ComplexValueParser((String) arrayObject).parseArray(childType);
643+
}
637644
if (arrayObject instanceof java.sql.Array) {
638645
arrayObject = ((java.sql.Array) arrayObject).getArray();
639646
}
@@ -655,7 +662,12 @@ public void unpackArray(List<ColumnValue> values) {
655662

656663
@Override
657664
public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
658-
Map<?, ?> map = (Map<?, ?>) value;
665+
Object mapObject = value;
666+
if (mapObject instanceof String) {
667+
mapObject = new ComplexValueParser((String) mapObject).parseMap(
668+
columnType.getChildTypes().get(0), columnType.getChildTypes().get(1));
669+
}
670+
Map<?, ?> map = (Map<?, ?>) mapObject;
659671
ColumnType keyType = columnType.getChildTypes().get(0);
660672
ColumnType valueType = columnType.getChildTypes().get(1);
661673
for (Map.Entry<?, ?> entry : map.entrySet()) {
@@ -668,6 +680,22 @@ public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
668680
public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
669681
List<ColumnType> childTypes = columnType.getChildTypes();
670682
try {
683+
if (value instanceof String) {
684+
List<Object> fieldValues = new ComplexValueParser((String) value).parseStruct(columnType);
685+
for (Integer fieldIndex : structFieldIndex) {
686+
values.add(new ResultSetColumnValue(fieldValues.get(fieldIndex),
687+
childTypes.get(fieldIndex), timeZone));
688+
}
689+
return;
690+
}
691+
if (value instanceof List<?>) {
692+
List<?> fieldValues = (List<?>) value;
693+
for (Integer fieldIndex : structFieldIndex) {
694+
values.add(new ResultSetColumnValue(fieldValues.get(fieldIndex),
695+
childTypes.get(fieldIndex), timeZone));
696+
}
697+
return;
698+
}
671699
if (value instanceof Struct) {
672700
Struct struct = (Struct) value;
673701
for (Integer fieldIndex : structFieldIndex) {
@@ -704,7 +732,192 @@ private Instant asInstant() {
704732
if (value instanceof java.util.Date) {
705733
return ((java.util.Date) value).toInstant();
706734
}
735+
if (value instanceof CharSequence) {
736+
String text = normalizeTemporalText(value.toString());
737+
try {
738+
return OffsetDateTime.parse(text).toInstant();
739+
} catch (DateTimeParseException e) {
740+
// fall through
741+
}
742+
try {
743+
return ZonedDateTime.parse(text).toInstant();
744+
} catch (DateTimeParseException e) {
745+
// fall through
746+
}
747+
try {
748+
return Instant.parse(text);
749+
} catch (DateTimeParseException e) {
750+
// fall through
751+
}
752+
try {
753+
return LocalDateTime.parse(text).atZone(timeZone).toInstant();
754+
} catch (DateTimeParseException e) {
755+
// fall through
756+
}
757+
try {
758+
return LocalDate.parse(text).atStartOfDay(timeZone).toInstant();
759+
} catch (DateTimeParseException e) {
760+
throw new IllegalStateException("Unsupported temporal string value: " + text, e);
761+
}
762+
}
707763
throw new IllegalStateException("Unsupported temporal value type: " + value.getClass().getName());
708764
}
765+
766+
private String normalizeTemporalText(String text) {
767+
String normalized = text.trim();
768+
if (normalized.length() > 10 && normalized.charAt(10) == ' ') {
769+
return normalized.substring(0, 10) + "T" + normalized.substring(11);
770+
}
771+
return normalized;
772+
}
773+
}
774+
775+
private static final class ComplexValueParser {
776+
private final String text;
777+
private int position;
778+
779+
private ComplexValueParser(String text) {
780+
this.text = text == null ? "" : text;
781+
}
782+
783+
private List<Object> parseArray(ColumnType childType) {
784+
skipLeadingSpaces();
785+
expect('[');
786+
List<Object> values = new ArrayList<>();
787+
skipLeadingSpaces();
788+
if (consumeIf(']')) {
789+
return values;
790+
}
791+
while (true) {
792+
values.add(parseValue(childType, ',', ']'));
793+
skipLeadingSpaces();
794+
if (consumeIf(',')) {
795+
skipLeadingSpaces();
796+
continue;
797+
}
798+
expect(']');
799+
return values;
800+
}
801+
}
802+
803+
private Map<Object, Object> parseMap(ColumnType keyType, ColumnType valueType) {
804+
skipLeadingSpaces();
805+
expect('{');
806+
Map<Object, Object> values = new LinkedHashMap<>();
807+
skipLeadingSpaces();
808+
if (consumeIf('}')) {
809+
return values;
810+
}
811+
while (true) {
812+
Object key = parseValue(keyType, ':');
813+
expect(':');
814+
Object value = parseValue(valueType, ',', '}');
815+
values.put(key, value);
816+
skipLeadingSpaces();
817+
if (consumeIf(',')) {
818+
skipLeadingSpaces();
819+
continue;
820+
}
821+
expect('}');
822+
return values;
823+
}
824+
}
825+
826+
private List<Object> parseStruct(ColumnType structType) {
827+
skipLeadingSpaces();
828+
expect('{');
829+
List<Object> values = new ArrayList<>();
830+
for (int i = 0; i < structType.getChildTypes().size(); i++) {
831+
values.add(null);
832+
}
833+
skipLeadingSpaces();
834+
if (consumeIf('}')) {
835+
return values;
836+
}
837+
while (true) {
838+
String fieldName = asString(parseScalar(':'));
839+
int fieldIndex = structType.getChildNames().indexOf(fieldName);
840+
if (fieldIndex < 0) {
841+
throw new IllegalStateException("Unknown struct field '" + fieldName + "' in value: " + text);
842+
}
843+
expect(':');
844+
values.set(fieldIndex, parseValue(structType.getChildTypes().get(fieldIndex), ',', '}'));
845+
skipLeadingSpaces();
846+
if (consumeIf(',')) {
847+
skipLeadingSpaces();
848+
continue;
849+
}
850+
expect('}');
851+
return values;
852+
}
853+
}
854+
855+
private Object parseValue(ColumnType type, char... terminators) {
856+
skipLeadingSpaces();
857+
if (isNullToken(terminators)) {
858+
position += 4;
859+
return null;
860+
}
861+
if (type.isArray()) {
862+
return parseArray(type.getChildTypes().get(0));
863+
}
864+
if (type.isMap()) {
865+
return parseMap(type.getChildTypes().get(0), type.getChildTypes().get(1));
866+
}
867+
if (type.isStruct()) {
868+
return parseStruct(type);
869+
}
870+
return parseScalar(terminators);
871+
}
872+
873+
private Object parseScalar(char... terminators) {
874+
int start = position;
875+
while (position < text.length() && !isTerminator(text.charAt(position), terminators)) {
876+
position++;
877+
}
878+
return text.substring(start, position);
879+
}
880+
881+
private boolean isNullToken(char... terminators) {
882+
if (!text.regionMatches(true, position, "NULL", 0, 4)) {
883+
return false;
884+
}
885+
int end = position + 4;
886+
return end >= text.length() || isTerminator(text.charAt(end), terminators);
887+
}
888+
889+
private boolean isTerminator(char current, char... terminators) {
890+
for (char terminator : terminators) {
891+
if (current == terminator) {
892+
return true;
893+
}
894+
}
895+
return false;
896+
}
897+
898+
private void skipLeadingSpaces() {
899+
while (position < text.length() && text.charAt(position) == ' ') {
900+
position++;
901+
}
902+
}
903+
904+
private boolean consumeIf(char expected) {
905+
if (position < text.length() && text.charAt(position) == expected) {
906+
position++;
907+
return true;
908+
}
909+
return false;
910+
}
911+
912+
private void expect(char expected) {
913+
if (!consumeIf(expected)) {
914+
throw new IllegalStateException("Expected '" + expected + "' at position "
915+
+ position + " in value: " + text);
916+
}
917+
}
918+
919+
private String asString(Object value) {
920+
return value == null ? null : String.valueOf(value);
921+
}
709922
}
710923
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.maxcompute;
19+
20+
import org.apache.doris.common.jni.vec.ColumnType;
21+
import org.apache.doris.common.jni.vec.ColumnValue;
22+
23+
import org.junit.Assert;
24+
import org.junit.Test;
25+
26+
import java.lang.reflect.Constructor;
27+
import java.time.LocalDate;
28+
import java.time.LocalDateTime;
29+
import java.time.ZoneId;
30+
import java.util.ArrayList;
31+
import java.util.Arrays;
32+
import java.util.List;
33+
34+
public class MaxComputeJniScannerTest {
35+
private static final ZoneId SHANGHAI = ZoneId.of("Asia/Shanghai");
36+
37+
@Test
38+
public void testResultSetColumnValueParsesStringDateTime() throws Exception {
39+
ColumnValue value = newResultSetColumnValue("2026-04-12 15:16:17.123", "datetimev2(3)");
40+
41+
Assert.assertEquals(LocalDateTime.of(2026, 4, 12, 15, 16, 17, 123_000_000), value.getDateTime());
42+
}
43+
44+
@Test
45+
public void testResultSetColumnValueParsesOffsetDateTimeString() throws Exception {
46+
ColumnValue value = newResultSetColumnValue("2026-04-12T15:16:17Z", "datetimev2(3)");
47+
48+
Assert.assertEquals(LocalDateTime.of(2026, 4, 12, 23, 16, 17), value.getDateTime());
49+
}
50+
51+
@Test
52+
public void testResultSetColumnValueParsesStringDate() throws Exception {
53+
ColumnValue value = newResultSetColumnValue("2026-04-12", "datev2");
54+
55+
Assert.assertEquals(LocalDate.of(2026, 4, 12), value.getDate());
56+
}
57+
58+
@Test
59+
public void testResultSetColumnValueUnpacksArrayString() throws Exception {
60+
ColumnValue value = newResultSetColumnValue("[value1, value2, NULL]", "array<string>");
61+
List<ColumnValue> elements = new ArrayList<>();
62+
63+
value.unpackArray(elements);
64+
65+
Assert.assertEquals(3, elements.size());
66+
Assert.assertEquals("value1", elements.get(0).getString());
67+
Assert.assertEquals("value2", elements.get(1).getString());
68+
Assert.assertTrue(elements.get(2).isNull());
69+
}
70+
71+
@Test
72+
public void testResultSetColumnValueUnpacksMapString() throws Exception {
73+
ColumnValue value = newResultSetColumnValue("{key1:value1, key2:NULL}", "map<string,string>");
74+
List<ColumnValue> keys = new ArrayList<>();
75+
List<ColumnValue> values = new ArrayList<>();
76+
77+
value.unpackMap(keys, values);
78+
79+
Assert.assertEquals(2, keys.size());
80+
Assert.assertEquals("key1", keys.get(0).getString());
81+
Assert.assertEquals("value1", values.get(0).getString());
82+
Assert.assertEquals("key2", keys.get(1).getString());
83+
Assert.assertTrue(values.get(1).isNull());
84+
}
85+
86+
@Test
87+
public void testResultSetColumnValueUnpacksStructString() throws Exception {
88+
ColumnValue value = newResultSetColumnValue("{field1:abc, field2:123}", "struct<field1:string,field2:int>");
89+
List<ColumnValue> fields = new ArrayList<>();
90+
91+
value.unpackStruct(Arrays.asList(0, 1), fields);
92+
93+
Assert.assertEquals(2, fields.size());
94+
Assert.assertEquals("abc", fields.get(0).getString());
95+
Assert.assertEquals(123, fields.get(1).getInt());
96+
}
97+
98+
@Test
99+
public void testResultSetColumnValueUnpacksNestedComplexString() throws Exception {
100+
ColumnValue value = newResultSetColumnValue("{key:[{s_int:-123}]}", "map<string,array<struct<s_int:int>>>");
101+
List<ColumnValue> keys = new ArrayList<>();
102+
List<ColumnValue> values = new ArrayList<>();
103+
104+
value.unpackMap(keys, values);
105+
106+
Assert.assertEquals(1, keys.size());
107+
Assert.assertEquals("key", keys.get(0).getString());
108+
109+
List<ColumnValue> arrayElements = new ArrayList<>();
110+
values.get(0).unpackArray(arrayElements);
111+
Assert.assertEquals(1, arrayElements.size());
112+
113+
List<ColumnValue> structFields = new ArrayList<>();
114+
arrayElements.get(0).unpackStruct(Arrays.asList(0), structFields);
115+
Assert.assertEquals(-123, structFields.get(0).getInt());
116+
}
117+
118+
private ColumnValue newResultSetColumnValue(Object value, String type) throws Exception {
119+
Class<?> clazz = Class.forName("org.apache.doris.maxcompute.MaxComputeJniScanner$ResultSetColumnValue");
120+
Constructor<?> constructor = clazz.getDeclaredConstructor(Object.class, ColumnType.class, ZoneId.class);
121+
constructor.setAccessible(true);
122+
return (ColumnValue) constructor.newInstance(value, ColumnType.parseType("c1", type), SHANGHAI);
123+
}
124+
}

0 commit comments

Comments
 (0)