Whamcloud - gitweb
LU-146 mds_open() may deadlock
authorLai Siyao <laisiyao@whamcloud.com>
Wed, 17 Aug 2011 08:31:42 +0000 (01:31 -0700)
committerJohann Lombardi <johann@whamcloud.com>
Mon, 3 Oct 2011 12:37:32 +0000 (08:37 -0400)
* Get open lock inside mds_get_parent_child_locked() to avoid
  deadlock.
* Never get open lock if child is newly created to avoid deadlock.

Signed-off-by: Lai Siyao <laisiyao@whamcloud.com>
Change-Id: I1a1dadafef364f36ad3b892ba7a2d669f6027f10
Reviewed-on: http://review.whamcloud.com/1259
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Reviewed-by: Johann Lombardi <johann@whamcloud.com>
lustre/mds/handler.c
lustre/mds/mds_internal.h
lustre/mds/mds_open.c
lustre/mds/mds_reint.c

index 6225881..1288132 100644 (file)
@@ -950,7 +950,8 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
                                                          MDS_INODELOCK_UPDATE,
                                                          name, namesize,
                                                          child_lockh, &dchild,
-                                                         LCK_CR, child_part);
+                                                         LCK_CR, child_part,
+                                                         IT_GETATTR, 0);
                 } else {
                         /* For revalidate by fid we always take UPDATE lock */
                         dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL,
index 7c8979b..5d3c336 100644 (file)
@@ -161,7 +161,8 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
                                 char *name, int namelen,
                                 struct lustre_handle *child_lockh,
                                 struct dentry **dchildp, int child_mode,
-                                __u64 child_lockpart);
+                                __u64 child_lockpart,
+                                int it_op, __u32 flags);
 int mds_lock_new_child(struct obd_device *obd, struct inode *inode,
                        struct lustre_handle *child_lockh);
 int mds_osc_setattr_async(struct obd_device *obd, struct inode *inode,
index 8ff4860..b05a9d3 100644 (file)
@@ -178,7 +178,6 @@ static int mds_get_write_access(struct mds_obd *mds, struct inode *inode,
                 RETURN(-ETXTBSY);
         }
 
-
         if (MDS_FILTERDATA(inode) && MDS_FILTERDATA(inode)->io_epoch != 0) {
                 CDEBUG(D_INODE, "continuing MDS epoch "LPU64" for ino %lu/%u\n",
                        MDS_FILTERDATA(inode)->io_epoch, inode->i_ino,
@@ -997,27 +996,6 @@ int mds_lock_new_child(struct obd_device *obd, struct inode *inode,
         RETURN(rc);
 }
 
-static int noinline mds_get_open_lock(struct obd_device *obd,
-                                      struct dentry *dchild,
-                                      int child_mode,
-                                      struct lustre_handle *child_lockh)
-{
-        ldlm_policy_data_t policy = {
-                .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN }
-        };
-        struct ldlm_res_id child_res_id = {
-                .name[0] = dchild->d_inode->i_ino,
-                .name[1] = dchild->d_inode->i_generation,
-        };
-        int lock_flags = 0;
-
-        return ldlm_cli_enqueue_local(obd->obd_namespace, &child_res_id,
-                                      LDLM_IBITS, &policy, child_mode,
-                                      &lock_flags, ldlm_blocking_ast,
-                                      ldlm_completion_ast, NULL, NULL,
-                                      0, NULL, child_lockh);
-}
-
 int mds_open(struct mds_update_record *rec, int offset,
              struct ptlrpc_request *req, struct lustre_handle *child_lockh)
 {
@@ -1041,7 +1019,7 @@ int mds_open(struct mds_update_record *rec, int offset,
         /* Always returning LOOKUP lock if open succesful to guard
            dentry on client. */
         int quota_pending[2] = {0, 0};
-        int use_parent, need_open_lock;
+        int use_parent;
         unsigned int gid = current_fsgid();
         ENTRY;
 
@@ -1143,9 +1121,6 @@ int mds_open(struct mds_update_record *rec, int offset,
                      (rec->ur_flags & MDS_OPEN_LOCK) && (rec->ur_namelen == 1)) ||
                      (rec->ur_flags & MDS_OPEN_JOIN_FILE);
 
-        need_open_lock = !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) &&
-                           (rec->ur_flags & MDS_OPEN_LOCK);
-
         /* Try to lock both parent and child first. If child is not found,
          * return only locked parent.  This is enough to prevent other
          * threads from changing this directory until creation is finished. */
@@ -1156,11 +1131,11 @@ int mds_open(struct mds_update_record *rec, int offset,
                                          MDS_INODELOCK_UPDATE,
                                          use_parent ? NULL : rec->ur_name,
                                          rec->ur_namelen,
-                                         (rec->ur_flags & MDS_OPEN_LOCK) ?
-                                                child_lockh : NULL,
+                                         child_lockh,
                                          &dchild, child_mode,
                                          MDS_INODELOCK_LOOKUP |
-                                         MDS_INODELOCK_OPEN);
+                                         MDS_INODELOCK_OPEN,
+                                         IT_OPEN, rec->ur_flags);
 
         if (rc) {
                 if (rc != -ENOENT) {
@@ -1308,13 +1283,12 @@ int mds_open(struct mds_update_record *rec, int offset,
         } else {
                 acc_mode = accmode(dchild->d_inode, rec->ur_flags);
                 /* Child previously existed so the lookup and lock is already
-                 * done, so no further locking is needed. */
+                 * done. */
                 /* for nfs and join - we need two locks for same fid, but
                  * with different mode */
-                if (need_open_lock && !use_parent)  {
+                if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) &&
+                    (rec->ur_flags & MDS_OPEN_LOCK) && !use_parent)
                         ldlm_reply_set_disposition(rep, DISP_OPEN_LOCK);
-                        need_open_lock = 0;
-                }
         }
 
         LASSERTF(!mds_inode_is_orphan(dchild->d_inode),
@@ -1376,28 +1350,6 @@ found_child:
                 GOTO(cleanup, rc = -EAGAIN);
         }
 
-        if (need_open_lock) {
-                rc = mds_get_open_lock(obd, dchild, child_mode, child_lockh);
-                if (rc != ELDLM_OK)
-                        GOTO(cleanup, rc);
-                /* Let mds_intent_policy know that we have a lock to return */
-                ldlm_reply_set_disposition(rep, DISP_OPEN_LOCK);
-        } else if (!(rec->ur_flags & MDS_OPEN_LOCK)    &&
-                   S_ISREG(dchild->d_inode->i_mode)    &&
-                   (dchild->d_inode->i_mode & S_IXUGO) &&
-                   (rec->ur_flags & (FMODE_WRITE | MDS_FMODE_EXEC))) {
-                /* if this is an executable, and a non-nfsd client open write or
-                 * execute it, revoke open lock in case other client holds a
-                 * open lock which denies writing/executing in mds_finish_open()
-                 * below. LU-146
-                 */
-                rc = mds_get_open_lock(obd, dchild, child_mode, child_lockh);
-                if (rc != ELDLM_OK)
-                        GOTO(cleanup, rc);
-                ldlm_lock_decref(child_lockh, child_mode);
-                memset(child_lockh, 0, sizeof(*child_lockh));
-        }
-
         if (!S_ISREG(dchild->d_inode->i_mode) &&
             !S_ISDIR(dchild->d_inode->i_mode) &&
             (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH)) {
index 2568220..3558c2d 100644 (file)
@@ -1595,7 +1595,7 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
                                 char *name, int namelen,
                                 struct lustre_handle *child_lockh,
                                 struct dentry **dchildp, int child_mode,
-                                __u64 child_lockpart)
+                                __u64 child_lockpart, int it_op, __u32 flags)
 {
         struct ldlm_res_id child_res_id = { .name = {0} };
         struct ldlm_res_id parent_res_id = { .name = {0} };
@@ -1624,6 +1624,7 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
 
         if (name == NULL)
                 GOTO(retry_locks, rc);
+
         /* Step 2: Lookup child (without DLM lock, to get resource name) */
         *dchildp = mds_lookup(obd, name, *dparentp, namelen - 1);
         if (IS_ERR(*dchildp)) {
@@ -1659,6 +1660,20 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
         if ((child_mode & (LCK_CR|LCK_PR|LCK_CW)) && INODE_CTIME_OLD(inode))
                 child_policy.l_inodebits.bits |= MDS_INODELOCK_UPDATE;
 
+        if (it_op == IT_OPEN && !(flags & MDS_OPEN_LOCK)) {
+                /*
+                 * LU-146
+                 * if this is an executable, and a non-nfsd client open write or
+                 * execute it, revoke open lock in case other client holds an
+                 * open lock which denies write/execute in mds_finish_open().
+                 */
+                LASSERT(child_lockh != NULL);
+                if (!(S_ISREG(inode->i_mode) &&
+                      (inode->i_mode & S_IXUGO) &&
+                      (flags & (FMODE_WRITE | MDS_FMODE_EXEC))))
+                        child_lockh = NULL;
+        }
+
         iput(inode);
 
 retry_locks:
@@ -1688,6 +1703,17 @@ retry_locks:
                 GOTO(cleanup, rc);
         }
 
+        if (it_op == IT_OPEN && !(flags & MDS_OPEN_LOCK) && child_lockh &&
+            (*dchildp)->d_inode != NULL) {
+                /*
+                 * LU-146
+                 * See above, revoke open lock only, no need to reply child
+                 * lock back to client.
+                 */
+                ldlm_lock_decref(child_lockh, child_mode);
+                memset(child_lockh, 0, sizeof(*child_lockh));
+        }
+
 cleanup:
         if (rc) {
                 switch (cleanup_phase) {
@@ -1926,7 +1952,8 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                                          MDS_INODELOCK_UPDATE, 
                                          rec->ur_name, rec->ur_namelen,
                                          &child_lockh, &dchild, LCK_EX, 
-                                         MDS_INODELOCK_FULL);
+                                         MDS_INODELOCK_FULL,
+                                         IT_UNLINK, 0);
         if (rc)
                 GOTO(cleanup, rc);