TODO: 解决 主从操作 更新混乱的问题

This commit is contained in:
huangsimin 2022-07-25 17:04:45 +08:00
parent d35d161f41
commit f7dbbb35cb
10 changed files with 191 additions and 310 deletions

View File

@ -147,13 +147,14 @@
<version>${grpc.version}</version>
</dependency>
<!-- proto自动生成java文件所需的编译插件 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
<scope>provided</scope>
</dependency>
</dependencies>
<scm>

View File

@ -7,15 +7,23 @@
package com.yuandian.dataflow.controller;
import java.io.Serializable;
import java.time.Instant;
import java.util.ArrayList;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.google.protobuf.Any;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.rpc.ResponseSM;
import com.yuandian.dataflow.statemachine.SyncClosure;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor;
import com.yuandian.dataflow.statemachine.rpc.SyncWorkerStateProcessor;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.Getter;
import lombok.Setter;
@ -42,19 +50,62 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
@Override
public void handleRequest(RpcContext rpcCtx, PacketsRequest request) {
// StateServerFactory.getStateServer().updateFsmStateAsync(s, onCompleted);
log.info("{}",request.packets.size());
var resp = new ResponseSM();
log.info("request.packets.size(): {}",request.packets.size());
var resp = new RaftResponse();
resp.setMsg(rpcCtx.getRemoteAddress());
rpcCtx.sendResponse(resp);
resp.setSuccess(true);
var ss = StateServerFactory.getStateServer();
ss.useFsmStateNotLock((state)->{
var work = state.getWorkers().get( ss.getCluster().getServerId());
work.setTaskQueueSize( work.getTaskQueueSize() - request.packets.size());
work.setUpdateAt(Instant.now());
if(!ss.isLeader()) {
var requestUpdateState = new SyncWorkerStateProcessor.RequestWorkerState();
requestUpdateState.setWorkerState(work);
log.info("转发 {}", work);
try {
ss.getRpcClient().invokeAsync(StateServerFactory.getNode().getLeaderId().getEndpoint(),
requestUpdateState, new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
if (err != null) {
err.printStackTrace();
} else {
log.debug("转发 udate workerState {}", result);
}
rpcCtx.sendResponse(resp);
}}, 5000);
} catch (InterruptedException | RemotingException e) {
e.printStackTrace();
}
return;
}
StateServerFactory.getStateServer().applyWorkerState(work, new SyncClosure<State>() {
@Override
public void run(Status status) {
log.debug("finsh tasks size {}, size: {}", status, request.packets.size());
rpcCtx.sendResponse(resp);
}
});
});
}
@Override
public String interest() {
// TODO Auto-generated method stub
return PacketsRequest.class.getName();
}
}

View File

