加入vscode调试方式

This commit is contained in:
huangsimin 2022-07-27 16:25:10 +08:00
parent 4fa9035983
commit 1ea40571d9
12 changed files with 132 additions and 77 deletions

18
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,18 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "java",
"name": "Launch Server",
"request": "launch",
"mainClass": "com.yuandian.dataflow.Server",
"projectName": "dataflow",
"args": "2",
"preLaunchTask": "restart",
"postDebugTask": "stopall",
}
]
}

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
{
"java.configuration.updateBuildConfiguration": "automatic"
}

27
.vscode/tasks.json vendored Normal file
View File

@ -0,0 +1,27 @@
{
"version": "2.0.0",
"tasks": [
{
"label": "restart",
"type": "shell",
"command": "sh restart.sh",
"presentation": {
"echo": true,
"reveal": "always",
"focus": false,
"panel": "shared",
"showReuseMessage": true,
"clear": false,
"close": true
}
},
{
"label": "stopall",
"type": "shell",
"command": "sh stop.sh",
"presentation": {
"close": true
}
}
]
}

View File

@ -1,2 +1,2 @@
#! /bin/bash #! /bin/bash
sh stop.sh & rm raftdata/ -rf && mvn -T4 package && truncate -s 0 screenlog.0 && sh start.sh sh stop.sh & rm raftdata/ -rf && mvn package && truncate -s 0 screenlog.0 && sh start.sh

View File

@ -52,40 +52,45 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
@Override @Override
public void handleRequest(RpcContext rpcCtx, PacketsRequest request) { public void handleRequest(RpcContext rpcCtx, PacketsRequest request) {
// StateServerFactory.getStateServer().updateFsmStateAsync(s, onCompleted); // StateServerFactory.getStateServer().updateFsmStateAsync(s, onCompleted);
var resp = new RaftResponse<>();
resp.setSuccess(true);
rpcCtx.sendResponse(resp);
var ss = StateServerFactory.getStateServer(); var ss = StateServerFactory.getStateServer();
log.info("{} handler request.packets.size(): {}", StateServerFactory.getServerId(), request.packets.size()); log.debug("{} handler request.packets.size(): {}", StateServerFactory.getServerId(), request.packets.size());
ss.readIndexState( new GenericClosure<State>() { ss.readIndexState( new GenericClosure<State>() {
@Override @Override
public void run(Status status) { public void run(Status status) {
log.debug("status {}", status);
var state = this.getValue(); if(status.isOk()) {
var ws = state.getWorkers().get(StateServerFactory.getServerId()); var state = this.getValue();
ws.setTaskQueueSize( ws.getTaskQueueSize() - request.packets.size()); var ws = state.getWorkers().get(StateServerFactory.getServerId());
ws.setUpdateAt(Instant.now());
log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(), request.packets.size(), state.getWorkers().size()); ws.setTaskQueueSize( ws.getTaskQueueSize() - request.packets.size());
ws.setUpdateAt(Instant.now());
Operate.CallOperate(new Operate(OperateType.PUT,ws), new GenericClosure<Operate>() {
@Override log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(), request.packets.size(), state.getWorkers().size());
public void run(Status status) {
var resp = this.getResponse(); Operate.CallOperate(new Operate(OperateType.PUT,ws), new GenericClosure<Operate>() {
resp.setMsg(rpcCtx.getRemoteAddress()); @Override
if(status.isOk()) { public void run(Status status) {
resp.setSuccess(true); if(status.isOk()) {
log.info("{}", resp); log.info("{}", resp);
} else { }
resp.setSuccess(false);
} }
rpcCtx.sendResponse(resp); });
} }
});
} }
} ); } );
} }
@Override @Override

View File

