Whamcloud - gitweb
b=1844
authorphil <phil>
Wed, 3 Dec 2003 05:31:34 +0000 (05:31 +0000)
committerphil <phil>
Wed, 3 Dec 2003 05:31:34 +0000 (05:31 +0000)
Andreas's patch to fix MDS lock inversions in getattr/reint paths.
I'm giving it one more day to bake on ALC before I commit to the 1.0.x
branch.

lustre/ChangeLog
lustre/include/linux/lustre_mds.h
lustre/include/linux/lvfs.h
lustre/ldlm/ldlm_resource.c
lustre/mds/handler.c
lustre/mds/mds_internal.h
lustre/mds/mds_open.c
lustre/mds/mds_reint.c
lustre/tests/createmany-mpi.c
lustre/tests/createmany.c
lustre/tests/sanityN.sh

index 872df60..0126cbe 100644 (file)
@@ -15,6 +15,7 @@ tbd         Cluster File Systems, Inc. <info@clusterfs.com>
        - __init/__exit are not for prototype decls (ldlm_init/exit)
        - x86-64 compile warning fixes
        - fix gateway LMC keyword conflict (2318)
+       - fix MDS lock inversions in getattr/reint paths (1844)
        * miscellania
        - allow configurable automake binary, for testing new versions
        - small update to the lfs documentation
index d16eca9..fa3bafc 100644 (file)
@@ -174,20 +174,12 @@ int mds_reint_rec(struct mds_update_record *r, int offset,
 
 /* mds/handler.c */
 #ifdef __KERNEL__
-struct dentry *mds_name2locked_dentry(struct obd_device *, struct dentry *dir,
-                                      struct vfsmount **mnt, char *name,
-                                      int namelen, int lock_mode,
-                                      struct lustre_handle *lockh,
-                                      int dir_lock_mode);
 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
                                      struct vfsmount **mnt, int lock_mode,
                                      struct lustre_handle *lockh,
                                      char *name, int namelen);
 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
                               struct vfsmount **mnt);
-int mds_reint(struct ptlrpc_request *req, int offset, struct lustre_handle *);
-void mds_steal_ack_locks(struct obd_export *exp,
-                         struct ptlrpc_request *req);
 int mds_update_server_data(struct obd_device *, int force_sync);
 
 /* mds/mds_fs.c */
index 6e77ee1..6f48bcc 100644 (file)
@@ -75,7 +75,7 @@ static inline void l_dput(struct dentry *de)
  * systems, and instantiating two inodes for the same entry.  We still
  * protect against concurrent addition/removal races with the DLM locking.
  */
-static inline struct dentry *ll_lookup_one_len(char *fid_name,
+static inline struct dentry *ll_lookup_one_len(const char *fid_name,
                                                struct dentry *dparent,
                                                int fid_namelen)
 {
index 59f0478..5305087 100644 (file)
@@ -497,6 +497,7 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent,
 
         LASSERT(ns != NULL);
         LASSERT(ns->ns_hash != NULL);
+        LASSERT(name.name[0] != 0);
 
         l_lock(&ns->ns_lock);
         bucket = ns->ns_hash + ldlm_hash_fn(parent, name);
index e6cb437..0f0a608 100644 (file)
@@ -199,7 +199,7 @@ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
 
         snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
 
-        CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino %lu, gen %u, sb %p\n",
+        CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n",
                ino, generation, mds->mds_sb);
 
         /* under ext3 this is neither supposed to return bad inodes
@@ -603,13 +603,11 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
         struct ldlm_reply *rep = NULL;
         struct obd_run_ctxt saved;
         struct mds_body *body;
-        struct dentry *de = NULL, *dchild = NULL;
-        struct inode *dir;
+        struct dentry *dparent = NULL, *dchild = NULL;
         struct obd_ucred uc;
-        struct ldlm_res_id child_res_id = { .name = {0} };
         struct lustre_handle parent_lockh;
         int namesize;
-        int flags = 0, rc = 0, cleanup_phase = 0;
+        int rc = 0, cleanup_phase = 0;
         char *name;
         ENTRY;
 
@@ -645,20 +643,10 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
         uc.ouc_suppgid1 = body->suppgid;
         uc.ouc_suppgid2 = -1;
         push_ctxt(&saved, &obd->obd_ctxt, &uc);
-        /* Step 1: Lookup/lock parent */
+        cleanup_phase = 1; /* kernel context */
         intent_set_disposition(rep, DISP_LOOKUP_EXECD);
-        de = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_PR,
-                                   &parent_lockh, name, namesize - 1);
-        if (IS_ERR(de))
-                GOTO(cleanup, rc = PTR_ERR(de));
-        dir = de->d_inode;
-        LASSERT(dir);
-
-        cleanup_phase = 1; /* parent dentry and lock */
 
-        CDEBUG(D_INODE, "parent ino %lu, name %s\n", dir->i_ino, name);
-
-        /* Step 2: Lookup child */
+        /* FIXME: handle raw lookup */
 #if 0
         if (body->valid == OBD_MD_FLID) {
                 struct mds_body *mds_reply;
@@ -681,12 +669,14 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
         }
 #endif
 
-        dchild = ll_lookup_one_len(name, de, namesize - 1);
-        if (IS_ERR(dchild)) {
-                CDEBUG(D_INODE, "child lookup error %ld\n", PTR_ERR(dchild));
-                GOTO(cleanup, rc = PTR_ERR(dchild));
-        }
-        cleanup_phase = 2; /* child dentry */
+        rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1,
+                                         &parent_lockh, &dparent, LCK_PR,
+                                         name, namesize, child_lockh,
+                                         &dchild, LCK_PR);
+        if (rc)
+                GOTO(cleanup, rc);
+
+        cleanup_phase = 2; /* dchild, dparent, locks */
 
         if (dchild->d_inode == NULL) {
                 intent_set_disposition(rep, DISP_LOOKUP_NEG);
@@ -697,25 +687,6 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
                 intent_set_disposition(rep, DISP_LOOKUP_POS);
         }
 
-        /* Step 3: Lock child */
-        /* fixup_handle_for_resent_req might have set the child_lockh for us, if
-         * the lock was already granted for this request on the last
-         * transmission. */
-        if (child_lockh->cookie == 0) {
-                child_res_id.name[0] = dchild->d_inode->i_ino;
-                child_res_id.name[1] = dchild->d_inode->i_generation;
-                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
-                                      child_res_id, LDLM_PLAIN, NULL, 0, LCK_PR,
-                                      &flags, ldlm_completion_ast,
-                                      mds_blocking_ast, NULL, child_lockh);
-                if (rc != ELDLM_OK) {
-                        CERROR("ldlm_cli_enqueue: %d\n", rc);
-                        GOTO(cleanup, rc = -EIO);
-                }
-        }
-
-        cleanup_phase = 3; /* child lock */
-
         if (req->rq_repmsg == NULL) {
                 rc = mds_getattr_pack_msg(req, dchild->d_inode, offset);
                 if (rc != 0) {
@@ -729,22 +700,16 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req,
 
  cleanup:
         switch (cleanup_phase) {
-        case 3:
-                if (rc)
-                        ldlm_lock_decref(child_lockh, LCK_PR);
         case 2:
+                if (rc && dchild->d_inode)
+                        ldlm_lock_decref(child_lockh, LCK_PR);
+                ldlm_lock_decref(&parent_lockh, LCK_PR);
                 l_dput(dchild);
-
+                l_dput(dparent);
         case 1:
-                if (rc) {
-                        ldlm_lock_decref(&parent_lockh, LCK_PR);
-                } else {
-                        ldlm_put_lock_into_req(req, &parent_lockh, LCK_PR);
-                }
-                l_dput(de);
+                pop_ctxt(&saved, &obd->obd_ctxt, &uc);
         default: ;
         }
-        pop_ctxt(&saved, &obd->obd_ctxt, &uc);
         return rc;
 }
 
@@ -1003,24 +968,6 @@ static char *reint_names[] = {
         [REINT_OPEN]    "open",
 };
 
-void mds_steal_ack_locks(struct obd_export *exp,
-                         struct ptlrpc_request *req)
-{
-        unsigned long  flags;
-        struct ptlrpc_request *oldrep = exp->exp_outstanding_reply;
-        
-        if (oldrep == NULL)
-                return;
-        memcpy(req->rq_ack_locks, oldrep->rq_ack_locks,
-               sizeof req->rq_ack_locks);
-        spin_lock_irqsave (&req->rq_lock, flags);
-        oldrep->rq_resent = 1;
-        wake_up(&oldrep->rq_reply_waitq);
-        spin_unlock_irqrestore (&req->rq_lock, flags);
-        DEBUG_REQ(D_HA, oldrep, "stole locks from");
-        DEBUG_REQ(D_HA, req, "stole locks for");
-}
-
 int mds_handle(struct ptlrpc_request *req)
 {
         int should_process;
index 5dcd667..48d739c 100644 (file)
@@ -19,11 +19,25 @@ static inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
 }
 
 /* mds/mds_reint.c */
+int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2);
+int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
+                          struct lustre_handle *p1_lockh, int p1_lock_mode,
+                          struct ldlm_res_id *p2_res_id,
+                          struct lustre_handle *p2_lockh, int p2_lock_mode);
 void mds_commit_cb(struct obd_device *, __u64 last_rcvd, void *data, int error);
 int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
                        struct ptlrpc_request *req, int rc, __u32 op_data);
 void mds_reconstruct_generic(struct ptlrpc_request *req);
 void mds_req_from_mcd(struct ptlrpc_request *req, struct mds_client_data *mcd);
