TODO: 解决处理packets的问题

This commit is contained in:
huangsimin 2022-07-26 16:59:00 +08:00
parent bf415d45e8
commit 849821fd8b
12 changed files with 307 additions and 305 deletions

View File

@ -18,10 +18,12 @@ import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor; import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.google.protobuf.Any; import com.google.protobuf.Any;
import com.yuandian.dataflow.statemachine.StateServerFactory; import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.SyncClosure; import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.OperateClosure;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse; import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor; import com.yuandian.dataflow.statemachine.rpc.OperateProcessor;
import com.yuandian.dataflow.statemachine.rpc.SyncWorkerStateProcessor;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.State;
@ -65,34 +67,17 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
work.setTaskQueueSize( work.getTaskQueueSize() - request.packets.size()); work.setTaskQueueSize( work.getTaskQueueSize() - request.packets.size());
work.setUpdateAt(Instant.now()); work.setUpdateAt(Instant.now());
log.debug("workerState taskQueueSize: {} psize: {}", work.getTaskQueueSize(), request.packets.size()); log.debug("workerState taskQueueSize: {} psize: {}", work.getTaskQueueSize(), request.packets.size());
if(!ss.isLeader()) { var op = new Operate(OperateType.PUT);
var requestUpdateState = new SyncWorkerStateProcessor.RequestWorkerState(); op.setValue(work);
requestUpdateState.setWorkerState(work);
log.info("转发 {}", work); Operate.CallOperate(op, new OperateClosure() {
try {
ss.getRpcClient().invokeAsync(StateServerFactory.getNode().getLeaderId().getEndpoint(),
requestUpdateState, new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
if (err != null) {
err.printStackTrace();
} else {
log.debug("转发 update WorkerState {}", result);
}
rpcCtx.sendResponse(resp);
}}, 5000);
} catch (InterruptedException | RemotingException e) {
e.printStackTrace();
}
return;
}
StateServerFactory.getStateServer().applyWorkerState(work, new SyncClosure<State>() {
@Override @Override
public void run(Status status) { public void run(Status status) {
log.debug("finsh tasks size {}, size: {}", status, request.packets.size()); log.info("{}", this.getResponse());
rpcCtx.sendResponse(resp); rpcCtx.sendResponse(this.getResponse());
} }
}); });
}); });

View File

@ -81,7 +81,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
log.info("次序:{} 条数: {}, {}:ms", count, i, Duration.between(now, Instant.now()).toMillis()); log.info("次序:{} 条数: {}, {}:ms", count, i, Duration.between(now, Instant.now()).toMillis());
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); log.info("{}", e.toString());
} finally { } finally {
channel.shutdown().awaitTermination(1, TimeUnit.SECONDS); channel.shutdown().awaitTermination(1, TimeUnit.SECONDS);
} }
@ -99,7 +99,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result); // MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
e.printStackTrace(); log.info("{}", e.toString());
} }
return null; return null;
}); });
@ -110,7 +110,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result); // MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
e.printStackTrace(); log.info("{}", e.toString());
} }
return null; return null;
});*/ });*/
@ -131,7 +131,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// System.out.println("result:" + response.getBody()); // System.out.println("result:" + response.getBody());
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
e.printStackTrace(); log.info("{}", e.toString());
} }
return null; return null;
}); });
@ -143,7 +143,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result); // MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
e.printStackTrace(); log.info("{}", e.toString());
} }
return null; return null;
}); });
@ -154,7 +154,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result); // MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
e.printStackTrace(); log.info("{}", e.toString());
} }
return null; return null;
}); });
@ -165,7 +165,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result); // MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
e.printStackTrace(); log.info("{}", e.toString());
} }
return null; return null;
}); });
@ -176,7 +176,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result); // MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
e.printStackTrace(); log.info("{}", e.toString());
} }
return null; return null;
}); });
@ -187,7 +187,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result); // MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
e.printStackTrace(); log.info("{}", e.toString());
} }
return null; return null;
}); });
@ -198,7 +198,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result); // MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
e.printStackTrace(); log.info("{}", e.toString());
} }
return null; return null;
}); });
@ -209,7 +209,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result); // MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
e.printStackTrace(); log.info("{}", e.toString());
} }
return null; return null;
}); });
@ -220,7 +220,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result); // MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
e.printStackTrace(); log.info("{}", e.toString());
} }
return null; return null;
}); });
@ -231,7 +231,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result); // MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
e.printStackTrace(); log.info("{}", e.toString());
} }
return null; return null;
});*/ });*/

