diff --git a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java index 01ae05a..216a65c 100644 --- a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java +++ b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java @@ -25,25 +25,18 @@ public class TaskLog { @GetMapping(path = "/test") public ResponseEntity Processing() throws InterruptedException, RemotingException { - + + var ws = new WorkerState(new PeerId()); + StateServerFactory.getStateServer().updateFsmWorkerState(ws); + final Response response = new Response(); StateServerFactory.getStateServer().useFsmState((fsmState)->{ log.debug(fsmState.toString() ); + response.Message = fsmState.toString(); return null; }); - // state.getWorker().setPeerId(StateServerFactory.getStateServer().getNode().getNodeId().getPeerId()); - // state.getWorker().setTaskQueueSize(1); - - StateServerFactory.getStateServer().updateFsmState((fsmState)->{ - log.debug(fsmState.toString() ); - fsmState.getWorkers().put(new PeerId(), new WorkerState()); - return fsmState; - }); - - final Response response = new Response(); response.Code = HttpStatus.OK; - response.Message = "ok"; return new ResponseEntity(response, HttpStatus.OK); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 8019d81..ede6e55 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -167,16 +167,19 @@ public class StateMachine extends StateMachineAdapter { this.leaderTerm.set(term); super.onLeaderStart(term); - var ss = StateServerFactory.getStateServer(); - var ws = new WorkerState(); - ws.setPeerId(ss.getCluster().getServerId()); - final SyncClosure closure = new SyncClosure< State>() { - @Override - public void run(Status status) { - log.debug("leader set WorkerState {} ", status); - } - }; - ss.applyWorkerState(ws, closure); + try { + updateState((state)->{ + var ws = new WorkerState(StateServerFactory.getStateServer().getCluster().getServerId()); + state.getWorkers().put(ws.peerId, ws); + return state; + }); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (RemotingException e) { + e.printStackTrace(); + } + + return; } @@ -184,10 +187,10 @@ public class StateMachine extends StateMachineAdapter { public void onStartFollowing(LeaderChangeContext ctx) { super.onStartFollowing(ctx); + try { var ss = StateServerFactory.getStateServer(); - var ws = new WorkerState(); - ws.setPeerId(ss.getCluster().getServerId()); + var ws = new WorkerState(ss.getCluster().getServerId()); var request = new RequestCondition(); request.setWorkerState(ws); log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId()); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index 93173ea..3e6338c 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -33,6 +33,7 @@ import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; import com.alipay.sofa.jraft.util.BytesUtil; import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.ThreadPoolUtil; +import com.yuandian.dataflow.statemachine.rpc.RequestCondition; import com.yuandian.dataflow.statemachine.rpc.RequestState; import com.yuandian.dataflow.statemachine.rpc.ResponseSM; import com.yuandian.dataflow.statemachine.rpc.SyncConditionProcessor; @@ -126,9 +127,12 @@ public class StateServerFactory { } + /** + * 同步 可以使用follow使用, 但是可能延迟于leader. 只读 + * @param dofunc + */ public void useFsmState(Function dofunc) { - - + SyncClosure closure = new SyncClosure() { @Override public void run(Status status) { @@ -136,17 +140,18 @@ public class StateServerFactory { } }; - + getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { @Override public void run(Status status, long index, byte[] reqCtx) { getFsm().useState((fsmState)->{ if(status.isOk()){ - log.debug("readIndex {}", fsmState); closure.success(fsmState); closure.run(Status.OK()); - dofunc.apply(fsmState); + synchronized(dofunc) { + dofunc.apply(fsmState); + } return null; } @@ -160,22 +165,99 @@ public class StateServerFactory { }); return null; }); - - + + synchronized(dofunc) { + log.debug("dofunc notify"); + dofunc.notify(); + } } }); - - - + + try { + synchronized(dofunc) { + log.debug("dofunc wait"); + dofunc.wait(5000); + log.debug("dofunc unwait"); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } return ; } - public void updateFsmState(Function dofunc) throws InterruptedException, RemotingException { - this.getFsm().updateState(dofunc); - } + /** + * 同步更新 WorkerState + * @param dofunc + * @throws InterruptedException + * @throws RemotingException + */ + public void updateFsmWorkerState(WorkerState ws) throws InterruptedException, RemotingException { - + if(isLeader()) { + var closure = new SyncClosure() { + @Override + public void run(Status status) { + log.debug("leader {}", status); + this.synclock.notify(); + } + }; + StateServerFactory.getStateServer().applyWorkerState(ws, closure); + closure.synclock.wait(5000); + return; + } + + try { + var ss = StateServerFactory.getStateServer(); + var request = new RequestCondition(); + request.setWorkerState(ws); + log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId()); + ResponseSM resp; + resp = (ResponseSM)StateServerFactory.getStateServer().getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000); + if(resp == null) { + log.error("{} set WorkerState is error", resp); + } + log.debug("follow is {}", resp); + return; + } catch (InterruptedException | RemotingException e) { + e.printStackTrace(); + } + } + + /** + * 同步更新整个State + * @param s + * @throws InterruptedException + * @throws RemotingException + */ + public void updateFsmState(State s) throws InterruptedException, RemotingException { + if(isLeader()) { + var closure = new SyncClosure() { + @Override + public void run(Status status) { + log.debug("leader {}", status); + synclock.notify(); + } + }; + StateServerFactory.getStateServer().applyState(s, closure); + closure.synclock.wait(5000); + + return; + } + + var ss = StateServerFactory.getStateServer(); + var request = new RequestState(); + request.setState(s); + log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId()); + ResponseSM resp; + resp = (ResponseSM)StateServerFactory.getStateServer().getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000); + if(resp == null) { + log.error("{} set State is error", resp); + } + log.debug("follow is {}", resp); + return; + + } public void applyState(State state, SyncClosure closure) { @@ -235,46 +317,6 @@ public class StateServerFactory { - } - - public void readIndexState(final boolean readOnlySafe, final SyncClosure closure) { - - - useFsmState((fsmState)->{ - closure.setValue(fsmState); - if(!readOnlySafe){ - - closure.success(fsmState); - closure.run(Status.OK()); - return null; - } - - getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { - @Override - public void run(Status status, long index, byte[] reqCtx) { - - if(status.isOk()){ - log.debug("readIndex {}", fsmState); - closure.success(fsmState); - closure.run(Status.OK()); - return; - } - - readIndexExecutor.execute(() -> { - if(isLeader()){ - log.debug("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status); - applyState(fsmState, closure); - }else { - handlerNotLeaderError(closure); - } - }); - } - }); - - return null; - }); - - } public ResponseSM redirect() { diff --git a/src/main/java/com/yuandian/dataflow/statemachine/SyncClosure.java b/src/main/java/com/yuandian/dataflow/statemachine/SyncClosure.java index f19f155..40ba6d9 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/SyncClosure.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/SyncClosure.java @@ -25,7 +25,9 @@ public abstract class SyncClosure implements Closure { // 代表任务状态 private T value; - + public Object synclock = new Object(); + + public SyncClosure() { } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java b/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java index bc7e058..2d46d94 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java @@ -29,4 +29,8 @@ public class WorkerState implements Serializable { // 节点的对应peerID public PeerId peerId; public long taskQueueSize; + + public WorkerState(PeerId pid) { + this.peerId = pid; + } }