Whamcloud - gitweb
merge devel to lcfg
authorrread <rread>
Mon, 6 Oct 2003 20:36:22 +0000 (20:36 +0000)
committerrread <rread>
Mon, 6 Oct 2003 20:36:22 +0000 (20:36 +0000)
lustre/lov/lov_log.c [new file with mode: 0644]
lustre/mds/mds_log.c [new file with mode: 0644]
lustre/mds/mds_unlink_open.c [new file with mode: 0644]
lustre/obdclass/llog_obd.c [new file with mode: 0644]
lustre/ptlrpc/llog_net.c [new file with mode: 0644]

diff --git a/lustre/lov/lov_log.c b/lustre/lov/lov_log.c
new file mode 100644 (file)
index 0000000..6a91d80
--- /dev/null
@@ -0,0 +1,172 @@
+ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
+ * Author: Phil Schwan <phil@clusterfs.com>
+ *         Peter Braam <braam@clusterfs.com>
+ *         Mike Shaver <shaver@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_LOV
+#ifdef __KERNEL__
+#include <linux/slab.h>
+#include <linux/module.h>
+#include <linux/init.h>
+#include <linux/slab.h>
+#include <linux/pagemap.h>
+#include <asm/div64.h>
+#else
+#include <liblustre.h>
+#endif
+
+#include <linux/obd_support.h>
+#include <linux/lustre_lib.h>
+#include <linux/lustre_net.h>
+#include <linux/lustre_idl.h>
+#include <linux/lustre_lite.h> /* for LL_IOC_LOV_[GS]ETSTRIPE */
+#include <linux/lustre_dlm.h>
+#include <linux/lustre_mds.h>
+#include <linux/obd_class.h>
+#include <linux/obd_lov.h>
+#include <linux/obd_ost.h>
+#include <linux/seq_file.h>
+#include <linux/lprocfs_status.h>
+
+#include "lov_internal.h"
+
+/* For LOV catalogs, we "nest" catalogs from the parent catalog.  What this
+ * means is that the parent catalog has a bunch of log cookies that are
+ * pointing at one catalog for each OSC.  The OSC catalogs in turn hold
+ * cookies for actual log files. */
+int lov_llog_open(struct obd_device *obd, struct obd_device *disk_obd,
+                  int index, int named, int flags, struct obd_uuid *log_uuid)
+
+{
+        struct lov_obd *lov = &obd->u.lov;
+        int i, rc;
+
+        ENTRY;
+        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+                struct obd_device *child = lov->tgts[i].ltd_exp->exp_obd;
+                rc = obd_llog_open(child, disk_obd, 
+                                   index, named, flags, log_uuid);
+                CERROR("error lov_llog_open %d\n", i);
+                if (rc) 
+                        break;
+        }
+        RETURN(rc);
+}
+
+int lov_get_catalogs(struct lov_obd *lov, struct llog_handle *cathandle)
+{
+        struct obd_device *obd = cathandle->lgh_obd;
+        struct lustre_handle conn;
+        struct obd_export *exp;
+        struct obd_uuid cluuid = { "MDS_OSC_UUID" }; 
+        int rc = 0, i;
+        ENTRY;
+
+        for (i = 0; i < lov->desc.ld_active_tgt_count; i ++) {
+                rc = class_connect(&conn, obd, &cluuid);
+                if (rc) {
+                        CERROR("failed %d: \n", rc);
+                        GOTO(out, rc);
+                }
+                exp = class_conn2export(&conn);
+                lov->tgts[i].ltd_exp->exp_obd->obd_log_exp = exp;
+                lov->tgts[i].ltd_cathandle = cathandle;
+        }
+                
+        lov->lo_catalog_loaded = 1;
+        RETURN(rc);
+
+out:
+        while (--i > 0) {
+                class_disconnect(lov->tgts[i].ltd_exp->exp_obd->obd_log_exp, 0);
+                lov->tgts[i].ltd_cathandle = cathandle;
+        }
+        RETURN(rc);
+}
+
+/* Add log records for each OSC that this object is striped over, and return
+ * cookies for each one.  We _would_ have nice abstraction here, except that
+ * we need to keep cookies in stripe order, even if some are NULL, so that
+ * the right cookies are passed back to the right OSTs at the client side.
+ * Unset cookies should be all-zero (which will never occur naturally). */
+int lov_log_add(struct obd_export *exp,
+                       struct llog_handle *cathandle,
+                       struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
+                       struct llog_cookie *logcookies, int numcookies)
+{
+        struct obd_device *obd = class_exp2obd(exp);
+        struct lov_obd *lov = &obd->u.lov;
+        struct lov_oinfo *loi;
+        int i, rc = 0;
+        ENTRY;
+
+        LASSERT(logcookies && numcookies >= lsm->lsm_stripe_count);
+
+        if (unlikely(!lov->lo_catalog_loaded))
+                lov_get_catalogs(lov, cathandle);
+
+        for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
+                rc += obd_log_add(lov->tgts[loi->loi_ost_idx].ltd_exp,
+                                  lov->tgts[loi->loi_ost_idx].ltd_cathandle,
+                                  rec, NULL, logcookies + rc, numcookies - rc);
+        }
+
+        RETURN(rc);
+}
+
+int lov_log_cancel(struct obd_export *exp, struct lov_stripe_md *lsm,
+                          int count, struct llog_cookie *cookies, int flags)
+{
+        struct lov_obd *lov;
+        struct lov_oinfo *loi;
+        int rc = 0, i;
+        ENTRY;
+
+        LASSERT(lsm != NULL);
+        if (exp == NULL || exp->exp_obd == NULL)
+                RETURN(-ENODEV);
+
+        LASSERT(count == lsm->lsm_stripe_count);
+
+        loi = lsm->lsm_oinfo;
+        lov = &exp->exp_obd->u.lov;
+        for (i = 0; i < count; i++, cookies++, loi++) {
+                int err;
+
+                err = obd_log_cancel(lov->tgts[loi->loi_ost_idx].ltd_exp,
+                                     lov->tgts[loi->loi_ost_idx].ltd_cathandle,
+                                     NULL, 1, cookies, flags);
+
+                if (err && lov->tgts[loi->loi_ost_idx].active) {
+                        CERROR("error: objid "LPX64" subobj "LPX64
+                               " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
+                               loi->loi_id, loi->loi_ost_idx, err);
+                        if (!rc)
+                                rc = err;
+                }
+        }
+        RETURN(rc);
+}
+
diff --git a/lustre/mds/mds_log.c b/lustre/mds/mds_log.c
new file mode 100644 (file)
index 0000000..7207167
--- /dev/null
@@ -0,0 +1,140 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  lustre/mds/mds_log.c
+ *
+ *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
+ *   Author: Peter Braam <braam@clusterfs.com>
+ *   Author: Andreas Dilger <adilger@clusterfs.com>
+ *   Author: Phil Schwan <phil@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/version.h>
+
+#include <portals/list.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_fsfilt.h>
+#include <linux/lustre_commit_confd.h>
+
+#include "mds_internal.h"
+
+
+struct llog_handle *mds_get_catalog(struct obd_device *obd)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct mds_server_data *msd = mds->mds_server_data;
+        struct obd_run_ctxt saved;
+        struct llog_handle *cathandle = NULL;
+        struct llog_logid logid;
+        int rc;
+        ENTRY;
+
+        push_ctxt(&saved, &obd->obd_ctxt, NULL);
+        if (msd->msd_catalog_oid) {
+                logid.lgl_oid = le64_to_cpu(msd->msd_catalog_oid);
+                logid.lgl_ogen = le32_to_cpu(msd->msd_catalog_ogen);
+                rc = llog_create(obd, &cathandle, &logid, NULL);
+                if (rc) {
+                        CERROR("error opening catalog "LPX64":%x: rc %d\n",
+                               logid.lgl_oid, logid.lgl_ogen,
+                               (int)PTR_ERR(cathandle));
+                        msd->msd_catalog_oid = 0;
+                        msd->msd_catalog_ogen = 0;
+                }
+        }
+
+        if (!msd->msd_catalog_oid) {
+                rc = llog_create(obd, &cathandle, NULL, NULL);
+                if (rc) {
+                        CERROR("error creating new catalog: rc %d\n", rc);
+                        cathandle = ERR_PTR(rc);
+                        GOTO(out, cathandle);
+                }
+                logid = cathandle->lgh_id;
+                msd->msd_catalog_oid = cpu_to_le64(logid.lgl_oid);
+                msd->msd_catalog_ogen = cpu_to_le32(logid.lgl_ogen);
+                rc = mds_update_server_data(obd, 0);
+                if (rc) {
+                        CERROR("error writing new catalog to disk: rc %d\n",rc);
+                        GOTO(out_handle, rc);
+                }
+        }
+
+        //rc = llog_init_handle(cathandle, LLOG_F_IS_CAT, &obd->u.filter.fo_mdc_uuid);
+        rc = llog_init_handle(cathandle, LLOG_F_IS_CAT, &obd->obd_uuid);
+        if (rc)
+                GOTO(out_handle, rc);
+out:
+        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+        RETURN(cathandle);
+
+out_handle:
+        llog_close(cathandle);
+        cathandle = ERR_PTR(rc);
+        goto out;
+}
+
+
+int mds_log_op_unlink(struct obd_device *obd, 
+                      struct inode *inode, struct lustre_msg *repmsg,
+                      int offset)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct lov_stripe_md *lsm = NULL;
+        struct llog_unlink_rec *lur;
+        int rc;
+        ENTRY;
+
+        if (IS_ERR(mds->mds_osc_obd))
+                RETURN(PTR_ERR(mds->mds_osc_obd));
+
+        rc = obd_unpackmd(mds->mds_osc_exp, &lsm,
+                          lustre_msg_buf(repmsg, offset, 0),
+                          repmsg->buflens[offset]);
+        if (rc < 0)
+                RETURN(rc);
+
+        OBD_ALLOC(lur, sizeof(*lur));
+        if (!lur)
+                RETURN(-ENOMEM);
+        lur->lur_hdr.lrh_len = lur->lur_tail.lrt_len = sizeof(*lur);
+        lur->lur_hdr.lrh_type = MDS_UNLINK_REC;
+        lur->lur_oid = inode->i_ino;
+        lur->lur_ogen = inode->i_generation;
+
+#ifdef ENABLE_ORPHANS
+#if 0
+        rc = obd_log_add(mds->mds_osc_exp, mds->mds_catalog, &lur->lur_hdr,
+                         lsm, lustre_msg_buf(repmsg, offset + 1, 0),
+                         repmsg->buflens[offset+1]/sizeof(struct llog_cookie),
+                         NULL);
+#endif
+        rc = lov_log_add(mds->mds_osc_exp, mds->mds_catalog, &lur->lur_hdr,
+                         lsm, lustre_msg_buf(repmsg, offset + 1, 0),
+                         repmsg->buflens[offset+1]/sizeof(struct llog_cookie));
+#endif
+
+        obd_free_memmd(mds->mds_osc_exp, &lsm);
+        OBD_FREE(lur, sizeof(*lur));
+
+        RETURN(rc);
+}
diff --git a/lustre/mds/mds_unlink_open.c b/lustre/mds/mds_unlink_open.c
new file mode 100644 (file)
index 0000000..82cc150
--- /dev/null
@@ -0,0 +1,298 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  lustre/mds/mds_orphan.c
+ *
+ *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
+ *   Author: Peter Braam <braam@clusterfs.com>
+ *   Author: Andreas Dilger <adilger@clusterfs.com>
+ *   Author: Phil Schwan <phil@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+/* code for handling open unlinked files */
+
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/version.h>
+
+#include <portals/list.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_fsfilt.h>
+#include <linux/lustre_commit_confd.h>
+#include <linux/lvfs.h>
+
+#include "mds_internal.h"
+
+
+/* If we are unlinking an open file/dir (i.e. creating an orphan) then
+ * we instead link the inode into the PENDING directory until it is
+ * finally released.  We can't simply call mds_reint_rename() or some
+ * part thereof, because we don't have the inode to check for link
+ * count/open status until after it is locked.
+ *
+ * For lock ordering, we always get the PENDING, then pending_child lock
+ * last to avoid deadlocks.
+ */
+
+int mds_open_unlink_rename(struct mds_update_record *rec,
+                           struct obd_device *obd, struct dentry *dparent,
+                           struct dentry *dchild, void **handle)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct inode *pending_dir = mds->mds_pending_dir->d_inode;
+        struct dentry *pending_child;
+        char fidname[LL_FID_NAMELEN];
+        int fidlen = 0, rc;
+        ENTRY;
+
+        LASSERT(!mds_inode_is_orphan(dchild->d_inode));
+
+        down(&pending_dir->i_sem);
+        fidlen = ll_fid2str(fidname, dchild->d_inode->i_ino,
+                            dchild->d_inode->i_generation);
+
+        CDEBUG(D_ERROR, "pending destroy of %dx open file %s = %s\n",
+               mds_open_orphan_count(dchild->d_inode),
+               rec->ur_name, fidname);
+
+        pending_child = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
+        if (IS_ERR(pending_child))
+                GOTO(out_lock, rc = PTR_ERR(pending_child));
+
+        if (pending_child->d_inode != NULL) {
+                CERROR("re-destroying orphan file %s?\n", rec->ur_name);
+                LASSERT(pending_child->d_inode == dchild->d_inode);
+                GOTO(out_dput, rc = 0);
+        }
+
+        *handle = fsfilt_start(obd, pending_dir, FSFILT_OP_RENAME, NULL);
+        if (IS_ERR(*handle))
+                GOTO(out_dput, rc = PTR_ERR(*handle));
+
+        lock_kernel();
+        rc = vfs_rename(dparent->d_inode, dchild, pending_dir, pending_child);
+        unlock_kernel();
+        if (rc)
+                CERROR("error renaming orphan %lu/%s to PENDING: rc = %d\n",
+                       dparent->d_inode->i_ino, rec->ur_name, rc);
+        else
+                mds_inode_set_orphan(dchild->d_inode);
+out_dput:
+        dput(pending_child);
+out_lock:
+        up(&pending_dir->i_sem);
+        RETURN(rc);
+}
+
+
+int mds_cleanup_orphans(struct obd_device *obd)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        struct obd_run_ctxt saved;
+        struct file *file;
+        struct dentry *dchild; 
+        struct inode *child_inode, *pending_dir = mds->mds_pending_dir->d_inode;
+        struct l_linux_dirent *dirent, *ptr;
+        unsigned int count = pending_dir->i_size;
+        void *handle = NULL;
+        struct lov_mds_md *lmm = NULL;
+        struct lov_stripe_md *lsm = NULL;
+        struct obd_trans_info oti = { 0 };
+        struct obdo *oa;
+        struct ptlrpc_request *req;
+        struct mds_body *body;
+        int lengths[3] = {sizeof(struct mds_body),
+                          mds->mds_max_mdsize,
+                          mds->mds_max_cookiesize}; 
+        int rc = 0, rc2 = 0, item = 0;
+        ENTRY;
+        
+        push_ctxt(&saved, &obd->obd_ctxt, NULL);
+        dget(mds->mds_pending_dir);
+        mntget(mds->mds_vfsmnt);
+        file = dentry_open(mds->mds_pending_dir, mds->mds_vfsmnt, 
+                           O_RDONLY | O_LARGEFILE);
+        if (IS_ERR(file)) 
+                GOTO(err_open, rc2 = PTR_ERR(file));
+        
+        OBD_ALLOC(dirent, count);
+        if (dirent == NULL)
+                GOTO(err_alloc_dirent, rc2 = -ENOMEM);
+
+        rc = l_readdir(file, dirent, count); 
+        filp_close(file, 0);
+        if (rc < 0)
+                GOTO(err_out, rc2 = rc);
+
+        for (ptr = dirent; (char *)ptr < (char *)dirent + rc; 
+                        (char *)ptr += ptr->d_reclen) {
+                int namlen = strlen(ptr->d_name); 
+
+                if (((namlen == 1) && !strcmp(ptr->d_name, ".")) ||
+                    ((namlen == 2) && !strcmp(ptr->d_name, ".."))) 
+                        continue;
+
+                down(&pending_dir->i_sem);
+                dchild = lookup_one_len(ptr->d_name, mds->mds_pending_dir, namlen); 
+                if (IS_ERR(dchild)) {
+                        up(&pending_dir->i_sem);
+                        GOTO(err_out, rc2 = PTR_ERR(dchild));
+                }
+                if (!dchild->d_inode) {
+                        CDEBUG(D_ERROR, "orphan %s has been deleted\n", ptr->d_name);
+                        GOTO(next, rc2 = 0);
+                }
+
+                child_inode = dchild->d_inode;
+                if (mds_inode_is_orphan(child_inode) && 
+                    mds_open_orphan_count(child_inode)) {
+                        CDEBUG(D_ERROR, "orphan %s was re-opened during recovery\n",
+                               ptr->d_name);
+                        GOTO(next, rc2 = 0);
+                }
+
+                CDEBUG(D_ERROR, "cleanup orphan %s start on mds and ost:\n", ptr->d_name);
+
+                LASSERT(mds->mds_osc_obd != NULL);
+
+                OBD_ALLOC(req, sizeof(*req)); 
+                if (!req) {
+                        CERROR("request allocation out of memory\n");
+                        GOTO(err_lov_conn, rc2 = -ENOMEM);
+                }
+                rc2 = lustre_pack_msg(3, lengths, NULL, &req->rq_replen,
+                                      &req->rq_repmsg);
+                if (rc2) {
+                        CERROR("cannot pack request %d\n", rc2);
+                        OBD_FREE(req, sizeof(*req));
+                        GOTO(out_free_req, rc2);
+                }
+                body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
+                LASSERT(body != NULL);
+
+                mds_pack_inode2body(body, child_inode);
+                mds_pack_md(obd, req->rq_repmsg, 1, body, child_inode);
+                lmm = lustre_msg_buf(req->rq_repmsg, 1, 0);
+
+#ifdef ENABLE_ORPHANS
+                if (mds_log_op_unlink(obd, child_inode,
+                                              req->rq_repmsg, 1) > 0)
+                        oa->o_valid |= OBD_MD_FLCOOKIE;
+#endif
+
+                rc2 = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, body->eadatasize);
+                if (rc2 < 0) {
+                        CERROR("Error unpack md %p\n", lmm);
+                        GOTO(out_free_req, rc2 = 0);
+                } else {
+                        LASSERT(rc2 >= sizeof(*lsm));
+                        rc2 = 0;                
+                }
+
+                oa = obdo_alloc();
+                if (oa == NULL)
+                        GOTO(err_alloc_oa, rc2 = -ENOMEM);
+
+                oa->o_id = lsm->lsm_object_id;
+                oa->o_mode = child_inode->i_mode & S_IFMT;
+                oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE;
+
+#ifdef ENABLE_ORPHANS
+                if (oa->o_valid & OBD_MD_FLCOOKIE) {
+                        oti.oti_logcookies =
+                                lustre_msg_buf(req->rq_repmsg, 2,
+                                               sizeof(struct llog_cookie) *
+                                               lsm->lsm_stripe_count);
+                        if (oti.oti_logcookies == NULL) 
+                                oa->o_valid &= ~OBD_MD_FLCOOKIE;
+                }
+#endif
+
+                rc2 = obd_destroy(mds->mds_osc_exp, oa, lsm, &oti);
+                obdo_free(oa); 
+                if (rc2) {
+                        CERROR("destroy orphan objid 0x"LPX64" on ost error %d\n",
+                               lsm->lsm_object_id, rc2);
+                        GOTO(out_free_memmd, rc2 = 0);
+                }
+                item ++;
+
+                CDEBUG(D_ERROR, "removed orphan %s object from ost successlly!\n", 
+                       ptr->d_name);
+
+                handle = fsfilt_start(obd, pending_dir, FSFILT_OP_UNLINK, NULL); 
+                if (IS_ERR(handle)) {
+                        rc2 = PTR_ERR(handle);
+                        CERROR("error fsfilt_start: %d\n", rc2);
+                        handle = NULL;
+                        GOTO(err_alloc_oa, rc2);
+                }
+                rc2 = vfs_unlink(pending_dir, dchild);
+                if (rc2) {
+                        CERROR("error unlinking orphan from PENDING directory");
+                        CERROR("%s: rc %d\n", ptr->d_name, rc2); 
+                }
+                if (handle) {
+                        int err = fsfilt_commit(obd, pending_dir, handle, 0);
+                        if (err) {
+                                CERROR("error committing orphan unlink: %d\n", err);
+                                rc2 = err;
+                                GOTO(err_alloc_oa, rc2);
+                        }
+                }
+
+                CDEBUG(D_ERROR, "removed orphan %s from mds successfully!\n", 
+                       ptr->d_name);
+
+out_free_memmd:
+                obd_free_memmd(mds->mds_osc_exp, &lsm);
+out_free_req:
+                OBD_FREE(req, sizeof(*req));
+next:
+                l_dput(dchild);
+                up(&pending_dir->i_sem);
+        }
+err_out:
+        OBD_FREE(dirent, count);
+err_pop:
+        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+        if (rc2 == 0)
+                rc2 = item;
+
+        RETURN(rc2);
+
+err_open:
+        mntput(mds->mds_vfsmnt);
+        l_dput(mds->mds_pending_dir);
+        goto err_pop;
+
+err_alloc_dirent:
+        filp_close(file, 0);
+        goto err_pop;
+
+err_alloc_oa:
+        obd_free_memmd(mds->mds_osc_exp, &lsm);
+        OBD_FREE(req, sizeof(*req));
+
+err_lov_conn:
+        l_dput(dchild);
+        up(&pending_dir->i_sem);
+        goto err_out;
+}
diff --git a/lustre/obdclass/llog_obd.c b/lustre/obdclass/llog_obd.c
new file mode 100644 (file)
index 0000000..d8eb374
--- /dev/null
@@ -0,0 +1,69 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ */
+
+#define DEBUG_SUBSYSTEM S_LOG
+
+#ifndef EXPORT_SYMTAB
+#define EXPORT_SYMTAB
+#endif
+
+#include <linux/fs.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_log.h>
+#include <portals/list.h>
+
+int obd_llog_open(struct obd_device *obd, struct obd_device *disk_obd,
+                  int index, int named, int flags, struct obd_uuid *log_uuid)
+{
+        int rc;
+        ENTRY;
+
+        
+
+        RETURN(rc);
+}
+EXPORT_SYMBOL(obd_llog_open);
+
+int obd_log_add(struct obd_export *exp, struct llog_handle *cathandle,
+                struct llog_rec_hdr *rec, void *buf, 
+                struct llog_cookie *logcookies, int numcookies)
+{
+        struct obd_device *obd = class_exp2obd(exp->exp_obd->obd_log_exp);
+        struct obd_run_ctxt saved;
+        int rc;
+        ENTRY;
+
+        LASSERT(cathandle != NULL);
+        push_ctxt(&saved, &obd->obd_ctxt, NULL); 
+        rc = llog_cat_add_rec(cathandle, rec, logcookies, buf);
+        if (rc != 1)
+                CERROR("write one catalog record failed: %d\n", rc);
+        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+
+        RETURN(rc);
+}
+EXPORT_SYMBOL(obd_log_add);
+
+int obd_log_cancel(struct obd_export *exp, struct llog_handle *cathandle,
+                   void *buf, int count, struct llog_cookie *cookies, int flags)
+{
+        struct obd_device *obd = class_exp2obd(exp->exp_obd->obd_log_exp);
+        struct obd_run_ctxt saved;
+        int rc;
+        ENTRY;
+
+        LASSERT(cathandle != NULL);
+        push_ctxt(&saved, &obd->obd_ctxt, NULL); 
+        rc = llog_cat_cancel_records(cathandle, count, cookies);
+        if (rc)
+                CERROR("cancel %d catalog record failed: %d\n", count, rc);
+        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+
+        RETURN(rc);
+}
+EXPORT_SYMBOL(obd_log_cancel);
diff --git a/lustre/ptlrpc/llog_net.c b/lustre/ptlrpc/llog_net.c
new file mode 100644 (file)
index 0000000..5944caf
--- /dev/null
@@ -0,0 +1,78 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
+ *   Author: Andreas Dilger <adilger@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ * OST<->MDS recovery logging infrastructure.
+ *
+ * Invariants in implementation:
+ * - we do not share logs among different OST<->MDS connections, so that
+ *   if an OST or MDS fails it need only look at log(s) relevant to itself
+ */
+
+#define DEBUG_SUBSYSTEM S_LOG
+
+#ifndef EXPORT_SYMTAB
+#define EXPORT_SYMTAB
+#endif
+
+#include <linux/fs.h>
+#include <linux/obd_class.h>
+#include <linux/lustre_log.h>
+#include <portals/list.h>
+#include <linux/lvfs.h>
+
+
+/* This is a callback from the llog_* functions.
+ * Assumes caller has already pushed us into the kernel context. */
+static int llog_net_create(struct obd_device *obd, struct llog_handle **res,
+                            struct llog_logid *logid, char *name)
+{
+        struct llog_handle *handle;
+        ENTRY;
+
+        handle = llog_alloc_handle();
+        if (handle == NULL)
+                RETURN(-ENOMEM);
+        *res = handle;
+
+        if (!logid) {
+                CERROR("llog_net_create: must pass logid\n");
+                llog_free_handle(handle);
+                RETURN(-EINVAL);
+        }
+
+        handle->lgh_file = NULL;
+        handle->lgh_obd = obd;
+        handle->lgh_id.lgl_ogr = 1;
+        handle->lgh_id.lgl_oid =
+                handle->lgh_file->f_dentry->d_inode->i_ino;
+        handle->lgh_id.lgl_ogen =
+                handle->lgh_file->f_dentry->d_inode->i_generation;
+
+        RETURN(0);
+}
+
+struct llog_operations llog_net_ops = {
+        //lop_next_block:  llog_lvfs_next_block,
+        //lop_read_header: llog_lvfs_read_header,
+        lop_create:      llog_net_create,
+};
+
+EXPORT_SYMBOL(llog_lvfs_ops);