From d184fd014ad2276440065d0395f2bc5214115f98 Mon Sep 17 00:00:00 2001 From: huangsimin <474420502@qq.com> Date: Wed, 3 Aug 2022 14:41:23 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=80=E6=9C=89=E6=B6=89=E8=AF=BB=E5=86=99?= =?UTF-8?q?=E6=93=8D=E4=BD=9C=E5=BF=85=E9=A1=BB=E4=B8=80=E4=BD=93=E5=9C=A8?= =?UTF-8?q?onApply=E4=B8=8A=E5=AE=8C=E6=88=90.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dataflow/controller/MasterProcessor.java | 14 ++++++-------- .../dataflow/controller/PacketsProcessor.java | 14 +++++++------- .../dataflow/statemachine/MasterFactory.java | 3 ++- .../dataflow/statemachine/StateFactory.java | 3 ++- .../dataflow/statemachine/StateMachine.java | 2 +- .../statemachine/master/MasterContext.java | 16 ++++++++++++++++ 6 files changed, 34 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java index 642c803..bcfabf1 100644 --- a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java @@ -65,7 +65,7 @@ public class MasterProcessor implements MasterExecute { log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers, packetsManager.size()); cxt.sleep(5000); } else { - log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers, packetsManager.size()); + // log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers, packetsManager.size()); } @@ -77,12 +77,11 @@ public class MasterProcessor implements MasterExecute { // 等待全部反馈后才能进入下次循环 CountDownLatch latch = new CountDownLatch(alivePeers.size()); // 读一致性 - StateFactory.readIndexState(new GenericClosure() { + - @Override - public void run(Status status) { + var state = StateFactory.getStateServer().getFsm().getState(); // log.debug("masterExecute start {} {}", status, alivePeers); - var state = this.getValue(); + // var state = this.getValue(); if (state == null) { log.error("readIndexState获取的状态为 {}", state); return; @@ -142,7 +141,6 @@ public class MasterProcessor implements MasterExecute { @Override public void complete(Object result, Throwable err) { latch.countDown(); - log.debug("countDown {}", latch.getCount()); if (err != null) { // TODO: 如果错误, 需要让节点恢复任务处理的状态 log.debug("{}", err); @@ -159,9 +157,9 @@ public class MasterProcessor implements MasterExecute { } - } + - }); + try { latch.await(DEFAULT_ASYNC_TIMEOUT, TimeUnit.MILLISECONDS); diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index db3041e..aed39b9 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -58,22 +58,22 @@ public class PacketsProcessor implements RpcProcessor 里的 getValue为 State的状态 - var state = StateFactory.getStateServer().getFsm().getState() ; // 获取返回的状态 - var ws = state.getWorkers().get(StateFactory.getServerId()); - ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量 - ws.setUpdateAt(Instant.now()); // 设置更新时间 + // 读状态 Closure 里的 getValue为 State的状态 + var state = StateFactory.getStateServer().getFsm().getState() ; // 获取返回的状态 + var ws = state.getWorkers().get(StateFactory.getServerId()); + ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量 + ws.setUpdateAt(Instant.now()); // 设置更新时间 Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), diff --git a/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java index e952445..9fde206 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java @@ -61,7 +61,8 @@ public class MasterFactory { while (!cxt.getIsExit()) { Instant now = Instant.now(); masterExecuteCls.loop(cxt); - log.info("master loop execute time: {} ms",Duration.between(now, Instant.now()).toMillis()); + cxt.setLastLoopExecuteTime(Duration.between(now, Instant.now())); + // log.info("master loop execute time: {} ms",Duration.between(now, Instant.now()).toMillis()); } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java index 0f6f68e..1e5b65d 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java @@ -277,8 +277,9 @@ public class StateFactory { ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op))); task.setDone(closure); // 确认所有数据 一致, 不需要加锁 StateFactory.getNode().apply(task); + } catch (CodecException e) { - String errorMsg = "Fail to encode WorkerState"; + String errorMsg = "Fail to encode Operate"; log.debug("{} {}", errorMsg, e); closure.failure(errorMsg, PeerId.emptyPeer()); closure.run(new Status(RaftError.EINTERNAL, errorMsg)); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 938ba78..5185095 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -97,7 +97,7 @@ public class StateMachine extends StateMachineAdapter { break; case GET_STATE: closure.setValue(this.state); - // log.info("GET_STATE value={} at logIndex={}", this.state, iter.getIndex()); + log.info("GET_STATE value={} at logIndex={}", this.state, iter.getIndex()); break; case REMOVE: diff --git a/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java b/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java index 181d38b..d9cd8ba 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java @@ -1,5 +1,6 @@ package com.yuandian.dataflow.statemachine.master; +import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; import lombok.Getter; @@ -9,6 +10,11 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class MasterContext { private AtomicBoolean isExit = new AtomicBoolean(false); + private Duration lastLoopExecuteTime = Duration.ZERO; + + + + private Object share; public Boolean getIsExit() { @@ -27,6 +33,16 @@ public class MasterContext { this.share = share; } + public void setLastLoopExecuteTime(Duration lastLoopExecuteTime) { + this.lastLoopExecuteTime = lastLoopExecuteTime; + } + + public Duration getLastLoopExecuteTime() { + return lastLoopExecuteTime; + } + + + public void sleep(long millis) { try { Thread.sleep(millis);