From 49b17944e1a61f88bddb5595bb053a555c8c08da Mon Sep 17 00:00:00 2001 From: Bobi Jam Date: Tue, 4 Nov 2014 21:41:56 +0800 Subject: [PATCH] LU-5823 clio: add coo_obd_info_get and coo_data_version * Add coo_obd_info_get to retrieve object attributes from servers. * Add coo_data_version to retrieve object's data_version. Signed-off-by: Bobi Jam Change-Id: Ia66a3ba1eee5c6478f3aa5c9f942ada77b2b5fe9 Reviewed-on: http://review.whamcloud.com/12638 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: John L. Hammond Reviewed-by: Jinshan Xiong Reviewed-by: Oleg Drokin --- lustre/include/cl_object.h | 47 ++++++++++----- lustre/llite/file.c | 38 ++++-------- lustre/lov/lov_internal.h | 6 ++ lustre/lov/lov_obd.c | 3 +- lustre/lov/lov_object.c | 143 +++++++++++++++++++++++++++++++++++++++++--- lustre/obdclass/cl_object.c | 67 +++++++++++++++++---- lustre/osc/osc_internal.h | 11 ++++ lustre/osc/osc_object.c | 54 ++++++++++++++--- lustre/osc/osc_request.c | 19 +++--- 9 files changed, 303 insertions(+), 85 deletions(-) diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index 23d6ea4..42b104f 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -105,6 +105,7 @@ #include #include +struct obd_info; struct inode; struct cl_device; @@ -348,19 +349,19 @@ struct cl_object_operations { */ int (*coo_io_init)(const struct lu_env *env, struct cl_object *obj, struct cl_io *io); - /** - * Fill portion of \a attr that this layer controls. This method is - * called top-to-bottom through all object layers. - * - * \pre cl_object_header::coh_attr_guard of the top-object is locked. - * - * \return 0: to continue - * \return +ve: to stop iterating through layers (but 0 is returned - * from enclosing cl_object_attr_get()) - * \return -ve: to signal error - */ - int (*coo_attr_get)(const struct lu_env *env, struct cl_object *obj, - struct cl_attr *attr); + /** + * Fill portion of \a attr that this layer controls. This method is + * called top-to-bottom through all object layers. + * + * \pre cl_object_header::coh_attr_guard of the top-object is locked. + * + * \return 0: to continue + * \return +ve: to stop iterating through layers (but 0 is returned + * from enclosing cl_object_attr_get()) + * \return -ve: to signal error + */ + int (*coo_attr_get)(const struct lu_env *env, struct cl_object *obj, + struct cl_attr *attr); /** * Update attributes. * @@ -415,6 +416,17 @@ struct cl_object_operations { int (*coo_fiemap)(const struct lu_env *env, struct cl_object *obj, struct ll_fiemap_info_key *fmkey, struct fiemap *fiemap, size_t *buflen); + /** + * Get attributes of the object from server. (top->bottom) + */ + int (*coo_obd_info_get)(const struct lu_env *env, struct cl_object *obj, + struct obd_info *oinfo, + struct ptlrpc_request_set *set); + /** + * Get data version of the object. (top->bottom) + */ + int (*coo_data_version)(const struct lu_env *env, struct cl_object *obj, + __u64 *version, int flags); }; /** @@ -2172,8 +2184,8 @@ void cl_object_put (const struct lu_env *env, struct cl_object *o); void cl_object_get (struct cl_object *o); void cl_object_attr_lock (struct cl_object *o); void cl_object_attr_unlock(struct cl_object *o); -int cl_object_attr_get (const struct lu_env *env, struct cl_object *obj, - struct cl_attr *attr); +int cl_object_attr_get(const struct lu_env *env, struct cl_object *obj, + struct cl_attr *attr); int cl_object_attr_update(const struct lu_env *env, struct cl_object *obj, const struct cl_attr *attr, unsigned valid); int cl_object_glimpse (const struct lu_env *env, struct cl_object *obj, @@ -2189,6 +2201,11 @@ int cl_object_find_cbdata(const struct lu_env *env, struct cl_object *obj, int cl_object_fiemap(const struct lu_env *env, struct cl_object *obj, struct ll_fiemap_info_key *fmkey, struct fiemap *fiemap, size_t *buflen); +int cl_object_obd_info_get(const struct lu_env *env, struct cl_object *obj, + struct obd_info *oinfo, + struct ptlrpc_request_set *set); +int cl_object_data_version(const struct lu_env *env, struct cl_object *obj, + __u64 *version, int flags); /** * Returns true, iff \a o0 and \a o1 are slices of the same object. diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 99b790f..f835a85 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -1854,43 +1854,31 @@ error: * This value is computed using stripe object version on OST. * Version is computed using server side locking. * - * @param sync if do sync on the OST side; + * @param flags if do sync on the OST side; * 0: no sync * LL_DV_RD_FLUSH: flush dirty pages, LCK_PR on OSTs * LL_DV_WR_FLUSH: drop all caching pages, LCK_PW on OSTs */ int ll_data_version(struct inode *inode, __u64 *data_version, int flags) { - struct lov_stripe_md *lsm = NULL; - struct ll_sb_info *sbi = ll_i2sbi(inode); - struct obdo *obdo = NULL; - int rc; + struct lu_env *env; + int refcheck; + int rc; ENTRY; - /* If no stripe, we consider version is 0. */ - lsm = ccc_inode_lsm_get(inode); - if (!lsm_has_objects(lsm)) { + /* If no file object initialized, we consider its version is 0. */ + if (ll_i2info(inode)->lli_clob == NULL) { *data_version = 0; - CDEBUG(D_INODE, "No object for inode\n"); - GOTO(out, rc = 0); + RETURN(0); } - OBD_ALLOC_PTR(obdo); - if (obdo == NULL) - GOTO(out, rc = -ENOMEM); - - rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, flags); - if (rc == 0) { - if (!(obdo->o_valid & OBD_MD_FLDATAVERSION)) - rc = -EOPNOTSUPP; - else - *data_version = obdo->o_data_version; - } + env = cl_env_get(&refcheck); + if (IS_ERR(env)) + RETURN(PTR_ERR(env)); - OBD_FREE_PTR(obdo); - EXIT; -out: - ccc_inode_lsm_put(inode, lsm); + rc = cl_object_data_version(env, ll_i2info(inode)->lli_clob, + data_version, flags); + cl_env_put(env, &refcheck); RETURN(rc); } diff --git a/lustre/lov/lov_internal.h b/lustre/lov/lov_internal.h index 2879f60..8c8e5da 100644 --- a/lustre/lov/lov_internal.h +++ b/lustre/lov/lov_internal.h @@ -188,6 +188,7 @@ int lov_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg, __u32 *indexp, int *genp); int lov_del_target(struct obd_device *obd, __u32 index, struct obd_uuid *uuidp, int gen); +int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data, int rc); /* lov_pack.c */ int lov_packmd(struct obd_export *exp, struct lov_mds_md **lmm, @@ -251,4 +252,9 @@ static inline bool lov_oinfo_is_dummy(const struct lov_oinfo *loi) return false; } +static inline struct obd_device *lov2obd(const struct lov_obd *lov) +{ + return container_of0(lov, struct obd_device, u.lov); +} + #endif diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 49f332e..1fd3b19 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -1029,8 +1029,7 @@ do { "%p->lsm_magic=%x\n", (lsmp), (lsmp)->lsm_magic); \ } while (0) -static int lov_getattr_interpret(struct ptlrpc_request_set *rqset, - void *data, int rc) +int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data, int rc) { struct lov_request_set *lovset = (struct lov_request_set *)data; int err; diff --git a/lustre/lov/lov_object.c b/lustre/lov/lov_object.c index 8feeee2..afe3b9f 100644 --- a/lustre/lov/lov_object.c +++ b/lustre/lov/lov_object.c @@ -1445,6 +1445,130 @@ out: RETURN(rc); } +static int lov_dispatch_obd_info_get(const struct lu_env *env, + struct cl_object *obj, + struct obd_info *oinfo, + struct ptlrpc_request_set *set) +{ + struct cl_object *subobj = NULL; + struct lov_obd *lov = lu2lov_dev(obj->co_lu.lo_dev)->ld_lov; + struct lov_request_set *lovset; + struct list_head *pos; + struct lov_request *req; + int rc; + int rc2; + ENTRY; + + rc = lov_prep_getattr_set(lov2obd(lov)->obd_self_export, oinfo, + &lovset); + if (rc != 0) + RETURN(rc); + + CDEBUG(D_INFO, "objid "DOSTID": %ux%u byte stripes.\n", + POSTID(&oinfo->oi_md->lsm_oi), + oinfo->oi_md->lsm_stripe_count, + oinfo->oi_md->lsm_stripe_size); + + list_for_each(pos, &lovset->set_list) { + req = list_entry(pos, struct lov_request, rq_link); + + CDEBUG(D_INFO, "objid "DOSTID"[%d] has subobj "DOSTID" at idx" + "%u\n", POSTID(&oinfo->oi_oa->o_oi), req->rq_stripe, + POSTID(&req->rq_oi.oi_oa->o_oi), req->rq_idx); + subobj = lov_find_subobj(env, cl2lov(obj), oinfo->oi_md, + req->rq_stripe); + if (IS_ERR(subobj)) + GOTO(errout, rc = PTR_ERR(subobj)); + + rc = cl_object_obd_info_get(env, subobj, &req->rq_oi, set); + cl_object_put(env, subobj); + if (rc != 0) { + CERROR("%s: getattr objid "DOSTID" subobj" + DOSTID" on OST idx %d: rc = %d\n", + lov2obd(lov)->obd_name, + POSTID(&oinfo->oi_oa->o_oi), + POSTID(&req->rq_oi.oi_oa->o_oi), + req->rq_idx, rc); + GOTO(errout, rc); + } + } + + if (!list_empty(&set->set_requests)) { + LASSERT(rc == 0); + LASSERT(set->set_interpret == NULL); + set->set_interpret = lov_getattr_interpret; + set->set_arg = lovset; + GOTO(out, rc); + } +errout: + if (rc) + atomic_set(&lovset->set_completes, 0); + rc2 = lov_fini_getattr_set(lovset); + rc = rc != 0 ? rc : rc2; +out: + RETURN(rc); +} + +static int lov_object_data_version(const struct lu_env *env, + struct cl_object *obj, __u64 *data_version, + int flags) +{ + struct ptlrpc_request_set *set; + struct obd_info oinfo = { { { 0 } } }; + struct obdo *obdo = NULL; + struct lov_stripe_md *lsm; + int rc; + ENTRY; + + lsm = lov_lsm_addref(cl2lov(obj)); + if (!lsm_has_objects(lsm)) { + /* If no stripe, we consider version is 0. */ + *data_version = 0; + GOTO(out, rc = 0); + } + + OBD_ALLOC_PTR(obdo); + if (obdo == NULL) + GOTO(out, rc = -ENOMEM); + + oinfo.oi_md = lsm; + oinfo.oi_oa = obdo; + obdo->o_oi = lsm->lsm_oi; + obdo->o_mode = S_IFREG; + obdo->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLTYPE; + if (flags & (LL_DV_RD_FLUSH | LL_DV_WR_FLUSH)) { + obdo->o_valid |= OBD_MD_FLFLAGS; + obdo->o_flags |= OBD_FL_SRVLOCK; + if (flags & LL_DV_WR_FLUSH) + obdo->o_flags |= OBD_FL_FLUSH; + } + + set = ptlrpc_prep_set(); + if (set == NULL) + GOTO(out_obdo, rc = -ENOMEM); + + rc = lov_dispatch_obd_info_get(env, obj, &oinfo, set); + if (rc == 0) + rc = ptlrpc_set_wait(set); + ptlrpc_set_destroy(set); + if (rc == 0) { + oinfo.oi_oa->o_valid &= OBD_MD_FLDATAVERSION | OBD_MD_FLFLAGS; + if (flags & LL_DV_WR_FLUSH && + !(oinfo.oi_oa->o_valid & OBD_MD_FLFLAGS && + oinfo.oi_oa->o_flags & OBD_FL_FLUSH)) + rc = -EOPNOTSUPP; + else if (!(obdo->o_valid & OBD_MD_FLDATAVERSION)) + rc = -EOPNOTSUPP; + else + *data_version = obdo->o_data_version; + } +out_obdo: + OBD_FREE_PTR(obdo); +out: + lov_lsm_put(obj, lsm); + RETURN(rc); +} + static int lov_object_getstripe(const struct lu_env *env, struct cl_object *obj, struct lov_user_md __user *lum) { @@ -1476,15 +1600,16 @@ static int lov_object_find_cbdata(const struct lu_env *env, } static const struct cl_object_operations lov_ops = { - .coo_page_init = lov_page_init, - .coo_lock_init = lov_lock_init, - .coo_io_init = lov_io_init, - .coo_attr_get = lov_attr_get, - .coo_attr_update = lov_attr_update, - .coo_conf_set = lov_conf_set, - .coo_getstripe = lov_object_getstripe, - .coo_find_cbdata = lov_object_find_cbdata, - .coo_fiemap = lov_object_fiemap, + .coo_page_init = lov_page_init, + .coo_lock_init = lov_lock_init, + .coo_io_init = lov_io_init, + .coo_attr_get = lov_attr_get, + .coo_attr_update = lov_attr_update, + .coo_conf_set = lov_conf_set, + .coo_getstripe = lov_object_getstripe, + .coo_find_cbdata = lov_object_find_cbdata, + .coo_fiemap = lov_object_fiemap, + .coo_data_version = lov_object_data_version, }; static const struct lu_object_operations lov_lu_obj_ops = { diff --git a/lustre/obdclass/cl_object.c b/lustre/obdclass/cl_object.c index 59ce8de..b003952 100644 --- a/lustre/obdclass/cl_object.c +++ b/lustre/obdclass/cl_object.c @@ -202,7 +202,7 @@ EXPORT_SYMBOL(cl_object_attr_unlock); * for. */ int cl_object_attr_get(const struct lu_env *env, struct cl_object *obj, - struct cl_attr *attr) + struct cl_attr *attr) { struct lu_object_header *top; int result; @@ -210,19 +210,19 @@ int cl_object_attr_get(const struct lu_env *env, struct cl_object *obj, assert_spin_locked(cl_object_attr_guard(obj)); ENTRY; - top = obj->co_lu.lo_header; - result = 0; + top = obj->co_lu.lo_header; + result = 0; list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) { - if (obj->co_ops->coo_attr_get != NULL) { - result = obj->co_ops->coo_attr_get(env, obj, attr); - if (result != 0) { - if (result > 0) - result = 0; - break; - } - } - } - RETURN(result); + if (obj->co_ops->coo_attr_get != NULL) { + result = obj->co_ops->coo_attr_get(env, obj, attr); + if (result != 0) { + if (result > 0) + result = 0; + break; + } + } + } + RETURN(result); } EXPORT_SYMBOL(cl_object_attr_get); @@ -418,6 +418,47 @@ int cl_object_fiemap(const struct lu_env *env, struct cl_object *obj, } EXPORT_SYMBOL(cl_object_fiemap); +int cl_object_obd_info_get(const struct lu_env *env, struct cl_object *obj, + struct obd_info *oinfo, + struct ptlrpc_request_set *set) +{ + struct lu_object_header *top; + int result = 0; + ENTRY; + + top = obj->co_lu.lo_header; + list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) { + if (obj->co_ops->coo_obd_info_get != NULL) { + result = obj->co_ops->coo_obd_info_get(env, obj, oinfo, + set); + if (result != 0) + break; + } + } + RETURN(result); +} +EXPORT_SYMBOL(cl_object_obd_info_get); + +int cl_object_data_version(const struct lu_env *env, struct cl_object *obj, + __u64 *data_version, int flags) +{ + struct lu_object_header *top; + int result = 0; + ENTRY; + + top = obj->co_lu.lo_header; + list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) { + if (obj->co_ops->coo_data_version != NULL) { + result = obj->co_ops->coo_data_version(env, obj, + data_version, flags); + if (result != 0) + break; + } + } + RETURN(result); +} +EXPORT_SYMBOL(cl_object_data_version); + /** * Helper function removing all object locks, and marking object for * deletion. All object pages must have been deleted at this point. diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h index 9e44778..1f43cd1 100644 --- a/lustre/osc/osc_internal.h +++ b/lustre/osc/osc_internal.h @@ -196,6 +196,11 @@ struct osc_quota_info { struct hlist_node oqi_hash; obd_uid oqi_id; }; + +struct osc_async_args { + struct obd_info *aa_oi; +}; + int osc_quota_setup(struct obd_device *obd); int osc_quota_cleanup(struct obd_device *obd); int osc_quota_setdq(struct client_obd *cli, const unsigned int qid[], @@ -226,4 +231,10 @@ enum osc_dap_flags { struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env, struct osc_object *obj, pgoff_t index, enum osc_dap_flags flags); +void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo); +void osc_set_capa_size(struct ptlrpc_request *req, + const struct req_msg_field *field, struct obd_capa *oc); +int osc_getattr_interpret(const struct lu_env *env, + struct ptlrpc_request *req, + struct osc_async_args *aa, int rc); #endif /* OSC_INTERNAL_H */ diff --git a/lustre/osc/osc_object.c b/lustre/osc/osc_object.c index b3614f8..3a2d09b 100644 --- a/lustre/osc/osc_object.c +++ b/lustre/osc/osc_object.c @@ -341,6 +341,41 @@ drop_lock: RETURN(rc); } +static int osc_object_obd_info_get(const struct lu_env *env, + struct cl_object *obj, + struct obd_info *oinfo, + struct ptlrpc_request_set *set) +{ + struct ptlrpc_request *req; + struct osc_async_args *aa; + int rc; + ENTRY; + + req = ptlrpc_request_alloc(class_exp2cliimp(osc_export(cl2osc(obj))), + &RQF_OST_GETATTR); + if (req == NULL) + RETURN(-ENOMEM); + + osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); + if (rc != 0) { + ptlrpc_request_free(req); + RETURN(rc); + } + + osc_pack_req_body(req, oinfo); + + ptlrpc_request_set_replen(req); + req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret; + + CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); + aa = ptlrpc_req_async_args(req); + aa->aa_oi = oinfo; + + ptlrpc_set_add_req(set, req); + RETURN(0); +} + void osc_object_set_contended(struct osc_object *obj) { obj->oo_contention_time = cfs_time_current(); @@ -380,15 +415,16 @@ int osc_object_is_contended(struct osc_object *obj) } static const struct cl_object_operations osc_ops = { - .coo_page_init = osc_page_init, - .coo_lock_init = osc_lock_init, - .coo_io_init = osc_io_init, - .coo_attr_get = osc_attr_get, - .coo_attr_update = osc_attr_update, - .coo_glimpse = osc_object_glimpse, - .coo_prune = osc_object_prune, - .coo_find_cbdata = osc_object_find_cbdata, - .coo_fiemap = osc_object_fiemap, + .coo_page_init = osc_page_init, + .coo_lock_init = osc_lock_init, + .coo_io_init = osc_io_init, + .coo_attr_get = osc_attr_get, + .coo_attr_update = osc_attr_update, + .coo_glimpse = osc_object_glimpse, + .coo_prune = osc_object_prune, + .coo_find_cbdata = osc_object_find_cbdata, + .coo_fiemap = osc_object_fiemap, + .coo_obd_info_get = osc_object_obd_info_get, }; static const struct lu_object_operations osc_lu_obj_ops = { diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index b64ad9a..2bcc035 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -68,10 +68,6 @@ struct osc_brw_async_args { #define osc_grant_args osc_brw_async_args -struct osc_async_args { - struct obd_info *aa_oi; -}; - struct osc_setattr_args { struct obdo *sa_oa; obd_enqueue_update_f sa_upcall; @@ -116,8 +112,7 @@ static inline void osc_pack_capa(struct ptlrpc_request *req, DEBUG_CAPA(D_SEC, c, "pack"); } -static inline void osc_pack_req_body(struct ptlrpc_request *req, - struct obd_info *oinfo) +void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo) { struct ost_body *body; @@ -129,9 +124,9 @@ static inline void osc_pack_req_body(struct ptlrpc_request *req, osc_pack_capa(req, body, oinfo->oi_capa); } -static inline void osc_set_capa_size(struct ptlrpc_request *req, - const struct req_msg_field *field, - struct obd_capa *oc) +void osc_set_capa_size(struct ptlrpc_request *req, + const struct req_msg_field *field, + struct obd_capa *oc) { if (oc == NULL) req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0); @@ -140,9 +135,9 @@ static inline void osc_set_capa_size(struct ptlrpc_request *req, ; } -static int osc_getattr_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - struct osc_async_args *aa, int rc) +int osc_getattr_interpret(const struct lu_env *env, + struct ptlrpc_request *req, + struct osc_async_args *aa, int rc) { struct ost_body *body; ENTRY; -- 1.8.3.1