From: Wang Di Date: Thu, 19 Jun 2014 10:18:29 +0000 (-0700) Subject: LU-3536 lod: Separate thandle to different layers. X-Git-Tag: 2.7.52~8 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=b4e6b6b280626dbafebe3e35858707f3143de24a LU-3536 lod: Separate thandle to different layers. Separate the thandle into different layers on the MDT stack. The current implementation uses a single thandle in all layers, which can cause problems for cross-MDT transactions. For example, during transaction stop, the local OSD transaction has to be stopped first and only then are the remote RPCs sent, because we do not want to hold the transaction open while the RPCs are being sent. However, stopping the OSD transaction may destroy this single thandle (see osd_trans_stop()) while all of the remote updates are still attached to it. This patch separates the thandle into different layers: 1. The MDD thandle represents the transaction at the MDD layer. 2. The LOD thandle distributes the transaction to all sub-thandles, and also attaches the update blob buffer that stores the updates for cross-MDT operations. 3. The OSP thandle stores the updates for the corresponding remote target, and also manages sending them to that remote target. 4. The OSD thandle stays the same as the original one, relying on the bottom FS to achieve atomicity. 
Signed-off-by: wang di Change-Id: I0c73cc80fa692c2e5d5a09e441c28e228d822ce0 Reviewed-on: http://review.whamcloud.com/10640 Tested-by: Jenkins Reviewed-by: Alex Zhuravlev Tested-by: Maloo Reviewed-by: Lai Siyao Reviewed-by: Oleg Drokin --- diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index cb99e18..c192728 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -1776,19 +1776,6 @@ static inline struct dt_object *lu2dt_obj(struct lu_object *o) return container_of0(o, struct dt_object, do_lu); } -struct thandle_update { - /* In DNE, one transaction can be disassembled into - * updates on several different MDTs, and these updates - * will be attached to tu_remote_update_list per target. - * Only single thread will access the list, no need lock - */ - struct list_head tu_remote_update_list; - - /* sent after or before local transaction */ - unsigned int tu_sent_after_local_trans:1, - tu_only_remote_trans:1; -}; - /** * This is the general purpose transaction handle. * 1. 
Transaction Life Cycle @@ -1807,9 +1794,8 @@ struct thandle { /** the dt device on which the transactions are executed */ struct dt_device *th_dev; - atomic_t th_refc; - /* the size of transaction */ - int th_alloc_size; + /* In some callback function, it needs to access the top_th directly */ + struct thandle *th_top; /** context for this transaction, tag is LCT_TX_HANDLE */ struct lu_context th_ctx; @@ -1822,27 +1808,11 @@ struct thandle { __s32 th_result; /** whether we need sync commit */ - unsigned int th_sync:1; - + unsigned int th_sync:1, /* local transation, no need to inform other layers */ - unsigned int th_local:1; - - struct thandle_update *th_update; + th_local:1; }; -static inline void thandle_get(struct thandle *thandle) -{ - atomic_inc(&thandle->th_refc); -} - -static inline void thandle_put(struct thandle *thandle) -{ - if (atomic_dec_and_test(&thandle->th_refc)) { - if (thandle->th_update != NULL) - OBD_FREE_PTR(thandle->th_update); - OBD_FREE(thandle, thandle->th_alloc_size); - } -} /** * Transaction call-backs. 
* @@ -1919,6 +1889,18 @@ dt_locate(const struct lu_env *env, struct dt_device *dev, dev->dd_lu_dev.ld_site->ls_top_dev, NULL); } +static inline struct dt_object * +dt_object_locate(struct dt_object *dto, struct dt_device *dt_dev) +{ + struct lu_object *lo; + + list_for_each_entry(lo, &dto->do_lu.lo_header->loh_layers, lo_linkage) { + if (lo->lo_dev == &dt_dev->dd_lu_dev) + return container_of(lo, struct dt_object, do_lu); + } + return NULL; +} + int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev, const struct lu_fid *first_fid, struct local_oid_storage **los); @@ -2351,6 +2333,27 @@ static inline int dt_read_prep(const struct lu_env *env, struct dt_object *d, return d->do_body_ops->dbo_read_prep(env, d, lnb, n); } +static inline int dt_declare_write(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, loff_t pos, + struct thandle *th) +{ + LASSERT(dt); + LASSERT(dt->do_body_ops); + LASSERT(dt->do_body_ops->dbo_declare_write); + return dt->do_body_ops->dbo_declare_write(env, dt, buf, pos, th); +} + +static inline ssize_t dt_write(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, loff_t *pos, + struct thandle *th, int rq) +{ + LASSERT(dt); + LASSERT(dt->do_body_ops); + LASSERT(dt->do_body_ops->dbo_write); + return dt->do_body_ops->dbo_write(env, dt, buf, pos, th, rq); +} + static inline int dt_declare_punch(const struct lu_env *env, struct dt_object *dt, __u64 start, __u64 end, struct thandle *th) diff --git a/lustre/include/lustre_update.h b/lustre/include/lustre_update.h index c75e4fb..861d631 100644 --- a/lustre/include/lustre_update.h +++ b/lustre/include/lustre_update.h @@ -31,30 +31,45 @@ #ifndef _LUSTRE_UPDATE_H #define _LUSTRE_UPDATE_H #include +#include #define OUT_UPDATE_INIT_BUFFER_SIZE 4096 #define OUT_UPDATE_REPLY_SIZE 8192 -struct dt_object; -struct dt_object_hint; -struct dt_object_format; -struct dt_allocation_hint; struct dt_key; struct dt_rec; -struct thandle; struct 
update_buffer { struct object_update_request *ub_req; size_t ub_req_size; }; +#define TOP_THANDLE_MAGIC 0x20140917 +/* {top,sub}_thandle are used to manage distributed transactions which + * include updates on several nodes. A top_handle represents the + * whole operation, and sub_thandle represents updates on each node. */ +struct top_thandle { + struct thandle tt_super; + __u32 tt_magic; + /* The master sub transaction. */ + struct thandle *tt_master_sub_thandle; + + /* Other sub thandle will be listed here. */ + struct list_head tt_sub_thandle_list; +}; + +struct sub_thandle { + /* point to the osd/osp_thandle */ + struct thandle *st_sub_th; + struct list_head st_sub_list; +}; + /** * Tracking the updates being executed on this dt_device. */ struct dt_update_request { struct dt_device *dur_dt; /* attached itself to thandle */ - struct list_head dur_list; int dur_flags; /* update request result */ int dur_rc; @@ -167,13 +182,9 @@ static inline void update_inc_batchid(struct dt_update_request *update) } /* target/out_lib.c */ -struct thandle_update; -struct dt_update_request *out_find_update(struct thandle_update *tu, - struct dt_device *dt_dev); void dt_update_request_destroy(struct dt_update_request *update); struct dt_update_request *dt_update_request_create(struct dt_device *dt); -struct dt_update_request *dt_update_request_find_or_create(struct thandle *th, - struct dt_object *dt); + int out_update_pack(const struct lu_env *env, struct update_buffer *ubuf, enum update_type op, const struct lu_fid *fid, int params_count, __u16 *param_sizes, const void **bufs, @@ -214,4 +225,27 @@ int out_index_lookup_pack(const struct lu_env *env, struct update_buffer *ubuf, const struct dt_key *key); int out_xattr_get_pack(const struct lu_env *env, struct update_buffer *ubuf, const struct lu_fid *fid, const char *name); + +/* target/update_trans.c */ +struct thandle *thandle_get_sub_by_dt(const struct lu_env *env, + struct thandle *th, + struct dt_device *sub_dt); + +static 
inline struct thandle * +thandle_get_sub(const struct lu_env *env, struct thandle *th, + const struct dt_object *sub_obj) +{ + return thandle_get_sub_by_dt(env, th, lu2dt_dev(sub_obj->do_lu.lo_dev)); +} + +struct thandle * +top_trans_create(const struct lu_env *env, struct dt_device *master_dev); + +int top_trans_start(const struct lu_env *env, struct dt_device *master_dev, + struct thandle *th); + +int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev, + struct thandle *th); + +void top_thandle_destroy(struct top_thandle *top_th); #endif diff --git a/lustre/lfsck/lfsck_layout.c b/lustre/lfsck/lfsck_layout.c index bb873c8..e156fd2 100644 --- a/lustre/lfsck/lfsck_layout.c +++ b/lustre/lfsck/lfsck_layout.c @@ -2876,7 +2876,7 @@ static int lfsck_layout_repair_multiple_references(const struct lu_env *env, struct dt_object_format *dof = &info->lti_dof; struct ost_id *oi = &info->lti_oi; struct lfsck_instance *lfsck = com->lc_lfsck; - struct dt_device *dev = lfsck->li_bottom; + struct dt_device *dev; struct lu_device *d = &lfsck_obj2dev(llr->llr_child)->dd_lu_dev; struct lu_object *o; @@ -2922,6 +2922,7 @@ static int lfsck_layout_repair_multiple_references(const struct lu_env *env, la->la_valid = LA_UID | LA_GID; memset(dof, 0, sizeof(*dof)); + dev = lfsck_obj2dev(child); handle = dt_trans_create(env, dev); if (IS_ERR(handle)) GOTO(log, rc = PTR_ERR(handle)); @@ -2930,7 +2931,7 @@ static int lfsck_layout_repair_multiple_references(const struct lu_env *env, if (rc != 0) GOTO(stop, rc); - rc = dt_trans_start_local(env, dev, handle); + rc = dt_trans_start(env, dev, handle); if (rc != 0) GOTO(stop, rc); diff --git a/lustre/lfsck/lfsck_namespace.c b/lustre/lfsck/lfsck_namespace.c index 70e36a1..1aa9e4a 100644 --- a/lustre/lfsck/lfsck_namespace.c +++ b/lustre/lfsck/lfsck_namespace.c @@ -2038,6 +2038,7 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env, const struct lu_fid *cfid = lfsck_dto2fid(child); struct lu_fid tfid; struct lfsck_instance 
*lfsck = com->lc_lfsck; + struct dt_object *dto; struct dt_device *dev = lfsck->li_next; struct thandle *th = NULL; struct lfsck_lock_handle *llh = &info->lti_llh; @@ -2061,14 +2062,15 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env, if (IS_ERR(th)) GOTO(unlock1, rc = PTR_ERR(th)); - rc = dt_declare_delete(env, parent, (const struct dt_key *)name, th); + dto = dt_object_locate(parent, th->th_dev); + rc = dt_declare_delete(env, dto, (const struct dt_key *)name, th); if (rc != 0) GOTO(stop, rc); if (update) { rec->rec_type = lfsck_object_type(child) & S_IFMT; rec->rec_fid = cfid; - rc = dt_declare_insert(env, parent, + rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec, (const struct dt_key *)name2, th); if (rc != 0) @@ -2076,7 +2078,7 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env, } if (dec) { - rc = dt_declare_ref_del(env, parent, th); + rc = dt_declare_ref_del(env, dto, th); if (rc != 0) GOTO(stop, rc); } @@ -2085,8 +2087,9 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env, if (rc != 0) GOTO(stop, rc); - dt_write_lock(env, parent, 0); - rc = dt_lookup(env, parent, (struct dt_rec *)&tfid, + + dt_write_lock(env, dto, 0); + rc = dt_lookup(env, dto, (struct dt_rec *)&tfid, (const struct dt_key *)name); /* Someone has removed the bad name entry by race. 
*/ if (rc == -ENOENT) @@ -2103,12 +2106,12 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env, if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN) GOTO(unlock2, rc = 1); - rc = dt_delete(env, parent, (const struct dt_key *)name, th); + rc = dt_delete(env, dto, (const struct dt_key *)name, th); if (rc != 0) GOTO(unlock2, rc); if (update) { - rc = dt_insert(env, parent, + rc = dt_insert(env, dto, (const struct dt_rec *)rec, (const struct dt_key *)name2, th, 1); if (rc != 0) @@ -2116,7 +2119,7 @@ int lfsck_namespace_repair_dirent(const struct lu_env *env, } if (dec) { - rc = dt_ref_del(env, parent, th); + rc = dt_ref_del(env, dto, th); if (rc != 0) GOTO(unlock2, rc); } diff --git a/lustre/lod/Makefile.in b/lustre/lod/Makefile.in index 31459b5..29acce8 100644 --- a/lustre/lod/Makefile.in +++ b/lustre/lod/Makefile.in @@ -1,5 +1,6 @@ MODULES := lod -lod-objs := lod_dev.o lod_lov.o lproc_lod.o lod_pool.o lod_object.o lod_qos.o +lod-objs := lod_dev.o lod_lov.o lproc_lod.o lod_pool.o lod_object.o lod_qos.o \ + lod_sub_object.o EXTRA_DIST = $(lod-objs:.o=.c) lod_internal.h diff --git a/lustre/lod/lod_dev.c b/lustre/lod/lod_dev.c index 14e76df..1c1b6ce 100644 --- a/lustre/lod/lod_dev.c +++ b/lustre/lod/lod_dev.c @@ -533,14 +533,16 @@ static int lod_statfs(const struct lu_env *env, * see include/dt_object.h for the details. */ static struct thandle *lod_trans_create(const struct lu_env *env, - struct dt_device *dev) + struct dt_device *dt) { struct thandle *th; - th = dt_trans_create(env, dt2lod_dev(dev)->lod_child); + th = top_trans_create(env, dt2lod_dev(dt)->lod_child); if (IS_ERR(th)) return th; + th->th_dev = dt; + return th; } @@ -552,25 +554,10 @@ static struct thandle *lod_trans_create(const struct lu_env *env, * * see include/dt_object.h for the details. 
*/ -static int lod_trans_start(const struct lu_env *env, struct dt_device *dev, +static int lod_trans_start(const struct lu_env *env, struct dt_device *dt, struct thandle *th) { - struct lod_device *lod = dt2lod_dev((struct dt_device *) dev); - int rc = 0; - - if (unlikely(th->th_update != NULL)) { - struct thandle_update *tu = th->th_update; - struct dt_update_request *update; - - list_for_each_entry(update, &tu->tu_remote_update_list, - dur_list) { - LASSERT(update->dur_dt != NULL); - rc = dt_trans_start(env, update->dur_dt, th); - if (rc != 0) - return rc; - } - } - return dt_trans_start(env, lod->lod_child, th); + return top_trans_start(env, dt2lod_dev(dt)->lod_child, th); } /** @@ -584,31 +571,7 @@ static int lod_trans_start(const struct lu_env *env, struct dt_device *dev, static int lod_trans_stop(const struct lu_env *env, struct dt_device *dt, struct thandle *th) { - struct thandle_update *tu = th->th_update; - struct dt_update_request *update; - struct dt_update_request *tmp; - int rc2 = 0; - int rc; - ENTRY; - - thandle_get(th); - rc = dt_trans_stop(env, th->th_dev, th); - if (likely(tu == NULL)) { - thandle_put(th); - RETURN(rc); - } - - list_for_each_entry_safe(update, tmp, - &tu->tu_remote_update_list, - dur_list) { - /* update will be freed inside dt_trans_stop */ - rc2 = dt_trans_stop(env, update->dur_dt, th); - if (unlikely(rc2 != 0 && rc == 0)) - rc = rc2; - } - thandle_put(th); - - RETURN(rc); + return top_trans_stop(env, dt2lod_dev(dt)->lod_child, th); } /** diff --git a/lustre/lod/lod_internal.h b/lustre/lod/lod_internal.h index 3d6fa6d..2830b83 100644 --- a/lustre/lod/lod_internal.h +++ b/lustre/lod/lod_internal.h @@ -482,5 +482,80 @@ int lod_striping_create(const struct lu_env *env, struct dt_object *dt, struct thandle *th); void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo); +/* lod_sub_object.c */ +struct thandle *lod_sub_get_thandle(const struct lu_env *env, + struct thandle *th, + const struct dt_object 
*sub_obj); +int lod_sub_object_declare_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *th); +int lod_sub_object_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *th); +int lod_sub_object_declare_ref_add(const struct lu_env *env, + struct dt_object *dt, + struct thandle *th); +int lod_sub_object_ref_add(const struct lu_env *env, struct dt_object *dt, + struct thandle *th); +int lod_sub_object_declare_ref_del(const struct lu_env *env, + struct dt_object *dt, + struct thandle *th); +int lod_sub_object_ref_del(const struct lu_env *env, struct dt_object *dt, + struct thandle *th); +int lod_sub_object_declare_destroy(const struct lu_env *env, + struct dt_object *dt, + struct thandle *th); +int lod_sub_object_destroy(const struct lu_env *env, struct dt_object *dt, + struct thandle *th); +int lod_sub_object_declare_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *th); +int lod_sub_object_index_insert(const struct lu_env *env, struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, struct thandle *th, + int ign); +int lod_sub_object_declare_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *th); +int lod_sub_object_delete(const struct lu_env *env, struct dt_object *dt, + const struct dt_key *name, struct thandle *th); +int lod_sub_object_declare_xattr_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, + struct thandle *th); +int lod_sub_object_xattr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, const char *name, int fl, + struct thandle *th); +int lod_sub_object_declare_attr_set(const struct lu_env 
*env, + struct dt_object *dt, + const struct lu_attr *attr, + struct thandle *th); +int lod_sub_object_attr_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_attr *attr, + struct thandle *th); +int lod_sub_object_declare_xattr_del(const struct lu_env *env, + struct dt_object *dt, + const char *name, + struct thandle *th); +int lod_sub_object_xattr_del(const struct lu_env *env, + struct dt_object *dt, + const char *name, + struct thandle *th); +int lod_sub_object_declare_write(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, loff_t pos, + struct thandle *th); +ssize_t lod_sub_object_write(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, loff_t *pos, + struct thandle *th, int rq); #endif - diff --git a/lustre/lod/lod_lov.c b/lustre/lod/lod_lov.c index f9bc8ae..c4a6544 100644 --- a/lustre/lod/lod_lov.c +++ b/lustre/lod/lod_lov.c @@ -667,10 +667,12 @@ int lod_generate_and_set_lovea(const struct lu_env *env, info->lti_buf.lb_buf = lmm; info->lti_buf.lb_len = lmm_size; - rc = dt_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV, 0, - th); - if (rc < 0) + rc = lod_sub_object_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV, + 0, th); + if (rc < 0) { lod_object_free_striping(env, lo); + RETURN(rc); + } RETURN(rc); } diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index f8536d0..ad146e6 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -86,9 +86,10 @@ static int lod_declare_index_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, const struct dt_key *key, - struct thandle *handle) + struct thandle *th) { - return dt_declare_insert(env, dt_object_child(dt), rec, key, handle); + return lod_sub_object_declare_insert(env, dt_object_child(dt), + rec, key, th); } /** @@ -105,7 +106,8 @@ static int lod_index_insert(const struct lu_env *env, struct thandle *th, int ign) { - return dt_insert(env, dt_object_child(dt), rec, key, th, ign); 
+ return lod_sub_object_index_insert(env, dt_object_child(dt), rec, key, + th, ign); } /** @@ -121,7 +123,8 @@ static int lod_declare_index_delete(const struct lu_env *env, const struct dt_key *key, struct thandle *th) { - return dt_declare_delete(env, dt_object_child(dt), key, th); + return lod_sub_object_declare_delete(env, dt_object_child(dt), key, + th); } /** @@ -136,7 +139,7 @@ static int lod_index_delete(const struct lu_env *env, const struct dt_key *key, struct thandle *th) { - return dt_delete(env, dt_object_child(dt), key, th); + return lod_sub_object_delete(env, dt_object_child(dt), key, th); } /** @@ -153,7 +156,6 @@ static struct dt_it *lod_it_init(const struct lu_env *env, struct lod_it *it = &lod_env_info(env)->lti_it; struct dt_it *it_next; - it_next = next->do_index_ops->dio_it.init(env, next, attr); if (IS_ERR(it_next)) return it_next; @@ -1075,7 +1077,7 @@ static int lod_attr_get(const struct lu_env *env, **/ static int lod_mark_dead_object(const struct lu_env *env, struct dt_object *dt, - struct thandle *handle, + struct thandle *th, bool declare) { struct lod_object *lo = lod_dt_obj(dt); @@ -1111,13 +1113,14 @@ static int lod_mark_dead_object(const struct lu_env *env, buf.lb_buf = lmv; buf.lb_len = sizeof(*lmv); if (declare) { - rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], &buf, - XATTR_NAME_LMV, - LU_XATTR_REPLACE, handle); + rc = lod_sub_object_declare_xattr_set(env, + lo->ldo_stripe[i], &buf, + XATTR_NAME_LMV, + LU_XATTR_REPLACE, th); } else { - rc = dt_xattr_set(env, lo->ldo_stripe[i], &buf, - XATTR_NAME_LMV, LU_XATTR_REPLACE, - handle); + rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i], + &buf, XATTR_NAME_LMV, + LU_XATTR_REPLACE, th); } if (rc != 0) break; @@ -1137,7 +1140,7 @@ static int lod_mark_dead_object(const struct lu_env *env, static int lod_declare_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, - struct thandle *handle) + struct thandle *th) { struct dt_object *next = 
dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); @@ -1147,14 +1150,14 @@ static int lod_declare_attr_set(const struct lu_env *env, /* Set dead object on all other stripes */ if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) && attr->la_flags & LUSTRE_SLAVE_DEAD_FL) { - rc = lod_mark_dead_object(env, dt, handle, true); + rc = lod_mark_dead_object(env, dt, th, true); RETURN(rc); } /* * declare setattr on the local object */ - rc = dt_declare_attr_set(env, next, attr, handle); + rc = lod_sub_object_declare_attr_set(env, next, attr, th); if (rc) RETURN(rc); @@ -1193,20 +1196,20 @@ static int lod_declare_attr_set(const struct lu_env *env, */ LASSERT(lo->ldo_stripe); for (i = 0; i < lo->ldo_stripenr; i++) { - if (likely(lo->ldo_stripe[i] != NULL)) { - rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr, - handle); - if (rc != 0) { - CERROR("failed declaration: %d\n", rc); - break; - } - } + if (lo->ldo_stripe[i] == NULL) + continue; + rc = lod_sub_object_declare_attr_set(env, + lo->ldo_stripe[i], attr, + th); + if (rc != 0) + RETURN(rc); } if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) && dt_object_exists(next) != 0 && dt_object_remote(next) == 0) - dt_declare_xattr_del(env, next, XATTR_NAME_LOV, handle); + lod_sub_object_declare_xattr_del(env, next, + XATTR_NAME_LOV, th); if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) && dt_object_exists(next) && @@ -1216,8 +1219,9 @@ static int lod_declare_attr_set(const struct lu_env *env, buf->lb_buf = info->lti_ea_store; buf->lb_len = info->lti_ea_store_size; - dt_declare_xattr_set(env, next, buf, XATTR_NAME_LOV, - LU_XATTR_REPLACE, handle); + lod_sub_object_declare_xattr_set(env, next, buf, + XATTR_NAME_LOV, + LU_XATTR_REPLACE, th); } RETURN(rc); @@ -1234,7 +1238,7 @@ static int lod_declare_attr_set(const struct lu_env *env, static int lod_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, - struct thandle *handle) + struct thandle *th) { struct dt_object *next = 
dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); @@ -1244,14 +1248,14 @@ static int lod_attr_set(const struct lu_env *env, /* Set dead object on all other stripes */ if (attr->la_valid & LA_FLAGS && !(attr->la_valid & ~LA_FLAGS) && attr->la_flags & LUSTRE_SLAVE_DEAD_FL) { - rc = lod_mark_dead_object(env, dt, handle, false); + rc = lod_mark_dead_object(env, dt, th, false); RETURN(rc); } /* * apply changes to the local object */ - rc = dt_attr_set(env, next, attr, handle); + rc = lod_sub_object_attr_set(env, next, attr, th); if (rc) RETURN(rc); @@ -1277,21 +1281,20 @@ static int lod_attr_set(const struct lu_env *env, for (i = 0; i < lo->ldo_stripenr; i++) { if (unlikely(lo->ldo_stripe[i] == NULL)) continue; + if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && (dt_object_exists(lo->ldo_stripe[i]) == 0)) continue; - rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle); - if (rc != 0) { - CERROR("failed declaration: %d\n", rc); + rc = lod_sub_object_attr_set(env, lo->ldo_stripe[i], attr, th); + if (rc != 0) break; - } } if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) && dt_object_exists(next) != 0 && dt_object_remote(next) == 0) - dt_xattr_del(env, next, XATTR_NAME_LOV, handle); + rc = lod_sub_object_xattr_del(env, next, XATTR_NAME_LOV, th); if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) && dt_object_exists(next) && @@ -1322,8 +1325,9 @@ static int lod_attr_set(const struct lu_env *env, fid->f_oid--; fid_to_ostid(fid, oi); ostid_cpu_to_le(oi, &objs->l_ost_oi); - dt_xattr_set(env, next, buf, XATTR_NAME_LOV, - LU_XATTR_REPLACE, handle); + + rc = lod_sub_object_xattr_set(env, next, buf, XATTR_NAME_LOV, + LU_XATTR_REPLACE, th); } RETURN(rc); @@ -1629,6 +1633,139 @@ out: * \retval 0 on success * \retval negative if failed */ +static int lod_dir_declare_create_stripes(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_object_format *dof, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lu_buf 
lmv_buf; + struct lu_buf slave_lmv_buf; + struct lmv_mds_md_v1 *lmm; + struct lmv_mds_md_v1 *slave_lmm = NULL; + struct dt_insert_rec *rec = &info->lti_dt_rec; + struct lod_object *lo = lod_dt_obj(dt); + int rc; + __u32 i; + ENTRY; + + rc = lod_prep_lmv_md(env, dt, &lmv_buf); + if (rc != 0) + GOTO(out, rc); + lmm = lmv_buf.lb_buf; + + OBD_ALLOC_PTR(slave_lmm); + if (slave_lmm == NULL) + GOTO(out, rc = -ENOMEM); + + lod_prep_slave_lmv_md(slave_lmm, lmm); + slave_lmv_buf.lb_buf = slave_lmm; + slave_lmv_buf.lb_len = sizeof(*slave_lmm); + + if (!dt_try_as_dir(env, dt_object_child(dt))) + GOTO(out, rc = -EINVAL); + + rec->rec_type = S_IFDIR; + for (i = 0; i < lo->ldo_stripenr; i++) { + struct dt_object *dto = lo->ldo_stripe[i]; + char *stripe_name = info->lti_key; + struct lu_name *sname; + struct linkea_data ldata = { NULL }; + struct lu_buf linkea_buf; + + rc = lod_sub_object_declare_create(env, dto, attr, NULL, + dof, th); + if (rc != 0) + GOTO(out, rc); + + if (!dt_try_as_dir(env, dto)) + GOTO(out, rc = -EINVAL); + + rc = lod_sub_object_declare_ref_add(env, dto, th); + if (rc != 0) + GOTO(out, rc); + + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_object_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dot, th); + if (rc != 0) + GOTO(out, rc); + + /* master stripe FID will be put to .. 
*/ + rec->rec_fid = lu_object_fid(&dt->do_lu); + rc = lod_sub_object_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dotdot, + th); + if (rc != 0) + GOTO(out, rc); + + if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || + cfs_fail_val != i) { + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && + cfs_fail_val == i) + slave_lmm->lmv_master_mdt_index = + cpu_to_le32(i + 1); + else + slave_lmm->lmv_master_mdt_index = + cpu_to_le32(i); + rc = lod_sub_object_declare_xattr_set(env, dto, + &slave_lmv_buf, XATTR_NAME_LMV, 0, th); + if (rc != 0) + GOTO(out, rc); + } + + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && + cfs_fail_val == i) + snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", + PFID(lu_object_fid(&dto->do_lu)), i + 1); + else + snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", + PFID(lu_object_fid(&dto->do_lu)), i); + + sname = lod_name_get(env, stripe_name, strlen(stripe_name)); + rc = linkea_data_new(&ldata, &info->lti_linkea_buf); + if (rc != 0) + GOTO(out, rc); + + rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu)); + if (rc != 0) + GOTO(out, rc); + + linkea_buf.lb_buf = ldata.ld_buf->lb_buf; + linkea_buf.lb_len = ldata.ld_leh->leh_len; + rc = lod_sub_object_declare_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); + if (rc != 0) + GOTO(out, rc); + + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_object_declare_insert(env, dt_object_child(dt), + (const struct dt_rec *)rec, + (const struct dt_key *)stripe_name, + th); + if (rc != 0) + GOTO(out, rc); + + rc = lod_sub_object_declare_ref_add(env, dt_object_child(dt), + th); + if (rc != 0) + GOTO(out, rc); + } + + rc = lod_sub_object_declare_xattr_set(env, dt_object_child(dt), + &lmv_buf, XATTR_NAME_LMV, 0, th); + if (rc != 0) + GOTO(out, rc); +out: + if (slave_lmm != NULL) + OBD_FREE_PTR(slave_lmm); + + RETURN(rc); +} + static int lod_prep_md_striped_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, @@ 
-1639,13 +1776,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env, struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; struct lod_object *lo = lod_dt_obj(dt); - struct lod_thread_info *info = lod_env_info(env); struct dt_object **stripe; - struct lu_buf lmv_buf; - struct lu_buf slave_lmv_buf; - struct lmv_mds_md_v1 *lmm; - struct lmv_mds_md_v1 *slave_lmm = NULL; - struct dt_insert_rec *rec = &info->lti_dt_rec; __u32 stripe_count; int *idx_array; int rc = 0; @@ -1774,145 +1905,7 @@ next: if (lo->ldo_stripenr == 0) GOTO(out_put, rc = -ENOSPC); - rc = lod_prep_lmv_md(env, dt, &lmv_buf); - if (rc != 0) - GOTO(out_put, rc); - lmm = lmv_buf.lb_buf; - - OBD_ALLOC_PTR(slave_lmm); - if (slave_lmm == NULL) - GOTO(out_put, rc = -ENOMEM); - - lod_prep_slave_lmv_md(slave_lmm, lmm); - slave_lmv_buf.lb_buf = slave_lmm; - slave_lmv_buf.lb_len = sizeof(*slave_lmm); - - if (!dt_try_as_dir(env, dt_object_child(dt))) - GOTO(out_put, rc = -EINVAL); - - rec->rec_type = S_IFDIR; - for (i = 0; i < lo->ldo_stripenr; i++) { - struct dt_object *dto = stripe[i]; - char *stripe_name = info->lti_key; - struct lu_name *sname; - struct linkea_data ldata = { NULL }; - struct lu_buf linkea_buf; - - rc = dt_declare_create(env, dto, attr, NULL, dof, th); - if (rc != 0) - GOTO(out_put, rc); - - if (!dt_try_as_dir(env, dto)) - GOTO(out_put, rc = -EINVAL); - - rc = dt_declare_ref_add(env, dto, th); - if (rc != 0) - GOTO(out_put, rc); - - rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec, - (const struct dt_key *)dot, th); - if (rc != 0) - GOTO(out_put, rc); - - /* master stripe FID will be put to .. 
*/ - rec->rec_fid = lu_object_fid(&dt->do_lu); - rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, th); - if (rc != 0) - GOTO(out_put, rc); - - /* probably nothing to inherite */ - if (lo->ldo_def_striping_set && - !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, - lo->ldo_def_stripenr, - lo->ldo_def_stripe_offset, - lo->ldo_pool)) { - struct lov_user_md_v3 *v3; - - /* sigh, lti_ea_store has been used for lmv_buf, - * so we have to allocate buffer for default - * stripe EA */ - OBD_ALLOC_PTR(v3); - if (v3 == NULL) - GOTO(out_put, rc = -ENOMEM); - - memset(v3, 0, sizeof(*v3)); - v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); - v3->lmm_stripe_count = - cpu_to_le16(lo->ldo_def_stripenr); - v3->lmm_stripe_offset = - cpu_to_le16(lo->ldo_def_stripe_offset); - v3->lmm_stripe_size = - cpu_to_le32(lo->ldo_def_stripe_size); - if (lo->ldo_pool != NULL) - strlcpy(v3->lmm_pool_name, lo->ldo_pool, - sizeof(v3->lmm_pool_name)); - - info->lti_buf.lb_buf = v3; - info->lti_buf.lb_len = sizeof(*v3); - rc = dt_declare_xattr_set(env, dto, - &info->lti_buf, - XATTR_NAME_LOV, - 0, th); - OBD_FREE_PTR(v3); - if (rc != 0) - GOTO(out_put, rc); - } - - if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || - cfs_fail_val != i) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && - cfs_fail_val == i) - slave_lmm->lmv_master_mdt_index = - cpu_to_le32(i + 1); - else - slave_lmm->lmv_master_mdt_index = - cpu_to_le32(i); - rc = dt_declare_xattr_set(env, dto, &slave_lmv_buf, - XATTR_NAME_LMV, 0, th); - if (rc != 0) - GOTO(out_put, rc); - } - - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && - cfs_fail_val == i) - snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", - PFID(lu_object_fid(&dto->do_lu)), i + 1); - else - snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", - PFID(lu_object_fid(&dto->do_lu)), i); - - sname = lod_name_get(env, stripe_name, strlen(stripe_name)); - rc = linkea_data_new(&ldata, &info->lti_linkea_buf); - if (rc != 0) - 
GOTO(out_put, rc); - - rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu)); - if (rc != 0) - GOTO(out_put, rc); - - linkea_buf.lb_buf = ldata.ld_buf->lb_buf; - linkea_buf.lb_len = ldata.ld_leh->leh_len; - rc = dt_declare_xattr_set(env, dto, &linkea_buf, - XATTR_NAME_LINK, 0, th); - if (rc != 0) - GOTO(out_put, rc); - - rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = dt_declare_insert(env, dt_object_child(dt), - (const struct dt_rec *)rec, - (const struct dt_key *)stripe_name, th); - if (rc != 0) - GOTO(out_put, rc); - - rc = dt_declare_ref_add(env, dt_object_child(dt), th); - if (rc != 0) - GOTO(out_put, rc); - } - - rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf, - XATTR_NAME_LMV, 0, th); + rc = lod_dir_declare_create_stripes(env, dt, attr, dof, th); if (rc != 0) GOTO(out_put, rc); @@ -1930,8 +1923,6 @@ out_put: out_free: if (idx_array != NULL) OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count); - if (slave_lmm != NULL) - OBD_FREE_PTR(slave_lmm); RETURN(rc); } @@ -2028,7 +2019,7 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env, RETURN(rc); } - rc = dt_declare_xattr_set(env, next, buf, name, fl, th); + rc = lod_sub_object_declare_xattr_set(env, next, buf, name, fl, th); if (rc != 0) RETURN(rc); @@ -2046,8 +2037,9 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env, for (i = 0; i < lo->ldo_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); - rc = dt_declare_xattr_set(env, lo->ldo_stripe[i], buf, - name, fl, th); + + rc = lod_sub_object_declare_xattr_set(env, lo->ldo_stripe[i], + buf, name, fl, th); if (rc != 0) break; } @@ -2102,7 +2094,8 @@ static int lod_declare_xattr_set(const struct lu_env *env, } else if (S_ISDIR(mode)) { rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th); } else { - rc = dt_declare_xattr_set(env, next, buf, name, fl, th); + rc = lod_sub_object_declare_xattr_set(env, next, buf, name, + fl, th); } RETURN(rc); @@ -2150,7 +2143,7 @@ static int lod_xattr_set_internal(const struct lu_env 
*env, int i; ENTRY; - rc = dt_xattr_set(env, next, buf, name, fl, th); + rc = lod_sub_object_xattr_set(env, next, buf, name, fl, th); if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) RETURN(rc); @@ -2163,7 +2156,9 @@ static int lod_xattr_set_internal(const struct lu_env *env, for (i = 0; i < lo->ldo_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); - rc = dt_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th); + + rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i], buf, name, + fl, th); if (rc != 0) break; } @@ -2194,7 +2189,7 @@ static int lod_xattr_del_internal(const struct lu_env *env, int i; ENTRY; - rc = dt_xattr_del(env, next, name, th); + rc = lod_sub_object_xattr_del(env, next, name, th); if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) RETURN(rc); @@ -2203,7 +2198,9 @@ static int lod_xattr_del_internal(const struct lu_env *env, for (i = 0; i < lo->ldo_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); - rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th); + + rc = lod_sub_object_xattr_del(env, lo->ldo_stripe[i], name, + th); if (rc != 0) break; } @@ -2409,71 +2406,39 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, rec->rec_type = S_IFDIR; for (i = 0; i < lo->ldo_stripenr; i++) { - struct dt_object *dto; - char *stripe_name = info->lti_key; + struct dt_object *dto; + char *stripe_name = info->lti_key; struct lu_name *sname; struct linkea_data ldata = { NULL }; struct lu_buf linkea_buf; dto = lo->ldo_stripe[i]; + dt_write_lock(env, dto, MOR_TGT_CHILD); - rc = dt_create(env, dto, attr, NULL, dof, th); + rc = lod_sub_object_create(env, dto, attr, NULL, dof, + th); if (rc != 0) { dt_write_unlock(env, dto); - RETURN(rc); + GOTO(out, rc); } - rc = dt_ref_add(env, dto, th); + rc = lod_sub_object_ref_add(env, dto, th); dt_write_unlock(env, dto); if (rc != 0) - RETURN(rc); + GOTO(out, rc); rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = dt_insert(env, dto, (const struct dt_rec *)rec, - (const struct dt_key *)dot, th, 0); + rc 
= lod_sub_object_index_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dot, th, 0); if (rc != 0) - RETURN(rc); + GOTO(out, rc); rec->rec_fid = lu_object_fid(&dt->do_lu); - rc = dt_insert(env, dto, (struct dt_rec *)rec, + rc = lod_sub_object_index_insert(env, dto, (struct dt_rec *)rec, (const struct dt_key *)dotdot, th, 0); if (rc != 0) - RETURN(rc); - - if (lo->ldo_def_striping_set && - !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size, - lo->ldo_def_stripenr, - lo->ldo_def_stripe_offset, - lo->ldo_pool)) { - struct lov_user_md_v3 *v3; - - /* sigh, lti_ea_store has been used for lmv_buf, - * so we have to allocate buffer for default - * stripe EA */ - OBD_ALLOC_PTR(v3); - if (v3 == NULL) - GOTO(out, rc); - - memset(v3, 0, sizeof(*v3)); - v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); - v3->lmm_stripe_count = - cpu_to_le16(lo->ldo_def_stripenr); - v3->lmm_stripe_offset = - cpu_to_le16(lo->ldo_def_stripe_offset); - v3->lmm_stripe_size = - cpu_to_le32(lo->ldo_def_stripe_size); - if (lo->ldo_pool != NULL) - strlcpy(v3->lmm_pool_name, lo->ldo_pool, - sizeof(v3->lmm_pool_name)); - - info->lti_buf.lb_buf = v3; - info->lti_buf.lb_len = sizeof(*v3); - rc = dt_xattr_set(env, dto, &info->lti_buf, - XATTR_NAME_LOV, 0, th); - OBD_FREE_PTR(v3); - if (rc != 0) - GOTO(out, rc); - } + GOTO(out, rc); if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || cfs_fail_val != i) { @@ -2484,8 +2449,9 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, else slave_lmm->lmv_master_mdt_index = cpu_to_le32(i); - rc = dt_xattr_set(env, dto, &slave_lmv_buf, - XATTR_NAME_LMV, fl, th); + + rc = lod_sub_object_xattr_set(env, dto, &slave_lmv_buf, + XATTR_NAME_LMV, fl, th); if (rc != 0) GOTO(out, rc); } @@ -2509,27 +2475,26 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, linkea_buf.lb_buf = ldata.ld_buf->lb_buf; linkea_buf.lb_len = ldata.ld_leh->leh_len; - rc = dt_xattr_set(env, dto, &linkea_buf, XATTR_NAME_LINK, - 0, th); 
+ rc = lod_sub_object_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); if (rc != 0) GOTO(out, rc); rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = dt_insert(env, dt_object_child(dt), + rc = lod_sub_object_index_insert(env, dt_object_child(dt), (const struct dt_rec *)rec, (const struct dt_key *)stripe_name, th, 0); if (rc != 0) GOTO(out, rc); - rc = dt_ref_add(env, dt_object_child(dt), th); + rc = lod_sub_object_ref_add(env, dt_object_child(dt), th); if (rc != 0) GOTO(out, rc); } if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MASTER_LMV)) - rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, - XATTR_NAME_LMV, fl, th); - + rc = lod_sub_object_xattr_set(env, dt_object_child(dt), + &lmv_buf, XATTR_NAME_LMV, fl, th); out: if (slave_lmm != NULL) OBD_FREE_PTR(slave_lmm); @@ -2736,7 +2701,8 @@ static int lod_xattr_set(const struct lu_env *env, if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION) - rc = dt_xattr_set(env, next, buf, name, fl, th); + rc = lod_sub_object_xattr_set(env, next, buf, name, fl, + th); else rc = lod_dir_striping_create(env, dt, NULL, NULL, th); @@ -2759,12 +2725,13 @@ static int lod_xattr_set(const struct lu_env *env, /* in case of lov EA swap, just set it * if not, it is a replay so check striping match what we * already have during req replay, declare_xattr_set() - * defines striping, then create() does the work - */ + * defines striping, then create() does the work */ if (fl & LU_XATTR_REPLACE) { /* free stripes, then update disk */ lod_object_free_striping(env, lod_dt_obj(dt)); - rc = dt_xattr_set(env, next, buf, name, fl, th); + + rc = lod_sub_object_xattr_set(env, next, buf, name, + fl, th); } else { rc = lod_striping_create(env, dt, NULL, NULL, th); } @@ -2792,7 +2759,8 @@ static int lod_declare_xattr_del(const struct lu_env *env, int i; ENTRY; - rc = dt_declare_xattr_del(env, dt_object_child(dt), name, th); + rc = lod_sub_object_declare_xattr_del(env, dt_object_child(dt), + name, th); if (rc != 0) RETURN(rc); 
@@ -2809,7 +2777,8 @@ static int lod_declare_xattr_del(const struct lu_env *env, for (i = 0; i < lo->ldo_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); - rc = dt_declare_xattr_del(env, lo->ldo_stripe[i], name, th); + rc = lod_sub_object_declare_xattr_del(env, lo->ldo_stripe[i], + name, th); if (rc != 0) break; } @@ -2837,7 +2806,7 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, if (!strcmp(name, XATTR_NAME_LOV)) lod_object_free_striping(env, lod_dt_obj(dt)); - rc = dt_xattr_del(env, next, name, th); + rc = lod_sub_object_xattr_del(env, next, name, th); if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) RETURN(rc); @@ -2846,7 +2815,8 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, for (i = 0; i < lo->ldo_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); - rc = dt_xattr_del(env, lo->ldo_stripe[i], name, th); + + rc = lod_sub_object_xattr_del(env, lo->ldo_stripe[i], name, th); if (rc != 0) break; } @@ -3319,7 +3289,8 @@ static int lod_declare_init_size(const struct lu_env *env, attr->la_valid = LA_SIZE; attr->la_size = size; - rc = dt_declare_attr_set(env, lo->ldo_stripe[stripe], attr, th); + rc = lod_sub_object_declare_attr_set(env, lo->ldo_stripe[stripe], attr, + th); RETURN(rc); } @@ -3385,8 +3356,8 @@ int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt, info->lti_buf = *lovea; } - rc = dt_declare_xattr_set(env, next, &info->lti_buf, - XATTR_NAME_LOV, 0, th); + rc = lod_sub_object_declare_xattr_set(env, next, &info->lti_buf, + XATTR_NAME_LOV, 0, th); if (rc) GOTO(out, rc); @@ -3432,8 +3403,8 @@ static int lod_declare_object_create(const struct lu_env *env, /* * first of all, we declare creation of local object */ - rc = dt_declare_create(env, next, attr, hint, dof, th); - if (rc) + rc = lod_sub_object_declare_create(env, next, attr, hint, dof, th); + if (rc != 0) GOTO(out, rc); if (dof->dof_type == DFT_SYM) @@ -3513,8 +3484,8 @@ int lod_striping_create(const struct lu_env *env, struct 
dt_object *dt, /* create all underlying objects */ for (i = 0; i < lo->ldo_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); - rc = dt_create(env, lo->ldo_stripe[i], attr, NULL, dof, th); - + rc = lod_sub_object_create(env, lo->ldo_stripe[i], attr, NULL, + dof, th); if (rc) break; } @@ -3542,13 +3513,13 @@ static int lod_object_create(const struct lu_env *env, struct dt_object *dt, struct dt_allocation_hint *hint, struct dt_object_format *dof, struct thandle *th) { - struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); int rc; ENTRY; /* create local object */ - rc = dt_create(env, next, attr, hint, dof, th); + rc = lod_sub_object_create(env, dt_object_child(dt), attr, hint, dof, + th); if (rc != 0) RETURN(rc); @@ -3598,22 +3569,24 @@ static int lod_declare_object_destroy(const struct lu_env *env, RETURN(rc); for (i = 0; i < lo->ldo_stripenr; i++) { - rc = dt_declare_ref_del(env, next, th); + rc = lod_sub_object_declare_ref_del(env, next, th); if (rc != 0) RETURN(rc); + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)), i); - rc = dt_declare_delete(env, next, + rc = lod_sub_object_declare_delete(env, next, (const struct dt_key *)stripe_name, th); if (rc != 0) RETURN(rc); } } + /* * we declare destroy for the local object */ - rc = dt_declare_destroy(env, next, th); + rc = lod_sub_object_declare_destroy(env, next, th); if (rc) RETURN(rc); @@ -3622,18 +3595,17 @@ static int lod_declare_object_destroy(const struct lu_env *env, /* declare destroy all striped objects */ for (i = 0; i < lo->ldo_stripenr; i++) { - if (likely(lo->ldo_stripe[i] != NULL)) { - if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { - rc = dt_declare_ref_del(env, lo->ldo_stripe[i], - th); - if (rc != 0) - RETURN(rc); - } + if (lo->ldo_stripe[i] == NULL) + continue; - rc = dt_declare_destroy(env, lo->ldo_stripe[i], th); - if (rc != 0) - break; - } + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) + rc = 
lod_sub_object_declare_ref_del(env, + lo->ldo_stripe[i], th); + if (rc == 0) + rc = lod_sub_object_declare_destroy(env, + lo->ldo_stripe[i], th); + if (rc != 0) + break; } RETURN(rc); @@ -3667,7 +3639,7 @@ static int lod_object_destroy(const struct lu_env *env, RETURN(rc); for (i = 0; i < lo->ldo_stripenr; i++) { - rc = dt_ref_del(env, next, th); + rc = lod_sub_object_ref_del(env, next, th); if (rc != 0) RETURN(rc); @@ -3679,13 +3651,14 @@ static int lod_object_destroy(const struct lu_env *env, PFID(lu_object_fid(&dt->do_lu)), stripe_name, PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu))); - rc = dt_delete(env, next, + rc = lod_sub_object_delete(env, next, (const struct dt_key *)stripe_name, th); if (rc != 0) RETURN(rc); } } - rc = dt_destroy(env, next, th); + + rc = lod_sub_object_destroy(env, next, th); if (rc != 0) RETURN(rc); @@ -3700,13 +3673,14 @@ static int lod_object_destroy(const struct lu_env *env, if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { dt_write_lock(env, lo->ldo_stripe[i], MOR_TGT_CHILD); - rc = dt_ref_del(env, lo->ldo_stripe[i], th); + rc = lod_sub_object_ref_del(env, + lo->ldo_stripe[i], th); dt_write_unlock(env, lo->ldo_stripe[i]); if (rc != 0) break; } - rc = dt_destroy(env, lo->ldo_stripe[i], th); + rc = lod_sub_object_destroy(env, lo->ldo_stripe[i], th); if (rc != 0) break; } @@ -3724,7 +3698,7 @@ static int lod_object_destroy(const struct lu_env *env, static int lod_declare_ref_add(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - return dt_declare_ref_add(env, dt_object_child(dt), th); + return lod_sub_object_declare_ref_add(env, dt_object_child(dt), th); } /** @@ -3735,7 +3709,7 @@ static int lod_declare_ref_add(const struct lu_env *env, static int lod_ref_add(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - return dt_ref_add(env, dt_object_child(dt), th); + return lod_sub_object_ref_add(env, dt_object_child(dt), th); } /** @@ -3747,7 +3721,7 @@ static int lod_ref_add(const struct lu_env *env, static int
lod_declare_ref_del(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - return dt_declare_ref_del(env, dt_object_child(dt), th); + return lod_sub_object_declare_ref_del(env, dt_object_child(dt), th); } /** @@ -3758,7 +3732,7 @@ static int lod_declare_ref_del(const struct lu_env *env, static int lod_ref_del(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - return dt_ref_del(env, dt_object_child(dt), th); + return lod_sub_object_ref_del(env, dt_object_child(dt), th); } /** @@ -3996,8 +3970,8 @@ static ssize_t lod_declare_write(const struct lu_env *env, const struct lu_buf *buf, loff_t pos, struct thandle *th) { - return dt_declare_record_write(env, dt_object_child(dt), - buf, pos, th); + return lod_sub_object_declare_write(env, dt_object_child(dt), buf, pos, + th); } /** @@ -4009,9 +3983,7 @@ static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, struct thandle *th, int iq) { - struct dt_object *next = dt_object_child(dt); - LASSERT(next); - return next->do_body_ops->dbo_write(env, next, buf, pos, th, iq); + return lod_sub_object_write(env, dt_object_child(dt), buf, pos, th, iq); } static const struct dt_body_operations lod_body_lnk_ops = { diff --git a/lustre/lod/lod_qos.c b/lustre/lod/lod_qos.c index 33f8d0f..7ee3d33 100644 --- a/lustre/lod/lod_qos.c +++ b/lustre/lod/lod_qos.c @@ -696,8 +696,8 @@ static struct dt_object *lod_qos_declare_object_on(const struct lu_env *env, dt = container_of(n, struct dt_object, do_lu); - rc = dt_declare_create(env, dt, NULL, NULL, NULL, th); - if (rc) { + rc = lod_sub_object_declare_create(env, dt, NULL, NULL, NULL, th); + if (rc < 0) { CDEBUG(D_OTHER, "can't declare creation on #%u: %d\n", ost_idx, rc); lu_object_put(env, o); @@ -1900,8 +1900,9 @@ int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo, o = lo->ldo_stripe[i]; LASSERT(o); - rc = dt_declare_create(env, o, attr, NULL, NULL, th); - if (rc) { + rc = 
lod_sub_object_declare_create(env, o, attr, NULL, + NULL, th); + if (rc < 0) { CERROR("can't declare create: %d\n", rc); break; } diff --git a/lustre/lod/lod_sub_object.c b/lustre/lod/lod_sub_object.c new file mode 100644 index 0000000..cce804f --- /dev/null +++ b/lustre/lod/lod_sub_object.c @@ -0,0 +1,702 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License version 2 for more details. A copy is + * included in the COPYING file that accompanied this code. + + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * GPL HEADER END + */ +/* + * Copyright (c) 2014, Intel Corporation. + */ +/* + * lustre/lod/lod_sub_object.c + * + * LOD sub object methods + * + * This file implements sub-object methods for LOD. + * + * LOD is Logic volume layer in the MDS stack, which will handle striping + * and distribute the update to different OSP/OSD. After directing the updates + * to one specific OSD/OSP, it also needs to do some thing before calling + * OSD/OSP API, for example recording updates for cross-MDT operation, get + * the next level transaction etc. 
+ * + * Author: Di Wang + */ + +#define DEBUG_SUBSYSTEM S_MDS + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "lod_internal.h" + +struct thandle *lod_sub_get_thandle(const struct lu_env *env, + struct thandle *th, + const struct dt_object *sub_obj) +{ + struct lod_device *lod = dt2lod_dev(th->th_dev); + struct top_thandle *tth; + struct thandle *sub_th; + int type = LU_SEQ_RANGE_ANY; + __u32 mdt_index; + int rc; + ENTRY; + + if (th->th_top == NULL) + RETURN(th); + + tth = container_of(th, struct top_thandle, tt_super); + LASSERT(tth->tt_magic == TOP_THANDLE_MAGIC); + /* local object must be mdt object, Note: during ost object + * creation, FID is not assigned until osp_object_create(), + * so if the FID of sub_obj is zero, it means OST object. */ + if (!dt_object_remote(sub_obj) || + fid_is_zero(lu_object_fid(&sub_obj->do_lu))) + RETURN(tth->tt_master_sub_thandle); + + rc = lod_fld_lookup(env, lod, lu_object_fid(&sub_obj->do_lu), + &mdt_index, &type); + if (rc < 0) + RETURN(ERR_PTR(rc)); + + if (type == LU_SEQ_RANGE_OST) + RETURN(tth->tt_master_sub_thandle); + + sub_th = thandle_get_sub(env, th, sub_obj); + + RETURN(sub_th); +} + +/** + * Declare sub-object creation. + * + * Get transaction of next layer and declare the creation of the object. + * + * \param[in] env execution environment + * \param[in] dt the object being created + * \param[in] attr the attributes of the object being created + * \param[in] hint the hint of the creation + * \param[in] dof the object format of the creation + * \param[in] th the transaction handle + * + * \retval 0 if the declaration succeeds + * \retval negative errno if the declaration fails.
+ */ +int lod_sub_object_declare_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *th) +{ + struct thandle *sub_th; + + sub_th = lod_sub_get_thandle(env, th, dt); + if (IS_ERR(sub_th)) + return PTR_ERR(sub_th); + + return dt_declare_create(env, dt, attr, hint, dof, sub_th); +} + +/** + * Create sub-object. + * + * Get transaction of next layer, record updates if it belongs to cross-MDT + * operation, and create the object. + * + * \param[in] env execution environment + * \param[in] dt the object being created + * \param[in] attr the attributes of the object being created + * \param[in] hint the hint of the creation + * \param[in] dof the object format of the creation + * \param[in] th the transaction handle + * + * \retval 0 if the creation succeeds + * \retval negative errno if the creation fails. + */ +int lod_sub_object_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, + struct thandle *th) +{ + struct thandle *sub_th; + int rc; + ENTRY; + + sub_th = lod_sub_get_thandle(env, th, dt); + if (IS_ERR(sub_th)) + RETURN(PTR_ERR(sub_th)); + + rc = dt_create(env, dt, attr, hint, dof, sub_th); + + RETURN(rc); +} + +/** + * Declare adding reference for the sub-object + * + * Get transaction of next layer and declare the reference adding. + * + * \param[in] env execution environment + * \param[in] dt dt object to add reference + * \param[in] th transaction handle + * + * \retval 0 if the declaration succeeds. + * \retval negative errno if the declaration fails.
+ */ +int lod_sub_object_declare_ref_add(const struct lu_env *env, + struct dt_object *dt, + struct thandle *th) +{ + struct thandle *sub_th; + int rc; + ENTRY; + + sub_th = lod_sub_get_thandle(env, th, dt); + if (IS_ERR(sub_th)) + RETURN(PTR_ERR(sub_th)); + + rc = dt_declare_ref_add(env, dt, sub_th); + + RETURN(rc); +} + +/** + * Add reference for the sub-object + * + * Get transaction of next layer, record updates if it belongs to cross-MDT + * operation and add reference of the object. + * + * \param[in] env execution environment + * \param[in] dt dt object to add reference + * \param[in] th transaction handle + * + * \retval 0 if it succeeds. + * \retval negative errno if it fails. + */ +int lod_sub_object_ref_add(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) +{ + struct thandle *sub_th; + int rc; + ENTRY; + + sub_th = lod_sub_get_thandle(env, th, dt); + if (IS_ERR(sub_th)) + RETURN(PTR_ERR(sub_th)); + + rc = dt_ref_add(env, dt, sub_th); + + RETURN(rc); +} + +/** + * Declare deleting reference for the sub-object + * + * Get transaction of next layer and declare the reference deleting. + * + * \param[in] env execution environment + * \param[in] dt dt object to delete reference + * \param[in] th transaction handle + * + * \retval 0 if the declaration succeeds. + * \retval negative errno if the declaration fails. + */ +int lod_sub_object_declare_ref_del(const struct lu_env *env, + struct dt_object *dt, + struct thandle *th) +{ + struct thandle *sub_th; + int rc; + ENTRY; + + sub_th = lod_sub_get_thandle(env, th, dt); + if (IS_ERR(sub_th)) + RETURN(PTR_ERR(sub_th)); + + rc = dt_declare_ref_del(env, dt, sub_th); + + RETURN(rc); +} + +/** + * Delete reference for the sub-object + * + * Get transaction of next layer, record updates if it belongs to cross-MDT + * operation and delete reference of the object. 
+ * + * \param[in] env execution environment + * \param[in] dt dt object to delete reference + * \param[in] th transaction handle + * + * \retval 0 if it succeeds. + * \retval negative errno if it fails. + */ +int lod_sub_object_ref_del(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) +{ + struct thandle *sub_th; + int rc; + ENTRY; + + sub_th = lod_sub_get_thandle(env, th, dt); + if (IS_ERR(sub_th)) + RETURN(PTR_ERR(sub_th)); + + rc = dt_ref_del(env, dt, sub_th); + + RETURN(rc); +} + +/** + * Declare destroying sub-object + * + * Get transaction of next layer and declare the sub-object destroy. + * + * \param[in] env execution environment + * \param[in] dt dt object to be destroyed + * \param[in] th transaction handle + * + * \retval 0 if the declaration succeeds. + * \retval negative errno if the declaration fails. + */ +int lod_sub_object_declare_destroy(const struct lu_env *env, + struct dt_object *dt, + struct thandle *th) +{ + struct thandle *sub_th; + int rc; + ENTRY; + + sub_th = lod_sub_get_thandle(env, th, dt); + if (IS_ERR(sub_th)) + RETURN(PTR_ERR(sub_th)); + + rc = dt_declare_destroy(env, dt, sub_th); + + RETURN(rc); +} + +/** + * Destroy sub-object + * + * Get transaction of next layer, record updates if it belongs to cross-MDT + * operation and destroy the object. + * + * \param[in] env execution environment + * \param[in] dt dt object to be destroyed + * \param[in] th transaction handle + * + * \retval 0 if the destroy succeeds. + * \retval negative errno if the destroy fails. + */ +int lod_sub_object_destroy(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) +{ + struct thandle *sub_th; + int rc; + ENTRY; + + sub_th = lod_sub_get_thandle(env, th, dt); + if (IS_ERR(sub_th)) + RETURN(PTR_ERR(sub_th)); + + rc = dt_destroy(env, dt, sub_th); + + RETURN(rc); +} + +/** + * Declare sub-object index insert + * + * Get transaction of next layer and declare index insert. 
+ * + * \param[in] env execution environment + * \param[in] dt object for which to insert index + * \param[in] rec record of the index which will be inserted + * \param[in] key key of the index which will be inserted + * \param[in] th the transaction handle + * + * \retval 0 if the declaration succeeds. + * \retval negative errno if the declaration fails. + */ +int lod_sub_object_declare_insert(const struct lu_env *env, + struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *th) +{ + struct thandle *sub_th; + + sub_th = lod_sub_get_thandle(env, th, dt); + if (IS_ERR(sub_th)) + return PTR_ERR(sub_th); + + return dt_declare_insert(env, dt, rec, key, sub_th); +} + +/** + * Insert index of sub object + * + * Get transaction of next layer, record updates if it belongs to cross-MDT + * operation, and insert the index. + * + * \param[in] env execution environment + * \param[in] dt object for which to insert index + * \param[in] rec record of the index to be inserted + * \param[in] key key of the index to be inserted + * \param[in] th the transaction handle + * \param[in] ign whether ignore quota + * + * \retval 0 if the insertion succeeds. + * \retval negative errno if the insertion fails. + */ +int lod_sub_object_index_insert(const struct lu_env *env, struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, struct thandle *th, + int ign) +{ + struct thandle *sub_th; + + sub_th = lod_sub_get_thandle(env, th, dt); + if (IS_ERR(sub_th)) + return PTR_ERR(sub_th); + + return dt_insert(env, dt, rec, key, sub_th, ign); +} + +/** + * Declare sub-object index delete + * + * Get transaction of next layer and declare index deletion. + * + * \param[in] env execution environment + * \param[in] dt object for which to delete index + * \param[in] key key of the index which will be deleted + * \param[in] th the transaction handle + * + * \retval 0 if the declaration succeeds. 
+ * \retval negative errno if the declaration fails. + */ +int lod_sub_object_declare_delete(const struct lu_env *env, + struct dt_object *dt, + const struct dt_key *key, + struct thandle *th) +{ + struct thandle *sub_th; + + sub_th = lod_sub_get_thandle(env, th, dt); + if (IS_ERR(sub_th)) + return PTR_ERR(sub_th); + + return dt_declare_delete(env, dt, key, sub_th); +} + +/** + * Delete index of sub object + * + * Get transaction of next layer, record updates if it belongs to cross-MDT + * operation, and delete the index. + * + * \param[in] env execution environment + * \param[in] dt object for which to delete index + * \param[in] name key of the index to be deleted + * \param[in] th the transaction handle + * + * \retval 0 if the deletion succeeds. + * \retval negative errno if the deletion fails. + */ +int lod_sub_object_delete(const struct lu_env *env, struct dt_object *dt, + const struct dt_key *name, struct thandle *th) +{ + struct thandle *sub_th; + int rc; + ENTRY; + + sub_th = lod_sub_get_thandle(env, th, dt); + if (IS_ERR(sub_th)) + RETURN(PTR_ERR(sub_th)); + + rc = dt_delete(env, dt, name, sub_th); + RETURN(rc); +} + +/** + * Declare xattr_set + * + * Get transaction of next layer, and declare xattr set. + * + * \param[in] env execution environment + * \param[in] dt object on which to set xattr + * \param[in] buf xattr to be set + * \param[in] name name of the xattr + * \param[in] fl flag for setting xattr + * + * \retval 0 if the declaration succeeds.
+ */ +int lod_sub_object_declare_xattr_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, + struct thandle *th) +{ + struct thandle *sub_th; + int rc; + ENTRY; + + sub_th = lod_sub_get_thandle(env, th, dt); + if (IS_ERR(sub_th)) + RETURN(PTR_ERR(sub_th)); + + rc = dt_declare_xattr_set(env, dt, buf, name, fl, sub_th); + + RETURN(rc); +} + +/** + * Set xattr + * + * Get transaction of next layer, record updates if it belongs to cross-MDT + * operation, and set xattr to the object. + * + * \param[in] env execution environment + * \param[in] dt object on which to set xattr + * \param[in] buf xattr to be set + * \param[in] name name of the xattr + * \param[in] fl flag for setting xattr + * \param[in] th transaction handle + * + * \retval 0 if the xattr setting succeeds. + * \retval negative errno if xattr setting fails. + */ +int lod_sub_object_xattr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, const char *name, int fl, + struct thandle *th) +{ + struct thandle *sub_th; + int rc; + ENTRY; + + sub_th = lod_sub_get_thandle(env, th, dt); + if (IS_ERR(sub_th)) + RETURN(PTR_ERR(sub_th)); + + rc = dt_xattr_set(env, dt, buf, name, fl, sub_th); + + RETURN(rc); +} + +/** + * Declare attr_set + * + * Get transaction of next layer, and declare attr set. + * + * \param[in] env execution environment + * \param[in] dt object on which to set attr + * \param[in] attr attributes to be set + * \param[in] th transaction handle + * + * \retval 0 if the declaration succeeds. + * \retval negative errno if the declaration fails. 
+ */ +int lod_sub_object_declare_attr_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_attr *attr, + struct thandle *th) +{ + struct thandle *sub_th; + int rc; + ENTRY; + + sub_th = lod_sub_get_thandle(env, th, dt); + if (IS_ERR(sub_th)) + RETURN(PTR_ERR(sub_th)); + + rc = dt_declare_attr_set(env, dt, attr, sub_th); + + RETURN(rc); +} + +/** + * Set attributes + * + * Get transaction of next layer, record updates if it belongs to cross-MDT + * operation, and set attributes to the object. + * + * \param[in] env execution environment + * \param[in] dt object on which to set attr + * \param[in] attr attributes to be set + * \param[in] th transaction handle + * + * \retval 0 if attributes setting succeeds. + * \retval negative errno if the attributes setting fails. + */ +int lod_sub_object_attr_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_attr *attr, + struct thandle *th) +{ + struct thandle *sub_th; + int rc; + ENTRY; + + sub_th = lod_sub_get_thandle(env, th, dt); + if (IS_ERR(sub_th)) + RETURN(PTR_ERR(sub_th)); + + rc = dt_attr_set(env, dt, attr, sub_th); + + RETURN(rc); +} + +/** + * Declare xattr_del + * + * Get transaction of next layer, and declare xattr deletion. + * + * \param[in] env execution environment + * \param[in] dt object on which to delete xattr + * \param[in] name name of the xattr to be deleted + * \param[in] th transaction handle + * + * \retval 0 if the declaration succeeds. + * \retval negative errno if the declaration fails.
+ */ +int lod_sub_object_declare_xattr_del(const struct lu_env *env, + struct dt_object *dt, + const char *name, + struct thandle *th) +{ + struct thandle *sub_th; + int rc; + ENTRY; + + sub_th = lod_sub_get_thandle(env, th, dt); + if (IS_ERR(sub_th)) + RETURN(PTR_ERR(sub_th)); + + rc = dt_declare_xattr_del(env, dt, name, sub_th); + + RETURN(rc); +} + +/** + * Delete xattr + * + * Get transaction of next layer, record update if it belongs to cross-MDT + * operation and delete xattr. + * + * \param[in] env execution environment + * \param[in] dt object on which to delete xattr + * \param[in] name name of the xattr to be deleted + * \param[in] th transaction handle + * + * \retval 0 if the deletion succeeds. + * \retval negative errno if the deletion fails. + */ +int lod_sub_object_xattr_del(const struct lu_env *env, + struct dt_object *dt, + const char *name, + struct thandle *th) +{ + struct thandle *sub_th; + int rc; + ENTRY; + + sub_th = lod_sub_get_thandle(env, th, dt); + if (IS_ERR(sub_th)) + RETURN(PTR_ERR(sub_th)); + + rc = dt_xattr_del(env, dt, name, sub_th); + + RETURN(rc); +} + +/** + * Declare buffer write + * + * Get transaction of next layer and declare buffer write. + * + * \param[in] env execution environment + * \param[in] dt object to be written + * \param[in] buf buffer to write which includes an embedded size field + * \param[in] pos offset in the object to start writing at + * \param[in] th transaction handle + * + * \retval 0 if the declaration succeeds. + * \retval negative errno if the declaration fails.
+ */ +int lod_sub_object_declare_write(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, loff_t pos, + struct thandle *th) +{ + struct thandle *sub_th; + int rc; + ENTRY; + + sub_th = lod_sub_get_thandle(env, th, dt); + if (IS_ERR(sub_th)) + RETURN(PTR_ERR(sub_th)); + + rc = dt_declare_write(env, dt, buf, pos, sub_th); + + RETURN(rc); +} + +/** + * Write buffer to sub object + * + * Get transaction of next layer, record buffer write if it belongs to + * cross-MDT operation, and write buffer. + * + * \param[in] env execution environment + * \param[in] dt object to be written + * \param[in] buf buffer to write which includes an embedded size field + * \param[in] pos offset in the object to start writing at + * \param[in] th transaction handle + * \param[in] rq enforcement for this write + * + * \retval the buffer size in bytes if it succeeds. + * \retval negative errno if it fails. + */ +ssize_t lod_sub_object_write(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, loff_t *pos, + struct thandle *th, int rq) +{ + struct thandle *sub_th; + ssize_t rc; + ENTRY; + + sub_th = lod_sub_get_thandle(env, th, dt); + if (IS_ERR(sub_th)) + RETURN(PTR_ERR(sub_th)); + + rc = dt_write(env, dt, buf, pos, sub_th, rq); + RETURN(rc); +} diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index d581740..92b3e8f 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -663,6 +663,7 @@ int mdd_declare_changelog_store(const struct lu_env *env, struct llog_ctxt *ctxt; struct llog_changelog_rec *rec; struct lu_buf *buf; + struct thandle *llog_th; int reclen; int rc; @@ -683,7 +684,13 @@ int mdd_declare_changelog_store(const struct lu_env *env, if (ctxt == NULL) return -ENXIO; - rc = llog_declare_add(env, ctxt->loc_handle, &rec->cr_hdr, handle); + llog_th = thandle_get_sub(env, handle, ctxt->loc_handle->lgh_obj); + if (IS_ERR(llog_th)) + GOTO(out_put, rc = PTR_ERR(llog_th)); + + rc = llog_declare_add(env, ctxt->loc_handle,
&rec->cr_hdr, llog_th); + +out_put: llog_ctxt_put(ctxt); return rc; @@ -701,6 +708,7 @@ int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd, { struct obd_device *obd = mdd2obd_dev(mdd); struct llog_ctxt *ctxt; + struct thandle *llog_th; int rc; rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) + @@ -721,12 +729,17 @@ int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd, if (ctxt == NULL) return -ENXIO; + llog_th = thandle_get_sub(env, th, ctxt->loc_handle->lgh_obj); + if (IS_ERR(llog_th)) + GOTO(out_put, rc = PTR_ERR(llog_th)); + /* nested journal transaction */ - rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, th); + rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, llog_th); + +out_put: llog_ctxt_put(ctxt); if (rc > 0) rc = 0; - return rc; } @@ -1111,6 +1124,7 @@ int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj, if (unlikely(rc == -ENOSPC) && S_ISREG(mdd_object_type(mdd_obj)) && mdd_object_remote(mdd_obj) == 0) { struct lfsck_request *lr = &mdd_env_info(env)->mti_lr; + struct thandle *sub_th; /* XXX: If the linkEA is overflow, then we need to notify the * namespace LFSCK to skip "nlink" attribute verification @@ -1120,8 +1134,11 @@ int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj, * mechanism in future. LU-5802. 
*/ lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK, LFSCK_TYPE_NAMESPACE); + + sub_th = thandle_get_sub_by_dt(env, handle, + mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom); lfsck_in_notify(env, mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom, - lr, handle); + lr, sub_th); } return rc; @@ -1152,6 +1169,7 @@ int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj, if (mdd_object_remote(mdd_obj) == 0 && overflow == MLAO_CHECK) { struct lfsck_request *lr = &mdd_env_info(env)->mti_lr; + struct thandle *sub_th; /* XXX: If the linkEA is overflow, then we need to notify the * namespace LFSCK to skip "nlink" attribute verification @@ -1161,9 +1179,12 @@ int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj, * mechanism in future. LU-5802. */ lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK_DECLARE, LFSCK_TYPE_NAMESPACE); + + sub_th = thandle_get_sub_by_dt(env, handle, + mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom); rc = lfsck_in_notify(env, mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom, - lr, handle); + lr, sub_th); } return rc; @@ -2064,13 +2085,6 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd, if (rc) return rc; } - - /* XXX: For remote create, it should indicate the remote RPC - * will be sent after local transaction is finished, which - * is not very nice, but it will be removed once we fully support - * async update */ - if (mdd_object_remote(p) && handle->th_update != NULL) - handle->th_update->tu_sent_after_local_trans = 1; out: return rc; } diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index e2d7e68..6aa802e 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -959,7 +959,7 @@ static void osd_trans_commit_cb(struct super_block *sb, lu_context_exit(&th->th_ctx); lu_context_fini(&th->th_ctx); - thandle_put(th); + OBD_FREE_PTR(oh); } static struct thandle *osd_trans_create(const struct lu_env *env, @@ -984,8 +984,6 @@ static struct thandle 
*osd_trans_create(const struct lu_env *env, th->th_result = 0; th->th_tags = LCT_TX_HANDLE; oh->ot_credits = 0; - atomic_set(&th->th_refc, 1); - th->th_alloc_size = sizeof(*oh); INIT_LIST_HEAD(&oh->ot_dcb_list); osd_th_alloced(oh); @@ -1169,7 +1167,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt, CERROR("%s: failed to stop transaction: rc = %d\n", osd_name(osd), rc); } else { - thandle_put(&oh->ot_super); + OBD_FREE_PTR(oh); } /* inform the quota slave device that the transaction is stopping */ diff --git a/lustre/osd-zfs/osd_handler.c b/lustre/osd-zfs/osd_handler.c index dfae067..1625b62 100644 --- a/lustre/osd-zfs/osd_handler.c +++ b/lustre/osd-zfs/osd_handler.c @@ -166,7 +166,7 @@ static void osd_trans_commit_cb(void *cb_data, int error) th->th_dev = NULL; lu_context_exit(&th->th_ctx); lu_context_fini(&th->th_ctx); - thandle_put(&oh->ot_super); + OBD_FREE_PTR(oh); EXIT; } @@ -248,7 +248,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt, /* there won't be any commit, release reserved quota space now, * if any */ qsd_op_end(env, osd->od_quota_slave, &oh->ot_quota_trans); - thandle_put(&oh->ot_super); + OBD_FREE_PTR(oh); RETURN(0); } @@ -308,8 +308,6 @@ static struct thandle *osd_trans_create(const struct lu_env *env, th->th_dev = dt; th->th_result = 0; th->th_tags = LCT_TX_HANDLE; - atomic_set(&th->th_refc, 1); - th->th_alloc_size = sizeof(*oh); RETURN(th); } diff --git a/lustre/osp/osp_internal.h b/lustre/osp/osp_internal.h index 243d3ca..f5fd23d 100644 --- a/lustre/osp/osp_internal.h +++ b/lustre/osp/osp_internal.h @@ -296,11 +296,35 @@ struct osp_it { struct page **ooi_pages; }; +struct osp_thandle { + struct thandle ot_super; + struct dt_update_request *ot_dur; + bool ot_send_updates_after_local_trans:1; + + /* OSP will use this thandle to update last oid*/ + struct thandle *ot_storage_th; +}; + +static inline struct osp_thandle * +thandle_to_osp_thandle(struct thandle *th) +{ + return container_of(th, 
struct osp_thandle, ot_super); +} + +static inline struct dt_update_request * +thandle_to_dt_update_request(struct thandle *th) +{ + struct osp_thandle *oth; + + oth = thandle_to_osp_thandle(th); + return oth->ot_dur; +} + /* The transaction only include the updates on the remote node, and * no local updates at all */ static inline bool is_only_remote_trans(struct thandle *th) { - return th->th_dev != NULL && th->th_dev->dd_ops == &osp_dt_ops; + return th->th_top == NULL; } static inline void osp_objid_buf_prep(struct lu_buf *buf, loff_t *off, @@ -534,12 +558,17 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d); int osp_trans_start(const struct lu_env *env, struct dt_device *dt, struct thandle *th); + int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, const struct object_update_request *ureq, struct ptlrpc_request **reqp); int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, struct dt_update_request *update, struct ptlrpc_request **reqp, bool rpc_lock); + +struct thandle *osp_get_storage_thandle(const struct lu_env *env, + struct thandle *th, + struct osp_device *osp); /* osp_object.c */ int osp_attr_get(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr); diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index c031457..a587b57 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -85,13 +85,8 @@ static int __osp_md_declare_object_create(const struct lu_env *env, struct dt_update_request *update; int rc; - update = dt_update_request_find_or_create(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } + update = thandle_to_dt_update_request(th); + LASSERT(update != NULL); if (lu_object_exists(&dt->do_lu)) { /* If the object already exists, we needs to destroy @@ -245,13 +240,8 @@ static int __osp_md_ref_del(const 
struct lu_env *env, struct dt_object *dt, struct dt_update_request *update; int rc; - update = dt_update_request_find_or_create(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } + update = thandle_to_dt_update_request(th); + LASSERT(update != NULL); rc = out_ref_del_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu), @@ -330,13 +320,8 @@ static int __osp_md_ref_add(const struct lu_env *env, struct dt_object *dt, struct dt_update_request *update; int rc; - update = dt_update_request_find_or_create(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } + update = thandle_to_dt_update_request(th); + LASSERT(update != NULL); rc = out_ref_add_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu), @@ -443,13 +428,8 @@ int __osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, struct dt_update_request *update; int rc; - update = dt_update_request_find_or_create(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } + update = thandle_to_dt_update_request(th); + LASSERT(update != NULL); rc = out_attr_set_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu), attr, @@ -722,20 +702,23 @@ static int __osp_md_index_insert(const struct lu_env *env, const struct dt_key *key, struct thandle *th) { - struct dt_update_request *update; + struct osp_thandle *oth = thandle_to_osp_thandle(th); + struct dt_update_request *update = oth->ot_dur; int rc; - update = dt_update_request_find_or_create(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } rc = out_index_insert_pack(env, 
&update->dur_buf, lu_object_fid(&dt->do_lu), rec, key, update->dur_batchid); + if (rc != 0) + return rc; + + /* Before async update is allowed, if it will insert remote + * name entry, it should make sure the local object is created, + * i.e. the remote update RPC should be sent after local + * update(create object) */ + oth->ot_send_updates_after_local_trans = true; + return rc; } @@ -829,13 +812,8 @@ static int __osp_md_index_delete(const struct lu_env *env, struct dt_update_request *update; int rc; - update = dt_update_request_find_or_create(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } + update = thandle_to_dt_update_request(th); + LASSERT(update != NULL); rc = out_index_delete_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu), key, @@ -1226,13 +1204,8 @@ static ssize_t osp_md_declare_write(const struct lu_env *env, struct dt_update_request *update; ssize_t rc; - update = dt_update_request_find_or_create(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed: rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - (int)PTR_ERR(update)); - return PTR_ERR(update); - } + update = thandle_to_dt_update_request(th); + LASSERT(update != NULL); rc = out_write_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu), buf, pos, update->dur_batchid); diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c index 8388f63..c984151 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -1121,16 +1121,8 @@ static int __osp_xattr_set(const struct lu_env *env, struct dt_object *dt, ENTRY; LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL); - - update = dt_update_request_find_or_create(th, dt); - if (IS_ERR(update)) { - CERROR("%s: Get OSP update buf failed "DFID": rc = %d\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - PFID(lu_object_fid(&dt->do_lu)), - (int)PTR_ERR(update)); - - RETURN(PTR_ERR(update)); - } + update = 
thandle_to_dt_update_request(th); + LASSERT(update != NULL); rc = out_xattr_set_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu), @@ -1266,9 +1258,8 @@ static int __osp_xattr_del(const struct lu_env *env, struct dt_object *dt, struct osp_xattr_entry *oxe; int rc; - update = dt_update_request_find_or_create(th, dt); - if (IS_ERR(update)) - return PTR_ERR(update); + update = thandle_to_dt_update_request(th); + LASSERT(update != NULL); fid = lu_object_fid(&dt->do_lu); @@ -1388,6 +1379,7 @@ static int osp_declare_object_create(const struct lu_env *env, struct osp_device *d = lu2osp_dev(dt->do_lu.lo_dev); struct osp_object *o = dt2osp_obj(dt); const struct lu_fid *fid = lu_object_fid(&dt->do_lu); + struct thandle *local_th; int rc = 0; ENTRY; @@ -1416,13 +1408,19 @@ static int osp_declare_object_create(const struct lu_env *env, */ /* rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th); */ + local_th = osp_get_storage_thandle(env, th, d); + if (IS_ERR(local_th)) + RETURN(PTR_ERR(local_th)); + if (unlikely(!fid_is_zero(fid))) { /* replay case: caller knows fid */ osi->osi_off = sizeof(osi->osi_id) * d->opd_index; osi->osi_lb.lb_len = sizeof(osi->osi_id); osi->osi_lb.lb_buf = NULL; + rc = dt_declare_record_write(env, d->opd_last_used_oid_file, - &osi->osi_lb, osi->osi_off, th); + &osi->osi_lb, osi->osi_off, + local_th); RETURN(rc); } @@ -1447,7 +1445,8 @@ static int osp_declare_object_create(const struct lu_env *env, osi->osi_lb.lb_len = sizeof(osi->osi_id); osi->osi_lb.lb_buf = NULL; rc = dt_declare_record_write(env, d->opd_last_used_oid_file, - &osi->osi_lb, osi->osi_off, th); + &osi->osi_lb, osi->osi_off, + local_th); } else { /* not needed in the cache anymore */ set_bit(LU_OBJECT_HEARD_BANSHEE, @@ -1487,6 +1486,7 @@ static int osp_object_create(const struct lu_env *env, struct dt_object *dt, struct osp_object *o = dt2osp_obj(dt); int rc = 0; struct lu_fid *fid = &osi->osi_fid; + struct thandle *local_th; ENTRY; if (is_only_remote_trans(th) && @@ -1529,6 
+1529,9 @@ static int osp_object_create(const struct lu_env *env, struct dt_object *dt, if (osp_precreate_end_seq(env, d) && osp_is_fid_client(d)) th->th_sync = 1; + local_th = osp_get_storage_thandle(env, th, d); + if (IS_ERR(local_th)) + RETURN(PTR_ERR(local_th)); /* * it's OK if the import is inactive by this moment - id was created * by OST earlier, we just need to maintain it consistently on the disk @@ -1563,7 +1566,7 @@ static int osp_object_create(const struct lu_env *env, struct dt_object *dt, &d->opd_last_used_fid.f_oid, d->opd_index); rc = dt_record_write(env, d->opd_last_used_oid_file, &osi->osi_lb, - &osi->osi_off, th); + &osi->osi_off, local_th); CDEBUG(D_HA, "%s: Wrote last used FID: "DFID", index %d: %d\n", d->opd_obd->obd_name, PFID(fid), d->opd_index, rc); diff --git a/lustre/osp/osp_sync.c b/lustre/osp/osp_sync.c index a480daa0..17d90ed 100644 --- a/lustre/osp/osp_sync.c +++ b/lustre/osp/osp_sync.c @@ -243,13 +243,17 @@ int osp_sync_declare_add(const struct lu_env *env, struct osp_object *o, struct osp_thread_info *osi = osp_env_info(env); struct osp_device *d = lu2osp_dev(o->opo_obj.do_lu.lo_dev); struct llog_ctxt *ctxt; + struct thandle *storage_th; int rc; ENTRY; /* it's a layering violation, to access internals of th, * but we can do this as a sanity check, for a while */ - LASSERT(th->th_dev == d->opd_storage); + LASSERT(th->th_top != NULL); + storage_th = thandle_get_sub_by_dt(env, th->th_top, d->opd_storage); + if (IS_ERR(storage_th)) + RETURN(PTR_ERR(storage_th)); switch (type) { case MDS_UNLINK64_REC: @@ -263,12 +267,13 @@ int osp_sync_declare_add(const struct lu_env *env, struct osp_object *o, } /* we want ->dt_trans_start() to allocate per-thandle structure */ - th->th_tags |= LCT_OSP_THREAD; + storage_th->th_tags |= LCT_OSP_THREAD; ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT); LASSERT(ctxt); - rc = llog_declare_add(env, ctxt->loc_handle, &osi->osi_hdr, th); + rc = llog_declare_add(env, ctxt->loc_handle, &osi->osi_hdr, + 
storage_th); llog_ctxt_put(ctxt); RETURN(rc); @@ -303,13 +308,17 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d, struct osp_thread_info *osi = osp_env_info(env); struct llog_ctxt *ctxt; struct osp_txn_info *txn; + struct thandle *storage_th; int rc; ENTRY; /* it's a layering violation, to access internals of th, * but we can do this as a sanity check, for a while */ - LASSERT(th->th_dev == d->opd_storage); + LASSERT(th->th_top != NULL); + storage_th = thandle_get_sub_by_dt(env, th->th_top, d->opd_storage); + if (IS_ERR(storage_th)) + RETURN(PTR_ERR(storage_th)); switch (type) { case MDS_UNLINK64_REC: @@ -335,7 +344,7 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d, LBUG(); } - txn = osp_txn_info(&th->th_ctx); + txn = osp_txn_info(&storage_th->th_ctx); LASSERT(txn); txn->oti_current_id = osp_sync_id_get(d, txn->oti_current_id); @@ -346,7 +355,7 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d, RETURN(-ENOMEM); rc = llog_add(env, ctxt->loc_handle, &osi->osi_hdr, &osi->osi_cookie, - th); + storage_th); llog_ctxt_put(ctxt); if (likely(rc >= 0)) { diff --git a/lustre/osp/osp_trans.c b/lustre/osp/osp_trans.c index d00dbe7..3daed73 100644 --- a/lustre/osp/osp_trans.c +++ b/lustre/osp/osp_trans.c @@ -239,8 +239,6 @@ int osp_unplug_async_request(const struct lu_env *env, } dt_update_request_destroy(update); } else { - LASSERT(list_empty(&update->dur_list)); - args = ptlrpc_req_async_args(req); args->oaua_update = update; args->oaua_count = NULL; @@ -385,37 +383,29 @@ out: */ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d) { - struct thandle *th = NULL; - struct thandle_update *tu = NULL; - int rc = 0; + struct osp_thandle *oth; + struct thandle *th = NULL; + struct dt_update_request *update; + ENTRY; - OBD_ALLOC_PTR(th); - if (unlikely(th == NULL)) - GOTO(out, rc = -ENOMEM); + OBD_ALLOC_PTR(oth); + if (unlikely(oth == NULL)) + RETURN(ERR_PTR(-ENOMEM)); + 
th = &oth->ot_super; th->th_dev = d; th->th_tags = LCT_TX_HANDLE; - atomic_set(&th->th_refc, 1); - th->th_alloc_size = sizeof(*th); - - OBD_ALLOC_PTR(tu); - if (tu == NULL) - GOTO(out, rc = -ENOMEM); - INIT_LIST_HEAD(&tu->tu_remote_update_list); - tu->tu_only_remote_trans = 1; - th->th_update = tu; - -out: - if (rc != 0) { - if (tu != NULL) - OBD_FREE_PTR(tu); - if (th != NULL) - OBD_FREE_PTR(th); - th = ERR_PTR(rc); + update = dt_update_request_create(d); + if (IS_ERR(update)) { + OBD_FREE_PTR(oth); + RETURN(ERR_CAST(update)); } - return th; + oth->ot_dur = update; + oth->ot_send_updates_after_local_trans = false; + + RETURN(th); } /** @@ -543,45 +533,31 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp, struct dt_update_request *dt_update, struct thandle *th, bool flow_control) { - struct thandle_update *tu = th->th_update; - int rc = 0; - - LASSERT(tu != NULL); + int rc = 0; - if (is_only_remote_trans(th)) { + if (is_only_remote_trans(th) && !th->th_sync) { struct osp_async_update_args *args; struct ptlrpc_request *req; - list_del_init(&dt_update->dur_list); - if (th->th_sync) { - rc = osp_remote_sync(env, osp, dt_update, NULL, true); - dt_update_request_destroy(dt_update); - - return rc; - } - rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import, dt_update->dur_buf.ub_req, &req); - if (rc == 0) { - down_read(&osp->opd_async_updates_rwsem); + if (rc != 0) + return rc; + down_read(&osp->opd_async_updates_rwsem); - args = ptlrpc_req_async_args(req); - args->oaua_update = dt_update; - args->oaua_count = &osp->opd_async_updates_count; - args->oaua_waitq = &osp->opd_syn_barrier_waitq; - args->oaua_flow_control = flow_control; - req->rq_interpret_reply = - osp_async_update_interpret; + args = ptlrpc_req_async_args(req); + args->oaua_update = dt_update; + args->oaua_count = &osp->opd_async_updates_count; + args->oaua_waitq = &osp->opd_syn_barrier_waitq; + args->oaua_flow_control = flow_control; + req->rq_interpret_reply = + 
osp_async_update_interpret; - atomic_inc(args->oaua_count); - up_read(&osp->opd_async_updates_rwsem); + atomic_inc(args->oaua_count); + up_read(&osp->opd_async_updates_rwsem); - ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1); - } else { - dt_update_request_destroy(dt_update); - } + ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1); } else { - th->th_sync = 1; rc = osp_remote_sync(env, osp, dt_update, NULL, true); } @@ -589,6 +565,55 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp, } /** + * Get local thandle for osp_thandle + * + * Get the local OSD thandle from the OSP thandle. Currently, there + * are a few OSP APIs (osp_object_create() and osp_sync_add()) that need + * to update the object on the local OSD device. + * + * If the osp_thandle comes from the normal stack (MDD->LOD->OSP), then + * we will get the local thandle by thandle_get_sub_by_dt. + * + * If the osp_thandle is a remote thandle (th_top == NULL, only used + * by LFSCK), then it will create a local thandle, and stop it in + * osp_trans_stop(). And this only happens on OSP for OST. + * + * This is a temporary solution; once OSP accessing OSD objects is + * fixed properly, this function should be removed. XXX + * + * \param[in] env pointer to the thread context + * \param[in] th pointer to the transaction handler + * \param[in] osp pointer to the OSP device + * + * \retval pointer to the local thandle + * \retval ERR_PTR(errno) if it fails. 
+ **/ +struct thandle *osp_get_storage_thandle(const struct lu_env *env, + struct thandle *th, + struct osp_device *osp) +{ + struct osp_thandle *oth; + struct thandle *local_th; + + if (th->th_top != NULL) + return thandle_get_sub_by_dt(env, th->th_top, + osp->opd_storage); + + LASSERT(!osp->opd_connect_mdt); + oth = thandle_to_osp_thandle(th); + if (oth->ot_storage_th != NULL) + return oth->ot_storage_th; + + local_th = dt_trans_create(env, osp->opd_storage); + if (IS_ERR(local_th)) + return local_th; + + oth->ot_storage_th = local_th; + + return local_th; +} + +/** * The OSP layer dt_device_operations::dt_trans_start() interface * to start the transaction. * @@ -618,22 +643,42 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp, int osp_trans_start(const struct lu_env *env, struct dt_device *dt, struct thandle *th) { - struct thandle_update *tu = th->th_update; - struct dt_update_request *dt_update; - int rc = 0; - - if (tu == NULL) - return rc; + struct osp_thandle *oth = thandle_to_osp_thandle(th); + struct dt_update_request *dt_update; + int rc = 0; - /* Check whether there are updates related with this OSP */ - dt_update = out_find_update(tu, dt); - if (dt_update == NULL) - return rc; + dt_update = oth->ot_dur; + LASSERT(dt_update != NULL); - if (!is_only_remote_trans(th) && !tu->tu_sent_after_local_trans) + /* return if there are no updates, */ + if (dt_update->dur_buf.ub_req == NULL || + dt_update->dur_buf.ub_req->ourq_count == 0) + GOTO(out, rc = 0); + + /* Note: some updates needs to send before local transaction, + * some needs to send after local transaction. + * + * If the transaction only includes remote updates, it will + * send updates to remote MDT in osp_trans_stop. + * + * If it is remote create, it will send the remote req after + * local transaction. i.e. create the object locally first, + * then insert the name entry. 
+ * + * If it is remote unlink, it will send the remote req before + * the local transaction, i.e. delete the name entry remote + * first, then destroy the local object. */ + if (!is_only_remote_trans(th) && + !oth->ot_send_updates_after_local_trans) rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update, th, false); +out: + /* For remote thandle, if there are local thandle, start it here*/ + if (th->th_top == NULL && oth->ot_storage_th != NULL) + rc = dt_trans_start(env, oth->ot_storage_th->th_dev, + oth->ot_storage_th); + return rc; } @@ -660,66 +705,65 @@ int osp_trans_start(const struct lu_env *env, struct dt_device *dt, int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, struct thandle *th) { - struct thandle_update *tu = th->th_update; - struct dt_update_request *dt_update; - int rc = 0; - ENTRY; - LASSERT(tu != NULL); - LASSERT(tu != LP_POISON); + struct osp_thandle *oth = thandle_to_osp_thandle(th); + struct dt_update_request *dt_update; + int rc = 0; + bool keep_dt_update = false; + ENTRY; - /* Check whether there are updates related with this OSP */ - dt_update = out_find_update(tu, dt); - if (dt_update == NULL) { - if (!is_only_remote_trans(th)) - RETURN(rc); + dt_update = oth->ot_dur; + LASSERT(dt_update != NULL); + LASSERT(dt_update != LP_POISON); - GOTO(put, rc); + /* For remote transaction, if there is local storage thandle, + * stop it first */ + if (oth->ot_storage_th != NULL && th->th_top == NULL) { + dt_trans_stop(env, oth->ot_storage_th->th_dev, + oth->ot_storage_th); + oth->ot_storage_th = NULL; } - + /* If there are no updates, destroy dt_update and thandle */ if (dt_update->dur_buf.ub_req == NULL || - dt_update->dur_buf.ub_req->ourq_count == 0) { - dt_update_request_destroy(dt_update); - GOTO(put, rc); - } - - if (is_only_remote_trans(th)) { - if (th->th_result == 0) { - struct osp_device *osp = dt2osp_dev(th->th_dev); - struct client_obd *cli = &osp->opd_obd->u.cli; + dt_update->dur_buf.ub_req->ourq_count == 0) + GOTO(out, rc); 
- rc = obd_get_request_slot(cli); - if (!osp->opd_imp_active || !osp->opd_imp_connected) { - if (rc == 0) - obd_put_request_slot(cli); + if (is_only_remote_trans(th) && !th->th_sync) { + struct osp_device *osp = dt2osp_dev(th->th_dev); + struct client_obd *cli = &osp->opd_obd->u.cli; - rc = -ENOTCONN; - } - - if (rc != 0) { - dt_update_request_destroy(dt_update); - GOTO(put, rc); - } + if (th->th_result != 0) { + rc = th->th_result; + GOTO(out, rc); + } - rc = osp_trans_trigger(env, dt2osp_dev(dt), - dt_update, th, true); - if (rc != 0) + rc = obd_get_request_slot(cli); + if (!osp->opd_imp_active || !osp->opd_imp_connected) { + if (rc == 0) obd_put_request_slot(cli); - } else { - rc = th->th_result; - dt_update_request_destroy(dt_update); + rc = -ENOTCONN; } + if (rc != 0) + GOTO(out, rc); + + rc = osp_trans_trigger(env, dt2osp_dev(dt), + dt_update, th, true); + if (rc != 0) + obd_put_request_slot(cli); + else + keep_dt_update = true; } else { - if (tu->tu_sent_after_local_trans) - rc = osp_trans_trigger(env, dt2osp_dev(dt), - dt_update, th, false); + if (oth->ot_send_updates_after_local_trans || + (is_only_remote_trans(th) && th->th_sync)) + rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update, + th, false); rc = dt_update->dur_rc; - dt_update_request_destroy(dt_update); } - GOTO(put, rc); +out: + if (!keep_dt_update) + dt_update_request_destroy(dt_update); + OBD_FREE_PTR(oth); -put: - thandle_put(th); - return rc; + RETURN(rc); } diff --git a/lustre/ptlrpc/Makefile.in b/lustre/ptlrpc/Makefile.in index 2a54e95..7a32f73 100644 --- a/lustre/ptlrpc/Makefile.in +++ b/lustre/ptlrpc/Makefile.in @@ -20,6 +20,7 @@ ptlrpc_objs += nrs_tbf.o errno.o target_objs := $(TARGET)tgt_main.o $(TARGET)tgt_lastrcvd.o target_objs += $(TARGET)tgt_handler.o $(TARGET)out_handler.o target_objs += $(TARGET)out_lib.o +target_objs += $(TARGET)update_trans.o nodemap_objs = nodemap_handler.o nodemap_lproc.o nodemap_range.o nodemap_objs += nodemap_idmap.o nodemap_rbtree.o nodemap_member.o diff 
--git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index eec818c..877235e 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -61,7 +61,6 @@ /* obd2cli_tgt() (required by DEBUG_REQ()) */ #include #include -#include #endif /* !__REQ_LAYOUT_USER__ */ /* struct ptlrpc_request, lustre_msg* */ diff --git a/lustre/target/Makefile.am b/lustre/target/Makefile.am index b11c11d..eaf3957 100644 --- a/lustre/target/Makefile.am +++ b/lustre/target/Makefile.am @@ -33,3 +33,4 @@ MOSTLYCLEANFILES := @MOSTLYCLEANFILES@ EXTRA_DIST = tgt_main.c tgt_lastrcvd.c tgt_handler.c tgt_internal.h \ out_handler.c out_lib.c +EXTRA_DIST += update_trans.c diff --git a/lustre/target/out_lib.c b/lustre/target/out_lib.c index dce9da9..192a2b8 100644 --- a/lustre/target/out_lib.c +++ b/lustre/target/out_lib.c @@ -38,22 +38,8 @@ #define OUT_UPDATE_BUFFER_SIZE_ADD 4096 #define OUT_UPDATE_BUFFER_SIZE_MAX (256 * 4096) /* 1MB update size now */ - -struct dt_update_request* -out_find_update(struct thandle_update *tu, struct dt_device *dt_dev) -{ - struct dt_update_request *dt_update; - - list_for_each_entry(dt_update, &tu->tu_remote_update_list, - dur_list) { - if (dt_update->dur_dt == dt_dev) - return dt_update; - } - return NULL; -} -EXPORT_SYMBOL(out_find_update); - -static struct object_update_request *object_update_request_alloc(size_t size) +static inline struct object_update_request * +object_update_request_alloc(size_t size) { struct object_update_request *ourq; @@ -67,31 +53,19 @@ static struct object_update_request *object_update_request_alloc(size_t size) RETURN(ourq); } -static void object_update_request_free(struct object_update_request *ourq, - size_t ourq_size) +static inline void +object_update_request_free(struct object_update_request *ourq, + size_t ourq_size) { if (ourq != NULL) OBD_FREE_LARGE(ourq, ourq_size); } -void dt_update_request_destroy(struct dt_update_request *dt_update) -{ - if (dt_update == NULL) - return; - - list_del(&dt_update->dur_list); - - 
object_update_request_free(dt_update->dur_buf.ub_req, - dt_update->dur_buf.ub_req_size); - OBD_FREE_PTR(dt_update); -} -EXPORT_SYMBOL(dt_update_request_destroy); - /** * Allocate and initialize dt_update_request * * dt_update_request is being used to track updates being executed on - * this dt_device(OSD or OSP). The update buffer will be 8k initially, + * this dt_device(OSD or OSP). The update buffer will be 4k initially, * and increased if needed. * * \param [in] dt dt device @@ -117,7 +91,6 @@ struct dt_update_request *dt_update_request_create(struct dt_device *dt) dt_update->dur_buf.ub_req = ourq; dt_update->dur_buf.ub_req_size = OUT_UPDATE_INIT_BUFFER_SIZE; - INIT_LIST_HEAD(&dt_update->dur_list); dt_update->dur_dt = dt; dt_update->dur_batchid = 0; INIT_LIST_HEAD(&dt_update->dur_cb_items); @@ -127,53 +100,22 @@ struct dt_update_request *dt_update_request_create(struct dt_device *dt) EXPORT_SYMBOL(dt_update_request_create); /** - * Find or create dt_update_request. + * Destroy dt_update_request * - * Find or create one loc in th_dev/dev_obj_update for the update, - * Because only one thread can access this thandle, no need - * lock now. - * - * \param[in] th transaction handle - * \param[in] dt lookup update request by dt_object - * - * \retval pointer of dt_update_request if it can be created - * or found. - * \retval ERR_PTR(errno) if it can not be created or found. 
+ * \param [in] dt_update dt_update_request being destroyed */ -struct dt_update_request * -dt_update_request_find_or_create(struct thandle *th, struct dt_object *dt) +void dt_update_request_destroy(struct dt_update_request *dt_update) { - struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev); - struct thandle_update *tu = th->th_update; - struct dt_update_request *update; - ENTRY; - - if (tu == NULL) { - OBD_ALLOC_PTR(tu); - if (tu == NULL) - RETURN(ERR_PTR(-ENOMEM)); - - INIT_LIST_HEAD(&tu->tu_remote_update_list); - tu->tu_sent_after_local_trans = 0; - th->th_update = tu; - } - - update = out_find_update(tu, dt_dev); - if (update != NULL) - RETURN(update); - - update = dt_update_request_create(dt_dev); - if (IS_ERR(update)) - RETURN(update); - - list_add_tail(&update->dur_list, &tu->tu_remote_update_list); + if (dt_update == NULL) + return; - if (!tu->tu_only_remote_trans) - thandle_get(th); + object_update_request_free(dt_update->dur_buf.ub_req, + dt_update->dur_buf.ub_req_size); + OBD_FREE_PTR(dt_update); - RETURN(update); + return; } -EXPORT_SYMBOL(dt_update_request_find_or_create); +EXPORT_SYMBOL(dt_update_request_destroy); /** * resize update buffer @@ -223,9 +165,9 @@ static int update_buffer_resize(struct update_buffer *ubuf, size_t new_size) * \param[in] batchid batchid(transaction no) of this update * * \retval 0 pack update succeed. - * negative errno pack update failed. + * \retval negative errno pack update failed. **/ -static struct object_update* +static struct object_update * out_update_header_pack(const struct lu_env *env, struct update_buffer *ubuf, enum update_type op, const struct lu_fid *fid, int params_count, __u16 *param_sizes, __u64 batchid) diff --git a/lustre/target/update_trans.c b/lustre/target/update_trans.c new file mode 100644 index 0000000..b57fbe8 --- /dev/null +++ b/lustre/target/update_trans.c @@ -0,0 +1,261 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 
+ * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright (c) 2014, Intel Corporation. + */ +/* + * lustre/target/update_trans.c + * + * This file implements the update distribute transaction API. + * + * To manage the cross-MDT operation (distribute operation) transaction, + * the transaction is also separated into two layers on the MD stack: top + * transaction and sub transaction. + * + * During the distribute operation, top transaction is created in the LOD + * layer, and represents the operation. Sub transaction is created by + * each OSD or OSP. Top transaction start/stop will trigger all of its sub + * transaction start/stop. Top transaction (the whole operation) is committed + * only when all of its sub transactions are committed. + * + * There are three kinds of transactions: + * 1. Local transaction: All updates are in a single local OSD. + * 2. Remote transaction: All updates are only in the remote OSD, + * i.e. locally all updates are in OSP. + * 3. Mixed transaction: Updates are both in local OSD and remote + * OSD. + * + * Author: Di Wang + */ + +#define DEBUG_SUBSYSTEM S_CLASS + +#include +#include +#include +#include + +/** + * Create the top transaction. + * + * Create the top transaction on the master device.
It will create a top + * thandle and a sub thandle on the master device. + * + * \param[in] env execution environment + * \param[in] master_dev master device on which the top thandle is created + * + * \retval pointer to the created thandle. + * \retval ERR_PTR(errno) if creation failed. + */ +struct thandle * +top_trans_create(const struct lu_env *env, struct dt_device *master_dev) +{ + struct top_thandle *top_th; + struct thandle *child_th; + + OBD_ALLOC_GFP(top_th, sizeof(*top_th), __GFP_IO); + if (top_th == NULL) + return ERR_PTR(-ENOMEM); + + child_th = dt_trans_create(env, master_dev); + if (IS_ERR(child_th)) { + OBD_FREE_PTR(top_th); + return child_th; + } + + top_th->tt_magic = TOP_THANDLE_MAGIC; + top_th->tt_master_sub_thandle = child_th; + child_th->th_top = &top_th->tt_super; + INIT_LIST_HEAD(&top_th->tt_sub_thandle_list); + top_th->tt_super.th_top = &top_th->tt_super; + + return &top_th->tt_super; +} +EXPORT_SYMBOL(top_trans_create); + +/** + * Start the top transaction. + * + * Start all of its sub transactions, then start the master sub transaction. + * + * \param[in] env execution environment + * \param[in] master_dev master device on which the master sub transaction is started + * \param[in] th top thandle + * + * \retval 0 if transaction start succeeds. + * \retval negative errno if start fails.
+ */ +int top_trans_start(const struct lu_env *env, struct dt_device *master_dev, + struct thandle *th) +{ + struct top_thandle *top_th = container_of(th, struct top_thandle, + tt_super); + struct sub_thandle *lst; + int rc = 0; + + LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC); + list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) { + lst->st_sub_th->th_sync = th->th_sync; + lst->st_sub_th->th_local = th->th_local; + rc = dt_trans_start(env, lst->st_sub_th->th_dev, + lst->st_sub_th); + if (rc != 0) + return rc; + } + + top_th->tt_master_sub_thandle->th_local = th->th_local; + top_th->tt_master_sub_thandle->th_sync = th->th_sync; + + return dt_trans_start(env, master_dev, top_th->tt_master_sub_thandle); +} +EXPORT_SYMBOL(top_trans_start); + +/** + * Stop the top transaction. + * + * Stop the transaction on the master device first, then stop transactions + * on the other sub devices. + * + * \param[in] env execution environment + * \param[in] master_dev master device on which the master sub transaction is stopped + * \param[in] th top thandle + * + * \retval 0 if stop transaction succeeds. + * \retval negative errno if stop transaction fails.
+ */ +int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev, + struct thandle *th) +{ + struct sub_thandle *lst; + struct top_thandle *top_th = container_of(th, struct top_thandle, + tt_super); + int rc; + ENTRY; + + /* Note: we always need to walk through all of the sub transactions and + * stop them here to release their resources */ + LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC); + + top_th->tt_master_sub_thandle->th_local = th->th_local; + top_th->tt_master_sub_thandle->th_sync = th->th_sync; + + /* To avoid sending RPCs while holding the thandle, always stop the local + * transaction first, then the other sub thandles */ + rc = dt_trans_stop(env, master_dev, top_th->tt_master_sub_thandle); + + list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) { + int rc2; + + if (rc != 0) + lst->st_sub_th->th_result = rc; + lst->st_sub_th->th_sync = th->th_sync; + lst->st_sub_th->th_local = th->th_local; + rc2 = dt_trans_stop(env, lst->st_sub_th->th_dev, + lst->st_sub_th); + if (unlikely(rc2 < 0 && rc == 0)) + rc = rc2; + } + + top_thandle_destroy(top_th); + + RETURN(rc); +} +EXPORT_SYMBOL(top_trans_stop); + +/** + * Get sub thandle. + * + * Find the sub thandle of \a sub_dt in the top thandle, or create it if missing. + * + * \param[in] env execution environment + * \param[in] th thandle on the top layer.
+ * \param[in] sub_dt sub dt_device used to get sub transaction + * + * \retval thandle of the sub transaction on success + * \retval ERR_PTR(errno) on failure + */ +struct thandle *thandle_get_sub_by_dt(const struct lu_env *env, + struct thandle *th, + struct dt_device *sub_dt) +{ + struct sub_thandle *lst; + struct top_thandle *top_th = container_of(th, struct top_thandle, + tt_super); + struct thandle *sub_th; + ENTRY; + + LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC); + LASSERT(top_th->tt_master_sub_thandle != NULL); + if (likely(sub_dt == top_th->tt_master_sub_thandle->th_dev)) + RETURN(top_th->tt_master_sub_thandle); + + /* Find or create the transaction in tt_sub_thandle_list; only one + * thread ever accesses the list, so no lock is needed here */ + list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) { + if (lst->st_sub_th->th_dev == sub_dt) + RETURN(lst->st_sub_th); + } + + sub_th = dt_trans_create(env, sub_dt); + if (IS_ERR(sub_th)) + RETURN(sub_th); + + /* XXX all mixed transactions (see struct th_handle) will + * be synchronized until async update is done */ + th->th_sync = 1; + + sub_th->th_top = th; + OBD_ALLOC_PTR(lst); + if (lst == NULL) { + dt_trans_stop(env, sub_dt, sub_th); + RETURN(ERR_PTR(-ENOMEM)); + } + + INIT_LIST_HEAD(&lst->st_sub_list); + lst->st_sub_th = sub_th; + list_add(&lst->st_sub_list, &top_th->tt_sub_thandle_list); + + RETURN(sub_th); +} +EXPORT_SYMBOL(thandle_get_sub_by_dt); + +/** + * Top thandle destroy + * + * Free the top thandle and all of the sub_thandle entries on its list. + * + * \param[in] top_th top thandle to be destroyed.
+ */ +void top_thandle_destroy(struct top_thandle *top_th) +{ + struct sub_thandle *st; + struct sub_thandle *tmp; + + LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC); + list_for_each_entry_safe(st, tmp, &top_th->tt_sub_thandle_list, + st_sub_list) { /* _safe variant: entries are freed while walking */ + list_del(&st->st_sub_list); + OBD_FREE_PTR(st); + } + OBD_FREE_PTR(top_th); +} +EXPORT_SYMBOL(top_thandle_destroy); diff --git a/lustre/tests/test-framework.sh b/lustre/tests/test-framework.sh index ee32957..5fd1573 100755 --- a/lustre/tests/test-framework.sh +++ b/lustre/tests/test-framework.sh @@ -1193,6 +1193,12 @@ mount_facet() { else set_default_debug_facet $facet + if [[ $facet == mds* ]]; then + do_facet $facet \ + lctl set_param -n mdt.${FSNAME}*.enable_remote_dir=1 \ + 2>/dev/null + fi + label=$(devicelabel ${facet} ${!dev}) [ -z "$label" ] && echo no label for ${!dev} && exit 1 eval export ${facet}_svc=${label}