idx, i, PFID(&fid));
idx_array[i] = idx;
/* Set the start index for next stripe allocation */
- if (!is_specific && i < stripe_count - 1)
+ if (!is_specific && i < stripe_count - 1) {
+ /*
+ * for large dir test, put all other slaves on one
+ * remote MDT, otherwise we may save too many local
+ * slave locks which will exceed RS_MAX_LOCKS.
+ */
+ if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)))
+ idx = master_index;
idx_array[i + 1] = (idx + 1) %
(lod->lod_remote_mdt_count + 1);
+ }
/* tgt_dt and fid must be ready after search avaible OSP
* in the above loop */
LASSERT(tgt_dt != NULL);
}
/**
- * Release LDLM locks on the stripes of a striped directory.
- *
- * Iterates over all the locks taken on the stripe objects and
- * cancel them.
- *
- * \param[in] env execution environment
- * \param[in] dt striped object
- * \param[in] einfo lock description
- * \param[in] policy data describing requested lock
- *
- * \retval 0 on success
- * \retval negative if failed
- */
-static int lod_object_unlock_internal(const struct lu_env *env,
- struct dt_object *dt,
- struct ldlm_enqueue_info *einfo,
- union ldlm_policy_data *policy)
-{
- struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
- int rc = 0;
- int i;
- ENTRY;
-
- if (slave_locks == NULL)
- RETURN(0);
-
- for (i = 1; i < slave_locks->count; i++) {
- if (lustre_handle_is_used(&slave_locks->handles[i]))
- ldlm_lock_decref_and_cancel(&slave_locks->handles[i],
- einfo->ei_mode);
- }
-
- RETURN(rc);
-}
-
-/**
* Implementation of dt_object_operations::do_object_unlock.
*
* Used to release LDLM lock(s).
LASSERT(!dt_object_remote(dt_object_child(dt)));
/* locks were unlocked in MDT layer */
- for (i = 1; i < slave_locks->count; i++) {
- LASSERT(!lustre_handle_is_used(&slave_locks->handles[i]));
+ for (i = 0; i < slave_locks->ha_count; i++)
+ LASSERT(!lustre_handle_is_used(&slave_locks->ha_handles[i]));
+
+ /*
+ * NB, ha_count may not equal to ldo_dir_stripe_count, because dir
+ * layout may change, e.g., shrink dir layout after migration.
+ */
+ for (i = 0; i < lo->ldo_dir_stripe_count; i++)
dt_invalidate(env, lo->ldo_stripe[i]);
- }
- slave_locks_size = sizeof(*slave_locks) + slave_locks->count *
- sizeof(slave_locks->handles[0]);
+ slave_locks_size = offsetof(typeof(*slave_locks),
+ ha_handles[slave_locks->ha_count]);
OBD_FREE(slave_locks, slave_locks_size);
einfo->ei_cbdata = NULL;
struct ldlm_enqueue_info *einfo,
union ldlm_policy_data *policy)
{
- struct lod_object *lo = lod_dt_obj(dt);
- int rc = 0;
- int i;
- int slave_locks_size;
+ struct lod_object *lo = lod_dt_obj(dt);
+ int slave_locks_size;
struct lustre_handle_array *slave_locks = NULL;
+ int i;
+ int rc;
ENTRY;
/* remote object lock */
}
if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
- GOTO(out, rc = -ENOTDIR);
+ RETURN(-ENOTDIR);
rc = lod_load_striping(env, lo);
if (rc != 0)
- GOTO(out, rc);
+ RETURN(rc);
/* No stripes */
- if (lo->ldo_dir_stripe_count <= 1) {
- /*
- * NB, ei_cbdata stores pointer to slave locks, if no locks
- * taken, make sure it's set to NULL, otherwise MDT will try to
- * unlock them.
- */
- einfo->ei_cbdata = NULL;
- GOTO(out, rc = 0);
- }
+ if (lo->ldo_dir_stripe_count <= 1)
+ RETURN(0);
- slave_locks_size = sizeof(*slave_locks) + lo->ldo_dir_stripe_count *
- sizeof(slave_locks->handles[0]);
+ slave_locks_size = offsetof(typeof(*slave_locks),
+ ha_handles[lo->ldo_dir_stripe_count]);
/* Freed in lod_object_unlock */
OBD_ALLOC(slave_locks, slave_locks_size);
- if (slave_locks == NULL)
- GOTO(out, rc = -ENOMEM);
- slave_locks->count = lo->ldo_dir_stripe_count;
+ if (!slave_locks)
+ RETURN(-ENOMEM);
+ slave_locks->ha_count = lo->ldo_dir_stripe_count;
/* striped directory lock */
- for (i = 1; i < lo->ldo_dir_stripe_count; i++) {
- struct lustre_handle lockh;
- struct ldlm_res_id *res_id;
+ for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+ struct lustre_handle lockh;
+ struct ldlm_res_id *res_id;
res_id = &lod_env_info(env)->lti_res_id;
fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
einfo->ei_res_id = res_id;
LASSERT(lo->ldo_stripe[i] != NULL);
- if (likely(dt_object_remote(lo->ldo_stripe[i]))) {
+ if (dt_object_remote(lo->ldo_stripe[i])) {
+ set_bit(i, (void *)slave_locks->ha_map);
rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh,
einfo, policy);
} else {
struct ldlm_namespace *ns = einfo->ei_namespace;
ldlm_blocking_callback blocking = einfo->ei_cb_local_bl;
ldlm_completion_callback completion = einfo->ei_cb_cp;
- __u64 dlmflags = LDLM_FL_ATOMIC_CB;
+ __u64 dlmflags = LDLM_FL_ATOMIC_CB;
if (einfo->ei_mode == LCK_PW ||
einfo->ei_mode == LCK_EX)
dlmflags |= LDLM_FL_COS_INCOMPAT;
- /* This only happens if there are mulitple stripes
- * on the master MDT, i.e. except stripe0, there are
- * other stripes on the Master MDT as well, Only
- * happens in the test case right now. */
LASSERT(ns != NULL);
rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS,
policy, einfo->ei_mode,
NULL, 0, LVB_T_NONE,
NULL, &lockh);
}
- if (rc != 0)
- break;
- slave_locks->handles[i] = lockh;
+ if (rc) {
+ while (i--)
+ ldlm_lock_decref_and_cancel(
+ &slave_locks->ha_handles[i],
+ einfo->ei_mode);
+ OBD_FREE(slave_locks, slave_locks_size);
+ RETURN(rc);
+ }
+ slave_locks->ha_handles[i] = lockh;
}
einfo->ei_cbdata = slave_locks;
- if (rc != 0 && slave_locks != NULL) {
- lod_object_unlock_internal(env, dt, einfo, policy);
- OBD_FREE(slave_locks, slave_locks_size);
- }
- EXIT;
-out:
- if (rc != 0)
- einfo->ei_cbdata = NULL;
- RETURN(rc);
+ RETURN(0);
}
/**