Whamcloud - gitweb
merge b_devel into HEAD, which will become 0.7.3
[fs/lustre-release.git] / lustre / mds / mds_reint.c
index 50949dd..61871d7 100644 (file)
 #include <linux/lustre_mds.h>
 #include <linux/lustre_dlm.h>
 #include <linux/lustre_fsfilt.h>
+
 #include "mds_internal.h"
 
-extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
+void mds_commit_cb(struct obd_device *obd, __u64 transno, void *data,
+                   int error)
+{
+        obd_transno_commit_cb(obd, transno, error);
+}
+
+struct mds_logcancel_data {
+        struct lov_mds_md      *mlcd_lmm;
+        int                     mlcd_size;
+        int                     mlcd_cookielen;
+        int                     mlcd_eadatalen;
+        struct llog_cookie      mlcd_cookies[0];
+};
+
+/* Establish a connection to the OSC when we first need it.  We don't do
+ * this during MDS setup because that would introduce setup ordering issues. */
+static int mds_osc_connect(struct obd_device *obd, struct mds_obd *mds)
+{
+        int rc;
+        ENTRY;
+
+        if (IS_ERR(mds->mds_osc_obd))
+                RETURN(PTR_ERR(mds->mds_osc_obd));
+
+        if (mds->mds_osc_obd)
+                RETURN(0);
+
+        mds->mds_osc_obd = class_uuid2obd(&mds->mds_osc_uuid);
+        if (!mds->mds_osc_obd) {
+                CERROR("MDS cannot locate OSC/LOV %s - no logging!\n",
+                       mds->mds_osc_uuid.uuid);
+                mds->mds_osc_obd = ERR_PTR(-ENOTCONN);
+                RETURN(-ENOTCONN);
+        }
+
+        rc = obd_connect(&mds->mds_osc_conn, mds->mds_osc_obd, &obd->obd_uuid);
+        if (rc) {
+                CERROR("MDS cannot locate OSC/LOV %s - no logging!\n",
+                       mds->mds_osc_uuid.uuid);
+                mds->mds_osc_obd = ERR_PTR(rc);
+                RETURN(rc);
+        }
+
+        rc = obd_set_info(&mds->mds_osc_conn, strlen("mds_conn"), "mds_conn",
+                          0, NULL);
+        RETURN(rc);
+}
 
-static void mds_commit_cb(struct obd_device *obd, __u64 transno, int error)
+static void mds_cancel_cookies_cb(struct obd_device *obd, __u64 transno,
+                                  void *cb_data, int error)
 {
+        struct mds_logcancel_data *mlcd = cb_data;
+        struct lov_stripe_md *lsm = NULL;
+        int rc;
+
         obd_transno_commit_cb(obd, transno, error);
+
+        CDEBUG(D_HA, "cancelling %d cookies\n",
+               (int)(mlcd->mlcd_cookielen / sizeof(*mlcd->mlcd_cookies)));
+
+        rc = obd_unpackmd(&obd->u.mds.mds_osc_conn, &lsm, mlcd->mlcd_lmm,
+                          mlcd->mlcd_eadatalen);
+        if (rc < 0) {
+                CERROR("bad LSM cancelling %d log cookies: rc %d\n",
+                       (int)(mlcd->mlcd_cookielen/sizeof(*mlcd->mlcd_cookies)),
+                       rc);
+        } else {
+                rc = obd_log_cancel(&obd->u.mds.mds_osc_conn, lsm,
+                                    mlcd->mlcd_cookielen /
+                                    sizeof(*mlcd->mlcd_cookies),
+                                    mlcd->mlcd_cookies, OBD_LLOG_FL_SENDNOW);
+                ///* XXX 0 normally, SENDNOW for debug */);
+                if (rc)
+                        CERROR("error cancelling %d log cookies: rc %d\n",
+                               (int)(mlcd->mlcd_cookielen /
+                                     sizeof(*mlcd->mlcd_cookies)), rc);
+        }
+
+        OBD_FREE(mlcd, mlcd->mlcd_size);
 }
 
 /* Assumes caller has already pushed us into the kernel context. */
