This commit is contained in:
huangsimin 2022-07-27 00:02:25 +08:00
parent eea062ce8e
commit 9c12f24c29
9 changed files with 68 additions and 94 deletions

View File

@ -55,31 +55,32 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
var resp = new RaftResponse();
resp.setMsg(rpcCtx.getRemoteAddress());
resp.setSuccess(true);
var ss = StateServerFactory.getStateServer();
log.info("{} handler request.packets.size(): {}", ss.getNode().getNodeId().getPeerId() ,request.packets.size());
ss.useFsmStateNotLock((state)->{
log.info("{} handler request.packets.size(): {}", StateServerFactory.getServerId(), request.packets.size());
ss.readIndexState((state)->{
var work = state.getWorkers().get( ss.getCluster().getServerId());
work.setTaskQueueSize( work.getTaskQueueSize() - request.packets.size());
work.setUpdateAt(Instant.now());
log.debug("workerState taskQueueSize: {} psize: {} state {}", work.getTaskQueueSize(), request.packets.size(), state.getWorkers().size());
var op = new Operate(OperateType.PUT);
op.setValue(work);
Operate.CallOperate(op, new OperateClosure() {
var ws = state.getWorkers().get(StateServerFactory.getServerId());
ws.setTaskQueueSize( ws.getTaskQueueSize() - request.packets.size());
ws.setUpdateAt(Instant.now());
log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(), request.packets.size(), state.getWorkers().size());
Operate.CallOperate(new Operate(OperateType.PUT,ws), new OperateClosure() {
@Override
public void run(Status status) {
log.info("{}", this.getResponse());
var resp = new RaftResponse();
resp.setSuccess(true);
// resp.setRedirect(StateServerFactory.getStateServer().getNode().getLeaderId());
var resp = this.getResponse();
if(status.isOk()) {
resp.setSuccess(true);
log.info("{}", resp);
} else {
resp.setSuccess(false);
}
rpcCtx.sendResponse(resp);
}
});
});

View File

@ -16,7 +16,6 @@ import java.util.stream.Collectors;
import com.google.protobuf.*;
import com.google.protobuf.util.JsonFormat;
import com.yuandian.common.Config;
import com.yuandian.dataflow.projo.Response;
import com.yuandian.dataflow.proto.CollectPacketsServerGrpc;

View File

@ -1,14 +0,0 @@
package com.yuandian.dataflow.projo;
import org.apache.http.HttpStatus;
import com.fasterxml.jackson.annotation.JsonProperty;
public class Response {
@JsonProperty("code")
public org.apache.http.HttpStatus Code;
@JsonProperty("message")
public String Message;
@JsonProperty("data")
public Object Data;
}

View File

@ -20,7 +20,7 @@ import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.RequestOperate;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.StateFactory;
import com.yuandian.dataflow.statemachine.state.WorkerState;
@ -147,15 +147,14 @@ public class StateMachine extends StateMachineAdapter {
}
var ss = StateServerFactory.getStateServer();
ss.useFsmStateNotLock((state)->{
var ws = state.getWorkers().get( ss.getCluster().getServerId() );
ss.readIndexState((state)->{
var ws = state.getWorkers().get( StateServerFactory.getServerId() );
if(ws == null) {
ws = new WorkerState(ss.getCluster().getServerId());
ws = new WorkerState(StateServerFactory.getServerId());
// state.getWorkers().put(ss.getCluster().getServerId(), ws);
}
Operate op = new Operate(OperateType.PUT);
op.setValue(ws);
Operate op = new Operate(OperateType.PUT, ws);
ss.applyOperate(op, new OperateClosure() {
@Override
public void run(Status status) {
@ -204,10 +203,9 @@ public class StateMachine extends StateMachineAdapter {
var ss = StateServerFactory.getStateServer();
var ws = new WorkerState(ss.getCluster().getServerId());
log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId());
log.debug("my: {} leader id {}",StateServerFactory.getServerId(), StateServerFactory.getLeaderId());
var op = new Operate(OperateType.PUT);
op.setValue(ws);
var op = new Operate(OperateType.PUT, ws);
Operate.CallOperate(op, new OperateClosure() {
@Override
@ -234,13 +232,11 @@ public class StateMachine extends StateMachineAdapter {
log.debug("{} {}",ctx, StateServerFactory.getCluster().getServerId());
var ss = StateServerFactory.getStateServer();
var ws = new WorkerState(ss.getCluster().getServerId());
var ws = new WorkerState(StateServerFactory.getServerId());
log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId());
var op = new Operate(OperateType.PUT);
op.setValue(ws);
log.debug("my: {} leader id {}",StateServerFactory.getServerId(), StateServerFactory.getLeaderId());
var op = new Operate(OperateType.PUT, ws);
Operate.CallOperate(op, new OperateClosure() {
@Override
public void run(Status status) {

View File

@ -42,7 +42,7 @@ import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.RequestOperate;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
@ -64,9 +64,6 @@ public class StateServerFactory {
private static StateServer ss;
private static String myPeerStr;
private static Configuration raftConf;
public static void startStateServer(String peerstr, Configuration conf) throws Exception {
if(ss != null) {
throw new Exception("重复初始化 InitStateServer");
@ -75,9 +72,14 @@ public class StateServerFactory {
}
public static boolean isLeader() {
return ss.getNode().isLeader() ;
}
public static PeerId getLeaderId() {
return ss.node.getLeaderId() ;
}
public static PeerId getServerId() {
return ss.getCluster().getServerId();
@ -177,21 +179,15 @@ public class StateServerFactory {
}
}
public void useFsmStateNotLock(Consumer<State> dofunc) {
public void readIndexState(Consumer<State> dofunc) {
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
var state = ss.fsm.getState();
dofunc.accept(state);
if( status.isOk()) {
dofunc.accept(ss.fsm.getState());
}
}
} );
}
@ -204,7 +200,6 @@ public class StateServerFactory {
}
try {
closure.setValue(op);
final Task task = new Task();
task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op)));
@ -242,16 +237,16 @@ public class StateServerFactory {
// final StoreEngineOptions opts = new StoreEngineOptions();
// return StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads());
return ThreadPoolUtil.newBuilder() //
.poolName("ReadIndexPool") //
.enableMetric(true) //
.coreThreads(4) //
.maximumThreads(4) //
.keepAliveSeconds(60L) //
.workQueue(new SynchronousQueue<>()) //
.threadFactory(new NamedThreadFactory("ReadIndexService", true)) //
.rejectedHandler(new ThreadPoolExecutor.AbortPolicy()) //
.build();
return ThreadPoolUtil.newBuilder() //
.poolName("ReadIndexPool") //
.enableMetric(true) //
.coreThreads(4) //
.maximumThreads(4) //
.keepAliveSeconds(60L) //
.workQueue(new SynchronousQueue<>()) //
.threadFactory(new NamedThreadFactory("ReadIndexService", true)) //
.rejectedHandler(new ThreadPoolExecutor.AbortPolicy()) //
.build();
}
}
@ -260,7 +255,7 @@ public class StateServerFactory {
public static void main(String[] args) throws InterruptedException, RemotingException {
var rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions());
var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new RequestOperate(), 5000);
var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new OperateRequest(), 5000);
log.info("{}", resp);
}
}

View File

@ -2,17 +2,14 @@ package com.yuandian.dataflow.statemachine.operate;
import java.io.Serializable;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.yuandian.dataflow.projo.Response;
import com.yuandian.dataflow.statemachine.OperateClosure;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.state.StateFactory;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.Data;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
@ -34,8 +31,9 @@ public class Operate implements Serializable {
private OperateType type;
private Object value;
public Operate(OperateType t) {
public Operate(OperateType t, WorkerState ws) {
this.type = t;
this.value = ws;
}
public <T> T getValue() {
@ -48,17 +46,17 @@ public class Operate implements Serializable {
};
public static void CallOperate(Operate op, OperateClosure closure) {
var ss = StateServerFactory.getStateServer();
if (StateServerFactory.getStateServer().isLeader()) {
if (StateServerFactory.isLeader()) {
ss.applyOperate(op, closure);
return;
}
var request = new OperateProcessor.RequestOperate();
var request = new OperateProcessor.OperateRequest();
request.setOperate(op);
var leaderId = StateServerFactory.getStateServer().getNode().getLeaderId();
var leaderId = StateServerFactory.getLeaderId();
try {
ss.getRpcClient().invokeAsync(leaderId.getEndpoint(),
request, new InvokeCallback() {
@ -66,7 +64,7 @@ public class Operate implements Serializable {
@Override
public void complete(Object result, Throwable err) {
log.info("{}", result);
var resp = (RaftResponse)result;
var resp = (RaftResponse) result;
closure.setResponse(resp);
closure.success(resp.getOperate());
}
@ -75,7 +73,7 @@ public class Operate implements Serializable {
} catch (InterruptedException | RemotingException e) {
// TODO Auto-generated catch block
closure.failure("failure", null);
log.info("{}", e.toString());
log.info("{}", e.toString());
}
}

View File

@ -40,19 +40,18 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
@ProcessorRaft
public class OperateProcessor implements RpcProcessor<OperateProcessor.RequestOperate> {
public class OperateProcessor implements RpcProcessor<OperateProcessor.OperateRequest> {
/**
* 同步状态时, 需要用的结构类. 新增的状态可以在 State结构里添加
* 同步状态时, 需要用的结构类. 新增的状态可以在 Operate结构里添加
*
* @author eson
*2022年7月11日-16:01:07
*/
@Getter
@Setter
@ToString
public static class RequestOperate implements Serializable {
public static class OperateRequest implements Serializable {
private static final long serialVersionUID = 1L;
@ -61,7 +60,7 @@ public class OperateProcessor implements RpcProcessor<OperateProcessor.RequestOp
@Override
public void handleRequest(RpcContext rpcCtx, RequestOperate request) {
public void handleRequest(RpcContext rpcCtx, OperateRequest request) {
log.info("request: {}", request);
@ -78,7 +77,7 @@ public class OperateProcessor implements RpcProcessor<OperateProcessor.RequestOp
@Override
public String interest() {
return RequestOperate.class.getName();
return OperateRequest.class.getName();
}

View File

@ -68,7 +68,7 @@ public class StateFactory {
var ss = StateServerFactory.getStateServer();
// var state = ss.getFsm().getState();
ss.useFsmStateNotLock((state) -> {
ss.readIndexState((state) -> {
alivePeers.forEach((peer) -> {
WorkerState ws = state.getWorkers().get(peer);
if (ws != null) {
@ -89,8 +89,8 @@ public class StateFactory {
request.getPackets().add(p);
}
var op = new Operate(OperateType.PUT);
op.setValue(ws);
var op = new Operate(OperateType.PUT, ws);
Operate.CallOperate(op, new OperateClosure() {
@Override
public void run(Status status) {

View File

@ -10,7 +10,7 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.Endpoint;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.RequestOperate;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.var;