+int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
+                                struct ll_fid *fid,
+                                struct lustre_handle *parent_lockh,
+                                struct dentry **dparentp, int parent_mode,
+                                char *name, int namelen,
+                                struct lustre_handle *child_lockh,
+                                struct dentry **dchildp, int child_mode);
+int mds_lock_new_child(struct obd_device *obd, struct inode *inode,
+                       struct lustre_handle *child_lockh);
 
 /* mds/mds_lib.c */
 int mds_update_unpack(struct ptlrpc_request *, int offset,
index 43fa20c..89f2edb 100644 (file)
@@ -720,6 +720,32 @@ int mds_pin(struct ptlrpc_request *req)
         RETURN(rc);
 }
 
+/*  Get a lock on the ino to sync with creation WRT inode reuse (bug 2029).
+ *  If child_lockh is NULL we just get the lock as a barrier to wait for
+ *  other holders of this lock, and drop it right away again. */
+int mds_lock_new_child(struct obd_device *obd, struct inode *inode,
+                       struct lustre_handle *child_lockh)
+{
+        struct ldlm_res_id child_res_id = { .name = { inode->i_ino } };
+        struct lustre_handle lockh;
+        int lock_flags = 0;
+        int rc;
+
+        if (child_lockh == NULL)
+                child_lockh = &lockh;
+
+        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
+                              child_res_id, LDLM_PLAIN, NULL, 0,
+                              LCK_EX, &lock_flags, ldlm_completion_ast,
+                              mds_blocking_ast, NULL, child_lockh);
+        if (rc != ELDLM_OK)
+                CERROR("ldlm_cli_enqueue: %d\n", rc);
+        else if (child_lockh == &lockh)
+                ldlm_lock_decref(child_lockh, LCK_EX);
+
+        return rc;
+}
+
 int mds_open(struct mds_update_record *rec, int offset,
              struct ptlrpc_request *req, struct lustre_handle *child_lockh)
 {
@@ -728,11 +754,11 @@ int mds_open(struct mds_update_record *rec, int offset,
         struct mds_obd *mds = mds_req2mds(req);
         struct ldlm_reply *rep = NULL;
         struct mds_body *body = NULL;
-        struct dentry *dchild = NULL, *parent = NULL;
+        struct dentry *dchild = NULL, *dparent = NULL;
         struct mds_export_data *med;
-        struct ldlm_res_id child_res_id = { .name = {0} };
         struct lustre_handle parent_lockh;
-        int rc = 0, parent_mode = 0, cleanup_phase = 0, acc_mode, created = 0;
+        int rc = 0, cleanup_phase = 0, acc_mode, created = 0;
+        int parent_mode = LCK_PR;
         void *handle = NULL;
         struct dentry_params dp;
         ENTRY;
@@ -772,21 +798,22 @@ int mds_open(struct mds_update_record *rec, int offset,
         acc_mode = accmode(rec->ur_flags);
 
         /* Step 1: Find and lock the parent */
-        parent_mode = (rec->ur_flags & MDS_OPEN_CREAT) ? LCK_PW : LCK_PR;
-        parent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode,
-                                       &parent_lockh, rec->ur_name,
-                                       rec->ur_namelen - 1);
-        if (IS_ERR(parent)) {
-                rc = PTR_ERR(parent);
+        if (rec->ur_flags & O_CREAT)
+                parent_mode = LCK_PW;
+        dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode,
+                                        &parent_lockh, rec->ur_name,
+                                        rec->ur_namelen - 1);
+        if (IS_ERR(dparent)) {
+                rc = PTR_ERR(dparent);
                 CERROR("parent lookup error %d\n", rc);
                 GOTO(cleanup, rc);
         }
-        LASSERT(parent->d_inode != NULL);
+        LASSERT(dparent->d_inode != NULL);
 
         cleanup_phase = 1; /* parent dentry and lock */
 
         /* Step 2: Lookup the child */
-        dchild = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
+        dchild = ll_lookup_one_len(rec->ur_name, dparent, rec->ur_namelen - 1);
         if (IS_ERR(dchild))
                 GOTO(cleanup, rc = PTR_ERR(dchild));
 
@@ -810,7 +837,7 @@ int mds_open(struct mds_update_record *rec, int offset,
                 }
 
                 intent_set_disposition(rep, DISP_OPEN_CREATE);
-                handle = fsfilt_start(obd, parent->d_inode, FSFILT_OP_CREATE,
+                handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_CREATE,
                                       NULL);
                 if (IS_ERR(handle)) {
                         rc = PTR_ERR(handle);
@@ -821,7 +848,7 @@ int mds_open(struct mds_update_record *rec, int offset,
                 dp.p_ptr = req;
                 dp.p_inum = ino;
 
-                rc = ll_vfs_create(parent->d_inode, dchild, rec->ur_mode, NULL);
+                rc = ll_vfs_create(dparent->d_inode, dchild, rec->ur_mode,NULL);
                 if (dchild->d_fsdata == (void *)(unsigned long)ino)
                         dchild->d_fsdata = NULL;
 
@@ -844,8 +871,8 @@ int mds_open(struct mds_update_record *rec, int offset,
                 LTIME_S(iattr.ia_mtime) = rec->ur_time;
 
                 iattr.ia_uid = rec->ur_fsuid;
-                if (parent->d_inode->i_mode & S_ISGID)
-                        iattr.ia_gid = parent->d_inode->i_gid;
+                if (dparent->d_inode->i_mode & S_ISGID)
+                        iattr.ia_gid = dparent->d_inode->i_gid;
                 else
                         iattr.ia_gid = rec->ur_fsgid;
 
@@ -858,7 +885,7 @@ int mds_open(struct mds_update_record *rec, int offset,
 
                 iattr.ia_valid = ATTR_MTIME | ATTR_CTIME;
 
-                rc = fsfilt_setattr(obd, parent, handle, &iattr, 0);
+                rc = fsfilt_setattr(obd, dparent, handle, &iattr, 0);
                 if (rc)
                         CERROR("error on parent setattr: rc = %d\n", rc);
 
@@ -912,25 +939,13 @@ int mds_open(struct mds_update_record *rec, int offset,
         rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, handle,
                                 req, rc, rep ? rep->lock_policy_res1 : 0);
         /* XXX what do we do here if mds_finish_transno itself failed? */
-        if (created) {
-                /* Step 4: Lock new child to sync inode reuse (bug 2029) */
-                int lock_flags = 0, err;
-                child_res_id.name[0] = dchild->d_inode->i_ino;
-                child_res_id.name[1] = 0;
-                err = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
-                                       child_res_id, LDLM_PLAIN, NULL, 0,
-                                       LCK_EX, &lock_flags, ldlm_completion_ast,
-                                       mds_blocking_ast, NULL, child_lockh);
-                if (err == ELDLM_OK)
-                        ldlm_lock_decref(child_lockh, LCK_EX);
-                else
-                        CERROR("ldlm_cli_enqueue: %d\n", err);
-        }
+        if (created)
+                mds_lock_new_child(obd, dchild->d_inode, NULL);
 
         switch (cleanup_phase) {
         case 2:
                 if (rc && created) {
-                        int err = vfs_unlink(parent->d_inode, dchild);
+                        int err = vfs_unlink(dparent->d_inode, dchild);
                         if (err) {
                                 CERROR("unlink(%*s) in error path: %d\n",
                                        dchild->d_name.len, dchild->d_name.name,
@@ -939,10 +954,10 @@ int mds_open(struct mds_update_record *rec, int offset,
                 }
                 l_dput(dchild);
         case 1:
-                if (parent == NULL)
+                if (dparent == NULL)
                         break;
 
-                l_dput(parent);
+                l_dput(dparent);
                 if (rc)
                         ldlm_lock_decref(&parent_lockh, parent_mode);
                 else
index 392edab..31891a1 100644 (file)
@@ -275,6 +275,23 @@ int mds_fix_attr(struct inode *inode, struct mds_update_record *rec)
         RETURN(0);
 }
 
+void mds_steal_ack_locks(struct obd_export *exp, struct ptlrpc_request *req)
+{
+        unsigned long flags;
+        struct ptlrpc_request *oldrep = exp->exp_outstanding_reply;
+
+        if (oldrep == NULL)
+                return;
+        memcpy(req->rq_ack_locks, oldrep->rq_ack_locks,
+               sizeof req->rq_ack_locks);
+        spin_lock_irqsave(&req->rq_lock, flags);
+        oldrep->rq_resent = 1;
+        wake_up(&oldrep->rq_reply_waitq);
+        spin_unlock_irqrestore(&req->rq_lock, flags);
+        DEBUG_REQ(D_HA, oldrep, "stole locks from");
+        DEBUG_REQ(D_HA, req, "stole locks for");
+}
+
 void mds_req_from_mcd(struct ptlrpc_request *req, struct mds_client_data *mcd)
 {
         DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d",
@@ -296,9 +313,6 @@ static void reconstruct_reint_setattr(struct mds_update_record *rec,
 
         mds_req_from_mcd(req, med->med_mcd);
 
-        if (req->rq_export->exp_outstanding_reply)
-                mds_steal_ack_locks(req->rq_export, req);
-
         de = mds_fid2dentry(obd, rec->ur_fid1, NULL);
         if (IS_ERR(de)) {
                 LASSERT(PTR_ERR(de) == req->rq_status);
@@ -342,6 +356,9 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
 
         LASSERT(offset == 0);
 
+        DEBUG_REQ(D_INODE, req, "setattr "LPU64"/%u %x\n", rec->ur_fid1->id,
+                  rec->ur_fid1->generation, rec->ur_iattr.ia_valid);
+
         MDS_CHECK_RESENT(req, reconstruct_reint_setattr(rec, offset, req));
 
         if (rec->ur_iattr.ia_valid & ATTR_FROM_OPEN) {
@@ -362,7 +379,6 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         if (S_ISREG(inode->i_mode) && rec->ur_eadata != NULL)
                 down(&inode->i_sem);
 
-        CDEBUG(D_INODE, "ino %lu\n", inode->i_ino);
         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE, inode->i_sb);
 
         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
@@ -491,6 +507,10 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         LASSERT(offset == 0);
         LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
 
+        DEBUG_REQ(D_INODE, req, "parent "LPU64"/%u name %s mode %o\n",
+                  rec->ur_fid1->id, rec->ur_fid1->generation,
+                  rec->ur_name, rec->ur_mode);
+
         MDS_CHECK_RESENT(req, reconstruct_reint_create(rec, offset, req));
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
@@ -506,8 +526,6 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         cleanup_phase = 1; /* locked parent dentry */
         dir = dparent->d_inode;
         LASSERT(dir);
-        CDEBUG(D_INODE, "parent ino %lu creating name %s mode %o\n",
-               dir->i_ino, rec->ur_name, rec->ur_mode);
 
         ldlm_lock_dump_handle(D_OTHER, &lockh);
 
@@ -695,70 +713,349 @@ cleanup:
         return 0;
 }
 
+int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2)
+{
+        int i;
+
+        for (i = 0; i < RES_NAME_SIZE; i++) {
+                /* return 1 here, because enqueue_ordered will skip resources
+                 * of all zeroes if they're sorted to the end of the list. */
+                if (res1->name[i] == 0 && res2->name[i] != 0)
+                        return 1;
+                if (res2->name[i] == 0 && res1->name[i] != 0)
+                        return 0;
+
+                if (res1->name[i] > res2->name[i])
+                        return 1;
+                if (res1->name[i] < res2->name[i])
+                        return 0;
+        }
+        return 0;
+}
+
 /* This function doesn't use ldlm_match_or_enqueue because we're always called
  * with EX or PW locks, and the MDS is no longer allowed to match write locks,
  * because they take the place of local semaphores.
  *
- * Two locks are taken in numerical order */
-int enqueue_ordered_locks(int lock_mode, struct obd_device *obd,
-                          struct ldlm_res_id *p1_res_id,
+ * One or two locks are taken in numerical order.  A res_id->name[0] of 0 means
+ * no lock is taken for that res_id.  Must be at least one non-zero res_id. */
+int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
+                          struct lustre_handle *p1_lockh, int p1_lock_mode,
                           struct ldlm_res_id *p2_res_id,
-                          struct lustre_handle *p1_lockh,
-                          struct lustre_handle *p2_lockh)
+                          struct lustre_handle *p2_lockh, int p2_lock_mode)
 {
-        struct ldlm_res_id res_id[2];
-        struct lustre_handle *handles[2] = {p1_lockh, p2_lockh};
+        struct ldlm_res_id *res_id[2] = { p1_res_id, p2_res_id };
+        struct lustre_handle *handles[2] = { p1_lockh, p2_lockh };
+        int lock_modes[2] = { p1_lock_mode, p2_lock_mode };
         int rc, flags;
         ENTRY;
 
         LASSERT(p1_res_id != NULL && p2_res_id != NULL);
 
         CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n",
-               p1_res_id[0].name[0], p2_res_id[0].name[0]);
+               res_id[0]->name[0], res_id[1]->name[0]);
 
-        if (p1_res_id->name[0] < p2_res_id->name[0]) {
-                handles[0] = p1_lockh;
-                handles[1] = p2_lockh;
-                res_id[0] = *p1_res_id;
-                res_id[1] = *p2_res_id;
-        } else {
+        if (res_gt(p1_res_id, p2_res_id)) {
                 handles[1] = p1_lockh;
                 handles[0] = p2_lockh;
-                res_id[1] = *p1_res_id;
-                res_id[0] = *p2_res_id;
+                res_id[1] = p1_res_id;
+                res_id[0] = p2_res_id;
+                lock_modes[1] = p1_lock_mode;
+                lock_modes[0] = p2_lock_mode;
         }
 
-        CDEBUG(D_INFO, "lock order: "LPU64"/"LPU64"\n",
-               p1_res_id[0].name[0], p2_res_id[0].name[0]);
+        CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"\n",
+               res_id[0]->name[0], res_id[1]->name[0]);
 
         flags = LDLM_FL_LOCAL_ONLY;
-        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id[0],
-                              LDLM_PLAIN, NULL, 0, lock_mode, &flags,
+        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, *res_id[0],
+                              LDLM_PLAIN, NULL, 0, lock_modes[0], &flags,
                               ldlm_completion_ast, mds_blocking_ast, NULL,
                               handles[0]);
         if (rc != ELDLM_OK)
                 RETURN(-EIO);
         ldlm_lock_dump_handle(D_OTHER, handles[0]);
 
-        if (memcmp(&res_id[0], &res_id[1], sizeof(res_id[0])) == 0) {
+        if (memcmp(res_id[0], res_id[1], sizeof(*res_id[0])) == 0) {
                 memcpy(handles[1], handles[0], sizeof(*(handles[1])));
-                ldlm_lock_addref(handles[1], lock_mode);
-        } else {
+                ldlm_lock_addref(handles[1], lock_modes[1]);
+        } else if (res_id[1]->name[0] != 0) {
                 flags = LDLM_FL_LOCAL_ONLY;
                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
-                                      res_id[1], LDLM_PLAIN, NULL, 0, lock_mode,
-                                      &flags, ldlm_completion_ast,
+                                      *res_id[1], LDLM_PLAIN, NULL, 0,
+                                      lock_modes[1], &flags,ldlm_completion_ast,
                                       mds_blocking_ast, NULL, handles[1]);
                 if (rc != ELDLM_OK) {
-                        ldlm_lock_decref(handles[0], lock_mode);
+                        ldlm_lock_decref(handles[0], lock_modes[0]);
                         RETURN(-EIO);
                 }
+                ldlm_lock_dump_handle(D_OTHER, handles[1]);
         }
-        ldlm_lock_dump_handle(D_OTHER, handles[1]);
 
         RETURN(0);
 }
 
+int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
+                           struct lustre_handle *p1_lockh, int p1_lock_mode,
+                           struct ldlm_res_id *p2_res_id,
+                           struct lustre_handle *p2_lockh, int p2_lock_mode,
+                           struct ldlm_res_id *c1_res_id,
+                           struct lustre_handle *c1_lockh, int c1_lock_mode,
+                           struct ldlm_res_id *c2_res_id,
+                           struct lustre_handle *c2_lockh, int c2_lock_mode)
+{
+        struct ldlm_res_id *res_id[5] = { p1_res_id, p2_res_id,
+                                          c1_res_id, c2_res_id };
+        struct lustre_handle *dlm_handles[5] = { p1_lockh, p2_lockh,
+                                                 c1_lockh, c2_lockh };
+        int lock_modes[5] = { p1_lock_mode, p2_lock_mode,
+                              c1_lock_mode, c2_lock_mode };
+        int rc, i, j, sorted, flags;
+        ENTRY;
+
+        CDEBUG(D_DLMTRACE, "locks before: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
+               res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0],
+               res_id[3]->name[0]);
+
+        /* simple insertion sort - we have at most 4 elements */
+        for (i = 1; i < 4; i++) {
+                j = i - 1;
+                dlm_handles[4] = dlm_handles[i];
+                res_id[4] = res_id[i];
+                lock_modes[4] = lock_modes[i];
+
+                sorted = 0;
+                do {
+                        if (res_gt(res_id[j], res_id[4])) {
+                                dlm_handles[j + 1] = dlm_handles[j];
+                                res_id[j + 1] = res_id[j];
+                                lock_modes[j + 1] = lock_modes[j];
+                                j--;
+                        } else {
+                                sorted = 1;
+                        }
+                } while (j >= 0 && !sorted);
+
+                dlm_handles[j + 1] = dlm_handles[4];
+                res_id[j + 1] = res_id[4];
+                lock_modes[j + 1] = lock_modes[4];
+        }
+
+        CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n",
+               res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0],
+               res_id[3]->name[0]);
+
+        /* XXX we could send ASTs on all these locks first before blocking? */
+        for (i = 0; i < 4; i++) {
+                flags = 0;
+                if (res_id[i]->name[0] == 0)
+                        break;
+                if (i != 0 &&
+                    memcmp(res_id[i], res_id[i-1], sizeof(*res_id[i])) == 0) {
+                        memcpy(dlm_handles[i], dlm_handles[i-1],
+                               sizeof(*(dlm_handles[i])));
+                        ldlm_lock_addref(dlm_handles[i], lock_modes[i]);
+                } else {
+                        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+                                              NULL, *res_id[i], LDLM_PLAIN,
+                                              NULL, 0, lock_modes[i], &flags,
+                                              ldlm_completion_ast,
+                                              mds_blocking_ast, NULL,
+                                              dlm_handles[i]);
+                        if (rc != ELDLM_OK)
+                                GOTO(out_err, rc = -EIO);
+                        ldlm_lock_dump_handle(D_OTHER, dlm_handles[i]);
+                }
+        }
+
+        RETURN(0);
+out_err:
+        while (i-- > 0)
+                ldlm_lock_decref(dlm_handles[i], lock_modes[i]);
+
+        return rc;
+}
+
+/* In the unlikely case that the child changed while we were waiting
+ * on the lock, we need to drop the lock on the old child and either:
+ * - if the child has a lower resource name, then we have to also
+ *   drop the parent lock and regain the locks in the right order
+ * - in the rename case, if the child has a lower resource name than one of
+ *   the other parent/child resources (maxres) we also need to reget the locks
+ * - if the child has a higher resource name (this is the common case)
+ *   we can just get the lock on the new child (still in lock order)
+ *
+ * Returns 0 if the child did not change or if it changed but could be locked.
+ * Returns 1 if the child changed and we need to re-lock (no locks held).
+ * Returns -ve error with a valid dchild (no locks held). */
+static int mds_verify_child(struct obd_device *obd,
+                            struct ldlm_res_id *parent_res_id,
+                            struct lustre_handle *parent_lockh,
+                            struct dentry *dparent, int parent_mode,
+                            struct ldlm_res_id *child_res_id,
+                            struct lustre_handle *child_lockh,
+                            struct dentry **dchildp, int child_mode,
+                            const char *name, int namelen,
+                            struct ldlm_res_id *maxres)
+{
+        struct dentry *vchild, *dchild = *dchildp;
+        int rc = 0, cleanup_phase = 2; /* parent, child locks */
+        ENTRY;
+
+        vchild = ll_lookup_one_len(name, dparent, namelen - 1);
+        if (IS_ERR(vchild))
+                GOTO(cleanup, rc = PTR_ERR(vchild));
+
+        if (likely((vchild->d_inode == NULL && child_res_id->name[0] == 0) ||
+                   (vchild->d_inode != NULL &&
+                    child_res_id->name[0] == vchild->d_inode->i_ino &&
+                    child_res_id->name[1] == vchild->d_inode->i_generation))) {
+                if (dchild != NULL)
+                        l_dput(dchild);
+                *dchildp = vchild;
+
+                RETURN(0);
+        }
+
+        CDEBUG(D_DLMTRACE, "child inode changed: %p != %p (%lu != "LPU64")\n",
+               vchild->d_inode, dchild ? dchild->d_inode : 0,
+               vchild->d_inode ? vchild->d_inode->i_ino : 0,
+               child_res_id->name[0]);
+        if (child_res_id->name[0] != 0)
+                ldlm_lock_decref(child_lockh, child_mode);
+        if (dchild)
+                l_dput(dchild);
+
+        cleanup_phase = 1; /* parent lock only */
+        *dchildp = dchild = vchild;
+
+        if (dchild->d_inode) {
+                int flags = 0;
+                child_res_id->name[0] = dchild->d_inode->i_ino;
+                child_res_id->name[1] = dchild->d_inode->i_generation;
+
+                if (res_gt(parent_res_id, child_res_id) ||
+                    res_gt(maxres, child_res_id)) {
+                        CDEBUG(D_DLMTRACE, "relock "LPU64"<("LPU64"|"LPU64")\n",
+                               child_res_id->name[0], parent_res_id->name[0],
+                               maxres->name[0]);
+                        GOTO(cleanup, rc = 1);
+                }
+
+                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
+                                      NULL, *child_res_id, LDLM_PLAIN,
+                                      NULL, 0, child_mode, &flags,
+                                      ldlm_completion_ast, mds_blocking_ast,
+                                      NULL, child_lockh);
+                if (rc != ELDLM_OK)
+                        GOTO(cleanup, rc = -EIO);
+        } else {
+                memset(child_res_id, 0, sizeof(*child_res_id));
+        }
+
+        EXIT;
+cleanup:
+        if (rc) {
+                switch(cleanup_phase) {
+                case 2:
+                        if (child_res_id->name[0] != 0)
+                                ldlm_lock_decref(child_lockh, child_mode);
+                case 1:
+                        ldlm_lock_decref(parent_lockh, parent_mode);
+                }
+        }
+        return rc;
+}
+
+int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds,
+                                struct ll_fid *fid,
+                                struct lustre_handle *parent_lockh,
+                                struct dentry **dparentp, int parent_mode,
+                                char *name, int namelen,
+                                struct lustre_handle *child_lockh,
+                                struct dentry **dchildp, int child_mode)
+{
+        struct ldlm_res_id child_res_id = { .name = {0} };
+        struct ldlm_res_id parent_res_id = { .name = {0} };
+        struct inode *inode;
+        int rc = 0, cleanup_phase = 0;
+        ENTRY;
+
+        /* Step 1: Lookup parent */
+        *dparentp = mds_fid2dentry(mds, fid, NULL);
+        if (IS_ERR(*dparentp))
+                RETURN(rc = PTR_ERR(*dparentp));
+        LASSERT((*dparentp)->d_inode);
+
+        CDEBUG(D_INODE, "parent ino %lu, name %s\n",
+               (*dparentp)->d_inode->i_ino, name);
+
+        parent_res_id.name[0] = (*dparentp)->d_inode->i_ino;
+        parent_res_id.name[1] = (*dparentp)->d_inode->i_generation;
+
+        cleanup_phase = 1; /* parent dentry */
+
+        /* Step 2: Lookup child (without lock, to get resource name) */
+        *dchildp = ll_lookup_one_len(name, *dparentp, namelen - 1);
+        if (IS_ERR(*dchildp)) {
+                rc = PTR_ERR(*dchildp);
+                CDEBUG(D_INODE, "child lookup error %d\n", rc);
+                GOTO(cleanup, rc);
+        }
+
+        inode = (*dchildp)->d_inode;
+        if (inode != NULL)
+                inode = igrab(inode);
+        if (inode == NULL)
+                goto retry_locks;
+
+        child_res_id.name[0] = inode->i_ino;
+        child_res_id.name[1] = inode->i_generation;
+        iput(inode);
+
+retry_locks:
+        cleanup_phase = 2; /* child dentry */
+
+        /* Step 3: Lock parent and child in resource order.  If child doesn't
+         *         exist, we still have to lock the parent and re-lookup. */
+        rc = enqueue_ordered_locks(obd, &parent_res_id,parent_lockh,parent_mode,
+                                   &child_res_id, child_lockh, child_mode);
+        if (rc)
+                GOTO(cleanup, rc);
+
+        if (!(*dchildp)->d_inode)
+                cleanup_phase = 3; /* parent lock */
+        else
+                cleanup_phase = 4; /* child lock */
+
+        /* Step 4: Re-lookup child to verify it hasn't changed since locking */
+        rc = mds_verify_child(obd, &parent_res_id, parent_lockh, *dparentp,
+                              parent_mode, &child_res_id, child_lockh, dchildp,
+                              child_mode, name, namelen, &parent_res_id);
+        if (rc > 0)
+                goto retry_locks;
+        if (rc < 0) {
+                cleanup_phase = 3;
+                GOTO(cleanup, rc);
+        }
+
+cleanup:
+        if (rc) {
+                switch (cleanup_phase) {
+                case 4:
+                        ldlm_lock_decref(child_lockh, child_mode);
+                case 3:
+                        ldlm_lock_decref(parent_lockh, parent_mode);
+                case 2:
+                        l_dput(*dchildp);
+                case 1:
+                        l_dput(*dparentp);
+                default: ;
+                }
+        }
+        return rc;
+}
+
 void mds_reconstruct_generic(struct ptlrpc_request *req)
 {
         struct mds_export_data *med = &req->rq_export->exp_mds_data;
@@ -770,41 +1067,34 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                             struct ptlrpc_request *req,
                             struct lustre_handle *lh)
 {
-        struct dentry *dparent = NULL;
-        struct dentry *dchild = NULL;
+        struct dentry *dparent, *dchild;
         struct mds_obd *mds = mds_req2mds(req);
         struct obd_device *obd = req->rq_export->exp_obd;
         struct mds_body *body = NULL;
         struct inode *child_inode;
         struct lustre_handle parent_lockh, child_lockh, child_reuse_lockh;
         void *handle = NULL;
-        struct ldlm_res_id child_res_id = { .name = {0} };
-        int rc = 0, flags = 0, log_unlink = 0, cleanup_phase = 0;
+        int rc = 0, log_unlink = 0, cleanup_phase = 0;
         ENTRY;
 
         LASSERT(offset == 0 || offset == 2);
 
+        DEBUG_REQ(D_INODE, req, "parent ino "LPU64"/%u, child %s",
+                  rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name);
+
         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
                 GOTO(cleanup, rc = -ENOENT);
 
-        /* Step 1: Lookup the parent by FID */
-        dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
-                                        &parent_lockh, rec->ur_name,
-                                        rec->ur_namelen - 1);
-        if (IS_ERR(dparent))
-                GOTO(cleanup, rc = PTR_ERR(dparent));
-        LASSERT(dparent->d_inode);
-
-        cleanup_phase = 1; /* Have parent dentry lock */
-
-        /* Step 2: Lookup the child */
-        dchild = ll_lookup_one_len(rec->ur_name, dparent, rec->ur_namelen - 1);
-        if (IS_ERR(dchild))
-                GOTO(cleanup, rc = PTR_ERR(dchild));
+        rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1,
+                                         &parent_lockh, &dparent, LCK_PW,
+                                         rec->ur_name, rec->ur_namelen,
+                                         &child_lockh, &dchild, LCK_EX);
+        if (rc)
+                GOTO(cleanup, rc);
 
