From: Wang Di Date: Fri, 20 Jun 2014 12:57:54 +0000 (-0700) Subject: LU-3534 osp: move RPC pack from declare to execution phase X-Git-Tag: 2.7.52~5 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=de8572645d287d17c409b99dabdf176822d91486;hp=3a36f39a86d6b9984edbeb08dbd74bebc3b579ee LU-3534 osp: move RPC pack from declare to execution phase 1. Since we will have full async update support, i.e. ordered updates are no longer needed, move the RPC pack from the declare phase to the execution phase; these remote updates will be sent during transaction stop, after the local transaction is stopped. 2. Add an update callback for each update, so that after the update request is done, these callbacks will be called correspondingly for each update. 3. Remove tu_sent_after_local_trans, because every remote transaction will be sent after the local transaction is finished, in lod_trans_stop()->osp_trans_stop(). Note: the RPC should be sent after the local transaction is stopped, to avoid sending an RPC while holding the transaction. 
Change-Id: I296e2eaf8b922fe58fe88df074458545f102a69a Signed-off-by: Wang Di Reviewed-on: http://review.whamcloud.com/10794 Tested-by: Jenkins Reviewed-by: Alex Zhuravlev Tested-by: Maloo Reviewed-by: Lai Siyao Reviewed-by: Oleg Drokin --- diff --git a/lustre/include/lustre_fid.h b/lustre/include/lustre_fid.h index f9aa2c1..f325849 100644 --- a/lustre/include/lustre_fid.h +++ b/lustre/include/lustre_fid.h @@ -236,7 +236,9 @@ enum local_oid { MDD_LOV_OBJ_OSEQ = 4121UL, LFSCK_NAMESPACE_OID = 4122UL, REMOTE_PARENT_DIR_OID = 4123UL, - SLAVE_LLOG_CATALOGS_OID = 4124UL, + /* This definition is obsolete + * SLAVE_LLOG_CATALOGS_OID = 4124UL, + */ }; static inline void lu_local_obj_fid(struct lu_fid *fid, __u32 oid) diff --git a/lustre/include/lustre_update.h b/lustre/include/lustre_update.h index 861d631..3d941b9 100644 --- a/lustre/include/lustre_update.h +++ b/lustre/include/lustre_update.h @@ -182,9 +182,6 @@ static inline void update_inc_batchid(struct dt_update_request *update) } /* target/out_lib.c */ -void dt_update_request_destroy(struct dt_update_request *update); -struct dt_update_request *dt_update_request_create(struct dt_device *dt); - int out_update_pack(const struct lu_env *env, struct update_buffer *ubuf, enum update_type op, const struct lu_fid *fid, int params_count, __u16 *param_sizes, const void **bufs, diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index ad146e6..b6c9a4f 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -2732,6 +2732,14 @@ static int lod_xattr_set(const struct lu_env *env, rc = lod_sub_object_xattr_set(env, next, buf, name, fl, th); + } else if (dt_object_remote(dt)) { + /* This only happens during migration, see + * mdd_migrate_create(), in which Master MDT will + * create a remote target object, and only set + * (migrating) stripe EA on the remote object, + * and does not need creating each stripes. 
*/ + rc = lod_sub_object_xattr_set(env, next, buf, name, + fl, th); } else { rc = lod_striping_create(env, dt, NULL, NULL, th); } diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index 92b3e8f..24ef562 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -3603,13 +3603,6 @@ static int mdd_migrate_entries(const struct lu_env *env, name, handle); if (rc != 0) GOTO(out_put, rc); - - if (is_dir) { - rc = mdo_ref_add(env, mdd_tobj, handle); - if (rc != 0) - GOTO(out_put, rc); - - } } rc = __mdd_index_delete(env, mdd_sobj, name, is_dir, handle); @@ -3724,6 +3717,10 @@ static int mdd_declare_migrate_update_name(const struct lu_env *env, if (rc != 0) return rc; + rc = mdd_declare_links_add(env, mdd_tobj, handle, NULL, MLAO_IGNORE); + if (rc != 0) + return rc; + if (S_ISDIR(mdd_object_type(mdd_sobj))) { rc = mdo_declare_ref_add(env, mdd_pobj, handle); if (rc != 0) diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index 88d2e67..0bfd37d 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -366,8 +366,6 @@ int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p, rc = mdo_create_obj(env, c, attr, hint, dof, handle); - LASSERT(ergo(rc == 0, mdd_object_exists(c))); - RETURN(rc); } diff --git a/lustre/osd-ldiskfs/osd_scrub.c b/lustre/osd-ldiskfs/osd_scrub.c index 435c0fc..dca0776 100644 --- a/lustre/osd-ldiskfs/osd_scrub.c +++ b/lustre/osd-ldiskfs/osd_scrub.c @@ -1653,10 +1653,6 @@ static const struct osd_lf_map osd_lf_maps[] = { { "LAST_GROUP", { FID_SEQ_LOCAL_FILE, OFD_LAST_GROUP_OID, 0 }, OLF_SHOW_NAME, sizeof("LAST_GROUP") - 1, NULL, NULL }, - /* SLAVE_LOG, llog for destroy slave stripes of striped dir */ - { "SLAVE_LOG", { FID_SEQ_LOCAL_FILE, SLAVE_LLOG_CATALOGS_OID, 0 }, - OLF_SHOW_NAME, sizeof("SLAVE_LOG") - 1, NULL, NULL }, - /* lost+found */ { "lost+found", { FID_SEQ_LOCAL_FILE, OSD_LPF_OID, 0 }, OLF_SCAN_SUBITEMS, sizeof("lost+found") - 1, diff --git a/lustre/osp/osp_dev.c b/lustre/osp/osp_dev.c 
index db105c4..445dfd2 100644 --- a/lustre/osp/osp_dev.c +++ b/lustre/osp/osp_dev.c @@ -501,9 +501,10 @@ static int osp_shutdown(const struct lu_env *env, struct osp_device *d) rc = osp_disconnect(d); - osp_sync_fini(d); - if (!d->opd_connect_mdt) { + /* stop sync thread */ + osp_sync_fini(d); + /* stop precreate thread */ osp_precreate_fini(d); @@ -689,6 +690,10 @@ static int osp_sync(const struct lu_env *env, struct dt_device *dev) unsigned long start = cfs_time_current(); ENTRY; + /* No Sync between MDTs yet. */ + if (d->opd_connect_mdt) + RETURN(0); + if (unlikely(d->opd_imp_active == 0)) RETURN(-ENOTCONN); @@ -1046,16 +1051,16 @@ static int osp_init0(const struct lu_env *env, struct osp_device *osp, rc = osp_init_precreate(osp); if (rc) GOTO(out_last_used, rc); - } - /* - * Initialize synhronization mechanism taking - * care of propogating changes to OST in near - * transactional manner. - */ - rc = osp_sync_init(env, osp); - if (rc) - GOTO(out_precreat, rc); + /* + * Initialize synhronization mechanism taking + * care of propogating changes to OST in near + * transactional manner. 
+ */ + rc = osp_sync_init(env, osp); + if (rc < 0) + GOTO(out_precreat, rc); + } /* * Initiate connect to OST @@ -1073,8 +1078,9 @@ static int osp_init0(const struct lu_env *env, struct osp_device *osp, RETURN(0); out: - /* stop sync thread */ - osp_sync_fini(osp); + if (!osp->opd_connect_mdt) + /* stop sync thread */ + osp_sync_fini(osp); out_precreat: /* stop precreate thread */ if (!osp->opd_connect_mdt) diff --git a/lustre/osp/osp_internal.h b/lustre/osp/osp_internal.h index f5fd23d..fd2d41d 100644 --- a/lustre/osp/osp_internal.h +++ b/lustre/osp/osp_internal.h @@ -299,7 +299,6 @@ struct osp_it { struct osp_thandle { struct thandle ot_super; struct dt_update_request *ot_dur; - bool ot_send_updates_after_local_trans:1; /* OSP will use this thandle to update last oid*/ struct thandle *ot_storage_th; @@ -536,11 +535,11 @@ static inline int osp_is_fid_client(struct osp_device *osp) return imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_FID; } -typedef int (*osp_async_request_interpreter_t)(const struct lu_env *env, - struct object_update_reply *rep, - struct ptlrpc_request *req, - struct osp_object *obj, - void *data, int index, int rc); +typedef int (*osp_update_interpreter_t)(const struct lu_env *env, + struct object_update_reply *rep, + struct ptlrpc_request *req, + struct osp_object *obj, + void *data, int index, int rc); /* osp_dev.c */ void osp_update_last_id(struct osp_device *d, u64 objid); @@ -550,21 +549,32 @@ extern struct llog_operations osp_mds_ost_orig_logops; int osp_insert_async_request(const struct lu_env *env, enum update_type op, struct osp_object *obj, int count, __u16 *lens, const void **bufs, void *data, - osp_async_request_interpreter_t interpreter); + osp_update_interpreter_t interpreter); + int osp_unplug_async_request(const struct lu_env *env, struct osp_device *osp, struct dt_update_request *update); +int osp_trans_update_request_create(struct thandle *th); struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device 
*d); int osp_trans_start(const struct lu_env *env, struct dt_device *dt, struct thandle *th); +int osp_insert_update_callback(const struct lu_env *env, + struct dt_update_request *update, + struct osp_object *obj, void *data, + osp_update_interpreter_t interpreter); +int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, + const struct object_update_request *ureq, + struct ptlrpc_request **reqp); +struct dt_update_request *dt_update_request_create(struct dt_device *dt); +void dt_update_request_destroy(struct dt_update_request *dt_update); int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, const struct object_update_request *ureq, struct ptlrpc_request **reqp); int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, struct dt_update_request *update, - struct ptlrpc_request **reqp, bool rpc_lock); + struct ptlrpc_request **reqp); struct thandle *osp_get_storage_thandle(const struct lu_env *env, struct thandle *th, @@ -585,11 +595,6 @@ int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt, int osp_xattr_del(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th); -int osp_declare_object_destroy(const struct lu_env *env, - struct dt_object *dt, struct thandle *th); -int osp_object_destroy(const struct lu_env *env, struct dt_object *dt, - struct thandle *th); - int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, struct thandle *th); @@ -613,8 +618,10 @@ int osp_md_declare_object_create(const struct lu_env *env, int osp_md_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, struct dt_allocation_hint *hint, struct dt_object_format *dof, struct thandle *th); -int __osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_attr *attr, struct thandle *th); +int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_attr *attr, struct thandle *th); +int 
osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_attr *attr, struct thandle *th); extern const struct dt_index_operations osp_md_index_ops; /* osp_precreate.c */ diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index a587b57..fa86222 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -60,97 +60,40 @@ static const char dot[] = "."; static const char dotdot[] = ".."; /** - * Add OUT_CREATE sub-request into the OUT RPC. + * Interpreter call for object creation * - * Note: if the object has already been created, we must add object - * destroy sub-request ahead of the create, so it will destroy then - * re-create the object. + * Object creation interpreter, which will be called after creating + * the remote object to set flags and status. * * \param[in] env execution environment - * \param[in] dt object to be created - * \param[in] attr attribute of the created object - * \param[in] hint creation hint - * \param[in] dof creation format information - * \param[in] th the transaction handle + * \param[in] reply update reply + * \param[in] req ptlrpc update request for creating object + * \param[in] obj object to be created + * \param[in] data data used in this function. + * \param[in] index index(position) of create update in the whole + * updates + * \param[in] rc update result on the remote MDT. 
* * \retval only return 0 for now */ -static int __osp_md_declare_object_create(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - struct dt_allocation_hint *hint, - struct dt_object_format *dof, - struct thandle *th) +static int osp_object_create_interpreter(const struct lu_env *env, + struct object_update_reply *reply, + struct ptlrpc_request *req, + struct osp_object *obj, + void *data, int index, int rc) { - struct dt_update_request *update; - int rc; - - update = thandle_to_dt_update_request(th); - LASSERT(update != NULL); - - if (lu_object_exists(&dt->do_lu)) { - /* If the object already exists, we needs to destroy - * this orphan object first. - * - * The scenario might happen in this case - * - * 1. client send remote create to MDT0. - * 2. MDT0 send create update to MDT1. - * 3. MDT1 finished create synchronously. - * 4. MDT0 failed and reboot. - * 5. client resend remote create to MDT0. - * 6. MDT0 tries to resend create update to MDT1, - * but find the object already exists - */ - CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - PFID(lu_object_fid(&dt->do_lu))); - - rc = out_ref_del_pack(env, &update->dur_buf, - lu_object_fid(&dt->do_lu), - update->dur_batchid); - if (rc != 0) - GOTO(out, rc); - - if (S_ISDIR(lu_object_attr(&dt->do_lu))) { - /* decrease for ".." 
*/ - rc = out_ref_del_pack(env, &update->dur_buf, - lu_object_fid(&dt->do_lu), - update->dur_batchid); - if (rc != 0) - GOTO(out, rc); - } - - rc = out_object_destroy_pack(env, &update->dur_buf, - lu_object_fid(&dt->do_lu), - update->dur_batchid); - if (rc != 0) - GOTO(out, rc); - - dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS; - /* Increase batchid to add this orphan object deletion - * to separate transaction */ - update_inc_batchid(update); + if (rc != 0) { + obj->opo_obj.do_lu.lo_header->loh_attr &= ~LOHA_EXISTS; + obj->opo_non_exist = 1; } - - rc = out_create_pack(env, &update->dur_buf, - lu_object_fid(&dt->do_lu), attr, hint, dof, - update->dur_batchid); - if (rc != 0) - GOTO(out, rc); -out: - if (rc) - CERROR("%s: Insert update error: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, rc); - - return rc; + return 0; } /** * Implementation of dt_object_operations::do_declare_create * - * For non-remote transaction, it will add an OUT_CREATE sub-request - * into the OUT RPC that will be flushed when the transaction start. + * Create the dt_update_request to track the update for this OSP + * in the transaction. * * \param[in] env execution environment * \param[in] dt remote object to be created @@ -159,8 +102,8 @@ out: * \param[in] dof creation format information * \param[in] th the transaction handle * - * \retval 0 if the insertion succeeds. - * \retval negative errno if the insertion fails. + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. 
*/ int osp_md_declare_object_create(const struct lu_env *env, struct dt_object *dt, @@ -169,28 +112,14 @@ int osp_md_declare_object_create(const struct lu_env *env, struct dt_object_format *dof, struct thandle *th) { - int rc = 0; - - if (!is_only_remote_trans(th)) { - rc = __osp_md_declare_object_create(env, dt, attr, hint, - dof, th); - - CDEBUG(D_INFO, "declare create md_object "DFID": rc = %d\n", - PFID(&dt->do_lu.lo_header->loh_fid), rc); - } - - return rc; + return osp_trans_update_request_create(th); } /** * Implementation of dt_object_operations::do_create * - * For remote transaction, it will add an OUT_CREATE sub-request into - * the OUT RPC that will be flushed when the transaction stop. - * - * It sets necessary flags for created object. In DNE phase I, - * remote updates are actually executed during transaction start, - * i.e. the object has already been created when calling this method. + * It adds an OUT_CREATE sub-request into the OUT RPC that will be flushed + * when the transaction stop, and sets necessary flags for created object. * * \param[in] env execution environment * \param[in] dt object to be created @@ -199,189 +128,128 @@ int osp_md_declare_object_create(const struct lu_env *env, * \param[in] dof creation format information * \param[in] th the transaction handle * - * \retval only return 0 for now + * \retval 0 if packing creation succeeds. + * \retval negative errno if packing creation fails. 
*/ int osp_md_object_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, struct dt_allocation_hint *hint, struct dt_object_format *dof, struct thandle *th) { - int rc = 0; - - if (is_only_remote_trans(th)) { - rc = __osp_md_declare_object_create(env, dt, attr, hint, - dof, th); - - CDEBUG(D_INFO, "create md_object "DFID": rc = %d\n", - PFID(&dt->do_lu.lo_header->loh_fid), rc); - } - - if (rc == 0) { - dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | - (attr->la_mode & S_IFMT); - dt2osp_obj(dt)->opo_non_exist = 0; - } - - return rc; -} - -/** - * Add OUT_REF_DEL sub-request into the OUT RPC. - * - * \param[in] env execution environment - * \param[in] dt object to decrease the reference count. - * \param[in] th the transaction handle of refcount decrease. - * - * \retval 0 if the insertion succeeds. - * \retval negative errno if the insertion fails. - */ -static int __osp_md_ref_del(const struct lu_env *env, struct dt_object *dt, - struct thandle *th) -{ struct dt_update_request *update; int rc; update = thandle_to_dt_update_request(th); LASSERT(update != NULL); - rc = out_ref_del_pack(env, &update->dur_buf, - lu_object_fid(&dt->do_lu), - update->dur_batchid); + rc = out_create_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), attr, hint, dof, + update->dur_batchid); + if (rc != 0) + GOTO(out, rc); + + rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), attr, + osp_object_create_interpreter); + + if (rc < 0) + GOTO(out, rc); + + dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT); + dt2osp_obj(dt)->opo_non_exist = 0; +out: return rc; } /** * Implementation of dt_object_operations::do_declare_ref_del * - * For non-remote transaction, it will add an OUT_REF_DEL sub-request - * into the OUT RPC that will be flushed when the transaction start. + * Create the dt_update_request to track the update for this OSP + * in the transaction. 
* * \param[in] env execution environment * \param[in] dt object to decrease the reference count. * \param[in] th the transaction handle of refcount decrease. * - * \retval 0 if the insertion succeeds. - * \retval negative errno if the insertion fails. + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. */ static int osp_md_declare_ref_del(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - int rc = 0; - - if (!is_only_remote_trans(th)) { - rc = __osp_md_ref_del(env, dt, th); - - CDEBUG(D_INFO, "declare ref del "DFID": rc = %d\n", - PFID(&dt->do_lu.lo_header->loh_fid), rc); - } - - return rc; + return osp_trans_update_request_create(th); } /** * Implementation of dt_object_operations::do_ref_del * - * For remote transaction, it will add an OUT_REF_DEL sub-request into - * the OUT RPC that will be flushed when the transaction stop. + * Add an OUT_REF_DEL sub-request into the OUT RPC that will be + * flushed when the transaction stop. * * \param[in] env execution environment * \param[in] dt object to decrease the reference count * \param[in] th the transaction handle * - * \retval only return 0 for now + * \retval 0 if packing ref_del succeeds. + * \retval negative errno if packing fails. */ static int osp_md_ref_del(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - int rc = 0; - - if (is_only_remote_trans(th)) { - rc = __osp_md_ref_del(env, dt, th); - - CDEBUG(D_INFO, "ref del "DFID": rc = %d\n", - PFID(&dt->do_lu.lo_header->loh_fid), rc); - } - - return rc; -} - -/** - * Add OUT_REF_ADD sub-request into the OUT RPC. - * - * \param[in] env execution environment - * \param[in] dt object on which to increase the reference count. - * \param[in] th the transaction handle. - * - * \retval 0 if the insertion succeeds. - * \retval negative errno if the insertion fails. 
- */ -static int __osp_md_ref_add(const struct lu_env *env, struct dt_object *dt, - struct thandle *th) -{ struct dt_update_request *update; int rc; update = thandle_to_dt_update_request(th); LASSERT(update != NULL); - rc = out_ref_add_pack(env, &update->dur_buf, + rc = out_ref_del_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu), update->dur_batchid); - return rc; } /** * Implementation of dt_object_operations::do_declare_ref_del * - * For non-remote transaction, it will add an OUT_REF_ADD sub-request - * into the OUT RPC that will be flushed when the transaction start. + * Create the dt_update_request to track the update for this OSP + * in the transaction. * * \param[in] env execution environment * \param[in] dt object on which to increase the reference count. * \param[in] th the transaction handle. * - * \retval 0 if the insertion succeeds. - * \retval negative errno if the insertion fails. + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. */ static int osp_md_declare_ref_add(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - int rc = 0; - - if (!is_only_remote_trans(th)) { - rc = __osp_md_ref_add(env, dt, th); - - CDEBUG(D_INFO, "declare ref add "DFID": rc = %d\n", - PFID(&dt->do_lu.lo_header->loh_fid), rc); - } - - return rc; + return osp_trans_update_request_create(th); } /** * Implementation of dt_object_operations::do_ref_add * - * For remote transaction, it will add an OUT_REF_ADD sub-request into - * the OUT RPC that will be flushed when the transaction stop. + * Add an OUT_REF_ADD sub-request into the OUT RPC that will be flushed + * when the transaction stop. * * \param[in] env execution environment * \param[in] dt object on which to increase the reference count * \param[in] th the transaction handle * - * \retval only return 0 for now + * \retval 0 if packing ref_add succeeds. + * \retval negative errno if packing fails. 
*/ static int osp_md_ref_add(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - int rc = 0; - - if (is_only_remote_trans(th)) { - rc = __osp_md_ref_add(env, dt, th); + struct dt_update_request *update; + int rc; - CDEBUG(D_INFO, "ref add "DFID": rc = %d\n", - PFID(&dt->do_lu.lo_header->loh_fid), rc); - } + update = thandle_to_dt_update_request(th); + LASSERT(update != NULL); + rc = out_ref_add_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), + update->dur_batchid); return rc; } @@ -412,61 +280,23 @@ static void osp_md_ah_init(const struct lu_env *env, } /** - * Add OUT_ATTR_SET sub-request into the OUT RPC. - * - * \param[in] env execution environment - * \param[in] dt object on which to set attributes - * \param[in] attr attributes to be set - * \param[in] th the transaction handle - * - * \retval 0 if the insertion succeeds. - * \retval negative errno if the insertion fails. - */ -int __osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_attr *attr, struct thandle *th) -{ - struct dt_update_request *update; - int rc; - - update = thandle_to_dt_update_request(th); - LASSERT(update != NULL); - - rc = out_attr_set_pack(env, &update->dur_buf, - lu_object_fid(&dt->do_lu), attr, - update->dur_batchid); - - return rc; -} - -/** * Implementation of dt_object_operations::do_declare_attr_get * - * Declare setting attributes to the specified remote object. - * - * If the transaction is a non-remote transaction, then add the OUT_ATTR_SET - * sub-request into the OUT RPC that will be flushed when the transaction start. + * Create the dt_update_request to track the update for this OSP + * in the transaction. * * \param[in] env execution environment * \param[in] dt object on which to set attributes * \param[in] attr attributes to be set * \param[in] th the transaction handle * - * \retval 0 if the insertion succeeds. - * \retval negative errno if the insertion fails. + * \retval 0 if preparation succeeds. 
+ * \retval negative errno if preparation fails. */ int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, struct thandle *th) { - int rc = 0; - - if (!is_only_remote_trans(th)) { - rc = __osp_md_attr_set(env, dt, attr, th); - - CDEBUG(D_INFO, "declare attr set md_object "DFID": rc = %d\n", - PFID(&dt->do_lu.lo_header->loh_fid), rc); - } - - return rc; + return osp_trans_update_request_create(th); } /** @@ -474,27 +304,29 @@ int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, * * Set attributes to the specified remote object. * - * If the transaction is a remote transaction, then add the OUT_ATTR_SET - * sub-request into the OUT RPC that will be flushed when the transaction stop. + * Add the OUT_ATTR_SET sub-request into the OUT RPC that will be flushed + * when the transaction stop. * * \param[in] env execution environment * \param[in] dt object to set attributes * \param[in] attr attributes to be set * \param[in] th the transaction handle * - * \retval only return 0 for now + * \retval 0 if packing attr_set succeeds. + * \retval negative errno if packing fails. 
*/ int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, struct thandle *th) { - int rc = 0; + struct dt_update_request *update; + int rc; - if (is_only_remote_trans(th)) { - rc = __osp_md_attr_set(env, dt, attr, th); + update = thandle_to_dt_update_request(th); + LASSERT(update != NULL); - CDEBUG(D_INFO, "attr set md_object "DFID": rc = %d\n", - PFID(&dt->do_lu.lo_header->loh_fid), rc); - } + rc = out_attr_set_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), attr, + update->dur_batchid); return rc; } @@ -635,7 +467,7 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, GOTO(out, rc); } - rc = osp_remote_sync(env, osp, update, &req, false); + rc = osp_remote_sync(env, osp, update, &req); if (rc < 0) GOTO(out, rc); @@ -685,48 +517,10 @@ out: } /** - * Add OUT_INDEX_INSERT sub-request into the OUT RPC. - * - * \param[in] env execution environment - * \param[in] dt object for which to insert index - * \param[in] rec record of the index which will be inserted - * \param[in] key key of the index which will be inserted - * \param[in] th the transaction handle - * - * \retval 0 if the insertion succeeds. - * \retval negative errno if the insertion fails. - */ -static int __osp_md_index_insert(const struct lu_env *env, - struct dt_object *dt, - const struct dt_rec *rec, - const struct dt_key *key, - struct thandle *th) -{ - struct osp_thandle *oth = thandle_to_osp_thandle(th); - struct dt_update_request *update = oth->ot_dur; - int rc; - - - rc = out_index_insert_pack(env, &update->dur_buf, - lu_object_fid(&dt->do_lu), rec, key, - update->dur_batchid); - if (rc != 0) - return rc; - - /* Before async update is allowed, if it will insert remote - * name entry, it should make sure the local object is created, - * i.e. 
the remote update RPC should be sent after local - * update(create object) */ - oth->ot_send_updates_after_local_trans = true; - - return rc; -} - -/** * Implementation of dt_index_operations::dio_declare_insert * - * For non-remote transaction, it will add an OUT_INDEX_INSERT sub-request - * into the OUT RPC that will be flushed when the transaction start. + * Create the dt_update_request to track the update for this OSP + * in the transaction. * * \param[in] env execution environment * \param[in] dt object for which to insert index @@ -734,8 +528,8 @@ static int __osp_md_index_insert(const struct lu_env *env, * \param[in] key key of the index which will be inserted * \param[in] th the transaction handle * - * \retval 0 if the insertion succeeds. - * \retval negative errno if the insertion fails. + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. */ static int osp_md_declare_index_insert(const struct lu_env *env, struct dt_object *dt, @@ -743,25 +537,14 @@ static int osp_md_declare_index_insert(const struct lu_env *env, const struct dt_key *key, struct thandle *th) { - int rc = 0; - - if (!is_only_remote_trans(th)) { - rc = __osp_md_index_insert(env, dt, rec, key, th); - - CDEBUG(D_INFO, "declare index insert "DFID" key %s, rec "DFID - ": rc = %d\n", PFID(&dt->do_lu.lo_header->loh_fid), - (char *)key, - PFID(((struct dt_insert_rec *)rec)->rec_fid), rc); - } - - return rc; + return osp_trans_update_request_create(th); } /** * Implementation of dt_index_operations::dio_insert * - * For remote transaction, it will add an OUT_INDEX_INSERT sub-request - * into the OUT RPC that will be flushed when the transaction stop. + * Add an OUT_INDEX_INSERT sub-request into the OUT RPC that will + * be flushed when the transaction stop. 
* * \param[in] env execution environment * \param[in] dt object for which to insert index @@ -770,7 +553,8 @@ static int osp_md_declare_index_insert(const struct lu_env *env, * \param[in] th the transaction handle * \param[in] ignore_quota quota enforcement for insert * - * \retval only return 0 for now + * \retval 0 if packing index insert succeeds. + * \retval negative errno if packing fails. */ static int osp_md_index_insert(const struct lu_env *env, struct dt_object *dt, @@ -779,106 +563,68 @@ static int osp_md_index_insert(const struct lu_env *env, struct thandle *th, int ignore_quota) { - int rc = 0; - - if (is_only_remote_trans(th)) { - rc = __osp_md_index_insert(env, dt, rec, key, th); - - CDEBUG(D_INFO, "index insert "DFID" key %s, rec "DFID - ": rc = %d\n", PFID(&dt->do_lu.lo_header->loh_fid), - (char *)key, - PFID(((struct dt_insert_rec *)rec)->rec_fid), rc); - } - - return rc; -} - -/** - * Add OUT_INDEX_DELETE sub-request into the OUT RPC. - * - * \param[in] env execution environment - * \param[in] dt object for which to delete index - * \param[in] key key of the index - * \param[in] th the transaction handle - * - * \retval 0 if the insertion succeeds. - * \retval negative errno if the insertion fails. 
- */ -static int __osp_md_index_delete(const struct lu_env *env, - struct dt_object *dt, - const struct dt_key *key, - struct thandle *th) -{ - struct dt_update_request *update; + struct osp_thandle *oth = thandle_to_osp_thandle(th); + struct dt_update_request *update = oth->ot_dur; int rc; - update = thandle_to_dt_update_request(th); - LASSERT(update != NULL); - rc = out_index_delete_pack(env, &update->dur_buf, - lu_object_fid(&dt->do_lu), key, + rc = out_index_insert_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), rec, key, update->dur_batchid); + return rc; } /** * Implementation of dt_index_operations::dio_declare_delete * - * For non-remote transaction, it will add an OUT_INDEX_DELETE sub-request - * into the OUT RPC that will be flushed when the transaction start. + * Create the dt_update_request to track the update for this OSP + * in the transaction. * * \param[in] env execution environment * \param[in] dt object for which to delete index * \param[in] key key of the index * \param[in] th the transaction handle * - * \retval 0 if the insertion succeeds. - * \retval negative errno if the insertion fails. + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. */ static int osp_md_declare_index_delete(const struct lu_env *env, struct dt_object *dt, const struct dt_key *key, struct thandle *th) { - int rc = 0; - - if (!is_only_remote_trans(th)) { - rc = __osp_md_index_delete(env, dt, key, th); - - CDEBUG(D_INFO, "declare index delete "DFID" %s: rc = %d\n", - PFID(&dt->do_lu.lo_header->loh_fid), (char *)key, rc); - } - - return rc; + return osp_trans_update_request_create(th); } /** * Implementation of dt_index_operations::dio_delete * - * For remote transaction, it will add an OUT_INDEX_DELETE sub-request - * into the OUT RPC that will be flushed when the transaction stop. + * Add an OUT_INDEX_DELETE sub-request into the OUT RPC that will + * be flushed when the transaction stop. 
* * \param[in] env execution environment * \param[in] dt object for which to delete index * \param[in] key key of the index which will be deleted * \param[in] th the transaction handle * - * \retval only return 0 for now + * \retval 0 if packing index delete succeeds. + * \retval negative errno if packing fails. */ static int osp_md_index_delete(const struct lu_env *env, struct dt_object *dt, const struct dt_key *key, struct thandle *th) { - int rc = 0; - - if (is_only_remote_trans(th)) { - rc = __osp_md_index_delete(env, dt, key, th); + struct dt_update_request *update; + int rc; - CDEBUG(D_INFO, "index delete "DFID" %s: rc = %d\n", - PFID(&dt->do_lu.lo_header->loh_fid), (char *)key, rc); - } + update = thandle_to_dt_update_request(th); + LASSERT(update != NULL); + rc = out_index_delete_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), key, + update->dur_batchid); return rc; } @@ -1153,6 +899,66 @@ static int osp_md_object_unlock(const struct lu_env *env, return 0; } +/** + * Implement OSP layer dt_object_operations::do_declare_destroy() interface. + * + * Create the dt_update_request to track the update for this OSP + * in the transaction. + * + * \param[in] env pointer to the thread context + * \param[in] dt pointer to the OSP layer dt_object to be destroyed + * \param[in] th pointer to the transaction handler + * + * \retval 0 for success + * \retval negative error number on failure + */ +int osp_md_declare_object_destroy(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) +{ + return osp_trans_update_request_create(th); +} + +/** + * Implement OSP layer dt_object_operations::do_destroy() interface. + * + * Pack the destroy update into the RPC buffer, which will be sent + * to the remote MDT during transaction stop. + * + * It also marks the object as non-cached. 
+ * + * \param[in] env pointer to the thread context + * \param[in] dt pointer to the OSP layer dt_object to be destroyed + * \param[in] th pointer to the transaction handler + * + * \retval 0 for success + * \retval negative error number on failure + */ +int osp_md_object_destroy(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) +{ + struct osp_object *o = dt2osp_obj(dt); + struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); + struct dt_update_request *update; + int rc = 0; + + ENTRY; + o->opo_non_exist = 1; + + LASSERT(osp->opd_connect_mdt); + update = thandle_to_dt_update_request(th); + LASSERT(update != NULL); + + rc = out_object_destroy_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), update->dur_batchid); + if (rc != 0) + RETURN(rc); + + /* not needed in cache any more */ + set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags); + + RETURN(rc); +} + struct dt_object_operations osp_md_obj_ops = { .do_read_lock = osp_md_object_read_lock, .do_write_lock = osp_md_object_write_lock, @@ -1165,8 +971,8 @@ struct dt_object_operations osp_md_obj_ops = { .do_ref_add = osp_md_ref_add, .do_declare_ref_del = osp_md_declare_ref_del, .do_ref_del = osp_md_ref_del, - .do_declare_destroy = osp_declare_object_destroy, - .do_destroy = osp_object_destroy, + .do_declare_destroy = osp_md_declare_object_destroy, + .do_destroy = osp_md_object_destroy, .do_ah_init = osp_md_ah_init, .do_attr_get = osp_attr_get, .do_declare_attr_set = osp_md_declare_attr_set, @@ -1184,42 +990,31 @@ struct dt_object_operations osp_md_obj_ops = { /** * Implementation of dt_body_operations::dbo_declare_write * - * Declare an object write. In DNE phase I, it will pack the write - * object update into the RPC. - * + * Create the dt_update_request to track the update for this OSP + * in the transaction. 
+ * * \param[in] env execution environment * \param[in] dt object to be written * \param[in] buf buffer to write which includes an embedded size field * \param[in] pos offet in the object to start writing at * \param[in] th transaction handle * - * \retval 0 if the insertion succeeds. - * \retval negative errno if the insertion fails. + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. */ static ssize_t osp_md_declare_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t pos, struct thandle *th) { - struct dt_update_request *update; - ssize_t rc; - - update = thandle_to_dt_update_request(th); - LASSERT(update != NULL); - - rc = out_write_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu), - buf, pos, update->dur_batchid); - - return rc; - + return osp_trans_update_request_create(th); } /** * Implementation of dt_body_operations::dbo_write * - * Return the buffer size. In DNE phase I, remote updates - * are actually executed during transaction start, the buffer has - * already been written when this method is being called. + * Pack the write object update into the RPC buffer, which will be sent + * to the remote MDT during transaction stop. * * \param[in] env execution environment * \param[in] dt object to be written @@ -1228,12 +1023,25 @@ static ssize_t osp_md_declare_write(const struct lu_env *env, * \param[in] th transaction handle * \param[in] ignore_quota quota enforcement for this write * - * \retval the buffer size in bytes. + * \retval the buffer size in bytes if packing succeeds. + * \retval negative errno if packing fails. 
*/ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, - struct thandle *handle, int ignore_quota) + struct thandle *th, int ignore_quota) { + struct dt_update_request *update; + ssize_t rc; + + update = thandle_to_dt_update_request(th); + LASSERT(update != NULL); + + rc = out_write_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu), + buf, *pos, update->dur_batchid); + if (rc < 0) + return rc; + + /* XXX: how about the write error happened later? */ *pos += buf->lb_len; return buf->lb_len; } diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c index c984151..2898da6 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -575,7 +575,7 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt, GOTO(out, rc); } - rc = osp_remote_sync(env, osp, update, &req, false); + rc = osp_remote_sync(env, osp, update, &req); if (rc != 0) { if (rc == -ENOENT) { osp2lu_obj(obj)->lo_header->loh_attr &= ~LOHA_EXISTS; @@ -612,15 +612,29 @@ out: return rc; } -static int __osp_attr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_attr *attr, struct thandle *th) +/** + * Implement OSP layer dt_object_operations::do_declare_attr_set() interface. + * + * If the transaction is not remote one, then declare the credits that will + * be used for the subsequent llog record for the object's attributes. 
+ * + * \param[in] env pointer to the thread context + * \param[in] dt pointer to the OSP layer dt_object + * \param[in] attr pointer to the attribute to be set + * \param[in] th pointer to the transaction handler + * + * \retval 0 for success + * \retval negative error number on failure + */ +static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_attr *attr, struct thandle *th) { struct osp_device *d = lu2osp_dev(dt->do_lu.lo_dev); struct osp_object *o = dt2osp_obj(dt); - struct lu_attr *la; - int rc = 0; - ENTRY; + int rc; + if (is_only_remote_trans(th)) + return osp_md_declare_attr_set(env, dt, attr, th); /* * Usually we don't allow server stack to manipulate size * but there is a special case when striping is created @@ -649,71 +663,17 @@ static int __osp_attr_set(const struct lu_env *env, struct dt_object *dt, LASSERT(!dt_object_exists(dt)); osp_object_assign_fid(env, d, o); rc = osp_object_truncate(env, dt, attr->la_size); - if (rc) + if (rc != 0) RETURN(rc); } if (!(attr->la_valid & (LA_UID | LA_GID))) RETURN(0); - if (!is_only_remote_trans(th)) { - /* - * track all UID/GID changes via llog - */ - rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th); - } else { - /* It is for OST-object attr_set directly without updating - * local MDT-object attribute. It is usually used by LFSCK. */ - rc = __osp_md_attr_set(env, dt, attr, th); - } - - if (rc != 0 || o->opo_ooa == NULL) - RETURN(rc); - - /* Update the OSP object attributes cache. */ - la = &o->opo_ooa->ooa_attr; - spin_lock(&o->opo_lock); - if (attr->la_valid & LA_UID) { - la->la_uid = attr->la_uid; - la->la_valid |= LA_UID; - } - - if (attr->la_valid & LA_GID) { - la->la_gid = attr->la_gid; - la->la_valid |= LA_GID; - } - spin_unlock(&o->opo_lock); - - RETURN(0); -} - -/** - * Implement OSP layer dt_object_operations::do_declare_attr_set() interface. 
- * - * If the transaction is not remote one, then declare the credits that will - * be used for the subsequent llog record for the object's attributes. - * - * \param[in] env pointer to the thread context - * \param[in] dt pointer to the OSP layer dt_object - * \param[in] attr pointer to the attribute to be set - * \param[in] th pointer to the transaction handler - * - * \retval 0 for success - * \retval negative error number on failure - */ -static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_attr *attr, struct thandle *th) -{ - int rc = 0; - - if (!is_only_remote_trans(th)) { - rc = __osp_attr_set(env, dt, attr, th); - - CDEBUG(D_INFO, "declare set attr "DFID": rc = %d\n", - PFID(&dt->do_lu.lo_header->loh_fid), rc); - } + /* track all UID/GID changes via llog */ + rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th); - return rc; + return 0; } /** @@ -744,24 +704,39 @@ static int osp_attr_set(const struct lu_env *env, struct dt_object *dt, int rc = 0; ENTRY; - if (is_only_remote_trans(th)) { - rc = __osp_attr_set(env, dt, attr, th); + /* we're interested in uid/gid changes only */ + if (!(attr->la_valid & (LA_UID | LA_GID))) + RETURN(0); + + if (!is_only_remote_trans(th)) { + rc = osp_sync_add(env, o, MDS_SETATTR64_REC, th, attr); + /* XXX: send new uid/gid to OST ASAP? */ + } else { + struct lu_attr *la; + /* It is for OST-object attr_set directly without updating + * local MDT-object attribute. It is usually used by LFSCK. */ + rc = osp_md_attr_set(env, dt, attr, th); CDEBUG(D_INFO, "(1) set attr "DFID": rc = %d\n", PFID(&dt->do_lu.lo_header->loh_fid), rc); - RETURN(rc); - } - - /* we're interested in uid/gid changes only */ - if (!(attr->la_valid & (LA_UID | LA_GID))) - RETURN(0); + if (rc != 0 || o->opo_ooa == NULL) + RETURN(rc); - rc = osp_sync_add(env, o, MDS_SETATTR64_REC, th, attr); - /* XXX: send new uid/gid to OST ASAP? */ + /* Update the OSP object attributes cache. 
*/ + la = &o->opo_ooa->ooa_attr; + spin_lock(&o->opo_lock); + if (attr->la_valid & LA_UID) { + la->la_uid = attr->la_uid; + la->la_valid |= LA_UID; + } - CDEBUG(D_INFO, "(2) set attr "DFID": rc = %d\n", - PFID(&dt->do_lu.lo_header->loh_fid), rc); + if (attr->la_valid & LA_GID) { + la->la_gid = attr->la_gid; + la->la_valid |= LA_GID; + } + spin_unlock(&o->opo_lock); + } RETURN(rc); } @@ -996,7 +971,7 @@ unlock: GOTO(out, rc); } - rc = osp_remote_sync(env, osp, update, &req, false); + rc = osp_remote_sync(env, osp, update, &req); if (rc != 0) { if (rc == -ENOENT) { dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS; @@ -1110,68 +1085,6 @@ out: return rc; } -static int __osp_xattr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_buf *buf, const char *name, - int flag, struct thandle *th) -{ - struct osp_object *o = dt2osp_obj(dt); - struct dt_update_request *update; - struct osp_xattr_entry *oxe; - int rc; - ENTRY; - - LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL); - update = thandle_to_dt_update_request(th); - LASSERT(update != NULL); - - rc = out_xattr_set_pack(env, &update->dur_buf, - lu_object_fid(&dt->do_lu), - buf, name, flag, update->dur_batchid); - if (rc != 0 || o->opo_ooa == NULL) - RETURN(rc); - - oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len); - if (oxe == NULL) { - CWARN("%s: cannot cache xattr '%s' of "DFID"\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - name, PFID(lu_object_fid(&dt->do_lu))); - - RETURN(0); - } - - if (oxe->oxe_buflen - oxe->oxe_namelen - 1 < buf->lb_len) { - struct osp_xattr_entry *old = oxe; - struct osp_xattr_entry *tmp; - - tmp = osp_oac_xattr_replace(o, &old, buf->lb_len); - osp_oac_xattr_put(oxe); - oxe = tmp; - if (tmp == NULL) { - CWARN("%s: cannot update cached xattr '%s' of "DFID"\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - name, PFID(lu_object_fid(&dt->do_lu))); - spin_lock(&o->opo_lock); - old->oxe_ready = 0; - spin_unlock(&o->opo_lock); - - RETURN(0); - } - - /* Drop the ref for entry on list. 
*/ - osp_oac_xattr_put(old); - } - - spin_lock(&o->opo_lock); - oxe->oxe_vallen = buf->lb_len; - memcpy(oxe->oxe_value, buf->lb_buf, buf->lb_len); - oxe->oxe_exist = 1; - oxe->oxe_ready = 1; - spin_unlock(&o->opo_lock); - osp_oac_xattr_put(oxe); - - RETURN(0); -} - /** * Implement OSP layer dt_object_operations::do_declare_xattr_set() interface. * @@ -1199,16 +1112,7 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int flag, struct thandle *th) { - int rc = 0; - - if (!is_only_remote_trans(th)) { - rc = __osp_xattr_set(env, dt, buf, name, flag, th); - - CDEBUG(D_INFO, "declare xattr %s set object "DFID": rc = %d\n", - name, PFID(&dt->do_lu.lo_header->loh_fid), rc); - } - - return rc; + return osp_trans_update_request_create(th); } /** @@ -1216,11 +1120,10 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, * * Set extended attribute to the specified MDT/OST object. * - * If it is remote transaction, it will add an OUT_XATTR_SET sub-request into - * the OUT RPC that will be flushed when the transaction stop. And if the OSP - * attributes cache is initialized, then check whether the name extended - * attribute entry exists in the cache or not. If yes, replace it; otherwise, - * add the extended attribute to the cache. + * Add an OUT_XATTR_SET sub-request into the OUT RPC that will be flushed in + * the transaction stop. And if the OSP attributes cache is initialized, then + * check whether the name extended attribute entry exists in the cache or not. + * If yes, replace it; otherwise, add the extended attribute to the cache. 
* * \param[in] env pointer to the thread context * \param[in] dt pointer to the OSP layer dt_object @@ -1237,44 +1140,65 @@ int osp_xattr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, struct thandle *th) { - int rc = 0; - - if (is_only_remote_trans(th)) { - rc = __osp_xattr_set(env, dt, buf, name, fl, th); - - CDEBUG(D_INFO, "xattr %s set object "DFID": rc = %d\n", - name, PFID(&dt->do_lu.lo_header->loh_fid), rc); - } - - return rc; -} - -static int __osp_xattr_del(const struct lu_env *env, struct dt_object *dt, - const char *name, struct thandle *th) -{ + struct osp_object *o = dt2osp_obj(dt); struct dt_update_request *update; - const struct lu_fid *fid; - struct osp_object *o = dt2osp_obj(dt); - struct osp_xattr_entry *oxe; - int rc; + struct osp_xattr_entry *oxe; + int rc; + ENTRY; + LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL); update = thandle_to_dt_update_request(th); LASSERT(update != NULL); - fid = lu_object_fid(&dt->do_lu); - - rc = out_xattr_del_pack(env, &update->dur_buf, fid, name, - update->dur_batchid); + CDEBUG(D_INODE, DFID" set xattr '%s' with size %zd\n", + PFID(lu_object_fid(&dt->do_lu)), name, buf->lb_len); + rc = out_xattr_set_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), + buf, name, fl, update->dur_batchid); if (rc != 0 || o->opo_ooa == NULL) return rc; - oxe = osp_oac_xattr_find(o, name, true); - if (oxe != NULL) - /* Drop the ref for entry on list. 
*/ + oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len); + if (oxe == NULL) { + CWARN("%s: cannot cache xattr '%s' of "DFID"\n", + dt->do_lu.lo_dev->ld_obd->obd_name, + name, PFID(lu_object_fid(&dt->do_lu))); + + RETURN(0); + } + + if (oxe->oxe_buflen - oxe->oxe_namelen - 1 < buf->lb_len) { + struct osp_xattr_entry *old = oxe; + struct osp_xattr_entry *tmp; + + tmp = osp_oac_xattr_replace(o, &old, buf->lb_len); osp_oac_xattr_put(oxe); + oxe = tmp; + if (tmp == NULL) { + CWARN("%s: cannot update cached xattr '%s' of "DFID"\n", + dt->do_lu.lo_dev->ld_obd->obd_name, + name, PFID(lu_object_fid(&dt->do_lu))); + spin_lock(&o->opo_lock); + old->oxe_ready = 0; + spin_unlock(&o->opo_lock); - return 0; + RETURN(0); + } + + /* Drop the ref for entry on list. */ + osp_oac_xattr_put(old); + } + + spin_lock(&o->opo_lock); + oxe->oxe_vallen = buf->lb_len; + memcpy(oxe->oxe_value, buf->lb_buf, buf->lb_len); + oxe->oxe_exist = 1; + oxe->oxe_ready = 1; + spin_unlock(&o->opo_lock); + osp_oac_xattr_put(oxe); + + RETURN(0); } /** @@ -1299,16 +1223,7 @@ static int __osp_xattr_del(const struct lu_env *env, struct dt_object *dt, int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th) { - int rc = 0; - - if (!is_only_remote_trans(th)) { - rc = __osp_xattr_del(env, dt, name, th); - - CDEBUG(D_INFO, "declare xattr %s del object "DFID": rc = %d\n", - name, PFID(&dt->do_lu.lo_header->loh_fid), rc); - } - - return rc; + return osp_trans_update_request_create(th); } /** @@ -1332,16 +1247,26 @@ int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt, int osp_xattr_del(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th) { - int rc = 0; + struct dt_update_request *update; + const struct lu_fid *fid = lu_object_fid(&dt->do_lu); + struct osp_object *o = dt2osp_obj(dt); + struct osp_xattr_entry *oxe; + int rc; - if (is_only_remote_trans(th)) { - rc = __osp_xattr_del(env, dt, name, th); + 
update = thandle_to_dt_update_request(th); + LASSERT(update != NULL); - CDEBUG(D_INFO, "xattr %s del object "DFID": rc = %d\n", - name, PFID(&dt->do_lu.lo_header->loh_fid), rc); - } + rc = out_xattr_del_pack(env, &update->dur_buf, fid, name, + update->dur_batchid); + if (rc != 0 || o->opo_ooa == NULL) + return rc; - return rc; + oxe = osp_oac_xattr_find(o, name, true); + if (oxe != NULL) + /* Drop the ref for entry on list. */ + osp_oac_xattr_put(oxe); + + return 0; } /** @@ -1593,13 +1518,12 @@ int osp_declare_object_destroy(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { struct osp_object *o = dt2osp_obj(dt); + struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); int rc = 0; ENTRY; - /* - * track objects to be destroyed via llog - */ + LASSERT(!osp->opd_connect_mdt); rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th); RETURN(rc); @@ -1622,20 +1546,22 @@ int osp_declare_object_destroy(const struct lu_env *env, * \retval 0 for success * \retval negative error number on failure */ -int osp_object_destroy(const struct lu_env *env, struct dt_object *dt, - struct thandle *th) +static int osp_object_destroy(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { struct osp_object *o = dt2osp_obj(dt); + struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); int rc = 0; ENTRY; - o->opo_non_exist = 1; - /* - * once transaction is committed put proper command on - * the queue going to our OST - */ + + LASSERT(!osp->opd_connect_mdt); + /* once transaction is committed put proper command on + * the queue going to our OST. 
*/ rc = osp_sync_add(env, o, MDS_UNLINK64_REC, th, NULL); + if (rc < 0) + RETURN(rc); /* not needed in cache any more */ set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags); diff --git a/lustre/osp/osp_sync.c b/lustre/osp/osp_sync.c index 17d90ed..2f9b78a 100644 --- a/lustre/osp/osp_sync.c +++ b/lustre/osp/osp_sync.c @@ -720,82 +720,6 @@ static int osp_sync_new_unlink_job(struct osp_device *d, } /** - * Prepare OUT-based object destroy RPC. - * - * The function allocates a new RPC with OUT format. Then initializes the RPC - * to contain OUT_DESTROY update against the object specified in the llog - * record provided by the caller. - * - * \param[in] env LU environment provided by the caller - * \param[in] osp OSP device - * \param[in] llh llog handle where the record is stored - * \param[in] h llog record - * \param[out] reqp request prepared - * - * \retval 0 on success - * \retval negative negated errno on error - */ -static int osp_prep_unlink_update_req(const struct lu_env *env, - struct osp_device *osp, - struct llog_handle *llh, - struct llog_rec_hdr *h, - struct ptlrpc_request **reqp) -{ - struct llog_unlink64_rec *rec = (struct llog_unlink64_rec *)h; - struct dt_update_request *update = NULL; - struct ptlrpc_request *req; - struct llog_cookie lcookie; - const void *buf; - __u16 size; - int rc; - ENTRY; - - update = dt_update_request_create(&osp->opd_dt_dev); - if (IS_ERR(update)) - RETURN(PTR_ERR(update)); - - /* This can only happens for unlink slave directory, so decrease - * ref for ".." and "." 
*/ - rc = out_update_pack(env, &update->dur_buf, OUT_REF_DEL, &rec->lur_fid, - 0, NULL, NULL, 0); - if (rc != 0) - GOTO(out, rc); - - rc = out_update_pack(env, &update->dur_buf, OUT_REF_DEL, &rec->lur_fid, - 0, NULL, NULL, 0); - if (rc != 0) - GOTO(out, rc); - - lcookie.lgc_lgl = llh->lgh_id; - lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT; - lcookie.lgc_index = h->lrh_index; - size = sizeof(lcookie); - buf = &lcookie; - - rc = out_update_pack(env, &update->dur_buf, OUT_DESTROY, &rec->lur_fid, - 1, &size, &buf, 0); - if (rc != 0) - GOTO(out, rc); - - rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import, - update->dur_buf.ub_req, &req); - if (rc != 0) - GOTO(out, rc); - - req->rq_interpret_reply = osp_sync_interpret; - req->rq_commit_cb = osp_sync_request_commit_cb; - req->rq_cb_data = osp; - - ptlrpc_request_set_replen(req); - *reqp = req; -out: - if (update != NULL) - dt_update_request_destroy(update); - - RETURN(rc); -} - -/** * Generate a request for unlink change. * * The function prepares a new RPC, initializes it with unlink(destroy) @@ -826,27 +750,20 @@ static int osp_sync_new_unlink64_job(const struct lu_env *env, ENTRY; LASSERT(h->lrh_type == MDS_UNLINK64_REC); + req = osp_sync_new_job(d, llh, h, OST_DESTROY, + &RQF_OST_DESTROY); + if (IS_ERR(req)) + RETURN(PTR_ERR(req)); - if (d->opd_connect_mdt) { - rc = osp_prep_unlink_update_req(env, d, llh, h, &req); - if (rc != 0) - RETURN(rc); - } else { - req = osp_sync_new_job(d, llh, h, OST_DESTROY, - &RQF_OST_DESTROY); - if (IS_ERR(req)) - RETURN(PTR_ERR(req)); - - body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); - if (body == NULL) - RETURN(-EFAULT); - rc = fid_to_ostid(&rec->lur_fid, &body->oa.o_oi); - if (rc < 0) - RETURN(rc); - body->oa.o_misc = rec->lur_count; - body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID | - OBD_MD_FLOBJCOUNT; - } + body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); + if (body == NULL) + RETURN(-EFAULT); + rc = fid_to_ostid(&rec->lur_fid, &body->oa.o_oi); + 
if (rc < 0) + RETURN(rc); + body->oa.o_misc = rec->lur_count; + body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID | + OBD_MD_FLOBJCOUNT; osp_sync_send_new_rpc(d, req); RETURN(1); } @@ -1028,27 +945,10 @@ static void osp_sync_process_committed(const struct lu_env *env, req = container_of((void *)jra, struct ptlrpc_request, rq_async_args); - if (d->opd_connect_mdt) { - struct object_update_request *ureq; - struct object_update *update; - ureq = req_capsule_client_get(&req->rq_pill, - &RMF_OUT_UPDATE); - LASSERT(ureq != NULL && - ureq->ourq_magic == UPDATE_REQUEST_MAGIC); - - /* 1st/2nd is for decref . and .., 3rd one is for - * destroy, where the log cookie is stored. - * See osp_prep_unlink_update_req */ - update = object_update_request_get(ureq, 2, NULL); - LASSERT(update != NULL); - lcookie = object_update_param_get(update, 0, NULL); - LASSERT(lcookie != NULL); - } else { - body = req_capsule_client_get(&req->rq_pill, - &RMF_OST_BODY); - LASSERT(body); - lcookie = &body->oa.o_lcookie; - } + body = req_capsule_client_get(&req->rq_pill, + &RMF_OST_BODY); + LASSERT(body); + lcookie = &body->oa.o_lcookie; /* import can be closing, thus all commit cb's are * called we can check committness directly */ if (req->rq_transno <= imp->imp_peer_committed_transno) { @@ -1308,10 +1208,7 @@ static int osp_sync_llog_init(const struct lu_env *env, struct osp_device *d) OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt); obd->obd_lvfs_ctxt.dt = d->opd_storage; - if (d->opd_connect_mdt) - lu_local_obj_fid(fid, SLAVE_LLOG_CATALOGS_OID); - else - lu_local_obj_fid(fid, LLOG_CATALOGS_OID); + lu_local_obj_fid(fid, LLOG_CATALOGS_OID); rc = llog_osd_get_cat_list(env, d->opd_storage, d->opd_index, 1, &osi->osi_cid, fid); diff --git a/lustre/osp/osp_trans.c b/lustre/osp/osp_trans.c index 3daed73..740270d 100644 --- a/lustre/osp/osp_trans.c +++ b/lustre/osp/osp_trans.c @@ -53,6 +53,13 @@ * will be called one by one to handle each own result. * * + * There are three kinds of transactions + * + * 1. 
Local transaction, all updates of the transaction are in the local MDT. + * 2. Remote transaction, all updates of the transaction are in one remote + * MDT, which only happens in LFSCK now. + * 3. Distributed transaction, updates for the transaction are in multiple MDTs. + * * Author: Di Wang * Author: Fan, Yong */ @@ -61,29 +68,108 @@ #include "osp_internal.h" -struct osp_async_update_args { +/** + * The argument for the interpreter callback of osp request. + */ +struct osp_update_args { struct dt_update_request *oaua_update; atomic_t *oaua_count; wait_queue_head_t *oaua_waitq; bool oaua_flow_control; }; -struct osp_async_request { +/** + * Call back for each update request. + */ +struct osp_update_callback { /* list in the dt_update_request::dur_cb_items */ - struct list_head oar_list; + struct list_head ouc_list; /* The target of the async update request. */ - struct osp_object *oar_obj; + struct osp_object *ouc_obj; - /* The data used by oar_interpreter. */ - void *oar_data; + /* The data used by ouc_interpreter. */ + void *ouc_data; /* The interpreter function called after the async request handled. */ - osp_async_request_interpreter_t oar_interpreter; + osp_update_interpreter_t ouc_interpreter; }; +static struct object_update_request *object_update_request_alloc(size_t size) +{ + struct object_update_request *ourq; + + OBD_ALLOC_LARGE(ourq, size); + if (ourq == NULL) + return ERR_PTR(-ENOMEM); + + ourq->ourq_magic = UPDATE_REQUEST_MAGIC; + ourq->ourq_count = 0; + + return ourq; +} + +static void object_update_request_free(struct object_update_request *ourq, + size_t ourq_size) +{ + if (ourq != NULL) + OBD_FREE_LARGE(ourq, ourq_size); +} + /** - * Allocate an asynchronous request and initialize it with the given parameters. + * Allocate and initialize dt_update_request + * + * dt_update_request is being used to track updates being executed on + * this dt_device(OSD or OSP). The update buffer will be 4k initially, + * and increased if needed. 
+ * + * \param [in] dt dt device + * + * \retval dt_update_request being allocated if succeed + * \retval ERR_PTR(errno) if failed + */ +struct dt_update_request *dt_update_request_create(struct dt_device *dt) +{ + struct dt_update_request *dt_update; + struct object_update_request *ourq; + + OBD_ALLOC_PTR(dt_update); + if (dt_update == NULL) + return ERR_PTR(-ENOMEM); + + ourq = object_update_request_alloc(OUT_UPDATE_INIT_BUFFER_SIZE); + if (IS_ERR(ourq)) { + OBD_FREE_PTR(dt_update); + return ERR_CAST(ourq); + } + + dt_update->dur_buf.ub_req = ourq; + dt_update->dur_buf.ub_req_size = OUT_UPDATE_INIT_BUFFER_SIZE; + + dt_update->dur_dt = dt; + dt_update->dur_batchid = 0; + INIT_LIST_HEAD(&dt_update->dur_cb_items); + + return dt_update; +} + +/** + * Destroy dt_update_request + * + * \param [in] dt_update dt_update_request being destroyed + */ +void dt_update_request_destroy(struct dt_update_request *dt_update) +{ + if (dt_update == NULL) + return; + + object_update_request_free(dt_update->dur_buf.ub_req, + dt_update->dur_buf.ub_req_size); + OBD_FREE_PTR(dt_update); +} + +/** + * Allocate an osp request and initialize it with the given parameters. 
* * \param[in] obj pointer to the operation target * \param[in] data pointer to the data used by the interpreter @@ -92,38 +178,38 @@ struct osp_async_request { * \retval pointer to the asychronous request * \retval NULL if the allocation failed */ -static struct osp_async_request * -osp_async_request_init(struct osp_object *obj, void *data, - osp_async_request_interpreter_t interpreter) +static struct osp_update_callback * +osp_update_callback_init(struct osp_object *obj, void *data, + osp_update_interpreter_t interpreter) { - struct osp_async_request *oar; + struct osp_update_callback *ouc; - OBD_ALLOC_PTR(oar); - if (oar == NULL) + OBD_ALLOC_PTR(ouc); + if (ouc == NULL) return NULL; lu_object_get(osp2lu_obj(obj)); - INIT_LIST_HEAD(&oar->oar_list); - oar->oar_obj = obj; - oar->oar_data = data; - oar->oar_interpreter = interpreter; + INIT_LIST_HEAD(&ouc->ouc_list); + ouc->ouc_obj = obj; + ouc->ouc_data = data; + ouc->ouc_interpreter = interpreter; - return oar; + return ouc; } /** - * Destroy the asychronous request. + * Destroy the osp_update_callback. 
* * \param[in] env pointer to the thread context - * \param[in] oar pointer to asychronous request + * \param[in] ouc pointer to osp_update_callback */ -static void osp_async_request_fini(const struct lu_env *env, - struct osp_async_request *oar) +static void osp_update_callback_fini(const struct lu_env *env, + struct osp_update_callback *ouc) { - LASSERT(list_empty(&oar->oar_list)); + LASSERT(list_empty(&ouc->ouc_list)); - lu_object_put(env, osp2lu_obj(oar->oar_obj)); - OBD_FREE_PTR(oar); + lu_object_put(env, osp2lu_obj(ouc->ouc_obj)); + OBD_FREE_PTR(ouc); } /** @@ -140,15 +226,14 @@ static void osp_async_request_fini(const struct lu_env *env, * \retval 0 for success * \retval negative error number on failure */ -static int osp_async_update_interpret(const struct lu_env *env, - struct ptlrpc_request *req, - void *arg, int rc) +static int osp_update_interpret(const struct lu_env *env, + struct ptlrpc_request *req, void *arg, int rc) { struct object_update_reply *reply = NULL; - struct osp_async_update_args *oaua = arg; + struct osp_update_args *oaua = arg; struct dt_update_request *dt_update = oaua->oaua_update; - struct osp_async_request *oar; - struct osp_async_request *next; + struct osp_update_callback *ouc; + struct osp_update_callback *next; int count = 0; int index = 0; int rc1 = 0; @@ -170,9 +255,9 @@ static int osp_async_update_interpret(const struct lu_env *env, rc1 = rc; } - list_for_each_entry_safe(oar, next, &dt_update->dur_cb_items, - oar_list) { - list_del_init(&oar->oar_list); + list_for_each_entry_safe(ouc, next, &dt_update->dur_cb_items, + ouc_list) { + list_del_init(&ouc->ouc_list); /* The peer may only have handled some requests (indicated * by the 'count') in the packaged OUT RPC, we can only get @@ -191,9 +276,11 @@ static int osp_async_update_interpret(const struct lu_env *env, rc1 = -EINVAL; } - oar->oar_interpreter(env, reply, req, oar->oar_obj, - oar->oar_data, index, rc1); - osp_async_request_fini(env, oar); + if (ouc->ouc_interpreter != 
NULL) + ouc->ouc_interpreter(env, reply, req, ouc->ouc_obj, + ouc->ouc_data, index, rc1); + + osp_update_callback_fini(env, ouc); index++; } @@ -220,22 +307,24 @@ int osp_unplug_async_request(const struct lu_env *env, struct osp_device *osp, struct dt_update_request *update) { - struct osp_async_update_args *args; - struct ptlrpc_request *req = NULL; - int rc; + struct osp_update_args *args; + struct ptlrpc_request *req = NULL; + int rc; rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import, update->dur_buf.ub_req, &req); if (rc != 0) { - struct osp_async_request *oar; - struct osp_async_request *next; - - list_for_each_entry_safe(oar, next, - &update->dur_cb_items, oar_list) { - list_del_init(&oar->oar_list); - oar->oar_interpreter(env, NULL, NULL, oar->oar_obj, - oar->oar_data, 0, rc); - osp_async_request_fini(env, oar); + struct osp_update_callback *ouc; + struct osp_update_callback *next; + + list_for_each_entry_safe(ouc, next, + &update->dur_cb_items, ouc_list) { + list_del_init(&ouc->ouc_list); + if (ouc->ouc_interpreter != NULL) + ouc->ouc_interpreter(env, NULL, NULL, + ouc->ouc_obj, + ouc->ouc_data, 0, rc); + osp_update_callback_fini(env, ouc); } dt_update_request_destroy(update); } else { @@ -244,7 +333,7 @@ int osp_unplug_async_request(const struct lu_env *env, args->oaua_count = NULL; args->oaua_waitq = NULL; args->oaua_flow_control = false; - req->rq_interpret_reply = osp_async_update_interpret; + req->rq_interpret_reply = osp_update_interpret; ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1); } @@ -279,6 +368,37 @@ osp_find_or_create_async_update_request(struct osp_device *osp) } /** + * Insert an osp_update_callback into the dt_update_request. + * + * Insert an osp_update_callback to the dt_update_request. Usually each update + * in the dt_update_request will have one correspondent callback, and these + * callbacks will be called in rq_interpret_reply. 
+ * + * \param[in] env pointer to the thread context + * \param[in] obj pointer to the operation target object + * \param[in] data pointer to the data used by the interpreter + * \param[in] interpreter pointer to the interpreter function + * + * \retval 0 for success + * \retval negative error number on failure + */ +int osp_insert_update_callback(const struct lu_env *env, + struct dt_update_request *update, + struct osp_object *obj, void *data, + osp_update_interpreter_t interpreter) +{ + struct osp_update_callback *ouc; + + ouc = osp_update_callback_init(obj, data, interpreter); + if (ouc == NULL) + RETURN(-ENOMEM); + + list_add_tail(&ouc->ouc_list, &update->dur_cb_items); + + return 0; +} + +/** * Insert an asynchronous idempotent request to the shared request queue that * is attached to the osp_device. * @@ -305,21 +425,16 @@ osp_find_or_create_async_update_request(struct osp_device *osp) int osp_insert_async_request(const struct lu_env *env, enum update_type op, struct osp_object *obj, int count, __u16 *lens, const void **bufs, void *data, - osp_async_request_interpreter_t interpreter) + osp_update_interpreter_t interpreter) { - struct osp_async_request *oar; struct osp_device *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev); struct dt_update_request *update; int rc = 0; ENTRY; - oar = osp_async_request_init(obj, data, interpreter); - if (oar == NULL) - RETURN(-ENOMEM); - update = osp_find_or_create_async_update_request(osp); if (IS_ERR(update)) - GOTO(out, rc = PTR_ERR(update)); + RETURN(PTR_ERR(update)); again: /* The queue is full. 
*/ @@ -333,27 +448,37 @@ again: rc = osp_unplug_async_request(env, osp, update); mutex_lock(&osp->opd_async_requests_mutex); if (rc != 0) - GOTO(out, rc); + RETURN(rc); update = osp_find_or_create_async_update_request(osp); if (IS_ERR(update)) - GOTO(out, rc = PTR_ERR(update)); + RETURN(PTR_ERR(update)); goto again; } - if (rc == 0) - list_add_tail(&oar->oar_list, &update->dur_cb_items); + rc = osp_insert_update_callback(env, update, obj, data, interpreter); - GOTO(out, rc); + RETURN(rc); +} -out: - if (rc != 0) - osp_async_request_fini(env, oar); +int osp_trans_update_request_create(struct thandle *th) +{ + struct osp_thandle *oth = thandle_to_osp_thandle(th); + struct dt_update_request *update; - return rc; -} + if (oth->ot_dur != NULL) + return 0; + update = dt_update_request_create(th->th_dev); + if (IS_ERR(update)) { + th->th_result = PTR_ERR(update); + return PTR_ERR(update); + } + + oth->ot_dur = update; + return 0; +} /** * The OSP layer dt_device_operations::dt_trans_create() interface * to create a transaction. 
@@ -385,7 +510,6 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d) { struct osp_thandle *oth; struct thandle *th = NULL; - struct dt_update_request *update; ENTRY; OBD_ALLOC_PTR(oth); @@ -396,15 +520,6 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d) th->th_dev = d; th->th_tags = LCT_TX_HANDLE; - update = dt_update_request_create(d); - if (IS_ERR(update)) { - OBD_FREE_PTR(oth); - RETURN(ERR_CAST(update)); - } - - oth->ot_dur = update; - oth->ot_send_updates_after_local_trans = false; - RETURN(th); } @@ -475,7 +590,7 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, */ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, struct dt_update_request *dt_update, - struct ptlrpc_request **reqp, bool rpc_lock) + struct ptlrpc_request **reqp) { struct obd_import *imp = osp->opd_obd->u.cli.cl_import; struct ptlrpc_request *req = NULL; @@ -488,11 +603,7 @@ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, /* Note: some dt index api might return non-zero result here, like * osd_index_ea_lookup, so we should only check rc < 0 here */ - if (rpc_lock) - osp_get_rpc_lock(osp); rc = ptlrpc_queue_wait(req); - if (rpc_lock) - osp_put_rpc_lock(osp); if (rc < 0) { ptlrpc_req_finished(req); dt_update->dur_rc = rc; @@ -514,9 +625,8 @@ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, /** * Trigger the request for remote updates. * - * If the transaction is not a remote one or it is required to be sync mode - * (th->th_sync is set), then it will be sent synchronously; otherwise, the - * RPC will be sent asynchronously. + * If th_sync is set, then the request will be sent synchronously, + * otherwise, the RPC will be sent asynchronously. * * Please refer to osp_trans_create() for transaction type. 
* @@ -524,44 +634,50 @@ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, * \param[in] osp pointer to the OSP device * \param[in] dt_update pointer to the dt_update_request * \param[in] th pointer to the transaction handler - * \param[in] flow_control whether need to control the flow + * \param[out] sent whether the RPC has been sent * * \retval 0 for success * \retval negative error number on failure */ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp, struct dt_update_request *dt_update, - struct thandle *th, bool flow_control) + struct thandle *th, int *sent) { + struct osp_update_args *args; + struct ptlrpc_request *req; int rc = 0; + ENTRY; - if (is_only_remote_trans(th) && !th->th_sync) { - struct osp_async_update_args *args; - struct ptlrpc_request *req; - - rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import, - dt_update->dur_buf.ub_req, &req); - if (rc != 0) - return rc; - down_read(&osp->opd_async_updates_rwsem); - - args = ptlrpc_req_async_args(req); - args->oaua_update = dt_update; - args->oaua_count = &osp->opd_async_updates_count; - args->oaua_waitq = &osp->opd_syn_barrier_waitq; - args->oaua_flow_control = flow_control; - req->rq_interpret_reply = - osp_async_update_interpret; + rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import, + dt_update->dur_buf.ub_req, &req); + if (rc != 0) + RETURN(rc); - atomic_inc(args->oaua_count); - up_read(&osp->opd_async_updates_rwsem); + *sent = 1; + req->rq_interpret_reply = osp_update_interpret; + args = ptlrpc_req_async_args(req); + args->oaua_update = dt_update; + if (is_only_remote_trans(th) && !th->th_sync) { + args->oaua_flow_control = true; + + if (!osp->opd_connect_mdt) { + down_read(&osp->opd_async_updates_rwsem); + args->oaua_count = &osp->opd_async_updates_count; + args->oaua_waitq = &osp->opd_syn_barrier_waitq; + up_read(&osp->opd_async_updates_rwsem); + atomic_inc(args->oaua_count); + } ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1); } else { - 
rc = osp_remote_sync(env, osp, dt_update, NULL, true); + osp_get_rpc_lock(osp); + args->oaua_flow_control = false; + rc = ptlrpc_queue_wait(req); + osp_put_rpc_lock(osp); + ptlrpc_req_finished(req); } - return rc; + RETURN(rc); } /** @@ -618,19 +734,7 @@ struct thandle *osp_get_storage_thandle(const struct lu_env *env, * to start the transaction. * * If the transaction is a remote transaction, then related remote - * updates will be triggered in the osp_trans_stop(); otherwise the - * transaction contains both local and remote update(s), then when - * the OUT RPC will be triggered depends on the operation, and is - * indicated by the dt_device::tu_sent_after_local_trans, for example: - * - * 1) If it is remote create, it will send the remote req after local - * transaction. i.e. create the object locally first, then insert the - * remote name entry. - * - * 2) If it is remote unlink, it will send the remote req before the - * local transaction, i.e. delete the name entry remotely first, then - * destroy the local object. - * + * updates will be triggered in the osp_trans_stop(). * Please refer to osp_trans_create() for transaction type. * * \param[in] env pointer to the thread context @@ -643,51 +747,20 @@ struct thandle *osp_get_storage_thandle(const struct lu_env *env, int osp_trans_start(const struct lu_env *env, struct dt_device *dt, struct thandle *th) { - struct osp_thandle *oth = thandle_to_osp_thandle(th); - struct dt_update_request *dt_update; - int rc = 0; + struct osp_thandle *oth = thandle_to_osp_thandle(th); - dt_update = oth->ot_dur; - LASSERT(dt_update != NULL); - - /* return if there are no updates, */ - if (dt_update->dur_buf.ub_req == NULL || - dt_update->dur_buf.ub_req->ourq_count == 0) - GOTO(out, rc = 0); - - /* Note: some updates needs to send before local transaction, - * some needs to send after local transaction. - * - * If the transaction only includes remote updates, it will - * send updates to remote MDT in osp_trans_stop. 
- * - * If it is remote create, it will send the remote req after - * local transaction. i.e. create the object locally first, - * then insert the name entry. - * - * If it is remote unlink, it will send the remote req before - * the local transaction, i.e. delete the name entry remote - * first, then destroy the local object. */ - if (!is_only_remote_trans(th) && - !oth->ot_send_updates_after_local_trans) - rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update, th, - false); - -out: /* For remote thandle, if there are local thandle, start it here*/ - if (th->th_top == NULL && oth->ot_storage_th != NULL) - rc = dt_trans_start(env, oth->ot_storage_th->th_dev, - oth->ot_storage_th); - - return rc; + if (is_only_remote_trans(th) && oth->ot_storage_th != NULL) + return dt_trans_start(env, oth->ot_storage_th->th_dev, + oth->ot_storage_th); + return 0; } /** * The OSP layer dt_device_operations::dt_trans_stop() interface * to stop the transaction. * - * If the transaction is a remote transaction, or the update handler - * is marked as 'tu_sent_after_local_trans', then related remote + * If the transaction is a remote transaction, related remote * updates will be triggered here via osp_trans_trigger(). 
* * For synchronous mode update or any failed update, the request @@ -705,17 +778,12 @@ out: int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, struct thandle *th) { - struct osp_thandle *oth = thandle_to_osp_thandle(th); struct dt_update_request *dt_update; int rc = 0; - bool keep_dt_update = false; + int sent = 0; ENTRY; - dt_update = oth->ot_dur; - LASSERT(dt_update != NULL); - LASSERT(dt_update != LP_POISON); - /* For remote transaction, if there is local storage thandle, * stop it first */ if (oth->ot_storage_th != NULL && th->th_top == NULL) { @@ -723,46 +791,62 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, oth->ot_storage_th); oth->ot_storage_th = NULL; } + + dt_update = oth->ot_dur; + if (dt_update == NULL) + GOTO(out, rc); + + LASSERT(dt_update != LP_POISON); + /* If there are no updates, destroy dt_update and thandle */ if (dt_update->dur_buf.ub_req == NULL || - dt_update->dur_buf.ub_req->ourq_count == 0) + dt_update->dur_buf.ub_req->ourq_count == 0) { + dt_update_request_destroy(dt_update); GOTO(out, rc); + } if (is_only_remote_trans(th) && !th->th_sync) { struct osp_device *osp = dt2osp_dev(th->th_dev); struct client_obd *cli = &osp->opd_obd->u.cli; - if (th->th_result != 0) { - rc = th->th_result; + rc = obd_get_request_slot(cli); + if (rc != 0) GOTO(out, rc); - } - rc = obd_get_request_slot(cli); if (!osp->opd_imp_active || !osp->opd_imp_connected) { - if (rc == 0) - obd_put_request_slot(cli); - rc = -ENOTCONN; + obd_put_request_slot(cli); + GOTO(out, rc = -ENOTCONN); } - if (rc != 0) - GOTO(out, rc); rc = osp_trans_trigger(env, dt2osp_dev(dt), - dt_update, th, true); + dt_update, th, &sent); if (rc != 0) obd_put_request_slot(cli); - else - keep_dt_update = true; } else { - if (oth->ot_send_updates_after_local_trans || - (is_only_remote_trans(th) && th->th_sync)) - rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update, - th, false); - rc = dt_update->dur_rc; + rc = osp_trans_trigger(env, dt2osp_dev(dt), 
dt_update, + th, &sent); } out: - if (!keep_dt_update) + /* If RPC is triggered successfully, dt_update will be freed in + * osp_update_interpret() */ + if (rc != 0 && dt_update != NULL && sent == 0) { + struct osp_update_callback *ouc; + struct osp_update_callback *next; + + list_for_each_entry_safe(ouc, next, &dt_update->dur_cb_items, + ouc_list) { + list_del_init(&ouc->ouc_list); + if (ouc->ouc_interpreter != NULL) + ouc->ouc_interpreter(env, NULL, NULL, + ouc->ouc_obj, + ouc->ouc_data, 0, rc); + osp_update_callback_fini(env, ouc); + } + dt_update_request_destroy(dt_update); + } + OBD_FREE_PTR(oth); RETURN(rc); diff --git a/lustre/target/out_lib.c b/lustre/target/out_lib.c index 192a2b8..a6d6b36 100644 --- a/lustre/target/out_lib.c +++ b/lustre/target/out_lib.c @@ -38,85 +38,6 @@ #define OUT_UPDATE_BUFFER_SIZE_ADD 4096 #define OUT_UPDATE_BUFFER_SIZE_MAX (256 * 4096) /* 1MB update size now */ -static inline struct object_update_request * -object_update_request_alloc(size_t size) -{ - struct object_update_request *ourq; - - OBD_ALLOC_LARGE(ourq, size); - if (ourq == NULL) - RETURN(ERR_PTR(-ENOMEM)); - - ourq->ourq_magic = UPDATE_REQUEST_MAGIC; - ourq->ourq_count = 0; - - RETURN(ourq); -} - -static inline void -object_update_request_free(struct object_update_request *ourq, - size_t ourq_size) -{ - if (ourq != NULL) - OBD_FREE_LARGE(ourq, ourq_size); -} - -/** - * Allocate and initialize dt_update_request - * - * dt_update_request is being used to track updates being executed on - * this dt_device(OSD or OSP). The update buffer will be 4k initially, - * and increased if needed. 
- * - * \param [in] dt dt device - * - * \retval dt_update_request being allocated if succeed - * \retval ERR_PTR(errno) if failed - */ -struct dt_update_request *dt_update_request_create(struct dt_device *dt) -{ - struct dt_update_request *dt_update; - struct object_update_request *ourq; - - OBD_ALLOC_PTR(dt_update); - if (!dt_update) - return ERR_PTR(-ENOMEM); - - ourq = object_update_request_alloc(OUT_UPDATE_INIT_BUFFER_SIZE); - if (IS_ERR(ourq)) { - OBD_FREE_PTR(dt_update); - return ERR_CAST(ourq); - } - - dt_update->dur_buf.ub_req = ourq; - dt_update->dur_buf.ub_req_size = OUT_UPDATE_INIT_BUFFER_SIZE; - - dt_update->dur_dt = dt; - dt_update->dur_batchid = 0; - INIT_LIST_HEAD(&dt_update->dur_cb_items); - - return dt_update; -} -EXPORT_SYMBOL(dt_update_request_create); - -/** - * Destroy dt_update_request - * - * \param [in] dt_update dt_update_request being destroyed - */ -void dt_update_request_destroy(struct dt_update_request *dt_update) -{ - if (dt_update == NULL) - return; - - object_update_request_free(dt_update->dur_buf.ub_req, - dt_update->dur_buf.ub_req_size); - OBD_FREE_PTR(dt_update); - - return; -} -EXPORT_SYMBOL(dt_update_request_destroy); - /** * resize update buffer * @@ -217,11 +138,11 @@ out_update_header_pack(const struct lu_env *env, struct update_buffer *ubuf, param = (struct object_update_param *)((char *)param + object_update_param_size(param)); } - ureq->ourq_count++; CDEBUG(D_INFO, "%p "DFID" idx %u: op %d params %d:%d\n", ureq, PFID(fid), ureq->ourq_count, op, params_count, (int)update_size); + ureq->ourq_count++; RETURN(obj_update); }