From ad78dd261389e10ec850774245f53aaafaa70026 Mon Sep 17 00:00:00 2001 From: huangsimin Date: Mon, 18 Jul 2022 17:54:37 +0800 Subject: [PATCH] =?UTF-8?q?TODO:=20=E6=9B=B4=E5=A5=BD=E8=AE=BE=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yuandian/dataflow/controller/TaskLog.java | 4 +- .../dataflow/statemachine/StateMachine.java | 20 +++--- .../statemachine/StateServerFactory.java | 72 +++++++++++-------- 3 files changed, 54 insertions(+), 42 deletions(-) diff --git a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java index 06254c7..1360774 100644 --- a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java +++ b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java @@ -31,9 +31,9 @@ public class TaskLog { var state = new State(); closure.setValue(state); - StateServerFactory.getStateServer().useState((fsmState)->{ + StateServerFactory.getStateServer().useFsmState((fsmState)->{ log.error(fsmState.toString() ); - return fsmState; + return null; }); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 91234ed..969eac1 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -64,9 +64,9 @@ public class StateMachine extends StateMachineAdapter { /** * Returns current value. 只有Get 操作状态由协议流程决定 Apply */ - // public State getState() { - // return state; - // } + public State getState() { + return state; + } /** @@ -106,10 +106,12 @@ public class StateMachine extends StateMachineAdapter { // Have to parse FetchAddRequest from this user log. final ByteBuffer data = iter.getData(); try { - state = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize( - data.array(), State.class.getName()); - log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm); - + + state = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize( + data.array(), State.class.getName()); + log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm); + + } catch (CodecException e) { e.printStackTrace(); } @@ -147,7 +149,7 @@ public class StateMachine extends StateMachineAdapter { @Override public void onStartFollowing(LeaderChangeContext ctx) { - // TODO Auto-generated method stub + super.onStartFollowing(ctx); var ss = StateServerFactory.getStateServer(); @@ -162,7 +164,7 @@ public class StateMachine extends StateMachineAdapter { resp = (ResponseSM)StateServerFactory.getStateServer().getRpcClient().invokeSync(node.getLeaderId().getEndpoint(), request, 5000); log.error("{}", resp); } catch (InterruptedException | RemotingException e) { - // TODO Auto-generated catch block + e.printStackTrace(); } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index 0b54ff8..9d0f091 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -126,48 +126,54 @@ public class StateServerFactory { } - public void useState(Function dofunc) { - - this.fsm.useState((fsmState)->{ - - - + public void useFsmState(Function dofunc) { + + SyncClosure closure = new SyncClosure() { @Override public void run(Status status) { - + } }; - + getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { @Override public void run(Status status, long index, byte[] reqCtx) { - if(status.isOk()){ - log.error("readIndex {}", fsmState); - closure.success(fsmState); - closure.run(Status.OK()); - return; - } - - readIndexExecutor.execute(() -> { - if(isLeader()){ - log.error("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status); - applyState(fsmState, closure); - }else { - handlerNotLeaderError(closure); + + getFsm().useState((fsmState)->{ + if(status.isOk()){ + log.error("readIndex {}", fsmState); + closure.success(fsmState); + closure.run(Status.OK()); + dofunc.apply(fsmState); + return null; } + + readIndexExecutor.execute(() -> { + if(isLeader()){ + log.error("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status); + applyState(fsmState, closure); + }else { + handlerNotLeaderError(closure); + } + }); + return null; }); + + } }); - return null; - }); + return ; + } + public void applyState(State state, SyncClosure closure) { + // 所有的提交都必须再leader进行 if (!ss.isLeader()) { ss.handlerNotLeaderError(closure); return; @@ -177,7 +183,7 @@ public class StateServerFactory { closure.setValue(state); final Task task = new Task(); task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(state))); - task.setDone(closure); + task.setDone(closure); // 确认所有数据 一致, 不需要加锁 StateServerFactory.getStateServer().getNode().apply(task); } catch (CodecException e) { @@ -194,7 +200,9 @@ public class StateServerFactory { return; } - useState((fsmState)->{ + + useFsmState((fsmState)->{ + var wmap = fsmState.getWorkers(); var wstate = wmap.get(state.getPeerId()); @@ -205,16 +213,18 @@ public class StateServerFactory { log.error("{}", fsmState); task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(fsmState))); task.setDone(closure); - StateServerFactory.getStateServer().getNode().apply(task); + StateServerFactory.getStateServer().getNode().apply(task); // 提交数据 } catch (CodecException e) { String errorMsg = "Fail to encode TaskState"; log.error(errorMsg, e); closure.failure(errorMsg, PeerId.emptyPeer()); closure.run(new Status(RaftError.EINTERNAL, errorMsg)); } + } else { + closure.success(fsmState); + closure.run(Status.OK()); } - - return fsmState; + return null; }); @@ -225,13 +235,13 @@ public class StateServerFactory { public void readIndexState(final boolean readOnlySafe, final SyncClosure closure) { - useState((fsmState)->{ + useFsmState((fsmState)->{ closure.setValue(fsmState); if(!readOnlySafe){ closure.success(fsmState); closure.run(Status.OK()); - return fsmState; + return null; } getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { @@ -256,7 +266,7 @@ public class StateServerFactory { } }); - return fsmState; + return null; });