TODO: 各个节点的销毁关系

This commit is contained in:
huangsimin 2022-07-19 18:26:56 +08:00
parent 9e6159ee16
commit d7cd8ed758
7 changed files with 115 additions and 39 deletions

View File

@ -38,8 +38,7 @@ public class Server {
public static void main(String[] args) throws Exception {
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
String[] sprPeers = new String[]{"3440","3441","3442"};
@ -54,12 +53,10 @@ public class Server {
Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
StateServerFactory.InitStateServer(peeridstr, conf);
StateServerFactory.initStateServer(peeridstr, conf);
System.setProperty("server.port", sprPort);
var app = SpringApplication.run(Server.class, args);
app.start();
}
}

View File

@ -24,26 +24,36 @@ public class TaskLog {
@GetMapping(path = "/test")
public ResponseEntity<Response> Processing() throws InterruptedException, RemotingException {
var ws = new WorkerState(new PeerId());
StateServerFactory.getStateServer().updateFsmWorkerState(ws);
final Response response = new Response();
// var ws = new WorkerState(new PeerId());
// StateServerFactory.getStateServer().updateFsmWorkerState(ws);
Response response = new Response();
StateServerFactory.getStateServer().useFsmState((fsmState)->{
log.debug(fsmState.toString() );
log.debug( StateServerFactory.getNode().getLeaderId().toString() );
response.Message = fsmState.toString();
return null;
});
response.Code = HttpStatus.OK;
return new ResponseEntity<Response>(response, HttpStatus.OK);
}
@GetMapping(path = "/test2")
public ResponseEntity<Response> MongodbTest(@RequestParam int status) {
public ResponseEntity<Response> MongodbTest() {
Response response = new Response();
StateServerFactory.getStateServer().useFsmStateAsync((fsmState)->{
log.debug("{} {}", fsmState.toString());
// response.Message = fsmState.toString();
return null;
});
return new ResponseEntity<Response>(response, HttpStatus.OK);
}
@GetMapping(path = "/test3")
public ResponseEntity<Response> RemoveLeader() {
Response response = new Response();
StateServerFactory.getNode().shutdown();
return new ResponseEntity<Response>(response, HttpStatus.OK);
}
}

View File

@ -111,7 +111,7 @@ public class StateMachine extends StateMachineAdapter {
@Override
// @SuppressWarnings("unchecked")
@SuppressWarnings("unchecked")
public void onApply(final Iterator iter) {
while (iter.hasNext()) {
@ -131,7 +131,7 @@ public class StateMachine extends StateMachineAdapter {
final ByteBuffer data = iter.getData();
try {
synchronized(state) {
state = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
state = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
data.array(), State.class.getName());
log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm);
}
@ -153,20 +153,17 @@ public class StateMachine extends StateMachineAdapter {
@Override
public void onError(final RaftException e) {
log.debug("Raft error: {}", e, e);
log.error("Raft error: {}", e, e);
}
@Override
public boolean onSnapshotLoad(final SnapshotReader reader) {
return true;
}
@Override
public void onLeaderStart(final long term) {
this.leaderTerm.set(term);
super.onLeaderStart(term);
try {
updateState((state)->{
var ws = new WorkerState(StateServerFactory.getStateServer().getCluster().getServerId());
@ -178,15 +175,31 @@ public class StateMachine extends StateMachineAdapter {
} catch (RemotingException e) {
e.printStackTrace();
}
return;
super.onLeaderStart(term);
}
@Override
public void onLeaderStop(final Status status) {
this.leaderTerm.set(-1);
super.onLeaderStop(status);
try {
updateState((state)->{
state.getWorkers().remove( StateServerFactory.getServerId() );
return state;
});
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
}
}
@Override
public void onStartFollowing(LeaderChangeContext ctx) {
super.onStartFollowing(ctx);
try {
var ss = StateServerFactory.getStateServer();
@ -204,15 +217,15 @@ public class StateMachine extends StateMachineAdapter {
} catch (InterruptedException | RemotingException e) {
e.printStackTrace();
}
super.onStartFollowing(ctx);
}
@Override
public void onLeaderStop(final Status status) {
this.leaderTerm.set(-1);
super.onLeaderStop(status);
}
public static void main(String[] args) throws InterruptedException, RemotingException {
}

View File

@ -58,13 +58,28 @@ public class StateServerFactory {
private static StateServer ss;
public static void InitStateServer(String peerstr, Configuration conf) throws Exception {
public static void initStateServer(String peerstr, Configuration conf) throws Exception {
if(ss != null) {
throw new Exception("重复初始化 InitStateServer");
}
ss = new StateServerFactory.StateServer(peerstr, conf);
log.debug("init peerid {}", ss.node.getNodeId().getPeerId());
}
public static PeerId getServerId() {
return ss.getCluster().getServerId();
}
public static Node getNode() {
return ss.getNode() ;
}
public static RpcClient getRpcClient() {
return ss.getRpcClient();
}
public static RaftGroupService getCluster() {
return ss.getCluster();
}
// 获取状态服务的对象
@ -136,11 +151,11 @@ public class StateServerFactory {
SyncClosure<State> closure = new SyncClosure<State>() {
@Override
public void run(Status status) {
}
};
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
@ -167,7 +182,7 @@ public class StateServerFactory {
});
synchronized(dofunc) {
log.debug("dofunc notify");
// log.debug("dofunc notify {}", getNode());
dofunc.notify();
}
}
@ -175,9 +190,9 @@ public class StateServerFactory {
try {
synchronized(dofunc) {
log.debug("dofunc wait");
// log.debug("dofunc wait");
dofunc.wait(5000);
log.debug("dofunc unwait");
// log.debug("dofunc unwait");
}
} catch (InterruptedException e) {
e.printStackTrace();
@ -186,6 +201,43 @@ public class StateServerFactory {
}
public void useFsmStateAsync(Function<State, Void> dofunc) {
SyncClosure<State> closure = new SyncClosure<State>() {
@Override
public void run(Status status) {
dofunc.apply(this.getValue());
}
};
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
getFsm().useState((fsmState)->{
if(status.isOk()){
closure.setValue(fsmState);
closure.success(fsmState);
closure.run(Status.OK());
return null;
}
readIndexExecutor.execute(() -> {
if(isLeader()){
log.info("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", getNode());
applyState(fsmState, closure);
}else {
handlerNotLeaderError(closure);
}
});
return null;
});
}
});
return ;
}
/**
* 同步更新 WorkerState
* @param dofunc
@ -194,6 +246,7 @@ public class StateServerFactory {
*/
public void updateFsmWorkerState(WorkerState ws) throws InterruptedException, RemotingException {
// leader就直接提交
if(isLeader()) {
var closure = new SyncClosure<State>() {
@Override
@ -208,6 +261,7 @@ public class StateServerFactory {
}
try {
// 非leader就 rpc请求
var ss = StateServerFactory.getStateServer();
var request = new RequestCondition();
request.setWorkerState(ws);

View File

@ -6,11 +6,11 @@
%d{yyyyMMdd HH:mm:ss.SSS} %level %thread\(%file:%line\): %msg%n
</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<!-- <filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>DEBUG</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</filter> -->
</appender>

View File

@ -41,6 +41,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class AppTest {
@FunctionalInterface
public interface FuncReturn {
public float Execute();

View File

@ -1,6 +1,6 @@
screen -S raft-0 -X quit
screen -S raft-1 -X quit
screen -S raft-2 -X quit
screen -S raft-2 -X quit
sleep 1
@ -8,6 +8,7 @@ screen -dmS raft-0 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 0
screen -dmS raft-1 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 1
screen -dmS raft-2 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 2
sleep 1
screen -S raft-0 -X logfile flush 0
screen -S raft-1 -X logfile flush 0