-int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle,
-                       struct ptlrpc_request *req, int rc,
-                       __u32 op_data)
+int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
+                       struct ptlrpc_request *req, int rc, __u32 op_data)
 {
         struct mds_export_data *med = &req->rq_export->exp_mds_data;
         struct mds_client_data *mcd = med->med_mcd;
@@ -70,15 +144,15 @@ int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle,
 
         if (!handle) {
                 /* if we're starting our own xaction, use our own inode */
-                i = mds->mds_rcvd_filp->f_dentry->d_inode;
-                handle = fsfilt_start(obd, i, FSFILT_OP_SETATTR);
+                inode = mds->mds_rcvd_filp->f_dentry->d_inode;
+                handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
                 if (IS_ERR(handle)) {
                         CERROR("fsfilt_start: %ld\n", PTR_ERR(handle));
                         GOTO(out, rc = PTR_ERR(handle));
                 }
         }
 
-        off = MDS_LR_CLIENT + med->med_off * MDS_LR_SIZE;
+        off = med->med_off;
 
         transno = req->rq_reqmsg->transno;
         if (transno == 0) {
@@ -94,10 +168,11 @@ int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle,
         mcd->mcd_last_data = cpu_to_le32(op_data);
 
         fsfilt_set_last_rcvd(req->rq_export->exp_obd, transno, handle,
-                             mds_commit_cb);
-        written = lustre_fwrite(mds->mds_rcvd_filp, mcd, sizeof(*mcd), &off);
-        CDEBUG(D_INODE, "wrote trans "LPU64" client %s at #%u: written = "
-               LPSZ"\n", transno, mcd->mcd_uuid, med->med_off, written);
+                             mds_commit_cb, NULL);
+        written = fsfilt_write_record(obd, mds->mds_rcvd_filp,
+                                      (char *)mcd, sizeof(*mcd), &off);
+        CDEBUG(D_INODE, "wrote trans "LPU64" client %s at idx %u: written = "
+               LPSZ"\n", transno, mcd->mcd_uuid, med->med_idx, written);
 
         if (written != sizeof(*mcd)) {
                 CERROR("error writing to last_rcvd: rc = "LPSZ"\n", written);
@@ -110,7 +185,7 @@ int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle,
         }
 
 commit:
-        err = fsfilt_commit(obd, i, handle, 0);
+        err = fsfilt_commit(obd, inode, handle, 0);
         if (err) {
                 CERROR("error committing transaction: %d\n", err);
                 if (!rc)
@@ -139,22 +214,29 @@ int mds_fix_attr(struct inode *inode, struct mds_update_record *rec)
         if (!(ia_valid & ATTR_RAW))
                 RETURN(0);
 
-        if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
-                RETURN(-EPERM);
-
-        LTIME_S(attr->ia_ctime) = now;
+        if (!(ia_valid & ATTR_CTIME_SET))
+                LTIME_S(attr->ia_ctime) = now;
         if (!(ia_valid & ATTR_ATIME_SET))
                 LTIME_S(attr->ia_atime) = now;
         if (!(ia_valid & ATTR_MTIME_SET))
                 LTIME_S(attr->ia_mtime) = now;
 
+        if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
+                RETURN(-EPERM);
+
         /* times */
-        if ((ia_valid & (ATTR_MTIME|ATTR_ATIME))==(ATTR_MTIME|ATTR_ATIME) &&
-             !(ia_valid & ATTR_ATIME_SET)) {
+        if ((ia_valid & (ATTR_MTIME|ATTR_ATIME))==(ATTR_MTIME|ATTR_ATIME)) {
                 if (rec->ur_fsuid != inode->i_uid &&
                     (error = permission(inode,MAY_WRITE)) != 0)
                         RETURN(error);
-        } else if (ia_valid & ATTR_UID) {
+        }
+
+        if (ia_valid & ATTR_SIZE) {
+                if ((error = permission(inode,MAY_WRITE)) != 0)
+                        RETURN(error);
+        }
+
+        if (ia_valid & ATTR_UID) {
                 /* chown */
                 error = -EPERM;
                 if (IS_IMMUTABLE(inode) || IS_APPEND(inode))
@@ -164,7 +246,6 @@ int mds_fix_attr(struct inode *inode, struct mds_update_record *rec)
                 if (attr->ia_gid == (gid_t) -1)
                         attr->ia_gid = inode->i_gid;
                 attr->ia_mode = inode->i_mode;
-                attr->ia_valid =  ATTR_UID | ATTR_GID | ATTR_CTIME;
                 /*
                  * If the user or group of a non-directory has been
                  * changed by a non-root user, remove the setuid bit.
@@ -232,6 +313,14 @@ static void reconstruct_reint_setattr(struct mds_update_record *rec,
         mds_pack_inode2fid(&body->fid1, de->d_inode);
         mds_pack_inode2body(body, de->d_inode);
 
+        /* Don't return OST-specific attributes if we didn't just set them */
+        if (rec->ur_iattr.ia_valid & ATTR_SIZE)
+                body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+        if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
+                body->valid |= OBD_MD_FLMTIME;
+        if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
+                body->valid |= OBD_MD_FLATIME;
+
         l_dput(de);
 }
 
@@ -251,6 +340,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         struct inode *inode = NULL;
         struct lustre_handle lockh;
         void *handle = NULL;
+        struct mds_logcancel_data *mlcd = NULL;
         int rc = 0, cleanup_phase = 0, err, locked = 0;
         ENTRY;
 
@@ -279,21 +369,28 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
                        to_kdev_t(inode->i_sb->s_dev));
 
-        handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR);
+#ifdef ENABLE_ORPHANS
+        if (unlikely(mds->mds_osc_obd == NULL))
+                mds_osc_connect(obd, mds);
+#endif
+
+        handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
         if (IS_ERR(handle)) {
                 rc = PTR_ERR(handle);
                 handle = NULL;
                 GOTO(cleanup, rc);
         }
 
+        if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_CTIME))
+                CDEBUG(D_INODE, "setting mtime %lu, ctime %lu\n",
+                       LTIME_S(rec->ur_iattr.ia_mtime),
+                       LTIME_S(rec->ur_iattr.ia_ctime));
         rc = mds_fix_attr(inode, rec);
         if (rc)
                 GOTO(cleanup, rc);
 
         rc = fsfilt_setattr(obd, de, handle, &rec->ur_iattr, 0);
