Skip to content

Commit 6cb8bc4

Browse files
committed
stream join convert
1 parent b683034 commit 6cb8bc4

4 files changed

Lines changed: 80 additions & 45 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package com.dtstack.flink.sql;
2222

2323
import com.dtstack.flink.sql.classloader.DtClassLoader;
24+
import com.dtstack.flink.sql.config.CalciteConfig;
2425
import com.dtstack.flink.sql.constrant.ConfigConstrant;
2526
import com.dtstack.flink.sql.enums.ClusterMode;
2627
import com.dtstack.flink.sql.enums.ECacheType;
@@ -101,10 +102,6 @@ public class Main {
101102

102103
private static final Logger LOG = LoggerFactory.getLogger(Main.class);
103104

104-
public static Config config = org.apache.calcite.sql.parser.SqlParser
105-
.configBuilder()
106-
.setLex(Lex.MYSQL)
107-
.build();
108105

109106
public static void main(String[] args) throws Exception {
110107

@@ -180,7 +177,7 @@ private static void sqlTranslation(Options options,StreamTableEnvironment tableE
180177
CreateTmpTableParser.SqlParserResult tmp = sqlTree.getTmpTableMap().get(tableName);
181178
String realSql = DtStringUtil.replaceIgnoreQuota(result.getExecSql(), "`", "");
182179

183-
SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql,config).parseStmt();
180+
SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql, CalciteConfig.MYSQL_LEX_CONFIG).parseStmt();
184181
String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
185182
tmp.setExecSql(tmpSql);
186183
sideSqlExec.registerTmpTable(tmp, sideTableMap, tableEnv, registerTableCache);

core/src/main/java/com/dtstack/flink/sql/side/JoinInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class JoinInfo implements Serializable {
4141

4242
//左表是否是维表
4343
private boolean leftIsSideTable;
44-
44+
//左表是 转换后的中间表
4545
private boolean leftIsMidTable;
4646

4747
//右表是否是维表

core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java

Lines changed: 63 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@
2020

2121
package com.dtstack.flink.sql.side;
2222

23-
import com.dtstack.flink.sql.Main;
23+
import com.dtstack.flink.sql.config.CalciteConfig;
2424
import com.dtstack.flink.sql.util.ParseUtils;
25-
import org.apache.calcite.config.Lex;
2625
import org.apache.calcite.sql.JoinType;
2726
import org.apache.calcite.sql.SqlAsOperator;
2827
import org.apache.calcite.sql.SqlBasicCall;
@@ -41,13 +40,15 @@
4140
import org.apache.calcite.sql.parser.SqlParser;
4241
import org.apache.calcite.sql.parser.SqlParserPos;
4342
import org.apache.commons.collections.CollectionUtils;
43+
import org.apache.flink.api.java.tuple.Tuple2;
4444
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
4545
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
4646
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
4747
import org.apache.flink.calcite.shaded.com.google.common.collect.Queues;
4848
import org.slf4j.Logger;
4949
import org.slf4j.LoggerFactory;
5050

51+
import java.util.ArrayList;
5152
import java.util.List;
5253
import java.util.Map;
5354
import java.util.Queue;
@@ -72,12 +73,11 @@ public class SideSQLParser {
7273
public Queue<Object> getExeQueue(String exeSql, Set<String> sideTableSet) throws SqlParseException {
7374
System.out.println("---exeSql---");
7475
System.out.println(exeSql);
76+
LOG.info("---exeSql---");
77+
LOG.info(exeSql);
78+
7579
Queue<Object> queueInfo = Queues.newLinkedBlockingQueue();
76-
SqlParser.Config config = SqlParser
77-
.configBuilder()
78-
.setLex(Lex.MYSQL)
79-
.build();
80-
SqlParser sqlParser = SqlParser.create(exeSql,Main.config);
80+
SqlParser sqlParser = SqlParser.create(exeSql, CalciteConfig.MYSQL_LEX_CONFIG);
8181
SqlNode sqlNode = sqlParser.parseStmt();
8282
parseSql(sqlNode, sideTableSet, queueInfo);
8383
queueInfo.offer(sqlNode);
@@ -163,30 +163,40 @@ private JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet, Queue<
163163
JoinType joinType = joinNode.getJoinType();
164164
String leftTbName = "";
165165
String leftTbAlias = "";
166+
String rightTableName = "";
167+
String rightTableAlias = "";
166168
Map<String, String> midTableMapping = null ;
167169
boolean leftIsMidTable = false;
170+
// 右节点已经被解析
171+
boolean rightIsParse = false;
172+
Tuple2<String, String> rightTableNameAndAlias = null;
168173

169174

170175
if(leftNode.getKind() == IDENTIFIER){
171176
leftTbName = leftNode.toString();
172177
}else if(leftNode.getKind() == JOIN){
173178
JoinInfo leftNodeJoinInfo = (JoinInfo)parseSql(leftNode, sideTableSet, queueInfo);//解析多JOIN
174-
// select * from xxx
175-
SqlNode sqlNode = buildSelectByLeftNode(leftNode);
176-
// ( select * from xxx) as xxx_0
177-
SqlBasicCall newAsNode = buildAsNodeByJoinInfo(leftNodeJoinInfo, sqlNode);
178-
179-
leftNode = newAsNode;
180-
181-
joinNode.setLeft(leftNode);
182-
183-
leftIsMidTable = true;
184179

185-
midTableMapping = saveTabMapping(leftNodeJoinInfo);
186-
187-
AliasInfo aliasInfo = (AliasInfo) parseSql(newAsNode, sideTableSet, queueInfo);
188-
leftTbName = aliasInfo.getName();
189-
leftTbAlias = aliasInfo.getAlias();
180+
rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo);
181+
rightIsParse = true;
182+
if (checkIsSideTable(rightTableNameAndAlias.f0, sideTableSet)) {
183+
// select * from xxx
184+
SqlNode sqlNode = buildSelectByLeftNode(leftNode);
185+
// ( select * from xxx) as xxx_0
186+
SqlBasicCall newAsNode = buildAsNodeByJoinInfo(leftNodeJoinInfo, sqlNode);
187+
leftNode = newAsNode;
188+
joinNode.setLeft(leftNode);
189+
190+
leftIsMidTable = true;
191+
midTableMapping = saveTabMapping(leftNodeJoinInfo);
192+
193+
AliasInfo aliasInfo = (AliasInfo) parseSql(newAsNode, sideTableSet, queueInfo);
194+
leftTbName = aliasInfo.getName();
195+
leftTbAlias = aliasInfo.getAlias();
196+
} else {
197+
leftTbName = leftNodeJoinInfo.getRightTableName();
198+
leftTbAlias = leftNodeJoinInfo.getRightTableAlias();
199+
}
190200

191201
}else if(leftNode.getKind() == AS){
192202
AliasInfo aliasInfo = (AliasInfo) parseSql(leftNode, sideTableSet, queueInfo);
@@ -201,16 +211,11 @@ private JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet, Queue<
201211
throw new RuntimeException("side-table must be at the right of join operator");
202212
}
203213

204-
String rightTableName = "";
205-
String rightTableAlias = "";
206-
207-
if(rightNode.getKind() == IDENTIFIER){
208-
rightTableName = rightNode.toString();
209-
}else{
210-
AliasInfo aliasInfo = (AliasInfo)parseSql(rightNode, sideTableSet, queueInfo);
211-
rightTableName = aliasInfo.getName();
212-
rightTableAlias = aliasInfo.getAlias();
214+
if (!rightIsParse) {
215+
rightTableNameAndAlias = parseRightNode(rightNode, sideTableSet, queueInfo);
213216
}
217+
rightTableName = rightTableNameAndAlias.f0;
218+
rightTableAlias = rightTableNameAndAlias.f1;
214219

215220
boolean rightIsSide = checkIsSideTable(rightTableName, sideTableSet);
216221
if(joinType == JoinType.RIGHT){
@@ -249,25 +254,44 @@ private JoinInfo dealJoinNode(SqlJoin joinNode, Set<String> sideTableSet, Queue<
249254
return tableInfo;
250255
}
251256

257+
private Tuple2<String, String> parseRightNode(SqlNode sqlNode, Set<String> sideTableSet, Queue<Object> queueInfo) {
258+
Tuple2<String, String> tabName = new Tuple2<>("", "");
259+
if(sqlNode.getKind() == IDENTIFIER){
260+
tabName.f0 = sqlNode.toString();
261+
}else{
262+
AliasInfo aliasInfo = (AliasInfo)parseSql(sqlNode, sideTableSet, queueInfo);
263+
tabName.f0 = aliasInfo.getName();
264+
tabName.f1 = aliasInfo.getAlias();
265+
}
266+
return tabName;
267+
}
268+
252269
private Map<String, String> saveTabMapping(JoinInfo leftNodeJoinInfo) {
253-
Map<String, String> midTableMapping;
270+
Map<String, String> midTableMapping = Maps.newHashMap();;
254271

255272
String midTab = buidTableName(leftNodeJoinInfo.getLeftTableAlias(), SPLIT, leftNodeJoinInfo.getRightTableAlias());
256-
midTab += "_0";
257-
258-
midTableMapping = Maps.newHashMap();
273+
String finalMidTab = midTab + "_0";
259274

260275
if(leftNodeJoinInfo.isLeftIsMidTable()) {
261276
midTableMapping.putAll(leftNodeJoinInfo.getLeftTabMapping());
262277
}
263-
264-
midTableMapping.put(leftNodeJoinInfo.getLeftTableAlias(), midTab);
265-
midTableMapping.put(leftNodeJoinInfo.getRightTableAlias(), midTab);
278+
fillLeftAllTable(leftNodeJoinInfo, midTableMapping, finalMidTab);
266279
return midTableMapping;
267280
}
268281

282+
private void fillLeftAllTable(JoinInfo leftNodeJoinInfo, Map<String, String> midTableMapping, String finalMidTab) {
283+
List<String> tablesName = Lists.newArrayList();
284+
ParseUtils.parseLeftNodeTableName(leftNodeJoinInfo.getLeftNode(), tablesName);
285+
286+
tablesName.forEach(tab ->{
287+
midTableMapping.put(tab, finalMidTab);
288+
});
289+
midTableMapping.put(leftNodeJoinInfo.getRightTableAlias(), finalMidTab);
290+
}
291+
292+
269293
private SqlNode buildSelectByLeftNode(SqlNode leftNode) {
270-
SqlParser sqlParser = SqlParser.create(tempSQL, Main.config);
294+
SqlParser sqlParser = SqlParser.create(tempSQL, CalciteConfig.MYSQL_LEX_CONFIG);
271295
SqlNode sqlNode = null;
272296
try {
273297
sqlNode = sqlParser.parseStmt();

core/src/main/java/com/dtstack/flink/sql/util/ParseUtils.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.calcite.sql.SqlBasicCall;
4141
import org.apache.calcite.sql.SqlDataTypeSpec;
4242
import org.apache.calcite.sql.SqlIdentifier;
43+
import org.apache.calcite.sql.SqlJoin;
4344
import org.apache.calcite.sql.SqlKind;
4445
import org.apache.calcite.sql.SqlLiteral;
4546
import org.apache.calcite.sql.SqlNode;
@@ -247,4 +248,17 @@ public static String getRootName(Map<String, String> maps, String key) {
247248
}
248249
return res;
249250
}
251+
252+
public static void parseLeftNodeTableName(SqlNode leftJoin, List<String> tablesName) {
253+
if (leftJoin.getKind() == IDENTIFIER) {
254+
SqlIdentifier sqlIdentifier = (SqlIdentifier) leftJoin;
255+
tablesName.add(sqlIdentifier.names.get(0));
256+
} else if (leftJoin.getKind() == AS) {
257+
SqlNode sqlNode = ((SqlBasicCall) leftJoin).getOperands()[1];
258+
tablesName.add(sqlNode.toString());
259+
} else if (leftJoin.getKind() == JOIN) {
260+
parseLeftNodeTableName(((SqlJoin) leftJoin).getLeft(), tablesName);
261+
parseLeftNodeTableName(((SqlJoin) leftJoin).getRight(), tablesName);
262+
}
263+
}
250264
}

0 commit comments

Comments
 (0)