containerd源码-启动容器

containerd得启动也分为服务端和客户端

代码版本为v.17.5

客户端

ctr启动一个pod有两种方式,一个是是run命令直接启动一个pod,还有一种先创建container,在创建task在启动,run命令指示把contaIner和task一块处理了

container create

  • 入口这里
// cmd/ctr/commands/containers/containers.go
var createCommand = cli.Command{
  Name:      "create",
  Usage:     "create container",
  ArgsUsage: "[flags] Image|RootFS CONTAINER [COMMAND] [ARG...]",
  Flags:     append(commands.SnapshotterFlags, commands.ContainerFlags...),
  Action: func(context *cli.Context) error {
    // 参数处理
    client, ctx, cancel, err := commands.NewClient(context)
    if err != nil {
      return err
    }
    defer cancel()
    _, err = run.NewContainer(ctx, client, context)
    if err != nil {
      return err
    }
    return nil
  },
}
  • 去除配置相关的主要是查看snapshotter中有没有解压如果没有则解压,然后将处理后的配置信息传递给client.NewContainer()
// cmd/ctr/commands/run/run_unix.go

func NewContainer(ctx gocontext.Context, client *containerd.Client, context *cli.Context) (containerd.Container, error) {
      // ...
      i, err := client.ImageService().Get(ctx, ref)
      if ps := context.String("platform"); ps != "" {
        platform, err := platforms.Parse(ps)
        image = containerd.NewImageWithPlatform(client, i, platforms.Only(platform))
      } else {
        image = containerd.NewImage(client, i)

      unpacked, err := image.IsUnpacked(ctx, snapshotter)
      if err != nil {
        return nil, err
      }
      if !unpacked {
        if err := image.Unpack(ctx, snapshotter); err != nil {
          return nil, err
        }
      }

  cOpts = append(cOpts, spec)

  return client.NewContainer(ctx, id, cOpts...)
}
  • 随后将前面的参数传递到Container,然后调用grpc创建container
// client.go


// NewContainer will create a new container in container with the provided id
// the id must be unique within the namespace
func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error) {
  ctx, done, err := c.WithLease(ctx)
  defer done(ctx)

  container := containers.Container{
    ID: id,
    Runtime: containers.RuntimeInfo{
      Name: c.runtime,
    },
  }
  for _, o := range opts {
    if err := o(ctx, c, &container); err != nil {
      return nil, err
    }
  }

  r, err := c.ContainerService().Create(ctx, container)

  return containerFromRecord(c, r), nil
}

task start

  • 前面创建完容器之后就需要创建一个task
  • 创建完task之后就启动task,这里处理detach这个参数如果有则不会退出之后删除
// cmd/ctr/commands/tasks/start.go

var startCommand = cli.Command{
  Name:      "start",
  Usage:     "start a container that has been created",
  ArgsUsage: "CONTAINER",
  Flags: []cli.Flag{
    cli.BoolFlag{
      Name:  "null-io",
      Usage: "send all IO to /dev/null",
    },
    cli.StringFlag{
      Name:  "log-uri",
      Usage: "log uri",
    },
    cli.StringFlag{
      Name:  "fifo-dir",
      Usage: "directory used for storing IO FIFOs",
    },
    cli.StringFlag{
      Name:  "pid-file",
      Usage: "file path to write the task's pid",
    },
    cli.BoolFlag{
      Name:  "detach,d",
      Usage: "detach from the task after it has started execution",
    },
  },
  Action: func(context *cli.Context) error {
    client, ctx, cancel, err := commands.NewClient(context)

    defer cancel()
    container, err := client.LoadContainer(ctx, id)

    spec, err := container.Spec(ctx)
    var con console.Console
    if tty {
      con = console.Current()
      defer con.Reset()
      if err := con.SetRaw(); err != nil {
        return err
      }
    }

    task, err := NewTask(ctx, client, container, "", con, context.Bool("null-io"), context.String("log-uri"), ioOpts, opts...)

    var statusC <-chan containerd.ExitStatus
    if !detach {
      defer task.Delete(ctx)
      if statusC, err = task.Wait(ctx); err != nil {
        return err
      }
    }
    if context.IsSet("pid-file") {
      if err := commands.WritePidFile(context.String("pid-file"), int(task.Pid())); err != nil {
        return err
      }
    }
    if err := task.Start(ctx); err != nil {
      return err
    }
    if detach {
      return nil
    }

    status := <-statusC
    code, _, err := status.Result()
    if err != nil {
      return err
    }
    if _, err := task.Delete(ctx); err != nil {
      return err
    }
    if code != 0 {
      return cli.NewExitError("", int(code))
    }
    return nil
  },
}
  • NewTask处理了下命令行相关,然后调用了container.NewTask()
