diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index 69c6117..2163df5 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -57,36 +57,48 @@ public class PacketsProcessor implements RpcProcessor(); resp.setSuccess(true); - rpcCtx.sendResponse(resp); + rpcCtx.sendResponse(resp); // 因为jraft是异步. 收到数据直接返回 - var ss = StateFactory.getStateServer(); - log.debug("{} handler request.packets.size(): {}", StateFactory.getServerId(), request.packets.size()); + try { + log.debug("{} handler request.packets.size(): {}", StateFactory.getServerId(), request.packets.size()); + // TODO: request.packets 入库,回填, 告警 等操作 - // 读状态 Closure 里的 getValue为 State的状态 - ss.readIndexState(new GenericClosure() { + + + } finally { // 确保 更新 最终的任务状态给master. + + // 读状态 Closure 里的 getValue为 State的状态 + StateFactory.readIndexState(new GenericClosure() { + + @Override + public void run(Status status) { + + if (!status.isOk()) { + log.error("失败 readIndexState {}", status); + } + + // readIndexState 失败后也需要直接 更新自己状态 - @Override - public void run(Status status) { - - if (status.isOk()) { var state = this.getValue(); // 获取返回的状态 var ws = state.getWorkers().get(StateFactory.getServerId()); - ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); + ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量 ws.setUpdateAt(Instant.now()); // 设置更新时间 + + Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), + new GenericClosure() { + @Override + public void run(Status status) { + if (!status.isOk()) { + log.error("CallOperate [{}] {}", StateFactory.getServerId(), resp); + } + } + }); - // log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(), - // request.packets.size(), state.getWorkers().size()); - Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure() { - @Override - public void run(Status status) { - if (status.isOk()) { - log.info("[{}] {}", StateFactory.getServerId(), resp); - } - } - }); } - } - }); + }); + } + ; + } @Override diff --git a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java index d385dd2..976af37 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java @@ -3,6 +3,8 @@ package com.yuandian.dataflow.statemachine.operate; import java.io.Serializable; import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.error.RaftError; +import com.alipay.sofa.jraft.error.RaftException; import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.yuandian.dataflow.statemachine.StateFactory; @@ -77,8 +79,8 @@ public class Operate implements Serializable { @Override public void complete(Object result, Throwable err) { - log.info("Object result {}", result); - //TODO: 解决回调的次序问题 + log.debug("Object result {}", result); + var resp = (RaftResponse) result; closure.setResponse(resp); closure.success(resp.getValue()); @@ -87,7 +89,8 @@ public class Operate implements Serializable { }, 5000); } catch (InterruptedException | RemotingException e) { - closure.failure("failure", null); + closure.failure(e.getMessage(), null); + closure.run(new Status(100000, "invokeAsync fail")); log.info("{}", e.toString()); }