TODO: 完善逻辑

This commit is contained in:
huangsimin 2022-07-26 17:32:51 +08:00
parent 849821fd8b
commit 159e25ddc6
2 changed files with 25 additions and 23 deletions

View File

@ -52,13 +52,14 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
@Override
public void handleRequest(RpcContext rpcCtx, PacketsRequest request) {
// StateServerFactory.getStateServer().updateFsmStateAsync(s, onCompleted);
log.info("request.packets.size(): {}",request.packets.size());
var resp = new RaftResponse();
resp.setMsg(rpcCtx.getRemoteAddress());
resp.setSuccess(true);
var ss = StateServerFactory.getStateServer();
log.info("{} handler request.packets.size(): {}", ss.getNode().getNodeId().getPeerId() ,request.packets.size());
ss.useFsmStateNotLock((state)->{
var work = state.getWorkers().get( ss.getCluster().getServerId());
@ -75,7 +76,9 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
@Override
public void run(Status status) {
log.info("{}", this.getResponse());
rpcCtx.sendResponse(this.getResponse());
var resp = new RaftResponse();
resp.setRedirect(StateServerFactory.getStateServer().getNode().getLeaderId());
rpcCtx.sendResponse(resp);
}
});

View File

@ -80,9 +80,6 @@ public class StateFactory {
ws.setUpdateAt(Instant.now());
ws.setTaskQueueSize(100);
var op = new Operate(OperateType.PUT);
op.setValue(ws);
var request = new PacketsRequest();
for (int i = 0; i < cap; i++) {
var p = Any.pack(
@ -92,27 +89,29 @@ public class StateFactory {
request.getPackets().add(p);
}
// Operate.CallOperate(op, new OperateClosure() {
// @Override
// public void run(Status status) {
// // TODO Auto-generated method stub
// log.info("{}", status);
// }
// });
var op = new Operate(OperateType.PUT);
op.setValue(ws);
Operate.CallOperate(op, new OperateClosure() {
@Override
public void run(Status status) {
try {
ss.getRpcClient().invokeAsync(peer.getEndpoint(),
request, new InvokeCallback() {
log.info("{}", status);
@Override
public void complete(Object result, Throwable err) {
log.info("{}", result);
}
try {
ss.getRpcClient().invokeAsync(peer.getEndpoint(),
request, new InvokeCallback() {
}, 5000);
} catch (InterruptedException | RemotingException e) {
log.info("error send packets {}", e.toString());
}
@Override
public void complete(Object result, Throwable err) {
log.info("{}", result);
}
}, 5000);
} catch (InterruptedException | RemotingException e) {
log.info("error send packets {}", e.toString());
}
}
});
}
});