// cmd/ctr/commands/tasks/tasks_unix.go

func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, con console.Console, nullIO bool, logURI string, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) {
  stdinC := &stdinCloser{
    stdin: os.Stdin,
  }
  if checkpoint != "" {
    im, err := client.GetImage(ctx, checkpoint)

    opts = append(opts, containerd.WithTaskCheckpoint(im))
  }
  var ioCreator cio.Creator
  if con != nil {
    if nullIO {
      return nil, errors.New("tty and null-io cannot be used together")
    }
    ioCreator = cio.NewCreator(append([]cio.Opt{cio.WithStreams(con, con, nil), cio.WithTerminal}, ioOpts...)...)
  } else if nullIO {
    ioCreator = cio.NullIO
  } else if logURI != "" {
    u, err := url.Parse(logURI)
    ioCreator = cio.LogURI(u)
  } else {
    ioCreator = cio.NewCreator(append([]cio.Opt{cio.WithStreams(stdinC, os.Stdout, os.Stderr)}, ioOpts...)...)
  }
  t, err := container.NewTask(ctx, ioCreator, opts...)

  stdinC.closer = func() {
    t.CloseIO(ctx, containerd.WithStdinCloser)
  }
  return t, nil
}
run命令
  • 基本就是将container createtask start的逻辑组合到一块
// Command runs a container
var Command = cli.Command{
  Name:           "run",
  Usage:          "run a container",
  ArgsUsage:      "[flags] Image|RootFS ID [COMMAND] [ARG...]",
  SkipArgReorder: true,
  Flags: append([]cli.Flag{
    cli.BoolFlag{
      Name:  "detach,d",
      Usage: "detach from the task after it has started execution",
    },

  }, append(platformRunFlags, append(commands.SnapshotterFlags, commands.ContainerFlags...)...)...),
  Action: func(context *cli.Context) error {

    client, ctx, cancel, err := commands.NewClient(context)
  
    defer cancel()
    container, err := NewContainer(ctx, client, context)
  
    if context.Bool("rm") && !detach {
      defer container.Delete(ctx, containerd.WithSnapshotCleanup)
    }
    var con console.Console
    if tty {
      con = console.Current()
      defer con.Reset()
      if err := con.SetRaw(); err != nil {
        return err
      }
    }
    var network gocni.CNI
    if enableCNI {
      if network, err = gocni.New(gocni.WithDefaultConf); err != nil {
        return err
      }
    }

    opts := getNewTaskOpts(context)
    ioOpts := []cio.Opt{cio.WithFIFODir(context.String("fifo-dir"))}
    task, err := tasks.NewTask(ctx, client, container, context.String("checkpoint"), con, context.Bool("null-io"), context.String("log-uri"), ioOpts, opts...)
 

    var statusC <-chan containerd.ExitStatus
    if !detach {
      defer func() {
        if enableCNI {
          if err := network.Remove(ctx, fullID(ctx, container), ""); err != nil {
            logrus.WithError(err).Error("network review")
          }
        }
        task.Delete(ctx)
      }()

      if statusC, err = task.Wait(ctx); err != nil {
        return err
      }
    }

    if err := task.Start(ctx); err != nil {
      return err
    }
    }
    if tty {
      if err := tasks.HandleConsoleResize(ctx, task, con); err != nil {
        logrus.WithError(err).Error("console resize")
      }
    } else {
      sigc := commands.ForwardAllSignals(ctx, task)
      defer commands.StopCatch(sigc)
    }
    status := <-statusC
    code, _, err := status.Result()
    if err != nil {
      return err
    }
    if _, err := task.Delete(ctx); err != nil {
      return err
    }
    if code != 0 {
      return cli.NewExitError("", int(code))
    }
    return nil
  },
}