-        cleanup_phase = 2; /* child dentry */
+        cleanup_phase = 1; /* dchild, dparent, locks */
 
         child_inode = dchild->d_inode;
         if (child_inode == NULL) {
@@ -813,35 +1103,16 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 GOTO(cleanup, rc = -ENOENT);
         }
 
-        DEBUG_REQ(D_INODE, req, "parent ino %lu, child ino %lu",
-                  dparent->d_inode->i_ino, child_inode->i_ino);
-
-        /* Step 3: Get a lock on the child */
-        child_res_id.name[0] = child_inode->i_ino;
-        child_res_id.name[1] = child_inode->i_generation;
-
-        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
-                              child_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
-                              &flags, ldlm_completion_ast, mds_blocking_ast,
-                              NULL, &child_lockh);
-        if (rc != ELDLM_OK)
-                GOTO(cleanup, rc);
-
-        cleanup_phase = 3; /* child lock */
+        cleanup_phase = 2; /* dchild has a lock */
 
         /* Step 4: Get a lock on the ino to sync with creation WRT inode
          * reuse (see bug 2029). */
-        child_res_id.name[1] = 0;
-        
-        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
-                              child_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
-                              &flags, ldlm_completion_ast, mds_blocking_ast,
-                              NULL, &child_reuse_lockh);
+        rc = mds_lock_new_child(obd, child_inode, &child_reuse_lockh);
         if (rc != ELDLM_OK)
                 GOTO(cleanup, rc);
 
