From 254d3d23f3f3bca7cc8329f05f152862978a60a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=80=9D=E6=95=8F?= Date: Fri, 26 Aug 2022 10:23:06 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- device.go | 265 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 265 insertions(+) create mode 100644 device.go diff --git a/device.go b/device.go new file mode 100644 index 0000000..0b6448e --- /dev/null +++ b/device.go @@ -0,0 +1,265 @@ +package main + +import ( + "bytes" + "context" + "encoding/binary" + "encoding/gob" + "fmt" + "log" + "os/exec" + gen "slimming/proto/gen" + "strings" + "sync/atomic" + "time" + + "github.com/klauspost/compress/zstd" + "github.com/songgao/packets/ethernet" + "github.com/songgao/water" + "github.com/songgao/water/waterutil" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type ExchangeBuffer struct { + BytesArray [][]byte +} + +type NetCard struct { + FrameChan chan []byte + ifce *water.Interface + server *RPCServer + + clientMap map[string]*RPCClient +} + +type RPCClient struct { + FrameChan chan *ExchangeBuffer + conn *grpc.ClientConn +} + +func (cli *RPCClient) connect(realAddr string) { + log.Println("rpcclient connect", realAddr) + // defer log.Println("rpcclient exit") + + conn, err := grpc.Dial(realAddr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + log.Printf("did not connect: %v", err) + return + } + + cli.conn = conn + go cli.run() + time.Sleep(time.Millisecond) +} + +func (cli *RPCClient) run() { + defer log.Println("rpcclient exit") + defer func() { + if err := recover(); err != nil { + log.Println("recover") + cli.conn = nil + } + }() + + c := gen.NewFrameServiceClient(cli.conn) + stream, err := c.SendFrames(context.Background()) + if err != nil { + log.Panic(err) + } + + buf := bytes.NewBuffer(nil) + for { + // Contact the server and print out its response. + + buf.Reset() + + zenc, err := zstd.NewWriter(buf) + if err != nil { + log.Panic(err) + } + + enc := gob.NewEncoder(zenc) + cliBuffer := <-cli.FrameChan + enc.Encode(cliBuffer.BytesArray) + + err = zenc.Flush() + if err != nil { + log.Println(err) + } + + log.Printf("%v %d", cliBuffer, len(buf.Bytes())) + // 发到对面的网卡 + err = stream.Send(&gen.Request{ + Frames: buf.Bytes(), + }) + + if err != nil { + log.Panic(err) + } + } +} + +func (nc *NetCard) Run() { + go nc.runRead() + go nc.runWrite() + // go nc.cli.run() + time.Sleep(time.Second) + + cmdstr := fmt.Sprintf("ip addr add %s dev stun", config.Network.Self.Virt) + log.Println(cmdstr) + cmd := strings.Split(cmdstr, " ") + err := exec.Command(cmd[0], cmd[1:]...).Run() + if err != nil { + log.Panic(err) + } + + cmdstr = "ip link set dev stun up" + cmd = strings.Split(cmdstr, " ") + err = exec.Command(cmd[0], cmd[1:]...).Run() + if err != nil { + log.Panic(err) + } + nc.server.run() +} + +func NewNetCard() *NetCard { + + config := water.Config{ + DeviceType: water.TUN, + } + config.Name = "stun" + + ifce, err := water.New(config) + if err != nil { + log.Panic(err) + } + + nc := &NetCard{ + FrameChan: make(chan []byte, 3000), + ifce: ifce, + clientMap: map[string]*RPCClient{}, + } + nc.server = newRPCServer(nc) + return nc +} + +func (nc *NetCard) runRead() { + log.Println("start netcard read") + + var ifce *water.Interface = nc.ifce + + for { + + var bytesMap map[string]*ExchangeBuffer = make(map[string]*ExchangeBuffer) + + var ok bool + var isLoop int32 = 1 + + // 20 微秒后停止收集数据 + go func() { + var after = time.NewTimer(time.Millisecond * 20) + <-after.C + atomic.StoreInt32(&isLoop, 0) + }() + + for atomic.LoadInt32(&isLoop) > 0 { + var rframe ethernet.Frame + rframe.Resize(1500) + + n, err := ifce.Read([]byte(rframe)) + if err != nil { + log.Fatal(err) + } + rframe = rframe[:n] + if !waterutil.IsIPv4(rframe) { + continue + } + + var realAddr string = "" + config.Lock(func() { + if realAddr, ok = config.IPv4Nodes[binary.LittleEndian.Uint32([]byte(waterutil.IPv4Destination(rframe)))]; !ok { + return + } + }) + if realAddr == "" { + continue + } + + // log.Printf("Payload: % x\n", rframe.Payload()) + + log.Printf("Dst: %s Src %s\n", waterutil.IPv4Destination(rframe), waterutil.IPv4Source(rframe)) + + log.Printf("Ethertype: % x %v\n", rframe.Ethertype(), waterutil.IsIPv4(rframe)) + + var buffer *ExchangeBuffer + if buffer, ok = bytesMap[realAddr]; !ok { + buffer = &ExchangeBuffer{BytesArray: make([][]byte, 0, 1000)} + bytesMap[realAddr] = buffer + } + + buffer.BytesArray = append(buffer.BytesArray, []byte(rframe)) + } + + for dst, buffer := range bytesMap { + + var cli *RPCClient + if cli, ok = nc.clientMap[dst]; !ok { + + cli = &RPCClient{FrameChan: make(chan *ExchangeBuffer)} + nc.clientMap[dst] = cli + } + + if cli.conn == nil { + cli.connect(dst) + } else { + cli.FrameChan <- buffer // 网卡数据 发到对方 + } + + } + + // 写到grpc服务 + + // log.Printf("Dst: %s\n", rframe.Destination()[0:4]) + // log.Printf("Src: %s\n", rframe.Source()[0:4]) + // log.Printf("Ethertype: % x\n", rframe.Ethertype()) + // log.Printf("Payload: % x\n", rframe.Payload()) + + } +} + +func (nc *NetCard) runWrite() { + var ifce *water.Interface = nc.ifce + + var buf = bytes.NewBuffer(nil) + for wframe := range nc.FrameChan { + + log.Printf("get wframes bytes len: %d", len(wframe)) + + buf.Reset() + buf.Write(wframe) + + zdec, err := zstd.NewReader(buf) + if err != nil { + log.Panic(err) + } + + dec := gob.NewDecoder(zdec) + + var bufs [][]byte + err = dec.Decode(&bufs) + if err != nil { + log.Panic(err) + } + + for _, buf := range bufs { + _, err := ifce.Write(buf) + if err != nil { + log.Panic(err) + } + } + } + +}