TODO: 解决读一致性死锁的问题. 异步很容易导致死锁
This commit is contained in:
parent
52a6169323
commit
cbf0308841
1
.vscode/launch.json
vendored
1
.vscode/launch.json
vendored
|
@ -14,7 +14,6 @@
|
||||||
"args": [
|
"args": [
|
||||||
"0"
|
"0"
|
||||||
],
|
],
|
||||||
"preLaunchTask": "rename",
|
|
||||||
"presentation": {
|
"presentation": {
|
||||||
"reveal": "always",
|
"reveal": "always",
|
||||||
"plane": "new",
|
"plane": "new",
|
||||||
|
|
15
.vscode/tasks.json
vendored
15
.vscode/tasks.json
vendored
|
@ -28,21 +28,10 @@
|
||||||
"panel": "new",
|
"panel": "new",
|
||||||
"showReuseMessage": true,
|
"showReuseMessage": true,
|
||||||
"clear": false,
|
"clear": false,
|
||||||
"close": false
|
"close": true
|
||||||
|
|
||||||
},
|
},
|
||||||
},
|
}
|
||||||
{
|
|
||||||
"label": "rename",
|
|
||||||
"type": "shell",
|
|
||||||
"command": "ehco ${workbench.action.terminal.rename}",
|
|
||||||
"args": [
|
|
||||||
"123"
|
|
||||||
]
|
|
||||||
|
|
||||||
},
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
],
|
],
|
||||||
|
|
||||||
|
|
49
readme.md
Normal file
49
readme.md
Normal file
|
@ -0,0 +1,49 @@
|
||||||
|
# 使用
|
||||||
|
|
||||||
|
## 状态机的使用
|
||||||
|
|
||||||
|
* 实现一个Master循环
|
||||||
|
```java
|
||||||
|
@MasterRegister
|
||||||
|
public class MasterProcessor implements MasterExecute {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 主循环入口
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void loop(MasterContext cxt) {
|
||||||
|
//TODO:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
* 实现多个Worker RPC接口
|
||||||
|
```java
|
||||||
|
@WorkerRegister
|
||||||
|
public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRequest> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 请求参数类
|
||||||
|
*/
|
||||||
|
@Setter
|
||||||
|
@Getter
|
||||||
|
public static class PacketsRequest implements Serializable {
|
||||||
|
private ArrayList<Any> packets = new ArrayList<>(); // 传参
|
||||||
|
private int Code; // 传参
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleRequest(RpcContext rpcCtx, PacketsRequest request) {
|
||||||
|
// TODO: 处理请求
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 返回请求的类名
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public String interest() {
|
||||||
|
return PacketsRequest.class.getName();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
|
@ -2,6 +2,9 @@ package com.yuandian.dataflow.controller;
|
||||||
|
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import com.alipay.sofa.jraft.Status;
|
import com.alipay.sofa.jraft.Status;
|
||||||
import com.alipay.sofa.jraft.error.RemotingException;
|
import com.alipay.sofa.jraft.error.RemotingException;
|
||||||
|
@ -29,27 +32,29 @@ public class MasterProcessor implements MasterExecute {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void loop(MasterContext cxt) {
|
public void loop(MasterContext cxt) {
|
||||||
try {
|
|
||||||
Thread.sleep(3000);
|
// try {
|
||||||
} catch (InterruptedException e1) {
|
// Thread.sleep(1000);
|
||||||
e1.printStackTrace();
|
// } catch (InterruptedException e) {
|
||||||
return;
|
// // TODO Auto-generated catch block
|
||||||
}
|
// e.printStackTrace();
|
||||||
|
// }
|
||||||
|
|
||||||
var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers());
|
var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers());
|
||||||
log.debug("master({}) execute {}", StateFactory.getServerId(),
|
log.debug("master({}) execute {}", StateFactory.getServerId(), alivePeers);
|
||||||
StateFactory.getRaftNode().listAlivePeers());
|
|
||||||
if (alivePeers == null) {
|
if (alivePeers == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
CountDownLatch latch = new CountDownLatch(alivePeers.size());
|
||||||
// 读一致性
|
// 读一致性
|
||||||
StateFactory.readIndexState(new GenericClosure<State>() {
|
StateFactory.readIndexState(new GenericClosure() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run(Status status) {
|
public void run(Status status) {
|
||||||
var state = this.getValue();
|
var state = this.<State>getValue();
|
||||||
// log.debug("masterExecute start {} {}", status, alivePeers);
|
// log.debug("masterExecute start {} {}", status, alivePeers);
|
||||||
|
|
||||||
alivePeers.forEach((peer) -> {
|
alivePeers.forEach((peer) -> {
|
||||||
|
|
||||||
if (state == null) {
|
if (state == null) {
|
||||||
|
@ -82,9 +87,10 @@ public class MasterProcessor implements MasterExecute {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 先提交 节点的 剩余能处理的任务数量. 然后再处理
|
// 先提交 节点的 剩余能处理的任务数量. 然后再处理
|
||||||
|
|
||||||
Operate.CallOperate(
|
Operate.CallOperate(
|
||||||
new Operate(OperateType.PUT_WORKERSTATE, ws),
|
new Operate(OperateType.PUT_WORKERSTATE, ws),
|
||||||
new GenericClosure<Operate>() {
|
new GenericClosure() {
|
||||||
@Override
|
@Override
|
||||||
public void run(Status status) {
|
public void run(Status status) {
|
||||||
log.info("PacketsRequest run {}", status);
|
log.info("PacketsRequest run {}", status);
|
||||||
|
@ -93,6 +99,8 @@ public class MasterProcessor implements MasterExecute {
|
||||||
new InvokeCallback() {
|
new InvokeCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void complete(Object result, Throwable err) {
|
public void complete(Object result, Throwable err) {
|
||||||
|
latch.countDown();
|
||||||
|
log.debug("countDown {}", latch.getCount());
|
||||||
if (err != null) {
|
if (err != null) {
|
||||||
// TODO: 如果错误, 需要让节点恢复任务处理的状态
|
// TODO: 如果错误, 需要让节点恢复任务处理的状态
|
||||||
log.debug("{}", err);
|
log.debug("{}", err);
|
||||||
|
@ -105,10 +113,19 @@ public class MasterProcessor implements MasterExecute {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
latch.await(5000, TimeUnit.MILLISECONDS);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.error("{}", e.toString());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,7 +50,7 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
|
||||||
@Override
|
@Override
|
||||||
public void handleRequest(RpcContext rpcCtx, PacketsRequest request) {
|
public void handleRequest(RpcContext rpcCtx, PacketsRequest request) {
|
||||||
// StateServerFactory.getStateServer().updateFsmStateAsync(s, onCompleted);
|
// StateServerFactory.getStateServer().updateFsmStateAsync(s, onCompleted);
|
||||||
var resp = new RaftResponse<>();
|
var resp = new RaftResponse();
|
||||||
resp.setSuccess(true);
|
resp.setSuccess(true);
|
||||||
rpcCtx.sendResponse(resp); // 因为jraft是异步. 收到数据直接返回
|
rpcCtx.sendResponse(resp); // 因为jraft是异步. 收到数据直接返回
|
||||||
|
|
||||||
|
@ -59,44 +59,30 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
|
||||||
// TODO: request.packets 入库,回填, 告警 等操作
|
// TODO: request.packets 入库,回填, 告警 等操作
|
||||||
|
|
||||||
|
|
||||||
Thread.sleep(ThreadLocalRandom.current().nextLong(100, 3000));
|
Thread.sleep(ThreadLocalRandom.current().nextLong(100, 2000));
|
||||||
|
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
log.info(e.toString());
|
log.info(e.toString());
|
||||||
} finally { // 确保 更新 最终的任务状态给master.
|
} finally { // 确保 更新 最终的任务状态给master.
|
||||||
|
|
||||||
// 读状态 Closure<State> 里的 getValue<State>为 State的状态
|
// 读状态 Closure<State> 里的 getValue<State>为 State的状态
|
||||||
StateFactory.readIndexState(new GenericClosure<State>() {
|
var state = StateFactory.getStateServer().getFsm().getState() ; // 获取返回的状态
|
||||||
|
var ws = state.getWorkers().get(StateFactory.getServerId());
|
||||||
|
ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量
|
||||||
|
ws.setUpdateAt(Instant.now()); // 设置更新时间
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run(Status status) {
|
|
||||||
|
|
||||||
if (!status.isOk()) {
|
|
||||||
log.error("失败 readIndexState {}", status);
|
|
||||||
}
|
|
||||||
|
|
||||||
// readIndexState 失败后也需要直接 更新自己状态
|
|
||||||
|
|
||||||
var state = this.getValue(); // 获取返回的状态
|
|
||||||
var ws = state.getWorkers().get(StateFactory.getServerId());
|
|
||||||
ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量
|
|
||||||
ws.setUpdateAt(Instant.now()); // 设置更新时间
|
|
||||||
|
|
||||||
Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws),
|
Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws),
|
||||||
new GenericClosure<Operate>() {
|
new GenericClosure() {
|
||||||
@Override
|
@Override
|
||||||
public void run(Status status) {
|
public void run(Status status) {
|
||||||
if (!status.isOk()) {
|
if (!status.isOk()) {
|
||||||
log.error("CallOperate [{}] {}", StateFactory.getServerId(), resp);
|
log.error("CallOperate [{}] {}", StateFactory.getServerId(), resp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
;
|
|
||||||
|
|
||||||
|
} ;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -57,7 +57,7 @@ public class MasterFactory {
|
||||||
public void run() {
|
public void run() {
|
||||||
|
|
||||||
MasterContext cxt = new MasterContext();
|
MasterContext cxt = new MasterContext();
|
||||||
while (!cxt.getIsExit().get()) {
|
while (!cxt.getIsExit()) {
|
||||||
masterExecuteCls.loop(cxt);
|
masterExecuteCls.loop(cxt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,8 @@ import java.util.Set;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.SynchronousQueue;
|
import java.util.concurrent.SynchronousQueue;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
import org.reflections.ReflectionUtils;
|
import org.reflections.ReflectionUtils;
|
||||||
|
@ -33,6 +35,7 @@ import com.alipay.sofa.jraft.Status;
|
||||||
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
|
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
|
||||||
import com.alipay.sofa.jraft.conf.Configuration;
|
import com.alipay.sofa.jraft.conf.Configuration;
|
||||||
import com.alipay.sofa.jraft.entity.PeerId;
|
import com.alipay.sofa.jraft.entity.PeerId;
|
||||||
|
import com.alipay.sofa.jraft.entity.ReadIndexStatus;
|
||||||
import com.alipay.sofa.jraft.entity.Task;
|
import com.alipay.sofa.jraft.entity.Task;
|
||||||
import com.alipay.sofa.jraft.error.RaftError;
|
import com.alipay.sofa.jraft.error.RaftError;
|
||||||
import com.alipay.sofa.jraft.error.RemotingException;
|
import com.alipay.sofa.jraft.error.RemotingException;
|
||||||
|
@ -51,6 +54,7 @@ import com.yuandian.dataflow.statemachine.annotations.WorkerRegister;
|
||||||
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
|
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
|
||||||
import com.yuandian.dataflow.statemachine.master.MasterExecute;
|
import com.yuandian.dataflow.statemachine.master.MasterExecute;
|
||||||
import com.yuandian.dataflow.statemachine.operate.Operate;
|
import com.yuandian.dataflow.statemachine.operate.Operate;
|
||||||
|
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
|
||||||
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest;
|
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest;
|
||||||
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
|
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
|
||||||
import com.yuandian.dataflow.statemachine.state.State;
|
import com.yuandian.dataflow.statemachine.state.State;
|
||||||
|
@ -70,6 +74,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||||
public class StateFactory {
|
public class StateFactory {
|
||||||
|
|
||||||
private static StateServer ss;
|
private static StateServer ss;
|
||||||
|
private static ReentrantLock lockReadIndex = new ReentrantLock();
|
||||||
|
|
||||||
public static void startStateServer(String peerstr, Configuration conf) throws Exception {
|
public static void startStateServer(String peerstr, Configuration conf) throws Exception {
|
||||||
if (ss != null) {
|
if (ss != null) {
|
||||||
|
@ -111,11 +116,11 @@ public class StateFactory {
|
||||||
return ss;
|
return ss;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void readIndexState(GenericClosure<State> closure) {
|
public static void readIndexState(GenericClosure closure) {
|
||||||
ss.readIndexState(closure);
|
ss.readIndexState(closure);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void applyOperate(Operate op, GenericClosure<Operate> closure) {
|
public static void applyOperate(Operate op, GenericClosure closure) {
|
||||||
ss.applyOperate(op, closure);
|
ss.applyOperate(op, closure);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -161,8 +166,6 @@ public class StateFactory {
|
||||||
// conf =
|
// conf =
|
||||||
// JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
|
// JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
PeerId serverId = JRaftUtils.getPeerId(addr);
|
PeerId serverId = JRaftUtils.getPeerId(addr);
|
||||||
int port = serverId.getPort();
|
int port = serverId.getPort();
|
||||||
|
|
||||||
|
@ -189,16 +192,15 @@ public class StateFactory {
|
||||||
var traces = Thread.currentThread().getStackTrace();
|
var traces = Thread.currentThread().getStackTrace();
|
||||||
var clsName = traces[traces.length - 1].getClassName();
|
var clsName = traces[traces.length - 1].getClassName();
|
||||||
var packName = clsName.substring(0, Utils.indexOfAsPossible(clsName, ".", 3));
|
var packName = clsName.substring(0, Utils.indexOfAsPossible(clsName, ".", 3));
|
||||||
log.info("获取 {} -> {} 下包的所有注解",clsName, packName );
|
log.info("获取 {} -> {} 下包的所有注解", clsName, packName);
|
||||||
|
|
||||||
var refl = new Reflections(packName);
|
var refl = new Reflections(packName);
|
||||||
Set<Class<?>> scans = refl.getTypesAnnotatedWith(WorkerRegister.class);
|
Set<Class<?>> scans = refl.getTypesAnnotatedWith(WorkerRegister.class);
|
||||||
|
|
||||||
|
|
||||||
scans.forEach((pRaftClass) -> {
|
scans.forEach((pRaftClass) -> {
|
||||||
scansMap.put(pRaftClass.getName(), pRaftClass);
|
scansMap.put(pRaftClass.getName(), pRaftClass);
|
||||||
});
|
});
|
||||||
log.info("扫描注解的时间(scan annotations cost time): {} ms", Duration.between(now, Instant.now()).toMillis()) ;
|
log.info("扫描注解的时间(scan annotations cost time): {} ms", Duration.between(now, Instant.now()).toMillis());
|
||||||
scansMap.forEach((name, pRaftClass) -> {
|
scansMap.forEach((name, pRaftClass) -> {
|
||||||
try {
|
try {
|
||||||
cluster.getRpcServer()
|
cluster.getRpcServer()
|
||||||
|
@ -209,22 +211,23 @@ public class StateFactory {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
refl.getTypesAnnotatedWith(MasterRegister.class).forEach((pClass)->{
|
refl.getTypesAnnotatedWith(MasterRegister.class).forEach((pClass) -> {
|
||||||
try {
|
try {
|
||||||
MasterExecute execute = (MasterExecute)pClass.getDeclaredConstructor().newInstance();
|
MasterExecute execute = (MasterExecute) pClass.getDeclaredConstructor().newInstance();
|
||||||
MasterFactory.registerMasterLoop(execute);
|
MasterFactory.registerMasterLoop(execute);
|
||||||
|
|
||||||
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
|
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
|
||||||
| InvocationTargetException | NoSuchMethodException | SecurityException e) {
|
| InvocationTargetException | NoSuchMethodException | SecurityException e) {
|
||||||
log.info("{}", e.toString());
|
log.info("{}", e.toString());
|
||||||
|
|
||||||
}
|
}
|
||||||
});;
|
});
|
||||||
|
;
|
||||||
|
|
||||||
// 启动集群
|
// 启动集群
|
||||||
node = cluster.start();
|
node = cluster.start();
|
||||||
|
|
||||||
rpcClient = new BoltRaftRpcFactory().createRpcClient(); // 创建rpc客户端.
|
rpcClient = new BoltRaftRpcFactory().createRpcClient(); // 创建rpc客户端.
|
||||||
rpcClient.init(new CliOptions()); // 初始化
|
rpcClient.init(new CliOptions()); // 初始化
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -232,27 +235,35 @@ public class StateFactory {
|
||||||
return this.fsm.isLeader();
|
return this.fsm.isLeader();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void readIndexState(GenericClosure<State> closure) {
|
public void readIndexState(GenericClosure closure) {
|
||||||
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
|
|
||||||
|
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure(2000) {
|
||||||
@Override
|
@Override
|
||||||
public void run(Status status, long index, byte[] reqCtx) {
|
public void run(Status status, long index, byte[] reqCtx) {
|
||||||
log.debug("readIndexState({}) {}", getServerId(), status);
|
log.debug("readIndexState({}) {}", getServerId(), status);
|
||||||
if (status.isOk()) {
|
if (status.isOk()) {
|
||||||
|
|
||||||
closure.success(ss.fsm.getState());
|
closure.success(ss.fsm.getState());
|
||||||
closure.setValue(ss.fsm.getState());
|
closure.setValue(ss.fsm.getState());
|
||||||
} else {
|
closure.run(status);
|
||||||
|
return ;
|
||||||
// 提交同步
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// 回调失败
|
// 回调失败
|
||||||
closure.run(status);
|
// 提交同步
|
||||||
|
|
||||||
|
log.info("status not ok");
|
||||||
|
readIndexExecutor.execute(()->{
|
||||||
|
if(isLeader()) {
|
||||||
|
Operate.CallOperate(new Operate(OperateType.GET_STATE, null), closure);
|
||||||
|
} else {
|
||||||
|
handlerNotLeaderError(closure);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void applyOperate(Operate op, GenericClosure<Operate> closure) {
|
public void applyOperate(Operate op, GenericClosure closure) {
|
||||||
// 所有的提交都必须再leader进行
|
// 所有的提交都必须再leader进行
|
||||||
if (!ss.isLeader()) {
|
if (!ss.isLeader()) {
|
||||||
ss.handlerNotLeaderError(closure);
|
ss.handlerNotLeaderError(closure);
|
||||||
|
@ -265,17 +276,17 @@ public class StateFactory {
|
||||||
task.setData(
|
task.setData(
|
||||||
ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op)));
|
ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op)));
|
||||||
task.setDone(closure); // 确认所有数据 一致, 不需要加锁
|
task.setDone(closure); // 确认所有数据 一致, 不需要加锁
|
||||||
StateFactory.getStateServer().getNode().apply(task);
|
StateFactory.getNode().apply(task);
|
||||||
} catch (CodecException e) {
|
} catch (CodecException e) {
|
||||||
String errorMsg = "Fail to encode TaskState";
|
String errorMsg = "Fail to encode WorkerState";
|
||||||
log.debug(errorMsg, e);
|
log.debug("{} {}", errorMsg, e);
|
||||||
closure.failure(errorMsg, PeerId.emptyPeer());
|
closure.failure(errorMsg, PeerId.emptyPeer());
|
||||||
closure.run(new Status(RaftError.EINTERNAL, errorMsg));
|
closure.run(new Status(RaftError.EINTERNAL, errorMsg));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public <T> RaftResponse<T> redirect() {
|
public RaftResponse redirect() {
|
||||||
final RaftResponse<T> response = new RaftResponse<T>();
|
final RaftResponse response = new RaftResponse ();
|
||||||
response.setSuccess(false);
|
response.setSuccess(false);
|
||||||
if (this.node != null) {
|
if (this.node != null) {
|
||||||
final PeerId leader = this.node.getLeaderId();
|
final PeerId leader = this.node.getLeaderId();
|
||||||
|
@ -286,7 +297,7 @@ public class StateFactory {
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
public <T> void handlerNotLeaderError(final GenericClosure<T> closure) {
|
public void handlerNotLeaderError(final GenericClosure closure) {
|
||||||
closure.failure("Not leader.", redirect().getRedirect());
|
closure.failure("Not leader.", redirect().getRedirect());
|
||||||
closure.run(new Status(RaftError.EPERM, "Not leader"));
|
closure.run(new Status(RaftError.EPERM, "Not leader"));
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,12 +61,12 @@ public class StateMachine extends StateMachineAdapter {
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
|
|
||||||
Operate op = null;
|
Operate op = null;
|
||||||
GenericClosure<Operate> closure = null;
|
GenericClosure closure = null;
|
||||||
if (iter.done() != null) {
|
if (iter.done() != null) {
|
||||||
|
|
||||||
// leader可以直接从 回调closure里提取operate
|
// leader可以直接从 回调closure里提取operate
|
||||||
closure = (GenericClosure<Operate>)iter.done(); // 只支持单一个State. 全状态机只支持一种提交
|
closure = (GenericClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交
|
||||||
op = closure.getValue();
|
op = (Operate)closure.getValue();
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// 非leader 需要从getData反序列化出来后处理
|
// 非leader 需要从getData反序列化出来后处理
|
||||||
|
@ -93,21 +93,24 @@ public class StateMachine extends StateMachineAdapter {
|
||||||
WorkerState ws = op.getValue();
|
WorkerState ws = op.getValue();
|
||||||
log.debug("PUT {}", ws.peerId);
|
log.debug("PUT {}", ws.peerId);
|
||||||
state.getWorkers().put(ws.peerId, ws);
|
state.getWorkers().put(ws.peerId, ws);
|
||||||
if (closure != null) {
|
|
||||||
closure.success(op);
|
break;
|
||||||
closure.run(Status.OK());
|
case GET_STATE:
|
||||||
}
|
closure.setValue(this.state);
|
||||||
|
log.info("GET_STATE value={} at logIndex={}", this.state, iter.getIndex());
|
||||||
break;
|
break;
|
||||||
case REMOVE:
|
case REMOVE:
|
||||||
if (closure != null) {
|
|
||||||
closure.success(op);
|
|
||||||
closure.run(Status.OK());
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (closure != null) {
|
||||||
|
closure.success(op);
|
||||||
|
closure.run(Status.OK());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
iter.next();
|
iter.next();
|
||||||
|
@ -131,7 +134,7 @@ public class StateMachine extends StateMachineAdapter {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onLeaderStart(final long term) {
|
public void onLeaderStart(final long term) {
|
||||||
log.debug("onLeaderStart {}", StateFactory.getServerId());
|
log.debug("onLeaderStart[{}]", StateFactory.getServerId());
|
||||||
this.leaderTerm.set(term);
|
this.leaderTerm.set(term);
|
||||||
|
|
||||||
// 判断是否Master线程还在跑, 如果存在则中断
|
// 判断是否Master线程还在跑, 如果存在则中断
|
||||||
|
@ -139,17 +142,17 @@ public class StateMachine extends StateMachineAdapter {
|
||||||
MasterFactory.getMasterExecute().interrupt();
|
MasterFactory.getMasterExecute().interrupt();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
StateFactory.readIndexState(new GenericClosure() {
|
||||||
StateFactory.readIndexState(new GenericClosure<State>() {
|
|
||||||
@Override
|
@Override
|
||||||
public void run(Status status) {
|
public void run(Status status) {
|
||||||
|
|
||||||
var ws = state.getWorkers().get(StateFactory.getServerId());
|
var ws = this.<State>getValue().getWorkers().get(StateFactory.getServerId());
|
||||||
if (ws == null) {
|
if (ws == null) {
|
||||||
ws = new WorkerState(StateFactory.getServerId());
|
ws = new WorkerState(StateFactory.getServerId());
|
||||||
}
|
}
|
||||||
|
|
||||||
StateFactory.applyOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure<Operate>() {
|
// 更新当前WorkerState
|
||||||
|
StateFactory.applyOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure () {
|
||||||
@Override
|
@Override
|
||||||
public void run(Status status) {
|
public void run(Status status) {
|
||||||
log.debug("master update workerstate: {}", status);
|
log.debug("master update workerstate: {}", status);
|
||||||
|
@ -160,31 +163,29 @@ public class StateMachine extends StateMachineAdapter {
|
||||||
|
|
||||||
// 当成为master时候 必须启动
|
// 当成为master时候 必须启动
|
||||||
MasterFactory.getMasterExecute().start();
|
MasterFactory.getMasterExecute().start();
|
||||||
|
|
||||||
super.onLeaderStart(term);
|
super.onLeaderStart(term);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onLeaderStop(final Status status) {
|
public void onLeaderStop(final Status status) {
|
||||||
log.debug("onLeaderStop {}", StateFactory.getCluster().getServerId());
|
log.debug("onLeaderStop[{}]", StateFactory.getServerId());
|
||||||
this.leaderTerm.set(-1);
|
this.leaderTerm.set(-1);
|
||||||
// 判断是否Master线程还在跑, 如果存在则中断
|
// 判断是否Master线程还在跑, 如果存在则中断
|
||||||
if (MasterFactory.getMasterExecute().isAlive()) {
|
if (MasterFactory.getMasterExecute().isAlive()) {
|
||||||
MasterFactory.getMasterExecute().interrupt();
|
MasterFactory.getMasterExecute().interrupt();
|
||||||
}
|
}
|
||||||
|
|
||||||
super.onLeaderStop(status);
|
super.onLeaderStop(status);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onShutdown() {
|
public void onShutdown() {
|
||||||
log.debug("onShutdown");
|
log.info("onShutdown[{}]",StateFactory.getServerId());
|
||||||
super.onShutdown();
|
super.onShutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onStartFollowing(LeaderChangeContext ctx) {
|
public void onStartFollowing(LeaderChangeContext ctx) {
|
||||||
log.debug("[onStartFollowing] {} {}", ctx, StateFactory.getCluster().getServerId());
|
log.debug("onStartFollowing[{}]] {}", StateFactory.getServerId(),ctx);
|
||||||
try {
|
try {
|
||||||
|
|
||||||
// 判断是否Master线程还在跑, 如果存在则中断
|
// 判断是否Master线程还在跑, 如果存在则中断
|
||||||
|
@ -192,14 +193,14 @@ public class StateMachine extends StateMachineAdapter {
|
||||||
MasterFactory.getMasterExecute().interrupt();
|
MasterFactory.getMasterExecute().interrupt();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 在startFollowing不能使用 readIndexState
|
||||||
var ws = new WorkerState(StateFactory.getServerId());
|
|
||||||
log.debug("my: {} leader id {}", StateFactory.getServerId(), StateFactory.getLeaderId());
|
|
||||||
|
|
||||||
Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure<Operate>() {
|
|
||||||
|
// 更新当前WorkerState
|
||||||
|
Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateFactory.getServerId())), new GenericClosure() {
|
||||||
@Override
|
@Override
|
||||||
public void run(Status status) {
|
public void run(Status status) {
|
||||||
log.info("onStartFollowing CallOperate {} {}", status, this.getResponse());
|
log.debug("onStartFollowing update workerstate: {}", status);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -219,14 +220,11 @@ public class StateMachine extends StateMachineAdapter {
|
||||||
@Override
|
@Override
|
||||||
|
|
||||||
public void onStopFollowing(LeaderChangeContext ctx) {
|
public void onStopFollowing(LeaderChangeContext ctx) {
|
||||||
log.debug("{} {}", ctx, StateFactory.getCluster().getServerId());
|
log.debug("onStopFollowing[{}] {}", StateFactory.getServerId(), ctx);
|
||||||
|
|
||||||
var ws = new WorkerState(StateFactory.getServerId());
|
var ws = new WorkerState(StateFactory.getServerId());
|
||||||
|
|
||||||
log.debug("my: {} leader id {}", StateFactory.getServerId(), StateFactory.getLeaderId());
|
|
||||||
|
|
||||||
var op = new Operate(OperateType.PUT_WORKERSTATE, ws);
|
var op = new Operate(OperateType.PUT_WORKERSTATE, ws);
|
||||||
Operate.CallOperate(op, new GenericClosure<Operate>() {
|
Operate.CallOperate(op, new GenericClosure() {
|
||||||
@Override
|
@Override
|
||||||
public void run(Status status) {
|
public void run(Status status) {
|
||||||
log.info("{} {}", status, this.getResponse());
|
log.info("{} {}", status, this.getResponse());
|
||||||
|
|
|
@ -13,13 +13,17 @@ import lombok.extern.slf4j.Slf4j;
|
||||||
@Getter
|
@Getter
|
||||||
@Setter
|
@Setter
|
||||||
@ToString
|
@ToString
|
||||||
public abstract class GenericClosure<T> implements Closure {
|
public abstract class GenericClosure implements Closure {
|
||||||
|
|
||||||
|
|
||||||
// 状态机的统一响应
|
// 状态机的统一响应
|
||||||
private RaftResponse<T> response;
|
private RaftResponse response;
|
||||||
// 代表任务状态
|
// 代表任务状态
|
||||||
private T value;
|
private Object value;
|
||||||
|
|
||||||
|
public <T> T getValue() {
|
||||||
|
return (T)this.value;
|
||||||
|
}
|
||||||
|
|
||||||
public GenericClosure() {
|
public GenericClosure() {
|
||||||
|
|
||||||
|
@ -31,9 +35,10 @@ public abstract class GenericClosure<T> implements Closure {
|
||||||
* @param redirect
|
* @param redirect
|
||||||
*/
|
*/
|
||||||
public void failure(final String errorMsg, final PeerId redirect) {
|
public void failure(final String errorMsg, final PeerId redirect) {
|
||||||
final RaftResponse<T> response = new RaftResponse<T>();
|
final RaftResponse response = new RaftResponse();
|
||||||
response.setSuccess(false);
|
response.setSuccess(false);
|
||||||
response.setMsg(errorMsg);
|
response.setMsg(errorMsg);
|
||||||
|
log.error("{}", errorMsg);
|
||||||
response.setRedirect(redirect);
|
response.setRedirect(redirect);
|
||||||
setResponse(response);
|
setResponse(response);
|
||||||
}
|
}
|
||||||
|
@ -42,8 +47,8 @@ public abstract class GenericClosure<T> implements Closure {
|
||||||
* 成功时调用该方法. 自动装配response
|
* 成功时调用该方法. 自动装配response
|
||||||
* @param value
|
* @param value
|
||||||
*/
|
*/
|
||||||
public void success(final T value) {
|
public void success(final Object value) {
|
||||||
final RaftResponse<T> response = new RaftResponse<T>();
|
final RaftResponse response = new RaftResponse ();
|
||||||
response.setValue(value);
|
response.setValue(value);
|
||||||
response.setSuccess(true);
|
response.setSuccess(true);
|
||||||
setResponse(response);
|
setResponse(response);
|
||||||
|
|
|
@ -6,10 +6,25 @@ import lombok.Getter;
|
||||||
import lombok.Setter;
|
import lombok.Setter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
@Slf4j
|
|
||||||
@Getter
|
|
||||||
@Setter
|
|
||||||
public class MasterContext {
|
public class MasterContext {
|
||||||
private AtomicBoolean isExit = new AtomicBoolean(false);
|
private AtomicBoolean isExit = new AtomicBoolean(false);
|
||||||
|
|
||||||
private Object share;
|
private Object share;
|
||||||
|
|
||||||
|
public Boolean getIsExit() {
|
||||||
|
return isExit.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setIsExit(Boolean isExit) {
|
||||||
|
this.isExit.set(isExit);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Object getShare() {
|
||||||
|
return share;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setShare(Object share) {
|
||||||
|
this.share = share;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,10 @@ public class Operate implements Serializable {
|
||||||
* 同步WorkerState状态.
|
* 同步WorkerState状态.
|
||||||
*/
|
*/
|
||||||
PUT_WORKERSTATE,
|
PUT_WORKERSTATE,
|
||||||
|
/**
|
||||||
|
* 获取State状态
|
||||||
|
*/
|
||||||
|
GET_STATE,
|
||||||
/**
|
/**
|
||||||
* 暂无想法
|
* 暂无想法
|
||||||
*/
|
*/
|
||||||
|
@ -58,7 +62,7 @@ public class Operate implements Serializable {
|
||||||
* @param closure 回调函数. Operate为返回值
|
* @param closure 回调函数. Operate为返回值
|
||||||
*/
|
*/
|
||||||
@java.lang.SuppressWarnings("unchecked")
|
@java.lang.SuppressWarnings("unchecked")
|
||||||
public static void CallOperate(Operate op, GenericClosure<Operate> closure) {
|
public static void CallOperate(Operate op, GenericClosure closure) {
|
||||||
log.debug("CallOperate Value {}", op.<WorkerState>getValue());
|
log.debug("CallOperate Value {}", op.<WorkerState>getValue());
|
||||||
// 如果是leader 就直接提交
|
// 如果是leader 就直接提交
|
||||||
if (StateFactory.isLeader()) {
|
if (StateFactory.isLeader()) {
|
||||||
|
@ -70,13 +74,12 @@ public class Operate implements Serializable {
|
||||||
var request = new OperateProcessor.OperateRequest();
|
var request = new OperateProcessor.OperateRequest();
|
||||||
request.setOperate(op);
|
request.setOperate(op);
|
||||||
|
|
||||||
var leaderId = StateFactory.getLeaderId();
|
|
||||||
try {
|
try {
|
||||||
StateFactory.rpcClientInvokeAsync(leaderId.getEndpoint(), request, new InvokeCallback() {
|
StateFactory.rpcClientInvokeAsync(StateFactory.getLeaderId().getEndpoint(), request, new InvokeCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void complete(Object result, Throwable err) {
|
public void complete(Object result, Throwable err) {
|
||||||
log.debug("Object result {}", result);
|
log.debug("Object result {}", result);
|
||||||
var resp = (RaftResponse<Operate>) result;
|
var resp = (RaftResponse) result;
|
||||||
closure.setResponse(resp);
|
closure.setResponse(resp);
|
||||||
closure.success(resp.getValue());
|
closure.success(resp.getValue());
|
||||||
closure.run(Status.OK());
|
closure.run(Status.OK());
|
||||||
|
|
|
@ -56,7 +56,7 @@ public class OperateProcessor implements RpcProcessor<OperateProcessor.OperateRe
|
||||||
|
|
||||||
log.info("request: {}", request);
|
log.info("request: {}", request);
|
||||||
|
|
||||||
final GenericClosure<Operate> closure = new GenericClosure<Operate>() {
|
final GenericClosure closure = new GenericClosure () {
|
||||||
@Override
|
@Override
|
||||||
public void run(Status status) {
|
public void run(Status status) {
|
||||||
|
|
||||||
|
@ -81,15 +81,4 @@ public class OperateProcessor implements RpcProcessor<OperateProcessor.OperateRe
|
||||||
public String interest() {
|
public String interest() {
|
||||||
return OperateRequest.class.getName();
|
return OperateRequest.class.getName();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public static void main(String[] args) {
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
for(var a : Package.getPackages()) {
|
|
||||||
log.info("{}", a.getName() );
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,11 +25,11 @@ import lombok.extern.slf4j.Slf4j;
|
||||||
@Getter
|
@Getter
|
||||||
@Setter
|
@Setter
|
||||||
@ToString
|
@ToString
|
||||||
public class RaftResponse<T> implements Serializable {
|
public class RaftResponse implements Serializable {
|
||||||
|
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
private T value;
|
private Object value;
|
||||||
|
|
||||||
private boolean success;
|
private boolean success;
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -3,6 +3,13 @@ package com.yuandian.dataflow.utils;
|
||||||
public class Utils {
|
public class Utils {
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 找到第n个substr索引点的字符串索引
|
||||||
|
* @param str 原始字符串
|
||||||
|
* @param substr 需要定位的字符串
|
||||||
|
* @param n 第几个
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
public static int indexOf(String str, String substr, int n) {
|
public static int indexOf(String str, String substr, int n) {
|
||||||
int pos = str.indexOf(substr);
|
int pos = str.indexOf(substr);
|
||||||
while (--n > 0 && pos != -1) {
|
while (--n > 0 && pos != -1) {
|
||||||
|
@ -13,10 +20,10 @@ public class Utils {
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 尽可能的拿到 n个substr索引点的字符串. 如果超出就拿最大值.
|
* 尽可能的找到n个substr索引点的字符串索引. 如果超出就拿最大值.
|
||||||
* @param str 原始字符串
|
* @param str 原始字符串
|
||||||
* @param substr 需要定位的字符串
|
* @param substr 需要定位的字符串
|
||||||
* @param n
|
* @param n 第几个
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public static int indexOfAsPossible(String str, String substr, int n) {
|
public static int indexOfAsPossible(String str, String substr, int n) {
|
||||||
|
@ -30,7 +37,5 @@ public class Utils {
|
||||||
return pos;
|
return pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) {
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
6
start.sh
6
start.sh
|
@ -1,17 +1,17 @@
|
||||||
#! /bin/bash
|
#! /bin/bash
|
||||||
screen -S raft-0 -X quit
|
screen -S raft-0 -X quit
|
||||||
screen -S raft-1 -X quit
|
screen -S raft-1 -X quit
|
||||||
# screen -S raft-2 -X quit
|
screen -S raft-2 -X quit
|
||||||
|
|
||||||
sleep 1s
|
sleep 1s
|
||||||
|
|
||||||
VERSION=1.0.0-SNAPSHOT
|
VERSION=1.0.0-SNAPSHOT
|
||||||
screen -dmS raft-0 -L java -jar target/dataflow-${VERSION}.jar 0
|
screen -dmS raft-0 -L java -jar target/dataflow-${VERSION}.jar 0
|
||||||
screen -dmS raft-1 -L java -jar target/dataflow-${VERSION}.jar 1
|
screen -dmS raft-1 -L java -jar target/dataflow-${VERSION}.jar 1
|
||||||
# screen -dmS raft-2 -L java -jar target/dataflow-${VERSION}.jar 2
|
screen -dmS raft-2 -L java -jar target/dataflow-${VERSION}.jar 2
|
||||||
|
|
||||||
sleep 0.5s
|
sleep 0.5s
|
||||||
|
|
||||||
screen -S raft-0 -X logfile flush 0
|
screen -S raft-0 -X logfile flush 0
|
||||||
screen -S raft-1 -X logfile flush 0
|
screen -S raft-1 -X logfile flush 0
|
||||||
# screen -S raft-2 -X logfile flush 0
|
screen -S raft-2 -X logfile flush 0
|
Loading…
Reference in New Issue
Block a user