本文将从docker(1.12.6)源码的角度剖析docker daemon怎么将容器的日志网络出来并通过配置的log-driver发送出去,并结合示例先容了好雨云帮中实现的一个zmq-loger。阅读本文,你也可以实现适宜自己业务场景的log-driver。
阅读准备
本文适宜能够阅读和编写golang代码的同学。(1)首先你须要认知以下几个关键词:

你须要知道关于进程产生日志的形式
:进程产生日志有两类输出办法,一类是写入到文件中。另一类是直接写到stdout或者stderr,例如php的echo
python的print
golang的fmt.Println(\"大众\"大众)
等等。(3)是否知道docker-daemon与运行中container的关系?
一个container便是一个分外的进程,它是由docker daemon创建并启动,因此container是docker daemon的子进程。由docker daemon守护和管理。因此container的stdout能够被docker daemon获取到。基于此理论,我们来剖析docker daemon干系代码。docker-daemon关于日志源码剖析
container实例源码
# /container/container.go:62type CommonContainer struct{ StreamConfig stream.Config ...}# /container/stream/streams.go:26type Config struct {sync.WaitGroupstdout broadcaster.Unbufferedstderr broadcaster.Unbufferedstdin io.ReadCloserstdinPipe io.WriteCloser}
找到如上所示对应的代码,显示了每一个container实例都有几个属性stdout,stderr,stdin,以及管道stdinPipe。这里说下stdinPipe,当容器利用-i参数启动时标准输入将被运行,daemon将能够利用此管道向容器内写入标准输入。
# /container/container.go:312func (container Container) StartLogger(cfg containertypes.LogConfig) (logger.Logger, error) {c, err := logger.GetLogDriver(cfg.Type)if err != nil {return nil, fmt.Errorf(\"大众Failed to get logging factory: %v\"大众, err)}ctx := logger.Context{Config: cfg.Config,ContainerID: container.ID,ContainerName: container.Name,ContainerEntrypoint: container.Path,ContainerArgs: container.Args,ContainerImageID: container.ImageID.String(),ContainerImageName: container.Config.Image,ContainerCreated: container.Created,ContainerEnv: container.Config.Env,ContainerLabels: container.Config.Labels,DaemonName: \"大众docker\"大众,}// Set logging file for \"大众json-logger\"大众if cfg.Type == jsonfilelog.Name {ctx.LogPath, err = container.GetRootResourcePath(fmt.Sprintf(\公众%s-json.log\"大众, container.ID))if err != nil {return nil, err}}return c(ctx)}#/container/container.go:978func (container Container) startLogging() error {if container.HostConfig.LogConfig.Type == \公众none\"大众 {return nil // do not start logging routines}l, err := container.StartLogger(container.HostConfig.LogConfig)if err != nil {return fmt.Errorf(\"大众Failed to initialize logging driver: %v\"大众, err)}copier := logger.NewCopier(map[string]io.Reader{\"大众stdout\公众: container.StdoutPipe(), \"大众stderr\"大众: container.StderrPipe()}, l)container.LogCopier = copiercopier.Run()container.LogDriver = l// set LogPath field only for json-file logdriverif jl, ok := l.(jsonfilelog.JSONFileLogger); ok {container.LogPath = jl.LogPath()}return nil}
第一个方法是为container查找log-driver。首先根据容器配置的log-driver种别调用:logger.GetLogDriver(cfg.Type)
返回一个方法类型:
/daemon/logger/factory.go:9type Creator func(Context) (Logger, error)
本色便是从工厂类注册的logdriver插件去查找,详细源码下文剖析。获取到c方法后构建调用参数详细便是容器的一些信息。然后利用调用c方法返回driver。driver是个接口类型,我们看看有哪些方法:
# /daemon/logger/logger.go:61type Logger interface {Log(Message) errorName() stringClose() error}
很大略的三个方法,也很随意马虎理解,Log()
发送日志到driver,Close()
进行关闭操作(根据不同实现)。也便是说我们自己实现一个logdriver,只须要实现如上三个方法,然后注册到logger工厂类中即可。下面我们来看/daemon/logger/factory.go
第二个方法便是处理日志了,获取到日志driver,在创建一个Copier
,顾名思义便是复制日志,分别从stdout 和stderr复制到logger driver。下面看看详细关键实现:
#/daemon/logger/copir.go:41func (c Copier) copySrc(name string, src io.Reader) {defer c.copyJobs.Done()reader := bufio.NewReader(src)for {select {case <-c.closed:returndefault:line, err := reader.ReadBytes('\n')line = bytes.TrimSuffix(line, []byte{'\n'})// ReadBytes can return full or partial output even when it failed.// e.g. it can return a full entry and EOF.if err == nil || len(line) >0 {if logErr := c.dst.Log(&Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {logrus.Errorf(\公众Failed to log msg %q for logger %s: %s\"大众, line, c.dst.Name(), logErr)}}if err != nil {if err != io.EOF {logrus.Errorf(\公众Error scanning log stream: %s\公众, err)}return}}}}
每读取一行数据,构建一个,调用logdriver的log方法发送到driver处理。
日志driver注册器
位于/daemon/logger/factory.go
的源码实现即时日志driver的注册器,个中几个主要的方法(上文已经提到一个):# /daemon/logger/factory.go:21func (lf logdriverFactory) register(name string, c Creator) error {if lf.driverRegistered(name) {return fmt.Errorf(\公众logger: log driver named '%s' is already registered\"大众, name)}lf.m.Lock()lf.registry[name] = clf.m.Unlock()return nil}# /daemon/logger/factory.go:39func (lf logdriverFactory) registerLogOptValidator(name string, l LogOptValidator) error {lf.m.Lock()defer lf.m.Unlock()if _, ok := lf.optValidator[name]; ok {return fmt.Errorf(\"大众logger: log validator named '%s' is already registered\公众, name)}lf.optValidator[name] = lreturn nil}
看起来很大略,便是将一个Creator
方法类型添加到一个map构造中,将LogOptValidator
添加到另一个map这里把稳加锁的操作。
#/daemon/logger/factory.go:13type LogOptValidator func(cfg map[string]string) error
这个紧张是验证driver的参数 ,dockerd和docker启动参数中有:--log-opt
好雨云帮自己实现一个基于zmq的log-driver
上文已经完全剖析了docker daemon管理logdriver和处理日志的全体流程。相信你已经比较明白了。下面我们以zmq-driver为例讲讲我们怎么实现自己的driver。直接吸收留器的日志。上文我们已经谈了一个log-driver须要实现的几个方法。我们可以看看位于/daemon/logger
目录下的已有的driver的实现,例如fluentd
,awslogs
等。下面我们来剖析zmq-driver详细的代码://定义一个struct,这里包含一个zmq套接字type ZmqLogger struct {writer zmq.SocketcontainerId stringtenantId stringserviceId stringfelock sync.Mutex}//定义init方法调用logger注册器的方法注册当前driver//和参数验证方法。func init() {if err := logger.RegisterLogDriver(name, New); err != nil {logrus.Fatal(err)}if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {logrus.Fatal(err)}}//实现一个上文提到的Creator方法注册logdriver.//这里新建一个zmq套接字构建一个实例func New(ctx logger.Context) (logger.Logger, error) {zmqaddress := ctx.Config[zmqAddress]puber, err := zmq.NewSocket(zmq.PUB)if err != nil {return nil, err}var (env = make(map[string]string)tenantId stringserviceId string)for _, pair := range ctx.ContainerEnv {p := strings.SplitN(pair, \"大众=\"大众,2)//logrus.Errorf(\公众ContainerEnv pair: %s\公众, pair)if len(p) ==2 {key := p[0]value := p[1]env[key] = value}}tenantId = env[\公众TENANT_ID\"大众]serviceId = env[\公众SERVICE_ID\公众]if tenantId == \公众\"大众 {tenantId = \"大众default\公众}if serviceId == \公众\"大众 {serviceId = \"大众default\"大众}puber.Connect(zmqaddress)return &ZmqLogger{writer: puber,containerId: ctx.ID(),tenantId: tenantId,serviceId: serviceId,felock: sync.Mutex{},}, nil}//实现Log方法,这里利用zmq socket发送日志//这里必须把稳,zmq socket是线程不屈安的,我们知道//本方法可能被两个线程(复制stdout和肤质stderr)调用//必须利用锁担保线程安全。否则会发生缺点。func (s ZmqLogger) Log(msg logger.Message) error {s.felock.Lock()defer s.felock.Unlock()s.writer.Send(s.tenantId, zmq.SNDMORE)s.writer.Send(s.serviceId, zmq.SNDMORE)if msg.Source == \"大众stderr\"大众 {s.writer.Send(s.containerId+\"大众: \"大众+string(msg.Line), zmq.DONTWAIT)} else {s.writer.Send(s.containerId+\"大众: \公众+string(msg.Line), zmq.DONTWAIT)}return nil}//实现Close方法,这里用来关闭zmq socket。//同样把稳线程安全,调用此方法的是容器关闭协程。func (s ZmqLogger) Close() error {s.felock.Lock()defer s.felock.Unlock()if s.writer != nil {return s.writer.Close()}return nil}func (s ZmqLogger) Name() string {return name}//验证参数的方法,我们利用参数传入zmq pub的地址。func ValidateLogOpt(cfg map[string]string) error {for key := range cfg {switch key {case zmqAddress:default:return fmt.Errorf(\公众unknown log opt '%s' for %s log driver\公众, key, name)}}if cfg[zmqAddress] == \"大众\公众 {return fmt.Errorf(\公众must specify a value for log opt '%s'\"大众, zmqAddress)}return nil}
总结
多研究源码可以方便我们理解docker的事情事理。本日我们剖析了日志部分。希望读者对这部分功能能够理解得更清晰。