添加mongodb例子

This commit is contained in:
huangsimin 2022-07-27 12:37:32 +08:00
parent 9c12f24c29
commit 4fa9035983
14 changed files with 406 additions and 373 deletions

12
pom.xml
View File

@ -23,7 +23,7 @@
<slf4j.version>1.7.36</slf4j.version> <slf4j.version>1.7.36</slf4j.version>
<jraft.version>1.3.11</jraft.version> <jraft.version>1.3.11</jraft.version>
<spring.boot.version>2.7.1</spring.boot.version> <spring.boot.version>2.7.1</spring.boot.version>
<mongo.driver.version>3.12.11</mongo.driver.version> <mongo.driver.version>4.7.0</mongo.driver.version>
<nacos.version>2.1.0</nacos.version> <nacos.version>2.1.0</nacos.version>
<snakeyaml.version>1.30</snakeyaml.version> <snakeyaml.version>1.30</snakeyaml.version>
<logback.version>1.2.11</logback.version> <logback.version>1.2.11</logback.version>
@ -83,11 +83,15 @@
</dependency> </dependency>
<dependency>
<!-- https://mvnrepository.com/artifact/org.mongodb/mongodb-driver-sync -->
<dependency>
<groupId>org.mongodb</groupId> <groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId> <artifactId>mongodb-driver-sync</artifactId>
<version>${mongo.driver.version}</version> <version>${mongo.driver.version}</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/org.reflections/reflections --> <!-- https://mvnrepository.com/artifact/org.reflections/reflections -->
<dependency> <dependency>

View File

@ -18,9 +18,9 @@ import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor; import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.google.protobuf.Any; import com.google.protobuf.Any;
import com.yuandian.dataflow.statemachine.StateServerFactory; import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.OperateClosure;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse; import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor; import com.yuandian.dataflow.statemachine.rpc.OperateProcessor;
@ -53,36 +53,37 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
public void handleRequest(RpcContext rpcCtx, PacketsRequest request) { public void handleRequest(RpcContext rpcCtx, PacketsRequest request) {
// StateServerFactory.getStateServer().updateFsmStateAsync(s, onCompleted); // StateServerFactory.getStateServer().updateFsmStateAsync(s, onCompleted);
var resp = new RaftResponse();
resp.setMsg(rpcCtx.getRemoteAddress());
var ss = StateServerFactory.getStateServer(); var ss = StateServerFactory.getStateServer();
log.info("{} handler request.packets.size(): {}", StateServerFactory.getServerId(), request.packets.size()); log.info("{} handler request.packets.size(): {}", StateServerFactory.getServerId(), request.packets.size());
ss.readIndexState((state)->{ ss.readIndexState( new GenericClosure<State>() {
@Override
public void run(Status status) {
var state = this.getValue();
var ws = state.getWorkers().get(StateServerFactory.getServerId()); var ws = state.getWorkers().get(StateServerFactory.getServerId());
ws.setTaskQueueSize( ws.getTaskQueueSize() - request.packets.size()); ws.setTaskQueueSize( ws.getTaskQueueSize() - request.packets.size());
ws.setUpdateAt(Instant.now()); ws.setUpdateAt(Instant.now());
log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(), request.packets.size(), state.getWorkers().size()); log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(), request.packets.size(), state.getWorkers().size());
Operate.CallOperate(new Operate(OperateType.PUT,ws), new OperateClosure() { Operate.CallOperate(new Operate(OperateType.PUT,ws), new GenericClosure<Operate>() {
@Override @Override
public void run(Status status) { public void run(Status status) {
var resp = this.getResponse(); var resp = this.getResponse();
resp.setMsg(rpcCtx.getRemoteAddress());
if(status.isOk()) { if(status.isOk()) {
resp.setSuccess(true); resp.setSuccess(true);
log.info("{}", resp); log.info("{}", resp);
} else { } else {
resp.setSuccess(false); resp.setSuccess(false);
} }
rpcCtx.sendResponse(resp); rpcCtx.sendResponse(resp);
} }
}); });
}); }
} );
} }

