完成同步更新的api

This commit is contained in:
huangsimin 2022-07-19 12:06:03 +08:00
parent 6f743686a1
commit 9e6159ee16
5 changed files with 123 additions and 79 deletions

View File

@ -25,25 +25,18 @@ public class TaskLog {
@GetMapping(path = "/test")
public ResponseEntity<Response> Processing() throws InterruptedException, RemotingException {
var ws = new WorkerState(new PeerId());
StateServerFactory.getStateServer().updateFsmWorkerState(ws);
final Response response = new Response();
StateServerFactory.getStateServer().useFsmState((fsmState)->{
log.debug(fsmState.toString() );
response.Message = fsmState.toString();
return null;
});
// state.getWorker().setPeerId(StateServerFactory.getStateServer().getNode().getNodeId().getPeerId());
// state.getWorker().setTaskQueueSize(1);
StateServerFactory.getStateServer().updateFsmState((fsmState)->{
log.debug(fsmState.toString() );
fsmState.getWorkers().put(new PeerId(), new WorkerState());
return fsmState;
});
final Response response = new Response();
response.Code = HttpStatus.OK;
response.Message = "ok";
return new ResponseEntity<Response>(response, HttpStatus.OK);
}

View File

@ -167,16 +167,19 @@ public class StateMachine extends StateMachineAdapter {
this.leaderTerm.set(term);
super.onLeaderStart(term);
var ss = StateServerFactory.getStateServer();
var ws = new WorkerState();
ws.setPeerId(ss.getCluster().getServerId());
final SyncClosure<State> closure = new SyncClosure< State>() {
@Override
public void run(Status status) {
log.debug("leader set WorkerState {} ", status);
}
};
ss.applyWorkerState(ws, closure);
try {
updateState((state)->{
var ws = new WorkerState(StateServerFactory.getStateServer().getCluster().getServerId());
state.getWorkers().put(ws.peerId, ws);
return state;
});
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
}
return;
}
@ -184,10 +187,10 @@ public class StateMachine extends StateMachineAdapter {
public void onStartFollowing(LeaderChangeContext ctx) {
super.onStartFollowing(ctx);
try {
var ss = StateServerFactory.getStateServer();
var ws = new WorkerState();
ws.setPeerId(ss.getCluster().getServerId());
var ws = new WorkerState(ss.getCluster().getServerId());
var request = new RequestCondition();
request.setWorkerState(ws);
log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId());

View File

@ -33,6 +33,7 @@ import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.yuandian.dataflow.statemachine.rpc.RequestCondition;
import com.yuandian.dataflow.statemachine.rpc.RequestState;
import com.yuandian.dataflow.statemachine.rpc.ResponseSM;
import com.yuandian.dataflow.statemachine.rpc.SyncConditionProcessor;
@ -126,9 +127,12 @@ public class StateServerFactory {
}
/**
* 同步 可以使用follow使用, 但是可能延迟于leader. 只读
* @param dofunc
*/
public void useFsmState(Function<State, Void> dofunc) {
SyncClosure<State> closure = new SyncClosure<State>() {
@Override
public void run(Status status) {
@ -136,17 +140,18 @@ public class StateServerFactory {
}
};
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
getFsm().useState((fsmState)->{
if(status.isOk()){
log.debug("readIndex {}", fsmState);
closure.success(fsmState);
closure.run(Status.OK());
dofunc.apply(fsmState);
synchronized(dofunc) {
dofunc.apply(fsmState);
}
return null;
}
@ -160,22 +165,99 @@ public class StateServerFactory {
});
return null;
});
synchronized(dofunc) {
log.debug("dofunc notify");
dofunc.notify();
}
}
});
try {
synchronized(dofunc) {
log.debug("dofunc wait");
dofunc.wait(5000);
log.debug("dofunc unwait");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return ;
}
public void updateFsmState(Function<State, State> dofunc) throws InterruptedException, RemotingException {
this.getFsm().updateState(dofunc);
}
/**
* 同步更新 WorkerState
* @param dofunc
* @throws InterruptedException
* @throws RemotingException
*/
public void updateFsmWorkerState(WorkerState ws) throws InterruptedException, RemotingException {
if(isLeader()) {
var closure = new SyncClosure<State>() {
@Override
public void run(Status status) {
log.debug("leader {}", status);
this.synclock.notify();
}
};
StateServerFactory.getStateServer().applyWorkerState(ws, closure);
closure.synclock.wait(5000);
return;
}
try {
var ss = StateServerFactory.getStateServer();
var request = new RequestCondition();
request.setWorkerState(ws);
log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId());
ResponseSM resp;
resp = (ResponseSM)StateServerFactory.getStateServer().getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000);
if(resp == null) {
log.error("{} set WorkerState is error", resp);
}
log.debug("follow is {}", resp);
return;
} catch (InterruptedException | RemotingException e) {
e.printStackTrace();
}
}
/**
* 同步更新整个State
* @param s
* @throws InterruptedException
* @throws RemotingException
*/
public void updateFsmState(State s) throws InterruptedException, RemotingException {
if(isLeader()) {
var closure = new SyncClosure<State>() {
@Override
public void run(Status status) {
log.debug("leader {}", status);
synclock.notify();
}
};
StateServerFactory.getStateServer().applyState(s, closure);
closure.synclock.wait(5000);
return;
}
var ss = StateServerFactory.getStateServer();
var request = new RequestState();
request.setState(s);
log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId());
ResponseSM resp;
resp = (ResponseSM)StateServerFactory.getStateServer().getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000);
if(resp == null) {
log.error("{} set State is error", resp);
}
log.debug("follow is {}", resp);
return;
}
public void applyState(State state, SyncClosure<State> closure) {
@ -235,46 +317,6 @@ public class StateServerFactory {
}
public void readIndexState(final boolean readOnlySafe, final SyncClosure<State> closure) {
useFsmState((fsmState)->{
closure.setValue(fsmState);
if(!readOnlySafe){
closure.success(fsmState);
closure.run(Status.OK());
return null;
}
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
if(status.isOk()){
log.debug("readIndex {}", fsmState);
closure.success(fsmState);
closure.run(Status.OK());
return;
}
readIndexExecutor.execute(() -> {
if(isLeader()){
log.debug("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status);
applyState(fsmState, closure);
}else {
handlerNotLeaderError(closure);
}
});
}
});
return null;
});
}
public ResponseSM redirect() {

View File

@ -25,7 +25,9 @@ public abstract class SyncClosure<T> implements Closure {
// 代表任务状态
private T value;
public Object synclock = new Object();
public SyncClosure() {
}

View File

@ -29,4 +29,8 @@ public class WorkerState implements Serializable {
// 节点的对应peerID
public PeerId peerId;
public long taskQueueSize;
public WorkerState(PeerId pid) {
this.peerId = pid;
}
}