-        cleanup_phase = 4; /* child lock */
-        
+        cleanup_phase = 3; /* child inum lock */
+
         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE, dparent->d_inode->i_sb);
 
         /* ldlm_reply in buf[0] if called via intent */
@@ -852,11 +1123,11 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         LASSERT(body != NULL);
 
         /* If this is the last reference to this inode, get the OBD EA
-         * data first so the client can destroy OST objects. 
+         * data first so the client can destroy OST objects.
          * we only do the object removal if no open files remain.
-         * Nobody can get at this name anymore because of the locks so 
+         * Nobody can get at this name anymore because of the locks so
          * we make decisions here as to whether to remove the inode */
-        if (S_ISREG(child_inode->i_mode) && child_inode->i_nlink == 1 && 
+        if (S_ISREG(child_inode->i_mode) && child_inode->i_nlink == 1 &&
             mds_open_orphan_count(child_inode) == 0) {
                 mds_pack_inode2fid(&body->fid1, child_inode);
                 mds_pack_inode2body(body, child_inode);
@@ -887,7 +1158,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         if (child_inode->i_nlink == (S_ISDIR(child_inode->i_mode) ? 2 : 1) &&
             mds_open_orphan_count(child_inode) > 0) {
                 rc = mds_open_unlink_rename(rec, obd, dparent, dchild, &handle);
-                cleanup_phase = 5; /* transaction */
+                cleanup_phase = 4; /* transaction */
                 GOTO(cleanup, rc);
         }
 
@@ -902,7 +1173,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                                       NULL);
                 if (IS_ERR(handle))
                         GOTO(cleanup, rc = PTR_ERR(handle));
-                cleanup_phase = 5; /* transaction */
+                cleanup_phase = 4; /* transaction */
                 rc = vfs_rmdir(dparent->d_inode, dchild);
                 break;
         case S_IFREG: {
@@ -911,7 +1182,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                 if (IS_ERR(handle))
                         GOTO(cleanup, rc = PTR_ERR(handle));
 
-                cleanup_phase = 5; /* transaction */
+                cleanup_phase = 4; /* transaction */
                 rc = vfs_unlink(dparent->d_inode, dchild);
 
 #ifdef ENABLE_ORPHANS
@@ -932,7 +1203,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                                       NULL);
                 if (IS_ERR(handle))
                         GOTO(cleanup, rc = PTR_ERR(handle));
-                cleanup_phase = 5;
+                cleanup_phase = 4; /* transaction */
                 rc = vfs_unlink(dparent->d_inode, dchild);
                 break;
         default:
@@ -957,13 +1228,13 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         }
 
         switch(cleanup_phase) {
-        case 5:
+        case 4:
                 rc = mds_finish_transno(mds, dparent->d_inode, handle, req,
                                         rc, 0);
                 if (!rc)
                         (void)obd_set_info(mds->mds_osc_exp, strlen("unlinked"),
                                            "unlinked", 0, NULL);
-        case 4: /* child ino-reuse lock */
+        case 3: /* child ino-reuse lock */
                 if (rc && body != NULL) {
                         // Don't unlink the OST objects if the MDS unlink failed
                         body->valid = 0;
@@ -972,16 +1243,14 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                         ldlm_lock_decref(&child_reuse_lockh, LCK_EX);
                 else
                         ldlm_put_lock_into_req(req, &child_reuse_lockh, LCK_EX);
-        case 3: /* child lock */
+        case 2: /* child lock */
                 ldlm_lock_decref(&child_lockh, LCK_EX);
-        case 2: /* child dentry */
-                l_dput(dchild);
-        case 1: /* parent dentry and lock */
-                if (rc) {
+        case 1: /* child and parent dentry, parent lock */
+                if (rc)
                         ldlm_lock_decref(&parent_lockh, LCK_PW);
-                } else {
+                else
                         ldlm_put_lock_into_req(req, &parent_lockh, LCK_PW);
-                }
+                l_dput(dchild);
                 l_dput(dparent);
         case 0:
                 break;
@@ -1005,11 +1274,15 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         struct lustre_handle *handle = NULL, tgt_dir_lockh, src_lockh;
         struct ldlm_res_id src_res_id = { .name = {0} };
         struct ldlm_res_id tgt_dir_res_id = { .name = {0} };
-        int lock_mode = 0, rc = 0, cleanup_phase = 0;
+        int rc = 0, cleanup_phase = 0;
         ENTRY;
 
         LASSERT(offset == 0);
 
+        DEBUG_REQ(D_INODE, req, "original "LPU64"/%u to "LPU64"/%u %s",
+                  rec->ur_fid1->id, rec->ur_fid1->generation,
+                  rec->ur_fid2->id, rec->ur_fid2->generation, rec->ur_name);
+
         MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
@@ -1033,16 +1306,15 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
                de_src->d_inode->i_ino);
 
         /* Step 2: Take the two locks */
-        lock_mode = LCK_EX;
         src_res_id.name[0] = de_src->d_inode->i_ino;
         src_res_id.name[1] = de_src->d_inode->i_generation;
         tgt_dir_res_id.name[0] = de_tgt_dir->d_inode->i_ino;
         tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation;
 
-        rc = enqueue_ordered_locks(LCK_EX, obd, &src_res_id, &tgt_dir_res_id,
-                                   &src_lockh, &tgt_dir_lockh);
-        if (rc != ELDLM_OK)
-                GOTO(cleanup, rc = -EIO);
+        rc = enqueue_ordered_locks(obd, &src_res_id, &src_lockh, LCK_EX,
+                                   &tgt_dir_res_id, &tgt_dir_lockh, LCK_EX);
+        if (rc)
+                GOTO(cleanup, rc);
 
         cleanup_phase = 3; /* locks */
 
@@ -1086,11 +1358,11 @@ cleanup:
                 l_dput(dchild);
         case 3: /* locks */
                 if (rc) {
-                        ldlm_lock_decref(&src_lockh, lock_mode);
-                        ldlm_lock_decref(&tgt_dir_lockh, lock_mode);
+                        ldlm_lock_decref(&src_lockh, LCK_EX);
+                        ldlm_lock_decref(&tgt_dir_lockh, LCK_EX);
                 } else {
-                        ldlm_put_lock_into_req(req, &src_lockh, lock_mode);
-                        ldlm_put_lock_into_req(req, &tgt_dir_lockh, lock_mode);
+                        ldlm_put_lock_into_req(req, &src_lockh, LCK_EX);
+                        ldlm_put_lock_into_req(req, &tgt_dir_lockh, LCK_EX);
                 }
         case 2: /* target dentry */
                 l_dput(de_tgt_dir);
@@ -1156,6 +1428,193 @@ out_lock:
         RETURN(rc);
 }
 
