containerd源码-content

content 主要负责存储下载后的原本的层

代码版本为v.17.5

content

content主要负责存储下载的layer接口定义在content/content.go

// content/content.go

type ReaderAt interface {
  io.ReaderAt
  io.Closer
  Size() int64
}

type Provider interface {
  ReaderAt(ctx context.Context, desc ocispec.Descriptor) (ReaderAt, error)
}

type Ingester interface {
  Writer(ctx context.Context, opts ...WriterOpt) (Writer, error)
}

type Info struct {
  Digest    digest.Digest
  Size      int64
  CreatedAt time.Time
  UpdatedAt time.Time
  Labels    map[string]string
}

// Status of a content operation
type Status struct {
  Ref       string
  Offset    int64
  Total     int64
  Expected  digest.Digest
  StartedAt time.Time
  UpdatedAt time.Time
}


type WalkFunc func(Info) error
type Manager interface {
  Info(ctx context.Context, dgst digest.Digest) (Info, error)
  Update(ctx context.Context, info Info, fieldpaths ...string) (Info, error)
  Walk(ctx context.Context, fn WalkFunc, filters ...string) error
  Delete(ctx context.Context, dgst digest.Digest) error
}
type IngestManager interface {
  Status(ctx context.Context, ref string) (Status, error)
  ListStatuses(ctx context.Context, filters ...string) ([]Status, error)
  Abort(ctx context.Context, ref string) error
}


type Writer interface {
  io.WriteCloser
  Digest() digest.Digest
  Commit(ctx context.Context, size int64, expected digest.Digest, opts ...Opt) error
  Status() (Status, error)
  Truncate(size int64) error
}

type Store interface {
  Manager
  Provider
  IngestManager
  Ingester
}
content grpc类型
  • grpc类型的content注册在这里,使用统一的注册,申明名字类型以及依赖
  • 然后从initcontent中获取所有service的插件,然后拿到一个ContentService实例
  • 使用这个实例调用contentserver.New(),contentserver.New()实现了grpc相关方法
// services/content/service.go
func init() {
  plugin.Register(&plugin.Registration{
    Type: plugin.GRPCPlugin,
    ID:   "content",
    Requires: []plugin.Type{
      plugin.ServicePlugin,
    },
    InitFn: func(ic *plugin.InitContext) (interface{}, error) {
      plugins, err := ic.GetByType(plugin.ServicePlugin)
      if err != nil {
        return nil, err
      }

      p, ok := plugins[services.ContentService]
      if !ok {
        return nil, errors.New("content store service not found")
      }
      cs, err := p.Instance()
      if err != nil {
        return nil, err
      }
      return contentserver.New(cs.(content.Store)), nil
    },
  })
}
  • service就是抽象了content.Store

  • New()设置了上层

// services/content/contentserver/contentserver.go

type service struct {
  store content.Store
}

// New returns the content GRPC server
func New(cs content.Store) api.ContentServer {
  return &service{store: cs}
}

func (s *service) Register(server *grpc.Server) error {
  api.RegisterContentServer(server, s)
  return nil
}
  • 由于接口很多就不一样介绍了,这里只介绍一个简单的接口
  • 可以看到grpc请求来的参数传到store.Status()然后再将返回的组装成grpc结果并返回,其他api也是类似这种
// services/content/contentserver/contentserver.go

func (s *service) Status(ctx context.Context, req *api.StatusRequest) (*api.StatusResponse, error) {
  status, err := s.store.Status(ctx, req.Ref)
  if err != nil {
    return nil, errdefs.ToGRPCf(err, "could not get status for ref %q", req.Ref)
  }

  var resp api.StatusResponse
  resp.Status = &api.Status{
    StartedAt: status.StartedAt,
    UpdatedAt: status.UpdatedAt,
    Ref:       status.Ref,
    Offset:    status.Offset,
    Total:     status.Total,
    Expected:  status.Expected,
  }

  return &resp, nil
}
content service类型
  • 这里他依赖plugin.MetadataPlugin这个类型,然后将获取的meteada传入meatadata.ContentStore()
// services/content/store.go

func init() {
  plugin.Register(&plugin.Registration{
    Type: plugin.ServicePlugin,
    ID:   services.ContentService,
    Requires: []plugin.Type{
      plugin.MetadataPlugin,
    },
    InitFn: func(ic *plugin.InitContext) (interface{}, error) {
      m, err := ic.Get(plugin.MetadataPlugin)
      if err != nil {
        return nil, err
      }

      // 这里注册 content的svc
      s, err := newContentStore(m.(*metadata.DB).ContentStore(), ic.Events)
      return s, err
    },
  })
}