-        if (rc == 0 &&
-            S_ISREG(inode->i_mode) &&
-            rec->ur_eadata != NULL) {
+        if (rc == 0 && S_ISREG(inode->i_mode) && rec->ur_eadata != NULL) {
                 rc = fsfilt_set_md(obd, inode, handle,
                                    rec->ur_eadata, rec->ur_eadatalen);
         }
@@ -302,10 +399,39 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         mds_pack_inode2fid(&body->fid1, inode);
         mds_pack_inode2body(body, inode);
 
+        /* Don't return OST-specific attributes if we didn't just set them */
+        if (rec->ur_iattr.ia_valid & ATTR_SIZE)
+                body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+        if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
+                body->valid |= OBD_MD_FLMTIME;
+        if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
+                body->valid |= OBD_MD_FLATIME;
+
+        if (rc == 0 && rec->ur_cookielen && !IS_ERR(mds->mds_osc_obd)) {
+                OBD_ALLOC(mlcd, sizeof(*mlcd) + rec->ur_cookielen +
+                          rec->ur_eadatalen);
+                if (mlcd) {
+                        mlcd->mlcd_size = sizeof(*mlcd) + rec->ur_cookielen +
+                                rec->ur_eadatalen;
+                        mlcd->mlcd_eadatalen = rec->ur_eadatalen;
+                        mlcd->mlcd_cookielen = rec->ur_cookielen;
+                        mlcd->mlcd_lmm = (void *)&mlcd->mlcd_cookies +
+                                mlcd->mlcd_cookielen;
+                        memcpy(&mlcd->mlcd_cookies, rec->ur_logcookies,
+                               mlcd->mlcd_cookielen);
+                        memcpy(mlcd->mlcd_lmm, rec->ur_eadata,
+                               mlcd->mlcd_eadatalen);
+                } else {
+                        CERROR("unable to allocate log cancel data\n");
+                }
+        }
         EXIT;
  cleanup:
+        if (mlcd != NULL)
+                fsfilt_set_last_rcvd(req->rq_export->exp_obd, 0, handle,
+                                     mds_cancel_cookies_cb, mlcd);
         err = mds_finish_transno(mds, inode, handle, req, rc, 0);
-        switch(cleanup_phase) {
+        switch (cleanup_phase) {
         case 1:
                 l_dput(de);
                 if (locked) {
@@ -418,7 +544,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
 
         switch (type) {
         case S_IFREG:{
-                handle = fsfilt_start(obd, dir, FSFILT_OP_CREATE);
+                handle = fsfilt_start(obd, dir, FSFILT_OP_CREATE, NULL);
                 if (IS_ERR(handle))
                         GOTO(cleanup, rc = PTR_ERR(handle));
                 rc = vfs_create(dir, dchild, rec->ur_mode);
@@ -426,7 +552,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 break;
         }
         case S_IFDIR:{
-                handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR);
+                handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR, NULL);
                 if (IS_ERR(handle))
                         GOTO(cleanup, rc = PTR_ERR(handle));
                 rc = vfs_mkdir(dir, dchild, rec->ur_mode);
@@ -434,7 +560,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
                 break;
         }
         case S_IFLNK:{
-                handle = fsfilt_start(obd, dir, FSFILT_OP_SYMLINK);
+                handle = fsfilt_start(obd, dir, FSFILT_OP_SYMLINK, NULL);
                 if (IS_ERR(handle))
                         GOTO(cleanup, rc = PTR_ERR(handle));
                 if (rec->ur_tgt == NULL)        /* no target supplied */
@@ -449,7 +575,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         case S_IFIFO:
         case S_IFSOCK:{
                 int rdev = rec->ur_rdev;
-                handle = fsfilt_start(obd, dir, FSFILT_OP_MKNOD);
+                handle = fsfilt_start(obd, dir, FSFILT_OP_MKNOD, NULL);
                 if (IS_ERR(handle))
                         GOTO(cleanup, (handle = NULL, rc = PTR_ERR(handle)));
                 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
@@ -458,13 +584,13 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
         }
         default:
                 CERROR("bad file type %o creating %s\n", type, rec->ur_name);
+                dchild->d_fsdata = NULL;
                 GOTO(cleanup, rc = -EINVAL);
         }
 
-        /* In case we stored the desired inum in here, we want to clean up.
-         * We also do this in the cleanup block, for the error cases.
-         */
-        dchild->d_fsdata = NULL;
+        /* In case we stored the desired inum in here, we want to clean up. */
+        if (dchild->d_fsdata == (void *)(unsigned long)rec->ur_fid2->id)
+                dchild->d_fsdata = NULL;
 
         if (rc) {
                 CDEBUG(D_INODE, "error during create: %d\n", rc);
@@ -532,7 +658,6 @@ cleanup:
         }
         switch (cleanup_phase) {
         case 2: /* child dentry */
-                dchild->d_fsdata = NULL;
                 l_dput(dchild);
         case 1: /* locked parent dentry */
                 if (rc) {
@@ -634,43 +759,134 @@ static void reconstruct_reint_unlink(struct mds_update_record *rec, int offset,
                   "can't get EA for reconstructed unlink, leaking OST inodes");
 }
 
+/* If we are unlinking an open file/dir (i.e. creating an orphan) then
+ * we instead link the inode into the PENDING directory until it is
+ * finally released.  We can't simply call mds_reint_rename() or some
+ * part thereof, because we don't have the inode to check for link
+ * count/open status until after it is locked.
+ *
+ * For lock ordering, we always get the PENDING, then pending_child lock
+ * last to avoid deadlocks.
+ */
+static int mds_unlink_orphan(struct mds_update_record *rec,
+                             struct obd_device *obd, struct dentry *dparent,
+                             struct dentry *dchild, void **handle)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct inode *pending_dir = mds->mds_pending_dir->d_inode;
+        struct dentry *pending_child;
+        char fidname[LL_FID_NAMELEN];
+        int fidlen = 0, rc;
+        ENTRY;
+
+        LASSERT(!mds_inode_is_orphan(dchild->d_inode));
+
+        down(&pending_dir->i_sem);
+        fidlen = ll_fid2str(fidname, dchild->d_inode->i_ino,
+                            dchild->d_inode->i_generation);
+
+        CDEBUG(D_ERROR, "pending destroy of %dx open file %s = %s\n",
+               mds_open_orphan_count(dchild->d_inode),
+               rec->ur_name, fidname);
+
+        pending_child = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
+        if (IS_ERR(pending_child))
+                GOTO(out_lock, rc = PTR_ERR(pending_child));
+
+        if (pending_child->d_inode != NULL) {
+                CERROR("re-destroying orphan file %s?\n", rec->ur_name);
+                LASSERT(pending_child->d_inode == dchild->d_inode);
+                GOTO(out_dput, rc = 0);
+        }
+
+        *handle = fsfilt_start(obd, pending_dir, FSFILT_OP_RENAME, NULL);
+        if (IS_ERR(*handle))
+                GOTO(out_dput, rc = PTR_ERR(*handle));
+
+        rc = vfs_rename(dparent->d_inode, dchild, pending_dir, pending_child);
+        if (rc)
+                CERROR("error renaming orphan %lu/%s to PENDING: rc = %d\n",
+                       dparent->d_inode->i_ino, rec->ur_name, rc);
+        else
+                mds_inode_set_orphan(dchild->d_inode);
+out_dput:
+        dput(pending_child);
+out_lock:
+        up(&pending_dir->i_sem);
+        RETURN(rc);
+}
+
+static int mds_log_op_unlink(struct obd_device *obd, struct mds_obd *mds,
+                             struct inode *inode, struct lustre_msg *repmsg,
+                             int offset)
+{
+        struct lov_stripe_md *lsm = NULL;
+        struct llog_unlink_rec *lur;
+        int rc;
+        ENTRY;
+
+        if (IS_ERR(mds->mds_osc_obd))
+                RETURN(PTR_ERR(mds->mds_osc_obd));
+
+        rc = obd_unpackmd(&mds->mds_osc_conn, &lsm,
+                          lustre_msg_buf(repmsg, offset, 0),
+                          repmsg->buflens[offset]);
+        if (rc < 0)
+                RETURN(rc);
+
+        OBD_ALLOC(lur, sizeof(*lur));
+        if (!lur)
+                RETURN(-ENOMEM);
+        lur->lur_hdr.lth_len = lur->lur_end_len = sizeof(*lur);
+        lur->lur_hdr.lth_type = MDS_UNLINK_REC;
+        lur->lur_oid = inode->i_ino;
+        lur->lur_ogen = inode->i_generation;
+
+        rc = obd_log_add(&mds->mds_osc_conn, mds->mds_catalog, &lur->lur_hdr,
+                         lsm, lustre_msg_buf(repmsg, offset + 1, 0),
+                         repmsg->buflens[offset+1]/sizeof(struct llog_cookie));
+
+        obd_free_memmd(&mds->mds_osc_conn, &lsm);
+        OBD_FREE(lur, sizeof(*lur));
+
+        RETURN(rc);
+}
+
 static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                             struct ptlrpc_request *req,
-                            struct lustre_handle *child_lockh)
+                            struct lustre_handle *lh)
 {
-        struct dentry *dir_de = NULL;
+        struct dentry *dparent = NULL;
         struct dentry *dchild = NULL;
         struct mds_obd *mds = mds_req2mds(req);
         struct obd_device *obd = req->rq_export->exp_obd;
         struct mds_body *body = NULL;
-        struct inode *dir_inode = NULL, *child_inode;
-        struct lustre_handle parent_lockh;
+        struct inode *child_inode;
+        struct lustre_handle parent_lockh, child_lockh;
         void *handle = NULL;
         struct ldlm_res_id child_res_id = { .name = {0} };
-        int rc = 0, flags = 0, return_lock = 0;
-        int cleanup_phase = 0;
+        int rc = 0, flags = 0, log_unlink = 0, cleanup_phase = 0;
         ENTRY;
 
         LASSERT(offset == 0 || offset == 2);
 
         MDS_CHECK_RESENT(req, reconstruct_reint_unlink(rec, offset, req,
-                                                       child_lockh));
+                                                       &child_lockh));
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
                 GOTO(cleanup, rc = -ENOENT);
 
         /* Step 1: Lookup the parent by FID */
