todo: next readIndex

This commit is contained in:
huangsimin 2022-07-15 18:42:35 +08:00
parent dce906a7eb
commit 5a50ce22ef
12 changed files with 165 additions and 105 deletions

View File

@ -8,6 +8,7 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.RaftGroupService;
@ -37,14 +38,21 @@ public class Server {
public static void main(String[] args) throws Exception {
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
String[] sprPeers = new String[]{"3440","3441","3442"};
var peeridstr = peers[ Integer.parseInt(args[0] )];
var sprPort = sprPeers[Integer.parseInt(args[0] )];
// var peeridstr = peers[2];
// var sprPort = sprPeers[2];
Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
// StateServerFactory.my = new StateServerFactory(peeridstr, conf);
StateServerFactory.InitStateServer(peeridstr, conf);
// Thread printer = new Thread( new Runnable(){
@ -53,7 +61,7 @@ public class Server {
// public void run() {
// // TODO Auto-generated method stub
// while(true) {
// var state = StateServerFactory.getStateServer().getFsm().getTaskState();
// var state = StateServerFactory.getStateServer().getFsm().getState();
// log.info("{}", state);
// try {
// Thread.sleep(1000);

View File

@ -10,7 +10,7 @@ import com.alipay.sofa.jraft.Status;
import com.yuandian.dataflow.projo.Response;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.SyncDataClosure;
import com.yuandian.dataflow.statemachine.rpc.TaskState;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
@ -23,34 +23,26 @@ public class TaskLog {
@GetMapping(path = "/test")
public ResponseEntity<Response> Processing() throws InterruptedException {
// var state = StateServerFactory.getStateServer().getFsm().getTaskState();
SyncDataClosure closure = new SyncDataClosure() {
@Override
public void run(Status status) {
synchronized(lockObject) {
lockObject.notify();
}
log.info(getState().toString());
}
};
var state = new TaskState();
state.setTaskQueueSize(1);
closure.setTaskState(state);
StateServerFactory.getStateServer().readIndexState(true, closure);
synchronized(closure.lockObject) {
closure.lockObject.wait();
}
var state = new State();
closure.setState(state);
log.info(StateServerFactory.getStateServer().getFsm().getState().toString() );
// state.getWorker().setPeerId(StateServerFactory.getStateServer().getNode().getNodeId().getPeerId());
// state.getWorker().setTaskQueueSize(1);
StateServerFactory.getStateServer().readIndexState(true, closure);
final Response response = new Response();
response.Code = HttpStatus.OK;
response.Message = closure.getTaskState().toString();
response.Message = "ok";
return new ResponseEntity<Response>(response, HttpStatus.OK);
}

View File

@ -25,8 +25,8 @@ import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.Utils;
import com.yuandian.dataflow.statemachine.rpc.SMResponse;
import com.yuandian.dataflow.statemachine.rpc.SyncData;
import com.yuandian.dataflow.statemachine.rpc.TaskState;
import com.yuandian.dataflow.statemachine.rpc.SyncDataRequest;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
@ -39,15 +39,14 @@ import lombok.extern.slf4j.Slf4j;
* 2018-Apr-09 4:52:31 PM
*/
@Slf4j
public class StateMachine extends StateMachineAdapter {
// private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class);
/**
* Counter value
* State value 全局使用的唯一状态
*/
private TaskState taskState = new TaskState();
private State state = new State();
/**
* Leader term
*/
@ -58,10 +57,10 @@ public class StateMachine extends StateMachineAdapter {
}
/**
* Returns current value.
* Returns current value. 只有Get 操作状态由协议流程决定 Apply
*/
public TaskState getTaskState() {
return taskState;
public State getState() {
return state;
}
@Override
@ -72,17 +71,17 @@ public class StateMachine extends StateMachineAdapter {
// This task is applied by this node, get value from closure to avoid additional
// parsing.
var closure = (SyncDataClosure)iter.done();
taskState = closure.getTaskState();
log.info("SyncDataClosure(done) taskState:{} leaderTerm:{}",taskState, this.leaderTerm);
closure.success(taskState);
state = closure.getState();
log.info("SyncDataClosure(done) taskState:{} leaderTerm:{}",state, this.leaderTerm);
closure.success(state);
closure.run(Status.OK());
} else {
// Have to parse FetchAddRequest from this user log.
final ByteBuffer data = iter.getData();
try {
taskState = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
data.array(), TaskState.class.getName());
log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", taskState, this.leaderTerm);
state = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
data.array(), State.class.getName());
log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm);
} catch (CodecException e) {
e.printStackTrace();
}

View File

@ -9,7 +9,10 @@ package com.yuandian.dataflow.statemachine;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import com.alipay.remoting.NamedThreadFactory;
import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.JRaftUtils;
@ -27,10 +30,12 @@ import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.yuandian.dataflow.statemachine.rpc.SMResponse;
import com.yuandian.dataflow.statemachine.rpc.SyncData;
import com.yuandian.dataflow.statemachine.rpc.SyncDataRequest;
import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor;
import com.yuandian.dataflow.statemachine.rpc.TaskState;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.Getter;
import lombok.Setter;
@ -54,6 +59,27 @@ public class StateServerFactory {
throw new Exception("重复初始化 InitStateServer");
}
ss = new StateServerFactory.StateServer(peerstr, conf);
ss.readIndexState(true, new SyncDataClosure() {
@Override
public void run(Status status) {
log.info("add peerid {}", getState());
var wstate = getState().getWorkers().get(ss.node.getNodeId().getPeerId());
if(wstate == null) {
wstate = new WorkerState();
getState().getWorkers().put(ss.node.getNodeId().getPeerId(), wstate);
log.info("update: {}", getState());
ss.applyState(getState(), new SyncDataClosure() {
@Override
public void run(Status status) {
log.info("{} add workers", ss.node.getNodeId().getPeerId());
}
} );
}
}
});
}
// 获取状态服务的对象
@ -70,7 +96,7 @@ public class StateServerFactory {
private StateMachine fsm;
private String groupId = "dataflow";
private Executor readIndexExecutor;
private Executor readIndexExecutor = createReadIndexExecutor();
public StateServer(String addr, Configuration conf) {
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
@ -100,31 +126,32 @@ public class StateServerFactory {
fsm = new StateMachine(); // 状态实例初始化
nodeOptions.setFsm(fsm);
cluster = new RaftGroupService(groupId, serverId, nodeOptions);
cluster = new RaftGroupService(groupId, serverId, nodeOptions);
cluster.getRpcServer().registerProcessor(new SyncDataProcessor());
node = cluster.start();
if(node.isLeader()) {
}
}
public boolean isLeader() {
return this.fsm.isLeader();
}
public TaskState getTaskState() {
return this.fsm.getTaskState();
public State getState() {
return this.fsm.getState();
}
public void applyState(TaskState state, SyncDataClosure closure) {
public void applyState(State state, SyncDataClosure closure) {
if (!ss.isLeader()) {
ss.handlerNotLeaderError(closure);
return;
}
try {
closure.setTaskState(state);
closure.setState(state);
final Task task = new Task();
task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(state)));
task.setDone(closure);
@ -140,9 +167,8 @@ public class StateServerFactory {
public void readIndexState(final boolean readOnlySafe, final SyncDataClosure closure) {
if(!readOnlySafe){
closure.setTaskState(getTaskState());
closure.success(getTaskState());
closure.setState(getState());
closure.success(getState());
closure.run(Status.OK());
return;
}
@ -150,23 +176,19 @@ public class StateServerFactory {
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
if(status.isOk()){
if(closure.getTaskState() != null){
applyState(closure.getTaskState(), closure);
} else {
closure.setTaskState(getTaskState());
closure.success(getTaskState());
closure.run(Status.OK());
}
if(status.isOk()){
log.info("readIndex {}", getState());
closure.setState(getState());
closure.success(getState());
closure.run(Status.OK());
return;
}
readIndexExecutor.execute(() -> {
if(isLeader()){
log.info("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status);
applyState(getTaskState(), closure);
applyState(getState(), closure);
}else {
handlerNotLeaderError(closure);
}
@ -176,7 +198,6 @@ public class StateServerFactory {
}
public SMResponse redirect() {
final SMResponse response = new SMResponse();
response.setSuccess(false);
if (this.node != null) {
@ -186,22 +207,35 @@ public class StateServerFactory {
}
}
return response;
}
public void handlerNotLeaderError(final SyncDataClosure closure) {
closure.failure("Not leader.", redirect().getRedirect());
closure.run(new Status(RaftError.EPERM, "Not leader"));
}
private Executor createReadIndexExecutor() {
// final StoreEngineOptions opts = new StoreEngineOptions();
// return StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads());
return ThreadPoolUtil.newBuilder() //
.poolName("CounterPool") //
.enableMetric(true) //
.coreThreads(4) //
.maximumThreads(4) //
.keepAliveSeconds(60L) //
.workQueue(new SynchronousQueue<>()) //
.threadFactory(new NamedThreadFactory("CounterService", true)) //
.rejectedHandler(new ThreadPoolExecutor.AbortPolicy()) //
.build();
}
}
public static void main(String[] args) throws InterruptedException, RemotingException {
var rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions());
var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new SyncData(), 5000);
var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new SyncDataRequest(), 5000);
log.info("{}", resp);
}
}

View File

@ -4,7 +4,7 @@ import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.PeerId;
import com.yuandian.dataflow.statemachine.rpc.SMResponse;
import com.yuandian.dataflow.statemachine.rpc.TaskState;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.Getter;
import lombok.Setter;
@ -23,15 +23,14 @@ public abstract class SyncDataClosure implements Closure {
// 状态机的统一响应
private SMResponse response;
// 代表任务状态
private TaskState taskState;
private State state;
public Object lockObject = new Object();
public Object locksync = new Object();
public SyncDataClosure() {
}
public void failure(final String errorMsg, final PeerId redirect) {
final SMResponse response = new SMResponse();
response.setSuccess(false);
@ -40,11 +39,10 @@ public abstract class SyncDataClosure implements Closure {
setResponse(response);
}
public void success(final TaskState value) {
public void success(final State value) {
final SMResponse response = new SMResponse();
response.setState(value);
response.setSuccess(true);
setResponse(response);
}
}

View File

@ -9,6 +9,7 @@ package com.yuandian.dataflow.statemachine.rpc;
import java.io.Serializable;
import com.alipay.sofa.jraft.entity.PeerId;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.Getter;
import lombok.Setter;
@ -29,7 +30,7 @@ public class SMResponse implements Serializable {
private static final long serialVersionUID = 1L;
private TaskState state;
private State state;
private boolean success;

View File

@ -32,10 +32,10 @@ import lombok.extern.slf4j.Slf4j;
*2022年7月12日-11:10:54
*/
@Slf4j
public class SyncDataProcessor implements RpcProcessor<SyncData> {
public class SyncDataProcessor implements RpcProcessor<SyncDataRequest> {
@Override
public void handleRequest(RpcContext rpcCtx, SyncData request) {
public void handleRequest(RpcContext rpcCtx, SyncDataRequest request) {
log.info("request: {}", request);
@ -47,12 +47,12 @@ public class SyncDataProcessor implements RpcProcessor<SyncData> {
}
};
StateServerFactory.getStateServer().applyState(request.getTaskState(), closure);
StateServerFactory.getStateServer().applyState(request.getState(), closure);
}
@Override
public String interest() {
return SyncData.class.getName();
return SyncDataRequest.class.getName();
}

View File

@ -9,6 +9,7 @@ package com.yuandian.dataflow.statemachine.rpc;
import java.io.Serializable;
import com.alipay.sofa.jraft.Closure;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.Getter;
import lombok.Setter;
@ -16,7 +17,7 @@ import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
/**
* description
* 同步状态时, 需要用的结构类. 新增的状态可以在 State结构里添加
*
* @author eson
*2022年7月11日-16:01:07
@ -25,12 +26,9 @@ import lombok.extern.slf4j.Slf4j;
@Getter
@Setter
@ToString
public class SyncData implements Serializable {
public class SyncDataRequest implements Serializable {
private static final long serialVersionUID = 1L;
private State state;
private TaskState taskState;
}

View File

@ -4,7 +4,7 @@
* @author eson
*2022年7月13日-09:11:26
*/
package com.yuandian.dataflow.statemachine.rpc;
package com.yuandian.dataflow.statemachine.state;
import java.io.Serializable;
@ -19,9 +19,10 @@ import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.HashMap;
/**
* 代表任务状态 暂时全局使用这个结构
* 代表任务状态 暂时全局使用这个结构. 添加新增状态
*
* @author eson
*2022年7月13日-09:11:26
@ -30,13 +31,9 @@ import java.lang.reflect.Modifier;
@Getter
@Setter
@ToString
public class TaskState implements Serializable {
public class State implements Serializable {
private static final long serialVersionUID = -1L;
// 节点的对应peerID
private PeerId peerId;
private long taskQueueSize;
private HashMap<PeerId,WorkerState> workers = new HashMap<>();
}

View File

@ -0,0 +1,32 @@
/**
* description
*
* @author eson
*2022年7月15日-10:04:00
*/
package com.yuandian.dataflow.statemachine.state;
import java.io.Serializable;
import com.alipay.sofa.jraft.entity.PeerId;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
/**
* description
*
* @author eson
*2022年7月15日-10:04:00
*/
@Getter
@Setter
@ToString
public class WorkerState implements Serializable {
private static final long serialVersionUID = -1L;
// 节点的对应peerID
public PeerId peerId;
public long taskQueueSize;
}

View File

@ -11,8 +11,8 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.Endpoint;
import com.yuandian.dataflow.statemachine.rpc.SMResponse;
import com.yuandian.dataflow.statemachine.rpc.SyncData;
import com.yuandian.dataflow.statemachine.rpc.TaskState;
import com.yuandian.dataflow.statemachine.rpc.SyncDataRequest;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
@ -25,9 +25,9 @@ public class StateMachineTest {
rpcClient.init(new CliOptions());
var fstate = new TaskState();
var fdata = new SyncData();
fdata.setTaskState(fstate);
var fstate = new State();
var fdata = new SyncDataRequest();
fdata.setState(fstate);
var leader = new Endpoint("localhost",4441);
SMResponse resp = (SMResponse)rpcClient.invokeSync(leader, fdata
@ -44,20 +44,21 @@ public class StateMachineTest {
int i = 0 ;
while(true) {
var state = new TaskState();
var data = new SyncData();
data.setTaskState(state);
var state = new State();
var request = new SyncDataRequest(); // 创建请求
request.setState(state); // 添加请求的参数
state.setPeerId( PeerId.parsePeer("localhost:2222") );
var wstate = state.getWorkers();
state.setTaskQueueSize(i);
// state.getWorker().setPeerId( PeerId.parsePeer("localhost:2222") );
// state.getWorker().setTaskQueueSize(i);
var pi = i ;
i++;
if (i >= 1000) {
break;
}
rpcClient.invokeAsync(leader, data, new InvokeCallback() {
rpcClient.invokeAsync(leader, request, new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
// SMResponse resp = (SMResponse)result;

View File

@ -6,4 +6,4 @@ sleep 2
screen -dmS raft-0 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 0
screen -dmS raft-1 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 1
screen -dmS raft-2 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 2
screen -dmS raft-2 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 2