TODO: 解决数据不到达的问题

This commit is contained in:
huangsimin 2022-07-27 23:37:40 +08:00
parent 5298c168ef
commit 7795362d5a
14 changed files with 72 additions and 86 deletions

16
.vscode/tasks.json vendored
View File

@ -5,24 +5,30 @@
"label": "restart",
"type": "shell",
"command": "sh restart.sh",
"isBackground": true,
"isBackground": false,
"presentation": {
"echo": true,
"reveal": "silent",
"reveal": "always",
"focus": false,
"panel": "shared",
"panel": "new",
"showReuseMessage": true,
"clear": false,
"close": true
}
},
},
{
"label": "stopall",
"type": "shell",
"command": "sh stop.sh",
"presentation": {
"echo": true,
"reveal": "always",
"focus": false,
"panel": "shared",
"close": true
}
},
}
]
}

View File

@ -11,9 +11,9 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<java.version>8</java.version>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<java.version>11</java.version>
<protobuf.version>3.20.1</protobuf.version>
<protostuff.version>1.7.4</protostuff.version>

View File

@ -1,3 +1,4 @@
#! /bin/bash
sh stop.sh & rm raftdata/ -rf && mvn package && truncate -s 0 screenlog.0 && sh start.sh
sh stop.sh & rm raftdata/ -rf && mvn package && truncate -s 0 screenlog.0
sh start.sh

View File

@ -23,7 +23,7 @@ import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State;
@ -36,7 +36,7 @@ import lombok.extern.slf4j.Slf4j;
* description
*
* @author eson
*2022年7月21日-13:48:01
* 2022年7月21日-13:48:01
*/
@Slf4j
@ -51,45 +51,39 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
@Override
public void handleRequest(RpcContext rpcCtx, PacketsRequest request) {
// StateServerFactory.getStateServer().updateFsmStateAsync(s, onCompleted);
// StateServerFactory.getStateServer().updateFsmStateAsync(s, onCompleted);
var resp = new RaftResponse<>();
resp.setSuccess(true);
rpcCtx.sendResponse(resp);
var resp = new RaftResponse<>();
resp.setSuccess(true);
rpcCtx.sendResponse(resp);
var ss = StateServerFactory.getStateServer();
log.debug("{} handler request.packets.size(): {}", StateServerFactory.getServerId(), request.packets.size());
var ss = StateServerFactory.getStateServer();
log.debug("{} handler request.packets.size(): {}", StateServerFactory.getServerId(), request.packets.size());
ss.readIndexState( new GenericClosure<State>() {
ss.readIndexState(new GenericClosure<State>() {
@Override
public void run(Status status) {
log.debug("status {}", status);
if(status.isOk()) {
var state = this.getValue();
var ws = state.getWorkers().get(StateServerFactory.getServerId());
@Override
public void run(Status status) {
log.debug("status {}", status);
if (status.isOk()) {
var state = this.getValue();
var ws = state.getWorkers().get(StateServerFactory.getServerId());
ws.setTaskQueueSize( ws.getTaskQueueSize() - request.packets.size());
ws.setUpdateAt(Instant.now());
log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(), request.packets.size(), state.getWorkers().size());
Operate.CallOperate(new Operate(OperateType.PUT,ws), new GenericClosure<Operate>() {
@Override
public void run(Status status) {
if(status.isOk()) {
log.info("{}", resp);
}
ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size());
ws.setUpdateAt(Instant.now());
log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(),
request.packets.size(), state.getWorkers().size());
Operate.CallOperate(new Operate(OperateType.PUT, ws), new GenericClosure<Operate>() {
@Override
public void run(Status status) {
if (status.isOk()) {
log.info("{}", resp);
}
});
}
}
});
}
} );
}
});
}
@Override

View File

