• 1. Docker Daemon架构示意图
  • 2. Docker Daemon启动流程图
  • 3. mainDaemon的具体实现
    • 3.1. 配置初始化
    • 3.2. flag参数检查
    • 3.3. 创建engine对象
    • 3.4. 设置engine的信号捕获
    • 3.5. 加载builtins
      • 3.5.1. 注册初始化网络驱动的Handler
      • 3.5.2. 注册API服务的Handler
      • 3.5.3. 注册events事件的Handler
      • 3.5.4. 注册版本的Handler
      • 3.5.5. 注册registry的Handler
    • 3.6. 使用goroutine加载daemon对象
      • 3.6.1. 创建daemon对象
      • 3.6.2. 通过daemon对象为engine注册Handler
      • 3.6.3. 运行acceptconnections的job
    • 3.7. 打印Docker版本及驱动信息
    • 3.8. serveapi的创建与运行

    1. Docker Daemon架构示意图

    Docker Daemon - 图1

    Docker Daemon是Docker架构中运行在后台的守护进程,大致可以分为Docker Server、Engine和Job三部分。

    Docker Daemon可以认为是通过Docker Server模块接受Docker Client的请求,并在Engine中处理请求,然后根据请求类型,创建出指定的Job并运行。

    运行过程的作用有以下几种可能:

    • 向Docker Registry获取镜像,
    • 通过graphdriver执行容器镜像的本地化操作,
    • 通过networkdriver执行容器网络环境的配置,
    • 通过execdriver执行容器内部运行的执行工作等。

    说明:本文分析的代码为Docker 1.2.0版本。

    2. Docker Daemon启动流程图

    Docker Daemon - 图2

    启动Docker Daemon时,一般可以使用以下命令:docker —daemon=true; docker –d; docker –d=true等。接着由docker的main()函数来解析以上命令的相应flag参数,并最终完成Docker Daemon的启动。

    /docker/docker.go

    1. func main() {
    2. ...
    3. if *flDaemon {
    4. mainDaemon()
    5. return
    6. }
    7. ...
    8. }

    3. mainDaemon的具体实现

    宏观来讲,mainDaemon()完成创建一个daemon进程,并使其正常运行。

    从功能的角度来说,mainDaemon()实现了两部分内容:

    • 第一,创建Docker运行环境;
    • 第二,服务于Docker Client,接收并处理相应请求。

    3.1. 配置初始化

    /docker/daemon.go

    1. var (
    2. daemonCfg = &daemon.Config{}
    3. )
    4. func init() {
    5. daemonCfg.InstallFlags()
    6. }

    在mainDaemon()运行之前,关于Docker Daemon所需要的config配置信息均已经初始化完毕。

    声明一个为daemon包中Config类型的变量,名为daemonCfg。而Config对象,定义了Docker Daemon所需的配置信息。在Docker Daemon在启动时,daemonCfg变量被传递至Docker Daemon并被使用。

    /daemon/config.go

    1. type Config struct {
    2. Pidfile string //Docker Daemon所属进程的PID文件
    3. Root string //Docker运行时所使用的root路径
    4. AutoRestart bool //已被启用,转而支持docker run时的重启
    5. Dns []string //Docker使用的DNS Server地址
    6. DnsSearch []string //Docker使用的指定的DNS查找域名
    7. Mirrors []string //指定的优先Docker Registry镜像
    8. EnableIptables bool //启用Docker的iptables功能
    9. EnableIpForward bool //启用net.ipv4.ip_forward功能
    10. EnableIpMasq bool //启用IP伪装技术
    11. DefaultIp net.IP //绑定容器端口时使用的默认IP
    12. BridgeIface string //添加容器网络至已有的网桥
    13. BridgeIP string //创建网桥的IP地址
    14. FixedCIDR string //指定IP的IPv4子网,必须被网桥子网包含
    15. InterContainerCommunication bool //是否允许相同host上容器间的通信
    16. GraphDriver string //Docker运行时使用的特定存储驱动
    17. GraphOptions []string //可设置的存储驱动选项
    18. ExecDriver string // Docker运行时使用的特定exec驱动
    19. Mtu int //设置容器网络的MTU
    20. DisableNetwork bool //有定义,之后未初始化
    21. EnableSelinuxSupport bool //启用SELinux功能的支持
    22. Context map[string][]string //有定义,之后未初始化
    23. }

    init()函数实现了daemonCfg变量中各属性的赋值,具体的实现为:daemonCfg.InstallFlags()

    /daemon/config.go

    1. // InstallFlags adds command-line options to the top-level flag parser for
    2. // the current process.
    3. // Subsequent calls to `flag.Parse` will populate config with values parsed
    4. // from the command-line.
    5. func (config *Config) InstallFlags() {
    6. flag.StringVar(&config.Pidfile, []string{"p", "-pidfile"}, "/var/run/docker.pid", "Path to use for daemon PID file")
    7. flag.StringVar(&config.Root, []string{"g", "-graph"}, "/var/lib/docker", "Path to use as the root of the Docker runtime")
    8. flag.BoolVar(&config.AutoRestart, []string{"#r", "#-restart"}, true, "--restart on the daemon has been deprecated infavor of --restart policies on docker run")
    9. flag.BoolVar(&config.EnableIptables, []string{"#iptables", "-iptables"}, true, "Enable Docker's addition of iptables rules")
    10. flag.BoolVar(&config.EnableIpForward, []string{"#ip-forward", "-ip-forward"}, true, "Enable net.ipv4.ip_forward")
    11. flag.StringVar(&config.BridgeIP, []string{"#bip", "-bip"}, "", "Use this CIDR notation address for the network bridge's IP, not compatible with -b")
    12. flag.StringVar(&config.BridgeIface, []string{"b", "-bridge"}, "", "Attach containers to a pre-existing network bridge/nuse 'none' to disable container networking")
    13. flag.BoolVar(&config.InterContainerCommunication, []string{"#icc", "-icc"}, true, "Enable inter-container communication")
    14. flag.StringVar(&config.GraphDriver, []string{"s", "-storage-driver"}, "", "Force the Docker runtime to use a specific storage driver")
    15. flag.StringVar(&config.ExecDriver, []string{"e", "-exec-driver"}, "native", "Force the Docker runtime to use a specific exec driver")
    16. flag.BoolVar(&config.EnableSelinuxSupport, []string{"-selinux-enabled"}, false, "Enable selinux support. SELinux does not presently support the BTRFS storage driver")
    17. flag.IntVar(&config.Mtu, []string{"#mtu", "-mtu"}, 0, "Set the containers network MTU/nif no value is provided: default to the default route MTU or 1500 if no default route is available")
    18. opts.IPVar(&config.DefaultIp, []string{"#ip", "-ip"}, "0.0.0.0", "Default IP address to use when binding container ports")
    19. opts.ListVar(&config.GraphOptions, []string{"-storage-opt"}, "Set storage driver options")
    20. // FIXME: why the inconsistency between "hosts" and "sockets"?
    21. opts.IPListVar(&config.Dns, []string{"#dns", "-dns"}, "Force Docker to use specific DNS servers")
    22. opts.DnsSearchListVar(&config.DnsSearch, []string{"-dns-search"}, "Force Docker to use specific DNS search domains")
    23. }

    在InstallFlags()函数的实现过程中,主要是定义某种类型的flag参数,并将该参数的值绑定在config变量的指定属性上,如:

    flag.StringVar(&config.Pidfile, []string{“p”, “-pidfile”}, “ /var/run/docker.pid”, “Path to use for daemon PID file”)

    以上语句的含义为:

    • 定义一个为String类型的flag参数;
    • 该flag的名称为”p”或者”-pidfile”;
    • 该flag的值为” /var/run/docker.pid”,并将该值绑定在变量config.Pidfile上;
    • 该flag的描述信息为”Path to use for daemon PID file”。

    3.2. flag参数检查

    /docker/daemon.go

    1. if flag.NArg() != 0 {
    2. flag.Usage()
    3. return
    4. }
    • 参数个数不为0,则说明在启动Docker Daemon的时候,传入了多余的参数,此时会输出错误提示,并退出运行程序。
    • 若为0,则说明Docker Daemon的启动命令无误,正常运行。

    3.3. 创建engine对象

    /docker/daemon.go

    1. eng := engine.New()

    Engine是Docker架构中的运行引擎,同时也是Docker运行的核心模块。Engine扮演着Docker container存储仓库的角色,并且通过job的形式来管理这些容器。

    /engine/engine.go

    1. type Engine struct {
    2. handlers map[string]Handler
    3. catchall Handler
    4. hack Hack // data for temporary hackery (see hack.go)
    5. id string
    6. Stdout io.Writer
    7. Stderr io.Writer
    8. Stdin io.Reader
    9. Logging bool
    10. tasks sync.WaitGroup
    11. l sync.RWMutex // lock for shutdown
    12. shutdown bool
    13. onShutdown []func() // shutdown handlers
    14. }

    Engine结构体中最为重要的即为handlers属性。该handlers属性为map类型,key为string类型,value为Handler类型。Handler为一个定义的函数。该函数传入的参数为Job指针,返回为Status状态。

    /engine/engine.go

    1. type Handler func(*Job) Status

    New()函数的实现:

    /engine/engine.go

    1. // New initializes a new engine.
    2. func New() *Engine {
    3. eng := &Engine{
    4. handlers: make(map[string]Handler),
    5. id: utils.RandomString(),
    6. Stdout: os.Stdout,
    7. Stderr: os.Stderr,
    8. Stdin: os.Stdin,
    9. Logging: true,
    10. }
    11. eng.Register("commands", func(job *Job) Status {
    12. for _, name := range eng.commands() {
    13. job.Printf("%s/n", name)
    14. }
    15. return StatusOK
    16. })
    17. // Copy existing global handlers
    18. for k, v := range globalHandlers {
    19. eng.handlers[k] = v
    20. }
    21. return eng
    22. }
    1. 创建一个Engine结构体实例eng
    2. 向eng对象注册名为commands的Handler,其中Handler为临时定义的函数func(job *Job) Status{ } , 该函数的作用是通过job来打印所有已经注册完毕的command名称,最终返回状态StatusOK。
    3. 将已定义的变量globalHandlers中的所有的Handler,都复制到eng对象的handlers属性中。最后成功返回eng对象。

    3.4. 设置engine的信号捕获

    /daemon/daemon.go

    1. signal.Trap(eng.Shutdown)

    在Docker Daemon的运行中,设置Trap特定信号的处理方法,特定信号有SIGINT,SIGTERM以及SIGQUIT;当程序捕获到SIGINT或者SIGTERM信号时,执行相应的善后操作,最后保证Docker Daemon程序退出。

    /pkg/signal/trap.go

    1. //Trap sets up a simplified signal "trap", appropriate for common
    2. // behavior expected from a vanilla unix command-line tool in general
    3. // (and the Docker engine in particular).
    4. //
    5. // * If SIGINT or SIGTERM are received, `cleanup` is called, then the process is terminated.
    6. // * If SIGINT or SIGTERM are repeated 3 times before cleanup is complete, then cleanup is
    7. // skipped and the process terminated directly.
    8. // * If "DEBUG" is set in the environment, SIGQUIT causes an exit without cleanup.
    9. //
    10. func Trap(cleanup func()) {
    11. c := make(chan os.Signal, 1)
    12. signals := []os.Signal{os.Interrupt, syscall.SIGTERM}
    13. if os.Getenv("DEBUG") == "" {
    14. signals = append(signals, syscall.SIGQUIT)
    15. }
    16. gosignal.Notify(c, signals...)
    17. go func() {
    18. interruptCount := uint32(0)
    19. for sig := range c {
    20. go func(sig os.Signal) {
    21. log.Printf("Received signal '%v', starting shutdown of docker.../n", sig)
    22. switch sig {
    23. case os.Interrupt, syscall.SIGTERM:
    24. // If the user really wants to interrupt, let him do so.
    25. if atomic.LoadUint32(&interruptCount) < 3 {
    26. atomic.AddUint32(&interruptCount, 1)
    27. // Initiate the cleanup only once
    28. if atomic.LoadUint32(&interruptCount) == 1 {
    29. // Call cleanup handler
    30. cleanup()
    31. os.Exit(0)
    32. } else {
    33. return
    34. }
    35. } else {
    36. log.Printf("Force shutdown of docker, interrupting cleanup/n")
    37. }
    38. case syscall.SIGQUIT:
    39. }
    40. os.Exit(128 + int(sig.(syscall.Signal)))
    41. }(sig)
    42. }
    43. }()
    44. }
    • 创建并设置一个channel,用于发送信号通知;
    • 定义signals数组变量,初始值为os.SIGINT, os.SIGTERM;若环境变量DEBUG为空的话,则添加os.SIGQUIT至signals数组;
    • 通过gosignal.Notify(c, signals…)中Notify函数来实现将接收到的signal信号传递给c。需要注意的是只有signals中被罗列出的信号才会被传递给c,其余信号会被直接忽略;
    • 创建一个goroutine来处理具体的signal信号,当信号类型为os.Interrupt或者syscall.SIGTERM时,执行传入Trap函数的具体执行方法,形参为cleanup(),实参为eng.Shutdown。

    Shutdown()函数的定义位于./docker/engine/engine.go,主要做的工作是为Docker Daemon的关闭做一些善后工作。

    /engine/engine.go

    1. // Shutdown permanently shuts down eng as follows:
    2. // - It refuses all new jobs, permanently.
    3. // - It waits for all active jobs to complete (with no timeout)
    4. // - It calls all shutdown handlers concurrently (if any)
    5. // - It returns when all handlers complete, or after 15 seconds,
    6. // whichever happens first.
    7. func (eng *Engine) Shutdown() {
    8. eng.l.Lock()
    9. if eng.shutdown {
    10. eng.l.Unlock()
    11. return
    12. }
    13. eng.shutdown = true
    14. eng.l.Unlock()
    15. // We don't need to protect the rest with a lock, to allow
    16. // for other calls to immediately fail with "shutdown" instead
    17. // of hanging for 15 seconds.
    18. // This requires all concurrent calls to check for shutdown, otherwise
    19. // it might cause a race.
    20. // Wait for all jobs to complete.
    21. // Timeout after 5 seconds.
    22. tasksDone := make(chan struct{})
    23. go func() {
    24. eng.tasks.Wait()
    25. close(tasksDone)
    26. }()
    27. select {
    28. case <-time.After(time.Second * 5):
    29. case <-tasksDone:
    30. }
    31. // Call shutdown handlers, if any.
    32. // Timeout after 10 seconds.
    33. var wg sync.WaitGroup
    34. for _, h := range eng.onShutdown {
    35. wg.Add(1)
    36. go func(h func()) {
    37. defer wg.Done()
    38. h()
    39. }(h)
    40. }
    41. done := make(chan struct{})
    42. go func() {
    43. wg.Wait()
    44. close(done)
    45. }()
    46. select {
    47. case <-time.After(time.Second * 10):
    48. case <-done:
    49. }
    50. return
    51. }
    • Docker Daemon不再接收任何新的Job;
    • Docker Daemon等待所有存活的Job执行完毕;
    • Docker Daemon调用所有shutdown的处理方法;
    • 当所有的handler执行完毕,或者15秒之后,Shutdown()函数返回。

    由于在signal.Trap( eng.Shutdown )函数的具体实现中执行eng.Shutdown,在执行完eng.Shutdown之后,随即执行os.Exit(0),完成当前程序的立即退出。

    3.5. 加载builtins

    /docker/daemon.go

    1. if err := builtins.Register(eng); err != nil {
    2. log.Fatal(err)
    3. }

    为engine注册多个Handler,以便后续在执行相应任务时,运行指定的Handler。

    这些Handler包括:

    • 网络初始化、
    • web API服务、
    • 事件查询、
    • 版本查看、
    • Docker Registry验证与搜索。

    /builtins/builtins.go

    1. func Register(eng *engine.Engine) error {
    2. if err := daemon(eng); err != nil {
    3. return err
    4. }
    5. if err := remote(eng); err != nil {
    6. return err
    7. }
    8. if err := events.New().Install(eng); err != nil {
    9. return err
    10. }
    11. if err := eng.Register("version", dockerVersion); err != nil {
    12. return err
    13. }
    14. return registry.NewService().Install(eng)
    15. }

    3.5.1. 注册初始化网络驱动的Handler

    daemon(eng)的实现过程,主要为eng对象注册了一个key为”init_networkdriver”的Handler,该Handler的值为bridge.InitDriver函数,代码如下:

    /builtins/builtins.go

    1. func daemon(eng *engine.Engine) error {
    2. return eng.Register("init_networkdriver", bridge.InitDriver)
    3. }

    需要注意的是,向eng对象注册Handler,并不代表Handler的值函数会被直接运行,如bridge.InitDriver,并不会直接运行,而是将bridge.InitDriver的函数入口,写入eng的handlers属性中。

    /daemon/networkdriver/bridge/driver.go

    1. func InitDriver(job *engine.Job) engine.Status {
    2. var (
    3. network *net.IPNet
    4. enableIPTables = job.GetenvBool("EnableIptables")
    5. icc = job.GetenvBool("InterContainerCommunication")
    6. ipForward = job.GetenvBool("EnableIpForward")
    7. bridgeIP = job.Getenv("BridgeIP")
    8. )
    9. if defaultIP := job.Getenv("DefaultBindingIP"); defaultIP != "" {
    10. defaultBindingIP = net.ParseIP(defaultIP)
    11. }
    12. bridgeIface = job.Getenv("BridgeIface")
    13. usingDefaultBridge := false
    14. if bridgeIface == "" {
    15. usingDefaultBridge = true
    16. bridgeIface = DefaultNetworkBridge
    17. }
    18. addr, err := networkdriver.GetIfaceAddr(bridgeIface)
    19. if err != nil {
    20. // If we're not using the default bridge, fail without trying to create it
    21. if !usingDefaultBridge {
    22. job.Logf("bridge not found: %s", bridgeIface)
    23. return job.Error(err)
    24. }
    25. // If the iface is not found, try to create it
    26. job.Logf("creating new bridge for %s", bridgeIface)
    27. if err := createBridge(bridgeIP); err != nil {
    28. return job.Error(err)
    29. }
    30. job.Logf("getting iface addr")
    31. addr, err = networkdriver.GetIfaceAddr(bridgeIface)
    32. if err != nil {
    33. return job.Error(err)
    34. }
    35. network = addr.(*net.IPNet)
    36. } else {
    37. network = addr.(*net.IPNet)
    38. // validate that the bridge ip matches the ip specified by BridgeIP
    39. if bridgeIP != "" {
    40. bip, _, err := net.ParseCIDR(bridgeIP)
    41. if err != nil {
    42. return job.Error(err)
    43. }
    44. if !network.IP.Equal(bip) {
    45. return job.Errorf("bridge ip (%s) does not match existing bridge configuration %s", network.IP, bip)
    46. }
    47. }
    48. }
    49. // Configure iptables for link support
    50. if enableIPTables {
    51. if err := setupIPTables(addr, icc); err != nil {
    52. return job.Error(err)
    53. }
    54. }
    55. if ipForward {
    56. // Enable IPv4 forwarding
    57. if err := ioutil.WriteFile("/proc/sys/net/ipv4/ip_forward", []byte{'1', '/n'}, 0644); err != nil {
    58. job.Logf("WARNING: unable to enable IPv4 forwarding: %s/n", err)
    59. }
    60. }
    61. // We can always try removing the iptables
    62. if err := iptables.RemoveExistingChain("DOCKER"); err != nil {
    63. return job.Error(err)
    64. }
    65. if enableIPTables {
    66. chain, err := iptables.NewChain("DOCKER", bridgeIface)
    67. if err != nil {
    68. return job.Error(err)
    69. }
    70. portmapper.SetIptablesChain(chain)
    71. }
    72. bridgeNetwork = network
    73. // https://github.com/docker/docker/issues/2768
    74. job.Eng.Hack_SetGlobalVar("httpapi.bridgeIP", bridgeNetwork.IP)
    75. for name, f := range map[string]engine.Handler{
    76. "allocate_interface": Allocate,
    77. "release_interface": Release,
    78. "allocate_port": AllocatePort,
    79. "link": LinkContainers,
    80. } {
    81. if err := job.Eng.Register(name, f); err != nil {
    82. return job.Error(err)
    83. }
    84. }
    85. return engine.StatusOK
    86. }

    Bridge.InitDriver的作用:

    • 获取为Docker服务的网络设备的地址;
    • 创建指定IP地址的网桥;
    • 配置网络iptables规则;
    • 另外还为eng对象注册了多个Handler,如 ”allocate_interface”, ”release_interface”, ”allocate_port”,”link”。

    3.5.2. 注册API服务的Handler

    remote(eng)的实现过程,主要为eng对象注册了两个Handler,分别为”serveapi”与”acceptconnections”。代码实现如下:

    /builtins/builtins.go

    1. func remote(eng *engine.Engine) error {
    2. if err := eng.Register("serveapi", apiserver.ServeApi); err != nil {
    3. return err
    4. }
    5. return eng.Register("acceptconnections", apiserver.AcceptConnections)
    6. }

    注册的两个Handler名称分别为”serveapi”与”acceptconnections”

    • ServeApi执行时,通过循环多种协议,创建出goroutine来配置指定的http.Server,最终为不同的协议请求服务;
    • AcceptConnections的实现主要是为了通知init守护进程,Docker Daemon已经启动完毕,可以让Docker Daemon进程接受请求。(守护进程)

    3.5.3. 注册events事件的Handler

    events.New().Install(eng)的实现过程,为Docker注册了多个event事件,功能是给Docker用户提供API,使得用户可以通过这些API查看Docker内部的events信息,log信息以及subscribers_count信息。

    /events/events.go

    1. type Events struct {
    2. mu sync.RWMutex
    3. events []*utils.JSONMessage
    4. subscribers []listener
    5. }
    6. func New() *Events {
    7. return &Events{
    8. events: make([]*utils.JSONMessage, 0, eventsLimit),
    9. }
    10. }
    11. // Install installs events public api in docker engine
    12. func (e *Events) Install(eng *engine.Engine) error {
    13. // Here you should describe public interface
    14. jobs := map[string]engine.Handler{
    15. "events": e.Get,
    16. "log": e.Log,
    17. "subscribers_count": e.SubscribersCount,
    18. }
    19. for name, job := range jobs {
    20. if err := eng.Register(name, job); err != nil {
    21. return err
    22. }
    23. }
    24. return nil
    25. }

    3.5.4. 注册版本的Handler

    eng.Register(“version”,dockerVersion)的实现过程,向eng对象注册key为”version”,value为”dockerVersion”执行方法的Handler,dockerVersion的执行过程中,会向名为version的job的标准输出中写入Docker的版本,Docker API的版本,git版本,Go语言运行时版本以及操作系统等版本信息。

    /builtins/builtins.go

    1. // builtins jobs independent of any subsystem
    2. func dockerVersion(job *engine.Job) engine.Status {
    3. v := &engine.Env{}
    4. v.SetJson("Version", dockerversion.VERSION)
    5. v.SetJson("ApiVersion", api.APIVERSION)
    6. v.Set("GitCommit", dockerversion.GITCOMMIT)
    7. v.Set("GoVersion", runtime.Version())
    8. v.Set("Os", runtime.GOOS)
    9. v.Set("Arch", runtime.GOARCH)
    10. if kernelVersion, err := kernel.GetKernelVersion(); err == nil {
    11. v.Set("KernelVersion", kernelVersion.String())
    12. }
    13. if _, err := v.WriteTo(job.Stdout); err != nil {
    14. return job.Error(err)
    15. }
    16. return engine.StatusOK
    17. }

    3.5.5. 注册registry的Handler

    registry.NewService().Install(eng)的实现过程位于./docker/registry/service.go,在eng对象对外暴露的API信息中添加docker registry的信息。当registry.NewService()成功被Install安装完毕的话,则有两个调用能够被eng使用:”auth”,向公有registry进行认证;”search”,在公有registry上搜索指定的镜像。

    /registry/service.go

    1. // NewService returns a new instance of Service ready to be
    2. // installed no an engine.
    3. func NewService() *Service {
    4. return &Service{}
    5. }
    6. // Install installs registry capabilities to eng.
    7. func (s *Service) Install(eng *engine.Engine) error {
    8. eng.Register("auth", s.Auth)
    9. eng.Register("search", s.Search)
    10. return nil
    11. }

    3.6. 使用goroutine加载daemon对象

    执行完builtins的加载,回到mainDaemon()的执行,通过一个goroutine来加载daemon对象并开始运行。这一环节的执行,主要包含三个步骤:

    • 通过init函数中初始化的daemonCfg与eng对象来创建一个daemon对象d;(守护进程)
    • 通过daemon对象的Install函数,向eng对象中注册众多的Handler;
    • 在Docker Daemon启动完毕之后,运行名为”acceptconnections”的job,主要工作为向init守护进程发送”READY=1”信号,以便开始正常接受请求。

    /docker/daemon.go

    1. // load the daemon in the background so we can immediately start
    2. // the http api so that connections don't fail while the daemon
    3. // is booting
    4. go func() {
    5. d, err := daemon.NewDaemon(daemonCfg, eng)
    6. if err != nil {
    7. log.Fatal(err)
    8. }
    9. if err := d.Install(eng); err != nil {
    10. log.Fatal(err)
    11. }
    12. // after the daemon is done setting up we can tell the api to start
    13. // accepting connections
    14. if err := eng.Job("acceptconnections").Run(); err != nil {
    15. log.Fatal(err)
    16. }
    17. }()

    3.6.1. 创建daemon对象

    /docker/daemon.go

    1. d, err := daemon.NewDaemon(daemonCfg, eng)
    2. if err != nil {
    3. log.Fatal(err)
    4. }

    daemon.NewDaemon(daemonCfg, eng)是创建daemon对象d的核心部分。主要作用为初始化Docker Daemon的基本环境,如处理config参数,验证系统支持度,配置Docker工作目录,设置与加载多种driver,创建graph环境等,验证DNS配置等。具体参考NewDaemon 。

    3.6.2. 通过daemon对象为engine注册Handler

    当创建完daemon对象,goroutine执行d.Install(eng)

    /daemon/daemon.go

    1. type Daemon struct {
    2. repository string
    3. sysInitPath string
    4. containers *contStore
    5. graph *graph.Graph
    6. repositories *graph.TagStore
    7. idIndex *truncindex.TruncIndex
    8. sysInfo *sysinfo.SysInfo
    9. volumes *graph.Graph
    10. eng *engine.Engine
    11. config *Config
    12. containerGraph *graphdb.Database
    13. driver graphdriver.Driver
    14. execDriver execdriver.Driver
    15. }
    16. // Install installs daemon capabilities to eng.
    17. func (daemon *Daemon) Install(eng *engine.Engine) error {
    18. // FIXME: rename "delete" to "rm" for consistency with the CLI command
    19. // FIXME: rename ContainerDestroy to ContainerRm for consistency with the CLI command
    20. // FIXME: remove ImageDelete's dependency on Daemon, then move to graph/
    21. for name, method := range map[string]engine.Handler{
    22. "attach": daemon.ContainerAttach,
    23. "build": daemon.CmdBuild,
    24. "commit": daemon.ContainerCommit,
    25. "container_changes": daemon.ContainerChanges,
    26. "container_copy": daemon.ContainerCopy,
    27. "container_inspect": daemon.ContainerInspect,
    28. "containers": daemon.Containers,
    29. "create": daemon.ContainerCreate,
    30. "delete": daemon.ContainerDestroy,
    31. "export": daemon.ContainerExport,
    32. "info": daemon.CmdInfo,
    33. "kill": daemon.ContainerKill,
    34. "logs": daemon.ContainerLogs,
    35. "pause": daemon.ContainerPause,
    36. "resize": daemon.ContainerResize,
    37. "restart": daemon.ContainerRestart,
    38. "start": daemon.ContainerStart,
    39. "stop": daemon.ContainerStop,
    40. "top": daemon.ContainerTop,
    41. "unpause": daemon.ContainerUnpause,
    42. "wait": daemon.ContainerWait,
    43. "image_delete": daemon.ImageDelete, // FIXME: see above
    44. } {
    45. if err := eng.Register(name, method); err != nil {
    46. return err
    47. }
    48. }
    49. if err := daemon.Repositories().Install(eng); err != nil {
    50. return err
    51. }
    52. // FIXME: this hack is necessary for legacy integration tests to access
    53. // the daemon object.
    54. eng.Hack_SetGlobalVar("httpapi.daemon", daemon)
    55. return nil
    56. }

    以上代码的实现分为三部分:

    • 向eng对象中注册众多的Handler对象;
    • daemon.Repositories().Install(eng)实现了向eng对象注册多个与image相关的Handler,Install的实现位于./docker/graph/service.go;
    • eng.Hack_SetGlobalVar(“httpapi.daemon”, daemon)实现向eng对象中map类型的hack对象中添加一条记录,key为”httpapi.daemon”,value为daemon。

    3.6.3. 运行acceptconnections的job

    /docker/daemon.go

    1. if err := eng.Job("acceptconnections").Run(); err != nil {
    2. log.Fatal(err)
    3. }

    在goroutine内部最后运行名为”acceptconnections”的job,主要作用是通知init守护进程,Docker Daemon可以开始接受请求了。

    首先执行eng.Job(“acceptconnections”),返回一个Job,随后再执行eng.Job(“acceptconnections”).Run(),也就是该执行Job的run函数。

    /engine/engine.go

    1. // Job creates a new job which can later be executed.
    2. // This function mimics `Command` from the standard os/exec package.
    3. func (eng *Engine) Job(name string, args ...string) *Job {
    4. job := &Job{
    5. Eng: eng,
    6. Name: name,
    7. Args: args,
    8. Stdin: NewInput(),
    9. Stdout: NewOutput(),
    10. Stderr: NewOutput(),
    11. env: &Env{},
    12. }
    13. if eng.Logging {
    14. job.Stderr.Add(utils.NopWriteCloser(eng.Stderr))
    15. }
    16. // Catchall is shadowed by specific Register.
    17. if handler, exists := eng.handlers[name]; exists {
    18. job.handler = handler
    19. } else if eng.catchall != nil && name != "" {
    20. // empty job names are illegal, catchall or not.
    21. job.handler = eng.catchall
    22. }
    23. return job
    24. }
    1. 首先创建一个类型为Job的job对象,该对象中Eng属性为函数的调用者eng,Name属性为”acceptconnections”,没有参数传入。
    2. 另外在eng对象所有的handlers属性中寻找键为”acceptconnections”记录的值,由于在加载builtins操作中的remote(eng)中已经向eng注册过这样的一条记录,key为”acceptconnections”,value为apiserver.AcceptConnections。
    3. 因此job对象的handler为apiserver.AcceptConnections。
    4. 最后返回已经初始化完毕的对象job。

    创建完job对象之后,随即执行该job对象的run()函数。

    /engine/job.go

    1. // A job is the fundamental unit of work in the docker engine.
    2. // Everything docker can do should eventually be exposed as a job.
    3. // For example: execute a process in a container, create a new container,
    4. // download an archive from the internet, serve the http api, etc.
    5. //
    6. // The job API is designed after unix processes: a job has a name, arguments,
    7. // environment variables, standard streams for input, output and error, and
    8. // an exit status which can indicate success (0) or error (anything else).
    9. //
    10. // One slight variation is that jobs report their status as a string. The
    11. // string "0" indicates success, and any other strings indicates an error.
    12. // This allows for richer error reporting.
    13. //
    14. type Job struct {
    15. Eng *Engine
    16. Name string
    17. Args []string
    18. env *Env
    19. Stdout *Output
    20. Stderr *Output
    21. Stdin *Input
    22. handler Handler
    23. status Status
    24. end time.Time
    25. }
    26. type Status int
    27. const (
    28. StatusOK Status = 0
    29. StatusErr Status = 1
    30. StatusNotFound Status = 127
    31. )
    32. // Run executes the job and blocks until the job completes.
    33. // If the job returns a failure status, an error is returned
    34. // which includes the status.
    35. func (job *Job) Run() error {
    36. if job.Eng.IsShutdown() {
    37. return fmt.Errorf("engine is shutdown")
    38. }
    39. // FIXME: this is a temporary workaround to avoid Engine.Shutdown
    40. // waiting 5 seconds for server/api.ServeApi to complete (which it never will)
    41. // everytime the daemon is cleanly restarted.
    42. // The permanent fix is to implement Job.Stop and Job.OnStop so that
    43. // ServeApi can cooperate and terminate cleanly.
    44. if job.Name != "serveapi" {
    45. job.Eng.l.Lock()
    46. job.Eng.tasks.Add(1)
    47. job.Eng.l.Unlock()
    48. defer job.Eng.tasks.Done()
    49. }
    50. // FIXME: make this thread-safe
    51. // FIXME: implement wait
    52. if !job.end.IsZero() {
    53. return fmt.Errorf("%s: job has already completed", job.Name)
    54. }
    55. // Log beginning and end of the job
    56. job.Eng.Logf("+job %s", job.CallString())
    57. defer func() {
    58. job.Eng.Logf("-job %s%s", job.CallString(), job.StatusString())
    59. }()
    60. var errorMessage = bytes.NewBuffer(nil)
    61. job.Stderr.Add(errorMessage)
    62. if job.handler == nil {
    63. job.Errorf("%s: command not found", job.Name)
    64. job.status = 127
    65. } else {
    66. job.status = job.handler(job)
    67. job.end = time.Now()
    68. }
    69. // Wait for all background tasks to complete
    70. if err := job.Stdout.Close(); err != nil {
    71. return err
    72. }
    73. if err := job.Stderr.Close(); err != nil {
    74. return err
    75. }
    76. if err := job.Stdin.Close(); err != nil {
    77. return err
    78. }
    79. if job.status != 0 {
    80. return fmt.Errorf("%s", Tail(errorMessage, 1))
    81. }
    82. return nil
    83. }

    Run()函数的实现位于./docker/engine/job.go,该函数执行指定的job,并在job执行完成前一直阻塞。对于名为”acceptconnections”的job对象,运行代码为job.status = job.handler(job),由于job.handler值为apiserver.AcceptConnections,故真正执行的是job.status = apiserver.AcceptConnections(job)。

    进入AcceptConnections的具体实现,位于./docker/api/server/server.go,如下:

    /api/server/server.go

    1. func AcceptConnections(job *engine.Job) engine.Status {
    2. // Tell the init daemon we are accepting requests
    3. go systemd.SdNotify("READY=1")
    4. if activationLock != nil {
    5. close(activationLock)
    6. }
    7. return engine.StatusOK
    8. }

    重点为go systemd.SdNotify(“READY=1”)的实现,位于./docker/pkg/system/sd_notify.go,主要作用是通知init守护进程Docker Daemon的启动已经全部完成,潜在的功能是使得Docker Daemon开始接受Docker Client发送来的API请求。

    至此,已经完成通过goroutine来加载daemon对象并运行。

    3.7. 打印Docker版本及驱动信息

    显示docker的版本信息,以及ExecDriver和GraphDriver这两个驱动的具体信息

    /docker/daemon.go

    1. // TODO actually have a resolved graphdriver to show?
    2. log.Printf("docker daemon: %s %s; execdriver: %s; graphdriver: %s",
    3. dockerversion.VERSION,
    4. dockerversion.GITCOMMIT,
    5. daemonCfg.ExecDriver,
    6. daemonCfg.GraphDriver,
    7. )

    3.8. serveapi的创建与运行

    打印部分Docker具体信息之后,Docker Daemon立即创建并运行名为”serveapi”的job,主要作用为让Docker Daemon提供API访问服务。

    /docker/daemon.go

    1. // Serve api
    2. job := eng.Job("serveapi", flHosts...)
    3. job.SetenvBool("Logging", true)
    4. job.SetenvBool("EnableCors", *flEnableCors)
    5. job.Setenv("Version", dockerversion.VERSION)
    6. job.Setenv("SocketGroup", *flSocketGroup)
    7. job.SetenvBool("Tls", *flTls)
    8. job.SetenvBool("TlsVerify", *flTlsVerify)
    9. job.Setenv("TlsCa", *flCa)
    10. job.Setenv("TlsCert", *flCert)
    11. job.Setenv("TlsKey", *flKey)
    12. job.SetenvBool("BufferRequests", true)
    13. if err := job.Run(); err != nil {
    14. log.Fatal(err)
    15. }
    1. 创建一个名为”serveapi”的job,并将flHosts的值赋给job.Args。flHost的作用主要是为Docker Daemon提供使用的协议与监听的地址。
    2. Docker Daemon为该job设置了众多的环境变量,如安全传输层协议的环境变量等。最后通过job.Run()运行该serveapi的job。

    由于在eng中key为”serveapi”的handler,value为apiserver.ServeApi,故该job运行时,执行apiserver.ServeApi函数,位于./docker/api/server/server.go。ServeApi函数的作用主要是对于用户定义的所有支持协议,Docker Daemon均创建一个goroutine来启动相应的http.Server,分别为不同的协议服务。具体参考Docker Server。

    参考:

    • 《Docker源码分析》