Whamcloud - gitweb
add prototype code for open, just passed compilation, will be revisioned later.
authorhuanghua <huanghua>
Thu, 6 Jul 2006 07:15:26 +0000 (07:15 +0000)
committerhuanghua <huanghua>
Thu, 6 Jul 2006 07:15:26 +0000 (07:15 +0000)
add prototype code for handling last_rcvd.

lustre/mdt/Makefile.in
lustre/mdt/mdt_fs.c [new file with mode: 0644]
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_open.c [new file with mode: 0644]
lustre/mdt/mdt_reint.c

index 26c17e2..46168c8 100644 (file)
@@ -1,4 +1,4 @@
 MODULES := mdt
-mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o mdt_xattr.o
+mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o mdt_xattr.o mdt_fs.o mdt_open.o
 
 @INCLUDE_RULES@
diff --git a/lustre/mdt/mdt_fs.c b/lustre/mdt/mdt_fs.c
new file mode 100644 (file)
index 0000000..239c578
--- /dev/null
@@ -0,0 +1,421 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  linux/mdt/mdt_open.c
+ *  Lustre Metadata Target (mdt) open/close file handling
+ *
+ *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
+ *   Author: Huang Hua <huanghua@clusterfs.com>
+ *
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
+ *
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
+ *
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
+ */
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include "mdt_internal.h"
+
+/* Add client data to the MDS.  We use a bitmap to locate a free space
+ * in the last_rcvd file if cl_off is -1 (i.e. a new client).
+ * Otherwise, we just have to read the data from the last_rcvd file and
+ * we know its offset.
+ *
+ * It should not be possible to fail adding an existing client - otherwise
+ * mdt_init_server_data() callsite needs to be fixed.
+ */
+int mdt_client_add(const struct lu_context *ctxt, 
+                   struct mdt_device *mdt,
+                   struct mdt_export_data *med, 
+                   int cl_idx)
+{
+        unsigned long *bitmap = mdt->mdt_client_bitmap;
+        struct mdt_client_data *mcd = med->med_mcd;
+        struct mdt_server_data *msd = &mdt->mdt_msd;
+        int new_client = (cl_idx == -1);
+        ENTRY;
+
+        LASSERT(bitmap != NULL);
+        LASSERTF(cl_idx > -2, "%d\n", cl_idx);
+
+        /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
+         * there's no need for extra complication here
+         */
+        if (new_client) {
+                cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
+        repeat:
+                if (cl_idx >= LR_MAX_CLIENTS ||
+                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
+                        CERROR("no room for clients - fix LR_MAX_CLIENTS\n");
+                        return -EOVERFLOW;
+                }
+                if (test_and_set_bit(cl_idx, bitmap)) {
+                        cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
+                                                    cl_idx);
+                        goto repeat;
+                }
+        } else {
+                if (test_and_set_bit(cl_idx, bitmap)) {
+                        CERROR("MDS client %d: bit already set in bitmap!!\n",
+                               cl_idx);
+                        LBUG();
+                }
+        }
+
+        CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
+               cl_idx, med->med_mcd->mcd_uuid);
+
+        med->med_lr_idx = cl_idx;
+        med->med_lr_off = le32_to_cpu(msd->msd_client_start) + 
+                          (cl_idx * le16_to_cpu(msd->msd_client_size));
+        LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
+
+        if (new_client) {
+                loff_t off = med->med_lr_off;
+                int rc = 0;
+                
+                rc = mdt->mdt_last->do_body_ops->dbo_write(ctxt, 
+                                                           mdt->mdt_last,
+                                                           mcd, sizeof(*mcd), 
+                                                           &off);
+
+                if (rc)
+                        return rc;
+                CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
+                       cl_idx, off, sizeof(mcd));
+        }
+        return 0;
+}
+
+int mdt_update_server_data(const struct lu_context *ctxt, 
+                           struct mdt_device *mdt, 
+                           int sync)
+{
+        struct mdt_server_data *msd = &mdt->mdt_msd;
+        loff_t off = 0;
+        int rc = 0;
+        ENTRY;
+
+        CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
+                mdt->mdt_mount_count, mdt->mdt_last_transno);
+      
+        msd->msd_last_transno = cpu_to_le64(mdt->mdt_last_transno);
+        rc = mdt->mdt_last->do_body_ops->dbo_write(ctxt, 
+                                                   mdt->mdt_last,
+                                                   msd,
+                                                   sizeof(*msd), &off);
+        RETURN(rc);
+
+}
+
+int mdt_client_free(const struct lu_context *ctxt, 
+                    struct mdt_device *mdt, 
+                    struct mdt_export_data *med)
+{
+        struct mdt_client_data *mcd = med->med_mcd;
+        int rc;
+        loff_t off;
+        ENTRY;
+
+        if (!mcd)
+                RETURN(0);
+
+        CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n",
+               med->med_lr_idx, med->med_lr_off, mcd->mcd_uuid);
+
+        off = med->med_lr_off;
+
+        /* Don't clear med_lr_idx here as it is likely also unset.  At worst
+         * we leak a client slot that will be cleaned on the next recovery. */
+        if (off <= 0) {
+                CERROR("client idx %d has offset %lld\n",
+                        med->med_lr_idx, off);
+                GOTO(free, rc = -EINVAL);
+        }
+
+        /* Clear the bit _after_ zeroing out the client so we don't
+           race with mdt_client_add and zero out new clients.*/
+        if (!test_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
+                CERROR("MDT client %u: bit already clear in bitmap!!\n",
+                       med->med_lr_idx);
+                LBUG();
+        }
+
+//      if (!(exp->exp_flags & OBD_OPT_FAILOVER)) {
+        memset(&mcd, 0, sizeof *mcd);
+        rc = mdt->mdt_last->do_body_ops->dbo_write(ctxt, 
+                                                   mdt->mdt_last,
+                                                   mcd,
+                                                   sizeof(*mcd), &off);
+        CDEBUG_EX(rc == 0 ? D_INFO : D_ERROR,
+                  "zeroing out client idx %u in %s rc %d\n",
+                  med->med_lr_idx, LAST_RCVD, rc);
+
+        if (!test_and_clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
+                CERROR("MDS client %u: bit already clear in bitmap!!\n",
+                       med->med_lr_idx);
+                LBUG();
+        }
+
+        /* Make sure the server's last_transno is up to date. Do this
+         * after the client is freed so we know all the client's
+         * transactions have been committed. */
+        mdt_update_server_data(ctxt, mdt, 0);
+
+        EXIT;
+free:
+        OBD_FREE_PTR(mcd);
+        med->med_mcd = NULL;
+        return 0;
+}
+
+static int mdt_init_server_data(const struct lu_context *ctxt, 
+                                struct mdt_device *mdt)
+{
+        struct mdt_server_data *msd = &mdt->mdt_msd;
+        struct mdt_client_data *mcd = NULL;
+        struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
+        loff_t off = 0;
+        unsigned long last_rcvd_size = 0; // = getsize(mdt->mdt_last)
+        __u64 mount_count;
+        int cl_idx;
+        int rc = 0;
+        ENTRY;
+
+        /* ensure padding in the struct is the correct size */
+        LASSERT(offsetof(struct mdt_server_data, msd_padding) +
+                sizeof(msd->msd_padding) == LR_SERVER_SIZE);
+        LASSERT(offsetof(struct mdt_client_data, mcd_padding) +
+                sizeof(mcd->mcd_padding) == LR_CLIENT_SIZE);
+
+        if (last_rcvd_size == 0) {
+                LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
+
+                memcpy(msd->msd_uuid, obd->obd_uuid.uuid,sizeof(msd->msd_uuid));
+                msd->msd_last_transno = 0;
+                mount_count = msd->msd_mount_count = 0;
+                msd->msd_server_size = cpu_to_le32(LR_SERVER_SIZE);
+                msd->msd_client_start = cpu_to_le32(LR_CLIENT_START);
+                msd->msd_client_size = cpu_to_le16(LR_CLIENT_SIZE);
+                msd->msd_feature_rocompat = cpu_to_le32(OBD_ROCOMPAT_LOVOBJID);
+                msd->msd_feature_incompat = cpu_to_le32(OBD_INCOMPAT_MDT |
+                                                        OBD_INCOMPAT_COMMON_LR);
+        } else {
+                rc = mdt->mdt_last->do_body_ops->dbo_read(ctxt, 
+                                                          mdt->mdt_last,
+                                                          msd,
+                                                          sizeof(*msd), &off);
+                if (rc) {
+                        CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
+                        GOTO(out, rc);
+                }
+                if (strcmp(msd->msd_uuid, obd->obd_uuid.uuid) != 0) {
+                        LCONSOLE_ERROR("Trying to start OBD %s using the wrong"
+                                       " disk %s. Were the /dev/ assignments "
+                                       "rearranged?\n",
+                                       obd->obd_uuid.uuid, msd->msd_uuid);
+                        GOTO(out, rc = -EINVAL);
+                }
+                mount_count = le64_to_cpu(msd->msd_mount_count);
+        }
+
+        if (msd->msd_feature_incompat & ~cpu_to_le32(MDT_INCOMPAT_SUPP)) {
+                CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
+                       obd->obd_name, le32_to_cpu(msd->msd_feature_incompat) &
+                       ~MDT_INCOMPAT_SUPP);
+                GOTO(out, rc = -EINVAL);
+        }
+        if (msd->msd_feature_rocompat & ~cpu_to_le32(MDT_ROCOMPAT_SUPP)) {
+                CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
+                       obd->obd_name, le32_to_cpu(msd->msd_feature_rocompat) &
+                       ~MDT_ROCOMPAT_SUPP);
+                /* Do something like remount filesystem read-only */
+                GOTO(out, rc = -EINVAL);
+        }
+        if (!(msd->msd_feature_incompat & cpu_to_le32(OBD_INCOMPAT_COMMON_LR))){
+                CDEBUG(D_WARNING, "using old last_rcvd format\n");
+                msd->msd_mount_count = msd->msd_last_transno;
+                msd->msd_last_transno = msd->msd_unused;
+                /* If we update the last_rcvd, we can never go back to
+                   an old install, so leave this in the old format for now.
+                msd->msd_feature_incompat |= cpu_to_le32(LR_INCOMPAT_COMMON_LR);
+                */
+        }
+        msd->msd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT);
+
+        mdt->mdt_last_transno = le64_to_cpu(msd->msd_last_transno);
+
+        CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
+               obd->obd_name, mdt->mdt_last_transno);
+        CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
+               obd->obd_name, mount_count + 1);
+        CDEBUG(D_INODE, "%s: server data size: %u\n",
+               obd->obd_name, le32_to_cpu(msd->msd_server_size));
+        CDEBUG(D_INODE, "%s: per-client data start: %u\n",
+               obd->obd_name, le32_to_cpu(msd->msd_client_start));
+        CDEBUG(D_INODE, "%s: per-client data size: %u\n",
+               obd->obd_name, le32_to_cpu(msd->msd_client_size));
+        CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
+               obd->obd_name, last_rcvd_size);
+        CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
+               last_rcvd_size <= le32_to_cpu(msd->msd_client_start) ? 0 :
+               (last_rcvd_size - le32_to_cpu(msd->msd_client_start)) /
+                le16_to_cpu(msd->msd_client_size));
+
+        if (!msd->msd_server_size || !msd->msd_client_start ||
+            !msd->msd_client_size) {
+                CERROR("Bad last_rcvd contents!\n");
+                GOTO(out, rc = -EINVAL);
+        }
+
+        /* When we do a clean MDS shutdown, we save the last_transno into
+         * the header.  If we find clients with higher last_transno values
+         * then those clients may need recovery done. */
+        for (cl_idx = 0, off = le32_to_cpu(msd->msd_client_start);
+             off < last_rcvd_size; cl_idx++) {
+                __u64 last_transno;
+                struct obd_export *exp;
+                struct mdt_export_data *med;
+
+                if (!mcd) {
+                        OBD_ALLOC_PTR(mcd);
+                        if (!mcd)
+                                GOTO(err_client, rc = -ENOMEM);
+                }
+
+                off = le32_to_cpu(msd->msd_client_start) +
+                        cl_idx * le16_to_cpu(msd->msd_client_size);
+                rc = mdt->mdt_last->do_body_ops->dbo_read(ctxt, 
+                                                          mdt->mdt_last,
+                                                          mcd,
+                                                          sizeof(*mcd), &off);
+                if (rc) {
+                        CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
+                               LAST_RCVD, cl_idx, off, rc);
+                        break; /* read error shouldn't cause startup to fail */
+                }
+
+                if (mcd->mcd_uuid[0] == '\0') {
+                        CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
+                               cl_idx);
+                        continue;
+                }
+
+                last_transno = le64_to_cpu(mcd->mcd_last_transno);
+
+                /* These exports are cleaned up by mdt_obd_disconnect(), so they
+                 * need to be set up like real exports as mdt_obd_connect() does.
+                 */
+                CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
+                       " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx,
+                       last_transno, le64_to_cpu(msd->msd_last_transno),
+                       le64_to_cpu(mcd->mcd_last_xid));
+
+                exp = class_new_export(obd, (struct obd_uuid *)mcd->mcd_uuid);
+                if (IS_ERR(exp))
+                        GOTO(err_client, rc = PTR_ERR(exp));
+
+                med = &exp->exp_mdt_data;
+                med->med_mcd = mcd;
+                rc = mdt_client_add(ctxt, mdt, med, cl_idx);
+                LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
+
+                mcd = NULL;
+                exp->exp_replay_needed = 1;
+                exp->exp_connecting = 0;
+                obd->obd_recoverable_clients++;
+                obd->obd_max_recoverable_clients++;
+                class_export_put(exp);
+
+                CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
+                       cl_idx, last_transno);
+
+                if (last_transno > mdt->mdt_last_transno)
+                        mdt->mdt_last_transno = last_transno;
+        }
+
+        if (mcd)
+                OBD_FREE_PTR(mcd);
+
+        obd->obd_last_committed = mdt->mdt_last_transno;
+
+        if (obd->obd_recoverable_clients) {
+                CWARN("RECOVERY: service %s, %d recoverable clients, "
+                      "last_transno "LPU64"\n", obd->obd_name,
+                      obd->obd_recoverable_clients, mdt->mdt_last_transno);
+                obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
+                obd->obd_recovering = 1;
+                obd->obd_recovery_start = CURRENT_SECONDS;
+                /* Only used for lprocfs_status */
+                obd->obd_recovery_end = obd->obd_recovery_start +
+                        OBD_RECOVERY_TIMEOUT;
+        }
+
+        mdt->mdt_mount_count = mount_count + 1;
+        msd->msd_mount_count = cpu_to_le64(mdt->mdt_mount_count);
+
+        /* save it, so mount count and last_transno is current */
+        rc = mdt_update_server_data(ctxt, mdt, 1);
+        if (rc)
+                GOTO(err_client, rc);
+
+        RETURN(0);
+
+err_client:
+        class_disconnect_exports(obd);
+out:
+        return rc;
+}
+
+int mdt_fs_setup(const struct lu_context *ctxt, 
+                 struct mdt_device *mdt)
+{
+        struct lu_fid last_fid;
+        struct dt_object *last;
+        int rc;
+        ENTRY;
+
+        last = dt_store_open(ctxt, mdt->mdt_bottom, LAST_RCVD, &last_fid);
+        if(!IS_ERR(last)) {
+                mdt->mdt_last = last;
+                rc = mdt_init_server_data(ctxt, mdt);
+                if (rc) {
+                        lu_object_put(ctxt, &last->do_lu);
+                        mdt->mdt_last = NULL;
+                }
+        } else {
+                CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
+                rc = PTR_ERR(last);
+        }
+        return rc;
+}
+
+
+void mdt_fs_cleanup(const struct lu_context *ctxt, 
+                   struct mdt_device *mdt)
+{
+        struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
+
+        class_disconnect_exports(obd); /* cleans up client info too */
+
+        if (mdt->mdt_last)
+                lu_object_put(ctxt, &mdt->mdt_last->do_lu);
+        mdt->mdt_last = NULL;
+}
+
index b9a9bab..9536238 100644 (file)
@@ -553,33 +553,6 @@ static int mdt_reint(struct mdt_thread_info *info)
         RETURN(rc);
 }
 
