diff --git a/milvus/build.gradle.kts b/milvus/build.gradle.kts new file mode 100644 index 00000000000..e9c0b491e28 --- /dev/null +++ b/milvus/build.gradle.kts @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +dependencies { + api(project(":core")) + api(project(":linq4j")) + + implementation("io.milvus:milvus-sdk-java:2.5.13") + + testImplementation(platform("org.junit:junit-bom:5.10.0")) + testImplementation("org.junit.jupiter:junit-jupiter") + testImplementation("org.testcontainers:testcontainers") +} diff --git a/milvus/src/main/java/org/apache/calcite/adapter/milvus/convention/MilvusRel.java b/milvus/src/main/java/org/apache/calcite/adapter/milvus/convention/MilvusRel.java new file mode 100644 index 00000000000..70e3b21f1b8 --- /dev/null +++ b/milvus/src/main/java/org/apache/calcite/adapter/milvus/convention/MilvusRel.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.milvus.convention; + +import org.apache.calcite.adapter.milvus.factory.MilvusTranslatableTable; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; + +import java.util.List; +import java.util.Map; + +/** + * Relational expression that uses Milvus calling convention. + */ +public interface MilvusRel extends RelNode { + void implement(Implementor implementor); + + Convention CONVENTION = new Convention.Impl("MILVUS", MilvusRel.class); + + /** + * Implementor for Milvus relational expressions. 
+ */ + class Implementor { + public final RexBuilder rexBuilder; + + // scan + public RelOptTable table; + public MilvusTranslatableTable milvusTable; + public RelDataType rowType; + public Map milvusOptions; + + // filter + public RexNode filterCondition; + + // project + public RelDataType projectRowType; + public List projects; + + // vector search + public RexNode vectorDistanceExpr; + public Integer vectorDistanceFieldIndex; + public RelFieldCollation.Direction sortOrder; + public RexNode limit; + + public Implementor(RexBuilder rexBuilder) { + this.rexBuilder = rexBuilder; + } + + public void visitChild(int ordinal, RelNode input) { + assert ordinal == 0; + + RelNode node = findMilvusRel(input); + + if (!(node instanceof MilvusRel)) { + throw new IllegalStateException( + "Expected MilvusRel input but got " + + (node == null ? "null" : node.getClass().getName()) + + " (original=" + input.getClass().getName() + ")"); + } + ((MilvusRel) node).implement(this); + } + } + + static RelNode findMilvusRel(RelNode input) { + RelNode node = input; + if (node instanceof RelSubset) { + final RelSubset subset = (RelSubset) node; + RelNode best = subset.getBest(); + if (best != null) { + node = best; + } else { + // find first MilvusRel in the subset + for (RelNode r : subset.getRelList()) { + if (r instanceof MilvusRel) { + node = r; + break; + } + } + } + } + return node; + } +} diff --git a/milvus/src/main/java/org/apache/calcite/adapter/milvus/convention/MilvusToEnumerableConverter.java b/milvus/src/main/java/org/apache/calcite/adapter/milvus/convention/MilvusToEnumerableConverter.java new file mode 100644 index 00000000000..58dd19db2a3 --- /dev/null +++ b/milvus/src/main/java/org/apache/calcite/adapter/milvus/convention/MilvusToEnumerableConverter.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.milvus.convention; + +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; +import org.apache.calcite.adapter.enumerable.JavaRowFormat; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.PhysTypeImpl; +import org.apache.calcite.adapter.milvus.factory.MilvusTranslatableTable; +import org.apache.calcite.adapter.milvus.operation.MilvusProjectExpression; +import org.apache.calcite.linq4j.tree.BlockBuilder; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.Types; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterImpl; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import 
org.apache.calcite.schema.Schema; +import org.apache.calcite.util.BuiltInMethod; +import org.apache.calcite.util.Pair; + +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; + +/** + * MilvusToEnumerableConverter converts a relational expression + * from Milvus calling convention to Enumerable calling convention. + */ +public class MilvusToEnumerableConverter + extends ConverterImpl + implements EnumerableRel { + protected MilvusToEnumerableConverter( + RelOptCluster cluster, + RelTraitSet traits, + RelNode input) { + super(cluster, ConventionTraitDef.INSTANCE, traits, input); + } + + @Override public RelNode copy(RelTraitSet traitSet, List inputs) { + return new MilvusToEnumerableConverter( + getCluster(), traitSet, sole(inputs)); + } + + @Override public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, + RelMetadataQuery mq) { + return super.computeSelfCost(planner, mq).multiplyBy(.1); + } + + @Override public Result implement(EnumerableRelImplementor implementor, Prefer pref) { + final BlockBuilder list = new BlockBuilder(); + final MilvusRel.Implementor milvusImplementor = + new MilvusRel.Implementor(getCluster().getRexBuilder()); + milvusImplementor.visitChild(0, getInput()); + + final Expression root = implementor.getRootExpression(); + final Expression schema = + Expressions.call(root, BuiltInMethod.DATA_CONTEXT_GET_ROOT_SCHEMA.method); + + // scan + final List qualifiedTableName = milvusImplementor.table.getQualifiedName(); + final Expression table = getScanInfo(qualifiedTableName, schema); + final Expression tableExpr = + Expressions.convert_(table, MilvusTranslatableTable.class); + // project + final RelDataType rowType = milvusImplementor.projectRowType != null + ? 
milvusImplementor.projectRowType + : getRowType(); + + final PhysType physType = + PhysTypeImpl.of( + implementor.getTypeFactory(), rowType, + pref.prefer(JavaRowFormat.ARRAY)); + + final List projects = milvusImplementor.projects; + List> + projectInfo = getProjectInfo(projects, rowType, milvusImplementor.rowType, physType); + final Expression projectInfoExpr = + list.append("projectRowTypeMapForEnumerator", expressionForProjectPairs(projectInfo)); + + Expression enumerable = + list.append( + "enumerable", Expressions.call(tableExpr, + "scan", + Expressions.constant(""), + projectInfoExpr)); + + list.add(Expressions.return_(null, enumerable)); + return implementor.result(physType, list.toBlock()); + } + + private static Expression getScanInfo(List qualifiedName, + Expression schema) { + final String schemaName = qualifiedName.size() > 1 ? qualifiedName.get(0) : null; + final String tableName = qualifiedName.get(qualifiedName.size() - 1); + + Expression current = schema; + + if (schemaName != null) { + current = + Expressions.call(current, BuiltInMethod.SCHEMA_GET_SUB_SCHEMA.method, + Expressions.constant(schemaName)); + current = Expressions.convert_(current, Schema.class); + } + + return Expressions.call(current, + BuiltInMethod.SCHEMA_GET_TABLE.method, + Expressions.constant(tableName)); + } + + private static List> getProjectInfo(List projects, + RelDataType rowType, RelDataType inputRowType, PhysType physType) { + List> projectInfo = new ArrayList<>(); + if (projects != null) { + for (int i = 0; i < projects.size(); i++) { + RexNode project = projects.get(i); + Class fieldClass = physType.fieldClass(i); + MilvusProjectExpression expr; + + if (project instanceof RexInputRef) { + int inputIndex = ((RexInputRef) project).getIndex(); + String originalFieldName = inputRowType.getFieldNames().get(inputIndex); + expr = new MilvusProjectExpression.InputField(originalFieldName, fieldClass); + } else if (project instanceof RexLiteral) { + RexLiteral literal = 
(RexLiteral) project; + expr = new MilvusProjectExpression.Constant(fieldClass, literal.getValue3()); + } else { + throw new UnsupportedOperationException("Unsupported project type"); + } + projectInfo.add(Pair.of(i, expr)); + } + } else { + List inputFields = rowType.getFieldNames(); + for (int i = 0; i < inputFields.size(); i++) { + String fieldName = inputFields.get(i); + Class fieldClass = physType.fieldClass(i); + projectInfo.add( + Pair.of(i, + new MilvusProjectExpression.InputField(fieldName, fieldClass))); + } + } + return projectInfo; + } + + private Expression expressionForProjectExpression(MilvusProjectExpression expr) { + if (expr instanceof MilvusProjectExpression.InputField) { + String fieldName = ((MilvusProjectExpression.InputField) expr).getFieldName(); + return Expressions.new_(MilvusProjectExpression.InputField.class, + Expressions.constant(fieldName), + Expressions.constant(expr.getClazz(), Class.class)); + } else if (expr instanceof MilvusProjectExpression.Constant) { + Object value = ((MilvusProjectExpression.Constant) expr).getValue(); + return Expressions.new_(MilvusProjectExpression.Constant.class, + Expressions.constant(expr.getClazz(), Class.class), + Expressions.constant(value)); + } else if (expr instanceof MilvusProjectExpression.VectorScore) { + return Expressions.new_(MilvusProjectExpression.VectorScore.class, + Expressions.constant(expr.getClazz(), Class.class)); + } else { + throw new AssertionError("Unknown expression type: " + expr); + } + } + + private Expression expressionForProjectPairs(List> pairs) { + List pairExpressions = new ArrayList<>(); + + for (Pair pair : pairs) { + Expression first = Expressions.constant(pair.left, Integer.class); + Expression second = expressionForProjectExpression(pair.right); + Type pairType = Types.of(Pair.class, Integer.class, MilvusProjectExpression.class); + Expression pairExpr = Expressions.new_(pairType, first, second); + pairExpressions.add(pairExpr); + } + return 
Expressions.call(BuiltInMethod.ARRAYS_AS_LIST.method, + Expressions.newArrayInit(Pair.class, pairExpressions)); + } + +} diff --git a/milvus/src/main/java/org/apache/calcite/adapter/milvus/convention/MilvusToEnumerableConverterRule.java b/milvus/src/main/java/org/apache/calcite/adapter/milvus/convention/MilvusToEnumerableConverterRule.java new file mode 100644 index 00000000000..1a197ad3c00 --- /dev/null +++ b/milvus/src/main/java/org/apache/calcite/adapter/milvus/convention/MilvusToEnumerableConverterRule.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.milvus.convention; + +import org.apache.calcite.adapter.enumerable.EnumerableConvention; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; + +/** + * Rule to convert a relational expression from + * {@link MilvusRel#CONVENTION Milvus calling convention} to + * {@link org.apache.calcite.adapter.enumerable.EnumerableConvention Enumerable calling convention}. 
+ */ +public class MilvusToEnumerableConverterRule extends ConverterRule { + public static final MilvusToEnumerableConverterRule INSTANCE = + Config.INSTANCE + .withConversion(MilvusRel.class, MilvusRel.CONVENTION, + EnumerableConvention.INSTANCE, "MilvusToEnumerableConverterRule") + .withRuleFactory(MilvusToEnumerableConverterRule::new) + .toRule(MilvusToEnumerableConverterRule.class); + + protected MilvusToEnumerableConverterRule(Config config) { + super(config); + } + + @Override public RelNode convert(RelNode relNode) { + final RelTraitSet traitSet = relNode.getTraitSet() + .replace(getOutTrait()); + return new MilvusToEnumerableConverter(relNode.getCluster(), traitSet, + relNode); + } + + @Override public void onMatch(RelOptRuleCall call) { + final RelNode rel = call.rel(0); + if (rel.getConvention() == getOutTrait()) { + return; + } + final RelNode converted = convert(rel); + if (converted != null) { + call.transformTo(converted); + } + } +} diff --git a/milvus/src/main/java/org/apache/calcite/adapter/milvus/convention/package-info.java b/milvus/src/main/java/org/apache/calcite/adapter/milvus/convention/package-info.java new file mode 100644 index 00000000000..7dbf8dae4ff --- /dev/null +++ b/milvus/src/main/java/org/apache/calcite/adapter/milvus/convention/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Calling convention for the Milvus adapter. + */ +package org.apache.calcite.adapter.milvus.convention; diff --git a/milvus/src/main/java/org/apache/calcite/adapter/milvus/factory/MilvusSchema.java b/milvus/src/main/java/org/apache/calcite/adapter/milvus/factory/MilvusSchema.java new file mode 100644 index 00000000000..6c269ab7d9b --- /dev/null +++ b/milvus/src/main/java/org/apache/calcite/adapter/milvus/factory/MilvusSchema.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.adapter.milvus.factory; + +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractSchema; + +import io.milvus.v2.client.ConnectConfig; +import io.milvus.v2.client.MilvusClientV2; +import io.milvus.v2.service.collection.request.CreateCollectionReq; +import io.milvus.v2.service.collection.request.DescribeCollectionReq; +import io.milvus.v2.service.collection.response.DescribeCollectionResp; +import io.milvus.v2.service.collection.response.ListCollectionsResp; + +import java.util.HashMap; +import java.util.Map; + +/** + * Schema implementation for Milvus database. + * Uses a global singleton MilvusClientV2 for all operations. + */ +public class MilvusSchema extends AbstractSchema { + private final Map tableMap = new HashMap<>(); + + private final String host; + private final Integer port; + private final String databaseName; + private final String user; + private final String password; + + + public MilvusSchema(String host, Integer port, String databaseName, String user, + String password) { + super(); + this.host = host; + this.port = port; + this.databaseName = databaseName; + this.user = user; + this.password = password; + } + + private ConnectConfig buildConnectConfig() { + ConnectConfig.ConnectConfigBuilder builder = ConnectConfig.builder() + .uri("http://" + host + ":" + port); + if (databaseName != null) { + builder.dbName(databaseName); + } + if (user != null) { + builder.username(user); + } + if (password != null) { + builder.password(password); + } + return builder.build(); + } + + /** + * Borrows a MilvusClientV2 for this schema's database. + * Creates a new client with the specific database context. + */ + public MilvusClientV2 borrowClient() { + // Create a client with specific database context + return new MilvusClientV2(buildConnectConfig()); + } + + /** + * Returns a MilvusClientV2 back to the pool. + * Closes the client as we create a new one for each borrow. 
+ */ + public void returnClient(MilvusClientV2 client) { + if (client != null) { + try { + client.close(); + } catch (Exception e) { + // Ignore close errors + } + } + } + + @Override protected synchronized Map getTableMap() { + MilvusClientV2 client = borrowClient(); + try { + ListCollectionsResp list = client.listCollections(); + if (list.getCollectionNames() != null) { + for (String name : list.getCollectionNames()) { + tableMap.computeIfAbsent(name, n -> + new MilvusTranslatableTable(this, n, getCollectionSchema(n, client))); + } + } + } finally { + returnClient(client); + } + return tableMap; + } + + private CreateCollectionReq.CollectionSchema getCollectionSchema(String collectionName, + MilvusClientV2 milvusClient) { + DescribeCollectionReq req = + DescribeCollectionReq.builder() + .collectionName(collectionName) + .databaseName(databaseName) + .build(); + DescribeCollectionResp describeCollectionResp = + milvusClient.describeCollection(req); + return describeCollectionResp.getCollectionSchema(); + } +} diff --git a/milvus/src/main/java/org/apache/calcite/adapter/milvus/factory/MilvusSchemaFactory.java b/milvus/src/main/java/org/apache/calcite/adapter/milvus/factory/MilvusSchemaFactory.java new file mode 100644 index 00000000000..933c56a4f08 --- /dev/null +++ b/milvus/src/main/java/org/apache/calcite/adapter/milvus/factory/MilvusSchemaFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.milvus.factory; + +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaFactory; +import org.apache.calcite.schema.SchemaPlus; + +import java.util.Map; + +/** + * Schema Factory for milvus database. + */ +public class MilvusSchemaFactory implements SchemaFactory { + @Override public Schema create(SchemaPlus parentSchema, String name, + Map operand) { + String host = (String) operand.get("host"); + Integer milvusPort = (Integer) operand.get("port"); + String databaseName = (String) operand.get("databaseName"); + String user = (String) operand.get("user"); + String password = (String) operand.get("password"); + return new MilvusSchema(host, milvusPort, databaseName, user, password); + } +} diff --git a/milvus/src/main/java/org/apache/calcite/adapter/milvus/factory/MilvusTranslatableTable.java b/milvus/src/main/java/org/apache/calcite/adapter/milvus/factory/MilvusTranslatableTable.java new file mode 100644 index 00000000000..6f92967a17d --- /dev/null +++ b/milvus/src/main/java/org/apache/calcite/adapter/milvus/factory/MilvusTranslatableTable.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.milvus.factory; + +import org.apache.calcite.adapter.milvus.operation.MilvusProjectExpression; +import org.apache.calcite.adapter.milvus.operation.MilvusQueryEnumerator; +import org.apache.calcite.adapter.milvus.operation.MilvusTableScan; +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.linq4j.QueryProvider; +import org.apache.calcite.linq4j.Queryable; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.QueryableTable; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.TranslatableTable; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.Pair; + +import io.milvus.v2.common.DataType; +import io.milvus.v2.service.collection.request.CreateCollectionReq; + +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; + +/** + * Milvus table that supports translation to relational algebra. 
+ */ +public class MilvusTranslatableTable extends AbstractTable + implements QueryableTable, TranslatableTable { + private final MilvusSchema schema; + private final String collectionName; + private final CreateCollectionReq.CollectionSchema collectionSchema; + + public MilvusTranslatableTable(MilvusSchema schema, String collectionName, + CreateCollectionReq.CollectionSchema collectionSchema) { + this.collectionName = collectionName; + this.schema = schema; + this.collectionSchema = collectionSchema; + + } + + @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) { + final RelOptCluster cluster = context.getCluster(); + return new MilvusTableScan( + cluster, + cluster.traitSetOf(org.apache.calcite.adapter.milvus.convention.MilvusRel.CONVENTION), + context.getTableHints(), + relOptTable, + this); + } + + @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { + + List dataTypes = new ArrayList<>(); + List fieldNames = new ArrayList<>(); + for (CreateCollectionReq.FieldSchema fieldSchema : collectionSchema + .getFieldSchemaList()) { + DataType dataType = fieldSchema.getDataType(); + fieldNames.add(fieldSchema.getName()); + switch (dataType) { + case Int8: + dataTypes.add( + typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.SMALLINT), + true)); + break; + case Int16: + dataTypes.add( + typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.SMALLINT), + true)); + break; + case Int32: + dataTypes.add( + typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.INTEGER), + true)); + break; + case Int64: + dataTypes.add( + typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BIGINT), + true)); + break; + case BinaryVector: + dataTypes.add( + typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.VARBINARY), + true)); + break; + case FloatVector: + dataTypes.add( + typeFactory.createTypeWithNullability( + 
typeFactory.createArrayType( + typeFactory.createSqlType(SqlTypeName.REAL), -1), + true)); + break; + case Float: + dataTypes.add( + typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.REAL), + true)); + break; + case Double: + dataTypes.add( + typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.DOUBLE), + true)); + break; + case Bool: + dataTypes.add( + typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BOOLEAN), + true)); + break; + default: + dataTypes.add( + typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.VARCHAR), + true)); + break; + } + } + return typeFactory.createStructType(Pair.zip(fieldNames, dataTypes)); + } + + public Enumerable scan( + String filterExpression, + List> projectRowTypeMapForEnumerator) { + return new AbstractEnumerable() { + @Override public Enumerator enumerator() { + return new MilvusQueryEnumerator( + MilvusTranslatableTable.this.schema, + collectionName, + filterExpression, + projectRowTypeMapForEnumerator); + } + }; + } + + @Override public Expression getExpression(SchemaPlus schema, String tableName, Class clazz) { + return Schemas.tableExpression(schema, getElementType(), tableName, clazz); + } + + @Override public Queryable asQueryable(QueryProvider queryProvider, SchemaPlus schema, + String tableName) { + throw new UnsupportedOperationException(); + } + + @Override public Type getElementType() { + return Object[].class; + } +} diff --git a/milvus/src/main/java/org/apache/calcite/adapter/milvus/factory/package-info.java b/milvus/src/main/java/org/apache/calcite/adapter/milvus/factory/package-info.java new file mode 100644 index 00000000000..80553761166 --- /dev/null +++ b/milvus/src/main/java/org/apache/calcite/adapter/milvus/factory/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Factory classes for the Milvus adapter. + */ +package org.apache.calcite.adapter.milvus.factory; diff --git a/milvus/src/main/java/org/apache/calcite/adapter/milvus/operation/MilvusProjectExpression.java b/milvus/src/main/java/org/apache/calcite/adapter/milvus/operation/MilvusProjectExpression.java new file mode 100644 index 00000000000..4136f9e25fb --- /dev/null +++ b/milvus/src/main/java/org/apache/calcite/adapter/milvus/operation/MilvusProjectExpression.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * Describes how one projected column of a Milvus query is produced: from a
 * constant, from the vector retrieval score, or from an input field.
 *
 * <p>Every expression also carries the Java class of the value it yields.
 */
