X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosp%2Fosp_md_object.c;h=6ff56f4a8af58042f007dbdcf7f027b6e3b3df09;hb=03963106926883cf322e085feb8caa3ea64db1d1;hp=643e31a95eead002765fe2d3c9ef158acca59e6c;hpb=2fe2d1e82005746180309d9b79057a418a729e54;p=fs%2Flustre-release.git diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index 643e31a..6ff56f4 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2013, 2015, Intel Corporation. + * Copyright (c) 2013, 2017, Intel Corporation. */ /* * lustre/osp/osp_md_object.c @@ -53,6 +53,7 @@ #define DEBUG_SUBSYSTEM S_MDS +#include #include #include "osp_internal.h" @@ -76,27 +77,22 @@ * * \retval only return 0 for now */ -static int osp_object_create_interpreter(const struct lu_env *env, - struct object_update_reply *reply, - struct ptlrpc_request *req, - struct osp_object *obj, - void *data, int index, int rc) +static int osp_create_interpreter(const struct lu_env *env, + struct object_update_reply *reply, + struct ptlrpc_request *req, + struct osp_object *obj, + void *data, int index, int rc) { if (rc != 0 && rc != -EEXIST) { obj->opo_obj.do_lu.lo_header->loh_attr &= ~LOHA_EXISTS; obj->opo_non_exist = 1; } - /* Invalid the opo cache for the object after the object - * is being created, so attr_get will try to get attr - * from the remote object. XXX this can be improved when - * we have object lock/cache invalidate mechanism in OSP - * layer */ - if (obj->opo_ooa != NULL) { - spin_lock(&obj->opo_lock); - obj->opo_ooa->ooa_attr.la_valid = 0; - spin_unlock(&obj->opo_lock); - } + /* + * invalidate opo cache for the object after the object is created, so + * attr_get will try to get attr from remote object. + */ + osp_obj_invalidate_cache(obj); return 0; } @@ -117,22 +113,10 @@ static int osp_object_create_interpreter(const struct lu_env *env, * \retval 0 if preparation succeeds. * \retval negative errno if preparation fails. */ -int osp_md_declare_object_create(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - struct dt_allocation_hint *hint, - struct dt_object_format *dof, - struct thandle *th) +int osp_md_declare_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { - struct osp_object *obj = dt2osp_obj(dt); - int rc; - - if (obj->opo_ooa == NULL) { - rc = osp_oac_init(obj); - if (rc != 0) - return rc; - } - return osp_trans_update_request_create(th); } @@ -169,34 +153,39 @@ update_buffer_get_update(struct object_update_request *request, * \retval 0 if packing creation succeeds. * \retval negative errno if packing creation fails. */ -int osp_md_object_create(const struct lu_env *env, struct dt_object *dt, - struct lu_attr *attr, struct dt_allocation_hint *hint, - struct dt_object_format *dof, struct thandle *th) +int osp_md_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { - struct osp_update_request *update; - struct osp_object *obj = dt2osp_obj(dt); - int rc; + struct osp_update_request *update; + struct osp_object *obj = dt2osp_obj(dt); + int rc; update = thandle_to_osp_update_request(th); LASSERT(update != NULL); - LASSERT(attr->la_valid & LA_TYPE); - rc = osp_update_rpc_pack(env, create, update, OUT_CREATE, + if (!(attr->la_valid & LA_TYPE)) { + CERROR("%s: create type not specified: valid %llx\n", + dt->do_lu.lo_dev->ld_obd->obd_name, attr->la_valid); + GOTO(out, rc = -EINVAL); + } + + rc = OSP_UPDATE_RPC_PACK(env, out_create_pack, update, lu_object_fid(&dt->do_lu), attr, hint, dof); if (rc != 0) GOTO(out, rc); rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), NULL, - osp_object_create_interpreter); + osp_create_interpreter); if (rc < 0) GOTO(out, rc); dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT); dt2osp_obj(dt)->opo_non_exist = 0; + obj->opo_stale = 0; - LASSERT(obj->opo_ooa != NULL); - obj->opo_ooa->ooa_attr = *attr; + obj->opo_attr = *attr; out: return rc; } @@ -236,13 +225,13 @@ static int osp_md_declare_ref_del(const struct lu_env *env, static int osp_md_ref_del(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - struct osp_update_request *update; - int rc; + struct osp_update_request *update; + int rc; update = thandle_to_osp_update_request(th); LASSERT(update != NULL); - rc = osp_update_rpc_pack(env, ref_del, update, OUT_REF_DEL, + rc = OSP_UPDATE_RPC_PACK(env, out_ref_del_pack, update, lu_object_fid(&dt->do_lu)); return rc; } @@ -282,13 +271,13 @@ static int osp_md_declare_ref_add(const struct lu_env *env, static int osp_md_ref_add(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - struct osp_update_request *update; - int rc; + struct osp_update_request *update; + int rc; update = thandle_to_osp_update_request(th); LASSERT(update != NULL); - rc = osp_update_rpc_pack(env, ref_add, update, OUT_REF_ADD, + rc = OSP_UPDATE_RPC_PACK(env, out_ref_add_pack, update, lu_object_fid(&dt->do_lu)); return rc; } @@ -316,7 +305,6 @@ static void osp_md_ah_init(const struct lu_env *env, LASSERT(ah); ah->dah_parent = parent; - ah->dah_mode = child_mode; } /** @@ -358,13 +346,13 @@ int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, struct thandle *th) { - struct osp_update_request *update; - int rc; + struct osp_update_request *update; + int rc; update = thandle_to_osp_update_request(th); LASSERT(update != NULL); - rc = osp_update_rpc_pack(env, attr_set, update, OUT_ATTR_SET, + rc = OSP_UPDATE_RPC_PACK(env, out_attr_set_pack, update, lu_object_fid(&dt->do_lu), attr); return rc; } @@ -372,17 +360,17 @@ int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, /** * Implementation of dt_object_operations::do_read_lock * - * osp_md_object_{read,write}_lock() will only lock the remote object in the + * osp_md_{read,write}_lock() will only lock the remote object in the * local cache, which uses the semaphore (opo_sem) inside the osp_object to * lock the object. Note: it will not lock the object in the whole cluster, * which relies on the LDLM lock. * * \param[in] env execution environment * \param[in] dt object to be locked - * \param[in] role lock role from MDD layer, see mdd_object_role(). + * \param[in] role lock role from MDD layer, see dt_object_role(). */ -static void osp_md_object_read_lock(const struct lu_env *env, - struct dt_object *dt, unsigned role) +static void osp_md_read_lock(const struct lu_env *env, struct dt_object *dt, + unsigned role) { struct osp_object *obj = dt2osp_obj(dt); @@ -399,10 +387,10 @@ static void osp_md_object_read_lock(const struct lu_env *env, * * \param[in] env execution environment * \param[in] dt object to be locked - * \param[in] role lock role from MDD layer, see mdd_object_role(). + * \param[in] role lock role from MDD layer, see dt_object_role(). */ -static void osp_md_object_write_lock(const struct lu_env *env, - struct dt_object *dt, unsigned role) +static void osp_md_write_lock(const struct lu_env *env, struct dt_object *dt, + unsigned role) { struct osp_object *obj = dt2osp_obj(dt); @@ -420,8 +408,7 @@ static void osp_md_object_write_lock(const struct lu_env *env, * \param[in] env execution environment * \param[in] dt object to be unlocked */ -static void osp_md_object_read_unlock(const struct lu_env *env, - struct dt_object *dt) +static void osp_md_read_unlock(const struct lu_env *env, struct dt_object *dt) { struct osp_object *obj = dt2osp_obj(dt); @@ -436,8 +423,7 @@ static void osp_md_object_read_unlock(const struct lu_env *env, * \param[in] env execution environment * \param[in] dt object to be unlocked */ -static void osp_md_object_write_unlock(const struct lu_env *env, - struct dt_object *dt) +static void osp_md_write_unlock(const struct lu_env *env, struct dt_object *dt) { struct osp_object *obj = dt2osp_obj(dt); @@ -454,8 +440,7 @@ static void osp_md_object_write_unlock(const struct lu_env *env, * \param[in] env execution environment * \param[in] dt object to be tested */ -static int osp_md_object_write_locked(const struct lu_env *env, - struct dt_object *dt) +static int osp_md_write_locked(const struct lu_env *env, struct dt_object *dt) { struct osp_object *obj = dt2osp_obj(dt); @@ -497,7 +482,7 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, if (IS_ERR(update)) RETURN(PTR_ERR(update)); - rc = osp_update_rpc_pack(env, index_lookup, update, OUT_INDEX_LOOKUP, + rc = OSP_UPDATE_RPC_PACK(env, out_index_lookup_pack, update, lu_object_fid(&dt->do_lu), rec, key); if (rc != 0) { CERROR("%s: Insert update error: rc = %d\n", @@ -532,7 +517,7 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, } fid = lbuf->lb_buf; - if (ptlrpc_rep_need_swab(req)) + if (req_capsule_rep_need_swab(&req->rq_pill)) lustre_swab_lu_fid(fid); if (!fid_is_sane(fid)) { CERROR("%s: lookup "DFID" %s invalid fid "DFID"\n", @@ -549,7 +534,7 @@ out: if (req != NULL) ptlrpc_req_finished(req); - osp_update_request_destroy(update); + osp_update_request_destroy(env, update); return rc; } @@ -589,25 +574,21 @@ static int osp_md_declare_index_insert(const struct lu_env *env, * \param[in] rec record of the index to be inserted * \param[in] key key of the index to be inserted * \param[in] th the transaction handle - * \param[in] ignore_quota quota enforcement for insert * * \retval 0 if packing index insert succeeds. * \retval negative errno if packing fails. */ -static int osp_md_index_insert(const struct lu_env *env, - struct dt_object *dt, +static int osp_md_index_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, - const struct dt_key *key, - struct thandle *th, - int ignore_quota) + const struct dt_key *key, struct thandle *th) { struct osp_update_request *update; - int rc; + int rc; update = thandle_to_osp_update_request(th); LASSERT(update != NULL); - rc = osp_update_rpc_pack(env, index_insert, update, OUT_INDEX_INSERT, + rc = OSP_UPDATE_RPC_PACK(env, out_index_insert_pack, update, lu_object_fid(&dt->do_lu), rec, key); return rc; } @@ -659,7 +640,7 @@ static int osp_md_index_delete(const struct lu_env *env, update = thandle_to_osp_update_request(th); LASSERT(update != NULL); - rc = osp_update_rpc_pack(env, index_delete, update, OUT_INDEX_DELETE, + rc = OSP_UPDATE_RPC_PACK(env, out_index_delete_pack, update, lu_object_fid(&dt->do_lu), key); return rc; @@ -838,6 +819,95 @@ const struct dt_index_operations osp_md_index_ops = { }; /** + * Implement OSP layer dt_object_operations::do_xattr_list() interface. + * + * List extended attribute from the specified MDT/OST object, result is not + * cached because this is called by directory migration only. + * + * \param[in] env pointer to the thread context + * \param[in] dt pointer to the OSP layer dt_object + * \param[out] buf pointer to the lu_buf to hold the extended attribute + * + * \retval positive bytes used/required in the buffer + * \retval negative error number on failure + */ +static int osp_md_xattr_list(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf) +{ + struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); + struct osp_object *obj = dt2osp_obj(dt); + struct dt_device *dev = &osp->opd_dt_dev; + struct lu_buf *rbuf = &osp_env_info(env)->osi_lb2; + struct osp_update_request *update = NULL; + struct ptlrpc_request *req = NULL; + struct object_update_reply *reply; + const char *dname = dt->do_lu.lo_dev->ld_obd->obd_name; + int rc = 0; + + ENTRY; + + LASSERT(buf); + + if (unlikely(obj->opo_non_exist)) + RETURN(-ENOENT); + + update = osp_update_request_create(dev); + if (IS_ERR(update)) + RETURN(PTR_ERR(update)); + + rc = OSP_UPDATE_RPC_PACK(env, out_xattr_list_pack, update, + lu_object_fid(&dt->do_lu), buf->lb_len); + if (rc) { + CERROR("%s: Insert update error "DFID": rc = %d\n", + dname, PFID(lu_object_fid(&dt->do_lu)), rc); + GOTO(out, rc); + } + + rc = osp_remote_sync(env, osp, update, &req); + if (rc < 0) { + if (rc == -ENOENT) { + dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS; + obj->opo_non_exist = 1; + } + GOTO(out, rc); + } + + reply = req_capsule_server_sized_get(&req->rq_pill, + &RMF_OUT_UPDATE_REPLY, + OUT_UPDATE_REPLY_SIZE); + if (reply->ourp_magic != UPDATE_REPLY_MAGIC) { + DEBUG_REQ(D_ERROR, req, + "%s: Wrong version %x expected %x "DFID": rc = %d", + dname, reply->ourp_magic, UPDATE_REPLY_MAGIC, + PFID(lu_object_fid(&dt->do_lu)), -EPROTO); + + GOTO(out, rc = -EPROTO); + } + + rc = object_update_result_data_get(reply, rbuf, 0); + if (rc < 0) + GOTO(out, rc); + + if (!buf->lb_buf) + GOTO(out, rc); + + if (unlikely(buf->lb_len < rbuf->lb_len)) + GOTO(out, rc = -ERANGE); + + memcpy(buf->lb_buf, rbuf->lb_buf, rbuf->lb_len); + EXIT; + +out: + if (req) + ptlrpc_req_finished(req); + + if (update && !IS_ERR(update)) + osp_update_request_destroy(env, update); + + return rc; +} + +/** * Implementation of dt_object_operations::do_index_try * * Try to initialize the index API pointer for the given object. This @@ -883,54 +953,30 @@ static int osp_md_object_lock(const struct lu_env *env, union ldlm_policy_data *policy) { struct ldlm_res_id *res_id; - struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev); - struct osp_device *osp = dt2osp_dev(dt_dev); - struct lu_device *top_device; + struct osp_device *osp = dt2osp_dev(lu2dt_dev(dt->do_lu.lo_dev)); struct ptlrpc_request *req; int rc = 0; - __u64 flags = 0; - enum ldlm_mode mode; + __u64 flags = LDLM_FL_NO_LRU; + ENTRY; res_id = einfo->ei_res_id; LASSERT(res_id != NULL); - mode = ldlm_lock_match(osp->opd_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED, res_id, - einfo->ei_type, policy, - einfo->ei_mode, lh, 0); - if (mode > 0) - return ELDLM_OK; - - if (einfo->ei_nonblock) - flags |= LDLM_FL_BLOCK_NOWAIT; + if (einfo->ei_mode & (LCK_EX | LCK_PW)) + flags |= LDLM_FL_COS_INCOMPAT; req = ldlm_enqueue_pack(osp->opd_exp, 0); if (IS_ERR(req)) RETURN(PTR_ERR(req)); - /* During recovery, it needs to let OSP send enqueue - * without checking recoverying status, in case the - * other target is being recovered at the same time, - * and if we wait here for the import to be recovered, - * it might cause deadlock */ - top_device = dt_dev->dd_lu_dev.ld_site->ls_top_dev; - if (top_device->ld_obd->obd_recovering) - req->rq_allow_replay = 1; - + osp_set_req_replay(osp, req); rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id, (const union ldlm_policy_data *)policy, &flags, NULL, 0, LVB_T_NONE, lh, 0); ptlrpc_req_finished(req); - if (rc == ELDLM_OK) { - struct ldlm_lock *lock; - - lock = __ldlm_handle2lock(lh, 0); - ldlm_set_cbpending(lock); - LDLM_LOCK_PUT(lock); - } - return rc == ELDLM_OK ? 0 : -EIO; + RETURN(rc == ELDLM_OK ? 0 : -EIO); } /** @@ -971,12 +1017,21 @@ static int osp_md_object_unlock(const struct lu_env *env, * \retval 0 for success * \retval negative error number on failure */ -int osp_md_declare_object_destroy(const struct lu_env *env, - struct dt_object *dt, struct thandle *th) +int osp_md_declare_destroy(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { return osp_trans_update_request_create(th); } +static int osp_destroy_interpreter(const struct lu_env *env, + struct object_update_reply *reply, + struct ptlrpc_request *req, + struct osp_object *obj, + void *data, int index, int rc) +{ + return 0; +} + /** * Implement OSP layer dt_object_operations::do_destroy() interface. * @@ -992,26 +1047,33 @@ int osp_md_declare_object_destroy(const struct lu_env *env, * \retval 0 for success * \retval negative error number on failure */ -int osp_md_object_destroy(const struct lu_env *env, struct dt_object *dt, - struct thandle *th) +int osp_md_destroy(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { - struct osp_object *o = dt2osp_obj(dt); - struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); - struct osp_update_request *update; - int rc = 0; - + struct osp_object *o = dt2osp_obj(dt); + struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); + struct osp_update_request *update; + int rc = 0; ENTRY; + o->opo_non_exist = 1; + o->opo_destroyed = 1; LASSERT(osp->opd_connect_mdt); update = thandle_to_osp_update_request(th); LASSERT(update != NULL); - rc = osp_update_rpc_pack(env, object_destroy, update, OUT_DESTROY, + rc = OSP_UPDATE_RPC_PACK(env, out_destroy_pack, update, lu_object_fid(&dt->do_lu)); if (rc != 0) RETURN(rc); + /* retain the object and it's status until it's destroyed on remote */ + rc = osp_insert_update_callback(env, update, o, NULL, + osp_destroy_interpreter); + if (rc != 0) + RETURN(rc); + set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags); rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), NULL, NULL); @@ -1019,25 +1081,26 @@ int osp_md_object_destroy(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } -struct dt_object_operations osp_md_obj_ops = { - .do_read_lock = osp_md_object_read_lock, - .do_write_lock = osp_md_object_write_lock, - .do_read_unlock = osp_md_object_read_unlock, - .do_write_unlock = osp_md_object_write_unlock, - .do_write_locked = osp_md_object_write_locked, - .do_declare_create = osp_md_declare_object_create, - .do_create = osp_md_object_create, +const struct dt_object_operations osp_md_obj_ops = { + .do_read_lock = osp_md_read_lock, + .do_write_lock = osp_md_write_lock, + .do_read_unlock = osp_md_read_unlock, + .do_write_unlock = osp_md_write_unlock, + .do_write_locked = osp_md_write_locked, + .do_declare_create = osp_md_declare_create, + .do_create = osp_md_create, .do_declare_ref_add = osp_md_declare_ref_add, .do_ref_add = osp_md_ref_add, .do_declare_ref_del = osp_md_declare_ref_del, .do_ref_del = osp_md_ref_del, - .do_declare_destroy = osp_md_declare_object_destroy, - .do_destroy = osp_md_object_destroy, + .do_declare_destroy = osp_md_declare_destroy, + .do_destroy = osp_md_destroy, .do_ah_init = osp_md_ah_init, .do_attr_get = osp_attr_get, .do_declare_attr_set = osp_md_declare_attr_set, .do_attr_set = osp_md_attr_set, .do_xattr_get = osp_xattr_get, + .do_xattr_list = osp_md_xattr_list, .do_declare_xattr_set = osp_declare_xattr_set, .do_xattr_set = osp_xattr_set, .do_declare_xattr_del = osp_declare_xattr_del, @@ -1045,6 +1108,8 @@ struct dt_object_operations osp_md_obj_ops = { .do_index_try = osp_md_index_try, .do_object_lock = osp_md_object_lock, .do_object_unlock = osp_md_object_unlock, + .do_invalidate = osp_invalidate, + .do_check_stale = osp_check_stale, }; /** @@ -1067,7 +1132,41 @@ static ssize_t osp_md_declare_write(const struct lu_env *env, const struct lu_buf *buf, loff_t pos, struct thandle *th) { - return osp_trans_update_request_create(th); + struct osp_device *osp = dt2osp_dev(th->th_dev); + int rc; + + if (dt2osp_obj(dt)->opo_destroyed) + return -ENOENT; + + rc = osp_trans_update_request_create(th); + if (rc != 0) + return rc; + + if (osp->opd_update == NULL) + return 0; + + if (dt2osp_obj(dt)->opo_stale) + return -ESTALE; + + return 0; +} + +static int osp_write_interpreter(const struct lu_env *env, + struct object_update_reply *reply, + struct ptlrpc_request *req, + struct osp_object *obj, + void *data, int index, int rc) +{ + if (rc) { + CDEBUG(D_HA, "error "DFID": rc = %d\n", + PFID(lu_object_fid(&obj->opo_obj.do_lu)), rc); + OBD_RACE(OBD_FAIL_OUT_OBJECT_MISS); + spin_lock(&obj->opo_lock); + obj->opo_attr.la_valid = 0; + obj->opo_stale = 1; + spin_unlock(&obj->opo_lock); + } + return 0; } /** @@ -1081,14 +1180,13 @@ static ssize_t osp_md_declare_write(const struct lu_env *env, * \param[in] buf buffer to write which includes an embedded size field * \param[in] pos offet in the object to start writing at * \param[in] th transaction handle - * \param[in] ignore_quota quota enforcement for this write * * \retval the buffer size in bytes if packing succeeds. * \retval negative errno if packing fails. */ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, - struct thandle *th, int ignore_quota) + struct thandle *th) { struct osp_object *obj = dt2osp_obj(dt); struct osp_update_request *update; @@ -1096,32 +1194,58 @@ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt, ssize_t rc; ENTRY; + if (obj->opo_destroyed) + RETURN(-ENOENT); + update = thandle_to_osp_update_request(th); LASSERT(update != NULL); - rc = osp_update_rpc_pack(env, write, update, OUT_WRITE, + CDEBUG(D_INFO, "write "DFID" offset = %llu length = %zu\n", + PFID(lu_object_fid(&dt->do_lu)), *pos, buf->lb_len); + + rc = OSP_UPDATE_RPC_PACK(env, out_write_pack, update, lu_object_fid(&dt->do_lu), buf, *pos); if (rc < 0) RETURN(rc); - CDEBUG(D_INFO, "write "DFID" offset = "LPU64" length = %zu\n", - PFID(lu_object_fid(&dt->do_lu)), *pos, buf->lb_len); + rc = osp_check_and_set_rpc_version(oth, obj); + if (rc < 0) + RETURN(rc); + + /* to be able to invalidate object's state in case of an error */ + rc = osp_insert_update_callback(env, update, obj, NULL, + osp_write_interpreter); + if (rc < 0) + RETURN(rc); /* XXX: how about the write error happened later? */ *pos += buf->lb_len; - if (obj->opo_ooa != NULL && - obj->opo_ooa->ooa_attr.la_valid & LA_SIZE && - obj->opo_ooa->ooa_attr.la_size < *pos) - obj->opo_ooa->ooa_attr.la_size = *pos; + if (obj->opo_attr.la_valid & LA_SIZE && obj->opo_attr.la_size < *pos) + obj->opo_attr.la_size = *pos; - rc = osp_check_and_set_rpc_version(oth); - if (rc < 0) - RETURN(rc); + spin_lock(&obj->opo_lock); + if (list_empty(&obj->opo_invalidate_cb_list)) { + lu_object_get(&obj->opo_obj.do_lu); + + list_add_tail(&obj->opo_invalidate_cb_list, + &update->our_invalidate_cb_list); + } + spin_unlock(&obj->opo_lock); RETURN(buf->lb_len); } +static inline void orr_le_to_cpu(struct out_read_reply *orr_dst, + const struct out_read_reply *orr_src) +{ + orr_dst->orr_size = le32_to_cpu(orr_src->orr_size); + orr_dst->orr_padding = le32_to_cpu(orr_src->orr_padding); + orr_dst->orr_offset = le64_to_cpu(orr_dst->orr_offset); +} + + + static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt, struct lu_buf *rbuf, loff_t *pos) { @@ -1129,63 +1253,56 @@ static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt, struct dt_device *dt_dev = &osp->opd_dt_dev; struct lu_buf *lbuf = &osp_env_info(env)->osi_lb2; char *ptr = rbuf->lb_buf; - struct osp_update_request *update = NULL; + struct osp_update_request *update; struct ptlrpc_request *req = NULL; struct out_read_reply *orr; struct ptlrpc_bulk_desc *desc; struct object_update_reply *reply; - __u32 left_size; - int nbufs; - int i; + int pages; int rc; ENTRY; + if (dt2osp_obj(dt)->opo_destroyed) + RETURN(-ENOENT); + /* Because it needs send the update buffer right away, * just create an update buffer, instead of attaching the * update_remote list of the thandle. */ update = osp_update_request_create(dt_dev); if (IS_ERR(update)) - GOTO(out, rc = PTR_ERR(update)); + RETURN(PTR_ERR(update)); - rc = osp_update_rpc_pack(env, read, update, OUT_READ, + rc = OSP_UPDATE_RPC_PACK(env, out_read_pack, update, lu_object_fid(&dt->do_lu), rbuf->lb_len, *pos); if (rc != 0) { CERROR("%s: cannot insert update: rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name, rc); - GOTO(out, rc); + GOTO(out_update, rc); } + CDEBUG(D_INFO, "%s "DFID" read offset %llu size %zu\n", + dt_dev->dd_lu_dev.ld_obd->obd_name, + PFID(lu_object_fid(&dt->do_lu)), *pos, rbuf->lb_len); rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import, update, &req); if (rc != 0) - GOTO(out, rc); + GOTO(out_update, rc); + + /* First *and* last might be partial pages, hence +1 */ + pages = DIV_ROUND_UP(rbuf->lb_len, PAGE_SIZE) + 1; - nbufs = (rbuf->lb_len + OUT_BULK_BUFFER_SIZE - 1) / - OUT_BULK_BUFFER_SIZE; /* allocate bulk descriptor */ - desc = ptlrpc_prep_bulk_imp(req, nbufs, 1, - PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KVEC, - MDS_BULK_PORTAL, &ptlrpc_bulk_kvec_ops); + desc = ptlrpc_prep_bulk_imp(req, pages, 1, + PTLRPC_BULK_PUT_SINK, + MDS_BULK_PORTAL, + &ptlrpc_bulk_kiov_nopin_ops); if (desc == NULL) GOTO(out, rc = -ENOMEM); - /* split the buffer into small chunk size */ - left_size = rbuf->lb_len; - for (i = 0; i < nbufs; i++) { - int read_size; - - read_size = left_size > OUT_BULK_BUFFER_SIZE ? - OUT_BULK_BUFFER_SIZE : left_size; - desc->bd_frag_ops->add_iov_frag(desc, ptr, read_size); - - ptr += read_size; - } + desc->bd_frag_ops->add_iov_frag(desc, ptr, rbuf->lb_len); - /* This will only be called with read-only update, and these updates - * might be used to retrieve update log during recovery process, so - * it will be allowed to send during recovery process */ - req->rq_allow_replay = 1; + osp_set_req_replay(osp, req); req->rq_bulk_read = 1; /* send request to master and wait for RPC to complete */ rc = ptlrpc_queue_wait(req); @@ -1220,17 +1337,16 @@ static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt, rc = orr->orr_size; *pos = orr->orr_offset; out: - if (req != NULL) - ptlrpc_req_finished(req); + ptlrpc_req_finished(req); - if (update != NULL) - osp_update_request_destroy(update); +out_update: + osp_update_request_destroy(env, update); RETURN(rc); } /* These body operation will be used to write symlinks during migration etc */ -struct dt_body_operations osp_md_body_ops = { +const struct dt_body_operations osp_md_body_ops = { .dbo_declare_write = osp_md_declare_write, .dbo_write = osp_md_write, .dbo_read = osp_md_read,