-static int mdt_close(struct mdt_thread_info *info)
-{
-#ifdef MDT_CODE
-        /* TODO: dual to open handling, orphan handling */
-        struct mdt_body * reqbody;
-        struct mdt_body * repbody;
-
-        reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
-        repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
-
-#endif
-        return -EOPNOTSUPP;
-}
-
-static int mdt_done_writing(struct mdt_thread_info *info)
-{
-        return -EOPNOTSUPP;
-}
-
-static int mdt_pin(struct mdt_thread_info *info)
-{
-#ifdef MDT_CODE
-        /* TODO: This is open handling. */
-#endif
-        return -EOPNOTSUPP;
-}
-
 #ifdef MDT_CODE
 /* TODO these two methods not available now. */
 
@@ -805,16 +778,6 @@ struct mdt_object *mdt_object_find(const struct lu_context *ctxt,
                 return mdt_obj(o);
 }
 
-void mdt_object_put(const struct lu_context *ctxt, struct mdt_object *o)
-{
-        lu_object_put(ctxt, &o->mot_obj.mo_lu);
-}
-
-const struct lu_fid *mdt_object_fid(struct mdt_object *o)
-{
-        return lu_object_fid(&o->mot_obj.mo_lu);
-}
-
 int mdt_object_lock(struct ldlm_namespace *ns, struct mdt_object *o,
                     struct mdt_lock_handle *lh, __u64 ibits)
 {
@@ -2055,6 +2018,8 @@ static void mdt_fini(const struct lu_context *ctx, struct mdt_device *m)
         struct lu_site   *ls = d->ld_site;
 
         ENTRY;
+        
+        mdt_fs_cleanup(ctx, m);
 
         mdt_stop_ptlrpc_service(m);
 
@@ -2145,8 +2110,13 @@ static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m,
         if (rc)
                 GOTO(err_free_ns, rc);
 
-        RETURN(0);
+        rc = mdt_fs_setup(ctx, m);
+        if (rc)
+                GOTO(err_stop_service, rc);
 
+        RETURN(0);
+err_stop_service:
+        mdt_stop_ptlrpc_service(m);
 err_free_ns:
         ldlm_namespace_free(m->mdt_namespace, 0);
         m->mdt_namespace = NULL;
@@ -2240,18 +2210,18 @@ static void mdt_object_free(const struct lu_context *ctxt, struct lu_object *o)
         EXIT;
 }
 