+/* The idea here is that we need to get four locks in the end:
+ * one on each parent directory, one on each child.  We need to take
+ * these locks in some kind of order (to avoid deadlocks), and the order
+ * I selected is "increasing resource number" order.  We need to look up
+ * the children, however, before we know what the resource number(s) are.
+ * Thus the following plan:
+ *
+ * 1,2. Look up the parents
+ * 3,4. Look up the children
+ * 5. Take locks on the parents and children, in order
+ * 6. Verify that the children haven't changed since they were looked up
+ *
+ * If there was a race and the children changed since they were first looked
+ * up, it is possible that mds_verify_child() will be able to just grab the
+ * lock on the new child resource (if it has a higher resource than any other)
+ * but we need to compare against not only its parent, but also against the
+ * parent and child of the "other half" of the rename, hence maxres_{src,tgt}.
+ *
+ * We need the fancy igrab() on the child inodes because we aren't holding a
+ * lock on the parent after the lookup is done, so dentry->d_inode may change
+ * at any time, and igrab() itself doesn't like getting passed a NULL argument.
+ */
+static int mds_get_parents_children_locked(struct obd_device *obd,
+                                           struct mds_obd *mds,
+                                           struct ll_fid *p1_fid,
+                                           struct dentry **de_srcdirp,
+                                           struct ll_fid *p2_fid,
+                                           struct dentry **de_tgtdirp,
+                                           int parent_mode,
+                                           const char *old_name, int old_len,
+                                           struct dentry **de_oldp,
+                                           const char *new_name, int new_len,
+                                           struct dentry **de_newp,
+                                           struct lustre_handle *dlm_handles,
+                                           int child_mode)
+{
+        struct ldlm_res_id p1_res_id = { .name = {0} };
+        struct ldlm_res_id p2_res_id = { .name = {0} };
+        struct ldlm_res_id c1_res_id = { .name = {0} };
+        struct ldlm_res_id c2_res_id = { .name = {0} };
+        struct ldlm_res_id *maxres_src, *maxres_tgt;
+        struct inode *inode;
+        int rc = 0, cleanup_phase = 0;
+        ENTRY;
+
+        /* Step 1: Lookup the source directory */
+        *de_srcdirp = mds_fid2dentry(mds, p1_fid, NULL);
+        if (IS_ERR(*de_srcdirp))
+                GOTO(cleanup, rc = PTR_ERR(*de_srcdirp));
+
+        cleanup_phase = 1; /* source directory dentry */
+
+        p1_res_id.name[0] = (*de_srcdirp)->d_inode->i_ino;
+        p1_res_id.name[1] = (*de_srcdirp)->d_inode->i_generation;
+
+        /* Step 2: Lookup the target directory */
+        if (memcmp(p1_fid, p2_fid, sizeof(*p1_fid)) == 0) {
+                *de_tgtdirp = dget(*de_srcdirp);
+        } else {
+                *de_tgtdirp = mds_fid2dentry(mds, p2_fid, NULL);
+                if (IS_ERR(*de_tgtdirp))
+                        GOTO(cleanup, rc = PTR_ERR(*de_tgtdirp));
+        }
+
+        cleanup_phase = 2; /* target directory dentry */
+
+        p2_res_id.name[0] = (*de_tgtdirp)->d_inode->i_ino;
+        p2_res_id.name[1] = (*de_tgtdirp)->d_inode->i_generation;
+
+        /* Step 3: Lookup the source child entry */
+        *de_oldp = ll_lookup_one_len(old_name, *de_srcdirp, old_len - 1);
+        if (IS_ERR(*de_oldp)) {
+                rc = PTR_ERR(*de_oldp);
+                CERROR("old child lookup error (%*s): %d\n",
+                       old_len - 1, old_name, rc);
+                GOTO(cleanup, rc);
+        }
+
+        cleanup_phase = 3; /* original name dentry */
+
+        inode = (*de_oldp)->d_inode;
+        if (inode != NULL)
+                inode = igrab(inode);
+        if (inode == NULL)
+                GOTO(cleanup, rc = -ENOENT);
+
+        c1_res_id.name[0] = inode->i_ino;
+        c1_res_id.name[1] = inode->i_generation;
+        iput(inode);
+
+        /* Step 4: Lookup the target child entry */
+        *de_newp = ll_lookup_one_len(new_name, *de_tgtdirp, new_len - 1);
+        if (IS_ERR(*de_newp)) {
+                rc = PTR_ERR(*de_newp);
+                CERROR("new child lookup error (%*s): %d\n",
+                       old_len - 1, old_name, rc);
+                GOTO(cleanup, rc);
+        }
+
+        cleanup_phase = 4; /* target dentry */
+
+        inode = (*de_newp)->d_inode;
+        if (inode != NULL)
+                inode = igrab(inode);
+        if (inode == NULL)
+                goto retry_locks;
+
+        c2_res_id.name[0] = inode->i_ino;
+        c2_res_id.name[1] = inode->i_generation;
+        iput(inode);
+
+retry_locks:
+        /* Step 5: Take locks on the parents and child(ren) */
+        maxres_src = &p1_res_id;
+        maxres_tgt = &p2_res_id;
+        cleanup_phase = 4; /* target dentry */
+
+        if (c1_res_id.name[0] != 0 && res_gt(&c1_res_id, &p1_res_id))
+                maxres_src = &c1_res_id;
+        if (c2_res_id.name[0] != 0 && res_gt(&c2_res_id, &p2_res_id))
+                maxres_tgt = &c2_res_id;
+
+        rc = enqueue_4ordered_locks(obd, &p1_res_id,&dlm_handles[0],parent_mode,
+                                    &p2_res_id, &dlm_handles[1], parent_mode,
+                                    &c1_res_id, &dlm_handles[2], child_mode,
+                                    &c2_res_id, &dlm_handles[3], child_mode);
+        if (rc)
+                GOTO(cleanup, rc);
+
+        cleanup_phase = 6; /* parent and child(ren) locks */
+
+        /* Step 6a: Re-lookup source child to verify it hasn't changed */
+        rc = mds_verify_child(obd, &p1_res_id, &dlm_handles[0], *de_srcdirp,
+                              parent_mode, &c1_res_id, &dlm_handles[2], de_oldp,
+                              child_mode, old_name, old_len, maxres_tgt);
+        if (rc) {
+                if (c2_res_id.name[0] != 0)
+                        ldlm_lock_decref(&dlm_handles[3], child_mode);
+                ldlm_lock_decref(&dlm_handles[1], parent_mode);
+                cleanup_phase = 4;
+                if (rc > 0)
+                        goto retry_locks;
+                GOTO(cleanup, rc);
+        }
+
+        if ((*de_oldp)->d_inode == NULL)
+                GOTO(cleanup, rc = -ENOENT);
+
+        /* Step 6b: Re-lookup target child to verify it hasn't changed */
+        rc = mds_verify_child(obd, &p2_res_id, &dlm_handles[1], *de_tgtdirp,
+                              parent_mode, &c2_res_id, &dlm_handles[3], de_newp,
+                              child_mode, new_name, new_len, maxres_src);
+        if (rc) {
+                ldlm_lock_decref(&dlm_handles[2], child_mode);
+                ldlm_lock_decref(&dlm_handles[0], parent_mode);
+                cleanup_phase = 4;
+                if (rc > 0)
+                        goto retry_locks;
+                GOTO(cleanup, rc);
+        }
+
+        EXIT;
+cleanup:
+        if (rc) {
+                switch (cleanup_phase) {
+                case 6: /* child lock(s) */
+                        if (c2_res_id.name[0] != 0)
+                                ldlm_lock_decref(&dlm_handles[3], child_mode);
+                        if (c1_res_id.name[0] != 0)
+                                ldlm_lock_decref(&dlm_handles[2], child_mode);
+                case 5: /* parent locks */
+                        ldlm_lock_decref(&dlm_handles[1], parent_mode);
+                        ldlm_lock_decref(&dlm_handles[0], parent_mode);
+                case 4: /* target dentry */
+                        l_dput(*de_newp);
+                case 3: /* source dentry */
+                        l_dput(*de_oldp);
+                case 2: /* target directory dentry */
+                        l_dput(*de_tgtdirp);
+                case 1: /* source directry dentry */
+                        l_dput(*de_srcdirp);
+                }
+        }
+
+        return rc;
+}
+
 static int mds_reint_rename(struct mds_update_record *rec, int offset,
                             struct ptlrpc_request *req,
                             struct lustre_handle *lockh)
