Whamcloud - gitweb
- add cleanup of unlinked open files after recovery or after recovery
authorbraam <braam>
Sun, 5 Oct 2003 04:49:50 +0000 (04:49 +0000)
committerbraam <braam>
Sun, 5 Oct 2003 04:49:50 +0000 (04:49 +0000)
  aborts
- add regressions for this in replay-single.sh
- stub in most of the orphan logging code (still under ENABLE_ORPHAN ifdefs)

lustre/ldlm/ldlm_lib.c
lustre/mds/mds_internal.h
lustre/mds/mds_log.c [new file with mode: 0644]
lustre/mds/mds_unlink_open.c [new file with mode: 0644]
lustre/obdclass/llog_obd.c [new file with mode: 0644]
lustre/obdfilter/filter_internal.h
lustre/obdfilter/filter_log.c
lustre/ptlrpc/llog_net.c [new file with mode: 0644]
lustre/tests/replay-single.sh

index f91a28a..8d92a64 100644 (file)
@@ -431,6 +431,7 @@ static void abort_recovery_queue(struct obd_device *obd)
 void target_abort_recovery(void *data)
 {
         struct obd_device *obd = data;
+        int rc;
 
         CERROR("disconnecting clients and aborting recovery\n");
         spin_lock_bh(&obd->obd_processing_task_lock);
@@ -451,6 +452,12 @@ void target_abort_recovery(void *data)
         if (OBT(obd) && OBP(obd, postsetup))
                 OBP(obd, postsetup)(obd);
 
+        /* when recovery was abort, cleanup orphans for mds */
+        if (OBT(obd) && OBP(obd, postcleanup)) {
+                rc = OBP(obd, postcleanup)(obd);
+                CERROR("Cleanup %d orphans after recovery was abort!\n", rc);
+        }
+
         class_disconnect_exports(obd, 0);
         abort_delayed_replies(obd);
         abort_recovery_queue(obd);
@@ -694,6 +701,7 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
         struct ptlrpc_request *saved_req;
         struct lustre_msg *reqmsg;
         int recovery_done = 0;
+        int rc2;
 
         if (rc) {
                 /* Just like ptlrpc_error, but without the sending. */
@@ -728,6 +736,14 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
                        obd->obd_name);
                 obd->obd_recovering = 0;
 
+                /* when recovering finished, cleanup orphans for mds       */
+                /* there should be no orphan cleaned up for this condition */
+                if (OBT(obd) && OBP(obd, postcleanup)) {
+                        CERROR("cleanup orphans after all clients recovered\n");
+                        rc2 = OBP(obd, postcleanup)(obd);
+                        LASSERT(rc2 == 0);
+                }
+
                 if (OBT(obd) && OBP(obd, postsetup))
                         OBP(obd, postsetup)(obd);
 
index 31639f4..dc554d7 100644 (file)
@@ -15,7 +15,9 @@ struct llog_handle *mds_log_create(struct obd_device *obd, char *name);
 int mds_log_close(struct llog_handle *cathandle, struct llog_handle *loghandle);
 struct llog_handle *mds_log_open(struct obd_device *obd,
                                  struct llog_cookie *logcookie);
+#if 0
 struct llog_handle *mds_get_catalog(struct obd_device *obd);
+#endif
 void mds_put_catalog(struct obd_device *obd, struct llog_handle *cathandle);
 
 
@@ -25,12 +27,23 @@ int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle,
                        struct ptlrpc_request *req, int rc, __u32 op_data);
 void mds_reconstruct_generic(struct ptlrpc_request *req);
 void mds_req_from_mcd(struct ptlrpc_request *req, struct mds_client_data *mcd);
-int mds_cleanup_orphans(struct obd_device *);
 
 /* mds/mds_lib.c */
 int mds_update_unpack(struct ptlrpc_request *, int offset,
                       struct mds_update_record *);
 
+/* mds/mds_unlink_open.c */
+int mds_open_unlink_rename(struct mds_update_record *rec,
+                           struct obd_device *obd, struct dentry *dparent,
+                           struct dentry *dchild, void **handle);
+int mds_cleanup_orphans(struct obd_device *obd);
+
+
+/* mds/mds_log.c */
+struct llog_handle *mds_get_catalog(struct obd_device *obd);
+int mds_log_op_unlink(struct obd_device *obd, struct inode *inode, struct lustre_msg *repmsg,
+                      int offset);
+
 /* mds/mds_lov.c */
 int mds_lov_connect(struct obd_device *obd);
 int mds_get_lovtgts(struct obd_device *, int tgt_count, struct obd_uuid *);
