Whamcloud - gitweb
add reint handlers: The first step is adding mkdir.
authorhuanghua <huanghua>
Thu, 27 Apr 2006 04:06:36 +0000 (04:06 +0000)
committerhuanghua <huanghua>
Thu, 27 Apr 2006 04:06:36 +0000 (04:06 +0000)
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_mdt.h [new file with mode: 0644]
lustre/mdt/Makefile.in
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lib.c [new file with mode: 0644]
lustre/mdt/mdt_reint.c [new file with mode: 0644]

index 2e3e3c4..52632e7 100644 (file)
@@ -710,7 +710,7 @@ typedef enum {
 //      REINT_CLOSE    = 7,
 //      REINT_WRITE    = 8,
         REINT_MAX
-} mds_reint_t;
+} mds_reint_t,mdt_reint_t;
 
 /* the disposition of the intent outlines what was executed */
 #define DISP_IT_EXECD     0x01
diff --git a/lustre/include/linux/lustre_mdt.h b/lustre/include/linux/lustre_mdt.h
new file mode 100644 (file)
index 0000000..8b9af68
--- /dev/null
@@ -0,0 +1,70 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *   This file is part of Lustre, http://www.lustre.org
+ *
+ * MDS data structures.
+ * See also lustre_idl.h for wire formats of requests.
+ */
+
+#ifndef _LUSTRE_MDT_H
+#define _LUSTRE_MDT_H
+
+#ifdef __KERNEL__
+# include <linux/fs.h>
+# include <linux/dcache.h>
+#endif
+#include <linux/lustre_handles.h>
+#include <libcfs/kp30.h>
+#include <linux/lustre_idl.h>
+#include <linux/lustre_lib.h>
+#include <linux/lustre_dlm.h>
+#include <linux/lustre_log.h>
+#include <linux/lustre_export.h>
+
+
+struct mdt_reint_record {
+        __u32 ur_opcode;
+        struct lu_fid *ur_fid1;
+        struct lu_fid *ur_fid2;
+        int ur_namelen;
+        char *ur_name;
+        int ur_tgtlen;
+        char *ur_tgt;
+        int ur_eadatalen;
+        void *ur_eadata;
+        int ur_cookielen;
+        struct llog_cookie *ur_logcookies;
+        struct iattr ur_iattr;
+        struct lvfs_ucred ur_uc;
+        __u64 ur_rdev;
+        __u64 ur_time;
+        __u32 ur_mode;
+        __u32 ur_flags;
+        struct lvfs_grp_hash_entry *ur_grp_entry;
+};
+
+/* file data for open files on MDT */
+struct mdt_file_data {
+        struct portals_handle mfd_handle; /* must be first */
+        atomic_t              mfd_refcount;
+        struct list_head      mfd_list; /* protected by med_open_lock */
+        __u64                 mfd_xid;
+        int                   mfd_mode;
+        struct dentry        *mfd_dentry;
+};
+
+
+/* ioctls for trying requests */
+#define IOC_REQUEST_TYPE                   'f'
+#define IOC_REQUEST_MIN_NR                 30
+
+#define IOC_REQUEST_GETATTR             _IOWR('f', 30, long)
+#define IOC_REQUEST_READPAGE            _IOWR('f', 31, long)
+#define IOC_REQUEST_SETATTR             _IOWR('f', 32, long)
+#define IOC_REQUEST_CREATE              _IOWR('f', 33, long)
+#define IOC_REQUEST_OPEN                _IOWR('f', 34, long)
+#define IOC_REQUEST_CLOSE               _IOWR('f', 35, long)
+#define IOC_REQUEST_MAX_NR               35
+
+#endif
index 468e35f..846af6e 100644 (file)
@@ -1,4 +1,4 @@
 MODULES := mdt
-mdt-objs := mdt_handler.o
+mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o
 
 @INCLUDE_RULES@
index d563b36..07c94ad 100644 (file)
@@ -70,39 +70,6 @@ static struct lu_fid     *mdt_object_fid(struct mdt_object *o);
 static struct lu_context_key       mdt_thread_key;
 static struct lu_object_operations mdt_obj_ops;
 