View File

@ -3,6 +3,7 @@ package com.yuandian.dataflow.statemachine;
import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.entity.PeerId;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse; import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.State;
@ -18,17 +19,17 @@ import org.slf4j.LoggerFactory;
@Getter @Getter
@Setter @Setter
@ToString @ToString
public abstract class SyncClosure<T> implements Closure { public abstract class OperateClosure implements Closure {
// 状态机的统一响应 // 状态机的统一响应
private RaftResponse response; private RaftResponse response;
// 代表任务状态 // 代表任务状态
private T value; private Operate value;
public Object synclock = new Object();
public SyncClosure() {
public OperateClosure() {
} }
@ -40,9 +41,9 @@ public abstract class SyncClosure<T> implements Closure {
setResponse(response); setResponse(response);
} }
public void success(final State value) { public void success(final Operate value) {
final RaftResponse response = new RaftResponse(); final RaftResponse response = new RaftResponse();
response.setState(value); response.setOperate(value);
response.setSuccess(true); response.setSuccess(true);
setResponse(response); setResponse(response);
} }

View File

@ -16,9 +16,11 @@ import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse; import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.SyncWorkerStateProcessor.RequestWorkerState;
import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor.RequestState; import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.RequestOperate;
import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.StateFactory; import com.yuandian.dataflow.statemachine.state.StateFactory;
import com.yuandian.dataflow.statemachine.state.WorkerState; import com.yuandian.dataflow.statemachine.state.WorkerState;
@ -58,38 +60,64 @@ public class StateMachine extends StateMachineAdapter {
return state; return state;
} }
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void onApply(final Iterator iter) { public void onApply(final Iterator iter) {
while (iter.hasNext()) { while (iter.hasNext()) {
Operate op = null;
OperateClosure closure = null;
if (iter.done() != null) { if (iter.done() != null) {
// This task is applied by this node, get value from closure to avoid additional // This task is applied by this node, get value from closure to avoid additional
// parsing. // parsing.
var closure = (SyncClosure<State>)iter.done(); // 只支持单一个State. 全状态机只支持一种提交 closure = (OperateClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交
// log.info("SyncDataClosure(done) taskState:{} leaderTerm:{} {}",this.state, this.leaderTerm, closure); // log.info("SyncDataClosure(done) taskState:{} leaderTerm:{} {}",this.state, this.leaderTerm, closure);
this.state = closure.getValue(); op = closure.getValue();
closure.success(state);
closure.run(Status.OK());
} else { } else {
// Have to parse FetchAddRequest from this user log. // Have to parse FetchAddRequest from this user log.
final ByteBuffer data = iter.getData(); final ByteBuffer data = iter.getData();
try { try {
state = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize( op = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
data.array(), State.class.getName()); data.array(),Operate.class.getName());
// log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm); // log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm);
} catch (CodecException e) { } catch (CodecException e) {
e.printStackTrace(); log.info("{}", e.toString());
} }
} }
if(op != null) {
switch(op.getType()) {
case PUT:
WorkerState ws = op.getValue();
state.getWorkers().put(ws.peerId, ws);
if(closure != null) {
closure.success(op);
closure.run(Status.OK());
}
break;
case REMOVE:
if(closure != null) {
closure.success(op);
closure.run(Status.OK());
}
break;
default:
break;
}
} else {
}
iter.next(); iter.next();
} }
} }
@ -123,15 +151,16 @@ public class StateMachine extends StateMachineAdapter {
var ws = state.getWorkers().get( ss.getCluster().getServerId() ); var ws = state.getWorkers().get( ss.getCluster().getServerId() );
if(ws == null) { if(ws == null) {
ws = new WorkerState(ss.getCluster().getServerId()); ws = new WorkerState(ss.getCluster().getServerId());
state.getWorkers().put(ss.getCluster().getServerId(), ws); // state.getWorkers().put(ss.getCluster().getServerId(), ws);
} }
ss.applyState(state, new SyncClosure<State>() { Operate op = new Operate(OperateType.PUT);
op.setValue(ws);
ss.applyOperate(op, new OperateClosure() {
@Override @Override
public void run(Status status) { public void run(Status status) {
log.debug("master update workerstate: {}", status); log.debug("master update workerstate: {}", status);
} }
}); });
}); });
@ -174,18 +203,22 @@ public class StateMachine extends StateMachineAdapter {
var ss = StateServerFactory.getStateServer(); var ss = StateServerFactory.getStateServer();
var ws = new WorkerState(ss.getCluster().getServerId()); var ws = new WorkerState(ss.getCluster().getServerId());
var request = new RequestWorkerState();
request.setWorkerState(ws);
log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId()); log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId());
RaftResponse resp;
resp = (RaftResponse)StateServerFactory.getStateServer().getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000); var op = new Operate(OperateType.PUT);
if(resp == null) { op.setValue(ws);
log.debug("{} set WorkerState is error", resp);
Operate.CallOperate(op, new OperateClosure() {
@Override
public void run(Status status) {
log.info("{} {}", status, this.getResponse());
} }
log.debug("WorkerState is {}", resp); });
return; return;
} catch (InterruptedException | RemotingException e) { } catch (Exception e) {
e.printStackTrace(); log.info("{}", e.toString());
} }
super.onStartFollowing(ctx); super.onStartFollowing(ctx);
@ -199,6 +232,23 @@ public class StateMachine extends StateMachineAdapter {
@Override @Override
public void onStopFollowing(LeaderChangeContext ctx) { public void onStopFollowing(LeaderChangeContext ctx) {
log.debug("{} {}",ctx, StateServerFactory.getCluster().getServerId()); log.debug("{} {}",ctx, StateServerFactory.getCluster().getServerId());
var ss = StateServerFactory.getStateServer();
var ws = new WorkerState(ss.getCluster().getServerId());
log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId());
var op = new Operate(OperateType.PUT);
op.setValue(ws);
Operate.CallOperate(op, new OperateClosure() {
@Override
public void run(Status status) {
log.info("{} {}", status, this.getResponse());
}
});
super.onStopFollowing(ctx); super.onStopFollowing(ctx);
} }

