@kklinan
2018-11-15T11:37:59.000000Z
字数 7135
阅读 1079
Etcd
Golang
etcd 是一个分布式键值存储,旨在可靠,快速地保存和提供对关键数据的访问。它通过分布式锁定,leader 选举和写入障碍实现可靠的分布式协调。etcd集群旨在实现高可用性和永久数据存储和检索。
GitHub: https://github.com/etcd-io/etcd
etcd 与 Zokeeper类似,使用场景包括:配置管理、服务注册发现、选主、应用调度、分布式锁、事务。
具体使用见下文 Go 代码,包括 put、get、delete、watcher、租约、事务、分布式锁。
快速开始请跳至 开始搭建,全面了解请阅读官方文档。
下载文档: https://github.com/etcd-io/etcd/blob/master/Documentation/dl_build.md
配置参数: https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/configuration.md
集群文档: https://github.com/etcd-io/etcd/blob/master/Documentation/dev-guide/local_cluster.md
重新配置集群成员: https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/runtime-configuration.md
基本操作文档: https://github.com/etcd-io/etcd/blob/master/Documentation/dev-guide/interacting_v3.md
角色控制: https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/authentication.md
证书配置: https://github.com/etcd-io/etcd/blob/master/Documentation/op-guide/security.md
Go client: https://godoc.org/github.com/coreos/etcd/clientv3
Web ui: https://github.com/soyking/e3w
Etcd 中文文档: http://etcd.doczh.cn/documentation/
最简单的安装方式:yum,根据需要自定义 systemd service。
cat /usr/lib/systemd/system/etcd.service
[Unit]
Description=Etcd Server
After=network.target
After=network-online.target
Wants=network-online.target
[Service]
Type=notify
WorkingDirectory=/var/lib/etcd/
EnvironmentFile=-/etc/etcd/etcd.conf
User=etcd
# set GOMAXPROCS to number of processors
ExecStart=/bin/bash -c "GOMAXPROCS=$(nproc) /usr/bin/etcd --name=\"${ETCD_NAME}\" --data-dir=\"${ETCD_DATA_DIR}\" --listen-client-urls=\"${ETCD_LISTEN_CLIENT_URLS}\""
Restart=on-failure
LimitNOFILE=65536
[Install]
WantedBy=multi-user.target
1、创建专用服务发现URL
curl https://discovery.etcd.io/new?size=3
以上命令返会一个 URL
类似:https://discovery.etcd.io/54acde2d3f35c56813b0d8cb6404c29a
2、创建节点
etcd --name infra1 \
--initial-advertise-peer-urls http://0.0.0.0:2380 \
--listen-peer-urls http://0.0.0.0:2380 \
--listen-client-urls http://0.0.0.0:2379 \
--advertise-client-urls http://0.0.0.0:2379 \
--discovery https://discovery.etcd.io/54acde2d3f35c56813b0d8cb6404c29a
etcd --name infra2 \
--initial-advertise-peer-urls http://0.0.0.0:22380 \
--listen-peer-urls http://0.0.0.0:22380 \
--listen-client-urls http://0.0.0.0:22379 \
--advertise-client-urls http://0.0.0.0:22379 \
--discovery https://discovery.etcd.io/54acde2d3f35c56813b0d8cb6404c29a
etcd --name infra3 \
--initial-advertise-peer-urls http://0.0.0.0:32380 \
--listen-peer-urls http://0.0.0.0:32380 \
--listen-client-urls http://0.0.0.0:32379 \
--advertise-client-urls http://0.0.0.0:32379 \
--discovery https://discovery.etcd.io/54acde2d3f35c56813b0d8cb6404c29a
3、搭建 e3w
3.1 按照文档 build 项目
3.2 修改配置文件
3.3 运行项目
package main
import (
"context"
"fmt"
"log"
"strconv"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
var (
key, opkey, lockKey string
value, opvalue string
err error
putResp *clientv3.PutResponse
getResp *clientv3.GetResponse
delResp *clientv3.DeleteResponse
txnResp *clientv3.TxnResponse
op clientv3.Op
opResp clientv3.OpResponse
lease clientv3.Lease
leaseGrantResp *clientv3.LeaseGrantResponse
watchChan clientv3.WatchChan
)
clt, err := clientv3.New(clientv3.Config{
Endpoints: []string{"http://0.0.0.0:2379", "http://0.0.0.0:22379", "http://0.0.0.0:32379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
defer clt.Close()
key = "/root/config"
value = `{"name":"测试","port":8080}`
// value = "etcdv3_dir_$2H#%gRe3*t" // 此值在 e3w(变量名称:DEFAULT_DIR_VALUE) 中被当做目录渲染
// 设置 key
if putResp, err = clt.Put(context.TODO(), key, value); err != nil {
fmt.Println(err)
}
fmt.Printf("[put response] %v \n", putResp)
// [put response] &{cluster_id:4280164070870598743 member_id:7885079459987601789 revision:41 raft_term:2 <nil>}
// 获取 key 信息
getResp, _ = clt.Get(context.TODO(), key)
fmt.Printf("[get response] key: %v; resp: %v \n", key, getResp.Kvs[0])
// [get response] key: config; resp: key:"config" create_revision:40 mod_revision:41 version:2 value:"{\"name\":\"\346\265\213\350\257\225\",\"port\":8080}"
fmt.Printf("[get response] key: %v; value: %v \n", key, string(getResp.Kvs[0].Value))
// [get response] key: config; value: {"name":"测试","port":8080}
// 删除 key
delResp, _ = clt.Delete(context.TODO(), key)
fmt.Printf("[del response] key: %v; resp: %v \n", key, delResp)
// [del response] key: config; resp: &{cluster_id:4280164070870598743 member_id:16310151726053752060 revision:42 raft_term:2 1 []}
kv := clientv3.NewKV(clt)
// 事务操作
txnResp, err = kv.Txn(context.TODO()).
If(clientv3.Compare(clientv3.Value(key), "=", value)).
Then(clientv3.OpPut(key, "progressing")).
Else(clientv3.OpPut(key, value)).
Commit()
if err != nil {
fmt.Println(err)
}
fmt.Printf("[txn response] isSucceeded:%v, resp:%v \n", txnResp.Succeeded, txnResp)
// [txn response] isSucceeded:true, resp:&{cluster_id:4280164070870598743 member_id:15482725622599376132 revision:118 raft_term:2 true [response_put:<header:<revision:118 > > ]}
// op 系列
opkey = "/root/op-key"
opvalue = "op-value"
op = clientv3.OpPut(opkey, opvalue, clientv3.WithKeysOnly())
if opResp, err = clt.Do(context.TODO(), op); err != nil {
fmt.Println(err)
}
fmt.Printf("[op put response] resp: %v \n", opResp.Put())
// [op response] resp: &{cluster_id:4280164070870598743 member_id:7885079459987601789 revision:172 raft_term:2 <nil>}
op = clientv3.OpGet(opkey)
if opResp, err = clt.Do(context.TODO(), op); err != nil {
fmt.Println(err)
}
fmt.Printf("[op get response] resp: %v \n", opResp.Get().Kvs[0])
// [op get response] resp: key:"root/op-key" create_revision:136 mod_revision:180 version:12 value:"op-value"
// 租约模式
lease = clientv3.NewLease(clt)
// 创建 2s 租约
if leaseGrantResp, err = lease.Grant(context.TODO(), 2); err != nil {
fmt.Println(err)
}
fmt.Printf("[lease response] lease id:%v \n", leaseGrantResp.ID)
fmt.Printf("lease TTL: %v \n", leaseGrantResp.TTL)
// key 绑定租约
if putResp, err = kv.Put(context.TODO(), key, value, clientv3.WithLease(leaseGrantResp.ID)); err != nil {
fmt.Println(err)
}
fmt.Printf("[lease put response] response:%v \n", putResp.Header.Revision)
// 自动持续续租
// lease.KeepAlive(context.TODO(), leaseGrantResp.ID)
// 自动续租 2s
ctx, cancelFunc := context.WithTimeout(context.TODO(), 2*time.Second)
defer cancelFunc()
defer lease.Revoke(ctx, leaseGrantResp.ID)
lease.KeepAlive(ctx, leaseGrantResp.ID)
time.AfterFunc(4*time.Second, func() {
getResp, _ = clt.Get(context.TODO(), key)
if getResp.Count == 0 {
fmt.Println("key has expired")
}
})
time.Sleep(5 * time.Second)
// 分布式锁
lockKey = "/root/lock"
lease = clientv3.NewLease(clt)
// 创建 15s 租约
if leaseGrantResp, err = lease.Grant(context.TODO(), 15); err != nil {
fmt.Println(err)
}
defer lease.Revoke(ctx, leaseGrantResp.ID)
lease.KeepAlive(context.TODO(), leaseGrantResp.ID)
txnResp, err = kv.Txn(context.TODO()).
If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).
Then(clientv3.OpPut(lockKey, "lockvalue", clientv3.WithLease(leaseGrantResp.ID))).
Else(clientv3.OpGet(lockKey)).
Commit()
if err != nil {
fmt.Printf("lock failed: %v", err)
}
if !txnResp.Succeeded {
fmt.Printf("lock failed, get revision:%v \n", txnResp.OpResponse().Txn().Header.Revision)
} else {
fmt.Printf("lock succeed, put revision:%v \n", txnResp.OpResponse().Txn().Header.Revision)
}
// return
// watcher
go func() {
for i := 1; i < 3; i++ {
op = clientv3.OpPut(opkey, strconv.Itoa(i), clientv3.WithKeysOnly())
if opResp, err = clt.Do(context.TODO(), op); err != nil {
fmt.Println(err)
}
fmt.Printf("[op put response] resp: %v \n", opResp.Put())
}
}()
// 监听此前缀的 key
// watchChan = clt.Watch(context.TODO(), "root", clientv3.WithPrefix())
// 监听某个 key
watchChan = clt.Watch(context.TODO(), opkey)
for wchanResp := range watchChan {
fmt.Printf("[watch response] resp: %v \n", wchanResp.Events[0].Kv)
// [watch response] resp: key:"root/op-key" create_revision:136 mod_revision:1539 version:1347 value:"9"
}
}