@@ -1167,116 +1626,45 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
         struct dentry *de_new = NULL;
         struct mds_obd *mds = mds_req2mds(req);
         struct lustre_handle dlm_handles[4];
-        struct ldlm_res_id p1_res_id = { .name = {0} };
-        struct ldlm_res_id p2_res_id = { .name = {0} };
-        struct ldlm_res_id c1_res_id = { .name = {0} };
-        struct ldlm_res_id c2_res_id = { .name = {0} };
         struct mds_body *body = NULL;
-        int rc = 0, lock_count = 3, flags = LDLM_FL_LOCAL_ONLY;
+        int rc = 0, lock_count = 3;
         int cleanup_phase = 0;
         void *handle = NULL;
         ENTRY;
 
         LASSERT(offset == 0);
 
-        MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
-
-        de_srcdir = mds_fid2dentry(mds, rec->ur_fid1, NULL);
-        if (IS_ERR(de_srcdir))
-                GOTO(cleanup, rc = PTR_ERR(de_srcdir));
-
-        cleanup_phase = 1; /* source directory dentry */
-
-        de_tgtdir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
-        if (IS_ERR(de_tgtdir))
-                GOTO(cleanup, rc = PTR_ERR(de_tgtdir));
-
-        cleanup_phase = 2; /* target directory dentry */
+        DEBUG_REQ(D_INODE, req, "parent "LPU64"/%u %s to "LPU64"/%u %s",
+                  rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name,
+                  rec->ur_fid2->id, rec->ur_fid2->generation, rec->ur_tgt);
 
