* by this structure */
struct distribute_txn_replay_req_sub {
__u32 dtrqs_mdt_index; /* MDT index this sub request was created for; presumably the key dtrq_sub_lookup() matches on */
- struct llog_cookie dtrqs_llog_cookie;
+
+ /* All update llog cookies for this sub request are linked here, one
+  * struct sub_thandle_cookie per cookie (through stc_list) */
+ spinlock_t dtrqs_cookie_list_lock; /* NOTE(review): dtrq_sub_add_cookie() adds to the list without taking this lock; confirm it is initialised and still needed */
+ struct list_head dtrqs_cookie_list;
struct list_head dtrqs_list; /* linkage on the parent replay request's dtrq_sub_list */
};
};
void lustre_swab_orphan_ent(struct lu_orphan_ent *ent);
-struct update_ops;
-void lustre_swab_update_ops(struct update_ops *uops, unsigned int op_count);
-
/** @} lu_fid */
/** \defgroup lu_dir lu_dir
OUT_XATTR_DEL = 13,
OUT_PUNCH = 14,
OUT_READ = 15,
+ OUT_NOOP = 16,
OUT_LAST
};
void lustre_swab_close_data(struct close_data *data);
+struct update_ops;
+void lustre_swab_update_ops(struct update_ops *uops, unsigned int op_count);
+
+/* Update llog format */
+struct update_op {
+ struct lu_fid uop_fid; /* FID of the object this update applies to */
+ __u16 uop_type; /* update opcode; printed through update_op_str() in the dump code (OUT_* values, presumably) */
+ __u16 uop_param_count; /* number of valid entries in uop_params_off[] */
+ __u16 uop_params_off[0]; /* one entry per parameter, passed to update_params_get_param() to locate it in the record's params */
+};
+
+struct update_ops {
+ struct update_op uops_op[0]; /* first of the packed updates; later ones are reached with update_op_next_op(), not by array indexing */
+};
+
+struct update_params {
+ struct object_update_param up_params[0]; /* packed parameters; individual ones are fetched with update_params_get_param() */
+};
+
+/* Bits stored in update_records::ur_flags (presumably; confirm against
+ * the code that sets and tests ur_flags). */
+enum update_records_flag {
+	/* Bit 0. Written as a left shift so that further flags can follow
+	 * the usual 1 << n pattern; the value is still 1, as before. */
+	UPDATE_RECORD_CONTINUE = 1 << 0,
+};
+/*
+ * This is the update record format used to store the updates on
+ * disk. All updates of the operation are stored in ur_ops, and all
+ * parameters for those updates are stored in ur_params.
+ * To save space in the record, the updates in ur_ops only remember
+ * the offsets of their parameters in ur_params, so a parameter used
+ * by several updates does not have to be stored more than once. This
+ * can save a lot of space for operations like creating a striped
+ * directory.
+ */
+struct update_records {
+ __u64 ur_master_transno;
+ __u64 ur_batchid;
+ __u32 ur_flags; /* presumably enum update_records_flag bits - confirm */
+ /* If the updates of one operation are spread over several update
+ * records, ur_index is the position of this record within that
+ * sequence. */
+ __u32 ur_index;
+ __u32 ur_update_count; /* number of updates packed in ur_ops */
+ __u32 ur_param_count; /* number of parameters in the params area */
+ struct update_ops ur_ops;
+ /* Note ur_ops has a variable size, so the following ur_params
+ * is commented out, in case someone uses it directly as
+ * update_records->ur_params
+ *
+ * struct update_params ur_params;
+ */
+};
+
+struct llog_update_record {
+ struct llog_rec_hdr lur_hdr; /* llog record header */
+ struct update_records lur_update_rec; /* the variable-size update records */
+ /* Note lur_update_rec has a variable size, so the following
+ * lur_tail is commented out, in case someone uses it directly
+ *
+ * struct llog_rec_tail lur_tail;
+ */
+};
+
+
#endif
/** @} lustreidl */
__u32 ei_type; /** Type of the lock being enqueued. */
__u32 ei_mode; /** Mode of the lock being enqueued. */
void *ei_cb_bl; /** blocking lock callback */
+ void *ei_cb_local_bl; /** blocking local lock callback */
void *ei_cb_cp; /** lock completion callback */
void *ei_cb_gl; /** lock glimpse callback */
void *ei_cbdata; /** Data to be passed into callbacks. */
+ void *ei_namespace; /** lock namespace **/
unsigned int ei_enq_slave:1; /* whether enqueue slave stripes */
};
struct dt_key;
struct dt_rec;
struct object_update_param;
-
-struct update_params {
- struct object_update_param up_params[0];
-};
+struct llog_update_record;
static inline size_t update_params_size(const struct update_params *params,
unsigned int param_count)
return param->oup_buf;
}
-struct update_op {
- struct lu_fid uop_fid;
- __u16 uop_type;
- __u16 uop_param_count;
- __u16 uop_params_off[0];
-};
-
static inline size_t
update_op_size(unsigned int param_count)
{
update_op_size(uop->uop_param_count));
}
-/* All of updates in the mulitple_update_record */
-struct update_ops {
- struct update_op uops_op[0];
-};
-
static inline size_t update_ops_size(const struct update_ops *ops,
unsigned int update_count)
{
return total_size;
}
-/*
- * This is the update record format used to store the updates in
- * disk. All updates of the operation will be stored in ur_ops.
- * All of parameters for updates of the operation will be stored
- * in ur_params.
- * To save the space of the record, parameters in ur_ops will only
- * remember their offset in ur_params, so to avoid storing duplicate
- * parameters in ur_params, which can help us save a lot space for
- * operation like creating striped directory.
- */
-struct update_records {
- __u64 ur_master_transno;
- __u64 ur_batchid;
- __u32 ur_flags;
- __u32 ur_param_count;
- __u32 ur_update_count;
- struct update_ops ur_ops;
- /* Note ur_ops has a variable size, so comment out
- * the following ur_params, in case some use it directly
- * update_records->ur_params
- *
- * struct update_params ur_params;
- */
-};
-
-struct llog_update_record {
- struct llog_rec_hdr lur_hdr;
- struct update_records lur_update_rec;
- /* Note ur_update_rec has a variable size, so comment out
- * the following ur_tail, in case someone use it directly
- *
- * struct llog_rec_tail lur_tail;
- */
-};
-
static inline struct update_params *
update_records_get_params(const struct update_records *record)
{
static inline size_t
update_records_size(const struct update_records *record)
{ /* Return the rounded size of the record: fixed part up to ur_ops, plus the updates, plus the parameters. */
- struct update_params *params;
+ size_t op_size = 0; /* size of the updates; stays 0 when ur_update_count is 0 */
+ size_t param_size = 0; /* size of the parameters; stays 0 when ur_param_count is 0 */
- params = update_records_get_params(record);
+ if (record->ur_update_count > 0)
+ op_size = update_ops_size(&record->ur_ops,
+ record->ur_update_count);
+ if (record->ur_param_count > 0) {
+ struct update_params *params;
+
+ params = update_records_get_params(record); /* the params are only located when the record has some */
+ param_size = update_params_size(params, record->ur_param_count);
+ }
return cfs_size_round(offsetof(struct update_records, ur_ops) + /* bytes before ur_ops, then both variable-size parts */
- update_ops_size(&record->ur_ops, record->ur_update_count) +
- update_params_size(params, record->ur_param_count));
+ op_size + param_size);
}
static inline size_t
__u64 tmt_batchid;
int tmt_result;
__u32 tmt_magic;
+ size_t tmt_record_size;
__u32 tmt_committed:1;
};
struct top_multiple_thandle *tt_multiple_thandle;
};
+struct sub_thandle_cookie {
+ struct llog_cookie stc_cookie; /* copy of one llog cookie (dtrq_sub_add_cookie() copies it by value) */
+ struct list_head stc_list; /* linkage on a cookie list, e.g. dtrqs_cookie_list; presumably also sub_thandle::st_cookie_list - confirm */
+};
+
/* Sub thandle is used to track multiple sub thandles under one parent
* thandle */
struct sub_thandle {
struct thandle *st_sub_th;
struct dt_device *st_dt;
- struct llog_cookie st_cookie;
+ struct list_head st_cookie_list;
struct dt_txn_commit_cb st_commit_dcb;
struct dt_txn_commit_cb st_stop_dcb;
int st_result;
struct sub_thandle *st);
/* update_records.c */
+size_t update_records_create_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_attr *attr,
+ const struct dt_allocation_hint *hint,
+ struct dt_object_format *dof);
+size_t update_records_attr_set_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_attr *attr);
+size_t update_records_ref_add_size(const struct lu_env *env,
+ const struct lu_fid *fid);
+size_t update_records_ref_del_size(const struct lu_env *env,
+ const struct lu_fid *fid);
+size_t update_records_object_destroy_size(const struct lu_env *env,
+ const struct lu_fid *fid);
+size_t update_records_index_insert_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct dt_rec *rec,
+ const struct dt_key *key);
+size_t update_records_index_delete_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct dt_key *key);
+size_t update_records_xattr_set_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_buf *buf,
+ const char *name,
+ __u32 flag);
+size_t update_records_xattr_del_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const char *name);
+size_t update_records_write_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_buf *buf,
+ __u64 pos);
+size_t update_records_punch_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ __u64 start, __u64 end);
+
int update_records_create_pack(const struct lu_env *env,
struct update_ops *ops,
unsigned int *op_count,
size_t *max_param_size,
const struct lu_fid *fid,
__u64 start, __u64 end);
+int update_records_noop_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid);
int tur_update_records_extend(struct thandle_update_records *tur,
size_t new_size);
} \
ret; \
})
+
+/*
+ * Add the size of one "name" update to the running record size of the
+ * transaction.
+ *
+ * \a th must be the tt_super of a struct top_thandle that has a
+ * tt_multiple_thandle (asserted). The size comes from
+ * update_records_<name>_size(env, ...) and is accumulated in
+ * tmt_record_size; the macro evaluates to the new total.
+ */
+#define update_record_size(env, name, th, ...)				\
+({									\
+	struct top_thandle *top_th =					\
+		container_of(th, struct top_thandle, tt_super);		\
+	struct top_multiple_thandle *tmt;				\
+									\
+	LASSERT(top_th->tt_multiple_thandle != NULL);			\
+	tmt = top_th->tt_multiple_thandle;				\
+	tmt->tmt_record_size +=						\
+		update_records_##name##_size(env, __VA_ARGS__);		\
+})
#endif
/* UPDATE */
#define OBD_FAIL_OUT_UPDATE_NET 0x1700
#define OBD_FAIL_OUT_UPDATE_NET_REP 0x1701
+#define OBD_FAIL_SPLIT_UPDATE_REC 0x1702 /* test hook: lod_trans_stop() packs 5000 noop updates into the update records before stopping */
+#define OBD_FAIL_LARGE_STRIPE 0x1703 /* test hook: striped-dir creation neither clamps stripe_count to the MDT count nor skips already-used MDT indexes */
/* MIGRATE */
#define OBD_FAIL_MIGRATE_NET_REP 0x1800
int mdt_index;
int lum_size;
int stripe_count;
+ int max_stripe_count;
int i;
int rc;
if (copy_from_user(&lum, ulmv, sizeof(*ulmv)))
RETURN(-EFAULT);
+ max_stripe_count = lum.lum_stripe_count;
/* lum_magic will indicate which stripe the ioctl will like
* to get, LMV_MAGIC_V1 is for normal LMV stripe, LMV_USER_MAGIC
* is for default LMV stripe */
}
stripe_count = lmv_mds_md_stripe_count_get(lmm);
+ if (max_stripe_count < stripe_count) {
+ lum.lum_stripe_count = stripe_count;
+ if (copy_to_user(ulmv, &lum, sizeof(lum)))
+ GOTO(finish_req, rc = -EFAULT);
+ GOTO(finish_req, rc = -E2BIG);
+ }
+
lum_size = lmv_user_md_size(stripe_count, LMV_MAGIC_V1);
OBD_ALLOC(tmp, lum_size);
if (tmp == NULL)
}
/**
+ * add noop update to the update records
+ *
+ * Add noop updates to the update records, which is only used in
+ * test right now.
+ *
+ * \param[in] env execution environment
+ * \param[in] dt dt device of lod
+ * \param[in] th thandle
+ * \param[in] count the count of update records to be added.
+ *
+ * \retval 0 if adding succeeds.
+ * \retval negative errno if adding fails.
+ */
+static int lod_add_noop_records(const struct lu_env *env,
+ struct dt_device *dt, struct thandle *th,
+ int count)
+{
+ struct top_thandle *top_th;
+ struct lu_fid *fid = &lod_env_info(env)->lti_fid;
+ int i;
+ int rc = 0;
+
+ top_th = container_of(th, struct top_thandle, tt_super);
+ if (top_th->tt_multiple_thandle == NULL)
+ return 0;
+
+ fid_zero(fid);
+ for (i = 0; i < count; i++) {
+ rc = update_record_pack(noop, th, fid);
+ if (rc < 0)
+ return rc;
+ }
+ return rc;
+}
+
+/**
* Implementation of dt_device_operations::dt_trans_stop() for LOD
*
* Stops the set of local transactions using the targets involved
static int lod_trans_stop(const struct lu_env *env, struct dt_device *dt,
			  struct thandle *th)
{
+	/* Test hook: inflate the update records with noop updates so that
+	 * the record has to be split when it is written. */
+	if (OBD_FAIL_CHECK(OBD_FAIL_SPLIT_UPDATE_REC)) {
+		int rc;
+
+		rc = lod_add_noop_records(env, dt, th, 5000);
+		/* Plain return: this function has no ENTRY, so the RETURN()
+		 * trace macro would be unpaired here.
+		 * NOTE(review): on this path top_trans_stop() is never
+		 * called for th - confirm the handle is not leaked. */
+		if (rc < 0)
+			return rc;
+	}
	/* Hand the transaction over to the child device to be stopped */
	return top_trans_stop(env, dt2lod_dev(dt)->lod_child, th);
}
struct dt_object **stripe;
__u32 stripe_count;
int *idx_array;
+ __u32 master_index;
int rc = 0;
__u32 i;
__u32 j;
stripe_count = le32_to_cpu(lum->lum_stripe_count);
/* shrink the stripe_count to the avaible MDT count */
- if (stripe_count > lod->lod_remote_mdt_count + 1)
+ if (stripe_count > lod->lod_remote_mdt_count + 1 &&
+ !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))
stripe_count = lod->lod_remote_mdt_count + 1;
OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
if (idx_array == NULL)
GOTO(out_free, rc = -ENOMEM);
+ /* Start index will be the master MDT */
+ master_index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
+ idx_array[0] = master_index;
for (i = 0; i < stripe_count; i++) {
struct lod_tgt_desc *tgt = NULL;
struct dt_object *dto;
struct lu_object_conf conf = { 0 };
struct dt_device *tgt_dt = NULL;
- if (i == 0) {
- /* Right now, master stripe and master object are
- * on the same MDT */
- idx = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
- rc = obd_fid_alloc(env, lod->lod_child_exp, &fid,
- NULL);
- if (rc < 0)
- GOTO(out_put, rc);
- tgt_dt = lod->lod_child;
- goto next;
- }
-
- idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
-
+ /* Try to find next avaible target */
+ idx = idx_array[i];
for (j = 0; j < lod->lod_remote_mdt_count;
j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
bool already_allocated = false;
__u32 k;
- CDEBUG(D_INFO, "try idx %d, mdt cnt %u,"
- " allocated %u, last allocated %d\n", idx,
- lod->lod_remote_mdt_count, i, idx_array[i - 1]);
+ CDEBUG(D_INFO, "try idx %d, mdt cnt %u, allocated %u\n",
+ idx, lod->lod_remote_mdt_count + 1, i);
+ if (idx == master_index) {
+ /* Allocate the FID locally */
+ rc = obd_fid_alloc(env, lod->lod_child_exp,
+ &fid, NULL);
+ if (rc < 0)
+ GOTO(out_put, rc);
+ tgt_dt = lod->lod_child;
+ break;
+ }
/* Find next available target */
if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
continue;
- /* check whether the idx already exists
- * in current allocated array */
- for (k = 0; k < i; k++) {
- if (idx_array[k] == idx) {
- already_allocated = true;
- break;
+ if (likely(!OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) {
+ /* check whether the idx already exists
+ * in current allocated array */
+ for (k = 0; k < i; k++) {
+ if (idx_array[k] == idx) {
+ already_allocated = true;
+ break;
+ }
}
- }
- if (already_allocated)
- continue;
+ if (already_allocated)
+ continue;
+ }
/* check the status of the OSP */
tgt = LTD_TGT(ltd, idx);
break;
}
- CDEBUG(D_INFO, "idx %d, mdt cnt %u,"
- " allocated %u, last allocated %d\n", idx,
- lod->lod_remote_mdt_count, i, idx_array[i - 1]);
-
-next:
+ CDEBUG(D_INFO, "Get idx %d, for stripe %d "DFID"\n",
+ idx, i, PFID(&fid));
+		idx_array[i] = idx;
+		/* Set the start index for the next stripe allocation. The
+		 * last stripe has no successor, so idx_array[i + 1] must not
+		 * be written for it: the previous "i < stripe_count" test was
+		 * always true inside this loop and stored one slot past
+		 * index stripe_count - 1 (idx_array presumably holds
+		 * stripe_count entries - confirm against its allocation). */
+		if (i + 1 < stripe_count)
+			idx_array[i + 1] = (idx + 1) %
+					   (lod->lod_remote_mdt_count + 1);
/* tgt_dt and fid must be ready after search avaible OSP
* in the above loop */
LASSERT(tgt_dt != NULL);
if (IS_ERR(dto))
GOTO(out_put, rc = PTR_ERR(dto));
stripe[i] = dto;
- idx_array[i] = idx;
}
lo->ldo_dir_striped = 1;
res_id);
einfo->ei_res_id = res_id;
- LASSERT(lo->ldo_stripe[i]);
- rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
- policy);
+ LASSERT(lo->ldo_stripe[i] != NULL);
+ if (likely(dt_object_remote(lo->ldo_stripe[i]))) {
+ rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh,
+ einfo, policy);
+ } else {
+ struct ldlm_namespace *ns = einfo->ei_namespace;
+ ldlm_blocking_callback blocking = einfo->ei_cb_local_bl;
+ ldlm_completion_callback completion = einfo->ei_cb_cp;
+ __u64 dlmflags = LDLM_FL_ATOMIC_CB;
+
+ /* This only happens if there are mulitple stripes
+ * on the master MDT, i.e. except stripe0, there are
+ * other stripes on the Master MDT as well, Only
+ * happens in the test case right now. */
+ LASSERT(ns != NULL);
+ rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS,
+ policy, einfo->ei_mode,
+ &dlmflags, blocking,
+ completion, NULL,
+ NULL, 0, LVB_T_NONE,
+ NULL, &lockh);
+ }
if (rc != 0)
GOTO(out, rc);
slave_locks->lsl_handle[i] = lockh;
if (type == LU_SEQ_RANGE_OST)
RETURN(tth->tt_master_sub_thandle);
+ sub_th = thandle_get_sub(env, th, sub_obj);
+ if (IS_ERR(sub_th))
+ RETURN(sub_th);
+
if (tth->tt_multiple_thandle != NULL && record_update != NULL &&
th->th_result == 0)
*record_update = true;
- sub_th = thandle_get_sub(env, th, sub_obj);
-
RETURN(sub_th);
}
struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
return PTR_ERR(sub_th);
+ if (record_update)
+ update_record_size(env, create, th, lu_object_fid(&dt->do_lu),
+ attr, hint, dof);
+
return dt_declare_create(env, dt, attr, hint, dof, sub_th);
}
struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update)
+ update_record_size(env, ref_add, th, lu_object_fid(&dt->do_lu));
+
rc = dt_declare_ref_add(env, dt, sub_th);
RETURN(rc);
struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update)
+ update_record_size(env, ref_del, th, lu_object_fid(&dt->do_lu));
+
rc = dt_declare_ref_del(env, dt, sub_th);
RETURN(rc);
struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update)
+ update_record_size(env, object_destroy, th,
+ lu_object_fid(&dt->do_lu));
+
rc = dt_declare_destroy(env, dt, sub_th);
RETURN(rc);
struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
return PTR_ERR(sub_th);
+ if (record_update)
+ update_record_size(env, index_insert, th,
+ lu_object_fid(&dt->do_lu), rec, key);
+
return dt_declare_insert(env, dt, rec, key, sub_th);
}
struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
return PTR_ERR(sub_th);
+ if (record_update)
+ update_record_size(env, index_delete, th,
+ lu_object_fid(&dt->do_lu), key);
+
return dt_declare_delete(env, dt, key, sub_th);
}
struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update)
+ update_record_size(env, xattr_set, th,
+ lu_object_fid(&dt->do_lu),
+ buf, name, fl);
+
rc = dt_declare_xattr_set(env, dt, buf, name, fl, sub_th);
RETURN(rc);
struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update)
+ update_record_size(env, attr_set, th,
+ lu_object_fid(&dt->do_lu), attr);
+
rc = dt_declare_attr_set(env, dt, attr, sub_th);
RETURN(rc);
struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update)
+ update_record_size(env, xattr_del, th,
+ lu_object_fid(&dt->do_lu),
+ name);
+
rc = dt_declare_xattr_del(env, dt, name, sub_th);
RETURN(rc);
struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update)
+ update_record_size(env, write, th,
+ lu_object_fid(&dt->do_lu),
+ buf, pos);
+
rc = dt_declare_write(env, dt, buf, pos, sub_th);
RETURN(rc);
struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update)
+ update_record_size(env, punch, th,
+ lu_object_fid(&dt->do_lu),
+ start, end);
+
rc = dt_declare_punch(env, dt, start, end, sub_th);
RETURN(rc);
einfo->ei_type = LDLM_IBITS;
einfo->ei_mode = mode;
einfo->ei_cb_bl = mdt_remote_blocking_ast;
+ einfo->ei_cb_local_bl = mdt_blocking_ast;
einfo->ei_cb_cp = ldlm_completion_ast;
einfo->ei_enq_slave = 1;
+ einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
memset(policy, 0, sizeof(*policy));
policy->l_inodebits.bits = ibits;
if (rc < 0)
GOTO(out, rc);
- CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u\n",
- POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len);
+ CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u off"LPU64"\n",
+ POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len,
+ lgi->lgi_off);
if (reccookie != NULL) {
reccookie->lgc_lgl = loghandle->lgh_id;
reccookie->lgc_index = index;
if (ours == NULL)
return -ENOMEM;
+ if (size < OUT_UPDATE_INIT_BUFFER_SIZE)
+ size = OUT_UPDATE_INIT_BUFFER_SIZE;
+
ours->ours_req = object_update_request_alloc(size);
if (IS_ERR(ours->ours_req)) {
(long long)(int)offsetof(struct llog_log_hdr, llh_tgtuuid));
LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_tgtuuid) == 40, "found %lld\n",
(long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_tgtuuid));
- LASSERTF((int)offsetof(struct llog_log_hdr, llh_reserved) == 84, "found %lld\n",
- (long long)(int)offsetof(struct llog_log_hdr, llh_reserved));
- LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_reserved) == 4, "found %lld\n",
- (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_reserved));
- LASSERTF((int)offsetof(struct llog_log_hdr, llh_bitmap) == 88, "found %lld\n",
- (long long)(int)offsetof(struct llog_log_hdr, llh_bitmap));
- LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_bitmap) == 8096, "found %lld\n",
- (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_bitmap));
- LASSERTF((int)offsetof(struct llog_log_hdr, llh_tail) == 8184, "found %lld\n",
- (long long)(int)offsetof(struct llog_log_hdr, llh_tail));
- LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_tail) == 8, "found %lld\n",
- (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_tail));
/* Checks for struct llog_cookie */
LASSERTF((int)sizeof(struct llog_cookie) == 32, "found %lld\n",
(long long)(int)offsetof(struct fiemap_extent, fe_flags));
LASSERTF((int)sizeof(((struct fiemap_extent *)0)->fe_flags) == 4, "found %lld\n",
(long long)(int)sizeof(((struct fiemap_extent *)0)->fe_flags));
- LASSERTF((int)offsetof(struct fiemap_extent, fe_device) == 44, "found %lld\n",
- (long long)(int)offsetof(struct fiemap_extent, fe_device));
- LASSERTF((int)sizeof(((struct fiemap_extent *)0)->fe_device) == 4, "found %lld\n",
- (long long)(int)sizeof(((struct fiemap_extent *)0)->fe_device));
+ LASSERTF((int)offsetof(struct fiemap_extent, fe_reserved[0]) == 44, "found %lld\n",
+ (long long)(int)offsetof(struct fiemap_extent, fe_reserved[0]));
+ LASSERTF((int)sizeof(((struct fiemap_extent *)0)->fe_reserved[0]) == 4, "found %lld\n",
+ (long long)(int)sizeof(((struct fiemap_extent *)0)->fe_reserved[0]));
CLASSERT(FIEMAP_EXTENT_LAST == 0x00000001);
CLASSERT(FIEMAP_EXTENT_UNKNOWN == 0x00000002);
CLASSERT(FIEMAP_EXTENT_DELALLOC == 0x00000004);
[OUT_XATTR_DEL] = "xattr_del",
[OUT_PUNCH] = "punch",
[OUT_READ] = "read",
+ [OUT_NOOP] = "noop",
};
if (opc < ARRAY_SIZE(opc_str) && opc_str[opc] != NULL)
{
const struct update_ops *ops;
const struct update_op *op = NULL;
- struct update_params *params;
+ struct update_params *params = NULL;
unsigned int i;
- ops = &records->ur_ops;
- params = update_records_get_params(records);
-
CDEBUG(mask, "master transno = "LPU64" batchid = "LPU64" flags = %x"
" ops = %d params = %d\n", records->ur_master_transno,
records->ur_batchid, records->ur_flags, records->ur_update_count,
if (!dump_updates)
return;
+ ops = &records->ur_ops;
+ if (records->ur_param_count > 0)
+ params = update_records_get_params(records);
+
op = &ops->uops_op[0];
- for (i = 0; i < records->ur_update_count; i++) {
+ for (i = 0; i < records->ur_update_count; i++,
+ op = update_op_next_op(op)) {
unsigned int j;
CDEBUG(mask, "update %dth "DFID" %s params_count = %hu\n", i,
PFID(&op->uop_fid), update_op_str(op->uop_type),
op->uop_param_count);
+ if (params == NULL)
+ continue;
+
for (j = 0; j < op->uop_param_count; j++) {
struct object_update_param *param;
param = update_params_get_param(params,
- (unsigned int)op->uop_params_off[j],
+ (unsigned int)op->uop_params_off[j],
records->ur_param_count);
- LASSERT(param != NULL);
+ if (param == NULL)
+ continue;
CDEBUG(mask, "param = %p %dth off = %hu size = %hu\n",
param, j, op->uop_params_off[j], param->oup_len);
}
-
- op = update_op_next_op(op);
}
}
}
/**
+ * Calculate update_records size
+ *
+ * Calculate update_records size by param_count and param_sizes array.
+ *
+ * \param[in] param_count the count of parameters
+ * \param[in] sizes the size array of these parameters
+ *
+ * \retval the size of this update
+ */
+static size_t update_records_update_size(__u32 param_count, size_t *sizes)
+{
+ int i;
+ size_t size;
+
+ /* Check whether the packing exceeding the maximum update size */
+ size = update_op_size(param_count);
+
+ for (i = 0; i < param_count; i++)
+ size += cfs_size_round(sizeof(struct object_update_param) +
+ sizes[i]);
+
+ return size;
+}
+
+/**
+ * Calculate create update size
+ *
+ * The create update is sized with one parameter of sizeof(struct obdo)
+ * when \a attr is given, and one parameter of the size of a FID when
+ * \a hint carries a parent.
+ *
+ * \param[in] env	execution environment (not used)
+ * \param[in] fid	FID of the object to be created (only its type
+ *			size is used)
+ * \param[in] attr	attribute of the object to be created
+ * \param[in] hint	creation hint
+ * \param[in] dof	creation format information (not used)
+ *
+ * \retval		size of create update.
+ */
+size_t update_records_create_size(const struct lu_env *env,
+				  const struct lu_fid *fid,
+				  const struct lu_attr *attr,
+				  const struct dt_allocation_hint *hint,
+				  struct dt_object_format *dof)
+{
+	size_t	sizes[2];
+	int	nr = 0;
+
+	/* the attributes are accounted as a struct obdo */
+	if (attr != NULL)
+		sizes[nr++] = sizeof(struct obdo);
+
+	/* the parent from the hint is accounted as one FID */
+	if (hint != NULL && hint->dah_parent != NULL)
+		sizes[nr++] = sizeof(*fid);
+
+	return update_records_update_size(nr, sizes);
+}
+EXPORT_SYMBOL(update_records_create_size);
+
+/**
* Pack create update
*
* Pack create update into update records.
EXPORT_SYMBOL(update_records_create_pack);
/**
+ * Calculate attr set update size
+ *
+ * The update is sized with a single parameter of sizeof(struct obdo).
+ *
+ * \param[in] env	execution environment (not used)
+ * \param[in] fid	FID of the object to set attr (not used)
+ * \param[in] attr	attribute of attr set (not used)
+ *
+ * \retval		size of attr set update.
+ */
+size_t update_records_attr_set_size(const struct lu_env *env,
+				    const struct lu_fid *fid,
+				    const struct lu_attr *attr)
+{
+	size_t obdo_size[1] = { sizeof(struct obdo) };
+
+	return update_records_update_size(1, obdo_size);
+}
+EXPORT_SYMBOL(update_records_attr_set_size);
+
+/**
* Pack attr set update
*
* Pack attr_set update into update records.
EXPORT_SYMBOL(update_records_attr_set_pack);
/**
+ * Calculate ref add update size
+ *
+ * A ref add update is sized without any parameters.
+ *
+ * \param[in] env	execution environment (not used)
+ * \param[in] fid	FID of the object to add reference (not used)
+ *
+ * \retval		size of ref_add update.
+ */
+size_t update_records_ref_add_size(const struct lu_env *env,
+				   const struct lu_fid *fid)
+{
+	size_t size = update_records_update_size(0, NULL);
+
+	return size;
+}
+EXPORT_SYMBOL(update_records_ref_add_size);
+
+/**
* Pack ref add update
*
* Pack ref add update into update records.
EXPORT_SYMBOL(update_records_ref_add_pack);
/**
+ * Pack noop update
+ *
+ * Pack no op update into update records. Note: no op means
+ * the update does not need do anything, which is only used
+ * in test case to verify large size record.
+ *
+ * \param[in] env execution environment
+ * \param[in] ops ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params ur_params in update records
+ * \param[in|out] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid FID of the object to add reference
+ *
+ * \retval 0 if packing succeeds.
+ * \retval negative errno if packing fails.
+ */
+int update_records_noop_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid)
+{
+ return update_records_update_pack(env, fid, OUT_NOOP, ops, op_count,
+ max_ops_size, params, param_count,
+ max_param_size, 0, NULL, NULL);
+}
+EXPORT_SYMBOL(update_records_noop_pack);
+
+/**
+ * Calculate ref del update size
+ *
+ * A ref del update is sized without any parameters.
+ *
+ * \param[in] env	execution environment (not used)
+ * \param[in] fid	FID of the object to delete reference (not used)
+ *
+ * \retval		size of ref_del update.
+ */
+size_t update_records_ref_del_size(const struct lu_env *env,
+				   const struct lu_fid *fid)
+{
+	size_t size = update_records_update_size(0, NULL);
+
+	return size;
+}
+EXPORT_SYMBOL(update_records_ref_del_size);
+
+/**
* Pack ref del update
*
* Pack ref del update into update records.
EXPORT_SYMBOL(update_records_ref_del_pack);
/**
+ * Calculate object destroy update size
+ *
+ * An object destroy update is sized without any parameters.
+ *
+ * \param[in] env	execution environment (not used)
+ * \param[in] fid	FID of the object to be destroyed (not used)
+ *
+ * \retval		size of object destroy update.
+ */
+size_t update_records_object_destroy_size(const struct lu_env *env,
+					  const struct lu_fid *fid)
+{
+	size_t size = update_records_update_size(0, NULL);
+
+	return size;
+}
+EXPORT_SYMBOL(update_records_object_destroy_size);
+
+/**
* Pack object destroy update
*
* Pack object destroy update into update records.
EXPORT_SYMBOL(update_records_object_destroy_pack);
/**
+ * Calculate index insert update size
+ *
+ * \param[in] env execution environment
+ * \param[in] fid FID of the object to insert index
+ * \param[in] rec record of insertion
+ * \param[in] key key of insertion
+ *
+ * \retval the size of index insert update.
+ */
+size_t update_records_index_insert_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct dt_rec *rec,
+ const struct dt_key *key)
+{
+ size_t sizes[3] = { strlen((const char *)key) + 1,
+ sizeof(struct lu_fid),
+ sizeof(__u32) };
+ return update_records_update_size(3, sizes);
+}
+EXPORT_SYMBOL(update_records_index_insert_size);
+
+/**
* Pack index insert update
*
* Pack index insert update into update records.
EXPORT_SYMBOL(update_records_index_insert_pack);
/**
+ * Calculate index delete update size
+ *
+ * The update is sized with one parameter: the key, taken as a
+ * NUL-terminated string.
+ *
+ * \param[in] env	execution environment (not used)
+ * \param[in] fid	FID of the object to delete index (not used)
+ * \param[in] key	key of deletion
+ *
+ * \retval		the size of index delete update
+ */
+size_t update_records_index_delete_size(const struct lu_env *env,
+					const struct lu_fid *fid,
+					const struct dt_key *key)
+{
+	/* the key is stored together with its terminating NUL */
+	size_t key_size[1] = { strlen((const char *)key) + 1 };
+
+	return update_records_update_size(1, key_size);
+}
+EXPORT_SYMBOL(update_records_index_delete_size);
+
+/**
* Pack index delete update
*
* Pack index delete update into update records.
EXPORT_SYMBOL(update_records_index_delete_pack);
/**
+ * Calculate xattr set update size
+ *
+ * The update is sized with three parameters: the xattr name including
+ * its terminating NUL, the xattr value (buf->lb_len bytes) and the flag.
+ *
+ * \param[in] env	execution environment (not used)
+ * \param[in] fid	FID of the object to set xattr (not used)
+ * \param[in] buf	xattr to be set
+ * \param[in] name	name of the xattr
+ * \param[in] flag	flag for setting xattr
+ *
+ * \retval		size of xattr set update.
+ */
+size_t update_records_xattr_set_size(const struct lu_env *env,
+				     const struct lu_fid *fid,
+				     const struct lu_buf *buf,
+				     const char *name, __u32 flag)
+{
+	size_t sizes[3];
+
+	sizes[0] = strlen(name) + 1;
+	sizes[1] = buf->lb_len;
+	sizes[2] = sizeof(flag);
+
+	return update_records_update_size(3, sizes);
+}
+EXPORT_SYMBOL(update_records_xattr_set_size);
+
+/**
* Pack xattr set update
*
* Pack xattr set update into update records.
EXPORT_SYMBOL(update_records_xattr_set_pack);
/**
+ * Calculate xattr delete update size.
+ *
+ * The update is sized with one parameter: the xattr name including its
+ * terminating NUL.
+ *
+ * \param[in] env	execution environment (not used)
+ * \param[in] fid	FID of the object to delete xattr (not used)
+ * \param[in] name	name of the xattr
+ *
+ * \retval		size of xattr delete update.
+ */
+size_t update_records_xattr_del_size(const struct lu_env *env,
+				     const struct lu_fid *fid,
+				     const char *name)
+{
+	size_t name_size[1] = { strlen(name) + 1 };
+
+	return update_records_update_size(1, name_size);
+}
+EXPORT_SYMBOL(update_records_xattr_del_size);
+
+/**
* Pack xattr delete update
*
* Pack xattr delete update into update records.
EXPORT_SYMBOL(update_records_xattr_del_pack);
/**
+ * Calculate write update size
+ *
+ * \param[in] env execution environment
+ * \param[in] fid FID of the object to write into
+ * \param[in] buf buffer to write which includes an embedded size field
+ * \param[in] pos offet in the object to start writing at
+ *
+ * \retval size of write udpate.
+ */
+size_t update_records_write_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_buf *buf,
+ __u64 pos)
+{
+ size_t sizes[2] = {buf->lb_len, sizeof(pos)};
+
+ return update_records_update_size(2, sizes);
+}
+EXPORT_SYMBOL(update_records_write_size);
+
+/**
* Pack write update
*
* Pack write update into update records.
EXPORT_SYMBOL(update_records_write_pack);
/**
+ * Calculate size of punch update.
+ *
+ * The update is sized with two parameters: the start and the end
+ * offset.
+ *
+ * \param[in] env	execution environment (not used)
+ * \param[in] fid	FID of the object to punch (not used)
+ * \param[in] start	start offset of punch
+ * \param[in] end	end offset of punch
+ *
+ * \retval		size of punch update.
+ */
+size_t update_records_punch_size(const struct lu_env *env,
+				 const struct lu_fid *fid,
+				 __u64 start, __u64 end)
+{
+	size_t sizes[2];
+
+	sizes[0] = sizeof(start);
+	sizes[1] = sizeof(end);
+
+	return update_records_update_size(2, sizes);
+}
+EXPORT_SYMBOL(update_records_punch_size);
+
+/**
* Pack punch
*
* Pack punch update into update records.
record_size = llog_update_record_size(tur->tur_update_records);
/* extend update records buffer */
- if (new_op_size > (tur->tur_update_records_buf_size - record_size -
- sizeof(*tur->tur_update_records))) {
+ if (new_op_size >= (tur->tur_update_records_buf_size - record_size)) {
extend_size = round_up(new_op_size, UPDATE_RECORDS_BUFFER_SIZE);
rc = tur_update_records_extend(tur,
tur->tur_update_records_buf_size +
/* extend parameters buffer */
params_size = update_params_size(tur->tur_update_params,
tur->tur_update_param_count);
- if (new_param_size > (tur->tur_update_params_buf_size -
+ if (new_param_size >= (tur->tur_update_params_buf_size -
params_size)) {
extend_size = round_up(new_param_size,
UPDATE_PARAMS_BUFFER_SIZE);
}
/**
+ * Add a cookie to a sub distribute txn request
+ *
+ * Allocate a sub_thandle_cookie holding a copy of \a cookie and link it
+ * on dtrqs_cookie_list. The list is not searched first, so a cookie
+ * passed in twice is linked twice.
+ *
+ * \param[in] dtrqs	sub replay req where the cookie is to be added.
+ * \param[in] cookie	cookie to be added.
+ *
+ * \retval		0 if adding the cookie succeeds.
+ * \retval		-ENOMEM if the list entry cannot be allocated.
+ */
+static int dtrq_sub_add_cookie(struct distribute_txn_replay_req_sub *dtrqs,
+			       struct llog_cookie *cookie)
+{
+	struct sub_thandle_cookie *stc;
+
+	OBD_ALLOC_PTR(stc);
+	if (stc == NULL)
+		return -ENOMEM;
+
+	stc->stc_cookie = *cookie;
+	INIT_LIST_HEAD(&stc->stc_list);
+
+	/* Note: only a single thread accesses one sub_request at a time,
+	 * so the list is modified without a lock here */
+	list_add(&stc->stc_list, &dtrqs->dtrqs_cookie_list);
+
+	return 0;
+}
+
+/**
* Insert distribute txn sub req replay
*
* Allocate sub replay req and insert distribute txn replay list.
struct llog_cookie *cookie,
__u32 mdt_index)
{
- struct distribute_txn_replay_req_sub *dtrqs = NULL;
- struct distribute_txn_replay_req_sub *new;
+ struct distribute_txn_replay_req_sub *dtrqs = NULL;
+ struct distribute_txn_replay_req_sub *new;
+ int rc;
ENTRY;
spin_lock(&dtrq->dtrq_sub_list_lock);
dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
spin_unlock(&dtrq->dtrq_sub_list_lock);
- if (dtrqs != NULL)
+ if (dtrqs != NULL) {
+ /* The sub request already exists, only the cookie has to be
+ * linked. Hand the result back to the caller so that a
+ * failed cookie allocation is not reported as success. */
+ rc = dtrq_sub_add_cookie(dtrqs, cookie);
- RETURN(0);
+ RETURN(rc);
+ }
OBD_ALLOC_PTR(new);
if (new == NULL)
RETURN(-ENOMEM);
INIT_LIST_HEAD(&new->dtrqs_list);
+ INIT_LIST_HEAD(&new->dtrqs_cookie_list);
new->dtrqs_mdt_index = mdt_index;
- new->dtrqs_llog_cookie = *cookie;
spin_lock(&dtrq->dtrq_sub_list_lock);
dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
- if (dtrqs == NULL)
+ if (dtrqs == NULL) {
list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list);
- else
+ dtrqs = new;
+ } else {
OBD_FREE_PTR(new);
+ }
spin_unlock(&dtrq->dtrq_sub_list_lock);
+ rc = dtrq_sub_add_cookie(dtrqs, cookie);
+
+ RETURN(rc);
+}
+
+/**
+ * Append updates to the current replay updates
+ *
+ * Append the updates of \a record to the update record already held by
+ * \a dtrq. This is only used when combining multiple partial update
+ * records into one large update record during replay. The record is
+ * appended only if its ur_index directly follows the ur_index currently
+ * stored in \a dtrq; otherwise it is skipped and 0 is returned.
+ *
+ * \param[in] dtrq the update replay request where the new update
+ * records will be added.
+ * \param[in] record the new update record.
+ *
+ * \retval 0 if appending succeeds or the record is skipped.
+ * \retval -ENOMEM if the enlarged record buffer cannot be allocated.
+ */
+static int dtrq_append_updates(struct distribute_txn_replay_req *dtrq,
+ struct update_records *record)
+{
+ struct llog_update_record *new_lur;
+ size_t lur_size = dtrq->dtrq_lur_size;
+ void *ptr;
+ ENTRY;
+
+ /* Because several threads might retrieve the same records from
+ * different targets, and we only need one copy of the records,
+ * check whether this record is the next one in sequence; if not,
+ * just skip it */
+ spin_lock(&dtrq->dtrq_sub_list_lock);
+ if (dtrq->dtrq_lur->lur_update_rec.ur_index + 1 != record->ur_index) {
+ spin_unlock(&dtrq->dtrq_sub_list_lock);
+ RETURN(0);
+ }
+ /* Advance ur_index while the lock is still held; it is rolled back
+ * below if the allocation fails. */
+ dtrq->dtrq_lur->lur_update_rec.ur_index++;
+ spin_unlock(&dtrq->dtrq_sub_list_lock);
+
+ /* The new buffer grows by the full size of \a record, although only
+ * the part starting at ur_ops is copied in below. */
+ lur_size += update_records_size(record);
+ OBD_ALLOC_LARGE(new_lur, lur_size);
+ if (new_lur == NULL) {
+ spin_lock(&dtrq->dtrq_sub_list_lock);
+ dtrq->dtrq_lur->lur_update_rec.ur_index--;
+ spin_unlock(&dtrq->dtrq_sub_list_lock);
+ RETURN(-ENOMEM);
+ }
+
+ /* Copy the old and new records to the new allocated buffer.
+ * ptr is the end of the copied old record as measured by
+ * update_records_size(); the fixed header fields of \a record that
+ * precede ur_ops are not copied. */
+ memcpy(new_lur, dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+ ptr = (char *)&new_lur->lur_update_rec +
+ update_records_size(&new_lur->lur_update_rec);
+ memcpy(ptr, &record->ur_ops,
+ update_records_size(record) -
+ offsetof(struct update_records, ur_ops));
+
+ /* The counts are only raised after the copy, so the end offset
+ * computed above was based on the old record alone. */
+ new_lur->lur_update_rec.ur_update_count += record->ur_update_count;
+ new_lur->lur_update_rec.ur_param_count += record->ur_param_count;
+ new_lur->lur_hdr.lrh_len = llog_update_record_size(new_lur);
+
+ /* Replace the records.
+ * NOTE(review): dtrq_lur and dtrq_lur_size are swapped here without
+ * holding dtrq_sub_list_lock - confirm no other thread can read
+ * dtrq_lur at this point. */
+ OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+ dtrq->dtrq_lur = new_lur;
+ dtrq->dtrq_lur_size = lur_size;
+ /* The merged record takes over the flags of the appended record. */
+ dtrq->dtrq_lur->lur_update_rec.ur_flags = record->ur_flags;
+ update_records_dump(&new_lur->lur_update_rec, D_INFO, true);
RETURN(0);
}
CDEBUG(D_HA, "%s: insert record batchid = "LPU64" transno = "LPU64
" mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
record->ur_batchid, record->ur_master_transno, mdt_index);
-again:
+
+ /* First try to build the replay update request with the records */
spin_lock(&tdtd->tdtd_replay_list_lock);
dtrq = dtrq_lookup(tdtd, record->ur_batchid);
spin_unlock(&tdtd->tdtd_replay_list_lock);
spin_lock(&tdtd->tdtd_replay_list_lock);
rc = dtrq_insert(tdtd, dtrq);
spin_unlock(&tdtd->tdtd_replay_list_lock);
- } else if (record->ur_master_transno != 0 &&
- dtrq->dtrq_lur->lur_update_rec.ur_master_transno !=
- record->ur_master_transno) {
- /* If the master transno in update header is not matched with
- * the one in the record, then it means the dtrq is originally
- * created by master record, and we need update master transno
- * and reposition the dtrq(by master transno). */
- dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
- record->ur_master_transno;
- list_del_init(&dtrq->dtrq_list);
- spin_lock(&tdtd->tdtd_replay_list_lock);
- rc = dtrq_insert(tdtd, dtrq);
- spin_unlock(&tdtd->tdtd_replay_list_lock);
- }
+ if (rc == -EEXIST) {
+ /* Some one else already add the record */
+ dtrq_destroy(dtrq);
+ rc = 0;
+ }
+ } else {
+ struct update_records *dtrq_rec;
+
+ /* If the master transno in update header is not
+ * matched with the one in the record, then it means
+ * the dtrq is originally created by master record,
+ * and we need update master transno and reposition
+ * the dtrq(by master transno). */
+ dtrq_rec = &dtrq->dtrq_lur->lur_update_rec;
+ if (record->ur_master_transno != 0 &&
+ dtrq_rec->ur_master_transno != record->ur_master_transno) {
+ dtrq_rec->ur_master_transno = record->ur_master_transno;
+ spin_lock(&tdtd->tdtd_replay_list_lock);
+ list_del_init(&dtrq->dtrq_list);
+ rc = dtrq_insert(tdtd, dtrq);
+ spin_unlock(&tdtd->tdtd_replay_list_lock);
+ if (rc < 0)
+ return rc;
+ }
- if (rc == -EEXIST) {
- dtrq_destroy(dtrq);
- rc = 0;
- goto again;
+ /* This is a partial update record, let's try to append
+ * the record to the current replay request. Stop on
+ * failure: rc is overwritten by the sub request insertion
+ * below, so an append error would otherwise be lost and
+ * the cookie of a record that was never merged would still
+ * be registered. */
+ if (record->ur_flags & UPDATE_RECORD_CONTINUE) {
+ rc = dtrq_append_updates(dtrq, record);
+ if (rc < 0)
+ return rc;
+ }
}
+ /* Then create and add sub update request */
rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index);
RETURN(rc);
LASSERT(list_empty(&dtrq->dtrq_list));
spin_lock(&dtrq->dtrq_sub_list_lock);
list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
+ struct sub_thandle_cookie *stc;
+ /* Named differently from the outer cursor "tmp" so that the
+ * inner walk does not shadow it. */
+ struct sub_thandle_cookie *stc_next;
+
list_del(&dtrqs->dtrqs_list);
+ /* Release every cookie still linked to this sub request. */
+ list_for_each_entry_safe(stc, stc_next,
+ &dtrqs->dtrqs_cookie_list, stc_list) {
+ list_del(&stc->stc_list);
+ OBD_FREE_PTR(stc);
+ }
OBD_FREE_PTR(dtrqs);
}
spin_unlock(&dtrq->dtrq_sub_list_lock);
dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) {
st->st_committed = 1;
- if (dtrqs != NULL)
- st->st_cookie = dtrqs->dtrqs_llog_cookie;
+ if (dtrqs != NULL) {
+ struct sub_thandle_cookie *stc;
+ struct sub_thandle_cookie *tmp;
+
+ list_for_each_entry_safe(stc, tmp,
+ &dtrqs->dtrqs_cookie_list,
+ stc_list)
+ list_move(&stc->stc_list, &st->st_cookie_list);
+ }
RETURN(0);
}
struct dt_device *sub_dt;
struct sub_thandle *st;
+ if (op->uop_type == OUT_NOOP)
+ continue;
+
dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
if (IS_ERR(dt_obj)) {
rc = PTR_ERR(dt_obj);
tmt->tmt_result, tmt->tmt_batchid);
list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
- CDEBUG(mask, "st %p obd %s committed %d sub_th %p "
- " cookie "DOSTID": %u\n",
+ struct sub_thandle_cookie *stc;
+
+ CDEBUG(mask, "st %p obd %s committed %d sub_th %p\n",
st, st->st_dt->dd_lu_dev.ld_obd->obd_name,
- st->st_committed, st->st_sub_th,
- POSTID(&st->st_cookie.lgc_lgl.lgl_oi),
- st->st_cookie.lgc_index);
+ st->st_committed, st->st_sub_th);
+
+ list_for_each_entry(stc, &st->st_cookie_list, stc_list) {
+ CDEBUG(mask, " cookie "DOSTID": %u\n",
+ POSTID(&stc->stc_cookie.lgc_lgl.lgl_oi),
+ stc->stc_cookie.lgc_index);
+ }
}
}
* \param[in] env execution environment
* \param[in] record update records being written
* \param[in] sub_th sub transaction handle
+ * \param[in] record_size total update record size
*
* \retval 0 if writing succeeds
* \retval negative errno if writing fails
*/
static int sub_declare_updates_write(const struct lu_env *env,
struct llog_update_record *record,
- struct thandle *sub_th)
+ struct thandle *sub_th, size_t record_size)
{
struct llog_ctxt *ctxt;
struct dt_device *dt = sub_th->th_dev;
+ int left = record_size;
int rc;
/* If ctxt is NULL, it means not need to write update,
LASSERT(ctxt != NULL);
/* Not ready to record updates yet. */
- if (ctxt->loc_handle == NULL) {
- llog_ctxt_put(ctxt);
- return 0;
- }
+ if (ctxt->loc_handle == NULL)
+ GOTO(out_put, rc = 0);
- rc = llog_declare_add(env, ctxt->loc_handle, &record->lur_hdr,
- sub_th);
+ rc = llog_declare_add(env, ctxt->loc_handle,
+ &record->lur_hdr, sub_th);
+ if (rc < 0)
+ GOTO(out_put, rc);
+
+ while (left > ctxt->loc_chunk_size) {
+ rc = llog_declare_add(env, ctxt->loc_handle,
+ &record->lur_hdr, sub_th);
+ if (rc < 0)
+ GOTO(out_put, rc);
+ left -= ctxt->loc_chunk_size;
+ }
+
+out_put:
llog_ctxt_put(ctxt);
return rc;
*/
static int sub_updates_write(const struct lu_env *env,
struct llog_update_record *record,
- struct thandle *sub_th,
- struct llog_cookie *cookie)
+ struct sub_thandle *sub_th)
{
- struct dt_device *dt = sub_th->th_dev;
+ struct dt_device *dt = sub_th->st_dt;
struct llog_ctxt *ctxt;
int rc;
+ struct llog_update_record *lur = NULL;
+ struct update_params *params = NULL;
+ __u32 update_count = 0;
+ __u32 param_count = 0;
+ __u32 last_update_count = 0;
+ __u32 last_param_count = 0;
+ void *src;
+ void *start;
+ void *next;
+ struct sub_thandle_cookie *stc;
ENTRY;
ctxt = llog_get_context(dt->dd_lu_dev.ld_obd,
/* Not ready to record updates yet, usually happens
* in error handler path */
- if (ctxt->loc_handle == NULL) {
- llog_ctxt_put(ctxt);
- RETURN(0);
- }
+ if (ctxt->loc_handle == NULL)
+ GOTO(llog_put, rc = 0);
/* Since the cross-MDT updates will includes both local
* and remote updates, the update ops count must > 1 */
"lrh_len %u record_size %zu\n", record->lur_hdr.lrh_len,
llog_update_record_size(record));
- rc = llog_add(env, ctxt->loc_handle, &record->lur_hdr,
- cookie, sub_th);
- llog_ctxt_put(ctxt);
+ if (likely(record->lur_hdr.lrh_len <= ctxt->loc_chunk_size)) {
+ OBD_ALLOC_PTR(stc);
+ if (stc == NULL)
+ GOTO(llog_put, rc = -ENOMEM);
+ INIT_LIST_HEAD(&stc->stc_list);
+
+ rc = llog_add(env, ctxt->loc_handle, &record->lur_hdr,
+ &stc->stc_cookie, sub_th->st_sub_th);
+
+ CDEBUG(D_INFO, "%s: Add update log "DOSTID":%u: rc = %d\n",
+ dt->dd_lu_dev.ld_obd->obd_name,
+ POSTID(&stc->stc_cookie.lgc_lgl.lgl_oi),
+ stc->stc_cookie.lgc_index, rc);
+
+ if (rc > 0) {
+ list_add(&stc->stc_list, &sub_th->st_cookie_list);
+ rc = 0;
+ } else {
+ OBD_FREE_PTR(stc);
+ }
+
+ GOTO(llog_put, rc);
+ }
- CDEBUG(D_INFO, "%s: Add update log "DOSTID":%u.\n",
- dt->dd_lu_dev.ld_obd->obd_name,
- POSTID(&cookie->lgc_lgl.lgl_oi), cookie->lgc_index);
+ /* Split the records into chunk_size update record */
+ OBD_ALLOC_LARGE(lur, ctxt->loc_chunk_size);
+ if (lur == NULL)
+ GOTO(llog_put, rc = -ENOMEM);
- if (rc > 0)
- rc = 0;
+ memcpy(lur, &record->lur_hdr, sizeof(record->lur_hdr));
+ lur->lur_update_rec.ur_update_count = 0;
+ lur->lur_update_rec.ur_param_count = 0;
+ src = &record->lur_update_rec.ur_ops;
+ start = next = src;
+ lur->lur_hdr.lrh_len = llog_update_record_size(lur);
+ params = update_records_get_params(&record->lur_update_rec);
+ do {
+ size_t rec_len;
+
+ if (update_count < record->lur_update_rec.ur_update_count) {
+ next = update_op_next_op((struct update_op *)src);
+ } else {
+ if (param_count == 0)
+ next = update_records_get_params(
+ &record->lur_update_rec);
+ else
+ next = (char *)src +
+ object_update_param_size(
+ (struct object_update_param *)src);
+ }
+
+ rec_len = cfs_size_round((unsigned long)(next - src));
+ /* If its size > llog chunk_size, then write current chunk to
+ * the update llog. */
+ if (lur->lur_hdr.lrh_len + rec_len + LLOG_MIN_REC_SIZE >
+ ctxt->loc_chunk_size ||
+ param_count == record->lur_update_rec.ur_param_count) {
+ lur->lur_update_rec.ur_update_count =
+ update_count > last_update_count ?
+ update_count - last_update_count : 0;
+ lur->lur_update_rec.ur_param_count = param_count -
+ last_param_count;
+
+ memcpy(&lur->lur_update_rec.ur_ops, start,
+ (unsigned long)(src - start));
+ if (last_update_count != 0)
+ lur->lur_update_rec.ur_flags |=
+ UPDATE_RECORD_CONTINUE;
+
+ update_records_dump(&lur->lur_update_rec, D_INFO, true);
+ lur->lur_hdr.lrh_len = llog_update_record_size(lur);
+ LASSERT(lur->lur_hdr.lrh_len <= ctxt->loc_chunk_size);
+
+ OBD_ALLOC_PTR(stc);
+ if (stc == NULL)
+ GOTO(llog_put, rc = -ENOMEM);
+ INIT_LIST_HEAD(&stc->stc_list);
+
+ rc = llog_add(env, ctxt->loc_handle,
+ &lur->lur_hdr,
+ &stc->stc_cookie, sub_th->st_sub_th);
+
+ CDEBUG(D_INFO, "%s: Add update log "DOSTID":%u"
+ " rc = %d\n", dt->dd_lu_dev.ld_obd->obd_name,
+ POSTID(&stc->stc_cookie.lgc_lgl.lgl_oi),
+ stc->stc_cookie.lgc_index, rc);
+
+ if (rc > 0) {
+ list_add(&stc->stc_list,
+ &sub_th->st_cookie_list);
+ rc = 0;
+ } else {
+ OBD_FREE_PTR(stc);
+ GOTO(llog_put, rc);
+ }
+
+ last_update_count = update_count;
+ last_param_count = param_count;
+ start = src;
+ lur->lur_update_rec.ur_update_count = 0;
+ lur->lur_update_rec.ur_param_count = 0;
+ lur->lur_hdr.lrh_len = llog_update_record_size(lur);
+ }
+
+ src = next;
+ lur->lur_hdr.lrh_len += cfs_size_round(rec_len);
+ if (update_count < record->lur_update_rec.ur_update_count)
+ update_count++;
+ else if (param_count < record->lur_update_rec.ur_param_count)
+ param_count++;
+ else
+ break;
+ } while (1);
+
+llog_put:
+ if (lur != NULL)
+ OBD_FREE_LARGE(lur, ctxt->loc_chunk_size);
+ llog_ctxt_put(ctxt);
RETURN(rc);
}
RETURN(ERR_PTR(-ENOMEM));
INIT_LIST_HEAD(&st->st_sub_list);
+ INIT_LIST_HEAD(&st->st_cookie_list);
st->st_dt = dt_dev;
list_add(&st->st_sub_list, &tmt->tmt_sub_thandle_list);
if (st->st_sub_th == NULL)
continue;
- rc = sub_declare_updates_write(env, record, st->st_sub_th);
+ rc = sub_declare_updates_write(env, record, st->st_sub_th,
+ tmt->tmt_record_size);
if (rc < 0)
break;
}
lur = tur->tur_update_records;
/* Write updates to the master MDT */
- rc = sub_updates_write(env, lur, master_st->st_sub_th,
- &master_st->st_cookie);
+ rc = sub_updates_write(env, lur, master_st);
/* Cleanup the common parameters in the update records,
* master transno callback might add more parameters.
st->st_sub_th->th_result < 0)
continue;
- rc = sub_updates_write(env, lur, st->st_sub_th,
- &st->st_cookie);
+ rc = sub_updates_write(env, lur, st);
if (rc < 0) {
th->th_result = rc;
break;
LASSERT(tmt->tmt_magic == TOP_THANDLE_MAGIC);
list_for_each_entry_safe(st, tmp, &tmt->tmt_sub_thandle_list,
st_sub_list) {
+ struct sub_thandle_cookie *stc;
+ /* Named differently from the outer cursor "tmp" so that the
+ * inner walk does not shadow it. */
+ struct sub_thandle_cookie *stc_next;
+
list_del(&st->st_sub_list);
+ /* Release every cookie still linked to this sub thandle. */
+ list_for_each_entry_safe(stc, stc_next, &st->st_cookie_list,
+ stc_list) {
+ list_del(&stc->stc_list);
+ OBD_FREE_PTR(stc);
+ }
OBD_FREE_PTR(st);
}
OBD_FREE_PTR(tmt);
struct llog_ctxt *ctxt;
struct obd_device *obd;
struct llog_cookie *cookie;
+ struct sub_thandle_cookie *stc;
int rc;
- cookie = &st->st_cookie;
- if (fid_is_zero(&cookie->lgc_lgl.lgl_oi.oi_fid))
- continue;
-
obd = st->st_dt->dd_lu_dev.ld_obd;
ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
LASSERT(ctxt);
+ list_for_each_entry(stc, &st->st_cookie_list, stc_list) {
+ cookie = &stc->stc_cookie;
+ if (fid_is_zero(&cookie->lgc_lgl.lgl_oi.oi_fid))
+ continue;
- rc = llog_cat_cancel_records(env, ctxt->loc_handle, 1,
- cookie);
+ rc = llog_cat_cancel_records(env, ctxt->loc_handle, 1,
+ cookie);
+ CDEBUG(D_HA, "%s: batchid %llu cancel update log "
+ DOSTID ".%u : rc = %d\n", obd->obd_name,
+ tmt->tmt_batchid,
+ POSTID(&cookie->lgc_lgl.lgl_oi),
+ cookie->lgc_index, rc);
+ }
llog_ctxt_put(ctxt);
- CDEBUG(D_HA, "%s: batchid %llu cancel update log "DOSTID
- ".%u : rc = %d\n", obd->obd_name, tmt->tmt_batchid,
- POSTID(&cookie->lgc_lgl.lgl_oi), cookie->lgc_index, rc);
}
RETURN(0);
}
run_test 115 "failover for create/unlink striped directory"
+test_116a() {
+ [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
+ [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.55) ] &&
+ skip "Do not support large update log before 2.7.55" &&
+ return 0
+ ([ $FAILURE_MODE == "HARD" ] &&
+ [ "$(facet_host mds1)" == "$(facet_host mds2)" ]) &&
+ skip "MDTs needs to be on diff hosts for HARD fail mode" &&
+ return 0
+ local fail_index=0
+
+ mkdir -p $DIR/$tdir
+ replay_barrier mds1
+
+ # OBD_FAIL_SPLIT_UPDATE_REC 0x1702
+ do_facet mds1 "lctl set_param fail_loc=0x80001702"
+ $LFS setdirstripe -c$MDSCOUNT $DIR/$tdir/striped_dir
+
+ fail mds1
+ $CHECKSTAT -t dir $DIR/$tdir/striped_dir ||
+ error "stried_dir does not exists"
+}
+run_test 116a "large update log master MDT recovery"
+
+test_116b() {
+ [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
+ [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.55) ] &&
+ skip "Do not support large update log before 2.7.55" &&
+ return 0
+
+ ([ $FAILURE_MODE == "HARD" ] &&
+ [ "$(facet_host mds1)" == "$(facet_host mds2)" ]) &&
+ skip "MDTs needs to be on diff hosts for HARD fail mode" &&
+ return 0
+ local fail_index=0
+
+ mkdir -p $DIR/$tdir
+ replay_barrier mds2
+
+ # OBD_FAIL_SPLIT_UPDATE_REC 0x1702
+ do_facet mds2 "lctl set_param fail_loc=0x80001702"
+ $LFS setdirstripe -c$MDSCOUNT $DIR/$tdir/striped_dir
+
+ fail mds2
+ $CHECKSTAT -t dir $DIR/$tdir/striped_dir ||
+ error "stried_dir does not exists"
+}
+run_test 116b "large update log slave MDT recovery"
+
+
complete $SECONDS
check_and_cleanup_lustre
exit_status
}
run_test 300i "client handle unknown hash type striped directory"
+test_300j() {
+ [ $PARALLEL == "yes" ] && skip "skip parallel run" && return
+ [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.55) ] &&
+ skip "Need MDS version at least 2.7.55" && return
+ [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+ local stripe_count
+ local file
+
+ mkdir $DIR/$tdir
+
+ #define OBD_FAIL_SPLIT_UPDATE_REC 0x1702
+ $LCTL set_param fail_loc=0x1702
+ $LFS setdirstripe -i 0 -c$MDSCOUNT -t all_char $DIR/$tdir/striped_dir ||
+ error "set striped dir error"
+
+ createmany -o $DIR/$tdir/striped_dir/f- 10 ||
+ error "create files under striped dir failed"
+
+ $LCTL set_param fail_loc=0
+
+ rm -rf $DIR/$tdir || error "unlink striped dir fails"
+
+ return 0
+}
+run_test 300j "test large update record"
+
+test_300k() {
+ [ $PARALLEL == "yes" ] && skip "skip parallel run" && return
+ [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.55) ] &&
+ skip "Need MDS version at least 2.7.55" && return
+ [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+ local stripe_count
+ local file
+
+ mkdir $DIR/$tdir
+
+ #define OBD_FAIL_LARGE_STRIPE 0x1703
+ $LCTL set_param fail_loc=0x1703
+ $LFS setdirstripe -i 0 -c512 $DIR/$tdir/striped_dir ||
+ error "set striped dir error"
+ $LCTL set_param fail_loc=0
+
+ $LFS getdirstripe $DIR/$tdir/striped_dir ||
+ error "getstripeddir fails"
+ rm -rf $DIR/$tdir/striped_dir ||
+ error "unlink striped dir fails"
+
+ return 0
+}
+run_test 300k "test large striped directory"
+
prepare_remote_file() {
mkdir $DIR/$tdir/src_dir ||
error "create remote source failed"
static int cb_get_dirstripe(char *path, DIR *d, struct find_param *param)
{
+ int ret;
+
+ /* Issue LL_IOC_LMV_GETSTRIPE on the directory behind \a d using the
+ * buffer in param->fp_lmv_md. If the call fails with E2BIG and the
+ * stripe count read back from the buffer is larger than the one
+ * requested, the buffer is replaced by one sized for that count and
+ * the ioctl is retried.
+ * Returns the ioctl result, or -ENOMEM if the larger buffer cannot
+ * be allocated (the previous buffer then stays in place). */
+again:
param->fp_lmv_md->lum_stripe_count = param->fp_lmv_stripe_count;
if (param->fp_get_default_lmv)
param->fp_lmv_md->lum_magic = LMV_USER_MAGIC;
else
param->fp_lmv_md->lum_magic = LMV_MAGIC_V1;
- return ioctl(dirfd(d), LL_IOC_LMV_GETSTRIPE, param->fp_lmv_md);
+ ret = ioctl(dirfd(d), LL_IOC_LMV_GETSTRIPE, param->fp_lmv_md);
+ /* Test ret first: errno is only meaningful after a failed call. */
+ if (ret != 0 && errno == E2BIG) {
+ void *lmv_buf;
+ int stripe_count;
+ int lmv_size;
+
+ stripe_count = (__u32)param->fp_lmv_md->lum_stripe_count;
+ if (stripe_count <= param->fp_lmv_stripe_count)
+ return ret;
+
+ /* Allocate the larger buffer before giving up the old one,
+ * so that a failed allocation leaves param usable: a later
+ * call dereferences fp_lmv_md unconditionally. */
+ lmv_size = lmv_user_md_size(stripe_count, LMV_MAGIC_V1);
+ lmv_buf = malloc(lmv_size);
+ if (lmv_buf == NULL) {
+ llapi_error(LLAPI_MSG_ERROR, -ENOMEM,
+ "error: allocation of %d bytes for ioctl",
+ lmv_size);
+ return -ENOMEM;
+ }
+
+ free(param->fp_lmv_md);
+ param->fp_lmv_md = lmv_buf;
+ param->fp_lmv_stripe_count = stripe_count;
+ goto again;
+ }
+ return ret;
}
static int get_lmd_info(char *path, DIR *parent, DIR *dir,
CHECK_MEMBER(lfsck_reply, lr_padding_2);
}
+/* Emit the checks for struct update_params and its up_params member.
+ * NOTE(review): BLANK_LINE/CHECK_STRUCT/CHECK_MEMBER are defined outside
+ * this chunk; presumably they print size/offset assertions - confirm. */
+static void check_update_params(void)
+{
+ BLANK_LINE();
+ CHECK_STRUCT(update_params);
+ CHECK_MEMBER(update_params, up_params);
+}
+
+/* Emit the checks for struct update_op and each of its members, in
+ * declaration order: uop_fid, uop_type, uop_param_count, uop_params_off. */
+static void check_update_op(void)
+{
+ BLANK_LINE();
+ CHECK_STRUCT(update_op);
+ CHECK_MEMBER(update_op, uop_fid);
+ CHECK_MEMBER(update_op, uop_type);
+ CHECK_MEMBER(update_op, uop_param_count);
+ CHECK_MEMBER(update_op, uop_params_off);
+}
+
+/* Emit the checks for struct update_ops and its uops_op member. */
+static void check_update_ops(void)
+{
+ BLANK_LINE();
+ CHECK_STRUCT(update_ops);
+ CHECK_MEMBER(update_ops, uops_op);
+}
+
+/* Emit the checks for struct update_records, its fixed-size header
+ * members and the UPDATE_RECORD_CONTINUE flag value.
+ * NOTE(review): ur_ops has no CHECK_MEMBER entry here - confirm whether
+ * leaving the variable-size tail unchecked is intentional. */
+static void check_update_records(void)
+{
+ BLANK_LINE();
+ CHECK_STRUCT(update_records);
+ CHECK_MEMBER(update_records, ur_master_transno);
+ CHECK_MEMBER(update_records, ur_batchid);
+ CHECK_MEMBER(update_records, ur_flags);
+ CHECK_MEMBER(update_records, ur_index);
+ CHECK_MEMBER(update_records, ur_update_count);
+ CHECK_MEMBER(update_records, ur_param_count);
+
+ CHECK_VALUE_X(UPDATE_RECORD_CONTINUE);
+}
+
+/* Emit the checks for struct llog_update_record and its lur_hdr and
+ * lur_update_rec members. */
+static void check_llog_update_record(void)
+{
+ BLANK_LINE();
+ CHECK_STRUCT(llog_update_record);
+ CHECK_MEMBER(llog_update_record, lur_hdr);
+ CHECK_MEMBER(llog_update_record, lur_update_rec);
+}
+
static void system_string(char *cmdline, char *str, int len)
{
int fds[2];
(long long)(int)offsetof(struct llog_log_hdr, llh_tgtuuid));
LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_tgtuuid) == 40, "found %lld\n",
(long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_tgtuuid));
- LASSERTF((int)offsetof(struct llog_log_hdr, llh_reserved) == 84, "found %lld\n",
- (long long)(int)offsetof(struct llog_log_hdr, llh_reserved));
- LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_reserved) == 4, "found %lld\n",
- (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_reserved));
- LASSERTF((int)offsetof(struct llog_log_hdr, llh_bitmap) == 88, "found %lld\n",
- (long long)(int)offsetof(struct llog_log_hdr, llh_bitmap));
- LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_bitmap) == 8096, "found %lld\n",
- (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_bitmap));
- LASSERTF((int)offsetof(struct llog_log_hdr, llh_tail) == 8184, "found %lld\n",
- (long long)(int)offsetof(struct llog_log_hdr, llh_tail));
- LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_tail) == 8, "found %lld\n",
- (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_tail));
/* Checks for struct llog_cookie */
LASSERTF((int)sizeof(struct llog_cookie) == 32, "found %lld\n",
(long long)(int)offsetof(struct fiemap_extent, fe_flags));
LASSERTF((int)sizeof(((struct fiemap_extent *)0)->fe_flags) == 4, "found %lld\n",
(long long)(int)sizeof(((struct fiemap_extent *)0)->fe_flags));
- LASSERTF((int)offsetof(struct fiemap_extent, fe_device) == 44, "found %lld\n",
- (long long)(int)offsetof(struct fiemap_extent, fe_device));
- LASSERTF((int)sizeof(((struct fiemap_extent *)0)->fe_device) == 4, "found %lld\n",
- (long long)(int)sizeof(((struct fiemap_extent *)0)->fe_device));
+ LASSERTF((int)offsetof(struct fiemap_extent, fe_reserved[0]) == 44, "found %lld\n",
+ (long long)(int)offsetof(struct fiemap_extent, fe_reserved[0]));
+ LASSERTF((int)sizeof(((struct fiemap_extent *)0)->fe_reserved[0]) == 4, "found %lld\n",
+ (long long)(int)sizeof(((struct fiemap_extent *)0)->fe_reserved[0]));
CLASSERT(FIEMAP_EXTENT_LAST == 0x00000001);
CLASSERT(FIEMAP_EXTENT_UNKNOWN == 0x00000002);
CLASSERT(FIEMAP_EXTENT_DELALLOC == 0x00000004);