golang 操作etcd租约以及监听kv变化

栏目: Go · 发布时间: 7年前

内容简介:创建租约:撤销租约会使当前租约的所关联的key-value失效在put操作时,watch会监听到操作,打印出:type:PUT kv:/job/v3/1 prevKey:<nil>

定义错误常量:

const (
    NewLeaseErr  = 101
    LeasTtlErr   = 102
    KeepAliveErr = 103
    PutErr       = 104
    GetErr       = 105
    RevokeErr    = 106
)

创建client:

var conf = clientv3.Config{
        Endpoints:   []string{"172.16.196.129:2380", "192.168.50.250:2380"},
        DialTimeout: 5 * time.Second,
}
client, err := clientv3.New(conf)
defer client.Close()
if err != nil {
    fmt.Printf("创建client失败:\n", err.Error())
    os.Exit(NewLeaseErr)
}

创建租约:

//创建租约
lease := clientv3.NewLease(client)

//设置租约时间
leaseResp, err := lease.Grant(context.TODO(), 10)
if err != nil {
    fmt.Printf("设置租约时间失败:%s\n", err.Error())
    os.Exit(LeasTtlErr)
}

设置续租:

//设置续租
leaseID := leaseResp.ID
ctx, cancelFunc := context.WithCancel(context.TODO())
leaseRespChan, err := lease.KeepAlive(ctx, leaseID)
if err != nil {
    fmt.Printf("续租失败:%s\n", err.Error())
    os.Exit(KeepAliveErr)
}

监听租约:

go func() {
        for  {
            select {
            case leaseKeepResp := <-leaseRespChan:
                if leaseKeepResp == nil {
                    fmt.Printf("已经关闭续租功能\n")
                    return
                } else {
                    fmt.Printf("续租成功\n")
                    goto END
                }
            }
            END:
                time.Sleep(500*time.Millisecond)
        }
    }()

监听某个key的变化

//ctx1, _ := context.WithTimeout(context.TODO(),20)  //设置超时ctx传入Watch里会使watch监听失败,可能是watch是个永久监听,不支持设置timeCtx(我也不太清楚)
go func() {
    wc := client.Watch(context.TODO(), "/job/v3/1", clientv3.WithPrevKV())
    for v := range wc {
        for _, e := range v.Events {
        fmt.Printf("type:%v kv:%v  prevKey:%v \n ", e.Type, string(e.Kv.Key), e.PrevKv)
    }
    }
}()

put操作:

kv := clientv3.NewKV(client)
time.Sleep(3*time.Second)
//通过租约put
putResp, err := kv.Put(context.TODO(), "/job/v3/1", "koock",clientv3.WithLease(leaseID))
if err != nil {
    fmt.Printf("put 失败:%s", err.Error())
    os.Exit(PutErr)
}
fmt.Printf("%v\n",putResp.Header)

取消续租以及撤销租约:

撤销租约会使当前租约的所关联的key-value失效

//关闭续租
cancelFunc() 
//撤销租约
_, err = lease.Revoke(context.TODO(), leaseID)
    if err != nil {
        fmt.Printf("撤销租约失败:%s\n",err.Error())
        os.Exit(RevokeErr)
    }
fmt.Printf("撤销租约成功")

完整代码段:

package main

import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "os"
    "time"
)

const (
    NewLeaseErr  = 101
    LeasTtlErr   = 102
    KeepAliveErr = 103
    PutErr       = 104
    GetErr       = 105
    RevokeErr    = 106
)

func main() {

    var conf = clientv3.Config{
        Endpoints:   []string{"172.16.196.129:2380", "192.168.50.250:2380"},
        DialTimeout: 5 * time.Second,
    }

    client, err := clientv3.New(conf)
    defer client.Close()
    if err != nil {
        fmt.Printf("创建client失败:\n", err.Error())
        os.Exit(NewLeaseErr)
    }

    //创建租约
    lease := clientv3.NewLease(client)

    //设置租约时间
    leaseResp, err := lease.Grant(context.TODO(), 10)
    if err != nil {
        fmt.Printf("设置租约时间失败:%s\n", err.Error())
        os.Exit(LeasTtlErr)
    }

    //设置续租
    leaseID := leaseResp.ID
    ctx, cancelFunc := context.WithCancel(context.TODO())
    leaseRespChan, err := lease.KeepAlive(ctx, leaseID)
    if err != nil {
        fmt.Printf("续租失败:%s\n", err.Error())
        os.Exit(KeepAliveErr)
    }

    //监听租约
    go func() {
        for  {
            select {
            case leaseKeepResp := <-leaseRespChan:
                if leaseKeepResp == nil {
                    fmt.Printf("已经关闭续租功能\n")
                    return
                } else {
                    fmt.Printf("续租成功\n")
                    goto END
                }
            }
            END:
                time.Sleep(500*time.Millisecond)
        }

    }()

    //监听某个key的变化
    //ctx1, _ := context.WithTimeout(context.TODO(),20)
    go func() {
        wc := client.Watch(context.TODO(), "/job/v3/1", clientv3.WithPrevKV())
            for v := range wc {
                for _, e := range v.Events {
                    fmt.Printf("type:%v kv:%v  prevKey:%v \n ", e.Type, string(e.Kv.Key), e.PrevKv)
                }
            }
    }()

    kv := clientv3.NewKV(client)
    //通过租约put
    putResp, err := kv.Put(context.TODO(), "/job/v3/1", "koock",clientv3.WithLease(leaseID))
    if err != nil {
        fmt.Printf("put 失败:%s", err.Error())
        os.Exit(PutErr)
    }
    fmt.Printf("%v\n",putResp.Header)

    cancelFunc()

    time.Sleep(2*time.Second)
    _, err = lease.Revoke(context.TODO(), leaseID)
    if err != nil {
        fmt.Printf("撤销租约失败:%s\n",err.Error())
        os.Exit(RevokeErr)
    }
    fmt.Printf("撤销租约成功")
    getResp,err := kv.Get(context.TODO(),"/job/v3/1")
    if err != nil{
        fmt.Printf("get 失败:%s",err.Error())
        os.Exit(GetErr)
    }
    fmt.Printf("%v",getResp.Kvs)
    time.Sleep(20 * time.Second)


}

在put操作时,watch会监听到操作,打印出:type:PUT kv:/job/v3/1 prevKey:<nil>

撤销租约后,watch会监听到delete操作,打印出:type:DELETE kv:/job/v3/1 prevKey:key:"/job/v3/1"


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Inside Larry's and Sergey's Brain

Inside Larry's and Sergey's Brain

Richard Brandt / Portfolio / 17 Sep 2009 / USD 24.95

You’ve used their products. You’ve heard about their skyrocketing wealth and “don’t be evil” business motto. But how much do you really know about Google’s founders, Larry Page and Sergey Brin? Inside......一起来看看 《Inside Larry's and Sergey's Brain》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试