Whamcloud - gitweb
LU-4684 mdt: improve directory stripe lock
[fs/lustre-release.git] / lustre / lod / lod_object.c
index 1fac134..7bcc4f2 100644 (file)
@@ -1982,9 +1982,17 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
                       idx, i, PFID(&fid));
                idx_array[i] = idx;
                /* Set the start index for next stripe allocation */
-               if (!is_specific && i < stripe_count - 1)
+               if (!is_specific && i < stripe_count - 1) {
+                       /*
+                        * for large dir test, put all other slaves on one
+                        * remote MDT, otherwise we may save too many local
+                        * slave locks which will exceed RS_MAX_LOCKS.
+                        */
+                       if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)))
+                               idx = master_index;
                        idx_array[i + 1] = (idx + 1) %
                                           (lod->lod_remote_mdt_count + 1);
+               }
                /* tgt_dt and fid must be ready after search avaible OSP
                 * in the above loop */
                LASSERT(tgt_dt != NULL);
@@ -5159,42 +5167,6 @@ static int lod_object_sync(const struct lu_env *env, struct dt_object *dt,
 }
 
 /**
- * Release LDLM locks on the stripes of a striped directory.
- *
- * Iterates over all the locks taken on the stripe objects and
- * cancel them.
- *
- * \param[in] env      execution environment
- * \param[in] dt       striped object
- * \param[in] einfo    lock description
- * \param[in] policy   data describing requested lock
- *
- * \retval             0 on success
- * \retval             negative if failed
- */
-static int lod_object_unlock_internal(const struct lu_env *env,
-                                     struct dt_object *dt,
-                                     struct ldlm_enqueue_info *einfo,
-                                     union ldlm_policy_data *policy)
-{
-       struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
-       int                     rc = 0;
-       int                     i;
-       ENTRY;
-
-       if (slave_locks == NULL)
-               RETURN(0);
-
-       for (i = 1; i < slave_locks->count; i++) {
-               if (lustre_handle_is_used(&slave_locks->handles[i]))
-                       ldlm_lock_decref_and_cancel(&slave_locks->handles[i],
-                                                   einfo->ei_mode);
-       }
-
-       RETURN(rc);
-}
-
-/**
  * Implementation of dt_object_operations::do_object_unlock.
  *
  * Used to release LDLM lock(s).
@@ -5222,13 +5194,18 @@ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
        LASSERT(!dt_object_remote(dt_object_child(dt)));
 
        /* locks were unlocked in MDT layer */
-       for (i = 1; i < slave_locks->count; i++) {
-               LASSERT(!lustre_handle_is_used(&slave_locks->handles[i]));
+       for (i = 0; i < slave_locks->ha_count; i++)
+               LASSERT(!lustre_handle_is_used(&slave_locks->ha_handles[i]));
+
+       /*
+        * NB, ha_count may not equal to ldo_dir_stripe_count, because dir
+        * layout may change, e.g., shrink dir layout after migration.
+        */
+       for (i = 0; i < lo->ldo_dir_stripe_count; i++)
                dt_invalidate(env, lo->ldo_stripe[i]);
-       }
 
-       slave_locks_size = sizeof(*slave_locks) + slave_locks->count *
-                          sizeof(slave_locks->handles[0]);
+       slave_locks_size = offsetof(typeof(*slave_locks),
+                                   ha_handles[slave_locks->ha_count]);
        OBD_FREE(slave_locks, slave_locks_size);
        einfo->ei_cbdata = NULL;
 
