Skip to content

Commit 44bffbf

Browse files
committed
IGNITE-28046 Use MessageSerializer for MetadataUpdateProposedMessage
1 parent f574cd3 commit 44bffbf

4 files changed

Lines changed: 55 additions & 29 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
import org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessageSerializer;
5858
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage;
5959
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessageSerializer;
60+
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage;
61+
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessageMarshallableSerializer;
6062
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
6163
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeIdSerializer;
6264
import org.apache.ignite.internal.processors.cache.persistence.snapshot.DataStreamerUpdatesHandlerResult;
@@ -379,5 +381,6 @@ public DiscoveryMessageFactory(Marshaller marsh, ClassLoader clsLdr) {
379381
factory.register(540, StartRoutineDiscoveryMessage::new, new StartRoutineDiscoveryMessageSerializer());
380382
factory.register(541, StartRoutineAckDiscoveryMessage::new, new StartRoutineAckDiscoveryMessageSerializer());
381383
factory.register(542, StartRoutineDiscoveryMessageV2::new, new StartRoutineDiscoveryMessageV2Serializer());
384+
factory.register(543, MetadataUpdateProposedMessage::new, new MetadataUpdateProposedMessageMarshallableSerializer(marsh, clsLdr));
382385
}
383386
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,7 @@ private final class MetadataUpdateProposedListener implements CustomEventListene
550550
//coordinator receives update request
551551
if (metaVerInfo != null) {
552552
if (metaVerInfo.removing()) {
553-
msg.markRejected(new BinaryObjectException("The type is removing now [typeId=" + typeId + ']'));
553+
msg.markRejected("The type is removing now [typeId=" + typeId + ']');
554554

555555
pendingVer = REMOVED_VERSION;
556556
acceptedVer = REMOVED_VERSION;
@@ -589,7 +589,7 @@ private final class MetadataUpdateProposedListener implements CustomEventListene
589589
catch (BinaryObjectException err) {
590590
log.warning("Exception with merging metadata for typeId: " + typeId, err);
591591

592-
msg.markRejected(err);
592+
msg.markRejected(err.getMessage());
593593
}
594594
}
595595
}
@@ -602,7 +602,7 @@ private final class MetadataUpdateProposedListener implements CustomEventListene
602602
MetadataUpdateResultFuture fut = unlabeledFutures.poll();
603603

