X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosp%2Fosp_md_object.c;h=b607b78a94339fa475193c4f32d4048dd0fee6df;hp=237e8aec8e9e7470a19169e26e3f72c99e9e3f82;hb=9e1071b517578ed3752efb1412017c8f93cd333b;hpb=164a6637e21114da38102bb7809342f00c4a99a4 diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index 237e8ae..b607b78 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2013, Intel Corporation. + * Copyright (c) 2013, 2017, Intel Corporation. */ /* * lustre/osp/osp_md_object.c @@ -53,19 +53,55 @@ #define DEBUG_SUBSYSTEM S_MDS +#include #include #include "osp_internal.h" -static const char dot[] = "."; -static const char dotdot[] = ".."; +#define OUT_UPDATE_BUFFER_SIZE_ADD 4096 +#define OUT_UPDATE_BUFFER_SIZE_MAX (256 * 4096) /* 1M update size now */ + +/** + * Interpreter call for object creation + * + * Object creation interpreter, which will be called after creating + * the remote object to set flags and status. + * + * \param[in] env execution environment + * \param[in] reply update reply + * \param[in] req ptlrpc update request for creating object + * \param[in] obj object to be created + * \param[in] data data used in this function. + * \param[in] index index(position) of create update in the whole + * updates + * \param[in] rc update result on the remote MDT. + * + * \retval only return 0 for now + */ +static int osp_create_interpreter(const struct lu_env *env, + struct object_update_reply *reply, + struct ptlrpc_request *req, + struct osp_object *obj, + void *data, int index, int rc) +{ + if (rc != 0 && rc != -EEXIST) { + obj->opo_obj.do_lu.lo_header->loh_attr &= ~LOHA_EXISTS; + obj->opo_non_exist = 1; + } + + /* + * invalidate opo cache for the object after the object is created, so + * attr_get will try to get attr from remote object. 
+ */ + osp_obj_invalidate_cache(obj); + + return 0; +} /** * Implementation of dt_object_operations::do_declare_create * - * Insert object create update into the RPC, which will be sent during - * transaction start. Note: if the object has already been created, - * we must add object destroy updates ahead of create updates, so it will - * destroy then recreate the object. + * Create the osp_update_request to track the update for this OSP + * in the transaction. * * \param[in] env execution environment * \param[in] dt remote object to be created @@ -74,105 +110,38 @@ static const char dotdot[] = ".."; * \param[in] dof creation format information * \param[in] th the transaction handle * - * \retval 0 if the insertion succeeds. - * \retval negative errno if the insertion fails. + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. */ -int osp_md_declare_object_create(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - struct dt_allocation_hint *hint, - struct dt_object_format *dof, - struct thandle *th) +int osp_md_declare_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { - struct osp_thread_info *osi = osp_env_info(env); - struct dt_update_request *update; - struct lu_fid *fid1; - int sizes[2] = {sizeof(struct obdo), 0}; - char *bufs[2] = {NULL, NULL}; - int buf_count; - int rc; - - update = out_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } - - osi->osi_obdo.o_valid = 0; - obdo_from_la(&osi->osi_obdo, attr, attr->la_valid); - lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo); - - bufs[0] = (char *)&osi->osi_obdo; - buf_count = 1; - fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu); - if (hint != NULL && hint->dah_parent) { - struct 
lu_fid *fid2; - - fid2 = (struct lu_fid *)lu_object_fid(&hint->dah_parent->do_lu); - sizes[1] = sizeof(*fid2); - bufs[1] = (char *)fid2; - buf_count++; - } - - if (lu_object_exists(&dt->do_lu)) { - /* If the object already exists, we needs to destroy - * this orphan object first. - * - * The scenario might happen in this case - * - * 1. client send remote create to MDT0. - * 2. MDT0 send create update to MDT1. - * 3. MDT1 finished create synchronously. - * 4. MDT0 failed and reboot. - * 5. client resend remote create to MDT0. - * 6. MDT0 tries to resend create update to MDT1, - * but find the object already exists - */ - CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n", - dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1)); - - rc = out_insert_update(env, update, OUT_REF_DEL, fid1, 0, - NULL, NULL); - if (rc != 0) - GOTO(out, rc); - - if (S_ISDIR(lu_object_attr(&dt->do_lu))) { - /* decrease for ".." */ - rc = out_insert_update(env, update, OUT_REF_DEL, fid1, - 0, NULL, NULL); - if (rc != 0) - GOTO(out, rc); - } + return osp_trans_update_request_create(th); +} - rc = out_insert_update(env, update, OUT_DESTROY, fid1, 0, NULL, - NULL); - if (rc != 0) - GOTO(out, rc); +struct object_update * +update_buffer_get_update(struct object_update_request *request, + unsigned int index) +{ + void *ptr; + int i; - dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS; - /* Increase batchid to add this orphan object deletion - * to separate transaction */ - update_inc_batchid(update); - } + if (index > request->ourq_count) + return NULL; - rc = out_insert_update(env, update, OUT_CREATE, fid1, buf_count, sizes, - (const char **)bufs); -out: - if (rc) - CERROR("%s: Insert update error: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, rc); + ptr = &request->ourq_updates[0]; + for (i = 0; i < index; i++) + ptr += object_update_size(ptr); - return rc; + return ptr; } /** * Implementation of dt_object_operations::do_create * - * It sets necessary flags for created object. 
In DNE phase I, - * remote updates are actually executed during transaction start, - * i.e. the object has already been created when calling this method. + * It adds an OUT_CREATE sub-request into the OUT RPC that will be flushed + * when the transaction stop, and sets necessary flags for created object. * * \param[in] env execution environment * \param[in] dt object to be created @@ -181,139 +150,131 @@ out: * \param[in] dof creation format information * \param[in] th the transaction handle * - * \retval only return 0 for now + * \retval 0 if packing creation succeeds. + * \retval negative errno if packing creation fails. */ -int osp_md_object_create(const struct lu_env *env, struct dt_object *dt, - struct lu_attr *attr, struct dt_allocation_hint *hint, - struct dt_object_format *dof, struct thandle *th) +int osp_md_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { - CDEBUG(D_INFO, "create object "DFID"\n", - PFID(&dt->do_lu.lo_header->loh_fid)); + struct osp_update_request *update; + struct osp_object *obj = dt2osp_obj(dt); + int rc; + + update = thandle_to_osp_update_request(th); + LASSERT(update != NULL); + + LASSERT(attr->la_valid & LA_TYPE); + rc = OSP_UPDATE_RPC_PACK(env, out_create_pack, update, + lu_object_fid(&dt->do_lu), attr, hint, dof); + if (rc != 0) + GOTO(out, rc); + + rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), NULL, + osp_create_interpreter); + + if (rc < 0) + GOTO(out, rc); - /* Because the create update RPC will be sent during declare phase, - * if creation reaches here, it means the object has been created - * successfully */ dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT); dt2osp_obj(dt)->opo_non_exist = 0; + obj->opo_stale = 0; - return 0; + obj->opo_attr = *attr; +out: + return rc; } /** * Implementation of dt_object_operations::do_declare_ref_del * - * Declare decreasing the reference 
count of the remote object, i.e. insert - * decreasing object reference count update into the RPC, which will be sent - * during transaction start. + * Create the osp_update_request to track the update for this OSP + * in the transaction. * * \param[in] env execution environment * \param[in] dt object to decrease the reference count. * \param[in] th the transaction handle of refcount decrease. * - * \retval 0 if the insertion succeeds. - * \retval negative errno if the insertion fails. + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. */ -static int osp_md_declare_object_ref_del(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) +static int osp_md_declare_ref_del(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) { - struct dt_update_request *update; - struct lu_fid *fid; - int rc; - - update = out_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } - - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - - rc = out_insert_update(env, update, OUT_REF_DEL, fid, 0, NULL, NULL); - - return rc; + return osp_trans_update_request_create(th); } /** * Implementation of dt_object_operations::do_ref_del * - * Do nothing in this method for now. In DNE phase I, remote updates are - * actually executed during transaction start, i.e. the object reference - * count has already been decreased when calling this method. + * Add an OUT_REF_DEL sub-request into the OUT RPC that will be + * flushed when the transaction stop. * * \param[in] env execution environment * \param[in] dt object to decrease the reference count * \param[in] th the transaction handle * - * \retval only return 0 for now + * \retval 0 if packing ref_del succeeds. + * \retval negative errno if packing fails. 
*/ -static int osp_md_object_ref_del(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) +static int osp_md_ref_del(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { - CDEBUG(D_INFO, "ref del object "DFID"\n", - PFID(&dt->do_lu.lo_header->loh_fid)); + struct osp_update_request *update; + int rc; - return 0; + update = thandle_to_osp_update_request(th); + LASSERT(update != NULL); + + rc = OSP_UPDATE_RPC_PACK(env, out_ref_del_pack, update, + lu_object_fid(&dt->do_lu)); + return rc; } /** * Implementation of dt_object_operations::do_declare_ref_add * - * Declare increasing the reference count of the remote object, - * i.e. insert increasing object reference count update into RPC. + * Create the osp_update_request to track the update for this OSP + * in the transaction. * * \param[in] env execution environment * \param[in] dt object on which to increase the reference count. * \param[in] th the transaction handle. * - * \retval 0 if the insertion succeeds. - * \retval negative errno if the insertion fails. + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. */ static int osp_md_declare_ref_add(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - struct dt_update_request *update; - struct lu_fid *fid; - int rc; - - update = out_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } - - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - - rc = out_insert_update(env, update, OUT_REF_ADD, fid, 0, NULL, NULL); - - return rc; + return osp_trans_update_request_create(th); } /** * Implementation of dt_object_operations::do_ref_add * - * Do nothing in this method for now. In DNE phase I, remote updates are - * actually executed during transaction start, i.e. 
the object reference - * count has already been increased when calling this method. + * Add an OUT_REF_ADD sub-request into the OUT RPC that will be flushed + * when the transaction stops. * * \param[in] env execution environment * \param[in] dt object on which to increase the reference count * \param[in] th the transaction handle * - * \retval only return 0 for now + * \retval 0 if packing ref_add succeeds. + * \retval negative errno if packing fails. */ -static int osp_md_object_ref_add(const struct lu_env *env, struct dt_object *dt, - struct thandle *th) +static int osp_md_ref_add(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { - CDEBUG(D_INFO, "ref add object "DFID"\n", - PFID(&dt->do_lu.lo_header->loh_fid)); + struct osp_update_request *update; + int rc; - return 0; + update = thandle_to_osp_update_request(th); + LASSERT(update != NULL); + + rc = OSP_UPDATE_RPC_PACK(env, out_ref_add_pack, update, + lu_object_fid(&dt->do_lu)); + return rc; } /** @@ -345,88 +306,67 @@ static void osp_md_ah_init(const struct lu_env *env, /** * Implementation of dt_object_operations::do_declare_attr_set * - * Declare setting attributes of the remote object, i.e. insert remote - * object attr_set update into RPC. + * Create the osp_update_request to track the update for this OSP + * in the transaction. * * \param[in] env execution environment * \param[in] dt object on which to set attributes * \param[in] attr attributes to be set * \param[in] th the transaction handle * - * \retval 0 if the insertion succeeds. - * \retval negative errno if the insertion fails. + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. 
*/ int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, struct thandle *th) { - struct osp_thread_info *osi = osp_env_info(env); - struct dt_update_request *update; - struct lu_fid *fid; - int size = sizeof(struct obdo); - char *buf; - int rc; - - update = out_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } - - osi->osi_obdo.o_valid = 0; - obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr, - attr->la_valid); - lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo); - - buf = (char *)&osi->osi_obdo; - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - - rc = out_insert_update(env, update, OUT_ATTR_SET, fid, 1, &size, - (const char **)&buf); - - return rc; + return osp_trans_update_request_create(th); } /** * Implementation of dt_object_operations::do_attr_set * - * Do nothing in this method for now. In DNE phase I, remote updates - * are actually executed during transaction start, i.e. object attributes - * have already been set when calling this method. + * Set attributes to the specified remote object. + * + * Add the OUT_ATTR_SET sub-request into the OUT RPC that will be flushed + * when the transaction stop. * * \param[in] env execution environment * \param[in] dt object to set attributes * \param[in] attr attributes to be set * \param[in] th the transaction handle - * \param[in] capa capability of setting attributes (not yet implemented). * - * \retval only return 0 for now + * \retval 0 if packing attr_set succeeds. + * \retval negative errno if packing fails. 
*/ int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_attr *attr, struct thandle *th, - struct lustre_capa *capa) + const struct lu_attr *attr, struct thandle *th) { - CDEBUG(D_INFO, "attr set object "DFID"\n", - PFID(&dt->do_lu.lo_header->loh_fid)); + struct osp_update_request *update; + int rc; - RETURN(0); + update = thandle_to_osp_update_request(th); + LASSERT(update != NULL); + + rc = OSP_UPDATE_RPC_PACK(env, out_attr_set_pack, update, + lu_object_fid(&dt->do_lu), attr); + return rc; } /** * Implementation of dt_object_operations::do_read_lock * - * osp_md_object_{read,write}_lock() will only lock the remote object in the + * osp_md_{read,write}_lock() will only lock the remote object in the * local cache, which uses the semaphore (opo_sem) inside the osp_object to * lock the object. Note: it will not lock the object in the whole cluster, * which relies on the LDLM lock. * * \param[in] env execution environment * \param[in] dt object to be locked - * \param[in] role lock role from MDD layer, see mdd_object_role(). + * \param[in] role lock role from MDD layer, see dt_object_role(). */ -static void osp_md_object_read_lock(const struct lu_env *env, - struct dt_object *dt, unsigned role) +static void osp_md_read_lock(const struct lu_env *env, struct dt_object *dt, + unsigned role) { struct osp_object *obj = dt2osp_obj(dt); @@ -443,10 +383,10 @@ static void osp_md_object_read_lock(const struct lu_env *env, * * \param[in] env execution environment * \param[in] dt object to be locked - * \param[in] role lock role from MDD layer, see mdd_object_role(). + * \param[in] role lock role from MDD layer, see dt_object_role(). 
*/ -static void osp_md_object_write_lock(const struct lu_env *env, - struct dt_object *dt, unsigned role) +static void osp_md_write_lock(const struct lu_env *env, struct dt_object *dt, + unsigned role) { struct osp_object *obj = dt2osp_obj(dt); @@ -464,8 +404,7 @@ static void osp_md_object_write_lock(const struct lu_env *env, * \param[in] env execution environment * \param[in] dt object to be unlocked */ -static void osp_md_object_read_unlock(const struct lu_env *env, - struct dt_object *dt) +static void osp_md_read_unlock(const struct lu_env *env, struct dt_object *dt) { struct osp_object *obj = dt2osp_obj(dt); @@ -480,8 +419,7 @@ static void osp_md_object_read_unlock(const struct lu_env *env, * \param[in] env execution environment * \param[in] dt object to be unlocked */ -static void osp_md_object_write_unlock(const struct lu_env *env, - struct dt_object *dt) +static void osp_md_write_unlock(const struct lu_env *env, struct dt_object *dt) { struct osp_object *obj = dt2osp_obj(dt); @@ -498,8 +436,7 @@ static void osp_md_object_write_unlock(const struct lu_env *env, * \param[in] env execution environment * \param[in] dt object to be tested */ -static int osp_md_object_write_locked(const struct lu_env *env, - struct dt_object *dt) +static int osp_md_write_locked(const struct lu_env *env, struct dt_object *dt) { struct osp_object *obj = dt2osp_obj(dt); @@ -516,22 +453,19 @@ static int osp_md_object_write_locked(const struct lu_env *env, * \param[in] dt index object to lookup * \param[out] rec record in which to return lookup result * \param[in] key key of index which will be looked up - * \param[in] capa capability of lookup (not yet implemented) * * \retval 1 if the lookup succeeds. * \retval negative errno if the lookup fails. 
*/ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key, - struct lustre_capa *capa) + struct dt_rec *rec, const struct dt_key *key) { struct lu_buf *lbuf = &osp_env_info(env)->osi_lb2; struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); struct dt_device *dt_dev = &osp->opd_dt_dev; - struct dt_update_request *update; + struct osp_update_request *update; struct object_update_reply *reply; struct ptlrpc_request *req = NULL; - int size = strlen((char *)key) + 1; struct lu_fid *fid; int rc; ENTRY; @@ -540,20 +474,19 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, * just create an update buffer, instead of attaching the * update_remote list of the thandle. */ - update = out_create_update_req(dt_dev); + update = osp_update_request_create(dt_dev); if (IS_ERR(update)) RETURN(PTR_ERR(update)); - rc = out_insert_update(env, update, OUT_INDEX_LOOKUP, - lu_object_fid(&dt->do_lu), - 1, &size, (const char **)&key); - if (rc) { + rc = OSP_UPDATE_RPC_PACK(env, out_index_lookup_pack, update, + lu_object_fid(&dt->do_lu), rec, key); + if (rc != 0) { CERROR("%s: Insert update error: rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name, rc); GOTO(out, rc); } - rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req); + rc = osp_remote_sync(env, osp, update, &req); if (rc < 0) GOTO(out, rc); @@ -597,7 +530,7 @@ out: if (req != NULL) ptlrpc_req_finished(req); - out_destroy_update_req(update); + osp_update_request_destroy(env, update); return rc; } @@ -605,8 +538,8 @@ out: /** * Implementation of dt_index_operations::dio_declare_insert * - * Declare the index insert of the remote object, i.e. pack index insert update - * into the RPC, which will be sent during transaction start. + * Create the osp_update_request to track the update for this OSP + * in the transaction. 
* * \param[in] env execution environment * \param[in] dt object for which to insert index @@ -614,141 +547,99 @@ out: * \param[in] key key of the index which will be inserted * \param[in] th the transaction handle * - * \retval 0 if the insertion succeeds. - * \retval negative errno if the insertion fails. + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. */ -static int osp_md_declare_insert(const struct lu_env *env, - struct dt_object *dt, - const struct dt_rec *rec, - const struct dt_key *key, - struct thandle *th) +static int osp_md_declare_index_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *th) { - struct osp_thread_info *info = osp_env_info(env); - struct dt_update_request *update; - struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec; - struct lu_fid *fid = - (struct lu_fid *)lu_object_fid(&dt->do_lu); - struct lu_fid *rec_fid = &info->osi_fid; - __u32 type = cpu_to_le32(rec1->rec_type); - int size[3] = { strlen((char *)key) + 1, - sizeof(*rec_fid), - sizeof(type) }; - const char *bufs[3] = { (char *)key, - (char *)rec_fid, - (char *)&type }; - int rc; - - update = out_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } - - CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID", %u\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - PFID(fid), (char *)key, PFID(rec1->rec_fid), rec1->rec_type); - - fid_cpu_to_le(rec_fid, rec1->rec_fid); - rc = out_insert_update(env, update, OUT_INDEX_INSERT, fid, - ARRAY_SIZE(size), size, bufs); - return rc; + return osp_trans_update_request_create(th); } /** * Implementation of dt_index_operations::dio_insert * - * Do nothing in this method for now. In DNE phase I, remote updates - * are actually executed during transaction start, i.e. 
the index has - * already been inserted when calling this method. + * Add an OUT_INDEX_INSERT sub-request into the OUT RPC that will + * be flushed when the transaction stop. * * \param[in] env execution environment * \param[in] dt object for which to insert index * \param[in] rec record of the index to be inserted * \param[in] key key of the index to be inserted * \param[in] th the transaction handle - * \param[in] capa capability of insert (not yet implemented) - * \param[in] ignore_quota quota enforcement for insert * - * \retval only return 0 for now + * \retval 0 if packing index insert succeeds. + * \retval negative errno if packing fails. */ -static int osp_md_index_insert(const struct lu_env *env, - struct dt_object *dt, +static int osp_md_index_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, - const struct dt_key *key, - struct thandle *th, - struct lustre_capa *capa, - int ignore_quota) + const struct dt_key *key, struct thandle *th) { - return 0; + struct osp_update_request *update; + int rc; + + update = thandle_to_osp_update_request(th); + LASSERT(update != NULL); + + rc = OSP_UPDATE_RPC_PACK(env, out_index_insert_pack, update, + lu_object_fid(&dt->do_lu), rec, key); + return rc; } /** * Implementation of dt_index_operations::dio_declare_delete * - * Declare the index delete of the remote object, i.e. insert index delete - * update into the RPC, which will be sent during transaction start. + * Create the osp_update_request to track the update for this OSP + * in the transaction. * * \param[in] env execution environment * \param[in] dt object for which to delete index * \param[in] key key of the index * \param[in] th the transaction handle * - * \retval 0 if the insertion succeeds. - * \retval negative errno if the insertion fails. + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. 
*/ -static int osp_md_declare_delete(const struct lu_env *env, - struct dt_object *dt, - const struct dt_key *key, - struct thandle *th) +static int osp_md_declare_index_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *th) { - struct dt_update_request *update; - struct lu_fid *fid; - int size = strlen((char *)key) + 1; - int rc; - - update = out_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } - - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - - rc = out_insert_update(env, update, OUT_INDEX_DELETE, fid, 1, &size, - (const char **)&key); - - return rc; + return osp_trans_update_request_create(th); } /** * Implementation of dt_index_operations::dio_delete * - * Do nothing in this method for now. Because in DNE phase I, remote updates - * are actually executed during transaction start, i.e. the index has already - * been deleted when calling this method. + * Add an OUT_INDEX_DELETE sub-request into the OUT RPC that will + * be flushed when the transaction stop. * * \param[in] env execution environment * \param[in] dt object for which to delete index * \param[in] key key of the index which will be deleted * \param[in] th the transaction handle - * \param[in] capa capability of delete (not yet implemented) * - * \retval only return 0 for now + * \retval 0 if packing index delete succeeds. + * \retval negative errno if packing fails. 
*/ static int osp_md_index_delete(const struct lu_env *env, struct dt_object *dt, const struct dt_key *key, - struct thandle *th, - struct lustre_capa *capa) + struct thandle *th) { - CDEBUG(D_INFO, "index delete "DFID" %s\n", - PFID(&dt->do_lu.lo_header->loh_fid), (char *)key); + struct osp_update_request *update; + int rc; - return 0; + update = thandle_to_osp_update_request(th); + LASSERT(update != NULL); + + rc = OSP_UPDATE_RPC_PACK(env, out_index_delete_pack, update, + lu_object_fid(&dt->do_lu), key); + + return rc; } /** @@ -761,11 +652,11 @@ static int osp_md_index_delete(const struct lu_env *env, * \param[in] env execution environment * \param[in] di iterator of this iteration * - * \retval 0 if the pointer is advanced successfuly. + * \retval 0 if the pointer is advanced successfully. * \retval 1 if it reaches to the end of the index object. * \retval negative errno if the pointer cannot be advanced. */ -int osp_md_index_it_next(const struct lu_env *env, struct dt_it *di) +static int osp_md_index_it_next(const struct lu_env *env, struct dt_it *di) { struct osp_it *it = (struct osp_it *)di; struct lu_idxpage *idxpage; @@ -864,7 +755,7 @@ static int osp_md_index_it_rec(const struct lu_env *env, const struct dt_it *di, { struct osp_it *it = (struct osp_it *)di; struct lu_dirent *ent = (struct lu_dirent *)it->ooi_ent; - int reclen; + size_t reclen; reclen = lu_dirent_calc_size(le16_to_cpu(ent->lde_namelen), attr); memcpy(rec, ent, reclen); @@ -904,9 +795,9 @@ static int osp_it_load(const struct lu_env *env, const struct dt_it *di, const struct dt_index_operations osp_md_index_ops = { .dio_lookup = osp_md_index_lookup, - .dio_declare_insert = osp_md_declare_insert, + .dio_declare_insert = osp_md_declare_index_insert, .dio_insert = osp_md_index_insert, - .dio_declare_delete = osp_md_declare_delete, + .dio_declare_delete = osp_md_declare_index_delete, .dio_delete = osp_md_index_delete, .dio_it = { .init = osp_it_init, @@ -924,6 +815,95 @@ const struct 
dt_index_operations osp_md_index_ops = { }; /** + * Implement OSP layer dt_object_operations::do_xattr_list() interface. + * + * List extended attribute from the specified MDT/OST object, result is not + * cached because this is called by directory migration only. + * + * \param[in] env pointer to the thread context + * \param[in] dt pointer to the OSP layer dt_object + * \param[out] buf pointer to the lu_buf to hold the extended attribute + * + * \retval positive bytes used/required in the buffer + * \retval negative error number on failure + */ +static int osp_md_xattr_list(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf) +{ + struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); + struct osp_object *obj = dt2osp_obj(dt); + struct dt_device *dev = &osp->opd_dt_dev; + struct lu_buf *rbuf = &osp_env_info(env)->osi_lb2; + struct osp_update_request *update = NULL; + struct ptlrpc_request *req = NULL; + struct object_update_reply *reply; + const char *dname = dt->do_lu.lo_dev->ld_obd->obd_name; + int rc = 0; + + ENTRY; + + LASSERT(buf); + + if (unlikely(obj->opo_non_exist)) + RETURN(-ENOENT); + + update = osp_update_request_create(dev); + if (IS_ERR(update)) + RETURN(PTR_ERR(update)); + + rc = OSP_UPDATE_RPC_PACK(env, out_xattr_list_pack, update, + lu_object_fid(&dt->do_lu), buf->lb_len); + if (rc) { + CERROR("%s: Insert update error "DFID": rc = %d\n", + dname, PFID(lu_object_fid(&dt->do_lu)), rc); + GOTO(out, rc); + } + + rc = osp_remote_sync(env, osp, update, &req); + if (rc < 0) { + if (rc == -ENOENT) { + dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS; + obj->opo_non_exist = 1; + } + GOTO(out, rc); + } + + reply = req_capsule_server_sized_get(&req->rq_pill, + &RMF_OUT_UPDATE_REPLY, + OUT_UPDATE_REPLY_SIZE); + if (reply->ourp_magic != UPDATE_REPLY_MAGIC) { + DEBUG_REQ(D_ERROR, req, + "%s: Wrong version %x expected %x "DFID": rc = %d", + dname, reply->ourp_magic, UPDATE_REPLY_MAGIC, + PFID(lu_object_fid(&dt->do_lu)), -EPROTO); + + 
GOTO(out, rc = -EPROTO); + } + + rc = object_update_result_data_get(reply, rbuf, 0); + if (rc < 0) + GOTO(out, rc); + + if (!buf->lb_buf) + GOTO(out, rc); + + if (unlikely(buf->lb_len < rbuf->lb_len)) + GOTO(out, rc = -ERANGE); + + memcpy(buf->lb_buf, rbuf->lb_buf, rbuf->lb_len); + EXIT; + +out: + if (req) + ptlrpc_req_finished(req); + + if (update && !IS_ERR(update)) + osp_update_request_destroy(env, update); + + return rc; +} + +/** * Implementation of dt_object_operations::do_index_try * * Try to initialize the index API pointer for the given object. This @@ -949,7 +929,9 @@ static int osp_md_index_try(const struct lu_env *env, * Implementation of dt_object_operations::do_object_lock * * Enqueue a lock (by ldlm_cli_enqueue()) of remote object on the remote MDT, - * which will lock the object in the global namespace. + * which will lock the object in the global namespace. And because the + * cross-MDT locks are relatively rare compared with normal local MDT operation, + * let's release it right away, instead of putting it into the LRU list. 
* * \param[in] env execution environment * \param[in] dt object to be locked @@ -964,37 +946,33 @@ static int osp_md_object_lock(const struct lu_env *env, struct dt_object *dt, struct lustre_handle *lh, struct ldlm_enqueue_info *einfo, - ldlm_policy_data_t *policy) + union ldlm_policy_data *policy) { struct ldlm_res_id *res_id; - struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev); - struct osp_device *osp = dt2osp_dev(dt_dev); + struct osp_device *osp = dt2osp_dev(lu2dt_dev(dt->do_lu.lo_dev)); struct ptlrpc_request *req; int rc = 0; - __u64 flags = 0; - ldlm_mode_t mode; + __u64 flags = LDLM_FL_NO_LRU; + ENTRY; res_id = einfo->ei_res_id; LASSERT(res_id != NULL); - mode = ldlm_lock_match(osp->opd_obd->obd_namespace, - LDLM_FL_BLOCK_GRANTED, res_id, - einfo->ei_type, policy, - einfo->ei_mode, lh, 0); - if (mode > 0) - return ELDLM_OK; + if (einfo->ei_mode & (LCK_EX | LCK_PW)) + flags |= LDLM_FL_COS_INCOMPAT; req = ldlm_enqueue_pack(osp->opd_exp, 0); if (IS_ERR(req)) RETURN(PTR_ERR(req)); + osp_set_req_replay(osp, req); rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id, - (const ldlm_policy_data_t *)policy, + (const union ldlm_policy_data *)policy, &flags, NULL, 0, LVB_T_NONE, lh, 0); ptlrpc_req_finished(req); - return rc == ELDLM_OK ? 0 : -EIO; + RETURN(rc == ELDLM_OK ? 0 : -EIO); } /** @@ -1012,7 +990,7 @@ static int osp_md_object_lock(const struct lu_env *env, static int osp_md_object_unlock(const struct lu_env *env, struct dt_object *dt, struct ldlm_enqueue_info *einfo, - ldlm_policy_data_t *policy) + union ldlm_policy_data *policy) { struct lustre_handle *lockh = einfo->ei_cbdata; @@ -1022,25 +1000,87 @@ static int osp_md_object_unlock(const struct lu_env *env, return 0; } +/** + * Implement OSP layer dt_object_operations::do_declare_destroy() interface. + * + * Create the dt_update_request to track the update for this OSP + * in the transaction. 
+ * + * \param[in] env pointer to the thread context + * \param[in] dt pointer to the OSP layer dt_object to be destroyed + * \param[in] th pointer to the transaction handler + * + * \retval 0 for success + * \retval negative error number on failure + */ +int osp_md_declare_destroy(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) +{ + return osp_trans_update_request_create(th); +} + +/** + * Implement OSP layer dt_object_operations::do_destroy() interface. + * + * Pack the destroy update into the RPC buffer, which will be sent + * to the remote MDT during transaction stop. + * + * It also marks the object as non-cached. + * + * \param[in] env pointer to the thread context + * \param[in] dt pointer to the OSP layer dt_object to be destroyed + * \param[in] th pointer to the transaction handler + * + * \retval 0 for success + * \retval negative error number on failure + */ +int osp_md_destroy(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) +{ + struct osp_object *o = dt2osp_obj(dt); + struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); + struct osp_update_request *update; + int rc = 0; + + ENTRY; + o->opo_non_exist = 1; + + LASSERT(osp->opd_connect_mdt); + update = thandle_to_osp_update_request(th); + LASSERT(update != NULL); + + rc = OSP_UPDATE_RPC_PACK(env, out_destroy_pack, update, + lu_object_fid(&dt->do_lu)); + if (rc != 0) + RETURN(rc); + + set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags); + rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), NULL, + NULL); + + RETURN(rc); +} + struct dt_object_operations osp_md_obj_ops = { - .do_read_lock = osp_md_object_read_lock, - .do_write_lock = osp_md_object_write_lock, - .do_read_unlock = osp_md_object_read_unlock, - .do_write_unlock = osp_md_object_write_unlock, - .do_write_locked = osp_md_object_write_locked, - .do_declare_create = osp_md_declare_object_create, - .do_create = osp_md_object_create, + .do_read_lock = osp_md_read_lock, + 
.do_write_lock = osp_md_write_lock, + .do_read_unlock = osp_md_read_unlock, + .do_write_unlock = osp_md_write_unlock, + .do_write_locked = osp_md_write_locked, + .do_declare_create = osp_md_declare_create, + .do_create = osp_md_create, .do_declare_ref_add = osp_md_declare_ref_add, - .do_ref_add = osp_md_object_ref_add, - .do_declare_ref_del = osp_md_declare_object_ref_del, - .do_ref_del = osp_md_object_ref_del, - .do_declare_destroy = osp_declare_object_destroy, - .do_destroy = osp_object_destroy, + .do_ref_add = osp_md_ref_add, + .do_declare_ref_del = osp_md_declare_ref_del, + .do_ref_del = osp_md_ref_del, + .do_declare_destroy = osp_md_declare_destroy, + .do_destroy = osp_md_destroy, .do_ah_init = osp_md_ah_init, .do_attr_get = osp_attr_get, .do_declare_attr_set = osp_md_declare_attr_set, .do_attr_set = osp_md_attr_set, .do_xattr_get = osp_xattr_get, + .do_xattr_list = osp_md_xattr_list, .do_declare_xattr_set = osp_declare_xattr_set, .do_xattr_set = osp_xattr_set, .do_declare_xattr_del = osp_declare_xattr_del, @@ -1048,80 +1088,235 @@ struct dt_object_operations osp_md_obj_ops = { .do_index_try = osp_md_index_try, .do_object_lock = osp_md_object_lock, .do_object_unlock = osp_md_object_unlock, + .do_invalidate = osp_invalidate, }; /** * Implementation of dt_body_operations::dbo_declare_write * - * Declare an object write. In DNE phase I, it will pack the write - * object update into the RPC. - * + * Create the osp_update_request to track the update for this OSP + * in the transaction. + * * \param[in] env execution environment * \param[in] dt object to be written * \param[in] buf buffer to write which includes an embedded size field * \param[in] pos offet in the object to start writing at * \param[in] th transaction handle * - * \retval 0 if the insertion succeeds. - * \retval negative errno if the insertion fails. + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. 
*/ static ssize_t osp_md_declare_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t pos, struct thandle *th) { - struct dt_update_request *update; - struct lu_fid *fid; - int sizes[2] = {buf->lb_len, sizeof(pos)}; - const char *bufs[2] = {(char *)buf->lb_buf, - (char *)&pos}; - ssize_t rc; + struct osp_device *osp = dt2osp_dev(th->th_dev); + int rc; - update = out_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } + rc = osp_trans_update_request_create(th); + if (rc != 0) + return rc; - pos = cpu_to_le64(pos); - bufs[1] = (char *)&pos; - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - rc = out_insert_update(env, update, OUT_WRITE, fid, - ARRAY_SIZE(sizes), sizes, bufs); + if (osp->opd_update == NULL) + return 0; - return rc; + if (dt2osp_obj(dt)->opo_stale) + return -ESTALE; + + return 0; +} +static int osp_write_interpreter(const struct lu_env *env, + struct object_update_reply *reply, + struct ptlrpc_request *req, + struct osp_object *obj, + void *data, int index, int rc) +{ + if (rc) { + CDEBUG(D_HA, "error "DFID": rc = %d\n", + PFID(lu_object_fid(&obj->opo_obj.do_lu)), rc); + spin_lock(&obj->opo_lock); + obj->opo_attr.la_valid = 0; + obj->opo_stale = 1; + spin_unlock(&obj->opo_lock); + } + return 0; } /** * Implementation of dt_body_operations::dbo_write * - * Return the buffer size. In DNE phase I, remote updates - * are actually executed during transaction start, the buffer has - * already been written when this method is being called. + * Pack the write object update into the RPC buffer, which will be sent + * to the remote MDT during transaction stop. 
* * \param[in] env execution environment * \param[in] dt object to be written * \param[in] buf buffer to write which includes an embedded size field * \param[in] pos offet in the object to start writing at * \param[in] th transaction handle - * \param[in] capa capability of the write (not yet implemented) - * \param[in] ignore_quota quota enforcement for this write * - * \retval the buffer size in bytes. + * \retval the buffer size in bytes if packing succeeds. + * \retval negative errno if packing fails. */ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, - struct thandle *handle, - struct lustre_capa *capa, int ignore_quota) + struct thandle *th) { - return buf->lb_len; + struct osp_object *obj = dt2osp_obj(dt); + struct osp_update_request *update; + struct osp_thandle *oth = thandle_to_osp_thandle(th); + ssize_t rc; + ENTRY; + + update = thandle_to_osp_update_request(th); + LASSERT(update != NULL); + + CDEBUG(D_INFO, "write "DFID" offset = %llu length = %zu\n", + PFID(lu_object_fid(&dt->do_lu)), *pos, buf->lb_len); + + rc = OSP_UPDATE_RPC_PACK(env, out_write_pack, update, + lu_object_fid(&dt->do_lu), buf, *pos); + if (rc < 0) + RETURN(rc); + + rc = osp_check_and_set_rpc_version(oth, obj); + if (rc < 0) + RETURN(rc); + + /* to be able to invalidate object's state in case of an error */ + rc = osp_insert_update_callback(env, update, obj, NULL, + osp_write_interpreter); + if (rc < 0) + RETURN(rc); + + /* XXX: how about the write error happened later? 
*/ + *pos += buf->lb_len; + + if (obj->opo_attr.la_valid & LA_SIZE && obj->opo_attr.la_size < *pos) + obj->opo_attr.la_size = *pos; + + spin_lock(&obj->opo_lock); + if (list_empty(&obj->opo_invalidate_cb_list)) { + lu_object_get(&obj->opo_obj.do_lu); + + list_add_tail(&obj->opo_invalidate_cb_list, + &update->our_invalidate_cb_list); + } + spin_unlock(&obj->opo_lock); + + RETURN(buf->lb_len); +} + +/* Convert a wire-format (little-endian) out_read_reply to CPU byte order. */ +static inline void orr_le_to_cpu(struct out_read_reply *orr_dst, + const struct out_read_reply *orr_src) +{ + orr_dst->orr_size = le32_to_cpu(orr_src->orr_size); + orr_dst->orr_padding = le32_to_cpu(orr_src->orr_padding); + orr_dst->orr_offset = le64_to_cpu(orr_src->orr_offset); +} + + +static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt, + struct lu_buf *rbuf, loff_t *pos) +{ + struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); + struct dt_device *dt_dev = &osp->opd_dt_dev; + struct lu_buf *lbuf = &osp_env_info(env)->osi_lb2; + char *ptr = rbuf->lb_buf; + struct osp_update_request *update; + struct ptlrpc_request *req = NULL; + struct out_read_reply *orr; + struct ptlrpc_bulk_desc *desc; + struct object_update_reply *reply; + int pages; + int rc; + ENTRY; + + /* Because it needs send the update buffer right away, + * just create an update buffer, instead of attaching the + * update_remote list of the thandle. 
*/ + update = osp_update_request_create(dt_dev); + if (IS_ERR(update)) + RETURN(PTR_ERR(update)); + + rc = OSP_UPDATE_RPC_PACK(env, out_read_pack, update, + lu_object_fid(&dt->do_lu), + rbuf->lb_len, *pos); + if (rc != 0) { + CERROR("%s: cannot insert update: rc = %d\n", + dt_dev->dd_lu_dev.ld_obd->obd_name, rc); + GOTO(out_update, rc); + } + + CDEBUG(D_INFO, "%s "DFID" read offset %llu size %zu\n", + dt_dev->dd_lu_dev.ld_obd->obd_name, + PFID(lu_object_fid(&dt->do_lu)), *pos, rbuf->lb_len); + rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import, update, + &req); + if (rc != 0) + GOTO(out_update, rc); + + /* First *and* last might be partial pages, hence +1 */ + pages = DIV_ROUND_UP(rbuf->lb_len, PAGE_SIZE) + 1; + + /* allocate bulk descriptor */ + desc = ptlrpc_prep_bulk_imp(req, pages, 1, + PTLRPC_BULK_PUT_SINK, + MDS_BULK_PORTAL, + &ptlrpc_bulk_kiov_nopin_ops); + if (desc == NULL) + GOTO(out, rc = -ENOMEM); + + desc->bd_frag_ops->add_iov_frag(desc, ptr, rbuf->lb_len); + + osp_set_req_replay(osp, req); + req->rq_bulk_read = 1; + /* send request to master and wait for RPC to complete */ + rc = ptlrpc_queue_wait(req); + if (rc != 0) + GOTO(out, rc); + + rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, + req->rq_bulk->bd_nob_transferred); + if (rc < 0) + GOTO(out, rc); + + reply = req_capsule_server_sized_get(&req->rq_pill, + &RMF_OUT_UPDATE_REPLY, + OUT_UPDATE_REPLY_SIZE); + + if (reply->ourp_magic != UPDATE_REPLY_MAGIC) { + CERROR("%s: invalid update reply magic %x expected %x:" + " rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name, + reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO); + GOTO(out, rc = -EPROTO); + } + + rc = object_update_result_data_get(reply, lbuf, 0); + if (rc < 0) + GOTO(out, rc); + + if (lbuf->lb_len < sizeof(*orr)) + GOTO(out, rc = -EPROTO); + + orr = lbuf->lb_buf; + orr_le_to_cpu(orr, orr); + rc = orr->orr_size; + *pos = orr->orr_offset; +out: + ptlrpc_req_finished(req); + +out_update: + osp_update_request_destroy(env, update); + + 
RETURN(rc); } /* These body operation will be used to write symlinks during migration etc */ struct dt_body_operations osp_md_body_ops = { .dbo_declare_write = osp_md_declare_write, .dbo_write = osp_md_write, + .dbo_read = osp_md_read, };