func newContentStore(cs content.Store, publisher events.Publisher) (content.Store, error) {
  return &store{
    Store:     cs,
    publisher: publisher,
  }, nil
}
  • 可以看到前面调用的ContentStore()返回的就是初始化,而meteadata创建的注册在services/server/server.go前面介绍启动过程介绍过
// metadata/db.go


// NewDB creates a new metadata database using the provided
// bolt database, content store, and snapshotters.
func NewDB(db *bolt.DB, cs content.Store, ss map[string]snapshots.Snapshotter, opts ...DBOpt) *DB {
  m := &DB{
    db:      db,
    ss:      make(map[string]*snapshotter, len(ss)),
    dirtySS: map[string]struct{}{},
    dbopts: dbOptions{
      shared: true,
    },
  }

  for _, opt := range opts {
    opt(&m.dbopts)
  }

  // Initialize data stores
  m.cs = newContentStore(m, m.dbopts.shared, cs)
  for name, sn := range ss {
    m.ss[name] = newSnapshotter(m, name, sn)
  }

  return m
}

// ContentStore returns a namespaced content store
// proxied to a content store.
func (m *DB) ContentStore() content.Store {
  if m.cs == nil {
    return nil
  }
  return m.cs
}
  • 同样实现了content的很多方法,下面得了例子可以看到这里先读取数据库,然后在调用store.Status()
// metadata/content.go

func (cs *contentStore) Status(ctx context.Context, ref string) (content.Status, error) {
  ns, err := namespaces.NamespaceRequired(ctx)

  var bref string
  if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
    bref = getRef(tx, ns, ref)
    if bref == "" {
      return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref)
    }

    return nil
  }); err != nil {
    return content.Status{}, err
  }

  st, err := cs.Store.Status(ctx, bref)
  if err != nil {
    return content.Status{}, err
  }
  st.Ref = ref
  return st, nil
}
content类型

content有2中实现,一种本地(local),一种prox(远程)

  • local:就是本地实现,目前可以理解为真正实现

  • proxy:则是调用远程的实现,因为content有插件

  • 注册则在loadPlugin()中首先会将本地的注册,随后读取配置文件中的proxy_plugin配置在注册proxy类型的,
    需要注意的是插件在整理之后会返回第一个可能导致你注册的content需要再配置文件中disabled_plugins参数关闭local强制使用proxy类型的

// services/server/server.go

  plugin.Register(&plugin.Registration{
    Type: plugin.ContentPlugin,
    ID:   "content",
    InitFn: func(ic *plugin.InitContext) (interface{}, error) {
      ic.Meta.Exports["root"] = ic.Root
      return local.NewStore(ic.Root)
    },
  })

  clients := &proxyClients{}
  for name, pp := range config.ProxyPlugins {
    var (
      t plugin.Type
      f func(*grpc.ClientConn) interface{}

      address = pp.Address
    )

    // nsap逻辑

    case string(plugin.ContentPlugin), "content":
      t = plugin.ContentPlugin
      f = func(conn *grpc.ClientConn) interface{} {
        return csproxy.NewContentStore(csapi.NewContentClient(conn))
      }
    default:
      log.G(ctx).WithField("type", pp.Type).Warn("unknown proxy plugin type")
    }

    plugin.Register(&plugin.Registration{
      Type: t,
      ID:   name,
      InitFn: func(ic *plugin.InitContext) (interface{}, error) {
        ic.Meta.Exports["address"] = address
        conn, err := clients.getClient(address)
        if err != nil {
          return nil, err
        }
        return f(conn), nil
      },
    })
  • 接口实现本质就是读取存储的文件一些信息,然后返回
// content/local/store.go

// status works like stat above except uses the path to the ingest.
func (s *store) status(ingestPath string) (content.Status, error) {
  dp := filepath.Join(ingestPath, "data")
  fi, err := os.Stat(dp)

  ref, err := readFileString(filepath.Join(ingestPath, "ref"))

  startedAt, err := readFileTimestamp(filepath.Join(ingestPath, "startedat"))
 
  updatedAt, err := readFileTimestamp(filepath.Join(ingestPath, "updatedat"))
 
  // because we don't write updatedat on every write, the mod time may
  // actually be more up to date.
  if fi.ModTime().After(updatedAt) {
    updatedAt = fi.ModTime()
  }

  return content.Status{
    Ref:       ref,
    Offset:    fi.Size(),
    Total:     s.total(ingestPath),
    UpdatedAt: updatedAt,
    StartedAt: startedAt,
  }, nil
}