From 597cbf739b8e2b8ee11ca51366f27d36e425c856 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=80=9D=E6=95=8F?= Date: Mon, 29 Aug 2022 14:54:52 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BC=BA=E5=B0=91=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- netcard.go | 285 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 285 insertions(+) create mode 100644 netcard.go diff --git a/netcard.go b/netcard.go new file mode 100644 index 0000000..d0438a3 --- /dev/null +++ b/netcard.go @@ -0,0 +1,285 @@ +package main + +import ( + "bytes" + "context" + "encoding/binary" + "encoding/gob" + "fmt" + "log" + "net" + "os/exec" + gen "slimming/proto/gen" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/songgao/packets/ethernet" + "github.com/songgao/water" + "github.com/songgao/water/waterutil" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type ExchangeBuffer struct { + BytesArray [][]byte +} + +type NetCard struct { + FrameChan chan []byte + ifce *water.Interface + server *RPCServer + + clientMap map[string]*RPCClient + + lock sync.Mutex +} + +func (nc *NetCard) Lock(do func()) { + nc.lock.Lock() + defer nc.lock.Unlock() + do() +} + +type RPCClient struct { + FrameChan chan *ExchangeBuffer + conn *grpc.ClientConn +} + +func (cli *RPCClient) connect(realAddr string) { + log.Println("rpcclient connect", realAddr) + // defer log.Println("rpcclient exit") + + conn, err := grpc.Dial(realAddr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + log.Printf("did not connect: %v", err) + return + } + + cli.conn = conn + go cli.run() + time.Sleep(time.Millisecond) +} + +func (cli *RPCClient) run() { + defer log.Println("rpcclient exit") + defer func() { + if err := recover(); err != nil { + log.Println("recover") + cli.conn = nil + } + }() + + cliService := gen.NewFrameServiceClient(cli.conn) + stream, err := cliService.SendFrames(context.Background()) + if err != nil { + log.Panic(err) + } + + buf := bytes.NewBuffer(nil) + for { + // Contact the server and print out its response. + + buf.Reset() + + // encode gob + enc := gob.NewEncoder(buf) + cliBuffer := <-cli.FrameChan + + log.Printf("send to target %s source bytes: %d", waterutil.IPv4Destination(cliBuffer.BytesArray[0]), len(cliBuffer.BytesArray[0])) + + err := enc.Encode(cliBuffer.BytesArray) + if err != nil { + log.Panic(err) + } + + // zstd compress + // zenc, err := zstd.NewWriter(buf) + // if err != nil { + // log.Panic(err) + // } + // err = zenc.Flush() + // if err != nil { + // log.Println(err) + // } + + // 发到对面的网卡 + err = stream.Send(&gen.Request{ + Frames: buf.Bytes(), + }) + + if err != nil { + log.Panic(err) + } + } +} + +func (nc *NetCard) Run() { + go nc.runRead() + go nc.runWrite() + // go nc.cli.run() + time.Sleep(time.Second) + + cmdstr := fmt.Sprintf("ip addr add %s dev stun", config.Network.Self.Virt) + log.Println(cmdstr) + cmd := strings.Split(cmdstr, " ") + err := exec.Command(cmd[0], cmd[1:]...).Run() + if err != nil { + log.Panic(err) + } + + cmdstr = "ip link set dev stun up" + cmd = strings.Split(cmdstr, " ") + err = exec.Command(cmd[0], cmd[1:]...).Run() + if err != nil { + log.Panic(err) + } + nc.server.run() +} + +func NewNetCard() *NetCard { + + config := water.Config{ + DeviceType: water.TUN, + PlatformSpecificParams: water.PlatformSpecificParams{ + Name: "stun", + Persist: true, + }, + } + + ifce, err := water.New(config) + if err != nil { + log.Panic(err) + } + + nc := &NetCard{ + FrameChan: make(chan []byte, 3000), + ifce: ifce, + clientMap: map[string]*RPCClient{}, + } + nc.server = newRPCServer(nc) + return nc +} + +func (nc *NetCard) runRead() { + log.Println("start netcard read") + + var ifce *water.Interface = nc.ifce + + for { + + var bytesMap map[string]*ExchangeBuffer = make(map[string]*ExchangeBuffer) + + var ok bool + var isLoop int32 = 1 + + // 20 微秒后停止收集数据 + go func() { + var after = time.NewTimer(time.Millisecond * 20) + <-after.C + atomic.StoreInt32(&isLoop, 0) + }() + + for atomic.LoadInt32(&isLoop) > 0 { + var rframe ethernet.Frame + rframe.Resize(1500) + n, err := ifce.Read([]byte(rframe)) + if err != nil { + log.Fatal(err) + } + rframe = rframe[:n] + + if !waterutil.IsIPv4(rframe) || waterutil.IPv4Source(rframe).Equal(net.IPv4(0, 0, 0, 0)) { + continue + } + + var realAddr string = "" + config.Lock(func() { + if realAddr, ok = config.IPv4Nodes[binary.LittleEndian.Uint32([]byte(waterutil.IPv4Destination(rframe)))]; !ok { + return + } + }) + + if realAddr == "" { + log.Printf("%s is not exists", waterutil.IPv4Destination(rframe)) + continue + } + + // log.Printf("Payload: % x\n", rframe.Payload()) + + log.Printf("Ethertype: % x %v realAddr %s\n", rframe.Ethertype(), waterutil.IsIPv4(rframe), realAddr) + log.Printf("Src %s Dst: %s\n", waterutil.IPv4Source(rframe), waterutil.IPv4Destination(rframe)) + + var buffer *ExchangeBuffer + if buffer, ok = bytesMap[realAddr]; !ok { + buffer = &ExchangeBuffer{BytesArray: make([][]byte, 0, 1000)} + bytesMap[realAddr] = buffer + } + + buffer.BytesArray = append(buffer.BytesArray, []byte(rframe)) + } + + for dst, buffer := range bytesMap { + + var cli *RPCClient + if cli, ok = nc.clientMap[dst]; !ok { + cli = &RPCClient{FrameChan: make(chan *ExchangeBuffer)} + nc.clientMap[dst] = cli + } + + if cli.conn == nil { + cli.connect(dst) + } else { + cli.FrameChan <- buffer // 网卡数据 发到对方 + } + + } + + // 写到grpc服务 + + // log.Printf("Dst: %s\n", rframe.Destination()[0:4]) + // log.Printf("Src: %s\n", rframe.Source()[0:4]) + // log.Printf("Ethertype: % x\n", rframe.Ethertype()) + // log.Printf("Payload: % x\n", rframe.Payload()) + + } +} + +func (nc *NetCard) runWrite() { + var ifce *water.Interface = nc.ifce + var err error + + for wframe := range nc.FrameChan { + + log.Printf("get wframes bytes len: %d", len(wframe)) + var buf = bytes.NewBuffer(wframe) + + // zdec, err := zstd.NewReader(buf) + // if err != nil { + // log.Panic(err) + // } + // zdec.Close() + + dec := gob.NewDecoder(buf) + var bufs [][]byte + err = dec.Decode(&bufs) + if err != nil { + log.Panic(err) + } + + for _, buf := range bufs { + + log.Printf("get wframes decode len: %d", len(buf)) + log.Printf("get decode frames decode source: %s dst: %s", waterutil.IPv4Source(buf), waterutil.IPv4Destination(buf)) + _, err := ifce.Write(buf) + if err != nil { + log.Panic(err) + } + } + + } + +}