-static int mdt_object_exists(const struct lu_context *ctx,
-                             const struct lu_object *o)
-{
-        return lu_object_exists(ctx, lu_object_next(o));
-}
-
 static int mdt_object_print(const struct lu_context *ctxt,
                             struct seq_file *f, const struct lu_object *o)
 {
         return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
 }
 
+int mdt_object_exists(const struct lu_context *ctx,
+                      const struct lu_object *o)
+{
+        return lu_object_exists(ctx, lu_object_next(o));
+}
+
 static struct lu_device_operations mdt_lu_ops = {
         .ldo_object_alloc   = mdt_object_alloc,
         .ldo_process_config = mdt_process_config
@@ -2376,6 +2346,64 @@ static int mdt_obd_disconnect(struct obd_export *exp)
         RETURN(rc);
 }
 
+static int mdt_init_export(struct obd_export *exp)
+{
+        struct mdt_export_data *med = &exp->exp_mdt_data;
+
+        INIT_LIST_HEAD(&med->med_open_head);
+        spin_lock_init(&med->med_open_lock);
+        exp->exp_connecting = 1;
+        RETURN(0);
+}
+
+static int mdt_destroy_export(struct obd_export *export)
+{
+        struct mdt_export_data *med;
+        struct obd_device *obd = export->exp_obd;
+        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+        struct lu_context ctxt;
+        int rc = 0;
+        ENTRY;
+
+        med = &export->exp_mdt_data;
+        target_destroy_export(export);
+
+        if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
+                RETURN(0);
+
+        rc = lu_context_init(&ctxt);
+        if (rc)
+                RETURN(rc);
+
+        lu_context_enter(&ctxt);
+        /* Close any open files (which may also cause orphan unlinking). */
+        spin_lock(&med->med_open_lock);
+        while (!list_empty(&med->med_open_head)) {
+                struct list_head *tmp = med->med_open_head.next;
+                struct mdt_file_data *mfd =
+                        list_entry(tmp, struct mdt_file_data, mfd_list);
+
+                /* Remove mfd handle so it can't be found again.
+                 * We are consuming the mfd_list reference here. */
+                class_handle_unhash(&mfd->mfd_handle);
+                list_del_init(&mfd->mfd_list);
+                spin_unlock(&med->med_open_lock);
+
+                rc = mdt_mfd_close(&ctxt, mfd, 0);
+
+                if (rc)
+                        CDEBUG(D_INODE|D_IOCTL, "Error closing file: %d\n", rc);
+                spin_lock(&med->med_open_lock);
+        }
+        spin_unlock(&med->med_open_lock);
+        mdt_client_free(&ctxt, mdt, med);
+
+        lu_context_exit(&ctxt);
+        lu_context_fini(&ctxt);
+
+        RETURN(rc);
+}
+
 static int mdt_notify(struct obd_device *obd, struct obd_device *watched,
                       enum obd_notify_event ev, void *data)
 {
@@ -2402,10 +2430,12 @@ out:
 }
 
 static struct obd_ops mdt_obd_device_ops = {
-        .o_owner = THIS_MODULE,
-        .o_connect = mdt_obd_connect,
-        .o_disconnect = mdt_obd_disconnect,
-        .o_notify = mdt_notify,
+        .o_owner          = THIS_MODULE,
+        .o_connect        = mdt_obd_connect,
+        .o_disconnect     = mdt_obd_disconnect,
+        .o_init_export    = mdt_init_export,    /* By Huang Hua*/
+        .o_destroy_export = mdt_destroy_export, /* By Huang Hua*/
+        .o_notify         = mdt_notify,
 };
 
 static void mdt_device_free(const struct lu_context *ctx, struct lu_device *d)
