Etcd

etcd是一个高可用的键值存储系统,场景主要是

1、主要用于共享配置

2、服务注册与发现

3、分布式锁等

etcd是由CoreOS开发并维护的,灵感来自于 ZooKeeper 等。它使用Go语言编写

安装

docker pull golang:1.12-alpine
# 下载etcd 这边使用的是etcd 3.3.10

# 创建文件夹
 -/home/etcd
 	-conf
 	-data

# 启动出一个容器
 docker run -it  --name etcd  -p 2379:2379  -v /home/etcd:/etcd golang:1.12-alpine sh
 
 # ctrl+d可以直接退出容器
 # 此时容器已经停掉了 docker start etcd可以重启容器


//进入存放etcd 启动文件的文件夹下
docker cp etcd etcd:/usr/bin && docker cp etcdctl etcd:/usr/bin


# 在宿主机/home//etcd/conf下创建如下内容 文件名 etcd.yml
name: $(hostname -s)
data_dir: /etcd/data
listen-client-urls: http://0.0.0.0:2379

#配置完成后进入容器
docker exec -it etcd sh

# 进入usr/bin目录,给etcd 和etcdctl赋予权限

etcd --version  查看版本
etcd --config-file  /etcd/conf/etcd.yml

etcdctl

由于历史原因,存在v2和v3版本。 这里使用v3版本

export ETCDCTL_API=3
# 查看 
etcdctl version

譬如我要把 一个用户信息存进去

用户信息 包含了 ID、name、age    3个字段

写法如下
etcdctl put /user/101/name gewei
etcdctl put /user/101/age  19

//查看用户信息
etcdctl get /user/101/name  
etcdctl get /user/101/age   

//查看用户所有信息
etcdctl get /user/101 --prefix

//删除用户信息
etcdctl del /user/101 --prefix

docker模拟etcd集群创建

  1. 创建一个专门的docker网络
docker network create etcdnet --subnet 172.25.0.0/16 (使用的是bridge,指定了子网)

image-20220323113813393

  1. 在home文件夹下再创建一个etcd2 文件夹

  2. 由于我们前面设置了一个专门的网络,这里我们定义一个ip 规则

etcd1:172.25.0.101
etcd2:172.25.0.102
  1. 修改配置文件
name: etcd1 #文件名称
data-dir: /etcd/data # 数据目录
listen-client-urls: http://172.25.0.101:2379, http://127.0.0.1:2379 #外部访问ip+端口 本地访问ip+端口
advertise-client-urls: http://172.25.0.101:2379 # 通知外部访问的地址
listen-peer-urls: http://172.25.0.101:2380 # 2380 节点间通信使用端口
initial-advertise-peer-urls: http://172.25.0.101:2380 # 通知外部连接端口
initial-cluster: etcd1=http://172.25.0.101:2380,etcd2=http://172.25.0.102:2380 # 集群所有节点
initial-cluster-token: etcd-cluster-token
initial-cluster-state: new

name: etcd2
data-dir: /etcd/data
listen-client-urls: http://172.25.0.102:2379, http://127.0.0.1:2379
advertise-client-urls: http://172.25.0.102:2379
listen-peer-urls: http://172.25.0.102:2380
initial-advertise-peer-urls: http://172.25.0.102:2380
initial-cluster: etcd1=http://172.25.0.101:2380,etcd2=http://172.25.0.102:2380
initial-cluster-token: etcd-cluster-token
initial-cluster-state: new
  1. 固化镜像
# 首先随便创建一个容器
docker run --name testgo -it golang:1.12-alpine sh
# 讲etcd33 文件夹下的etcd 和etcdctl拷贝到容器中
docker cp etcd testgo:/usr/bin && docker cp etcdctl testgo:/usr/bin 
# 重新构建镜像
 docker commit testgo etcd:my
# 删除原来镜像
docker stop testgo && docker rm testgo

image-20220323115225005

  1. 创建容器
 # 创建容器1
 docker run -d  --name etcd1 --network etcdnet --ip 172.25.0.101 -p 23791:2379 -v /home/etcd:/etcd etcd:my  etcd --config-file  /etcd/conf/etcd.yml
