状态同步 TODO: readIndex

This commit is contained in:
huangsimin 2022-07-14 18:19:45 +08:00
parent aa7749f924
commit 886a5ffe1a
13 changed files with 392 additions and 163 deletions

View File

@ -16,10 +16,11 @@ import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.yuandian.dataflow.statemachine.SyncDataClosure;
import com.yuandian.dataflow.statemachine.StateMachine;
import com.yuandian.dataflow.statemachine.StateServer;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
@ -28,26 +29,17 @@ import lombok.var;
* Hello world!
*
*/
@Slf4j
@SpringBootApplication
@SpringBootConfiguration
public class Server {
@Autowired
public static Node node;
public static SyncDataClosure done;
private static StateServer stateServer;
public static Node GetNode() {
return node;
}
public static SyncDataClosure GetDone() {
return done;
}
public static void main(String[] args) {
public static void main(String[] args) throws Exception {
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
String[] sprPeers = new String[]{"3440","3441","3442"};
@ -56,11 +48,35 @@ public class Server {
Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
stateServer = new StateServer(peeridstr, conf);
// StateServerFactory.my = new StateServerFactory(peeridstr, conf);
StateServerFactory.InitStateServer(peeridstr, conf);
// Thread printer = new Thread( new Runnable(){
// @Override
// public void run() {
// // TODO Auto-generated method stub
// while(true) {
// var state = StateServerFactory.getStateServer().getFsm().getTaskState();
// log.info("{}", state);
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
// }
// }
// } );
// printer.start();
System.setProperty("server.port", sprPort);
var app = SpringApplication.run(Server.class, args);
app.start();
}
}

View File

@ -10,8 +10,11 @@ import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.rpc.impl.BoltRpcClient;
import com.yuandian.dataflow.Server;
import com.yuandian.dataflow.projo.Response;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.SyncDataClosure;
import com.yuandian.dataflow.statemachine.rpc.TaskState;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.ObjectUtils.Null;
@ -29,42 +32,32 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.Status;
@Slf4j
@Controller
public class TaskLog {
private static Node node = Server.GetNode();
@GetMapping(path = "/test")
public ResponseEntity<Response> Processing() {
log.info("node.isLeader {} {} node.getNodeId() {}", node.getNodeState(), node.getLeaderId(), node.getNodeId());
if(node.isLeader()) {
// Task task = new Task();
// // 处理状态机
// RaftClosure done = new RaftClosure();
// task.setData(ByteBuffer.wrap("hello".getBytes()));
// task.setDone(done);
// log.error("{} {} {}",node, node.toString(), task);
// node.apply(task);
// log.error("{}", "RaftClosure");
}
// var state = StateServerFactory.getStateServer().getFsm().getTaskState();
var c = new SyncDataClosure() {
@Override
public void run(Status status) {
log.info(getTaskState().toString());
}
};
StateServerFactory.getStateServer().readIndexState(true, c );
Response response = new Response();
response.Code = HttpStatus.OK;
response.Message = HttpStatus.OK.toString();
response.Message = "OK";
return new ResponseEntity<Response>(response, HttpStatus.OK);
}

View File

@ -20,7 +20,7 @@ public final class UsrFlowOuterClass {
/**
* <pre>
*编号19
*编号19
* </pre>
*
* <code>int32 table_id = 1;</code>
@ -501,7 +501,7 @@ public final class UsrFlowOuterClass {
private int tableId_;
/**
* <pre>
*编号19
*编号19
* </pre>
*
* <code>int32 table_id = 1;</code>
@ -1793,7 +1793,7 @@ public final class UsrFlowOuterClass {
private int tableId_ ;
/**
* <pre>
*编号19
*编号19
* </pre>
*
* <code>int32 table_id = 1;</code>
@ -1805,7 +1805,7 @@ public final class UsrFlowOuterClass {
}
/**
* <pre>
*编号19
*编号19
* </pre>
*
* <code>int32 table_id = 1;</code>
@ -1820,7 +1820,7 @@ public final class UsrFlowOuterClass {
}
/**
* <pre>
*编号19
*编号19
* </pre>
*
* <code>int32 table_id = 1;</code>

View File

@ -3,6 +3,7 @@ package com.yuandian.dataflow.statemachine;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -12,12 +13,22 @@ import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.core.StateMachineAdapter;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.Utils;
import com.yuandian.dataflow.statemachine.rpc.SMResponse;
import com.yuandian.dataflow.statemachine.rpc.SyncData;
import com.yuandian.dataflow.statemachine.rpc.TaskState;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
@ -28,6 +39,7 @@ import lombok.extern.slf4j.Slf4j;
* 2018-Apr-09 4:52:31 PM
*/
@Slf4j
public class StateMachine extends StateMachineAdapter {
// private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class);
@ -35,7 +47,7 @@ public class StateMachine extends StateMachineAdapter {
/**
* Counter value
*/
private final AtomicLong value = new AtomicLong(0);
private TaskState taskState = new TaskState();
/**
* Leader term
*/
@ -48,8 +60,8 @@ public class StateMachine extends StateMachineAdapter {
/**
* Returns current value.
*/
public long getValue() {
return this.value.get();
public TaskState getTaskState() {
return taskState;
}
@Override
@ -59,12 +71,22 @@ public class StateMachine extends StateMachineAdapter {
if (iter.done() != null) {
// This task is applied by this node, get value from closure to avoid additional
// parsing.
log.error("done:{}",iter.getData().toString());
var closure = (SyncDataClosure)iter.done();
taskState = closure.getTaskState();
log.info("SyncDataClosure(done) taskState:{} leaderTerm:{}",taskState, this.leaderTerm);
closure.success(taskState);
closure.run(Status.OK());
} else {
// Have to parse FetchAddRequest from this user log.
log.error("null:{}",iter.getData().toString());
// Have to parse FetchAddRequest from this user log.
final ByteBuffer data = iter.getData();
try {
taskState = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
data.array(), TaskState.class.getName());
log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", taskState, this.leaderTerm);
} catch (CodecException e) {
e.printStackTrace();
}
}
iter.next();
@ -100,4 +122,8 @@ public class StateMachine extends StateMachineAdapter {
super.onLeaderStop(status);
}
public static void main(String[] args) throws InterruptedException, RemotingException {
}
}

View File

@ -1,97 +0,0 @@
/**
* description
*
* @author eson
*2022年7月12日-13:36:24
*/
package com.yuandian.dataflow.statemachine;
import java.io.File;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.Endpoint;
import com.yuandian.dataflow.statemachine.SyncDataClosure;
import com.yuandian.dataflow.statemachine.StateMachine;
import com.yuandian.dataflow.statemachine.rpc.SyncData;
import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
* description
*
* @author eson
*2022年7月12日-13:36:24
*/
@Slf4j
@var
public class StateServer {
public Node node;
public RaftGroupService cluster;
public StateMachine fsm;
private String groupId = "dataflow";
public StateServer(String addr, Configuration conf) {
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
String[] sprPeers = new String[]{"3440","3441","3442"};
// var peeridstr = peers[Integer.parseInt(serverId)];
// var sprPort = sprPeers[Integer.parseInt(args[0])];
// String groupId = "jraft";
// Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
PeerId serverId = JRaftUtils.getPeerId(addr);
int port = serverId.getPort();
NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setElectionTimeoutMs(1000);
nodeOptions.setSnapshotLogIndexMargin(3600);
nodeOptions.setInitialConf(conf);
File RaftDataFile = new File(String.format("./raftdata/%d", port) );
log.info("{}",RaftDataFile.mkdirs());
nodeOptions.setLogUri( String.format("./raftdata/%d/logs", port) );
nodeOptions.setRaftMetaUri(String.format("./raftdata/%d/rafts", port));
nodeOptions.setSnapshotUri(String.format("./raftdata/%d/snaps", port));
fsm = new StateMachine(); // 状态实例初始化
nodeOptions.setFsm(fsm);
cluster = new RaftGroupService(groupId, serverId, nodeOptions);
cluster.getRpcServer().registerProcessor(new SyncDataProcessor());
node = cluster.start();
}
public static void main(String[] args) throws InterruptedException, RemotingException {
var rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions());
var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new SyncData(), 5000);
log.info("{}", resp);
// done = new RaftClosure();
// node.shutdown(done);
}
}

View File

@ -0,0 +1,198 @@
/**
* description
*
* @author eson
*2022年7月12日-13:36:24
*/
package com.yuandian.dataflow.statemachine;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Endpoint;
import com.yuandian.dataflow.statemachine.rpc.SMResponse;
import com.yuandian.dataflow.statemachine.rpc.SyncData;
import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor;
import com.yuandian.dataflow.statemachine.rpc.TaskState;
import lombok.Getter;
import lombok.Setter;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
* description
*
* @author eson
*2022年7月12日-13:36:24
*/
@Slf4j
@var
public class StateServerFactory {
private static StateServer ss;
// 必须初始化
public static void InitStateServer(String peerstr, Configuration conf) throws Exception {
if(ss != null) {
throw new Exception("重复初始化 InitStateServer");
}
ss = new StateServerFactory.StateServer(peerstr, conf);
}
// 获取状态服务的对象
public static StateServer getStateServer() {
return ss;
}
@Getter
@Setter
public static class StateServer {
private Node node;
private RaftGroupService cluster;
private StateMachine fsm;
private String groupId = "dataflow";
private Executor readIndexExecutor;
public StateServer(String addr, Configuration conf) {
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
String[] sprPeers = new String[]{"3440","3441","3442"};
// var peeridstr = peers[Integer.parseInt(serverId)];
// var sprPort = sprPeers[Integer.parseInt(args[0])];
// String groupId = "jraft";
// Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
PeerId serverId = JRaftUtils.getPeerId(addr);
int port = serverId.getPort();
NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setElectionTimeoutMs(1000);
nodeOptions.setSnapshotLogIndexMargin(3600);
nodeOptions.setInitialConf(conf);
File RaftDataFile = new File(String.format("./raftdata/%d", port) );
log.info("{}",RaftDataFile.mkdirs());
nodeOptions.setLogUri( String.format("./raftdata/%d/logs", port) );
nodeOptions.setRaftMetaUri(String.format("./raftdata/%d/rafts", port));
nodeOptions.setSnapshotUri(String.format("./raftdata/%d/snaps", port));
fsm = new StateMachine(); // 状态实例初始化
nodeOptions.setFsm(fsm);
cluster = new RaftGroupService(groupId, serverId, nodeOptions);
cluster.getRpcServer().registerProcessor(new SyncDataProcessor());
node = cluster.start();
}
public boolean isLeader() {
return this.fsm.isLeader();
}
public TaskState getTaskState() {
return this.fsm.getTaskState();
}
public void applyState(TaskState state, SyncDataClosure closure) {
if (!ss.isLeader()) {
ss.handlerNotLeaderError(closure);
return;
}
try {
closure.setTaskState(state);
final Task task = new Task();
task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(state)));
task.setDone(closure);
StateServerFactory.getStateServer().getNode().apply(task);
} catch (CodecException e) {
String errorMsg = "Fail to encode TaskState";
log.error(errorMsg, e);
closure.failure(errorMsg, PeerId.emptyPeer());
closure.run(new Status(RaftError.EINTERNAL, errorMsg));
}
}
public void readIndexState(final boolean readOnlySafe, final SyncDataClosure closure) {
if(!readOnlySafe){
closure.success(getTaskState());
closure.run(Status.OK());
return;
}
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
if(status.isOk()){
closure.success(getTaskState());
closure.run(Status.OK());
return;
}
readIndexExecutor.execute(() -> {
if(isLeader()){
log.debug("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status);
applyState( getTaskState(), closure);
}else {
handlerNotLeaderError(closure);
}
});
}
});
}
public SMResponse redirect() {
final SMResponse response = new SMResponse();
response.setSuccess(false);
if (this.node != null) {
final PeerId leader = this.node.getLeaderId();
if (leader != null) {
response.setRedirect(leader);
}
}
return response;
}
public void handlerNotLeaderError(final SyncDataClosure closure) {
closure.failure("Not leader.", redirect().getRedirect());
closure.run(new Status(RaftError.EPERM, "Not leader"));
}
}
public static void main(String[] args) throws InterruptedException, RemotingException {
var rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions());
var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new SyncData(), 5000);
log.info("{}", resp);
}
}

