最新修改

This commit is contained in:
huangsimin 2022-08-12 08:58:37 +08:00
parent 38ef58dd90
commit 5530f935d4
10 changed files with 293 additions and 128 deletions

View File

@ -1,9 +1,12 @@
package com.yuandian.dataflow.controller;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@ -16,9 +19,11 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.google.protobuf.Any;
// import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.annotations.MasterRegister;
import com.yuandian.dataflow.statemachine.master.MasterContext;
import com.yuandian.dataflow.statemachine.master.MasterExecute;
import com.yuandian.dataflow.statemachine.state.Peer;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import com.yuandian.dataflow.utils.PacketsManager;
@ -26,13 +31,81 @@ import com.yuandian.dataflow.utils.Utils;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@MasterRegister
public class MasterProcessor implements MasterExecute {
// ConcurrentLinkedQueue<Any> packets = new ConcurrentLinkedQueue<>();
PacketsManager packetsManager = new PacketsManager();
@Override
public void loop(MasterContext cxt) {
// TODO Auto-generated method stub
// ArrayList<Any> packets = new ArrayList<>();
// 模拟发送包的数据到该节点上
for (int i = 0; i < ThreadLocalRandom.current().nextLong(50, 100); i++) {
var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow
.newBuilder()
.setTableId(10086)
.build());
packetsManager.addPacket(p);
// packets.add(p);
}
try {
var giter = StateServerFactory.stateServer.getRaftServer().getGroups().iterator();
var iter = giter.next().getPeers().iterator();
if(iter.hasNext()) {
cxt.sleep(100);
return;
}
var peers = new ArrayList<Peer>();
while (iter.hasNext()) {
var curPeer = iter.next();
if (packetsManager.size() >= 100000) {
log.error("告警 数据流无法正常消耗: 缓冲packets:{} 直接放弃一部分数据", packetsManager.size());
packetsManager.discardPackets(50000);
log.debug("master({}) execute {} packets: {}", StateServerFactory.getPeer(), curPeer,
packetsManager.size());
cxt.sleep(5000);
}
}
} catch (IOException e) {
e.printStackTrace();
}
// // 必须复制. raft有一直使用该list
// var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers());
// if ( Operate.packetsManager.size() >= 100000) {
// log.error("告警 数据流无法正常消耗: 缓冲packets:{} 直接放弃一部分数据",
// Operate.packetsManager.size());
// Operate.packetsManager.discardPackets(50000);
// log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(),
// alivePeers,
// Operate.packetsManager.size());
// cxt.sleep(5000);
// } else {
// // log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(),
// // alivePeers, Operate.packetsManager.size());
// }
// if (alivePeers == null) {
// cxt.sleep(100); // 休眠100毫秒.
// return;
// }
// PeerId[] peers = new PeerId[alivePeers.size()];
// alivePeers.toArray(peers);
// // 等待全部反馈后才能进入下次循环
}
@ -42,8 +115,6 @@ public class MasterProcessor implements MasterExecute {
// private final int MAX_TASKS = 1000;
// private final int DEFAULT_ASYNC_TIMEOUT = 5000;
// @Override
// public void loop(MasterContext cxt) {
@ -63,9 +134,11 @@ public class MasterProcessor implements MasterExecute {
// var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers());
// if ( Operate.packetsManager.size() >= 100000) {
// log.error("告警 数据流无法正常消耗: 缓冲packets:{} 直接放弃一部分数据", Operate.packetsManager.size());
// log.error("告警 数据流无法正常消耗: 缓冲packets:{} 直接放弃一部分数据",
// Operate.packetsManager.size());
// Operate.packetsManager.discardPackets(50000);
// log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers,
// log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(),
// alivePeers,
// Operate.packetsManager.size());
// cxt.sleep(5000);
// } else {
@ -82,8 +155,8 @@ public class MasterProcessor implements MasterExecute {
// alivePeers.toArray(peers);
// // 等待全部反馈后才能进入下次循环
// Operate.CallOperate(new Operate(OperateType.ALLOCATE_PACKETS, alivePeers), new GenericClosure() {
// Operate.CallOperate(new Operate(OperateType.ALLOCATE_PACKETS, alivePeers),
// new GenericClosure() {
// @Override
// public void run(Status status) {

View File

@ -85,7 +85,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
log.info("次序:{} 条数: {}, {}:ms", count, i, Duration.between(now, Instant.now()).toMillis());
}
} catch (Exception e) {
log.info("{}", e.toString());
log.error("{}", e.toString());
} finally {
channel.shutdown().awaitTermination(1, TimeUnit.SECONDS);
}
@ -103,7 +103,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
log.info("{}", e.toString());
log.error("{}", e.toString());
}
return null;
});
@ -114,7 +114,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
log.info("{}", e.toString());
log.error("{}", e.toString());
}
return null;
});*/
@ -135,7 +135,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// System.out.println("result:" + response.getBody());
} catch (InvalidProtocolBufferException e) {
log.info("{}", e.toString());
log.error("{}", e.toString());
}
return null;
});
@ -147,7 +147,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
log.info("{}", e.toString());
log.error("{}", e.toString());
}
return null;
});
@ -158,7 +158,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
log.info("{}", e.toString());
log.error("{}", e.toString());
}
return null;
});
@ -169,7 +169,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
log.info("{}", e.toString());
log.error("{}", e.toString());
}
return null;
});
@ -180,7 +180,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
log.info("{}", e.toString());
log.error("{}", e.toString());
}
return null;
});
@ -191,7 +191,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
log.info("{}", e.toString());
log.error("{}", e.toString());
}
return null;
});
@ -202,7 +202,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
log.info("{}", e.toString());
log.error("{}", e.toString());
}
return null;
});
@ -213,7 +213,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
log.info("{}", e.toString());
log.error("{}", e.toString());
}
return null;
});
@ -224,7 +224,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
log.info("{}", e.toString());
log.error("{}", e.toString());
}
return null;
});
@ -235,7 +235,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
log.info("{}", e.toString());
log.error("{}", e.toString());
}
return null;
});*/