服务端

container
container grpc
  • 插件注册,依赖一个service
// services/containers/service.go

func init() {
  plugin.Register(&plugin.Registration{
    Type: plugin.GRPCPlugin,
    ID:   "containers",
    Requires: []plugin.Type{
      plugin.ServicePlugin,
    },
    InitFn: func(ic *plugin.InitContext) (interface{}, error) {
      plugins, err := ic.GetByType(plugin.ServicePlugin)
      if err != nil {
        return nil, err
      }
      p, ok := plugins[services.ContainersService]
      if !ok {
        return nil, errors.New("containers service not found")
      }
      i, err := p.Instance()
      if err != nil {
        return nil, err
      }
      return &service{local: i.(api.ContainersClient)}, nil
    },
  })
}
  • api则直接调用了上层
func (s *service) Create(ctx context.Context, req *api.CreateContainerRequest) (*api.CreateContainerResponse, error) {
  return s.local.Create(ctx, req)
}
container service
  • 插件注册
// services/containers/local.go
func init() {
  plugin.Register(&plugin.Registration{
    Type: plugin.ServicePlugin,
    ID:   services.ContainersService,
    Requires: []plugin.Type{
      plugin.MetadataPlugin,
    },
    InitFn: func(ic *plugin.InitContext) (interface{}, error) {
      m, err := ic.Get(plugin.MetadataPlugin)

      db := m.(*metadata.DB)
      return &local{
        Store:     metadata.NewContainerStore(db),
        db:        db,
        publisher: ic.Events,
      }, nil
    },
  })
}
  • 主要调用了Store.Create()数据库中创建一个container,且上传了事件
// services/containers/local.go

func (l *local) Create(ctx context.Context, req *api.CreateContainerRequest, _ ...grpc.CallOption) (*api.CreateContainerResponse, error) {
  var resp api.CreateContainerResponse
  if err := l.withStoreUpdate(ctx, func(ctx context.Context) error {
    container := containerFromProto(&req.Container)
    created, err := l.Store.Create(ctx, container)
    resp.Container = containerToProto(&created)
  }); err != nil {
    return &resp, errdefs.ToGRPC(err)
  }
  if err := l.publisher.Publish(ctx, "/containers/create", &eventstypes.ContainerCreate{
    ID:    resp.Container.ID,
    Image: resp.Container.Image,
    Runtime: &eventstypes.ContainerCreate_Runtime{
      Name:    resp.Container.Runtime.Name,
      Options: resp.Container.Runtime.Options,
    },
  }); err != nil {
    return &resp, err
  }

  return &resp, nil
}
  • 看下数据中对于Container的实现
// metadata/containers.go

// NewContainerStore returns a Store backed by an underlying bolt DB
func NewContainerStore(db *DB) containers.Store {
  return &containerStore{
    db: db,
  }
}
  • 首先校验了了一下container,然后在数据库中创建一个记录
// metadata/containers.go

func (s *containerStore) Create(ctx context.Context, container containers.Container) (containers.Container, error) {
  namespace, err := namespaces.NamespaceRequired(ctx)
  if err := validateContainer(&container); err != nil {
    return containers.Container{}, errors.Wrap(err, "create container failed validation")
  }
  if err := update(ctx, s.db, func(tx *bolt.Tx) error {
    bkt, err := createContainersBucket(tx, namespace)
    cbkt, err := bkt.CreateBucket([]byte(container.ID))
    if err != nil {
      if err == bolt.ErrBucketExists {
        err = errors.Wrapf(errdefs.ErrAlreadyExists, "container %q", container.ID)
      }
      return err
    }
    container.CreatedAt = time.Now().UTC()
    container.UpdatedAt = container.CreatedAt
    if err := writeContainer(cbkt, &container); err != nil {
      return errors.Wrapf(err, "failed to write container %q", container.ID)
    }
    return nil
  }); err != nil {
    return containers.Container{}, err
  }
  return container, nil
}

task

task grpc
  • 注册依赖于service的插件
// services/tasks/service.go

