From 849821fd8b65802b289b4b4b3af1bde221ecb393 Mon Sep 17 00:00:00 2001 From: huangsimin <474420502@qq.com> Date: Tue, 26 Jul 2022 16:59:00 +0800 Subject: [PATCH] =?UTF-8?q?TODO:=20=E8=A7=A3=E5=86=B3=E5=A4=84=E7=90=86pac?= =?UTF-8?q?kets=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dataflow/controller/PacketsProcessor.java | 39 +++---- .../dataflow/grpc/CollectPackets.java | 26 ++--- .../{SyncClosure.java => OperateClosure.java} | 13 +-- .../dataflow/statemachine/StateMachine.java | 102 +++++++++++++----- .../statemachine/StateServerFactory.java | 71 ++++-------- .../statemachine/operate/Operate.java | 82 ++++++++++++++ ...teProcessor.java => OperateProcessor.java} | 17 +-- .../statemachine/rpc/RaftResponse.java | 3 +- .../rpc/SyncWorkerStateProcessor.java | 78 -------------- .../statemachine/state/StateFactory.java | 93 +++++++--------- .../com/yuandian/dataflow/MongodbTest.java | 6 +- .../statemachine/StateMachineTest.java | 82 +++++++------- 12 files changed, 307 insertions(+), 305 deletions(-) rename src/main/java/com/yuandian/dataflow/statemachine/{SyncClosure.java => OperateClosure.java} (79%) create mode 100644 src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java rename src/main/java/com/yuandian/dataflow/statemachine/rpc/{SyncStateProcessor.java => OperateProcessor.java} (74%) delete mode 100644 src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncWorkerStateProcessor.java diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index 84d6536..a19e406 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -18,10 +18,12 @@ import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; import com.google.protobuf.Any; import com.yuandian.dataflow.statemachine.StateServerFactory; -import com.yuandian.dataflow.statemachine.SyncClosure; +import com.yuandian.dataflow.statemachine.operate.Operate; +import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; +import com.yuandian.dataflow.statemachine.OperateClosure; import com.yuandian.dataflow.statemachine.rpc.RaftResponse; -import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor; -import com.yuandian.dataflow.statemachine.rpc.SyncWorkerStateProcessor; +import com.yuandian.dataflow.statemachine.rpc.OperateProcessor; + import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; import com.yuandian.dataflow.statemachine.state.State; @@ -65,34 +67,17 @@ public class PacketsProcessor implements RpcProcessor() { @Override public void run(Status status) { - log.debug("finsh tasks size {}, size: {}", status, request.packets.size()); - rpcCtx.sendResponse(resp); + log.info("{}", this.getResponse()); + rpcCtx.sendResponse(this.getResponse()); } + }); }); diff --git a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java index f751c86..0395897 100644 --- a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java +++ b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java @@ -81,7 +81,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { log.info("次序:{} 条数: {}, {}:ms", count, i, Duration.between(now, Instant.now()).toMillis()); } } catch (Exception e) { - e.printStackTrace(); + log.info("{}", e.toString()); } finally { channel.shutdown().awaitTermination(1, TimeUnit.SECONDS); } @@ -99,7 +99,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - e.printStackTrace(); + log.info("{}", e.toString()); } return null; }); @@ -110,7 +110,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - e.printStackTrace(); + log.info("{}", e.toString()); } return null; });*/ @@ -131,7 +131,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // System.out.println("result:" + response.getBody()); } catch (InvalidProtocolBufferException e) { - e.printStackTrace(); + log.info("{}", e.toString()); } return null; }); @@ -143,7 +143,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - e.printStackTrace(); + log.info("{}", e.toString()); } return null; }); @@ -154,7 +154,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - e.printStackTrace(); + log.info("{}", e.toString()); } return null; }); @@ -165,7 +165,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - e.printStackTrace(); + log.info("{}", e.toString()); } return null; }); @@ -176,7 +176,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - e.printStackTrace(); + log.info("{}", e.toString()); } return null; }); @@ -187,7 +187,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - e.printStackTrace(); + log.info("{}", e.toString()); } return null; }); @@ -198,7 +198,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - e.printStackTrace(); + log.info("{}", e.toString()); } return null; }); @@ -209,7 +209,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - e.printStackTrace(); + log.info("{}", e.toString()); } return null; }); @@ -220,7 +220,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - e.printStackTrace(); + log.info("{}", e.toString()); } return null; }); @@ -231,7 +231,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - e.printStackTrace(); + log.info("{}", e.toString()); } return null; });*/ diff --git a/src/main/java/com/yuandian/dataflow/statemachine/SyncClosure.java b/src/main/java/com/yuandian/dataflow/statemachine/OperateClosure.java similarity index 79% rename from src/main/java/com/yuandian/dataflow/statemachine/SyncClosure.java rename to src/main/java/com/yuandian/dataflow/statemachine/OperateClosure.java index 19ab8a7..69a10e7 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/SyncClosure.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/OperateClosure.java @@ -3,6 +3,7 @@ package com.yuandian.dataflow.statemachine; import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.entity.PeerId; +import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.rpc.RaftResponse; import com.yuandian.dataflow.statemachine.state.State; @@ -18,17 +19,17 @@ import org.slf4j.LoggerFactory; @Getter @Setter @ToString -public abstract class SyncClosure implements Closure { +public abstract class OperateClosure implements Closure { // 状态机的统一响应 private RaftResponse response; // 代表任务状态 - private T value; + private Operate value; - public Object synclock = new Object(); + - public SyncClosure() { + public OperateClosure() { } @@ -40,9 +41,9 @@ public abstract class SyncClosure implements Closure { setResponse(response); } - public void success(final State value) { + public void success(final Operate value) { final RaftResponse response = new RaftResponse(); - response.setState(value); + response.setOperate(value); response.setSuccess(true); setResponse(response); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 5cba656..78589c1 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -16,9 +16,11 @@ import com.alipay.sofa.jraft.error.RaftException; import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; +import com.yuandian.dataflow.statemachine.operate.Operate; +import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; import com.yuandian.dataflow.statemachine.rpc.RaftResponse; -import com.yuandian.dataflow.statemachine.rpc.SyncWorkerStateProcessor.RequestWorkerState; -import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor.RequestState; + +import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.RequestOperate; import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.StateFactory; import com.yuandian.dataflow.statemachine.state.WorkerState; @@ -58,38 +60,64 @@ public class StateMachine extends StateMachineAdapter { return state; } - - - @Override @SuppressWarnings("unchecked") public void onApply(final Iterator iter) { while (iter.hasNext()) { + Operate op = null; + OperateClosure closure = null; if (iter.done() != null) { // This task is applied by this node, get value from closure to avoid additional // parsing. - var closure = (SyncClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交 + closure = (OperateClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交 // log.info("SyncDataClosure(done) taskState:{} leaderTerm:{} {}",this.state, this.leaderTerm, closure); - this.state = closure.getValue(); - closure.success(state); - closure.run(Status.OK()); + op = closure.getValue(); + } else { // Have to parse FetchAddRequest from this user log. final ByteBuffer data = iter.getData(); try { - state = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize( - data.array(), State.class.getName()); + op = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize( + data.array(),Operate.class.getName()); // log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm); - + } catch (CodecException e) { - e.printStackTrace(); + log.info("{}", e.toString()); } } + if(op != null) { + switch(op.getType()) { + case PUT: + WorkerState ws = op.getValue(); + state.getWorkers().put(ws.peerId, ws); + if(closure != null) { + closure.success(op); + closure.run(Status.OK()); + } + break; + case REMOVE: + + + if(closure != null) { + closure.success(op); + closure.run(Status.OK()); + } + break; + default: + break; + + } + } else { + + } + + + iter.next(); } } @@ -123,15 +151,16 @@ public class StateMachine extends StateMachineAdapter { var ws = state.getWorkers().get( ss.getCluster().getServerId() ); if(ws == null) { ws = new WorkerState(ss.getCluster().getServerId()); - state.getWorkers().put(ss.getCluster().getServerId(), ws); + // state.getWorkers().put(ss.getCluster().getServerId(), ws); } - ss.applyState(state, new SyncClosure() { + Operate op = new Operate(OperateType.PUT); + op.setValue(ws); + ss.applyOperate(op, new OperateClosure() { @Override public void run(Status status) { log.debug("master update workerstate: {}", status); } - }); }); @@ -174,18 +203,22 @@ public class StateMachine extends StateMachineAdapter { var ss = StateServerFactory.getStateServer(); var ws = new WorkerState(ss.getCluster().getServerId()); - var request = new RequestWorkerState(); - request.setWorkerState(ws); + log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId()); - RaftResponse resp; - resp = (RaftResponse)StateServerFactory.getStateServer().getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000); - if(resp == null) { - log.debug("{} set WorkerState is error", resp); - } - log.debug("WorkerState is {}", resp); + + var op = new Operate(OperateType.PUT); + op.setValue(ws); + + Operate.CallOperate(op, new OperateClosure() { + @Override + public void run(Status status) { + log.info("{} {}", status, this.getResponse()); + } + }); + return; - } catch (InterruptedException | RemotingException e) { - e.printStackTrace(); + } catch (Exception e) { + log.info("{}", e.toString()); } super.onStartFollowing(ctx); @@ -199,6 +232,23 @@ public class StateMachine extends StateMachineAdapter { @Override public void onStopFollowing(LeaderChangeContext ctx) { log.debug("{} {}",ctx, StateServerFactory.getCluster().getServerId()); + + var ss = StateServerFactory.getStateServer(); + var ws = new WorkerState(ss.getCluster().getServerId()); + + log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId()); + + var op = new Operate(OperateType.PUT); + op.setValue(ws); + + Operate.CallOperate(op, new OperateClosure() { + @Override + public void run(Status status) { + log.info("{} {}", status, this.getResponse()); + } + }); + + super.onStopFollowing(ctx); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index be327a4..69ddde0 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -39,9 +39,10 @@ import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; import com.alipay.sofa.jraft.util.BytesUtil; import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.ThreadPoolUtil; +import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.rpc.RaftResponse; -import com.yuandian.dataflow.statemachine.rpc.SyncWorkerStateProcessor.RequestWorkerState; -import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor.RequestState; + +import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.RequestOperate; import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.WorkerState; @@ -150,9 +151,9 @@ public class StateServerFactory { try { cluster.getRpcServer().registerProcessor((RpcProcessor) pRaftClass.newInstance()); } catch (InstantiationException e) { - e.printStackTrace(); + log.info("{}", e.toString()); } catch (IllegalAccessException e) { - e.printStackTrace(); + log.info("{}", e.toString()); } }); node = cluster.start(); @@ -178,26 +179,24 @@ public class StateServerFactory { public void useFsmStateNotLock(Consumer dofunc) { - var state = ss.fsm.getState(); - synchronized(state) { - dofunc.accept(state); - } + + - // getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { + getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { - // @Override - // public void run(Status status, long index, byte[] reqCtx) { - // var state = ss.fsm.getState(); - // dofunc.accept(state); - // } + @Override + public void run(Status status, long index, byte[] reqCtx) { + var state = ss.fsm.getState(); + dofunc.accept(state); + } - // } ); + } ); } - public void applyState(State state, SyncClosure closure) { + public void applyOperate(Operate op, OperateClosure closure) { // 所有的提交都必须再leader进行 if (!ss.isLeader()) { ss.handlerNotLeaderError(closure); @@ -205,9 +204,10 @@ public class StateServerFactory { } try { - closure.setValue(state); + + closure.setValue(op); final Task task = new Task(); - task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(state))); + task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op))); task.setDone(closure); // 确认所有数据 一致, 不需要加锁 StateServerFactory.getStateServer().getNode().apply(task); } catch (CodecException e) { @@ -218,36 +218,7 @@ public class StateServerFactory { } } - public void applyWorkerState(WorkerState state, SyncClosure closure) { - log.debug("applyWorkerState"); - if (!ss.isLeader()) { - ss.handlerNotLeaderError(closure); - return; - } - - StateServerFactory.getStateServer().useFsmStateNotLock((fsmState)->{ - var wmap = fsmState.getWorkers(); - var wstate = wmap.get(state.getPeerId()); - if(wstate == null) { - wmap.put(state.getPeerId(), state); - } - - try { - final Task task = new Task(); - closure.setValue(fsmState); - task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(fsmState))); - task.setDone(closure); - StateServerFactory.getStateServer().getNode().apply(task); // 提交数据 - } catch (CodecException e) { - String errorMsg = "Fail to encode TaskState"; - log.debug("{}:{}",errorMsg, e); - closure.failure(errorMsg, PeerId.emptyPeer()); - closure.run(new Status(RaftError.EINTERNAL, errorMsg)); - } - }); - - - } + public RaftResponse redirect() { final RaftResponse response = new RaftResponse(); @@ -261,7 +232,7 @@ public class StateServerFactory { return response; } - public void handlerNotLeaderError(final SyncClosure closure) { + public void handlerNotLeaderError(final OperateClosure closure) { closure.failure("Not leader.", redirect().getRedirect()); closure.run(new Status(RaftError.EPERM, "Not leader")); } @@ -289,7 +260,7 @@ public class StateServerFactory { public static void main(String[] args) throws InterruptedException, RemotingException { var rpcClient = new BoltRaftRpcFactory().createRpcClient(); rpcClient.init(new CliOptions()); - var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new RequestState(), 5000); + var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new RequestOperate(), 5000); log.info("{}", resp); } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java new file mode 100644 index 0000000..d798fdf --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java @@ -0,0 +1,82 @@ +package com.yuandian.dataflow.statemachine.operate; + +import java.io.Serializable; + +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.error.RemotingException; +import com.alipay.sofa.jraft.rpc.InvokeCallback; +import com.yuandian.dataflow.projo.Response; +import com.yuandian.dataflow.statemachine.OperateClosure; +import com.yuandian.dataflow.statemachine.StateServerFactory; +import com.yuandian.dataflow.statemachine.rpc.OperateProcessor; +import com.yuandian.dataflow.statemachine.rpc.RaftResponse; +import com.yuandian.dataflow.statemachine.state.StateFactory; + + +import lombok.Data; +import lombok.var; +import lombok.extern.slf4j.Slf4j; + +/** + * 操作 + * + * @author eson + */ +@Slf4j +@Data +@var +public class Operate implements Serializable { + + public static enum OperateType { + PUT, REMOVE; + } + + private OperateType type; + private Object value; + + public Operate(OperateType t) { + this.type = t; + } + + public T getValue() { + return (T) this.value; + }; + + public void setValue(T value) { + this.value = value; + return; + }; + + public static void CallOperate(Operate op, OperateClosure closure) { + + var ss = StateServerFactory.getStateServer(); + if (StateServerFactory.getStateServer().isLeader()) { + ss.applyOperate(op, closure); + return; + } + + var request = new OperateProcessor.RequestOperate(); + request.setOperate(op); + + var leaderId = StateServerFactory.getStateServer().getNode().getLeaderId(); + try { + ss.getRpcClient().invokeAsync(leaderId.getEndpoint(), + request, new InvokeCallback() { + + @Override + public void complete(Object result, Throwable err) { + log.info("{}", result); + var resp = (RaftResponse)result; + closure.setResponse(resp); + closure.success(resp.getOperate()); + } + + }, 5000); + } catch (InterruptedException | RemotingException e) { + // TODO Auto-generated catch block + closure.failure("failure", null); + log.info("{}", e.toString()); + } + + } +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncStateProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java similarity index 74% rename from src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncStateProcessor.java rename to src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java index 6bd48e8..6c96a28 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncStateProcessor.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java @@ -17,8 +17,9 @@ import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; import com.yuandian.dataflow.statemachine.StateServerFactory; -import com.yuandian.dataflow.statemachine.SyncClosure; +import com.yuandian.dataflow.statemachine.OperateClosure; import com.yuandian.dataflow.statemachine.StateServerFactory.StateServer; +import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; import com.yuandian.dataflow.statemachine.state.State; import com.alipay.sofa.jraft.entity.PeerId; @@ -39,7 +40,7 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j @ProcessorRaft -public class SyncStateProcessor implements RpcProcessor { +public class OperateProcessor implements RpcProcessor { /** * 同步状态时, 需要用的结构类. 新增的状态可以在 State结构里添加 @@ -51,20 +52,20 @@ public class SyncStateProcessor implements RpcProcessor closure = new SyncClosure() { + final OperateClosure closure = new OperateClosure() { @Override public void run(Status status) { rpcCtx.sendResponse(getResponse()); @@ -72,12 +73,12 @@ public class SyncStateProcessor implements RpcProcessor { - - @Getter - @Setter - @ToString - public static class RequestWorkerState implements Serializable { - - private static final long serialVersionUID = 1L; - - private WorkerState workerState; - } - - @Override - public void handleRequest(RpcContext rpcCtx, RequestWorkerState request) { - - log.info("RequestWorkerState: {}", request); - final SyncClosure closure = new SyncClosure< State>() { - @Override - public void run(Status status) { - rpcCtx.sendResponse(getResponse()); - log.info("{}", status); - } - }; - - StateServerFactory.getStateServer().applyWorkerState(request.getWorkerState(), closure); - } - - @Override - public String interest() { - return RequestWorkerState.class.getName(); - } - - - -} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java index c95931f..d6068a0 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java @@ -20,7 +20,9 @@ import com.google.protobuf.Any; import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest; import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass; import com.yuandian.dataflow.statemachine.StateServerFactory; -import com.yuandian.dataflow.statemachine.SyncClosure; +import com.yuandian.dataflow.statemachine.operate.Operate; +import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; +import com.yuandian.dataflow.statemachine.OperateClosure; import lombok.Getter; import lombok.Setter; @@ -43,9 +45,10 @@ public class StateFactory { @Getter @Setter @ToString - public static class PeerIdCap { + public static class PeerIdCap { private PeerId peer; private long cap; + public PeerIdCap(PeerId pid, long cap) { this.peer = pid; this.cap = cap; @@ -61,12 +64,10 @@ public class StateFactory { var alivePeers = StateServerFactory.getCluster().getRaftNode().listAlivePeers(); log.debug("master execute {}", StateServerFactory.getCluster().getRaftNode().listAlivePeers()); if (alivePeers != null) { - + var ss = StateServerFactory.getStateServer(); // var state = ss.getFsm().getState(); - ArrayList pclist = new ArrayList(); - ss.useFsmStateNotLock((state) -> { alivePeers.forEach((peer) -> { WorkerState ws = state.getWorkers().get(peer); @@ -74,70 +75,54 @@ public class StateFactory { var cap = 100 - ws.getTaskQueueSize(); log.debug("cap :{} peer: {}", cap, peer); if (cap <= 0) { - return ; + return; } ws.setUpdateAt(Instant.now()); ws.setTaskQueueSize(100); - var pc = new PeerIdCap(peer, cap); - pc.setCap(cap); - ; - pclist.add(pc); - } - }); - ss.applyState(state, new SyncClosure() { - @Override - public void run(Status status) { - log.info("任务队列更新成功 {}", this.getValue().getWorkers()); + var op = new Operate(OperateType.PUT); + op.setValue(ws); - pclist.forEach((peercap) -> { - - if (peercap.getCap() <= 0) { - return ; - } - - var request = new PacketsRequest(); - for (int i = 0; i < peercap.getCap(); i++) { - var p = Any.pack( - BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder() - .setTableId(10086) - .build()); - request.getPackets().add(p); - } - - - try { - log.debug("rpc {}", peercap); - ss.getRpcClient().invokeAsync(peercap.peer.getEndpoint() , + var request = new PacketsRequest(); + for (int i = 0; i < cap; i++) { + var p = Any.pack( + BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder() + .setTableId(10086) + .build()); + request.getPackets().add(p); + } + + // Operate.CallOperate(op, new OperateClosure() { + // @Override + // public void run(Status status) { + // // TODO Auto-generated method stub + // log.info("{}", status); + // } + // }); + + try { + ss.getRpcClient().invokeAsync(peer.getEndpoint(), request, new InvokeCallback() { + @Override public void complete(Object result, Throwable err) { - if (err != null) { - err.printStackTrace(); - } else { - log.info("{} peer result", result); - } + log.info("{}", result); } - + }, 5000); - } catch (InterruptedException | RemotingException e) { - e.printStackTrace(); - } - - - }); + } catch (InterruptedException | RemotingException e) { + log.info("error send packets {}", e.toString()); + } + } }); - }); - - // ss.applyState(state, new SyncClosure() { - // public void run(Status status) { - // log.debug("{}", status); - // }; + // public void run(Status status) { + // log.debug("{}", status); + // }; // }); } @@ -145,7 +130,7 @@ public class StateFactory { Thread.sleep(5000); } } catch (InterruptedException e) { - e.printStackTrace(); + log.info("{}", e.toString()); } } diff --git a/src/test/java/com/yuandian/dataflow/MongodbTest.java b/src/test/java/com/yuandian/dataflow/MongodbTest.java index 73e1ed9..7822bd7 100644 --- a/src/test/java/com/yuandian/dataflow/MongodbTest.java +++ b/src/test/java/com/yuandian/dataflow/MongodbTest.java @@ -5,12 +5,16 @@ import com.mongodb.MongoCredential; import com.mongodb.ServerAddress; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; + +import lombok.extern.slf4j.Slf4j; + import org.bson.Document; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; +@Slf4j public class MongodbTest { public static void insertMsgToMongoDB(T obj) { @@ -34,7 +38,7 @@ public class MongodbTest { System.err.println("insert success"); } catch (Exception e) { - e.printStackTrace(); + log.info("{}", e.toString()); } } diff --git a/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java b/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java index a5e0146..eed9347 100644 --- a/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java +++ b/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java @@ -10,7 +10,7 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; import com.alipay.sofa.jraft.util.Endpoint; import com.yuandian.dataflow.statemachine.rpc.RaftResponse; -import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor.RequestState; +import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.RequestOperate; import com.yuandian.dataflow.statemachine.state.State; import lombok.var; @@ -20,53 +20,53 @@ import lombok.extern.slf4j.Slf4j; public class StateMachineTest { @Test void testOnApply() throws InterruptedException, RemotingException { - var rpcClient = new BoltRaftRpcFactory().createRpcClient(); - rpcClient.init(new CliOptions()); + // var rpcClient = new BoltRaftRpcFactory().createRpcClient(); + // rpcClient.init(new CliOptions()); - var fstate = new State(); - var fdata = new RequestState(); - fdata.setState(fstate); + // var fstate = new State(); + // var fdata = new RequestOperate(); + // fdata.setOperate(fstate); - var leader = new Endpoint("localhost",4441); - RaftResponse resp = (RaftResponse)rpcClient.invokeSync(leader, fdata - , 5000); - log.info("{}", resp); - if( resp != null && !resp.isSuccess() ) { - leader = resp.getRedirect().getEndpoint(); - resp = (RaftResponse)rpcClient.invokeSync(resp.getRedirect().getEndpoint(), fdata - , 5000); - log.info("{}", resp); - } + // var leader = new Endpoint("localhost",4441); + // RaftResponse resp = (RaftResponse)rpcClient.invokeSync(leader, fdata + // , 5000); + // log.info("{}", resp); + // if( resp != null && !resp.isSuccess() ) { + // leader = resp.getRedirect().getEndpoint(); + // resp = (RaftResponse)rpcClient.invokeSync(resp.getRedirect().getEndpoint(), fdata + // , 5000); + // log.info("{}", resp); + // } - int i = 0 ; - while(true) { + // int i = 0 ; + // while(true) { - var state = new State(); - var request = new RequestState(); // 创建请求 - request.setState(state); // 添加请求的参数 + // var state = new State(); + // var request = new RequestOperate(); // 创建请求 + // request.setState(state); // 添加请求的参数 - var wstate = state.getWorkers(); + // var wstate = state.getWorkers(); - // state.getWorker().setPeerId( PeerId.parsePeer("localhost:2222") ); - // state.getWorker().setTaskQueueSize(i); + // // state.getWorker().setPeerId( PeerId.parsePeer("localhost:2222") ); + // // state.getWorker().setTaskQueueSize(i); - var pi = i ; - i++; - if (i >= 1000) { - break; - } - rpcClient.invokeAsync(leader, request, new InvokeCallback() { - @Override - public void complete(Object result, Throwable err) { - // ResponseSM resp = (ResponseSM)result; - log.info("{} {} {}", result, err, pi); - } + // var pi = i ; + // i++; + // if (i >= 1000) { + // break; + // } + // rpcClient.invokeAsync(leader, request, new InvokeCallback() { + // @Override + // public void complete(Object result, Throwable err) { + // // ResponseSM resp = (ResponseSM)result; + // log.info("{} {} {}", result, err, pi); + // } - @Override - public Executor executor() { - return null; - } - } , 5000); - } + // @Override + // public Executor executor() { + // return null; + // } + // } , 5000); + // } } }