@kklinan
2018-11-15T03:37:59.000000Z
字数 7135
阅读 1217
Etcd Golang
etcd 是一个分布式键值存储,旨在可靠,快速地保存和提供对关键数据的访问。它通过分布式锁定,leader 选举和写入障碍实现可靠的分布式协调。etcd集群旨在实现高可用性和永久数据存储和检索。
GitHub: https://github.com/etcd-io/etcd
etcd 与 ZooKeeper 类似,使用场景包括:配置管理、服务注册发现、选主、应用调度、分布式锁、事务。
具体使用见下文 Go 代码,包括 put、get、delete、watcher、租约、事务、分布式锁。
快速开始请跳至 开始搭建,全面了解请阅读官方文档。
下载文档: https://github.com/etcd-io/etcd/blob/master/Documentation/dl_build.md
配置参数: https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/configuration.md
集群文档: https://github.com/etcd-io/etcd/blob/master/Documentation/dev-guide/local_cluster.md
重新配置集群成员: https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/runtime-configuration.md
基本操作文档: https://github.com/etcd-io/etcd/blob/master/Documentation/dev-guide/interacting_v3.md
角色控制: https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/authentication.md
证书配置: https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/security.md
Go client: https://godoc.org/github.com/coreos/etcd/clientv3
Web ui: https://github.com/soyking/e3w
Etcd 中文文档: http://etcd.doczh.cn/documentation/
最简单的安装方式:yum,根据需要自定义 systemd service。
cat /usr/lib/systemd/system/etcd.service
[Unit]
Description=Etcd Server
After=network.target
After=network-online.target
Wants=network-online.target

[Service]
Type=notify
WorkingDirectory=/var/lib/etcd/
EnvironmentFile=-/etc/etcd/etcd.conf
User=etcd
# set GOMAXPROCS to number of processors
ExecStart=/bin/bash -c "GOMAXPROCS=$(nproc) /usr/bin/etcd --name=\"${ETCD_NAME}\" --data-dir=\"${ETCD_DATA_DIR}\" --listen-client-urls=\"${ETCD_LISTEN_CLIENT_URLS}\""
Restart=on-failure
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target
1、创建专用服务发现URL
curl https://discovery.etcd.io/new?size=3
以上命令返回一个 URL
类似:https://discovery.etcd.io/54acde2d3f35c56813b0d8cb6404c29a
2、创建节点
etcd --name infra1 \
  --initial-advertise-peer-urls http://0.0.0.0:2380 \
  --listen-peer-urls http://0.0.0.0:2380 \
  --listen-client-urls http://0.0.0.0:2379 \
  --advertise-client-urls http://0.0.0.0:2379 \
  --discovery https://discovery.etcd.io/54acde2d3f35c56813b0d8cb6404c29a

etcd --name infra2 \
  --initial-advertise-peer-urls http://0.0.0.0:22380 \
  --listen-peer-urls http://0.0.0.0:22380 \
  --listen-client-urls http://0.0.0.0:22379 \
  --advertise-client-urls http://0.0.0.0:22379 \
  --discovery https://discovery.etcd.io/54acde2d3f35c56813b0d8cb6404c29a

etcd --name infra3 \
  --initial-advertise-peer-urls http://0.0.0.0:32380 \
  --listen-peer-urls http://0.0.0.0:32380 \
  --listen-client-urls http://0.0.0.0:32379 \
  --advertise-client-urls http://0.0.0.0:32379 \
  --discovery https://discovery.etcd.io/54acde2d3f35c56813b0d8cb6404c29a