View File

@ -39,9 +39,10 @@ import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.BytesUtil; import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ThreadPoolUtil; import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse; import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.SyncWorkerStateProcessor.RequestWorkerState;
import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor.RequestState; import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.RequestOperate;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState; import com.yuandian.dataflow.statemachine.state.WorkerState;
@ -150,9 +151,9 @@ public class StateServerFactory {
try { try {
cluster.getRpcServer().registerProcessor((RpcProcessor<?>) pRaftClass.newInstance()); cluster.getRpcServer().registerProcessor((RpcProcessor<?>) pRaftClass.newInstance());
} catch (InstantiationException e) { } catch (InstantiationException e) {
e.printStackTrace(); log.info("{}", e.toString());
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
e.printStackTrace(); log.info("{}", e.toString());
} }
}); });
node = cluster.start(); node = cluster.start();
@ -178,26 +179,24 @@ public class StateServerFactory {
public void useFsmStateNotLock(Consumer<State> dofunc) { public void useFsmStateNotLock(Consumer<State> dofunc) {
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
var state = ss.fsm.getState(); var state = ss.fsm.getState();
synchronized(state) {
dofunc.accept(state); dofunc.accept(state);
} }
// getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { } );
// @Override
// public void run(Status status, long index, byte[] reqCtx) {
// var state = ss.fsm.getState();
// dofunc.accept(state);
// }
// } );
} }
public void applyState(State state, SyncClosure<State> closure) { public void applyOperate(Operate op, OperateClosure closure) {
// 所有的提交都必须再leader进行 // 所有的提交都必须再leader进行
if (!ss.isLeader()) { if (!ss.isLeader()) {
ss.handlerNotLeaderError(closure); ss.handlerNotLeaderError(closure);
@ -205,9 +204,10 @@ public class StateServerFactory {
} }
try { try {
closure.setValue(state);
closure.setValue(op);
final Task task = new Task(); final Task task = new Task();
task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(state))); task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op)));
task.setDone(closure); // 确认所有数据 一致, 不需要加锁 task.setDone(closure); // 确认所有数据 一致, 不需要加锁
StateServerFactory.getStateServer().getNode().apply(task); StateServerFactory.getStateServer().getNode().apply(task);
} catch (CodecException e) { } catch (CodecException e) {
@ -218,36 +218,7 @@ public class StateServerFactory {
} }
} }
public void applyWorkerState(WorkerState state, SyncClosure<State> closure) {
log.debug("applyWorkerState");
if (!ss.isLeader()) {
ss.handlerNotLeaderError(closure);
return;
}
StateServerFactory.getStateServer().useFsmStateNotLock((fsmState)->{
var wmap = fsmState.getWorkers();
var wstate = wmap.get(state.getPeerId());
if(wstate == null) {
wmap.put(state.getPeerId(), state);
}
try {
final Task task = new Task();
closure.setValue(fsmState);
task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(fsmState)));
task.setDone(closure);
StateServerFactory.getStateServer().getNode().apply(task); // 提交数据
} catch (CodecException e) {
String errorMsg = "Fail to encode TaskState";
log.debug("{}:{}",errorMsg, e);
closure.failure(errorMsg, PeerId.emptyPeer());
closure.run(new Status(RaftError.EINTERNAL, errorMsg));
}
});
}
public RaftResponse redirect() { public RaftResponse redirect() {
final RaftResponse response = new RaftResponse(); final RaftResponse response = new RaftResponse();
@ -261,7 +232,7 @@ public class StateServerFactory {
return response; return response;
} }
public void handlerNotLeaderError(final SyncClosure closure) { public void handlerNotLeaderError(final OperateClosure closure) {
closure.failure("Not leader.", redirect().getRedirect()); closure.failure("Not leader.", redirect().getRedirect());
closure.run(new Status(RaftError.EPERM, "Not leader")); closure.run(new Status(RaftError.EPERM, "Not leader"));
} }
@ -289,7 +260,7 @@ public class StateServerFactory {
public static void main(String[] args) throws InterruptedException, RemotingException { public static void main(String[] args) throws InterruptedException, RemotingException {
var rpcClient = new BoltRaftRpcFactory().createRpcClient(); var rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions()); rpcClient.init(new CliOptions());
var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new RequestState(), 5000); var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new RequestOperate(), 5000);
log.info("{}", resp); log.info("{}", resp);
} }
} }

