diff --git a/value.go b/value.go index 34c022d..2c3b677 100644 --- a/value.go +++ b/value.go @@ -22,13 +22,19 @@ type IValue interface { // Unlock value的解锁 Unlock() - // SetValue 设置动态的值 TakeValue 调用 + // SetValue 设置动态的值 TakeValue 调用. SetValue(v interface{}) - Close() - IsClose() bool + // CloseSync 关闭同步 + CloseSync() + // IsCloseSync 判断是否要关闭同步. 非使用者调用 + IsCloseSync() bool + + // SetSync 设置是否已经同步的信息. 非使用者调用 SetSync(is bool) + + // IsSync 设置是否已经同步的信息. 非使用者调用 IsSync() bool } @@ -43,78 +49,93 @@ type Value struct { isSync bool } +// SetValue 设置动态的值 TakeValue 调用 func (value *Value) SetValue(v interface{}) { value.value = v } +// Lock value的锁 func (value *Value) Lock() { value.mutex.Lock() } +// Unlock value的解锁 func (value *Value) Unlock() { value.mutex.Unlock() } +// IsSync 设置是否已经同步的信息. 非使用者调用 func (value *Value) IsSync() bool { return value.isSync } +// SetSync 设置是否已经同步的信息. 非使用者调用 func (value *Value) SetSync(is bool) { value.isSync = is } +// GetValue 获取value. 带并发锁 func (value *Value) GetValue() interface{} { value.mutex.Lock() defer value.mutex.Unlock() return value.value } -func (value *Value) Close() { +// CloseSync 关闭同步 +func (value *Value) CloseSync() { value.mutex.Lock() defer value.mutex.Unlock() value.isClose = true } -func (value *Value) IsClose() bool { +// IsCloseSync 判断是否要关闭同步. 非使用者调用 +func (value *Value) IsCloseSync() bool { return value.isClose } -func (value *Value) TakeUpdateInterval() time.Duration { +// UpdateInterval 每次 TakeValue 休眠的时间. 可覆盖 +func (value *Value) UpdateInterval() time.Duration { return time.Second * 5 } +// StartSynchronize 开始同步的方法 func StartSynchronize(value IValue) bool { + + // 判断必要方法是否已经覆盖 if !reflect.ValueOf(value.TakeValue).CanInterface() { panic(errors.New("TakeValue method is need OverWrite")) } + // 防止忘记Close内存泄漏 runtime.SetFinalizer(value, func(obj IValue) { - obj.Close() + obj.CloseSync() }) if value.IsSync() { return false } + value.SetValue(value.TakeValue()) + go func() { for { - value.Lock() - - value.SetValue(value.TakeValue()) - isClose := value.IsClose() - - value.Unlock() - - if isClose { - log.Println(value, "is Closed") - break - } time.Sleep(value.UpdateInterval()) + + tvalue := value.TakeValue() + + value.Lock() + defer value.Unlock() + + isClose := value.IsCloseSync() + + // 减少在休眠期间再操作一次, TakeValue if isClose { log.Println(value, "is Closed") break + } else { + value.SetValue(tvalue) } } diff --git a/value_test.go b/value_test.go index 2d8e774..d09b263 100644 --- a/value_test.go +++ b/value_test.go @@ -2,16 +2,19 @@ package svalue import ( "context" + "net" + "testing" "time" "go.etcd.io/etcd/clientv3" ) -type EtcdCurl struct { +type EtcdGet struct { Value } -func (c *EtcdCurl) TakeValue() interface{} { +func (c *EtcdGet) TakeValue() interface{} { + // 换成redis原理一样 cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"http://10.10.0.1:2279"}, DialTimeout: 5 * time.Second, @@ -21,6 +24,48 @@ func (c *EtcdCurl) TakeValue() interface{} { return string(resp.Kvs[0].Value)[0:20] } -func (c *EtcdCurl) UpdateInterval() time.Duration { - return time.Second * 1 +func (c *EtcdGet) UpdateInterval() time.Duration { + return time.Second / 2 +} + +func TestWithVPN(t *testing.T) { + ec := EtcdGet{} + StartSynchronize(&ec) + + ifaces, err := net.Interfaces() + must(err) + + fitThis := false + + for _, iface := range ifaces { + if iface.Name == "tun0" { + fitThis = true + } + } + + if fitThis { + for i := 0; i < 3; i++ { + time.Sleep(time.Second) + v := ec.GetValue() + + switch vt := v.(type) { + case string: + t.Log(vt) + default: + t.Error(v) + } + + if i == 1 { + ec.CloseSync() + } + + } + } else { + t.Log("pass this test", t.Name()) + } +} + +// EtcdWatch 方法的实现, 先忽略 +type EtcdWatch struct { + Value }