From d65f9c9b9dad4707f961025d62620a362ae4f081 Mon Sep 17 00:00:00 2001 From: huangsimin <474420502@qq.com> Date: Mon, 8 Aug 2022 16:21:27 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=AF=E5=8A=A8=E9=9B=86=E7=BE=A4=E4=BF=A1?= =?UTF-8?q?=E6=81=AF=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dataflow/statemachine/PeerId.java | 4 ++- .../dataflow/statemachine/StateServer.java | 27 +++++++++++-------- .../statemachine/StateServerFactory.java | 8 +++--- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/yuandian/dataflow/statemachine/PeerId.java b/src/main/java/com/yuandian/dataflow/statemachine/PeerId.java index 20b188b..43c64da 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/PeerId.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/PeerId.java @@ -4,11 +4,13 @@ import org.apache.ratis.protocol.RaftPeer; import lombok.Getter; import lombok.Setter; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; @Slf4j @Getter @Setter +@ToString public class PeerId { public PeerId(RaftPeer raftPeer, int processorPort) { @@ -21,7 +23,7 @@ public class PeerId { @Override public boolean equals(Object arg0) { - return getRaftPeer().getId().toString() == ((PeerId)arg0).getRaftPeer().getId().toString(); + return getRaftPeer().getId().hashCode() == ((PeerId)arg0).getRaftPeer().getId().hashCode(); } @Override diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java index cecd728..d7fae07 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java @@ -38,6 +38,7 @@ import com.yuandian.dataflow.statemachine_old.MasterFactory; import io.netty.util.internal.StringUtil; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import java.io.Closeable; import java.io.File; @@ -59,6 +60,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; * Run this application three times with three different parameter set-up a * ratis cluster which maintain a counter value replicated in each server memory */ +@Slf4j @Getter @Setter public final class StateServer implements Closeable { @@ -69,7 +71,7 @@ public final class StateServer implements Closeable { private final RaftServer raftServer; private final ProcessorServer processorServer; - private final PeerId peer; + private PeerId peer = null; public static final UUID CLUSTER_GROUP_ID = UUID.fromString("79642d72-6166-742d-6461-7461666c6f77"); @@ -86,15 +88,14 @@ public final class StateServer implements Closeable { //set the port which server listen to in RaftProperty object final int port = NetUtils.createSocketAddr(curpeer.getAddress()).getPort(); GrpcConfigKeys.Server.setPort(properties, port); - + log.info("curpeer: {}", curpeer); //create the counter state machine which hold the counter value + StateMachine stateMachine = new StateMachine(); RaftGroup raftGroup = RaftGroup.valueOf( RaftGroupId.valueOf(CLUSTER_GROUP_ID), peers); - - - - StateMachine stateMachine = new StateMachine(); + + log.info("raftGroup: {}", raftGroup); //create and start the Raft server this.raftServer = RaftServer.newBuilder() .setGroup(raftGroup) @@ -103,18 +104,22 @@ public final class StateServer implements Closeable { .setStateMachine(stateMachine) .build(); - + log.info("raftGroup: {}", this.raftServer); + this.raftServer.start(); // create RaftClient - raftClient = buildClient(peers,raftGroup); + raftClient = buildClient(raftGroup); this.processorServer = new ProcessorServer(); - this.processorServer.getGrpcServer().start(); - this.peer = new PeerId(curpeer, this.processorServer.getGrpcServer().getPort()); + + + } // block public void start() throws IOException, InterruptedException { + this.processorServer.getGrpcServer().start(); raftServer.start(); + this.peer = new PeerId(raftServer.getPeer(), this.processorServer.getGrpcServer().getPort()); this.processorServer.getGrpcServer().awaitTermination(); } @@ -126,7 +131,7 @@ public final class StateServer implements Closeable { } - private static RaftClient buildClient( ArrayList peers, RaftGroup raftGroup) { + private static RaftClient buildClient( RaftGroup raftGroup) { RaftProperties raftProperties = new RaftProperties(); RaftClient.Builder builder = RaftClient.newBuilder() diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index 9a5cf26..6d398c2 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -17,16 +17,16 @@ public class StateServerFactory { String[] addresses = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; for (int i = 0; i < addresses.length; i++) { - // var port = addresses[i].split(":")[1]; - peers.add(RaftPeer.newBuilder().setId("yd-" + sid).setAddress(addresses[i]).build()); + var port = addresses[i].split(":")[1]; + peers.add(RaftPeer.newBuilder().setId("yd-" + port).setAddress(addresses[i]).build()); } //find current peer object based on application parameter - final RaftPeer currentPeer = peers.get(Integer.parseInt(sid)); + // final RaftPeer currentPeer = peers.get(Integer.parseInt(sid)); //start a counter server - final StateServer stateServer = new StateServer(currentPeer, peers); + final StateServer stateServer = new StateServer(peers.get(Integer.parseInt(sid)), peers); stateServer.start(); stateServer.close();