# 创建容器2
 docker run -d  --name etcd2 --network etcdnet --ip 172.25.0.102 -p 23792:2379 -v /home/etcd2:/etcd etcd:my  etcd --config-file  /etcd/conf/etcd.yml

此时这两个容器已经创建为集群了

image-20220323130514090

如果想在本机测试,可以直接执行

# 进入etcd33文件夹下
etcdctl --endpoints http://localhost:23791 member list

image-20220323130826388

go操作etcd

# 指定环境变量
docker run -d  --name etcd1 --network etcdnet --ip 172.25.0.101 -p 23791:2379 -e ETCDCTL_API=3 -v /home/etcd:/etcd etcd:my  etcd --config-file  /etcd/conf/etcd.yml

 docker run -d  --name etcd2 --network etcdnet --ip 172.25.0.102 -p 23792:2379 -e ETCDCTL_API=3 -v /home/etcd2:/etcd etcd:my  etcd --config-file  /etcd/conf/etcd.yml
go get go.etcd.io/etcd/client/v3
import (
   "context"
   clientv3 "go.etcd.io/etcd/client/v3"
   "time"
)

func main() {
   client, err := clientv3.New(clientv3.Config{
      Endpoints:   []string{"127.0.0.1:23791", "127.0.0.1:23792"},
      DialTimeout: 2 * time.Second,
   })
   if err != nil {
      panic(err)
   }
   defer client.Close()

   client.Put(context.Background(), "/service/user","user1")
}
租约(lease)

和redis设置过期是有区别的

首先要创建一个租约(好比租房合同),并设置过期时间 etcdctl lease grant 20 (设置了一个 20秒的租约)

# 设置租约(200s)
etcdctl lease grant 200  
# 查看租约剩余时间
etcdctl lease timetolive (租约名称) 
# 查看租约列表
etcdctl lease list
# 毁约(删除租约)
etcdctl lease revoke (租约名称) 
# 租约一直保持存活
etcdctl lease keep-alive (租约名称) 
# 一旦租约过期,或被删掉,key就没了
etcdctl put /user gewei --lease=xxxxxooo 
# 可以查看该租约下的所有key 
etcdctl lease timetolive   xxxxxxx --keys 

image-20220323152448532

服务注册与发现

什么是注册?

其实就是往etcd中存入一个 key ,value就是 这个服务的地址

/services/product1/prouct
/services/product2/prouct

/services/user1/user

注册信息存储到etcd

func NewService() *Service {
   client, err := clientv3.New(clientv3.Config{
      Endpoints:   []string{"139.198.37.13:23791", "139.198.37.13:23792"},
      DialTimeout: 2 * time.Second,
   })
   if err != nil {
      panic(err)
   }
   return &Service{
      client: client,
   }
}

//RegService 注册服务
func (this *Service) RegService(id string, name string, address string) error {
   key_prefix := "/services/"
   _, err := this.client.Put(context.Background(), key_prefix+id+"/"+name, address)
   return err
}

服务启动幽雅性

对服务启动进行优化

func main() {
   router := mux.NewRouter()
   router.HandleFunc("/product/{id}", func(writer http.ResponseWriter, request *http.Request) {
      vars := mux.Vars(request)
      str := "get product Byid" + vars["id"]
      writer.Write([]byte(str))
   })

   serviceID := "p1"
   serviceName := "productservice"
   serviceAddr := "192.168.29.1"
   servicePort := 8081
   s := util.NewService()
   errChan := make(chan error)
   go func() {
      err := s.RegService(serviceID, serviceName, serviceAddr+":"+strconv.Itoa(servicePort))
      if err != nil {
         errChan <- err
         return
      }
      err = http.ListenAndServe(":"+strconv.Itoa(servicePort), router)
      if err != nil {
         errChan <- err
         return
      }
   }()

   go func() {
      sig := make(chan os.Signal)
      signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
      errChan <- fmt.Errorf("%s", <-sig)
   }()

   getErr := <-errChan
   log.Println("发生异常, 服务正在停止。。。")
   log.Fatal(getErr)

}