604604
if (msg.rejected())
605-
fut.onDone(MetadataUpdateResult.createFailureResult(msg.rejectionError()));
605+
fut.onDone(MetadataUpdateResult.createFailureResult(msg.rejectionErrorMessage()));
606606
else {
607607
if (clientNode) {
608608
boolean success = casBinaryMetadata(typeId, new BinaryMetadataVersionInfo(msg.metadata(), pendingVer, acceptedVer));

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java

Lines changed: 49 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,25 @@
1717
package org.apache.ignite.internal.processors.cache.binary;
1818

1919
import java.util.UUID;
20-
import org.apache.ignite.binary.BinaryObjectException;
20+
import org.apache.ignite.IgniteCheckedException;
21+
import org.apache.ignite.internal.Order;
2122
import org.apache.ignite.internal.binary.BinaryMetadata;
2223
import org.apache.ignite.internal.binary.BinaryMetadataHandler;
2324
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
2425
import org.apache.ignite.internal.util.typedef.internal.S;
26+
import org.apache.ignite.internal.util.typedef.internal.U;
2527
import org.apache.ignite.lang.IgniteUuid;
28+
import org.apache.ignite.marshaller.Marshaller;
29+
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
2630
import org.jetbrains.annotations.Nullable;
2731

2832
/**
2933
* <b>MetadataUpdateProposedMessage</b> and {@link MetadataUpdateAcceptedMessage} messages make a basis for
3034
* discovery-based protocol for exchanging {@link BinaryMetadata metadata} describing objects in binary format stored in Ignite caches.
31-
*
35+
* <p>
3236
* All interactions with binary metadata are performed through {@link BinaryMetadataHandler}
3337
* interface implemented in {@link CacheObjectBinaryProcessorImpl} processor.
34-
*
38+
* <p>
3539
* Protocol works as follows:
3640
* <ol>
3741
* <li>
@@ -67,33 +71,49 @@
6771
* it gets blocked until {@link MetadataUpdateAcceptedMessage} arrives with <b>accepted version</b>
6872
* equals to <b>pending version</b> of this metadata to the moment when is was initially read by the thread.
6973
*/
70-
public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessage {
74+
public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessage, MarshallableMessage {
7175
/** */
7276
private static final long serialVersionUID = 0L;
7377

7478
/** */
75-
private final IgniteUuid id = IgniteUuid.randomUuid();
79+
@Order(0)
80+
IgniteUuid id;
7681

7782
/** Node UUID which initiated metadata update. */
78-
private final UUID origNodeId;
83+
@Order(1)
84+
UUID origNodeId;
7985

8086
/** */
8187
private BinaryMetadata metadata;
8288

89+
/** Serialized {@link #metadata}. */
90+
@Order(2)
91+
byte[] metadataBytes;
92+
8393
/** Metadata type id. */
84-
private final int typeId;
94+
@Order(3)
95+
int typeId;
8596

8697
/** Metadata version which is pending for update. */
87-
private int pendingVer;
98+
@Order(4)
99+
int pendingVer;
88100

89101
/** Metadata version which is already accepted by entire cluster. */
90-
private int acceptedVer;
102+
@Order(5)
103+
int acceptedVer;
91104

92105
/** Message acceptance status. */
93-
private ProposalStatus status = ProposalStatus.SUCCESSFUL;
106+
@Order(6)
107+
boolean rejected;
94108

95109
/** */
96-
private BinaryObjectException err;
110+
@Order(7)
111+
String errMsg;
112+
113+
/** Constructor. */
114+
public MetadataUpdateProposedMessage() {
115+
// No-op.
116+
}
97117

98118
/**
99119
* @param metadata {@link BinaryMetadata} requested to be updated.
@@ -103,6 +123,7 @@ public MetadataUpdateProposedMessage(BinaryMetadata metadata, UUID origNodeId) {
103123
assert origNodeId != null;
104124
assert metadata != null;
105125

126+
id = IgniteUuid.randomUuid();
106127
this.origNodeId = origNodeId;
107128

108129
this.metadata = metadata;
@@ -120,7 +141,7 @@ public MetadataUpdateProposedMessage(BinaryMetadata metadata, UUID origNodeId) {
120141
* {@inheritDoc}
121142
*/
122143
@Nullable @Override public DiscoveryCustomMessage ackMessage() {
123-
return (status == ProposalStatus.SUCCESSFUL) ? new MetadataUpdateAcceptedMessage(typeId, pendingVer) : null;
144+
return !rejected ? new MetadataUpdateAcceptedMessage(typeId, pendingVer) : null;
124145
}
125146

126147
/**
@@ -131,25 +152,25 @@ public MetadataUpdateProposedMessage(BinaryMetadata metadata, UUID origNodeId) {
131152
}
132153

133154
/**
134-
* @param err Error caused this update to be rejected.
155+
* @param errMsg Error message caused this update to be rejected.
135156
*/
136-
void markRejected(BinaryObjectException err) {
137-
status = ProposalStatus.REJECTED;
138-
this.err = err;
157+
void markRejected(String errMsg) {
158+
rejected = true;
159+
this.errMsg = errMsg;
139160
}
140161

141162
/**
142163
*
143164
*/
144165
boolean rejected() {
145-
return status == ProposalStatus.REJECTED;
166+
return rejected;
146167
}
147168

148169
/**
149170
*
150171
*/
151-
BinaryObjectException rejectionError() {
152-
return err;
172+
String rejectionErrorMessage() {
173+
return errMsg;
153174
}
154175

155176
/**
@@ -208,13 +229,16 @@ public int typeId() {
208229
return typeId;
209230
}
210231

211-
/** Message acceptance status. */
212-
private enum ProposalStatus {
213-
/** */
214-
SUCCESSFUL,
232+
/** {@inheritDoc} */
233+
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
234+
if (metadata != null)
235+
metadataBytes = U.marshal(marsh, metadata);
236+
}
215237

216-
/** */
217-
REJECTED
238+
/** {@inheritDoc} */
239+
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException {
240+
if (metadataBytes != null)
241+
metadata = U.unmarshal(marsh, metadataBytes, ldr);
218242
}
219243

220244
/** {@inheritDoc} */

modules/core/src/main/resources/META-INF/classnames.properties

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1030,7 +1030,6 @@ org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage
10301030
org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage
10311031
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage
10321032
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage
1033-
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage$ProposalStatus
10341033
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateResult$ResultType
10351034
org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$BlockSetCallable
10361035
org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$QueueHeaderPredicate

0 commit comments

Comments
 (0)