diff --git a/lustre/mds/mds_log.c b/lustre/mds/mds_log.c
new file mode 100644 (file)
index 0000000..7207167
--- /dev/null
@@ -0,0 +1,140 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  lustre/mds/mds_log.c
+ *
+ *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
+ *   Author: Peter Braam <braam@clusterfs.com>
+ *   Author: Andreas Dilger <adilger@clusterfs.com>
+ *   Author: Phil Schwan <phil@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/version.h>
+
+#include <portals/list.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_fsfilt.h>
+#include <linux/lustre_commit_confd.h>
+
+#include "mds_internal.h"
+
+
+/* Open the MDS recovery catalog, creating a new one when needed.
+ *
+ * If the server data records a catalog id (msd_catalog_oid), that log is
+ * opened.  If it cannot be opened the recorded id is cleared and, like when
+ * no id was recorded at all, a new catalog is created and its id is written
+ * back with mds_update_server_data().  The handle is then initialised as a
+ * catalog (LLOG_F_IS_CAT) for obd->obd_uuid.
+ *
+ * Runs between push_ctxt()/pop_ctxt() on obd->obd_ctxt.
+ *
+ * Returns the catalog handle, or an ERR_PTR()-encoded error code.
+ */
+struct llog_handle *mds_get_catalog(struct obd_device *obd)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct mds_server_data *msd = mds->mds_server_data;
+        struct obd_run_ctxt saved;
+        struct llog_handle *cathandle = NULL;
+        struct llog_logid logid;
+        int rc;
+        ENTRY;
+
+        push_ctxt(&saved, &obd->obd_ctxt, NULL);
+        if (msd->msd_catalog_oid) {
+                logid.lgl_oid = le64_to_cpu(msd->msd_catalog_oid);
+                logid.lgl_ogen = le32_to_cpu(msd->msd_catalog_ogen);
+                rc = llog_create(obd, &cathandle, &logid, NULL);
+                if (rc) {
+                        /* Report rc itself: cathandle is an out-parameter
+                         * here (possibly still NULL), not an ERR_PTR. */
+                        CERROR("error opening catalog "LPX64":%x: rc %d\n",
+                               logid.lgl_oid, logid.lgl_ogen, rc);
+                        /* forget the unusable catalog, fall back to a new one */
+                        msd->msd_catalog_oid = 0;
+                        msd->msd_catalog_ogen = 0;
+                }
+        }
+
+        if (!msd->msd_catalog_oid) {
+                rc = llog_create(obd, &cathandle, NULL, NULL);
+                if (rc) {
+                        CERROR("error creating new catalog: rc %d\n", rc);
+                        cathandle = ERR_PTR(rc);
+                        GOTO(out, cathandle);
+                }
+                /* remember the new catalog id in the server data (LE on disk) */
+                logid = cathandle->lgh_id;
+                msd->msd_catalog_oid = cpu_to_le64(logid.lgl_oid);
+                msd->msd_catalog_ogen = cpu_to_le32(logid.lgl_ogen);
+                rc = mds_update_server_data(obd, 0);
+                if (rc) {
+                        CERROR("error writing new catalog to disk: rc %d\n",rc);
+                        GOTO(out_handle, rc);
+                }
+        }
+
+        rc = llog_init_handle(cathandle, LLOG_F_IS_CAT, &obd->obd_uuid);
+        if (rc)
+                GOTO(out_handle, rc);
+out:
+        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+        RETURN(cathandle);
+
+out_handle:
+        /* close the handle and hand the error back as an ERR_PTR */
+        llog_close(cathandle);
+        cathandle = ERR_PTR(rc);
+        goto out;
+}
+
+
+int mds_log_op_unlink(struct obd_device *obd, 
+                      struct inode *inode, struct lustre_msg *repmsg,
+                      int offset)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct lov_stripe_md *lsm = NULL;
+        struct llog_unlink_rec *lur;
+        int rc;
+        ENTRY;
+
+        if (IS_ERR(mds->mds_osc_obd))
+                RETURN(PTR_ERR(mds->mds_osc_obd));
+
+        rc = obd_unpackmd(mds->mds_osc_exp, &lsm,
+                          lustre_msg_buf(repmsg, offset, 0),
+                          repmsg->buflens[offset]);
+        if (rc < 0)
+                RETURN(rc);
+
+        OBD_ALLOC(lur, sizeof(*lur));
+        if (!lur)
+                RETURN(-ENOMEM);
+        lur->lur_hdr.lrh_len = lur->lur_tail.lrt_len = sizeof(*lur);
+        lur->lur_hdr.lrh_type = MDS_UNLINK_REC;
+        lur->lur_oid = inode->i_ino;
+        lur->lur_ogen = inode->i_generation;
+
+#ifdef ENABLE_ORPHANS
+#if 0
+        rc = obd_log_add(mds->mds_osc_exp, mds->mds_catalog, &lur->lur_hdr,
+                         lsm, lustre_msg_buf(repmsg, offset + 1, 0),
+                         repmsg->buflens[offset+1]/sizeof(struct llog_cookie),
+                         NULL);
+#endif
+        rc = lov_log_add(mds->mds_osc_exp, mds->mds_catalog, &lur->lur_hdr,
+                         lsm, lustre_msg_buf(repmsg, offset + 1, 0),
+                         repmsg->buflens[offset+1]/sizeof(struct llog_cookie));
+#endif
+
+        obd_free_memmd(mds->mds_osc_exp, &lsm);
+        OBD_FREE(lur, sizeof(*lur));
+
+        RETURN(rc);
+}
diff --git a/lustre/mds/mds_unlink_open.c b/lustre/mds/mds_unlink_open.c
new file mode 100644 (file)
index 0000000..82cc150
--- /dev/null
@@ -0,0 +1,298 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  lustre/mds/mds_unlink_open.c
+ *
+ *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
+ *   Author: Peter Braam <braam@clusterfs.com>
+ *   Author: Andreas Dilger <adilger@clusterfs.com>
+ *   Author: Phil Schwan <phil@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+/* code for handling open unlinked files */
+
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/version.h>
+
+#include <portals/list.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_fsfilt.h>
+#include <linux/lustre_commit_confd.h>
+#include <linux/lvfs.h>
+
+#include "mds_internal.h"
+
+
+/* Unlinking a file or directory that is still open must not destroy it yet
+ * (that would create an orphan).  Instead the inode is renamed into the
+ * PENDING directory, under a name built from its inode number and
+ * generation, where it stays until it is finally released.
+ * mds_reint_rename() (or a part of it) cannot be used for this, because the
+ * inode needed to check link count/open status is not available until after
+ * it is locked.
+ *
+ * Lock ordering: always PENDING first, then the pending_child lock last, to
+ * avoid deadlocks.
+ *
+ * The transaction started for the rename is handed back through @handle.
+ * Returns 0 on success (also when the orphan already sits in PENDING), or
+ * the error from the lookup, fsfilt_start() or vfs_rename().
+ */
+
+int mds_open_unlink_rename(struct mds_update_record *rec,
+                           struct obd_device *obd, struct dentry *dparent,
+                           struct dentry *dchild, void **handle)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct inode *pending_inode = mds->mds_pending_dir->d_inode;
+        struct dentry *target;
+        char name[LL_FID_NAMELEN];
+        int namelen = 0, rc;
+        ENTRY;
+
+        LASSERT(!mds_inode_is_orphan(dchild->d_inode));
+
+        /* PENDING is locked for the whole lookup + rename */
+        down(&pending_inode->i_sem);
+        namelen = ll_fid2str(name, dchild->d_inode->i_ino,
+                             dchild->d_inode->i_generation);
+
+        CDEBUG(D_ERROR, "pending destroy of %dx open file %s = %s\n",
+               mds_open_orphan_count(dchild->d_inode),
+               rec->ur_name, name);
+
+        target = lookup_one_len(name, mds->mds_pending_dir, namelen);
+        if (IS_ERR(target))
+                GOTO(out_lock, rc = PTR_ERR(target));
+
+        /* a positive dentry means this inode was moved here before */
+        if (target->d_inode != NULL) {
+                CERROR("re-destroying orphan file %s?\n", rec->ur_name);
+                LASSERT(target->d_inode == dchild->d_inode);
+                GOTO(out_dput, rc = 0);
+        }
+
+        *handle = fsfilt_start(obd, pending_inode, FSFILT_OP_RENAME, NULL);
+        if (IS_ERR(*handle))
+                GOTO(out_dput, rc = PTR_ERR(*handle));
+
+        lock_kernel();
+        rc = vfs_rename(dparent->d_inode, dchild, pending_inode, target);
+        unlock_kernel();
+        if (rc == 0)
+                mds_inode_set_orphan(dchild->d_inode);
+        else
+                CERROR("error renaming orphan %lu/%s to PENDING: rc = %d\n",
+                       dparent->d_inode->i_ino, rec->ur_name, rc);
+out_dput:
+        dput(target);
+out_lock:
+        up(&pending_inode->i_sem);
+        RETURN(rc);
+}
+
+
+int mds_cleanup_orphans(struct obd_device *obd)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct obd_run_ctxt saved;
+        struct file *file;
+        struct dentry *dchild; 
+        struct inode *child_inode, *pending_dir = mds->mds_pending_dir->d_inode;
+        struct l_linux_dirent *dirent, *ptr;
+        unsigned int count = pending_dir->i_size;
+        void *handle = NULL;
+        struct lov_mds_md *lmm = NULL;
+        struct lov_stripe_md *lsm = NULL;
+        struct obd_trans_info oti = { 0 };
+        struct obdo *oa;
+        struct ptlrpc_request *req;
+        struct mds_body *body;
+        int lengths[3] = {sizeof(struct mds_body),
+                          mds->mds_max_mdsize,
+                          mds->mds_max_cookiesize}; 
+        int rc = 0, rc2 = 0, item = 0;
+        ENTRY;
+        
+        push_ctxt(&saved, &obd->obd_ctxt, NULL);
+        dget(mds->mds_pending_dir);
+        mntget(mds->mds_vfsmnt);
+        file = dentry_open(mds->mds_pending_dir, mds->mds_vfsmnt, 
+                           O_RDONLY | O_LARGEFILE);
+        if (IS_ERR(file)) 
+                GOTO(err_open, rc2 = PTR_ERR(file));
+        
+        OBD_ALLOC(dirent, count);
+        if (dirent == NULL)
+                GOTO(err_alloc_dirent, rc2 = -ENOMEM);
+
+        rc = l_readdir(file, dirent, count); 
+        filp_close(file, 0);
+        if (rc < 0)
+                GOTO(err_out, rc2 = rc);
+
+        for (ptr = dirent; (char *)ptr < (char *)dirent + rc; 
+                        (char *)ptr += ptr->d_reclen) {
+                int namlen = strlen(ptr->d_name); 
+
+                if (((namlen == 1) && !strcmp(ptr->d_name, ".")) ||
+                    ((namlen == 2) && !strcmp(ptr->d_name, ".."))) 
+                        continue;
+
+                down(&pending_dir->i_sem);
+                dchild = lookup_one_len(ptr->d_name, mds->mds_pending_dir, namlen); 
+                if (IS_ERR(dchild)) {
+                        up(&pending_dir->i_sem);
+                        GOTO(err_out, rc2 = PTR_ERR(dchild));
+                }
+                if (!dchild->d_inode) {
+                        CDEBUG(D_ERROR, "orphan %s has been deleted\n", ptr->d_name);
+                        GOTO(next, rc2 = 0);
+                }
+
+                child_inode = dchild->d_inode;
+                if (mds_inode_is_orphan(child_inode) && 
+                    mds_open_orphan_count(child_inode)) {
+                        CDEBUG(D_ERROR, "orphan %s was re-opened during recovery\n",
+                               ptr->d_name);
+                        GOTO(next, rc2 = 0);
+                }
+
+                CDEBUG(D_ERROR, "cleanup orphan %s start on mds and ost:\n", ptr->d_name);
+
+                LASSERT(mds->mds_osc_obd != NULL);
+
+                OBD_ALLOC(req, sizeof(*req)); 
+                if (!req) {
+                        CERROR("request allocation out of memory\n");
+                        GOTO(err_lov_conn, rc2 = -ENOMEM);
+                }
+                rc2 = lustre_pack_msg(3, lengths, NULL, &req->rq_replen,
+                                      &req->rq_repmsg);
+                if (rc2) {
+                        CERROR("cannot pack request %d\n", rc2);
+                        OBD_FREE(req, sizeof(*req));
+                        GOTO(out_free_req, rc2);
+                }
+                body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
+                LASSERT(body != NULL);
+
+                mds_pack_inode2body(body, child_inode);
+                mds_pack_md(obd, req->rq_repmsg, 1, body, child_inode);
+                lmm = lustre_msg_buf(req->rq_repmsg, 1, 0);
+
+#ifdef ENABLE_ORPHANS
+                if (mds_log_op_unlink(obd, child_inode,
+                                              req->rq_repmsg, 1) > 0)
+                        oa->o_valid |= OBD_MD_FLCOOKIE;
+#endif
+
+                rc2 = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, body->eadatasize);
+                if (rc2 < 0) {
+                        CERROR("Error unpack md %p\n", lmm);
+                        GOTO(out_free_req, rc2 = 0);
+                } else {
+                        LASSERT(rc2 >= sizeof(*lsm));
+                        rc2 = 0;                
+                }
+
+                oa = obdo_alloc();
+                if (oa == NULL)
+                        GOTO(err_alloc_oa, rc2 = -ENOMEM);
+
+                oa->o_id = lsm->lsm_object_id;
+                oa->o_mode = child_inode->i_mode & S_IFMT;
+                oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE;
+
+#ifdef ENABLE_ORPHANS
+                if (oa->o_valid & OBD_MD_FLCOOKIE) {
+                        oti.oti_logcookies =
+                                lustre_msg_buf(req->rq_repmsg, 2,
+                                               sizeof(struct llog_cookie) *
+                                               lsm->lsm_stripe_count);
+                        if (oti.oti_logcookies == NULL) 
+                                oa->o_valid &= ~OBD_MD_FLCOOKIE;
+                }
+#endif
+
+                rc2 = obd_destroy(mds->mds_osc_exp, oa, lsm, &oti);
+                obdo_free(oa); 
+                if (rc2) {
+                        CERROR("destroy orphan objid 0x"LPX64" on ost error %d\n",
+                               lsm->lsm_object_id, rc2);
+                        GOTO(out_free_memmd, rc2 = 0);
+                }
+                item ++;
+
+                CDEBUG(D_ERROR, "removed orphan %s object from ost successlly!\n", 
+                       ptr->d_name);
+
+                handle = fsfilt_start(obd, pending_dir, FSFILT_OP_UNLINK, NULL); 
+                if (IS_ERR(handle)) {
+                        rc2 = PTR_ERR(handle);
+                        CERROR("error fsfilt_start: %d\n", rc2);
+                        handle = NULL;
+                        GOTO(err_alloc_oa, rc2);
+                }
+                rc2 = vfs_unlink(pending_dir, dchild);
+                if (rc2) {
+                        CERROR("error unlinking orphan from PENDING directory");
+                        CERROR("%s: rc %d\n", ptr->d_name, rc2); 
+                }
+                if (handle) {
+                        int err = fsfilt_commit(obd, pending_dir, handle, 0);
+                        if (err) {
+                                CERROR("error committing orphan unlink: %d\n", err);
+                                rc2 = err;
+                                GOTO(err_alloc_oa, rc2);
+                        }
+                }
+
+                CDEBUG(D_ERROR, "removed orphan %s from mds successfully!\n", 
+                       ptr->d_name);
+
+out_free_memmd:
+                obd_free_memmd(mds->mds_osc_exp, &lsm);
+out_free_req:
+                OBD_FREE(req, sizeof(*req));
+next:
+                l_dput(dchild);
+                up(&pending_dir->i_sem);
+        }
+err_out:
+        OBD_FREE(dirent, count);
+err_pop:
+        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+        if (rc2 == 0)
+                rc2 = item;
+
+        RETURN(rc2);
+
+err_open:
+        mntput(mds->mds_vfsmnt);
+        l_dput(mds->mds_pending_dir);
+        goto err_pop;
+
+err_alloc_dirent:
+        filp_close(file, 0);
+        goto err_pop;
+
+err_alloc_oa:
+        obd_free_memmd(mds->mds_osc_exp, &lsm);
+        OBD_FREE(req, sizeof(*req));
+
+err_lov_conn:
+        l_dput(dchild);
+        up(&pending_dir->i_sem);
+        goto err_out;
+}
diff --git a/lustre/obdclass/llog_obd.c b/lustre/obdclass/llog_obd.c
new file mode 100644 (file)
index 0000000..d8eb374
--- /dev/null
@@ -0,0 +1,69 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_LOG
+
+#ifndef EXPORT_SYMTAB
+#define EXPORT_SYMTAB
+#endif
+
+#include <linux/fs.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_log.h>
+#include <portals/list.h>
+
+/* Placeholder for opening a log on behalf of @obd; it has no body yet and
+ * none of its arguments are used.
+ *
+ * Returns 0.  rc is initialised explicitly because returning it
+ * uninitialised handed an indeterminate value to the caller.
+ * NOTE(review): confirm that 0, rather than an error such as -ENOSYS, is
+ * what callers of this stub should see until it is implemented.
+ */
+int obd_llog_open(struct obd_device *obd, struct obd_device *disk_obd,
+                  int index, int named, int flags, struct obd_uuid *log_uuid)
+{
+        int rc = 0;
+        ENTRY;
+
+        RETURN(rc);
+}
+EXPORT_SYMBOL(obd_llog_open);
+
+/* Add record @rec to catalog @cathandle through llog_cat_add_rec(), passing
+ * @logcookies and @buf along.  The call is made inside the run context of
+ * the obd behind exp->exp_obd->obd_log_exp.  @numcookies is not used.
+ *
+ * Returns the llog_cat_add_rec() result; anything other than 1 is logged as
+ * a failure.
+ */
+int obd_log_add(struct obd_export *exp, struct llog_handle *cathandle,
+                struct llog_rec_hdr *rec, void *buf,
+                struct llog_cookie *logcookies, int numcookies)
+{
+        struct obd_device *log_obd = class_exp2obd(exp->exp_obd->obd_log_exp);
+        struct obd_run_ctxt ctxt;
+        int added;
+        ENTRY;
+
+        LASSERT(cathandle != NULL);
+
+        push_ctxt(&ctxt, &log_obd->obd_ctxt, NULL);
+        added = llog_cat_add_rec(cathandle, rec, logcookies, buf);
+        if (added != 1)
+                CERROR("write one catalog record failed: %d\n", added);
+        pop_ctxt(&ctxt, &log_obd->obd_ctxt, NULL);
+
+        RETURN(added);
+}
+EXPORT_SYMBOL(obd_log_add);
+
+/* Cancel the @count records identified by @cookies in catalog @cathandle
+ * through llog_cat_cancel_records().  The call is made inside the run
+ * context of the obd behind exp->exp_obd->obd_log_exp.  @buf and @flags are
+ * not used.
+ *
+ * Returns the llog_cat_cancel_records() result; non-zero is logged as a
+ * failure.
+ */
+int obd_log_cancel(struct obd_export *exp, struct llog_handle *cathandle,
+                   void *buf, int count, struct llog_cookie *cookies, int flags)
+{
+        struct obd_device *log_obd = class_exp2obd(exp->exp_obd->obd_log_exp);
+        struct obd_run_ctxt ctxt;
+        int err;
+        ENTRY;
+
+        LASSERT(cathandle != NULL);
+
+        push_ctxt(&ctxt, &log_obd->obd_ctxt, NULL);
+        err = llog_cat_cancel_records(cathandle, count, cookies);
+        if (err != 0)
+                CERROR("cancel %d catalog record failed: %d\n", count, err);
+        pop_ctxt(&ctxt, &log_obd->obd_ctxt, NULL);
+
+        RETURN(err);
+}
+EXPORT_SYMBOL(obd_log_cancel);
index 6c575a5..b171342 100644 (file)
@@ -59,10 +59,12 @@ struct filter_server_data {
         __u16 fsd_client_size;     /* size of per-client data area */
         __u16 fsd_subdir_count;    /* number of subdirectories for objects */
         __u64 fsd_catalog_oid;     /* recovery catalog object id */
-        __u32 fsd_catalog_ogen;    /* recovery catalog inode generation */
+        //__u32 fsd_catalog_ogen;    /* recovery catalog inode generation */
+        __u64 fsd_catalog_ogr;    /* recovery catalog inode group */
         __u8  fsd_peeruuid[37];    /* UUID of MDS associated with this OST */
         __u8  peer_padding[3];     /* unused */
-        __u8  fsd_padding[FILTER_LR_SERVER_SIZE - 140];
+        //__u8  fsd_padding[FILTER_LR_SERVER_SIZE - 140];
+        __u8  fsd_padding[FILTER_LR_SERVER_SIZE - 144];
 };
 
 /* Data stored per client in the last_rcvd file.  In le32 order. */