反注册

//UnregService 解除注册
func (this *Service) UnregService(id string) error {
   key_prefix := "/services/"
   //根据key_prefix 删除所有的key
   _, err := this.client.Delete(context.Background(), key_prefix+id, clientv3.WithPrefix())
   return err
}
func main() {
   router := mux.NewRouter()
   router.HandleFunc("/product/{id}", func(writer http.ResponseWriter, request *http.Request) {
      vars := mux.Vars(request)
      str := "get product Byid" + vars["id"]
      writer.Write([]byte(str))
   })

   serviceID := "p1"
   serviceName := "productservice"
   serviceAddr := "192.168.29.1"
   servicePort := 8081
   HttpServer := &http.Server{
      Addr:    ":" + strconv.Itoa(servicePort),
      Handler: router,
   }
   s := util.NewService()
   errChan := make(chan error)
   go func() {
      err := s.RegService(serviceID, serviceName, serviceAddr+":"+strconv.Itoa(servicePort))
      if err != nil {
         errChan <- err
         return
      }
      err = HttpServer.ListenAndServe()
      if err != nil {
         errChan <- err
         return
      }
   }()

   go func() {
      sig := make(chan os.Signal)
      signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
      errChan <- fmt.Errorf("%s", <-sig)
   }()

   getErr := <-errChan
   err := s.UnregService(serviceID)
   if err != nil {
      fmt.Printf("err:%v", err)
   }
   err = HttpServer.Shutdown(context.Background())
   if err != nil {
      log.Fatal(err)
   }
   log.Println("发生异常, 服务正在停止。。。")
   log.Fatal(getErr)

}

设置租约

//RegService 注册服务
func (this *Service) RegService(id string, name string, address string) error {
   key_prefix := "/services/"
   ctx := context.Background()
   //设置租约
   lease, err := clientv3.NewLease(this.client).Grant(ctx, 20)
   if err != nil {
      return err
   }
   _, err = this.client.Put(ctx, key_prefix+id+"/"+name, address, clientv3.WithLease(lease.ID))
   return err
}

定期续约

//RegService 注册服务
func (s *Service) RegService(id string, name string, address string) error {
   key_prefix := "/services/"
   ctx := context.Background()
   //设置租约
   lease := clientv3.NewLease(s.client)
   leaseRes, err := lease.Grant(ctx, 20)
   if err != nil {
      return err
   }
   _, err = s.client.Put(ctx, key_prefix+id+"/"+name, address, clientv3.WithLease(leaseRes.ID))
   if err != nil {
      return err
   }
   keepAliveRes, err := lease.KeepAlive(ctx, leaseRes.ID)
   go lisKeepAlive(keepAliveRes)
   return err
}
func lisKeepAlive(keepAliveRes <-chan *clientv3.LeaseKeepAliveResponse) {
   for {
      select {
      case ret := <-keepAliveRes:
         if ret != nil {
            fmt.Println("续约成功", time.Now())
         }
      }
   }
}

读取服务器数据

func (this *Client) GetService() {
   res, err := this.client.Get(context.TODO(), "/services", clientv3.WithPrefix())
   if err != nil {
      fmt.Printf("%v", err)
      return
   }
   for _, item := range res.Kvs {
      fmt.Println(string(item.Key))
   }
}

仿go-kit优雅调用服务

编写service类

type ProdRequest struct {
   ProdId int
}
func ProdEncodeFunc(ctx context.Context, httpRequest *http.Request, requestParam interface{}) error {
   prodr := requestParam.(ProdRequest)
   httpRequest.URL.Path += "/product/" + strconv.Itoa(prodr.ProdId)
   return nil
}

处理请求

