From fb80ae7c7601a03c1181de381f067f553e7b8c6f Mon Sep 17 00:00:00 2001 From: wang di Date: Thu, 21 May 2015 02:56:39 -0700 Subject: [PATCH] LU-6602 update: split update llog record If the update llog record size exceeds the limit (llog chunk size), which usually happens when creating a striped directory with a large stripe count, then it will be split into multiple records, and these update records will have the same batchid. During recovery, these records will be combined and constructed into the updates of one operation, and then the update is replayed. Allow multiple stripes on a single MDT, so that creating directories with a large stripe count can be verified in a small-scale test environment. Add sanity 300j/300k and replay-single 116 to verify it. Signed-off-by: wang di Change-Id: I86ca2594fe62d5b921e794de4cd88981d91f7677 Reviewed-on: http://review.whamcloud.com/15162 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: James Simmons Reviewed-by: Oleg Drokin --- lustre/include/lu_target.h | 5 +- lustre/include/lustre/lustre_idl.h | 65 +++++++- lustre/include/lustre_dlm.h | 2 + lustre/include/lustre_update.h | 134 +++++++++------- lustre/include/obd_support.h | 2 + lustre/llite/dir.c | 9 ++ lustre/lod/lod_dev.c | 43 ++++++ lustre/lod/lod_object.c | 95 +++++++----- lustre/lod/lod_sub_object.c | 85 ++++++++-- lustre/mdt/mdt_reint.c | 2 + lustre/obdclass/llog_osd.c | 5 +- lustre/osp/osp_trans.c | 3 + lustre/ptlrpc/wiretest.c | 20 +-- lustre/target/out_lib.c | 1 + lustre/target/update_records.c | 309 +++++++++++++++++++++++++++++++++++-- lustre/target/update_recovery.c | 185 ++++++++++++++++++---- lustre/target/update_trans.c | 226 ++++++++++++++++++++++----- lustre/tests/replay-single.sh | 50 ++++++ lustre/tests/sanity.sh | 51 ++++++ lustre/utils/liblustreapi.c | 27 +++- lustre/utils/wirecheck.c | 46 ++++++ lustre/utils/wiretest.c | 20 +-- 22 files changed, 1161 insertions(+), 224 deletions(-) diff --git a/lustre/include/lu_target.h b/lustre/include/lu_target.h index 
4428e69..6d20d2f 100644 --- a/lustre/include/lu_target.h +++ b/lustre/include/lu_target.h @@ -66,7 +66,10 @@ struct distribute_txn_replay_req { * by this structure */ struct distribute_txn_replay_req_sub { __u32 dtrqs_mdt_index; - struct llog_cookie dtrqs_llog_cookie; + + /* All of cookies for the update will be linked here */ + spinlock_t dtrqs_cookie_list_lock; + struct list_head dtrqs_cookie_list; struct list_head dtrqs_list; }; diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 281d88d..afef66f 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -983,9 +983,6 @@ struct lu_orphan_ent { }; void lustre_swab_orphan_ent(struct lu_orphan_ent *ent); -struct update_ops; -void lustre_swab_update_ops(struct update_ops *uops, unsigned int op_count); - /** @} lu_fid */ /** \defgroup lu_dir lu_dir @@ -4023,6 +4020,7 @@ enum update_type { OUT_XATTR_DEL = 13, OUT_PUNCH = 14, OUT_READ = 15, + OUT_NOOP = 16, OUT_LAST }; @@ -4223,5 +4221,66 @@ struct close_data { void lustre_swab_close_data(struct close_data *data); +struct update_ops; +void lustre_swab_update_ops(struct update_ops *uops, unsigned int op_count); + +/* Update llog format */ +struct update_op { + struct lu_fid uop_fid; + __u16 uop_type; + __u16 uop_param_count; + __u16 uop_params_off[0]; +}; + +struct update_ops { + struct update_op uops_op[0]; +}; + +struct update_params { + struct object_update_param up_params[0]; +}; + +enum update_records_flag { + UPDATE_RECORD_CONTINUE = 1 << 0, +}; +/* + * This is the update record format used to store the updates in + * disk. All updates of the operation will be stored in ur_ops. + * All of parameters for updates of the operation will be stored + * in ur_params. 
+ * To save the space of the record, parameters in ur_ops will only + * remember their offset in ur_params, so to avoid storing duplicate + * parameters in ur_params, which can help us save a lot space for + * operation like creating striped directory. + */ +struct update_records { + __u64 ur_master_transno; + __u64 ur_batchid; + __u32 ur_flags; + /* If the operation includes multiple updates, then ur_index + * means the index of the update inside the whole updates. */ + __u32 ur_index; + __u32 ur_update_count; + __u32 ur_param_count; + struct update_ops ur_ops; + /* Note ur_ops has a variable size, so comment out + * the following ur_params, in case some use it directly + * update_records->ur_params + * + * struct update_params ur_params; + */ +}; + +struct llog_update_record { + struct llog_rec_hdr lur_hdr; + struct update_records lur_update_rec; + /* Note ur_update_rec has a variable size, so comment out + * the following ur_tail, in case someone use it directly + * + * struct llog_rec_tail lur_tail; + */ +}; + + #endif /** @} lustreidl */ diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 4d64b19..6a2bc6a 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -1084,9 +1084,11 @@ struct ldlm_enqueue_info { __u32 ei_type; /** Type of the lock being enqueued. */ __u32 ei_mode; /** Mode of the lock being enqueued. */ void *ei_cb_bl; /** blocking lock callback */ + void *ei_cb_local_bl; /** blocking local lock callback */ void *ei_cb_cp; /** lock completion callback */ void *ei_cb_gl; /** lock glimpse callback */ void *ei_cbdata; /** Data to be passed into callbacks. 
*/ + void *ei_namespace; /** lock namespace **/ unsigned int ei_enq_slave:1; /* whether enqueue slave stripes */ }; diff --git a/lustre/include/lustre_update.h b/lustre/include/lustre_update.h index c2994b1..f9fd97e 100644 --- a/lustre/include/lustre_update.h +++ b/lustre/include/lustre_update.h @@ -40,10 +40,7 @@ struct dt_key; struct dt_rec; struct object_update_param; - -struct update_params { - struct object_update_param up_params[0]; -}; +struct llog_update_record; static inline size_t update_params_size(const struct update_params *params, unsigned int param_count) @@ -98,13 +95,6 @@ update_params_get_param_buf(const struct update_params *params, __u16 index, return param->oup_buf; } -struct update_op { - struct lu_fid uop_fid; - __u16 uop_type; - __u16 uop_param_count; - __u16 uop_params_off[0]; -}; - static inline size_t update_op_size(unsigned int param_count) { @@ -118,11 +108,6 @@ update_op_next_op(const struct update_op *uop) update_op_size(uop->uop_param_count)); } -/* All of updates in the mulitple_update_record */ -struct update_ops { - struct update_op uops_op[0]; -}; - static inline size_t update_ops_size(const struct update_ops *ops, unsigned int update_count) { @@ -137,41 +122,6 @@ static inline size_t update_ops_size(const struct update_ops *ops, return total_size; } -/* - * This is the update record format used to store the updates in - * disk. All updates of the operation will be stored in ur_ops. - * All of parameters for updates of the operation will be stored - * in ur_params. - * To save the space of the record, parameters in ur_ops will only - * remember their offset in ur_params, so to avoid storing duplicate - * parameters in ur_params, which can help us save a lot space for - * operation like creating striped directory. 
- */ -struct update_records { - __u64 ur_master_transno; - __u64 ur_batchid; - __u32 ur_flags; - __u32 ur_param_count; - __u32 ur_update_count; - struct update_ops ur_ops; - /* Note ur_ops has a variable size, so comment out - * the following ur_params, in case some use it directly - * update_records->ur_params - * - * struct update_params ur_params; - */ -}; - -struct llog_update_record { - struct llog_rec_hdr lur_hdr; - struct update_records lur_update_rec; - /* Note ur_update_rec has a variable size, so comment out - * the following ur_tail, in case someone use it directly - * - * struct llog_rec_tail lur_tail; - */ -}; - static inline struct update_params * update_records_get_params(const struct update_records *record) { @@ -183,13 +133,21 @@ update_records_get_params(const struct update_records *record) static inline size_t update_records_size(const struct update_records *record) { - struct update_params *params; + size_t op_size = 0; + size_t param_size = 0; - params = update_records_get_params(record); + if (record->ur_update_count > 0) + op_size = update_ops_size(&record->ur_ops, + record->ur_update_count); + if (record->ur_param_count > 0) { + struct update_params *params; + + params = update_records_get_params(record); + param_size = update_params_size(params, record->ur_param_count); + } return cfs_size_round(offsetof(struct update_records, ur_ops) + - update_ops_size(&record->ur_ops, record->ur_update_count) + - update_params_size(params, record->ur_param_count)); + op_size + param_size); } static inline size_t @@ -336,6 +294,7 @@ struct top_multiple_thandle { __u64 tmt_batchid; int tmt_result; __u32 tmt_magic; + size_t tmt_record_size; __u32 tmt_committed:1; }; @@ -350,12 +309,17 @@ struct top_thandle { struct top_multiple_thandle *tt_multiple_thandle; }; +struct sub_thandle_cookie { + struct llog_cookie stc_cookie; + struct list_head stc_list; +}; + /* Sub thandle is used to track multiple sub thandles under one parent * thandle */ struct sub_thandle 
{ struct thandle *st_sub_th; struct dt_device *st_dt; - struct llog_cookie st_cookie; + struct list_head st_cookie_list; struct dt_txn_commit_cb st_commit_dcb; struct dt_txn_commit_cb st_stop_dcb; int st_result; @@ -515,6 +479,43 @@ int sub_thandle_trans_create(const struct lu_env *env, struct sub_thandle *st); /* update_records.c */ +size_t update_records_create_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_attr *attr, + const struct dt_allocation_hint *hint, + struct dt_object_format *dof); +size_t update_records_attr_set_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_attr *attr); +size_t update_records_ref_add_size(const struct lu_env *env, + const struct lu_fid *fid); +size_t update_records_ref_del_size(const struct lu_env *env, + const struct lu_fid *fid); +size_t update_records_object_destroy_size(const struct lu_env *env, + const struct lu_fid *fid); +size_t update_records_index_insert_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct dt_rec *rec, + const struct dt_key *key); +size_t update_records_index_delete_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct dt_key *key); +size_t update_records_xattr_set_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_buf *buf, + const char *name, + __u32 flag); +size_t update_records_xattr_del_size(const struct lu_env *env, + const struct lu_fid *fid, + const char *name); +size_t update_records_write_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_buf *buf, + __u64 pos); +size_t update_records_punch_size(const struct lu_env *env, + const struct lu_fid *fid, + __u64 start, __u64 end); + int update_records_create_pack(const struct lu_env *env, struct update_ops *ops, unsigned int *op_count, @@ -616,6 +617,14 @@ int update_records_punch_pack(const struct lu_env *env, size_t *max_param_size, const struct lu_fid *fid, __u64 start, __u64 end); +int 
update_records_noop_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid); int tur_update_records_extend(struct thandle_update_records *tur, size_t new_size); @@ -663,4 +672,17 @@ int tur_update_extend(struct thandle_update_records *tur, } \ ret; \ }) + +#define update_record_size(env, name, th, ...) \ +({ \ + struct top_thandle *top_th; \ + struct top_multiple_thandle *tmt; \ + \ + top_th = container_of(th, struct top_thandle, tt_super); \ + \ + LASSERT(top_th->tt_multiple_thandle != NULL); \ + tmt = top_th->tt_multiple_thandle; \ + tmt->tmt_record_size += \ + update_records_##name##_size(env, __VA_ARGS__); \ +}) #endif diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index a4d400a..ed20d85 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -561,6 +561,8 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type, /* UPDATE */ #define OBD_FAIL_OUT_UPDATE_NET 0x1700 #define OBD_FAIL_OUT_UPDATE_NET_REP 0x1701 +#define OBD_FAIL_SPLIT_UPDATE_REC 0x1702 +#define OBD_FAIL_LARGE_STRIPE 0x1703 /* MIGRATE */ #define OBD_FAIL_MIGRATE_NET_REP 0x1800 diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index ef46098..9c90098 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -1256,12 +1256,14 @@ lmv_out_free: int mdt_index; int lum_size; int stripe_count; + int max_stripe_count; int i; int rc; if (copy_from_user(&lum, ulmv, sizeof(*ulmv))) RETURN(-EFAULT); + max_stripe_count = lum.lum_stripe_count; /* lum_magic will indicate which stripe the ioctl will like * to get, LMV_MAGIC_V1 is for normal LMV stripe, LMV_USER_MAGIC * is for default LMV stripe */ @@ -1292,6 +1294,13 @@ lmv_out_free: } stripe_count = lmv_mds_md_stripe_count_get(lmm); + if (max_stripe_count < stripe_count) { + lum.lum_stripe_count = stripe_count; + if 
(copy_to_user(ulmv, &lum, sizeof(lum))) + GOTO(finish_req, rc = -EFAULT); + GOTO(finish_req, rc = -E2BIG); + } + lum_size = lmv_user_md_size(stripe_count, LMV_MAGIC_V1); OBD_ALLOC(tmp, lum_size); if (tmp == NULL) diff --git a/lustre/lod/lod_dev.c b/lustre/lod/lod_dev.c index 02b41d6..63dfca9 100644 --- a/lustre/lod/lod_dev.c +++ b/lustre/lod/lod_dev.c @@ -1101,6 +1101,42 @@ static int lod_trans_cb_add(struct thandle *th, } /** + * add noop update to the update records + * + * Add noop updates to the update records, which is only used in + * test right now. + * + * \param[in] env execution environment + * \param[in] dt dt device of lod + * \param[in] th thandle + * \param[in] count the count of update records to be added. + * + * \retval 0 if adding succeeds. + * \retval negative errno if adding fails. + */ +static int lod_add_noop_records(const struct lu_env *env, + struct dt_device *dt, struct thandle *th, + int count) +{ + struct top_thandle *top_th; + struct lu_fid *fid = &lod_env_info(env)->lti_fid; + int i; + int rc = 0; + + top_th = container_of(th, struct top_thandle, tt_super); + if (top_th->tt_multiple_thandle == NULL) + return 0; + + fid_zero(fid); + for (i = 0; i < count; i++) { + rc = update_record_pack(noop, th, fid); + if (rc < 0) + return rc; + } + return rc; +} + +/** * Implementation of dt_device_operations::dt_trans_stop() for LOD * * Stops the set of local transactions using the targets involved @@ -1111,6 +1147,13 @@ static int lod_trans_cb_add(struct thandle *th, static int lod_trans_stop(const struct lu_env *env, struct dt_device *dt, struct thandle *th) { + if (OBD_FAIL_CHECK(OBD_FAIL_SPLIT_UPDATE_REC)) { + int rc; + + rc = lod_add_noop_records(env, dt, th, 5000); + if (rc < 0) + RETURN(rc); + } return top_trans_stop(env, dt2lod_dev(dt)->lod_child, th); } diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index b1318f6..948a675 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -1787,6 +1787,7 @@ static int 
lod_prep_md_striped_create(const struct lu_env *env, struct dt_object **stripe; __u32 stripe_count; int *idx_array; + __u32 master_index; int rc = 0; __u32 i; __u32 j; @@ -1799,7 +1800,8 @@ static int lod_prep_md_striped_create(const struct lu_env *env, stripe_count = le32_to_cpu(lum->lum_stripe_count); /* shrink the stripe_count to the avaible MDT count */ - if (stripe_count > lod->lod_remote_mdt_count + 1) + if (stripe_count > lod->lod_remote_mdt_count + 1 && + !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) stripe_count = lod->lod_remote_mdt_count + 1; OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count); @@ -1810,6 +1812,9 @@ static int lod_prep_md_striped_create(const struct lu_env *env, if (idx_array == NULL) GOTO(out_free, rc = -ENOMEM); + /* Start index will be the master MDT */ + master_index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id; + idx_array[0] = master_index; for (i = 0; i < stripe_count; i++) { struct lod_tgt_desc *tgt = NULL; struct dt_object *dto; @@ -1818,44 +1823,42 @@ static int lod_prep_md_striped_create(const struct lu_env *env, struct lu_object_conf conf = { 0 }; struct dt_device *tgt_dt = NULL; - if (i == 0) { - /* Right now, master stripe and master object are - * on the same MDT */ - idx = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id; - rc = obd_fid_alloc(env, lod->lod_child_exp, &fid, - NULL); - if (rc < 0) - GOTO(out_put, rc); - tgt_dt = lod->lod_child; - goto next; - } - - idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1); - + /* Try to find next avaible target */ + idx = idx_array[i]; for (j = 0; j < lod->lod_remote_mdt_count; j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) { bool already_allocated = false; __u32 k; - CDEBUG(D_INFO, "try idx %d, mdt cnt %u," - " allocated %u, last allocated %d\n", idx, - lod->lod_remote_mdt_count, i, idx_array[i - 1]); + CDEBUG(D_INFO, "try idx %d, mdt cnt %u, allocated %u\n", + idx, lod->lod_remote_mdt_count + 1, i); + if (idx == master_index) { + /* Allocate the FID 
locally */ + rc = obd_fid_alloc(env, lod->lod_child_exp, + &fid, NULL); + if (rc < 0) + GOTO(out_put, rc); + tgt_dt = lod->lod_child; + break; + } /* Find next available target */ if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) continue; - /* check whether the idx already exists - * in current allocated array */ - for (k = 0; k < i; k++) { - if (idx_array[k] == idx) { - already_allocated = true; - break; + if (likely(!OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) { + /* check whether the idx already exists + * in current allocated array */ + for (k = 0; k < i; k++) { + if (idx_array[k] == idx) { + already_allocated = true; + break; + } } - } - if (already_allocated) - continue; + if (already_allocated) + continue; + } /* check the status of the OSP */ tgt = LTD_TGT(ltd, idx); @@ -1886,11 +1889,13 @@ static int lod_prep_md_striped_create(const struct lu_env *env, break; } - CDEBUG(D_INFO, "idx %d, mdt cnt %u," - " allocated %u, last allocated %d\n", idx, - lod->lod_remote_mdt_count, i, idx_array[i - 1]); - -next: + CDEBUG(D_INFO, "Get idx %d, for stripe %d "DFID"\n", + idx, i, PFID(&fid)); + idx_array[i] = idx; + /* Set the start index for next stripe allocation */ + if (i < stripe_count - 1) + idx_array[i + 1] = (idx + 1) % + (lod->lod_remote_mdt_count + 1); /* tgt_dt and fid must be ready after search avaible OSP * in the above loop */ LASSERT(tgt_dt != NULL); @@ -1902,7 +1907,6 @@ next: if (IS_ERR(dto)) GOTO(out_put, rc = PTR_ERR(dto)); stripe[i] = dto; - idx_array[i] = idx; } lo->ldo_dir_striped = 1; @@ -3904,9 +3908,28 @@ static int lod_object_lock(const struct lu_env *env, res_id); einfo->ei_res_id = res_id; - LASSERT(lo->ldo_stripe[i]); - rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo, - policy); + LASSERT(lo->ldo_stripe[i] != NULL); + if (likely(dt_object_remote(lo->ldo_stripe[i]))) { + rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, + einfo, policy); + } else { + struct ldlm_namespace *ns = einfo->ei_namespace; + ldlm_blocking_callback blocking = 
einfo->ei_cb_local_bl; + ldlm_completion_callback completion = einfo->ei_cb_cp; + __u64 dlmflags = LDLM_FL_ATOMIC_CB; + + /* This only happens if there are mulitple stripes + * on the master MDT, i.e. except stripe0, there are + * other stripes on the Master MDT as well, Only + * happens in the test case right now. */ + LASSERT(ns != NULL); + rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, + policy, einfo->ei_mode, + &dlmflags, blocking, + completion, NULL, + NULL, 0, LVB_T_NONE, + NULL, &lockh); + } if (rc != 0) GOTO(out, rc); slave_locks->lsl_handle[i] = lockh; diff --git a/lustre/lod/lod_sub_object.c b/lustre/lod/lod_sub_object.c index e86175e..6c46520 100644 --- a/lustre/lod/lod_sub_object.c +++ b/lustre/lod/lod_sub_object.c @@ -98,12 +98,14 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env, if (type == LU_SEQ_RANGE_OST) RETURN(tth->tt_master_sub_thandle); + sub_th = thandle_get_sub(env, th, sub_obj); + if (IS_ERR(sub_th)) + RETURN(sub_th); + if (tth->tt_multiple_thandle != NULL && record_update != NULL && th->th_result == 0) *record_update = true; - sub_th = thandle_get_sub(env, th, sub_obj); - RETURN(sub_th); } @@ -130,11 +132,16 @@ int lod_sub_object_declare_create(const struct lu_env *env, struct thandle *th) { struct thandle *sub_th; + bool record_update; - sub_th = lod_sub_get_thandle(env, th, dt, NULL); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) return PTR_ERR(sub_th); + if (record_update) + update_record_size(env, create, th, lu_object_fid(&dt->do_lu), + attr, hint, dof); + return dt_declare_create(env, dt, attr, hint, dof, sub_th); } @@ -199,13 +206,17 @@ int lod_sub_object_declare_ref_add(const struct lu_env *env, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt, NULL); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) + update_record_size(env, 
ref_add, th, lu_object_fid(&dt->do_lu)); + rc = dt_declare_ref_add(env, dt, sub_th); RETURN(rc); @@ -265,13 +276,17 @@ int lod_sub_object_declare_ref_del(const struct lu_env *env, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt, NULL); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) + update_record_size(env, ref_del, th, lu_object_fid(&dt->do_lu)); + rc = dt_declare_ref_del(env, dt, sub_th); RETURN(rc); @@ -331,13 +346,18 @@ int lod_sub_object_declare_destroy(const struct lu_env *env, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt, NULL); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) + update_record_size(env, object_destroy, th, + lu_object_fid(&dt->do_lu)); + rc = dt_declare_destroy(env, dt, sub_th); RETURN(rc); @@ -401,11 +421,16 @@ int lod_sub_object_declare_insert(const struct lu_env *env, struct thandle *th) { struct thandle *sub_th; + bool record_update; - sub_th = lod_sub_get_thandle(env, th, dt, NULL); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) return PTR_ERR(sub_th); + if (record_update) + update_record_size(env, index_insert, th, + lu_object_fid(&dt->do_lu), rec, key); + return dt_declare_insert(env, dt, rec, key, sub_th); } @@ -467,11 +492,16 @@ int lod_sub_object_declare_delete(const struct lu_env *env, struct thandle *th) { struct thandle *sub_th; + bool record_update; - sub_th = lod_sub_get_thandle(env, th, dt, NULL); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) return PTR_ERR(sub_th); + if (record_update) + update_record_size(env, index_delete, th, + lu_object_fid(&dt->do_lu), key); + return dt_declare_delete(env, dt, key, sub_th); } @@ -533,13 +563,19 @@ int 
lod_sub_object_declare_xattr_set(const struct lu_env *env, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt, NULL); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) + update_record_size(env, xattr_set, th, + lu_object_fid(&dt->do_lu), + buf, name, fl); + rc = dt_declare_xattr_set(env, dt, buf, name, fl, sub_th); RETURN(rc); @@ -606,13 +642,18 @@ int lod_sub_object_declare_attr_set(const struct lu_env *env, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt, NULL); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) + update_record_size(env, attr_set, th, + lu_object_fid(&dt->do_lu), attr); + rc = dt_declare_attr_set(env, dt, attr, sub_th); RETURN(rc); @@ -677,13 +718,19 @@ int lod_sub_object_declare_xattr_del(const struct lu_env *env, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt, NULL); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) + update_record_size(env, xattr_del, th, + lu_object_fid(&dt->do_lu), + name); + rc = dt_declare_xattr_del(env, dt, name, sub_th); RETURN(rc); @@ -749,13 +796,19 @@ int lod_sub_object_declare_write(const struct lu_env *env, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt, NULL); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) + update_record_size(env, write, th, + lu_object_fid(&dt->do_lu), + buf, pos); + rc = dt_declare_write(env, dt, buf, pos, sub_th); RETURN(rc); @@ -821,13 +874,19 @@ int lod_sub_object_declare_punch(const 
struct lu_env *env, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt, NULL); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) + update_record_size(env, punch, th, + lu_object_fid(&dt->do_lu), + start, end); + rc = dt_declare_punch(env, dt, start, end, sub_th); RETURN(rc); diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index f206e6b..71e49f6 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -483,8 +483,10 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, einfo->ei_type = LDLM_IBITS; einfo->ei_mode = mode; einfo->ei_cb_bl = mdt_remote_blocking_ast; + einfo->ei_cb_local_bl = mdt_blocking_ast; einfo->ei_cb_cp = ldlm_completion_ast; einfo->ei_enq_slave = 1; + einfo->ei_namespace = mti->mti_mdt->mdt_namespace; memset(policy, 0, sizeof(*policy)); policy->l_inodebits.bits = ibits; diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index fb36839..0a09fb5 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -604,8 +604,9 @@ out_remote_unlock: if (rc < 0) GOTO(out, rc); - CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u\n", - POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len); + CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u off"LPU64"\n", + POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len, + lgi->lgi_off); if (reccookie != NULL) { reccookie->lgc_lgl = loghandle->lgh_id; reccookie->lgc_index = index; diff --git a/lustre/osp/osp_trans.c b/lustre/osp/osp_trans.c index be7db77..efe9a85 100644 --- a/lustre/osp/osp_trans.c +++ b/lustre/osp/osp_trans.c @@ -130,6 +130,9 @@ int osp_object_update_request_create(struct osp_update_request *our, if (ours == NULL) return -ENOMEM; + if (size < OUT_UPDATE_INIT_BUFFER_SIZE) + size = OUT_UPDATE_INIT_BUFFER_SIZE; + ours->ours_req = object_update_request_alloc(size); if 
(IS_ERR(ours->ours_req)) { diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index 49b97cc..32fe7c9 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -3729,18 +3729,6 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct llog_log_hdr, llh_tgtuuid)); LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_tgtuuid) == 40, "found %lld\n", (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_tgtuuid)); - LASSERTF((int)offsetof(struct llog_log_hdr, llh_reserved) == 84, "found %lld\n", - (long long)(int)offsetof(struct llog_log_hdr, llh_reserved)); - LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_reserved) == 4, "found %lld\n", - (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_reserved)); - LASSERTF((int)offsetof(struct llog_log_hdr, llh_bitmap) == 88, "found %lld\n", - (long long)(int)offsetof(struct llog_log_hdr, llh_bitmap)); - LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_bitmap) == 8096, "found %lld\n", - (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_bitmap)); - LASSERTF((int)offsetof(struct llog_log_hdr, llh_tail) == 8184, "found %lld\n", - (long long)(int)offsetof(struct llog_log_hdr, llh_tail)); - LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_tail) == 8, "found %lld\n", - (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_tail)); /* Checks for struct llog_cookie */ LASSERTF((int)sizeof(struct llog_cookie) == 32, "found %lld\n", @@ -4084,10 +4072,10 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct fiemap_extent, fe_flags)); LASSERTF((int)sizeof(((struct fiemap_extent *)0)->fe_flags) == 4, "found %lld\n", (long long)(int)sizeof(((struct fiemap_extent *)0)->fe_flags)); - LASSERTF((int)offsetof(struct fiemap_extent, fe_device) == 44, "found %lld\n", - (long long)(int)offsetof(struct fiemap_extent, fe_device)); - LASSERTF((int)sizeof(((struct fiemap_extent *)0)->fe_device) == 4, "found %lld\n", - (long long)(int)sizeof(((struct fiemap_extent 
*)0)->fe_device)); + LASSERTF((int)offsetof(struct fiemap_extent, fe_reserved[0]) == 44, "found %lld\n", + (long long)(int)offsetof(struct fiemap_extent, fe_reserved[0])); + LASSERTF((int)sizeof(((struct fiemap_extent *)0)->fe_reserved[0]) == 4, "found %lld\n", + (long long)(int)sizeof(((struct fiemap_extent *)0)->fe_reserved[0])); CLASSERT(FIEMAP_EXTENT_LAST == 0x00000001); CLASSERT(FIEMAP_EXTENT_UNKNOWN == 0x00000002); CLASSERT(FIEMAP_EXTENT_DELALLOC == 0x00000004); diff --git a/lustre/target/out_lib.c b/lustre/target/out_lib.c index 62e54d7..e6a08791 100644 --- a/lustre/target/out_lib.c +++ b/lustre/target/out_lib.c @@ -57,6 +57,7 @@ const char *update_op_str(__u16 opc) [OUT_XATTR_DEL] = "xattr_del", [OUT_PUNCH] = "punch", [OUT_READ] = "read", + [OUT_NOOP] = "noop", }; if (opc < ARRAY_SIZE(opc_str) && opc_str[opc] != NULL) diff --git a/lustre/target/update_records.c b/lustre/target/update_records.c index 698dacb..d953f85 100644 --- a/lustre/target/update_records.c +++ b/lustre/target/update_records.c @@ -63,12 +63,9 @@ void update_records_dump(const struct update_records *records, { const struct update_ops *ops; const struct update_op *op = NULL; - struct update_params *params; + struct update_params *params = NULL; unsigned int i; - ops = &records->ur_ops; - params = update_records_get_params(records); - CDEBUG(mask, "master transno = "LPU64" batchid = "LPU64" flags = %x" " ops = %d params = %d\n", records->ur_master_transno, records->ur_batchid, records->ur_flags, records->ur_update_count, @@ -80,27 +77,34 @@ void update_records_dump(const struct update_records *records, if (!dump_updates) return; + ops = &records->ur_ops; + if (records->ur_param_count > 0) + params = update_records_get_params(records); + op = &ops->uops_op[0]; - for (i = 0; i < records->ur_update_count; i++) { + for (i = 0; i < records->ur_update_count; i++, + op = update_op_next_op(op)) { unsigned int j; CDEBUG(mask, "update %dth "DFID" %s params_count = %hu\n", i, PFID(&op->uop_fid), 
update_op_str(op->uop_type), op->uop_param_count); + if (params == NULL) + continue; + for (j = 0; j < op->uop_param_count; j++) { struct object_update_param *param; param = update_params_get_param(params, - (unsigned int)op->uop_params_off[j], + (unsigned int)op->uop_params_off[j], records->ur_param_count); - LASSERT(param != NULL); + if (param == NULL) + continue; CDEBUG(mask, "param = %p %dth off = %hu size = %hu\n", param, j, op->uop_params_off[j], param->oup_len); } - - op = update_op_next_op(op); } } @@ -239,6 +243,66 @@ static int update_records_update_pack(const struct lu_env *env, } /** + * Calculate update_records size + * + * Calculate update_records size by param_count and param_sizes array. + * + * \param[in] param_count the count of parameters + * \param[in] sizes the size array of these parameters + * + * \retval the size of this update + */ +static size_t update_records_update_size(__u32 param_count, size_t *sizes) +{ + __u32 i; + size_t size; + + /* update_op_size() for this param count, plus each rounded-up param */ + size = update_op_size(param_count); + + for (i = 0; i < param_count; i++) + size += cfs_size_round(sizeof(struct object_update_param) + + sizes[i]); + + return size; +} + +/** + * Calculate create update size + * + * \param[in] env execution environment + * \param[in] ops ur_ops in update records + * \param[in] fid FID of the object to be created + * \param[in] attr attribute of the object to be created + * \param[in] hint creation hint + * \param[in] dof creation format information + * + * \retval size of create update. 
+ */ +size_t update_records_create_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_attr *attr, + const struct dt_allocation_hint *hint, + struct dt_object_format *dof) +{ + size_t sizes[2]; + int param_count = 0; + + if (attr != NULL) { + sizes[param_count] = sizeof(struct obdo); + param_count++; + } + + if (hint != NULL && hint->dah_parent != NULL) { + sizes[param_count] = sizeof(*fid); + param_count++; + } + + return update_records_update_size(param_count, sizes); +} +EXPORT_SYMBOL(update_records_create_size); + +/** * Pack create update * * Pack create update into update records. @@ -304,6 +368,26 @@ int update_records_create_pack(const struct lu_env *env, EXPORT_SYMBOL(update_records_create_pack); /** + * Calculate attr set update size + * + * \param[in] env execution environment + * \param[in] ops ur_ops in update records + * \param[in] fid FID of the object to set attr + * \param[in] attr attribute of attr set + * + * \retval size of attr set update. + */ +size_t update_records_attr_set_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_attr *attr) +{ + size_t size = sizeof(struct obdo); + + return update_records_update_size(1, &size); +} +EXPORT_SYMBOL(update_records_attr_set_size); + +/** * Pack attr set update * * Pack attr_set update into update records. @@ -345,6 +429,21 @@ int update_records_attr_set_pack(const struct lu_env *env, EXPORT_SYMBOL(update_records_attr_set_pack); /** + * Calculate ref add update size + * + * \param[in] env execution environment + * \param[in] fid FID of the object to add reference + * + * \retval size of ref_add udpate. + */ +size_t update_records_ref_add_size(const struct lu_env *env, + const struct lu_fid *fid) +{ + return update_records_update_size(0, NULL); +} +EXPORT_SYMBOL(update_records_ref_add_size); + +/** * Pack ref add update * * Pack ref add update into update records. 
@@ -377,6 +476,55 @@ int update_records_ref_add_pack(const struct lu_env *env, EXPORT_SYMBOL(update_records_ref_add_pack); /** + * Pack noop update + * + * Pack no op update into update records. Note: no op means + * the update does not need do anything, which is only used + * in test case to verify large size record. + * + * \param[in] env execution environment + * \param[in] ops ur_ops in update records + * \param[in|out] op_count pointer to the count of ops + * \param[in|out] max_op_size maximum size of the update + * \param[in] params ur_params in update records + * \param[in|out] param_count pointer to the count of params + * \param[in|out] max_param_size maximum size of the parameter + * \param[in] fid FID of the object to add reference + * + * \retval 0 if packing succeeds. + * \retval negative errno if packing fails. + */ +int update_records_noop_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid) +{ + return update_records_update_pack(env, fid, OUT_NOOP, ops, op_count, + max_ops_size, params, param_count, + max_param_size, 0, NULL, NULL); +} +EXPORT_SYMBOL(update_records_noop_pack); + +/** + * Calculate ref del update size + * + * \param[in] env execution environment + * \param[in] fid FID of the object to delete reference + * + * \retval size of ref_del update. + */ +size_t update_records_ref_del_size(const struct lu_env *env, + const struct lu_fid *fid) +{ + return update_records_update_size(0, NULL); +} +EXPORT_SYMBOL(update_records_ref_del_size); + +/** * Pack ref del update * * Pack ref del update into update records. 
@@ -409,6 +557,21 @@ int update_records_ref_del_pack(const struct lu_env *env, EXPORT_SYMBOL(update_records_ref_del_pack); /** + * Calculate object destroy update size + * + * \param[in] env execution environment + * \param[in] fid FID of the object to delete reference + * + * \retval size of object destroy update. + */ +size_t update_records_object_destroy_size(const struct lu_env *env, + const struct lu_fid *fid) +{ + return update_records_update_size(0, NULL); +} +EXPORT_SYMBOL(update_records_object_destroy_size); + +/** * Pack object destroy update * * Pack object destroy update into update records. @@ -441,6 +604,28 @@ int update_records_object_destroy_pack(const struct lu_env *env, EXPORT_SYMBOL(update_records_object_destroy_pack); /** + * Calculate index insert update size + * + * \param[in] env execution environment + * \param[in] fid FID of the object to insert index + * \param[in] rec record of insertion + * \param[in] key key of insertion + * + * \retval the size of index insert update. + */ +size_t update_records_index_insert_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct dt_rec *rec, + const struct dt_key *key) +{ + size_t sizes[3] = { strlen((const char *)key) + 1, + sizeof(struct lu_fid), + sizeof(__u32) }; + return update_records_update_size(3, sizes); +} +EXPORT_SYMBOL(update_records_index_insert_size); + +/** * Pack index insert update * * Pack index insert update into update records. 
@@ -490,6 +675,25 @@ int update_records_index_insert_pack(const struct lu_env *env, EXPORT_SYMBOL(update_records_index_insert_pack); /** + * Calculate index delete update size + * + * \param[in] env execution environment + * \param[in] fid FID of the object to delete index + * \param[in] key key of deletion + * + * \retval the size of index delete update + */ +size_t update_records_index_delete_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct dt_key *key) +{ + size_t size = strlen((const char *)key) + 1; + + return update_records_update_size(1, &size); +} +EXPORT_SYMBOL(update_records_index_delete_size); + +/** * Pack index delete update * * Pack index delete update into update records. @@ -527,6 +731,28 @@ int update_records_index_delete_pack(const struct lu_env *env, EXPORT_SYMBOL(update_records_index_delete_pack); /** + * Calculate xattr set size + * + * \param[in] env execution environment + * \param[in] fid FID of the object to set xattr + * \param[in] buf xattr to be set + * \param[in] name name of the xattr + * \param[in] flag flag for setting xattr + * + * \retval size of xattr set update. + */ +size_t update_records_xattr_set_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_buf *buf, + const char *name, __u32 flag) +{ + size_t sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)}; + + return update_records_update_size(3, sizes); +} +EXPORT_SYMBOL(update_records_xattr_set_size); + +/** * Pack xattr set update * * Pack xattr set update into update records. @@ -570,6 +796,25 @@ int update_records_xattr_set_pack(const struct lu_env *env, EXPORT_SYMBOL(update_records_xattr_set_pack); /** + * Calculate xattr delete update size. + * + * \param[in] env execution environment + * \param[in] fid FID of the object to delete xattr + * \param[in] name name of the xattr + * + * \retval size of xattr delet updatee. 
+ */ +size_t update_records_xattr_del_size(const struct lu_env *env, + const struct lu_fid *fid, + const char *name) +{ + size_t size = strlen(name) + 1; + + return update_records_update_size(1, &size); +} +EXPORT_SYMBOL(update_records_xattr_del_size); + +/** * Pack xattr delete update * * Pack xattr delete update into update records. @@ -607,6 +852,27 @@ int update_records_xattr_del_pack(const struct lu_env *env, EXPORT_SYMBOL(update_records_xattr_del_pack); /** + * Calculate write update size + * + * \param[in] env execution environment + * \param[in] fid FID of the object to write into + * \param[in] buf buffer to write which includes an embedded size field + * \param[in] pos offset in the object to start writing at + * + * \retval size of write update. + */ +size_t update_records_write_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_buf *buf, + __u64 pos) +{ + size_t sizes[2] = {buf->lb_len, sizeof(pos)}; + + return update_records_update_size(2, sizes); +} +EXPORT_SYMBOL(update_records_write_size); + +/** * Pack write update * * Pack write update into update records. @@ -649,6 +915,26 @@ int update_records_write_pack(const struct lu_env *env, EXPORT_SYMBOL(update_records_write_pack); /** + * Calculate size of punch update. + * + * \param[in] env execution environment + * \param[in] fid FID of the object to write into + * \param[in] start start offset of punch + * \param[in] end end offset of punch + * + * \retval size of update punch. + */ +size_t update_records_punch_size(const struct lu_env *env, + const struct lu_fid *fid, + __u64 start, __u64 end) +{ + size_t sizes[2] = {sizeof(start), sizeof(end)}; + + return update_records_update_size(2, sizes); +} +EXPORT_SYMBOL(update_records_punch_size); + +/** * Pack punch * * Pack punch update into update records. 
@@ -774,8 +1060,7 @@ int tur_update_extend(struct thandle_update_records *tur, record_size = llog_update_record_size(tur->tur_update_records); /* extend update records buffer */ - if (new_op_size > (tur->tur_update_records_buf_size - record_size - - sizeof(*tur->tur_update_records))) { + if (new_op_size >= (tur->tur_update_records_buf_size - record_size)) { extend_size = round_up(new_op_size, UPDATE_RECORDS_BUFFER_SIZE); rc = tur_update_records_extend(tur, tur->tur_update_records_buf_size + @@ -787,7 +1072,7 @@ int tur_update_extend(struct thandle_update_records *tur, /* extend parameters buffer */ params_size = update_params_size(tur->tur_update_params, tur->tur_update_param_count); - if (new_param_size > (tur->tur_update_params_buf_size - + if (new_param_size >= (tur->tur_update_params_buf_size - params_size)) { extend_size = round_up(new_param_size, UPDATE_PARAMS_BUFFER_SIZE); diff --git a/lustre/target/update_recovery.c b/lustre/target/update_recovery.c index 2fd87d7b..cad0183 100644 --- a/lustre/target/update_recovery.c +++ b/lustre/target/update_recovery.c @@ -193,6 +193,36 @@ dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index) } /** + * Try to add cookie to sub distribute txn request + * + * Check if the update log cookie has been added to the request, if not, + * add it to the dtrqs_cookie_list. + * + * \param[in] dtrqs sub replay req where cookies to be added. + * \param[in] cookie cookie to be added. + * + * \retval 0 if the cookie is adding succeeds. + * \retval negative errno if adding fails. 
+ */ +static int dtrq_sub_add_cookie(struct distribute_txn_replay_req_sub *dtrqs, + struct llog_cookie *cookie) +{ + struct sub_thandle_cookie *new; + + OBD_ALLOC_PTR(new); + if (new == NULL) + return -ENOMEM; + + INIT_LIST_HEAD(&new->stc_list); + new->stc_cookie = *cookie; + /* Note: only single thread will access one sub_request each time, + * so no lock is needed here */ + list_add(&new->stc_list, &dtrqs->dtrqs_cookie_list); + + return 0; +} + +/** * Insert distribute txn sub req replay * * Allocate sub replay req and insert distribute txn replay list. @@ -209,31 +239,102 @@ dtrq_sub_create_and_insert(struct distribute_txn_replay_req *dtrq, struct llog_cookie *cookie, __u32 mdt_index) { - struct distribute_txn_replay_req_sub *dtrqs = NULL; - struct distribute_txn_replay_req_sub *new; + struct distribute_txn_replay_req_sub *dtrqs = NULL; + struct distribute_txn_replay_req_sub *new; + int rc; ENTRY; spin_lock(&dtrq->dtrq_sub_list_lock); dtrqs = dtrq_sub_lookup(dtrq, mdt_index); spin_unlock(&dtrq->dtrq_sub_list_lock); - if (dtrqs != NULL) + if (dtrqs != NULL) { + rc = dtrq_sub_add_cookie(dtrqs, cookie); RETURN(0); + } OBD_ALLOC_PTR(new); if (new == NULL) RETURN(-ENOMEM); INIT_LIST_HEAD(&new->dtrqs_list); + INIT_LIST_HEAD(&new->dtrqs_cookie_list); new->dtrqs_mdt_index = mdt_index; - new->dtrqs_llog_cookie = *cookie; spin_lock(&dtrq->dtrq_sub_list_lock); dtrqs = dtrq_sub_lookup(dtrq, mdt_index); - if (dtrqs == NULL) + if (dtrqs == NULL) { list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list); - else + dtrqs = new; + } else { OBD_FREE_PTR(new); + } spin_unlock(&dtrq->dtrq_sub_list_lock); + rc = dtrq_sub_add_cookie(dtrqs, cookie); + + RETURN(rc); +} + +/** + * append updates to the current replay updates + * + * Append more updates to the existing replay update. And this is only + * used when combining multiple updates into one large update during + * replay. + * + * \param[in] dtrq the update replay request where the new update + * records will be added. 
+ * \param[in] record the new update record. + * + * \retval 0 if appending succeeds. + * \retval negative errno if appending fails. + */ +static int dtrq_append_updates(struct distribute_txn_replay_req *dtrq, + struct update_records *record) +{ + struct llog_update_record *new_lur; + size_t lur_size = dtrq->dtrq_lur_size; + void *ptr; + ENTRY; + + /* Because several threads might retrieve the same records from + * different targets, and we only need one copy of records, check + * whether this record is the next one in sequence (by ur_index); + * if not, just skip it. */ + spin_lock(&dtrq->dtrq_sub_list_lock); + if (dtrq->dtrq_lur->lur_update_rec.ur_index + 1 != record->ur_index) { + spin_unlock(&dtrq->dtrq_sub_list_lock); + RETURN(0); + } + dtrq->dtrq_lur->lur_update_rec.ur_index++; + spin_unlock(&dtrq->dtrq_sub_list_lock); + + lur_size += update_records_size(record); + OBD_ALLOC_LARGE(new_lur, lur_size); + if (new_lur == NULL) { + spin_lock(&dtrq->dtrq_sub_list_lock); + dtrq->dtrq_lur->lur_update_rec.ur_index--; + spin_unlock(&dtrq->dtrq_sub_list_lock); + RETURN(-ENOMEM); + } + + /* Copy the old and new records to the new allocated buffer */ + memcpy(new_lur, dtrq->dtrq_lur, dtrq->dtrq_lur_size); + ptr = (char *)&new_lur->lur_update_rec + + update_records_size(&new_lur->lur_update_rec); + memcpy(ptr, &record->ur_ops, + update_records_size(record) - + offsetof(struct update_records, ur_ops)); + + new_lur->lur_update_rec.ur_update_count += record->ur_update_count; + new_lur->lur_update_rec.ur_param_count += record->ur_param_count; + new_lur->lur_hdr.lrh_len = llog_update_record_size(new_lur); + + /* Replace the records */ + OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size); + dtrq->dtrq_lur = new_lur; + dtrq->dtrq_lur_size = lur_size; + dtrq->dtrq_lur->lur_update_rec.ur_flags = record->ur_flags; + update_records_dump(&new_lur->lur_update_rec, D_INFO, true); RETURN(0); } @@ -266,7 +367,8 @@ insert_update_records_to_replay_list(struct target_distribute_txn_data *tdtd, CDEBUG(D_HA, "%s: 
insert record batchid = "LPU64" transno = "LPU64 " mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name, record->ur_batchid, record->ur_master_transno, mdt_index); -again: + + /* First try to build the replay update request with the records */ spin_lock(&tdtd->tdtd_replay_list_lock); dtrq = dtrq_lookup(tdtd, record->ur_batchid); spin_unlock(&tdtd->tdtd_replay_list_lock); @@ -286,27 +388,38 @@ again: spin_lock(&tdtd->tdtd_replay_list_lock); rc = dtrq_insert(tdtd, dtrq); spin_unlock(&tdtd->tdtd_replay_list_lock); - } else if (record->ur_master_transno != 0 && - dtrq->dtrq_lur->lur_update_rec.ur_master_transno != - record->ur_master_transno) { - /* If the master transno in update header is not matched with - * the one in the record, then it means the dtrq is originally - * created by master record, and we need update master transno - * and reposition the dtrq(by master transno). */ - dtrq->dtrq_lur->lur_update_rec.ur_master_transno = - record->ur_master_transno; - list_del_init(&dtrq->dtrq_list); - spin_lock(&tdtd->tdtd_replay_list_lock); - rc = dtrq_insert(tdtd, dtrq); - spin_unlock(&tdtd->tdtd_replay_list_lock); - } + if (rc == -EEXIST) { + /* Some one else already add the record */ + dtrq_destroy(dtrq); + rc = 0; + } + } else { + struct update_records *dtrq_rec; + + /* If the master transno in update header is not + * matched with the one in the record, then it means + * the dtrq is originally created by master record, + * and we need update master transno and reposition + * the dtrq(by master transno). 
*/ + dtrq_rec = &dtrq->dtrq_lur->lur_update_rec; + if (record->ur_master_transno != 0 && + dtrq_rec->ur_master_transno != record->ur_master_transno) { + dtrq_rec->ur_master_transno = record->ur_master_transno; + spin_lock(&tdtd->tdtd_replay_list_lock); + list_del_init(&dtrq->dtrq_list); + rc = dtrq_insert(tdtd, dtrq); + spin_unlock(&tdtd->tdtd_replay_list_lock); + if (rc < 0) + return rc; + } - if (rc == -EEXIST) { - dtrq_destroy(dtrq); - rc = 0; - goto again; + /* This is a partial update records, let's try to append + * the record to the current replay request */ + if (record->ur_flags & UPDATE_RECORD_CONTINUE) + rc = dtrq_append_updates(dtrq, record); } + /* Then create and add sub update request */ rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index); RETURN(rc); @@ -350,7 +463,15 @@ void dtrq_destroy(struct distribute_txn_replay_req *dtrq) LASSERT(list_empty(&dtrq->dtrq_list)); spin_lock(&dtrq->dtrq_sub_list_lock); list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) { + struct sub_thandle_cookie *stc; + struct sub_thandle_cookie *tmp; + list_del(&dtrqs->dtrqs_list); + list_for_each_entry_safe(stc, tmp, &dtrqs->dtrqs_cookie_list, + stc_list) { + list_del(&stc->stc_list); + OBD_FREE_PTR(stc); + } OBD_FREE_PTR(dtrqs); } spin_unlock(&dtrq->dtrq_sub_list_lock); @@ -496,8 +617,15 @@ static int update_is_committed(const struct lu_env *env, dtrqs = dtrq_sub_lookup(dtrq, mdt_index); if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) { st->st_committed = 1; - if (dtrqs != NULL) - st->st_cookie = dtrqs->dtrqs_llog_cookie; + if (dtrqs != NULL) { + struct sub_thandle_cookie *stc; + struct sub_thandle_cookie *tmp; + + list_for_each_entry_safe(stc, tmp, + &dtrqs->dtrqs_cookie_list, + stc_list) + list_move(&stc->stc_list, &st->st_cookie_list); + } RETURN(0); } @@ -886,6 +1014,9 @@ static int update_recovery_exec(const struct lu_env *env, struct dt_device *sub_dt; struct sub_thandle *st; + if (op->uop_type == OUT_NOOP) + continue; + dt_obj 
= dt_locate(env, tdtd->tdtd_dt, fid); if (IS_ERR(dt_obj)) { rc = PTR_ERR(dt_obj); diff --git a/lustre/target/update_trans.c b/lustre/target/update_trans.c index 28bf6be..001f21d 100644 --- a/lustre/target/update_trans.c +++ b/lustre/target/update_trans.c @@ -81,12 +81,17 @@ static void top_multiple_thandle_dump(struct top_multiple_thandle *tmt, tmt->tmt_result, tmt->tmt_batchid); list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) { - CDEBUG(mask, "st %p obd %s committed %d sub_th %p " - " cookie "DOSTID": %u\n", + struct sub_thandle_cookie *stc; + + CDEBUG(mask, "st %p obd %s committed %d sub_th %p\n", st, st->st_dt->dd_lu_dev.ld_obd->obd_name, - st->st_committed, st->st_sub_th, - POSTID(&st->st_cookie.lgc_lgl.lgl_oi), - st->st_cookie.lgc_index); + st->st_committed, st->st_sub_th); + + list_for_each_entry(stc, &st->st_cookie_list, stc_list) { + CDEBUG(mask, " cookie "DOSTID": %u\n", + POSTID(&stc->stc_cookie.lgc_lgl.lgl_oi), + stc->stc_cookie.lgc_index); + } } } @@ -99,16 +104,18 @@ static void top_multiple_thandle_dump(struct top_multiple_thandle *tmt, * \param[in] env execution environment * \param[in] record update records being written * \param[in] sub_th sub transaction handle + * \param[in] record_size total update record size * * \retval 0 if writing succeeds * \retval negative errno if writing fails */ static int sub_declare_updates_write(const struct lu_env *env, struct llog_update_record *record, - struct thandle *sub_th) + struct thandle *sub_th, size_t record_size) { struct llog_ctxt *ctxt; struct dt_device *dt = sub_th->th_dev; + int left = record_size; int rc; /* If ctxt is NULL, it means not need to write update, @@ -118,14 +125,24 @@ static int sub_declare_updates_write(const struct lu_env *env, LASSERT(ctxt != NULL); /* Not ready to record updates yet. 
*/ - if (ctxt->loc_handle == NULL) { - llog_ctxt_put(ctxt); - return 0; - } + if (ctxt->loc_handle == NULL) + GOTO(out_put, rc = 0); - rc = llog_declare_add(env, ctxt->loc_handle, &record->lur_hdr, - sub_th); + rc = llog_declare_add(env, ctxt->loc_handle, + &record->lur_hdr, sub_th); + if (rc < 0) + GOTO(out_put, rc); + + while (left > ctxt->loc_chunk_size) { + rc = llog_declare_add(env, ctxt->loc_handle, + &record->lur_hdr, sub_th); + if (rc < 0) + GOTO(out_put, rc); + left -= ctxt->loc_chunk_size; + } + +out_put: llog_ctxt_put(ctxt); return rc; @@ -148,12 +165,21 @@ static int sub_declare_updates_write(const struct lu_env *env, */ static int sub_updates_write(const struct lu_env *env, struct llog_update_record *record, - struct thandle *sub_th, - struct llog_cookie *cookie) + struct sub_thandle *sub_th) { - struct dt_device *dt = sub_th->th_dev; + struct dt_device *dt = sub_th->st_dt; struct llog_ctxt *ctxt; int rc; + struct llog_update_record *lur = NULL; + struct update_params *params = NULL; + __u32 update_count = 0; + __u32 param_count = 0; + __u32 last_update_count = 0; + __u32 last_param_count = 0; + void *src; + void *start; + void *next; + struct sub_thandle_cookie *stc; ENTRY; ctxt = llog_get_context(dt->dd_lu_dev.ld_obd, @@ -162,10 +188,8 @@ static int sub_updates_write(const struct lu_env *env, /* Not ready to record updates yet, usually happens * in error handler path */ - if (ctxt->loc_handle == NULL) { - llog_ctxt_put(ctxt); - RETURN(0); - } + if (ctxt->loc_handle == NULL) + GOTO(llog_put, rc = 0); /* Since the cross-MDT updates will includes both local * and remote updates, the update ops count must > 1 */ @@ -174,16 +198,124 @@ static int sub_updates_write(const struct lu_env *env, "lrh_len %u record_size %zu\n", record->lur_hdr.lrh_len, llog_update_record_size(record)); - rc = llog_add(env, ctxt->loc_handle, &record->lur_hdr, - cookie, sub_th); - llog_ctxt_put(ctxt); + if (likely(record->lur_hdr.lrh_len <= ctxt->loc_chunk_size)) { + 
OBD_ALLOC_PTR(stc); + if (stc == NULL) + GOTO(llog_put, rc = -ENOMEM); + INIT_LIST_HEAD(&stc->stc_list); + + rc = llog_add(env, ctxt->loc_handle, &record->lur_hdr, + &stc->stc_cookie, sub_th->st_sub_th); + + CDEBUG(D_INFO, "%s: Add update log "DOSTID":%u: rc = %d\n", + dt->dd_lu_dev.ld_obd->obd_name, + POSTID(&stc->stc_cookie.lgc_lgl.lgl_oi), + stc->stc_cookie.lgc_index, rc); + + if (rc > 0) { + list_add(&stc->stc_list, &sub_th->st_cookie_list); + rc = 0; + } else { + OBD_FREE_PTR(stc); + } + + GOTO(llog_put, rc); + } - CDEBUG(D_INFO, "%s: Add update log "DOSTID":%u.\n", - dt->dd_lu_dev.ld_obd->obd_name, - POSTID(&cookie->lgc_lgl.lgl_oi), cookie->lgc_index); + /* Split the records into chunk_size update record */ + OBD_ALLOC_LARGE(lur, ctxt->loc_chunk_size); + if (lur == NULL) + GOTO(llog_put, rc = -ENOMEM); - if (rc > 0) - rc = 0; + memcpy(lur, &record->lur_hdr, sizeof(record->lur_hdr)); + lur->lur_update_rec.ur_update_count = 0; + lur->lur_update_rec.ur_param_count = 0; + src = &record->lur_update_rec.ur_ops; + start = next = src; + lur->lur_hdr.lrh_len = llog_update_record_size(lur); + params = update_records_get_params(&record->lur_update_rec); + do { + size_t rec_len; + + if (update_count < record->lur_update_rec.ur_update_count) { + next = update_op_next_op((struct update_op *)src); + } else { + if (param_count == 0) + next = update_records_get_params( + &record->lur_update_rec); + else + next = (char *)src + + object_update_param_size( + (struct object_update_param *)src); + } + + rec_len = cfs_size_round((unsigned long)(next - src)); + /* If its size > llog chunk_size, then write current chunk to + * the update llog. */ + if (lur->lur_hdr.lrh_len + rec_len + LLOG_MIN_REC_SIZE > + ctxt->loc_chunk_size || + param_count == record->lur_update_rec.ur_param_count) { + lur->lur_update_rec.ur_update_count = + update_count > last_update_count ? 
+ update_count - last_update_count : 0; + lur->lur_update_rec.ur_param_count = param_count - + last_param_count; + + memcpy(&lur->lur_update_rec.ur_ops, start, + (unsigned long)(src - start)); + if (last_update_count != 0) + lur->lur_update_rec.ur_flags |= + UPDATE_RECORD_CONTINUE; + + update_records_dump(&lur->lur_update_rec, D_INFO, true); + lur->lur_hdr.lrh_len = llog_update_record_size(lur); + LASSERT(lur->lur_hdr.lrh_len <= ctxt->loc_chunk_size); + + OBD_ALLOC_PTR(stc); + if (stc == NULL) + GOTO(llog_put, rc = -ENOMEM); + INIT_LIST_HEAD(&stc->stc_list); + + rc = llog_add(env, ctxt->loc_handle, + &lur->lur_hdr, + &stc->stc_cookie, sub_th->st_sub_th); + + CDEBUG(D_INFO, "%s: Add update log "DOSTID":%u" + " rc = %d\n", dt->dd_lu_dev.ld_obd->obd_name, + POSTID(&stc->stc_cookie.lgc_lgl.lgl_oi), + stc->stc_cookie.lgc_index, rc); + + if (rc > 0) { + list_add(&stc->stc_list, + &sub_th->st_cookie_list); + rc = 0; + } else { + OBD_FREE_PTR(stc); + GOTO(llog_put, rc); + } + + last_update_count = update_count; + last_param_count = param_count; + start = src; + lur->lur_update_rec.ur_update_count = 0; + lur->lur_update_rec.ur_param_count = 0; + lur->lur_hdr.lrh_len = llog_update_record_size(lur); + } + + src = next; + lur->lur_hdr.lrh_len += cfs_size_round(rec_len); + if (update_count < record->lur_update_rec.ur_update_count) + update_count++; + else if (param_count < record->lur_update_rec.ur_param_count) + param_count++; + else + break; + } while (1); + +llog_put: + if (lur != NULL) + OBD_FREE_LARGE(lur, ctxt->loc_chunk_size); + llog_ctxt_put(ctxt); RETURN(rc); } @@ -308,6 +440,7 @@ struct sub_thandle *create_sub_thandle(struct top_multiple_thandle *tmt, RETURN(ERR_PTR(-ENOMEM)); INIT_LIST_HEAD(&st->st_sub_list); + INIT_LIST_HEAD(&st->st_cookie_list); st->st_dt = dt_dev; list_add(&st->st_sub_list, &tmt->tmt_sub_thandle_list); @@ -503,7 +636,8 @@ static int declare_updates_write(const struct lu_env *env, if (st->st_sub_th == NULL) continue; - rc = 
sub_declare_updates_write(env, record, st->st_sub_th); + rc = sub_declare_updates_write(env, record, st->st_sub_th, + tmt->tmt_record_size); if (rc < 0) break; } @@ -814,8 +948,7 @@ int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev, lur = tur->tur_update_records; /* Write updates to the master MDT */ - rc = sub_updates_write(env, lur, master_st->st_sub_th, - &master_st->st_cookie); + rc = sub_updates_write(env, lur, master_st); /* Cleanup the common parameters in the update records, * master transno callback might add more parameters. @@ -877,8 +1010,7 @@ stop_master_trans: st->st_sub_th->th_result < 0) continue; - rc = sub_updates_write(env, lur, st->st_sub_th, - &st->st_cookie); + rc = sub_updates_write(env, lur, st); if (rc < 0) { th->th_result = rc; break; @@ -1052,7 +1184,15 @@ void top_multiple_thandle_destroy(struct top_multiple_thandle *tmt) LASSERT(tmt->tmt_magic == TOP_THANDLE_MAGIC); list_for_each_entry_safe(st, tmp, &tmt->tmt_sub_thandle_list, st_sub_list) { + struct sub_thandle_cookie *stc; + struct sub_thandle_cookie *tmp; + list_del(&st->st_sub_list); + list_for_each_entry_safe(stc, tmp, &st->st_cookie_list, + stc_list) { + list_del(&stc->stc_list); + OBD_FREE_PTR(stc); + } OBD_FREE_PTR(st); } OBD_FREE_PTR(tmt); @@ -1083,23 +1223,27 @@ static int distribute_txn_cancel_records(const struct lu_env *env, struct llog_ctxt *ctxt; struct obd_device *obd; struct llog_cookie *cookie; + struct sub_thandle_cookie *stc; int rc; - cookie = &st->st_cookie; - if (fid_is_zero(&cookie->lgc_lgl.lgl_oi.oi_fid)) - continue; - obd = st->st_dt->dd_lu_dev.ld_obd; ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT); LASSERT(ctxt); + list_for_each_entry(stc, &st->st_cookie_list, stc_list) { + cookie = &stc->stc_cookie; + if (fid_is_zero(&cookie->lgc_lgl.lgl_oi.oi_fid)) + continue; - rc = llog_cat_cancel_records(env, ctxt->loc_handle, 1, - cookie); + rc = llog_cat_cancel_records(env, ctxt->loc_handle, 1, + cookie); + CDEBUG(D_HA, "%s: batchid %llu 
cancel update log " + DOSTID ".%u : rc = %d\n", obd->obd_name, + tmt->tmt_batchid, + POSTID(&cookie->lgc_lgl.lgl_oi), + cookie->lgc_index, rc); + } llog_ctxt_put(ctxt); - CDEBUG(D_HA, "%s: batchid %llu cancel update log "DOSTID - ".%u : rc = %d\n", obd->obd_name, tmt->tmt_batchid, - POSTID(&cookie->lgc_lgl.lgl_oi), cookie->lgc_index, rc); } RETURN(0); diff --git a/lustre/tests/replay-single.sh b/lustre/tests/replay-single.sh index df9c3e1..7d9eb05 100755 --- a/lustre/tests/replay-single.sh +++ b/lustre/tests/replay-single.sh @@ -3752,6 +3752,56 @@ test_115() { } run_test 115 "failover for create/unlink striped directory" +test_116a() { + [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0 + [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.55) ] && + skip "Do not support large update log before 2.7.55" && + return 0 + ([ $FAILURE_MODE == "HARD" ] && + [ "$(facet_host mds1)" == "$(facet_host mds2)" ]) && + skip "MDTs needs to be on diff hosts for HARD fail mode" && + return 0 + local fail_index=0 + + mkdir -p $DIR/$tdir + replay_barrier mds1 + + # OBD_FAIL_SPLIT_UPDATE_REC 0x1702 + do_facet mds1 "lctl set_param fail_loc=0x80001702" + $LFS setdirstripe -c$MDSCOUNT $DIR/$tdir/striped_dir + + fail mds1 + $CHECKSTAT -t dir $DIR/$tdir/striped_dir || + error "stried_dir does not exists" +} +run_test 116a "large update log master MDT recovery" + +test_116b() { + [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0 + [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.55) ] && + skip "Do not support large update log before 2.7.55" && + return 0 + + ([ $FAILURE_MODE == "HARD" ] && + [ "$(facet_host mds1)" == "$(facet_host mds2)" ]) && + skip "MDTs needs to be on diff hosts for HARD fail mode" && + return 0 + local fail_index=0 + + mkdir -p $DIR/$tdir + replay_barrier mds2 + + # OBD_FAIL_SPLIT_UPDATE_REC 0x1702 + do_facet mds2 "lctl set_param fail_loc=0x80001702" + $LFS setdirstripe -c$MDSCOUNT $DIR/$tdir/striped_dir + + fail mds2 + $CHECKSTAT -t 
dir $DIR/$tdir/striped_dir || + error "stried_dir does not exists" +} +run_test 116b "large update log slave MDT recovery" + + complete $SECONDS check_and_cleanup_lustre exit_status diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 679a528..0a9e496 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -13399,6 +13399,57 @@ test_300i() { } run_test 300i "client handle unknown hash type striped directory" +test_300j() { + [ $PARALLEL == "yes" ] && skip "skip parallel run" && return + [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.55) ] && + skip "Need MDS version at least 2.7.55" && return + [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return + local stripe_count + local file + + mkdir $DIR/$tdir + + #define OBD_FAIL_SPLIT_UPDATE_REC 0x1702 + $LCTL set_param fail_loc=0x1702 + $LFS setdirstripe -i 0 -c$MDSCOUNT -t all_char $DIR/$tdir/striped_dir || + error "set striped dir error" + + createmany -o $DIR/$tdir/striped_dir/f- 10 || + error "create files under striped dir failed" + + $LCTL set_param fail_loc=0 + + rm -rf $DIR/$tdir || error "unlink striped dir fails" + + return 0 +} +run_test 300j "test large update record" + +test_300k() { + [ $PARALLEL == "yes" ] && skip "skip parallel run" && return + [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.55) ] && + skip "Need MDS version at least 2.7.55" && return + [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return + local stripe_count + local file + + mkdir $DIR/$tdir + + #define OBD_FAIL_LARGE_STRIPE 0x1703 + $LCTL set_param fail_loc=0x1703 + $LFS setdirstripe -i 0 -c512 $DIR/$tdir/striped_dir || + error "set striped dir error" + $LCTL set_param fail_loc=0 + + $LFS getdirstripe $DIR/$tdir/striped_dir || + error "getstripeddir fails" + rm -rf $DIR/$tdir/striped_dir || + error "unlink striped dir fails" + + return 0 +} +run_test 300k "test large striped directory" + prepare_remote_file() { mkdir $DIR/$tdir/src_dir || error "create remote source failed" diff 
--git a/lustre/utils/liblustreapi.c b/lustre/utils/liblustreapi.c index 8dde0ce..c2dcbda 100644 --- a/lustre/utils/liblustreapi.c +++ b/lustre/utils/liblustreapi.c @@ -1617,13 +1617,38 @@ static DIR *opendir_parent(char *path) static int cb_get_dirstripe(char *path, DIR *d, struct find_param *param) { + int ret; + +again: param->fp_lmv_md->lum_stripe_count = param->fp_lmv_stripe_count; if (param->fp_get_default_lmv) param->fp_lmv_md->lum_magic = LMV_USER_MAGIC; else param->fp_lmv_md->lum_magic = LMV_MAGIC_V1; - return ioctl(dirfd(d), LL_IOC_LMV_GETSTRIPE, param->fp_lmv_md); + ret = ioctl(dirfd(d), LL_IOC_LMV_GETSTRIPE, param->fp_lmv_md); + if (errno == E2BIG && ret != 0) { + int stripe_count; + int lmv_size; + + stripe_count = (__u32)param->fp_lmv_md->lum_stripe_count; + if (stripe_count <= param->fp_lmv_stripe_count) + return ret; + + free(param->fp_lmv_md); + param->fp_lmv_stripe_count = stripe_count; + lmv_size = lmv_user_md_size(stripe_count, LMV_MAGIC_V1); + param->fp_lmv_md = malloc(lmv_size); + if (param->fp_lmv_md == NULL) { + llapi_error(LLAPI_MSG_ERROR, -ENOMEM, + "error: allocation of %d bytes for ioctl", + lmv_user_md_size(param->fp_lmv_stripe_count, + LMV_MAGIC_V1)); + return -ENOMEM; + } + goto again; + } + return ret; } static int get_lmd_info(char *path, DIR *parent, DIR *dir, diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index 46cbd53..aa458fe 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -2167,6 +2167,52 @@ static void check_lfsck_reply(void) CHECK_MEMBER(lfsck_reply, lr_padding_2); } +static void check_update_params(void) +{ + BLANK_LINE(); + CHECK_STRUCT(update_params); + CHECK_MEMBER(update_params, up_params); +} + +static void check_update_op(void) +{ + BLANK_LINE(); + CHECK_STRUCT(update_op); + CHECK_MEMBER(update_op, uop_fid); + CHECK_MEMBER(update_op, uop_type); + CHECK_MEMBER(update_op, uop_param_count); + CHECK_MEMBER(update_op, uop_params_off); +} + +static void check_update_ops(void) +{ + 
BLANK_LINE(); + CHECK_STRUCT(update_ops); + CHECK_MEMBER(update_ops, uops_op); +} + +static void check_update_records(void) +{ + BLANK_LINE(); + CHECK_STRUCT(update_records); + CHECK_MEMBER(update_records, ur_master_transno); + CHECK_MEMBER(update_records, ur_batchid); + CHECK_MEMBER(update_records, ur_flags); + CHECK_MEMBER(update_records, ur_index); + CHECK_MEMBER(update_records, ur_update_count); + CHECK_MEMBER(update_records, ur_param_count); + + CHECK_VALUE_X(UPDATE_RECORD_CONTINUE); +} + +static void check_llog_update_record(void) +{ + BLANK_LINE(); + CHECK_STRUCT(llog_update_record); + CHECK_MEMBER(llog_update_record, lur_hdr); + CHECK_MEMBER(llog_update_record, lur_update_rec); +} + static void system_string(char *cmdline, char *str, int len) { int fds[2]; diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index 435dfc6..7497a91 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -3738,18 +3738,6 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct llog_log_hdr, llh_tgtuuid)); LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_tgtuuid) == 40, "found %lld\n", (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_tgtuuid)); - LASSERTF((int)offsetof(struct llog_log_hdr, llh_reserved) == 84, "found %lld\n", - (long long)(int)offsetof(struct llog_log_hdr, llh_reserved)); - LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_reserved) == 4, "found %lld\n", - (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_reserved)); - LASSERTF((int)offsetof(struct llog_log_hdr, llh_bitmap) == 88, "found %lld\n", - (long long)(int)offsetof(struct llog_log_hdr, llh_bitmap)); - LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_bitmap) == 8096, "found %lld\n", - (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_bitmap)); - LASSERTF((int)offsetof(struct llog_log_hdr, llh_tail) == 8184, "found %lld\n", - (long long)(int)offsetof(struct llog_log_hdr, llh_tail)); - LASSERTF((int)sizeof(((struct llog_log_hdr 
*)0)->llh_tail) == 8, "found %lld\n", - (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_tail)); /* Checks for struct llog_cookie */ LASSERTF((int)sizeof(struct llog_cookie) == 32, "found %lld\n", @@ -4093,10 +4081,10 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct fiemap_extent, fe_flags)); LASSERTF((int)sizeof(((struct fiemap_extent *)0)->fe_flags) == 4, "found %lld\n", (long long)(int)sizeof(((struct fiemap_extent *)0)->fe_flags)); - LASSERTF((int)offsetof(struct fiemap_extent, fe_device) == 44, "found %lld\n", - (long long)(int)offsetof(struct fiemap_extent, fe_device)); - LASSERTF((int)sizeof(((struct fiemap_extent *)0)->fe_device) == 4, "found %lld\n", - (long long)(int)sizeof(((struct fiemap_extent *)0)->fe_device)); + LASSERTF((int)offsetof(struct fiemap_extent, fe_reserved[0]) == 44, "found %lld\n", + (long long)(int)offsetof(struct fiemap_extent, fe_reserved[0])); + LASSERTF((int)sizeof(((struct fiemap_extent *)0)->fe_reserved[0]) == 4, "found %lld\n", + (long long)(int)sizeof(((struct fiemap_extent *)0)->fe_reserved[0])); CLASSERT(FIEMAP_EXTENT_LAST == 0x00000001); CLASSERT(FIEMAP_EXTENT_UNKNOWN == 0x00000002); CLASSERT(FIEMAP_EXTENT_DELALLOC == 0x00000004); -- 1.8.3.1