|
14 | 14 | import java.util.function.Consumer; |
15 | 15 | import java.util.logging.Logger; |
16 | 16 |
|
| 17 | +import io.opentelemetry.api.trace.Span; |
| 18 | +import io.opentelemetry.api.trace.StatusCode; |
17 | 19 | import org.hyperledger.fabric.Logging; |
18 | 20 | import org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage; |
19 | 21 | import org.hyperledger.fabric.protos.peer.ChaincodeShim.ChaincodeMessage.Type; |
|
22 | 24 |
|
23 | 25 | import com.google.protobuf.ByteString; |
24 | 26 | import com.google.protobuf.InvalidProtocolBufferException; |
| 27 | +import org.hyperledger.fabric.traces.Traces; |
25 | 28 |
|
26 | 29 | /** |
27 | 30 | * A 'Callable' implementation the has the job of invoking the chaincode, and |
28 | 31 | * matching the response and requests. |
29 | | - * |
30 | 32 | */ |
31 | 33 | public class ChaincodeInvocationTask implements Callable<ChaincodeMessage> { |
32 | 34 |
|
@@ -76,55 +78,70 @@ public ChaincodeInvocationTask(final ChaincodeMessage message, final Type type, |
76 | 78 | public ChaincodeMessage call() { |
77 | 79 | ChaincodeMessage finalResponseMessage; |
78 | 80 |
|
| 81 | + Span span = null; |
79 | 82 | try { |
80 | | - perfLogger.fine(() -> "> task:start TX::" + this.txId); |
81 | | - |
82 | | - // A key interface for the chaincode's invoke() method implementation |
83 | | - // is the 'ChaincodeStub' interface. An instance of this is created |
84 | | - // per transaction invocation. |
85 | | - // |
86 | | - // This needs to be passed the message triggering the invoke, as well |
87 | | - // as the interface to be used for sending any requests to the peer |
88 | | - final ChaincodeStub stub = new InvocationStubImpl(message, this); |
89 | | - |
90 | | - // result is what will be sent to the peer as a response to this invocation |
91 | | - final Chaincode.Response result; |
92 | | - |
93 | | - |
94 | | - perfLogger.fine(() -> "> task:invoke TX::" + this.txId); |
95 | | - // Call chaincode's invoke |
96 | | - // Note in Fabric v2, there won't be any INIT |
97 | | - if (this.type.equals(Type.INIT)) { |
98 | | - result = chaincode.init(stub); |
99 | | - } else { |
100 | | - result = chaincode.invoke(stub); |
101 | | - } |
102 | | - |
103 | | - perfLogger.fine(() -> "< task:invoke TX::" + this.txId); |
104 | | - |
105 | | - if (result.getStatus().getCode() >= Chaincode.Response.Status.INTERNAL_SERVER_ERROR.getCode()) { |
106 | | - // Send ERROR with entire result.Message as payload |
107 | | - logger.severe(() -> String.format("[%-8.8s] Invoke failed with error code %d. Sending %s", |
108 | | - message.getTxid(), result.getStatus().getCode(), ERROR)); |
| 83 | + try { |
| 84 | + perfLogger.fine(() -> "> task:start TX::" + this.txId); |
| 85 | + |
| 86 | + // A key interface for the chaincode's invoke() method implementation |
| 87 | + // is the 'ChaincodeStub' interface. An instance of this is created |
| 88 | + // per transaction invocation. |
| 89 | + // |
| 90 | + // This needs to be passed the message triggering the invoke, as well |
| 91 | + // as the interface to be used for sending any requests to the peer |
| 92 | + final ChaincodeStub stub = new InvocationStubImpl(message, this); |
| 93 | + |
| 94 | + span = Traces.getProvider().createSpan(stub); |
| 95 | + // result is what will be sent to the peer as a response to this invocation |
| 96 | + final Chaincode.Response result; |
| 97 | + |
| 98 | + |
| 99 | + perfLogger.fine(() -> "> task:invoke TX::" + this.txId); |
| 100 | + |
| 101 | + // Call chaincode's invoke |
| 102 | + // Note in Fabric v2, there won't be any INIT |
| 103 | + if (this.type.equals(Type.INIT)) { |
| 104 | + result = chaincode.init(stub); |
| 105 | + } else { |
| 106 | + result = chaincode.invoke(stub); |
| 107 | + } |
| 108 | + |
| 109 | + perfLogger.fine(() -> "< task:invoke TX::" + this.txId); |
| 110 | + |
| 111 | + if (result.getStatus().getCode() >= Chaincode.Response.Status.INTERNAL_SERVER_ERROR.getCode()) { |
| 112 | + // Send ERROR with entire result.Message as payload |
| 113 | + logger.severe(() -> String.format("[%-8.8s] Invoke failed with error code %d. Sending %s", |
| 114 | + message.getTxid(), result.getStatus().getCode(), ERROR)); |
| 115 | + finalResponseMessage = ChaincodeMessageFactory.newErrorEventMessage(message.getChannelId(), |
| 116 | + message.getTxid(), result.getMessage(), stub.getEvent()); |
| 117 | + if (span != null) { |
| 118 | + span.setStatus(StatusCode.ERROR, result.getMessage()); |
| 119 | + } |
| 120 | + } else { |
| 121 | + // Send COMPLETED with entire result as payload |
| 122 | + logger.fine(() -> String.format("[%-8.8s] Invoke succeeded. Sending %s", message.getTxid(), COMPLETED)); |
| 123 | + finalResponseMessage = ChaincodeMessageFactory.newCompletedEventMessage(message.getChannelId(), |
| 124 | + message.getTxid(), result, stub.getEvent()); |
| 125 | + } |
| 126 | + |
| 127 | + } catch (InvalidProtocolBufferException | RuntimeException e) { |
| 128 | + logger.severe(() -> String.format("[%-8.8s] Invoke failed. Sending %s: %s", message.getTxid(), ERROR, e)); |
109 | 129 | finalResponseMessage = ChaincodeMessageFactory.newErrorEventMessage(message.getChannelId(), |
110 | | - message.getTxid(), result.getMessage(), stub.getEvent()); |
111 | | - } else { |
112 | | - // Send COMPLETED with entire result as payload |
113 | | - logger.fine(() -> String.format("[%-8.8s] Invoke succeeded. Sending %s", message.getTxid(), COMPLETED)); |
114 | | - finalResponseMessage = ChaincodeMessageFactory.newCompletedEventMessage(message.getChannelId(), |
115 | | - message.getTxid(), result, stub.getEvent()); |
| 130 | + message.getTxid(), e); |
| 131 | + if (span != null) { |
| 132 | + span.setStatus(StatusCode.ERROR, e.getMessage()); |
| 133 | + } |
116 | 134 | } |
117 | 135 |
|
118 | | - } catch (InvalidProtocolBufferException | RuntimeException e) { |
119 | | - logger.severe(() -> String.format("[%-8.8s] Invoke failed. Sending %s: %s", message.getTxid(), ERROR, e)); |
120 | | - finalResponseMessage = ChaincodeMessageFactory.newErrorEventMessage(message.getChannelId(), |
121 | | - message.getTxid(), e); |
| 136 | + // send the final response message to the peer |
| 137 | + outgoingMessageConsumer.accept(finalResponseMessage); |
| 138 | + perfLogger.fine(() -> "< task:end TX::" + this.txId); |
| 139 | + } finally { |
| 140 | + if (span != null) { |
| 141 | + span.end(); |
| 142 | + } |
122 | 143 | } |
123 | 144 |
|
124 | | - // send the final response message to the peer |
125 | | - outgoingMessageConsumer.accept(finalResponseMessage); |
126 | | - perfLogger.fine(() -> "< task:end TX::" + this.txId); |
127 | | - |
128 | 145 | return null; |
129 | 146 | } |
130 | 147 |
|
|
0 commit comments