-        /* The idea here is that we need to get four locks in the end:
-         * one on each parent directory, one on each child.  We need to take
-         * these locks in some kind of order (to avoid deadlocks), and the order
-         * I selected is "increasing resource number" order.  We need to take
-         * the locks on the parent directories, however, before we can lookup
-         * the children.  Thus the following plan:
-         *
-         * 1. Take locks on the parent(s), in order
-         * 2. Lookup the children
-         * 3. Take locks on the children, in order
-         * 4. Execute the rename
-         */
-
-        /* Step 1: Take locks on the parent(s), in order */
-        p1_res_id.name[0] = de_srcdir->d_inode->i_ino;
-        p1_res_id.name[1] = de_srcdir->d_inode->i_generation;
-
-        p2_res_id.name[0] = de_tgtdir->d_inode->i_ino;
-        p2_res_id.name[1] = de_tgtdir->d_inode->i_generation;
+        MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
 
-        rc = enqueue_ordered_locks(LCK_EX, obd, &p1_res_id, &p2_res_id,
-                                   &(dlm_handles[0]), &(dlm_handles[1]));
-        if (rc != ELDLM_OK)
+        rc = mds_get_parents_children_locked(obd, mds, rec->ur_fid1, &de_srcdir,
+                                             rec->ur_fid2, &de_tgtdir, LCK_PW,
+                                             rec->ur_name, rec->ur_namelen,
+                                             &de_old, rec->ur_tgt,
+                                             rec->ur_tgtlen, &de_new,
+                                             dlm_handles, LCK_EX);
+        if (rc)
                 GOTO(cleanup, rc);
 
-        cleanup_phase = 3; /* parent locks */
+        cleanup_phase = 1; /* parent(s), children, locks */
 
-        /* Step 2: Lookup the children */
-        de_old = ll_lookup_one_len(rec->ur_name, de_srcdir, rec->ur_namelen-1);
-        if (IS_ERR(de_old)) {
-                CERROR("old child lookup error (%*s): %ld\n",
-                       rec->ur_namelen - 1, rec->ur_name, PTR_ERR(de_old));
-                GOTO(cleanup, rc = PTR_ERR(de_old));
-        }
-
-        cleanup_phase = 4; /* original name dentry */
-
-        if (de_old->d_inode == NULL)
-                GOTO(cleanup, rc = -ENOENT);
+        if (de_new->d_inode)
+                lock_count = 4;
 
         /* sanity check for src inode */
         if (de_old->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
             de_old->d_inode->i_ino == de_tgtdir->d_inode->i_ino)
                 GOTO(cleanup, rc = -EINVAL);
 
-        de_new = ll_lookup_one_len(rec->ur_tgt, de_tgtdir, rec->ur_tgtlen - 1);
-        if (IS_ERR(de_new)) {
-                CERROR("new child lookup error (%*s): %ld\n",
-                       rec->ur_tgtlen - 1, rec->ur_tgt, PTR_ERR(de_new));
-                GOTO(cleanup, rc = PTR_ERR(de_new));
-        }
-
-        cleanup_phase = 5; /* target dentry */
-
         /* sanity check for dest inode */
         if (de_new->d_inode &&
             (de_new->d_inode->i_ino == de_srcdir->d_inode->i_ino ||
-            de_new->d_inode->i_ino == de_tgtdir->d_inode->i_ino))
+             de_new->d_inode->i_ino == de_tgtdir->d_inode->i_ino))
                 GOTO(cleanup, rc = -EINVAL);
 
-        /* Step 3: Take locks on the children */
-        c1_res_id.name[0] = de_old->d_inode->i_ino;
-        c1_res_id.name[1] = de_old->d_inode->i_generation;
-        if (de_new->d_inode == NULL) {
-                flags = LDLM_FL_LOCAL_ONLY;
-                rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
-                                      c1_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
-                                      &flags, ldlm_completion_ast,
-                                      mds_blocking_ast, NULL,
-                                      &(dlm_handles[2]));
-                lock_count = 3;
-        } else {
-                c2_res_id.name[0] = de_new->d_inode->i_ino;
-                c2_res_id.name[1] = de_new->d_inode->i_generation;
-                rc = enqueue_ordered_locks(LCK_EX, obd, &c1_res_id, &c2_res_id,
-                                           &(dlm_handles[2]),
-                                           &(dlm_handles[3]));
-                lock_count = 4;
-        }
-        if (rc != ELDLM_OK)
-                GOTO(cleanup, rc);
-
-        cleanup_phase = 6; /* child locks */
-
         /* if we are about to remove the target at first, pass the EA of
          * that inode to client to perform and cleanup on OST */
         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
@@ -1291,14 +1679,14 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
                 mds_pack_md(obd, req->rq_repmsg, 1, body, de_new->d_inode, 1);
                 if (!(body->valid & OBD_MD_FLEASIZE)) {
                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
-                                        OBD_MD_FLATIME | OBD_MD_FLMTIME);
+                        OBD_MD_FLATIME | OBD_MD_FLMTIME);
                 } else {
                         /* XXX need log unlink? */
                 }
         }
 
-        /* Step 4: Execute the rename */
-        OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,de_srcdir->d_inode->i_sb);
+        OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
+                       de_srcdir->d_inode->i_sb);
 
         handle = fsfilt_start(obd, de_tgtdir->d_inode, FSFILT_OP_RENAME, NULL);
         if (IS_ERR(handle))
@@ -1322,37 +1710,29 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
         rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new);
         unlock_kernel();
 
-        EXIT;
+        GOTO(cleanup, rc);
 cleanup:
         rc = mds_finish_transno(mds, de_tgtdir ? de_tgtdir->d_inode : NULL,
                                 handle, req, rc, 0);
         switch (cleanup_phase) {
-        case 6: /* child locks */
+        case 1:
                 if (rc) {
-                        ldlm_lock_decref(&(dlm_handles[2]), LCK_EX);
                         if (lock_count == 4)
                                 ldlm_lock_decref(&(dlm_handles[3]), LCK_EX);
+                        ldlm_lock_decref(&(dlm_handles[2]), LCK_EX);
+                        ldlm_lock_decref(&(dlm_handles[1]), LCK_PW);
+                        ldlm_lock_decref(&(dlm_handles[0]), LCK_PW);
                 } else {
-                        ldlm_put_lock_into_req(req, &(dlm_handles[2]), LCK_EX);
                         if (lock_count == 4)
                                 ldlm_put_lock_into_req(req,
-                                                &(dlm_handles[3]), LCK_EX);
+                                                    &(dlm_handles[3]), LCK_EX);
+                        ldlm_put_lock_into_req(req, &(dlm_handles[2]), LCK_EX);
+                        ldlm_put_lock_into_req(req, &(dlm_handles[1]), LCK_PW);
+                        ldlm_put_lock_into_req(req, &(dlm_handles[0]), LCK_PW);
                 }
-        case 5: /* target dentry */
                 l_dput(de_new);
-        case 4: /* source dentry */
                 l_dput(de_old);
-        case 3: /* parent locks */
-                if (rc) {
-                        ldlm_lock_decref(&(dlm_handles[0]), LCK_EX);
-                        ldlm_lock_decref(&(dlm_handles[1]), LCK_EX);
-                } else {
-                        ldlm_put_lock_into_req(req, &(dlm_handles[0]), LCK_EX);
-                        ldlm_put_lock_into_req(req, &(dlm_handles[1]), LCK_EX);
-                }
-        case 2: /* target directory dentry */
                 l_dput(de_tgtdir);
-        case 1: /* source directry dentry */
                 l_dput(de_srcdir);
         case 0:
                 break;
index 1474b7b..289e071 100644 (file)
@@ -1,3 +1,7 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ */
+
 #include <stdio.h>
 #include <sys/types.h>
 #include <sys/stat.h>
