前言
在上篇文章中我们讲解了Influxdb中MonitorService的工作原理,在本篇文章中会继续讲解PrecreatorService,从名字上来看,这个Service的作用是预先创建一些资源,那么具体Influxdb会预先初始化哪些资源,以及这些资源原先创建的优势是什么呢,让我们今天一起来从代码里面找答案。
PrecreatorService解析
首先我们来看下PrecreatorService的创建过程,从这个Service的结构中我们可以看到它包含了两个时间的属性checkInterval和advancePeriod,此外包含了一个MetaClient接口的引用以及一个waitgroup,由此可以猜测,这个Service的主要实现方式是通过周期性的通过多个并发的goroutines实现创建资源,然后再讲meta信息通过MetaClient的实例应用写入到meta.db中。
//cmd/influxd/run/server.go 320行
func (s *Server) appendPrecreatorService(c precreator.Config) error {
if !c.Enabled {
return nil
}
srv := precreator.NewService(c)
srv.MetaClient = s.MetaClient
s.Services = append(s.Services, srv)
return nil
}
// services/precreator/service.go 12行
// Service manages the shard precreation service.
type Service struct {
checkInterval time.Duration
advancePeriod time.Duration
Logger *zap.Logger
done chan struct{}
wg sync.WaitGroup
MetaClient interface {
PrecreateShardGroups(now, cutoff time.Time) error
}
}
// NewService returns an instance of the precreation service.
func NewService(c Config) *Service {
return &Service{
checkInterval: time.Duration(c.CheckInterval),
advancePeriod: time.Duration(c.AdvancePeriod),
Logger: zap.NewNop(),
}
}
下面我们就通过实际的这个Service的运行Code进行下验证。从源码中可以发现,这个Service主要作用就是在checkInterval的周期到达的时候,通过时间戳进行ShardGroups的创建。
// services/precreator/service.go 71行
// runPrecreation continually checks if resources need precreation.
func (s *Service) runPrecreation() {
defer s.wg.Done()
for {
select {
case <-time.After(s.checkInterval):
if err := s.precreate(time.Now().UTC()); err != nil {
s.Logger.Info("Failed to precreate shards", zap.Error(err))
}
case <-s.done:
s.Logger.Info("Terminating precreation service")
return
}
}
}
// precreate performs actual resource precreation.
func (s *Service) precreate(now time.Time) error {
cutoff := now.Add(s.advancePeriod).UTC()
return s.MetaClient.PrecreateShardGroups(now, cutoff)
}
在代码中有一个很有趣的用法cutoff := now.Add(s.advancePeriod).UTC(),这个的作用是什么呢,查询了一下官方的文档,这个参数的主要作用是提前多长时间创建ShardGroups,默认情况下checkInterval的取值为10m,而advancePeriod的取值为30m,也就是说Influxdb会每10分钟进行一下检查,看是否需要为30分钟后预创建ShardGroups。接下来深入到创建ShardGroups的源码中。
// services/meta/client.go 777行
// PrecreateShardGroups creates shard groups whose endtime is before the 'to' time passed in, but
// is yet to expire before 'from'. This is to avoid the need for these shards to be created when data
// for the corresponding time range arrives. Shard creation involves Raft consensus, and precreation
// avoids taking the hit at write-time.
func (c *Client) PrecreateShardGroups(from, to time.Time) error {
c.mu.Lock()
defer c.mu.Unlock()
data := c.cacheData.Clone()
var changed bool
for _, di := range data.Databases {
for _, rp := range di.RetentionPolicies {
if len(rp.ShardGroups) == 0 {
// No data was ever written to this group, or all groups have been deleted.
continue
}
g := rp.ShardGroups[len(rp.ShardGroups)-1] // Get the last group in time.
if !g.Deleted() && g.EndTime.Before(to) && g.EndTime.After(from) {
// Group is not deleted, will end before the future time, but is still yet to expire.
// This last check is important, so the system doesn't create shards groups wholly
// in the past.
// Create successive shard group.
nextShardGroupTime := g.EndTime.Add(1 * time.Nanosecond)
// if it already exists, continue
if sg, _ := data.ShardGroupByTimestamp(di.Name, rp.Name, nextShardGroupTime); sg != nil {
c.logger.Info("Shard group already exists",
logger.ShardGroup(sg.ID),
logger.Database(di.Name),
logger.RetentionPolicy(rp.Name))
continue
}
newGroup, err := createShardGroup(data, di.Name, rp.Name, nextShardGroupTime)
if err != nil {
c.logger.Info("Failed to precreate successive shard group",
zap.Uint64("group_id", g.ID), zap.Error(err))
continue
}
changed = true
c.logger.Info("New shard group successfully precreated",
logger.ShardGroup(newGroup.ID),
logger.Database(di.Name),
logger.RetentionPolicy(rp.Name))
}
}
}
if changed {
if err := c.commit(data); err != nil {
return err
}
}
return nil
}
在前几篇文章中我们稍微涉及了一些关于Influxdb存储结构上的内容,一个Influxdb可以包含多个库,每个库的存储是根据retention policy来分目录的,而retention policy之下才是真正的Shard,因此在预创建Shard的时候,需要为每一个符合条件的库以及库之下的retention policy都创建Shard。
如果此时retention policy下没有任何的Shard,则会认为无需预创建,因为大部分的情况下,这种场景意味着数据被清空的中间态。为了保证创建的Shard的时序性,在预创建的时候会获取一个retention policy下的最后一个shard,并检查当前shard的所属时间序列是否会和新创建的Shard有重合,只有新创建的Shard与最新的Shard之间不存在重合关系的时候,才会进行创建。继续跟踪代码,查看创建Shard的流程。
// services/meta/data.go 350行
// CreateShardGroup creates a shard group on a database and policy for a given timestamp.
func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time) error {
// Find retention policy.
rpi, err := data.RetentionPolicy(database, policy)
if err != nil {
return err
} else if rpi == nil {
return influxdb.ErrRetentionPolicyNotFound(policy)
}
// Verify that shard group doesn't already exist for this timestamp.
if rpi.ShardGroupByTimestamp(timestamp) != nil {
return nil
}
// Create the shard group.
data.MaxShardGroupID++
sgi := ShardGroupInfo{}
sgi.ID = data.MaxShardGroupID
sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC()
sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC()
if sgi.EndTime.After(time.Unix(0, models.MaxNanoTime)) {
// Shard group range is [start, end) so add one to the max time.
sgi.EndTime = time.Unix(0, models.MaxNanoTime+1)
}
data.MaxShardID++
sgi.Shards = []ShardInfo{
{ID: data.MaxShardID},
}
// Retention policy has a new shard group, so update the policy. Shard
// Groups must be stored in sorted order, as other parts of the system
// assume this to be the case.
rpi.ShardGroups = append(rpi.ShardGroups, sgi)
sort.Sort(ShardGroupInfos(rpi.ShardGroups))
return nil
}
当我们看到创建Shard的代码逻辑的时候,其实是有些失望的,此时创建的Shard支持在内存中进行了创建,并没有在磁盘中创建相应的目录结构,那么在磁盘上面Shard的目录结构和对应的Shard的名字是怎么来的呢。实际上这个Shard的信息只是在meta与内存中作了一个标示,并不是真的开始存储数据,这部分的内容会在PointsWriter.WritePoints的时候刷到磁盘上的,在后面解析PointsWriter的部分会进行深入的讨论。
总结
至此PrecreatorService的部分已经比较明晰了,他的原理非常简单,通过异步定时执行,建立在内存中和meta数据中的shard索引,为后续写入数据的Shard分布提供索引,这样当数据通过PointsWriter写入数据时会根据对应的索引hash分布到真正的Shard中。也就是说通过预分配Shard的部分可以让数据更好的Hash到不同的存储文件中,通过时间戳可以对应到Shard,从而可以使得数据更好的切分,更快的查询。