[关闭]
@kklinan 2018-11-15T11:37:59.000000Z 字数 7135 阅读 1079

Etcd 基础使用

Etcd Golang


简介

etcd 是一个分布式键值存储,旨在可靠、快速地保存关键数据并提供对这些数据的访问。它通过分布式锁、leader 选举和写屏障(write barrier)实现可靠的分布式协调。etcd 集群旨在实现高可用性以及数据的持久化存储与检索。

GitHub: https://github.com/etcd-io/etcd

etcd 与 ZooKeeper 类似,使用场景包括:配置管理、服务注册发现、选主、应用调度、分布式锁、事务。

具体使用见下文 Go 代码,包括 put、get、delete、watcher、租约、事务、分布式锁。

快速开始请跳至 开始搭建,全面了解请阅读官方文档。

文档

下载文档: https://github.com/etcd-io/etcd/blob/master/Documentation/dl_build.md

配置参数: https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/configuration.md

集群文档: https://github.com/etcd-io/etcd/blob/master/Documentation/dev-guide/local_cluster.md

重新配置集群成员: https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/runtime-configuration.md

基本操作文档: https://github.com/etcd-io/etcd/blob/master/Documentation/dev-guide/interacting_v3.md

角色控制: https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/authentication.md

证书配置: https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/security.md

Go client: https://godoc.org/go.etcd.io/etcd/clientv3

Web ui: https://github.com/soyking/e3w

Etcd 中文文档: http://etcd.doczh.cn/documentation/

最简单的安装方式:yum,根据需要自定义 systemd service。

  1. cat /usr/lib/systemd/system/etcd.service
  2. [Unit]
  3. Description=Etcd Server
  4. After=network.target
  5. After=network-online.target
  6. Wants=network-online.target
  7. [Service]
  8. Type=notify
  9. WorkingDirectory=/var/lib/etcd/
  10. EnvironmentFile=-/etc/etcd/etcd.conf
  11. User=etcd
  12. # set GOMAXPROCS to number of processors
  13. ExecStart=/bin/bash -c "GOMAXPROCS=$(nproc) /usr/bin/etcd --name=\"${ETCD_NAME}\" --data-dir=\"${ETCD_DATA_DIR}\" --listen-client-urls=\"${ETCD_LISTEN_CLIENT_URLS}\""
  14. Restart=on-failure
  15. LimitNOFILE=65536
  16. [Install]
  17. WantedBy=multi-user.target

开始搭建

公共 etcd 服务发现模式搭建3节点集群

1、创建专用服务发现URL

  1. curl https://discovery.etcd.io/new?size=3

以上命令返回一个 URL
类似:https://discovery.etcd.io/54acde2d3f35c56813b0d8cb6404c29a

2、创建节点

  1. etcd --name infra1 \
  2. --initial-advertise-peer-urls http://0.0.0.0:2380 \
  3. --listen-peer-urls http://0.0.0.0:2380 \
  4. --listen-client-urls http://0.0.0.0:2379 \
  5. --advertise-client-urls http://0.0.0.0:2379 \
  6. --discovery https://discovery.etcd.io/54acde2d3f35c56813b0d8cb6404c29a
  7. etcd --name infra2 \
  8. --initial-advertise-peer-urls http://0.0.0.0:22380 \
  9. --listen-peer-urls http://0.0.0.0:22380 \
  10. --listen-client-urls http://0.0.0.0:22379 \
  11. --advertise-client-urls http://0.0.0.0:22379 \
  12. --discovery https://discovery.etcd.io/54acde2d3f35c56813b0d8cb6404c29a
  13. etcd --name infra3 \
  14. --initial-advertise-peer-urls http://0.0.0.0:32380 \
  15. --listen-peer-urls http://0.0.0.0:32380 \
  16. --listen-client-urls http://0.0.0.0:32379 \
  17. --advertise-client-urls http://0.0.0.0:32379 \
  18. --discovery https://discovery.etcd.io/54acde2d3f35c56813b0d8cb6404c29a