func init() {
  plugin.Register(&plugin.Registration{
    Type: plugin.GRPCPlugin,
    ID:   "tasks",
    Requires: []plugin.Type{
      plugin.ServicePlugin,
    },
    InitFn: func(ic *plugin.InitContext) (interface{}, error) {
      plugins, err := ic.GetByType(plugin.ServicePlugin)
      p, ok := plugins[services.TasksService]
      if !ok {
        return nil, errors.New("tasks service not found")
      }
      i, err := p.Instance()
      return &service{local: i.(api.TasksClient)}, nil
    },
  })
}
  • api直接调用上层的插件了
func (s *service) Create(ctx context.Context, r *api.CreateTaskRequest) (*api.CreateTaskResponse, error) {
  return s.local.Create(ctx, r)
}

func (s *service) Start(ctx context.Context, r *api.StartRequest) (*api.StartResponse, error) {
  return s.local.Start(ctx, r)
}
task serivce
  • 初始化的过程根据平台来选,分别有bsd,unix和win主要看unix
func init() {
  plugin.Register(&plugin.Registration{
    Type:     plugin.ServicePlugin,
    ID:       services.TasksService,
    Requires: tasksServiceRequires,
    InitFn:   initFunc,
  })

  timeout.Set(stateTimeout, 2*time.Second)
  • unix加载的插件
// services/tasks/local_unix.go
var tasksServiceRequires = []plugin.Type{
  plugin.RuntimePlugin,
  plugin.RuntimePluginV2,
  plugin.MetadataPlugin,
  plugin.TaskMonitorPlugin,
}
  • 根据平台调用loadV1Runtimes()加载runtimev1然后遍历
  • runtimev2 则是通过ic.Get(plugin.RuntimePluginV2)通过插件的形式拿到
// services/tasks/local.go

func initFunc(ic *plugin.InitContext) (interface{}, error) {
  runtimes, err := loadV1Runtimes(ic)

  v2r, err := ic.Get(plugin.RuntimePluginV2)

  m, err := ic.Get(plugin.MetadataPlugin)

  monitor, err := ic.Get(plugin.TaskMonitorPlugin)
  if err != nil {
    if !errdefs.IsNotFound(err) {
      return nil, err
    }
    monitor = runtime.NewNoopMonitor()
  }

  db := m.(*metadata.DB)
  l := &local{
    runtimes:   runtimes,
    containers: metadata.NewContainerStore(db),
    store:      db.ContentStore(),
    publisher:  ic.Events,
    monitor:    monitor.(runtime.TaskMonitor),
    v2Runtime:  v2r.(*v2.TaskManager),
  }
  for _, r := range runtimes {
    tasks, err := r.Tasks(ic.Context, true)
    for _, t := range tasks {
      l.monitor.Monitor(t)
    }
  }
  v2Tasks, err := l.v2Runtime.Tasks(ic.Context, true)
  for _, t := range v2Tasks {
    l.monitor.Monitor(t)
  }
  return l, nil
}
}
  • 具体api实现方面,通过请求的容器id获取容器,处理下需要恢复路径,因为contaInerd重启容器并不会退出,所以需要contaIner找到之前的容器
  • 处理rootfs
  • 获取一个runtime.get() 获取一个task没然后执行创建task
  • 随后调用monitor.Monitor(c)监控容器
// services/tasks/local.go

