Skip to content

Commit f574cd3

Browse files
committed
IGNITE-28498 Remove Serializable from Zookeeper discovery custom messages
1 parent 7ca39e3 commit f574cd3

10 files changed

Lines changed: 87 additions & 54 deletions

File tree

modules/codegen/src/main/java/org/apache/ignite/internal/MessageProcessor.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818
package org.apache.ignite.internal;
1919

2020
import java.util.ArrayList;
21+
import java.util.Arrays;
2122
import java.util.Comparator;
2223
import java.util.HashMap;
2324
import java.util.List;
2425
import java.util.Map;
26+
import java.util.Objects;
2527
import java.util.Set;
28+
import java.util.stream.Collectors;
2629
import javax.annotation.processing.AbstractProcessor;
2730
import javax.annotation.processing.RoundEnvironment;
2831
import javax.annotation.processing.SupportedAnnotationTypes;
@@ -72,8 +75,11 @@ public class MessageProcessor extends AbstractProcessor {
7275
/** Externalizable message. */
7376
static final String MARSHALLABLE_MESSAGE_INTERFACE = "org.apache.ignite.plugin.extensions.communication.MarshallableMessage";
7477

75-
/** This is the only message with zero fields. A serializer must be generated due to restrictions in our communication process. */
76-
static final String HANDSHAKE_WAIT_MESSAGE = "org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage";
78+
/** Messages with no fields. A serializer must be generated due to restrictions in our communication process. */
79+
static final String[] EMPTY_MESSAGES = {
80+
"org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage",
81+
"org.apache.ignite.spi.discovery.zk.internal.ZkNoServersMessage"
82+
};
7783

7884
/** */
7985
private final Map<String, IgniteBiTuple<String, String>> enumMappersInUse = new HashMap<>();
@@ -83,7 +89,11 @@ public class MessageProcessor extends AbstractProcessor {
8389
*/
8490
@Override public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment roundEnv) {
8591
TypeMirror msgType = processingEnv.getElementUtils().getTypeElement(MESSAGE_INTERFACE).asType();
86-
TypeMirror handshakeWaitMsgType = processingEnv.getElementUtils().getTypeElement(HANDSHAKE_WAIT_MESSAGE).asType();
92+
List<TypeMirror> emptyMsgs = Arrays.stream(EMPTY_MESSAGES)
93+
.map(cls -> processingEnv.getElementUtils().getTypeElement(cls))
94+
.filter(Objects::nonNull)
95+
.map(Element::asType)
96+
.collect(Collectors.toList());
8797

8898
Map<TypeElement, List<VariableElement>> msgFields = new HashMap<>();
8999

@@ -101,7 +111,7 @@ public class MessageProcessor extends AbstractProcessor {
101111

102112
List<VariableElement> fields = orderedFields(clazz);
103113

104-
if (!fields.isEmpty() || processingEnv.getTypeUtils().isAssignable(clazz.asType(), handshakeWaitMsgType))
114+
if (!fields.isEmpty() || emptyMsgs.stream().anyMatch(t -> processingEnv.getTypeUtils().isAssignable(clazz.asType(), t)))
105115
msgFields.put(clazz, fields);
106116
}
107117

modules/zookeeper/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@
4040
<artifactId>ignite-core</artifactId>
4141
</dependency>
4242

43+
<dependency>
44+
<groupId>${project.groupId}</groupId>
45+
<artifactId>ignite-codegen</artifactId>
46+
<scope>provided</scope>
47+
</dependency>
48+
4349
<dependency>
4450
<groupId>org.apache.zookeeper</groupId>
4551
<artifactId>zookeeper</artifactId>

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
3333
import org.apache.ignite.internal.util.typedef.internal.U;
3434
import org.apache.ignite.marshaller.Marshaller;
35-
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
3635
import org.apache.ignite.plugin.extensions.communication.Message;
3736
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
3837
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
@@ -46,44 +45,24 @@
4645
* Class is responsible for serializing discovery messages using RU-ready {@link MessageSerializer} mechanism.
4746
*/
4847
public class DiscoveryMessageParser {
49-
/** Leading byte for messages use {@link JdkMarshaller} for serialization. */
50-
// TODO: remove these flags after refactoring all discovery messages.
51-
private static final byte JAVA_SERIALIZATION = (byte)1;
52-
53-
/** Leading byte for messages use {@link MessageSerializer} for serialization. */
54-
private static final byte MESSAGE_SERIALIZATION = (byte)2;
55-
5648
/** Size for an intermediate buffer for serializing discovery messages. */
5749
private static final int MSG_BUFFER_SIZE = 100;
5850

5951
/** */
6052
private final MessageFactory msgFactory;
6153

62-
/** */
63-
private final Marshaller marsh;
64-
6554
/** */
6655
public DiscoveryMessageParser(Marshaller marsh) {
67-
this.marsh = marsh;
6856
this.msgFactory = new IgniteMessageFactoryImpl(
69-
new MessageFactoryProvider[] { new DiscoveryMessageFactory(marsh, U.gridClassLoader()) });
57+
new MessageFactoryProvider[] { new DiscoveryMessageFactory(marsh, U.gridClassLoader()), new ZkMessageFactory() });
7058
}
7159

7260
/** Marshals discovery message to bytes array. */
7361
public byte[] marshalZip(DiscoverySpiCustomMessage msg) {
7462
ByteArrayOutputStream baos = new ByteArrayOutputStream();
7563

7664
try (DeflaterOutputStream out = new DeflaterOutputStream(baos)) {
77-
if (msg instanceof Message) {
78-
out.write(MESSAGE_SERIALIZATION);
79-
80-
serializeMessage((Message)msg, out);
81-
}
82-
else {
83-
out.write(JAVA_SERIALIZATION);
84-
85-
U.marshal(marsh, msg, out);
86-
}
65+
serializeMessage(msg, out);
8766
}
8867
catch (Exception e) {
8968
throw new IgniteSpiException("Failed to serialize message: " + msg, e);
@@ -98,14 +77,6 @@ public DiscoverySpiCustomMessage unmarshalZip(byte[] bytes) {
9877
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
9978
InflaterInputStream in = new InflaterInputStream(bais)
10079
) {
101-
byte mode = (byte)in.read();
102-
103-
if (mode == JAVA_SERIALIZATION)
104-
return U.unmarshal(marsh, in, U.gridClassLoader());
105-
106-
if (MESSAGE_SERIALIZATION != mode)
107-
throw new IOException("Received unexpected byte while reading discovery message: " + mode);
108-
10980
return (DiscoverySpiCustomMessage)deserializeMessage(in);
11081
}
11182
catch (Exception e) {

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveFinishMessage.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
package org.apache.ignite.spi.discovery.zk.internal;
1919

2020
import java.util.UUID;
21+
import org.apache.ignite.internal.Order;
2122
import org.apache.ignite.internal.util.typedef.internal.S;
23+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2224
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
2325
import org.jetbrains.annotations.Nullable;
2426

@@ -27,16 +29,20 @@
2729
*/
2830
class ZkCommunicationErrorResolveFinishMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
2931
/** */
30-
private static final long serialVersionUID = 0L;
32+
@Order(0)
33+
UUID futId;
3134

3235
/** */
33-
final UUID futId;
36+
@Order(1)
37+
long topVer;
3438

3539
/** */
36-
final long topVer;
40+
ZkCommunicationErrorResolveResult res;
3741

38-
/** */
39-
transient ZkCommunicationErrorResolveResult res;
42+
/** Constructor for {@link MessageFactory}. */
43+
public ZkCommunicationErrorResolveFinishMessage() {
44+
// No-op.
45+
}
4046

4147
/**
4248
* @param futId Future ID.

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkCommunicationErrorResolveStartMessage.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
package org.apache.ignite.spi.discovery.zk.internal;
1919

2020
import java.util.UUID;
21+
import org.apache.ignite.internal.Order;
2122
import org.apache.ignite.internal.util.typedef.internal.S;
23+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2224
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
2325
import org.jetbrains.annotations.Nullable;
2426

@@ -27,10 +29,13 @@
2729
*/
2830
public class ZkCommunicationErrorResolveStartMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
2931
/** */
30-
private static final long serialVersionUID = 0L;
32+
@Order(0)
33+
UUID id;
3134

32-
/** */
33-
final UUID id;
35+
/** Constructor for {@link MessageFactory}. */
36+
public ZkCommunicationErrorResolveStartMessage() {
37+
// No-op.
38+
}
3439

3540
/**
3641
* @param id Unique ID.

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkForceNodeFailMessage.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.ignite.spi.discovery.zk.internal;
1919

20+
import org.apache.ignite.internal.Order;
2021
import org.apache.ignite.internal.util.typedef.internal.S;
22+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2123
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
2224
import org.jetbrains.annotations.Nullable;
2325

@@ -26,13 +28,17 @@
2628
*/
2729
public class ZkForceNodeFailMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
2830
/** */
29-
private static final long serialVersionUID = 0L;
31+
@Order(0)
32+
long nodeInternalId;
3033

3134
/** */
32-
final long nodeInternalId;
35+
@Order(1)
36+
String warning;
3337

34-
/** */
35-
final String warning;
38+
/** Constructor for {@link MessageFactory}. */
39+
public ZkForceNodeFailMessage() {
40+
// No-op.
41+
}
3642

3743
/**
3844
* @param nodeInternalId Node ID.

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalJoinErrorMessage.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717

1818
package org.apache.ignite.spi.discovery.zk.internal;
1919

20+
import java.io.Serializable;
21+
2022
/**
2123
*
2224
*/
23-
class ZkInternalJoinErrorMessage implements ZkInternalMessage {
25+
class ZkInternalJoinErrorMessage implements ZkInternalMessage, Serializable {
2426
/** */
2527
private static final long serialVersionUID = 0L;
2628

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkInternalMessage.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,9 @@
1717

1818
package org.apache.ignite.spi.discovery.zk.internal;
1919

20-
import java.io.Serializable;
21-
2220
/**
2321
*
2422
*/
25-
interface ZkInternalMessage extends Serializable {
23+
interface ZkInternalMessage {
2624
// No-op.
2725
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.spi.discovery.zk.internal;
19+
20+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
21+
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
22+
23+
/** */
24+
public class ZkMessageFactory implements MessageFactoryProvider {
25+
/** {@inheritDoc} */
26+
@Override public void registerAll(MessageFactory factory) {
27+
factory.register(400, ZkCommunicationErrorResolveFinishMessage::new, new ZkCommunicationErrorResolveFinishMessageSerializer());
28+
factory.register(401, ZkCommunicationErrorResolveStartMessage::new, new ZkCommunicationErrorResolveStartMessageSerializer());
29+
factory.register(402, ZkForceNodeFailMessage::new, new ZkForceNodeFailMessageSerializer());
30+
factory.register(403, ZkNoServersMessage::new, new ZkNoServersMessageSerializer());
31+
}
32+
}

modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkNoServersMessage.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,6 @@
2525
*
2626
*/
2727
class ZkNoServersMessage implements DiscoverySpiCustomMessage, ZkInternalMessage {
28-
/** */
29-
private static final long serialVersionUID = 0L;
30-
3128
/** {@inheritDoc} */
3229
@Nullable @Override public DiscoverySpiCustomMessage ackMessage() {
3330
return null;

0 commit comments

Comments
 (0)