index de21a1f..e5a5b61 100644 (file)
@@ -51,14 +51,14 @@ struct llog_handle *filter_get_catalog(struct obd_device *obd)
         push_ctxt(&saved, &obd->obd_ctxt, NULL);
         if (fsd->fsd_catalog_oid) {
                 logid.lgl_oid = le64_to_cpu(fsd->fsd_catalog_oid);
-                logid.lgl_ogen = le32_to_cpu(fsd->fsd_catalog_ogen);
+                logid.lgl_ogr = le64_to_cpu(fsd->fsd_catalog_ogr);
                 rc = llog_create(obd, &cathandle, &logid, NULL);
                 if (rc) {
                         CERROR("error opening catalog "LPX64":%x: rc %d\n",
                                logid.lgl_oid, logid.lgl_ogen,
                                (int)PTR_ERR(cathandle));
                         fsd->fsd_catalog_oid = 0;
-                        fsd->fsd_catalog_ogen = 0;
+                        fsd->fsd_catalog_ogr = 0;
                 }
         }
 
@@ -71,7 +71,7 @@ struct llog_handle *filter_get_catalog(struct obd_device *obd)
                 }
                 logid = cathandle->lgh_id;
                 fsd->fsd_catalog_oid = cpu_to_le64(logid.lgl_oid);
