Skip to content

Commit f5c3c7f

Browse files
committed
fix conflicts
2 parents 88da30f + 9468eb4 commit f5c3c7f

13 files changed

Lines changed: 280 additions & 136 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020

2121
package com.dtstack.flink.sql;
2222

23-
import com.dtstack.flink.sql.classloader.DtClassLoader;
2423
import com.dtstack.flink.sql.config.CalciteConfig;
24+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2525
import com.dtstack.flink.sql.constrant.ConfigConstrant;
2626
import com.dtstack.flink.sql.enums.ClusterMode;
2727
import com.dtstack.flink.sql.enums.ECacheType;
@@ -76,10 +76,7 @@
7676
import java.lang.reflect.InvocationTargetException;
7777
import java.lang.reflect.Method;
7878
import java.net.URL;
79-
import java.net.URLClassLoader;
8079
import java.net.URLDecoder;
81-
import java.util.ArrayList;
82-
import java.util.Arrays;
8380
import java.util.List;
8481
import java.util.Map;
8582
import java.util.Properties;
@@ -123,10 +120,6 @@ public static void main(String[] args) throws Exception {
123120
addJarFileList = objMapper.readValue(addJarListStr, List.class);
124121
}
125122

126-
ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
127-
DtClassLoader parentClassloader = new DtClassLoader(new URL[]{}, threadClassLoader);
128-
Thread.currentThread().setContextClassLoader(parentClassloader);
129-
130123
confProp = URLDecoder.decode(confProp, Charsets.UTF_8.toString());
131124
Properties confProperties = PluginUtil.jsonStrToObject(confProp, Properties.class);
132125
StreamExecutionEnvironment env = getStreamExeEnv(confProperties, deployMode);
@@ -145,16 +138,14 @@ public static void main(String[] args) throws Exception {
145138
Map<String, Table> registerTableCache = Maps.newHashMap();
146139

147140
//register udf
148-
registerUDF(sqlTree, jarURList, parentClassloader, tableEnv);
141+
registerUDF(sqlTree, jarURList, tableEnv);
149142
//register table schema
150143
registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, sideTableMap, registerTableCache);
151144

152145
sqlTranslation(options,tableEnv,sqlTree,sideTableMap,registerTableCache);
153146

