From 5530f935d414cde46425c39fa9b8340a43c81ef1 Mon Sep 17 00:00:00 2001 From: huangsimin <474420502@qq.com> Date: Fri, 12 Aug 2022 08:58:37 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9C=80=E6=96=B0=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dataflow/controller/MasterProcessor.java | 241 ++++++++++++------ .../dataflow/grpc/CollectPackets.java | 26 +- .../dataflow/statemachine/StateMachine.java | 28 +- .../dataflow/statemachine/StateServer.java | 16 +- .../statemachine/StateServerFactory.java | 20 +- .../statemachine/grpc/ProcessorClient.java | 67 +++++ .../statemachine/grpc/ProcessorServer.java | 16 +- .../dataflow/statemachine/state/Peer.java | 1 + .../dataflow/utils/PacketsManager.java | 4 + .../com/yuandian/dataflow/MongodbTest.java | 2 +- 10 files changed, 293 insertions(+), 128 deletions(-) create mode 100644 src/main/java/com/yuandian/dataflow/statemachine/grpc/ProcessorClient.java diff --git a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java index 1292226..2848928 100644 --- a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java @@ -1,9 +1,12 @@ package com.yuandian.dataflow.controller; +import java.io.IOException; import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.Queue; import java.util.Random; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -16,122 +19,192 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.google.protobuf.Any; // import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest; import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass; +import com.yuandian.dataflow.statemachine.StateServerFactory; import com.yuandian.dataflow.statemachine.annotations.MasterRegister; import com.yuandian.dataflow.statemachine.master.MasterContext; import com.yuandian.dataflow.statemachine.master.MasterExecute; +import com.yuandian.dataflow.statemachine.state.Peer; import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.WorkerState; import com.yuandian.dataflow.utils.PacketsManager; import com.yuandian.dataflow.utils.Utils; import lombok.extern.slf4j.Slf4j; - @Slf4j @MasterRegister public class MasterProcessor implements MasterExecute { + + // ConcurrentLinkedQueue packets = new ConcurrentLinkedQueue<>(); + + PacketsManager packetsManager = new PacketsManager(); + @Override public void loop(MasterContext cxt) { - // TODO Auto-generated method stub - + // ArrayList packets = new ArrayList<>(); + // 模拟发送包的数据到该节点上 + + for (int i = 0; i < ThreadLocalRandom.current().nextLong(50, 100); i++) { + var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow + .newBuilder() + .setTableId(10086) + .build()); + packetsManager.addPacket(p); + // packets.add(p); + } + + try { + + var giter = StateServerFactory.stateServer.getRaftServer().getGroups().iterator(); + var iter = giter.next().getPeers().iterator(); + + if(iter.hasNext()) { + cxt.sleep(100); + return; + } + + var peers = new ArrayList(); + while (iter.hasNext()) { + + + + var curPeer = iter.next(); + + if (packetsManager.size() >= 100000) { + log.error("告警 数据流无法正常消耗: 缓冲packets:{} 直接放弃一部分数据", packetsManager.size()); + packetsManager.discardPackets(50000); + log.debug("master({}) execute {} packets: {}", StateServerFactory.getPeer(), curPeer, + packetsManager.size()); + cxt.sleep(5000); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + + // // 必须复制. raft有一直使用该list + // var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers()); + + // if ( Operate.packetsManager.size() >= 100000) { + // log.error("告警 数据流无法正常消耗: 缓冲packets:{} 直接放弃一部分数据", + // Operate.packetsManager.size()); + // Operate.packetsManager.discardPackets(50000); + // log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), + // alivePeers, + // Operate.packetsManager.size()); + // cxt.sleep(5000); + // } else { + // // log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), + // // alivePeers, Operate.packetsManager.size()); + // } + + // if (alivePeers == null) { + // cxt.sleep(100); // 休眠100毫秒. + // return; + // } + + // PeerId[] peers = new PeerId[alivePeers.size()]; + // alivePeers.toArray(peers); + // // 等待全部反馈后才能进入下次循环 + } // /** - // * 最大任务数限制 - // */ + // * 最大任务数限制 + // */ // private final int MAX_TASKS = 1000; // private final int DEFAULT_ASYNC_TIMEOUT = 5000; - - // @Override // public void loop(MasterContext cxt) { - // // ArrayList packets = new ArrayList<>(); - // // 模拟发送包的数据到该节点上 + // // ArrayList packets = new ArrayList<>(); + // // 模拟发送包的数据到该节点上 - // for (int i = 0; i < ThreadLocalRandom.current().nextLong(50, 100); i++) { - // var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow - // .newBuilder() - // .setTableId(10086) - // .build()); - // Operate.packetsManager.addPacket(p); - // // packets.add(p); - // } + // for (int i = 0; i < ThreadLocalRandom.current().nextLong(50, 100); i++) { + // var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow + // .newBuilder() + // .setTableId(10086) + // .build()); + // Operate.packetsManager.addPacket(p); + // // packets.add(p); + // } - // // 必须复制. raft有一直使用该list - // var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers()); - - // if ( Operate.packetsManager.size() >= 100000) { - // log.error("告警 数据流无法正常消耗: 缓冲packets:{} 直接放弃一部分数据", Operate.packetsManager.size()); - // Operate.packetsManager.discardPackets(50000); - // log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers, - // Operate.packetsManager.size()); - // cxt.sleep(5000); - // } else { - // // log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), - // // alivePeers, Operate.packetsManager.size()); - // } + // // 必须复制. raft有一直使用该list + // var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers()); - // if (alivePeers == null) { - // cxt.sleep(100); // 休眠100毫秒. - // return; - // } + // if ( Operate.packetsManager.size() >= 100000) { + // log.error("告警 数据流无法正常消耗: 缓冲packets:{} 直接放弃一部分数据", + // Operate.packetsManager.size()); + // Operate.packetsManager.discardPackets(50000); + // log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), + // alivePeers, + // Operate.packetsManager.size()); + // cxt.sleep(5000); + // } else { + // // log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), + // // alivePeers, Operate.packetsManager.size()); + // } - // PeerId[] peers = new PeerId[alivePeers.size()]; - // alivePeers.toArray(peers); - // // 等待全部反馈后才能进入下次循环 - + // if (alivePeers == null) { + // cxt.sleep(100); // 休眠100毫秒. + // return; + // } - // Operate.CallOperate(new Operate(OperateType.ALLOCATE_PACKETS, alivePeers), new GenericClosure() { + // PeerId[] peers = new PeerId[alivePeers.size()]; + // alivePeers.toArray(peers); + // // 等待全部反馈后才能进入下次循环 - // @Override - // public void run(Status status) { - // int[] allocTasks = this.getValue(); - - // if(allocTasks == null) { - // cxt.sleep(5000);; - // return; - // } + // Operate.CallOperate(new Operate(OperateType.ALLOCATE_PACKETS, alivePeers), + // new GenericClosure() { - // var all = 0; - // for(var i : allocTasks) { - // all += i; - // } - // if(all != 0) { - // log.info("需要处理的任务数量[{}] :{}", StateFactory.getLeaderId(), allocTasks); - // } - // for (int i = 0; i < peers.length; i++) { + // @Override + // public void run(Status status) { + // int[] allocTasks = this.getValue(); + + // if(allocTasks == null) { + // cxt.sleep(5000);; + // return; + // } + + // var all = 0; + // for(var i : allocTasks) { + // all += i; + // } + // if(all != 0) { + // log.info("需要处理的任务数量[{}] :{}", StateFactory.getLeaderId(), allocTasks); + // } + // for (int i = 0; i < peers.length; i++) { + + // var peer = peers[i]; + // if (allocTasks[i] <= 0) { + // continue; + // } + // var packets = Operate.packetsManager.popPackets(allocTasks[i]); + // // 先提交 节点的 剩余能处理的任务数量. 然后再处理 + // var request = new PacketsRequest(); // 数据包切片 + // request.setPackets(packets); + + // try { + // StateFactory.rpcClientInvokeAsync(peer.getEndpoint(), request, + // new InvokeCallback() { + // @Override + // public void complete(Object result, Throwable err) { + + // if (err != null) { + // // TODO: 如果错误, 需要让节点恢复任务处理的状态 + // log.debug("{}", err); + // } + // // log.debug("PacketsRequest: {}", result); + // } + // }, DEFAULT_ASYNC_TIMEOUT); + // } catch (InterruptedException | RemotingException e) { + // log.info("error send packets {}", e.toString()); + // } + // } + // } + // }); - // var peer = peers[i]; - // if (allocTasks[i] <= 0) { - // continue; - // } - // var packets = Operate.packetsManager.popPackets(allocTasks[i]); - // // 先提交 节点的 剩余能处理的任务数量. 然后再处理 - // var request = new PacketsRequest(); // 数据包切片 - // request.setPackets(packets); - - // try { - // StateFactory.rpcClientInvokeAsync(peer.getEndpoint(), request, - // new InvokeCallback() { - // @Override - // public void complete(Object result, Throwable err) { - - // if (err != null) { - // // TODO: 如果错误, 需要让节点恢复任务处理的状态 - // log.debug("{}", err); - // } - // // log.debug("PacketsRequest: {}", result); - // } - // }, DEFAULT_ASYNC_TIMEOUT); - // } catch (InterruptedException | RemotingException e) { - // log.info("error send packets {}", e.toString()); - // } - // } - // } - // }); - // } } diff --git a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java index c6bb7d5..17b68a5 100644 --- a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java +++ b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java @@ -85,7 +85,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { log.info("次序:{} 条数: {}, {}:ms", count, i, Duration.between(now, Instant.now()).toMillis()); } } catch (Exception e) { - log.info("{}", e.toString()); + log.error("{}", e.toString()); } finally { channel.shutdown().awaitTermination(1, TimeUnit.SECONDS); } @@ -103,7 +103,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - log.info("{}", e.toString()); + log.error("{}", e.toString()); } return null; }); @@ -114,7 +114,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - log.info("{}", e.toString()); + log.error("{}", e.toString()); } return null; });*/ @@ -135,7 +135,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // System.out.println("result:" + response.getBody()); } catch (InvalidProtocolBufferException e) { - log.info("{}", e.toString()); + log.error("{}", e.toString()); } return null; }); @@ -147,7 +147,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - log.info("{}", e.toString()); + log.error("{}", e.toString()); } return null; }); @@ -158,7 +158,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - log.info("{}", e.toString()); + log.error("{}", e.toString()); } return null; }); @@ -169,7 +169,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - log.info("{}", e.toString()); + log.error("{}", e.toString()); } return null; }); @@ -180,7 +180,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - log.info("{}", e.toString()); + log.error("{}", e.toString()); } return null; }); @@ -191,7 +191,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - log.info("{}", e.toString()); + log.error("{}", e.toString()); } return null; }); @@ -202,7 +202,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - log.info("{}", e.toString()); + log.error("{}", e.toString()); } return null; }); @@ -213,7 +213,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - log.info("{}", e.toString()); + log.error("{}", e.toString()); } return null; }); @@ -224,7 +224,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - log.info("{}", e.toString()); + log.error("{}", e.toString()); } return null; }); @@ -235,7 +235,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MongodbTest.insertMsgToMongoDB(result); } catch (InvalidProtocolBufferException e) { - log.info("{}", e.toString()); + log.error("{}", e.toString()); } return null; });*/ diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 9681c6a..9db675e 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -83,7 +83,7 @@ public class StateMachine extends BaseStateMachine { private State state = new State(); - private AtomicInteger counter = new AtomicInteger(0); + private AtomicBoolean leader = new AtomicBoolean(false); @@ -150,7 +150,7 @@ public class StateMachine extends BaseStateMachine { // serialize the counter object and write it into the snapshot file try { ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(snapshotFile))); - out.writeObject(counter); + // out.writeObject(counter); out.close(); } catch (IOException ioe) { LOG.warn("Failed to write snapshot file \"" + snapshotFile @@ -194,9 +194,7 @@ public class StateMachine extends BaseStateMachine { setLastAppliedTermIndex(last); // read, cast and set the counter - counter = JavaUtils.cast(in.readObject()); - } catch (ClassNotFoundException e) { - throw new IllegalStateException(e); + // counter = JavaUtils.cast(in.readObject()); } return last.getIndex(); @@ -206,19 +204,22 @@ public class StateMachine extends BaseStateMachine { public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) { log.info("msg {}", groupMemberId.getPeerId()); - if (StateServerFactory.getCurrentPeerId().getRaftPeerId() == null) { - StateServerFactory.getCurrentPeerId().setRaftPeerId(groupMemberId.getPeerId().toString()); + if (StateServerFactory.getPeer().getRaftPeerId() == null) { + var current = StateServerFactory.getPeer(); + current.setRaftPeerId(groupMemberId.getPeerId().toString()); + var raftPeer = StateServerFactory.stateServer.getRaftGroup().getPeer(groupMemberId.getPeerId()); + current.setAddress(raftPeer.getAddress()); } leader.set(newLeaderId == groupMemberId.getPeerId()); - + log.info("newLeaderId: {} groupMemberId: {} leader {}", newLeaderId, groupMemberId.getPeerId(), isLeader()); asyncExecutor.execute(() -> { log.info("asyncExecutor"); - var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.getCurrentPeerId())); + var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.getPeer())); try { - var reply = StateServerFactory.send(op); + var reply = StateServerFactory.raftSend(op); log.info("{}", MessageUtils.fromMessage(reply.getMessage())); } catch (IOException | ClassNotFoundException e) { log.error("{}",e.toString()); @@ -286,12 +287,13 @@ public class StateMachine extends BaseStateMachine { try { - var data = entry.getStateMachineLogEntry().getLogData(); - final var op = MessageUtils.fromByteString(data); + + final var op = MessageUtils.fromByteString(entry.getStateMachineLogEntry().getLogData()); try (var wlock = writeLock()) { switch (op.getType()) { case ALLOCATE_PACKETS: + var peers = (WorkerState) op.getValue(); break; case PUT_WORKERSTATE: var ws = (WorkerState) op.getValue(); @@ -313,7 +315,7 @@ public class StateMachine extends BaseStateMachine { } } catch (IOException | ClassNotFoundException e) { - log.info("{}", e.toString()); + log.error("{}", e.toString()); return CompletableFuture.completedFuture(Status.setError(reply, "错误op")); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java index b9058e3..1a1ffc4 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java @@ -32,6 +32,7 @@ import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.util.NetUtils; +import com.yuandian.dataflow.statemachine.grpc.ProcessorClient; import com.yuandian.dataflow.statemachine.grpc.ProcessorServer; import com.yuandian.dataflow.statemachine.master.MasterFactory; import com.yuandian.dataflow.statemachine.state.Peer; @@ -70,7 +71,9 @@ public final class StateServer implements Closeable { private RaftClient raftClient ; private final RaftServer raftServer; - private final RaftGroup raftGroupConf; + private final RaftGroup raftGroup; + + private final ProcessorClient processorClient; private final ProcessorServer processorServer; private Peer peer = new Peer() ; @@ -95,25 +98,26 @@ public final class StateServer implements Closeable { //create the counter state machine which hold the counter value StateMachine stateMachine = new StateMachine(); - raftGroupConf = RaftGroup.valueOf( + raftGroup = RaftGroup.valueOf( RaftGroupId.valueOf(CLUSTER_GROUP_ID), peers); //create and start the Raft server this.raftServer = RaftServer.newBuilder() - .setGroup(raftGroupConf) + .setGroup(raftGroup) .setProperties(properties) .setServerId(curpeer.getId()) .setStateMachine(stateMachine) .build(); - raftClient = buildClient(raftGroupConf); + this.raftClient = buildClient(raftGroup); // create RaftClient - + this.processorServer = new ProcessorServer(); this.processorServer.getGrpcServer().start(); - this.peer.setProcessorPort(this.processorServer.getGrpcServer().getPort()); + + this.processorClient = new ProcessorClient(); } // block diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index d94e2ec..4271ed3 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -9,6 +9,8 @@ import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeer; +import com.yuandian.dataflow.proto.ProcessorServerGrpc.ProcessorServerBlockingStub; +import com.yuandian.dataflow.proto.ProcessorServerGrpc.ProcessorServerFutureStub; import com.yuandian.dataflow.statemachine.state.Peer; public class StateServerFactory { @@ -36,21 +38,29 @@ public class StateServerFactory { } - public static void setCurrentPeerId(Peer peer) { + public static void setPeer(Peer peer) { stateServer.setPeer(peer); } - public static Peer getCurrentPeerId() { + public static Peer getPeer() { return stateServer.getPeer(); } - public static RaftClientReply send(Message msg) throws IOException { + public static RaftClientReply raftSend(Message msg) throws IOException { return stateServer.getRaftClient().io().send(msg); } - public static CompletableFuture asyncSend(Message msg) throws IOException { - + public static CompletableFuture raftSendAsync(Message msg) throws IOException { return stateServer.getRaftClient().async().send(msg); } + + public static ProcessorServerBlockingStub processSend(Peer peer){ + return stateServer.getProcessorClient().getBlockingStub(peer); + } + + public static ProcessorServerFutureStub processSendAsync(Peer peer){ + return stateServer.getProcessorClient().getFutureStub(peer); + } + } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/grpc/ProcessorClient.java b/src/main/java/com/yuandian/dataflow/statemachine/grpc/ProcessorClient.java new file mode 100644 index 0000000..fb90f00 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/grpc/ProcessorClient.java @@ -0,0 +1,67 @@ +package com.yuandian.dataflow.statemachine.grpc; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.thirdparty.io.grpc.internal.ManagedChannelImplBuilder; + +import com.yuandian.dataflow.proto.ProcessorServerGrpc; +import com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest; +import com.yuandian.dataflow.proto.Processor.PacketsProcessorRequestOrBuilder; +import com.yuandian.dataflow.proto.ProcessorServerGrpc.ProcessorServerBlockingStub; +import com.yuandian.dataflow.proto.ProcessorServerGrpc.ProcessorServerFutureStub; +import com.yuandian.dataflow.statemachine.StateServerFactory; +import com.yuandian.dataflow.statemachine.state.Peer; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +public class ProcessorClient { + + private HashMap peerChannelMap = new HashMap(); + + public ProcessorServerBlockingStub getBlockingStub(Peer peer) { + ManagedChannel channel ; + synchronized(peerChannelMap) { + channel = peerChannelMap.get(peer); + if(channel == null) { + var managedChannelBuilder = ManagedChannelBuilder.forAddress(peer.getAddress(), peer.getProcessorPort()); + channel = managedChannelBuilder.maxInboundMessageSize(Integer.MAX_VALUE).usePlaintext().build(); + peerChannelMap.put(peer, channel); + } + } + return ProcessorServerGrpc.newBlockingStub(channel); + } + + public ProcessorServerFutureStub getFutureStub(Peer peer) { + ManagedChannel channel ; + synchronized(peerChannelMap) { + channel = peerChannelMap.get(peer); + if(channel == null) { + var managedChannelBuilder = ManagedChannelBuilder.forAddress(peer.getAddress(), peer.getProcessorPort()); + channel = managedChannelBuilder.maxInboundMessageSize(Integer.MAX_VALUE).usePlaintext().build(); + peerChannelMap.put(peer, channel); + } + } + + return ProcessorServerGrpc.newFutureStub(channel); + } + + public static void main(String[] args) { + + var managedChannelBuilder = ManagedChannelBuilder.forAddress("10.1.1.117", 60017); + // var managedChannelBuilder = ManagedChannelBuilder.forAddress("127.0.0.1", 60017); + var channel = managedChannelBuilder.maxInboundMessageSize(Integer.MAX_VALUE).usePlaintext().build(); + + var stub = ProcessorServerGrpc.newBlockingStub(channel); + + var request = PacketsProcessorRequest.newBuilder(); + + + + var response = stub.allPackets(request.build()); + } +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/grpc/ProcessorServer.java b/src/main/java/com/yuandian/dataflow/statemachine/grpc/ProcessorServer.java index cdb24d4..b392251 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/grpc/ProcessorServer.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/grpc/ProcessorServer.java @@ -20,6 +20,7 @@ import com.yuandian.dataflow.statemachine.master.MasterFactory; import com.yuandian.dataflow.utils.Utils; import io.grpc.BindableService; +import io.grpc.ManagedChannelBuilder; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.ServerServiceDefinition; @@ -32,13 +33,14 @@ import lombok.extern.slf4j.Slf4j; @Getter @Setter public class ProcessorServer { - - + private Server grpcServer; - public ProcessorServer() { + + + ServerBuilder builder = ServerBuilder.forPort(0); // 扫描注解RaftProccessor 注册 @@ -62,7 +64,7 @@ public class ProcessorServer { builder.addService( (BindableService)pRaftClass.getDeclaredConstructor().newInstance() ); } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException | SecurityException e) { - log.info("{}",e.toString()); + log.error("{}", e.toString()); } }); @@ -72,8 +74,7 @@ public class ProcessorServer { MasterFactory.registerMasterLoop(execute); } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException | SecurityException e) { - log.info("{}", e.toString()); - + log.error("{}", e.toString()); } }); @@ -92,6 +93,9 @@ public class ProcessorServer { log.info("*** server shut down"); } }); + + + } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/Peer.java b/src/main/java/com/yuandian/dataflow/statemachine/state/Peer.java index cc324f9..af2f8cc 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/state/Peer.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/state/Peer.java @@ -25,6 +25,7 @@ public class Peer implements Serializable { } private String raftPeerId; + private String address; private int processorPort; @Override diff --git a/src/main/java/com/yuandian/dataflow/utils/PacketsManager.java b/src/main/java/com/yuandian/dataflow/utils/PacketsManager.java index 932b683..8cbab37 100644 --- a/src/main/java/com/yuandian/dataflow/utils/PacketsManager.java +++ b/src/main/java/com/yuandian/dataflow/utils/PacketsManager.java @@ -92,6 +92,10 @@ public class PacketsManager { return result; } + /** + * 丢弃数据, 保持数据量在一定值 + * @param remainSize 保留范围 + */ public void discardPackets(int remainSize) { this.lockTemp((_packets)->{ var delsize = _packets.size() - remainSize ; diff --git a/src/test/java/com/yuandian/dataflow/MongodbTest.java b/src/test/java/com/yuandian/dataflow/MongodbTest.java index a4530be..ad4394c 100644 --- a/src/test/java/com/yuandian/dataflow/MongodbTest.java +++ b/src/test/java/com/yuandian/dataflow/MongodbTest.java @@ -31,7 +31,7 @@ public class MongodbTest { // System.err.println("insert success"); // } catch (Exception e) { - // log.info("{}", e.toString()); + // log.error("{}", e.toString()); // } // }