Whamcloud - gitweb
b=14230
[fs/lustre-release.git] / lustre / mds / mds_unlink_open.c
index 20191c6..3c2f4fe 100644 (file)
@@ -8,45 +8,52 @@
  *   Author: Andreas Dilger <adilger@clusterfs.com>
  *   Author: Phil Schwan <phil@clusterfs.com>
  *
- *   This file is part of Lustre, http://www.lustre.org.
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
  *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
  *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
  *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
  */
 
 /* code for handling open unlinked files */
 
 #define DEBUG_SUBSYSTEM S_MDS
 
+#ifndef AUTOCONF_INCLUDED
 #include <linux/config.h>
+#endif
 #include <linux/module.h>
 #include <linux/version.h>
 
-#include <portals/list.h>
-#include <linux/obd_class.h>
-#include <linux/lustre_fsfilt.h>
-#include <linux/lustre_commit_confd.h>
-#include <linux/lvfs.h>
+#include <libcfs/list.h>
+#include <obd_class.h>
+#include <lustre_fsfilt.h>
+#include <lustre_mds.h>
+#include <lustre_commit_confd.h>
+#include <lvfs.h>
 
 #include "mds_internal.h"
 
-static int mds_osc_destroy_orphan(struct mds_obd *mds,
-                                  struct inode *inode,
+int mds_osc_destroy_orphan(struct obd_device *obd,
+                                  umode_t mode,
                                   struct lov_mds_md *lmm,
                                   int lmm_size,
                                   struct llog_cookie *logcookies,
                                   int log_unlink)
 {
+        struct mds_obd *mds = &obd->u.mds;
         struct lov_stripe_md *lsm = NULL;
         struct obd_trans_info oti = { 0 };
         struct obdo *oa;
@@ -65,21 +72,24 @@ static int mds_osc_destroy_orphan(struct mds_obd *mds,
                 rc = 0;
         }
 
-        oa = obdo_alloc();
+        rc = obd_checkmd(mds->mds_osc_exp, obd->obd_self_export, lsm);
+        if (rc)
+                GOTO(out_free_memmd, rc);
+
+        OBDO_ALLOC(oa);
         if (oa == NULL)
                 GOTO(out_free_memmd, rc = -ENOMEM);
         oa->o_id = lsm->lsm_object_id;
-        oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
-        oa->o_mode = inode->i_mode & S_IFMT;
+        oa->o_gr = lsm->lsm_object_gr;
+        oa->o_mode = mode & S_IFMT;
         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
 
         if (log_unlink && logcookies) {
                 oa->o_valid |= OBD_MD_FLCOOKIE;
                 oti.oti_logcookies = logcookies;
         }
-
-        rc = obd_destroy(mds->mds_osc_exp, oa, lsm, &oti);
-        obdo_free(oa);
+        rc = obd_destroy(mds->mds_osc_exp, oa, lsm, &oti, obd->obd_self_export);
+        OBDO_FREE(oa);
         if (rc)
                 CDEBUG(D_INODE, "destroy orphan objid 0x"LPX64" on ost error "
                        "%d\n", lsm->lsm_object_id, rc);
@@ -94,32 +104,33 @@ static int mds_unlink_orphan(struct obd_device *obd, struct dentry *dchild,
         struct mds_obd *mds = &obd->u.mds;
         struct lov_mds_md *lmm = NULL;
         struct llog_cookie *logcookies = NULL;
-        int lmm_size = 0, log_unlink = 0;
+        int lmm_size, log_unlink = 0, cookie_size = 0;
         void *handle = NULL;
+        umode_t mode;
         int rc, err;
         ENTRY;
 
         LASSERT(mds->mds_osc_obd != NULL);
+        
+        /* We don't need to do any of these other things for orhpan dirs,
+         * especially not mds_get_md (may get a default LOV EA, bug 4554) */
+        mode = inode->i_mode;
+        if (S_ISDIR(mode)) {
+                rc = vfs_rmdir(pending_dir, dchild);
+                if (rc)
+                        CERROR("error %d unlinking dir %*s from PENDING\n",
+                               rc, dchild->d_name.len, dchild->d_name.name);
+                RETURN(rc);
+        }
 
-        OBD_ALLOC(lmm, mds->mds_max_mdsize);
+        lmm_size = mds->mds_max_mdsize;
+        OBD_ALLOC(lmm, lmm_size);
         if (lmm == NULL)
                 RETURN(-ENOMEM);
 
-        down(&inode->i_sem);
-        rc = fsfilt_get_md(obd, inode, lmm, mds->mds_max_mdsize);
-        up(&inode->i_sem);
-
-        if (rc < 0) {
-                CERROR("Error %d reading eadata for ino %lu\n",
-                       rc, inode->i_ino);
+        rc = mds_get_md(obd, inode, lmm, &lmm_size, 1);
+        if (rc < 0)
                 GOTO(out_free_lmm, rc);
-        } else if (rc > 0) {
-                lmm_size = rc;
-                rc = mds_convert_lov_ea(obd, inode, lmm, lmm_size);
-                if (rc > 0)
-                        lmm_size = rc;
-                rc = 0;
-        }
 
         handle = fsfilt_start_log(obd, pending_dir, FSFILT_OP_UNLINK, NULL,
                                   le32_to_cpu(lmm->lmm_stripe_count));
@@ -130,42 +141,41 @@ static int mds_unlink_orphan(struct obd_device *obd, struct dentry *dchild,
                 GOTO(out_free_lmm, rc);
         }
 
-        if (S_ISDIR(inode->i_mode))
-                rc = vfs_rmdir(pending_dir, dchild);
-        else
-                rc = vfs_unlink(pending_dir, dchild);
-
-        if (rc)
-                CERROR("error %d unlinking orphan %*s from PENDING directory\n",
+        rc = vfs_unlink(pending_dir, dchild);
+        if (rc) {
+                CERROR("error %d unlinking orphan %.*s from PENDING\n",
                        rc, dchild->d_name.len, dchild->d_name.name);
-
-        if (!rc && lmm_size) {
-                OBD_ALLOC(logcookies, mds->mds_max_cookiesize);
+        } else if (lmm_size) {
+                cookie_size = mds_get_cookie_size(obd, lmm); 
+                OBD_ALLOC(logcookies, cookie_size);
                 if (logcookies == NULL)
                         rc = -ENOMEM;
-                else if (mds_log_op_unlink(obd, inode, lmm,lmm_size,logcookies,
-                                           mds->mds_max_cookiesize, NULL) > 0)
+                else if (mds_log_op_unlink(obd, lmm,lmm_size,logcookies,
+                                           cookie_size) > 0)
                         log_unlink = 1;
         }
-        err = fsfilt_commit(obd, mds->mds_sb, pending_dir, handle, 0);
+
+        err = fsfilt_commit(obd, pending_dir, handle, 0);
         if (err) {
                 CERROR("error committing orphan unlink: %d\n", err);
                 if (!rc)
                         rc = err;
-        }
-        if (!rc) {
-                rc = mds_osc_destroy_orphan(mds, inode, lmm, lmm_size,
+        } else if (!rc) {
+                rc = mds_osc_destroy_orphan(obd, mode, lmm, lmm_size,
                                             logcookies, log_unlink);
         }
 
         if (logcookies != NULL)
-                OBD_FREE(logcookies, mds->mds_max_cookiesize);
+                OBD_FREE(logcookies, cookie_size);
 out_free_lmm:
         OBD_FREE(lmm, mds->mds_max_mdsize);
         RETURN(rc);
 }
 
-int mds_cleanup_orphans(struct obd_device *obd)
+/* Delete inodes which were previously open-unlinked but were not reopened
+ * during MDS recovery for whatever reason (e.g. client also failed, recovery
+ * aborted, etc). */
+int mds_cleanup_pending(struct obd_device *obd)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct lvfs_run_ctxt saved;
@@ -176,11 +186,13 @@ int mds_cleanup_orphans(struct obd_device *obd)
         struct l_linux_dirent *dirent, *n;
         struct list_head dentry_list;
         char d_name[LL_FID_NAMELEN];
-        __u64 i = 0;
-        int rc = 0, item = 0, namlen;
+        unsigned long inum;
+        int i = 0, rc = 0, item = 0, namlen;
         ENTRY;
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        /* dentry and mnt ref dropped in dentry_open() on error, or
+         * in filp_close() if dentry_open() succeeds */
         dentry = dget(mds->mds_pending_dir);
         if (IS_ERR(dentry))
                 GOTO(err_pop, rc = PTR_ERR(dentry));
@@ -193,62 +205,70 @@ int mds_cleanup_orphans(struct obd_device *obd)
         if (IS_ERR(file))
                 GOTO(err_pop, rc = PTR_ERR(file));
 
-        INIT_LIST_HEAD(&dentry_list);
+        CFS_INIT_LIST_HEAD(&dentry_list);
         rc = l_readdir(file, &dentry_list);
         filp_close(file, 0);
         if (rc < 0)
                 GOTO(err_out, rc);
 
         list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
-                i ++;
+                i++;
                 list_del(&dirent->lld_list);
 
                 namlen = strlen(dirent->lld_name);
                 LASSERT(sizeof(d_name) >= namlen + 1);
                 strcpy(d_name, dirent->lld_name);
+                inum = dirent->lld_ino;
                 OBD_FREE(dirent, sizeof(*dirent));
 
-                CDEBUG(D_INODE, "entry "LPU64" of PENDING DIR: %s\n",
-                       i, d_name);
+                CDEBUG(D_INODE, "entry %d of PENDING DIR: %s\n", i, d_name);
 
                 if (((namlen == 1) && !strcmp(d_name, ".")) ||
-                    ((namlen == 2) && !strcmp(d_name, ".."))) {
+                    ((namlen == 2) && !strcmp(d_name, "..")) || inum == 0)
                         continue;
-                }
 
-                down(&pending_dir->i_sem);
+                LOCK_INODE_MUTEX(pending_dir);
                 dchild = lookup_one_len(d_name, mds->mds_pending_dir, namlen);
                 if (IS_ERR(dchild)) {
-                        up(&pending_dir->i_sem);
+                        UNLOCK_INODE_MUTEX(pending_dir);
                         GOTO(err_out, rc = PTR_ERR(dchild));
                 }
                 if (!dchild->d_inode) {
-                        CERROR("orphan %s has been removed\n", d_name);
+                        CWARN("%s: orphan %s has already been removed\n",
+                              obd->obd_name, d_name);
                         GOTO(next, rc = 0);
                 }
 
+                if (is_bad_inode(dchild->d_inode)) {
+                        CERROR("%s: bad orphan inode found %lu/%u\n",
+                               obd->obd_name, dchild->d_inode->i_ino,
+                               dchild->d_inode->i_generation);
+                        GOTO(next, rc = -ENOENT);
+                }
+
                 child_inode = dchild->d_inode;
-                DOWN_READ_I_ALLOC_SEM(child_inode);
+                MDS_DOWN_READ_ORPHAN_SEM(child_inode);
                 if (mds_inode_is_orphan(child_inode) &&
                     mds_orphan_open_count(child_inode)) {
-                        UP_READ_I_ALLOC_SEM(child_inode);
-                        CWARN("orphan %s re-opened during recovery\n", d_name);
+                        MDS_UP_READ_ORPHAN_SEM(child_inode);
+                        CWARN("%s: orphan %s re-opened during recovery\n",
+                              obd->obd_name, d_name);
                         GOTO(next, rc = 0);
                 }
-                UP_READ_I_ALLOC_SEM(child_inode);
+                MDS_UP_READ_ORPHAN_SEM(child_inode);
+
                 rc = mds_unlink_orphan(obd, dchild, child_inode, pending_dir);
-                if (rc == 0) {
-                        item ++;
-                        CWARN("removed orphan %s from MDS and OST\n", d_name);
-                } else {
-                        CDEBUG(D_INODE, "removed orphan %s from MDS/OST failed,"
-                               " rc = %d\n", d_name, rc);
+                CDEBUG(D_INODE, "%s: removed orphan %s: rc %d\n",
+                       obd->obd_name, d_name, rc);
+                if (rc == 0)
+                        item++;
+                else
                         rc = 0;
-                }
 next:
                 l_dput(dchild);
-                up(&pending_dir->i_sem);
+                UNLOCK_INODE_MUTEX(pending_dir);
         }
+        rc = 0;
 err_out:
         list_for_each_entry_safe(dirent, n, &dentry_list, lld_list) {
                 list_del(&dirent->lld_list);
@@ -256,8 +276,9 @@ err_out:
         }
 err_pop:
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        if (rc == 0)
-                rc = item;
+        if (item > 0)
+                CWARN("%s: removed %d pending open-unlinked files\n",
+                      obd->obd_name, item);
         RETURN(rc);
 
 err_mntget: