Skip to content

Commit fea3f25

Browse files
committed
IGNITE-28498 Remove Serializable from Zookeeper discovery custom messages
1 parent 851caa7 commit fea3f25

14 files changed

Lines changed: 120 additions & 63 deletions

File tree

modules/codegen/src/main/java/org/apache/ignite/internal/MessageProcessor.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818
package org.apache.ignite.internal;
1919

2020
import java.util.ArrayList;
21+
import java.util.Arrays;
2122
import java.util.Comparator;
2223
import java.util.HashMap;
2324
import java.util.List;
2425
import java.util.Map;
26+
import java.util.Objects;
2527
import java.util.Set;
28+
import java.util.stream.Collectors;
2629
import javax.annotation.processing.AbstractProcessor;
2730
import javax.annotation.processing.RoundEnvironment;
2831
import javax.annotation.processing.SupportedAnnotationTypes;
@@ -72,8 +75,11 @@ public class MessageProcessor extends AbstractProcessor {
7275
/** Externalizable message. */
7376
static final String MARSHALLABLE_MESSAGE_INTERFACE = "org.apache.ignite.plugin.extensions.communication.MarshallableMessage";
7477

75-
/** This is the only message with zero fields. A serializer must be generated due to restrictions in our communication process. */
76-
static final String HANDSHAKE_WAIT_MESSAGE = "org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage";
78+
/** Messages with no fields. A serializer must be generated due to restrictions in our communication process. */
79+
static final String[] EMPTY_MESSAGES = {
80+
"org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage",
81+
"org.apache.ignite.spi.discovery.zk.internal.ZkNoServersMessage"
82+
};
7783

7884
/** */
7985
private final Map<String, IgniteBiTuple<String, String>> enumMappersInUse = new HashMap<>();
@@ -83,7 +89,11 @@ public class MessageProcessor extends AbstractProcessor {
8389
*/
8490
@Override public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment roundEnv) {
8591
TypeMirror msgType = processingEnv.getElementUtils().getTypeElement(MESSAGE_INTERFACE).asType();
86-
TypeMirror handshakeWaitMsgType = processingEnv.getElementUtils().getTypeElement(HANDSHAKE_WAIT_MESSAGE).asType();
92+
List<TypeMirror> emptyMsgs = Arrays.stream(EMPTY_MESSAGES)
93+
.map(cls -> processingEnv.getElementUtils().getTypeElement(cls))
94+
.filter(Objects::nonNull)
95+
.map(Element::asType)
96+
.collect(Collectors.toList());
8797

8898
Map<TypeElement, List<VariableElement>> msgFields = new HashMap<>();
8999

@@ -101,7 +111,7 @@ public class MessageProcessor extends AbstractProcessor {
101111

102112
List<VariableElement> fields = orderedFields(clazz);
103113

104-
if (!fields.isEmpty() || processingEnv.getTypeUtils().isAssignable(clazz.asType(), handshakeWaitMsgType))
114+
if (!fields.isEmpty() || emptyMsgs.stream().anyMatch(t -> processingEnv.getTypeUtils().isAssignable(clazz.asType(), t)))
105115
msgFields.put(clazz, fields);
106116
}
107117

modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@
103103
import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
104104
import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
105105
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
106+
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
106107
import org.apache.ignite.internal.managers.encryption.GridEncryptionManager;
107108
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
108109
import org.apache.ignite.internal.managers.failover.GridFailoverManager;
@@ -214,6 +215,7 @@
214215
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
215216
import org.apache.ignite.spi.IgniteSpi;
216217
import org.apache.ignite.spi.IgniteSpiVersionCheckException;
218+
import org.apache.ignite.spi.discovery.DiscoverySpi;
217219
import org.apache.ignite.spi.discovery.isolated.IsolatedDiscoverySpi;
218220
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
219221
import org.apache.ignite.spi.tracing.TracingConfigurationManager;
@@ -1315,9 +1317,6 @@ else if (e instanceof IgniteCheckedException)
13151317
private void initMessageFactory() throws IgniteCheckedException {
13161318
MessageFactoryProvider[] msgs = ctx.plugins().extensions(MessageFactoryProvider.class);
13171319

1318-
if (msgs == null)
1319-
msgs = new MessageFactoryProvider[0];
1320-
13211320
List<MessageFactoryProvider> compMsgs = new ArrayList<>();
13221321

13231322
compMsgs.add(new CoreMessagesProvider(ctx.marshaller(), ctx.marshallerContext().jdkMarshaller(),
@@ -1330,6 +1329,15 @@ private void initMessageFactory() throws IgniteCheckedException {
13301329
compMsgs.add(f);
13311330
}
13321331

1332+
DiscoverySpi discoSpi = ctx.config().getDiscoverySpi();
1333+
1334+
if (discoSpi instanceof IgniteDiscoverySpi) {
1335+
MessageFactoryProvider discoMsgs = ((IgniteDiscoverySpi)discoSpi).messageFactoryProvider();
1336+
1337+
if (discoMsgs != null)
1338+
compMsgs.add(discoMsgs);
1339+
}
1340+
13331341
if (!compMsgs.isEmpty())
13341342
msgs = F.concat(msgs, compMsgs.toArray(new MessageFactoryProvider[compMsgs.size()]));
13351343

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.util.UUID;
2121
import org.apache.ignite.cluster.ClusterNode;
22+
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
2223
import org.apache.ignite.spi.discovery.DiscoverySpi;
2324

2425
/**
@@ -57,4 +58,9 @@ public interface IgniteDiscoverySpi extends DiscoverySpi {
5758
* @param err Connection error.
5859
*/
5960
public void resolveCommunicationFailure(ClusterNode node, Exception err);
61+
62+
/** @return Message factory provider. */
63+
public default MessageFactoryProvider messageFactoryProvider() {
64+
return null;
65+
}
6066
}

modules/zookeeper/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@
4040
<artifactId>ignite-core</artifactId>
4141
</dependency>
4242

43+
<dependency>
44+
<groupId>${project.groupId}</groupId>
45+
<artifactId>ignite-codegen</artifactId>
46+
<scope>provided</scope>
47+
</dependency>
48+
4349
<dependency>
4450
<groupId>org.apache.zookeeper</groupId>
4551
<artifactId>zookeeper</artifactId>

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.ignite.internal.util.typedef.internal.U;
4040
import org.apache.ignite.lang.IgniteBiTuple;
4141
import org.apache.ignite.lang.IgniteProductVersion;
42+
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
4243
import org.apache.ignite.resources.LoggerResource;
4344
import org.apache.ignite.spi.IgniteSpiAdapter;
4445
import org.apache.ignite.spi.IgniteSpiConfiguration;
@@ -57,6 +58,7 @@
5758
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
5859
import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
5960
import org.apache.ignite.spi.discovery.zk.internal.ZkIgnitePaths;
61+
import org.apache.ignite.spi.discovery.zk.internal.ZkMessageFactory;
6062
import org.apache.ignite.spi.discovery.zk.internal.ZookeeperClusterNode;
6163
import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl;
6264
import org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryStatistics;
@@ -554,6 +556,11 @@ private ZookeeperClusterNode initLocalNode() {
554556
return locNode;
555557
}
556558

559+
/** {@inheritDoc} */
560+
@Override public MessageFactoryProvider messageFactoryProvider() {
561+
return new ZkMessageFactory();
562+
}
563+
557564
/** {@inheritDoc} */
558565
@Override public String toString() {
559566
return S.toString(ZookeeperDiscoverySpi.class, this);

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
import java.util.zip.InflaterInputStream;
2929
import org.apache.ignite.internal.direct.DirectMessageReader;
3030
import org.apache.ignite.internal.direct.DirectMessageWriter;
31-
import org.apache.ignite.internal.util.typedef.internal.U;
32-
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
3331
import org.apache.ignite.plugin.extensions.communication.Message;
3432
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
3533
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
@@ -42,25 +40,14 @@
4240
* Class is responsible for serializing discovery messages using RU-ready {@link MessageSerializer} mechanism.
4341
*/
4442
public class DiscoveryMessageParser {
45-
/** Leading byte for messages use {@link JdkMarshaller} for serialization. */
46-
// TODO: remove these flags after refactoring all discovery messages.
47-
private static final byte JAVA_SERIALIZATION = (byte)1;
48-
49-
/** Leading byte for messages use {@link MessageSerializer} for serialization. */
50-
private static final byte MESSAGE_SERIALIZATION = (byte)2;
51-
5243
/** Size for an intermediate buffer for serializing discovery messages. */
5344
private static final int MSG_BUFFER_SIZE = 100;
5445

55-
/** */
56-
private final JdkMarshaller jdkMarshaller;
57-
5846
/** */
5947
private final MessageFactory msgFactory;
6048

6149
/** */
62-
public DiscoveryMessageParser(JdkMarshaller jdkMarshaller, MessageFactory msgFactory) {
63-
this.jdkMarshaller = jdkMarshaller;
50+
public DiscoveryMessageParser(MessageFactory msgFactory) {
6451
this.msgFactory = msgFactory;
6552
}
6653

@@ -69,16 +56,7 @@ public byte[] marshalZip(DiscoverySpiCustomMessage msg) {
6956
ByteArrayOutputStream baos = new ByteArrayOutputStream();
7057

7158
try (DeflaterOutputStream out = new DeflaterOutputStream(baos)) {
72-
if (msg instanceof Message) {
73-
out.write(MESSAGE_SERIALIZATION);
74-
75-
serializeMessage((Message)msg, out);
76-
}
77-
else {
78-
out.write(JAVA_SERIALIZATION);
79-
80-
U.marshal(jdkMarshaller, msg, out);
81-
}
59+
serializeMessage((Message)msg, out);
8260
}
8361
catch (Exception e) {
8462
throw new IgniteSpiException("Failed to serialize message: " + msg, e);
@@ -93,14 +71,6 @@ public DiscoverySpiCustomMessage unmarshalZip(byte[] bytes) {
9371
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
9472
InflaterInputStream in = new InflaterInputStream(bais)
9573
) {
96-
byte mode = (byte)in.read();
97-
98-
if (mode == JAVA_SERIALIZATION)
99-
return U.unmarshal(jdkMarshaller, in, U.gridClassLoader());
100-
101-
if (MESSAGE_SERIALIZATION != mode)
102-
throw new IOException("Received unexpected byte while reading discovery message: " + mode);
103-
10474
return (DiscoverySpiCustomMessage)deserializeMessage(in);
10575
}
10676
catch (Exception e) {

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,32 @@
1818
package org.apache.ignite.spi.discovery.zk.internal;
1919

2020
import java.util.UUID;
21+
import org.apache.ignite.internal.Order;
2122
import org.apache.ignite.internal.util.typedef.internal.S;
23+
import org.apache.ignite.plugin.extensions.communication.Message;
24+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2225
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
2326
import org.jetbrains.annotations.Nullable;
2427

2528
/**
2629
*
2730
*/
28-
class ZkCommunicationErrorResolveFinishMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
31+
class ZkCommunicationErrorResolveFinishMessage implements DiscoverySpiCustomMessage, ZkInternalMessage, Message {
2932
/** */
30-
private static final long serialVersionUID = 0L;
33+
@Order(0)
34+
UUID futId;
3135

3236
/** */
33-
final UUID futId;
37+
@Order(1)
38+
long topVer;
3439

3540
/** */
36-
final long topVer;
41+
ZkCommunicationErrorResolveResult res;
3742

38-
/** */
39-
transient ZkCommunicationErrorResolveResult res;
43+
/** Constructor for {@link MessageFactory}. */
44+
public ZkCommunicationErrorResolveFinishMessage() {
45+
// No-op.
46+
}
4047

4148
/**
4249
* @param futId Future ID.

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,25 @@
1818
package org.apache.ignite.spi.discovery.zk.internal;
1919

2020
import java.util.UUID;
21+
import org.apache.ignite.internal.Order;
2122
import org.apache.ignite.internal.util.typedef.internal.S;
23+
import org.apache.ignite.plugin.extensions.communication.Message;
24+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2225
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
2326
import org.jetbrains.annotations.Nullable;
2427

2528
/**
2629
* Zk Communication Error Resolve Start Message.
2730
*/
28-
public class ZkCommunicationErrorResolveStartMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
31+
public class ZkCommunicationErrorResolveStartMessage implements DiscoverySpiCustomMessage, ZkInternalMessage, Message {
2932
/** */
30-
private static final long serialVersionUID = 0L;
33+
@Order(0)
34+
UUID id;
3135

32-
/** */
33-
final UUID id;
36+
/** Constructor for {@link MessageFactory}. */
37+
public ZkCommunicationErrorResolveStartMessage() {
38+
// No-op.
39+
}
3440

3541
/**
3642
* @param id Unique ID.

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,29 @@
1717

1818
package org.apache.ignite.spi.discovery.zk.internal;
1919

20+
import org.apache.ignite.internal.Order;
2021
import org.apache.ignite.internal.util.typedef.internal.S;
22+
import org.apache.ignite.plugin.extensions.communication.Message;
23+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2124
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
2225
import org.jetbrains.annotations.Nullable;
2326

2427
/**
2528
* Zk Force Node Fail Message.
2629
*/
27-
public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
30+
public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage, ZkInternalMessage, Message {
2831
/** */
29-
private static final long serialVersionUID = 0L;
32+
@Order(0)
33+
long nodeInternalId;
3034

3135
/** */
32-
final long nodeInternalId;
36+
@Order(1)
37+
String warning;
3338

34-
/** */
35-
final String warning;
39+
/** Constructor for {@link MessageFactory}. */
40+
public ZkForceNodeFailMessage() {
41+
// No-op.
42+
}
3643

3744
/**
3845
* @param nodeInternalId Node ID.

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717

1818
package org.apache.ignite.spi.discovery.zk.internal;
1919

20+
import java.io.Serializable;
21+
2022
/**
2123
*
2224
*/
23-
class ZkInternalJoinErrorMessage implements ZkInternalMessage {
25+
class ZkInternalJoinErrorMessage implements ZkInternalMessage, Serializable {
2426
/** */
2527
private static final long serialVersionUID = 0L;
2628

0 commit comments

Comments
 (0)