* Add coo_obd_info_get to retrieve object attributes from servers.
* Add coo_data_version to retrieve object's data_version.
Signed-off-by: Bobi Jam <bobijam.xu@intel.com>
Change-Id: Ia66a3ba1eee5c6478f3aa5c9f942ada77b2b5fe9
Reviewed-on: http://review.whamcloud.com/12638
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Reviewed-by: Jinshan Xiong <jinshan.xiong@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
#include <linux/wait.h>
#include <lustre_dlm.h>
+struct obd_info;
struct inode;
struct cl_device;
*/
int (*coo_io_init)(const struct lu_env *env,
struct cl_object *obj, struct cl_io *io);
- /**
- * Fill portion of \a attr that this layer controls. This method is
- * called top-to-bottom through all object layers.
- *
- * \pre cl_object_header::coh_attr_guard of the top-object is locked.
- *
- * \return 0: to continue
- * \return +ve: to stop iterating through layers (but 0 is returned
- * from enclosing cl_object_attr_get())
- * \return -ve: to signal error
- */
- int (*coo_attr_get)(const struct lu_env *env, struct cl_object *obj,
- struct cl_attr *attr);
+ /**
+ * Fill portion of \a attr that this layer controls. This method is
+ * called top-to-bottom through all object layers.
+ *
+ * \pre cl_object_header::coh_attr_guard of the top-object is locked.
+ *
+ * \return 0: to continue
+ * \return +ve: to stop iterating through layers (but 0 is returned
+ * from enclosing cl_object_attr_get())
+ * \return -ve: to signal error
+ */
+ int (*coo_attr_get)(const struct lu_env *env, struct cl_object *obj,
+ struct cl_attr *attr);
/**
* Update attributes.
*
int (*coo_fiemap)(const struct lu_env *env, struct cl_object *obj,
struct ll_fiemap_info_key *fmkey,
struct fiemap *fiemap, size_t *buflen);
+ /**
+ * Get attributes of the object from server. (top->bottom)
+ */
+ int (*coo_obd_info_get)(const struct lu_env *env, struct cl_object *obj,
+ struct obd_info *oinfo,
+ struct ptlrpc_request_set *set);
+ /**
+ * Get data version of the object. (top->bottom)
+ */
+ int (*coo_data_version)(const struct lu_env *env, struct cl_object *obj,
+ __u64 *version, int flags);
};
/**
void cl_object_get (struct cl_object *o);
void cl_object_attr_lock (struct cl_object *o);
void cl_object_attr_unlock(struct cl_object *o);
-int cl_object_attr_get (const struct lu_env *env, struct cl_object *obj,
- struct cl_attr *attr);
+int cl_object_attr_get(const struct lu_env *env, struct cl_object *obj,
+ struct cl_attr *attr);
int cl_object_attr_update(const struct lu_env *env, struct cl_object *obj,
const struct cl_attr *attr, unsigned valid);
int cl_object_glimpse (const struct lu_env *env, struct cl_object *obj,
int cl_object_fiemap(const struct lu_env *env, struct cl_object *obj,
struct ll_fiemap_info_key *fmkey, struct fiemap *fiemap,
size_t *buflen);
+int cl_object_obd_info_get(const struct lu_env *env, struct cl_object *obj,
+ struct obd_info *oinfo,
+ struct ptlrpc_request_set *set);
+int cl_object_data_version(const struct lu_env *env, struct cl_object *obj,
+ __u64 *version, int flags);
/**
* Returns true, iff \a o0 and \a o1 are slices of the same object.
* This value is computed using stripe object version on OST.
* Version is computed using server side locking.
*
- * @param sync if do sync on the OST side;
+ * @param flags if do sync on the OST side;
* 0: no sync
* LL_DV_RD_FLUSH: flush dirty pages, LCK_PR on OSTs
* LL_DV_WR_FLUSH: drop all caching pages, LCK_PW on OSTs
*/
int ll_data_version(struct inode *inode, __u64 *data_version, int flags)
{
- struct lov_stripe_md *lsm = NULL;
- struct ll_sb_info *sbi = ll_i2sbi(inode);
- struct obdo *obdo = NULL;
- int rc;
+ struct lu_env *env;
+ int refcheck;
+ int rc;
ENTRY;
- /* If no stripe, we consider version is 0. */
- lsm = ccc_inode_lsm_get(inode);
- if (!lsm_has_objects(lsm)) {
+ /* If no file object initialized, we consider its version is 0. */
+ if (ll_i2info(inode)->lli_clob == NULL) {
*data_version = 0;
- CDEBUG(D_INODE, "No object for inode\n");
- GOTO(out, rc = 0);
+ RETURN(0);
}
- OBD_ALLOC_PTR(obdo);
- if (obdo == NULL)
- GOTO(out, rc = -ENOMEM);
-
- rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, flags);
- if (rc == 0) {
- if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
- rc = -EOPNOTSUPP;
- else
- *data_version = obdo->o_data_version;
- }
+ env = cl_env_get(&refcheck);
+ if (IS_ERR(env))
+ RETURN(PTR_ERR(env));
- OBD_FREE_PTR(obdo);
- EXIT;
-out:
- ccc_inode_lsm_put(inode, lsm);
+ rc = cl_object_data_version(env, ll_i2info(inode)->lli_clob,
+ data_version, flags);
+ cl_env_put(env, &refcheck);
RETURN(rc);
}
__u32 *indexp, int *genp);
int lov_del_target(struct obd_device *obd, __u32 index,
struct obd_uuid *uuidp, int gen);
+int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data, int rc);
/* lov_pack.c */
int lov_packmd(struct obd_export *exp, struct lov_mds_md **lmm,
return false;
}
+static inline struct obd_device *lov2obd(const struct lov_obd *lov)
+{
+ return container_of0(lov, struct obd_device, u.lov);
+}
+
#endif
"%p->lsm_magic=%x\n", (lsmp), (lsmp)->lsm_magic); \
} while (0)
-static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,
- void *data, int rc)
+int lov_getattr_interpret(struct ptlrpc_request_set *rqset, void *data, int rc)
{
struct lov_request_set *lovset = (struct lov_request_set *)data;
int err;
RETURN(rc);
}
+static int lov_dispatch_obd_info_get(const struct lu_env *env,
+ struct cl_object *obj,
+ struct obd_info *oinfo,
+ struct ptlrpc_request_set *set)
+{
+ struct cl_object *subobj = NULL;
+ struct lov_obd *lov = lu2lov_dev(obj->co_lu.lo_dev)->ld_lov;
+ struct lov_request_set *lovset;
+ struct list_head *pos;
+ struct lov_request *req;
+ int rc;
+ int rc2;
+ ENTRY;
+
+ rc = lov_prep_getattr_set(lov2obd(lov)->obd_self_export, oinfo,
+ &lovset);
+ if (rc != 0)
+ RETURN(rc);
+
+ CDEBUG(D_INFO, "objid "DOSTID": %ux%u byte stripes.\n",
+ POSTID(&oinfo->oi_md->lsm_oi),
+ oinfo->oi_md->lsm_stripe_count,
+ oinfo->oi_md->lsm_stripe_size);
+
+ list_for_each(pos, &lovset->set_list) {
+ req = list_entry(pos, struct lov_request, rq_link);
+
+ CDEBUG(D_INFO, "objid "DOSTID"[%d] has subobj "DOSTID" at idx"
+ "%u\n", POSTID(&oinfo->oi_oa->o_oi), req->rq_stripe,
+ POSTID(&req->rq_oi.oi_oa->o_oi), req->rq_idx);
+ subobj = lov_find_subobj(env, cl2lov(obj), oinfo->oi_md,
+ req->rq_stripe);
+ if (IS_ERR(subobj))
+ GOTO(errout, rc = PTR_ERR(subobj));
+
+ rc = cl_object_obd_info_get(env, subobj, &req->rq_oi, set);
+ cl_object_put(env, subobj);
+ if (rc != 0) {
+ CERROR("%s: getattr objid "DOSTID" subobj"
+ DOSTID" on OST idx %d: rc = %d\n",
+ lov2obd(lov)->obd_name,
+ POSTID(&oinfo->oi_oa->o_oi),
+ POSTID(&req->rq_oi.oi_oa->o_oi),
+ req->rq_idx, rc);
+ GOTO(errout, rc);
+ }
+ }
+
+ if (!list_empty(&set->set_requests)) {
+ LASSERT(rc == 0);
+ LASSERT(set->set_interpret == NULL);
+ set->set_interpret = lov_getattr_interpret;
+ set->set_arg = lovset;
+ GOTO(out, rc);
+ }
+errout:
+ if (rc)
+ atomic_set(&lovset->set_completes, 0);
+ rc2 = lov_fini_getattr_set(lovset);
+ rc = rc != 0 ? rc : rc2;
+out:
+ RETURN(rc);
+}
+
+static int lov_object_data_version(const struct lu_env *env,
+ struct cl_object *obj, __u64 *data_version,
+ int flags)
+{
+ struct ptlrpc_request_set *set;
+ struct obd_info oinfo = { { { 0 } } };
+ struct obdo *obdo = NULL;
+ struct lov_stripe_md *lsm;
+ int rc;
+ ENTRY;
+
+ lsm = lov_lsm_addref(cl2lov(obj));
+ if (!lsm_has_objects(lsm)) {
+ /* If no stripe, we consider version is 0. */
+ *data_version = 0;
+ GOTO(out, rc = 0);
+ }
+
+ OBD_ALLOC_PTR(obdo);
+ if (obdo == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ oinfo.oi_md = lsm;
+ oinfo.oi_oa = obdo;
+ obdo->o_oi = lsm->lsm_oi;
+ obdo->o_mode = S_IFREG;
+ obdo->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLTYPE;
+ if (flags & (LL_DV_RD_FLUSH | LL_DV_WR_FLUSH)) {
+ obdo->o_valid |= OBD_MD_FLFLAGS;
+ obdo->o_flags |= OBD_FL_SRVLOCK;
+ if (flags & LL_DV_WR_FLUSH)
+ obdo->o_flags |= OBD_FL_FLUSH;
+ }
+
+ set = ptlrpc_prep_set();
+ if (set == NULL)
+ GOTO(out_obdo, rc = -ENOMEM);
+
+ rc = lov_dispatch_obd_info_get(env, obj, &oinfo, set);
+ if (rc == 0)
+ rc = ptlrpc_set_wait(set);
+ ptlrpc_set_destroy(set);
+ if (rc == 0) {
+ oinfo.oi_oa->o_valid &= OBD_MD_FLDATAVERSION | OBD_MD_FLFLAGS;
+ if (flags & LL_DV_WR_FLUSH &&
+ !(oinfo.oi_oa->o_valid & OBD_MD_FLFLAGS &&
+ oinfo.oi_oa->o_flags & OBD_FL_FLUSH))
+ rc = -EOPNOTSUPP;
+ else if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
+ rc = -EOPNOTSUPP;
+ else
+ *data_version = obdo->o_data_version;
+ }
+out_obdo:
+ OBD_FREE_PTR(obdo);
+out:
+ lov_lsm_put(obj, lsm);
+ RETURN(rc);
+}
+
static int lov_object_getstripe(const struct lu_env *env, struct cl_object *obj,
struct lov_user_md __user *lum)
{
}
static const struct cl_object_operations lov_ops = {
- .coo_page_init = lov_page_init,
- .coo_lock_init = lov_lock_init,
- .coo_io_init = lov_io_init,
- .coo_attr_get = lov_attr_get,
- .coo_attr_update = lov_attr_update,
- .coo_conf_set = lov_conf_set,
- .coo_getstripe = lov_object_getstripe,
- .coo_find_cbdata = lov_object_find_cbdata,
- .coo_fiemap = lov_object_fiemap,
+ .coo_page_init = lov_page_init,
+ .coo_lock_init = lov_lock_init,
+ .coo_io_init = lov_io_init,
+ .coo_attr_get = lov_attr_get,
+ .coo_attr_update = lov_attr_update,
+ .coo_conf_set = lov_conf_set,
+ .coo_getstripe = lov_object_getstripe,
+ .coo_find_cbdata = lov_object_find_cbdata,
+ .coo_fiemap = lov_object_fiemap,
+ .coo_data_version = lov_object_data_version,
};
static const struct lu_object_operations lov_lu_obj_ops = {
* for.
*/
int cl_object_attr_get(const struct lu_env *env, struct cl_object *obj,
- struct cl_attr *attr)
+ struct cl_attr *attr)
{
struct lu_object_header *top;
int result;
assert_spin_locked(cl_object_attr_guard(obj));
ENTRY;
- top = obj->co_lu.lo_header;
- result = 0;
+ top = obj->co_lu.lo_header;
+ result = 0;
list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) {
- if (obj->co_ops->coo_attr_get != NULL) {
- result = obj->co_ops->coo_attr_get(env, obj, attr);
- if (result != 0) {
- if (result > 0)
- result = 0;
- break;
- }
- }
- }
- RETURN(result);
+ if (obj->co_ops->coo_attr_get != NULL) {
+ result = obj->co_ops->coo_attr_get(env, obj, attr);
+ if (result != 0) {
+ if (result > 0)
+ result = 0;
+ break;
+ }
+ }
+ }
+ RETURN(result);
}
EXPORT_SYMBOL(cl_object_attr_get);
}
EXPORT_SYMBOL(cl_object_fiemap);
+int cl_object_obd_info_get(const struct lu_env *env, struct cl_object *obj,
+ struct obd_info *oinfo,
+ struct ptlrpc_request_set *set)
+{
+ struct lu_object_header *top;
+ int result = 0;
+ ENTRY;
+
+ top = obj->co_lu.lo_header;
+ list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) {
+ if (obj->co_ops->coo_obd_info_get != NULL) {
+ result = obj->co_ops->coo_obd_info_get(env, obj, oinfo,
+ set);
+ if (result != 0)
+ break;
+ }
+ }
+ RETURN(result);
+}
+EXPORT_SYMBOL(cl_object_obd_info_get);
+
+int cl_object_data_version(const struct lu_env *env, struct cl_object *obj,
+ __u64 *data_version, int flags)
+{
+ struct lu_object_header *top;
+ int result = 0;
+ ENTRY;
+
+ top = obj->co_lu.lo_header;
+ list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) {
+ if (obj->co_ops->coo_data_version != NULL) {
+ result = obj->co_ops->coo_data_version(env, obj,
+ data_version, flags);
+ if (result != 0)
+ break;
+ }
+ }
+ RETURN(result);
+}
+EXPORT_SYMBOL(cl_object_data_version);
+
/**
* Helper function removing all object locks, and marking object for
* deletion. All object pages must have been deleted at this point.
struct hlist_node oqi_hash;
obd_uid oqi_id;
};
+
+struct osc_async_args {
+ struct obd_info *aa_oi;
+};
+
int osc_quota_setup(struct obd_device *obd);
int osc_quota_cleanup(struct obd_device *obd);
int osc_quota_setdq(struct client_obd *cli, const unsigned int qid[],
struct ldlm_lock *osc_dlmlock_at_pgoff(const struct lu_env *env,
struct osc_object *obj, pgoff_t index,
enum osc_dap_flags flags);
+void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo);
+void osc_set_capa_size(struct ptlrpc_request *req,
+ const struct req_msg_field *field, struct obd_capa *oc);
+int osc_getattr_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req,
+ struct osc_async_args *aa, int rc);
#endif /* OSC_INTERNAL_H */
RETURN(rc);
}
+static int osc_object_obd_info_get(const struct lu_env *env,
+ struct cl_object *obj,
+ struct obd_info *oinfo,
+ struct ptlrpc_request_set *set)
+{
+ struct ptlrpc_request *req;
+ struct osc_async_args *aa;
+ int rc;
+ ENTRY;
+
+ req = ptlrpc_request_alloc(class_exp2cliimp(osc_export(cl2osc(obj))),
+ &RQF_OST_GETATTR);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
+ rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
+ if (rc != 0) {
+ ptlrpc_request_free(req);
+ RETURN(rc);
+ }
+
+ osc_pack_req_body(req, oinfo);
+
+ ptlrpc_request_set_replen(req);
+ req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
+
+ CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+ aa = ptlrpc_req_async_args(req);
+ aa->aa_oi = oinfo;
+
+ ptlrpc_set_add_req(set, req);
+ RETURN(0);
+}
+
void osc_object_set_contended(struct osc_object *obj)
{
obj->oo_contention_time = cfs_time_current();
}
static const struct cl_object_operations osc_ops = {
- .coo_page_init = osc_page_init,
- .coo_lock_init = osc_lock_init,
- .coo_io_init = osc_io_init,
- .coo_attr_get = osc_attr_get,
- .coo_attr_update = osc_attr_update,
- .coo_glimpse = osc_object_glimpse,
- .coo_prune = osc_object_prune,
- .coo_find_cbdata = osc_object_find_cbdata,
- .coo_fiemap = osc_object_fiemap,
+ .coo_page_init = osc_page_init,
+ .coo_lock_init = osc_lock_init,
+ .coo_io_init = osc_io_init,
+ .coo_attr_get = osc_attr_get,
+ .coo_attr_update = osc_attr_update,
+ .coo_glimpse = osc_object_glimpse,
+ .coo_prune = osc_object_prune,
+ .coo_find_cbdata = osc_object_find_cbdata,
+ .coo_fiemap = osc_object_fiemap,
+ .coo_obd_info_get = osc_object_obd_info_get,
};
static const struct lu_object_operations osc_lu_obj_ops = {
#define osc_grant_args osc_brw_async_args
-struct osc_async_args {
- struct obd_info *aa_oi;
-};
-
struct osc_setattr_args {
struct obdo *sa_oa;
obd_enqueue_update_f sa_upcall;
DEBUG_CAPA(D_SEC, c, "pack");
}
-static inline void osc_pack_req_body(struct ptlrpc_request *req,
- struct obd_info *oinfo)
+void osc_pack_req_body(struct ptlrpc_request *req, struct obd_info *oinfo)
{
struct ost_body *body;
osc_pack_capa(req, body, oinfo->oi_capa);
}
-static inline void osc_set_capa_size(struct ptlrpc_request *req,
- const struct req_msg_field *field,
- struct obd_capa *oc)
+void osc_set_capa_size(struct ptlrpc_request *req,
+ const struct req_msg_field *field,
+ struct obd_capa *oc)
{
if (oc == NULL)
req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
;
}
-static int osc_getattr_interpret(const struct lu_env *env,
- struct ptlrpc_request *req,
- struct osc_async_args *aa, int rc)
+int osc_getattr_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req,
+ struct osc_async_args *aa, int rc)
{
struct ost_body *body;
ENTRY;