View File

@ -2,6 +2,7 @@ package com.yuandian.dataflow.statemachine;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.PeerId;
import com.yuandian.dataflow.statemachine.rpc.SMResponse;
import com.yuandian.dataflow.statemachine.rpc.TaskState;
@ -22,10 +23,10 @@ public abstract class SyncDataClosure implements Closure {
// 状态机的统一响应
private SMResponse response;
// 代表任务状态
private TaskState state;
private TaskState taskState;
protected void failure(final String errorMsg, final String redirect) {
public void failure(final String errorMsg, final PeerId redirect) {
final SMResponse response = new SMResponse();
response.setSuccess(false);
response.setMsg(errorMsg);
@ -33,7 +34,7 @@ public abstract class SyncDataClosure implements Closure {
setResponse(response);
}
protected void success(final TaskState value) {
public void success(final TaskState value) {
final SMResponse response = new SMResponse();
response.setState(value);
response.setSuccess(true);

View File

@ -6,6 +6,10 @@
*/
package com.yuandian.dataflow.statemachine.rpc;
import java.io.Serializable;
import com.alipay.sofa.jraft.entity.PeerId;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@ -21,7 +25,9 @@ import lombok.extern.slf4j.Slf4j;
@Getter
@Setter
@ToString
public class SMResponse {
public class SMResponse implements Serializable {
private static final long serialVersionUID = 1L;
private TaskState state;
@ -30,7 +36,7 @@ public class SMResponse {
/**
* redirect peer id
*/
private String redirect;
private PeerId redirect;
private String msg;
}

View File

@ -29,6 +29,8 @@ public class SyncData implements Serializable {
private static final long serialVersionUID = 1L;
private TaskState state;
private TaskState taskState;
}

View File

@ -6,11 +6,23 @@
*/
package com.yuandian.dataflow.statemachine.rpc;
import java.nio.ByteBuffer;
import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.SyncDataClosure;
import com.yuandian.dataflow.statemachine.StateServerFactory.StateServer;
import com.alipay.sofa.jraft.entity.PeerId;
import org.apache.commons.lang.StringUtils;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
@ -24,18 +36,18 @@ public class SyncDataProcessor implements RpcProcessor<SyncData> {
@Override
public void handleRequest(RpcContext rpcCtx, SyncData request) {
log.info("{}", rpcCtx);
log.info("{}", request);
log.info("request: {}", request);
final SyncDataClosure closure = new SyncDataClosure() {
@Override
public void run(Status status) {
rpcCtx.sendResponse(getResponse());
log.info("{}", status);
}
};
StateServerFactory.getStateServer().applyState(request.getTaskState(), closure);
}
@Override
@ -43,8 +55,6 @@ public class SyncDataProcessor implements RpcProcessor<SyncData> {
return SyncData.class.getName();
}
public static void main(String[] args) {
}
}

View File

@ -12,6 +12,7 @@ import com.alipay.sofa.jraft.entity.PeerId;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import java.lang.management.ManagementFactory;
@ -28,6 +29,7 @@ import java.lang.reflect.Modifier;
@Slf4j
@Getter
@Setter
@ToString
public class TaskState implements Serializable {
private static final long serialVersionUID = -1L;
@ -36,7 +38,5 @@ public class TaskState implements Serializable {
private long taskQueueSize;
public static void main(String[] args) {
}
}

View File

@ -3,7 +3,7 @@
class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
%d{yyyyMMdd HH:mm:ss.SSS} %-5level%thread\(%file:%line\): %msg%n
%d{yyyyMMdd HH:mm:ss.SSS} %level %thread\(%file:%line\): %msg%n
</pattern>
</encoder>
</appender>

View File

@ -0,0 +1,74 @@
package com.yuandian.dataflow.statemachine;
import java.util.concurrent.Executor;
import org.junit.jupiter.api.Test;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.Endpoint;
import com.yuandian.dataflow.statemachine.rpc.SMResponse;
import com.yuandian.dataflow.statemachine.rpc.SyncData;
import com.yuandian.dataflow.statemachine.rpc.TaskState;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class StateMachineTest {
@Test
void testOnApply() throws InterruptedException, RemotingException {
var rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions());
var fstate = new TaskState();
var fdata = new SyncData();
fdata.setTaskState(fstate);
var leader = new Endpoint("localhost",4441);
SMResponse resp = (SMResponse)rpcClient.invokeSync(leader, fdata
, 5000);
log.info("{}", resp);
if( resp != null && !resp.isSuccess() ) {
leader = resp.getRedirect().getEndpoint();
resp = (SMResponse)rpcClient.invokeSync(resp.getRedirect().getEndpoint(), fdata
, 5000);
log.info("{}", resp);
}
int i = 0 ;
while(true) {
var state = new TaskState();
var data = new SyncData();
data.setTaskState(state);
state.setPeerId( PeerId.parsePeer("localhost:2222") );
state.setTaskQueueSize(i);
var pi = i ;
i++;
if (i >= 1000) {
break;
}
rpcClient.invokeAsync(leader, data, new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
// SMResponse resp = (SMResponse)result;
log.info("{} {} {}", result, err, pi);
}
@Override
public Executor executor() {
return null;
}
} , 5000);
}
}
}