运维开发网

log-pilot源码简析

运维开发网 https://www.qedev.com 2021-03-01 09:45 出处:51CTO 作者:paniho
log-pilot 是阿里开源的一款容器日志收集项目,具有动态伸缩、动态配置等特性。它的核心原理是:监听docker events,自动配置与重载filebeat/fluentd来达到日志收集随容器的动态调度而自动伸缩的效果源码分析整个项目的核心是:Polit 结构体Politer 接口Polit主要监听 docker 容器事件并获取容器日志挂载目录、标签、环境变量等信息,动态生成filebeat

log-pilot 是阿里开源的一款容器日志收集项目,具有动态伸缩、动态配置等特性。它的核心原理是:监听docker events,自动配置与重载filebeat/fluentd来达到日志收集随容器的动态调度而自动伸缩的效果

整个项目的核心是:

  • Polit 结构体

  • Politer 接口

Polit

主要监听 docker 容器事件并获取容器日志挂载目录、标签、环境变量等信息,动态生成filebeat/fluentd配置文件

type Pilot struct {
    piloter       Piloter               // Piloter 相关
    mutex         sync.Mutex            // 并发锁,多个容器事件触发,谁先抢到锁,谁先处理
    templ         *template.Template    // 日志采集客户端配置文件模板,log-pilot 使用 golang 的 text/template 模块渲染配置文件
    client        *k8s.Client           // docker 容器客户端,通过docker 事件接口 api 获取相关容器信息
    lastReload    time.Time             // 最后一次配置文件重载时间
    reloadChan    chan bool             // 重载通知 chan
    stopChan      chan bool             // 停止通知 chan
    baseDir       string                // docker 在宿主机上面的数据存储位置
    logPrefix     []string              // 定义环境变量以什么字符开头来表示应用日志所在目录,log-pilot 以配置环境变量的方式配置定每个容器中应用程序的日志路径位置
    createSymlink bool                  // 是否创建硬连接的方式关联要搜集的日志文件
}

Politer

Politer 定义了收集工具需要操作的一些方法,主要负责收集工具的启用停止reload的具体操作

type Piloter interface {
    Name() string       // "filebeat" 与 "fluentd",分别表示不同的搜集工具
 
    Start() error       // 启动搜集工具
    Reload() error      // 重载配置文件
    Stop() error        // 停止搜集工具
 
    GetBaseConf() string    // 日志采集客户端配置文件位置,如 filebeat 的 /etc/filebeat
    GetConfHome() string    // 日志采集客户端统一配置文件目录,如 filebeat 的 prospectors.d 位置
    GetConfPath(container string) string    // 具体配置文件路径
 
    OnDestroyEvent(container string) error  // 监听容器停止事件
}

main 函数

程序入口、命令行处理:日志收集配置模板指定、log-pilot 本身的日志等级配置等 

Plot.Run

  • 初始化 Polit 数据,Polit 中包含了对应 filebeat/fluentd 配置模版、dokcer client、并发锁、piloter 对象等
  • 开启容器事件监控
func Run(templ string, baseDir string) error {
p, err := New(templ, baseDir)
....
return p.watch()
}

Pilot.watch

  • 使用 docker api 连接 docker,并watch docker 事件
func (p *Pilot) watch() error {
    ....
     
    err := p.piloter.Start()            // 启动收集工具
    ....
     
    msgs, errs := p.client.Events(ctx, options)  // 接受 docker 事件,返回 chan
 
    go func() {
        ....
 
        for {           // 无限循环获取事件
            select {
            case msg := <-msgs:
                if err := p.processEvent(msg); err != nil {     // 处理 docker 事件
                    log.Errorf("fail to process event: %v,  %v", msg, err)
                }
            ........
        }
    }()
    ....
}

Pilot.processEvent

  docker event 的handler函数

func (p *Pilot) processEvent(msg events.Message) error {
	....
	switch msg.Action {
	case "start", "restart":
            ....
            return p.newContainer(&containerJSON)
	case "destroy", "die":
            ....
	    err := p.delContainer(containerId)
	return nil
}

Pilot.newContainer

  • 处理环境变量/tag标签/mount

  • 渲染配置文键模板,生成新的配置文件并reload生效 

func (p *Pilot) newContainer(containerJSON *types.ContainerJSON) error {
    ....
    // containerJSON 是 docker接口 Client.ContainerInspect 返回的数据类型
 
    container := container(containerJSON)
 
    for _, e := range env {         // 处理环境变量, env由containerJSON 得到
        .....
    }
    // 获取配置文件模板数据
    logConfigs, err := p.getLogConfigs(jsonLogPath, mounts, labels)
    if err != nil {
        return err
    }
    
 
    ....
 
    // 关联 docker 容器中应用日志文件或目录
    p.createVolumeSymlink(containerJSON)
    
 
    // 渲染配置文件模板数据,生成具体的配置文件
    logConfig, err := p.render(id, container, logConfigs)
    if err != nil {
        return err
    }
    //TODO validate config before save
    //log.Debugf("container %s log config: %s", id, logConfig)
    if err = ioutil.WriteFile(p.piloter.GetConfPath(id), []byte(logConfig), os.FileMode(0644)); err != nil {
        return err
    }
    // 重载配置文件
    p.tryReload()
    return nil
}

Pilot.delContainer

  • 渲染配置文键模板,删除配置文件
  • reload 配置文件

func (p *Pilot) delContainer(id string) error {
	p.removeVolumeSymlink(id)

	//fixme refactor in the future
	if p.piloter.Name() == PILOT_FLUENTD {
		clean := func() {
			log.Infof("Try removing log config %s", id)
			if err := os.Remove(p.piloter.GetConfPath(id)); err != nil {
				log.Warnf("removing %s log config failure", id)
				return
			}
			p.tryReload()
		}
		time.AfterFunc(15*time.Minute, clean)
		return nil
	}

	return p.piloter.OnDestroyEvent(id)
}

LogConfig

动态渲染配置文件模板数据集

type LogConfig struct {
    Name         string                 // 日志名
    HostDir      string                 // 日志文件在宿主机上的目录
    ContainerDir string                 // 容器应用日志目录
    Format       string
    FormatConfig map[string]string     
    File         string                 // 具体的日志文件名
    Tags         map[string]string      // 标签数据
    Target       string                 // 索引或者kafka主题
    EstimateTime bool
    Stdout       bool
 
    CustomFields  map[string]string     // 自定义添加日志字段
    CustomConfigs map[string]string     // 自定义配置文件项
}

getLogConfigs

  • 获取配置文件模板渲染数据

parseLogConfig

  • 接续容器数据获得配置文件模板渲染数据

最终生成的filebeat 配置文件

- type: log
  enabled: true
  paths:
      - /host/var/lib/docker/containers/b61b94c9f38eec70df32d45df408ea09ad05987bf4ff92d5d5f2eae3fd9e503d/b61b94c9f38eec70df32d45df408ea09ad05987bf4ff92d5d5f2eae3fd9e503d-json.log*
  scan_frequency: 10s
  fields_under_root: true
 
  docker-json: true
  
  fields:
 
      index: aaa-test
 
      topic: aaa-test
  
      docker_container: k8s_tomcat_tomcat_default_6cc39a2f-2a2b-45a2-94d8-a51faf68dd14_0
 
      k8s_container_name: tomcat
 
      k8s_node_name: cn-hangzhou.172.16.179.195
 
      k8s_pod: tomcat
 
      k8s_pod_namespace: default
 
  tail_files: false
  close_inactive: 2h
  close_eof: false
  close_removed: true
  clean_removed: true
  close_renamed: false

扫码领视频副本.gif

0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号