public abstract class MilvusProjectExpression {

  /**
   * Type of project expression.
   */
  public enum ExpressionType {
    /**
     * Constant value expression.
     */
    CONSTANT,
    /**
     * Vector retrieval score expression.
     */
    VECTOR_SCORE,
    /**
     * Input field expression.
     */
    INPUT_FIELD
  }

  private final Class<?> clazz;
  private final ExpressionType type;

  /**
   * Creates an expression.
   *
   * @param type kind of expression
   * @param clazz Java class of the projected value
   */
  protected MilvusProjectExpression(ExpressionType type, Class<?> clazz) {
    this.clazz = clazz;
    this.type = type;
  }

  /** Returns the Java class of the projected value. */
  public Class<?> getClazz() {
    return clazz;
  }

  /** Returns the kind of this expression. */
  public ExpressionType getType() {
    return type;
  }

  /**
   * Constant value expression.
   */
  public static class Constant extends MilvusProjectExpression {
    private final Object value;

    public Constant(Class<?> clazz, Object value) {
      super(ExpressionType.CONSTANT, clazz);
      this.value = value;
    }

    public Object getValue() {
      return value;
    }

    @Override public String toString() {
      // Report the projected value class, not the runtime class of this expression.
      return "Constant{" + "value=" + value + ", clazz=" + getClazz() + "}";
    }
  }

