diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index e9f6d0a..a35b520 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -55,31 +55,32 @@ public class PacketsProcessor implements RpcProcessor{ + log.info("{} handler request.packets.size(): {}", StateServerFactory.getServerId(), request.packets.size()); + ss.readIndexState((state)->{ - var work = state.getWorkers().get( ss.getCluster().getServerId()); - - - work.setTaskQueueSize( work.getTaskQueueSize() - request.packets.size()); - work.setUpdateAt(Instant.now()); - log.debug("workerState taskQueueSize: {} psize: {} state {}", work.getTaskQueueSize(), request.packets.size(), state.getWorkers().size()); - var op = new Operate(OperateType.PUT); - op.setValue(work); - Operate.CallOperate(op, new OperateClosure() { + var ws = state.getWorkers().get(StateServerFactory.getServerId()); + ws.setTaskQueueSize( ws.getTaskQueueSize() - request.packets.size()); + ws.setUpdateAt(Instant.now()); + + log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(), request.packets.size(), state.getWorkers().size()); + + Operate.CallOperate(new Operate(OperateType.PUT,ws), new OperateClosure() { @Override public void run(Status status) { - log.info("{}", this.getResponse()); - var resp = new RaftResponse(); - resp.setSuccess(true); - // resp.setRedirect(StateServerFactory.getStateServer().getNode().getLeaderId()); + var resp = this.getResponse(); + if(status.isOk()) { + resp.setSuccess(true); + log.info("{}", resp); + } else { + resp.setSuccess(false); + } + rpcCtx.sendResponse(resp); } - }); }); diff --git a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java index 0395897..17215ad 100644 --- a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java +++ b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java @@ -16,7 +16,6 @@ import java.util.stream.Collectors; import com.google.protobuf.*; import com.google.protobuf.util.JsonFormat; import com.yuandian.common.Config; -import com.yuandian.dataflow.projo.Response; import com.yuandian.dataflow.proto.CollectPacketsServerGrpc; diff --git a/src/main/java/com/yuandian/dataflow/projo/Response.java b/src/main/java/com/yuandian/dataflow/projo/Response.java deleted file mode 100644 index 05bfe65..0000000 --- a/src/main/java/com/yuandian/dataflow/projo/Response.java +++ /dev/null @@ -1,14 +0,0 @@ - package com.yuandian.dataflow.projo; - -import org.apache.http.HttpStatus; - -import com.fasterxml.jackson.annotation.JsonProperty; - -public class Response { - @JsonProperty("code") - public org.apache.http.HttpStatus Code; - @JsonProperty("message") - public String Message; - @JsonProperty("data") - public Object Data; -} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 78589c1..3a75347 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -20,7 +20,7 @@ import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; import com.yuandian.dataflow.statemachine.rpc.RaftResponse; -import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.RequestOperate; +import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest; import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.StateFactory; import com.yuandian.dataflow.statemachine.state.WorkerState; @@ -147,15 +147,14 @@ public class StateMachine extends StateMachineAdapter { } var ss = StateServerFactory.getStateServer(); - ss.useFsmStateNotLock((state)->{ - var ws = state.getWorkers().get( ss.getCluster().getServerId() ); + ss.readIndexState((state)->{ + var ws = state.getWorkers().get( StateServerFactory.getServerId() ); if(ws == null) { - ws = new WorkerState(ss.getCluster().getServerId()); + ws = new WorkerState(StateServerFactory.getServerId()); // state.getWorkers().put(ss.getCluster().getServerId(), ws); } - Operate op = new Operate(OperateType.PUT); - op.setValue(ws); + Operate op = new Operate(OperateType.PUT, ws); ss.applyOperate(op, new OperateClosure() { @Override public void run(Status status) { @@ -204,10 +203,9 @@ public class StateMachine extends StateMachineAdapter { var ss = StateServerFactory.getStateServer(); var ws = new WorkerState(ss.getCluster().getServerId()); - log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId()); + log.debug("my: {} leader id {}",StateServerFactory.getServerId(), StateServerFactory.getLeaderId()); - var op = new Operate(OperateType.PUT); - op.setValue(ws); + var op = new Operate(OperateType.PUT, ws); Operate.CallOperate(op, new OperateClosure() { @Override @@ -234,13 +232,11 @@ public class StateMachine extends StateMachineAdapter { log.debug("{} {}",ctx, StateServerFactory.getCluster().getServerId()); var ss = StateServerFactory.getStateServer(); - var ws = new WorkerState(ss.getCluster().getServerId()); + var ws = new WorkerState(StateServerFactory.getServerId()); - log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId()); - - var op = new Operate(OperateType.PUT); - op.setValue(ws); + log.debug("my: {} leader id {}",StateServerFactory.getServerId(), StateServerFactory.getLeaderId()); + var op = new Operate(OperateType.PUT, ws); Operate.CallOperate(op, new OperateClosure() { @Override public void run(Status status) { diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index 69ddde0..66cad64 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -42,7 +42,7 @@ import com.alipay.sofa.jraft.util.ThreadPoolUtil; import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.rpc.RaftResponse; -import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.RequestOperate; +import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest; import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.WorkerState; @@ -64,9 +64,6 @@ public class StateServerFactory { private static StateServer ss; - private static String myPeerStr; - private static Configuration raftConf; - public static void startStateServer(String peerstr, Configuration conf) throws Exception { if(ss != null) { throw new Exception("重复初始化 InitStateServer"); @@ -75,9 +72,14 @@ public class StateServerFactory { } + public static boolean isLeader() { + return ss.getNode().isLeader() ; + } - + public static PeerId getLeaderId() { + return ss.node.getLeaderId() ; + } public static PeerId getServerId() { return ss.getCluster().getServerId(); @@ -177,21 +179,15 @@ public class StateServerFactory { } } - public void useFsmStateNotLock(Consumer dofunc) { - - - - + public void readIndexState(Consumer dofunc) { getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { - @Override public void run(Status status, long index, byte[] reqCtx) { - var state = ss.fsm.getState(); - dofunc.accept(state); + if( status.isOk()) { + dofunc.accept(ss.fsm.getState()); + } } - } ); - } @@ -204,7 +200,6 @@ public class StateServerFactory { } try { - closure.setValue(op); final Task task = new Task(); task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op))); @@ -242,16 +237,16 @@ public class StateServerFactory { // final StoreEngineOptions opts = new StoreEngineOptions(); // return StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads()); - return ThreadPoolUtil.newBuilder() // - .poolName("ReadIndexPool") // - .enableMetric(true) // - .coreThreads(4) // - .maximumThreads(4) // - .keepAliveSeconds(60L) // - .workQueue(new SynchronousQueue<>()) // - .threadFactory(new NamedThreadFactory("ReadIndexService", true)) // - .rejectedHandler(new ThreadPoolExecutor.AbortPolicy()) // - .build(); + return ThreadPoolUtil.newBuilder() // + .poolName("ReadIndexPool") // + .enableMetric(true) // + .coreThreads(4) // + .maximumThreads(4) // + .keepAliveSeconds(60L) // + .workQueue(new SynchronousQueue<>()) // + .threadFactory(new NamedThreadFactory("ReadIndexService", true)) // + .rejectedHandler(new ThreadPoolExecutor.AbortPolicy()) // + .build(); } } @@ -260,7 +255,7 @@ public class StateServerFactory { public static void main(String[] args) throws InterruptedException, RemotingException { var rpcClient = new BoltRaftRpcFactory().createRpcClient(); rpcClient.init(new CliOptions()); - var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new RequestOperate(), 5000); + var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new OperateRequest(), 5000); log.info("{}", resp); } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java index d798fdf..bab840a 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java @@ -2,17 +2,14 @@ package com.yuandian.dataflow.statemachine.operate; import java.io.Serializable; -import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.rpc.InvokeCallback; -import com.yuandian.dataflow.projo.Response; import com.yuandian.dataflow.statemachine.OperateClosure; import com.yuandian.dataflow.statemachine.StateServerFactory; import com.yuandian.dataflow.statemachine.rpc.OperateProcessor; import com.yuandian.dataflow.statemachine.rpc.RaftResponse; -import com.yuandian.dataflow.statemachine.state.StateFactory; +import com.yuandian.dataflow.statemachine.state.WorkerState; - import lombok.Data; import lombok.var; import lombok.extern.slf4j.Slf4j; @@ -34,8 +31,9 @@ public class Operate implements Serializable { private OperateType type; private Object value; - public Operate(OperateType t) { + public Operate(OperateType t, WorkerState ws) { this.type = t; + this.value = ws; } public T getValue() { @@ -48,17 +46,17 @@ public class Operate implements Serializable { }; public static void CallOperate(Operate op, OperateClosure closure) { - + var ss = StateServerFactory.getStateServer(); - if (StateServerFactory.getStateServer().isLeader()) { + if (StateServerFactory.isLeader()) { ss.applyOperate(op, closure); return; } - var request = new OperateProcessor.RequestOperate(); + var request = new OperateProcessor.OperateRequest(); request.setOperate(op); - var leaderId = StateServerFactory.getStateServer().getNode().getLeaderId(); + var leaderId = StateServerFactory.getLeaderId(); try { ss.getRpcClient().invokeAsync(leaderId.getEndpoint(), request, new InvokeCallback() { @@ -66,7 +64,7 @@ public class Operate implements Serializable { @Override public void complete(Object result, Throwable err) { log.info("{}", result); - var resp = (RaftResponse)result; + var resp = (RaftResponse) result; closure.setResponse(resp); closure.success(resp.getOperate()); } @@ -75,7 +73,7 @@ public class Operate implements Serializable { } catch (InterruptedException | RemotingException e) { // TODO Auto-generated catch block closure.failure("failure", null); - log.info("{}", e.toString()); + log.info("{}", e.toString()); } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java index 6c96a28..b8b4c8a 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java @@ -40,19 +40,18 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j @ProcessorRaft -public class OperateProcessor implements RpcProcessor { +public class OperateProcessor implements RpcProcessor { /** - * 同步状态时, 需要用的结构类. 新增的状态可以在 State结构里添加 + * 同步状态时, 需要用的结构类. 新增的状态可以在 Operate结构里添加 * * @author eson *2022年7月11日-16:01:07 */ - @Getter @Setter @ToString - public static class RequestOperate implements Serializable { + public static class OperateRequest implements Serializable { private static final long serialVersionUID = 1L; @@ -61,7 +60,7 @@ public class OperateProcessor implements RpcProcessor { + ss.readIndexState((state) -> { alivePeers.forEach((peer) -> { WorkerState ws = state.getWorkers().get(peer); if (ws != null) { @@ -89,8 +89,8 @@ public class StateFactory { request.getPackets().add(p); } - var op = new Operate(OperateType.PUT); - op.setValue(ws); + var op = new Operate(OperateType.PUT, ws); + Operate.CallOperate(op, new OperateClosure() { @Override public void run(Status status) { diff --git a/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java b/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java index eed9347..5e117a0 100644 --- a/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java +++ b/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java @@ -10,7 +10,7 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; import com.alipay.sofa.jraft.util.Endpoint; import com.yuandian.dataflow.statemachine.rpc.RaftResponse; -import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.RequestOperate; +import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest; import com.yuandian.dataflow.statemachine.state.State; import lombok.var;