-        dir_de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
-                                       &parent_lockh);
-        if (IS_ERR(dir_de))
-                GOTO(cleanup, rc = PTR_ERR(dir_de));
-        dir_inode = dir_de->d_inode;
-        LASSERT(dir_inode);
+        dparent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
+                                        &parent_lockh);
+        if (IS_ERR(dparent))
+                GOTO(cleanup, rc = PTR_ERR(dparent));
+        LASSERT(dparent->d_inode);
 
         cleanup_phase = 1; /* Have parent dentry lock */
 
         /* Step 2: Lookup the child */
-        dchild = ll_lookup_one_len(rec->ur_name, dir_de, rec->ur_namelen - 1);
+        dchild = ll_lookup_one_len(rec->ur_name, dparent, rec->ur_namelen - 1);
         if (IS_ERR(dchild))
                 GOTO(cleanup, rc = PTR_ERR(dchild));
 
@@ -678,15 +894,13 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
 
         child_inode = dchild->d_inode;
         if (child_inode == NULL) {
-                CDEBUG(D_INODE,
-                       "child doesn't exist (dir %lu, name %s)\n",
-                       dir_inode->i_ino, rec->ur_name);
-                rc = -ENOENT;
-                GOTO(cleanup, rc);
+                CDEBUG(D_INODE, "child doesn't exist (dir %lu, name %s)\n",
+                       dparent->d_inode->i_ino, rec->ur_name);
+                GOTO(cleanup, rc = -ENOENT);
         }
 
         DEBUG_REQ(D_INODE, req, "parent ino %lu, child ino %lu",
-                  dir_inode->i_ino, child_inode->i_ino);
+                  dparent->d_inode->i_ino, child_inode->i_ino);
 
         /* Step 3: Get a lock on the child */
         child_res_id.name[0] = child_inode->i_ino;