  /**
   * Vector retrieval score expression.
   */
  public static class VectorScore extends MilvusProjectExpression {
    public VectorScore(Class<?> clazz) {
      super(ExpressionType.VECTOR_SCORE, clazz);
    }

    @Override public String toString() {
      // Report the projected value class, not the runtime class of this expression.
      return "VectorScore{clazz=" + getClazz() + "}";
    }
  }

  /**
   * Input field expression.
   */
  public static class InputField extends MilvusProjectExpression {
    private final String fieldName;

    public InputField(String fieldName, Class<?> clazz) {
      super(ExpressionType.INPUT_FIELD, clazz);
      this.fieldName = fieldName;
    }

    public String getFieldName() {
      return fieldName;
    }

    @Override public String toString() {
      // Report the projected value class, not the runtime class of this expression.
      return "InputField{" + "fieldName='" + fieldName + '\'' + ", clazz=" + getClazz() + "}";
    }
  }
}
+ */ +package org.apache.calcite.adapter.milvus.operation; + +import org.apache.calcite.adapter.milvus.factory.MilvusSchema; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.util.Pair; + +import io.milvus.v2.client.MilvusClientV2; + +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.Iterator; +import java.util.List; + +/** + * Enumerator that reads from a Milvus collection with pagination support. + */ +public class MilvusQueryEnumerator implements Enumerator { + private static final int DEFAULT_PAGINATION_SIZE = 5000; + private static final Object NO_CURRENT = new Object(); + private final MilvusClientV2 client; + private final Iterator iterator; + private Object current; + + private MilvusSchema milvusSchema; + + public MilvusQueryEnumerator( + MilvusSchema schema, + String collectionName, + @Nullable String filterExpression, + @Nullable List> projectRowTypeMapForEnumerator) { + this.milvusSchema = schema; + this.client = schema.borrowClient(); + this.iterator = + createIterator(this.client, collectionName, filterExpression, DEFAULT_PAGINATION_SIZE, + projectRowTypeMapForEnumerator); + this.current = NO_CURRENT; + } + + static Iterator createIterator( + MilvusClientV2 client, + String collectionName, + @Nullable String filterExpression, + int batchSize, + @Nullable List> projectRowTypeMapForEnumerator) { + return new MilvusRowIterator(client, collectionName, filterExpression, batchSize, + projectRowTypeMapForEnumerator); + } + + + @Override public Object current() { + if (current == NO_CURRENT) { + throw new IllegalStateException(); + } + return current; + } + + @Override public boolean moveNext() { + if (iterator.hasNext()) { + Row row = iterator.next(); + if (row.values.length == 1) { + current = row.values[0]; + } else { + current = row.values; + } + return true; + } else { + current = NO_CURRENT; + return false; + } + } + + @Override public void reset() { + + } + + @Override public void close() { + try { + 
this.milvusSchema.returnClient(this.client); + } catch (Exception ignore) { + // ignore + } + } + +} diff --git a/milvus/src/main/java/org/apache/calcite/adapter/milvus/operation/MilvusRowIterator.java b/milvus/src/main/java/org/apache/calcite/adapter/milvus/operation/MilvusRowIterator.java new file mode 100644 index 00000000000..ef3382af0ed --- /dev/null +++ b/milvus/src/main/java/org/apache/calcite/adapter/milvus/operation/MilvusRowIterator.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.milvus.operation; + +import org.apache.calcite.util.Pair; + +import io.milvus.v2.client.MilvusClientV2; +import io.milvus.v2.service.vector.request.QueryReq; +import io.milvus.v2.service.vector.response.QueryResp; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Iterator for reading rows from a Milvus collection with pagination support. 
+ */ +public class MilvusRowIterator implements Iterator { + private final MilvusClientV2 client; + private final String collectionName; + private final List project; + private final String filterExpression; + private final int pageSize; + private final Map projectRowTypeMap; + + private Iterator currentPageIterator = null; + private long offset = 0; + private boolean hasMore = true; + + MilvusRowIterator( + MilvusClientV2 client, + String collectionName, + String filterExpression, + int pageSize, + List> projectRowTypeMapForEnumerator) { + this.client = client; + this.collectionName = collectionName; + this.filterExpression = filterExpression; + this.pageSize = pageSize; + + // Build projectRowTypeMap and extract field names for querying Milvus + if (projectRowTypeMapForEnumerator != null && !projectRowTypeMapForEnumerator.isEmpty()) { + this.projectRowTypeMap = new LinkedHashMap<>(); + this.project = new ArrayList<>(); + for (Pair pair : projectRowTypeMapForEnumerator) { + MilvusProjectExpression expr = pair.right; + this.projectRowTypeMap.put(pair.left, expr); + + // Extract InputField names for Milvus query + if (expr instanceof MilvusProjectExpression.InputField) { + String fieldName = ((MilvusProjectExpression.InputField) expr).getFieldName(); + this.project.add(fieldName); + } + } + } else { + this.project = null; + this.projectRowTypeMap = null; + } + } + + @Override public boolean hasNext() { + // Check if we have more data in the current page + if (currentPageIterator != null && currentPageIterator.hasNext()) { + return true; + } + + // No more data in current page, try to load next page + if (hasMore) { + loadNextPage(); + return currentPageIterator != null && currentPageIterator.hasNext(); + } + + return false; + } + + @Override public Row next() { + if (currentPageIterator == null || !currentPageIterator.hasNext()) { + if (hasMore) { + loadNextPage(); + } + if (currentPageIterator == null || !currentPageIterator.hasNext()) { + throw new 
java.util.NoSuchElementException(); + } + } + return currentPageIterator.next(); + } + + private void loadNextPage() { + if (!hasMore) { + return; + } + + try { + // Query a page of records from Milvus + QueryReq.QueryReqBuilder queryBuilder = QueryReq.builder() + .collectionName(collectionName) + .outputFields(project) + .limit(pageSize); + + if (filterExpression != null && !filterExpression.isEmpty()) { + queryBuilder.filter(filterExpression); + } + + if (offset > 0) { + queryBuilder.offset(offset); + } + + QueryReq queryReq = queryBuilder.build(); + QueryResp response = client.query(queryReq); + + List queryResults = response.getQueryResults(); + + if (queryResults == null || queryResults.isEmpty()) { + hasMore = false; + currentPageIterator = Collections.emptyIterator(); + return; + } + + int rowCount = queryResults.size(); + + if (rowCount == 0) { + hasMore = false; + currentPageIterator = Collections.emptyIterator(); + return; + } + + currentPageIterator = createIteratorFromQueryResults(queryResults, projectRowTypeMap); + + // Update offset and hasMore for next page + if (rowCount < pageSize) { + hasMore = false; + } else { + offset += rowCount; + } + } catch (Exception e) { + hasMore = false; + throw new RuntimeException("Error loading next page from Milvus: " + e.getMessage(), e); + } + } + + private static Object[] fillProjectRow( + Map entity, + Map projectRowTypeMap) { + if (projectRowTypeMap == null) { + return entity.values().toArray(); + } + Object[] rowValues = new Object[projectRowTypeMap.size()]; + for (Map.Entry entry : projectRowTypeMap.entrySet()) { + Integer position = entry.getKey(); + MilvusProjectExpression expr = entry.getValue(); + if (expr.getType() == MilvusProjectExpression.ExpressionType.CONSTANT) { + rowValues[position] = ((MilvusProjectExpression.Constant) expr).getValue(); + } else if (expr.getType() == MilvusProjectExpression.ExpressionType.INPUT_FIELD + && expr instanceof MilvusProjectExpression.InputField) { + rowValues[position] 
= + entity.get(((MilvusProjectExpression.InputField) expr).getFieldName()); + } else { + rowValues[position] = null; + } + } + return rowValues; + } + + private Iterator createIteratorFromQueryResults( + List queryResults, + Map projectRowTypeMap) { + List pageRows = new ArrayList<>(); + + for (QueryResp.QueryResult result : queryResults) { + Map entity = result.getEntity(); + Object[] rowValues = fillProjectRow(entity, projectRowTypeMap); + pageRows.add(new Row(rowValues)); + } + + return pageRows.iterator(); + } +} diff --git a/milvus/src/main/java/org/apache/calcite/adapter/milvus/operation/MilvusTableScan.java b/milvus/src/main/java/org/apache/calcite/adapter/milvus/operation/MilvusTableScan.java new file mode 100644 index 00000000000..6d3893bf6a5 --- /dev/null +++ b/milvus/src/main/java/org/apache/calcite/adapter/milvus/operation/MilvusTableScan.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.adapter.milvus.operation; + +import org.apache.calcite.adapter.milvus.convention.MilvusRel; +import org.apache.calcite.adapter.milvus.convention.MilvusToEnumerableConverterRule; +import org.apache.calcite.adapter.milvus.factory.MilvusTranslatableTable; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.hint.RelHint; +import org.apache.calcite.rel.metadata.RelMetadataQuery; + +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; + +import static java.util.Objects.requireNonNull; + +/** + * Relational expression representing a scan of a Milvus collection. + * + *

Additional operations might be applied, + * using query methods. + */ +public class MilvusTableScan extends TableScan implements MilvusRel { + final MilvusTranslatableTable milvusTable; + + public MilvusTableScan(RelOptCluster cluster, RelTraitSet traitSet, + List hints, + RelOptTable table, MilvusTranslatableTable milvusTable) { + super(cluster, traitSet, hints, table); + + this.milvusTable = requireNonNull(milvusTable, "milvusTable"); + checkArgument(getConvention() == MilvusRel.CONVENTION); + } + + @Override public RelNode withHints(List hintList) { + return new MilvusTableScan(getCluster(), traitSet, hintList, table, milvusTable); + } + + @Override public RelNode copy(RelTraitSet traitSet, List inputs) { + assert inputs.isEmpty(); + return new MilvusTableScan(getCluster(), traitSet, getHints(), table, milvusTable); + } + + @Override public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw) + .itemIf("hints", getHints(), !getHints().isEmpty()); + } + + @Override public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, + RelMetadataQuery mq) { + final RelOptCost cost = requireNonNull(super.computeSelfCost(planner, mq)); + final int fieldCount = getRowType().getFieldCount(); + return cost.multiplyBy(0.01 * fieldCount); + } + + @Override public void register(RelOptPlanner planner) { + planner.addRule(MilvusToEnumerableConverterRule.INSTANCE); + } + + @Override public void implement(Implementor implementor) { + implementor.table = table; + implementor.milvusTable = milvusTable; + implementor.rowType = getRowType(); + } + +} diff --git a/milvus/src/main/java/org/apache/calcite/adapter/milvus/operation/Row.java b/milvus/src/main/java/org/apache/calcite/adapter/milvus/operation/Row.java new file mode 100644 index 00000000000..9971321d70f --- /dev/null +++ b/milvus/src/main/java/org/apache/calcite/adapter/milvus/operation/Row.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
/**
 * Row of projected data from Milvus.
 *
 * <p>The value array is exposed directly and is not copied.
 */
