ratis grpc版本
This commit is contained in:
parent
f54b3257d7
commit
cf4ff63b31
13
.vscode/launch.json
vendored
13
.vscode/launch.json
vendored
|
@ -4,6 +4,13 @@
|
|||
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
|
||||
"version": "0.2.0",
|
||||
"configurations": [
|
||||
{
|
||||
"type": "java",
|
||||
"name": "Launch CounterClient",
|
||||
"request": "launch",
|
||||
"mainClass": "com.yuandian.dataflow.statemachine.client.CounterClient",
|
||||
"projectName": "dataflow"
|
||||
},
|
||||
{
|
||||
"type": "java",
|
||||
"name": "Launch Utils",
|
||||
|
@ -15,7 +22,7 @@
|
|||
"type": "java",
|
||||
"name": "Raft-0",
|
||||
"request": "launch",
|
||||
"mainClass": "com.yuandian.dataflow.Server",
|
||||
"mainClass": "com.yuandian.dataflow.statemachine.CounterServer",
|
||||
"projectName": "dataflow",
|
||||
"console": "integratedTerminal",
|
||||
"args": [
|
||||
|
@ -33,7 +40,7 @@
|
|||
"type": "java",
|
||||
"name": "Raft-1",
|
||||
"request": "launch",
|
||||
"mainClass": "com.yuandian.dataflow.Server",
|
||||
"mainClass": "com.yuandian.dataflow.statemachine.CounterServer",
|
||||
"projectName": "dataflow",
|
||||
"console": "integratedTerminal",
|
||||
"args": [
|
||||
|
@ -51,7 +58,7 @@
|
|||
"type": "java",
|
||||
"name": "Raft-2",
|
||||
"request": "launch",
|
||||
"mainClass": "com.yuandian.dataflow.Server",
|
||||
"mainClass": "com.yuandian.dataflow.statemachine.CounterServer",
|
||||
"projectName": "dataflow",
|
||||
"console": "integratedTerminal",
|
||||
"args": [
|
||||
|
|
53
pom.xml
53
pom.xml
|
@ -75,6 +75,45 @@
|
|||
<version>${snakeyaml.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.ratis</groupId>
|
||||
<artifactId>ratis-common</artifactId>
|
||||
<version>2.3.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.ratis</groupId>
|
||||
<artifactId>ratis-grpc</artifactId>
|
||||
<version>2.3.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.ratis</groupId>
|
||||
<artifactId>ratis-server</artifactId>
|
||||
<version>2.3.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.ratis</groupId>
|
||||
<artifactId>ratis-thirdparty-misc</artifactId>
|
||||
<version>1.0.1</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.ratis</groupId>
|
||||
<artifactId>ratis-proto</artifactId>
|
||||
<version>2.3.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.ratis</groupId>
|
||||
<artifactId>ratis-tools</artifactId>
|
||||
<version>2.3.0</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>com.alipay.sofa</groupId>
|
||||
|
@ -106,6 +145,16 @@
|
|||
<version>${spring.boot.version}</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core -->
|
||||
<dependency>
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
<version>3.2.6</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
|
||||
|
||||
<!-- protobuf 依赖 -->
|
||||
|
||||
|
@ -251,7 +300,7 @@
|
|||
<manifest>
|
||||
<addClasspath>true</addClasspath>
|
||||
<classpathPrefix>lib/</classpathPrefix>
|
||||
<mainClass>com.yuandian.dataflow.Server</mainClass>
|
||||
<mainClass>com.yuandian.dataflow.statemachine.CounterServer</mainClass>
|
||||
</manifest>
|
||||
</archive>
|
||||
|
||||
|
@ -295,7 +344,7 @@
|
|||
<appendAssemblyId>false</appendAssemblyId>
|
||||
<archive>
|
||||
<manifest>
|
||||
<mainClass>com.yuandian.dataflow.Server</mainClass>
|
||||
<mainClass>com.yuandian.dataflow.statemachine.CounterServer</mainClass>
|
||||
</manifest>
|
||||
</archive>
|
||||
<descriptors>
|
||||
|
|
|
@ -9,7 +9,7 @@ import org.slf4j.MarkerFactory;
|
|||
|
||||
import com.alipay.sofa.jraft.JRaftUtils;
|
||||
import com.alipay.sofa.jraft.conf.Configuration;
|
||||
import com.yuandian.dataflow.statemachine.StateFactory;
|
||||
import com.yuandian.dataflow.statemachine_old.StateFactory;
|
||||
import com.yuandian.dataflow.utils.Utils;
|
||||
|
||||
import io.netty.util.internal.StringUtil;
|
||||
|
|
|
@ -16,15 +16,15 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback;
|
|||
import com.google.protobuf.Any;
|
||||
import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
|
||||
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
|
||||
import com.yuandian.dataflow.statemachine.StateFactory;
|
||||
import com.yuandian.dataflow.statemachine.annotations.MasterRegister;
|
||||
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
|
||||
import com.yuandian.dataflow.statemachine.master.MasterContext;
|
||||
import com.yuandian.dataflow.statemachine.master.MasterExecute;
|
||||
import com.yuandian.dataflow.statemachine.operate.Operate;
|
||||
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
|
||||
import com.yuandian.dataflow.statemachine.state.State;
|
||||
import com.yuandian.dataflow.statemachine.state.WorkerState;
|
||||
import com.yuandian.dataflow.statemachine_old.StateFactory;
|
||||
import com.yuandian.dataflow.statemachine_old.annotations.MasterRegister;
|
||||
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
|
||||
import com.yuandian.dataflow.statemachine_old.master.MasterContext;
|
||||
import com.yuandian.dataflow.statemachine_old.master.MasterExecute;
|
||||
import com.yuandian.dataflow.statemachine_old.operate.Operate;
|
||||
import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType;
|
||||
import com.yuandian.dataflow.statemachine_old.state.State;
|
||||
import com.yuandian.dataflow.statemachine_old.state.WorkerState;
|
||||
import com.yuandian.dataflow.utils.PacketsManager;
|
||||
import com.yuandian.dataflow.utils.Utils;
|
||||
|
||||
|
|
|
@ -17,13 +17,13 @@ import com.alipay.sofa.jraft.Status;
|
|||
import com.alipay.sofa.jraft.rpc.RpcContext;
|
||||
import com.alipay.sofa.jraft.rpc.RpcProcessor;
|
||||
import com.google.protobuf.Any;
|
||||
import com.yuandian.dataflow.statemachine.StateFactory;
|
||||
import com.yuandian.dataflow.statemachine.annotations.WorkerRegister;
|
||||
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
|
||||
import com.yuandian.dataflow.statemachine.operate.Operate;
|
||||
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
|
||||
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
|
||||
import com.yuandian.dataflow.statemachine.state.State;
|
||||
import com.yuandian.dataflow.statemachine_old.StateFactory;
|
||||
import com.yuandian.dataflow.statemachine_old.annotations.WorkerRegister;
|
||||
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
|
||||
import com.yuandian.dataflow.statemachine_old.operate.Operate;
|
||||
import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType;
|
||||
import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
|
||||
import com.yuandian.dataflow.statemachine_old.state.State;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
|
|
@ -6,8 +6,8 @@ import com.alipay.sofa.jraft.Status;
|
|||
import com.alipay.sofa.jraft.entity.PeerId;
|
||||
import com.alipay.sofa.jraft.rpc.RpcContext;
|
||||
import com.alipay.sofa.jraft.rpc.RpcProcessor;
|
||||
import com.yuandian.dataflow.statemachine.StateFactory;
|
||||
import com.yuandian.dataflow.statemachine.annotations.WorkerRegister;
|
||||
import com.yuandian.dataflow.statemachine_old.StateFactory;
|
||||
import com.yuandian.dataflow.statemachine_old.annotations.WorkerRegister;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
|
|
@ -0,0 +1,124 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.yuandian.dataflow.statemachine;
|
||||
|
||||
import org.apache.ratis.conf.RaftProperties;
|
||||
// import org.apache.ratis.examples.common.Constants;
|
||||
import org.apache.ratis.grpc.GrpcConfigKeys;
|
||||
import org.apache.ratis.protocol.RaftGroup;
|
||||
import org.apache.ratis.protocol.RaftGroupId;
|
||||
import org.apache.ratis.protocol.RaftPeer;
|
||||
import org.apache.ratis.server.RaftServer;
|
||||
import org.apache.ratis.server.RaftServerConfigKeys;
|
||||
import org.apache.ratis.util.NetUtils;
|
||||
|
||||
import io.netty.util.internal.StringUtil;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Scanner;
|
||||
import java.util.UUID;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
|
||||
/**
|
||||
* Simplest Ratis server, use a simple state machine {@link CounterStateMachine}
|
||||
* which maintain a counter across multi server.
|
||||
* This server application designed to run several times with different
|
||||
* parameters (1,2 or 3). server addresses hard coded in {@link Constants}
|
||||
* <p>
|
||||
* Run this application three times with three different parameter set-up a
|
||||
* ratis cluster which maintain a counter value replicated in each server memory
|
||||
*/
|
||||
public final class CounterServer implements Closeable {
|
||||
private final RaftServer server;
|
||||
|
||||
public static final UUID CLUSTER_GROUP_ID = UUID.fromString("79642d72-6166-742d-6461-7461666c6f77");
|
||||
|
||||
public CounterServer(RaftPeer peer, ArrayList<RaftPeer> peers, File storageDir) throws IOException {
|
||||
//create a property object
|
||||
RaftProperties properties = new RaftProperties();
|
||||
|
||||
//set the storage directory (different for each peer) in RaftProperty object
|
||||
RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir));
|
||||
|
||||
//set the port which server listen to in RaftProperty object
|
||||
final int port = NetUtils.createSocketAddr(peer.getAddress()).getPort();
|
||||
GrpcConfigKeys.Server.setPort(properties, port);
|
||||
|
||||
//create the counter state machine which hold the counter value
|
||||
CounterStateMachine counterStateMachine = new CounterStateMachine();
|
||||
|
||||
RaftGroup raftGroup = RaftGroup.valueOf(
|
||||
RaftGroupId.valueOf(CLUSTER_GROUP_ID), peers);
|
||||
|
||||
//create and start the Raft server
|
||||
this.server = RaftServer.newBuilder()
|
||||
.setGroup(raftGroup)
|
||||
.setProperties(properties)
|
||||
.setServerId(peer.getId())
|
||||
.setStateMachine(counterStateMachine)
|
||||
.build();
|
||||
}
|
||||
|
||||
public void start() throws IOException {
|
||||
server.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
server.close();
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
if (args.length < 1) {
|
||||
System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}");
|
||||
System.err.println("{serverIndex} could be 1, 2 or 3");
|
||||
System.exit(1);
|
||||
}
|
||||
|
||||
var peers = new ArrayList<RaftPeer>();
|
||||
String[] addresses = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
|
||||
|
||||
for (int i = 0; i < addresses.length; i++) {
|
||||
var port = addresses[i].split(":")[1];
|
||||
peers.add(RaftPeer.newBuilder().setId("" + port).setAddress(addresses[i]).build());
|
||||
}
|
||||
|
||||
//find current peer object based on application parameter
|
||||
final RaftPeer currentPeer = peers.get(Integer.parseInt(args[0]));
|
||||
|
||||
//start a counter server
|
||||
final File storageDir = new File("./raftdata/" + currentPeer.getId());
|
||||
final CounterServer counterServer = new CounterServer(currentPeer, peers, storageDir);
|
||||
|
||||
counterServer.start();
|
||||
|
||||
|
||||
|
||||
|
||||
//exit when any input entered
|
||||
Scanner scanner = new Scanner(System.in, UTF_8.name());
|
||||
scanner.nextLine();
|
||||
counterServer.close();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,288 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.yuandian.dataflow.statemachine;
|
||||
|
||||
import org.apache.ratis.proto.RaftProtos;
|
||||
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
|
||||
import org.apache.ratis.protocol.Message;
|
||||
import org.apache.ratis.protocol.RaftGroupId;
|
||||
import org.apache.ratis.protocol.RaftGroupMemberId;
|
||||
import org.apache.ratis.protocol.RaftPeer;
|
||||
import org.apache.ratis.protocol.RaftPeerId;
|
||||
import org.apache.ratis.server.RaftServer;
|
||||
import org.apache.ratis.server.protocol.TermIndex;
|
||||
import org.apache.ratis.server.raftlog.RaftLog;
|
||||
import org.apache.ratis.server.storage.RaftStorage;
|
||||
import org.apache.ratis.statemachine.TransactionContext;
|
||||
import org.apache.ratis.statemachine.impl.BaseStateMachine;
|
||||
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
|
||||
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
|
||||
import org.apache.ratis.util.AutoCloseableLock;
|
||||
import org.apache.ratis.util.JavaUtils;
|
||||
|
||||
import com.yuandian.dataflow.statemachine_old.operate.Operate;
|
||||
import com.yuandian.dataflow.statemachine_old.state.State;
|
||||
import com.yuandian.dataflow.statemachine_old.state.WorkerState;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
/**
|
||||
* State machine implementation for Counter server application. This class
|
||||
* maintain a {@link AtomicInteger} object as a state and accept two commands:
|
||||
* GET and INCREMENT, GET is a ReadOnly command which will be handled by
|
||||
* {@code query} method however INCREMENT is a transactional command which
|
||||
* will be handled by {@code applyTransaction}.
|
||||
*/
|
||||
@Slf4j
|
||||
public class CounterStateMachine extends BaseStateMachine {
|
||||
private final SimpleStateMachineStorage storage =
|
||||
new SimpleStateMachineStorage();
|
||||
|
||||
|
||||
private State state = new State();
|
||||
|
||||
private AtomicInteger counter = new AtomicInteger(0);
|
||||
|
||||
private AtomicBoolean leader = new AtomicBoolean(false);
|
||||
|
||||
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
|
||||
|
||||
private AutoCloseableLock readLock() {
|
||||
return AutoCloseableLock.acquire(lock.readLock());
|
||||
}
|
||||
|
||||
private AutoCloseableLock writeLock() {
|
||||
return AutoCloseableLock.acquire(lock.writeLock());
|
||||
}
|
||||
|
||||
public Boolean isLeader() {
|
||||
return leader.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* initialize the state machine by initilize the state machine storage and
|
||||
* calling the load method which reads the last applied command and restore it
|
||||
* in counter object)
|
||||
*
|
||||
* @param server the current server information
|
||||
* @param groupId the cluster groupId
|
||||
* @param raftStorage the raft storage which is used to keep raft related
|
||||
* stuff
|
||||
* @throws IOException if any error happens during load state
|
||||
*/
|
||||
@Override
|
||||
public void initialize(RaftServer server, RaftGroupId groupId,
|
||||
RaftStorage raftStorage) throws IOException {
|
||||
super.initialize(server, groupId, raftStorage);
|
||||
this.storage.init(raftStorage);
|
||||
load(storage.getLatestSnapshot());
|
||||
}
|
||||
|
||||
/**
|
||||
* very similar to initialize method, but doesn't initialize the storage
|
||||
* system because the state machine reinitialized from the PAUSE state and
|
||||
* storage system initialized before.
|
||||
*
|
||||
* @throws IOException if any error happens during load state
|
||||
*/
|
||||
@Override
|
||||
public void reinitialize() throws IOException {
|
||||
load(storage.getLatestSnapshot());
|
||||
}
|
||||
|
||||
/**
|
||||
* Store the current state as an snapshot file in the stateMachineStorage.
|
||||
*
|
||||
* @return the index of the snapshot
|
||||
*/
|
||||
@Override
|
||||
public long takeSnapshot() {
|
||||
//get the last applied index
|
||||
final TermIndex last = getLastAppliedTermIndex();
|
||||
|
||||
//create a file with a proper name to store the snapshot
|
||||
final File snapshotFile =
|
||||
storage.getSnapshotFile(last.getTerm(), last.getIndex());
|
||||
|
||||
//serialize the counter object and write it into the snapshot file
|
||||
try (ObjectOutputStream out = new ObjectOutputStream(
|
||||
new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
|
||||
out.writeObject(counter);
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Failed to write snapshot file \"" + snapshotFile
|
||||
+ "\", last applied index=" + last);
|
||||
}
|
||||
|
||||
//return the index of the stored snapshot (which is the last applied one)
|
||||
return last.getIndex();
|
||||
}
|
||||
|
||||
/**
|
||||
* Load the state of the state machine from the storage.
|
||||
*
|
||||
* @param snapshot to load
|
||||
* @return the index of the snapshot or -1 if snapshot is invalid
|
||||
* @throws IOException if any error happens during read from storage
|
||||
*/
|
||||
private long load(SingleFileSnapshotInfo snapshot) throws IOException {
|
||||
//check the snapshot nullity
|
||||
if (snapshot == null) {
|
||||
LOG.warn("The snapshot info is null.");
|
||||
return RaftLog.INVALID_LOG_INDEX;
|
||||
}
|
||||
|
||||
//check the existance of the snapshot file
|
||||
final File snapshotFile = snapshot.getFile().getPath().toFile();
|
||||
if (!snapshotFile.exists()) {
|
||||
LOG.warn("The snapshot file {} does not exist for snapshot {}",
|
||||
snapshotFile, snapshot);
|
||||
return RaftLog.INVALID_LOG_INDEX;
|
||||
}
|
||||
|
||||
//load the TermIndex object for the snapshot using the file name pattern of
|
||||
// the snapshot
|
||||
final TermIndex last =
|
||||
SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
|
||||
|
||||
//read the file and cast it to the AtomicInteger and set the counter
|
||||
try (ObjectInputStream in = new ObjectInputStream(
|
||||
new BufferedInputStream(new FileInputStream(snapshotFile)))) {
|
||||
//set the last applied termIndex to the termIndex of the snapshot
|
||||
setLastAppliedTermIndex(last);
|
||||
|
||||
//read, cast and set the counter
|
||||
counter = JavaUtils.cast(in.readObject());
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
|
||||
return last.getIndex();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) {
|
||||
log.info("newLeaderId: {} groupMemberId: {}", newLeaderId , groupMemberId.getPeerId());
|
||||
leader.set(newLeaderId == groupMemberId.getPeerId());
|
||||
super.notifyLeaderChanged(groupMemberId, newLeaderId);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Handle GET command, which used by clients to get the counter value.
|
||||
*
|
||||
* @param request the GET message
|
||||
* @return the Message containing the current counter value
|
||||
*/
|
||||
@Override
|
||||
public CompletableFuture<Message> query(Message request) {
|
||||
String msg = request.getContent().toString(Charset.defaultCharset());
|
||||
|
||||
if (!msg.equals("GET")) {
|
||||
return CompletableFuture.completedFuture(
|
||||
Message.valueOf("Invalid Command"));
|
||||
}
|
||||
|
||||
return CompletableFuture.completedFuture(
|
||||
Message.valueOf(counter.toString()));
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply the INCREMENT command by incrementing the counter object.
|
||||
*
|
||||
* @param trx the transaction context
|
||||
* @return the message containing the updated counter value
|
||||
*/
|
||||
@Override
|
||||
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
|
||||
final RaftProtos.LogEntryProto entry = trx.getLogEntry();
|
||||
|
||||
//check if the command is valid
|
||||
// String logData = entry.getStateMachineLogEntry().getLogData()
|
||||
// .toString(Charset.defaultCharset());
|
||||
|
||||
Operate op ;
|
||||
try {
|
||||
op = (Operate)new ObjectInputStream(entry.getStateMachineLogEntry().getLogData().newInput()).readObject();
|
||||
} catch (IOException | ClassNotFoundException e) {
|
||||
e.printStackTrace();
|
||||
return CompletableFuture.completedFuture(Message.valueOf("错误op"));
|
||||
}
|
||||
|
||||
// if (!logData.equals("INCREMENT")) {
|
||||
// return CompletableFuture.completedFuture(
|
||||
// Message.valueOf("Invalid Command"));
|
||||
// }
|
||||
//update the last applied term and index
|
||||
final long index = entry.getIndex();
|
||||
|
||||
try(var r = writeLock()) {
|
||||
switch(op.getType()) {
|
||||
case ALLOCATE_PACKETS:
|
||||
break;
|
||||
case GET_STATE:
|
||||
break;
|
||||
case PUT_WORKERSTATE:
|
||||
var ws = op.<WorkerState>getValue();
|
||||
state.getWorkers().put(ws.getPeerId() , ws);
|
||||
break;
|
||||
case REMOVE:
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
||||
}
|
||||
updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
|
||||
}
|
||||
|
||||
|
||||
//actual execution of the command: increment the counter
|
||||
// counter.incrementAndGet();
|
||||
|
||||
//return the new value of the counter to the client
|
||||
final CompletableFuture<Message> f =
|
||||
CompletableFuture.completedFuture(Message.valueOf("put ok"));
|
||||
|
||||
//if leader, log the incremented value and it's log index
|
||||
if (isLeader()) {
|
||||
log.info("{}: Increment to {}", index, counter.toString());
|
||||
}
|
||||
|
||||
|
||||
return f;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,137 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.yuandian.dataflow.statemachine.client;
|
||||
|
||||
|
||||
|
||||
import org.apache.ratis.client.RaftClient;
|
||||
import org.apache.ratis.conf.Parameters;
|
||||
import org.apache.ratis.conf.RaftProperties;
|
||||
import org.apache.ratis.grpc.GrpcConfigKeys;
|
||||
import org.apache.ratis.grpc.GrpcFactory;
|
||||
import org.apache.ratis.protocol.ClientId;
|
||||
import org.apache.ratis.protocol.Message;
|
||||
import org.apache.ratis.protocol.RaftClientReply;
|
||||
import org.apache.ratis.protocol.RaftGroup;
|
||||
import org.apache.ratis.protocol.RaftGroupId;
|
||||
import org.apache.ratis.protocol.RaftPeer;
|
||||
import org.apache.ratis.protocol.RaftPeerId;
|
||||
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
|
||||
import org.apache.ratis.util.NetUtils;
|
||||
import org.springframework.cglib.proxy.CallbackFilter;
|
||||
|
||||
import com.yuandian.dataflow.statemachine.CounterServer;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Counter client application, this application sends specific number of
|
||||
* INCREMENT command to the Counter cluster and at the end sends a GET command
|
||||
* and print the result
|
||||
* <p>
|
||||
* Parameter to this application indicate the number of INCREMENT command, if no
|
||||
* parameter found, application use default value which is 10
|
||||
*/
|
||||
@Slf4j
|
||||
public final class CounterClient {
|
||||
|
||||
private CounterClient(){
|
||||
}
|
||||
|
||||
|
||||
public static void main(String[] args)
|
||||
throws IOException, InterruptedException {
|
||||
//indicate the number of INCREMENT command, set 10 if no parameter passed
|
||||
int increment = args.length > 0 ? Integer.parseInt(args[0]) : 10;
|
||||
|
||||
//build the counter cluster client
|
||||
RaftClient raftClient = buildClient();
|
||||
|
||||
//use a executor service with 10 thread to send INCREMENT commands
|
||||
// concurrently
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(10);
|
||||
|
||||
increment = 1000;
|
||||
CountDownLatch latch = new CountDownLatch(increment);
|
||||
//send INCREMENT commands concurrently
|
||||
System.out.printf("Sending %d increment command...%n", increment);
|
||||
Instant now = Instant.now();
|
||||
for (int i = 0; i < increment; i++) {
|
||||
executorService.submit(() ->
|
||||
raftClient.io().send(Message.valueOf("INCREMENT")));
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
|
||||
//shutdown the executor service and wait until they finish their work
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(increment * 500L, TimeUnit.MILLISECONDS);
|
||||
latch.await();
|
||||
log.info("{}", Duration.between(now, Instant.now()).toMillis());
|
||||
//send GET command and print the response
|
||||
|
||||
|
||||
RaftClientReply count = raftClient.io().sendReadOnly(Message.valueOf("GET"));
|
||||
String response = count.getMessage().getContent().toString(Charset.defaultCharset());
|
||||
System.out.println(response);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* build the RaftClient instance which is used to communicate to
|
||||
* Counter cluster
|
||||
*
|
||||
* @return the created client of Counter cluster
|
||||
*/
|
||||
private static RaftClient buildClient() {
|
||||
RaftProperties raftProperties = new RaftProperties();
|
||||
//set the storage directory (different for each peer) in RaftProperty object
|
||||
|
||||
|
||||
//set the port which server listen to in RaftProperty object
|
||||
// final int port = NetUtils.createSocketAddr("localhost:4440").getPort();
|
||||
// GrpcConfigKeys.Server.setPort(raftProperties, port);
|
||||
|
||||
var peers = new ArrayList<RaftPeer>();
|
||||
String[] addresses = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
|
||||
|
||||
for (int i = 0; i < addresses.length; i++) {
|
||||
var port = addresses[i].split(":")[1];
|
||||
peers.add(RaftPeer.newBuilder().setId("" + port).setAddress(addresses[i]).build());
|
||||
}
|
||||
RaftGroup raftGroup = RaftGroup.valueOf(
|
||||
RaftGroupId.valueOf(CounterServer.CLUSTER_GROUP_ID), peers);
|
||||
|
||||
RaftClient.Builder builder = RaftClient.newBuilder()
|
||||
.setProperties(raftProperties)
|
||||
.setRaftGroup(raftGroup)
|
||||
.setClientRpc(new GrpcFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), raftProperties));
|
||||
return builder.build();
|
||||
}
|
||||
}
|
|
@ -4,7 +4,7 @@
|
|||
* @author eson
|
||||
*2022年7月20日-10:00:05
|
||||
*/
|
||||
package com.yuandian.dataflow.statemachine;
|
||||
package com.yuandian.dataflow.statemachine_old;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
|
@ -18,13 +18,13 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback;
|
|||
import com.google.protobuf.Any;
|
||||
import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
|
||||
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
|
||||
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
|
||||
import com.yuandian.dataflow.statemachine.master.MasterContext;
|
||||
import com.yuandian.dataflow.statemachine.master.MasterExecute;
|
||||
import com.yuandian.dataflow.statemachine.operate.Operate;
|
||||
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
|
||||
import com.yuandian.dataflow.statemachine.state.State;
|
||||
import com.yuandian.dataflow.statemachine.state.WorkerState;
|
||||
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
|
||||
import com.yuandian.dataflow.statemachine_old.master.MasterContext;
|
||||
import com.yuandian.dataflow.statemachine_old.master.MasterExecute;
|
||||
import com.yuandian.dataflow.statemachine_old.operate.Operate;
|
||||
import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType;
|
||||
import com.yuandian.dataflow.statemachine_old.state.State;
|
||||
import com.yuandian.dataflow.statemachine_old.state.WorkerState;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
|
@ -4,7 +4,7 @@
|
|||
* @author eson
|
||||
*2022年7月12日-13:36:24
|
||||
*/
|
||||
package com.yuandian.dataflow.statemachine;
|
||||
package com.yuandian.dataflow.statemachine_old;
|
||||
|
||||
import java.io.File;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
|
@ -49,15 +49,15 @@ import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
|
|||
import com.alipay.sofa.jraft.util.BytesUtil;
|
||||
import com.alipay.sofa.jraft.util.Endpoint;
|
||||
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
|
||||
import com.yuandian.dataflow.statemachine.annotations.MasterRegister;
|
||||
import com.yuandian.dataflow.statemachine.annotations.WorkerRegister;
|
||||
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
|
||||
import com.yuandian.dataflow.statemachine.master.MasterExecute;
|
||||
import com.yuandian.dataflow.statemachine.operate.Operate;
|
||||
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
|
||||
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest;
|
||||
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
|
||||
import com.yuandian.dataflow.statemachine.state.State;
|
||||
import com.yuandian.dataflow.statemachine_old.annotations.MasterRegister;
|
||||
import com.yuandian.dataflow.statemachine_old.annotations.WorkerRegister;
|
||||
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
|
||||
import com.yuandian.dataflow.statemachine_old.master.MasterExecute;
|
||||
import com.yuandian.dataflow.statemachine_old.operate.Operate;
|
||||
import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType;
|
||||
import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
|
||||
import com.yuandian.dataflow.statemachine_old.rpc.OperateProcessor.OperateRequest;
|
||||
import com.yuandian.dataflow.statemachine_old.state.State;
|
||||
import com.yuandian.dataflow.utils.Utils;
|
||||
|
||||
import lombok.Getter;
|
|
@ -1,4 +1,4 @@
|
|||
package com.yuandian.dataflow.statemachine;
|
||||
package com.yuandian.dataflow.statemachine_old;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.Instant;
|
||||
|
@ -18,11 +18,11 @@ import com.alipay.sofa.jraft.error.RaftException;
|
|||
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
|
||||
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
|
||||
import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
|
||||
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
|
||||
import com.yuandian.dataflow.statemachine.operate.Operate;
|
||||
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
|
||||
import com.yuandian.dataflow.statemachine.state.State;
|
||||
import com.yuandian.dataflow.statemachine.state.WorkerState;
|
||||
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
|
||||
import com.yuandian.dataflow.statemachine_old.operate.Operate;
|
||||
import com.yuandian.dataflow.statemachine_old.operate.Operate.OperateType;
|
||||
import com.yuandian.dataflow.statemachine_old.state.State;
|
||||
import com.yuandian.dataflow.statemachine_old.state.WorkerState;
|
||||
import com.yuandian.dataflow.utils.Utils;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
|
@ -4,7 +4,7 @@
|
|||
* @author eson
|
||||
*2022年7月21日-14:27:49
|
||||
*/
|
||||
package com.yuandian.dataflow.statemachine.annotations;
|
||||
package com.yuandian.dataflow.statemachine_old.annotations;
|
||||
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
|
@ -4,7 +4,7 @@
|
|||
* @author eson
|
||||
*2022年7月21日-14:27:49
|
||||
*/
|
||||
package com.yuandian.dataflow.statemachine.annotations;
|
||||
package com.yuandian.dataflow.statemachine_old.annotations;
|
||||
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
|
@ -1,8 +1,8 @@
|
|||
package com.yuandian.dataflow.statemachine.closure;
|
||||
package com.yuandian.dataflow.statemachine_old.closure;
|
||||
|
||||
import com.alipay.sofa.jraft.Closure;
|
||||
import com.alipay.sofa.jraft.entity.PeerId;
|
||||
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
|
||||
import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
|
@ -1,4 +1,4 @@
|
|||
package com.yuandian.dataflow.statemachine.master;
|
||||
package com.yuandian.dataflow.statemachine_old.master;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
@ -1,4 +1,4 @@
|
|||
package com.yuandian.dataflow.statemachine.master;
|
||||
package com.yuandian.dataflow.statemachine_old.master;
|
||||
|
||||
/**
|
||||
* Master的主线程循环
|
|
@ -1,15 +1,15 @@
|
|||
package com.yuandian.dataflow.statemachine.operate;
|
||||
package com.yuandian.dataflow.statemachine_old.operate;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
import com.alipay.sofa.jraft.Status;
|
||||
import com.alipay.sofa.jraft.error.RemotingException;
|
||||
import com.alipay.sofa.jraft.rpc.InvokeCallback;
|
||||
import com.yuandian.dataflow.statemachine.StateFactory;
|
||||
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
|
||||
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor;
|
||||
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
|
||||
import com.yuandian.dataflow.statemachine.state.WorkerState;
|
||||
import com.yuandian.dataflow.statemachine_old.StateFactory;
|
||||
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
|
||||
import com.yuandian.dataflow.statemachine_old.rpc.OperateProcessor;
|
||||
import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
|
||||
import com.yuandian.dataflow.statemachine_old.state.WorkerState;
|
||||
import com.yuandian.dataflow.utils.PacketsManager;
|
||||
|
||||
import lombok.Data;
|
|
@ -4,7 +4,7 @@
|
|||
* @author eson
|
||||
*2022年7月12日-11:10:54
|
||||
*/
|
||||
package com.yuandian.dataflow.statemachine.rpc;
|
||||
package com.yuandian.dataflow.statemachine_old.rpc;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
|
@ -12,10 +12,10 @@ import com.alipay.sofa.jraft.Status;
|
|||
import com.alipay.sofa.jraft.error.RaftError;
|
||||
import com.alipay.sofa.jraft.rpc.RpcContext;
|
||||
import com.alipay.sofa.jraft.rpc.RpcProcessor;
|
||||
import com.yuandian.dataflow.statemachine.StateFactory;
|
||||
import com.yuandian.dataflow.statemachine.annotations.WorkerRegister;
|
||||
import com.yuandian.dataflow.statemachine.closure.GenericClosure;
|
||||
import com.yuandian.dataflow.statemachine.operate.Operate;
|
||||
import com.yuandian.dataflow.statemachine_old.StateFactory;
|
||||
import com.yuandian.dataflow.statemachine_old.annotations.WorkerRegister;
|
||||
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
|
||||
import com.yuandian.dataflow.statemachine_old.operate.Operate;
|
||||
|
||||
import javassist.ClassPath;
|
||||
|
|
@ -4,7 +4,7 @@
|
|||
* @author eson
|
||||
*2022年7月13日-09:07:22
|
||||
*/
|
||||
package com.yuandian.dataflow.statemachine.rpc;
|
||||
package com.yuandian.dataflow.statemachine_old.rpc;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
|
@ -4,7 +4,7 @@
|
|||
* @author eson
|
||||
*2022年7月13日-09:11:26
|
||||
*/
|
||||
package com.yuandian.dataflow.statemachine.state;
|
||||
package com.yuandian.dataflow.statemachine_old.state;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
|
@ -4,7 +4,7 @@
|
|||
* @author eson
|
||||
*2022年7月15日-10:04:00
|
||||
*/
|
||||
package com.yuandian.dataflow.statemachine.state;
|
||||
package com.yuandian.dataflow.statemachine_old.state;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.time.Instant;
|
|
@ -24,7 +24,7 @@
|
|||
<appender-ref ref="CONSOLE" />
|
||||
</root> -->
|
||||
|
||||
<logger name="com.yuandian.dataflow" level="debug|info">
|
||||
<logger name="com.yuandian.dataflow" level="debug|info|error">
|
||||
<appender-ref ref="CONSOLE"/>
|
||||
</logger>
|
||||
</configuration>
|
Loading…
Reference in New Issue
Block a user