TODO: last

This commit is contained in:
huangsimin 2022-07-29 18:06:05 +08:00
parent 9c1e5a62ec
commit ae6abc715c
9 changed files with 80 additions and 46 deletions

View File

@ -10,8 +10,10 @@ import org.slf4j.MarkerFactory;
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.conf.Configuration;
import com.yuandian.dataflow.statemachine.StateFactory;
import com.yuandian.dataflow.utils.Utils;
import io.netty.util.internal.StringUtil;
import javassist.ClassClassPath;
import lombok.extern.slf4j.Slf4j;
@ -30,20 +32,11 @@ public class Server {
public static void main(String[] args) throws Exception {
final var resourceUrl = ClassLoader.getSystemResources("");
while(resourceUrl.hasMoreElements()) {
var e = resourceUrl.nextElement();
log.info("{} {}", e.getFile(), e.getProtocol());
}
System.exit(0);
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};

View File

@ -59,7 +59,7 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
// TODO: request.packets 入库,回填, 告警 等操作
Thread.sleep(ThreadLocalRandom .current().nextLong(100, 3000));
Thread.sleep(ThreadLocalRandom.current().nextLong(100, 3000));
} catch (InterruptedException e) {
log.info(e.toString());

View File

@ -8,6 +8,8 @@ package com.yuandian.dataflow.statemachine;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RemotingException;
@ -41,18 +43,31 @@ public class MasterFactory {
public static final int MAX_TASKS = 100;
public static class MasterContext {
AtomicBoolean isExit = new AtomicBoolean(false);
Object share;
}
private static MasterContext cxt = new MasterContext();
private static Consumer<MasterContext> masterMainLoop;
public static Thread masterExecute = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
while (!cxt.isExit.get()) {
masterMainLoop.accept(cxt);
var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers());
log.debug("master({}) execute {}", StateFactory.getServerId(),
StateFactory.getRaftNode().listAlivePeers());
if (alivePeers != null) {
// 读一致性
StateFactory.readIndexState(new GenericClosure<State>() {
@ -91,9 +106,9 @@ public class MasterFactory {
request.getPackets().add(p);
}
// 先提交 节点的 剩余能处理的任务数量. 然后再处理
Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws),
Operate.CallOperate(
new Operate(OperateType.PUT_WORKERSTATE, ws),
new GenericClosure<Operate>() {
@Override
public void run(Status status) {
@ -115,7 +130,6 @@ public class MasterFactory {
}
}
});
});
}

View File

@ -11,12 +11,14 @@ import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer;
import org.reflections.ReflectionUtils;
import org.reflections.Reflections;
@ -50,6 +52,7 @@ import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.utils.Utils;
import lombok.Getter;
import lombok.Setter;
@ -156,6 +159,8 @@ public class StateFactory {
// conf =
// JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
PeerId serverId = JRaftUtils.getPeerId(addr);
int port = serverId.getPort();
@ -176,25 +181,18 @@ public class StateFactory {
cluster = new RaftGroupService(groupId, serverId, nodeOptions);
// 扫描注解RaftProccessor 注册
var now = Instant.now();
HashMap<String, Class<?>> scansMap = new HashMap<>();
Set<String> packageNames = new HashSet<>();
for (var p : Package.getPackages()) {
packageNames.add(p.getName().split("\\.")[0] );
}
for (var name: packageNames) {
Set<Class<?>> scans = new Reflections(name).getTypesAnnotatedWith(ProcessorRaft.class);
scans.forEach((pRaftClass) -> {
scansMap.put(pRaftClass.getName(), pRaftClass);
});
}
log.info("scan annotates cost time: {}", Duration.between(now, Instant.now())) ;
var traces = Thread.currentThread().getStackTrace();
var clsName = traces[traces.length - 1].getClassName();
var packName = clsName.substring(0, Utils.indexOfAsPossible(clsName, ".", 3));
log.info("获取 {} -> {} 下包的所有注解",clsName, packName );
Set<Class<?>> scans = new Reflections(packName).getTypesAnnotatedWith(ProcessorRaft.class);
scans.forEach((pRaftClass) -> {
scansMap.put(pRaftClass.getName(), pRaftClass);
});
log.info("扫描注解的时间(scan annotations cost time): {} ms", Duration.between(now, Instant.now()).toMillis()) ;
scansMap.forEach((name, pRaftClass) -> {
try {
cluster.getRpcServer()
@ -204,11 +202,12 @@ public class StateFactory {
log.info("{} {}", name, e.toString());
}
});
// 启动集群
node = cluster.start();
rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions());
rpcClient = new BoltRaftRpcFactory().createRpcClient(); // 创建rpc客户端.
rpcClient.init(new CliOptions()); // 初始化
}
public boolean isLeader() {

View File

@ -60,10 +60,9 @@ public class Operate implements Serializable {
@java.lang.SuppressWarnings("unchecked")
public static void CallOperate(Operate op, GenericClosure<Operate> closure) {
log.debug("CallOperate Value {}", op.<WorkerState>getValue());
var ss = StateFactory.getStateServer();
// 如果是leader 就直接提交
if (StateFactory.isLeader()) {
ss.applyOperate(op, closure);
StateFactory.applyOperate(op, closure);
return;
}

View File

@ -17,10 +17,8 @@ import java.lang.annotation.Target;
* @author eson
*2022年7月21日-14:27:49
*/
@Retention(RetentionPolicy.SOURCE)
@Retention(RetentionPolicy.CLASS)
@Target(ElementType.TYPE)
public @interface Master {
public int order() default 0 ;
}

View File

@ -17,7 +17,7 @@ import java.lang.annotation.Target;
* @author eson
*2022年7月21日-14:27:49
*/
@Retention(RetentionPolicy.SOURCE)
@Retention(RetentionPolicy.CLASS)
@Target(ElementType.TYPE)
public @interface ProcessorRaft {
}

View File

@ -1,6 +1,34 @@
package com.yuandian.dataflow.utils;
public class Utils {
public static int indexOf(String str, String substr, int n) {
int pos = str.indexOf(substr);
while (--n > 0 && pos != -1) {
pos = str.indexOf(substr, pos + 1);
}
return pos;
}
/**
* 尽可能的拿到 n个substr索引点的字符串. 如果超出就拿最大值.
* @param str 原始字符串
* @param substr 需要定位的字符串
* @param n
* @return
*/
public static int indexOfAsPossible(String str, String substr, int n) {
int pos = str.indexOf(substr);
while (--n > 0 && pos != -1) {
pos = str.indexOf(substr, pos + 1);
}
if(pos == -1) {
return str.length();
}
return pos;
}
public static void main(String[] args) {

View File

@ -10,6 +10,9 @@ import lombok.extern.slf4j.Slf4j;
public class StateMachineTest {
@Test
void testOnApply() throws InterruptedException, RemotingException {
// var rpcClient = new BoltRaftRpcFactory().createRpcClient();
// rpcClient.init(new CliOptions());