}
extern struct lu_object_operations lod_lu_obj_ops;
+extern struct lu_object_operations lod_lu_robj_ops;
extern struct dt_object_operations lod_obj_ops;
/* Slab for OSD object allocation */
const struct lu_object_header *hdr,
struct lu_device *dev)
{
- struct lu_object *lu_obj;
- struct lod_object *lo;
+ struct lod_object *lod_obj;
+ struct lu_object *lu_obj;
+ const struct lu_fid *fid = &hdr->loh_fid;
+ mdsno_t mds;
+ int rc = 0;
+ ENTRY;
- OBD_SLAB_ALLOC_PTR_GFP(lo, lod_object_kmem, CFS_ALLOC_IO);
- if (lo == NULL)
- return NULL;
+ OBD_SLAB_ALLOC_PTR_GFP(lod_obj, lod_object_kmem, CFS_ALLOC_IO);
+ if (lod_obj == NULL)
+ RETURN(ERR_PTR(-ENOMEM));
- lu_obj = lod2lu_obj(lo);
- dt_object_init(&lo->ldo_obj, NULL, dev);
- lo->ldo_obj.do_ops = &lod_obj_ops;
- lu_obj->lo_ops = &lod_lu_obj_ops;
+ rc = lod_fld_lookup(env, lu2lod_dev(dev), fid, &mds, LU_SEQ_RANGE_MDT);
+ if (rc) {
+ OBD_SLAB_FREE_PTR(lod_obj, lod_object_kmem);
+ RETURN(ERR_PTR(rc));
+ }
- return lu_obj;
+ lod_obj->ldo_mds_num = mds;
+ lu_obj = lod2lu_obj(lod_obj);
+ dt_object_init(&lod_obj->ldo_obj, NULL, dev);
+ lod_obj->ldo_obj.do_ops = &lod_obj_ops;
+ if (likely(mds == lu_site2seq(dev->ld_site)->ss_node_id))
+ lu_obj->lo_ops = &lod_lu_obj_ops;
+ else
+ lu_obj->lo_ops = &lod_lu_robj_ops;
+ RETURN(lu_obj);
}
static int lod_cleanup_desc_tgts(const struct lu_env *env,
.loo_object_release = lod_object_release,
.loo_object_print = lod_object_print,
};
+
+/**
+ * Init remote lod object
+ */
+static int lod_robject_init(const struct lu_env *env, struct lu_object *lo,
+ const struct lu_object_conf *conf)
+{
+ struct lod_device *lod = lu2lod_dev(lo->lo_dev);
+ struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
+ struct lu_device *c_dev = NULL;
+ struct lu_object *c_obj;
+ int i;
+ ENTRY;
+
+ lod_getref(ltd);
+ if (ltd->ltd_tgts_size > 0) {
+ cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
+ struct lod_tgt_desc *tgt;
+ tgt = LTD_TGT(ltd, i);
+ LASSERT(tgt && tgt->ltd_tgt);
+ if (tgt->ltd_index ==
+ lu2lod_obj(lo)->ldo_mds_num) {
+ c_dev = &(tgt->ltd_tgt->dd_lu_dev);
+ break;
+ }
+ }
+ }
+ lod_putref(lod, ltd);
+
+ if (unlikely(c_dev == NULL))
+ RETURN(-ENOENT);
+
+ c_obj = c_dev->ld_ops->ldo_object_alloc(env, lo->lo_header, c_dev);
+ if (unlikely(c_obj == NULL))
+ RETURN(-ENOMEM);
+
+ lu_object_add(lo, c_obj);
+
+ RETURN(0);
+}
+
+struct lu_object_operations lod_lu_robj_ops = {
+ .loo_object_init = lod_robject_init,
+ .loo_object_start = lod_object_start,
+ .loo_object_free = lod_object_free,
+ .loo_object_release = lod_object_release,
+ .loo_object_print = lod_object_print,
+};
under = &d->mdd_child->dd_lu_dev;
below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
mdd_pdlock_init(mdd_obj);
- if (below == NULL)
- RETURN(-ENOMEM);
+ if (IS_ERR(below))
+ RETURN(PTR_ERR(below));
lu_object_add(o, below);
int result;
ENTRY;
- /*
- * Create top-level object slice. This will also create
- * lu_object_header.
- */
- top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
- if (top == NULL)
- RETURN(ERR_PTR(-ENOMEM));
+ /*
+ * Create top-level object slice. This will also create
+ * lu_object_header.
+ */
+ top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
+ if (top == NULL)
+ RETURN(ERR_PTR(-ENOMEM));
+ if (IS_ERR(top))
+ RETURN(top);
/*
* This is the only place where object fid is assigned. It's constant
* after this point.
const struct lu_object_header *hdr,
struct lu_device *d)
{
- struct lu_object_header *h;
+ struct lu_object_header *h = NULL;
struct osp_object *o;
struct lu_object *l;
- LASSERT(hdr == NULL);
-
OBD_SLAB_ALLOC_PTR_GFP(o, osp_object_kmem, CFS_ALLOC_IO);
if (o != NULL) {
l = &o->opo_obj.do_lu;
- h = &o->opo_header;
- lu_object_header_init(h);
- dt_object_init(&o->opo_obj, h, d);
- lu_object_add_top(h, l);
+ /* For data object, OSP obj would always be the top
+ * object, i.e. hdr is always NULL, see lu_object_alloc.
+ * But for metadata object, we always build the object
+ * stack from MDT. i.e. mdt_object will be the top object
+ * i.e. hdr != NULL */
+ if (hdr == NULL) {
+ /* object for OST */
+ h = &o->opo_header;
+ lu_object_header_init(h);
+ dt_object_init(&o->opo_obj, h, d);
+ lu_object_add_top(h, l);
+ } else {
+ dt_object_init(&o->opo_obj, h, d);
+ }
l->lo_ops = &osp_lu_obj_ops;
.do_destroy = osp_object_destroy,
};
+static int is_ost_obj(struct lu_object *lo)
+{
+ struct osp_device *osp = lu2osp_dev(lo->lo_dev);
+
+ return !osp->opd_connect_mdt;
+}
+
static int osp_object_init(const struct lu_env *env, struct lu_object *o,
const struct lu_object_conf *unused)
{
- struct osp_object *po = lu2osp_obj(o);
+ struct osp_object *po = lu2osp_obj(o);
+ int rc = 0;
+ ENTRY;
- po->opo_obj.do_ops = &osp_obj_ops;
+ if (is_ost_obj(o))
+ po->opo_obj.do_ops = &osp_obj_ops;
- return 0;
+ RETURN(rc);
}
static void osp_object_free(const struct lu_env *env, struct lu_object *o)