Whamcloud - gitweb
LU-6997 mdt: enqueue nonblocking locks in migration 70/15970/4
authorwang di <di.wang@intel.com>
Wed, 12 Aug 2015 06:37:20 +0000 (23:37 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 24 Aug 2015 14:12:38 +0000 (14:12 +0000)
During migration, it needs to lock all of parent objects
in the linkea of the migrating object, so these parent
objects can change the name entry after the migration.

But it is not easy to order these objects(in linkEA) in
parent-child order. To avoid deadlock, let's use
mdt_object_lock_try(), which will enqueue nonblocking
locks, i.e. once there are conflicting locks, the
migration will release all of locks then return -EBUSY.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: Id657cfd9d9caefaa7c699d5a5ca90604cf707ac0
Reviewed-on: http://review.whamcloud.com/15970
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre_dlm.h
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_reint.c
lustre/osp/osp_md_object.c

index 0ec28c3..7b1cb16 100644 (file)
@@ -1103,7 +1103,8 @@ struct ldlm_enqueue_info {
        void *ei_cb_gl;  /** lock glimpse callback */
        void *ei_cbdata; /** Data to be passed into callbacks. */
        void *ei_namespace; /** lock namespace **/
-       unsigned int ei_enq_slave:1; /* whether enqueue slave stripes */
+       unsigned int ei_enq_slave:1, /* whether enqueue slave stripes */
+                    ei_nonblock:1; /* non block enqueue */
 };
 
 #define ei_res_id      ei_cb_gl
index e1ffb3d..9c69c54 100644 (file)
@@ -2309,7 +2309,7 @@ int mdt_check_resent_lock(struct mdt_thread_info *info,
 int mdt_remote_object_lock(struct mdt_thread_info *mti,
                           struct mdt_object *o, const struct lu_fid *fid,
                           struct lustre_handle *lh, ldlm_mode_t mode,
-                          __u64 ibits)
+                          __u64 ibits, bool nonblock)
 {
        struct ldlm_enqueue_info *einfo = &mti->mti_einfo;
        ldlm_policy_data_t *policy = &mti->mti_policy;
@@ -2328,6 +2328,8 @@ int mdt_remote_object_lock(struct mdt_thread_info *mti,
        einfo->ei_cb_cp = ldlm_completion_ast;
        einfo->ei_enq_slave = 0;
        einfo->ei_res_id = res_id;
+       if (nonblock)
+               einfo->ei_nonblock = 1;
 
        memset(policy, 0, sizeof(*policy));
        policy->l_inodebits.bits = ibits;
@@ -2433,6 +2435,7 @@ mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o,
                         struct mdt_lock_handle *lh, __u64 ibits,
                         bool nonblock)
 {
+       struct mdt_lock_handle *local_lh = NULL;
        int rc;
        ENTRY;
 
@@ -2442,6 +2445,18 @@ mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o,
        /* XXX do not support PERM/LAYOUT/XATTR lock for remote object yet */
        ibits &= ~(MDS_INODELOCK_PERM | MDS_INODELOCK_LAYOUT |
                   MDS_INODELOCK_XATTR);
+
+       /* Only enqueue LOOKUP lock for remote object */
+       if (ibits & MDS_INODELOCK_LOOKUP) {
+               rc = mdt_object_local_lock(info, o, lh,
+                                          MDS_INODELOCK_LOOKUP,
+                                          nonblock);
+               if (rc != ELDLM_OK)
+                       RETURN(rc);
+
+               local_lh = lh;
+       }
+
        if (ibits & MDS_INODELOCK_UPDATE) {
                /* Sigh, PDO needs to enqueue 2 locks right now, but
                 * enqueue RPC can only request 1 lock, to avoid extra
@@ -2459,18 +2474,12 @@ mdt_object_lock_internal(struct mdt_thread_info *info, struct mdt_object *o,
                rc = mdt_remote_object_lock(info, o, mdt_object_fid(o),
                                            &lh->mlh_rreg_lh,
                                            lh->mlh_rreg_mode,
-                                           MDS_INODELOCK_UPDATE);
-               if (rc != ELDLM_OK)
-                       RETURN(rc);
-       }
-
-       /* Only enqueue LOOKUP lock for remote object */
-       if (ibits & MDS_INODELOCK_LOOKUP) {
-               rc = mdt_object_local_lock(info, o, lh,
-                                          MDS_INODELOCK_LOOKUP,
-                                          nonblock);
-               if (rc != ELDLM_OK)
+                                           MDS_INODELOCK_UPDATE, nonblock);
+               if (rc != ELDLM_OK) {
+                       if (local_lh != NULL)
+                               mdt_object_unlock(info, o, local_lh, rc);
                        RETURN(rc);
+               }
        }
 
        RETURN(0);
index 0ebcac2..824021e 100644 (file)
@@ -613,7 +613,7 @@ void mdt_client_compatibility(struct mdt_thread_info *info);
 int mdt_remote_object_lock(struct mdt_thread_info *mti,
                           struct mdt_object *o, const struct lu_fid *fid,
                           struct lustre_handle *lh,
-                          ldlm_mode_t mode, __u64 ibits);
+                          ldlm_mode_t mode, __u64 ibits, bool nonblock);
 
 enum mdt_name_flags {
        MNF_FIX_ANON = 1,
index 67e6a33..fb25134 100644 (file)
@@ -912,7 +912,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                rc = mdt_remote_object_lock(info, mp, mdt_object_fid(mc),
                                            &child_lh->mlh_rreg_lh,
                                            child_lh->mlh_rreg_mode,
-                                           MDS_INODELOCK_LOOKUP);
+                                           MDS_INODELOCK_LOOKUP, false);
                if (rc != ELDLM_OK)
                        GOTO(put_child, rc);
 
@@ -1152,7 +1152,7 @@ static int mdt_rename_lock(struct mdt_thread_info *info,
                rc = mdt_remote_object_lock(info, obj,
                                            &LUSTRE_BFL_FID, lh,
                                            LCK_EX,
-                                           MDS_INODELOCK_UPDATE);
+                                           MDS_INODELOCK_UPDATE, false);
                mdt_object_put(info->mti_env, obj);
        } else {
                struct ldlm_namespace   *ns = info->mti_mdt->mdt_namespace;
@@ -1309,15 +1309,20 @@ static int mdt_lock_objects_in_linkea(struct mdt_thread_info *info,
                        GOTO(out, rc = -ENOMEM);
                }
 
+               /* Since this needs to lock all of objects in linkea, to avoid
+                * deadlocks, because it does not follow parent-child order as
+                * other MDT operation, let's use try_lock here, i.e. it will
+                * return immediately once there are conflict locks, and return
+                * EBUSY to client */
                mdt_lock_pdo_init(&mll->mll_lh, LCK_PW, &name);
-               rc = mdt_object_lock(info, mdt_pobj, &mll->mll_lh,
-                                    MDS_INODELOCK_UPDATE);
-               if (rc != 0) {
-                       CERROR("%s: cannot lock "DFID": rc =%d\n",
+               rc = mdt_object_lock_try(info, mdt_pobj, &mll->mll_lh,
+                                        MDS_INODELOCK_UPDATE);
+               if (rc == 0) {
+                       CDEBUG(D_ERROR, "%s: cannot lock "DFID": rc =%d\n",
                               mdt_obd_name(mdt), PFID(&fid), rc);
                        mdt_object_put(info->mti_env, mdt_pobj);
                        OBD_FREE_PTR(mll);
-                       GOTO(out, rc);
+                       GOTO(out, rc = -EBUSY);
                }
 
                INIT_LIST_HEAD(&mll->mll_list);
@@ -1475,7 +1480,7 @@ out_lease:
                rc = mdt_remote_object_lock(info, msrcdir, mdt_object_fid(mold),
                                            &lh_childp->mlh_rreg_lh,
                                            lh_childp->mlh_rreg_mode,
-                                           MDS_INODELOCK_LOOKUP);
+                                           MDS_INODELOCK_LOOKUP, false);
                if (rc != ELDLM_OK)
                        GOTO(out_unlock_list, rc);
 
@@ -1527,7 +1532,7 @@ out_lease:
                                            mdt_object_fid(mnew),
                                            &lh_tgtp->mlh_rreg_lh,
                                            lh_tgtp->mlh_rreg_mode,
-                                           MDS_INODELOCK_UPDATE);
+                                           MDS_INODELOCK_UPDATE, false);
                if (rc != 0) {
                        lh_tgtp = NULL;
                        GOTO(out_put_new, rc);
@@ -1840,7 +1845,8 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info,
                                                    mdt_object_fid(mold),
                                                    &lh_oldp->mlh_rreg_lh,
                                                    lh_oldp->mlh_rreg_mode,
-                                                   MDS_INODELOCK_LOOKUP);
+                                                   MDS_INODELOCK_LOOKUP,
+                                                   false);
                        if (rc != ELDLM_OK)
                                GOTO(out_put_new, rc);
 
@@ -1885,7 +1891,8 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info,
                                                    mdt_object_fid(mold),
                                                    &lh_oldp->mlh_rreg_lh,
                                                    lh_oldp->mlh_rreg_mode,
-                                                   MDS_INODELOCK_LOOKUP);
+                                                   MDS_INODELOCK_LOOKUP,
+                                                   false);
                        if (rc != ELDLM_OK)
                                GOTO(out_put_new, rc);
 
index af49456..65879d5 100644 (file)
@@ -898,6 +898,9 @@ static int osp_md_object_lock(const struct lu_env *env,
        if (mode > 0)
                return ELDLM_OK;
 
+       if (einfo->ei_nonblock)
+               flags |= LDLM_FL_BLOCK_NOWAIT;
+
        req = ldlm_enqueue_pack(osp->opd_exp, 0);
        if (IS_ERR(req))
                RETURN(PTR_ERR(req));