struct dt_object **stripe;
__u32 stripe_count;
int *idx_array;
+ __u32 master_index;
int rc = 0;
__u32 i;
__u32 j;
stripe_count = le32_to_cpu(lum->lum_stripe_count);
/* shrink the stripe_count to the avaible MDT count */
- if (stripe_count > lod->lod_remote_mdt_count + 1)
+ if (stripe_count > lod->lod_remote_mdt_count + 1 &&
+ !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))
stripe_count = lod->lod_remote_mdt_count + 1;
OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
if (idx_array == NULL)
GOTO(out_free, rc = -ENOMEM);
+ /* Start index will be the master MDT */
+ master_index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
+ idx_array[0] = master_index;
for (i = 0; i < stripe_count; i++) {
struct lod_tgt_desc *tgt = NULL;
struct dt_object *dto;
struct lu_object_conf conf = { 0 };
struct dt_device *tgt_dt = NULL;
- if (i == 0) {
- /* Right now, master stripe and master object are
- * on the same MDT */
- idx = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
- rc = obd_fid_alloc(env, lod->lod_child_exp, &fid,
- NULL);
- if (rc < 0)
- GOTO(out_put, rc);
- tgt_dt = lod->lod_child;
- goto next;
- }
-
- idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
-
+ /* Try to find next avaible target */
+ idx = idx_array[i];
for (j = 0; j < lod->lod_remote_mdt_count;
j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
bool already_allocated = false;
__u32 k;
- CDEBUG(D_INFO, "try idx %d, mdt cnt %u,"
- " allocated %u, last allocated %d\n", idx,
- lod->lod_remote_mdt_count, i, idx_array[i - 1]);
+ CDEBUG(D_INFO, "try idx %d, mdt cnt %u, allocated %u\n",
+ idx, lod->lod_remote_mdt_count + 1, i);
+ if (idx == master_index) {
+ /* Allocate the FID locally */
+ rc = obd_fid_alloc(env, lod->lod_child_exp,
+ &fid, NULL);
+ if (rc < 0)
+ GOTO(out_put, rc);
+ tgt_dt = lod->lod_child;
+ break;
+ }
/* Find next available target */
if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
continue;
- /* check whether the idx already exists
- * in current allocated array */
- for (k = 0; k < i; k++) {
- if (idx_array[k] == idx) {
- already_allocated = true;
- break;
+ if (likely(!OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) {
+ /* check whether the idx already exists
+ * in current allocated array */
+ for (k = 0; k < i; k++) {
+ if (idx_array[k] == idx) {
+ already_allocated = true;
+ break;
+ }
}
- }
- if (already_allocated)
- continue;
+ if (already_allocated)
+ continue;
+ }
/* check the status of the OSP */
tgt = LTD_TGT(ltd, idx);
break;
}
- CDEBUG(D_INFO, "idx %d, mdt cnt %u,"
- " allocated %u, last allocated %d\n", idx,
- lod->lod_remote_mdt_count, i, idx_array[i - 1]);
-
-next:
+ CDEBUG(D_INFO, "Get idx %d, for stripe %d "DFID"\n",
+ idx, i, PFID(&fid));
+ idx_array[i] = idx;
+ /* Set the start index for next stripe allocation */
+ if (i < stripe_count)
+ idx_array[i + 1] = (idx + 1) %
+ (lod->lod_remote_mdt_count + 1);
/* tgt_dt and fid must be ready after search avaible OSP
* in the above loop */
LASSERT(tgt_dt != NULL);
if (IS_ERR(dto))
GOTO(out_put, rc = PTR_ERR(dto));
stripe[i] = dto;
- idx_array[i] = idx;
}
lo->ldo_dir_striped = 1;
res_id);
einfo->ei_res_id = res_id;
- LASSERT(lo->ldo_stripe[i]);
- rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
- policy);
+ LASSERT(lo->ldo_stripe[i] != NULL);
+ if (likely(dt_object_remote(lo->ldo_stripe[i]))) {
+ rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh,
+ einfo, policy);
+ } else {
+ struct ldlm_namespace *ns = einfo->ei_namespace;
+ ldlm_blocking_callback blocking = einfo->ei_cb_local_bl;
+ ldlm_completion_callback completion = einfo->ei_cb_cp;
+ __u64 dlmflags = LDLM_FL_ATOMIC_CB;
+
+ /* This only happens if there are mulitple stripes
+ * on the master MDT, i.e. except stripe0, there are
+ * other stripes on the Master MDT as well, Only
+ * happens in the test case right now. */
+ LASSERT(ns != NULL);
+ rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS,
+ policy, einfo->ei_mode,
+ &dlmflags, blocking,
+ completion, NULL,
+ NULL, 0, LVB_T_NONE,
+ NULL, &lockh);
+ }
if (rc != 0)
GOTO(out, rc);
slave_locks->lsl_handle[i] = lockh;