diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index b59fd1e..e1fbba3 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -8,6 +8,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringBootConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; +import com.alipay.remoting.serialization.SerializerManager; import com.alipay.sofa.jraft.JRaftUtils; import com.alipay.sofa.jraft.Node; import com.alipay.sofa.jraft.RaftGroupService; @@ -37,14 +38,21 @@ public class Server { public static void main(String[] args) throws Exception { + + String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; String[] sprPeers = new String[]{"3440","3441","3442"}; + var peeridstr = peers[ Integer.parseInt(args[0] )]; var sprPort = sprPeers[Integer.parseInt(args[0] )]; + // var peeridstr = peers[2]; + // var sprPort = sprPeers[2]; + + Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442"); - // StateServerFactory.my = new StateServerFactory(peeridstr, conf); + StateServerFactory.InitStateServer(peeridstr, conf); // Thread printer = new Thread( new Runnable(){ @@ -53,7 +61,7 @@ public class Server { // public void run() { // // TODO Auto-generated method stub // while(true) { - // var state = StateServerFactory.getStateServer().getFsm().getTaskState(); + // var state = StateServerFactory.getStateServer().getFsm().getState(); // log.info("{}", state); // try { // Thread.sleep(1000); diff --git a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java index 78c9dcb..59ddb33 100644 --- a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java +++ b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java @@ -10,7 +10,7 @@ import com.alipay.sofa.jraft.Status; import com.yuandian.dataflow.projo.Response; import com.yuandian.dataflow.statemachine.StateServerFactory; import com.yuandian.dataflow.statemachine.SyncDataClosure; -import com.yuandian.dataflow.statemachine.rpc.TaskState; +import com.yuandian.dataflow.statemachine.state.State; import lombok.var; import lombok.extern.slf4j.Slf4j; @@ -23,34 +23,26 @@ public class TaskLog { @GetMapping(path = "/test") public ResponseEntity Processing() throws InterruptedException { - - // var state = StateServerFactory.getStateServer().getFsm().getTaskState(); - - + SyncDataClosure closure = new SyncDataClosure() { @Override public void run(Status status) { - synchronized(lockObject) { - lockObject.notify(); - } + log.info(getState().toString()); } }; - var state = new TaskState(); - state.setTaskQueueSize(1); - closure.setTaskState(state); - StateServerFactory.getStateServer().readIndexState(true, closure); - - synchronized(closure.lockObject) { - closure.lockObject.wait(); - } - - + + var state = new State(); + closure.setState(state); + + log.info(StateServerFactory.getStateServer().getFsm().getState().toString() ); + // state.getWorker().setPeerId(StateServerFactory.getStateServer().getNode().getNodeId().getPeerId()); + // state.getWorker().setTaskQueueSize(1); - + StateServerFactory.getStateServer().readIndexState(true, closure); final Response response = new Response(); response.Code = HttpStatus.OK; - response.Message = closure.getTaskState().toString(); + response.Message = "ok"; return new ResponseEntity(response, HttpStatus.OK); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 454c8ba..9fc88e3 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -25,8 +25,8 @@ import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.Utils; import com.yuandian.dataflow.statemachine.rpc.SMResponse; -import com.yuandian.dataflow.statemachine.rpc.SyncData; -import com.yuandian.dataflow.statemachine.rpc.TaskState; +import com.yuandian.dataflow.statemachine.rpc.SyncDataRequest; +import com.yuandian.dataflow.statemachine.state.State; import lombok.var; import lombok.extern.slf4j.Slf4j; @@ -39,15 +39,14 @@ import lombok.extern.slf4j.Slf4j; * 2018-Apr-09 4:52:31 PM */ @Slf4j - public class StateMachine extends StateMachineAdapter { // private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class); /** - * Counter value + * State value 全局使用的唯一状态 */ - private TaskState taskState = new TaskState(); + private State state = new State(); /** * Leader term */ @@ -58,10 +57,10 @@ public class StateMachine extends StateMachineAdapter { } /** - * Returns current value. + * Returns current value. 只有Get 操作状态由协议流程决定 Apply */ - public TaskState getTaskState() { - return taskState; + public State getState() { + return state; } @Override @@ -72,17 +71,17 @@ public class StateMachine extends StateMachineAdapter { // This task is applied by this node, get value from closure to avoid additional // parsing. var closure = (SyncDataClosure)iter.done(); - taskState = closure.getTaskState(); - log.info("SyncDataClosure(done) taskState:{} leaderTerm:{}",taskState, this.leaderTerm); - closure.success(taskState); + state = closure.getState(); + log.info("SyncDataClosure(done) taskState:{} leaderTerm:{}",state, this.leaderTerm); + closure.success(state); closure.run(Status.OK()); } else { // Have to parse FetchAddRequest from this user log. final ByteBuffer data = iter.getData(); try { - taskState = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize( - data.array(), TaskState.class.getName()); - log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", taskState, this.leaderTerm); + state = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize( + data.array(), State.class.getName()); + log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm); } catch (CodecException e) { e.printStackTrace(); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index 25bd435..a811f2a 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -9,7 +9,10 @@ package com.yuandian.dataflow.statemachine; import java.io.File; import java.nio.ByteBuffer; import java.util.concurrent.Executor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import com.alipay.remoting.NamedThreadFactory; import com.alipay.remoting.exception.CodecException; import com.alipay.remoting.serialization.SerializerManager; import com.alipay.sofa.jraft.JRaftUtils; @@ -27,10 +30,12 @@ import com.alipay.sofa.jraft.option.NodeOptions; import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; import com.alipay.sofa.jraft.util.BytesUtil; import com.alipay.sofa.jraft.util.Endpoint; +import com.alipay.sofa.jraft.util.ThreadPoolUtil; import com.yuandian.dataflow.statemachine.rpc.SMResponse; -import com.yuandian.dataflow.statemachine.rpc.SyncData; +import com.yuandian.dataflow.statemachine.rpc.SyncDataRequest; import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor; -import com.yuandian.dataflow.statemachine.rpc.TaskState; +import com.yuandian.dataflow.statemachine.state.State; +import com.yuandian.dataflow.statemachine.state.WorkerState; import lombok.Getter; import lombok.Setter; @@ -54,6 +59,27 @@ public class StateServerFactory { throw new Exception("重复初始化 InitStateServer"); } ss = new StateServerFactory.StateServer(peerstr, conf); + + + ss.readIndexState(true, new SyncDataClosure() { + @Override + public void run(Status status) { + log.info("add peerid {}", getState()); + var wstate = getState().getWorkers().get(ss.node.getNodeId().getPeerId()); + if(wstate == null) { + wstate = new WorkerState(); + getState().getWorkers().put(ss.node.getNodeId().getPeerId(), wstate); + log.info("update: {}", getState()); + ss.applyState(getState(), new SyncDataClosure() { + @Override + public void run(Status status) { + log.info("{} add workers", ss.node.getNodeId().getPeerId()); + } + } ); + } + } + + }); } // 获取状态服务的对象 @@ -70,7 +96,7 @@ public class StateServerFactory { private StateMachine fsm; private String groupId = "dataflow"; - private Executor readIndexExecutor; + private Executor readIndexExecutor = createReadIndexExecutor(); public StateServer(String addr, Configuration conf) { String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; @@ -100,31 +126,32 @@ public class StateServerFactory { fsm = new StateMachine(); // 状态实例初始化 nodeOptions.setFsm(fsm); - cluster = new RaftGroupService(groupId, serverId, nodeOptions); + cluster = new RaftGroupService(groupId, serverId, nodeOptions); cluster.getRpcServer().registerProcessor(new SyncDataProcessor()); - - - - node = cluster.start(); + + if(node.isLeader()) { + + } + } public boolean isLeader() { return this.fsm.isLeader(); } - public TaskState getTaskState() { - return this.fsm.getTaskState(); + public State getState() { + return this.fsm.getState(); } - public void applyState(TaskState state, SyncDataClosure closure) { + public void applyState(State state, SyncDataClosure closure) { if (!ss.isLeader()) { ss.handlerNotLeaderError(closure); return; } try { - closure.setTaskState(state); + closure.setState(state); final Task task = new Task(); task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(state))); task.setDone(closure); @@ -140,9 +167,8 @@ public class StateServerFactory { public void readIndexState(final boolean readOnlySafe, final SyncDataClosure closure) { if(!readOnlySafe){ - - closure.setTaskState(getTaskState()); - closure.success(getTaskState()); + closure.setState(getState()); + closure.success(getState()); closure.run(Status.OK()); return; } @@ -150,23 +176,19 @@ public class StateServerFactory { getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { @Override public void run(Status status, long index, byte[] reqCtx) { - if(status.isOk()){ - if(closure.getTaskState() != null){ - applyState(closure.getTaskState(), closure); - } else { - closure.setTaskState(getTaskState()); - closure.success(getTaskState()); - closure.run(Status.OK()); - } - + if(status.isOk()){ + log.info("readIndex {}", getState()); + closure.setState(getState()); + closure.success(getState()); + closure.run(Status.OK()); return; } readIndexExecutor.execute(() -> { if(isLeader()){ log.info("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status); - applyState(getTaskState(), closure); + applyState(getState(), closure); }else { handlerNotLeaderError(closure); } @@ -176,7 +198,6 @@ public class StateServerFactory { } public SMResponse redirect() { - final SMResponse response = new SMResponse(); response.setSuccess(false); if (this.node != null) { @@ -186,22 +207,35 @@ public class StateServerFactory { } } return response; - } public void handlerNotLeaderError(final SyncDataClosure closure) { closure.failure("Not leader.", redirect().getRedirect()); closure.run(new Status(RaftError.EPERM, "Not leader")); } + + + private Executor createReadIndexExecutor() { + // final StoreEngineOptions opts = new StoreEngineOptions(); + // return StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads()); + + return ThreadPoolUtil.newBuilder() // + .poolName("CounterPool") // + .enableMetric(true) // + .coreThreads(4) // + .maximumThreads(4) // + .keepAliveSeconds(60L) // + .workQueue(new SynchronousQueue<>()) // + .threadFactory(new NamedThreadFactory("CounterService", true)) // + .rejectedHandler(new ThreadPoolExecutor.AbortPolicy()) // + .build(); + } } - public static void main(String[] args) throws InterruptedException, RemotingException { var rpcClient = new BoltRaftRpcFactory().createRpcClient(); - rpcClient.init(new CliOptions()); - - var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new SyncData(), 5000); + var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new SyncDataRequest(), 5000); log.info("{}", resp); } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/SyncDataClosure.java b/src/main/java/com/yuandian/dataflow/statemachine/SyncDataClosure.java index 7a83439..3d0b891 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/SyncDataClosure.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/SyncDataClosure.java @@ -4,7 +4,7 @@ import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.entity.PeerId; import com.yuandian.dataflow.statemachine.rpc.SMResponse; -import com.yuandian.dataflow.statemachine.rpc.TaskState; +import com.yuandian.dataflow.statemachine.state.State; import lombok.Getter; import lombok.Setter; @@ -23,15 +23,14 @@ public abstract class SyncDataClosure implements Closure { // 状态机的统一响应 private SMResponse response; // 代表任务状态 - private TaskState taskState; + private State state; - public Object lockObject = new Object(); + public Object locksync = new Object(); public SyncDataClosure() { } - public void failure(final String errorMsg, final PeerId redirect) { final SMResponse response = new SMResponse(); response.setSuccess(false); @@ -40,11 +39,10 @@ public abstract class SyncDataClosure implements Closure { setResponse(response); } - public void success(final TaskState value) { + public void success(final State value) { final SMResponse response = new SMResponse(); response.setState(value); response.setSuccess(true); setResponse(response); } - } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SMResponse.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SMResponse.java index 8a260f2..8e28092 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SMResponse.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SMResponse.java @@ -9,6 +9,7 @@ package com.yuandian.dataflow.statemachine.rpc; import java.io.Serializable; import com.alipay.sofa.jraft.entity.PeerId; +import com.yuandian.dataflow.statemachine.state.State; import lombok.Getter; import lombok.Setter; @@ -29,7 +30,7 @@ public class SMResponse implements Serializable { private static final long serialVersionUID = 1L; - private TaskState state; + private State state; private boolean success; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataProcessor.java index 2181cb1..0e1f4ae 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataProcessor.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataProcessor.java @@ -32,10 +32,10 @@ import lombok.extern.slf4j.Slf4j; *2022年7月12日-11:10:54 */ @Slf4j -public class SyncDataProcessor implements RpcProcessor { +public class SyncDataProcessor implements RpcProcessor { @Override - public void handleRequest(RpcContext rpcCtx, SyncData request) { + public void handleRequest(RpcContext rpcCtx, SyncDataRequest request) { log.info("request: {}", request); @@ -47,12 +47,12 @@ public class SyncDataProcessor implements RpcProcessor { } }; - StateServerFactory.getStateServer().applyState(request.getTaskState(), closure); + StateServerFactory.getStateServer().applyState(request.getState(), closure); } @Override public String interest() { - return SyncData.class.getName(); + return SyncDataRequest.class.getName(); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncData.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataRequest.java similarity index 65% rename from src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncData.java rename to src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataRequest.java index e2349e3..70e7228 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncData.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataRequest.java @@ -9,6 +9,7 @@ package com.yuandian.dataflow.statemachine.rpc; import java.io.Serializable; import com.alipay.sofa.jraft.Closure; +import com.yuandian.dataflow.statemachine.state.State; import lombok.Getter; import lombok.Setter; @@ -16,7 +17,7 @@ import lombok.ToString; import lombok.extern.slf4j.Slf4j; /** - * description + * 同步状态时, 需要用的结构类. 新增的状态可以在 State结构里添加 * * @author eson *2022年7月11日-16:01:07 @@ -25,12 +26,9 @@ import lombok.extern.slf4j.Slf4j; @Getter @Setter @ToString -public class SyncData implements Serializable { +public class SyncDataRequest implements Serializable { private static final long serialVersionUID = 1L; + private State state; - private TaskState taskState; - - - } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/TaskState.java b/src/main/java/com/yuandian/dataflow/statemachine/state/State.java similarity index 68% rename from src/main/java/com/yuandian/dataflow/statemachine/rpc/TaskState.java rename to src/main/java/com/yuandian/dataflow/statemachine/state/State.java index af05361..c48a18f 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/TaskState.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/state/State.java @@ -4,7 +4,7 @@ * @author eson *2022年7月13日-09:11:26 */ -package com.yuandian.dataflow.statemachine.rpc; +package com.yuandian.dataflow.statemachine.state; import java.io.Serializable; @@ -19,9 +19,10 @@ import java.lang.management.ManagementFactory; import java.lang.management.OperatingSystemMXBean; import java.lang.reflect.Method; import java.lang.reflect.Modifier; +import java.util.HashMap; /** - * 代表任务状态 暂时全局使用这个结构 + * 代表任务状态 暂时全局使用这个结构. 添加新增状态 * * @author eson *2022年7月13日-09:11:26 @@ -30,13 +31,9 @@ import java.lang.reflect.Modifier; @Getter @Setter @ToString -public class TaskState implements Serializable { +public class State implements Serializable { + private static final long serialVersionUID = -1L; - // 节点的对应peerID - private PeerId peerId; - private long taskQueueSize; - - - + private HashMap workers = new HashMap<>(); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java b/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java new file mode 100644 index 0000000..bc7e058 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java @@ -0,0 +1,32 @@ +/** + * description + * + * @author eson + *2022年7月15日-10:04:00 + */ +package com.yuandian.dataflow.statemachine.state; + +import java.io.Serializable; + +import com.alipay.sofa.jraft.entity.PeerId; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +/** + * description + * + * @author eson + *2022年7月15日-10:04:00 + */ +@Getter +@Setter +@ToString +public class WorkerState implements Serializable { + + private static final long serialVersionUID = -1L; + // 节点的对应peerID + public PeerId peerId; + public long taskQueueSize; +} diff --git a/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java b/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java index b8c37b9..1384ebe 100644 --- a/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java +++ b/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java @@ -11,8 +11,8 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; import com.alipay.sofa.jraft.util.Endpoint; import com.yuandian.dataflow.statemachine.rpc.SMResponse; -import com.yuandian.dataflow.statemachine.rpc.SyncData; -import com.yuandian.dataflow.statemachine.rpc.TaskState; +import com.yuandian.dataflow.statemachine.rpc.SyncDataRequest; +import com.yuandian.dataflow.statemachine.state.State; import lombok.var; import lombok.extern.slf4j.Slf4j; @@ -25,9 +25,9 @@ public class StateMachineTest { rpcClient.init(new CliOptions()); - var fstate = new TaskState(); - var fdata = new SyncData(); - fdata.setTaskState(fstate); + var fstate = new State(); + var fdata = new SyncDataRequest(); + fdata.setState(fstate); var leader = new Endpoint("localhost",4441); SMResponse resp = (SMResponse)rpcClient.invokeSync(leader, fdata @@ -44,20 +44,21 @@ public class StateMachineTest { int i = 0 ; while(true) { - var state = new TaskState(); - var data = new SyncData(); - data.setTaskState(state); + var state = new State(); + var request = new SyncDataRequest(); // 创建请求 + request.setState(state); // 添加请求的参数 - state.setPeerId( PeerId.parsePeer("localhost:2222") ); + var wstate = state.getWorkers(); - state.setTaskQueueSize(i); + // state.getWorker().setPeerId( PeerId.parsePeer("localhost:2222") ); + // state.getWorker().setTaskQueueSize(i); var pi = i ; i++; if (i >= 1000) { break; } - rpcClient.invokeAsync(leader, data, new InvokeCallback() { + rpcClient.invokeAsync(leader, request, new InvokeCallback() { @Override public void complete(Object result, Throwable err) { // SMResponse resp = (SMResponse)result; diff --git a/start.sh b/start.sh index 5997f9d..66d3c5d 100755 --- a/start.sh +++ b/start.sh @@ -6,4 +6,4 @@ sleep 2 screen -dmS raft-0 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 0 screen -dmS raft-1 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 1 -screen -dmS raft-2 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 2 +screen -dmS raft-2 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 2 \ No newline at end of file