@ -20,7 +20,7 @@ import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse; import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest; import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest;
import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState; import com.yuandian.dataflow.statemachine.state.WorkerState;
@ -38,7 +38,8 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public class StateMachine extends StateMachineAdapter { public class StateMachine extends StateMachineAdapter {
// private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class); // private static final Logger LOG =
// LoggerFactory.getLogger(StateMachine.class);
/** /**
* State value 全局使用的唯一状态 * State value 全局使用的唯一状态
@ -70,46 +71,46 @@ public class StateMachine extends StateMachineAdapter {
if (iter.done() != null) { if (iter.done() != null) {
// This task is applied by this node, get value from closure to avoid additional // This task is applied by this node, get value from closure to avoid additional
// parsing. // parsing.
closure = (GenericClosure<Operate>)iter.done(); // 只支持单一个State. 全状态机只支持一种提交 closure = (GenericClosure<Operate>) iter.done(); // 只支持单一个State. 全状态机只支持一种提交
op = closure.getValue(); op = closure.getValue();
} else { } else {
// Have to parse FetchAddRequest from this user log. // Have to parse FetchAddRequest from this user log.
final ByteBuffer data = iter.getData(); final ByteBuffer data = iter.getData();
try { try {
op = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(data.array(),Operate.class.getName()); op = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
data.array(),
Operate.class.getName());
} catch (CodecException e) { } catch (CodecException e) {
log.info("{}", e.toString()); log.info("{}", e.toString());
} }
} }
if(op != null) { if (op != null) {
switch(op.getType()) { switch (op.getType()) {
case PUT: case PUT:
WorkerState ws = op.getValue(); WorkerState ws = op.getValue();
state.getWorkers().put(ws.peerId, ws); state.getWorkers().put(ws.peerId, ws);
if(closure != null) { if (closure != null) {
closure.success(op); closure.success(op);
closure.run(Status.OK()); closure.run(Status.OK());
} }
break; break;
case REMOVE: case REMOVE:
if(closure != null) { if (closure != null) {
closure.success(op); closure.success(op);
closure.run(Status.OK()); closure.run(Status.OK());
} }
break; break;
default: default:
break; break;
} }
} else { } else {
} }
iter.next(); iter.next();
} }
@ -136,21 +137,21 @@ public class StateMachine extends StateMachineAdapter {
this.leaderTerm.set(term); this.leaderTerm.set(term);
// 判断是否Master线程还在跑, 如果存在则中断 // 判断是否Master线程还在跑, 如果存在则中断
if(MasterFactory.getMasterExecute().isAlive()) { if (MasterFactory.getMasterExecute().isAlive()) {
MasterFactory.getMasterExecute().interrupt(); MasterFactory.getMasterExecute().interrupt();
} }
var ss = StateServerFactory.getStateServer(); var ss = StateServerFactory.getStateServer();
ss.readIndexState( new GenericClosure<State>() { ss.readIndexState(new GenericClosure<State>() {
@Override @Override
public void run(Status status) { public void run(Status status) {
var ws = state.getWorkers().get( StateServerFactory.getServerId() ); var ws = state.getWorkers().get(StateServerFactory.getServerId());
if(ws == null) { if (ws == null) {
ws = new WorkerState(StateServerFactory.getServerId()); ws = new WorkerState(StateServerFactory.getServerId());
} }
Operate op = new Operate(OperateType.PUT, ws); Operate op = new Operate(OperateType.PUT, ws);
ss.applyOperate(op, new GenericClosure<Operate>() { ss.applyOperate(op, new GenericClosure<Operate>() {
@Override @Override
@ -159,7 +160,7 @@ public class StateMachine extends StateMachineAdapter {
} }
}); });
} }
}); });
// 当成为master时候 必须启动 // 当成为master时候 必须启动
@ -175,21 +176,17 @@ public class StateMachine extends StateMachineAdapter {
super.onLeaderStop(status); super.onLeaderStop(status);
// 判断是否Master线程还在跑, 如果存在则中断 // 判断是否Master线程还在跑, 如果存在则中断
if(MasterFactory.getMasterExecute().isAlive()) { if (MasterFactory.getMasterExecute().isAlive()) {
MasterFactory.getMasterExecute().interrupt(); MasterFactory.getMasterExecute().interrupt();
} }
} }
@Override @Override
public void onShutdown() { public void onShutdown() {
log.debug("onShutdown"); log.debug("onShutdown");
super.onShutdown(); super.onShutdown();
} }
@Override @Override
public void onStartFollowing(LeaderChangeContext ctx) { public void onStartFollowing(LeaderChangeContext ctx) {
@ -197,12 +194,12 @@ public class StateMachine extends StateMachineAdapter {
try { try {
// 判断是否Master线程还在跑, 如果存在则中断 // 判断是否Master线程还在跑, 如果存在则中断
if(MasterFactory.getMasterExecute().isAlive()) { if (MasterFactory.getMasterExecute().isAlive()) {
MasterFactory.getMasterExecute().interrupt(); MasterFactory.getMasterExecute().interrupt();
} }
var ws = new WorkerState(StateServerFactory.getServerId()); var ws = new WorkerState(StateServerFactory.getServerId());
log.debug("my: {} leader id {}",StateServerFactory.getServerId(), StateServerFactory.getLeaderId()); log.debug("my: {} leader id {}", StateServerFactory.getServerId(), StateServerFactory.getLeaderId());
var op = new Operate(OperateType.PUT, ws); var op = new Operate(OperateType.PUT, ws);
@ -212,10 +209,10 @@ public class StateMachine extends StateMachineAdapter {
log.info("{} {}", status, this.getResponse()); log.info("{} {}", status, this.getResponse());
} }
}); });
return; return;
} catch (Exception e) { } catch (Exception e) {
log.info("{}", e.toString()); log.info("{}", e.toString());
} }
super.onStartFollowing(ctx); super.onStartFollowing(ctx);
@ -228,12 +225,12 @@ public class StateMachine extends StateMachineAdapter {
@Override @Override
public void onStopFollowing(LeaderChangeContext ctx) { public void onStopFollowing(LeaderChangeContext ctx) {
log.debug("{} {}",ctx, StateServerFactory.getCluster().getServerId()); log.debug("{} {}", ctx, StateServerFactory.getCluster().getServerId());
var ss = StateServerFactory.getStateServer(); var ss = StateServerFactory.getStateServer();
var ws = new WorkerState(StateServerFactory.getServerId()); var ws = new WorkerState(StateServerFactory.getServerId());
log.debug("my: {} leader id {}",StateServerFactory.getServerId(), StateServerFactory.getLeaderId()); log.debug("my: {} leader id {}", StateServerFactory.getServerId(), StateServerFactory.getLeaderId());
var op = new Operate(OperateType.PUT, ws); var op = new Operate(OperateType.PUT, ws);
Operate.CallOperate(op, new GenericClosure() { Operate.CallOperate(op, new GenericClosure() {
@ -243,14 +240,7 @@ public class StateMachine extends StateMachineAdapter {
} }
}); });
super.onStopFollowing(ctx); super.onStopFollowing(ctx);
} }
} }

View File

@ -188,17 +188,18 @@ public class StateServerFactory {
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override @Override
public void run(Status status, long index, byte[] reqCtx) { public void run(Status status, long index, byte[] reqCtx) {
if( status.isOk()) { if(status.isOk()) {
// 回调失败
closure.success(ss.fsm.getState()); closure.success(ss.fsm.getState());
closure.run(status); }
} closure.run(status);
} }
} ); } );
} }
public void applyOperate(Operate op, GenericClosure closure) { public void applyOperate(Operate op, GenericClosure<Operate> closure) {
// 所有的提交都必须再leader进行 // 所有的提交都必须再leader进行
if (!ss.isLeader()) { if (!ss.isLeader()) {
ss.handlerNotLeaderError(closure); ss.handlerNotLeaderError(closure);

View File

@ -2,6 +2,7 @@ package com.yuandian.dataflow.statemachine.operate;
import java.io.Serializable; import java.io.Serializable;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.yuandian.dataflow.statemachine.StateServerFactory; import com.yuandian.dataflow.statemachine.StateServerFactory;
@ -75,6 +76,7 @@ public class Operate implements Serializable {
var resp = (RaftResponse<Operate>) result; var resp = (RaftResponse<Operate>) result;
closure.setResponse(resp); closure.setResponse(resp);
closure.success(resp.getValue()); closure.success(resp.getValue());
closure.run(Status.OK());
} }
}, 5000); }, 5000);

View File

@ -64,7 +64,7 @@ public class OperateProcessor implements RpcProcessor<OperateProcessor.OperateRe
log.info("request: {}", request); log.info("request: {}", request);
final GenericClosure closure = new GenericClosure() { final GenericClosure<Operate> closure = new GenericClosure<Operate>() {
@Override @Override
public void run(Status status) { public void run(Status status) {
rpcCtx.sendResponse(getResponse()); rpcCtx.sendResponse(getResponse());

View File

@ -0,0 +1,8 @@
package com.yuandian.dataflow.utils;
public class Utils {
public static void main(String[] args) {
}
}

View File

@ -10,10 +10,10 @@ sleep 1
screen -dmS raft-0 -L java -jar target/dataflow-${VERSION}.jar 0 screen -dmS raft-0 -L java -jar target/dataflow-${VERSION}.jar 0
screen -dmS raft-1 -L java -jar target/dataflow-${VERSION}.jar 1 screen -dmS raft-1 -L java -jar target/dataflow-${VERSION}.jar 1
screen -dmS raft-2 -L java -jar target/dataflow-${VERSION}.jar 2 # screen -dmS raft-2 -L java -jar target/dataflow-${VERSION}.jar 2
sleep 1 sleep 1
screen -S raft-0 -X logfile flush 0 screen -S raft-0 -X logfile flush 0
screen -S raft-1 -X logfile flush 0 screen -S raft-1 -X logfile flush 0
screen -S raft-2 -X logfile flush 0 # screen -S raft-2 -X logfile flush 0

View File

@ -2,3 +2,4 @@
screen -S raft-0 -X quit screen -S raft-0 -X quit
screen -S raft-1 -X quit screen -S raft-1 -X quit
screen -S raft-2 -X quit screen -S raft-2 -X quit
exit 0