Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
import org.zstack.cbd.kvm.CbdVolumeTo;
import org.zstack.compute.host.HostGlobalConfig;
import org.zstack.core.CoreGlobalProperty;
import org.zstack.core.ansible.AnsibleGlobalProperty;
import org.zstack.core.asyncbatch.While;
import org.zstack.core.cloudbus.CloudBus;
import org.zstack.core.cloudbus.CloudBusCallBack;
import org.zstack.core.db.DatabaseFacade;
import org.zstack.core.db.Q;
import org.zstack.core.db.SQL;
import org.zstack.core.workflow.FlowChainBuilder;
import org.zstack.core.workflow.ShareFlow;
Expand Down Expand Up @@ -99,6 +101,7 @@ public class ZbsStorageController implements PrimaryStorageControllerSvc, Primar
public static final String ROLLBACK_SNAPSHOT_PATH = "/zbs/primarystorage/snapshot/rollback";
public static final String CHECK_HOST_STORAGE_CONNECTION_PATH = "/zbs/primarystorage/check/host/connection";
public static final String GET_VOLUME_CLIENTS_PATH = "/zbs/primarystorage/volume/clients";
public static final String UPDATE_HOST_DEPENDENCY_PATH = "/zbs/primarystorage/host/updatedependency";

private static final StorageCapabilities capabilities = new StorageCapabilities();

