From 73cbcc9a0e3ee45b7f2a23430832de0088137837 Mon Sep 17 00:00:00 2001 From: huangsimin <474420502@qq.com> Date: Tue, 9 Aug 2022 18:04:28 +0800 Subject: [PATCH] TODO: --- .../yuandian/dataflow/statemachine/Operate.java | 2 +- .../com/yuandian/dataflow/statemachine/Peer.java | 15 ++++++++------- .../dataflow/statemachine/StateMachine.java | 12 +++++++----- .../dataflow/statemachine/StateServer.java | 16 ++++++---------- .../statemachine/StateServerFactory.java | 2 +- 5 files changed, 23 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/yuandian/dataflow/statemachine/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine/Operate.java index feed7a4..cf42896 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/Operate.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/Operate.java @@ -94,7 +94,7 @@ public class Operate implements Message,Serializable { var output = ByteString.newOutput(); var outputStream = new ObjectOutputStream(output); - outputStream.writeObject( this); + outputStream.writeObject(this); outputStream.close(); output.close(); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/Peer.java b/src/main/java/com/yuandian/dataflow/statemachine/Peer.java index cfa7de9..c399ac4 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/Peer.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/Peer.java @@ -1,5 +1,7 @@ package com.yuandian.dataflow.statemachine; +import java.io.Serializable; + import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; @@ -12,23 +14,22 @@ import lombok.extern.slf4j.Slf4j; @Getter @Setter @ToString -public class Peer { +public class Peer implements Serializable { - public Peer(RaftPeerId raftPeer, int processorPort) { - this.raftPeerId = raftPeer; - this.processorPort = processorPort; + public Peer(String raftPeerId) { + this.raftPeerId = raftPeerId; } public Peer() { } - private RaftPeerId raftPeerId; + private String raftPeerId; private int processorPort; @Override - public boolean equals(Object arg0) { - return getRaftPeerId().hashCode() == ((Peer)arg0).getRaftPeerId().hashCode(); + public boolean equals(Object other) { + return getRaftPeerId().hashCode() == ((Peer)other).getRaftPeerId().hashCode(); } @Override diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index cc8393b..179a90c 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -99,7 +99,7 @@ public class StateMachine extends BaseStateMachine { } - public Executor asyncExecutor = Executors.newFixedThreadPool(2); + public Executor asyncExecutor = Executors.newFixedThreadPool(8); /** * initialize the state machine by initilize the state machine storage and @@ -206,10 +206,12 @@ public class StateMachine extends BaseStateMachine { @Override public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) { + log.info("msg {}", groupMemberId.getPeerId()); - StateServerFactory.setCurrentPeerId( new Peer(groupMemberId.getPeerId(), StateServerFactory.stateServer.getProcessorServer().getGrpcServer().getPort()) ); - log.info("msg {}", groupMemberId.getPeerId()); + StateServerFactory.getCurrentPeerId().setRaftPeerId(groupMemberId.getPeerId().toString()); + leader.set(newLeaderId == groupMemberId.getPeerId()); + log.info("newLeaderId: {} groupMemberId: {} leader {}", newLeaderId , groupMemberId.getPeerId(), isLeader()); // super.notifyLeaderChanged(groupMemberId, newLeaderId); @@ -218,7 +220,7 @@ public class StateMachine extends BaseStateMachine { var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.getCurrentPeerId() )); try { var reply = StateServerFactory.send(op); - log.info("123 {}", reply); + log.info("{}", reply); } catch (IOException e) { e.printStackTrace(); } @@ -354,7 +356,7 @@ public class StateMachine extends BaseStateMachine { //if leader, log the incremented value and it's log index if (isLeader()) { - log.info("{}: getType {}", index, op.getType()); + log.info("{}: getType {}, state {}", index, op.getType(), state); } // log.info("applyTransaction {}", 6); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java index af09696..728a28c 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java @@ -67,13 +67,13 @@ public final class StateServer implements Closeable { public static HashMap activesPeers = new HashMap<>(); - private RaftClient raftClient = null; + private RaftClient raftClient ; private final RaftServer raftServer; private final RaftGroup raftGroupConf; private final ProcessorServer processorServer; - private Peer peer ; - private int grpcPort; + private Peer peer = new Peer() ; + public static final UUID CLUSTER_GROUP_ID = UUID.fromString("79642d72-6166-742d-6461-7461666c6f77"); @@ -106,21 +106,17 @@ public final class StateServer implements Closeable { .setStateMachine(stateMachine) .build(); - + raftClient = buildClient(raftGroupConf); // create RaftClient - - + this.processorServer = new ProcessorServer(); this.processorServer.getGrpcServer().start(); - this.grpcPort = this.processorServer.getGrpcServer().getPort(); - + this.peer.setProcessorPort(this.processorServer.getGrpcServer().getPort()); } // block public void start() throws IOException, InterruptedException { - raftServer.start(); - this.processorServer.getGrpcServer().awaitTermination(); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index b8c247c..9ec41bc 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -27,7 +27,7 @@ public class StateServerFactory { //start a counter server - final StateServer stateServer = new StateServer(peers.get(Integer.parseInt(sid)), peers); + stateServer = new StateServer(peers.get(Integer.parseInt(sid)), peers); stateServer.start(); stateServer.close();