@@ -13,9 +17,9 @@
 
 void usage(char *prog)
 {
-       printf("usage: %s {-o|-m} filenamefmt count\n", prog);
-       printf("       %s {-o|-m} filenamefmt -seconds\n", prog);
-       printf("       %s {-o|-m} filenamefmt start count\n", prog);
+        printf("usage: %s {-o|-m|-l<tgt>} filenamefmt count\n", prog);
+        printf("       %s {-o|-m|-l<tgt>} filenamefmt -seconds\n", prog);
+        printf("       %s {-o|-m|-l<tgt>} filenamefmt start count\n", prog);
 }
 
 /* Print process rank, loop count, message, and exit (i.e. a fatal error) */
@@ -35,27 +39,28 @@ int rprintf(int rank, int loop, const char *fmt, ...)
 
 int main(int argc, char ** argv)
 {
-        int i, rc = 0, do_open, rank;
-        char format[4096], *fmt;
+        int i, rc = 0, do_open = 0, do_link = 0, rank;
+        char format[4096], *fmt, *tgt;
         char filename[4096];
         long start, last, end;
-       long begin = 0, count;
+        long begin = 0, count;
 
         rc = MPI_Init(&argc, &argv);
         if (rc != MPI_SUCCESS)
                 rprintf(-1, -1, "MPI_Init failed: %d\n", rc);
 
         if (argc < 4 || argc > 5) {
-               usage(argv[0]);
+                usage(argv[0]);
                 return 1;
         }
 
         if (strcmp(argv[1], "-o") == 0) {
                 do_open = 1;
-        } else if (strcmp(argv[1], "-m") == 0) {
-                do_open = 0;
-        } else {
-               usage(argv[0]);
+        } else if (strncmp(argv[1], "-l", 2) == 0 && argv[1][2]) {
+                tgt = argv[1] + 2;
+                do_link = 1;
+        } else if (strcmp(argv[1], "-m") != 0) {
+                usage(argv[0]);
                 return 1;
         }
 
@@ -74,28 +79,27 @@ int main(int argc, char ** argv)
 
         start = last = time(0);
 
-       if (argc == 4) {
-               end = strtol(argv[3], NULL, 0);
-               if (end > 0) {
-                       count = end;
-                       end = -1UL >> 1;
-               } else {
-                       end = start - end;
-                       count = -1UL >> 1;
-               }
-       } else {
-               end = -1UL >> 1;
-               begin = strtol(argv[3], NULL, 0);
-               count = strtol(argv[4], NULL, 0);
-       }
-
-       if (strchr(argv[2], '%'))
-               fmt = argv[2];
-       else {
-               sprintf(format, "%s%%d", argv[2]);
-               fmt = format;
-       }
-       printf("starting at %s", ctime(&start));
+        if (argc == 4) {
+                end = strtol(argv[3], NULL, 0);
+        } else {
+                begin = strtol(argv[3], NULL, 0);
+                end = strtol(argv[4], NULL, 0);
+        }
+        if (end > 0) {
+                count = end;
+                end = -1UL >> 1;
+        } else {
+                end = start - end;
+                count = -1UL >> 1;
+        }
+
+        if (strchr(argv[2], '%'))
+                fmt = argv[2];
+        else {
+                sprintf(format, "%s%%d", argv[2]);
+                fmt = format;
+        }
+        printf("starting at %s", ctime(&start));
         for (i = 0; i < count && time(0) < end; i++, begin++) {
                 sprintf(filename, fmt, begin);
                 if (do_open) {
@@ -107,6 +111,14 @@ int main(int argc, char ** argv)
                                 break;
                         }
                         close(fd);
+                } else if (do_link) {
+                        rc = link(tgt, filename);
+                        if (rc) {
+                                printf("link(%s, %s) error: %s\n",
+                                       tgt, filename, strerror(errno));
+                                rc = errno;
+                                break;
+                        }
                 } else {
                         rc = mknod(filename, S_IFREG| 0444, 0);
                         if (rc) {
@@ -117,15 +129,15 @@ int main(int argc, char ** argv)
                         }
                 }
                 if ((i % 10000) == 0) {
-                        printf(" - created %d (time %ld ; total %ld ; last %ld)\n",
+                        printf(" - created %d (time %ld total %ld last %ld)\n",
                                i, time(0), time(0) - start, time(0) - last);
                         last = time(0);
                 }
         }
         printf("total: %d creates in %ld seconds: %f creates/second\n", i,
                time(0) - start, ((float)i / (time(0) - start)));
-       start = time(0);
-       printf("finish at %s", ctime(&start));
+        start = time(0);
+        printf("finish at %s", ctime(&start));
 
         return rc;
 }
index 8399824..f426488 100644 (file)
@@ -1,3 +1,7 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ */
+
 #include <stdio.h>
 #include <sys/types.h>
 #include <sys/stat.h>
 
 void usage(char *prog)
 {
-       printf("usage: %s {-o|-m} filenamefmt count\n", prog);
-       printf("       %s {-o|-m} filenamefmt -seconds\n", prog);
-       printf("       %s {-o|-m} filenamefmt start count\n", prog);
+        printf("usage: %s {-o|-m|-l<tgt>} filenamefmt count\n", prog);
+        printf("       %s {-o|-m|-l<tgt>} filenamefmt -seconds\n", prog);
+        printf("       %s {-o|-m|-l<tgt>} filenamefmt start count\n", prog);
 }
 
 int main(int argc, char ** argv)
 {
-        int i, rc = 0, do_open;
-        char format[4096], *fmt;
+        int i, rc = 0, do_open = 0, do_link = 0;
+        char format[4096], *fmt, *tgt;
         char filename[4096];
         long start, last, end;
-       long begin = 0, count;
+        long begin = 0, count;
 
         if (argc < 4 || argc > 5) {
-               usage(argv[0]);
+                usage(argv[0]);
                 return 1;
         }
 
         if (strcmp(argv[1], "-o") == 0) {
                 do_open = 1;
-        } else if (strcmp(argv[1], "-m") == 0) {
-                do_open = 0;
-        } else {
-               usage(argv[0]);
+        } else if (strncmp(argv[1], "-l", 2) == 0 && argv[1][2]) {
+                tgt = argv[1] + 2;
+                do_link = 1;
+        } else if (strcmp(argv[1], "-m") != 0) {
+                usage(argv[0]);
                 return 1;
         }
 
@@ -44,27 +49,27 @@ int main(int argc, char ** argv)
 
         start = last = time(0);
 
-       if (argc == 4) {
-               end = strtol(argv[3], NULL, 0);
-               if (end > 0) {
-                       count = end;
-                       end = -1UL >> 1;
-               } else {
-                       end = start - end;
-                       count = -1UL >> 1;
-               }
-       } else {
-               end = -1UL >> 1;
-               begin = strtol(argv[3], NULL, 0);
-               count = strtol(argv[4], NULL, 0);
-       }
+        if (argc == 4) {
+                end = strtol(argv[3], NULL, 0);
+        } else {
+                begin = strtol(argv[3], NULL, 0);
+                end = strtol(argv[4], NULL, 0);
+        }
+
+        if (end > 0) {
+                count = end;
+                end = -1UL >> 1;
+        } else {
+                end = start - end;
+                count = -1UL >> 1;
+        }
 
-       if (strchr(argv[2], '%'))
-               fmt = argv[2];
-       else {
-               sprintf(format, "%s%%d", argv[2]);
-               fmt = format;
-       }
+        if (strchr(argv[2], '%'))
+                fmt = argv[2];
+        else {
+                sprintf(format, "%s%%d", argv[2]);
+                fmt = format;
+        }
         for (i = 0; i < count && time(0) < end; i++, begin++) {
                 sprintf(filename, fmt, begin);
                 if (do_open) {
@@ -76,6 +81,14 @@ int main(int argc, char ** argv)
                                 break;
                         }
                         close(fd);
+               } else if (do_link) {
+                        rc = link(tgt, filename);
+                        if (rc) {
+                                printf("link(%s, %s) error: %s\n",
+                                      tgt, filename, strerror(errno));
+                                rc = errno;
+                                break;
+                        }
                 } else {
                         rc = mknod(filename, S_IFREG| 0444, 0);
                         if (rc) {
@@ -86,7 +99,7 @@ int main(int argc, char ** argv)
                         }
                 }
                 if ((i % 10000) == 0) {
-                        printf(" - created %d (time %ld ; total %ld ; last %ld)\n",
+                        printf(" - created %d (time %ld total %ld last %ld)\n",
                                i, time(0), time(0) - start, time(0) - last);
                         last = time(0);
                 }
index 14bd6b2..b3173e9 100644 (file)
@@ -231,6 +231,11 @@ test_11() {
 }
 run_test 11 "execution of file opened for write should return error ===="
 
+test_12() {
+       sh lockorder.sh
+}
+run_test 12 "test lock ordering (link, stat, unlink) ==========="
+
 log "cleanup: ======================================================"
 rm -rf $DIR1/[df][0-9]* $DIR1/lnk || true
 echo '=========================== finished ==============================='