func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) {
  container, err := l.getContainer(ctx, r.ContainerID)
  checkpointPath, err := getRestorePath(container.Runtime.Name, r.Options)

  if checkpointPath == "" && r.Checkpoint != nil {
    checkpointPath, err = ioutil.TempDir(os.Getenv("XDG_RUNTIME_DIR"), "ctrd-checkpoint")
 
    if r.Checkpoint.MediaType != images.MediaTypeContainerd1Checkpoint {
      return nil, fmt.Errorf("unsupported checkpoint type %q", r.Checkpoint.MediaType)
    }
    reader, err := l.store.ReaderAt(ctx, ocispec.Descriptor{
      MediaType:   r.Checkpoint.MediaType,
      Digest:      r.Checkpoint.Digest,
      Size:        r.Checkpoint.Size_,
      Annotations: r.Checkpoint.Annotations,
    })

    _, err = archive.Apply(ctx, checkpointPath, content.NewReader(reader))
    reader.Close()

  }
  opts := runtime.CreateOpts{
    Spec: container.Spec,
    IO: runtime.IO{
      Stdin:    r.Stdin,
      Stdout:   r.Stdout,
      Stderr:   r.Stderr,
      Terminal: r.Terminal,
    },
    Checkpoint:     checkpointPath,
    Runtime:        container.Runtime.Name,
    RuntimeOptions: container.Runtime.Options,
    TaskOptions:    r.Options,
  }
  for _, m := range r.Rootfs {
    opts.Rootfs = append(opts.Rootfs, mount.Mount{
      Type:    m.Type,
      Source:  m.Source,
      Options: m.Options,
    })
  }
  if strings.HasPrefix(container.Runtime.Name, "io.containerd.runtime.v1.") {
    log.G(ctx).Warn("runtime v1 is deprecated since containerd v1.4, consider using runtime v2")
  } else if container.Runtime.Name == plugin.RuntimeRuncV1 {
    log.G(ctx).Warnf("%q is deprecated since containerd v1.4, consider using %q", plugin.RuntimeRuncV1, plugin.RuntimeRuncV2)
  }
  rtime, err := l.getRuntime(container.Runtime.Name)
  _, err = rtime.Get(ctx, r.ContainerID)
  if err != nil && err != runtime.ErrTaskNotExists {
    return nil, errdefs.ToGRPC(err)
  }
  if err == nil {
    return nil, errdefs.ToGRPC(fmt.Errorf("task %s already exists", r.ContainerID))
  }
  c, err := rtime.Create(ctx, r.ContainerID, opts)
  if err != nil {
    return nil, errdefs.ToGRPC(err)
  }
  if err := l.monitor.Monitor(c); err != nil {
    return nil, errors.Wrap(err, "monitor task")
  }
  return &api.CreateTaskResponse{
    ContainerID: r.ContainerID,
    Pid:         c.PID(),
  }, nil
}
  • start根据上一步创建的task获取进程然后启动进程
// services/tasks/local.go

func (l *local) Start(ctx context.Context, r *api.StartRequest, _ ...grpc.CallOption) (*api.StartResponse, error) {
  t, err := l.getTask(ctx, r.ContainerID)

  p := runtime.Process(t)
  if r.ExecID != "" {
    if p, err = t.Process(ctx, r.ExecID); err != nil {
      return nil, errdefs.ToGRPC(err)
    }
  }
  if err := p.Start(ctx); err != nil {
    return nil, errdefs.ToGRPC(err)
  }
  state, err := p.State(ctx)
  if err != nil {
    return nil, errdefs.ToGRPC(err)
  }
  return &api.StartResponse{
    Pid: state.Pid,
  }, nil
}
runtime
  • runtime有2个版本现在普遍使用v2,他的初始化会根据平台传递一个config,最后拿到的参数传递给New()
// runtime/v2/manager.go

func init() {
  plugin.Register(&plugin.Registration{
    Type: plugin.RuntimePluginV2,
    ID:   "task",
    Requires: []plugin.Type{
      plugin.MetadataPlugin,
    },
    Config: &Config{
      Platforms: defaultPlatforms(),
    },
    InitFn: func(ic *plugin.InitContext) (interface{}, error) {
      supportedPlatforms, err := parsePlatforms(ic.Config.(*Config).Platforms)

      ic.Meta.Platforms = supportedPlatforms
      if err := os.MkdirAll(ic.Root, 0711); err != nil {
        return nil, err
      }
      if err := os.MkdirAll(ic.State, 0711); err != nil {
        return nil, err
      }
      m, err := ic.Get(plugin.MetadataPlugin)

      cs := metadata.NewContainerStore(m.(*metadata.DB))

      return New(ic.Context, ic.Root, ic.State, ic.Address, ic.TTRPCAddress, ic.Events, cs)
    },
  })
}
  • New()创建了文件夹和初始化了一个TaskManager,然后调用loadExistingTasks()方法加载已经存在的task
// New task manager for v2 shims
func New(ctx context.Context, root, state, containerdAddress, containerdTTRPCAddress string, events *exchange.Exchange, cs containers.Store) (*TaskManager, error) {
  for _, d := range []string{root, state} {
    if err := os.MkdirAll(d, 0711); err != nil {
      return nil, err
    }
  }
  m := &TaskManager{
    root:                   root,
    state:                  state,
    containerdAddress:      containerdAddress,
    containerdTTRPCAddress: containerdTTRPCAddress,
    tasks:                  runtime.NewTaskList(),
    events:                 events,
    containers:             cs,
  }
  if err := m.loadExistingTasks(ctx); err != nil {
    return nil, err
  }
  return m, nil
}
  • 最终通过shim创建容器,然后添加task