Expand Down Expand Up @@ -187,28 +190,85 @@ public List<String> getActiveVolumesLocation(HostInventory h) {

@Override
public void deployClient(HostInventory h, Completion comp) {
KVMHostVO host = org.zstack.core.db.Q.New(KVMHostVO.class).eq(KVMHostVO_.uuid, h.getUuid()).find();
if (host == null) {
comp.fail(operr("cannot found kvm host[uuid:%s], unable to deploy client", h.getUuid()));
return;
}

DeployClientCmd cmd = new DeployClientCmd();
cmd.setIp(h.getManagementIp());
cmd.setPort(host.getPort());
cmd.setUsername(host.getUsername());
cmd.setPassword(host.getPassword());
httpCall(DEPLOY_CLIENT_PATH, cmd, DeployClientRsp.class, new ReturnValueCompletion<DeployClientRsp>(comp) {
FlowChain chain = FlowChainBuilder.newShareFlowChain();
chain.setName(String.format("deploy-zbs-client-on-host-%s", h.getUuid()));
chain.then(new ShareFlow() {
@Override
public void success(DeployClientRsp returnValue) {
comp.success();
}
public void setup() {
flow(new NoRollbackFlow() {
String __name__ = "deploy-client";

@Override
public void fail(ErrorCode errorCode) {
comp.fail(errorCode);
@Override
public void run(FlowTrigger trigger, Map data) {
KVMHostVO host = Q.New(KVMHostVO.class).eq(KVMHostVO_.uuid, h.getUuid()).find();
if (host == null) {
comp.fail(operr("cannot found kvm host[uuid:%s], unable to deploy client", h.getUuid()));
return;
}
Comment on lines +202 to +207
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

严重问题:FlowChain 触发器未正确处理,会导致流程挂起。

host == null 时,代码调用了 comp.fail() 并返回,但没有调用 trigger.fail()。这会导致 FlowChain 永远无法完成,因为 FlowTrigger 既没有调用 next() 也没有调用 fail()

应用以下修复:

                     public void run(FlowTrigger trigger, Map data) {
                         KVMHostVO host = Q.New(KVMHostVO.class).eq(KVMHostVO_.uuid, h.getUuid()).find();
                         if (host == null) {
-                            comp.fail(operr("cannot found kvm host[uuid:%s], unable to deploy client", h.getUuid()));
+                            trigger.fail(operr("cannot find kvm host[uuid:%s], unable to deploy client", h.getUuid()));
                             return;
                         }

注意:错误消息中 "cannot found" 应改为 "cannot find"(语法修正)。

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public void run(FlowTrigger trigger, Map data) {
KVMHostVO host = Q.New(KVMHostVO.class).eq(KVMHostVO_.uuid, h.getUuid()).find();
if (host == null) {
comp.fail(operr("cannot found kvm host[uuid:%s], unable to deploy client", h.getUuid()));
return;
}
public void run(FlowTrigger trigger, Map data) {
KVMHostVO host = Q.New(KVMHostVO.class).eq(KVMHostVO_.uuid, h.getUuid()).find();
if (host == null) {
trigger.fail(operr("cannot find kvm host[uuid:%s], unable to deploy client", h.getUuid()));
return;
}
🤖 Prompt for AI Agents
In plugin/zbs/src/main/java/org/zstack/storage/zbs/ZbsStorageController.java
around lines 202 to 207, the FlowChain trigger isn't notified when host==null
which can hang the flow; replace the current comp.fail(...) + return behavior
with notifying the trigger and the completion: create an error (operr) with
corrected text "cannot find kvm host[uuid:%s], unable to deploy client", call
trigger.fail(thatError) to unblock the FlowChain, then call
comp.fail(theSameError) and return.


DeployClientCmd cmd = new DeployClientCmd();
cmd.setIp(h.getManagementIp());
cmd.setPort(host.getPort());
cmd.setUsername(host.getUsername());
cmd.setPassword(host.getPassword());
httpCall(DEPLOY_CLIENT_PATH, cmd, DeployClientRsp.class, new ReturnValueCompletion<DeployClientRsp>(comp) {
@Override
public void success(DeployClientRsp returnValue) {
trigger.next();;
}

@Override
public void fail(ErrorCode errorCode) {
trigger.fail(errorCode);;
}
});
}
});

flow(new NoRollbackFlow() {
String __name__ = "update-host-client-dependency";

@Override
public void run(FlowTrigger trigger, Map data) {
UpdateHostDependencyCmd cmd = new UpdateHostDependencyCmd();
cmd.updatePackages = "libcbd";
cmd.zstackRepo = AnsibleGlobalProperty.ZSTACK_REPO;

KVMHostAsyncHttpCallMsg msg = new KVMHostAsyncHttpCallMsg();
msg.setCommand(cmd);
msg.setHostUuid(h.getUuid());
msg.setPath(UPDATE_HOST_DEPENDENCY_PATH);
msg.setNoStatusCheck(true);
bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, msg.getHostUuid());
bus.send(msg, new CloudBusCallBack(trigger) {
@Override
public void run(MessageReply reply) {
if (!reply.isSuccess()) {
trigger.fail(reply.getError());
return;
}

trigger.next();
}
});
}
});

done(new FlowDoneHandler(comp) {
@Override
public void handle(Map data) {
comp.success();
}
});

error(new FlowErrorHandler(comp) {
@Override
public void handle(ErrorCode errCode, Map data) {
comp.fail(errCode);
}
});
}
});
}).start();
}

@Override
Expand Down Expand Up @@ -357,7 +417,7 @@ public void fail(ErrorCode errorCode) {
flow(new NoRollbackFlow() {
String __name__ = "deploy-client";

List<PrimaryStorageClusterRefVO> refs = org.zstack.core.db.Q.New(PrimaryStorageClusterRefVO.class)
List<PrimaryStorageClusterRefVO> refs = Q.New(PrimaryStorageClusterRefVO.class)
.eq(PrimaryStorageClusterRefVO_.primaryStorageUuid, self.getUuid())
.list();

Expand All @@ -372,12 +432,12 @@ public void run(FlowTrigger trigger, Map data) {
.map(PrimaryStorageClusterRefVO::getClusterUuid)
.collect(Collectors.toList());

List<HostVO> hosts = org.zstack.core.db.Q.New(HostVO.class)
List<HostVO> hosts = Q.New(HostVO.class)
.in(HostAO_.clusterUuid, clusterUuids)
.list();

new While<>(hosts).each((h, comp) -> {
KVMHostVO host = org.zstack.core.db.Q.New(KVMHostVO.class).eq(KVMHostVO_.uuid, h.getUuid()).find();
KVMHostVO host = Q.New(KVMHostVO.class).eq(KVMHostVO_.uuid, h.getUuid()).find();
if (host == null) {
comp.addError(operr("cannot found kvm host[uuid:%s], unable to deploy client", h.getUuid()));
comp.allDone();
Expand Down Expand Up @@ -1897,6 +1957,14 @@ public void setPath(String path) {
public static class CheckHostStorageConnectionRsp extends AgentResponse {
}

public static class UpdateHostDependencyCmd extends AgentCommand {
public String updatePackages;
public String zstackRepo;
}

public static class UpdateHostDependencyRsp extends AgentResponse {
}

public static class AgentResponse extends ZbsMdsBase.AgentResponse {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ class ExternalPrimaryStorageSpec extends PrimaryStorageSpec {
return rsp
}

simulator(ZbsStorageController.UPDATE_HOST_DEPENDENCY_PATH) { HttpEntity<String> e, EnvSpec spec ->
def rsp = new ZbsStorageController.UpdateHostDependencyRsp()
rsp.success = true

return rsp
}

simulator(ZbsStorageController.GET_FACTS_PATH) { HttpEntity<String> e, EnvSpec spec ->
ZbsStorageController.GetFactsCmd cmd = JSONObjectUtil.toObject(e.body, ZbsStorageController.GetFactsCmd.class)
ExternalPrimaryStorageSpec zspec = spec.specByUuid(cmd.uuid)
Expand Down