View File

@ -0,0 +1,82 @@
package com.yuandian.dataflow.statemachine.operate;
import java.io.Serializable;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.yuandian.dataflow.projo.Response;
import com.yuandian.dataflow.statemachine.OperateClosure;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.state.StateFactory;
import lombok.Data;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
* 操作
*
* @author eson
*/
@Slf4j
@Data
@var
public class Operate implements Serializable {
public static enum OperateType {
PUT, REMOVE;
}
private OperateType type;
private Object value;
public Operate(OperateType t) {
this.type = t;
}
public <T> T getValue() {
return (T) this.value;
};
public <T> void setValue(T value) {
this.value = value;
return;
};
public static void CallOperate(Operate op, OperateClosure closure) {
var ss = StateServerFactory.getStateServer();
if (StateServerFactory.getStateServer().isLeader()) {
ss.applyOperate(op, closure);
return;
}
var request = new OperateProcessor.RequestOperate();
request.setOperate(op);
var leaderId = StateServerFactory.getStateServer().getNode().getLeaderId();
try {
ss.getRpcClient().invokeAsync(leaderId.getEndpoint(),
request, new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
log.info("{}", result);
var resp = (RaftResponse)result;
closure.setResponse(resp);
closure.success(resp.getOperate());
}
}, 5000);
} catch (InterruptedException | RemotingException e) {
// TODO Auto-generated catch block
closure.failure("failure", null);
log.info("{}", e.toString());
}
}
}