// Create a new task
func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, retErr error) {
  bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value)
  defer func() {
    if retErr != nil {
      bundle.Delete()
    }
  }()

  shim, err := m.startShim(ctx, bundle, id, opts)

  defer func() {
    if retErr != nil {
      m.deleteShim(shim)
    }
  }()

  t, err := shim.Create(ctx, opts)

  if err := m.tasks.Add(ctx, t); err != nil {
    return nil, errors.Wrap(err, "failed to add task")
  }

  return t, nil
}
  • 看了startShim实现。通过bundel等参数构造出一个binary,然后调用start()方法
// runtime/v2/manager.go

func (m *TaskManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) {
  ns, err := namespaces.NamespaceRequired(ctx)

  topts := opts.TaskOptions

  b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
  shim, err := b.Start(ctx, topts, func() {
    log.G(ctx).WithField("id", id).Info("shim disconnected")

    cleanupAfterDeadShim(context.Background(), id, ns, m.tasks, m.events, b)
    // Remove self from the runtime task list. Even though the cleanupAfterDeadShim()
    // would publish taskExit event, but the shim.Delete() would always failed with ttrpc
    // disconnect and there is no chance to remove this dead task from runtime task lists.
    // Thus it's better to delete it here.
    m.tasks.Delete(ctx, id)
  })

  return shim, nil
}
  • 这里通过client.Command()组装出命令然后启动程序
  • 随后创建一个ttrpc客户端返回
func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) {
  args := []string{"-id", b.bundle.ID}
  if logrus.GetLevel() == logrus.DebugLevel {
    args = append(args, "-debug")
  }
  args = append(args, "start")

  cmd, err := client.Command(
    ctx,
    b.runtime,
    b.containerdAddress,
    b.containerdTTRPCAddress,
    b.bundle.Path,
    opts,
    args...,
  )

  // Windows needs a namespace when openShimLog
  ns, _ := namespaces.Namespace(ctx)
  shimCtx, cancelShimLog := context.WithCancel(namespaces.WithNamespace(context.Background(), ns))
  defer func() {
    if err != nil {
      cancelShimLog()
    }
  }()
  f, err := openShimLog(shimCtx, b.bundle, client.AnonDialer)
  if err != nil {
    return nil, errors.Wrap(err, "open shim log pipe")
  }
  defer func() {
    if err != nil {
      f.Close()
    }
  }()
  // open the log pipe and block until the writer is ready
  // this helps with synchronization of the shim
  // copy the shim's logs to containerd's output
  go func() {
    defer f.Close()
    _, err := io.Copy(os.Stderr, f)
    // To prevent flood of error messages, the expected error
    // should be reset, like os.ErrClosed or os.ErrNotExist, which
    // depends on platform.
    err = checkCopyShimLogError(ctx, err)
    if err != nil {
      log.G(ctx).WithError(err).Error("copy shim log")
    }
  }()
  out, err := cmd.CombinedOutput()
  if err != nil {
    return nil, errors.Wrapf(err, "%s", out)
  }
  address := strings.TrimSpace(string(out))
  conn, err := client.Connect(address, client.AnonDialer)
  if err != nil {
    return nil, err
  }
  onCloseWithShimLog := func() {
    onClose()
    cancelShimLog()
    f.Close()
  }
  client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog))
  return &shim{
    bundle:  b.bundle,
    client:  client,
    task:    task.NewTaskClient(client),
    events:  b.events,
    rtTasks: b.rtTasks,
  }, nil
}

总结

sequenceDiagram
    autonumber
    participant client as 客户端
    participant container as container-service
    participant task as task-service
    #participant content as content-service
    #participant snapshotter as snapshotter-service
    participant image as image-service
    
    client->>image:获取image信息
    client->>container:创建容器
    client->>task:创建task
    client->>task:启动task

参考资料

http://blog.naturelr.cc