|
1 | 1 | package org.zstack.compute.vm; |
2 | 2 |
|
| 3 | +import org.apache.logging.log4j.LogManager; |
| 4 | +import org.apache.logging.log4j.Logger; |
3 | 5 | import org.apache.logging.log4j.ThreadContext; |
4 | 6 | import org.springframework.beans.factory.annotation.Autowire; |
5 | 7 | import org.springframework.beans.factory.annotation.Autowired; |
6 | 8 | import org.springframework.beans.factory.annotation.Configurable; |
7 | | -import org.zstack.core.Platform; |
8 | 9 | import org.zstack.core.cloudbus.CloudBus; |
9 | 10 | import org.zstack.core.cloudbus.CloudBusCallBack; |
10 | 11 | import org.zstack.core.db.DatabaseFacade; |
| 12 | +import org.zstack.core.db.Q; |
| 13 | +import org.zstack.core.thread.ThreadFacade; |
11 | 14 | import org.zstack.header.Constants; |
12 | 15 | import org.zstack.header.core.Completion; |
13 | 16 | import org.zstack.header.core.ReturnValueCompletion; |
14 | | -import org.zstack.header.longjob.LongJobErrors; |
15 | | -import org.zstack.header.longjob.LongJobFor; |
16 | | -import org.zstack.header.longjob.LongJobVO; |
| 17 | +import org.zstack.header.errorcode.ErrorCode; |
| 18 | +import org.zstack.header.longjob.*; |
17 | 19 | import org.zstack.header.message.APIEvent; |
18 | 20 | import org.zstack.header.message.MessageReply; |
19 | 21 | import org.zstack.header.vm.*; |
20 | | -import org.zstack.header.longjob.LongJob; |
21 | | -import org.zstack.longjob.LongJobUtils; |
| 22 | +import org.zstack.header.volume.GetVolumeTaskMsg; |
| 23 | +import org.zstack.header.volume.GetVolumeTaskReply; |
| 24 | +import org.zstack.header.volume.VolumeConstant; |
| 25 | +import org.zstack.header.volume.VolumeVO; |
| 26 | +import org.zstack.header.volume.VolumeVO_; |
22 | 27 | import org.zstack.utils.gson.JSONObjectUtil; |
23 | 28 |
|
24 | | -import static org.zstack.core.Platform.err; |
| 29 | +import java.util.*; |
| 30 | +import java.util.concurrent.TimeUnit; |
| 31 | + |
25 | 32 | import static org.zstack.core.Platform.operr; |
26 | 33 |
|
27 | 34 |
|
|
31 | 38 | @LongJobFor(APIMigrateVmMsg.class) |
32 | 39 | @Configurable(preConstruction = true, autowire = Autowire.BY_TYPE) |
33 | 40 | public class MigrateVmLongJob implements LongJob { |
| 41 | + private static final Logger logger = LogManager.getLogger(MigrateVmLongJob.class); |
| 42 | + private static final int WAIT_CHAIN_TASK_EXIT_MAX_RETRIES = 30; |
| 43 | + private static final long WAIT_CHAIN_TASK_EXIT_INTERVAL_SECS = 1; |
| 44 | + |
34 | 45 | @Autowired |
35 | 46 | protected CloudBus bus; |
36 | 47 | @Autowired |
37 | 48 | protected DatabaseFacade dbf; |
| 49 | + @Autowired |
| 50 | + private ThreadFacade thdf; |
38 | 51 |
|
39 | 52 | protected String auditResourceUuid; |
40 | 53 |
|
41 | 54 | @Override |
42 | 55 | public void start(LongJobVO job, ReturnValueCompletion<APIEvent> completion) { |
43 | 56 | MigrateVmInnerMsg msg = JSONObjectUtil.toObject(job.getJobData(), MigrateVmInnerMsg.class); |
| 57 | + |
| 58 | + List<String> backupTaskLongJobUuids = getBackupTaskLongJobUuids(job.getJobData()); |
| 59 | + if (backupTaskLongJobUuids != null && !backupTaskLongJobUuids.isEmpty()) { |
| 60 | + logger.info(String.format("migrate vm[uuid:%s] longjob has %d backup longjobs to cancel first", |
| 61 | + msg.getVmInstanceUuid(), backupTaskLongJobUuids.size())); |
| 62 | + cancelBackupLongJobsThenMigrate(backupTaskLongJobUuids, msg, completion); |
| 63 | + } else { |
| 64 | + doMigrate(msg, completion); |
| 65 | + } |
| 66 | + } |
| 67 | + |
| 68 | + private List<String> getBackupTaskLongJobUuids(String jobData) { |
| 69 | + Map<String, Object> raw = JSONObjectUtil.toObject(jobData, LinkedHashMap.class); |
| 70 | + Object uuids = raw == null ? null : raw.get("backupTaskLongJobUuids"); |
| 71 | + if (!(uuids instanceof List<?>)) { |
| 72 | + return null; |
| 73 | + } |
| 74 | + |
| 75 | + List<String> result = new ArrayList<>(); |
| 76 | + for (Object item : (List<?>) uuids) { |
| 77 | + if (item == null) { |
| 78 | + continue; |
| 79 | + } |
| 80 | + String uuid = String.valueOf(item).trim(); |
| 81 | + if (!uuid.isEmpty()) { |
| 82 | + result.add(uuid); |
| 83 | + } |
| 84 | + } |
| 85 | + return result.isEmpty() ? null : result; |
| 86 | + } |
| 87 | + |
| 88 | + private void cancelBackupLongJobsThenMigrate(List<String> backupTaskLongJobUuids, |
| 89 | + MigrateVmInnerMsg msg, |
| 90 | + ReturnValueCompletion<APIEvent> completion) { |
| 91 | + cancelBackupLongJobs(backupTaskLongJobUuids.iterator(), new Completion(completion) { |
| 92 | + @Override |
| 93 | + public void success() { |
| 94 | + waitForVolumeChainTasksExit(msg.getVmInstanceUuid(), WAIT_CHAIN_TASK_EXIT_MAX_RETRIES, |
| 95 | + new Completion(completion) { |
| 96 | + @Override |
| 97 | + public void success() { |
| 98 | + doMigrate(msg, completion); |
| 99 | + } |
| 100 | + |
| 101 | + @Override |
| 102 | + public void fail(ErrorCode errorCode) { |
| 103 | + completion.fail(errorCode); |
| 104 | + } |
| 105 | + }); |
| 106 | + } |
| 107 | + |
| 108 | + @Override |
| 109 | + public void fail(ErrorCode errorCode) { |
| 110 | + logger.warn(String.format("failed to cancel backup longjobs for vm[uuid:%s], " + |
| 111 | + "attempting migration anyway: %s", msg.getVmInstanceUuid(), errorCode)); |
| 112 | + doMigrate(msg, completion); |
| 113 | + } |
| 114 | + }); |
| 115 | + } |
| 116 | + |
| 117 | + private void cancelBackupLongJobs(Iterator<String> it, Completion completion) { |
| 118 | + if (!it.hasNext()) { |
| 119 | + completion.success(); |
| 120 | + return; |
| 121 | + } |
| 122 | + |
| 123 | + String longJobUuid = it.next(); |
| 124 | + CancelLongJobMsg cmsg = new CancelLongJobMsg(); |
| 125 | + cmsg.setUuid(longJobUuid); |
| 126 | + bus.makeLocalServiceId(cmsg, LongJobConstants.SERVICE_ID); |
| 127 | + bus.send(cmsg, new CloudBusCallBack(completion) { |
| 128 | + @Override |
| 129 | + public void run(MessageReply reply) { |
| 130 | + if (!reply.isSuccess()) { |
| 131 | + logger.warn(String.format("failed to cancel backup longjob[uuid:%s]: %s", |
| 132 | + longJobUuid, reply.getError())); |
| 133 | + } |
| 134 | + cancelBackupLongJobs(it, completion); |
| 135 | + } |
| 136 | + }); |
| 137 | + } |
| 138 | + |
| 139 | + private void waitForVolumeChainTasksExit(String vmUuid, int retriesLeft, Completion completion) { |
| 140 | + List<String> volUuids = Q.New(VolumeVO.class) |
| 141 | + .eq(VolumeVO_.vmInstanceUuid, vmUuid) |
| 142 | + .select(VolumeVO_.uuid) |
| 143 | + .listValues(); |
| 144 | + |
| 145 | + if (volUuids.isEmpty()) { |
| 146 | + completion.success(); |
| 147 | + return; |
| 148 | + } |
| 149 | + |
| 150 | + GetVolumeTaskMsg gmsg = new GetVolumeTaskMsg(); |
| 151 | + gmsg.setVolumeUuids(volUuids); |
| 152 | + bus.makeLocalServiceId(gmsg, VolumeConstant.SERVICE_ID); |
| 153 | + bus.send(gmsg, new CloudBusCallBack(completion) { |
| 154 | + @Override |
| 155 | + public void run(MessageReply reply) { |
| 156 | + if (!reply.isSuccess()) { |
| 157 | + completion.fail(reply.getError()); |
| 158 | + return; |
| 159 | + } |
| 160 | + |
| 161 | + GetVolumeTaskReply gr = reply.castReply(); |
| 162 | + boolean hasRunningTasks = gr.getResults().values().stream() |
| 163 | + .anyMatch(info -> !info.getRunningTask().isEmpty()); |
| 164 | + |
| 165 | + if (!hasRunningTasks) { |
| 166 | + completion.success(); |
| 167 | + return; |
| 168 | + } |
| 169 | + |
| 170 | + if (retriesLeft <= 0) { |
| 171 | + completion.fail(operr( |
| 172 | + "timeout waiting for volume backup chain tasks to exit for vm[uuid:%s]", vmUuid)); |
| 173 | + return; |
| 174 | + } |
| 175 | + |
| 176 | + logger.debug(String.format( |
| 177 | + "volumes of vm[uuid:%s] still have running tasks, retry in %ds (retries left: %d)", |
| 178 | + vmUuid, WAIT_CHAIN_TASK_EXIT_INTERVAL_SECS, retriesLeft)); |
| 179 | + thdf.submitTimeoutTask( |
| 180 | + () -> waitForVolumeChainTasksExit(vmUuid, retriesLeft - 1, completion), |
| 181 | + TimeUnit.SECONDS, WAIT_CHAIN_TASK_EXIT_INTERVAL_SECS); |
| 182 | + } |
| 183 | + }); |
| 184 | + } |
| 185 | + |
| 186 | + private void doMigrate(MigrateVmInnerMsg msg, ReturnValueCompletion<APIEvent> completion) { |
44 | 187 | bus.makeTargetServiceIdByResourceUuid(msg, VmInstanceConstant.SERVICE_ID, msg.getVmInstanceUuid()); |
45 | 188 | bus.send(msg, new CloudBusCallBack(completion) { |
46 | 189 | @Override |
|
0 commit comments