View File

@ -4,33 +4,66 @@ package com.yuandian.dataflow.projo;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonValue;
import org.bson.Document; import org.bson.Document;
import org.bson.codecs.configuration.CodecProvider;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import static com.mongodb.MongoClientSettings.getDefaultCodecRegistry;
import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;
import org.bson.codecs.pojo.annotations.BsonProperty; import org.bson.codecs.pojo.annotations.BsonProperty;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Setter @Setter
@Getter @Getter
public final class Doc extends Document { @ToString
public final class Doc {
@JsonProperty("code") @BsonProperty("retryPackets")
@BsonProperty("code") public int retryPackets ;
public int Code ;
@JsonProperty("ts") @BsonProperty("serverResponseTime")
@BsonProperty("ts") public int serverResponseTime ;
public LocalDateTime TS;
@JsonProperty("desc") @BsonProperty("requestBytes")
@BsonProperty("desc") public int requestBytes ;
public String Desc;
@JsonProperty("data") @BsonProperty("businessName")
@BsonProperty("data") public String businessName ;
public Document Data;
@BsonProperty("responseIp")
public int responseIp ;
public static void main(String[] args) {
MongoClient mgo = MongoClients.create("mongodb://yuandian:yuandian123@192.168.1.113:27017");
CodecProvider pojoCodecProvider = PojoCodecProvider.builder().register("com.yuandian.dataflow.projo").build();
CodecRegistry pojoCodecRegistry = fromRegistries(getDefaultCodecRegistry(),fromProviders(pojoCodecProvider));
MongoDatabase oriDatabase = mgo.getDatabase("ori-database").withCodecRegistry(pojoCodecRegistry);
MongoCollection<Doc> db = oriDatabase.getCollection("network_performace_flow_2022072400", Doc.class);
log.debug("{}", db.countDocuments( new BsonDocument("serverResponseTime", new BsonDocument("$gt", new BsonInt64(2083478517) )) ));
}
} }

View File

@ -0,0 +1,124 @@
/**
* description
*
* @author eson
*2022年7月20日-10:00:05
*/
package com.yuandian.dataflow.statemachine;
import java.time.Instant;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.google.protobuf.Any;
import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
* Master主线程, 用于接收packets
*
* @author eson
* 2022年7月20日-10:00:05
*/
@Slf4j
@Getter
@Setter
@ToString
public class MasterFactory {
public static final int MAX_TASKS = 100;
public static Thread masterExecute = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
log.debug("master execute {}", StateServerFactory.getServerId());
var alivePeers = StateServerFactory.getRaftNode().listAlivePeers();
log.debug("master execute {}", StateServerFactory.getRaftNode().listAlivePeers());
if (alivePeers != null) {
var ss = StateServerFactory.getStateServer();
// 读一致性
ss.readIndexState( new GenericClosure<State>() {
@Override
public void run(Status status) {
var state = this.getValue();
alivePeers.forEach((peer) -> {
WorkerState ws = state.getWorkers().get(peer);
if (ws != null) {
//
var canDealTasks = MAX_TASKS - ws.getTaskQueueSize();
log.debug("cap :{} peer: {}", canDealTasks, peer);
if (canDealTasks <= 0) {
return;
}
ws.setUpdateAt(Instant.now());
ws.setTaskQueueSize(MAX_TASKS);
var request = new PacketsRequest();
for (int i = 0; i < canDealTasks; i++) {
var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow
.newBuilder()
.setTableId(10086)
.build());
request.getPackets().add(p);
}
Operate.CallOperate(new Operate(OperateType.PUT, ws), new GenericClosure<Operate>() {
@Override
public void run(Status status) {
log.info("{}", status);
try {
ss.getRpcClient().invokeAsync(peer.getEndpoint(), request,
new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
log.info("PacketsRequest: {}", result);
}
}, 5000);
} catch (InterruptedException | RemotingException e) {
log.info("error send packets {}", e.toString());
}
}
});
}
});
}
} );
}
Thread.sleep(5000);
}
} catch (InterruptedException e) {
log.info("{}", e.toString());
}
}
});
public static Thread getMasterExecute() {
return masterExecute;
}
public static void Init() {
}
}