func (this *Client) GetService(sname string, method string, encodeFunc EncodeRequestFunc) Endpoint {
   for _, service := range this.Services {
      if service.ServiceName == sname {
         return func(ctx context.Context, requestParam interface{}) (responseParam interface{}, err error) {
            httpClient := &http.Client{}
            request, err := http.NewRequest(method, "http://"+service.ServiceAddr, nil)
            if err != nil {
               return nil, err
            }
            err = encodeFunc(ctx, request, requestParam)
            if err != nil {
               return nil, err
            }
            res, err := httpClient.Do(request)
            if err != nil {
               return nil, err
            }
            defer res.Body.Close()
            body, err := ioutil.ReadAll(res.Body)
            if err != nil {
               return nil, err
            }
            return string(body), nil
         }
      }
   }
   return nil
}

客户端调用

func main() {
   client := util.NewClient()
   err := client.LoadService()
   if err != nil {
      log.Fatal(err)
   }
   endpoint := client.GetService("productservice", "GET", service.ProdEncodeFunc)
   res, err := endpoint(context.Background(), service.ProdRequest{ProdId: 100})
   if err != nil {
      log.Fatal(err)
   }
   fmt.Println(res)
}

启动多个服务

指定端口号和服务名,serviceId使用uuid

name := flag.String("name", "", "服务名称")
port := flag.Int("p", 0, "服务端口")
flag.Parse()
if *name == "" {
   log.Fatal("请指定服务名")
}
if *port == 0 {
   log.Fatal("请指定端口名")
}
serviceID := uuid.New().String()
serviceName := *name
serviceAddr := "127.0.0.1"
servicePort := *port

编写bat文件,启动多个项目

@Echo off
start "productservice1" go run p1.go -name productservice -p 8081 &
start "productservice2" go run p1.go -name productservice -p 8082
pause

负载均衡

随机负载均衡算法

type LoadBalance struct {
   Servers []*ServiceInfo
}

func NewLoadBalance(server []*ServiceInfo) *LoadBalance {
   return &LoadBalance{Servers: server}

}

func(this *LoadBalance) getByRand(sname string ) *ServiceInfo{
   tmp:=make([]*ServiceInfo,0)
   for _,service:=range this.Servers{
      if service.ServiceName==sname{
         tmp=append(tmp,service)
      }
   }
   if len(tmp)==0 {
      return nil
   }
   rand.Seed(time.Now().UnixNano())
   i:=rand.Intn(len(tmp))
   return this.Servers[i]
}

注意此时政策匹配也修改了

reg := regexp.MustCompile("/services/(.+)/(\\w+)")

配置中心

Confd

一款高可用统一配置管理工具

github地址

创建一个文件夹/home/confd

vi Dockerfile

FROM golang:1.12-alpine  as confd
ARG CONFD_VERSION=0.16.0
ADD https://github.com/kelseyhightower/confd/archive/v${CONFD_VERSION}.tar.gz /tmp/
RUN apk add --no-cache \
bzip2 \
make && \
mkdir -p /go/src/github.com/kelseyhightower/confd && \
cd /go/src/github.com/kelseyhightower/confd && \
tar --strip-components=1 -zxf /tmp/v${CONFD_VERSION}.tar.gz && \
go install github.com/kelseyhightower/confd && \
rm -rf /tmp/v${CONFD_VERSION}.tar.gz
ENTRYPOINT ["/go/bin/confd"]

创建镜像:

docker build -t confd:my .
# 进入etcd客户端 
# 插入以下两个值
etcdctl put /myconfig/mysql/user root
etcdctl put /myconfig/mysql/pass 123123

创建一个文件夹/home/confdfiles

# /home/confdfiles 下创建三个文件夹
 - conf.d
 	- myconfig.toml
 - template
    - myconfig.conf.tmpl
 - dest

myconfig.toml写入以下内容

[template]
src = "myconfig.conf.tmpl"
dest = "/etc/confd/dest/myconfig.conf"
keys = [
    "/myconfig/mysql/user",
    "/myconfig/mysql/pass",
]

myconfig.conf.tmpl写入以下内容

[this is myconfig]
database_user = {{getv "/myconfig/mysql/user"}}
database_pass = {{getv "/myconfig/mysql/pass"}}

