X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosp%2Fosp_md_object.c;h=c0314572e9493001a36cf350d5c72a095a055a8c;hp=9bed8da8840ce11ccd712ac1c817971a6edb682b;hb=26b8238659974959780cd49de92595b4b0bdf89f;hpb=e88992a3b5b9d9ba0a69883671f1b5888514e05d diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index 9bed8da..c031457 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -20,12 +20,33 @@ * GPL HEADER END */ /* - * Copyright (c) 2013, Intel Corporation. + * Copyright (c) 2013, 2014, Intel Corporation. */ /* * lustre/osp/osp_md_object.c * - * Lustre MDT Proxy Device + * OST/MDT proxy device (OSP) Metadata methods + * + * This file implements methods for remote MD object, which include + * dt_object_operations, dt_index_operations and dt_body_operations. + * + * If there are multiple MDTs in one filesystem, one operation might + * include modifications in several MDTs. In such cases, clients + * send the RPC to the master MDT, then the operation is decomposed into + * object updates which will be dispatched to OSD or OSP. The local updates + * go to local OSD and the remote updates go to OSP. In OSP, these remote + * object updates will be packed into an update RPC, sent to the remote MDT + * and handled by Object Update Target (OUT). + * + * In DNE phase I, because of missing complete recovery solution, updates + * will be executed in order and synchronously. + * 1. The transaction is created. + * 2. In transaction declare, it collects and packs remote + * updates (in osp_md_declare_xxx()). + * 3. In transaction start, it sends these remote updates + * to remote MDTs, which will execute these updates synchronously. + * 4. In transaction execute phase, the local updates will be executed + * synchronously. 
* * Author: Di Wang */ @@ -38,22 +59,33 @@ static const char dot[] = "."; static const char dotdot[] = ".."; -int osp_md_declare_object_create(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - struct dt_allocation_hint *hint, - struct dt_object_format *dof, - struct thandle *th) +/** + * Add OUT_CREATE sub-request into the OUT RPC. + * + * Note: if the object has already been created, we must add object + * destroy sub-request ahead of the create, so it will destroy then + * re-create the object. + * + * \param[in] env execution environment + * \param[in] dt object to be created + * \param[in] attr attribute of the created object + * \param[in] hint creation hint + * \param[in] dof creation format information + * \param[in] th the transaction handle + * + * \retval only return 0 for now + */ +static int __osp_md_declare_object_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *th) { - struct osp_thread_info *osi = osp_env_info(env); struct dt_update_request *update; - struct lu_fid *fid1; - int sizes[2] = {sizeof(struct obdo), 0}; - char *bufs[2] = {NULL, NULL}; - int buf_count; int rc; - update = out_find_create_update_loc(th, dt); + update = dt_update_request_find_or_create(th, dt); if (IS_ERR(update)) { CERROR("%s: Get OSP update buf failed: rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name, @@ -61,25 +93,6 @@ int osp_md_declare_object_create(const struct lu_env *env, return PTR_ERR(update); } - osi->osi_obdo.o_valid = 0; - obdo_from_la(&osi->osi_obdo, attr, attr->la_valid); - lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo); - obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo); - - bufs[0] = (char *)&osi->osi_obdo; - buf_count = 1; - fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu); - if (hint != NULL && hint->dah_parent) { - struct lu_fid *fid2; - struct lu_fid *tmp_fid = &osi->osi_fid; - - fid2 = (struct lu_fid 
*)lu_object_fid(&hint->dah_parent->do_lu); - fid_cpu_to_le(tmp_fid, fid2); - sizes[1] = sizeof(*tmp_fid); - bufs[1] = (char *)tmp_fid; - buf_count++; - } - if (lu_object_exists(&dt->do_lu)) { /* If the object already exists, we needs to destroy * this orphan object first. @@ -95,23 +108,27 @@ int osp_md_declare_object_create(const struct lu_env *env, * but find the object already exists */ CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n", - dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1)); + dt->do_lu.lo_dev->ld_obd->obd_name, + PFID(lu_object_fid(&dt->do_lu))); - rc = out_insert_update(env, update, OUT_REF_DEL, fid1, 0, - NULL, NULL); + rc = out_ref_del_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), + update->dur_batchid); if (rc != 0) GOTO(out, rc); if (S_ISDIR(lu_object_attr(&dt->do_lu))) { /* decrease for ".." */ - rc = out_insert_update(env, update, OUT_REF_DEL, fid1, - 0, NULL, NULL); + rc = out_ref_del_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), + update->dur_batchid); if (rc != 0) GOTO(out, rc); } - rc = out_insert_update(env, update, OUT_DESTROY, fid1, 0, NULL, - NULL); + rc = out_object_destroy_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), + update->dur_batchid); if (rc != 0) GOTO(out, rc); @@ -121,8 +138,11 @@ int osp_md_declare_object_create(const struct lu_env *env, update_inc_batchid(update); } - rc = out_insert_update(env, update, OUT_CREATE, fid1, buf_count, sizes, - (const char **)bufs); + rc = out_create_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), attr, hint, dof, + update->dur_batchid); + if (rc != 0) + GOTO(out, rc); out: if (rc) CERROR("%s: Insert update error: rc = %d\n", @@ -131,33 +151,101 @@ out: return rc; } +/** + * Implementation of dt_object_operations::do_declare_create + * + * For non-remote transaction, it will add an OUT_CREATE sub-request + * into the OUT RPC that will be flushed when the transaction start. 
+ * + * \param[in] env execution environment + * \param[in] dt remote object to be created + * \param[in] attr attribute of the created object + * \param[in] hint creation hint + * \param[in] dof creation format information + * \param[in] th the transaction handle + * + * \retval 0 if the insertion succeeds. + * \retval negative errno if the insertion fails. + */ +int osp_md_declare_object_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *th) +{ + int rc = 0; + + if (!is_only_remote_trans(th)) { + rc = __osp_md_declare_object_create(env, dt, attr, hint, + dof, th); + + CDEBUG(D_INFO, "declare create md_object "DFID": rc = %d\n", + PFID(&dt->do_lu.lo_header->loh_fid), rc); + } + + return rc; +} + +/** + * Implementation of dt_object_operations::do_create + * + * For remote transaction, it will add an OUT_CREATE sub-request into + * the OUT RPC that will be flushed when the transaction stop. + * + * It sets necessary flags for created object. In DNE phase I, + * remote updates are actually executed during transaction start, + * i.e. the object has already been created when calling this method. 
+ * + * \param[in] env execution environment + * \param[in] dt object to be created + * \param[in] attr attribute of the created object + * \param[in] hint creation hint + * \param[in] dof creation format information + * \param[in] th the transaction handle + * + * \retval only return 0 for now + */ int osp_md_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, struct dt_allocation_hint *hint, struct dt_object_format *dof, struct thandle *th) { - struct osp_object *obj = dt2osp_obj(dt); + int rc = 0; - CDEBUG(D_INFO, "create object "DFID"\n", - PFID(&dt->do_lu.lo_header->loh_fid)); + if (is_only_remote_trans(th)) { + rc = __osp_md_declare_object_create(env, dt, attr, hint, + dof, th); - /* Because the create update RPC will be sent during declare phase, - * if creation reaches here, it means the object has been created - * successfully */ - dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT); - obj->opo_empty = 1; + CDEBUG(D_INFO, "create md_object "DFID": rc = %d\n", + PFID(&dt->do_lu.lo_header->loh_fid), rc); + } - return 0; + if (rc == 0) { + dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | + (attr->la_mode & S_IFMT); + dt2osp_obj(dt)->opo_non_exist = 0; + } + + return rc; } -static int osp_md_declare_object_ref_del(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) +/** + * Add OUT_REF_DEL sub-request into the OUT RPC. + * + * \param[in] env execution environment + * \param[in] dt object to decrease the reference count. + * \param[in] th the transaction handle of refcount decrease. + * + * \retval 0 if the insertion succeeds. + * \retval negative errno if the insertion fails. 
+ */ +static int __osp_md_ref_del(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { struct dt_update_request *update; - struct lu_fid *fid; int rc; - update = out_find_create_update_loc(th, dt); + update = dt_update_request_find_or_create(th, dt); if (IS_ERR(update)) { CERROR("%s: Get OSP update buf failed: rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name, @@ -165,31 +253,84 @@ static int osp_md_declare_object_ref_del(const struct lu_env *env, return PTR_ERR(update); } - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); + rc = out_ref_del_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), + update->dur_batchid); + return rc; +} - rc = out_insert_update(env, update, OUT_REF_DEL, fid, 0, NULL, NULL); +/** + * Implementation of dt_object_operations::do_declare_ref_del + * + * For non-remote transaction, it will add an OUT_REF_DEL sub-request + * into the OUT RPC that will be flushed when the transaction start. + * + * \param[in] env execution environment + * \param[in] dt object to decrease the reference count. + * \param[in] th the transaction handle of refcount decrease. + * + * \retval 0 if the insertion succeeds. + * \retval negative errno if the insertion fails. + */ +static int osp_md_declare_ref_del(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) +{ + int rc = 0; + + if (!is_only_remote_trans(th)) { + rc = __osp_md_ref_del(env, dt, th); + + CDEBUG(D_INFO, "declare ref del "DFID": rc = %d\n", + PFID(&dt->do_lu.lo_header->loh_fid), rc); + } return rc; } -static int osp_md_object_ref_del(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) +/** + * Implementation of dt_object_operations::do_ref_del + * + * For remote transaction, it will add an OUT_REF_DEL sub-request into + * the OUT RPC that will be flushed when the transaction stop. 
+ * + * \param[in] env execution environment + * \param[in] dt object to decrease the reference count + * \param[in] th the transaction handle + * + * \retval only return 0 for now + */ +static int osp_md_ref_del(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { - CDEBUG(D_INFO, "ref del object "DFID"\n", - PFID(&dt->do_lu.lo_header->loh_fid)); + int rc = 0; - return 0; + if (is_only_remote_trans(th)) { + rc = __osp_md_ref_del(env, dt, th); + + CDEBUG(D_INFO, "ref del "DFID": rc = %d\n", + PFID(&dt->do_lu.lo_header->loh_fid), rc); + } + + return rc; } -static int osp_md_declare_ref_add(const struct lu_env *env, - struct dt_object *dt, struct thandle *th) +/** + * Add OUT_REF_ADD sub-request into the OUT RPC. + * + * \param[in] env execution environment + * \param[in] dt object on which to increase the reference count. + * \param[in] th the transaction handle. + * + * \retval 0 if the insertion succeeds. + * \retval negative errno if the insertion fails. + */ +static int __osp_md_ref_add(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { struct dt_update_request *update; - struct lu_fid *fid; int rc; - update = out_find_create_update_loc(th, dt); + update = dt_update_request_find_or_create(th, dt); if (IS_ERR(update)) { CERROR("%s: Get OSP update buf failed: rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name, @@ -197,23 +338,82 @@ static int osp_md_declare_ref_add(const struct lu_env *env, return PTR_ERR(update); } - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); + rc = out_ref_add_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), + update->dur_batchid); + + return rc; +} + +/** + * Implementation of dt_object_operations::do_declare_ref_add + * + * For non-remote transaction, it will add an OUT_REF_ADD sub-request + * into the OUT RPC that will be flushed when the transaction start. + * + * \param[in] env execution environment + * \param[in] dt object on which to increase the reference count. 
+ * \param[in] th the transaction handle. + * + * \retval 0 if the insertion succeeds. + * \retval negative errno if the insertion fails. + */ +static int osp_md_declare_ref_add(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) +{ + int rc = 0; + + if (!is_only_remote_trans(th)) { + rc = __osp_md_ref_add(env, dt, th); - rc = out_insert_update(env, update, OUT_REF_ADD, fid, 0, NULL, NULL); + CDEBUG(D_INFO, "declare ref add "DFID": rc = %d\n", + PFID(&dt->do_lu.lo_header->loh_fid), rc); + } return rc; } -static int osp_md_object_ref_add(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) +/** + * Implementation of dt_object_operations::do_ref_add + * + * For remote transaction, it will add an OUT_REF_ADD sub-request into + * the OUT RPC that will be flushed when the transaction stop. + * + * \param[in] env execution environment + * \param[in] dt object on which to increase the reference count + * \param[in] th the transaction handle + * + * \retval only return 0 for now + */ +static int osp_md_ref_add(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { - CDEBUG(D_INFO, "ref add object "DFID"\n", - PFID(&dt->do_lu.lo_header->loh_fid)); + int rc = 0; - return 0; + if (is_only_remote_trans(th)) { + rc = __osp_md_ref_add(env, dt, th); + + CDEBUG(D_INFO, "ref add "DFID": rc = %d\n", + PFID(&dt->do_lu.lo_header->loh_fid), rc); + } + + return rc; } +/** + * Implementation of dt_object_operations::do_ah_init + * + * Initialize the allocation hint for object creation, which is usually called + * before the creation, and these hints (parent and child mode) will be sent to + * the remote Object Update Target (OUT) and used in the object create process, + * same as OSD object creation. 
+ * + * \param[in] env execution environment + * \param[in] ah the hint to be initialized + * \param[in] parent the parent of the object + * \param[in] child the object to be created + * \param[in] child_mode the mode of the created object + */ static void osp_md_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah, struct dt_object *parent, @@ -226,17 +426,24 @@ static void osp_md_ah_init(const struct lu_env *env, ah->dah_mode = child_mode; } -int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_attr *attr, struct thandle *th) +/** + * Add OUT_ATTR_SET sub-request into the OUT RPC. + * + * \param[in] env execution environment + * \param[in] dt object on which to set attributes + * \param[in] attr attributes to be set + * \param[in] th the transaction handle + * + * \retval 0 if the insertion succeeds. + * \retval negative errno if the insertion fails. + */ +int __osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_attr *attr, struct thandle *th) { - struct osp_thread_info *osi = osp_env_info(env); struct dt_update_request *update; - struct lu_fid *fid; - int size = sizeof(struct obdo); - char *buf; int rc; - update = out_find_create_update_loc(th, dt); + update = dt_update_request_find_or_create(th, dt); if (IS_ERR(update)) { CERROR("%s: Get OSP update buf failed: %d\n", dt->do_lu.lo_dev->ld_obd->obd_name, @@ -244,31 +451,86 @@ int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, return PTR_ERR(update); } - osi->osi_obdo.o_valid = 0; - obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr, - attr->la_valid); - lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo); - obdo_cpu_to_le(&osi->osi_obdo, &osi->osi_obdo); + rc = out_attr_set_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), attr, + update->dur_batchid); - buf = (char *)&osi->osi_obdo; - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); + return rc; +} - rc = out_insert_update(env, update, 
OUT_ATTR_SET, fid, 1, &size, - (const char **)&buf); +/** + * Implementation of dt_object_operations::do_declare_attr_set + * + * Declare setting attributes to the specified remote object. + * + * If the transaction is a non-remote transaction, then add the OUT_ATTR_SET + * sub-request into the OUT RPC that will be flushed when the transaction start. + * + * \param[in] env execution environment + * \param[in] dt object on which to set attributes + * \param[in] attr attributes to be set + * \param[in] th the transaction handle + * + * \retval 0 if the insertion succeeds. + * \retval negative errno if the insertion fails. + */ +int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_attr *attr, struct thandle *th) +{ + int rc = 0; + + if (!is_only_remote_trans(th)) { + rc = __osp_md_attr_set(env, dt, attr, th); + + CDEBUG(D_INFO, "declare attr set md_object "DFID": rc = %d\n", + PFID(&dt->do_lu.lo_header->loh_fid), rc); + } return rc; } +/** + * Implementation of dt_object_operations::do_attr_set + * + * Set attributes to the specified remote object. + * + * If the transaction is a remote transaction, then add the OUT_ATTR_SET + * sub-request into the OUT RPC that will be flushed when the transaction stop. 
+ * + * \param[in] env execution environment + * \param[in] dt object to set attributes + * \param[in] attr attributes to be set + * \param[in] th the transaction handle + * + * \retval only return 0 for now + */ int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_attr *attr, struct thandle *th, - struct lustre_capa *capa) + const struct lu_attr *attr, struct thandle *th) { - CDEBUG(D_INFO, "attr set object "DFID"\n", - PFID(&dt->do_lu.lo_header->loh_fid)); + int rc = 0; + + if (is_only_remote_trans(th)) { + rc = __osp_md_attr_set(env, dt, attr, th); + + CDEBUG(D_INFO, "attr set md_object "DFID": rc = %d\n", + PFID(&dt->do_lu.lo_header->loh_fid), rc); + } - RETURN(0); + return rc; } +/** + * Implementation of dt_object_operations::do_read_lock + * + * osp_md_object_{read,write}_lock() will only lock the remote object in the + * local cache, which uses the semaphore (opo_sem) inside the osp_object to + * lock the object. Note: it will not lock the object in the whole cluster, + * which relies on the LDLM lock. + * + * \param[in] env execution environment + * \param[in] dt object to be locked + * \param[in] role lock role from MDD layer, see mdd_object_role(). + */ static void osp_md_object_read_lock(const struct lu_env *env, struct dt_object *dt, unsigned role) { @@ -280,6 +542,15 @@ static void osp_md_object_read_lock(const struct lu_env *env, LASSERT(obj->opo_owner == NULL); } +/** + * Implementation of dt_object_operations::do_write_lock + * + * Lock the remote object in write mode. + * + * \param[in] env execution environment + * \param[in] dt object to be locked + * \param[in] role lock role from MDD layer, see mdd_object_role(). 
+ */ static void osp_md_object_write_lock(const struct lu_env *env, struct dt_object *dt, unsigned role) { @@ -291,6 +562,14 @@ static void osp_md_object_write_lock(const struct lu_env *env, obj->opo_owner = env; } +/** + * Implementation of dt_object_operations::do_read_unlock + * + * Unlock the read lock of remote object. + * + * \param[in] env execution environment + * \param[in] dt object to be unlocked + */ static void osp_md_object_read_unlock(const struct lu_env *env, struct dt_object *dt) { @@ -299,6 +578,14 @@ static void osp_md_object_read_unlock(const struct lu_env *env, up_read(&obj->opo_sem); } +/** + * Implementation of dt_object_operations::do_write_unlock + * + * Unlock the write lock of remote object. + * + * \param[in] env execution environment + * \param[in] dt object to be unlocked + */ static void osp_md_object_write_unlock(const struct lu_env *env, struct dt_object *dt) { @@ -309,6 +596,14 @@ static void osp_md_object_write_unlock(const struct lu_env *env, up_write(&obj->opo_sem); } +/** + * Implementation of dt_object_operations::do_write_locked + * + * Test if the object is locked in write mode. + * + * \param[in] env execution environment + * \param[in] dt object to be tested + */ static int osp_md_object_write_locked(const struct lu_env *env, struct dt_object *dt) { @@ -317,9 +612,22 @@ static int osp_md_object_write_locked(const struct lu_env *env, return obj->opo_owner == env; } +/** + * Implementation of dt_index_operations::dio_lookup + * + * Look up record by key under a remote index object. It packs lookup update + * into RPC, sends to the remote OUT and waits for the lookup result. + * + * \param[in] env execution environment + * \param[in] dt index object to lookup + * \param[out] rec record in which to return lookup result + * \param[in] key key of index which will be looked up + * + * \retval 1 if the lookup succeeds. + * \retval negative errno if the lookup fails. 
+ */ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key, - struct lustre_capa *capa) + struct dt_rec *rec, const struct dt_key *key) { struct lu_buf *lbuf = &osp_env_info(env)->osi_lb2; struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); @@ -327,7 +635,6 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, struct dt_update_request *update; struct object_update_reply *reply; struct ptlrpc_request *req = NULL; - int size = strlen((char *)key) + 1; struct lu_fid *fid; int rc; ENTRY; @@ -336,20 +643,19 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, * just create an update buffer, instead of attaching the * update_remote list of the thandle. */ - update = out_create_update_req(dt_dev); + update = dt_update_request_create(dt_dev); if (IS_ERR(update)) RETURN(PTR_ERR(update)); - rc = out_insert_update(env, update, OUT_INDEX_LOOKUP, - lu_object_fid(&dt->do_lu), - 1, &size, (const char **)&key); - if (rc) { + rc = out_index_lookup_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), rec, key); + if (rc != 0) { CERROR("%s: Insert update error: rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name, rc); GOTO(out, rc); } - rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req); + rc = osp_remote_sync(env, osp, update, &req, false); if (rc < 0) GOTO(out, rc); @@ -365,7 +671,7 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, rc = object_update_result_data_get(reply, lbuf, 0); if (rc < 0) - GOTO(out, rc = size); + GOTO(out, rc); if (lbuf->lb_len != sizeof(*fid)) { CERROR("%s: lookup "DFID" %s wrong size %d\n", @@ -376,7 +682,8 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, } fid = lbuf->lb_buf; - fid_le_to_cpu(fid, fid); + if (ptlrpc_rep_need_swab(req)) + lustre_swab_lu_fid(fid); if (!fid_is_sane(fid)) { CERROR("%s: lookup "DFID" %s invalid fid "DFID"\n", 
dt_dev->dd_lu_dev.ld_obd->obd_name, @@ -392,26 +699,33 @@ out: if (req != NULL) ptlrpc_req_finished(req); - out_destroy_update_req(update); + dt_update_request_destroy(update); return rc; } -static int osp_md_declare_insert(const struct lu_env *env, +/** + * Add OUT_INDEX_INSERT sub-request into the OUT RPC. + * + * \param[in] env execution environment + * \param[in] dt object for which to insert index + * \param[in] rec record of the index which will be inserted + * \param[in] key key of the index which will be inserted + * \param[in] th the transaction handle + * + * \retval 0 if the insertion succeeds. + * \retval negative errno if the insertion fails. + */ +static int __osp_md_index_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, const struct dt_key *key, struct thandle *th) { struct dt_update_request *update; - struct lu_fid *fid; - struct lu_fid *rec_fid = (struct lu_fid *)rec; - int size[2] = {strlen((char *)key) + 1, - sizeof(*rec_fid)}; - const char *bufs[2] = {(char *)key, (char *)rec_fid}; int rc; - update = out_find_create_update_loc(th, dt); + update = dt_update_request_find_or_create(th, dt); if (IS_ERR(update)) { CERROR("%s: Get OSP update buf failed: rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name, @@ -419,41 +733,103 @@ static int osp_md_declare_insert(const struct lu_env *env, return PTR_ERR(update); } - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); + rc = out_index_insert_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), rec, key, + update->dur_batchid); + return rc; +} - CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID"\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - PFID(fid), (char *)key, PFID(rec_fid)); +/** + * Implementation of dt_index_operations::dio_declare_insert + * + * For non-remote transaction, it will add an OUT_INDEX_INSERT sub-request + * into the OUT RPC that will be flushed when the transaction start. 
+ * + * \param[in] env execution environment + * \param[in] dt object for which to insert index + * \param[in] rec record of the index which will be inserted + * \param[in] key key of the index which will be inserted + * \param[in] th the transaction handle + * + * \retval 0 if the insertion succeeds. + * \retval negative errno if the insertion fails. + */ +static int osp_md_declare_index_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *th) +{ + int rc = 0; - fid_cpu_to_le(rec_fid, rec_fid); + if (!is_only_remote_trans(th)) { + rc = __osp_md_index_insert(env, dt, rec, key, th); + + CDEBUG(D_INFO, "declare index insert "DFID" key %s, rec "DFID + ": rc = %d\n", PFID(&dt->do_lu.lo_header->loh_fid), + (char *)key, + PFID(((struct dt_insert_rec *)rec)->rec_fid), rc); + } - rc = out_insert_update(env, update, OUT_INDEX_INSERT, fid, - ARRAY_SIZE(size), size, bufs); return rc; } +/** + * Implementation of dt_index_operations::dio_insert + * + * For remote transaction, it will add an OUT_INDEX_INSERT sub-request + * into the OUT RPC that will be flushed when the transaction stop. 
+ * + * \param[in] env execution environment + * \param[in] dt object for which to insert index + * \param[in] rec record of the index to be inserted + * \param[in] key key of the index to be inserted + * \param[in] th the transaction handle + * \param[in] ignore_quota quota enforcement for insert + * + * \retval only return 0 for now + */ static int osp_md_index_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, const struct dt_key *key, struct thandle *th, - struct lustre_capa *capa, int ignore_quota) { - return 0; + int rc = 0; + + if (is_only_remote_trans(th)) { + rc = __osp_md_index_insert(env, dt, rec, key, th); + + CDEBUG(D_INFO, "index insert "DFID" key %s, rec "DFID + ": rc = %d\n", PFID(&dt->do_lu.lo_header->loh_fid), + (char *)key, + PFID(((struct dt_insert_rec *)rec)->rec_fid), rc); + } + + return rc; } -static int osp_md_declare_delete(const struct lu_env *env, +/** + * Add OUT_INDEX_DELETE sub-request into the OUT RPC. + * + * \param[in] env execution environment + * \param[in] dt object for which to delete index + * \param[in] key key of the index + * \param[in] th the transaction handle + * + * \retval 0 if the insertion succeeds. + * \retval negative errno if the insertion fails. 
+ */ +static int __osp_md_index_delete(const struct lu_env *env, struct dt_object *dt, const struct dt_key *key, struct thandle *th) { struct dt_update_request *update; - struct lu_fid *fid; - int size = strlen((char *)key) + 1; - int rc; + int rc; - update = out_find_create_update_loc(th, dt); + update = dt_update_request_find_or_create(th, dt); if (IS_ERR(update)) { CERROR("%s: Get OSP update buf failed: rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name, @@ -461,134 +837,259 @@ static int osp_md_declare_delete(const struct lu_env *env, return PTR_ERR(update); } - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - - rc = out_insert_update(env, update, OUT_INDEX_DELETE, fid, 1, &size, - (const char **)&key); - + rc = out_index_delete_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), key, + update->dur_batchid); return rc; } -static int osp_md_index_delete(const struct lu_env *env, - struct dt_object *dt, - const struct dt_key *key, - struct thandle *th, - struct lustre_capa *capa) +/** + * Implementation of dt_index_operations::dio_declare_delete + * + * For non-remote transaction, it will add an OUT_INDEX_DELETE sub-request + * into the OUT RPC that will be flushed when the transaction start. + * + * \param[in] env execution environment + * \param[in] dt object for which to delete index + * \param[in] key key of the index + * \param[in] th the transaction handle + * + * \retval 0 if the insertion succeeds. + * \retval negative errno if the insertion fails. 
+ */ +static int osp_md_declare_index_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *th) { - CDEBUG(D_INFO, "index delete "DFID" %s\n", - PFID(&dt->do_lu.lo_header->loh_fid), (char *)key); + int rc = 0; - return 0; + if (!is_only_remote_trans(th)) { + rc = __osp_md_index_delete(env, dt, key, th); + + CDEBUG(D_INFO, "declare index delete "DFID" %s: rc = %d\n", + PFID(&dt->do_lu.lo_header->loh_fid), (char *)key, rc); + } + + return rc; } /** - * Creates or initializes iterator context. + * Implementation of dt_index_operations::dio_delete + * + * For remote transaction, it will add an OUT_INDEX_DELETE sub-request + * into the OUT RPC that will be flushed when the transaction stop. * - * Note: for OSP, these index iterate api is only used to check - * whether the directory is empty now (see mdd_dir_is_empty). - * Since dir_empty will be return by OUT_ATTR_GET(see osp_attr_get/ - * out_attr_get). So the implementation of these iterator is simplied - * to make mdd_dir_is_empty happy. The real iterator should be - * implemented, if we need it one day. 
+ * \param[in] env execution environment + * \param[in] dt object for which to delete index + * \param[in] key key of the index which will be deleted + * \param[in] th the transaction handle + * + * \retval only return 0 for now */ -static struct dt_it *osp_it_init(const struct lu_env *env, - struct dt_object *dt, - __u32 attr, - struct lustre_capa *capa) +static int osp_md_index_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *th) { - lu_object_get(&dt->do_lu); - return (struct dt_it *)dt; -} + int rc = 0; -static void osp_it_fini(const struct lu_env *env, struct dt_it *di) -{ - struct dt_object *dt = (struct dt_object *)di; - lu_object_put(env, &dt->do_lu); -} + if (is_only_remote_trans(th)) { + rc = __osp_md_index_delete(env, dt, key, th); -static int osp_it_get(const struct lu_env *env, - struct dt_it *di, const struct dt_key *key) -{ - return 1; -} + CDEBUG(D_INFO, "index delete "DFID" %s: rc = %d\n", + PFID(&dt->do_lu.lo_header->loh_fid), (char *)key, rc); + } -static void osp_it_put(const struct lu_env *env, struct dt_it *di) -{ - return; + return rc; } -static int osp_it_next(const struct lu_env *env, struct dt_it *di) +/** + * Implementation of dt_index_operations::dio_it.next + * + * Advance the pointer of the iterator to the next entry. It shares a similar + * internal implementation with osp_orphan_it_next(), which is being used for + * remote orphan index object. This method will be used for remote directory. + * + * \param[in] env execution environment + * \param[in] di iterator of this iteration + * + * \retval 0 if the pointer is advanced successfully. + * \retval 1 if it reaches the end of the index object. + * \retval negative errno if the pointer cannot be advanced. 
+ */ +static int osp_md_index_it_next(const struct lu_env *env, struct dt_it *di) { - struct dt_object *dt = (struct dt_object *)di; - struct osp_object *o = dt2osp_obj(dt); + struct osp_it *it = (struct osp_it *)di; + struct lu_idxpage *idxpage; + struct lu_dirent *ent = (struct lu_dirent *)it->ooi_ent; + int rc; + ENTRY; - if (o->opo_empty) - return 1; +again: + idxpage = it->ooi_cur_idxpage; + if (idxpage != NULL) { + if (idxpage->lip_nr == 0) + RETURN(1); + + it->ooi_pos_ent++; + if (ent == NULL) { + it->ooi_ent = + (struct lu_dirent *)idxpage->lip_entries; + RETURN(0); + } else if (le16_to_cpu(ent->lde_reclen) != 0 && + it->ooi_pos_ent < idxpage->lip_nr) { + ent = (struct lu_dirent *)(((char *)ent) + + le16_to_cpu(ent->lde_reclen)); + it->ooi_ent = ent; + RETURN(0); + } else { + it->ooi_ent = NULL; + } + } - return 0; + rc = osp_it_next_page(env, di); + if (rc == 0) + goto again; + + RETURN(rc); } +/** + * Implementation of dt_index_operations::dio_it.key + * + * Get the key at current iterator position. These iteration methods + * (dio_it) will only be used for iterating the remote directory, so + * the key is the name of the directory entry. + * + * \param[in] env execution environment + * \param[in] di iterator of this iteration + * + * \retval name of the current entry + */ static struct dt_key *osp_it_key(const struct lu_env *env, const struct dt_it *di) { - LBUG(); - return NULL; + struct osp_it *it = (struct osp_it *)di; + struct lu_dirent *ent = (struct lu_dirent *)it->ooi_ent; + + return (struct dt_key *)ent->lde_name; } +/** + * Implementation of dt_index_operations::dio_it.key_size + * + * Get the key size at current iterator position. These iteration methods + * (dio_it) will only be used for iterating the remote directory, so the key + * size is the name size of the directory entry. 
+ * + * \param[in] env execution environment + * \param[in] di iterator of this iteration + * + * \retval name size of the current entry + */ + static int osp_it_key_size(const struct lu_env *env, const struct dt_it *di) { - LBUG(); - return 0; -} + struct osp_it *it = (struct osp_it *)di; + struct lu_dirent *ent = (struct lu_dirent *)it->ooi_ent; -static int osp_it_rec(const struct lu_env *env, const struct dt_it *di, - struct dt_rec *lde, __u32 attr) -{ - LBUG(); - return 0; + return (int)le16_to_cpu(ent->lde_namelen); } -static __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di) +/** + * Implementation of dt_index_operations::dio_it.rec + * + * Get the record at current iterator position. These iteration methods + * (dio_it) will only be used for iterating the remote directory, so it + * uses lu_dirent_calc_size() to calculate the record size. + * + * \param[in] env execution environment + * \param[in] di iterator of this iteration + * \param[out] rec the record to be returned + * \param[in] attr attributes of the index object, so it knows + * how to pack the entry. + * + * \retval only return 0 for now + */ +static int osp_md_index_it_rec(const struct lu_env *env, const struct dt_it *di, + struct dt_rec *rec, __u32 attr) { - LBUG(); + struct osp_it *it = (struct osp_it *)di; + struct lu_dirent *ent = (struct lu_dirent *)it->ooi_ent; + size_t reclen; + + reclen = lu_dirent_calc_size(le16_to_cpu(ent->lde_namelen), attr); + memcpy(rec, ent, reclen); return 0; } +/** + * Implementation of dt_index_operations::dio_it.load + * + * Locate the iteration cursor to the specified position (cookie). 
+ * + * \param[in] env pointer to the thread context + * \param[in] di pointer to the iteration structure + * \param[in] hash the specified position + * + * \retval positive number if the cursor is located at the exact + * position or at the next one + * \retval 0 if the end of the iteration is reached + * \retval negative error number on failure + */ static int osp_it_load(const struct lu_env *env, const struct dt_it *di, __u64 hash) { - LBUG(); - return 0; -} + struct osp_it *it = (struct osp_it *)di; + int rc; -static int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di, - void *key_rec) -{ - LBUG(); - return 0; + it->ooi_next = hash; + rc = osp_md_index_it_next(env, (struct dt_it *)di); + if (rc == 1) + return 0; + + if (rc == 0) + return 1; + + return rc; } -static const struct dt_index_operations osp_md_index_ops = { +const struct dt_index_operations osp_md_index_ops = { .dio_lookup = osp_md_index_lookup, - .dio_declare_insert = osp_md_declare_insert, + .dio_declare_insert = osp_md_declare_index_insert, .dio_insert = osp_md_index_insert, - .dio_declare_delete = osp_md_declare_delete, + .dio_declare_delete = osp_md_declare_index_delete, .dio_delete = osp_md_index_delete, .dio_it = { .init = osp_it_init, .fini = osp_it_fini, .get = osp_it_get, .put = osp_it_put, - .next = osp_it_next, + .next = osp_md_index_it_next, .key = osp_it_key, .key_size = osp_it_key_size, - .rec = osp_it_rec, + .rec = osp_md_index_it_rec, .store = osp_it_store, .load = osp_it_load, .key_rec = osp_it_key_rec, } }; +/** + * Implementation of dt_object_operations::do_index_try + * + * Try to initialize the index API pointer for the given object. This + * is the entry point of the index API, i.e. we must call this method + * to initialize the index object before calling other index methods. + * + * \param[in] env execution environment + * \param[in] dt index object to be initialized + * \param[in] feat the index feature of the object + * + * \retval 0 if the initialization succeeds.
+ * \retval negative errno if the initialization fails. + */ static int osp_md_index_try(const struct lu_env *env, struct dt_object *dt, const struct dt_index_features *feat) @@ -597,6 +1098,21 @@ static int osp_md_index_try(const struct lu_env *env, return 0; } +/** + * Implementation of dt_object_operations::do_object_lock + * + * Enqueue a lock (by ldlm_cli_enqueue()) of remote object on the remote MDT, + * which will lock the object in the global namespace. + * + * \param[in] env execution environment + * \param[in] dt object to be locked + * \param[out] lh lock handle + * \param[in] einfo enqueue information + * \param[in] policy lock policy + * + * \retval ELDLM_OK if locking the object succeeds. + * \retval negative errno if locking fails. + */ static int osp_md_object_lock(const struct lu_env *env, struct dt_object *dt, struct lustre_handle *lh, @@ -634,6 +1150,18 @@ static int osp_md_object_lock(const struct lu_env *env, return rc == ELDLM_OK ? 0 : -EIO; } +/** + * Implementation of dt_object_operations::do_object_unlock + * + * Cancel a lock of a remote object. + * + * \param[in] env execution environment + * \param[in] dt object to be unlocked + * \param[in] einfo lock enqueue information + * \param[in] policy lock policy + * + * \retval Only return 0 for now. 
+ */ static int osp_md_object_unlock(const struct lu_env *env, struct dt_object *dt, struct ldlm_enqueue_info *einfo, @@ -656,9 +1184,9 @@ struct dt_object_operations osp_md_obj_ops = { .do_declare_create = osp_md_declare_object_create, .do_create = osp_md_object_create, .do_declare_ref_add = osp_md_declare_ref_add, - .do_ref_add = osp_md_object_ref_add, - .do_declare_ref_del = osp_md_declare_object_ref_del, - .do_ref_del = osp_md_object_ref_del, + .do_ref_add = osp_md_ref_add, + .do_declare_ref_del = osp_md_declare_ref_del, + .do_ref_del = osp_md_ref_del, .do_declare_destroy = osp_declare_object_destroy, .do_destroy = osp_object_destroy, .do_ah_init = osp_md_ah_init, @@ -668,24 +1196,37 @@ struct dt_object_operations osp_md_obj_ops = { .do_xattr_get = osp_xattr_get, .do_declare_xattr_set = osp_declare_xattr_set, .do_xattr_set = osp_xattr_set, + .do_declare_xattr_del = osp_declare_xattr_del, + .do_xattr_del = osp_xattr_del, .do_index_try = osp_md_index_try, .do_object_lock = osp_md_object_lock, .do_object_unlock = osp_md_object_unlock, }; +/** + * Implementation of dt_body_operations::dbo_declare_write + * + * Declare an object write. In DNE phase I, it will pack the write + * object update into the RPC. + * + * \param[in] env execution environment + * \param[in] dt object to be written + * \param[in] buf buffer to write which includes an embedded size field + * \param[in] pos offset in the object to start writing at + * \param[in] th transaction handle + * + * \retval 0 if packing the write update succeeds. + * \retval negative errno if packing the write update fails.
+ */ static ssize_t osp_md_declare_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t pos, struct thandle *th) { struct dt_update_request *update; - struct lu_fid *fid; - int sizes[2] = {buf->lb_len, sizeof(pos)}; - const char *bufs[2] = {(char *)buf->lb_buf, - (char *)&pos}; ssize_t rc; - update = out_find_create_update_loc(th, dt); + update = dt_update_request_find_or_create(th, dt); if (IS_ERR(update)) { CERROR("%s: Get OSP update buf failed: rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name, @@ -693,21 +1234,34 @@ static ssize_t osp_md_declare_write(const struct lu_env *env, return PTR_ERR(update); } - pos = cpu_to_le64(pos); - bufs[1] = (char *)&pos; - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - rc = out_insert_update(env, update, OUT_WRITE, fid, - ARRAY_SIZE(sizes), sizes, bufs); + rc = out_write_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu), + buf, pos, update->dur_batchid); return rc; } +/** + * Implementation of dt_body_operations::dbo_write + * + * Advance the file position by the buffer size and return that size. + * In DNE phase I, remote updates are actually executed during + * transaction start, so the buffer has already been written when this + * method is being called. + * + * \param[in] env execution environment + * \param[in] dt object to be written + * \param[in] buf buffer to write which includes an embedded size field + * \param[in,out] pos offset in the object to start writing at; it is + * advanced by the buffer size on return + * \param[in] handle transaction handle + * \param[in] ignore_quota quota enforcement for this write + * + * \retval the buffer size in bytes. + */ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, - struct thandle *handle, - struct lustre_capa *capa, int ignore_quota) + struct thandle *handle, int ignore_quota) { + *pos += buf->lb_len; return buf->lb_len; }