@@ -695,14 +909,14 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
                               child_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
                               &flags, ldlm_completion_ast, mds_blocking_ast,
-                              NULL, child_lockh);
+                              NULL, &child_lockh);
         if (rc != ELDLM_OK)
                 GOTO(cleanup, rc);
 
         cleanup_phase = 3; /* child lock */
 
         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE,
-                       to_kdev_t(dir_inode->i_sb->s_dev));
+                       to_kdev_t(dparent->d_inode->i_sb->s_dev));
 
         /* ldlm_reply in buf[0] if called via intent */
         if (offset)
@@ -711,43 +925,89 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
         body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*body));
         LASSERT(body != NULL);
 
-        /* Step 4: Do the unlink: client decides between rmdir/unlink!
-         * (bug 72) */
+#ifdef ENABLE_ORPHANS
+        if (unlikely(mds->mds_osc_obd == NULL))
+                mds_osc_connect(obd, mds);
+#endif
+
+        /* If this is the last reference to this inode, get the OBD EA
+         * data first so the client can destroy OST objects */
+        if (S_ISREG(child_inode->i_mode) && child_inode->i_nlink == 1) {
+                mds_pack_inode2fid(&body->fid1, child_inode);
+                mds_pack_inode2body(body, child_inode);
+                mds_pack_md(obd, req->rq_repmsg, offset + 1, body, child_inode);
+                if (!(body->valid & OBD_MD_FLEASIZE)) {
+                        body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
+                                        OBD_MD_FLATIME | OBD_MD_FLMTIME);
+                        log_unlink = 1;
+                }
+        }
+
+        /* We have to do these checks ourselves, in case we are making an
+         * orphan.  The client tells us whether rmdir() or unlink() was called,
+         * so we need to return appropriate errors (bug 72).
+         *
+         * We don't have to check permissions, because vfs_rename (called from
+         * mds_unlink_orphan) also calls may_delete. */
+        if ((rec->ur_mode & S_IFMT) == S_IFDIR) {
+                if (!S_ISDIR(child_inode->i_mode))
+                        GOTO(cleanup, rc = -ENOTDIR);
+        } else {
+                if (S_ISDIR(child_inode->i_mode))
+                        GOTO(cleanup, rc = -EISDIR);
+        }
+
+        if (mds_open_orphan_count(child_inode) > 0) {
+                rc = mds_unlink_orphan(rec, obd, dparent, dchild, &handle);
+#ifdef ENABLE_ORPHANS
+                if (!rc && mds_log_op_unlink(obd, mds, child_inode,
+                                             req->rq_repmsg, offset + 1) > 0)
+                        body->valid |= OBD_MD_FLCOOKIE;
+#endif
+                GOTO(cleanup, rc);
+        }
+
+        // Step 4: Do the unlink: client decides between rmdir/unlink! (bug 72)
         switch (rec->ur_mode & S_IFMT) {
         case S_IFDIR:
                 /* Drop any lingering child directories before we start our
                  * transaction, to avoid doing multiple inode dirty/delete
-                 * in our compound transaction (bug 1321).
-                 */
+                 * in our compound transaction (bug 1321). */
                 shrink_dcache_parent(dchild);
-                handle = fsfilt_start(obd, dir_inode, FSFILT_OP_RMDIR);
+                handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_RMDIR,
+                                      NULL);
                 if (IS_ERR(handle))
                         GOTO(cleanup, rc = PTR_ERR(handle));
                 cleanup_phase = 4;