View File

@ -16,13 +16,13 @@ import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse; import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest; import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest;
import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.StateFactory;
import com.yuandian.dataflow.statemachine.state.WorkerState; import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.var; import lombok.var;
@ -66,12 +66,11 @@ public class StateMachine extends StateMachineAdapter {
while (iter.hasNext()) { while (iter.hasNext()) {
Operate op = null; Operate op = null;
OperateClosure closure = null; GenericClosure<Operate> closure = null;
if (iter.done() != null) { if (iter.done() != null) {
// This task is applied by this node, get value from closure to avoid additional // This task is applied by this node, get value from closure to avoid additional
// parsing. // parsing.
closure = (OperateClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交 closure = (GenericClosure<Operate>)iter.done(); // 只支持单一个State. 全状态机只支持一种提交
// log.info("SyncDataClosure(done) taskState:{} leaderTerm:{} {}",this.state, this.leaderTerm, closure);
op = closure.getValue(); op = closure.getValue();
} else { } else {
@ -79,11 +78,7 @@ public class StateMachine extends StateMachineAdapter {
final ByteBuffer data = iter.getData(); final ByteBuffer data = iter.getData();
try { try {
op = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize( op = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(data.array(),Operate.class.getName());
data.array(),Operate.class.getName());
// log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm);
} catch (CodecException e) { } catch (CodecException e) {
log.info("{}", e.toString()); log.info("{}", e.toString());
} }
@ -101,8 +96,6 @@ public class StateMachine extends StateMachineAdapter {
} }
break; break;
case REMOVE: case REMOVE:
if(closure != null) { if(closure != null) {
closure.success(op); closure.success(op);
closure.run(Status.OK()); closure.run(Status.OK());
@ -139,32 +132,38 @@ public class StateMachine extends StateMachineAdapter {
@Override @Override
public void onLeaderStart(final long term) { public void onLeaderStart(final long term) {
log.debug("onLeaderStart {}", StateServerFactory.getCluster().getServerId()); log.debug("onLeaderStart {}", StateServerFactory.getServerId());
this.leaderTerm.set(term); this.leaderTerm.set(term);
if(StateFactory.getMasterExecute().isAlive()) { // 判断是否Master线程还在跑, 如果存在则中断
StateFactory.getMasterExecute().interrupt(); if(MasterFactory.getMasterExecute().isAlive()) {
MasterFactory.getMasterExecute().interrupt();
} }
var ss = StateServerFactory.getStateServer(); var ss = StateServerFactory.getStateServer();
ss.readIndexState((state)->{ ss.readIndexState( new GenericClosure<State>() {
@Override
public void run(Status status) {
var ws = state.getWorkers().get( StateServerFactory.getServerId() ); var ws = state.getWorkers().get( StateServerFactory.getServerId() );
if(ws == null) { if(ws == null) {
ws = new WorkerState(StateServerFactory.getServerId()); ws = new WorkerState(StateServerFactory.getServerId());
// state.getWorkers().put(ss.getCluster().getServerId(), ws);
} }
Operate op = new Operate(OperateType.PUT, ws); Operate op = new Operate(OperateType.PUT, ws);
ss.applyOperate(op, new OperateClosure() { ss.applyOperate(op, new GenericClosure<Operate>() {
@Override @Override
public void run(Status status) { public void run(Status status) {
log.debug("master update workerstate: {}", status); log.debug("master update workerstate: {}", status);
} }
}); });
}
}); });
StateFactory.getMasterExecute().start(); // 当成为master时候 必须启动
MasterFactory.getMasterExecute().start();
super.onLeaderStart(term); super.onLeaderStart(term);
} }
@ -175,8 +174,9 @@ public class StateMachine extends StateMachineAdapter {
this.leaderTerm.set(-1); this.leaderTerm.set(-1);
super.onLeaderStop(status); super.onLeaderStop(status);
if(StateFactory.getMasterExecute().isAlive()) { // 判断是否Master线程还在跑, 如果存在则中断
StateFactory.getMasterExecute().interrupt(); if(MasterFactory.getMasterExecute().isAlive()) {
MasterFactory.getMasterExecute().interrupt();
} }
@ -196,18 +196,17 @@ public class StateMachine extends StateMachineAdapter {
log.debug("[onStartFollowing] {} {}", ctx, StateServerFactory.getCluster().getServerId()); log.debug("[onStartFollowing] {} {}", ctx, StateServerFactory.getCluster().getServerId());
try { try {
if(StateFactory.getMasterExecute().isAlive()) { // 判断是否Master线程还在跑, 如果存在则中断
StateFactory.getMasterExecute().interrupt(); if(MasterFactory.getMasterExecute().isAlive()) {
MasterFactory.getMasterExecute().interrupt();
} }
var ss = StateServerFactory.getStateServer(); var ws = new WorkerState(StateServerFactory.getServerId());
var ws = new WorkerState(ss.getCluster().getServerId());
log.debug("my: {} leader id {}",StateServerFactory.getServerId(), StateServerFactory.getLeaderId()); log.debug("my: {} leader id {}",StateServerFactory.getServerId(), StateServerFactory.getLeaderId());
var op = new Operate(OperateType.PUT, ws); var op = new Operate(OperateType.PUT, ws);
Operate.CallOperate(op, new OperateClosure() { Operate.CallOperate(op, new GenericClosure() {
@Override @Override
public void run(Status status) { public void run(Status status) {
log.info("{} {}", status, this.getResponse()); log.info("{} {}", status, this.getResponse());
@ -237,7 +236,7 @@ public class StateMachine extends StateMachineAdapter {
log.debug("my: {} leader id {}",StateServerFactory.getServerId(), StateServerFactory.getLeaderId()); log.debug("my: {} leader id {}",StateServerFactory.getServerId(), StateServerFactory.getLeaderId());
var op = new Operate(OperateType.PUT, ws); var op = new Operate(OperateType.PUT, ws);
Operate.CallOperate(op, new OperateClosure() { Operate.CallOperate(op, new GenericClosure() {
@Override @Override
public void run(Status status) { public void run(Status status) {
log.info("{} {}", status, this.getResponse()); log.info("{} {}", status, this.getResponse());

View File

@ -39,6 +39,7 @@ import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.BytesUtil; import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ThreadPoolUtil; import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse; import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
@ -73,7 +74,7 @@ public class StateServerFactory {
public static boolean isLeader() { public static boolean isLeader() {
return ss.getNode().isLeader() ; return ss.node.isLeader() ;
} }
@ -82,11 +83,15 @@ public class StateServerFactory {
} }
public static PeerId getServerId() { public static PeerId getServerId() {
return ss.getCluster().getServerId(); return ss.cluster.getServerId();
} }
public static Node getNode() { public static Node getNode() {
return ss.getNode() ; return ss.node ;
}
public static Node getRaftNode() {
return ss.cluster.getRaftNode() ;
} }
public static RpcClient getRpcClient() { public static RpcClient getRpcClient() {
@ -179,12 +184,13 @@ public class StateServerFactory {
} }
} }
public void readIndexState(Consumer<State> dofunc) { public void readIndexState(GenericClosure<State> closure) {
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override @Override
public void run(Status status, long index, byte[] reqCtx) { public void run(Status status, long index, byte[] reqCtx) {
if( status.isOk()) { if( status.isOk()) {
dofunc.accept(ss.fsm.getState()); closure.success(ss.fsm.getState());
closure.run(status);
} }
} }
} ); } );
@ -192,7 +198,7 @@ public class StateServerFactory {
public void applyOperate(Operate op, OperateClosure closure) { public void applyOperate(Operate op, GenericClosure closure) {
// 所有的提交都必须再leader进行 // 所有的提交都必须再leader进行
if (!ss.isLeader()) { if (!ss.isLeader()) {
ss.handlerNotLeaderError(closure); ss.handlerNotLeaderError(closure);
@ -227,7 +233,7 @@ public class StateServerFactory {
return response; return response;
} }
public void handlerNotLeaderError(final OperateClosure closure) { public void handlerNotLeaderError(final GenericClosure closure) {
closure.failure("Not leader.", redirect().getRedirect()); closure.failure("Not leader.", redirect().getRedirect());
closure.run(new Status(RaftError.EPERM, "Not leader")); closure.run(new Status(RaftError.EPERM, "Not leader"));
} }

View File

@ -1,4 +1,4 @@
package com.yuandian.dataflow.statemachine; package com.yuandian.dataflow.statemachine.closure;
import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.Status;
@ -19,31 +19,28 @@ import org.slf4j.LoggerFactory;
@Getter @Getter
@Setter @Setter
@ToString @ToString
public abstract class OperateClosure implements Closure { public abstract class GenericClosure<T> implements Closure {
// 状态机的统一响应 // 状态机的统一响应
private RaftResponse response; private RaftResponse<T> response;
// 代表任务状态 // 代表任务状态
private Operate value; private T value;
public GenericClosure() {
public OperateClosure() {
} }
public void failure(final String errorMsg, final PeerId redirect) { public void failure(final String errorMsg, final PeerId redirect) {
final RaftResponse response = new RaftResponse(); final RaftResponse<T> response = new RaftResponse<T>();
response.setSuccess(false); response.setSuccess(false);
response.setMsg(errorMsg); response.setMsg(errorMsg);
response.setRedirect(redirect); response.setRedirect(redirect);
setResponse(response); setResponse(response);
} }
public void success(final Operate value) { public void success(final T value) {
final RaftResponse response = new RaftResponse(); final RaftResponse<T> response = new RaftResponse<T>();
response.setOperate(value); response.setValue(value);
response.setSuccess(true); response.setSuccess(true);
setResponse(response); setResponse(response);
} }

View File

@ -0,0 +1,5 @@
package com.yuandian.dataflow.statemachine.closure;
public class StateClosure {
}

View File

@ -4,8 +4,8 @@ import java.io.Serializable;
import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.yuandian.dataflow.statemachine.OperateClosure;
import com.yuandian.dataflow.statemachine.StateServerFactory; import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor; import com.yuandian.dataflow.statemachine.rpc.OperateProcessor;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse; import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.state.WorkerState; import com.yuandian.dataflow.statemachine.state.WorkerState;
@ -45,14 +45,21 @@ public class Operate implements Serializable {
return; return;
}; };
public static void CallOperate(Operate op, OperateClosure closure) { /**
* 调用操作设置
* @param op 传入的操作类
* @param closure 回调函数. Operate为返回值
*/
public static void CallOperate(Operate op, GenericClosure<Operate> closure) {
var ss = StateServerFactory.getStateServer(); var ss = StateServerFactory.getStateServer();
// 如果是leader 就直接提交
if (StateServerFactory.isLeader()) { if (StateServerFactory.isLeader()) {
ss.applyOperate(op, closure); ss.applyOperate(op, closure);
return; return;
} }
// 非leader 转发请求 统一有leader处理
var request = new OperateProcessor.OperateRequest(); var request = new OperateProcessor.OperateRequest();
request.setOperate(op); request.setOperate(op);
@ -64,14 +71,14 @@ public class Operate implements Serializable {
@Override @Override
public void complete(Object result, Throwable err) { public void complete(Object result, Throwable err) {
log.info("{}", result); log.info("{}", result);
var resp = (RaftResponse) result; //TODO: 解决回调的次序问题
var resp = (RaftResponse<Operate>) result;
closure.setResponse(resp); closure.setResponse(resp);
closure.success(resp.getOperate()); closure.success(resp.getValue());
} }
}, 5000); }, 5000);
} catch (InterruptedException | RemotingException e) { } catch (InterruptedException | RemotingException e) {
// TODO Auto-generated catch block
closure.failure("failure", null); closure.failure("failure", null);
log.info("{}", e.toString()); log.info("{}", e.toString());
} }

View File

@ -17,8 +17,8 @@ import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor; import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.statemachine.StateServerFactory; import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.OperateClosure;
import com.yuandian.dataflow.statemachine.StateServerFactory.StateServer; import com.yuandian.dataflow.statemachine.StateServerFactory.StateServer;
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.State;
@ -64,7 +64,7 @@ public class OperateProcessor implements RpcProcessor<OperateProcessor.OperateRe
log.info("request: {}", request); log.info("request: {}", request);
final OperateClosure closure = new OperateClosure() { final GenericClosure closure = new GenericClosure() {
@Override @Override
public void run(Status status) { public void run(Status status) {
rpcCtx.sendResponse(getResponse()); rpcCtx.sendResponse(getResponse());

View File

@ -27,11 +27,11 @@ import lombok.extern.slf4j.Slf4j;
@Getter @Getter
@Setter @Setter
@ToString @ToString
public class RaftResponse implements Serializable { public class RaftResponse<T> implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private Operate operate; private T value;
private boolean success; private boolean success;

View File

@ -1,146 +0,0 @@
/**
* description
*
* @author eson
*2022年7月20日-10:00:05
*/
package com.yuandian.dataflow.statemachine.state;
import java.time.Instant;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.handler.codec.http.HttpContentEncoder.Result;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.google.protobuf.Any;
import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.OperateClosure;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
* description
*
* @author eson
* 2022年7月20日-10:00:05
*/
@Slf4j
@Getter
@Setter
@ToString
public class StateFactory {
@Getter
@Setter
@ToString
public static class PeerIdCap {
private PeerId peer;
private long cap;
public PeerIdCap(PeerId pid, long cap) {
this.peer = pid;
this.cap = cap;
}
}
public static Thread masterExecute = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
log.debug("master execute {}", StateServerFactory.getServerId());
var alivePeers = StateServerFactory.getCluster().getRaftNode().listAlivePeers();
log.debug("master execute {}", StateServerFactory.getCluster().getRaftNode().listAlivePeers());
if (alivePeers != null) {
var ss = StateServerFactory.getStateServer();
// var state = ss.getFsm().getState();
ss.readIndexState((state) -> {
alivePeers.forEach((peer) -> {
WorkerState ws = state.getWorkers().get(peer);
if (ws != null) {
var cap = 100 - ws.getTaskQueueSize();
log.debug("cap :{} peer: {}", cap, peer);
if (cap <= 0) {
return;
}
ws.setUpdateAt(Instant.now());
ws.setTaskQueueSize(100);
var request = new PacketsRequest();
for (int i = 0; i < cap; i++) {
var p = Any.pack(
BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder()
.setTableId(10086)
.build());
request.getPackets().add(p);
}
var op = new Operate(OperateType.PUT, ws);
Operate.CallOperate(op, new OperateClosure() {
@Override
public void run(Status status) {
log.info("{}", status);
try {
ss.getRpcClient().invokeAsync(peer.getEndpoint(),
request, new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
log.info("{}", result);
}
}, 5000);
} catch (InterruptedException | RemotingException e) {
log.info("error send packets {}", e.toString());
}
}
});
}
});
});
// ss.applyState(state, new SyncClosure<State>() {
// public void run(Status status) {
// log.debug("{}", status);
// };
// });
}
Thread.sleep(5000);
}
} catch (InterruptedException e) {
log.info("{}", e.toString());
}
}
});
public static Thread getMasterExecute() {
return masterExecute;
}
public static void Init() {
}
}

View File

@ -23,7 +23,8 @@ import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
// import org.springframework.expression.spel.ast.FunctionReference; // import org.springframework.expression.spel.ast.FunctionReference;
import com.mongodb.MongoClient; import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.model.InsertManyOptions; import com.mongodb.client.model.InsertManyOptions;
import com.yuandian.dataflow.projo.Doc; import com.yuandian.dataflow.projo.Doc;
@ -122,75 +123,77 @@ public class AppTest {
} }
} }
@Test // @Test
public void Mongodb() throws InterruptedException { // public void Mongodb() throws InterruptedException {
ArrayList<Thread> execs = new ArrayList<>(); // ArrayList<Thread> execs = new ArrayList<>();
final Metric metric = new Metric(); // final Metric metric = new Metric();
metric.start(); // metric.start();
for (int c = 0; c < 10; c++) { // for (int c = 0; c < 10; c++) {
Thread exec = new Thread(() -> { // Thread exec = new Thread(() -> {
@Cleanup // @Cleanup
MongoClient mgo = new MongoClient("localhost", 27017); // MongoClient mgo = MongoClients.create("mongodb://yuandian:yuandian123@192.168.1.113:27017");
log.info("msg");
long LoopNumber = 5;
long BatchSize = 20000;
var db = mgo.getDatabase("yuandian"); // log.info("msg");
var cltdoc = db.getCollection("doc");
for (int n = 0; n < LoopNumber; n++) { // long LoopNumber = 5;
// long BatchSize = 20000;
metric.push(() -> { // var db = mgo.getDatabase("yuandian");
// var cltdoc = db.getCollection("doc");
List<Doc> documents = new ArrayList<>(); // for (int n = 0; n < LoopNumber; n++) {
Random r = new Random();
for (int i = 0; i < BatchSize; i++) { // metric.push(() -> {
var doc = new Doc(); // List<Doc> documents = new ArrayList<>();
var datadoc = new Document(); // Random r = new Random();
doc.append("code", r.nextInt(100)); // for (int i = 0; i < BatchSize; i++) {
doc.append("desc", "desc");
doc.append("ts", Instant.now());
for (int ii = 0; ii < 24; ii++) { // var doc = new Doc();
UUID uid = UUID.randomUUID(); // var datadoc = new Document();
datadoc
.append(uid.toString(), uid.toString());
}
doc.append("data", datadoc); // doc.append("code", r.nextInt(100));
documents.add(doc); // doc.append("desc", "desc");
} // doc.append("ts", Instant.now());
var opt = new InsertManyOptions(); // for (int ii = 0; ii < 24; ii++) {
cltdoc.insertMany(documents, opt); // UUID uid = UUID.randomUUID();
return BatchSize; // datadoc
}); // .append(uid.toString(), uid.toString());
} // }
});
exec.start();
execs.add(exec);
}
;
execs.forEach((e) -> { // doc.append("data", datadoc);
try { // documents.add(doc);
e.join(); // }
} catch (InterruptedException e1) {
e1.printStackTrace();
}
});
metric.close(); // var opt = new InsertManyOptions();
// cltdoc.insertMany(documents, opt);
// return BatchSize;
// });
// }
// });
// exec.start();
// execs.add(exec);
// }
// ;
} // execs.forEach((e) -> {
// try {
// e.join();
// } catch (InterruptedException e1) {
// e1.printStackTrace();
// }
// });
// metric.close();
// }

View File

@ -1,6 +1,6 @@
package com.yuandian.dataflow; package com.yuandian.dataflow;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential; import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress; import com.mongodb.ServerAddress;
import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCollection;
@ -18,73 +18,73 @@ import java.util.List;
public class MongodbTest { public class MongodbTest {
public static <T> void insertMsgToMongoDB(T obj) { public static <T> void insertMsgToMongoDB(T obj) {
try { // try {
ServerAddress serverAddress = new ServerAddress("192.168.1.113", 27017); // ServerAddress serverAddress = new ServerAddress("192.168.1.113", 27017);
List<ServerAddress> addrs = new ArrayList<>(); // List<ServerAddress> addrs = new ArrayList<>();
addrs.add(serverAddress); // addrs.add(serverAddress);
MongoCredential credential = MongoCredential.createScramSha1Credential("yd-rw", "admin", "yuandian123".toCharArray()); // MongoCredential credential = MongoCredential.createScramSha1Credential("yd-rw", "admin", "yuandian123".toCharArray());
List<MongoCredential> credentials = new ArrayList<>(); // List<MongoCredential> credentials = new ArrayList<>();
credentials.add(credential); // credentials.add(credential);
MongoClient mongoClient = new MongoClient(addrs, credentials); // MongoClient mongoClient = new MongoClient(addrs, credentials);
MongoDatabase db = mongoClient.getDatabase("yd-base"); // MongoDatabase db = mongoClient.getDatabase("yd-base");
// todo 修改名字 // // todo 修改名字
MongoCollection<Document> collection = db.getCollection("lxy-test"); // MongoCollection<Document> collection = db.getCollection("lxy-test");
collection.insertOne(obj2Doc(obj)); // collection.insertOne(obj2Doc(obj));
System.err.println("insert success"); // System.err.println("insert success");
} catch (Exception e) { // } catch (Exception e) {
log.info("{}", e.toString()); // log.info("{}", e.toString());
} // }
} // }
public static <T> Document obj2Doc(T obj) throws Exception { // public static <T> Document obj2Doc(T obj) throws Exception {
Document doc = new Document(); // Document doc = new Document();
Field[] fields = obj.getClass().getDeclaredFields(); // Field[] fields = obj.getClass().getDeclaredFields();
for (Field field : fields) { // for (Field field : fields) {
String varName = field.getName(); // String varName = field.getName();
boolean accessFlag = field.isAccessible(); // boolean accessFlag = field.isAccessible();
if (!accessFlag) { // if (!accessFlag) {
field.setAccessible(true); // field.setAccessible(true);
} // }
Object param = field.get(obj); // Object param = field.get(obj);
if (param == null) { // if (param == null) {
continue; // continue;
} else if (param instanceof Integer) { // } else if (param instanceof Integer) {
int value = ((Integer) param).intValue(); // int value = ((Integer) param).intValue();
doc.put(varName, value); // doc.put(varName, value);
} else if (param instanceof String) { // } else if (param instanceof String) {
String value = (String) param; // String value = (String) param;
doc.put(varName, value); // doc.put(varName, value);
} else if (param instanceof Double) { // } else if (param instanceof Double) {
double value = ((Double) param).doubleValue(); // double value = ((Double) param).doubleValue();
doc.put(varName, value); // doc.put(varName, value);
} else if (param instanceof Float) { // } else if (param instanceof Float) {
float value = ((Float) param).floatValue(); // float value = ((Float) param).floatValue();
doc.put(varName, value); // doc.put(varName, value);
} else if (param instanceof Long) { // } else if (param instanceof Long) {
long value = ((Long) param).longValue(); // long value = ((Long) param).longValue();
doc.put(varName, value); // doc.put(varName, value);
} else if (param instanceof Boolean) { // } else if (param instanceof Boolean) {
boolean value = ((Boolean) param).booleanValue(); // boolean value = ((Boolean) param).booleanValue();
doc.put(varName, value); // doc.put(varName, value);
} // }
field.setAccessible(accessFlag); // field.setAccessible(accessFlag);
} // }
return doc; // return doc;
} // }
public static <T> T doc2Obj(Document doc, Class<T> clazz) throws Exception { // public static <T> T doc2Obj(Document doc, Class<T> clazz) throws Exception {
T obj = clazz.newInstance(); // T obj = clazz.newInstance();
for (String key : doc.keySet()) { // for (String key : doc.keySet()) {
Field field = clazz.getDeclaredField(key); // Field field = clazz.getDeclaredField(key);
field.setAccessible(true); // field.setAccessible(true);
field.set(obj, doc.get(key)); // field.set(obj, doc.get(key));
} // }
return obj; // return obj;
} }
} }