调试leader

This commit is contained in:
huangsimin 2022-07-27 18:02:24 +08:00
parent 1ea40571d9
commit 5298c168ef
9 changed files with 78 additions and 17 deletions

3
.vscode/launch.json vendored
View File

@ -10,9 +10,10 @@
"request": "launch",
"mainClass": "com.yuandian.dataflow.Server",
"projectName": "dataflow",
"args": "2",
"args": ["2"],
"preLaunchTask": "restart",
"postDebugTask": "stopall",
}
]
}

3
.vscode/tasks.json vendored
View File

@ -5,9 +5,10 @@
"label": "restart",
"type": "shell",
"command": "sh restart.sh",
"isBackground": true,
"presentation": {
"echo": true,
"reveal": "always",
"reveal": "silent",
"focus": false,
"panel": "shared",
"showReuseMessage": true,

View File

@ -1,2 +1,3 @@
#! /bin/bash
sh stop.sh & rm raftdata/ -rf && mvn package && truncate -s 0 screenlog.0 && sh start.sh

View File

@ -23,8 +23,11 @@ public class Server {
public static String sprPort;
public static Configuration conf ;
public static void main(String[] args) throws Exception {
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
String[] sprPeers = new String[]{"3440","3441","3442"};
@ -39,6 +42,8 @@ public class Server {
conf = JRaftUtils.getConfiguration(String.join(",", peers));
StateServerFactory.startStateServer(peeridstr, conf);
// System.setProperty("server.port", sprPort);
// ConfigurableApplicationContext app = SpringApplication.run(Server.class, args);
// StateServerFactory.setAppCxt(app);

View File

@ -70,7 +70,6 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
var state = this.getValue();
var ws = state.getWorkers().get(StateServerFactory.getServerId());
ws.setTaskQueueSize( ws.getTaskQueueSize() - request.packets.size());
ws.setUpdateAt(Instant.now());

View File

@ -0,0 +1,39 @@
package com.yuandian.dataflow.controller;
import java.io.Serializable;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ProcessorRaft
public class TransferLeaderProcessor implements RpcProcessor<TransferLeaderProcessor.LeaderRequest> {
@Setter
@Getter
public static class LeaderRequest implements Serializable {
PeerId peer;
}
@Override
public void handleRequest(RpcContext rpcCtx, LeaderRequest request) {
Status status = StateServerFactory.getCluster().getRaftNode().transferLeadershipTo(request.peer);
rpcCtx.sendResponse(status);
log.debug("[TransferLeader] {} change leader to {}", status, request.peer);
}
@Override
public String interest() {
return LeaderRequest.class.getName();
}
}

View File

@ -14,8 +14,11 @@ import com.alipay.sofa.jraft.core.StateMachineAdapter;
import com.alipay.sofa.jraft.entity.LeaderChangeContext;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.yuandian.dataflow.Server;
import com.yuandian.dataflow.controller.TransferLeaderProcessor;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
@ -173,13 +176,12 @@ public class StateMachine extends StateMachineAdapter {
public void onLeaderStop(final Status status) {
log.debug("onLeaderStop {}", StateServerFactory.getCluster().getServerId());
this.leaderTerm.set(-1);
super.onLeaderStop(status);
// 判断是否Master线程还在跑, 如果存在则中断
if (MasterFactory.getMasterExecute().isAlive()) {
MasterFactory.getMasterExecute().interrupt();
}
super.onLeaderStop(status);
}
@Override
@ -198,18 +200,19 @@ public class StateMachine extends StateMachineAdapter {
MasterFactory.getMasterExecute().interrupt();
}
var ws = new WorkerState(StateServerFactory.getServerId());
log.debug("my: {} leader id {}", StateServerFactory.getServerId(), StateServerFactory.getLeaderId());
var op = new Operate(OperateType.PUT, ws);
Operate.CallOperate(op, new GenericClosure() {
Operate.CallOperate(op, new GenericClosure<Operate>() {
@Override
public void run(Status status) {
log.info("{} {}", status, this.getResponse());
}
});
return;
} catch (Exception e) {
log.info("{}", e.toString());

View File

@ -67,8 +67,19 @@ public class OperateProcessor implements RpcProcessor<OperateProcessor.OperateRe
final GenericClosure<Operate> closure = new GenericClosure<Operate>() {
@Override
public void run(Status status) {
rpcCtx.sendResponse(getResponse());
if(status.isOk()) {
log.info("{}", status);
rpcCtx.sendResponse(getResponse());
return;
}
if(status.getRaftError() == RaftError.EPERM) {
//TODO: Not leader 需要转发
log.info("{}", status);
}
}
};

View File

@ -1,13 +1,14 @@
#! /bin/bash
screen -S raft-0 -X quit
screen -S raft-1 -X quit
screen -S raft-2 -X quit
# screen -S raft-2 -X quit
sleep 5s
VERSION=1.0.0-SNAPSHOT
sleep 1
screen -dmS raft-0 -L java -jar target/dataflow-${VERSION}.jar 0
screen -dmS raft-1 -L java -jar target/dataflow-${VERSION}.jar 1
# screen -dmS raft-2 -L java -jar target/dataflow-${VERSION}.jar 2