运行容器

docker run -it --rm  --name confd -v /home/confdfiles:/etc/confd confd:my -onetime -backend etcdv3 -node http://139.198.37.13:23792

定时生成配置文件

修改运行容器的配置

docker run -it -d --rm  --name confd -v /home/confdfiles:/etc/confd confd:my -interval 5 -backend etcdv3 -node http://139.198.37.13:23792

读取配置信息

go get "gopkg.in/ini.v1"
func main() {
   port := flag.Int("p", 0, "服务端口")
   flag.Parse()
   if *port == 0 {
      log.Fatal("请指定端口")
   }
   file, err := ini.Load("my.ini")
   if err != nil {
      log.Fatal(err)
   }
   http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
      dbUser := file.Section("db").Key("db_user").Value()
      dbPass := file.Section("db").Key("db_pass").Value()
      writer.Write([]byte("<h1>" + dbUser + "<h1>"))
      writer.Write([]byte("<h1>" + dbPass + "<h1>"))
   })
   http.ListenAndServe(":"+strconv.Itoa(*port), nil)
}

创建文件my.ini

[db]
db_user = gewei
db_pass = 123

交叉编译

set GOARCH=amd64
set GOOS=linux
go build myhttp.go

配置重载

docker run -d   --name confd \
-v /home/confdfiles:/etc/confd \
-v /home/mygo:/mygo \
confd:my \
-watch  -backend etcdv3 -node http://192.168.29.135:23792
# watch 会监听key值的变化

创建一个策略,当key值变化时,会调用reload接口

 mux.HandleFunc("/reload",func(writer http.ResponseWriter, request *http.Request) {
		newCFG,_:=ini.Load("my.ini")
		cfg=newCFG
  })

修改myconfig文件,加入

reload_cmd = "curl http://192.168.29.135:8001/reload"

由于我们用的容器是alpine 的。纯净的,不带curl,因此需要进入容器安装

echo http://mirrors.ustc.edu.cn/alpine/v3.7/main > /etc/apk/repositories && \
echo http://mirrors.ustc.edu.cn/alpine/v3.7/community >> /etc/apk/repositories
apk add curl

监控文件变化

获取文件的md5值

func getFileMD5(filePath string) (string, error) {
   file, err := os.Open(filePath)
   hash := md5.New()
   //适合小文件,大文件不会全部写入
   if _, err = io.Copy(hash, file); err != nil {
      return "", err
   }
   hashInBytes := hash.Sum(nil)[:16]
   return hex.EncodeToString(hashInBytes), nil
}
# 开启一个线程监听文件发生变化
go func() {
   fileMD5, err := getFileMD5("my.ini")
   if err != nil {
      log.Println(err)
      return
   }
   for {
      newMD5, err := getFileMD5("my.ini")
      if err != nil {
         log.Println(err)
         return
      }
      if fileMD5 != newMD5 {
         fileMD5 = newMD5
         fmt.Println("文件发生了变化")
      }
   }
}()

平滑重启

go get -u github.com/jpillora/overseer
# 这个库会开两个进程起来
	server:=&http.Server{
		// Addr:":"+strconv.Itoa(*port),
		 Handler:mux,
	}
	prog:= func(state overseer.State) {
		server.Serve(state.Listener)
	}
	
	go(func() {
		overseer.Run(overseer.Config{
			Program: prog,
			TerminateTimeout:time.Second*2,
			Address: ":"+strconv.Itoa(*port),
		})
	})()
	//监控配置文件变化
	go(func() {
		fileMd5,err:=getFileMD5("my.ini")
		if err!=nil{
			log.Println(err)
			return
		}
		for{
			newMd5,err:=getFileMD5("my.ini")
			if err!=nil{
				log.Println(err)
				break;
			}
			if strings.Compare(newMd5,fileMd5)!=0{
				fileMd5=newMd5
				fmt.Println("文件发生了变化")
				overseer.Restart()
			}
			time.Sleep(time.Second*2)
		}
	})()

Q.E.D.


勤俭节约,艰苦奋斗。