-                fsd->fsd_catalog_ogen = cpu_to_le32(logid.lgl_ogen);
+                fsd->fsd_catalog_ogr = cpu_to_le64(logid.lgl_ogr);
                 rc = filter_update_server_data(obd, filter->fo_rcvd_filp,fsd,0);
                 if (rc) {
                         CERROR("error writing new catalog to disk: rc %d\n",rc);
@@ -79,7 +79,8 @@ struct llog_handle *filter_get_catalog(struct obd_device *obd)
                 }
         }
 
-        rc = llog_init_handle(cathandle, LLOG_F_IS_CAT, &obd->u.filter.fo_mdc_uuid);
+        //rc = llog_init_handle(cathandle, LLOG_F_IS_CAT, &obd->u.filter.fo_mdc_uuid);
+        rc = llog_init_handle(cathandle, LLOG_F_IS_CAT, &obd->obd_uuid);
         if (rc)
                 GOTO(out_handle, rc);
 out:
diff --git a/lustre/ptlrpc/llog_net.c b/lustre/ptlrpc/llog_net.c
new file mode 100644 (file)
index 0000000..5944caf
--- /dev/null
@@ -0,0 +1,78 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
+ *   Author: Andreas Dilger <adilger@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ * OST<->MDS recovery logging infrastructure.
+ *
+ * Invariants in implementation:
+ * - we do not share logs among different OST<->MDS connections, so that
+ *   if an OST or MDS fails it need only look at log(s) relevant to itself
+ */
+
+#define DEBUG_SUBSYSTEM S_LOG
+
+#ifndef EXPORT_SYMTAB
+#define EXPORT_SYMTAB
+#endif
+
+#include <linux/fs.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_log.h>
+#include <portals/list.h>
+#include <linux/lvfs.h>
+
+
+/* This is a callback from the llog_* functions.
+ * Assumes caller has already pushed us into the kernel context.
+ *
+ * Allocates a handle for the log identified by @logid; no file is opened,
+ * so lgh_file stays NULL.  @logid is mandatory and @name is not used.
+ *
+ * On success stores the handle in *@res and returns 0.  Returns -EINVAL
+ * without @logid and -ENOMEM if no handle can be allocated; *@res is left
+ * untouched on failure so it never points at a freed handle.
+ */
+static int llog_net_create(struct obd_device *obd, struct llog_handle **res,
+                            struct llog_logid *logid, char *name)
+{
+        struct llog_handle *handle;
+        ENTRY;
+
+        /* validate first: nothing to undo and nothing published yet */
+        if (!logid) {
+                CERROR("llog_net_create: must pass logid\n");
+                RETURN(-EINVAL);
+        }
+
+        handle = llog_alloc_handle();
+        if (handle == NULL)
+                RETURN(-ENOMEM);
+
+        handle->lgh_file = NULL;
+        handle->lgh_obd = obd;
+        /* There is no backing file, so the object id and generation cannot
+         * be read from an inode (that dereferenced the NULL lgh_file); take
+         * them from the logid the caller must supply.
+         * NOTE(review): confirm the caller's logid is the intended source. */
+        handle->lgh_id.lgl_ogr = 1;
+        handle->lgh_id.lgl_oid = logid->lgl_oid;
+        handle->lgh_id.lgl_ogen = logid->lgl_ogen;
+
+        *res = handle;
+        RETURN(0);
+}
+
+/* llog operations for this file; only log creation is provided so far, the
+ * other methods are still commented out. */
+struct llog_operations llog_net_ops = {
+        //lop_next_block:  llog_lvfs_next_block,
+        //lop_read_header: llog_lvfs_read_header,
+        lop_create:      llog_net_create,
+};
+
+/* export the table defined above (llog_lvfs_ops is not defined here) */
+EXPORT_SYMBOL(llog_net_ops);
index 3908760..4b69c89 100755 (executable)
@@ -365,5 +365,233 @@ test_19() {
 }
 run_test 19 "|X| mcreate, open, write, rename "
 
