Whamcloud - gitweb
Changed quotes for newer bash
[fs/lustre-release.git] / lustre / mds / mds_unlink_open.c
index c11328c..c480885 100644 (file)
@@ -32,7 +32,7 @@
 #include <linux/module.h>
 #include <linux/version.h>
 
-#include <portals/list.h>
+#include <libcfs/list.h>
 #include <linux/obd_class.h>
 #include <linux/lustre_fsfilt.h>
 #include <linux/lustre_commit_confd.h>
 
 #include "mds_internal.h"
 
-
-/* If we are unlinking an open file/dir (i.e. creating an orphan) then
- * we instead link the inode into the PENDING directory until it is
- * finally released.  We can't simply call mds_reint_rename() or some
- * part thereof, because we don't have the inode to check for link
- * count/open status until after it is locked.
- *
- * For lock ordering, we always get the PENDING, then pending_child lock
- * last to avoid deadlocks.
+/*
+ * used when destroying orphanes and from mds_reint_unlink() when MDS wants to
+ * destroy objects on OSS.
  */
-
-int mds_open_unlink_rename(struct mds_update_record *rec,
-                           struct obd_device *obd, struct dentry *dparent,
-                           struct dentry *dchild, void **handle)
-{
-        struct mds_obd *mds = &obd->u.mds;
-        struct inode *pending_dir = mds->mds_pending_dir->d_inode;
-        struct dentry *pending_child;
-        char fidname[LL_FID_NAMELEN];
-        int fidlen = 0, rc;
-        unsigned mode;
-        ENTRY;
-
-        LASSERT(!mds_inode_is_orphan(dchild->d_inode));
-
-        down(&pending_dir->i_sem);
-        fidlen = ll_fid2str(fidname, dchild->d_inode->i_ino,
-                            dchild->d_inode->i_generation);
-
-        CDEBUG(D_HA, "pending destroy of %dx open file %s = %s\n",
-               mds_open_orphan_count(dchild->d_inode), rec->ur_name, fidname);
-
-        pending_child = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
-        if (IS_ERR(pending_child))
-                GOTO(out_lock, rc = PTR_ERR(pending_child));
-
-        if (pending_child->d_inode != NULL) {
-                CERROR("re-destroying orphan file %s?\n", rec->ur_name);
-                LASSERT(pending_child->d_inode == dchild->d_inode);
-                GOTO(out_dput, rc = 0);
-        }
-
-        /* link() is semanticaly-wrong for S_IFDIR, so we set S_IFREG
-         * for linking and return real mode back then -bzzz */
-        mode = dchild->d_inode->i_mode;
-        dchild->d_inode->i_mode = S_IFREG;
-        rc = vfs_link(dchild, pending_dir, pending_child);
-        if (rc)
-                CERROR("error linking orphan %s to PENDING: rc = %d\n",
-                       rec->ur_name, rc);
-        else
-                mds_inode_set_orphan(dchild->d_inode);
-
-        /* return mode and correct i_nlink if inode is directory */
-        LASSERT(dchild->d_inode->i_nlink == 1);
-        dchild->d_inode->i_mode = mode;
-        if ((mode & S_IFMT) == S_IFDIR) {
-                dchild->d_inode->i_nlink++;
-                pending_dir->i_nlink++;
-        }
-        mark_inode_dirty(dchild->d_inode);
-
-out_dput:
-        dput(pending_child);
-out_lock:
-        up(&pending_dir->i_sem);
-        RETURN(rc);
-}
-
-static int mds_osc_destroy_orphan(struct mds_obd *mds,
-                                  struct inode *inode,
-                                  struct lov_mds_md *lmm,
-                                  int lmm_size,
-                                  struct llog_cookie *logcookies,
-                                  int log_unlink)
+int
+mds_unlink_object(struct mds_obd *mds, struct inode *inode,
+                  struct lov_mds_md *lmm, int lmm_size,
+                  struct llog_cookie *logcookies,
+                  int log_unlink, int async)
 {
         struct lov_stripe_md *lsm = NULL;
         struct obd_trans_info oti = { 0 };
@@ -125,7 +59,7 @@ static int mds_osc_destroy_orphan(struct mds_obd *mds,
         if (lmm_size == 0)
                 RETURN(0);
 
-        rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, lmm_size);
+        rc = obd_unpackmd(mds->mds_dt_exp, &lsm, lmm, lmm_size);
         if (rc < 0) {
                 CERROR("Error unpack md %p\n", lmm);
                 RETURN(rc);
@@ -147,13 +81,16 @@ static int mds_osc_destroy_orphan(struct mds_obd *mds,
                 oti.oti_logcookies = logcookies;
         }
 
-        rc = obd_destroy(mds->mds_osc_exp, oa, lsm, &oti);
+        CDEBUG(D_INODE, "destroy OSS object %d/%d\n",
+               (int)oa->o_id, (int)oa->o_gr);
+
+        if (async)
+                oti.oti_flags |= OBD_MODE_ASYNC;
+        
+        rc = obd_destroy(mds->mds_dt_exp, oa, lsm, &oti);
         obdo_free(oa);
-        if (rc)
-                CDEBUG(D_INODE, "destroy orphan objid 0x"LPX64" on ost error "
-                       "%d\n", lsm->lsm_object_id, rc);
 out_free_memmd:
-        obd_free_memmd(mds->mds_osc_exp, &lsm);
+        obd_free_memmd(mds->mds_dt_exp, &lsm);
         RETURN(rc);
 }
 
@@ -163,32 +100,32 @@ static int mds_unlink_orphan(struct obd_device *obd, struct dentry *dchild,
         struct mds_obd *mds = &obd->u.mds;
         struct lov_mds_md *lmm = NULL;
         struct llog_cookie *logcookies = NULL;
-        int lmm_size = 0, log_unlink = 0;
+        int lmm_size, log_unlink = 0;
         void *handle = NULL;
         int rc, err;
         ENTRY;
 
-        LASSERT(mds->mds_osc_obd != NULL);
+        LASSERT(mds->mds_dt_obd != NULL);
+        LASSERT(obd->obd_recovering == 0);
+
+        /* We don't need to do any of these other things for orhpan dirs,
+         * especially not mds_get_md (may get a default LOV EA, bug 4554) */
+        if (S_ISDIR(inode->i_mode)) {
+                rc = vfs_rmdir(pending_dir, dchild);
+                if (rc)
+                        CERROR("error %d unlinking dir %*s from PENDING\n",
+                               rc, dchild->d_name.len, dchild->d_name.name);
+                RETURN(rc);
+        }
 
-        OBD_ALLOC(lmm, mds->mds_max_mdsize);
+        lmm_size = mds->mds_max_mdsize;
+        OBD_ALLOC(lmm, lmm_size);
         if (lmm == NULL)
                 RETURN(-ENOMEM);
 
-        down(&inode->i_sem);
-        rc = fsfilt_get_md(obd, inode, lmm, mds->mds_max_mdsize);
-        up(&inode->i_sem);
-
-        if (rc < 0) {
-                CERROR("Error %d reading eadata for ino %lu\n",
-                       rc, inode->i_ino);
+        rc = mds_get_md(obd, inode, lmm, &lmm_size, 1, 0);
+        if (rc < 0)
                 GOTO(out_free_lmm, rc);
-        } else if (rc > 0) {
-                lmm_size = rc;
-                rc = mds_convert_lov_ea(obd, inode, lmm, lmm_size);
-                if (rc > 0)
-                        lmm_size = rc;
-                rc = 0;
-        }
 
         handle = fsfilt_start_log(obd, pending_dir, FSFILT_OP_UNLINK, NULL,
                                   le32_to_cpu(lmm->lmm_stripe_count));
@@ -199,34 +136,16 @@ static int mds_unlink_orphan(struct obd_device *obd, struct dentry *dchild,
                 GOTO(out_free_lmm, rc);
         }
 
-        down(&inode->i_sem);
-        rc = fsfilt_get_md(obd, inode, lmm, mds->mds_max_mdsize);
-        up(&inode->i_sem);
-
-        if (rc < 0) {
-                CERROR("Error %d reading eadata for ino %lu\n",
-                       rc, inode->i_ino);
-                GOTO(out_free_lmm, rc);
-        } else if (rc > 0) {
-                lmm_size = rc;
-                rc = 0;
-        }
-
-        if (S_ISDIR(inode->i_mode))
-                rc = vfs_rmdir(pending_dir, dchild);
-        else
-                rc = vfs_unlink(pending_dir, dchild);
-
-        if (rc)
-                CERROR("error %d unlinking orphan %*s from PENDING directory\n",
+        rc = vfs_unlink(pending_dir, dchild);
+        if (rc) {
+                CERROR("error %d unlinking orphan %.*s from PENDING\n",
                        rc, dchild->d_name.len, dchild->d_name.name);
-
-        if (!rc && lmm_size) {
+        } else if (lmm_size) {
                 OBD_ALLOC(logcookies, mds->mds_max_cookiesize);
                 if (logcookies == NULL)
                         rc = -ENOMEM;
                 else if (mds_log_op_unlink(obd, inode, lmm,lmm_size,logcookies,
-                                           mds->mds_max_cookiesize) > 0)
+                                           mds->mds_max_cookiesize, NULL) > 0)
                         log_unlink = 1;
         }
         err = fsfilt_commit(obd, mds->mds_sb, pending_dir, handle, 0);
@@ -234,10 +153,9 @@ static int mds_unlink_orphan(struct obd_device *obd, struct dentry *dchild,
                 CERROR("error committing orphan unlink: %d\n", err);
                 if (!rc)
                         rc = err;
-        }
-        if (!rc) {
-                rc = mds_osc_destroy_orphan(mds, inode, lmm, lmm_size,
-                                            logcookies, log_unlink);
+        } else if (!rc) {
+                rc = mds_unlink_object(mds, inode, lmm, lmm_size,
+                                       logcookies, log_unlink, 0);
         }
 
         if (logcookies != NULL)
@@ -257,11 +175,13 @@ int mds_cleanup_orphans(struct obd_device *obd)
         struct inode *child_inode, *pending_dir = mds->mds_pending_dir->d_inode;
         struct l_linux_dirent *dirent, *n;
         struct list_head dentry_list;
-        char d_name[LL_FID_NAMELEN];
+        char d_name[LL_ID_NAMELEN];
+       unsigned long inum;
         __u64 i = 0;
         int rc = 0, item = 0, namlen;
         ENTRY;
 
+        LASSERT(obd->obd_recovering == 0);
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         dentry = dget(mds->mds_pending_dir);
         if (IS_ERR(dentry))
@@ -282,21 +202,21 @@ int mds_cleanup_orphans(struct obd_device *obd)
                 GOTO(err_out, rc);
 
         list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
-                i ++;
+                i++;
                 list_del(&dirent->lld_list);
 
                 namlen = strlen(dirent->lld_name);
                 LASSERT(sizeof(d_name) >= namlen + 1);
                 strcpy(d_name, dirent->lld_name);
+                inum = dirent->lld_ino;
                 OBD_FREE(dirent, sizeof(*dirent));
 
                 CDEBUG(D_INODE, "entry "LPU64" of PENDING DIR: %s\n",
                        i, d_name);
 
                 if (((namlen == 1) && !strcmp(d_name, ".")) ||
-                    ((namlen == 2) && !strcmp(d_name, ".."))) {
+                    ((namlen == 2) && !strcmp(d_name, "..")) || inum == 0)
                         continue;
-                }
 
                 down(&pending_dir->i_sem);
                 dchild = lookup_one_len(d_name, mds->mds_pending_dir, namlen);
@@ -309,13 +229,21 @@ int mds_cleanup_orphans(struct obd_device *obd)
                         GOTO(next, rc = 0);
                 }
 
+                if (is_bad_inode(dchild->d_inode)) {
+                        CERROR("bad orphan inode found %lu/%u\n",
+                               dchild->d_inode->i_ino,
+                               dchild->d_inode->i_generation);
+                        GOTO(next, rc = -ENOENT);
+                }
+
                 child_inode = dchild->d_inode;
-                if (mds_inode_is_orphan(child_inode) &&
-                    mds_open_orphan_count(child_inode)) {
-                        CWARN("orphan %s was re-opened during recovery\n", d_name);
+                DOWN_READ_I_ALLOC_SEM(child_inode);
+                if (mds_orphan_open_count(child_inode)) {
+                        UP_READ_I_ALLOC_SEM(child_inode);
+                        CWARN("orphan %s re-opened during recovery\n", d_name);
                         GOTO(next, rc = 0);
                 }
-
+                UP_READ_I_ALLOC_SEM(child_inode);
                 rc = mds_unlink_orphan(obd, dchild, child_inode, pending_dir);
                 if (rc == 0) {
                         item ++;