View File

@ -83,7 +83,7 @@ public class StateMachine extends BaseStateMachine {
private State state = new State();
private AtomicInteger counter = new AtomicInteger(0);
private AtomicBoolean leader = new AtomicBoolean(false);
@ -150,7 +150,7 @@ public class StateMachine extends BaseStateMachine {
// serialize the counter object and write it into the snapshot file
try {
ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(snapshotFile)));
out.writeObject(counter);
// out.writeObject(counter);
out.close();
} catch (IOException ioe) {
LOG.warn("Failed to write snapshot file \"" + snapshotFile
@ -194,9 +194,7 @@ public class StateMachine extends BaseStateMachine {
setLastAppliedTermIndex(last);
// read, cast and set the counter
counter = JavaUtils.cast(in.readObject());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e);
// counter = JavaUtils.cast(in.readObject());
}
return last.getIndex();
@ -206,8 +204,11 @@ public class StateMachine extends BaseStateMachine {
public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) {
log.info("msg {}", groupMemberId.getPeerId());
if (StateServerFactory.getCurrentPeerId().getRaftPeerId() == null) {
StateServerFactory.getCurrentPeerId().setRaftPeerId(groupMemberId.getPeerId().toString());
if (StateServerFactory.getPeer().getRaftPeerId() == null) {
var current = StateServerFactory.getPeer();
current.setRaftPeerId(groupMemberId.getPeerId().toString());
var raftPeer = StateServerFactory.stateServer.getRaftGroup().getPeer(groupMemberId.getPeerId());
current.setAddress(raftPeer.getAddress());
}
leader.set(newLeaderId == groupMemberId.getPeerId());
@ -216,9 +217,9 @@ public class StateMachine extends BaseStateMachine {
asyncExecutor.execute(() -> {
log.info("asyncExecutor");
var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.getCurrentPeerId()));
var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.getPeer()));
try {
var reply = StateServerFactory.send(op);
var reply = StateServerFactory.raftSend(op);
log.info("{}", MessageUtils.<RaftReply>fromMessage(reply.getMessage()));
} catch (IOException | ClassNotFoundException e) {
log.error("{}",e.toString());
@ -286,12 +287,13 @@ public class StateMachine extends BaseStateMachine {
try {
var data = entry.getStateMachineLogEntry().getLogData();
final var op = MessageUtils.<Operate>fromByteString(data);
final var op = MessageUtils.<Operate>fromByteString(entry.getStateMachineLogEntry().getLogData());
try (var wlock = writeLock()) {
switch (op.getType()) {
case ALLOCATE_PACKETS:
var peers = (WorkerState) op.getValue();
break;
case PUT_WORKERSTATE:
var ws = (WorkerState) op.getValue();
@ -313,7 +315,7 @@ public class StateMachine extends BaseStateMachine {
}
} catch (IOException | ClassNotFoundException e) {
log.info("{}", e.toString());
log.error("{}", e.toString());
return CompletableFuture.completedFuture(Status.setError(reply, "错误op"));
}

View File

@ -32,6 +32,7 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.NetUtils;
import com.yuandian.dataflow.statemachine.grpc.ProcessorClient;
import com.yuandian.dataflow.statemachine.grpc.ProcessorServer;
import com.yuandian.dataflow.statemachine.master.MasterFactory;
import com.yuandian.dataflow.statemachine.state.Peer;
@ -70,7 +71,9 @@ public final class StateServer implements Closeable {
private RaftClient raftClient ;
private final RaftServer raftServer;
private final RaftGroup raftGroupConf;
private final RaftGroup raftGroup;
private final ProcessorClient processorClient;
private final ProcessorServer processorServer;
private Peer peer = new Peer() ;
@ -95,25 +98,26 @@ public final class StateServer implements Closeable {
//create the counter state machine which hold the counter value
StateMachine stateMachine = new StateMachine();
raftGroupConf = RaftGroup.valueOf(
raftGroup = RaftGroup.valueOf(
RaftGroupId.valueOf(CLUSTER_GROUP_ID), peers);
//create and start the Raft server
this.raftServer = RaftServer.newBuilder()
.setGroup(raftGroupConf)
.setGroup(raftGroup)
.setProperties(properties)
.setServerId(curpeer.getId())
.setStateMachine(stateMachine)
.build();
raftClient = buildClient(raftGroupConf);
this.raftClient = buildClient(raftGroup);
// create RaftClient
this.processorServer = new ProcessorServer();
this.processorServer.getGrpcServer().start();
this.peer.setProcessorPort(this.processorServer.getGrpcServer().getPort());
this.processorClient = new ProcessorClient();
}
// block

View File

@ -9,6 +9,8 @@ import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeer;
import com.yuandian.dataflow.proto.ProcessorServerGrpc.ProcessorServerBlockingStub;
import com.yuandian.dataflow.proto.ProcessorServerGrpc.ProcessorServerFutureStub;
import com.yuandian.dataflow.statemachine.state.Peer;
public class StateServerFactory {
@ -36,21 +38,29 @@ public class StateServerFactory {
}
public static void setCurrentPeerId(Peer peer) {
public static void setPeer(Peer peer) {
stateServer.setPeer(peer);
}
public static Peer getCurrentPeerId() {
public static Peer getPeer() {
return stateServer.getPeer();
}
public static RaftClientReply send(Message msg) throws IOException {
public static RaftClientReply raftSend(Message msg) throws IOException {
return stateServer.getRaftClient().io().send(msg);
}
public static CompletableFuture<RaftClientReply> asyncSend(Message msg) throws IOException {
public static CompletableFuture<RaftClientReply> raftSendAsync(Message msg) throws IOException {
return stateServer.getRaftClient().async().send(msg);
}
public static ProcessorServerBlockingStub processSend(Peer peer){
return stateServer.getProcessorClient().getBlockingStub(peer);
}
public static ProcessorServerFutureStub processSendAsync(Peer peer){
return stateServer.getProcessorClient().getFutureStub(peer);
}
}

View File

@ -0,0 +1,67 @@
package com.yuandian.dataflow.statemachine.grpc;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.thirdparty.io.grpc.internal.ManagedChannelImplBuilder;
import com.yuandian.dataflow.proto.ProcessorServerGrpc;
import com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest;
import com.yuandian.dataflow.proto.Processor.PacketsProcessorRequestOrBuilder;
import com.yuandian.dataflow.proto.ProcessorServerGrpc.ProcessorServerBlockingStub;
import com.yuandian.dataflow.proto.ProcessorServerGrpc.ProcessorServerFutureStub;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.state.Peer;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
public class ProcessorClient {
private HashMap<Peer, ManagedChannel> peerChannelMap = new HashMap();
public ProcessorServerBlockingStub getBlockingStub(Peer peer) {
ManagedChannel channel ;
synchronized(peerChannelMap) {
channel = peerChannelMap.get(peer);
if(channel == null) {
var managedChannelBuilder = ManagedChannelBuilder.forAddress(peer.getAddress(), peer.getProcessorPort());
channel = managedChannelBuilder.maxInboundMessageSize(Integer.MAX_VALUE).usePlaintext().build();
peerChannelMap.put(peer, channel);
}
}
return ProcessorServerGrpc.newBlockingStub(channel);
}
public ProcessorServerFutureStub getFutureStub(Peer peer) {
ManagedChannel channel ;
synchronized(peerChannelMap) {
channel = peerChannelMap.get(peer);
if(channel == null) {
var managedChannelBuilder = ManagedChannelBuilder.forAddress(peer.getAddress(), peer.getProcessorPort());
channel = managedChannelBuilder.maxInboundMessageSize(Integer.MAX_VALUE).usePlaintext().build();
peerChannelMap.put(peer, channel);
}
}
return ProcessorServerGrpc.newFutureStub(channel);
}
public static void main(String[] args) {
var managedChannelBuilder = ManagedChannelBuilder.forAddress("10.1.1.117", 60017);
// var managedChannelBuilder = ManagedChannelBuilder.forAddress("127.0.0.1", 60017);
var channel = managedChannelBuilder.maxInboundMessageSize(Integer.MAX_VALUE).usePlaintext().build();
var stub = ProcessorServerGrpc.newBlockingStub(channel);
var request = PacketsProcessorRequest.newBuilder();
var response = stub.allPackets(request.build());
}
}

View File

@ -20,6 +20,7 @@ import com.yuandian.dataflow.statemachine.master.MasterFactory;
import com.yuandian.dataflow.utils.Utils;
import io.grpc.BindableService;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerServiceDefinition;
@ -34,11 +35,12 @@ import lombok.extern.slf4j.Slf4j;
public class ProcessorServer {
private Server grpcServer;
public ProcessorServer() {
ServerBuilder builder = ServerBuilder.forPort(0);
// 扫描注解RaftProccessor 注册
@ -62,7 +64,7 @@ public class ProcessorServer {
builder.addService( (BindableService)pRaftClass.getDeclaredConstructor().newInstance() );
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException | SecurityException e) {
log.info("{}",e.toString());
log.error("{}", e.toString());
}
});
@ -72,8 +74,7 @@ public class ProcessorServer {
MasterFactory.registerMasterLoop(execute);
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException | SecurityException e) {
log.info("{}", e.toString());
log.error("{}", e.toString());
}
});
@ -92,6 +93,9 @@ public class ProcessorServer {
log.info("*** server shut down");
}
});
}
}

View File

@ -25,6 +25,7 @@ public class Peer implements Serializable {
}
private String raftPeerId;
private String address;
private int processorPort;
@Override

View File

@ -92,6 +92,10 @@ public class PacketsManager {
return result;
}
/**
* 丢弃数据, 保持数据量在一定值
* @param remainSize 保留范围
*/
public void discardPackets(int remainSize) {
this.lockTemp((_packets)->{
var delsize = _packets.size() - remainSize ;

View File

@ -31,7 +31,7 @@ public class MongodbTest {
// System.err.println("insert success");
// } catch (Exception e) {
// log.info("{}", e.toString());
// log.error("{}", e.toString());
// }
// }