154147
if(env instanceof MyLocalStreamEnvironment) {
155-
List<URL> urlList = new ArrayList<>();
156-
urlList.addAll(Arrays.asList(parentClassloader.getURLs()));
157-
((MyLocalStreamEnvironment) env).setClasspaths(urlList);
148+
((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
158149
}
159150

160151
env.execute(name);
@@ -221,19 +212,12 @@ private static void addEnvClassPath(StreamExecutionEnvironment env, Set<URL> cla
221212
}
222213
}
223214

224-
private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, URLClassLoader parentClassloader,
225-
StreamTableEnvironment tableEnv)
215+
private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, StreamTableEnvironment tableEnv)
226216
throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
227-
List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
228-
if (funcList.isEmpty()) {
229-
return;
230-
}
231-
//load jar
232-
URLClassLoader classLoader = FlinkUtil.loadExtraJar(jarURList, parentClassloader);
233217
//register urf
218+
List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
234219
for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
235-
FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(),
236-
tableEnv, classLoader);
220+
FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), tableEnv, jarURList);
237221
}
238222
}
239223

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.classloader;
20+
21+
import com.dtstack.flink.sql.util.PluginUtil;
22+
import org.apache.commons.lang3.StringUtils;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import java.net.URL;
27+
import java.util.ArrayList;
28+
import java.util.Arrays;
29+
import java.util.Comparator;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.concurrent.ConcurrentHashMap;
33+
34+
/**
35+
* company: www.dtstack.com
36+
* author: toutian
37+
* create: 2019/10/14
38+
*/
39+
public class ClassLoaderManager {
40+
41+
private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderManager.class);
42+
43+
private static Map<String, DtClassLoader> pluginClassLoader = new ConcurrentHashMap<>();
44+
45+
public static <R> R newInstance(String pluginJarPath, ClassLoaderSupplier<R> supplier) throws Exception {
46+
ClassLoader classLoader = retrieveClassLoad(pluginJarPath);
47+
return ClassLoaderSupplierCallBack.callbackAndReset(supplier, classLoader);
48+
}
49+
50+
public static <R> R newInstance(List<URL> jarUrls, ClassLoaderSupplier<R> supplier) throws Exception {
51+
ClassLoader classLoader = retrieveClassLoad(jarUrls);
52+
return ClassLoaderSupplierCallBack.callbackAndReset(supplier, classLoader);
53+
}
54+
55+
private static DtClassLoader retrieveClassLoad(String pluginJarPath) {
56+
return pluginClassLoader.computeIfAbsent(pluginJarPath, k -> {
57+
try {
58+
URL[] urls = PluginUtil.getPluginJarUrls(pluginJarPath);
59+
ClassLoader parentClassLoader = Thread.currentThread().getContextClassLoader();
60+
DtClassLoader classLoader = new DtClassLoader(urls, parentClassLoader);
61+
LOG.info("pluginJarPath:{} create ClassLoad successful...", pluginJarPath);
62+
return classLoader;
63+
} catch (Throwable e) {
64+
LOG.error("retrieve ClassLoad happens error:{}", e);
65+
throw new RuntimeException("retrieve ClassLoad happens error");
66+
}
67+
});
68+
}
69+
70+
private static DtClassLoader retrieveClassLoad(List<URL> jarUrls) {
71+
jarUrls.sort(Comparator.comparing(URL::toString));
72+
String jarUrlkey = StringUtils.join(jarUrls, "_");
73+
return pluginClassLoader.computeIfAbsent(jarUrlkey, k -> {
74+
try {
75+
URL[] urls = jarUrls.toArray(new URL[jarUrls.size()]);
76+
ClassLoader parentClassLoader = Thread.currentThread().getContextClassLoader();
77+
DtClassLoader classLoader = new DtClassLoader(urls, parentClassLoader);
78+
LOG.info("jarUrl:{} create ClassLoad successful...", jarUrlkey);
79+
return classLoader;
80+
} catch (Throwable e) {
81+
LOG.error("retrieve ClassLoad happens error:{}", e);
82+
throw new RuntimeException("retrieve ClassLoad happens error");
83+
}
84+
});
85+
}
86+
87+
public static List<URL> getClassPath() {
88+
List<URL> classPaths = new ArrayList<>();
89+
for (Map.Entry<String, DtClassLoader> entry : pluginClassLoader.entrySet()) {
90+
classPaths.addAll(Arrays.asList(entry.getValue().getURLs()));
91+
}
92+
return classPaths;
93+
}
94+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
package com.dtstack.flink.sql.classloader;
21+
22+
/**
23+
* Represents a supplier of results.
24+
*
25+
* <p>There is no requirement that a new or distinct result be returned each
26+
* time the supplier is invoked.
27+
*
28+
* <p>This is a <a href="package-summary.html">functional interface</a>
29+
* whose functional method is {@link #get()}.
30+
*
31+
* @param <T> the type of results supplied by this supplier
32+
*
33+
* @since 1.8
34+
*/
35+
@FunctionalInterface
36+
public interface ClassLoaderSupplier<T> {
37+
38+
/**
39+
* Gets a result.
40+
*
41+
* @return a result
42+
*/
43+
T get(ClassLoader cl) throws Exception;
44+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
package com.dtstack.flink.sql.classloader;
21+
22+
/**
23+
* company: www.dtstack.com
24+
* author: toutian
25+
* create: 2019/10/14
26+
*/
27+
public class ClassLoaderSupplierCallBack {
28+
29+
public static <R> R callbackAndReset(ClassLoaderSupplier<R> supplier, ClassLoader toSetClassLoader) throws Exception {
30+
ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
31+
Thread.currentThread().setContextClassLoader(toSetClassLoader);
32+
try {
33+
return supplier.get(toSetClassLoader);
34+
} finally {
35+
Thread.currentThread().setContextClassLoader(oldClassLoader);
36+
}
37+
}
38+
39+
40+
}

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818

1919

20+
2021
package com.dtstack.flink.sql.side;
2122

2223
import com.dtstack.flink.sql.enums.ECacheType;
@@ -362,8 +363,14 @@ private SqlNode replaceOrderByTableName(SqlNode orderNode, String tableAlias) {
362363
private SqlNode replaceNodeInfo(SqlNode groupNode, HashBasedTable<String, String, String> mappingTable, String tableAlias){
363364
if(groupNode.getKind() == IDENTIFIER){
364365
SqlIdentifier sqlIdentifier = (SqlIdentifier) groupNode;
366+
if(sqlIdentifier.names.size() == 1){
367+
return sqlIdentifier;
368+
}
365369
String mappingFieldName = mappingTable.get(sqlIdentifier.getComponent(0).getSimple(), sqlIdentifier.getComponent(1).getSimple());
366370

371+
if(mappingFieldName == null){
372+
throw new RuntimeException("can't find mapping fieldName:" + sqlIdentifier.toString() );
373+
}
367374
sqlIdentifier = sqlIdentifier.setName(0, tableAlias);
368375
return sqlIdentifier.setName(1, mappingFieldName);
369376
}else if(groupNode instanceof SqlBasicCall){

core/src/main/java/com/dtstack/flink/sql/side/StreamSideFactory.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,10 @@
1616
* limitations under the License.
1717
*/
1818

19-
2019

2120
package com.dtstack.flink.sql.side;
2221

23-
import com.dtstack.flink.sql.classloader.DtClassLoader;
22+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2423
import com.dtstack.flink.sql.enums.ECacheType;
2524
import com.dtstack.flink.sql.table.AbsSideTableParser;
2625
import com.dtstack.flink.sql.table.AbsTableParser;
@@ -30,6 +29,7 @@
3029
* get specify side parser
3130
* Date: 2018/7/25
3231
* Company: www.dtstack.com
32+
*
3333
* @author xuchao
3434
*/
3535

@@ -40,18 +40,15 @@ public class StreamSideFactory {
4040
public static AbsTableParser getSqlParser(String pluginType, String sqlRootDir, String cacheType) throws Exception {
4141

4242
String sideOperator = ECacheType.ALL.name().equals(cacheType) ? "all" : "async";
43-
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
4443
String pluginJarPath = PluginUtil.getSideJarFileDirPath(pluginType, sideOperator, "side", sqlRootDir);
45-
46-
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
47-
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
4844
String className = PluginUtil.getSqlParserClassName(pluginType, CURR_TYPE);
4945

50-
Class<?> sideParser = dtClassLoader.loadClass(className);
51-
if(!AbsSideTableParser.class.isAssignableFrom(sideParser)){
52-
throw new RuntimeException("class " + sideParser.getName() + " not subClass of AbsSideTableParser");
53-
}
54-
55-
return sideParser.asSubclass(AbsTableParser.class).newInstance();
46+
return ClassLoaderManager.newInstance(pluginJarPath, (cl) -> {
47+
Class<?> sideParser = cl.loadClass(className);
48+
if (!AbsSideTableParser.class.isAssignableFrom(sideParser)) {
49+
throw new RuntimeException("class " + sideParser.getName() + " not subClass of AbsSideTableParser");
50+
}
51+
return sideParser.asSubclass(AbsTableParser.class).newInstance();
52+
});
5653
}
5754
}

core/src/main/java/com/dtstack/flink/sql/side/operator/SideAsyncOperator.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package com.dtstack.flink.sql.side.operator;
2121

22-
import com.dtstack.flink.sql.classloader.DtClassLoader;
22+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2323
import com.dtstack.flink.sql.side.AsyncReqRow;
2424
import com.dtstack.flink.sql.side.FieldInfo;
2525
import com.dtstack.flink.sql.side.JoinInfo;
@@ -51,14 +51,13 @@ public class SideAsyncOperator {
5151

5252
private static AsyncReqRow loadAsyncReq(String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo,
5353
JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) throws Exception {
54-
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
5554
String pathOfType = String.format(PATH_FORMAT, sideType);
5655
String pluginJarPath = PluginUtil.getJarFileDirPath(pathOfType, sqlRootDir);
57-
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
58-
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
5956
String className = PluginUtil.getSqlSideClassName(sideType, "side", OPERATOR_TYPE);
60-
return dtClassLoader.loadClass(className).asSubclass(AsyncReqRow.class)
61-
.getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class).newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
57+
return ClassLoaderManager.newInstance(pluginJarPath, (cl) ->
58+
cl.loadClass(className).asSubclass(AsyncReqRow.class)
59+
.getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class)
60+
.newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
6261
}
6362

6463
public static DataStream getSideJoinDataStream(DataStream inputStream, String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo, JoinInfo joinInfo,

core/src/main/java/com/dtstack/flink/sql/side/operator/SideWithAllCacheOperator.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package com.dtstack.flink.sql.side.operator;
2121

22-
import com.dtstack.flink.sql.classloader.DtClassLoader;
22+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
2323
import com.dtstack.flink.sql.side.AllReqRow;
2424
import com.dtstack.flink.sql.side.FieldInfo;
2525
import com.dtstack.flink.sql.side.JoinInfo;
@@ -28,7 +28,6 @@
2828
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2929
import org.apache.flink.streaming.api.datastream.DataStream;
3030

31-
import java.net.MalformedURLException;
3231
import java.util.List;
3332

3433
/**
@@ -49,18 +48,13 @@ private static AllReqRow loadFlatMap(String sideType, String sqlRootDir, RowType
4948
JoinInfo joinInfo, List<FieldInfo> outFieldInfoList,
5049
SideTableInfo sideTableInfo) throws Exception {
5150

52-
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
5351
String pathOfType = String.format(PATH_FORMAT, sideType);
5452
String pluginJarPath = PluginUtil.getJarFileDirPath(pathOfType, sqlRootDir);
55-
56-
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
57-
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
5853
String className = PluginUtil.getSqlSideClassName(sideType, "side", OPERATOR_TYPE);
5954

60-
return dtClassLoader.loadClass(className).asSubclass(AllReqRow.class).getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class)
61-
.newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
62-
63-
55+
return ClassLoaderManager.newInstance(pluginJarPath, (cl) -> cl.loadClass(className).asSubclass(AllReqRow.class)
56+
.getConstructor(RowTypeInfo.class, JoinInfo.class, List.class, SideTableInfo.class)
57+
.newInstance(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
6458
}
6559

6660
public static DataStream getSideJoinDataStream(DataStream inputStream, String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo, JoinInfo joinInfo,

0 commit comments

Comments
 (0)