EtcdServer Initialization
NewServer
The call stack below (innermost frame first, as captured in a debugger) shows how etcdserver.NewServer is reached during startup:
etcdserver.NewServer (server.go:305) go.etcd.io/etcd/server/v3/etcdserver
embed.StartEtcd (etcd.go:247) go.etcd.io/etcd/server/v3/embed
etcdmain.startEtcd (etcd.go:203) go.etcd.io/etcd/server/v3/etcdmain
etcdmain.startEtcdOrProxyV2 (etcd.go:113) go.etcd.io/etcd/server/v3/etcdmain
etcdmain.Main (main.go:40) go.etcd.io/etcd/server/v3/etcdmain
main.main (main.go:32) main
runtime.main (proc.go:255) runtime
runtime.goexit (asm_amd64.s:1581) runtime
startEtcdOrProxyV2
startEtcdOrProxyV2 parses the command-line arguments, then initializes and starts the etcd server. Its execution flow is roughly:
- Parse the arguments
- Initialize logging
- Detect the default host
- Initialize the data directory
- Start the server
- Wait for the server to stop
func startEtcdOrProxyV2(args []string) {
grpc.EnableTracing = false
// Parse the arguments
cfg := newConfig()
defaultInitialCluster := cfg.ec.InitialCluster
err := cfg.parse(args[1:])
// Initialize logging
lg := cfg.ec.GetLogger()
// If we failed to parse the whole configuration, print the error using,
// preferably, the resolved logger from the config; if it does not exist,
// create a new temporary logger.
if lg == nil {
var zapError error
// use this logger
lg, zapError = logutil.CreateDefaultZapLogger(zap.InfoLevel)
if zapError != nil {
fmt.Printf("error creating zap logger %v", zapError)
os.Exit(1)
}
}
lg.Info("Running: ", zap.Strings("args", args))
if err != nil {
lg.Warn("failed to verify flags", zap.Error(err))
switch err {
case embed.ErrUnsetAdvertiseClientURLsFlag:
lg.Warn("advertise client URLs are not set", zap.Error(err))
}
os.Exit(1)
}
cfg.ec.SetupGlobalLoggers()
defer func() {
logger := cfg.ec.GetLogger()
if logger != nil {
logger.Sync()
}
}()
// Detect the default host
defaultHost, dhErr := (&cfg.ec).UpdateDefaultClusterFromName(defaultInitialCluster)
if defaultHost != "" {
lg.Info(
"detected default host for advertise",
zap.String("host", defaultHost),
)
}
if dhErr != nil {
lg.Info("failed to detect default host", zap.Error(dhErr))
}
// Initialize the data directory
if cfg.ec.Dir == "" {
cfg.ec.Dir = fmt.Sprintf("%v.etcd", cfg.ec.Name)
lg.Warn(
"'data-dir' was empty; using default",
zap.String("data-dir", cfg.ec.Dir),
)
}
var stopped <-chan struct{}
var errc <-chan error
// Start the etcd server
which := identifyDataDirOrDie(cfg.ec.GetLogger(), cfg.ec.Dir)
if which != dirEmpty {
lg.Info(
"server has already been initialized",
zap.String("data-dir", cfg.ec.Dir),
zap.String("dir-type", string(which)),
)
switch which {
case dirMember:
stopped, errc, err = startEtcd(&cfg.ec)
case dirProxy:
lg.Panic("v2 http proxy has already been deprecated in 3.6", zap.String("dir-type", string(which)))
default:
lg.Panic(
"unknown directory type",
zap.String("dir-type", string(which)),
)
}
} else {
stopped, errc, err = startEtcd(&cfg.ec)
if err != nil {
lg.Warn("failed to start etcd", zap.Error(err))
}
}
if err != nil {
if derr, ok := err.(*errors.DiscoveryError); ok {
switch derr.Err {
case v2discovery.ErrDuplicateID:
lg.Warn(
"member has been registered with discovery service",
zap.String("name", cfg.ec.Name),
zap.String("discovery-token", cfg.ec.Durl),
zap.Error(derr.Err),
)
lg.Warn(
"but could not find valid cluster configuration",
zap.String("data-dir", cfg.ec.Dir),
)
lg.Warn("check data dir if previous bootstrap succeeded")
lg.Warn("or use a new discovery token if previous bootstrap failed")
case v2discovery.ErrDuplicateName:
lg.Warn(
"member with duplicated name has already been registered",
zap.String("discovery-token", cfg.ec.Durl),
zap.Error(derr.Err),
)
lg.Warn("cURL the discovery token URL for details")
lg.Warn("do not reuse discovery token; generate a new one to bootstrap a cluster")
default:
lg.Warn(
"failed to bootstrap; discovery token was already used",
zap.String("discovery-token", cfg.ec.Durl),
zap.Error(err),
)
lg.Warn("do not reuse discovery token; generate a new one to bootstrap a cluster")
}
os.Exit(1)
}
if strings.Contains(err.Error(), "include") && strings.Contains(err.Error(), "--initial-cluster") {
lg.Warn("failed to start", zap.Error(err))
if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) {
lg.Warn("forgot to set --initial-cluster?")
}
if types.URLs(cfg.ec.APUrls).String() == embed.DefaultInitialAdvertisePeerURLs {
lg.Warn("forgot to set --initial-advertise-peer-urls?")
}
if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) && len(cfg.ec.Durl) == 0 && len(cfg.ec.DiscoveryCfg.Endpoints) == 0 {
lg.Warn("V2 discovery settings (i.e., --discovery) or v3 discovery settings (i.e., --discovery-token, --discovery-endpoints) are not set")
}
os.Exit(1)
}
lg.Fatal("discovery failed", zap.Error(err))
}
osutil.HandleInterrupts(lg)
// At this point, the initialization of etcd is done.
// The listeners are listening on the TCP ports and ready
// for accepting connections. The etcd instance should be
// joined with the cluster and ready to serve incoming
// connections.
notifySystemd(lg)
select {
case lerr := <-errc:
// fatal out on listener errors
lg.Fatal("listener failed", zap.Error(lerr))
case <-stopped:
}
osutil.Exit(0)
}
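The switch above branches on the result of identifyDataDirOrDie, which classifies the data directory by the well-known subdirectories it contains. A simplified sketch of that check follows; it is not the verbatim etcd implementation, though the dirMember/dirProxy/dirEmpty names follow the source:
import (
	"fmt"
	"os"
)

type dirType string

const (
	dirMember = dirType("member")
	dirProxy  = dirType("proxy")
	dirEmpty  = dirType("empty")
)

// identifyDataDir classifies a data directory: a "member" subdirectory
// marks an already-initialized member, a "proxy" subdirectory marks the
// removed v2 proxy, and a missing or empty directory means a fresh start.
func identifyDataDir(dir string) (dirType, error) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		if os.IsNotExist(err) {
			return dirEmpty, nil
		}
		return dirEmpty, err
	}
	var member, proxy bool
	for _, e := range entries {
		switch dirType(e.Name()) {
		case dirMember:
			member = true
		case dirProxy:
			proxy = true
		}
	}
	switch {
	case member && proxy:
		return dirEmpty, fmt.Errorf("both member and proxy directories exist under %q", dir)
	case member:
		return dirMember, nil
	case proxy:
		return dirProxy, nil
	default:
		return dirEmpty, nil
	}
}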
StartEtcd
StartEtcd launches the etcd server and HTTP handlers for client/server communication, and returns an Etcd instance.
The returned server is not guaranteed to have joined the cluster yet, so callers should wait on Etcd.Server.ReadyNotify().
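Before walking through the implementation, a minimal embedding sketch shows this contract from the caller's side. It follows the embed package's documented usage; the data directory and timeout are placeholders:
import (
	"log"
	"time"

	"go.etcd.io/etcd/server/v3/embed"
)

func runEmbeddedEtcd() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd" // placeholder data directory

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()

	select {
	case <-e.Server.ReadyNotify():
		// The member has joined the cluster and is ready to serve.
		log.Println("etcd server is ready")
	case <-time.After(60 * time.Second):
		e.Server.Stop() // trigger a shutdown rather than block forever
		log.Println("etcd server took too long to start")
	}
	// Block until the server exits with an error.
	log.Fatal(<-e.Err())
}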
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
if err = inCfg.Validate(); err != nil {
return nil, err
}
serving := false
e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
cfg := &e.cfg
defer func() {
if e == nil || err == nil {
return
}
if !serving {
// errored before starting gRPC server for serveCtx.serversC
for _, sctx := range e.sctxs {
close(sctx.serversC)
}
}
e.Close()
e = nil
}()
if !cfg.SocketOpts.Empty() {
cfg.logger.Info(
"configuring socket options",
zap.Bool("reuse-address", cfg.SocketOpts.ReuseAddress),
zap.Bool("reuse-port", cfg.SocketOpts.ReusePort),
)
}
e.cfg.logger.Info(
"configuring peer listeners",
zap.Strings("listen-peer-urls", e.cfg.getLPURLs()),
)
if e.Peers, err = configurePeerListeners(cfg); err != nil {
return e, err
}
e.cfg.logger.Info(
"configuring client listeners",
zap.Strings("listen-client-urls", e.cfg.getLCURLs()),
)
if e.sctxs, err = configureClientListeners(cfg); err != nil {
return e, err
}
for _, sctx := range e.sctxs {
e.Clients = append(e.Clients, sctx.l)
}
var (
urlsmap types.URLsMap
token string
)
memberInitialized := true
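// isMemberInitialized checks for an existing WAL on disk; only a member
// that has never been bootstrapped needs the initial-cluster URL map and
// bootstrap token resolved below.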
if !isMemberInitialized(cfg) {
memberInitialized = false
urlsmap, token, err = cfg.PeerURLsMapAndToken("etcd")
if err != nil {
return e, fmt.Errorf("error setting up initial cluster: %v", err)
}
}
// Auto-compaction retention; unset by default
if len(cfg.AutoCompactionRetention) == 0 {
cfg.AutoCompactionRetention = "0"
}
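// In "periodic" mode the retention value is a duration (a bare number is
// read as hours, so "1" keeps one hour of history); in "revision" mode it
// is a revision count. "0" disables auto-compaction.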
autoCompactionRetention, err := parseCompactionRetention(cfg.AutoCompactionMode, cfg.AutoCompactionRetention)
if err != nil {
return e, err
}
backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType)
srvcfg := config.ServerConfig{
Name: cfg.Name,
ClientURLs: cfg.ACUrls,
PeerURLs: cfg.APUrls,
DataDir: cfg.Dir,
DedicatedWALDir: cfg.WalDir,
SnapshotCount: cfg.SnapshotCount,
SnapshotCatchUpEntries: cfg.SnapshotCatchUpEntries,
MaxSnapFiles: cfg.MaxSnapFiles,
MaxWALFiles: cfg.MaxWalFiles,
InitialPeerURLsMap: urlsmap,
InitialClusterToken: token,
DiscoveryURL: cfg.Durl,
DiscoveryProxy: cfg.Dproxy,
DiscoveryCfg: cfg.DiscoveryCfg,
NewCluster: cfg.IsNewCluster(),
PeerTLSInfo: cfg.PeerTLSInfo,
TickMs: cfg.TickMs,
ElectionTicks: cfg.ElectionTicks(),
WaitClusterReadyTimeout: cfg.ExperimentalWaitClusterReadyTimeout,
InitialElectionTickAdvance: cfg.InitialElectionTickAdvance,
AutoCompactionRetention: autoCompactionRetention,
AutoCompactionMode: cfg.AutoCompactionMode,
QuotaBackendBytes: cfg.QuotaBackendBytes,
BackendBatchLimit: cfg.BackendBatchLimit,
BackendFreelistType: backendFreelistType,
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
SocketOpts: cfg.SocketOpts,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
AuthToken: cfg.AuthToken,
BcryptCost: cfg.BcryptCost,
TokenTTL: cfg.AuthTokenTTL,
CORS: cfg.CORS,
HostWhitelist: cfg.HostWhitelist,
InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck,
CorruptCheckTime: cfg.ExperimentalCorruptCheckTime,
CompactHashCheckEnabled: cfg.ExperimentalCompactHashCheckEnabled,
CompactHashCheckTime: cfg.ExperimentalCompactHashCheckTime,
PreVote: cfg.PreVote,
Logger: cfg.logger,
ForceNewCluster: cfg.ForceNewCluster,
EnableGRPCGateway: cfg.EnableGRPCGateway,
ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing,
UnsafeNoFsync: cfg.UnsafeNoFsync,
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
WarningUnaryRequestDuration: cfg.ExperimentalWarningUnaryRequestDuration,
ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock,
ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer,
ExperimentalBootstrapDefragThresholdMegabytes: cfg.ExperimentalBootstrapDefragThresholdMegabytes,
ExperimentalMaxLearners: cfg.ExperimentalMaxLearners,
V2Deprecation: cfg.V2DeprecationEffective(),
}
if srvcfg.ExperimentalEnableDistributedTracing {
tctx := context.Background()
tracingExporter, opts, err := setupTracingExporter(tctx, cfg)
if err != nil {
return e, err
}
if tracingExporter == nil || len(opts) == 0 {
return e, fmt.Errorf("error setting up distributed tracing")
}
e.tracingExporterShutdown = func() { tracingExporter.Shutdown(tctx) }
srvcfg.ExperimentalTracerOptions = opts
e.cfg.logger.Info(
"distributed tracing setup enabled",
)
}
print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
return e, err
}
// buffer channel so goroutines on closed connections won't wait forever
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
// newly started member ("memberInitialized==false")
// does not need corruption check
if memberInitialized && srvcfg.InitialCorruptCheck {
if err = e.Server.CorruptionChecker().InitialCheck(); err != nil {
// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
// (nothing to close since rafthttp transports have not been started)
e.cfg.logger.Error("checkInitialHashKV failed", zap.Error(err))
e.Server.Cleanup()
e.Server = nil
return e, err
}
}
e.Server.Start()
if err = e.servePeers(); err != nil {
return e, err
}
if err = e.serveClients(); err != nil {
return e, err
}
if err = e.serveMetrics(); err != nil {
return e, err
}
e.cfg.logger.Info(
"now serving peer/client/metrics",
zap.String("local-member-id", e.Server.MemberId().String()),
zap.Strings("initial-advertise-peer-urls", e.cfg.getAPURLs()),
zap.Strings("listen-peer-urls", e.cfg.getLPURLs()),
zap.Strings("advertise-client-urls", e.cfg.getACURLs()),
zap.Strings("listen-client-urls", e.cfg.getLCURLs()),
zap.Strings("listen-metrics-urls", e.cfg.getMetricsURLs()),
)
serving = true
return e, nil
}
NewServer
NewServer creates an EtcdServer from the supplied configuration.
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
b, err := bootstrap(cfg)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
b.Close()
}
}()
sstats := stats.NewServerStats(cfg.Name, b.cluster.cl.String())
lstats := stats.NewLeaderStats(cfg.Logger, b.cluster.nodeID.String())
heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
srv = &EtcdServer{
readych: make(chan struct{}),
Cfg: cfg,
lgMu: new(sync.RWMutex),
lg: cfg.Logger,
errorc: make(chan error, 1),
v2store: b.storage.st,
snapshotter: b.ss,
r: *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl),
memberId: b.cluster.nodeID,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: b.cluster.cl,
stats: sstats,
lstats: lstats,
SyncTicker: time.NewTicker(500 * time.Millisecond),
peerRt: b.prt,
reqIDGen: idutil.NewGenerator(uint16(b.cluster.nodeID), time.Now()),
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
consistIndex: b.storage.backend.ci,
firstCommitInTerm: notify.NewNotifier(),
clusterVersionChanged: notify.NewNotifier(),
}
serverID.With(prometheus.Labels{"server_id": b.cluster.nodeID.String()}).Set(1)
srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
srv.be = b.storage.backend.be
srv.beHooks = b.storage.backend.beHooks
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
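// Worked example (assuming etcd's default TickMs=100 and ElectionTicks=10):
// heartbeat = 100ms, so minTTL = (3*10/2) * 100ms = 1.5s; math.Ceil below
// rounds that up to a MinLeaseTTL of 2 seconds, so a lease cannot expire
// faster than roughly one and a half election timeouts.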
// Always recover the lessor before the KV: when mvcc.KV is recovered it
// reattaches keys to their leases. If mvcc.KV were recovered first, it
// would attach keys to the wrong lessor before the lessor recovers.
srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
CheckpointInterval: cfg.LeaseCheckpointInterval,
CheckpointPersist: cfg.LeaseCheckpointPersist,
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
})
tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
func(index uint64) <-chan struct{} {
return srv.applyWait.Wait(index)
},
time.Duration(cfg.TokenTTL)*time.Second,
)
if err != nil {
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
return nil, err
}
mvccStoreConfig := mvcc.StoreConfig{
CompactionBatchLimit: cfg.CompactionBatchLimit,
CompactionSleepInterval: cfg.CompactionSleepInterval,
}
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage())
srv.authStore = auth.NewAuthStore(srv.Logger(), schema.NewAuthBackend(srv.Logger(), srv.be), tp, int(cfg.BcryptCost))
newSrv := srv // since srv == nil in defer if srv is returned as nil
defer func() {
// closing backend without first closing kv can cause
// resumed compactions to fail with closed tx errors
if err != nil {
newSrv.kv.Close()
}
}()
if num := cfg.AutoCompactionRetention; num != 0 {
srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
if err != nil {
return nil, err
}
srv.compactor.Run()
}
if err = srv.restoreAlarms(); err != nil {
return nil, err
}
srv.uberApply = srv.NewUberApplier()
if srv.Cfg.EnableLeaseCheckpoint {
// setting checkpointer enables lease checkpoint feature.
srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
})
}
// Set the hook after EtcdServer finishes the initialization to avoid
// the hook being called during the initialization process.
srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook())
// TODO: move transport initialization near the definition of remote
tr := &rafthttp.Transport{
Logger: cfg.Logger,
TLSInfo: cfg.PeerTLSInfo,
DialTimeout: cfg.PeerDialTimeout(),
ID: b.cluster.nodeID,
URLs: cfg.PeerURLs,
ClusterID: b.cluster.cl.ID(),
Raft: srv,
Snapshotter: b.ss,
ServerStats: sstats,
LeaderStats: lstats,
ErrorC: srv.errorc,
}
if err = tr.Start(); err != nil {
return nil, err
}
// add all remotes into transport
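// ("Remotes" are members learned during bootstrap that this member does
// not yet track as raft peers; registering them lets the transport accept
// their messages while the local member catches up. Confirmed cluster
// members are added as peers below.)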
for _, m := range b.cluster.remotes {
if m.ID != b.cluster.nodeID {
tr.AddRemote(m.ID, m.PeerURLs)
}
}
for _, m := range b.cluster.cl.Members() {
if m.ID != b.cluster.nodeID {
tr.AddPeer(m.ID, m.PeerURLs)
}
}
srv.r.transport = tr
return srv, nil
}
bootstrap
bootstrap performs the main initialization: it creates the snapshotter, the peer round-tripper, the v2 store, and the backend; replays any existing WAL; bootstraps the cluster and the storage; and finally bootstraps the raft module:
func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
cfg.Logger.Warn(
"exceeded recommended request limit",
zap.Uint("max-request-bytes", cfg.MaxRequestBytes),
zap.String("max-request-size", humanize.Bytes(uint64(cfg.MaxRequestBytes))),
zap.Int("recommended-request-bytes", recommendedMaxRequestBytes),
zap.String("recommended-request-size", recommendedMaxRequestBytesString),
)
}
// Each etcd node stores its data under the "<name>.etcd/member" directory.
// Unless noted otherwise, the directories mentioned below are
// subdirectories of it. First check whether the data directory exists,
// and create it if it does not.
if terr := fileutil.TouchDirAll(cfg.Logger, cfg.DataDir); terr != nil {
return nil, fmt.Errorf("cannot access data directory: %v", terr)
}
if terr := fileutil.TouchDirAll(cfg.Logger, cfg.MemberDir()); terr != nil {
return nil, fmt.Errorf("cannot access member directory: %v", terr)
}
// Initialize the snapshotter
ss := bootstrapSnapshot(cfg)
prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout())
if err != nil {
return nil, err
}
// Check whether WAL files already exist in the WAL directory
haveWAL := wal.Exist(cfg.WALDir())
// Create the v2 store
st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
// Initialize the backend
backend, err := bootstrapBackend(cfg, haveWAL, st, ss)
if err != nil {
return nil, err
}
var (
bwal *bootstrappedWAL
)
if haveWAL {
if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
}
bwal = bootstrapWALFromSnapshot(cfg, backend.snapshot)
}
// Bootstrap the cluster
cluster, err := bootstrapCluster(cfg, bwal, prt)
if err != nil {
backend.Close()
return nil, err
}
// Bootstrap the storage
s, err := bootstrapStorage(cfg, st, backend, bwal, cluster)
if err != nil {
backend.Close()
return nil, err
}
err = cluster.Finalize(cfg, s)
if err != nil {
backend.Close()
return nil, err
}
// Bootstrap the raft module
raft := bootstrapRaft(cfg, cluster, s.wal)
return &bootstrappedServer{
prt: prt,
ss: ss,
storage: s,
cluster: cluster,
raft: raft,
}, nil
}