-                rc = vfs_rmdir(dir_inode, dchild);
+                rc = vfs_rmdir(dparent->d_inode, dchild);
                 break;
-        case S_IFREG:
-                /* If this is the last reference to this inode, get the OBD EA
-                 * data first so the client can destroy OST objects */
-                if (S_ISREG(child_inode->i_mode) && child_inode->i_nlink == 1) {
-                        mds_pack_inode2fid(&body->fid1, child_inode);
-                        mds_pack_inode2body(body, child_inode);
-                        mds_pack_md(obd, req->rq_repmsg, offset + 1,
-                                    body, child_inode);
-                        if (body->valid & OBD_MD_FLEASIZE)
-                                return_lock = 1;
-                }
-                /* no break */
+        case S_IFREG: {
+                handle = fsfilt_start(obd, dparent->d_inode,
+                                      FSFILT_OP_UNLINK_LOG, NULL);
+                if (IS_ERR(handle))
+                        GOTO(cleanup, rc = PTR_ERR(handle));
+
+                cleanup_phase = 4;
+                rc = vfs_unlink(dparent->d_inode, dchild);
+#ifdef ENABLE_ORPHANS
+                if (!rc && log_unlink)
+                        if (mds_log_op_unlink(obd, mds, child_inode,
+                                              req->rq_repmsg, offset + 1) > 0)
+                                body->valid |= OBD_MD_FLCOOKIE;
+#endif
+                break;
+        }
         case S_IFLNK:
         case S_IFCHR:
         case S_IFBLK:
         case S_IFIFO:
         case S_IFSOCK:
-                handle = fsfilt_start(obd, dir_inode, FSFILT_OP_UNLINK);
+                handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_UNLINK,
+                                      NULL);
                 if (IS_ERR(handle))
                         GOTO(cleanup, rc = PTR_ERR(handle));
                 cleanup_phase = 4;
-                rc = vfs_unlink(dir_inode, dchild);
+                rc = vfs_unlink(dparent->d_inode, dchild);
                 break;
         default:
                 CERROR("bad file type %o unlinking %s\n", rec->ur_mode,
@@ -758,29 +1018,29 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
 
  cleanup:
         switch(cleanup_phase) {
-            case 4:
-                rc = mds_finish_transno(mds, dir_inode, handle, req, rc, 0);
-                if (rc && body) {
-                        /* Don't unlink the OST objects if the MDS unlink failed */
+        case 4:
+                rc = mds_finish_transno(mds, dparent->d_inode, handle, req,
+                                        rc, 0);
+                if (rc && body != NULL) {
+                        // Don't unlink the OST objects if the MDS unlink failed
                         body->valid = 0;
                 }
-            case 3: /* child lock */
-                if (rc != 0 || return_lock == 0)
-                        ldlm_lock_decref(child_lockh, LCK_EX);
-            case 2: /* child dentry */
+        case 3: /* child lock */
+                ldlm_lock_decref(&child_lockh, LCK_EX);
+        case 2: /* child dentry */
                 l_dput(dchild);
-            case 1: /* parent dentry and lock */
+        case 1: /* parent dentry and lock */
                 if (rc) {
-                        ldlm_lock_decref(&parent_lockh, LCK_EX);
+                        ldlm_lock_decref(&parent_lockh, LCK_PW);
                 } else {
                         memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
                                sizeof(parent_lockh));
-                        req->rq_ack_locks[0].mode = LCK_EX;
+                        req->rq_ack_locks[0].mode = LCK_PW;
                 }
-                l_dput(dir_de);
-            case 0:
+                l_dput(dparent);
+        case 0:
                 break;
-            default:
+        default:
                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
                 LBUG();
         }
@@ -857,8 +1117,10 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         /* Step 3: Lookup the child */
         dchild = ll_lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen-1);
         if (IS_ERR(dchild)) {
-                CERROR("child lookup error %ld\n", PTR_ERR(dchild));
-                GOTO(cleanup, rc = PTR_ERR(dchild));
+                rc = PTR_ERR(dchild);
+                if (rc != -EPERM && rc != -EACCES)
+                        CERROR("child lookup error %d\n", rc);
+                GOTO(cleanup, rc);
         }
 
         cleanup_phase = 4; /* child dentry */
