go任务调度9(op实现分布式乐观锁)

栏目: Go · 发布时间: 7年前

package main

import (
    "go.etcd.io/etcd/clientv3"
    "time"
    "fmt"
    "context"
)

func main() {
    var (
        config clientv3.Config
        client *clientv3.Client
        err error
        lease clientv3.Lease
        leaseGrantResp *clientv3.LeaseGrantResponse
        leaseId clientv3.LeaseID
        keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
        keepResp *clientv3.LeaseKeepAliveResponse
        ctx context.Context
        cancelFunc context.CancelFunc
        kv clientv3.KV
        txn clientv3.Txn
        txnResp *clientv3.TxnResponse
    )

    // 客户端配置
    config = clientv3.Config{
        Endpoints: []string{"0.0.0.0:2379"},
        DialTimeout: 5 * time.Second,
    }

    // 建立连接
    if client, err = clientv3.New(config); err != nil {
        fmt.Println(err)
        return
    }

    // lease实现锁自动过期(上锁之后,如果节点宕机,锁会一直占用,所以要过期机制,也要续租机制):
    // op操作
    // txn事务: if else then

    // 1, 上锁 (创建租约, 自动续租, 拿着租约去抢占一个key)
    lease = clientv3.NewLease(client)

    // 申请一个5秒的租约
    if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
        fmt.Println(err)
        return
    }

    // 拿到租约的ID
    leaseId = leaseGrantResp.ID

    // 准备一个用于取消自动续租的context
    ctx, cancelFunc = context.WithCancel(context.TODO())

    // 确保函数退出后, 自动续租会停止
    defer cancelFunc() //终止自动续租协程(goroutine)
    defer lease.Revoke(context.TODO(), leaseId) //告诉etcd把租约直接释放掉,更直接,立即删除,锁就释放了

    // 5秒后会取消自动续租
    if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
        fmt.Println(err)
        return
    }

    // 处理续约应答的协程
    go func() {
        for {
            select {
            case keepResp = <- keepRespChan:
                if keepRespChan == nil {
                    fmt.Println("租约已经失效了")
                    goto END
                } else {    // 每秒会续租一次, 所以就会受到一次应答
                    fmt.Println("收到自动续租应答:", keepResp.ID)
                }
            }
        }
    END:
    }()

    //  if 不存在key, then 设置它, else 抢锁失败
    kv = clientv3.NewKV(client)

    // 创建事务
    txn = kv.Txn(context.TODO())

    // 定义事务

    // 如果key不存在(创建版本是0说明没有被创建)
    txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)).
        Then(clientv3.OpPut("/cron/lock/job9", "xxx", clientv3.WithLease(leaseId))).
        Else(clientv3.OpGet("/cron/lock/job9")) // 否则抢锁失败

    // 提交事务
    if txnResp, err = txn.Commit(); err != nil {
        fmt.Println(err)
        return // 没有问题
    }

    // 判断是否抢到了锁
    if !txnResp.Succeeded {
        fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
        return
    }

    // 2, 处理业务

    fmt.Println("处理任务")
    time.Sleep(5 * time.Second)

    // 3, 释放锁(取消自动续租, 释放租约)
    // 上面的defer 会把租约释放掉, 关联的KV就被删除了
}

go任务调度9(op实现分布式乐观锁) (右边的先执行,左边的后执行,左边会提示锁已被占用)


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Practical Vim, Second Edition

Practical Vim, Second Edition

Drew Neil / The Pragmatic Bookshelf / 2015-10-31 / USD 29.00

Vim is a fast and efficient text editor that will make you a faster and more efficient developer. It’s available on almost every OS, and if you master the techniques in this book, you’ll never need an......一起来看看 《Practical Vim, Second Edition》 这本书的介绍吧!

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具