-/* object operations */
-static int mdt_md_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
-                        struct lu_fid *pfid, const char *name,
-                        struct lu_fid *cfid)
-{
-        struct mdt_object      *o;
-        struct mdt_object      *child;
-        struct mdt_lock_handle *lh;
-
-        int result;
-
-        lh = &info->mti_lh[MDT_LH_PARENT];
-        lh->mlh_mode = LCK_PW;
-
-        o = mdt_object_find_lock(info->mti_ctxt,
-                                 d, pfid, lh, MDS_INODELOCK_UPDATE);
-        if (IS_ERR(o))
-                return PTR_ERR(o);
-
-        child = mdt_object_find(info->mti_ctxt, d, cfid);
-        if (!IS_ERR(child)) {
-                struct md_object *next = mdt_object_child(o);
-
-                result = next->mo_ops->moo_mkdir(info->mti_ctxt, next, name,
-                                                 mdt_object_child(child));
-                mdt_object_put(info->mti_ctxt, child);
-        } else
-                result = PTR_ERR(child);
-        mdt_object_unlock(d->mdt_namespace, o, lh);
-        mdt_object_put(info->mti_ctxt, o);
-        return result;
-}
-
 static int mdt_getstatus(struct mdt_thread_info *info,
                          struct ptlrpc_request *req, int offset)
 {
@@ -282,7 +249,51 @@ static int mdt_readpage(struct mdt_thread_info *info,
 static int mdt_reint(struct mdt_thread_info *info,
                      struct ptlrpc_request *req, int offset)
 {
-        return -EOPNOTSUPP;
+        __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF,
+                                     sizeof (*opcp));
+        __u32  opc;
+        int size[] = { sizeof(struct mdt_body), sizeof(struct lov_mds_md), /*FIXME*/
+                       sizeof(struct llog_cookie)};
+        int bufcount,rc;
+        struct mdt_reint_record *rec; /* 116 bytes on the stack?  no sir! */
+
+        ENTRY;
+
+        /* NB only peek inside req now; mdt_XXX_unpack() will swab it */
+        if (opcp == NULL) {
+                CERROR ("Can't inspect opcode\n");
+                RETURN (-EINVAL);
+        }
+        opc = *opcp;
+        if (lustre_msg_swabbed (req->rq_reqmsg))
+                __swab32s(&opc);
+
+        DEBUG_REQ(D_INODE, req, "reint opt = %d", opc);
+
+        OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
+
+        if (opc == REINT_UNLINK || opc == REINT_RENAME)
+                bufcount = 3;
+        else if (opc == REINT_OPEN)
+                bufcount = 2;
+        rc = lustre_pack_reply(req, bufcount, size, NULL);
+        if (rc)
+                RETURN (rc);
+
+        OBD_ALLOC(rec, sizeof(*rec));
+        if (rec == NULL)
+                RETURN(-ENOMEM);
+
+        rc = mdt_reint_unpack(info, req, offset, rec);
+        if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
+                CERROR("invalid record\n");
+                GOTO(out, req->rq_status = -EINVAL);
+        }
+
+        rc = mdt_reint_rec(info, rec, offset, req, NULL);
+out:
+        OBD_FREE(rec, sizeof(*rec));
+        RETURN(rc);
 }
 
 static int mdt_close(struct mdt_thread_info *info,
index 9435ed8..e2f85e2 100644 (file)
@@ -1,7 +1,7 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  lustre/mds/handler.c
+ *  lustre/mdt/mdt_internal.h
  *  Lustre Metadata Target (mdt) request handler
  *
  *  Copyright (c) 2006 Cluster File Systems, Inc.
@@ -47,6 +47,7 @@
 #include <linux/lustre_idl.h>
 #include <linux/md_object.h>
 #include <linux/lustre_fid.h>
+#include <linux/lustre_mdt.h>
 
 struct mdt_device {
         /* super-class */
@@ -185,5 +186,12 @@ void mdt_object_unlock(struct ldlm_namespace *, struct mdt_object *,
 struct mdt_object *mdt_object_find_lock(struct lu_context *, struct mdt_device *,
                                         struct lu_fid *,
                                         struct mdt_lock_handle *, __u64);
+int mdt_reint_unpack(struct mdt_thread_info *info, struct ptlrpc_request *req, int offset,
+                      struct mdt_reint_record *rec);
+
+int mdt_reint_rec(struct mdt_thread_info *, struct mdt_reint_record *,
+                  int, struct ptlrpc_request *,
+                  struct lustre_handle *);
+
 #endif /* __KERNEL__ */
 #endif /* _MDT_H */
diff --git a/lustre/mdt/mdt_lib.c b/lustre/mdt/mdt_lib.c
new file mode 100644 (file)
index 0000000..189de80
--- /dev/null
@@ -0,0 +1,318 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (c) 2003 Cluster File Systems, Inc.
+ *
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
+ *
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
+ *
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
+ */
+
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include <linux/module.h>
+
+/* LUSTRE_VERSION_CODE */
+#include <linux/lustre_ver.h>
+/*
+ * struct OBD_{ALLOC,FREE}*()
+ * OBD_FAIL_CHECK
+ */
+#include <linux/obd_support.h>
+/* struct ptlrpc_request */
+#include <linux/lustre_net.h>
+/* struct obd_export */
+#include <linux/lustre_export.h>
+/* struct obd_device */
+#include <linux/obd.h>
+/* lu2dt_dev() */
+#include <linux/dt_object.h>
+
+
+#include "../mds/mds_internal.h"
+#include "mdt_internal.h"
+
+
+/* unpacking */
+static int mdt_setattr_unpack(struct ptlrpc_request *req, int offset,
+                              struct mdt_reint_record *r)
+{
+        struct iattr *attr = &r->ur_iattr;
+        struct mdt_rec_setattr *rec;
+        ENTRY;
+
+        rec = lustre_swab_reqbuf(req, offset, sizeof(*rec),
+                                 lustre_swab_mdt_rec_setattr);
+        if (rec == NULL)
+                RETURN (-EFAULT);
+
+        r->ur_uc.luc_fsuid = rec->sa_fsuid;
+        r->ur_uc.luc_fsgid = rec->sa_fsgid;
+        r->ur_uc.luc_cap = rec->sa_cap;
+        r->ur_uc.luc_suppgid1 = rec->sa_suppgid;
+        r->ur_uc.luc_suppgid2 = -1;
+        r->ur_fid1 = &rec->sa_fid;
+        attr->ia_valid = rec->sa_valid;
+        attr->ia_mode = rec->sa_mode;
+        attr->ia_uid = rec->sa_uid;
+        attr->ia_gid = rec->sa_gid;
+        attr->ia_size = rec->sa_size;
+        LTIME_S(attr->ia_atime) = rec->sa_atime;
+        LTIME_S(attr->ia_mtime) = rec->sa_mtime;
+        LTIME_S(attr->ia_ctime) = rec->sa_ctime;
+        attr->ia_attr_flags = rec->sa_attr_flags;
+
+        LASSERT_REQSWAB (req, offset + 1);
+        if (req->rq_reqmsg->bufcount > offset + 1) {
+                r->ur_eadata = lustre_msg_buf (req->rq_reqmsg,
+                                               offset + 1, 0);
+                if (r->ur_eadata == NULL)
+                        RETURN (-EFAULT);
+                r->ur_eadatalen = req->rq_reqmsg->buflens[offset + 1];
+        }
+
+        if (req->rq_reqmsg->bufcount > offset + 2) {
+                r->ur_logcookies = lustre_msg_buf(req->rq_reqmsg, offset + 2,0);
+                if (r->ur_eadata == NULL)
+                        RETURN (-EFAULT);
+
+                r->ur_cookielen = req->rq_reqmsg->buflens[offset + 2];
+        }
+
+        RETURN(0);
+}
+
+static int mdt_create_unpack(struct ptlrpc_request *req, int offset,
+                             struct mdt_reint_record *r)
+{
+        struct mdt_rec_create *rec;
+        ENTRY;
+
+        rec = lustre_swab_reqbuf (req, offset, sizeof (*rec),
+                                  lustre_swab_mdt_rec_create);
+        if (rec == NULL)
+                RETURN (-EFAULT);
+
+        r->ur_uc.luc_fsuid = rec->cr_fsuid;
+        r->ur_uc.luc_fsgid = rec->cr_fsgid;
+        r->ur_uc.luc_cap = rec->cr_cap;
+        r->ur_uc.luc_suppgid1 = rec->cr_suppgid;
+        r->ur_uc.luc_suppgid2 = -1;
+        r->ur_fid1 = &rec->cr_fid;
+        r->ur_fid2 = &rec->cr_replayfid;
+        r->ur_mode = rec->cr_mode;
+        r->ur_rdev = rec->cr_rdev;
+        r->ur_time = rec->cr_time;
+        r->ur_flags = rec->cr_flags;
+
+        LASSERT_REQSWAB (req, offset + 1);
+        r->ur_name = lustre_msg_string (req->rq_reqmsg, offset + 1, 0);
+        if (r->ur_name == NULL)
+                RETURN (-EFAULT);
+        r->ur_namelen = req->rq_reqmsg->buflens[offset + 1];
+
+        LASSERT_REQSWAB (req, offset + 2);
+        if (req->rq_reqmsg->bufcount > offset + 2) {
+                /* NB for now, we only seem to pass NULL terminated symlink
+                 * target strings here.  If this ever changes, we'll have
+                 * to stop checking for a buffer filled completely with a
+                 * NULL terminated string here, and make the callers check
+                 * depending on what they expect.  We should probably stash
+                 * it in r->ur_eadata in that case, so it's obvious... -eeb
+                 */
+                r->ur_tgt = lustre_msg_string(req->rq_reqmsg, offset + 2, 0);
+                if (r->ur_tgt == NULL)
+                        RETURN (-EFAULT);
+                r->ur_tgtlen = req->rq_reqmsg->buflens[offset + 2];
+        }
+        RETURN(0);
+}
+
+static int mdt_link_unpack(struct ptlrpc_request *req, int offset,
+                           struct mdt_reint_record *r)
+{
+        struct mdt_rec_link *rec;
+        ENTRY;
+
+        rec = lustre_swab_reqbuf (req, offset, sizeof (*rec),
+                                  lustre_swab_mdt_rec_link);
+        if (rec == NULL)
+                RETURN (-EFAULT);
+
+        r->ur_uc.luc_fsuid = rec->lk_fsuid;
+        r->ur_uc.luc_fsgid = rec->lk_fsgid;
+        r->ur_uc.luc_cap = rec->lk_cap;
+        r->ur_uc.luc_suppgid1 = rec->lk_suppgid1;
+        r->ur_uc.luc_suppgid2 = rec->lk_suppgid2;
+        r->ur_fid1 = &rec->lk_fid1;
+        r->ur_fid2 = &rec->lk_fid2;
+        r->ur_time = rec->lk_time;
+
+        LASSERT_REQSWAB (req, offset + 1);
+        r->ur_name = lustre_msg_string (req->rq_reqmsg, offset + 1, 0);
+        if (r->ur_name == NULL)
+                RETURN (-EFAULT);
+        r->ur_namelen = req->rq_reqmsg->buflens[offset + 1];
+        RETURN(0);
+}
+
+static int mdt_unlink_unpack(struct ptlrpc_request *req, int offset,
+                             struct mdt_reint_record *r)
+{
+        struct mdt_rec_unlink *rec;
+        ENTRY;
+
+        rec = lustre_swab_reqbuf (req, offset, sizeof (*rec),
+                                  lustre_swab_mdt_rec_unlink);
+        if (rec == NULL)
+                RETURN(-EFAULT);
+
+        r->ur_uc.luc_fsuid = rec->ul_fsuid;
+        r->ur_uc.luc_fsgid = rec->ul_fsgid;
+        r->ur_uc.luc_cap = rec->ul_cap;
+        r->ur_uc.luc_suppgid1 = rec->ul_suppgid;
+        r->ur_uc.luc_suppgid2 = -1;
+        r->ur_mode = rec->ul_mode;
+        r->ur_fid1 = &rec->ul_fid1;
+        r->ur_fid2 = &rec->ul_fid2;
+        r->ur_time = rec->ul_time;
+
+        LASSERT_REQSWAB (req, offset + 1);
+        r->ur_name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
+        if (r->ur_name == NULL)
+                RETURN(-EFAULT);
+        r->ur_namelen = req->rq_reqmsg->buflens[offset + 1];
+        RETURN(0);
+}
+
+static int mdt_rename_unpack(struct ptlrpc_request *req, int offset,
+                             struct mdt_reint_record *r)
+{
+        struct mdt_rec_rename *rec;
+        ENTRY;
+
+        rec = lustre_swab_reqbuf (req, offset, sizeof (*rec),
+                                  lustre_swab_mdt_rec_rename);
+        if (rec == NULL)
+                RETURN(-EFAULT);
+
+        r->ur_uc.luc_fsuid = rec->rn_fsuid;
+        r->ur_uc.luc_fsgid = rec->rn_fsgid;
+        r->ur_uc.luc_cap = rec->rn_cap;
+        r->ur_uc.luc_suppgid1 = rec->rn_suppgid1;
+        r->ur_uc.luc_suppgid2 = rec->rn_suppgid2;
+        r->ur_fid1 = &rec->rn_fid1;
+        r->ur_fid2 = &rec->rn_fid2;
+        r->ur_time = rec->rn_time;
+
+        LASSERT_REQSWAB (req, offset + 1);
+        r->ur_name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
+        if (r->ur_name == NULL)
+                RETURN(-EFAULT);
+        r->ur_namelen = req->rq_reqmsg->buflens[offset + 1];
+
+        LASSERT_REQSWAB (req, offset + 2);
+        r->ur_tgt = lustre_msg_string(req->rq_reqmsg, offset + 2, 0);
+        if (r->ur_tgt == NULL)
+                RETURN(-EFAULT);
+        r->ur_tgtlen = req->rq_reqmsg->buflens[offset + 2];
+        RETURN(0);
+}
+
+static int mdt_open_unpack(struct ptlrpc_request *req, int offset,
+                           struct mdt_reint_record *r)
+{
+        struct mdt_rec_create *rec;
+        ENTRY;
+
+        rec = lustre_swab_reqbuf (req, offset, sizeof (*rec),
+                                  lustre_swab_mdt_rec_create);
+        if (rec == NULL)
+                RETURN (-EFAULT);
+
+        r->ur_uc.luc_fsuid = rec->cr_fsuid;
+        r->ur_uc.luc_fsgid = rec->cr_fsgid;
+        r->ur_uc.luc_cap = rec->cr_cap;
+        r->ur_uc.luc_suppgid1 = rec->cr_suppgid;
+        r->ur_uc.luc_suppgid2 = -1;
+        r->ur_fid1 = &rec->cr_fid;
+        r->ur_fid2 = &rec->cr_replayfid;
+        r->ur_mode = rec->cr_mode;
+        r->ur_rdev = rec->cr_rdev;
+        r->ur_time = rec->cr_time;
+        r->ur_flags = rec->cr_flags;
+
+        LASSERT_REQSWAB (req, offset + 1);
+        r->ur_name = lustre_msg_string (req->rq_reqmsg, offset + 1, 0);
+        if (r->ur_name == NULL)
+                RETURN (-EFAULT);
+        r->ur_namelen = req->rq_reqmsg->buflens[offset + 1];
+
+        LASSERT_REQSWAB (req, offset + 2);
+        if (req->rq_reqmsg->bufcount > offset + 2) {
+                r->ur_eadata = lustre_msg_buf(req->rq_reqmsg, offset + 2, 0);
+                if (r->ur_eadata == NULL)
+                        RETURN (-EFAULT);
+                r->ur_eadatalen = req->rq_reqmsg->buflens[offset + 2];
+        }
+        RETURN(0);
+}
+
+typedef int (*reint_unpacker)(struct ptlrpc_request *req, int offset,
+                               struct mdt_reint_record *r);
+
+static reint_unpacker mdt_reint_unpackers[REINT_MAX] = {
+        [REINT_SETATTR] mdt_setattr_unpack,
+        [REINT_CREATE] mdt_create_unpack,
+        [REINT_LINK] mdt_link_unpack,
+        [REINT_UNLINK] mdt_unlink_unpack,
+        [REINT_RENAME] mdt_rename_unpack,
+        [REINT_OPEN] mdt_open_unpack,
+};
+
+int mdt_reint_unpack(struct mdt_thread_info *info, struct ptlrpc_request *req, int offset,
+                      struct mdt_reint_record *rec)
+{
+        mdt_reint_t opcode, *opcodep;
+        int rc;
+        ENTRY;
+
+        /* NB don't lustre_swab_reqbuf() here.  We're just taking a peek
+         * and we want to leave it to the specific unpacker once we've
+         * identified the message type */
+        opcodep = lustre_msg_buf (req->rq_reqmsg, offset, sizeof (*opcodep));
+        if (opcodep == NULL)
+                RETURN(-EFAULT);
+
+        opcode = *opcodep;
+        if (lustre_msg_swabbed (req->rq_reqmsg))
+                __swab32s (&opcode);
+
+        if (opcode >= REINT_MAX || mdt_reint_unpackers[opcode] == NULL) {
+                CERROR("Unexpected opcode %d\n", opcode);
+                RETURN(-EFAULT);
+        }
+
+        rec->ur_opcode = opcode;
+        rc = mdt_reint_unpackers[opcode](req, offset, rec);
+
+        RETURN(rc);
+}
diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c
new file mode 100644 (file)
index 0000000..42a13da
--- /dev/null
@@ -0,0 +1,202 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  linux/mdt/mdt_reint.c
+ *  Lustre Metadata Server (mds) reintegration routines
+ *
+ *  Copyright (C) 2002-2005 Cluster File Systems, Inc.
+ *   Author: Peter Braam <braam@clusterfs.com>
+ *   Author: Andreas Dilger <adilger@clusterfs.com>
+ *   Author: Phil Schwan <phil@clusterfs.com>
+ *
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
+ *
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
+ *
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include <linux/module.h>
+
+/* LUSTRE_VERSION_CODE */
+#include <linux/lustre_ver.h>
+/*
+ * struct OBD_{ALLOC,FREE}*()
+ * OBD_FAIL_CHECK
+ */
+#include <linux/obd_support.h>
+/* struct ptlrpc_request */
+#include <linux/lustre_net.h>
+/* struct obd_export */
+#include <linux/lustre_export.h>
+/* struct obd_device */
+#include <linux/obd.h>
+/* lu2dt_dev() */
+#include <linux/dt_object.h>
+
+
+#include "../mds/mds_internal.h"
+#include "mdt_internal.h"
+
+
+/* object operations */
+static int mdt_md_mkdir(struct mdt_thread_info *info,
+                        struct mdt_reint_record *rec)
+{
+        struct mdt_device      *mdt= info->mti_mdt;
+        struct mdt_object      *parent;
+        struct mdt_object      *child;
+        struct mdt_lock_handle *lh;
+
+        int result;
+
+        lh = &info->mti_lh[MDT_LH_PARENT];
+        lh->mlh_mode = LCK_PW;
+
+        parent = mdt_object_find_lock(info->mti_ctxt,
+                                 mdt, rec->ur_fid1, lh, MDS_INODELOCK_UPDATE);
+        if (IS_ERR(parent))
+                return PTR_ERR(parent);
+
+        child = mdt_object_find(info->mti_ctxt, mdt, rec->ur_fid2);
+        if (!IS_ERR(child)) {
+                struct md_object *next = mdt_object_child(parent);
+
+                result = next->mo_ops->moo_mkdir(info->mti_ctxt, next, rec->ur_name,
+                                                 mdt_object_child(child));
+                mdt_object_put(info->mti_ctxt, child);
+        } else
+                result = PTR_ERR(child);
+        mdt_object_unlock(mdt->mdt_namespace, parent, lh);
+        mdt_object_put(info->mti_ctxt, parent);
+        return result;
+}
+
+
+static int mdt_reint_setattr(struct mdt_thread_info *info,
+                             struct mdt_reint_record *rec, int offset,
+                             struct ptlrpc_request *req,
+                             struct lustre_handle *lh)
+{
+        ENTRY;
+        RETURN (-EOPNOTSUPP);
+}
+
+
+static int mdt_reint_create(struct mdt_thread_info *info,
+                            struct mdt_reint_record *rec, int offset,
+                            struct ptlrpc_request *req,
+                            struct lustre_handle *lh)
+{
+        int rc = 0, type = rec->ur_mode & S_IFMT;
+        
+        ENTRY;
+
+        switch (type) {
+        case S_IFREG:{
+                RETURN (rc = -EOPNOTSUPP);
+                break;
+        }
+        case S_IFDIR:{
+                rc = mdt_md_mkdir(info, rec);
+                break;
+        }
+        case S_IFLNK:{
+                RETURN (rc = -EOPNOTSUPP);
+                break;
+        }
+        case S_IFCHR:
+        case S_IFBLK:
+        case S_IFIFO:
+        case S_IFSOCK:{
+                RETURN (rc = -EOPNOTSUPP);
+                break;
+        }
+        default:
+                CERROR("bad file type %o creating %s\n", type, rec->ur_name);
+        }
+        RETURN (rc);
+}
+
+
+static int mdt_reint_unlink(struct mdt_thread_info *info,
+                            struct mdt_reint_record *rec, int offset,
+                            struct ptlrpc_request *req,
+                            struct lustre_handle *lh)
+{
+        ENTRY;
+        RETURN (-EOPNOTSUPP);
+}
+
+static int mdt_reint_link(struct mdt_thread_info *info,
+                          struct mdt_reint_record *rec, int offset,
+                          struct ptlrpc_request *req,
+                          struct lustre_handle *lh)
+{
+        ENTRY;
+        RETURN (-EOPNOTSUPP);
+}
+
+
+static int mdt_reint_rename(struct mdt_thread_info *info,
+                            struct mdt_reint_record *rec, int offset,
+                            struct ptlrpc_request *req,
+                            struct lustre_handle *lockh)
+{
+        ENTRY;
+        RETURN (-EOPNOTSUPP);
+}
+
+static int mdt_reint_open(struct mdt_thread_info *info,
+                            struct mdt_reint_record *rec, int offset,
+                            struct ptlrpc_request *req,
+                            struct lustre_handle *lockh)
+{
+        ENTRY;
+        RETURN (-EOPNOTSUPP);
+}
+
+
+typedef int (*mdt_reinter)(struct mdt_thread_info *info,
+                           struct mdt_reint_record *, int offset,
+                           struct ptlrpc_request *, struct lustre_handle *);
+
+static mdt_reinter reinters[REINT_MAX] = {
+        [REINT_SETATTR] mdt_reint_setattr,
+        [REINT_CREATE] mdt_reint_create,
+        [REINT_LINK] mdt_reint_link,
+        [REINT_UNLINK] mdt_reint_unlink,
+        [REINT_RENAME] mdt_reint_rename,
+        [REINT_OPEN] mdt_reint_open
+};
+
+int mdt_reint_rec(struct mdt_thread_info *info, struct mdt_reint_record *rec, 
+                  int offset, struct ptlrpc_request *req, 
+                  struct lustre_handle *lockh)
+{
+        int rc;
+        ENTRY;
+        /* checked by unpacker */
+        LASSERT(rec->ur_opcode < REINT_MAX && reinters[rec->ur_opcode] != NULL);
+
+        rc = reinters[rec->ur_opcode] (info, rec, offset, req, lockh);
+
+        RETURN(rc);
+}