This commit is contained in:
huangsimin 2022-07-18 15:22:20 +08:00
parent 493f3d4907
commit 18d329910d
7 changed files with 176 additions and 70 deletions

View File

@ -19,8 +19,6 @@ import lombok.extern.slf4j.Slf4j;
@Controller
public class TaskLog {
@GetMapping(path = "/test")
public ResponseEntity<Response> Processing() throws InterruptedException {
@ -33,8 +31,12 @@ public class TaskLog {
var state = new State();
closure.setValue(state);
StateServerFactory.getStateServer().useState((fsmState)->{
log.error(fsmState.toString() );
return fsmState;
});
log.error(StateServerFactory.getStateServer().getFsm().getState().toString() );
// state.getWorker().setPeerId(StateServerFactory.getStateServer().getNode().getNodeId().getPeerId());
// state.getWorker().setTaskQueueSize(1);

View File

@ -5,6 +5,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alipay.remoting.exception.CodecException;
@ -13,6 +15,7 @@ import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.core.StateMachineAdapter;
import com.alipay.sofa.jraft.entity.LeaderChangeContext;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RaftException;
@ -25,8 +28,10 @@ import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.Utils;
import com.yuandian.dataflow.statemachine.rpc.ResponseSM;
import com.yuandian.dataflow.statemachine.rpc.RequestCondition;
import com.yuandian.dataflow.statemachine.rpc.RequestState;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
@ -59,11 +64,33 @@ public class StateMachine extends StateMachineAdapter {
/**
* Returns current value. 只有Get 操作状态由协议流程决定 Apply
*/
public State getState() {
return state;
// public State getState() {
// return state;
// }
/**
* Returns current value. 读取修改都在这个函数域内进行
*/
public void useState(Function<State, Void> dofunc) {
synchronized(this.state) {
dofunc.apply(this.state);
}
}
/**
* Returns current value. 读取修改都在这个函数域内进行
*/
public void updateState(Function<State, State> dofunc) {
synchronized(this.state) {
var newstate = dofunc.apply(this.state);
this.state = newstate;
}
}
@Override
// @SuppressWarnings("unchecked")
public void onApply(final Iterator iter) {
while (iter.hasNext()) {
@ -82,6 +109,7 @@ public class StateMachine extends StateMachineAdapter {
state = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
data.array(), State.class.getName());
log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm);
} catch (CodecException e) {
e.printStackTrace();
}
@ -113,6 +141,30 @@ public class StateMachine extends StateMachineAdapter {
this.leaderTerm.set(term);
super.onLeaderStart(term);
}
@Override
public void onStartFollowing(LeaderChangeContext ctx) {
// TODO Auto-generated method stub
super.onStartFollowing(ctx);
var ss = StateServerFactory.getStateServer();
var node = ss.getNode();
try {
var request = new RequestCondition();
request.setWorkerState( new WorkerState() );
request.getWorkerState().setPeerId( ss.getCluster().getServerId() );
log.error("------------ leader id {}", node.getLeaderId());
ResponseSM resp;
resp = (ResponseSM)StateServerFactory.getStateServer().getRpcClient().invokeSync(node.getLeaderId().getEndpoint(), request, 5000);
log.error("{}", resp);
} catch (InterruptedException | RemotingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override

View File

@ -11,10 +11,10 @@ import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;
import com.alipay.remoting.NamedThreadFactory;
import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.Node;
@ -33,9 +33,9 @@ import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.yuandian.dataflow.statemachine.rpc.ResponseSM;
import com.yuandian.dataflow.statemachine.rpc.RequestCondition;
import com.yuandian.dataflow.statemachine.rpc.RequestState;
import com.yuandian.dataflow.statemachine.rpc.ResponseSM;
import com.yuandian.dataflow.statemachine.rpc.SyncConditionProcessor;
import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
@ -63,16 +63,7 @@ public class StateServerFactory {
}
ss = new StateServerFactory.StateServer(peerstr, conf);
log.error("init peerid {}", ss.node.getNodeId().getPeerId());
ss.getNode().join();
var request = new RequestCondition();
request.setWorkerState( new WorkerState() );
request.getWorkerState().setPeerId( ss.cluster.getServerId() );
log.error("{}", ss.getNode().getNodeMetrics() );
ResponseSM resp = (ResponseSM)ss.getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000);
log.info("{}", resp);
}
// 获取状态服务的对象
@ -86,8 +77,6 @@ public class StateServerFactory {
RpcClient rpcClient;
private Node node;
private RaftGroupService cluster;
private StateMachine fsm;
@ -125,6 +114,7 @@ public class StateServerFactory {
cluster = new RaftGroupService(groupId, serverId, nodeOptions);
cluster.getRpcServer().registerProcessor(new SyncStateProcessor());
cluster.getRpcServer().registerProcessor(new SyncConditionProcessor());
node = cluster.start();
rpcClient = new BoltRaftRpcFactory().createRpcClient();
@ -135,8 +125,46 @@ public class StateServerFactory {
return this.fsm.isLeader();
}
public State getFSMState() {
return this.fsm.getState();
public void useState(Function<State, Void> dofunc) {
this.fsm.useState((fsmState)->{
SyncClosure<State> closure = new SyncClosure<State>() {
@Override
public void run(Status status) {
}
};
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
if(status.isOk()){
log.error("readIndex {}", fsmState);
closure.success(fsmState);
closure.run(Status.OK());
return;
}
readIndexExecutor.execute(() -> {
if(isLeader()){
log.error("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status);
applyState(fsmState, closure);
}else {
handlerNotLeaderError(closure);
}
});
}
});
return null;
});
}
public void applyState(State state, SyncClosure<State> closure) {
@ -151,6 +179,7 @@ public class StateServerFactory {
task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(state)));
task.setDone(closure);
StateServerFactory.getStateServer().getNode().apply(task);
} catch (CodecException e) {
String errorMsg = "Fail to encode TaskState";
log.error(errorMsg, e);
@ -165,52 +194,72 @@ public class StateServerFactory {
return;
}
var wmap = getFSMState().getWorkers();
var wstate = wmap.get(ss.node.getNodeId().getPeerId());
if(wstate == null) {
wstate = new WorkerState();
wmap.put(ss.node.getNodeId().getPeerId(), wstate);
log.error("update: {}", wmap.size());
ss.applyState(getFSMState(), new SyncClosure<State>() {
@Override
public void run(Status status) {
log.error("{} add workers", ss.node.getNodeId().getPeerId());
useState((fsmState)->{
var wmap = fsmState.getWorkers();
var wstate = wmap.get(state.getPeerId());
if(wstate == null) {
wmap.put(state.getPeerId(), state);
try {
final Task task = new Task();
log.error("{}", fsmState);
task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(fsmState)));
task.setDone(closure);
StateServerFactory.getStateServer().getNode().apply(task);
} catch (CodecException e) {
String errorMsg = "Fail to encode TaskState";
log.error(errorMsg, e);
closure.failure(errorMsg, PeerId.emptyPeer());
closure.run(new Status(RaftError.EINTERNAL, errorMsg));
}
} );
}
}
return fsmState;
});
}
public void readIndexState(final boolean readOnlySafe, final SyncClosure<State> closure) {
closure.setValue(getFSMState());
if(!readOnlySafe){
closure.success(getFSMState());
closure.run(Status.OK());
return;
}
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
if(status.isOk()){
log.error("readIndex {}", getFSMState());
closure.success(getFSMState());
closure.run(Status.OK());
return;
}
readIndexExecutor.execute(() -> {
if(isLeader()){
log.error("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status);
applyState(getFSMState(), closure);
}else {
handlerNotLeaderError(closure);
}
});
useState((fsmState)->{
closure.setValue(fsmState);
if(!readOnlySafe){
closure.success(fsmState);
closure.run(Status.OK());
return fsmState;
}
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
if(status.isOk()){
log.error("readIndex {}", fsmState);
closure.success(fsmState);
closure.run(Status.OK());
return;
}
readIndexExecutor.execute(() -> {
if(isLeader()){
log.error("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status);
applyState(fsmState, closure);
}else {
handlerNotLeaderError(closure);
}
});
}
});
return fsmState;
});
}
public ResponseSM redirect() {

View File

@ -15,9 +15,11 @@ import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.lmax.disruptor.WorkProcessor;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.SyncClosure;
import com.yuandian.dataflow.statemachine.StateServerFactory.StateServer;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import com.alipay.sofa.jraft.entity.PeerId;
import org.apache.commons.lang.StringUtils;
@ -38,7 +40,7 @@ public class SyncConditionProcessor implements RpcProcessor<RequestCondition> {
public void handleRequest(RpcContext rpcCtx, RequestCondition request) {
log.info("request: {}", request);
final SyncClosure closure = new SyncClosure() {
final SyncClosure<WorkerState> closure = new SyncClosure<WorkerState>() {
@Override
public void run(Status status) {
rpcCtx.sendResponse(getResponse());
@ -51,7 +53,7 @@ public class SyncConditionProcessor implements RpcProcessor<RequestCondition> {
@Override
public String interest() {
return RequestState.class.getName();
return RequestCondition.class.getName();
}

View File

@ -40,7 +40,7 @@ public class SyncStateProcessor implements RpcProcessor<RequestState> {
log.info("request: {}", request);
final SyncClosure closure = new SyncClosure() {
final SyncClosure<State> closure = new SyncClosure<State>() {
@Override
public void run(Status status) {
rpcCtx.sendResponse(getResponse());

View File

@ -10,7 +10,7 @@ import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.Endpoint;
import com.yuandian.dataflow.statemachine.rpc.SMResponse;
import com.yuandian.dataflow.statemachine.rpc.ResponseSM;
import com.yuandian.dataflow.statemachine.rpc.RequestState;
import com.yuandian.dataflow.statemachine.state.State;
@ -30,12 +30,12 @@ public class StateMachineTest {
fdata.setState(fstate);
var leader = new Endpoint("localhost",4441);
SMResponse resp = (SMResponse)rpcClient.invokeSync(leader, fdata
ResponseSM resp = (ResponseSM)rpcClient.invokeSync(leader, fdata
, 5000);
log.info("{}", resp);
if( resp != null && !resp.isSuccess() ) {
leader = resp.getRedirect().getEndpoint();
resp = (SMResponse)rpcClient.invokeSync(resp.getRedirect().getEndpoint(), fdata
resp = (ResponseSM)rpcClient.invokeSync(resp.getRedirect().getEndpoint(), fdata
, 5000);
log.info("{}", resp);
}
@ -61,7 +61,7 @@ public class StateMachineTest {
rpcClient.invokeAsync(leader, request, new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
// SMResponse resp = (SMResponse)result;
// ResponseSM resp = (ResponseSM)result;
log.info("{} {} {}", result, err, pi);
}

View File

@ -2,12 +2,13 @@ screen -S raft-0 -X quit
screen -S raft-1 -X quit
screen -S raft-2 -X quit
sleep 2
sleep 1
screen -dmS raft-0 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 0
screen -dmS raft-1 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 1
screen -dmS raft-2 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 2
screen -S raft-0 -X logfile flush 1
screen -S raft-1 -X logfile flush 1
screen -S raft-2 -X logfile flush 1
screen -S raft-0 -X logfile flush 0
screen -S raft-1 -X logfile flush 0
screen -S raft-2 -X logfile flush 0