X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosp%2Fosp_md_object.c;h=a587b57aac6486db49ef20c405606b342db8479a;hb=b4e6b6b280626dbafebe3e35858707f3143de24a;hp=2b924cda129cac56794b1741113861d881ea9d9a;hpb=308518230e4f410bd961e82b54e868eb53616b97;p=fs%2Flustre-release.git diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index 2b924cd..a587b57 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2013, Intel Corporation. + * Copyright (c) 2013, 2014, Intel Corporation. */ /* * lustre/osp/osp_md_object.c @@ -60,61 +60,33 @@ static const char dot[] = "."; static const char dotdot[] = ".."; /** - * Implementation of dt_object_operations::do_declare_create + * Add OUT_CREATE sub-request into the OUT RPC. * - * Insert object create update into the RPC, which will be sent during - * transaction start. Note: if the object has already been created, - * we must add object destroy updates ahead of create updates, so it will - * destroy then recreate the object. + * Note: if the object has already been created, we must add object + * destroy sub-request ahead of the create, so it will destroy then + * re-create the object. * * \param[in] env execution environment - * \param[in] dt remote object to be created + * \param[in] dt object to be created * \param[in] attr attribute of the created object * \param[in] hint creation hint * \param[in] dof creation format information * \param[in] th the transaction handle * - * \retval 0 if the insertion succeeds. - * \retval negative errno if the insertion fails. + * \retval only return 0 for now */ -int osp_md_declare_object_create(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - struct dt_allocation_hint *hint, - struct dt_object_format *dof, - struct thandle *th) +static int __osp_md_declare_object_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *th) { - struct osp_thread_info *osi = osp_env_info(env); struct dt_update_request *update; - struct lu_fid *fid1; - int sizes[2] = {sizeof(struct obdo), 0}; - char *bufs[2] = {NULL, NULL}; - int buf_count; int rc; - update = out_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } - - osi->osi_obdo.o_valid = 0; - obdo_from_la(&osi->osi_obdo, attr, attr->la_valid); - lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo); - - bufs[0] = (char *)&osi->osi_obdo; - buf_count = 1; - fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu); - if (hint != NULL && hint->dah_parent) { - struct lu_fid *fid2; - - fid2 = (struct lu_fid *)lu_object_fid(&hint->dah_parent->do_lu); - sizes[1] = sizeof(*fid2); - bufs[1] = (char *)fid2; - buf_count++; - } + update = thandle_to_dt_update_request(th); + LASSERT(update != NULL); if (lu_object_exists(&dt->do_lu)) { /* If the object already exists, we needs to destroy @@ -131,23 +103,27 @@ int osp_md_declare_object_create(const struct lu_env *env, * but find the object already exists */ CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n", - dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1)); + dt->do_lu.lo_dev->ld_obd->obd_name, + PFID(lu_object_fid(&dt->do_lu))); - rc = out_insert_update(env, update, OUT_REF_DEL, fid1, 0, - NULL, NULL); + rc = out_ref_del_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), + update->dur_batchid); if (rc != 0) GOTO(out, rc); if (S_ISDIR(lu_object_attr(&dt->do_lu))) { /* decrease for ".." */ - rc = out_insert_update(env, update, OUT_REF_DEL, fid1, - 0, NULL, NULL); + rc = out_ref_del_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), + update->dur_batchid); if (rc != 0) GOTO(out, rc); } - rc = out_insert_update(env, update, OUT_DESTROY, fid1, 0, NULL, - NULL); + rc = out_object_destroy_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), + update->dur_batchid); if (rc != 0) GOTO(out, rc); @@ -157,8 +133,11 @@ int osp_md_declare_object_create(const struct lu_env *env, update_inc_batchid(update); } - rc = out_insert_update(env, update, OUT_CREATE, fid1, buf_count, sizes, - (const char **)bufs); + rc = out_create_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), attr, hint, dof, + update->dur_batchid); + if (rc != 0) + GOTO(out, rc); out: if (rc) CERROR("%s: Insert update error: rc = %d\n", @@ -168,8 +147,47 @@ out: } /** + * Implementation of dt_object_operations::do_declare_create + * + * For non-remote transaction, it will add an OUT_CREATE sub-request + * into the OUT RPC that will be flushed when the transaction start. + * + * \param[in] env execution environment + * \param[in] dt remote object to be created + * \param[in] attr attribute of the created object + * \param[in] hint creation hint + * \param[in] dof creation format information + * \param[in] th the transaction handle + * + * \retval 0 if the insertion succeeds. + * \retval negative errno if the insertion fails. + */ +int osp_md_declare_object_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *th) +{ + int rc = 0; + + if (!is_only_remote_trans(th)) { + rc = __osp_md_declare_object_create(env, dt, attr, hint, + dof, th); + + CDEBUG(D_INFO, "declare create md_object "DFID": rc = %d\n", + PFID(&dt->do_lu.lo_header->loh_fid), rc); + } + + return rc; +} + +/** * Implementation of dt_object_operations::do_create * + * For remote transaction, it will add an OUT_CREATE sub-request into + * the OUT RPC that will be flushed when the transaction stop. + * * It sets necessary flags for created object. In DNE phase I, * remote updates are actually executed during transaction start, * i.e. the object has already been created when calling this method. @@ -187,23 +205,27 @@ int osp_md_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, struct dt_allocation_hint *hint, struct dt_object_format *dof, struct thandle *th) { - CDEBUG(D_INFO, "create object "DFID"\n", - PFID(&dt->do_lu.lo_header->loh_fid)); + int rc = 0; - /* Because the create update RPC will be sent during declare phase, - * if creation reaches here, it means the object has been created - * successfully */ - dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT); + if (is_only_remote_trans(th)) { + rc = __osp_md_declare_object_create(env, dt, attr, hint, + dof, th); - return 0; + CDEBUG(D_INFO, "create md_object "DFID": rc = %d\n", + PFID(&dt->do_lu.lo_header->loh_fid), rc); + } + + if (rc == 0) { + dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | + (attr->la_mode & S_IFMT); + dt2osp_obj(dt)->opo_non_exist = 0; + } + + return rc; } /** - * Implementation of dt_object_operations::do_declare_ref_del - * - * Declare decreasing the reference count of the remote object, i.e. insert - * decreasing object reference count update into the RPC, which will be sent - * during transaction start. + * Add OUT_REF_DEL sub-request into the OUT RPC. * * \param[in] env execution environment * \param[in] dt object to decrease the reference count. @@ -212,25 +234,45 @@ int osp_md_object_create(const struct lu_env *env, struct dt_object *dt, * \retval 0 if the insertion succeeds. * \retval negative errno if the insertion fails. */ -static int osp_md_declare_object_ref_del(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) +static int __osp_md_ref_del(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { struct dt_update_request *update; - struct lu_fid *fid; int rc; - update = out_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } + update = thandle_to_dt_update_request(th); + LASSERT(update != NULL); + + rc = out_ref_del_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), + update->dur_batchid); + return rc; +} + +/** + * Implementation of dt_object_operations::do_declare_ref_del + * + * For non-remote transaction, it will add an OUT_REF_DEL sub-request + * into the OUT RPC that will be flushed when the transaction start. + * + * \param[in] env execution environment + * \param[in] dt object to decrease the reference count. + * \param[in] th the transaction handle of refcount decrease. + * + * \retval 0 if the insertion succeeds. + * \retval negative errno if the insertion fails. + */ +static int osp_md_declare_ref_del(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) +{ + int rc = 0; - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); + if (!is_only_remote_trans(th)) { + rc = __osp_md_ref_del(env, dt, th); - rc = out_insert_update(env, update, OUT_REF_DEL, fid, 0, NULL, NULL); + CDEBUG(D_INFO, "declare ref del "DFID": rc = %d\n", + PFID(&dt->do_lu.lo_header->loh_fid), rc); + } return rc; } @@ -238,9 +280,8 @@ static int osp_md_declare_object_ref_del(const struct lu_env *env, /** * Implementation of dt_object_operations::do_ref_del * - * Do nothing in this method for now. In DNE phase I, remote updates are - * actually executed during transaction start, i.e. the object reference - * count has already been decreased when calling this method. + * For remote transaction, it will add an OUT_REF_DEL sub-request into + * the OUT RPC that will be flushed when the transaction stop. * * \param[in] env execution environment * \param[in] dt object to decrease the reference count @@ -248,21 +289,52 @@ static int osp_md_declare_object_ref_del(const struct lu_env *env, * * \retval only return 0 for now */ -static int osp_md_object_ref_del(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) +static int osp_md_ref_del(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { - CDEBUG(D_INFO, "ref del object "DFID"\n", - PFID(&dt->do_lu.lo_header->loh_fid)); + int rc = 0; - return 0; + if (is_only_remote_trans(th)) { + rc = __osp_md_ref_del(env, dt, th); + + CDEBUG(D_INFO, "ref del "DFID": rc = %d\n", + PFID(&dt->do_lu.lo_header->loh_fid), rc); + } + + return rc; +} + +/** + * Add OUT_REF_ADD sub-request into the OUT RPC. + * + * \param[in] env execution environment + * \param[in] dt object on which to increase the reference count. + * \param[in] th the transaction handle. + * + * \retval 0 if the insertion succeeds. + * \retval negative errno if the insertion fails. + */ +static int __osp_md_ref_add(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) +{ + struct dt_update_request *update; + int rc; + + update = thandle_to_dt_update_request(th); + LASSERT(update != NULL); + + rc = out_ref_add_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), + update->dur_batchid); + + return rc; } /** * Implementation of dt_object_operations::do_declare_ref_del * - * Declare increasing the reference count of the remote object, - * i.e. insert increasing object reference count update into RPC. + * For non-remote transaction, it will add an OUT_REF_ADD sub-request + * into the OUT RPC that will be flushed when the transaction start. * * \param[in] env execution environment * \param[in] dt object on which to increase the reference count. @@ -274,21 +346,14 @@ static int osp_md_object_ref_del(const struct lu_env *env, static int osp_md_declare_ref_add(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - struct dt_update_request *update; - struct lu_fid *fid; - int rc; - - update = out_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } + int rc = 0; - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); + if (!is_only_remote_trans(th)) { + rc = __osp_md_ref_add(env, dt, th); - rc = out_insert_update(env, update, OUT_REF_ADD, fid, 0, NULL, NULL); + CDEBUG(D_INFO, "declare ref add "DFID": rc = %d\n", + PFID(&dt->do_lu.lo_header->loh_fid), rc); + } return rc; } @@ -296,9 +361,8 @@ static int osp_md_declare_ref_add(const struct lu_env *env, /** * Implementation of dt_object_operations::do_ref_add * - * Do nothing in this method for now. In DNE phase I, remote updates are - * actually executed during transaction start, i.e. the object reference - * count has already been increased when calling this method. + * For remote transaction, it will add an OUT_REF_ADD sub-request into + * the OUT RPC that will be flushed when the transaction stop. * * \param[in] env execution environment * \param[in] dt object on which to increase the reference count @@ -306,13 +370,19 @@ static int osp_md_declare_ref_add(const struct lu_env *env, * * \retval only return 0 for now */ -static int osp_md_object_ref_add(const struct lu_env *env, struct dt_object *dt, - struct thandle *th) +static int osp_md_ref_add(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { - CDEBUG(D_INFO, "ref add object "DFID"\n", - PFID(&dt->do_lu.lo_header->loh_fid)); + int rc = 0; - return 0; + if (is_only_remote_trans(th)) { + rc = __osp_md_ref_add(env, dt, th); + + CDEBUG(D_INFO, "ref add "DFID": rc = %d\n", + PFID(&dt->do_lu.lo_header->loh_fid), rc); + } + + return rc; } /** @@ -342,10 +412,7 @@ static void osp_md_ah_init(const struct lu_env *env, } /** - * Implementation of dt_object_operations::do_declare_attr_get - * - * Declare setting attributes of the remote object, i.e. insert remote - * object attr_set update into RPC. + * Add OUT_ATTR_SET sub-request into the OUT RPC. * * \param[in] env execution environment * \param[in] dt object on which to set attributes @@ -355,34 +422,49 @@ static void osp_md_ah_init(const struct lu_env *env, * \retval 0 if the insertion succeeds. * \retval negative errno if the insertion fails. */ -int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_attr *attr, struct thandle *th) +int __osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_attr *attr, struct thandle *th) { - struct osp_thread_info *osi = osp_env_info(env); struct dt_update_request *update; - struct lu_fid *fid; - int size = sizeof(struct obdo); - char *buf; int rc; - update = out_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } + update = thandle_to_dt_update_request(th); + LASSERT(update != NULL); - osi->osi_obdo.o_valid = 0; - obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr, - attr->la_valid); - lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo); + rc = out_attr_set_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), attr, + update->dur_batchid); - buf = (char *)&osi->osi_obdo; - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); + return rc; +} - rc = out_insert_update(env, update, OUT_ATTR_SET, fid, 1, &size, - (const char **)&buf); +/** + * Implementation of dt_object_operations::do_declare_attr_get + * + * Declare setting attributes to the specified remote object. + * + * If the transaction is a non-remote transaction, then add the OUT_ATTR_SET + * sub-request into the OUT RPC that will be flushed when the transaction start. + * + * \param[in] env execution environment + * \param[in] dt object on which to set attributes + * \param[in] attr attributes to be set + * \param[in] th the transaction handle + * + * \retval 0 if the insertion succeeds. + * \retval negative errno if the insertion fails. + */ +int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_attr *attr, struct thandle *th) +{ + int rc = 0; + + if (!is_only_remote_trans(th)) { + rc = __osp_md_attr_set(env, dt, attr, th); + + CDEBUG(D_INFO, "declare attr set md_object "DFID": rc = %d\n", + PFID(&dt->do_lu.lo_header->loh_fid), rc); + } return rc; } @@ -390,26 +472,31 @@ int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, /** * Implementation of dt_object_operations::do_attr_set * - * Do nothing in this method for now. In DNE phase I, remote updates - * are actually executed during transaction start, i.e. object attributes - * have already been set when calling this method. + * Set attributes to the specified remote object. + * + * If the transaction is a remote transaction, then add the OUT_ATTR_SET + * sub-request into the OUT RPC that will be flushed when the transaction stop. * * \param[in] env execution environment * \param[in] dt object to set attributes * \param[in] attr attributes to be set * \param[in] th the transaction handle - * \param[in] capa capability of setting attributes (not yet implemented). * * \retval only return 0 for now */ int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_attr *attr, struct thandle *th, - struct lustre_capa *capa) + const struct lu_attr *attr, struct thandle *th) { - CDEBUG(D_INFO, "attr set object "DFID"\n", - PFID(&dt->do_lu.lo_header->loh_fid)); + int rc = 0; + + if (is_only_remote_trans(th)) { + rc = __osp_md_attr_set(env, dt, attr, th); + + CDEBUG(D_INFO, "attr set md_object "DFID": rc = %d\n", + PFID(&dt->do_lu.lo_header->loh_fid), rc); + } - RETURN(0); + return rc; } /** @@ -515,14 +602,12 @@ static int osp_md_object_write_locked(const struct lu_env *env, * \param[in] dt index object to lookup * \param[out] rec record in which to return lookup result * \param[in] key key of index which will be looked up - * \param[in] capa capability of lookup (not yet implemented) * * \retval 1 if the lookup succeeds. * \retval negative errno if the lookup fails. */ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, - struct dt_rec *rec, const struct dt_key *key, - struct lustre_capa *capa) + struct dt_rec *rec, const struct dt_key *key) { struct lu_buf *lbuf = &osp_env_info(env)->osi_lb2; struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); @@ -530,7 +615,6 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, struct dt_update_request *update; struct object_update_reply *reply; struct ptlrpc_request *req = NULL; - int size = strlen((char *)key) + 1; struct lu_fid *fid; int rc; ENTRY; @@ -539,20 +623,19 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, * just create an update buffer, instead of attaching the * update_remote list of the thandle. */ - update = out_create_update_req(dt_dev); + update = dt_update_request_create(dt_dev); if (IS_ERR(update)) RETURN(PTR_ERR(update)); - rc = out_insert_update(env, update, OUT_INDEX_LOOKUP, - lu_object_fid(&dt->do_lu), - 1, &size, (const char **)&key); - if (rc) { + rc = out_index_lookup_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), rec, key); + if (rc != 0) { CERROR("%s: Insert update error: rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name, rc); GOTO(out, rc); } - rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req); + rc = osp_remote_sync(env, osp, update, &req, false); if (rc < 0) GOTO(out, rc); @@ -596,16 +679,13 @@ out: if (req != NULL) ptlrpc_req_finished(req); - out_destroy_update_req(update); + dt_update_request_destroy(update); return rc; } /** - * Implementation of dt_index_operations::dio_declare_insert - * - * Declare the index insert of the remote object, i.e. pack index insert update - * into the RPC, which will be sent during transaction start. + * Add OUT_INDEX_INSERT sub-request into the OUT RPC. * * \param[in] env execution environment * \param[in] dt object for which to insert index @@ -616,58 +696,78 @@ out: * \retval 0 if the insertion succeeds. * \retval negative errno if the insertion fails. */ -static int osp_md_declare_insert(const struct lu_env *env, +static int __osp_md_index_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, const struct dt_key *key, struct thandle *th) { - struct osp_thread_info *info = osp_env_info(env); - struct dt_update_request *update; - struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec; - struct lu_fid *fid = - (struct lu_fid *)lu_object_fid(&dt->do_lu); - struct lu_fid *rec_fid = &info->osi_fid; - __u32 type = cpu_to_le32(rec1->rec_type); - int size[3] = { strlen((char *)key) + 1, - sizeof(*rec_fid), - sizeof(type) }; - const char *bufs[3] = { (char *)key, - (char *)rec_fid, - (char *)&type }; - int rc; - - update = out_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } + struct osp_thandle *oth = thandle_to_osp_thandle(th); + struct dt_update_request *update = oth->ot_dur; + int rc; + - CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID", %u\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - PFID(fid), (char *)key, PFID(rec1->rec_fid), rec1->rec_type); + rc = out_index_insert_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), rec, key, + update->dur_batchid); + if (rc != 0) + return rc; + + /* Before async update is allowed, if it will insert remote + * name entry, it should make sure the local object is created, + * i.e. the remote update RPC should be sent after local + * update(create object) */ + oth->ot_send_updates_after_local_trans = true; + + return rc; +} + +/** + * Implementation of dt_index_operations::dio_declare_insert + * + * For non-remote transaction, it will add an OUT_INDEX_INSERT sub-request + * into the OUT RPC that will be flushed when the transaction start. + * + * \param[in] env execution environment + * \param[in] dt object for which to insert index + * \param[in] rec record of the index which will be inserted + * \param[in] key key of the index which will be inserted + * \param[in] th the transaction handle + * + * \retval 0 if the insertion succeeds. + * \retval negative errno if the insertion fails. + */ +static int osp_md_declare_index_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *th) +{ + int rc = 0; + + if (!is_only_remote_trans(th)) { + rc = __osp_md_index_insert(env, dt, rec, key, th); + + CDEBUG(D_INFO, "declare index insert "DFID" key %s, rec "DFID + ": rc = %d\n", PFID(&dt->do_lu.lo_header->loh_fid), + (char *)key, + PFID(((struct dt_insert_rec *)rec)->rec_fid), rc); + } - fid_cpu_to_le(rec_fid, rec1->rec_fid); - rc = out_insert_update(env, update, OUT_INDEX_INSERT, fid, - ARRAY_SIZE(size), size, bufs); return rc; } /** * Implementation of dt_index_operations::dio_insert * - * Do nothing in this method for now. In DNE phase I, remote updates - * are actually executed during transaction start, i.e. the index has - * already been inserted when calling this method. + * For remote transaction, it will add an OUT_INDEX_INSERT sub-request + * into the OUT RPC that will be flushed when the transaction stop. * * \param[in] env execution environment * \param[in] dt object for which to insert index * \param[in] rec record of the index to be inserted * \param[in] key key of the index to be inserted * \param[in] th the transaction handle - * \param[in] capa capability of insert (not yet implemented) * \param[in] ignore_quota quota enforcement for insert * * \retval only return 0 for now @@ -677,17 +777,24 @@ static int osp_md_index_insert(const struct lu_env *env, const struct dt_rec *rec, const struct dt_key *key, struct thandle *th, - struct lustre_capa *capa, int ignore_quota) { - return 0; + int rc = 0; + + if (is_only_remote_trans(th)) { + rc = __osp_md_index_insert(env, dt, rec, key, th); + + CDEBUG(D_INFO, "index insert "DFID" key %s, rec "DFID + ": rc = %d\n", PFID(&dt->do_lu.lo_header->loh_fid), + (char *)key, + PFID(((struct dt_insert_rec *)rec)->rec_fid), rc); + } + + return rc; } /** - * Implementation of dt_index_operations::dio_declare_delete - * - * Declare the index delete of the remote object, i.e. insert index delete - * update into the RPC, which will be sent during transaction start. + * Add OUT_INDEX_DELETE sub-request into the OUT RPC. * * \param[in] env execution environment * \param[in] dt object for which to delete index @@ -697,28 +804,50 @@ static int osp_md_index_insert(const struct lu_env *env, * \retval 0 if the insertion succeeds. * \retval negative errno if the insertion fails. */ -static int osp_md_declare_delete(const struct lu_env *env, +static int __osp_md_index_delete(const struct lu_env *env, struct dt_object *dt, const struct dt_key *key, struct thandle *th) { struct dt_update_request *update; - struct lu_fid *fid; - int size = strlen((char *)key) + 1; - int rc; + int rc; - update = out_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } + update = thandle_to_dt_update_request(th); + LASSERT(update != NULL); - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); + rc = out_index_delete_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), key, + update->dur_batchid); + return rc; +} + +/** + * Implementation of dt_index_operations::dio_declare_delete + * + * For non-remote transaction, it will add an OUT_INDEX_DELETE sub-request + * into the OUT RPC that will be flushed when the transaction start. + * + * \param[in] env execution environment + * \param[in] dt object for which to delete index + * \param[in] key key of the index + * \param[in] th the transaction handle + * + * \retval 0 if the insertion succeeds. + * \retval negative errno if the insertion fails. + */ +static int osp_md_declare_index_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *th) +{ + int rc = 0; - rc = out_insert_update(env, update, OUT_INDEX_DELETE, fid, 1, &size, - (const char **)&key); + if (!is_only_remote_trans(th)) { + rc = __osp_md_index_delete(env, dt, key, th); + + CDEBUG(D_INFO, "declare index delete "DFID" %s: rc = %d\n", + PFID(&dt->do_lu.lo_header->loh_fid), (char *)key, rc); + } return rc; } @@ -726,28 +855,31 @@ static int osp_md_declare_delete(const struct lu_env *env, /** * Implementation of dt_index_operations::dio_delete * - * Do nothing in this method for now. Because in DNE phase I, remote updates - * are actually executed during transaction start, i.e. the index has already - * been deleted when calling this method. + * For remote transaction, it will add an OUT_INDEX_DELETE sub-request + * into the OUT RPC that will be flushed when the transaction stop. * * \param[in] env execution environment * \param[in] dt object for which to delete index * \param[in] key key of the index which will be deleted * \param[in] th the transaction handle - * \param[in] capa capability of delete (not yet implemented) * * \retval only return 0 for now */ static int osp_md_index_delete(const struct lu_env *env, struct dt_object *dt, const struct dt_key *key, - struct thandle *th, - struct lustre_capa *capa) + struct thandle *th) { - CDEBUG(D_INFO, "index delete "DFID" %s\n", - PFID(&dt->do_lu.lo_header->loh_fid), (char *)key); + int rc = 0; - return 0; + if (is_only_remote_trans(th)) { + rc = __osp_md_index_delete(env, dt, key, th); + + CDEBUG(D_INFO, "index delete "DFID" %s: rc = %d\n", + PFID(&dt->do_lu.lo_header->loh_fid), (char *)key, rc); + } + + return rc; } /** @@ -764,7 +896,7 @@ static int osp_md_index_delete(const struct lu_env *env, * \retval 1 if it reaches to the end of the index object. * \retval negative errno if the pointer cannot be advanced. */ -int osp_md_index_it_next(const struct lu_env *env, struct dt_it *di) +static int osp_md_index_it_next(const struct lu_env *env, struct dt_it *di) { struct osp_it *it = (struct osp_it *)di; struct lu_idxpage *idxpage; @@ -863,7 +995,7 @@ static int osp_md_index_it_rec(const struct lu_env *env, const struct dt_it *di, { struct osp_it *it = (struct osp_it *)di; struct lu_dirent *ent = (struct lu_dirent *)it->ooi_ent; - int reclen; + size_t reclen; reclen = lu_dirent_calc_size(le16_to_cpu(ent->lde_namelen), attr); memcpy(rec, ent, reclen); @@ -903,9 +1035,9 @@ static int osp_it_load(const struct lu_env *env, const struct dt_it *di, const struct dt_index_operations osp_md_index_ops = { .dio_lookup = osp_md_index_lookup, - .dio_declare_insert = osp_md_declare_insert, + .dio_declare_insert = osp_md_declare_index_insert, .dio_insert = osp_md_index_insert, - .dio_declare_delete = osp_md_declare_delete, + .dio_declare_delete = osp_md_declare_index_delete, .dio_delete = osp_md_index_delete, .dio_it = { .init = osp_it_init, @@ -1030,9 +1162,9 @@ struct dt_object_operations osp_md_obj_ops = { .do_declare_create = osp_md_declare_object_create, .do_create = osp_md_object_create, .do_declare_ref_add = osp_md_declare_ref_add, - .do_ref_add = osp_md_object_ref_add, - .do_declare_ref_del = osp_md_declare_object_ref_del, - .do_ref_del = osp_md_object_ref_del, + .do_ref_add = osp_md_ref_add, + .do_declare_ref_del = osp_md_declare_ref_del, + .do_ref_del = osp_md_ref_del, .do_declare_destroy = osp_declare_object_destroy, .do_destroy = osp_object_destroy, .do_ah_init = osp_md_ah_init, @@ -1070,25 +1202,13 @@ static ssize_t osp_md_declare_write(const struct lu_env *env, loff_t pos, struct thandle *th) { struct dt_update_request *update; - struct lu_fid *fid; - int sizes[2] = {buf->lb_len, sizeof(pos)}; - const char *bufs[2] = {(char *)buf->lb_buf, - (char *)&pos}; ssize_t rc; - update = out_find_create_update_loc(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } + update = thandle_to_dt_update_request(th); + LASSERT(update != NULL); - pos = cpu_to_le64(pos); - bufs[1] = (char *)&pos; - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - rc = out_insert_update(env, update, OUT_WRITE, fid, - ARRAY_SIZE(sizes), sizes, bufs); + rc = out_write_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu), + buf, pos, update->dur_batchid); return rc; @@ -1106,16 +1226,15 @@ static ssize_t osp_md_declare_write(const struct lu_env *env, * \param[in] buf buffer to write which includes an embedded size field * \param[in] pos offet in the object to start writing at * \param[in] th transaction handle - * \param[in] capa capability of the write (not yet implemented) * \param[in] ignore_quota quota enforcement for this write * * \retval the buffer size in bytes. */ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, - struct thandle *handle, - struct lustre_capa *capa, int ignore_quota) + struct thandle *handle, int ignore_quota) { + *pos += buf->lb_len; return buf->lb_len; }