public class Row {
  /** Column values of this row, in output order; never null. */
  public final Object[] values;

  /**
   * Creates a row backed by the given array.
   *
   * @param values column values; stored as-is, without a defensive copy
   * @throws NullPointerException if {@code values} is null
   */
  public Row(Object[] values) {
    // Fail at construction instead of later, when a consumer reads the length.
    if (values == null) {
      throw new NullPointerException("values");
    }
    this.values = values;
  }

  /**
   * Creates a row whose columns are all null.
   *
   * @param fieldCount number of columns
   * @return a new row with {@code fieldCount} null values
   */
  public static Row createEmpty(int fieldCount) {
    return new Row(new Object[fieldCount]);
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Operations for the Milvus adapter. + */ +package org.apache.calcite.adapter.milvus.operation; diff --git a/milvus/src/test/java/org/apache/calcite/adapter/milvus/MilvusBaseE2ETest.java b/milvus/src/test/java/org/apache/calcite/adapter/milvus/MilvusBaseE2ETest.java new file mode 100644 index 00000000000..bfe3a5cc7b5 --- /dev/null +++ b/milvus/src/test/java/org/apache/calcite/adapter/milvus/MilvusBaseE2ETest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.adapter.milvus; + +import org.apache.calcite.adapter.milvus.extension.MilvusExtension; +import org.apache.calcite.adapter.milvus.factory.MilvusSchemaFactory; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.jdbc.Driver; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; + +import io.milvus.client.MilvusServiceClient; +import io.milvus.v2.client.MilvusClientV2; + +import org.junit.jupiter.api.extension.ExtendWith; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * Base class for Milvus E2E tests providing utility methods and constants. + * + *

Tests extending this class should also use + * {@code @ExtendWith(MilvusExtension.class)} to ensure Milvus containers + * are properly initialized. + */ +@ExtendWith(MilvusExtension.class) +public abstract class MilvusBaseE2ETest { + + public static final String MILVUS_CONVERTER = "MilvusToEnumerableConverter"; + public static final String MILVUS_SCAN = "MilvusTableScan"; + + + public static MilvusServiceClient getMilvusServiceClientV1() { + return MilvusExtension.getMilvusClientV1(); + } + + + public static MilvusClientV2 getMilvusServiceClientV2() { + return MilvusExtension.getMilvusClient(); + } + + + /** + * Check if the execution plan contains a specific Milvus operator. + * + * @param executionPlan the execution plan string + * @param operator the operator name to search for + * @return true if the operator is present + */ + protected boolean containsMilvusOperator(String executionPlan, String operator) { + for (String plan : executionPlan.split("\n")) { + if (plan.contains(operator)) { + return true; + } + } + return false; + } + + /** + * Get the execution plan for a SQL query. + * + * @param sql the SQL query + * @param connection the Calcite connection + * @return the execution plan string + * @throws SQLException if query execution fails + */ + public String getExecutionPlan(String sql, Connection connection) throws SQLException { + String explainSql = "EXPLAIN PLAN FOR " + sql; + + String executionPlan = ""; + try (Statement statement = connection.createStatement()) { + ResultSet resultSet = statement.executeQuery(explainSql); + while (resultSet.next()) { + executionPlan = resultSet.getString(1); + } + } + System.out.println("\n=== Execution Plan for SQL ==="); + System.out.println(executionPlan); + return executionPlan; + } + + /** + * Set up a Calcite connection with Milvus schema. 
+ * + * @return Connection to Calcite with Milvus schema + * @throws Exception if setup fails + */ + public static Connection setupCalciteConnection() throws Exception { + // Enable Janino source printing +// System.setProperty("calcite.debug", "true"); +// System.setProperty("calcite.debug.janino", "true"); + + Map params = MilvusExtension.getConnectionParams(); + String host = (String) params.get("host"); + Integer port = (Integer) params.get("port"); + + Properties info = new Properties(); + info.setProperty("lex", "JAVA"); + + final Driver driver = new Driver(); + Connection connection = driver.connect("jdbc:calcite:", info); + CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); + SchemaPlus rootSchema = calciteConnection.getRootSchema(); + + Map operands = new HashMap<>(); + operands.put("host", host); + operands.put("port", port); + operands.put("databaseName", "default"); + + MilvusSchemaFactory schemaFactory = new MilvusSchemaFactory(); + // Create schema + Schema milvusSchema = schemaFactory.create(rootSchema, "milvus", operands); + // Add to root + rootSchema.add("milvus", milvusSchema); + + return connection; + } + + protected List getSqlResult(String sql, Connection connection) throws SQLException { + try (Statement statement = connection.createStatement()) { + ResultSet resultSet = statement.executeQuery(sql); + List actual = new ArrayList<>(); + int columnCount = resultSet.getMetaData().getColumnCount(); + while (resultSet.next()) { + StringBuilder rowValue = new StringBuilder(); + for (int i = 1; i <= columnCount; i++) { + String value = resultSet.getString(i); + if (i > 1) { + rowValue.append(","); + } + rowValue.append(value); + } + actual.add(rowValue.toString()); + } + return actual; + } + } +} diff --git a/milvus/src/test/java/org/apache/calcite/adapter/milvus/extension/MilvusExtension.java b/milvus/src/test/java/org/apache/calcite/adapter/milvus/extension/MilvusExtension.java new file mode 100644 index 
00000000000..31e6859d25c --- /dev/null +++ b/milvus/src/test/java/org/apache/calcite/adapter/milvus/extension/MilvusExtension.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.milvus.extension; + +import org.apache.calcite.adapter.milvus.util.ContainerUtil; + +import io.milvus.client.MilvusServiceClient; +import io.milvus.param.ConnectParam; +import io.milvus.v2.client.ConnectConfig; +import io.milvus.v2.client.MilvusClientV2; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +import java.util.HashMap; +import java.util.Map; + +/** + * JUnit 5 Extension for managing Milvus container lifecycle. + * This extension is responsible for: + *

    + *
  • Starting Milvus containers before all tests in a class
  • + *
  • Creating Milvus client connections
  • + *
  • Cleaning up resources after all tests complete
  • + *
+ * + *

Usage: Add {@code @ExtendWith(MilvusExtension.class)} to your test class. + */ +public class MilvusExtension implements BeforeAllCallback, AfterAllCallback { + + private static ContainerUtil containerUtil; + private static MilvusClientV2 milvusClientV2; + private static MilvusServiceClient milvusServiceClient; // V1 client for test data setup + + + @Override public void beforeAll(ExtensionContext context) { + if (containerUtil == null) { + containerUtil = new ContainerUtil(); + containerUtil.startMilvusContainers(); + } + + if (milvusClientV2 == null) { + String host = containerUtil.getMilvusHost(); + Integer port = containerUtil.getMilvusPort(); + ConnectConfig connectConfig = ConnectConfig.builder() + .uri("http://" + host + ":" + port) + .build(); + milvusClientV2 = new MilvusClientV2(connectConfig); + } + + if (milvusServiceClient == null) { + String host = containerUtil.getMilvusHost(); + Integer port = containerUtil.getMilvusPort(); + ConnectParam cp = ConnectParam.newBuilder() + .withHost(host) + .withPort(port) + .build(); + milvusServiceClient = new MilvusServiceClient(cp); + } + } + + @Override public void afterAll(ExtensionContext context) { + // In Gradle builds, test classes can run concurrently even within the same worker. + // Tearing down a shared Testcontainers-backed Milvus instance here can break other test + // classes that are still executing. + // We intentionally keep the container alive for the duration of the test JVM. + } + + /** + * Get the Milvus V2 client. + * Note: This should only be called after the extension has been initialized. + * + * @return the MilvusClientV2 instance + */ + public static MilvusClientV2 getMilvusClient() { + if (milvusClientV2 == null) { + throw new IllegalStateException( + "Milvus client not initialized. Ensure @ExtendWith(MilvusExtension.class) is used."); + } + return milvusClientV2; + } + + /** + * Get the Milvus V1 client (for test data preparation only). 
+ * Note: This should only be called after the extension has been initialized. + * + * @return the MilvusServiceClient instance + */ + public static MilvusServiceClient getMilvusClientV1() { + if (milvusServiceClient == null) { + throw new IllegalStateException( + "Milvus V1 client not initialized. Ensure @ExtendWith(MilvusExtension.class) is used."); + } + return milvusServiceClient; + } + + /** + * Get Milvus connection parameters. + * + * @return map containing host and port + */ + public static Map getConnectionParams() { + if (containerUtil == null) { + throw new IllegalStateException( + "Container not initialized. Ensure @ExtendWith(MilvusExtension.class) is used."); + } + Map params = new HashMap<>(); + params.put("host", containerUtil.getMilvusHost()); + params.put("port", containerUtil.getMilvusPort()); + return params; + } +} diff --git a/milvus/src/test/java/org/apache/calcite/adapter/milvus/sql/MilvusScanQueryTest.java b/milvus/src/test/java/org/apache/calcite/adapter/milvus/sql/MilvusScanQueryTest.java new file mode 100644 index 00000000000..4abcada8f88 --- /dev/null +++ b/milvus/src/test/java/org/apache/calcite/adapter/milvus/sql/MilvusScanQueryTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.milvus.sql; + +import org.apache.calcite.adapter.milvus.MilvusBaseE2ETest; +import org.apache.calcite.adapter.milvus.extension.MilvusExtension; +import org.apache.calcite.adapter.milvus.util.TestEnvUtil; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.shaded.com.google.common.collect.Lists; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests for Milvus scan query functionality. + */ +@ExtendWith(MilvusExtension.class) +public class MilvusScanQueryTest extends MilvusBaseE2ETest { + private static final String COLLECTION_NAME = "MilvusScanQueryTest"; + private Connection connection; + + @BeforeAll + static void setupOnce() { + TestEnvUtil testEnvUtil = + new TestEnvUtil(COLLECTION_NAME, getMilvusServiceClientV1()); + testEnvUtil.createExampleCollection(); + } + + @BeforeEach + public void setUp() throws Exception { + connection = setupCalciteConnection(); + } + + @Test public void testStringConstant() throws SQLException { + List expected = + Lists.newArrayList( + "ABriefHistoryOfTime,Time is a mysterious phenomenon that is everywhere" + + " yet hard to grasp.,[0.2, 0.4, 0.6, 0.8]", + "ADreamOfRedMansions,A Dream of Red Mansions is a great work depicting the rise and fall of feudal society.,[0.7, 1.4, 2.1, 2.8]", + "FortressBesieged,Those inside the fortress want to get out, while those outside want to get in. 
This is the paradox of life.,[0.5, 1.0, 1.5, 2.0]", + "NorwegianWood,Norwegian Wood is filled with the confusion and hesitation of youth.,[0.90000004, 1.8000001, 2.7, 3.6000001]", + "OneHundredYearsOfSolitude,Macondo is a small town full of magical colors, where many incredible stories happened.,[0.3, 0.6, 0.90000004, 1.2]", + "OrdinaryWorld,Although life is ordinary, everyone has their own dreams and pursuits.,[0.6, 1.2, 1.8000001, 2.4]", + "TheKiteRunner,The Kite Runner tells a moving story about friendship and redemption.,[1.0, 2.0, 3.0, 4.0]", + "TheLittlePrince,Once upon a time there was a little prince who lived on a very small planet, where there was a rose he cherished very much.,[0.1, 0.2, 0.3, 0.4]", + "ThreeBody,The arrival of the Trisolaran civilization changed humanity's understanding of the universe.,[0.8, 1.6, 2.4, 3.2]", + "ToLive,Life is like a play, and we are all actors in it, experiencing joy and sorrow.,[0.4, 0.8, 1.2, 1.6]"); + String sql = "select * from milvus." + COLLECTION_NAME + " "; + String executionPlan = getExecutionPlan(sql, connection); + assertTrue(containsMilvusOperator(executionPlan, MILVUS_SCAN)); + try (Statement statement = connection.createStatement()) { + ResultSet resultSet = statement.executeQuery(sql); + List actual = new ArrayList<>(); + while (resultSet.next()) { + String bookName = resultSet.getString(1); + String distance = resultSet.getString(2); + String vector = resultSet.getString(3); + actual.add(bookName + "," + distance + "," + vector); + } + Assertions.assertEquals(expected, actual); + } + } + + @AfterEach + public void tearDown() throws Exception { + if (connection != null && !connection.isClosed()) { + connection.close(); + } + } +} diff --git a/milvus/src/test/java/org/apache/calcite/adapter/milvus/util/ContainerUtil.java b/milvus/src/test/java/org/apache/calcite/adapter/milvus/util/ContainerUtil.java new file mode 100644 index 00000000000..1f2f0c3f126 --- /dev/null +++ 
b/milvus/src/test/java/org/apache/calcite/adapter/milvus/util/ContainerUtil.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.milvus.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; + +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URL; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; + +/** + * Utility class for managing Milvus test containers. 
+ */ +public class ContainerUtil { + + public static final String ETCD_NET_ALIAS = "etcd"; + public static final String MINIO_NET_ALIAS = "minio"; + public static final String MILVUS_NET_ALIAS = "milvus"; + public static String etcdImageVersion = "quay.io/coreos/etcd:v3.5.18"; + public static String minioImageVersion = "minio/minio:RELEASE.2024-05-28T17-19-04Z"; + public static String milvusImageVersion = "milvusdb/milvus:v2.5.18"; + + private static Logger logger = LoggerFactory.getLogger(ContainerUtil.class); + private GenericContainer etcdContainer; + private GenericContainer minioContainer; + private GenericContainer milvusContainer; + private Network network; + + public ContainerUtil() { + } + + private GenericContainer initialContainer( + String imageName, + Map env, + String command, + String netAliases, + Integer... exporsedPort) { + GenericContainer container = new GenericContainer<>(DockerImageName.parse(imageName)); + + return container + .withEnv(env) + .withCommand(command) + .withLogConsumer(new Slf4jLogConsumer(logger)) + .withNetwork(network) + .withNetworkAliases(netAliases) + .withExposedPorts(exporsedPort); + } + + public void startMilvusContainers() { + this.network = Network.newNetwork(); + etcdContainer = initialEtcdContainer(); + minioContainer = initialMinioContainer(); + milvusContainer = initialMilvusContainer(); + etcdContainer.withStartupTimeout(Duration.ofMinutes(3)); + minioContainer.withStartupTimeout(Duration.ofMinutes(3)); + milvusContainer.withStartupTimeout(Duration.ofMinutes(5)); + milvusContainer.dependsOn(etcdContainer, minioContainer); + Startables.deepStart(Stream.of(etcdContainer, minioContainer, milvusContainer)).join(); + waitForMilvusReady(); + } + + public String getMilvusHost() { + return milvusContainer.getHost(); + } + + public Integer getMilvusPort() { + return milvusContainer.getMappedPort(19530); + } + + public void clearAll() { + milvusContainer.close(); + minioContainer.close(); + etcdContainer.close(); + 
network.close(); + } + + private GenericContainer initialEtcdContainer() { + Map etcdEnv = + new HashMap() { + { + this.put("ETCD_AUTO_COMPACTION_MODE", "revision"); + this.put("ETCD_AUTO_COMPACTION_RETENTION", "1000"); + this.put("ETCD_QUOTA_BACKEND_BYTES", "4294967296"); + this.put("ETCD_SNAPSHOT_COUNT", "50000"); + } + }; + String command = + "etcd -advertise-client-urls=http://etcd:2379 -listen-client-urls" + + " http://0.0.0.0:2379 --data-dir /etcd "; + return initialContainer(etcdImageVersion, etcdEnv, command, ETCD_NET_ALIAS, 2379); + } + + private GenericContainer initialMinioContainer() { + Map minioEnv = + new HashMap() { + { + this.put("MINIO_ACCESS_KEY", "minioadmin"); + this.put("MINIO_SECRET_KEY", "minioadmin"); + } + }; + String command = "minio server /minio_data --console-address :9001"; + return initialContainer( + minioImageVersion, minioEnv, command, MINIO_NET_ALIAS, 9000, 9001); + } + + private GenericContainer initialMilvusContainer() { + Map milvusEnv = + new HashMap() { + { + this.put("ETCD_ENDPOINTS", ETCD_NET_ALIAS + ":" + "2379"); + this.put("MINIO_ADDRESS", MINIO_NET_ALIAS + ":" + "9000"); + this.put("MINIO_REGION", "us-east-1"); + } + }; + + String command = "milvus run standalone"; + return initialContainer( + milvusImageVersion, milvusEnv, command, MILVUS_NET_ALIAS, 19530, 9091); + } + + private void waitForMilvusReady() { + int maxRetries = 30; // maximum retry count + int retryInterval = 5000; // retry interval in milliseconds + + for (int i = 0; i < maxRetries; i++) { + try { + String host = milvusContainer.getHost(); + int port = milvusContainer.getMappedPort(9091); + URI healthUri = new URI("http://" + host + ":" + port + "/healthz"); + URL healthUrl = healthUri.toURL(); + HttpURLConnection connection = (HttpURLConnection) healthUrl.openConnection(); + connection.setRequestMethod("GET"); + connection.setConnectTimeout(5000); + connection.setReadTimeout(5000); + + int responseCode = connection.getResponseCode(); + if (responseCode 
== 200) { + logger.info("Milvus health check passed, service is ready"); + return; + } else { + logger.warn("Milvus health check returned code: {}, retrying...", responseCode); + } + } catch (Exception e) { + logger.info("Waiting for Milvus to be ready... (attempt {}/{})", i + 1, maxRetries); + } + + try { + Thread.sleep(retryInterval); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for Milvus to be ready", ie); + } + } + + throw new RuntimeException("Milvus failed to become ready within timeout"); + } + +} diff --git a/milvus/src/test/java/org/apache/calcite/adapter/milvus/util/TestEnvUtil.java b/milvus/src/test/java/org/apache/calcite/adapter/milvus/util/TestEnvUtil.java new file mode 100644 index 00000000000..568daec3d82 --- /dev/null +++ b/milvus/src/test/java/org/apache/calcite/adapter/milvus/util/TestEnvUtil.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.adapter.milvus.util; + +import com.google.common.collect.Lists; + +import io.milvus.client.MilvusServiceClient; +import io.milvus.grpc.DataType; +import io.milvus.grpc.MutationResult; +import io.milvus.param.IndexType; +import io.milvus.param.MetricType; +import io.milvus.param.R; +import io.milvus.param.RpcStatus; +import io.milvus.param.collection.CollectionSchemaParam; +import io.milvus.param.collection.CreateCollectionParam; +import io.milvus.param.collection.DropCollectionParam; +import io.milvus.param.collection.FieldType; +import io.milvus.param.collection.HasCollectionParam; +import io.milvus.param.collection.LoadCollectionParam; +import io.milvus.param.dml.InsertParam; +import io.milvus.param.index.CreateIndexParam; + +import java.util.ArrayList; +import java.util.List; + +/** + * Utility class for setting up Milvus test environment. + */ +public class TestEnvUtil { + private String collectionName; + private String databaseName; + private MilvusServiceClient milvusServiceClient; + private final MetricType metricType; + + public TestEnvUtil(String collectionName, MilvusServiceClient milvusServiceClient) { + this(collectionName, "default", milvusServiceClient, MetricType.L2); + } + + public TestEnvUtil(String collectionName, String databaseName, + MilvusServiceClient milvusServiceClient) { + this(collectionName, databaseName, milvusServiceClient, MetricType.L2); + } + + public TestEnvUtil(String collectionName, MilvusServiceClient milvusServiceClient, + MetricType metricType) { + this(collectionName, "default", milvusServiceClient, metricType); + } + + public TestEnvUtil(String collectionName, String databaseName, + MilvusServiceClient milvusServiceClient, MetricType metricType) { + this.collectionName = collectionName; + this.databaseName = databaseName != null ? 
databaseName : "default"; + this.milvusServiceClient = milvusServiceClient; + this.metricType = metricType; + } + + public void createExampleCollection() { + dropCollectionIfExists(); + createFloatVectorCollection(); + generateVectorExampleData(); + } + + private void generateVectorExampleData() { + List bookNames = + Lists.newArrayList("TheLittlePrince", + "ABriefHistoryOfTime", + "OneHundredYearsOfSolitude", + "ToLive", + "FortressBesieged", + "OrdinaryWorld", + "ADreamOfRedMansions", + "ThreeBody", + "NorwegianWood", + "TheKiteRunner"); + + List bookContents = + Lists.newArrayList( + "Once upon a time there was a little prince who lived on a very small" + + " planet, where there was a rose he cherished very much.", + "Time is a mysterious phenomenon that is everywhere yet hard to grasp.", + "Macondo is a small town full of magical colors, where many incredible stories happened.", + "Life is like a play, and we are all actors in it, experiencing joy and sorrow.", + "Those inside the fortress want to get out, while those outside want to get in. 
This is the paradox of life.", + "Although life is ordinary, everyone has their own dreams and pursuits.", + "A Dream of Red Mansions is a great work depicting the rise and fall of feudal society.", + "The arrival of the Trisolaran civilization changed humanity's understanding of the universe.", + "Norwegian Wood is filled with the confusion and hesitation of youth.", + "The Kite Runner tells a moving story about friendship and redemption."); + + // Generate float vectors (4 dimensions for testing) + List> vectors = generateDeterministicVectors(bookNames.size(), 4, 0.1f); + + // Create fields for insertion + List fields = new ArrayList<>(); + fields.add(new InsertParam.Field("book_name", bookNames)); + fields.add(new InsertParam.Field("book_content", bookContents)); + fields.add(new InsertParam.Field("VectorFieldAutoTest", vectors)); + + // Insert data + InsertParam insertParam = InsertParam.newBuilder() + .withCollectionName(collectionName) + .withDatabaseName(databaseName) + .withFields(fields) + .build(); + + R response = + milvusServiceClient.insert(insertParam); + if (response.getStatus() != 0) { + throw new RuntimeException("Failed to insert data: " + response.getMessage()); + } + + CreateIndexParam indexParam = CreateIndexParam.newBuilder() + .withCollectionName(collectionName) + .withDatabaseName(databaseName) + .withFieldName("VectorFieldAutoTest") + .withIndexName("float_vector_idx") + .withIndexType(IndexType.IVF_FLAT) + .withMetricType(metricType) + .withExtraParam("{\"nlist\":1024}") + .withSyncMode(Boolean.TRUE) + .build(); + + R indexResponse = milvusServiceClient.createIndex(indexParam); + if (indexResponse.getStatus() != 0) { + System.out.println("Warning: Failed to create index: " + indexResponse.getMessage()); + } + + // Load collection + LoadCollectionParam loadParam = LoadCollectionParam.newBuilder() + .withCollectionName(collectionName) + .withDatabaseName(databaseName) + .withSyncLoad(Boolean.TRUE) + .build(); + + R loadResponse = 
milvusServiceClient.loadCollection(loadParam); + if (loadResponse.getStatus() != R.Status.Success.getCode()) { + System.out.println("Warning: Failed to load collection: " + loadResponse.getMessage()); + } + } + + + private List> generateDeterministicVectors(int count, int dimension, float scale) { + List> vectors = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + List vector = new ArrayList<>(dimension); + float base = (i + 1) * scale; + for (int j = 0; j < dimension; j++) { + vector.add(base * (j + 1)); + } + vectors.add(vector); + } + return vectors; + } + + private void dropCollectionIfExists() { + if (collectionExists(collectionName)) { + dropCollection(); + } + } + + + private boolean collectionExists(String collectionName) { + try { + HasCollectionParam.Builder paramBuilder = HasCollectionParam.newBuilder() + .withCollectionName(collectionName); + if (databaseName != null && !"default".equals(databaseName)) { + paramBuilder.withDatabaseName(databaseName); + } + R response = milvusServiceClient.hasCollection(paramBuilder.build()); + return response.getData(); + } catch (Exception e) { + return false; + } + } + + /** + * Creates a collection with FloatVector field specifically for vector search testing. + * This does not modify the base test class and is specific to vector search scenarios. 
+ */ + private void createFloatVectorCollection() { + List fieldsSchema = new ArrayList<>(); + + // Primary key field + fieldsSchema.add( + FieldType.newBuilder() + .withName("book_name") + .withDataType(DataType.VarChar) + .withMaxLength(200) + .withPrimaryKey(true) + .withAutoID(false) + .build()); + + // Content field + fieldsSchema.add( + FieldType.newBuilder() + .withName("book_content") + .withDataType(DataType.VarChar) + .withMaxLength(200) + .build()); + + // Float vector field for L2 distance testing + fieldsSchema.add( + FieldType.newBuilder() + .withName("VectorFieldAutoTest") + .withDataType(DataType.FloatVector) + .withDimension(4) // Small dimension for ease of testing + .build()); + + CollectionSchemaParam schemaParam = CollectionSchemaParam.newBuilder() + .withFieldTypes(fieldsSchema) + .build(); + + CreateCollectionParam.Builder createCollectionReqBuilder = CreateCollectionParam.newBuilder() + .withCollectionName(collectionName) + .withDescription("Collection for vector search testing") + .withShardsNum(2) + .withSchema(schemaParam); + if (databaseName != null && !"default".equals(databaseName)) { + createCollectionReqBuilder.withDatabaseName(databaseName); + } + CreateCollectionParam createCollectionReq = createCollectionReqBuilder.build(); + + R response = milvusServiceClient.createCollection(createCollectionReq); + if (response.getStatus() != 0) { + throw new RuntimeException("Failed to create collection: " + response.getMessage()); + } + } + + private void dropCollection() { + DropCollectionParam.Builder dropParamBuilder = DropCollectionParam.newBuilder() + .withCollectionName(collectionName); + if (databaseName != null && !"default".equals(databaseName)) { + dropParamBuilder.withDatabaseName(databaseName); + } + milvusServiceClient.dropCollection(dropParamBuilder.build()); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 67695f630c5..b751fcff944 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -85,7 +85,8 @@ 
include( "spark", "splunk", "testkit", - "ubenchmark" + "ubenchmark", + "milvus" ) // See https://github.com/gradle/gradle/issues/1348#issuecomment-284758705 and