启动集群信息错误

This commit is contained in:
huangsimin 2022-08-08 16:21:27 +08:00
parent 32debdd367
commit d65f9c9b9d
3 changed files with 23 additions and 16 deletions

View File

@ -4,11 +4,13 @@ import org.apache.ratis.protocol.RaftPeer;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
@Getter @Getter
@Setter @Setter
@ToString
public class PeerId { public class PeerId {
public PeerId(RaftPeer raftPeer, int processorPort) { public PeerId(RaftPeer raftPeer, int processorPort) {
@ -21,7 +23,7 @@ public class PeerId {
@Override @Override
public boolean equals(Object arg0) { public boolean equals(Object arg0) {
return getRaftPeer().getId().toString() == ((PeerId)arg0).getRaftPeer().getId().toString(); return getRaftPeer().getId().hashCode() == ((PeerId)arg0).getRaftPeer().getId().hashCode();
} }
@Override @Override

View File

@ -38,6 +38,7 @@ import com.yuandian.dataflow.statemachine_old.MasterFactory;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.io.Closeable; import java.io.Closeable;
import java.io.File; import java.io.File;
@ -59,6 +60,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
* Run this application three times with three different parameter set-up a * Run this application three times with three different parameter set-up a
* ratis cluster which maintain a counter value replicated in each server memory * ratis cluster which maintain a counter value replicated in each server memory
*/ */
@Slf4j
@Getter @Getter
@Setter @Setter
public final class StateServer implements Closeable { public final class StateServer implements Closeable {
@ -69,7 +71,7 @@ public final class StateServer implements Closeable {
private final RaftServer raftServer; private final RaftServer raftServer;
private final ProcessorServer processorServer; private final ProcessorServer processorServer;
private final PeerId peer; private PeerId peer = null;
public static final UUID CLUSTER_GROUP_ID = UUID.fromString("79642d72-6166-742d-6461-7461666c6f77"); public static final UUID CLUSTER_GROUP_ID = UUID.fromString("79642d72-6166-742d-6461-7461666c6f77");
@ -86,15 +88,14 @@ public final class StateServer implements Closeable {
//set the port which server listen to in RaftProperty object //set the port which server listen to in RaftProperty object
final int port = NetUtils.createSocketAddr(curpeer.getAddress()).getPort(); final int port = NetUtils.createSocketAddr(curpeer.getAddress()).getPort();
GrpcConfigKeys.Server.setPort(properties, port); GrpcConfigKeys.Server.setPort(properties, port);
log.info("curpeer: {}", curpeer);
//create the counter state machine which hold the counter value //create the counter state machine which hold the counter value
StateMachine stateMachine = new StateMachine();
RaftGroup raftGroup = RaftGroup.valueOf( RaftGroup raftGroup = RaftGroup.valueOf(
RaftGroupId.valueOf(CLUSTER_GROUP_ID), peers); RaftGroupId.valueOf(CLUSTER_GROUP_ID), peers);
log.info("raftGroup: {}", raftGroup);
StateMachine stateMachine = new StateMachine();
//create and start the Raft server //create and start the Raft server
this.raftServer = RaftServer.newBuilder() this.raftServer = RaftServer.newBuilder()
.setGroup(raftGroup) .setGroup(raftGroup)
@ -103,18 +104,22 @@ public final class StateServer implements Closeable {
.setStateMachine(stateMachine) .setStateMachine(stateMachine)
.build(); .build();
log.info("raftGroup: {}", this.raftServer);
this.raftServer.start();
// create RaftClient // create RaftClient
raftClient = buildClient(peers,raftGroup); raftClient = buildClient(raftGroup);
this.processorServer = new ProcessorServer(); this.processorServer = new ProcessorServer();
this.processorServer.getGrpcServer().start();
this.peer = new PeerId(curpeer, this.processorServer.getGrpcServer().getPort());
} }
// block // block
public void start() throws IOException, InterruptedException { public void start() throws IOException, InterruptedException {
this.processorServer.getGrpcServer().start();
raftServer.start(); raftServer.start();
this.peer = new PeerId(raftServer.getPeer(), this.processorServer.getGrpcServer().getPort());
this.processorServer.getGrpcServer().awaitTermination(); this.processorServer.getGrpcServer().awaitTermination();
} }
@ -126,7 +131,7 @@ public final class StateServer implements Closeable {
} }
private static RaftClient buildClient( ArrayList<RaftPeer> peers, RaftGroup raftGroup) { private static RaftClient buildClient( RaftGroup raftGroup) {
RaftProperties raftProperties = new RaftProperties(); RaftProperties raftProperties = new RaftProperties();
RaftClient.Builder builder = RaftClient.newBuilder() RaftClient.Builder builder = RaftClient.newBuilder()

View File

@ -17,16 +17,16 @@ public class StateServerFactory {
String[] addresses = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; String[] addresses = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
for (int i = 0; i < addresses.length; i++) { for (int i = 0; i < addresses.length; i++) {
// var port = addresses[i].split(":")[1]; var port = addresses[i].split(":")[1];
peers.add(RaftPeer.newBuilder().setId("yd-" + sid).setAddress(addresses[i]).build()); peers.add(RaftPeer.newBuilder().setId("yd-" + port).setAddress(addresses[i]).build());
} }
//find current peer object based on application parameter //find current peer object based on application parameter
final RaftPeer currentPeer = peers.get(Integer.parseInt(sid)); // final RaftPeer currentPeer = peers.get(Integer.parseInt(sid));
//start a counter server //start a counter server
final StateServer stateServer = new StateServer(currentPeer, peers); final StateServer stateServer = new StateServer(peers.get(Integer.parseInt(sid)), peers);
stateServer.start(); stateServer.start();
stateServer.close(); stateServer.close();