View File

@ -17,8 +17,9 @@ import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor; import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.statemachine.StateServerFactory; import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.SyncClosure; import com.yuandian.dataflow.statemachine.OperateClosure;
import com.yuandian.dataflow.statemachine.StateServerFactory.StateServer; import com.yuandian.dataflow.statemachine.StateServerFactory.StateServer;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.State;
import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.entity.PeerId;
@ -39,7 +40,7 @@ import lombok.extern.slf4j.Slf4j;
*/ */
@Slf4j @Slf4j
@ProcessorRaft @ProcessorRaft
public class SyncStateProcessor implements RpcProcessor<SyncStateProcessor.RequestState> { public class OperateProcessor implements RpcProcessor<OperateProcessor.RequestOperate> {
/** /**
* 同步状态时, 需要用的结构类. 新增的状态可以在 State结构里添加 * 同步状态时, 需要用的结构类. 新增的状态可以在 State结构里添加
@ -51,20 +52,20 @@ public class SyncStateProcessor implements RpcProcessor<SyncStateProcessor.Reque
@Getter @Getter
@Setter @Setter
@ToString @ToString
public static class RequestState implements Serializable { public static class RequestOperate implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private State state; private Operate operate;
} }
@Override @Override
public void handleRequest(RpcContext rpcCtx, RequestState request) { public void handleRequest(RpcContext rpcCtx, RequestOperate request) {
log.info("request: {}", request); log.info("request: {}", request);
final SyncClosure<State> closure = new SyncClosure<State>() { final OperateClosure closure = new OperateClosure() {
@Override @Override
public void run(Status status) { public void run(Status status) {
rpcCtx.sendResponse(getResponse()); rpcCtx.sendResponse(getResponse());
@ -72,12 +73,12 @@ public class SyncStateProcessor implements RpcProcessor<SyncStateProcessor.Reque
} }
}; };
StateServerFactory.getStateServer().applyState(request.getState(), closure); StateServerFactory.getStateServer().applyOperate(request.getOperate(), closure);
} }
@Override @Override
public String interest() { public String interest() {
return RequestState.class.getName(); return RequestOperate.class.getName();
} }

View File

@ -9,6 +9,7 @@ package com.yuandian.dataflow.statemachine.rpc;
import java.io.Serializable; import java.io.Serializable;
import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.entity.PeerId;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.State;
import lombok.Getter; import lombok.Getter;
@ -30,7 +31,7 @@ public class RaftResponse implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private State state; private Operate operate;
private boolean success; private boolean success;

View File

@ -1,78 +0,0 @@
/**
* description
*
* @author eson
*2022年7月12日-11:10:54
*/
package com.yuandian.dataflow.statemachine.rpc;
import java.io.Serializable;
import java.nio.ByteBuffer;
import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.lmax.disruptor.WorkProcessor;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.SyncClosure;
import com.yuandian.dataflow.statemachine.StateServerFactory.StateServer;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import com.alipay.sofa.jraft.entity.PeerId;
import org.apache.commons.lang.StringUtils;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
* description
*
* @author eson
*2022年7月12日-11:10:54
*/
@Slf4j
@ProcessorRaft
public class SyncWorkerStateProcessor implements RpcProcessor<SyncWorkerStateProcessor.RequestWorkerState> {
@Getter
@Setter
@ToString
public static class RequestWorkerState implements Serializable {
private static final long serialVersionUID = 1L;
private WorkerState workerState;
}
@Override
public void handleRequest(RpcContext rpcCtx, RequestWorkerState request) {
log.info("RequestWorkerState: {}", request);
final SyncClosure<State> closure = new SyncClosure< State>() {
@Override
public void run(Status status) {
rpcCtx.sendResponse(getResponse());
log.info("{}", status);
}
};
StateServerFactory.getStateServer().applyWorkerState(request.getWorkerState(), closure);
}
@Override
public String interest() {
return RequestWorkerState.class.getName();
}
}

View File

@ -20,7 +20,9 @@ import com.google.protobuf.Any;
import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest; import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass; import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import com.yuandian.dataflow.statemachine.StateServerFactory; import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.SyncClosure; import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.OperateClosure;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
@ -46,6 +48,7 @@ public class StateFactory {
public static class PeerIdCap { public static class PeerIdCap {
private PeerId peer; private PeerId peer;
private long cap; private long cap;
public PeerIdCap(PeerId pid, long cap) { public PeerIdCap(PeerId pid, long cap) {
this.peer = pid; this.peer = pid;
this.cap = cap; this.cap = cap;
@ -65,8 +68,6 @@ public class StateFactory {
var ss = StateServerFactory.getStateServer(); var ss = StateServerFactory.getStateServer();
// var state = ss.getFsm().getState(); // var state = ss.getFsm().getState();
ArrayList<PeerIdCap> pclist = new ArrayList<PeerIdCap>();
ss.useFsmStateNotLock((state) -> { ss.useFsmStateNotLock((state) -> {
alivePeers.forEach((peer) -> { alivePeers.forEach((peer) -> {
WorkerState ws = state.getWorkers().get(peer); WorkerState ws = state.getWorkers().get(peer);
@ -74,30 +75,16 @@ public class StateFactory {
var cap = 100 - ws.getTaskQueueSize(); var cap = 100 - ws.getTaskQueueSize();
log.debug("cap :{} peer: {}", cap, peer); log.debug("cap :{} peer: {}", cap, peer);
if (cap <= 0) { if (cap <= 0) {
return ; return;
} }
ws.setUpdateAt(Instant.now()); ws.setUpdateAt(Instant.now());
ws.setTaskQueueSize(100); ws.setTaskQueueSize(100);
var pc = new PeerIdCap(peer, cap);
pc.setCap(cap);
;
pclist.add(pc);
}
});
ss.applyState(state, new SyncClosure<State>() { var op = new Operate(OperateType.PUT);
@Override op.setValue(ws);
public void run(Status status) {
log.info("任务队列更新成功 {}", this.getValue().getWorkers());
pclist.forEach((peercap) -> {
if (peercap.getCap() <= 0) {
return ;
}
var request = new PacketsRequest(); var request = new PacketsRequest();
for (int i = 0; i < peercap.getCap(); i++) { for (int i = 0; i < cap; i++) {
var p = Any.pack( var p = Any.pack(
BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder() BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder()
.setTableId(10086) .setTableId(10086)
@ -105,35 +92,33 @@ public class StateFactory {
request.getPackets().add(p); request.getPackets().add(p);
} }
// Operate.CallOperate(op, new OperateClosure() {
// @Override
// public void run(Status status) {
// // TODO Auto-generated method stub
// log.info("{}", status);
// }
// });
try { try {
log.debug("rpc {}", peercap); ss.getRpcClient().invokeAsync(peer.getEndpoint(),
ss.getRpcClient().invokeAsync(peercap.peer.getEndpoint() ,
request, new InvokeCallback() { request, new InvokeCallback() {
@Override @Override
public void complete(Object result, Throwable err) { public void complete(Object result, Throwable err) {
if (err != null) { log.info("{}", result);
err.printStackTrace();
} else {
log.info("{} peer result", result);
}
} }
}, 5000); }, 5000);
} catch (InterruptedException | RemotingException e) { } catch (InterruptedException | RemotingException e) {
e.printStackTrace(); log.info("error send packets {}", e.toString());
} }
});
} }
}); });
}); });
// ss.applyState(state, new SyncClosure<State>() { // ss.applyState(state, new SyncClosure<State>() {
// public void run(Status status) { // public void run(Status status) {
// log.debug("{}", status); // log.debug("{}", status);
@ -145,7 +130,7 @@ public class StateFactory {
Thread.sleep(5000); Thread.sleep(5000);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); log.info("{}", e.toString());
} }
} }

View File

@ -5,12 +5,16 @@ import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress; import com.mongodb.ServerAddress;
import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase; import com.mongodb.client.MongoDatabase;
import lombok.extern.slf4j.Slf4j;
import org.bson.Document; import org.bson.Document;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@Slf4j
public class MongodbTest { public class MongodbTest {
public static <T> void insertMsgToMongoDB(T obj) { public static <T> void insertMsgToMongoDB(T obj) {
@ -34,7 +38,7 @@ public class MongodbTest {
System.err.println("insert success"); System.err.println("insert success");
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); log.info("{}", e.toString());
} }
} }

View File

@ -10,7 +10,7 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.Endpoint;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse; import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor.RequestState; import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.RequestOperate;
import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.State;
import lombok.var; import lombok.var;
@ -20,53 +20,53 @@ import lombok.extern.slf4j.Slf4j;
public class StateMachineTest { public class StateMachineTest {
@Test @Test
void testOnApply() throws InterruptedException, RemotingException { void testOnApply() throws InterruptedException, RemotingException {
var rpcClient = new BoltRaftRpcFactory().createRpcClient(); // var rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions()); // rpcClient.init(new CliOptions());
var fstate = new State(); // var fstate = new State();
var fdata = new RequestState(); // var fdata = new RequestOperate();
fdata.setState(fstate); // fdata.setOperate(fstate);
var leader = new Endpoint("localhost",4441); // var leader = new Endpoint("localhost",4441);
RaftResponse resp = (RaftResponse)rpcClient.invokeSync(leader, fdata // RaftResponse resp = (RaftResponse)rpcClient.invokeSync(leader, fdata
, 5000); // , 5000);
log.info("{}", resp); // log.info("{}", resp);
if( resp != null && !resp.isSuccess() ) { // if( resp != null && !resp.isSuccess() ) {
leader = resp.getRedirect().getEndpoint(); // leader = resp.getRedirect().getEndpoint();
resp = (RaftResponse)rpcClient.invokeSync(resp.getRedirect().getEndpoint(), fdata // resp = (RaftResponse)rpcClient.invokeSync(resp.getRedirect().getEndpoint(), fdata
, 5000); // , 5000);
log.info("{}", resp); // log.info("{}", resp);
} // }
int i = 0 ; // int i = 0 ;
while(true) { // while(true) {
var state = new State(); // var state = new State();
var request = new RequestState(); // 创建请求 // var request = new RequestOperate(); // 创建请求
request.setState(state); // 添加请求的参数 // request.setState(state); // 添加请求的参数
var wstate = state.getWorkers(); // var wstate = state.getWorkers();
// state.getWorker().setPeerId( PeerId.parsePeer("localhost:2222") ); // // state.getWorker().setPeerId( PeerId.parsePeer("localhost:2222") );
// state.getWorker().setTaskQueueSize(i); // // state.getWorker().setTaskQueueSize(i);
var pi = i ; // var pi = i ;
i++; // i++;
if (i >= 1000) { // if (i >= 1000) {
break; // break;
} // }
rpcClient.invokeAsync(leader, request, new InvokeCallback() { // rpcClient.invokeAsync(leader, request, new InvokeCallback() {
@Override // @Override
public void complete(Object result, Throwable err) { // public void complete(Object result, Throwable err) {
// ResponseSM resp = (ResponseSM)result; // // ResponseSM resp = (ResponseSM)result;
log.info("{} {} {}", result, err, pi); // log.info("{} {} {}", result, err, pi);
} // }
@Override // @Override
public Executor executor() { // public Executor executor() {
return null; // return null;
} // }
} , 5000); // } , 5000);
} // }
} }
} }