From d7cf1f87296fc59119e16cb93c105afcf0ddad49 Mon Sep 17 00:00:00 2001 From: wangdi Date: Tue, 1 Oct 2013 04:13:00 -0700 Subject: [PATCH] LU-1187 lod: Add remote object for DNE LOD will do FLD lookup to check whether the object is for OST or remote MDT object. Because there are remote object invlove, it might return some error other than ENOMEM for object allocation. Signed-off-by: wang di Change-Id: Ie226b3c475524fc7e705665c6200e4c66d50acf0 Reviewed-on: http://review.whamcloud.com/4924 Tested-by: Hudson Reviewed-by: Alex Zhuravlev Reviewed-by: Mike Pershin Tested-by: Maloo Reviewed-by: Oleg Drokin --- lustre/lod/lod_dev.c | 34 ++++++++++++++++++++++---------- lustre/lod/lod_object.c | 48 +++++++++++++++++++++++++++++++++++++++++++++ lustre/mdd/mdd_object.c | 4 ++-- lustre/obdclass/lu_object.c | 16 ++++++++------- lustre/osp/osp_dev.c | 22 ++++++++++++++------- lustre/osp/osp_object.c | 16 ++++++++++++--- 6 files changed, 111 insertions(+), 29 deletions(-) diff --git a/lustre/lod/lod_dev.c b/lustre/lod/lod_dev.c index 608f754..4d1d854 100644 --- a/lustre/lod/lod_dev.c +++ b/lustre/lod/lod_dev.c @@ -94,6 +94,7 @@ int lod_fld_lookup(const struct lu_env *env, struct lod_device *lod, } extern struct lu_object_operations lod_lu_obj_ops; +extern struct lu_object_operations lod_lu_robj_ops; extern struct dt_object_operations lod_obj_ops; /* Slab for OSD object allocation */ @@ -117,19 +118,32 @@ struct lu_object *lod_object_alloc(const struct lu_env *env, const struct lu_object_header *hdr, struct lu_device *dev) { - struct lu_object *lu_obj; - struct lod_object *lo; + struct lod_object *lod_obj; + struct lu_object *lu_obj; + const struct lu_fid *fid = &hdr->loh_fid; + mdsno_t mds; + int rc = 0; + ENTRY; - OBD_SLAB_ALLOC_PTR_GFP(lo, lod_object_kmem, CFS_ALLOC_IO); - if (lo == NULL) - return NULL; + OBD_SLAB_ALLOC_PTR_GFP(lod_obj, lod_object_kmem, CFS_ALLOC_IO); + if (lod_obj == NULL) + RETURN(ERR_PTR(-ENOMEM)); - lu_obj = lod2lu_obj(lo); - dt_object_init(&lo->ldo_obj, NULL, dev); - lo->ldo_obj.do_ops = &lod_obj_ops; - lu_obj->lo_ops = &lod_lu_obj_ops; + rc = lod_fld_lookup(env, lu2lod_dev(dev), fid, &mds, LU_SEQ_RANGE_MDT); + if (rc) { + OBD_SLAB_FREE_PTR(lod_obj, lod_object_kmem); + RETURN(ERR_PTR(rc)); + } - return lu_obj; + lod_obj->ldo_mds_num = mds; + lu_obj = lod2lu_obj(lod_obj); + dt_object_init(&lod_obj->ldo_obj, NULL, dev); + lod_obj->ldo_obj.do_ops = &lod_obj_ops; + if (likely(mds == lu_site2seq(dev->ld_site)->ss_node_id)) + lu_obj->lo_ops = &lod_lu_obj_ops; + else + lu_obj->lo_ops = &lod_lu_robj_ops; + RETURN(lu_obj); } static int lod_cleanup_desc_tgts(const struct lu_env *env, diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index 4573531..de28d9c 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -1225,3 +1225,51 @@ struct lu_object_operations lod_lu_obj_ops = { .loo_object_release = lod_object_release, .loo_object_print = lod_object_print, }; + +/** + * Init remote lod object + */ +static int lod_robject_init(const struct lu_env *env, struct lu_object *lo, + const struct lu_object_conf *conf) +{ + struct lod_device *lod = lu2lod_dev(lo->lo_dev); + struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; + struct lu_device *c_dev = NULL; + struct lu_object *c_obj; + int i; + ENTRY; + + lod_getref(ltd); + if (ltd->ltd_tgts_size > 0) { + cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) { + struct lod_tgt_desc *tgt; + tgt = LTD_TGT(ltd, i); + LASSERT(tgt && tgt->ltd_tgt); + if (tgt->ltd_index == + lu2lod_obj(lo)->ldo_mds_num) { + c_dev = &(tgt->ltd_tgt->dd_lu_dev); + break; + } + } + } + lod_putref(lod, ltd); + + if (unlikely(c_dev == NULL)) + RETURN(-ENOENT); + + c_obj = c_dev->ld_ops->ldo_object_alloc(env, lo->lo_header, c_dev); + if (unlikely(c_obj == NULL)) + RETURN(-ENOMEM); + + lu_object_add(lo, c_obj); + + RETURN(0); +} + +struct lu_object_operations lod_lu_robj_ops = { + .loo_object_init = lod_robject_init, + .loo_object_start = lod_object_start, + .loo_object_free = lod_object_free, + .loo_object_release = lod_object_release, + .loo_object_print = lod_object_print, +}; diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index 0d7d7db..da4f2d9 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -214,8 +214,8 @@ static int mdd_object_init(const struct lu_env *env, struct lu_object *o, under = &d->mdd_child->dd_lu_dev; below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under); mdd_pdlock_init(mdd_obj); - if (below == NULL) - RETURN(-ENOMEM); + if (IS_ERR(below)) + RETURN(PTR_ERR(below)); lu_object_add(o, below); diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c index 28988c52..09c189b 100644 --- a/lustre/obdclass/lu_object.c +++ b/lustre/obdclass/lu_object.c @@ -183,13 +183,15 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env, int result; ENTRY; - /* - * Create top-level object slice. This will also create - * lu_object_header. - */ - top = dev->ld_ops->ldo_object_alloc(env, NULL, dev); - if (top == NULL) - RETURN(ERR_PTR(-ENOMEM)); + /* + * Create top-level object slice. This will also create + * lu_object_header. + */ + top = dev->ld_ops->ldo_object_alloc(env, NULL, dev); + if (top == NULL) + RETURN(ERR_PTR(-ENOMEM)); + if (IS_ERR(top)) + RETURN(top); /* * This is the only place where object fid is assigned. It's constant * after this point. diff --git a/lustre/osp/osp_dev.c b/lustre/osp/osp_dev.c index 6ec6216..f0206dc 100644 --- a/lustre/osp/osp_dev.c +++ b/lustre/osp/osp_dev.c @@ -72,20 +72,28 @@ struct lu_object *osp_object_alloc(const struct lu_env *env, const struct lu_object_header *hdr, struct lu_device *d) { - struct lu_object_header *h; + struct lu_object_header *h = NULL; struct osp_object *o; struct lu_object *l; - LASSERT(hdr == NULL); - OBD_SLAB_ALLOC_PTR_GFP(o, osp_object_kmem, CFS_ALLOC_IO); if (o != NULL) { l = &o->opo_obj.do_lu; - h = &o->opo_header; - lu_object_header_init(h); - dt_object_init(&o->opo_obj, h, d); - lu_object_add_top(h, l); + /* For data object, OSP obj would always be the top + * object, i.e. hdr is always NULL, see lu_object_alloc. + * But for metadata object, we always build the object + * stack from MDT. i.e. mdt_object will be the top object + * i.e. hdr != NULL */ + if (hdr == NULL) { + /* object for OST */ + h = &o->opo_header; + lu_object_header_init(h); + dt_object_init(&o->opo_obj, h, d); + lu_object_add_top(h, l); + } else { + dt_object_init(&o->opo_obj, h, d); + } l->lo_ops = &osp_lu_obj_ops; diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c index 337b710..e5ffe31 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -348,14 +348,24 @@ struct dt_object_operations osp_obj_ops = { .do_destroy = osp_object_destroy, }; +static int is_ost_obj(struct lu_object *lo) +{ + struct osp_device *osp = lu2osp_dev(lo->lo_dev); + + return !osp->opd_connect_mdt; +} + static int osp_object_init(const struct lu_env *env, struct lu_object *o, const struct lu_object_conf *unused) { - struct osp_object *po = lu2osp_obj(o); + struct osp_object *po = lu2osp_obj(o); + int rc = 0; + ENTRY; - po->opo_obj.do_ops = &osp_obj_ops; + if (is_ost_obj(o)) + po->opo_obj.do_ops = &osp_obj_ops; - return 0; + RETURN(rc); } static void osp_object_free(const struct lu_env *env, struct lu_object *o) -- 1.8.3.1