@ -16,8 +16,8 @@ import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.yuandian.dataflow.statemachine.rpc.ResponseSM;
import com.yuandian.dataflow.statemachine.rpc.SyncConditionProcessor.RequestCondition;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.SyncWorkerStateProcessor.RequestWorkerState;
import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor.RequestState;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.StateFactory;
@ -58,46 +58,9 @@ public class StateMachine extends StateMachineAdapter {
return state;
}
/**
* Returns current value. 读取修改都在这个函数域内进行
*/
public void useState(Function<State, Void> dofunc) {
dofunc.apply(this.state);
}
/**
* Returns current value. 读取修改都在这个函数域内进行
* @throws RemotingException
* @throws InterruptedException
*/
public void updateState(Function<State, State> dofunc) throws InterruptedException, RemotingException {
var newstate = dofunc.apply(this.state);
var ss = StateServerFactory.getStateServer();
if(!isLeader()) {
var request = new RequestState();
request.setState(newstate);
var result = ss.getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000);
log.info("{}", result);
return;
}
// this.state = newstate;
if(newstate != null) {
var colsure = new SyncClosure<State>() {
@Override
public void run(Status status) {
}
};
colsure.setValue(newstate);
StateServerFactory.getStateServer().applyState(newstate, colsure);
}
}
@Override
@SuppressWarnings("unchecked")
public void onApply(final Iterator iter) {
@ -107,7 +70,7 @@ public class StateMachine extends StateMachineAdapter {
// This task is applied by this node, get value from closure to avoid additional
// parsing.
var closure = (SyncClosure<State>)iter.done(); // 只支持单一个State. 全状态机只支持一种提交
log.info("SyncDataClosure(done) taskState:{} leaderTerm:{} {}",this.state, this.leaderTerm, closure);
// log.info("SyncDataClosure(done) taskState:{} leaderTerm:{} {}",this.state, this.leaderTerm, closure);
this.state = closure.getValue();
closure.success(state);
closure.run(Status.OK());
@ -115,11 +78,11 @@ public class StateMachine extends StateMachineAdapter {
// Have to parse FetchAddRequest from this user log.
final ByteBuffer data = iter.getData();
try {
synchronized(state) {
state = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
data.array(), State.class.getName());
log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm);
}
state = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
data.array(), State.class.getName());
// log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm);
} catch (CodecException e) {
e.printStackTrace();
@ -150,20 +113,31 @@ public class StateMachine extends StateMachineAdapter {
public void onLeaderStart(final long term) {
log.debug("onLeaderStart {}", StateServerFactory.getCluster().getServerId());
this.leaderTerm.set(term);
try {
updateState((state)->{
var ws = new WorkerState(StateServerFactory.getStateServer().getCluster().getServerId());
state.getWorkers().put(ws.peerId, ws);
return state;
});
if(!StateFactory.getMasterExecute().isAlive()) {
StateFactory.getMasterExecute().start();
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
if(StateFactory.getMasterExecute().isAlive()) {
StateFactory.getMasterExecute().interrupt();
}
var ss = StateServerFactory.getStateServer();
ss.useFsmStateNotLock((state)->{
var ws = state.getWorkers().get( ss.getCluster().getServerId() );
if(ws == null) {
ws = new WorkerState(ss.getCluster().getServerId());
state.getWorkers().put(ss.getCluster().getServerId(), ws);
}
ss.applyState(state, new SyncClosure<State>() {
@Override
public void run(Status status) {
log.debug("master update workerstate: {}", status);
}
});
});
StateFactory.getMasterExecute().start();
super.onLeaderStart(term);
}
@ -177,17 +151,7 @@ public class StateMachine extends StateMachineAdapter {
StateFactory.getMasterExecute().interrupt();
}
try {
updateState((state)->{
state.getWorkers().remove( StateServerFactory.getServerId() );
return state;
});
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
}
}
@ -210,11 +174,11 @@ public class StateMachine extends StateMachineAdapter {
var ss = StateServerFactory.getStateServer();
var ws = new WorkerState(ss.getCluster().getServerId());
var request = new RequestCondition();
var request = new RequestWorkerState();
request.setWorkerState(ws);
log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId());
ResponseSM resp;
resp = (ResponseSM)StateServerFactory.getStateServer().getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000);
RaftResponse resp;
resp = (RaftResponse)StateServerFactory.getStateServer().getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000);
if(resp == null) {
log.debug("{} set WorkerState is error", resp);
}

View File

@ -12,6 +12,7 @@ import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reflections.Reflections;
@ -38,8 +39,8 @@ import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.yuandian.dataflow.statemachine.rpc.ResponseSM;
import com.yuandian.dataflow.statemachine.rpc.SyncConditionProcessor.RequestCondition;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.SyncWorkerStateProcessor.RequestWorkerState;
import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor.RequestState;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State;
@ -166,164 +167,21 @@ public class StateServerFactory {
return this.fsm.isLeader();
}
public void useFsmStateAsync(Function<State, Void> dofunc) {
SyncClosure<State> closure = new SyncClosure<State>() {
@Override
public void run(Status status) {
if(status.isOk()) {
dofunc.apply(this.getValue());
}
}
};
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
getFsm().useState((fsmState)->{
if(status.isOk()){
closure.setValue(fsmState);
closure.success(fsmState);
closure.run(Status.OK());
return null;
}
readIndexExecutor.execute(() -> {
if(isLeader()){
log.info("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", getNode());
applyState(fsmState, closure);
}else {
handlerNotLeaderError(closure);
}
});
return null;
});
}
});
return ;
}
/**
* 同步更新 WorkerState
* @param dofunc
* @throws InterruptedException
* @throws RemotingException
*/
public void updateFsmWorkerState(WorkerState ws) throws InterruptedException, RemotingException {
// leader就直接提交
if(isLeader()) {
var closure = new SyncClosure<State>() {
@Override
public void run(Status status) {
// log.debug("leader {}", status);
this.synclock.notify();
}
};
StateServerFactory.getStateServer().applyWorkerState(ws, closure);
closure.synclock.wait(5000);
return;
}
try {
// 非leader就 rpc请求
var ss = StateServerFactory.getStateServer();
var request = new RequestCondition();
request.setWorkerState(ws);
log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId());
ResponseSM resp;
resp = (ResponseSM)StateServerFactory.getRpcClient().invokeSync(StateServerFactory.getNode().getLeaderId().getEndpoint(), request, 5000);
if(resp == null) {
log.error("{} set WorkerState is error", resp);
}
// log.debug("follow is {}", resp);
return;
} catch (InterruptedException | RemotingException e) {
e.printStackTrace();
}
}
/**
* 同步更新整个State
* @param s
* @throws InterruptedException
* @throws RemotingException
*/
public void updateFsmState(State s) throws InterruptedException, RemotingException {
if(isLeader()) {
var closure = new SyncClosure<State>() {
@Override
public void run(Status status) {
// log.debug("leader update {}", status);
synclock.notify();
}
};
StateServerFactory.getStateServer().applyState(s, closure);
closure.synclock.wait(5000);
return;
}
var ss = StateServerFactory.getStateServer();
var request = new RequestState();
request.setState(s);
log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId());
ResponseSM resp;
resp = (ResponseSM)StateServerFactory.getStateServer().getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000);
if(resp == null) {
log.error("{} set State is error", resp);
}
// log.debug("follow is {}", resp);
return;
}
public void updateFsmStateAsync(State s, Function<Status, Void> onCompleted) throws InterruptedException, RemotingException {
if(isLeader()) {
var closure = new SyncClosure<State>() {
@Override
public void run(Status status) {
// log.debug("leader update {}", status);
if(status.isOk()) {
onCompleted.apply(status);
}
}
};
StateServerFactory.getStateServer().applyState(s, closure);
return;
}
var ss = StateServerFactory.getStateServer();
var request = new RequestState();
request.setState(s);
log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId());
StateServerFactory
.getStateServer()
.getRpcClient()
.invokeAsync(ss.getNode().getLeaderId().getEndpoint(), request, new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
if(result != null){
onCompleted.apply(Status.OK());
} else {
var status = new Status(10000, "rpc invokeAsync with request: {}", request);
log.debug("{}", status);
onCompleted.apply(status);
}
}
}, 5000);
// log.debug("follow is {}", resp);
return;
public void useFsmState(Consumer<State> dofunc) {
var state = ss.fsm.getState();
synchronized(state) {
dofunc.accept(state);
}
}
public void useFsmStateNotLock(Consumer<State> dofunc) {
var state = ss.fsm.getState();
dofunc.accept(state);
}
public void applyState(State state, SyncClosure<State> closure) {
// 所有的提交都必须再leader进行
@ -351,10 +209,8 @@ public class StateServerFactory {
ss.handlerNotLeaderError(closure);
return;
}
useFsmStateAsync((fsmState)->{
StateServerFactory.getStateServer().useFsmStateNotLock((fsmState)->{
var wmap = fsmState.getWorkers();
var wstate = wmap.get(state.getPeerId());
@ -376,16 +232,13 @@ public class StateServerFactory {
closure.success(fsmState);
closure.run(Status.OK());
}
return null;
});
}
public ResponseSM redirect() {
final ResponseSM response = new ResponseSM();
public RaftResponse redirect() {
final RaftResponse response = new RaftResponse();
response.setSuccess(false);
if (this.node != null) {
final PeerId leader = this.node.getLeaderId();

View File

@ -3,7 +3,7 @@ package com.yuandian.dataflow.statemachine;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.PeerId;
import com.yuandian.dataflow.statemachine.rpc.ResponseSM;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.Getter;
@ -21,7 +21,7 @@ import org.slf4j.LoggerFactory;
public abstract class SyncClosure<T> implements Closure {
// 状态机的统一响应
private ResponseSM response;
private RaftResponse response;
// 代表任务状态
private T value;
@ -33,7 +33,7 @@ public abstract class SyncClosure<T> implements Closure {
}
public void failure(final String errorMsg, final PeerId redirect) {
final ResponseSM response = new ResponseSM();
final RaftResponse response = new RaftResponse();
response.setSuccess(false);
response.setMsg(errorMsg);
response.setRedirect(redirect);
@ -41,7 +41,7 @@ public abstract class SyncClosure<T> implements Closure {
}
public void success(final State value) {
final ResponseSM response = new ResponseSM();
final RaftResponse response = new RaftResponse();
response.setState(value);
response.setSuccess(true);
setResponse(response);

View File

@ -26,7 +26,7 @@ import lombok.extern.slf4j.Slf4j;
@Getter
@Setter
@ToString
public class ResponseSM implements Serializable {
public class RaftResponse implements Serializable {
private static final long serialVersionUID = 1L;

View File

@ -41,12 +41,12 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
@ProcessorRaft
public class SyncConditionProcessor implements RpcProcessor<SyncConditionProcessor.RequestCondition> {
public class SyncWorkerStateProcessor implements RpcProcessor<SyncWorkerStateProcessor.RequestWorkerState> {
@Getter
@Setter
@ToString
public static class RequestCondition implements Serializable {
public static class RequestWorkerState implements Serializable {
private static final long serialVersionUID = 1L;
@ -54,7 +54,7 @@ public class SyncConditionProcessor implements RpcProcessor<SyncConditionProcess
}
@Override
public void handleRequest(RpcContext rpcCtx, RequestCondition request) {
public void handleRequest(RpcContext rpcCtx, RequestWorkerState request) {
log.info("request: {}", request);
final SyncClosure<State> closure = new SyncClosure< State>() {
@ -70,7 +70,7 @@ public class SyncConditionProcessor implements RpcProcessor<SyncConditionProcess
@Override
public String interest() {
return RequestCondition.class.getName();
return RequestWorkerState.class.getName();
}

View File

@ -20,5 +20,4 @@ import java.lang.annotation.Target;
@Retention(RetentionPolicy.CLASS)
@Target(ElementType.TYPE)
public @interface ProcessorRaft {
}

View File

@ -7,10 +7,13 @@
package com.yuandian.dataflow.statemachine.state;
import java.time.Instant;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.handler.codec.http.HttpContentEncoder.Result;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.google.protobuf.Any;
import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
@ -44,59 +47,69 @@ public class StateFactory {
var alivePeers = StateServerFactory.getCluster().getRaftNode().listAlivePeers();
log.debug("master execute {}", StateServerFactory.getCluster().getRaftNode().listAlivePeers());
if (alivePeers != null) {
var ss = StateServerFactory.getStateServer();
StateServerFactory.getStateServer().useFsmStateAsync((state) -> {
synchronized (alivePeers) {
alivePeers.forEach((peer) -> {
WorkerState ws = state.getWorkers().get(peer);
if (ws != null) {
var cap = 100 - ws.getTaskQueueSize();
if (cap > 0) {
log.debug("{}", cap);
var request = new PacketsRequest();
for (int i = 0; i < cap; i++) {
var p = Any.pack(
BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder()
.setTableId(10086)
.build());
// var state = ss.getFsm().getState();
ss.useFsmStateNotLock((state) -> {
alivePeers.forEach((peer) -> {
WorkerState ws = state.getWorkers().get(peer);
request.getPackets().add(p);
}
try {
var result = ss.getRpcClient().invokeSync(ws.getPeerId().getEndpoint(),
request, 5000);
log.info("{}", result);
ws.setUpdateAt( Instant.now() );
ws.setTaskQueueSize(ws.getTaskQueueSize() - cap);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (RemotingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (ws != null) {
var cap = 100 - ws.getTaskQueueSize();
log.debug("cap :{} peer: {}", cap, peer);
if (cap <= 0) {
return ;
}
ss.applyState(state, new SyncClosure<State>() {
public void run(Status status) {
log.debug("{}", status);
};
} );
alivePeers.notifyAll();
});
}
var request = new PacketsRequest();
for (int i = 0; i < cap; i++) {
var p = Any.pack(
BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder()
.setTableId(10086)
.build());
request.getPackets().add(p);
}
return null;
try {
log.debug("rpc {}", peer);
ss.getRpcClient().invokeAsync(peer.getEndpoint(),
request, new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
if (err != null) {
err.printStackTrace();
} else {
log.info("{} peer result", result);
}
}
}, 5000);
} catch (InterruptedException | RemotingException e) {
e.printStackTrace();
}
ws.setUpdateAt(Instant.now());
ws.setTaskQueueSize(100);
}
});
ss.applyState(state, new SyncClosure<State>() {
@Override
public void run(Status status) {
log.info("任务队列更新成功 {}", this.getValue().getWorkers());
}
});
});
synchronized (alivePeers) {
alivePeers.wait(5000);
}
// ss.applyState(state, new SyncClosure<State>() {
// public void run(Status status) {
// log.debug("{}", status);
// };
// });
}

View File

@ -9,7 +9,7 @@ import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.Endpoint;
import com.yuandian.dataflow.statemachine.rpc.ResponseSM;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor.RequestState;
import com.yuandian.dataflow.statemachine.state.State;
@ -28,12 +28,12 @@ public class StateMachineTest {
fdata.setState(fstate);
var leader = new Endpoint("localhost",4441);
ResponseSM resp = (ResponseSM)rpcClient.invokeSync(leader, fdata
RaftResponse resp = (RaftResponse)rpcClient.invokeSync(leader, fdata
, 5000);
log.info("{}", resp);
if( resp != null && !resp.isSuccess() ) {
leader = resp.getRedirect().getEndpoint();
resp = (ResponseSM)rpcClient.invokeSync(resp.getRedirect().getEndpoint(), fdata
resp = (RaftResponse)rpcClient.invokeSync(resp.getRedirect().getEndpoint(), fdata
, 5000);
log.info("{}", resp);
}