From e063b5471c19b21cd2a29141f462dc4fe6a5ef78 Mon Sep 17 00:00:00 2001 From: huangsimin Date: Tue, 9 Aug 2022 00:24:07 +0800 Subject: [PATCH] =?UTF-8?q?=E9=9C=80=E8=A6=81=E5=8A=A0=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../statemachine/{PeerId.java => Peer.java} | 17 +++++++++------ .../dataflow/statemachine/StateMachine.java | 13 +++++++----- .../dataflow/statemachine/StateServer.java | 21 +++++++++---------- .../statemachine/StateServerFactory.java | 7 ++++++- .../dataflow/statemachine/state/State.java | 4 ++-- .../statemachine/state/WorkerState.java | 6 +++--- 6 files changed, 40 insertions(+), 28 deletions(-) rename src/main/java/com/yuandian/dataflow/statemachine/{PeerId.java => Peer.java} (54%) diff --git a/src/main/java/com/yuandian/dataflow/statemachine/PeerId.java b/src/main/java/com/yuandian/dataflow/statemachine/Peer.java similarity index 54% rename from src/main/java/com/yuandian/dataflow/statemachine/PeerId.java rename to src/main/java/com/yuandian/dataflow/statemachine/Peer.java index 43c64da..cfa7de9 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/PeerId.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/Peer.java @@ -1,6 +1,7 @@ package com.yuandian.dataflow.statemachine; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import lombok.Getter; import lombok.Setter; @@ -11,24 +12,28 @@ import lombok.extern.slf4j.Slf4j; @Getter @Setter @ToString -public class PeerId { +public class Peer { - public PeerId(RaftPeer raftPeer, int processorPort) { - this.raftPeer = raftPeer; + public Peer(RaftPeerId raftPeer, int processorPort) { + this.raftPeerId = raftPeer; this.processorPort = processorPort; } - private RaftPeer raftPeer; + public Peer() { + + } + + private RaftPeerId raftPeerId; private int processorPort; @Override public boolean equals(Object arg0) { - return getRaftPeer().getId().hashCode() == ((PeerId)arg0).getRaftPeer().getId().hashCode(); + return getRaftPeerId().hashCode() == ((Peer)arg0).getRaftPeerId().hashCode(); } @Override public int hashCode() { - return getRaftPeer().getId().hashCode() ; + return getRaftPeerId().hashCode() ; } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 2e8985e..cc8393b 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -20,6 +20,7 @@ package com.yuandian.dataflow.statemachine; import org.apache.http.entity.InputStreamEntity; import org.apache.ratis.proto.RaftProtos; +import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto; import org.apache.ratis.proto.RaftProtos.RoleInfoProto; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftGroupId; @@ -201,20 +202,20 @@ public class StateMachine extends BaseStateMachine { return last.getIndex(); } - @Override public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) { - - - + log.info("msg {}", groupMemberId.getPeerId()); + StateServerFactory.setCurrentPeerId( new Peer(groupMemberId.getPeerId(), StateServerFactory.stateServer.getProcessorServer().getGrpcServer().getPort()) ); + log.info("msg {}", groupMemberId.getPeerId()); leader.set(newLeaderId == groupMemberId.getPeerId()); log.info("newLeaderId: {} groupMemberId: {} leader {}", newLeaderId , groupMemberId.getPeerId(), isLeader()); // super.notifyLeaderChanged(groupMemberId, newLeaderId); asyncExecutor.execute(()->{ - var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.CurrentPeerId())); + log.info("asyncExecutor"); + var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.getCurrentPeerId() )); try { var reply = StateServerFactory.send(op); log.info("123 {}", reply); @@ -224,6 +225,8 @@ public class StateMachine extends BaseStateMachine { }); + + diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java index c2c6ce7..af09696 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java @@ -65,14 +65,15 @@ import static java.nio.charset.StandardCharsets.UTF_8; @Setter public final class StateServer implements Closeable { - public static HashMap activesPeers = new HashMap<>(); + public static HashMap activesPeers = new HashMap<>(); private RaftClient raftClient = null; private final RaftServer raftServer; private final RaftGroup raftGroupConf; private final ProcessorServer processorServer; - private PeerId peer = null; + private Peer peer ; + private int grpcPort; public static final UUID CLUSTER_GROUP_ID = UUID.fromString("79642d72-6166-742d-6461-7461666c6f77"); @@ -89,14 +90,14 @@ public final class StateServer implements Closeable { //set the port which server listen to in RaftProperty object final int port = NetUtils.createSocketAddr(curpeer.getAddress()).getPort(); GrpcConfigKeys.Server.setPort(properties, port); - log.info("curpeer: {}", curpeer); + //create the counter state machine which hold the counter value StateMachine stateMachine = new StateMachine(); raftGroupConf = RaftGroup.valueOf( RaftGroupId.valueOf(CLUSTER_GROUP_ID), peers); - log.info("raftGroup: {}", raftGroupConf); + //create and start the Raft server this.raftServer = RaftServer.newBuilder() .setGroup(raftGroupConf) @@ -104,24 +105,22 @@ public final class StateServer implements Closeable { .setServerId(curpeer.getId()) .setStateMachine(stateMachine) .build(); - - log.info("raftGroup: {}", this.raftServer); + // create RaftClient this.processorServer = new ProcessorServer(); - - + this.processorServer.getGrpcServer().start(); + this.grpcPort = this.processorServer.getGrpcServer().getPort(); } // block public void start() throws IOException, InterruptedException { + raftServer.start(); - this.processorServer.getGrpcServer().start(); - this.peer = new PeerId(raftServer.getPeer(), this.processorServer.getGrpcServer().getPort()); - raftClient = buildClient(raftGroupConf); + this.processorServer.getGrpcServer().awaitTermination(); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index 8d6a105..b8c247c 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -34,7 +34,12 @@ public class StateServerFactory { } - public static PeerId CurrentPeerId() { + public static void setCurrentPeerId(Peer peer) { + stateServer.setPeer(peer); + } + + + public static Peer getCurrentPeerId() { return stateServer.getPeer(); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/State.java b/src/main/java/com/yuandian/dataflow/statemachine/state/State.java index 2449ab8..0b41804 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/state/State.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/state/State.java @@ -21,7 +21,7 @@ import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.HashMap; -import com.yuandian.dataflow.statemachine.PeerId; +import com.yuandian.dataflow.statemachine.Peer; /** * 代表任务状态 暂时全局使用这个结构. 添加新增状态 @@ -37,5 +37,5 @@ public class State implements Serializable { private static final long serialVersionUID = -1L; - private HashMap workers = new HashMap<>(); + private HashMap workers = new HashMap<>(); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java b/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java index e4869ff..249bda1 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java @@ -9,7 +9,7 @@ package com.yuandian.dataflow.statemachine.state; import java.io.Serializable; import java.time.Instant; -import com.yuandian.dataflow.statemachine.PeerId; +import com.yuandian.dataflow.statemachine.Peer; import lombok.Getter; import lombok.Setter; @@ -31,7 +31,7 @@ public class WorkerState implements Serializable { /** * 节点的对应peerID */ - public PeerId peerId; + public Peer peerId; /** * 任务队列的数量 */ @@ -46,7 +46,7 @@ public class WorkerState implements Serializable { * 初始化 并构造 updateAt时间 * @param peer 传入当前服务的peer */ - public WorkerState(PeerId peer) { + public WorkerState(Peer peer) { this.peerId = peer; this.updateAt = Instant.now(); }