Skip to content

Commit 36f2249

Browse files
committed
IGNITE-28058 Revise custom serialization in DistributedMetaStorage* messages
1 parent 5208e0f commit 36f2249

4 files changed

Lines changed: 70 additions & 36 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,11 @@
116116
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasAckMessage;
117117
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasAckMessageSerializer;
118118
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasMessage;
119-
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasMessageSerializer;
119+
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasMessageMarshallableSerializer;
120120
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateAckMessage;
121121
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateAckMessageSerializer;
122122
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateMessage;
123-
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateMessageSerializer;
123+
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateMessageMarshallableSerializer;
124124
import org.apache.ignite.internal.processors.query.QueryField;
125125
import org.apache.ignite.internal.processors.query.QueryFieldMarshallableSerializer;
126126
import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
@@ -302,9 +302,11 @@ public DiscoveryMessageFactory(Marshaller marsh, ClassLoader clsLdr) {
302302
factory.register(22, TcpDiscoveryServerOnlyCustomEventMessage::new,
303303
new TcpDiscoveryServerOnlyCustomEventMessageMarshallableSerializer(marsh, clsLdr));
304304
factory.register(23, TcpConnectionRequestDiscoveryMessage::new, new TcpConnectionRequestDiscoveryMessageSerializer());
305-
factory.register(24, DistributedMetaStorageUpdateMessage::new, new DistributedMetaStorageUpdateMessageSerializer());
305+
factory.register(24, DistributedMetaStorageUpdateMessage::new,
306+
new DistributedMetaStorageUpdateMessageMarshallableSerializer(marsh, clsLdr));
306307
factory.register(25, DistributedMetaStorageUpdateAckMessage::new, new DistributedMetaStorageUpdateAckMessageSerializer());
307-
factory.register(26, DistributedMetaStorageCasMessage::new, new DistributedMetaStorageCasMessageSerializer());
308+
factory.register(26, DistributedMetaStorageCasMessage::new,
309+
new DistributedMetaStorageCasMessageMarshallableSerializer(marsh, clsLdr));
308310
factory.register(27, DistributedMetaStorageCasAckMessage::new, new DistributedMetaStorageCasAckMessageSerializer());
309311
factory.register(28, TcpDiscoveryClientReconnectMessage::new, new TcpDiscoveryClientReconnectMessageSerializer());
310312
factory.register(29, TcpDiscoveryNodeAddedMessage::new, new TcpDiscoveryNodeAddedMessageMarshallableSerializer(marsh, clsLdr));

modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,28 @@
1717

1818
package org.apache.ignite.internal.processors.metastorage.persistence;
1919

20+
import java.io.Serializable;
2021
import java.util.UUID;
22+
import org.apache.ignite.IgniteCheckedException;
2123
import org.apache.ignite.internal.Order;
2224
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
2325
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
2426
import org.apache.ignite.internal.util.typedef.internal.S;
27+
import org.apache.ignite.internal.util.typedef.internal.U;
28+
import org.apache.ignite.marshaller.Marshaller;
2529
import org.jetbrains.annotations.Nullable;
2630

2731
/** */
2832
public class DistributedMetaStorageCasMessage extends DistributedMetaStorageUpdateMessage {
2933
/** */
3034
private static final long serialVersionUID = 0L;
3135

32-
/** TODO: revise the external serialization https://issues.apache.org/jira/browse/IGNITE-28058. */
36+
/** */
37+
private @Nullable Serializable expVal;
38+
39+
/** */
3340
@Order(0)
34-
byte[] expectedVal;
41+
byte[] expValBytes;
3542

3643
/** */
3744
@Order(1)
@@ -43,16 +50,16 @@ public DistributedMetaStorageCasMessage() {
4350
}
4451

4552
/** */
46-
public DistributedMetaStorageCasMessage(UUID reqId, String key, byte[] expValBytes, byte[] valBytes) {
47-
super(reqId, key, valBytes);
53+
public DistributedMetaStorageCasMessage(UUID reqId, String key, @Nullable Serializable expVal, @Nullable Serializable val) {
54+
super(reqId, key, val);
4855

49-
expectedVal = expValBytes;
56+
this.expVal = expVal;
5057
matches = true;
5158
}
5259

5360
/** */
54-
public byte[] expectedValue() {
55-
return expectedVal;
61+
public Serializable expectedValue() {
62+
return expVal;
5663
}
5764

5865
/** */
@@ -67,9 +74,24 @@ public boolean matches() {
6774

6875
/** {@inheritDoc} */
6976
@Override @Nullable public DiscoveryCustomMessage ackMessage() {
70-
return new DistributedMetaStorageCasAckMessage(requestId(), matches);
77+
return new DistributedMetaStorageCasAckMessage(reqId, matches);
78+
}
79+
80+
/** {@inheritDoc} */
81+
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
82+
super.prepareMarshal(marsh);
83+
84+
if (expVal != null)
85+
expValBytes = U.marshal(marsh, expVal);
7186
}
7287

88+
/** {@inheritDoc} */
89+
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
90+
super.finishUnmarshal(marsh, clsLdr);
91+
92+
if (expValBytes != null)
93+
expVal = U.unmarshal(marsh, expValBytes, clsLdr);
94+
}
7395

7496
/** {@inheritDoc} */
7597
@Override public String toString() {

modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@
8484
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageHistoryItem.EMPTY_ARRAY;
8585
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemPrefix;
8686
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.historyItemVer;
87-
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.marshal;
8887
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUtil.unmarshal;
8988
import static org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageVersion.INITIAL_VERSION;
9089
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
@@ -479,7 +478,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) {
479478
@Override public void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException {
480479
assert val != null : key;
481480

482-
startWrite(key, marshal(marshaller, val)).get();
481+
startWrite(key, val).get();
483482
}
484483

485484
/** {@inheritDoc} */
@@ -489,7 +488,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) {
489488
) throws IgniteCheckedException {
490489
assert val != null : key;
491490

492-
return startWrite(key, marshal(marshaller, val));
491+
return startWrite(key, val);
493492
}
494493

495494
/** {@inheritDoc} */
@@ -521,7 +520,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) {
521520
) throws IgniteCheckedException {
522521
assert newVal != null : key;
523522

524-
return startCas(key, marshal(marshaller, expVal), marshal(marshaller, newVal));
523+
return startCas(key, expVal, newVal);
525524
}
526525

527526
/** {@inheritDoc} */
@@ -531,7 +530,7 @@ private void onMetaStorageReadyForWrite(ReadWriteMetastorage metastorage) {
531530
) throws IgniteCheckedException {
532531
assert expVal != null : key;
533532

534-
return startCas(key, marshal(marshaller, expVal), null).get();
533+
return startCas(key, expVal, null).get();
535534
}
536535

537536
/** {@inheritDoc} */
@@ -1046,28 +1045,28 @@ else if (!isClient && ver.id() > 0) {
10461045
* for operation to be completed.
10471046
*
10481047
* @param key The key.
1049-
* @param valBytes Value bytes to write. Null if value needs to be removed.
1048+
* @param val Value to write. Null if value needs to be removed.
10501049
* @throws IgniteCheckedException If there was an error while sending discovery message.
10511050
*/
1052-
private GridFutureAdapter<?> startWrite(String key, byte[] valBytes) throws IgniteCheckedException {
1051+
private GridFutureAdapter<?> startWrite(String key, @Nullable Serializable val) throws IgniteCheckedException {
10531052
UUID reqId = UUID.randomUUID();
10541053

10551054
GridFutureAdapter<?> fut = prepareWriteFuture(reqId);
10561055

10571056
if (fut.isDone())
10581057
return fut;
10591058

1060-
DiscoveryCustomMessage msg = new DistributedMetaStorageUpdateMessage(reqId, key, valBytes);
1059+
DiscoveryCustomMessage msg = new DistributedMetaStorageUpdateMessage(reqId, key, val);
10611060

10621061
ctx.discovery().sendCustomEvent(msg);
10631062

10641063
return fut;
10651064
}
10661065

10671066
/**
1068-
* Basically the same as {@link #startWrite(String, byte[])} but for CAS operations.
1067+
* Basically the same as {@link #startWrite(String, Serializable)} but for CAS operations.
10691068
*/
1070-
private GridFutureAdapter<Boolean> startCas(String key, byte[] expValBytes, byte[] newValBytes)
1069+
private GridFutureAdapter<Boolean> startCas(String key, @Nullable Serializable expVal, @Nullable Serializable newVal)
10711070
throws IgniteCheckedException {
10721071
UUID reqId = UUID.randomUUID();
10731072

@@ -1076,7 +1075,7 @@ private GridFutureAdapter<Boolean> startCas(String key, byte[] expValBytes, byte
10761075
if (fut.isDone())
10771076
return fut;
10781077

1079-
DiscoveryCustomMessage msg = new DistributedMetaStorageCasMessage(reqId, key, expValBytes, newValBytes);
1078+
DiscoveryCustomMessage msg = new DistributedMetaStorageCasMessage(reqId, key, expVal, newVal);
10801079

10811080
ctx.discovery().sendCustomEvent(msg);
10821081

@@ -1318,9 +1317,7 @@ private void completeCas(
13181317

13191318
Serializable oldVal = bridge.read(msg.key());
13201319

1321-
Serializable expVal = unmarshal(marshaller, msg.expectedValue());
1322-
1323-
if (!Objects.deepEquals(oldVal, expVal)) {
1320+
if (!Objects.deepEquals(oldVal, msg.expectedValue())) {
13241321
msg.setMatches(false);
13251322

13261323
// Do nothing if expected value doesn't match with the actual one.

modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,22 @@
1717

1818
package org.apache.ignite.internal.processors.metastorage.persistence;
1919

20+
import java.io.Serializable;
2021
import java.util.UUID;
22+
import org.apache.ignite.IgniteCheckedException;
2123
import org.apache.ignite.internal.Order;
2224
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
2325
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
2426
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
2527
import org.apache.ignite.internal.util.typedef.internal.S;
28+
import org.apache.ignite.internal.util.typedef.internal.U;
2629
import org.apache.ignite.lang.IgniteUuid;
27-
import org.apache.ignite.plugin.extensions.communication.Message;
30+
import org.apache.ignite.marshaller.Marshaller;
31+
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
2832
import org.jetbrains.annotations.Nullable;
2933

3034
/** */
31-
public class DistributedMetaStorageUpdateMessage implements DiscoveryCustomMessage, Message {
35+
public class DistributedMetaStorageUpdateMessage implements DiscoveryCustomMessage, MarshallableMessage {
3236
/** */
3337
private static final long serialVersionUID = 0L;
3438

@@ -46,7 +50,10 @@ public class DistributedMetaStorageUpdateMessage implements DiscoveryCustomMessa
4650
@Order(2)
4751
String key;
4852

49-
/** TODO: revise the external serialization https://issues.apache.org/jira/browse/IGNITE-28058. */
53+
/** */
54+
private @Nullable Serializable val;
55+
56+
/** */
5057
@Order(3)
5158
byte[] valBytes;
5259

@@ -56,24 +63,19 @@ public DistributedMetaStorageUpdateMessage() {
5663
}
5764

5865
/** */
59-
public DistributedMetaStorageUpdateMessage(UUID reqId, String key, byte[] valBytes) {
66+
public DistributedMetaStorageUpdateMessage(UUID reqId, String key, @Nullable Serializable val) {
6067
id = IgniteUuid.randomUuid();
6168

6269
this.reqId = reqId;
6370
this.key = key;
64-
this.valBytes = valBytes;
71+
this.val = val;
6572
}
6673

6774
/** {@inheritDoc} */
6875
@Override public IgniteUuid id() {
6976
return id;
7077
}
7178

72-
/** */
73-
public UUID requestId() {
74-
return reqId;
75-
}
76-
7779
/** */
7880
public String key() {
7981
return key;
@@ -94,6 +96,17 @@ public byte[] value() {
9496
return true;
9597
}
9698

99+
/** {@inheritDoc} */
100+
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
101+
if (val != null)
102+
valBytes = U.marshal(marsh, val);
103+
}
104+
105+
/** {@inheritDoc} */
106+
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
107+
if (valBytes != null)
108+
val = U.unmarshal(marsh, valBytes, clsLdr);
109+
}
97110

98111
/** {@inheritDoc} */
99112
@Override public String toString() {

0 commit comments

Comments
 (0)