diff --git a/pom.xml b/pom.xml index 6aa74f6..d35d4da 100644 --- a/pom.xml +++ b/pom.xml @@ -147,13 +147,14 @@ ${grpc.version} - + org.projectlombok lombok 1.18.24 provided + diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index ff00dc9..df20041 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -7,15 +7,23 @@ package com.yuandian.dataflow.controller; import java.io.Serializable; +import java.time.Instant; import java.util.ArrayList; +import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.error.RemotingException; +import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; import com.google.protobuf.Any; import com.yuandian.dataflow.statemachine.StateServerFactory; -import com.yuandian.dataflow.statemachine.rpc.ResponseSM; +import com.yuandian.dataflow.statemachine.SyncClosure; +import com.yuandian.dataflow.statemachine.rpc.RaftResponse; +import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor; +import com.yuandian.dataflow.statemachine.rpc.SyncWorkerStateProcessor; import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; +import com.yuandian.dataflow.statemachine.state.State; import lombok.Getter; import lombok.Setter; @@ -42,19 +50,62 @@ public class PacketsProcessor implements RpcProcessor{ + + var work = state.getWorkers().get( ss.getCluster().getServerId()); + + + work.setTaskQueueSize( work.getTaskQueueSize() - request.packets.size()); + work.setUpdateAt(Instant.now()); + + if(!ss.isLeader()) { + var requestUpdateState = new SyncWorkerStateProcessor.RequestWorkerState(); + requestUpdateState.setWorkerState(work); + log.info("转发 {}", work); + try { + ss.getRpcClient().invokeAsync(StateServerFactory.getNode().getLeaderId().getEndpoint(), + requestUpdateState, new InvokeCallback() { + @Override + public void complete(Object result, Throwable err) { + if (err != null) { + err.printStackTrace(); + } else { + log.debug("转发 udate workerState {}", result); + } + rpcCtx.sendResponse(resp); + }}, 5000); + } catch (InterruptedException | RemotingException e) { + e.printStackTrace(); + } + return; + } + + + StateServerFactory.getStateServer().applyWorkerState(work, new SyncClosure() { + @Override + public void run(Status status) { + log.debug("finsh tasks size {}, size: {}", status, request.packets.size()); + rpcCtx.sendResponse(resp); + } + + }); + }); + + } @Override public String interest() { - // TODO Auto-generated method stub return PacketsRequest.class.getName(); } - - - } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 59afa42..5cba656 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -16,8 +16,8 @@ import com.alipay.sofa.jraft.error.RaftException; import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; -import com.yuandian.dataflow.statemachine.rpc.ResponseSM; -import com.yuandian.dataflow.statemachine.rpc.SyncConditionProcessor.RequestCondition; +import com.yuandian.dataflow.statemachine.rpc.RaftResponse; +import com.yuandian.dataflow.statemachine.rpc.SyncWorkerStateProcessor.RequestWorkerState; import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor.RequestState; import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.StateFactory; @@ -58,46 +58,9 @@ public class StateMachine extends StateMachineAdapter { return state; } - - /** - * Returns current value. 读取修改都在这个函数域内进行 - */ - public void useState(Function dofunc) { - dofunc.apply(this.state); - } - /** - * Returns current value. 读取修改都在这个函数域内进行 - * @throws RemotingException - * @throws InterruptedException - */ - public void updateState(Function dofunc) throws InterruptedException, RemotingException { - - var newstate = dofunc.apply(this.state); - var ss = StateServerFactory.getStateServer(); - if(!isLeader()) { - var request = new RequestState(); - request.setState(newstate); - var result = ss.getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000); - log.info("{}", result); - return; - } - - // this.state = newstate; - if(newstate != null) { - var colsure = new SyncClosure() { - @Override - public void run(Status status) { - - } - }; - colsure.setValue(newstate); - StateServerFactory.getStateServer().applyState(newstate, colsure); - } - - } - - + + @Override @SuppressWarnings("unchecked") public void onApply(final Iterator iter) { @@ -107,7 +70,7 @@ public class StateMachine extends StateMachineAdapter { // This task is applied by this node, get value from closure to avoid additional // parsing. var closure = (SyncClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交 - log.info("SyncDataClosure(done) taskState:{} leaderTerm:{} {}",this.state, this.leaderTerm, closure); + // log.info("SyncDataClosure(done) taskState:{} leaderTerm:{} {}",this.state, this.leaderTerm, closure); this.state = closure.getValue(); closure.success(state); closure.run(Status.OK()); @@ -115,11 +78,11 @@ public class StateMachine extends StateMachineAdapter { // Have to parse FetchAddRequest from this user log. final ByteBuffer data = iter.getData(); try { - synchronized(state) { - state = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize( - data.array(), State.class.getName()); - log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm); - } + + state = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize( + data.array(), State.class.getName()); + // log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm); + } catch (CodecException e) { e.printStackTrace(); @@ -150,20 +113,31 @@ public class StateMachine extends StateMachineAdapter { public void onLeaderStart(final long term) { log.debug("onLeaderStart {}", StateServerFactory.getCluster().getServerId()); this.leaderTerm.set(term); - try { - updateState((state)->{ - var ws = new WorkerState(StateServerFactory.getStateServer().getCluster().getServerId()); - state.getWorkers().put(ws.peerId, ws); - return state; - }); - if(!StateFactory.getMasterExecute().isAlive()) { - StateFactory.getMasterExecute().start(); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (RemotingException e) { - e.printStackTrace(); + + if(StateFactory.getMasterExecute().isAlive()) { + StateFactory.getMasterExecute().interrupt(); } + + var ss = StateServerFactory.getStateServer(); + ss.useFsmStateNotLock((state)->{ + var ws = state.getWorkers().get( ss.getCluster().getServerId() ); + if(ws == null) { + ws = new WorkerState(ss.getCluster().getServerId()); + state.getWorkers().put(ss.getCluster().getServerId(), ws); + } + + ss.applyState(state, new SyncClosure() { + @Override + public void run(Status status) { + log.debug("master update workerstate: {}", status); + } + + }); + + }); + + StateFactory.getMasterExecute().start(); + super.onLeaderStart(term); } @@ -177,17 +151,7 @@ public class StateMachine extends StateMachineAdapter { StateFactory.getMasterExecute().interrupt(); } - try { - updateState((state)->{ - state.getWorkers().remove( StateServerFactory.getServerId() ); - return state; - }); - - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (RemotingException e) { - e.printStackTrace(); - } + } @@ -210,11 +174,11 @@ public class StateMachine extends StateMachineAdapter { var ss = StateServerFactory.getStateServer(); var ws = new WorkerState(ss.getCluster().getServerId()); - var request = new RequestCondition(); + var request = new RequestWorkerState(); request.setWorkerState(ws); log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId()); - ResponseSM resp; - resp = (ResponseSM)StateServerFactory.getStateServer().getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000); + RaftResponse resp; + resp = (RaftResponse)StateServerFactory.getStateServer().getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000); if(resp == null) { log.debug("{} set WorkerState is error", resp); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index 4f47e9e..cef50ce 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -12,6 +12,7 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; +import java.util.function.Consumer; import java.util.function.Function; import org.reflections.Reflections; @@ -38,8 +39,8 @@ import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; import com.alipay.sofa.jraft.util.BytesUtil; import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.ThreadPoolUtil; -import com.yuandian.dataflow.statemachine.rpc.ResponseSM; -import com.yuandian.dataflow.statemachine.rpc.SyncConditionProcessor.RequestCondition; +import com.yuandian.dataflow.statemachine.rpc.RaftResponse; +import com.yuandian.dataflow.statemachine.rpc.SyncWorkerStateProcessor.RequestWorkerState; import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor.RequestState; import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; import com.yuandian.dataflow.statemachine.state.State; @@ -166,164 +167,21 @@ public class StateServerFactory { return this.fsm.isLeader(); } - - public void useFsmStateAsync(Function dofunc) { - - SyncClosure closure = new SyncClosure() { - @Override - public void run(Status status) { - if(status.isOk()) { - dofunc.apply(this.getValue()); - } - } - }; - - getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { - @Override - public void run(Status status, long index, byte[] reqCtx) { - - getFsm().useState((fsmState)->{ - if(status.isOk()){ - closure.setValue(fsmState); - closure.success(fsmState); - closure.run(Status.OK()); - return null; - } - - readIndexExecutor.execute(() -> { - if(isLeader()){ - log.info("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", getNode()); - applyState(fsmState, closure); - }else { - handlerNotLeaderError(closure); - } - }); - return null; - }); - } - }); - - return ; - } - - /** - * 同步更新 WorkerState - * @param dofunc - * @throws InterruptedException - * @throws RemotingException - */ - public void updateFsmWorkerState(WorkerState ws) throws InterruptedException, RemotingException { - - // leader就直接提交 - if(isLeader()) { - var closure = new SyncClosure() { - @Override - public void run(Status status) { - // log.debug("leader {}", status); - this.synclock.notify(); - } - }; - StateServerFactory.getStateServer().applyWorkerState(ws, closure); - closure.synclock.wait(5000); - return; - } - - try { - // 非leader就 rpc请求 - var ss = StateServerFactory.getStateServer(); - var request = new RequestCondition(); - request.setWorkerState(ws); - log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId()); - ResponseSM resp; - resp = (ResponseSM)StateServerFactory.getRpcClient().invokeSync(StateServerFactory.getNode().getLeaderId().getEndpoint(), request, 5000); - if(resp == null) { - log.error("{} set WorkerState is error", resp); - } - // log.debug("follow is {}", resp); - return; - } catch (InterruptedException | RemotingException e) { - e.printStackTrace(); - } - } - - /** - * 同步更新整个State - * @param s - * @throws InterruptedException - * @throws RemotingException - */ - public void updateFsmState(State s) throws InterruptedException, RemotingException { - if(isLeader()) { - var closure = new SyncClosure() { - @Override - public void run(Status status) { - // log.debug("leader update {}", status); - synclock.notify(); - } - }; - StateServerFactory.getStateServer().applyState(s, closure); - closure.synclock.wait(5000); - return; - } - - var ss = StateServerFactory.getStateServer(); - var request = new RequestState(); - request.setState(s); - log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId()); - ResponseSM resp; - resp = (ResponseSM)StateServerFactory.getStateServer().getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000); - if(resp == null) { - log.error("{} set State is error", resp); - } - // log.debug("follow is {}", resp); - return; - - } - - - public void updateFsmStateAsync(State s, Function onCompleted) throws InterruptedException, RemotingException { - if(isLeader()) { - var closure = new SyncClosure() { - @Override - public void run(Status status) { - // log.debug("leader update {}", status); - if(status.isOk()) { - onCompleted.apply(status); - } - } - }; - StateServerFactory.getStateServer().applyState(s, closure); - return; - } - - var ss = StateServerFactory.getStateServer(); - var request = new RequestState(); - request.setState(s); - log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId()); - StateServerFactory - .getStateServer() - .getRpcClient() - .invokeAsync(ss.getNode().getLeaderId().getEndpoint(), request, new InvokeCallback() { - @Override - public void complete(Object result, Throwable err) { - if(result != null){ - onCompleted.apply(Status.OK()); - } else { - var status = new Status(10000, "rpc invokeAsync with request: {}", request); - log.debug("{}", status); - onCompleted.apply(status); - } - } - - }, 5000); - - // log.debug("follow is {}", resp); - return; - + public void useFsmState(Consumer dofunc) { + var state = ss.fsm.getState(); + synchronized(state) { + dofunc.accept(state); + } } + public void useFsmStateNotLock(Consumer dofunc) { + var state = ss.fsm.getState(); + dofunc.accept(state); + } + + public void applyState(State state, SyncClosure closure) { // 所有的提交都必须再leader进行 @@ -351,10 +209,8 @@ public class StateServerFactory { ss.handlerNotLeaderError(closure); return; } - - - useFsmStateAsync((fsmState)->{ - + + StateServerFactory.getStateServer().useFsmStateNotLock((fsmState)->{ var wmap = fsmState.getWorkers(); var wstate = wmap.get(state.getPeerId()); @@ -376,16 +232,13 @@ public class StateServerFactory { closure.success(fsmState); closure.run(Status.OK()); } - return null; }); - - - + } - public ResponseSM redirect() { - final ResponseSM response = new ResponseSM(); + public RaftResponse redirect() { + final RaftResponse response = new RaftResponse(); response.setSuccess(false); if (this.node != null) { final PeerId leader = this.node.getLeaderId(); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/SyncClosure.java b/src/main/java/com/yuandian/dataflow/statemachine/SyncClosure.java index 40ba6d9..19ab8a7 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/SyncClosure.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/SyncClosure.java @@ -3,7 +3,7 @@ package com.yuandian.dataflow.statemachine; import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.entity.PeerId; -import com.yuandian.dataflow.statemachine.rpc.ResponseSM; +import com.yuandian.dataflow.statemachine.rpc.RaftResponse; import com.yuandian.dataflow.statemachine.state.State; import lombok.Getter; @@ -21,7 +21,7 @@ import org.slf4j.LoggerFactory; public abstract class SyncClosure implements Closure { // 状态机的统一响应 - private ResponseSM response; + private RaftResponse response; // 代表任务状态 private T value; @@ -33,7 +33,7 @@ public abstract class SyncClosure implements Closure { } public void failure(final String errorMsg, final PeerId redirect) { - final ResponseSM response = new ResponseSM(); + final RaftResponse response = new RaftResponse(); response.setSuccess(false); response.setMsg(errorMsg); response.setRedirect(redirect); @@ -41,7 +41,7 @@ public abstract class SyncClosure implements Closure { } public void success(final State value) { - final ResponseSM response = new ResponseSM(); + final RaftResponse response = new RaftResponse(); response.setState(value); response.setSuccess(true); setResponse(response); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/ResponseSM.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/RaftResponse.java similarity index 93% rename from src/main/java/com/yuandian/dataflow/statemachine/rpc/ResponseSM.java rename to src/main/java/com/yuandian/dataflow/statemachine/rpc/RaftResponse.java index 5e4694f..f2769d8 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/ResponseSM.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/RaftResponse.java @@ -26,7 +26,7 @@ import lombok.extern.slf4j.Slf4j; @Getter @Setter @ToString -public class ResponseSM implements Serializable { +public class RaftResponse implements Serializable { private static final long serialVersionUID = 1L; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncConditionProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncWorkerStateProcessor.java similarity index 85% rename from src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncConditionProcessor.java rename to src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncWorkerStateProcessor.java index 4bd0352..252f473 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncConditionProcessor.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncWorkerStateProcessor.java @@ -41,12 +41,12 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j @ProcessorRaft -public class SyncConditionProcessor implements RpcProcessor { +public class SyncWorkerStateProcessor implements RpcProcessor { @Getter @Setter @ToString - public static class RequestCondition implements Serializable { + public static class RequestWorkerState implements Serializable { private static final long serialVersionUID = 1L; @@ -54,7 +54,7 @@ public class SyncConditionProcessor implements RpcProcessor closure = new SyncClosure< State>() { @@ -70,7 +70,7 @@ public class SyncConditionProcessor implements RpcProcessor { - synchronized (alivePeers) { - alivePeers.forEach((peer) -> { - WorkerState ws = state.getWorkers().get(peer); - if (ws != null) { - var cap = 100 - ws.getTaskQueueSize(); - if (cap > 0) { - log.debug("{}", cap); - var request = new PacketsRequest(); - for (int i = 0; i < cap; i++) { - var p = Any.pack( - BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder() - .setTableId(10086) - .build()); + // var state = ss.getFsm().getState(); + ss.useFsmStateNotLock((state) -> { + alivePeers.forEach((peer) -> { + WorkerState ws = state.getWorkers().get(peer); - request.getPackets().add(p); - - } - - try { - var result = ss.getRpcClient().invokeSync(ws.getPeerId().getEndpoint(), - request, 5000); - log.info("{}", result); - ws.setUpdateAt( Instant.now() ); - ws.setTaskQueueSize(ws.getTaskQueueSize() - cap); - - - - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (RemotingException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } + if (ws != null) { + var cap = 100 - ws.getTaskQueueSize(); + log.debug("cap :{} peer: {}", cap, peer); + if (cap <= 0) { + return ; } - ss.applyState(state, new SyncClosure() { - public void run(Status status) { - log.debug("{}", status); - }; - } ); - alivePeers.notifyAll(); - }); - } + var request = new PacketsRequest(); + for (int i = 0; i < cap; i++) { + var p = Any.pack( + BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder() + .setTableId(10086) + .build()); + request.getPackets().add(p); + } - return null; + + try { + log.debug("rpc {}", peer); + ss.getRpcClient().invokeAsync(peer.getEndpoint(), + request, new InvokeCallback() { + @Override + public void complete(Object result, Throwable err) { + if (err != null) { + err.printStackTrace(); + } else { + log.info("{} peer result", result); + } + } + + }, 5000); + } catch (InterruptedException | RemotingException e) { + e.printStackTrace(); + } + ws.setUpdateAt(Instant.now()); + ws.setTaskQueueSize(100); + } + }); + + ss.applyState(state, new SyncClosure() { + @Override + public void run(Status status) { + log.info("任务队列更新成功 {}", this.getValue().getWorkers()); + } + }); + + }); - synchronized (alivePeers) { - alivePeers.wait(5000); - } + + + + // ss.applyState(state, new SyncClosure() { + // public void run(Status status) { + // log.debug("{}", status); + // }; + // }); } diff --git a/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java b/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java index 565a0f2..a5e0146 100644 --- a/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java +++ b/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java @@ -9,7 +9,7 @@ import com.alipay.sofa.jraft.option.CliOptions; import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; import com.alipay.sofa.jraft.util.Endpoint; -import com.yuandian.dataflow.statemachine.rpc.ResponseSM; +import com.yuandian.dataflow.statemachine.rpc.RaftResponse; import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor.RequestState; import com.yuandian.dataflow.statemachine.state.State; @@ -28,12 +28,12 @@ public class StateMachineTest { fdata.setState(fstate); var leader = new Endpoint("localhost",4441); - ResponseSM resp = (ResponseSM)rpcClient.invokeSync(leader, fdata + RaftResponse resp = (RaftResponse)rpcClient.invokeSync(leader, fdata , 5000); log.info("{}", resp); if( resp != null && !resp.isSuccess() ) { leader = resp.getRedirect().getEndpoint(); - resp = (ResponseSM)rpcClient.invokeSync(resp.getRedirect().getEndpoint(), fdata + resp = (RaftResponse)rpcClient.invokeSync(resp.getRedirect().getEndpoint(), fdata , 5000); log.info("{}", resp); }