This commit is contained in:
huangsimin 2019-01-24 14:45:59 +08:00
parent d9e2c1fcdb
commit 246235c1f9
2 changed files with 88 additions and 22 deletions

View File

@ -22,13 +22,19 @@ type IValue interface {
// Unlock value的解锁 // Unlock value的解锁
Unlock() Unlock()
// SetValue 设置动态的值 TakeValue 调用 // SetValue 设置动态的值 TakeValue 调用.
SetValue(v interface{}) SetValue(v interface{})
Close() // CloseSync 关闭同步
IsClose() bool CloseSync()
// IsCloseSync 判断是否要关闭同步. 非使用者调用
IsCloseSync() bool
// SetSync 设置是否已经同步的信息. 非使用者调用
SetSync(is bool) SetSync(is bool)
// IsSync 设置是否已经同步的信息. 非使用者调用
IsSync() bool IsSync() bool
} }
@ -43,78 +49,93 @@ type Value struct {
isSync bool isSync bool
} }
// SetValue 设置动态的值 TakeValue 调用
func (value *Value) SetValue(v interface{}) { func (value *Value) SetValue(v interface{}) {
value.value = v value.value = v
} }
// Lock value的锁
func (value *Value) Lock() { func (value *Value) Lock() {
value.mutex.Lock() value.mutex.Lock()
} }
// Unlock value的解锁
func (value *Value) Unlock() { func (value *Value) Unlock() {
value.mutex.Unlock() value.mutex.Unlock()
} }
// IsSync 设置是否已经同步的信息. 非使用者调用
func (value *Value) IsSync() bool { func (value *Value) IsSync() bool {
return value.isSync return value.isSync
} }
// SetSync 设置是否已经同步的信息. 非使用者调用
func (value *Value) SetSync(is bool) { func (value *Value) SetSync(is bool) {
value.isSync = is value.isSync = is
} }
// GetValue 获取value. 带并发锁
func (value *Value) GetValue() interface{} { func (value *Value) GetValue() interface{} {
value.mutex.Lock() value.mutex.Lock()
defer value.mutex.Unlock() defer value.mutex.Unlock()
return value.value return value.value
} }
func (value *Value) Close() { // CloseSync 关闭同步
func (value *Value) CloseSync() {
value.mutex.Lock() value.mutex.Lock()
defer value.mutex.Unlock() defer value.mutex.Unlock()
value.isClose = true value.isClose = true
} }
func (value *Value) IsClose() bool { // IsCloseSync 判断是否要关闭同步. 非使用者调用
func (value *Value) IsCloseSync() bool {
return value.isClose return value.isClose
} }
func (value *Value) TakeUpdateInterval() time.Duration { // UpdateInterval 每次 TakeValue 休眠的时间. 可覆盖
func (value *Value) UpdateInterval() time.Duration {
return time.Second * 5 return time.Second * 5
} }
// StartSynchronize 开始同步的方法
func StartSynchronize(value IValue) bool { func StartSynchronize(value IValue) bool {
// 判断必要方法是否已经覆盖
if !reflect.ValueOf(value.TakeValue).CanInterface() { if !reflect.ValueOf(value.TakeValue).CanInterface() {
panic(errors.New("TakeValue method is need OverWrite")) panic(errors.New("TakeValue method is need OverWrite"))
} }
// 防止忘记Close内存泄漏
runtime.SetFinalizer(value, func(obj IValue) { runtime.SetFinalizer(value, func(obj IValue) {
obj.Close() obj.CloseSync()
}) })
if value.IsSync() { if value.IsSync() {
return false return false
} }
value.SetValue(value.TakeValue())
go func() { go func() {
for { for {
value.Lock()
value.SetValue(value.TakeValue())
isClose := value.IsClose()
value.Unlock()
if isClose {
log.Println(value, "is Closed")
break
}
time.Sleep(value.UpdateInterval()) time.Sleep(value.UpdateInterval())
tvalue := value.TakeValue()
value.Lock()
defer value.Unlock()
isClose := value.IsCloseSync()
// 减少在休眠期间再操作一次, TakeValue
if isClose { if isClose {
log.Println(value, "is Closed") log.Println(value, "is Closed")
break break
} else {
value.SetValue(tvalue)
} }
} }

View File

@ -2,16 +2,19 @@ package svalue
import ( import (
"context" "context"
"net"
"testing"
"time" "time"
"go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3"
) )
type EtcdCurl struct { type EtcdGet struct {
Value Value
} }
func (c *EtcdCurl) TakeValue() interface{} { func (c *EtcdGet) TakeValue() interface{} {
// 换成redis原理一样
cli, err := clientv3.New(clientv3.Config{ cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"http://10.10.0.1:2279"}, Endpoints: []string{"http://10.10.0.1:2279"},
DialTimeout: 5 * time.Second, DialTimeout: 5 * time.Second,
@ -21,6 +24,48 @@ func (c *EtcdCurl) TakeValue() interface{} {
return string(resp.Kvs[0].Value)[0:20] return string(resp.Kvs[0].Value)[0:20]
} }
func (c *EtcdCurl) UpdateInterval() time.Duration { func (c *EtcdGet) UpdateInterval() time.Duration {
return time.Second * 1 return time.Second / 2
}
func TestWithVPN(t *testing.T) {
ec := EtcdGet{}
StartSynchronize(&ec)
ifaces, err := net.Interfaces()
must(err)
fitThis := false
for _, iface := range ifaces {
if iface.Name == "tun0" {
fitThis = true
}
}
if fitThis {
for i := 0; i < 3; i++ {
time.Sleep(time.Second)
v := ec.GetValue()
switch vt := v.(type) {
case string:
t.Log(vt)
default:
t.Error(v)
}
if i == 1 {
ec.CloseSync()
}
}
} else {
t.Log("pass this test", t.Name())
}
}
// EtcdWatch 方法的实现, 先忽略
type EtcdWatch struct {
Value
} }