3、搭建 e3w

3.1 按照文档 build 项目

3.2 修改配置文件

3.3 运行项目

Golang 操作 etcd

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "strconv"
  7. "time"
  8. "go.etcd.io/etcd/clientv3"
  9. )
  10. func main() {
  11. var (
  12. key, opkey, lockKey string
  13. value, opvalue string
  14. err error
  15. putResp *clientv3.PutResponse
  16. getResp *clientv3.GetResponse
  17. delResp *clientv3.DeleteResponse
  18. txnResp *clientv3.TxnResponse
  19. op clientv3.Op
  20. opResp clientv3.OpResponse
  21. lease clientv3.Lease
  22. leaseGrantResp *clientv3.LeaseGrantResponse
  23. watchChan clientv3.WatchChan
  24. )
  25. clt, err := clientv3.New(clientv3.Config{
  26. Endpoints: []string{"http://0.0.0.0:2379", "http://0.0.0.0:22379", "http://0.0.0.0:32379"},
  27. DialTimeout: 5 * time.Second,
  28. })
  29. if err != nil {
  30. log.Fatal(err)
  31. }
  32. defer clt.Close()
  33. key = "/root/config"
  34. value = `{"name":"测试","port":8080}`
  35. // value = "etcdv3_dir_$2H#%gRe3*t" // 此值在 e3w(变量名称:DEFAULT_DIR_VALUE) 中被当做目录渲染
  36. // 设置 key
  37. if putResp, err = clt.Put(context.TODO(), key, value); err != nil {
  38. fmt.Println(err)
  39. }
  40. fmt.Printf("[put response] %v \n", putResp)
  41. // [put response] &{cluster_id:4280164070870598743 member_id:7885079459987601789 revision:41 raft_term:2 <nil>}
  42. // 获取 key 信息
  43. getResp, _ = clt.Get(context.TODO(), key)
  44. fmt.Printf("[get response] key: %v; resp: %v \n", key, getResp.Kvs[0])
  45. // [get response] key: config; resp: key:"config" create_revision:40 mod_revision:41 version:2 value:"{\"name\":\"\346\265\213\350\257\225\",\"port\":8080}"
  46. fmt.Printf("[get response] key: %v; value: %v \n", key, string(getResp.Kvs[0].Value))
  47. // [get response] key: config; value: {"name":"测试","port":8080}
  48. // 删除 key
  49. delResp, _ = clt.Delete(context.TODO(), key)
  50. fmt.Printf("[del response] key: %v; resp: %v \n", key, delResp)
  51. // [del response] key: config; resp: &{cluster_id:4280164070870598743 member_id:16310151726053752060 revision:42 raft_term:2 1 []}
  52. kv := clientv3.NewKV(clt)
  53. // 事务操作
  54. txnResp, err = kv.Txn(context.TODO()).
  55. If(clientv3.Compare(clientv3.Value(key), "=", value)).
  56. Then(clientv3.OpPut(key, "progressing")).
  57. Else(clientv3.OpPut(key, value)).
  58. Commit()
  59. if err != nil {
  60. fmt.Println(err)
  61. }
  62. fmt.Printf("[txn response] isSucceeded:%v, resp:%v \n", txnResp.Succeeded, txnResp)
  63. // [txn response] isSucceeded:true, resp:&{cluster_id:4280164070870598743 member_id:15482725622599376132 revision:118 raft_term:2 true [response_put:<header:<revision:118 > > ]}
  64. // op 系列
  65. opkey = "/root/op-key"
  66. opvalue = "op-value"
  67. op = clientv3.OpPut(opkey, opvalue, clientv3.WithKeysOnly())
  68. if opResp, err = clt.Do(context.TODO(), op); err != nil {
  69. fmt.Println(err)
  70. }
  71. fmt.Printf("[op put response] resp: %v \n", opResp.Put())
  72. // [op response] resp: &{cluster_id:4280164070870598743 member_id:7885079459987601789 revision:172 raft_term:2 <nil>}
  73. op = clientv3.OpGet(opkey)
  74. if opResp, err = clt.Do(context.TODO(), op); err != nil {
  75. fmt.Println(err)
  76. }
  77. fmt.Printf("[op get response] resp: %v \n", opResp.Get().Kvs[0])
  78. // [op get response] resp: key:"root/op-key" create_revision:136 mod_revision:180 version:12 value:"op-value"
  79. // 租约模式
  80. lease = clientv3.NewLease(clt)
  81. // 创建 2s 租约
  82. if leaseGrantResp, err = lease.Grant(context.TODO(), 2); err != nil {
  83. fmt.Println(err)
  84. }
  85. fmt.Printf("[lease response] lease id:%v \n", leaseGrantResp.ID)
  86. fmt.Printf("lease TTL: %v \n", leaseGrantResp.TTL)
  87. // key 绑定租约
  88. if putResp, err = kv.Put(context.TODO(), key, value, clientv3.WithLease(leaseGrantResp.ID)); err != nil {
  89. fmt.Println(err)
  90. }
  91. fmt.Printf("[lease put response] response:%v \n", putResp.Header.Revision)
  92. // 自动持续续租
  93. // lease.KeepAlive(context.TODO(), leaseGrantResp.ID)
  94. // 自动续租 2s
  95. ctx, cancelFunc := context.WithTimeout(context.TODO(), 2*time.Second)
  96. defer cancelFunc()
  97. defer lease.Revoke(ctx, leaseGrantResp.ID)
  98. lease.KeepAlive(ctx, leaseGrantResp.ID)
  99. time.AfterFunc(4*time.Second, func() {
  100. getResp, _ = clt.Get(context.TODO(), key)
  101. if getResp.Count == 0 {
  102. fmt.Println("key has expired")
  103. }
  104. })
  105. time.Sleep(5 * time.Second)
  106. // 分布式锁
  107. lockKey = "/root/lock"
  108. lease = clientv3.NewLease(clt)
  109. // 创建 15s 租约
  110. if leaseGrantResp, err = lease.Grant(context.TODO(), 15); err != nil {
  111. fmt.Println(err)
  112. }
  113. defer lease.Revoke(ctx, leaseGrantResp.ID)
  114. lease.KeepAlive(context.TODO(), leaseGrantResp.ID)
  115. txnResp, err = kv.Txn(context.TODO()).
  116. If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
  117. Then(clientv3.OpPut(lockKey, "lockvalue", clientv3.WithLease(leaseGrantResp.ID))).
  118. Else(clientv3.OpGet(lockKey)).
  119. Commit()
  120. if err != nil {
  121. fmt.Printf("lock failed: %v", err)
  122. }
  123. if !txnResp.Succeeded {
  124. fmt.Printf("lock failed, get revision:%v \n", txnResp.OpResponse().Txn().Header.Revision)
  125. } else {
  126. fmt.Printf("lock succeed, put revision:%v \n", txnResp.OpResponse().Txn().Header.Revision)
  127. }
  128. // return
  129. // watcher
  130. go func() {
  131. for i := 1; i < 3; i++ {
  132. op = clientv3.OpPut(opkey, strconv.Itoa(i), clientv3.WithKeysOnly())
  133. if opResp, err = clt.Do(context.TODO(), op); err != nil {
  134. fmt.Println(err)
  135. }
  136. fmt.Printf("[op put response] resp: %v \n", opResp.Put())
  137. }
  138. }()
  139. // 监听此前缀的 key
  140. // watchChan = clt.Watch(context.TODO(), "root", clientv3.WithPrefix())
  141. // 监听某个 key
  142. watchChan = clt.Watch(context.TODO(), opkey)
  143. for wchanResp := range watchChan {
  144. fmt.Printf("[watch response] resp: %v \n", wchanResp.Events[0].Kv)
  145. // [watch response] resp: key:"root/op-key" create_revision:136 mod_revision:1539 version:1347 value:"9"
  146. }
  147. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注