@@ -874,15 +1136,15 @@ static int mds_reint_link(struct mds_update_record *rec, int offset,
         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE,
                        to_kdev_t(de_src->d_inode->i_sb->s_dev));
 
-        handle = fsfilt_start(obd, de_tgt_dir->d_inode, FSFILT_OP_LINK);
+        handle = fsfilt_start(obd, de_tgt_dir->d_inode, FSFILT_OP_LINK, NULL);
         if (IS_ERR(handle)) {
                 rc = PTR_ERR(handle);
                 GOTO(cleanup, rc);
         }
 
         rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
-        if (rc)
-                CERROR("link error %d\n", rc);
+        if (rc && rc != -EPERM && rc != -EACCES)
+                CERROR("vfs_link error %d\n", rc);
 cleanup:
         rc = mds_finish_transno(mds, de_tgt_dir ? de_tgt_dir->d_inode : NULL,
                                 handle, req, rc, 0);
@@ -1057,13 +1319,12 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
                        to_kdev_t(de_srcdir->d_inode->i_sb->s_dev));
 
-        handle = fsfilt_start(obd, de_tgtdir->d_inode, FSFILT_OP_RENAME);
+        handle = fsfilt_start(obd, de_tgtdir->d_inode, FSFILT_OP_RENAME, NULL);
         if (IS_ERR(handle))
                 GOTO(cleanup, rc = PTR_ERR(handle));
 
         lock_kernel();
-        rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new,
-                        NULL);
+        rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new);
         unlock_kernel();
 
         EXIT;