双grpc需要一起启动赋值

This commit is contained in:
huangsimin 2022-08-08 17:58:58 +08:00
parent d65f9c9b9d
commit ac2854ee7e
4 changed files with 42 additions and 19 deletions

View File

@ -57,6 +57,9 @@ import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -94,6 +97,9 @@ public class StateMachine extends BaseStateMachine {
return leader.get();
}
public Executor asyncExecutor = Executors.newFixedThreadPool(2);
/**
* initialize the state machine by initilize the state machine storage and
* calling the load method which reads the last applied command and restore it
@ -194,31 +200,40 @@ public class StateMachine extends BaseStateMachine {
return last.getIndex();
}
@Override
public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) {
leader.set(newLeaderId == groupMemberId.getPeerId());
log.info("newLeaderId: {} groupMemberId: {} leader {}", newLeaderId , groupMemberId.getPeerId(), isLeader());
// super.notifyLeaderChanged(groupMemberId, newLeaderId);
asyncExecutor.execute(()->{
var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.CurrentPeerId()));
try {
var reply = StateServerFactory.send(op);
log.info("123 {}", reply);
} catch (IOException e) {
e.printStackTrace();
}
});
var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.CurrentPeerId()));
try {
var reply = StateServerFactory.send(op);
log.info("{}", reply);
} catch (IOException e) {
e.printStackTrace();
}
if (MasterFactory.getMasterExecute().isAlive())
MasterFactory.getMasterExecute().interrupt();
if(isLeader())
MasterFactory.getMasterExecute().start();
super.notifyLeaderChanged(groupMemberId, newLeaderId);
}

View File

@ -67,8 +67,9 @@ public final class StateServer implements Closeable {
public static HashMap<PeerId, PeerId> activesPeers = new HashMap<>();
private final RaftClient raftClient;
private RaftClient raftClient = null;
private final RaftServer raftServer;
private final RaftGroup raftGroupConf;
private final ProcessorServer processorServer;
private PeerId peer = null;
@ -92,23 +93,23 @@ public final class StateServer implements Closeable {
//create the counter state machine which hold the counter value
StateMachine stateMachine = new StateMachine();
RaftGroup raftGroup = RaftGroup.valueOf(
raftGroupConf = RaftGroup.valueOf(
RaftGroupId.valueOf(CLUSTER_GROUP_ID), peers);
log.info("raftGroup: {}", raftGroup);
log.info("raftGroup: {}", raftGroupConf);
//create and start the Raft server
this.raftServer = RaftServer.newBuilder()
.setGroup(raftGroup)
.setGroup(raftGroupConf)
.setProperties(properties)
.setServerId(curpeer.getId())
.setStateMachine(stateMachine)
.build();
log.info("raftGroup: {}", this.raftServer);
this.raftServer.start();
// create RaftClient
raftClient = buildClient(raftGroup);
this.processorServer = new ProcessorServer();
@ -117,9 +118,10 @@ public final class StateServer implements Closeable {
// block
public void start() throws IOException, InterruptedException {
this.processorServer.getGrpcServer().start();
raftServer.start();
this.processorServer.getGrpcServer().start();
this.peer = new PeerId(raftServer.getPeer(), this.processorServer.getGrpcServer().getPort());
raftClient = buildClient(raftGroupConf);
this.processorServer.getGrpcServer().awaitTermination();
}

View File

@ -3,6 +3,7 @@ package com.yuandian.dataflow.statemachine;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
@ -38,7 +39,11 @@ public class StateServerFactory {
}
public static RaftClientReply send(Message msg) throws IOException {
return stateServer.getRaftClient().io().send(msg);
}
public static CompletableFuture<RaftClientReply> asyncSend(Message msg) throws IOException {
return stateServer.getRaftClient().async().send(msg);
}
}

View File

@ -98,6 +98,7 @@ public final class CounterClient {
latch.countDown();
}
//shutdown the executor service and wait until they finish their work
executorService.shutdown();