+# Orphan cleanup: the replay barrier is set first, so both the multiop open
+# and the unlink are issued after it; the MDS is then failed while multiop
+# is still running in the background.
+# Return codes: 2 = multiop exited non-zero, 3 = $DIR/$tfile still exists.
+test_20() {
+    replay_barrier mds
+    multiop $DIR/$tfile O_tSc &
+    pid=$!
+    # give multiop a chance to open
+    sleep 1 
+    rm -f $DIR/$tfile
+
+    fail mds
+    # signal the background multiop and collect its exit status
+    kill -USR1 $pid
+    wait $pid || return 2
+    [ -e $DIR/$tfile ] && return 3
+    return 0
+}
+run_test 20 "|X| open(O_CREAT), unlink, replay, close (test mds_cleanup_orphans)"
+
+# Same sequence as test_20 (barrier, open, unlink, fail), with an extra file
+# created before the failure and another one created after recovery.
+# Return codes: 1 = touch before the failure failed, 2 = multiop exited
+# non-zero, 3 = $DIR/$tfile still exists, 4 = touch after recovery failed.
+test_21() {
+    replay_barrier mds
+    multiop $DIR/$tfile O_tSc &
+    pid=$!
+    # give multiop a chance to open
+    sleep 1 
+    rm -f $DIR/$tfile
+    touch $DIR/g11 || return 1
+
+    fail mds
+    # signal the background multiop and collect its exit status
+    kill -USR1 $pid
+    wait $pid || return 2
+    [ -e $DIR/$tfile ] && return 3
+    touch $DIR/h11 || return 4
+    return 0
+}
+run_test 21 "|X| open(O_CREAT), unlink touch new, replay, close (test mds_cleanup_orphans)"
+
+# Orphan cleanup: the file is opened before the replay barrier and unlinked
+# after it; the MDS is then failed while multiop is still running.
+# Return codes: 2 = multiop exited non-zero, 3 = $DIR/$tfile still exists.
+test_22() {
+    multiop $DIR/$tfile O_tSc &
+    pid=$!
+    # give multiop a chance to open
+    sleep 1 
+
+    replay_barrier mds
+    rm -f $DIR/$tfile
+
+    fail mds
+    # signal the background multiop and collect its exit status
+    kill -USR1 $pid
+    wait $pid || return 2
+    [ -e $DIR/$tfile ] && return 3
+    return 0
+}
+run_test 22 "open(O_CREAT), |X| unlink, replay, close (test mds_cleanup_orphans)"
+
+# Same sequence as test_22 (open, barrier, unlink, fail), with an extra file
+# created before the failure and another one created after recovery.
+# Return codes: 1 = touch before the failure failed, 2 = multiop exited
+# non-zero, 3 = $DIR/$tfile still exists, 4 = touch after recovery failed.
+test_23() {
+    multiop $DIR/$tfile O_tSc &
+    pid=$!
+    # give multiop a chance to open
+    sleep 1 
+
+    replay_barrier mds
+    rm -f $DIR/$tfile
+    touch $DIR/g11 || return 1
+
+    fail mds
+    # signal the background multiop and collect its exit status
+    kill -USR1 $pid
+    wait $pid || return 2
+    [ -e $DIR/$tfile ] && return 3
+    touch $DIR/h11 || return 4
+    return 0
+}
+run_test 23 "open(O_CREAT), |X| unlink touch new, replay, close (test mds_cleanup_orphans)"
+
+# Orphan cleanup: the file is opened before the replay barrier, the MDS is
+# failed right after the barrier, and the unlink is only issued once "fail
+# mds" has returned, while multiop is still running.
+# Return codes: 2 = multiop exited non-zero, 3 = $DIR/$tfile still exists.
+test_24() {
+    multiop $DIR/$tfile O_tSc &
+    pid=$!
+    # give multiop a chance to open
+    sleep 1 
+
+    replay_barrier mds
+    fail mds
+    rm -f $DIR/$tfile
+    # signal the background multiop and collect its exit status
+    kill -USR1 $pid
+    wait $pid || return 2
+    [ -e $DIR/$tfile ] && return 3
+    return 0
+}
+run_test 24 "open(O_CREAT), replay, unlink, close (test mds_cleanup_orphans)"
+
+# Orphan cleanup: both the open and the unlink are issued before the replay
+# barrier; the MDS is failed right after the barrier while multiop is still
+# running.
+# Return codes: 2 = multiop exited non-zero, 3 = $DIR/$tfile still exists.
+test_25() {
+    multiop $DIR/$tfile O_tSc &
+    pid=$!
+    # give multiop a chance to open
+    sleep 1 
+    rm -f $DIR/$tfile
+
+    replay_barrier mds
+    fail mds
+    # signal the background multiop and collect its exit status
+    kill -USR1 $pid
+    wait $pid || return 2
+    [ -e $DIR/$tfile ] && return 3
+    return 0
+}
+run_test 25 "open(O_CREAT), unlink, replay, close (test mds_cleanup_orphans)"
+
+test_26() {
+    replay_barrier mds
+    multiop $DIR/$tfile O_tSc &
+    pid=$!
+    multiop $DIR/$tfile-2 O_tSc &
+    pid2=$!
+    # give multiop a chance to open
+    sleep 1 
+    rm -f $DIR/$tfile
+    rm -f $DIR/$tfile-2
+    kill -USR1 $pid2
+    wait $pid2 || return 4
+
+    fail mds
+    kill -USR1 $pid
+    wait $pid || return 2
+    [ -e $DIR/$tfile ] && return 3
+    return 0
+}
+run_test 26 "|X| open(O_CREAT), unlink two, close one, replay, close one (test mds_cleanup_orphans)"
+
+test_27() {
+    replay_barrier mds
+    multiop $DIR/$tfile O_tSc &
+    pid=$!
+    multiop $DIR/$tfile-2 O_tSc &
+    pid2=$!
+    # give multiop a chance to open
+    sleep 1 
+    rm -f $DIR/$tfile
+    rm -f $DIR/$tfile-2
+
+    fail mds
+    kill -USR1 $pid
+    wait $pid || return 2
+    kill -USR1 $pid2
+    wait $pid2 || return 4
+    [ -e $DIR/$tfile ] && return 3
+    return 0
+}
+run_test 27 "|X| open(O_CREAT), unlink two, replay, close two (test mds_cleanup_orphans)"
+
+test_28() {
+    multiop $DIR/$tfile O_tSc &
+    pid=$!
+    multiop $DIR/$tfile-2 O_tSc &
+    pid2=$!
+    # give multiop a chance to open
+    sleep 1 
+    replay_barrier mds
+    rm -f $DIR/$tfile
+    rm -f $DIR/$tfile-2
+    kill -USR1 $pid2
+    wait $pid2 || return 4
+
+    fail mds
+    kill -USR1 $pid
+    wait $pid || return 2
+    [ -e $DIR/$tfile ] && return 3
+    return 0
+}
+run_test 28 "open(O_CREAT), |X| unlink two, close one, replay, close one (test mds_cleanup_orphans)"
+
+test_29() {
+    multiop $DIR/$tfile O_tSc &
+    pid=$!
+    multiop $DIR/$tfile-2 O_tSc &
+    pid2=$!
+    # give multiop a chance to open
+    sleep 1 
+    replay_barrier mds
+    rm -f $DIR/$tfile
+    rm -f $DIR/$tfile-2
+
+    fail mds
+    kill -USR1 $pid
+    wait $pid || return 2
+    kill -USR1 $pid2
+    wait $pid2 || return 4
+    [ -e $DIR/$tfile ] && return 3
+    return 0
+}
+run_test 29 "open(O_CREAT), |X| unlink two, replay, close two (test mds_cleanup_orphans)"
+
+test_30() {
+    multiop $DIR/$tfile O_tSc &
+    pid=$!
+    multiop $DIR/$tfile-2 O_tSc &
+    pid2=$!
+    # give multiop a chance to open
+    sleep 1 
+    rm -f $DIR/$tfile
+    rm -f $DIR/$tfile-2
+
+    replay_barrier mds
+    fail mds
+    kill -USR1 $pid
+    wait $pid || return 2
+    kill -USR1 $pid2
+    wait $pid2 || return 4
+    [ -e $DIR/$tfile ] && return 3
+    return 0
+}
+run_test 30 "open(O_CREAT) two, unlink two, replay, close two (test mds_cleanup_orphans)"
+
+test_31() {
+    multiop $DIR/$tfile O_tSc &
+    pid=$!
+    multiop $DIR/$tfile-2 O_tSc &
+    pid2=$!
+    # give multiop a chance to open
+    sleep 1 
+    rm -f $DIR/$tfile
+
+    replay_barrier mds
+    rm -f $DIR/$tfile-2
+    fail mds
+    kill -USR1 $pid
+    wait $pid || return 2
+    kill -USR1 $pid2
+    wait $pid2 || return 4
+    [ -e $DIR/$tfile ] && return 3
+    return 0
+}
+run_test 31 "open(O_CREAT) two, unlink one, |X| unlink one, close two (test mds_cleanup_orphans)"
+
 equals_msg test complete, cleaning up
 cleanup