index 84e9c7a..0e83c69 100644 (file)
@@ -10,6 +10,7 @@
  *   Author: Phil Schwan <phil@clusterfs.com>
  *   Author: Mike Shaver <shaver@clusterfs.com>
  *   Author: Nikita Danilov <nikita@clusterfs.com>
+ *   Author: Huang Hua <huanghua@clusterfs.com>
  *
  *   This file is part of the Lustre file system, http://www.lustre.org
  *   Lustre is a trademark of Cluster File Systems, Inc.
  */
 #include <lustre/lustre_idl.h>
 #include <md_object.h>
+#include <dt_object.h>
 #include <lustre_fid.h>
 #include <lustre_fld.h>
 #include <lustre_req_layout.h>
 /* LR_CLIENT_SIZE, etc. */
 #include <lustre_disk.h>
 
+
+/* Data stored per client in the last_rcvd file.  In le32 order. */
+struct mdt_client_data {
+        __u8  mcd_uuid[40];     /* client UUID */
+        __u64 mcd_last_transno; /* last completed transaction ID */
+        __u64 mcd_last_xid;     /* xid for the last transaction */
+        __u32 mcd_last_result;  /* result from last RPC */
+        __u32 mcd_last_data;    /* per-op data (disposition for open &c.) */
+        __u8  mcd_padding[LR_CLIENT_SIZE - 64];
+};
+
+/* copied from lr_server_data.
+ * mds data stored at the head of last_rcvd file. In le32 order. */
+struct mdt_server_data {
+        __u8  msd_uuid[40];        /* server UUID */
+        __u64 msd_unused;          /* was fsd_last_objid - don't use for now */
+        __u64 msd_last_transno;    /* last completed transaction ID */
+        __u64 msd_mount_count;     /* incarnation number */
+        __u32 msd_feature_compat;  /* compatible feature flags */
+        __u32 msd_feature_rocompat;/* read-only compatible feature flags */
+        __u32 msd_feature_incompat;/* incompatible feature flags */
+        __u32 msd_server_size;     /* size of server data area */
+        __u32 msd_client_start;    /* start of per-client data area */
+        __u16 msd_client_size;     /* size of per-client data area */
+        __u16 msd_subdir_count;    /* number of subdirectories for objects */
+        __u64 msd_catalog_oid;     /* recovery catalog object id */
+        __u32 msd_catalog_ogen;    /* recovery catalog inode generation */
+        __u8  msd_peeruuid[40];    /* UUID of MDS associated with this OST */
+        __u32 msd_ost_index;       /* index number of OST in LOV */
+        __u32 msd_mdt_index;       /* index number of MDT in LMV */
+        __u8  msd_padding[LR_SERVER_SIZE - 148];
+};
+
+struct mdt_object;
+/* file data for open files on MDS */
+struct mdt_file_data {
+        struct portals_handle mfd_handle; /* must be first */
+        atomic_t              mfd_refcount;
+        struct list_head      mfd_list; /* protected by med_open_lock */
+        __u64                 mfd_xid;
+        int                   mfd_mode;
+        struct mdt_object    *mfd_object;
+};
+
 struct mdt_device {
         /* super-class */
         struct md_device           mdt_md_dev;
@@ -83,24 +129,17 @@ struct mdt_device {
          * or should be placed somewhere else. */
         int                        mdt_max_mdsize;
         int                        mdt_max_cookiesize;
-
+        __u64                      mdt_mount_count;     
+        
+        struct mdt_server_data     mdt_msd;
+        unsigned long              mdt_client_bitmap[LR_MAX_CLIENTS / sizeof(long)];
+        struct dt_object          *mdt_last;
 };
-/* Data stored per client in the last_rcvd file.  In le32 order. */
-struct mdt_client_data {
-        __u8  mcd_uuid[40];     /* client UUID */
-        __u64 mcd_last_transno; /* last completed transaction ID */
-        __u64 mcd_last_xid;     /* xid for the last transaction */
-        __u32 mcd_last_result;  /* result from last RPC */
-        __u32 mcd_last_data;    /* per-op data (disposition for open &c.) */
-        __u8  mcd_padding[LR_CLIENT_SIZE - 64];
-};
-#define MDT_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000)
 
-static inline struct md_device_operations *mdt_child_ops(struct mdt_device * m)
-{
-        LASSERT(m->mdt_child);
-        return m->mdt_child->md_ops;
-}
+/*XXX copied from mds_internal.h */
+#define MDT_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000)
+#define MDT_ROCOMPAT_SUPP       (OBD_ROCOMPAT_LOVOBJID)
+#define MDT_INCOMPAT_SUPP       (OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR)
 
 enum mdt_flags {
         /*
@@ -115,22 +154,11 @@ struct mdt_object {
         struct md_object        mot_obj;
 };
 
-static inline struct md_object *mdt_object_child(struct mdt_object *o)
-{
-        return lu2md(lu_object_next(&o->mot_obj.mo_lu));
-}
-
 struct mdt_lock_handle {
         struct lustre_handle    mlh_lh;
         ldlm_mode_t             mlh_mode;
 };
 
-void mdt_lock_handle_init(struct mdt_lock_handle *lh);
-void mdt_lock_handle_fini(struct mdt_lock_handle *lh);
-
-int md_device_init(struct md_device *md, struct lu_device_type *t);
-void md_device_fini(struct md_device *md);
-
 enum {
         MDT_REP_BUF_NR_MAX = 8
 };
@@ -220,23 +248,51 @@ struct mdt_thread_info {
         struct mdt_reint_reply     mti_reint_rep;
 };
 
-int fid_lock(struct ldlm_namespace *, const struct lu_fid *,
-             struct lustre_handle *, ldlm_mode_t,
-             ldlm_policy_data_t *);
+static inline struct md_device_operations *mdt_child_ops(struct mdt_device * m)
+{
+        LASSERT(m->mdt_child);
+        return m->mdt_child->md_ops;
+}
+
+static inline struct md_object *mdt_object_child(struct mdt_object *o)
+{
+        return lu2md(lu_object_next(&o->mot_obj.mo_lu));
+}
+
+static inline struct ptlrpc_request *mdt_info_req(struct mdt_thread_info *info)
+{
+         return info->mti_pill.rc_req;
+}
+
+static inline void mdt_object_get(const struct lu_context *ctxt, 
+                                  struct mdt_object *o)
+{
+        lu_object_get(&o->mot_obj.mo_lu);
+}
 
-void fid_unlock(struct ldlm_namespace *, const struct lu_fid *,
-                struct lustre_handle *, ldlm_mode_t);
+static inline void mdt_object_put(const struct lu_context *ctxt, 
+                                  struct mdt_object *o)
+{
+        lu_object_put(ctxt, &o->mot_obj.mo_lu);
+}
 
-struct mdt_object *mdt_object_find(const struct lu_context *,
-                                   struct mdt_device *, const struct lu_fid *);
-void mdt_object_put(const struct lu_context *ctxt, struct mdt_object *);
+static inline const struct lu_fid *mdt_object_fid(struct mdt_object *o)
+{
+        return lu_object_fid(&o->mot_obj.mo_lu);
+}
 
-int mdt_object_lock(struct ldlm_namespace *, struct mdt_object *,
-                    struct mdt_lock_handle *, __u64);
+int mdt_object_lock(struct ldlm_namespace *, 
+                    struct mdt_object *,
+                    struct mdt_lock_handle *, 
+                    __u64);
 
-void mdt_object_unlock(struct ldlm_namespace *, struct mdt_object *,
+void mdt_object_unlock(struct ldlm_namespace *, 
+                       struct mdt_object *,
                        struct mdt_lock_handle *);
 
+struct mdt_object *mdt_object_find(const struct lu_context *,
+                                   struct mdt_device *, 
+                                   const struct lu_fid *);
 struct mdt_object *mdt_object_find_lock(const struct lu_context *,
                                         struct mdt_device *,
                                         const struct lu_fid *,
@@ -245,13 +301,53 @@ struct mdt_object *mdt_object_find_lock(const struct lu_context *,
 int mdt_reint_unpack(struct mdt_thread_info *info, __u32 op);
 int mdt_reint_rec(struct mdt_thread_info *);
 void mdt_pack_attr2body(struct mdt_body *b, struct lu_attr *attr);
-const struct lu_fid *mdt_object_fid(struct mdt_object *o);
-static inline struct ptlrpc_request *mdt_info_req  (struct mdt_thread_info *info)
-{
-         return info->mti_pill.rc_req;
-}
+
 int mdt_getxattr(struct mdt_thread_info *info);
 int mdt_setxattr(struct mdt_thread_info *info);
 
+void mdt_lock_handle_init(struct mdt_lock_handle *lh);
+void mdt_lock_handle_fini(struct mdt_lock_handle *lh);
+
+
+int mdt_object_exists(const struct lu_context *ctx,
+                      const struct lu_object *o);
+
+int mdt_fs_setup(const struct lu_context *ctxt,
+                 struct mdt_device *mdt);
+
+void mdt_fs_cleanup(const struct lu_context *ctxt,
+                    struct mdt_device *mdt);
+
+int mdt_client_free(const struct lu_context *ctxt,
+                    struct mdt_device *mdt,
+                    struct mdt_export_data *med);
+
+int mdt_update_server_data(const struct lu_context *ctxt,
+                           struct mdt_device *mdt,
+                           int sync);
+
+int mdt_client_add(const struct lu_context *ctxt,
+                   struct mdt_device *mdt,
+                   struct mdt_export_data *med,
+                   int cl_idx);
+
+int mdt_pin(struct mdt_thread_info* info);
+
+int mdt_lock_new_child(struct mdt_thread_info *info, 
+                       struct mdt_object *o,
+                       struct mdt_lock_handle *child_lockh);
+
+int mdt_reint_open(struct mdt_thread_info *info);
+
+int mdt_mfd_close(const struct lu_context *ctxt,
+                  struct mdt_file_data *mfd,
+                  int unlink_orphan);
+
+int mdt_close(struct mdt_thread_info *info);
+
+int mdt_done_writing(struct mdt_thread_info *info);
+
+
+
 #endif /* __KERNEL__ */
 #endif /* _MDT_H */
diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c
new file mode 100644 (file)
index 0000000..9cff0e5
--- /dev/null
@@ -0,0 +1,377 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  linux/mdt/mdt_open.c
+ *  Lustre Metadata Target (mdt) open/close file handling
+ *
+ *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
+ *   Author: Huang Hua <huanghua@clusterfs.com>
+ *
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
+ *
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
+ *
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include "mdt_internal.h"
+
+/*
+ * MDS file data handling: file data holds a handle for a file opened
+ * by a client.
+ */
+
+static void mdt_mfd_get(void *mfdp)
+{
+        struct mdt_file_data *mfd = mfdp;
+
+        atomic_inc(&mfd->mfd_refcount);
+        CDEBUG(D_INFO, "GETting mfd %p : new refcount %d\n", mfd,
+               atomic_read(&mfd->mfd_refcount));
+}
+
+/* Create a new mdt_file_data struct. 
+ * reference is set to 1 */
+static struct mdt_file_data *mdt_mfd_new(void)
+{
+        struct mdt_file_data *mfd;
+
+        OBD_ALLOC_PTR(mfd);
+        if (mfd == NULL) {
+                CERROR("mds: out of memory\n");
+                return NULL;
+        }
+
+        atomic_set(&mfd->mfd_refcount, 1);
+
+        INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
+        INIT_LIST_HEAD(&mfd->mfd_list);
+        class_handle_hash(&mfd->mfd_handle, mdt_mfd_get);
+
+        return mfd;
+}
+
+/* Get a new reference on the mfd pointed to by handle, if handle is still
+ * valid.  Caller must drop reference with mdt_mfd_put(). */
+static struct mdt_file_data *mdt_handle2mfd(const struct lustre_handle *handle)
+{
+        ENTRY;
+        LASSERT(handle != NULL);
+        RETURN(class_handle2object(handle->cookie));
+}
+
+/* Drop mfd reference, freeing struct if this is the last one. */
+static void mdt_mfd_put(struct mdt_file_data *mfd)
+{
+        CDEBUG(D_INFO, "PUTting mfd %p : new refcount %d\n", mfd,
+               atomic_read(&mfd->mfd_refcount) - 1);
+        LASSERT(atomic_read(&mfd->mfd_refcount) > 0 &&
+                atomic_read(&mfd->mfd_refcount) < 0x5a5a);
+        if (atomic_dec_and_test(&mfd->mfd_refcount)) {
+                LASSERT(list_empty(&mfd->mfd_handle.h_link));
+                OBD_FREE_PTR(mfd);
+        }
+}
+
+static int mdt_object_open(struct mdt_thread_info *info,
+                           struct mdt_object *o, 
+                           int flags)
+{
+        struct mdt_export_data *med;
+        struct mdt_file_data   *mfd;
+        struct mdt_body        *repbody;
+        struct lov_mds_md      *lmm;
+        int                     rc = 0;
+        ENTRY;
+
+        med = &mdt_info_req(info)->rq_export->exp_mdt_data;
+        repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+        lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
+
+        rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o),
+                         &info->mti_attr);
+        if (rc != 0)
+                GOTO(out, rc);
+
+        mdt_pack_attr2body(repbody, &info->mti_attr);
+        repbody->fid1 = *mdt_object_fid(o);
+        repbody->valid |= OBD_MD_FLID;
+
+        rc = mo_xattr_get(info->mti_ctxt, mdt_object_child(o),
+                          lmm, info->mti_mdt->mdt_max_mdsize, "lov");
+        if (rc < 0)
+                GOTO(out, rc = -EINVAL);
+
+        if (S_ISDIR(info->mti_attr.la_mode))
+                repbody->valid |= OBD_MD_FLDIREA;
+        else
+                repbody->valid |= OBD_MD_FLEASIZE;
+        repbody->eadatasize = rc;
+        rc = 0;
+
+        mfd = mdt_mfd_new();
+        if (mfd == NULL) {
+                CERROR("mds: out of memory\n");
+                GOTO(out, rc = -ENOMEM);
+        }
+
+        if (flags & FMODE_WRITE) {
+                /*mds_get_write_access*/
+        } else if (flags & MDS_FMODE_EXEC) {
+                /*mds_deny_write_access*/
+        }
+
+        /* keep a reference on this object for this open,
+         * and is released by mdt_mfd_close() */
+        mdt_object_get(info->mti_ctxt, o);
+
+        mfd->mfd_mode = flags;
+        mfd->mfd_object = o;
+        mfd->mfd_xid = mdt_info_req(info)->rq_xid;
+
+        spin_lock(&med->med_open_lock);
+        list_add(&mfd->mfd_list, &med->med_open_head);
+        spin_unlock(&med->med_open_lock);
+
+        repbody->handle.cookie = mfd->mfd_handle.h_cookie;
+
+        RETURN(rc);
+out:
+        return rc;
+}
+
+int mdt_pin(struct mdt_thread_info* info)
+{
+        struct mdt_object *o;
+        int rc;
+        ENTRY;
+
+        o = mdt_object_find(info->mti_ctxt, info->mti_mdt, &info->mti_body->fid1);
+        if (!IS_ERR(o)) {
+                if (mdt_object_exists(info->mti_ctxt, &o->mot_obj.mo_lu)) {
+                        rc = mdt_object_open(info, o, info->mti_body->flags);
+                        mdt_object_put(info->mti_ctxt, o);
+                } else
+                        rc = -ENOENT;
+        } else
+                rc = PTR_ERR(o);
+
+        RETURN(rc);
+}
+
+/*  Get an internal lock on the inode number (but not generation) to sync
+ *  new inode creation with inode unlink (bug 2029).  If child_lockh is NULL
+ *  we just get the lock as a barrier to wait for other holders of this lock,
+ *  and drop it right away again. */
+int mdt_lock_new_child(struct mdt_thread_info *info, 
+                       struct mdt_object *o,
+                       struct mdt_lock_handle *child_lockh)
+{
+        struct mdt_lock_handle lockh;
+        int rc;
+        
+        if (child_lockh == NULL)
+                child_lockh = &lockh;
+
+        lockh.mlh_mode = LCK_EX;
+        rc = mdt_object_lock(info->mti_mdt->mdt_namespace, 
+                             o, &lockh, MDS_INODELOCK_UPDATE);
+
+        if (rc != ELDLM_OK)
+                CERROR("can not mdt_object_lock: %d\n", rc);
+        else if (child_lockh == &lockh)
+                mdt_object_unlock(info->mti_mdt->mdt_namespace, 
+                                  o, &lockh);
+
+        RETURN(rc);
+}
+
+int mdt_reint_open(struct mdt_thread_info *info)
+{
+        struct mdt_device      *mdt = info->mti_mdt;
+        struct mdt_object      *parent;
+        struct mdt_object      *child;
+        struct mdt_lock_handle *lh;
+        struct ldlm_reply      *ldlm_rep;
+        struct ptlrpc_request  *req = mdt_info_req(info);
+        struct mdt_body        *body;
+        struct lu_fid           child_fid;
+        int                     result;
+        int                     created = 0;
+        struct mdt_reint_record *rr = &info->mti_rr;
+        ENTRY;
+
+        /* we now have no resent message, so it must be an intent */
+        LASSERT(info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN);
+
+        /*TODO: MDS_CHECK_RESENT */;
+
+        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+        body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+
+        lh = &info->mti_lh[MDT_LH_PARENT];
+        lh->mlh_mode = LCK_PW;
+        parent = mdt_object_find_lock(info->mti_ctxt, mdt, rr->rr_fid1,
+                                      lh, MDS_INODELOCK_UPDATE);
+        if (IS_ERR(parent))
+                GOTO(out, result = PTR_ERR(parent));
+
+        result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent),
+                            rr->rr_name, &child_fid);
+        if (result && result != -ENOENT) {
+                GOTO(out_parent, result);
+        }
+
+        intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
+
+        if (result == -ENOENT) {
+                intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
+                if (!(info->mti_attr.la_flags & MDS_OPEN_CREAT))
+                        GOTO(out_parent, result);
+                if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
+                        GOTO(out_parent, result = -EROFS);
+                child_fid = *info->mti_rr.rr_fid2;
+        } else {
+                intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
+                if (info->mti_attr.la_flags & MDS_OPEN_EXCL &&
+                    info->mti_attr.la_flags & MDS_OPEN_CREAT)
+                        GOTO(out_parent, result = -EEXIST);
+
+        }
+
+        child = mdt_object_find(info->mti_ctxt, mdt, &child_fid);
+        if (IS_ERR(child))
+                GOTO(out_parent, result = PTR_ERR(child));
+
+        if (result == -ENOENT) {
+                /* not found and with MDS_OPEN_CREAT: let's create something */
+                result = mdo_create(info->mti_ctxt,
+                                    mdt_object_child(parent),
+                                    rr->rr_name,
+                                    mdt_object_child(child),
+                                    &info->mti_attr);
+                intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
+                if (result != 0)
+                        GOTO(out_child, result);
+                created = 1;
+        } else
+                intent_set_disposition(ldlm_rep, DISP_OPEN_OPEN);
+
+        /* Open it now. */
+        result = mdt_object_open(info, child, info->mti_attr.la_flags);
+        GOTO(destroy_child, result);
+
+destroy_child:
+        if (result != 0 && created) {
+                mdo_unlink(info->mti_ctxt, mdt_object_child(parent),
+                           mdt_object_child(child), rr->rr_name);
+        } else if (created) {
+                mdt_lock_new_child(info, child, NULL);
+        }
+out_child:
+        mdt_object_put(info->mti_ctxt, child);
+out_parent:
+        mdt_object_unlock(mdt->mdt_namespace, parent, lh);
+        mdt_object_put(info->mti_ctxt, parent);
+out:
+        return result;
+}
+
+int mdt_mfd_close(const struct lu_context *ctxt,
+                  struct mdt_file_data *mfd, 
+                  int unlink_orphan)
+{
+        ENTRY;
+
+        if (mfd->mfd_mode & FMODE_WRITE) {
+                /*mdt_put_write_access*/
+        } else if (mfd->mfd_mode & MDS_FMODE_EXEC) {
+                /*mdt_allow_write_access*/
+        }
+
+        /* release reference on this object.
+         * it will be destroyed by lower layer if necessary.
+         */
+        mdt_object_put(ctxt, mfd->mfd_object);
+
+        mdt_mfd_put(mfd);
+        RETURN(0);
+}
+
+int mdt_close(struct mdt_thread_info *info)
+{
+        struct mdt_export_data *med;
+        struct mdt_body        *repbody;
+        struct mdt_file_data   *mfd;
+        struct mdt_object      *o;
+        struct lov_mds_md      *lmm;
+        int rc;
+        ENTRY;
+
+        med = &mdt_info_req(info)->rq_export->exp_mdt_data;
+        LASSERT(med);        
+
+        spin_lock(&med->med_open_lock);
+        mfd = mdt_handle2mfd(&(info->mti_body->handle));
+        if (mfd == NULL) {
+                spin_unlock(&med->med_open_lock);
+                CDEBUG(D_INODE, "no handle for file close ino "DFID3
+                       ": cookie "LPX64, PFID3(&info->mti_body->fid1), 
+                       info->mti_body->handle.cookie);
+                RETURN(-ESTALE);
+        }
+        class_handle_unhash(&mfd->mfd_handle);
+        list_del_init(&mfd->mfd_list);
+        spin_unlock(&med->med_open_lock);
+
+        o = mfd->mfd_object;
+        repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+        lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
+
+        rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o),
+                         &info->mti_attr);
+        if (rc == 0) {
+                mdt_pack_attr2body(repbody, &info->mti_attr);
+                repbody->fid1 = *mdt_object_fid(o);
+                repbody->valid |= OBD_MD_FLID;
+
+                rc = mo_xattr_get(info->mti_ctxt, mdt_object_child(o),
+                                  lmm, info->mti_mdt->mdt_max_mdsize, "lov");
+                if (rc >= 0) {
+                        if (S_ISDIR(info->mti_attr.la_mode))
+                                repbody->valid |= OBD_MD_FLDIREA;
+                        else
+                                repbody->valid |= OBD_MD_FLEASIZE;
+                        repbody->eadatasize = rc;
+                        rc = 0;
+                }
+        }
+
+        rc = mdt_mfd_close(info->mti_ctxt, mfd, 1);
+
+        RETURN(rc);
+}
+
+int mdt_done_writing(struct mdt_thread_info *info)
+{
+        ENTRY;
+
+        RETURN(0);
+}
index 1330dcb..79b98bc 100644 (file)
 #include "mdt_internal.h"
 
 
