TODO: 更新状态的 重新设计 避免自锁

This commit is contained in:
huangsimin 2022-07-21 16:32:48 +08:00
parent 08fbedeccc
commit 761a8dd664
4 changed files with 54 additions and 37 deletions

View File

@ -45,6 +45,7 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
log.info("{}",request.packets.size());
var resp = new ResponseSM();
resp.setMsg(rpcCtx.getRemoteAddress());
rpcCtx.sendResponse(resp);
}

View File

@ -126,6 +126,7 @@ public class StateServerFactory {
int port = serverId.getPort();
NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setElectionTimeoutMs(1000);
nodeOptions.setSnapshotLogIndexMargin(3600);
nodeOptions.setInitialConf(conf);
@ -140,7 +141,8 @@ public class StateServerFactory {
nodeOptions.setFsm(fsm);
cluster = new RaftGroupService(groupId, serverId, nodeOptions);
Set<Class<?>> scans = new Reflections("com.yuandian.dataflow").getTypesAnnotatedWith(ProcessorRaft.class);
scans.forEach((pRaftClass)->{

View File

@ -6,13 +6,16 @@
*/
package com.yuandian.dataflow.statemachine.state;
import java.time.Instant;
import com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.handler.codec.http.HttpContentEncoder.Result;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RemotingException;
import com.google.protobuf.Any;
import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.SyncClosure;
import lombok.Getter;
import lombok.Setter;
@ -43,47 +46,58 @@ public class StateFactory {
if (alivePeers != null) {
var ss = StateServerFactory.getStateServer();
StateServerFactory.getStateServer().useFsmStateAsync((state) -> {
synchronized(alivePeers){
alivePeers.forEach((peer) -> {
var ws = state.getWorkers().get(peer);
if (ws != null) {
var cap = 100 - ws.getTaskQueueSize();
if (cap > 0) {
log.debug("{}", cap);
var request = new PacketsRequest();
for(int i = 0; i < cap ; i++ ) {
var p = Any.pack(
BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder()
.setTableId(10086)
.build()
);
synchronized (alivePeers) {
alivePeers.forEach((peer) -> {
WorkerState ws = state.getWorkers().get(peer);
if (ws != null) {
var cap = 100 - ws.getTaskQueueSize();
if (cap > 0) {
log.debug("{}", cap);
var request = new PacketsRequest();
for (int i = 0; i < cap; i++) {
var p = Any.pack(
BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder()
.setTableId(10086)
.build());
request.getPackets().add(p);
}
try {
var result = ss.getRpcClient().invokeSync(ws.getPeerId().getEndpoint(), request, 5000);
log.info("{}", result);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (RemotingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try {
var result = ss.getRpcClient().invokeSync(ws.getPeerId().getEndpoint(),
request, 5000);
log.info("{}", result);
ws.setUpdateAt( Instant.now() );
ws.setTaskQueueSize(ws.getTaskQueueSize() - cap);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (RemotingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
alivePeers.notifyAll();
});
}
ss.applyState(state, new SyncClosure<State>() {
public void run(Status status) {
log.debug("{}", status);
};
} );
alivePeers.notifyAll();
});
}
return null;
});
synchronized(alivePeers){
synchronized (alivePeers) {
alivePeers.wait(5000);
}
}
Thread.sleep(2000);

View File

@ -20,11 +20,11 @@
</appender>
<!-- <root level="info">
<!-- <root level="debug">
<appender-ref ref="CONSOLE" />
</root> -->
<logger name="com.yuandian.dataflow" level="debug">
<appender-ref ref="CONSOLE" />
<logger name="com.yuandian.dataflow" level="debug|info">
<appender-ref ref="CONSOLE"/>
</logger>
</configuration>