3、搭建 e3w
3.1 按照文档 build 项目
3.2 修改配置文件
3.3 运行项目
package mainimport ("context""fmt""log""strconv""time""go.etcd.io/etcd/clientv3")func main() {var (key, opkey, lockKey stringvalue, opvalue stringerr errorputResp *clientv3.PutResponsegetResp *clientv3.GetResponsedelResp *clientv3.DeleteResponsetxnResp *clientv3.TxnResponseop clientv3.OpopResp clientv3.OpResponselease clientv3.LeaseleaseGrantResp *clientv3.LeaseGrantResponsewatchChan clientv3.WatchChan)clt, err := clientv3.New(clientv3.Config{Endpoints: []string{"http://0.0.0.0:2379", "http://0.0.0.0:22379", "http://0.0.0.0:32379"},DialTimeout: 5 * time.Second,})if err != nil {log.Fatal(err)}defer clt.Close()key = "/root/config"value = `{"name":"测试","port":8080}`// value = "etcdv3_dir_$2H#%gRe3*t" // 此值在 e3w(变量名称:DEFAULT_DIR_VALUE) 中被当做目录渲染// 设置 keyif putResp, err = clt.Put(context.TODO(), key, value); err != nil {fmt.Println(err)}fmt.Printf("[put response] %v \n", putResp)// [put response] &{cluster_id:4280164070870598743 member_id:7885079459987601789 revision:41 raft_term:2 <nil>}// 获取 key 信息getResp, _ = clt.Get(context.TODO(), key)fmt.Printf("[get response] key: %v; resp: %v \n", key, getResp.Kvs[0])// [get response] key: config; resp: key:"config" create_revision:40 mod_revision:41 version:2 value:"{\"name\":\"\346\265\213\350\257\225\",\"port\":8080}"fmt.Printf("[get response] key: %v; value: %v \n", key, string(getResp.Kvs[0].Value))// [get response] key: config; value: {"name":"测试","port":8080}// 删除 keydelResp, _ = clt.Delete(context.TODO(), key)fmt.Printf("[del response] key: %v; resp: %v \n", key, delResp)// [del response] key: config; resp: &{cluster_id:4280164070870598743 member_id:16310151726053752060 revision:42 raft_term:2 1 []}kv := clientv3.NewKV(clt)// 事务操作txnResp, err = kv.Txn(context.TODO()).If(clientv3.Compare(clientv3.Value(key), "=", value)).Then(clientv3.OpPut(key, "progressing")).Else(clientv3.OpPut(key, value)).Commit()if err != nil {fmt.Println(err)}fmt.Printf("[txn response] isSucceeded:%v, resp:%v \n", txnResp.Succeeded, txnResp)// [txn 
response] isSucceeded:true, resp:&{cluster_id:4280164070870598743 member_id:15482725622599376132 revision:118 raft_term:2 true [response_put:<header:<revision:118 > > ]}// op 系列opkey = "/root/op-key"opvalue = "op-value"op = clientv3.OpPut(opkey, opvalue, clientv3.WithKeysOnly())if opResp, err = clt.Do(context.TODO(), op); err != nil {fmt.Println(err)}fmt.Printf("[op put response] resp: %v \n", opResp.Put())// [op response] resp: &{cluster_id:4280164070870598743 member_id:7885079459987601789 revision:172 raft_term:2 <nil>}op = clientv3.OpGet(opkey)if opResp, err = clt.Do(context.TODO(), op); err != nil {fmt.Println(err)}fmt.Printf("[op get response] resp: %v \n", opResp.Get().Kvs[0])// [op get response] resp: key:"root/op-key" create_revision:136 mod_revision:180 version:12 value:"op-value"// 租约模式lease = clientv3.NewLease(clt)// 创建 2s 租约if leaseGrantResp, err = lease.Grant(context.TODO(), 2); err != nil {fmt.Println(err)}fmt.Printf("[lease response] lease id:%v \n", leaseGrantResp.ID)fmt.Printf("lease TTL: %v \n", leaseGrantResp.TTL)// key 绑定租约if putResp, err = kv.Put(context.TODO(), key, value, clientv3.WithLease(leaseGrantResp.ID)); err != nil {fmt.Println(err)}fmt.Printf("[lease put response] response:%v \n", putResp.Header.Revision)// 自动持续续租// lease.KeepAlive(context.TODO(), leaseGrantResp.ID)// 自动续租 2sctx, cancelFunc := context.WithTimeout(context.TODO(), 2*time.Second)defer cancelFunc()defer lease.Revoke(ctx, leaseGrantResp.ID)lease.KeepAlive(ctx, leaseGrantResp.ID)time.AfterFunc(4*time.Second, func() {getResp, _ = clt.Get(context.TODO(), key)if getResp.Count == 0 {fmt.Println("key has expired")}})time.Sleep(5 * time.Second)// 分布式锁lockKey = "/root/lock"lease = clientv3.NewLease(clt)// 创建 15s 租约if leaseGrantResp, err = lease.Grant(context.TODO(), 15); err != nil {fmt.Println(err)}defer lease.Revoke(ctx, leaseGrantResp.ID)lease.KeepAlive(context.TODO(), leaseGrantResp.ID)txnResp, err = kv.Txn(context.TODO()).If(clientv3.Compare(clientv3.CreateRevision(lockKey), 
"=", 0)).Then(clientv3.OpPut(lockKey, "lockvalue", clientv3.WithLease(leaseGrantResp.ID))).Else(clientv3.OpGet(lockKey)).Commit()if err != nil {fmt.Printf("lock failed: %v", err)}if !txnResp.Succeeded {fmt.Printf("lock failed, get revision:%v \n", txnResp.OpResponse().Txn().Header.Revision)} else {fmt.Printf("lock succeed, put revision:%v \n", txnResp.OpResponse().Txn().Header.Revision)}// return// watchergo func() {for i := 1; i < 3; i++ {op = clientv3.OpPut(opkey, strconv.Itoa(i), clientv3.WithKeysOnly())if opResp, err = clt.Do(context.TODO(), op); err != nil {fmt.Println(err)}fmt.Printf("[op put response] resp: %v \n", opResp.Put())}}()// 监听此前缀的 key// watchChan = clt.Watch(context.TODO(), "root", clientv3.WithPrefix())// 监听某个 keywatchChan = clt.Watch(context.TODO(), opkey)for wchanResp := range watchChan {fmt.Printf("[watch response] resp: %v \n", wchanResp.Events[0].Kv)// [watch response] resp: key:"root/op-key" create_revision:136 mod_revision:1539 version:1347 value:"9"}}