-/* object operations */
-static int mdt_md_open(struct mdt_thread_info *info, struct mdt_object *child)
-{
-        ENTRY;
-        RETURN(mo_open(info->mti_ctxt, mdt_object_child(child)));
-}
-
 static int mdt_md_create(struct mdt_thread_info *info)
 {
         struct mdt_device      *mdt = info->mti_mdt;
@@ -653,131 +646,6 @@ out:
         RETURN(-EOPNOTSUPP);
 }
 
-static int mdt_reint_open(struct mdt_thread_info *info)
-{
-        struct mdt_device      *mdt = info->mti_mdt;
-        struct mdt_object      *parent;
-        struct mdt_object      *child;
-        struct mdt_lock_handle *lh;
-        int                     result;
-        struct ldlm_reply      *ldlm_rep;
-        struct ptlrpc_request  *req = mdt_info_req(info);
-        struct mdt_body        *body = info->mti_reint_rep.mrr_body;
-        struct lov_mds_md      *lmm  = info->mti_reint_rep.mrr_md;
-        struct mdt_reint_record *rr = &info->mti_rr;
-        int                     created = 0;
-        struct lu_fid          child_fid;
-        ENTRY;
-
-        lh = &info->mti_lh[MDT_LH_PARENT];
-        lh->mlh_mode = LCK_PW;
-
-        //req_capsule_pack(&info->mti_pill);
-        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
-        body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
-
-        parent = mdt_object_find_lock(info->mti_ctxt, mdt, rr->rr_fid1,
-                                      lh, MDS_INODELOCK_UPDATE);
-        if (IS_ERR(parent))
-                RETURN(PTR_ERR(parent));
-
-        result = mdo_lookup(info->mti_ctxt, mdt_object_child(parent),
-                            rr->rr_name, &child_fid);
-        if (result && result != -ENOENT) {
-                GOTO(out_parent, result);
-        }
-
-        intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
-
-        if (result == -ENOENT) {
-                intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
-                if (!(info->mti_attr.la_flags & MDS_OPEN_CREAT))
-                        GOTO(out_parent, result);
-                if (req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
-                        GOTO(out_parent, result = -EROFS);
-                child_fid = *info->mti_rr.rr_fid2;
-        } else {
-                intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
-                if (info->mti_attr.la_flags & MDS_OPEN_EXCL &&
-                    info->mti_attr.la_flags & MDS_OPEN_CREAT)
-                        GOTO(out_parent, result = -EEXIST);
-
-        }
-        child = mdt_object_find(info->mti_ctxt, mdt, &child_fid);
-        if (IS_ERR(child))
-                GOTO(out_parent, result = PTR_ERR(child));
-
-        if (result == -ENOENT) {
-                /* not found and with MDS_OPEN_CREAT: let's create something */
-                result = mdo_create(info->mti_ctxt,
-                                    mdt_object_child(parent),
-                                    rr->rr_name,
-                                    mdt_object_child(child),
-                                    &info->mti_attr);
-                intent_set_disposition(ldlm_rep, DISP_OPEN_CREATE);
-                if (result != 0)
-                        GOTO(out_child, result);
-                created = 1;
-        } else {
-                result = mo_attr_get(info->mti_ctxt,
-                                     mdt_object_child(child),
-                                     &info->mti_attr);
-                if (result != 0)
-                        GOTO(destroy_child, result);
-        }
-
-        mdt_pack_attr2body(body, &info->mti_attr);
-        body->fid1 = *mdt_object_fid(child);
-        body->valid |= OBD_MD_FLID;
-
-        /* we should return "struct lov_mds_md" back*/
-        lmm = req_capsule_server_get(&info->mti_pill,
-                                     &RMF_MDT_MD);
-
-        /* not supported yet in MDD
-        result = mo_xattr_get(info->mti_ctxt, mdt_object_child(child),
-                              lmm, MAX_MD_SIZE, "lov");
-        if (result < 0)
-                GOTO(destroy_child, result = -EINVAL);
-        */
-        if (S_ISDIR(info->mti_attr.la_mode))
-                body->valid |= OBD_MD_FLDIREA;
-        else
-                body->valid |= OBD_MD_FLEASIZE;
-        body->eadatasize = result;
-        result = 0;
-
-        /* FIXME Let me fake it until the underlying works */
-        lmm->lmm_magic   = LOV_MAGIC;           /* magic number = LOV_MAGIC_V1 */
-        lmm->lmm_pattern = LOV_PATTERN_RAID0;   /* LOV_PATTERN_RAID0, LOV_PATTERN_RAID1 */
-        lmm->lmm_object_id = 1;                 /* LOV object ID */
-        lmm->lmm_object_gr = 1;                 /* LOV object group */
-        lmm->lmm_stripe_size = 4096 * 1024;     /* size of stripe in bytes */
-        lmm->lmm_stripe_count = 1;              /* num stripes in use for this object */
-                                                /* per-stripe data */
-        lmm->lmm_objects[0].l_object_id = 1;    /* OST object ID */
-        lmm->lmm_objects[0].l_object_gr = 1;    /* OST object group (creating MDS number) */
-        lmm->lmm_objects[0].l_ost_gen   = 1;    /* generation of this l_ost_idx */
-        lmm->lmm_objects[0].l_ost_idx   = 0;    /* OST index in LOV (lov_tgt_desc->tgts) */
-        body->eadatasize = sizeof(struct lov_mds_md) + sizeof(struct lov_ost_data);
-
-        /* Open it now. */
-        /* TODO: not supported yet
-        result = mdt_md_open(info, child);
-        */
-
-destroy_child:
-        if (result != 0 && created)
-                mdo_unlink(info->mti_ctxt, mdt_object_child(parent),
-                           mdt_object_child(child), rr->rr_name);
-out_child:
-        mdt_object_put(info->mti_ctxt, child);
-out_parent:
-        mdt_object_unlock(mdt->mdt_namespace, parent, lh);
-        mdt_object_put(info->mti_ctxt, parent);
-        RETURN(result);
-}
-
 
 typedef int (*mdt_reinter)(struct mdt_thread_info *info);