@ -52,10 +52,7 @@ public final class Doc {
@BsonProperty("responseIp")
public int responseIp ;
public static void main(String[] args) {
MongoClient mgo = MongoClients.create("mongodb://yuandian:yuandian123@192.168.1.113:27017");
CodecProvider pojoCodecProvider = PojoCodecProvider.builder().register("com.yuandian.dataflow.projo").build();

View File

@ -62,16 +62,16 @@ public class MasterFactory {
WorkerState ws = state.getWorkers().get(peer);
if (ws != null) {
//
var canDealTasks = MAX_TASKS - ws.getTaskQueueSize();
log.debug("cap :{} peer: {}", canDealTasks, peer);
if (canDealTasks <= 0) {
var canTasks = MAX_TASKS - ws.getTaskQueueSize();
log.debug("cap :{} peer: {}", canTasks, peer);
if (canTasks <= 0) {
return;
}
ws.setUpdateAt(Instant.now());
ws.setTaskQueueSize(MAX_TASKS);
var request = new PacketsRequest();
for (int i = 0; i < canDealTasks; i++) {
for (int i = 0; i < canTasks; i++) {
var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow
.newBuilder()
.setTableId(10086)
@ -82,14 +82,16 @@ public class MasterFactory {
Operate.CallOperate(new Operate(OperateType.PUT, ws), new GenericClosure<Operate>() {
@Override
public void run(Status status) {
log.info("{}", status);
log.info("PacketsRequest run {}", status);
try {
ss.getRpcClient().invokeAsync(peer.getEndpoint(), request,
new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
log.info("PacketsRequest: {}", result);
if(err != null) {
log.debug("{}", err);
}
log.debug("PacketsRequest: {}", result);
}
}, 5000);
} catch (InterruptedException | RemotingException e) {

View File

@ -211,7 +211,6 @@ public class StateMachine extends StateMachineAdapter {
log.info("{} {}", status, this.getResponse());
}
});
return;
} catch (Exception e) {

View File

@ -7,6 +7,7 @@
package com.yuandian.dataflow.statemachine;
import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.Executor;
@ -141,7 +142,7 @@ public class StateServerFactory {
nodeOptions.setInitialConf(conf);
File RaftDataFile = new File(String.format("./raftdata/%d", port) );
log.info("{}",RaftDataFile.mkdirs());
log.info("mkdirs: {}",RaftDataFile.mkdirs());
nodeOptions.setLogUri( String.format("./raftdata/%d/logs", port) );
nodeOptions.setRaftMetaUri(String.format("./raftdata/%d/rafts", port));
@ -150,18 +151,14 @@ public class StateServerFactory {
nodeOptions.setFsm(fsm);
cluster = new RaftGroupService(groupId, serverId, nodeOptions);
Set<Class<?>> scans = new Reflections("com.yuandian.dataflow").getTypesAnnotatedWith(ProcessorRaft.class);
scans.forEach((pRaftClass)->{
try {
cluster.getRpcServer().registerProcessor((RpcProcessor<?>) pRaftClass.newInstance());
} catch (InstantiationException e) {
cluster.getRpcServer().registerProcessor((RpcProcessor<?>) pRaftClass.getDeclaredConstructor().newInstance());
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException | SecurityException e) {
log.info("{}", e.toString());
} catch (IllegalAccessException e) {
log.info("{}", e.toString());
}
}
});
node = cluster.start();
@ -219,11 +216,9 @@ public class StateServerFactory {
closure.run(new Status(RaftError.EINTERNAL, errorMsg));
}
}
public RaftResponse redirect() {
final RaftResponse response = new RaftResponse();
public <T> RaftResponse<T> redirect() {
final RaftResponse<T> response = new RaftResponse<T>();
response.setSuccess(false);
if (this.node != null) {
final PeerId leader = this.node.getLeaderId();
@ -234,16 +229,12 @@ public class StateServerFactory {
return response;
}
public void handlerNotLeaderError(final GenericClosure closure) {
public <T> void handlerNotLeaderError(final GenericClosure<T> closure) {
closure.failure("Not leader.", redirect().getRedirect());
closure.run(new Status(RaftError.EPERM, "Not leader"));
}
private Executor createReadIndexExecutor() {
// final StoreEngineOptions opts = new StoreEngineOptions();
// return StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads());
return ThreadPoolUtil.newBuilder() //
.poolName("ReadIndexPool") //
.enableMetric(true) //

View File

@ -71,7 +71,7 @@ public class Operate implements Serializable {
@Override
public void complete(Object result, Throwable err) {
log.info("{}", result);
log.info("Object result {}", result);
//TODO: 解决回调的次序问题
var resp = (RaftResponse<Operate>) result;
closure.setResponse(resp);

View File

@ -73,8 +73,7 @@ public class OperateProcessor implements RpcProcessor<OperateProcessor.OperateRe
rpcCtx.sendResponse(getResponse());
return;
}
if(status.getRaftError() == RaftError.EPERM) {
//TODO: Not leader 需要转发
log.info("{}", status);

View File

@ -33,7 +33,6 @@ public class RaftResponse<T> implements Serializable {
private T value;
private boolean success;
/**
* redirect peer id

View File

@ -40,6 +40,7 @@ import lombok.extern.slf4j.Slf4j;
*/
@DisplayName("AppTest")
@Slf4j
@var
public class AppTest {

View File

@ -13,7 +13,7 @@ import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
@Slf4j

View File

@ -3,17 +3,14 @@ screen -S raft-0 -X quit
screen -S raft-1 -X quit
# screen -S raft-2 -X quit
sleep 5s
sleep 1s
VERSION=1.0.0-SNAPSHOT
screen -dmS raft-0 -L java -jar target/dataflow-${VERSION}.jar 0
screen -dmS raft-1 -L java -jar target/dataflow-${VERSION}.jar 1
# screen -dmS raft-2 -L java -jar target/dataflow-${VERSION}.jar 2
sleep 1
sleep 0.5s
screen -S raft-0 -X logfile flush 0
screen -S raft-1 -X logfile flush 0