This commit is contained in:
huangsimin 2022-08-09 18:04:28 +08:00
parent e063b5471c
commit 73cbcc9a0e
5 changed files with 23 additions and 24 deletions

View File

@ -94,7 +94,7 @@ public class Operate implements Message,Serializable {
var output = ByteString.newOutput(); var output = ByteString.newOutput();
var outputStream = new ObjectOutputStream(output); var outputStream = new ObjectOutputStream(output);
outputStream.writeObject( this); outputStream.writeObject(this);
outputStream.close(); outputStream.close();
output.close(); output.close();

View File

@ -1,5 +1,7 @@
package com.yuandian.dataflow.statemachine; package com.yuandian.dataflow.statemachine;
import java.io.Serializable;
import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.RaftPeerId;
@ -12,23 +14,22 @@ import lombok.extern.slf4j.Slf4j;
@Getter @Getter
@Setter @Setter
@ToString @ToString
public class Peer { public class Peer implements Serializable {
public Peer(RaftPeerId raftPeer, int processorPort) { public Peer(String raftPeerId) {
this.raftPeerId = raftPeer; this.raftPeerId = raftPeerId;
this.processorPort = processorPort;
} }
public Peer() { public Peer() {
} }
private RaftPeerId raftPeerId; private String raftPeerId;
private int processorPort; private int processorPort;
@Override @Override
public boolean equals(Object arg0) { public boolean equals(Object other) {
return getRaftPeerId().hashCode() == ((Peer)arg0).getRaftPeerId().hashCode(); return getRaftPeerId().hashCode() == ((Peer)other).getRaftPeerId().hashCode();
} }
@Override @Override

View File

@ -99,7 +99,7 @@ public class StateMachine extends BaseStateMachine {
} }
public Executor asyncExecutor = Executors.newFixedThreadPool(2); public Executor asyncExecutor = Executors.newFixedThreadPool(8);
/** /**
* initialize the state machine by initilize the state machine storage and * initialize the state machine by initilize the state machine storage and
@ -206,10 +206,12 @@ public class StateMachine extends BaseStateMachine {
@Override @Override
public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) { public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) {
log.info("msg {}", groupMemberId.getPeerId()); log.info("msg {}", groupMemberId.getPeerId());
StateServerFactory.setCurrentPeerId( new Peer(groupMemberId.getPeerId(), StateServerFactory.stateServer.getProcessorServer().getGrpcServer().getPort()) ); StateServerFactory.getCurrentPeerId().setRaftPeerId(groupMemberId.getPeerId().toString());
log.info("msg {}", groupMemberId.getPeerId());
leader.set(newLeaderId == groupMemberId.getPeerId()); leader.set(newLeaderId == groupMemberId.getPeerId());
log.info("newLeaderId: {} groupMemberId: {} leader {}", newLeaderId , groupMemberId.getPeerId(), isLeader()); log.info("newLeaderId: {} groupMemberId: {} leader {}", newLeaderId , groupMemberId.getPeerId(), isLeader());
// super.notifyLeaderChanged(groupMemberId, newLeaderId); // super.notifyLeaderChanged(groupMemberId, newLeaderId);
@ -218,7 +220,7 @@ public class StateMachine extends BaseStateMachine {
var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.getCurrentPeerId() )); var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.getCurrentPeerId() ));
try { try {
var reply = StateServerFactory.send(op); var reply = StateServerFactory.send(op);
log.info("123 {}", reply); log.info("{}", reply);
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
@ -354,7 +356,7 @@ public class StateMachine extends BaseStateMachine {
//if leader, log the incremented value and it's log index //if leader, log the incremented value and it's log index
if (isLeader()) { if (isLeader()) {
log.info("{}: getType {}", index, op.getType()); log.info("{}: getType {}, state {}", index, op.getType(), state);
} }
// log.info("applyTransaction {}", 6); // log.info("applyTransaction {}", 6);

View File

@ -67,13 +67,13 @@ public final class StateServer implements Closeable {
public static HashMap<Peer, Peer> activesPeers = new HashMap<>(); public static HashMap<Peer, Peer> activesPeers = new HashMap<>();
private RaftClient raftClient = null; private RaftClient raftClient ;
private final RaftServer raftServer; private final RaftServer raftServer;
private final RaftGroup raftGroupConf; private final RaftGroup raftGroupConf;
private final ProcessorServer processorServer; private final ProcessorServer processorServer;
private Peer peer ; private Peer peer = new Peer() ;
private int grpcPort;
public static final UUID CLUSTER_GROUP_ID = UUID.fromString("79642d72-6166-742d-6461-7461666c6f77"); public static final UUID CLUSTER_GROUP_ID = UUID.fromString("79642d72-6166-742d-6461-7461666c6f77");
@ -106,21 +106,17 @@ public final class StateServer implements Closeable {
.setStateMachine(stateMachine) .setStateMachine(stateMachine)
.build(); .build();
raftClient = buildClient(raftGroupConf);
// create RaftClient // create RaftClient
this.processorServer = new ProcessorServer(); this.processorServer = new ProcessorServer();
this.processorServer.getGrpcServer().start(); this.processorServer.getGrpcServer().start();
this.grpcPort = this.processorServer.getGrpcServer().getPort(); this.peer.setProcessorPort(this.processorServer.getGrpcServer().getPort());
} }
// block // block
public void start() throws IOException, InterruptedException { public void start() throws IOException, InterruptedException {
raftServer.start(); raftServer.start();
this.processorServer.getGrpcServer().awaitTermination(); this.processorServer.getGrpcServer().awaitTermination();
} }

View File

@ -27,7 +27,7 @@ public class StateServerFactory {
//start a counter server //start a counter server
final StateServer stateServer = new StateServer(peers.get(Integer.parseInt(sid)), peers); stateServer = new StateServer(peers.get(Integer.parseInt(sid)), peers);
stateServer.start(); stateServer.start();
stateServer.close(); stateServer.close();