@@ -5249,11 +5226,11 @@ static int lod_object_lock(const struct lu_env *env,
                           struct ldlm_enqueue_info *einfo,
                           union ldlm_policy_data *policy)
 {
-       struct lod_object       *lo = lod_dt_obj(dt);
-       int                     rc = 0;
-       int                     i;
-       int                     slave_locks_size;
+       struct lod_object *lo = lod_dt_obj(dt);
+       int slave_locks_size;
        struct lustre_handle_array *slave_locks = NULL;
+       int i;
+       int rc;
        ENTRY;
 
        /* remote object lock */
@@ -5264,35 +5241,28 @@ static int lod_object_lock(const struct lu_env *env,
        }
 
        if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
-               GOTO(out, rc = -ENOTDIR);
+               RETURN(-ENOTDIR);
 
        rc = lod_load_striping(env, lo);
        if (rc != 0)
-               GOTO(out, rc);
+               RETURN(rc);
 
        /* No stripes */
-       if (lo->ldo_dir_stripe_count <= 1) {
-               /*
-                * NB, ei_cbdata stores pointer to slave locks, if no locks
-                * taken, make sure it's set to NULL, otherwise MDT will try to
-                * unlock them.
-                */
-               einfo->ei_cbdata = NULL;
-               GOTO(out, rc = 0);
-       }
+       if (lo->ldo_dir_stripe_count <= 1)
+               RETURN(0);
 
-       slave_locks_size = sizeof(*slave_locks) + lo->ldo_dir_stripe_count *
-                          sizeof(slave_locks->handles[0]);
+       slave_locks_size = offsetof(typeof(*slave_locks),
+                                   ha_handles[lo->ldo_dir_stripe_count]);
        /* Freed in lod_object_unlock */
        OBD_ALLOC(slave_locks, slave_locks_size);
-       if (slave_locks == NULL)
-               GOTO(out, rc = -ENOMEM);
-       slave_locks->count = lo->ldo_dir_stripe_count;
+       if (!slave_locks)
+               RETURN(-ENOMEM);
+       slave_locks->ha_count = lo->ldo_dir_stripe_count;
 
        /* striped directory lock */
-       for (i = 1; i < lo->ldo_dir_stripe_count; i++) {
-               struct lustre_handle    lockh;
-               struct ldlm_res_id      *res_id;
+       for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+               struct lustre_handle lockh;
+               struct ldlm_res_id *res_id;
 
                res_id = &lod_env_info(env)->lti_res_id;
                fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu),
@@ -5300,23 +5270,20 @@ static int lod_object_lock(const struct lu_env *env,
                einfo->ei_res_id = res_id;
 
                LASSERT(lo->ldo_stripe[i] != NULL);
-               if (likely(dt_object_remote(lo->ldo_stripe[i]))) {
+               if (dt_object_remote(lo->ldo_stripe[i])) {
+                       set_bit(i, (void *)slave_locks->ha_map);
                        rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh,
                                            einfo, policy);
                } else {
                        struct ldlm_namespace *ns = einfo->ei_namespace;
                        ldlm_blocking_callback blocking = einfo->ei_cb_local_bl;
                        ldlm_completion_callback completion = einfo->ei_cb_cp;
-                       __u64   dlmflags = LDLM_FL_ATOMIC_CB;
+                       __u64 dlmflags = LDLM_FL_ATOMIC_CB;
 
                        if (einfo->ei_mode == LCK_PW ||
                            einfo->ei_mode == LCK_EX)
                                dlmflags |= LDLM_FL_COS_INCOMPAT;
 
-                       /* This only happens if there are mulitple stripes
-                        * on the master MDT, i.e. except stripe0, there are
-                        * other stripes on the Master MDT as well, Only
-                        * happens in the test case right now. */
                        LASSERT(ns != NULL);
                        rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS,
                                                    policy, einfo->ei_mode,
@@ -5325,21 +5292,19 @@ static int lod_object_lock(const struct lu_env *env,
                                                    NULL, 0, LVB_T_NONE,
                                                    NULL, &lockh);
                }
-               if (rc != 0)
-                       break;
-               slave_locks->handles[i] = lockh;
+               if (rc) {
+                       while (i--)
+                               ldlm_lock_decref_and_cancel(
+                                               &slave_locks->ha_handles[i],
+                                               einfo->ei_mode);
+                       OBD_FREE(slave_locks, slave_locks_size);
+                       RETURN(rc);
+               }
+               slave_locks->ha_handles[i] = lockh;
        }
        einfo->ei_cbdata = slave_locks;
 
-       if (rc != 0 && slave_locks != NULL) {
-               lod_object_unlock_internal(env, dt, einfo, policy);
-               OBD_FREE(slave_locks, slave_locks_size);
-       }
-       EXIT;
-out:
-       if (rc != 0)
-               einfo->ei_cbdata = NULL;
-       RETURN(rc);
+       RETURN(0);
 }
 
 /**