Whamcloud - gitweb
Two fixes:
author adilger <adilger>
Sat, 5 Oct 2002 13:42:39 +0000 (13:42 +0000)
committer adilger <adilger>
Sat, 5 Oct 2002 13:42:39 +0000 (13:42 +0000)
- refcounts on MDS module fixed, and problems with bogus exports
- unsafe list walking on the open file list of the MDS could cause
  random memory problems
- added real handles for open files on the MDS to ensure we don't
  dereference bogus pointers - Mike, you still need to clean up open
  files on the MDS for disconnect

15 files changed:
lustre/extN/Makefile.am
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_lite.h
lustre/include/linux/lustre_mds.h
lustre/include/linux/lustre_net.h
lustre/lib/mds_updates.c
lustre/llite/file.c
lustre/llite/super.c
lustre/mdc/mdc_request.c
lustre/mds/handler.c
lustre/mds/mds_fs.c
lustre/mds/mds_reint.c
lustre/obdclass/class_obd.c
lustre/obdclass/genops.c
lustre/ptlrpc/client.c

index 408ed7d..828f099 100644 (file)
@@ -9,7 +9,7 @@ modulefs_DATA = extN.o
 EXTRA_PROGRAMS = extN
 
 # NOTE: If you are not using a RedHat 12.5 or later kernel, then you need to
-#       apply the following patch first, as it fixes a number of bugs in ext3.
+#       apply the "fixes" patch first, as it fixes a number of bugs in ext3.
 #       It will be applied automatically by the extN build process, or you
 #       can apply it to the source kernel tree and fix ext3 also.  For chaos22
 #       (or other RH < 12.5 kernels) use the "chaos22" patch instead.
index 9746c90..c97e8f7 100644 (file)
@@ -315,17 +315,17 @@ struct ll_fid {
 #define MDS_STATUS_CONN 1
 #define MDS_STATUS_LOV 2
 
-struct mds_status_req { 
+struct mds_status_req {
         __u32  flags;
         __u32  repbuf;
 };
 
-struct mds_fileh_body { 
+struct mds_fileh_body {
         struct ll_fid f_fid;
         struct lustre_handle f_handle;
 };
 
-struct mds_conn_status { 
+struct mds_conn_status {
         struct ll_fid rootfid;
         __u64          xid;
         __u64          last_committed;
@@ -334,13 +334,14 @@ struct mds_conn_status {
 };
 
 struct mds_body {
-        __u32          fsuid;
-        __u32          fsgid;
         struct ll_fid  fid1;
         struct ll_fid  fid2;
+        struct lustre_handle handle;
         __u64          size;
-        __u64          extra; /* should become a lustre_handle */
+        __u32          ino;   /* make this a __u64 */
         __u32          valid;
+        __u32          fsuid;
+        __u32          fsgid;
         __u32          mode;
         __u32          uid;
         __u32          gid;
@@ -348,12 +349,9 @@ struct mds_body {
         __u32          ctime;
         __u32          atime;
         __u32          flags;
-        __u32          major;
-        __u32          minor;
-        __u32          ino;
+        __u32          rdev;
         __u32          nlink;
         __u32          generation;
-        __u32          last_xidnomore;
 };
 
 /* MDS update records */
index 8a1bc82..377a2fb 100644 (file)
@@ -26,7 +26,7 @@
 
 extern kmem_cache_t *ll_file_data_slab;
 struct ll_file_data {
-        __u64 fd_mdshandle;
+        struct lustre_handle fd_mdshandle;
         struct lustre_handle fd_osthandle;
         struct ptlrpc_request *fd_req;
         __u32 fd_flags;
index fd22b23..8b292ae 100644 (file)
@@ -90,16 +90,17 @@ struct mds_client_data {
 /* In-memory access to client data from MDS struct */
 struct mds_export_data {
         struct list_head        med_open_head;
+        spinlock_t              med_open_lock;
         struct mds_client_data *med_mcd;
         int                     med_off;
 };
 
 /* file data for open files on MDS */
 struct mds_file_data {
-        struct list_head  mfd_list;
-        struct file      *mfd_file;
-        __u64             mfd_clientfd;
-        __u32             mfd_clientcookie;
+        struct list_head     mfd_list;
+        struct lustre_handle mfd_clienthandle;
+        __u64                mfd_servercookie;
+        struct file         *mfd_file;
 };
 
 /* mds/mds_reint.c  */
@@ -165,10 +166,10 @@ int mdc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs,
 int mdc_setattr(struct lustre_handle *conn,
                 struct inode *, struct iattr *iattr, struct ptlrpc_request **);
 int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
-             struct lov_stripe_md *, __u64 cookie,  __u64 *fh,
+             struct lov_stripe_md *, struct lustre_handle *fh,
              struct ptlrpc_request **);
-int mdc_close(struct lustre_handle *conn,
-              obd_id ino, int type, __u64 fh,  struct ptlrpc_request **req);
+int mdc_close(struct lustre_handle *conn, obd_id ino, int type,
+              struct lustre_handle *fh,  struct ptlrpc_request **req);
 int mdc_readpage(struct lustre_handle *conn, obd_id ino,
                  int type, __u64 offset, char *addr, struct ptlrpc_request **);
 int mdc_create(struct lustre_handle *conn,
index 92b5712..7913e1c 100644 (file)
@@ -164,9 +164,9 @@ struct ptlrpc_request {
         struct ptlrpc_connection *rq_connection;
         struct obd_import *rq_import;
         struct ptlrpc_service *rq_svc;
-        
-        void (*rq_replay_cb)(struct ptlrpc_request *, void *);
-        void *rq_replay_cb_data;
+
+        void (*rq_replay_cb)(struct ptlrpc_request *, struct lustre_handle *);
+        struct lustre_handle rq_replay_cb_handle;
 };
 
 struct ptlrpc_bulk_page {
index 00620a1..bd7c640 100644 (file)
@@ -83,8 +83,8 @@ static void mds_pack_body(struct mds_body *b)
 
         mds_pack_fid(&b->fid1);
         mds_pack_fid(&b->fid2);
-        b->extra = HTON__u64(b->extra);
         b->size = HTON__u64(b->size);
+        b->ino = HTON__u32(b->ino);
         b->valid = HTON__u32(b->valid);
         b->mode = HTON__u32(b->mode);
         b->uid = HTON__u32(b->uid);
@@ -93,9 +93,7 @@ static void mds_pack_body(struct mds_body *b)
         b->ctime = HTON__u32(b->ctime);
         b->atime = HTON__u32(b->atime);
         b->flags = HTON__u32(b->flags);
-        b->major = HTON__u32(b->major);
-        b->minor = HTON__u32(b->minor);
-        b->ino = HTON__u32(b->ino);
+        b->rdev = HTON__u32(b->rdev);
         b->nlink = HTON__u32(b->nlink);
         b->generation = HTON__u32(b->generation);
 }
@@ -265,13 +263,13 @@ void mds_unpack_body(struct mds_body *b)
         if (b == NULL)
                 LBUG();
 
-        b->fsuid = NTOH__u32(b->fsuid);
-        b->fsgid = NTOH__u32(b->fsgid);
         mds_unpack_fid(&b->fid1);
         mds_unpack_fid(&b->fid2);
-        b->extra = NTOH__u64(b->extra);
         b->size = NTOH__u64(b->size);
         b->valid = NTOH__u32(b->valid);
+        b->fsuid = NTOH__u32(b->fsuid);
+        b->fsgid = NTOH__u32(b->fsgid);
+        b->ino = NTOH__u32(b->ino);
         b->mode = NTOH__u32(b->mode);
         b->uid = NTOH__u32(b->uid);
         b->gid = NTOH__u32(b->gid);
@@ -279,9 +277,7 @@ void mds_unpack_body(struct mds_body *b)
         b->ctime = NTOH__u32(b->ctime);
         b->atime = NTOH__u32(b->atime);
         b->flags = NTOH__u32(b->flags);
-        b->major = NTOH__u32(b->major);
-        b->minor = NTOH__u32(b->minor);
-        b->ino = NTOH__u32(b->ino);
+        b->rdev = NTOH__u32(b->rdev);
         b->nlink = NTOH__u32(b->nlink);
         b->generation = NTOH__u32(b->generation);
 }
index c8b246d..81e096f 100644 (file)
@@ -27,6 +27,7 @@
 
 #include <linux/lustre_dlm.h>
 #include <linux/lustre_lite.h>
+#include <linux/random.h>
 
 int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc);
 extern int ll_setattr(struct dentry *de, struct iattr *attr);
@@ -42,8 +43,7 @@ static int ll_file_open(struct inode *inode, struct file *file)
         int rc;
         ENTRY;
 
-        if (file->private_data)
-                LBUG();
+        LASSERT(!file->private_data);
 
         CHECK_MOUNT_EPOCH(inode);
 
@@ -87,15 +87,18 @@ static int ll_file_open(struct inode *inode, struct file *file)
                 GOTO(out, rc = -ENOMEM);
         memset(fd, 0, sizeof(*fd));
 
+        fd->fd_mdshandle.addr = (__u64)(unsigned long)file;
+        get_random_bytes(&fd->fd_mdshandle.cookie,
+                         sizeof(fd->fd_mdshandle.cookie));
         rc = mdc_open(&sbi->ll_mdc_conn, inode->i_ino, S_IFREG | inode->i_mode,
-                      file->f_flags, lsm, (__u64)(unsigned long)file,
-                      &fd->fd_mdshandle, &req);
+                      file->f_flags, lsm, &fd->fd_mdshandle, &req);
         fd->fd_req = req;
         ptlrpc_req_finished(req);
         if (rc)
                 GOTO(out_req, -abs(rc));
-        if (!fd->fd_mdshandle) {
-                CERROR("mdc_open didn't assign fd_mdshandle\n");
+        if (!fd->fd_mdshandle.addr ||
+            fd->fd_mdshandle.addr == (__u64)(unsigned long)file) {
+                CERROR("hmm, mdc_open didn't assign fd_mdshandle?\n");
                 /* XXX handle this how, abort or is it non-fatal? */
         }
 
@@ -108,6 +111,7 @@ static int ll_file_open(struct inode *inode, struct file *file)
         oa->o_mode = S_IFREG;
         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE;
         rc = obd_open(ll_i2obdconn(inode), oa, lsm);
+        obd_oa2handle(&fd->fd_osthandle, oa);
 
         if (rc)
                 GOTO(out_mdc, rc = -abs(rc));
@@ -121,7 +125,7 @@ static int ll_file_open(struct inode *inode, struct file *file)
         return 0;
 out_mdc:
         mdc_close(&sbi->ll_mdc_conn, inode->i_ino,
-                  S_IFREG, fd->fd_mdshandle, &req);
+                  S_IFREG, &fd->fd_mdshandle, &req);
 out_req:
         ptlrpc_free_req(req);
 //out_fd:
@@ -240,11 +244,11 @@ static int ll_file_release(struct inode *inode, struct file *file)
         struct ll_inode_info *lli = ll_i2info(inode);
 
         ENTRY;
-        
+
         CHECK_MOUNT_EPOCH(inode);
 
         fd = (struct ll_file_data *)file->private_data;
-        if (!fd || !fd->fd_mdshandle) {
+        if (!fd) {
                 LBUG();
                 GOTO(out, rc = -EINVAL);
         }
@@ -253,6 +257,7 @@ static int ll_file_release(struct inode *inode, struct file *file)
         oa.o_id = lli->lli_smd->lsm_object_id;
         oa.o_mode = S_IFREG;
         oa.o_valid = OBD_MD_FLTYPE | OBD_MD_FLID;
+        obd_handle2oa(&oa, &fd->fd_osthandle);
         rc = obd_close(ll_i2obdconn(inode), &oa, lli->lli_smd);
         if (rc)
                 GOTO(out_fd, abs(rc));
@@ -298,7 +303,7 @@ static int ll_file_release(struct inode *inode, struct file *file)
         }
 
         rc = mdc_close(&sbi->ll_mdc_conn, inode->i_ino,
-                       S_IFREG, fd->fd_mdshandle, &req);
+                       S_IFREG, &fd->fd_mdshandle, &req);
         ptlrpc_req_finished(req);
         if (rc) {
                 if (rc > 0)
@@ -477,6 +482,7 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
                 oa->o_mode = inode->i_mode;
                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
                         OBD_MD_FLBLOCKS;
+                obd_handle2oa(oa, &fd->fd_osthandle);
                 retval = obd_getattr(&sbi->ll_osc_conn, oa, lsm);
                 if (retval) {
                         obdo_free(oa);
index ba8a61d..b79facd 100644 (file)
@@ -452,7 +452,7 @@ void ll_update_inode(struct inode *inode, struct mds_body *body)
         if (body->valid & OBD_MD_FLGENER)
                 inode->i_generation = body->generation;
         if (body->valid & OBD_MD_FLRDEV)
-                inode->i_rdev = body->extra;
+                inode->i_rdev = body->rdev;
         if (body->valid & OBD_MD_FLSIZE)
                 inode->i_size = body->size;
 }
index 7e41574..45e4def 100644 (file)
@@ -367,18 +367,17 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
         RETURN(0);
 }
 
-static void mdc_replay_open(struct ptlrpc_request *req, void *data)
+static void mdc_replay_open(struct ptlrpc_request *req,
+                            struct lustre_handle *data)
 {
-        __u64 *fh = data;
-        struct mds_body *body;
-        
-        body = lustre_msg_buf(req->rq_repmsg, 0);
+        struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 0);
+
         mds_unpack_body(body);
-        *fh = body->extra;
+        memcpy(data, &body->handle, sizeof(*data));
 }
 
 int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
-             struct lov_stripe_md *lsm, __u64 cookie, __u64 *fh,
+             struct lov_stripe_md *lsm, struct lustre_handle *fh,
              struct ptlrpc_request **request)
 {
         struct mds_body *body;
@@ -402,7 +401,7 @@ int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
 
         ll_ino2fid(&body->fid1, ino, 0, type);
         body->flags = HTON__u32(flags);
-        body->extra = cookie;
+        memcpy(&body->handle, fh, sizeof(body->handle));
 
         if (lsm)
                 lov_packmd(lustre_msg_buf(req->rq_reqmsg, 1), lsm);
@@ -414,12 +413,12 @@ int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
         if (!rc) {
                 body = lustre_msg_buf(req->rq_repmsg, 0);
                 mds_unpack_body(body);
-                *fh = body->extra;
+                memcpy(fh, &body->handle, sizeof(*fh));
         }
 
         /* If open is replayed, we need to fix up the fh. */
         req->rq_replay_cb = mdc_replay_open;
-        req->rq_replay_cb_data = fh;
+        memcpy(&req->rq_replay_cb_handle, fh, sizeof(req->rq_replay_cb_handle));
 
         EXIT;
  out:
@@ -427,8 +426,8 @@ int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
         return rc;
 }
 
-int mdc_close(struct lustre_handle *conn,
-              obd_id ino, int type, __u64 fh, struct ptlrpc_request **request)
+int mdc_close(struct lustre_handle *conn, obd_id ino, int type,
+              struct lustre_handle *fh, struct ptlrpc_request **request)
 {
         struct mds_body *body;
         int rc, size = sizeof(*body);
@@ -441,7 +440,7 @@ int mdc_close(struct lustre_handle *conn,
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
         ll_ino2fid(&body->fid1, ino, 0, type);
-        body->extra = fh;
+        memcpy(&body->handle, fh, sizeof(body->handle));
 
         req->rq_replen = lustre_msg_size(0, NULL);
 
index f4857aa..0ad752b 100644 (file)
@@ -33,6 +33,9 @@
 #include <linux/lustre_dlm.h>
 #include <linux/init.h>
 #include <linux/obd_class.h>
+#include <linux/random.h>
+
+static kmem_cache_t *mds_file_cache;
 
 extern int mds_get_lovtgts(struct obd_device *obd, int tgt_count,
                            obd_uuid_t *uuidarray);
@@ -264,6 +267,7 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
                        obd_uuid_t cluuid)
 {
         struct obd_export *exp;
+        struct mds_export_data *med;
         struct mds_client_data *mcd;
         struct list_head *p;
         int rc;
@@ -284,6 +288,7 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
                         if (!list_empty(&exp->exp_conn_chain)) {
                                 CERROR("existing uuid/export, list not empty!\n");
                                 spin_unlock(&obd->obd_dev_lock);
+                                /* XXX should we MOD_DEC_USE_COUNT; here? */
                                 RETURN(-EALREADY);
                         }
                         conn->addr = (__u64) (unsigned long)exp;
@@ -293,6 +298,7 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
                                cluuid, exp);
                         CDEBUG(D_IOCTL,"connect: addr %Lx cookie %Lx\n",
                                (long long)conn->addr, (long long)conn->cookie);
+                        MOD_DEC_USE_COUNT;
                         RETURN(0);
                 }
         }
@@ -306,15 +312,21 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
                 GOTO(out_dec, rc);
         exp = class_conn2export(conn);
         LASSERT(exp);
+        med = &exp->exp_mds_data;
 
         OBD_ALLOC(mcd, sizeof(*mcd));
         if (!mcd) {
                 CERROR("mds: out of memory for client data\n");
                 GOTO(out_export, rc = -ENOMEM);
         }
+
         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
-        exp->exp_mds_data.med_mcd = mcd;
-        rc = mds_client_add(&exp->exp_mds_data, -1);
+        med->med_mcd = mcd;
+
+        INIT_LIST_HEAD(&med->med_open_head);
+        spin_lock_init(&med->med_open_lock);
+
+        rc = mds_client_add(med, -1);
         if (rc)
                 GOTO(out_mdc, rc);
 
@@ -332,9 +344,11 @@ out_dec:
 
 static int mds_disconnect(struct lustre_handle *conn)
 {
-        int rc;
         struct obd_export *export = class_conn2export(conn);
+        int rc;
+        ENTRY;
 
+#warning "Mike: we need to close all files opened on med_open_head"
         ldlm_cancel_locks_for_export(export);
         mds_client_free(export);
 
@@ -342,7 +356,7 @@ static int mds_disconnect(struct lustre_handle *conn)
         if (!rc)
                 MOD_DEC_USE_COUNT;
 
-        return rc;
+        RETURN(rc);
 }
 
 static int mds_getstatus(struct ptlrpc_request *req)
@@ -392,7 +406,7 @@ static int mds_getlovinfo(struct ptlrpc_request *req)
 
         desc = lustre_msg_buf(req->rq_repmsg, 0);
         rc = mds_get_lovdesc(req->rq_obd, desc);
-        if (rc != 0 ) {
+        if (rc) {
                 CERROR("mds_get_lovdesc error %d", rc);
                 req->rq_status = rc;
                 RETURN(0);
@@ -664,44 +678,99 @@ out:
         RETURN(0);
 }
 
+static struct mds_file_data *mds_handle2mfd(struct lustre_handle *handle)
+{
+        struct mds_file_data *mfd = NULL;
+
+        if (!handle || !handle->addr)
+                RETURN(NULL);
+
+        mfd = (struct mds_file_data *)(unsigned long)(handle->addr);
+        if (!kmem_cache_validate(mds_file_cache, mfd))
+                RETURN(NULL);
+
+        if (mfd->mfd_servercookie != handle->cookie)
+                RETURN(NULL);
+
+        return mfd;
+}
+
+static int mds_store_ea(struct mds_obd *mds, struct ptlrpc_request *req,
+                        struct mds_body *body, struct dentry *de,
+                        struct lov_mds_md *lmm)
+{
+        struct obd_run_ctxt saved;
+        struct obd_ucred uc;
+        void *handle;
+        int rc, rc2;
+
+        uc.ouc_fsuid = body->fsuid;
+        uc.ouc_fsgid = body->fsgid;
+        push_ctxt(&saved, &mds->mds_ctxt, &uc);
+        handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR);
+        if (!handle)
+                GOTO(out_ea, rc = -ENOMEM);
+
+        rc = mds_fs_set_md(mds, de->d_inode, handle, lmm);
+        if (!rc)
+                rc = mds_update_last_rcvd(mds, handle, req);
+
+        rc2 = mds_fs_commit(mds, de->d_inode, handle);
+        if (rc2 && !rc)
+                rc = rc2;
+out_ea:
+        pop_ctxt(&saved);
+
+        return rc;
+}
+
 static int mds_open(struct ptlrpc_request *req)
 {
-        struct dentry *de;
-        struct inode *inode;
+        struct mds_obd *mds = mds_req2mds(req);
         struct mds_body *body;
+        struct mds_export_data *med;
+        struct mds_file_data *mfd;
+        struct dentry *de;
         struct file *file;
         struct vfsmount *mnt;
-        struct mds_obd *mds = mds_req2mds(req);
-        struct mds_export_data *med;
         __u32 flags;
         struct list_head *tmp;
-        struct mds_file_data *mfd;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
-        if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
-                CERROR("mds: out of memory\n");
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
+                CERROR("test case OBD_FAIL_MDS_OPEN_PACK\n");
                 req->rq_status = -ENOMEM;
-                RETURN(0);
+                RETURN(-ENOMEM);
+        }
+
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc) {
+                CERROR("mds: pack error: rc = %d\n", rc);
+                req->rq_status = rc;
+                RETURN(rc);
         }
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
 
-        /* was this animal open already? */
-        /* XXX we should only check on re-open, or do a refcount... */
+        /* was this animal open already and the client lost the reply? */
+        /* XXX need some way to detect a reopen, to avoid locked list walks */
         med = &req->rq_export->exp_mds_data;
+        spin_lock(&med->med_open_lock);
         list_for_each(tmp, &med->med_open_head) {
-                struct mds_file_data *fd;
-                fd = list_entry(tmp, struct mds_file_data, mfd_list);
-                if (body->extra == fd->mfd_clientfd &&
-                    body->fid1.id == fd->mfd_file->f_dentry->d_inode->i_ino) {
+                mfd = list_entry(tmp, typeof(*mfd), mfd_list);
+                if (!memcmp(&mfd->mfd_clienthandle, &body->handle,
+                            sizeof(mfd->mfd_clienthandle)) &&
+                    body->fid1.id == mfd->mfd_file->f_dentry->d_inode->i_ino) {
                         CERROR("Re opening "LPD64"\n", body->fid1.id);
-                        RETURN(0);
+                        de = mfd->mfd_file->f_dentry;
+                        spin_unlock(&med->med_open_lock);
+                        GOTO(out_pack, rc = 0);
                 }
         }
+        spin_unlock(&med->med_open_lock);
 
-        OBD_ALLOC(mfd, sizeof(*mfd));
+        mfd = kmem_cache_alloc(mds_file_cache, GFP_KERNEL);
         if (!mfd) {
                 CERROR("mds: out of memory\n");
                 req->rq_status = -ENOMEM;
@@ -709,72 +778,56 @@ static int mds_open(struct ptlrpc_request *req)
         }
 
         de = mds_fid2dentry(mds, &body->fid1, &mnt);
-        if (IS_ERR(de)) {
-                req->rq_status = -ENOENT;
-                RETURN(0);
-        }
-
-        inode = de->d_inode;
+        if (IS_ERR(de))
+                GOTO(out_free, rc = PTR_ERR(de));
 
         /* check if this inode has seen a delayed object creation */
         if (req->rq_reqmsg->bufcount > 1) {
-                void *handle;
-                struct lov_mds_md *lmm;
-                struct obd_run_ctxt saved;
-                struct obd_ucred uc;
-                int rc, rc2;
-
-                lmm = lustre_msg_buf(req->rq_reqmsg, 1);
-
-                uc.ouc_fsuid = body->fsuid;
-                uc.ouc_fsgid = body->fsgid;
-                push_ctxt(&saved, &mds->mds_ctxt, &uc);
-                handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR);
-                if (!handle) {
-                        pop_ctxt(&saved);
-                        GOTO(out_md, rc = -ENOMEM);
-                }
-
-                rc = mds_fs_set_md(mds, inode, handle, lmm);
-                if (!rc)
-                        rc = mds_update_last_rcvd(mds, handle, req);
+                struct lov_mds_md *lmm = lustre_msg_buf(req->rq_reqmsg, 1);
 
-                rc2 = mds_fs_commit(mds, inode, handle);
-                if (rc2 && !rc)
-                        rc = rc2;
-                pop_ctxt(&saved);
+                rc = mds_store_ea(mds, req, body, de, lmm);
                 if (rc) {
-out_md:
-                        req->rq_status = rc;
                         l_dput(de);
                         mntput(mnt);
-                        RETURN(0);
+                        GOTO(out_free, rc);
                 }
         }
 
         flags = body->flags;
+        /* dentry_open does a dput(de) and mntput(mnt) on error */
         file = dentry_open(de, mnt, flags & ~O_DIRECT);
         if (IS_ERR(file)) {
-                req->rq_status = PTR_ERR(file);
-                OBD_FREE(mfd, sizeof(*mfd));
-                RETURN(0);
+                rc = PTR_ERR(file);
+                GOTO(out_free, 0);
         }
 
         file->private_data = mfd;
         mfd->mfd_file = file;
-        mfd->mfd_clientfd = body->extra;
+        memcpy(&mfd->mfd_clienthandle, &body->handle, sizeof(body->handle));
+        get_random_bytes(&mfd->mfd_servercookie, sizeof(mfd->mfd_servercookie));
+        spin_lock(&med->med_open_lock);
         list_add(&mfd->mfd_list, &med->med_open_head);
+        spin_unlock(&med->med_open_lock);
 
+out_pack:
         body = lustre_msg_buf(req->rq_repmsg, 0);
-        mds_pack_inode2fid(&body->fid1, inode);
-        mds_pack_inode2body(body, inode);
-        /* FIXME: need to have cookies involved here */
-        body->extra = (__u64) (unsigned long)file;
+        mds_pack_inode2fid(&body->fid1, de->d_inode);
+        mds_pack_inode2body(body, de->d_inode);
+        body->handle.addr = (__u64)(unsigned long)mfd;
+        body->handle.cookie = mfd->mfd_servercookie;
+        CDEBUG(D_INODE, "llite file "LPX64": addr %p, cookie "LPX64"\n",
+               mfd->mfd_clienthandle.addr, mfd, mfd->mfd_servercookie);
+        RETURN(0);
+
+out_free:
+        kmem_cache_free(mds_file_cache, mfd);
+        req->rq_status = rc;
         RETURN(0);
 }
 
 static int mds_close(struct ptlrpc_request *req)
 {
+        struct mds_export_data *med = &req->rq_export->exp_mds_data;
         struct mds_body *body;
         struct file *file;
         struct mds_file_data *mfd;
@@ -783,13 +836,21 @@ static int mds_close(struct ptlrpc_request *req)
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
 
-        /* FIXME: need to have cookies involved here */
-        file = (struct file *)(unsigned long)body->extra;
-        if (!file->f_dentry)
-                LBUG();
-        mfd = (struct mds_file_data *)file->private_data;
+        mfd = mds_handle2mfd(&body->handle);
+        if (!mfd) {
+                CERROR("no handle for file close "LPD64
+                       ": addr "LPX64", cookie "LPX64"\n",
+                       body->fid1.id, body->handle.addr, body->handle.cookie);
+                RETURN(-ESTALE);
+        }
+
+        file = mfd->mfd_file;
+        LASSERT(file->private_data == mfd);
+
+        spin_lock(&med->med_open_lock);
         list_del(&mfd->mfd_list);
-        OBD_FREE(mfd, sizeof(*mfd));
+        spin_unlock(&med->med_open_lock);
+        kmem_cache_free(mds_file_cache, mfd);
 
         req->rq_status = filp_close(file, 0);
 
@@ -889,8 +950,7 @@ int mds_handle(struct ptlrpc_request *req)
         if (req->rq_reqmsg->opc != MDS_CONNECT && req->rq_export == NULL)
                 GOTO(out, rc = -ENOTCONN);
 
-        if (strcmp(req->rq_obd->obd_type->typ_name, "mds") != 0)
-                GOTO(out, rc = -EINVAL);
+        LASSERT(!strcmp(req->rq_obd->obd_type->typ_name, "mds"));
 
         switch (req->rq_reqmsg->opc) {
         case MDS_CONNECT:
@@ -1371,6 +1431,12 @@ static struct obd_ops mds_obd_ops = {
 
 static int __init mds_init(void)
 {
+        mds_file_cache = kmem_cache_create("ll_mds_file_data",
+                                           sizeof(struct mds_file_data),
+                                           0, 0, NULL, NULL);
+        if (mds_file_cache == NULL)
+                return -ENOMEM;
+
         class_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
         ldlm_register_intent(ldlm_intent_policy);
         return 0;
@@ -1380,6 +1446,8 @@ static void __exit mds_exit(void)
 {
         ldlm_unregister_intent();
         class_unregister_type(LUSTRE_MDS_NAME);
+        if (kmem_cache_destroy(mds_file_cache))
+                CERROR("couldn't free MDS file cache\n");
 }
 
 MODULE_AUTHOR("Cluster File Systems <info@clusterfs.com>");
index 8a88f9b..3975f4f 100644 (file)
@@ -174,18 +174,29 @@ static int mds_read_last_rcvd(struct obd_device *obddev, struct file *f)
 
                 last_rcvd = le64_to_cpu(mcd->mcd_last_rcvd);
 
-                /* Do client recovery here (open files, etc) */
+                /* The exports are cleaned up by mds_disconnect, so they
+                 * need to be set up like real exports also.
+                 */
                 if (last_rcvd && (last_mount - le64_to_cpu(mcd->mcd_mount_count)
                                   < MDS_MOUNT_RECOV)) {
-                        struct obd_export *export = class_new_export(obddev);
-                        if (!export) {
+                        struct obd_export *exp = class_new_export(obddev);
+                        struct mds_export_data *med;
+
+                        if (!exp) {
                                 rc = -ENOMEM;
                                 break;
                         }
-                        export->exp_mds_data.med_mcd = mcd;
-                        mds_client_add(&export->exp_mds_data, cl_off);
+
+                        med = &exp->exp_mds_data;
+                        med->med_mcd = mcd;
+                        mds_client_add(med, cl_off);
+                        /* XXX put this in a helper if it gets more complex */
+                        INIT_LIST_HEAD(&med->med_open_head);
+                        spin_lock_init(&med->med_open_lock);
+
                         mcd = NULL;
                         clients++;
+                        MOD_INC_USE_COUNT;
                 } else {
                         CDEBUG(D_INFO,
                                "ignored client %d, UUID '%s', last_mount %Ld\n",
index 7e2f0f1..9c510c1 100644 (file)
@@ -440,7 +440,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
 
         name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
         namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
-#warning FIXME: if mds_name2locked_dentry decrefs this lock, we must not
+#warning "FIXME: if mds_name2locked_dentry decrefs this lock, we must not"
         memcpy(&child_lockh, &lockh, sizeof(child_lockh));
         dchild = mds_name2locked_dentry(obd, de, NULL, name, namelen,
                                         LCK_EX, &child_lockh, lock_mode);
index b4296df..29d6f02 100644 (file)
@@ -64,7 +64,7 @@ extern struct obd_type *class_nm_to_type(char *nm);
 #ifdef LPROCFS_EXISTS
 
 /*
- * Common SNMP namespace         
+ * Common SNMP namespace
  */
 
 char *snmp_dir_nm[] = {
@@ -725,8 +725,8 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp,
                                 }
                                 memcpy(obd->obd_uuid, data->ioc_inlbuf3, len);
                         }
+
                         /* Get the LprocFS namespace for this device class */
-                        
                         l_idx = lprocfs_get_nm(data->ioc_inlbuf1, obd_nm);
                         if (l_idx < 0) {
                                 CERROR("Non-existent device class"
@@ -735,7 +735,7 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp,
                                 lprocfs_reg_dev(obd, obd_nm[l_idx].obd_names,
                                                 obd_nm[l_idx].cntr_blk_sz);
                         }
-                        
+
                         CDEBUG(D_IOCTL, "MOD_INC_USE for attach: count = %d\n",
                                atomic_read(&(THIS_MODULE)->uc.usecount));
                         MOD_INC_USE_COUNT;
@@ -761,11 +761,10 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp,
                         GOTO(out, err=-EBUSY);
                 }
 
-                
                 if (lprocfs_dereg_dev(obd) != LPROCFS_SUCCESS) {
                         CERROR("Could not remove /proc entry\n");
                 }
-                
+
                 if (obd->obd_name) {
                         OBD_FREE(obd->obd_name, strlen(obd->obd_name)+1);
                         obd->obd_name = NULL;
index 2649663..30e7574 100644 (file)
@@ -369,8 +369,7 @@ struct obd_export *class_new_export(struct obd_device *obddev)
         memset(export, 0, sizeof(*export));
         get_random_bytes(&export->exp_cookie, sizeof(export->exp_cookie));
         export->exp_obd = obddev;
-        /* XXX should these be in MDS and LDLM init functions? */
-        INIT_LIST_HEAD(&export->exp_mds_data.med_open_head);
+        /* XXX this should be in LDLM init */
         INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
         INIT_LIST_HEAD(&export->exp_conn_chain);
         spin_lock(&obddev->obd_dev_lock);
index a09af9e..aef1424 100644 (file)
@@ -443,7 +443,7 @@ void ptlrpc_restart_req(struct ptlrpc_request *req)
 static int expired_request(void *data)
 {
         struct ptlrpc_request *req = data;
-        
+
         ENTRY;
         CERROR("req xid "LPD64" op %d: timeout on conn to %s:%d\n",
                (unsigned long long)req->rq_xid, req->rq_reqmsg->opc,
@@ -666,7 +666,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
         }
 
         if (req->rq_replay_cb)
-                req->rq_replay_cb(req, req->rq_replay_cb_data);
+                req->rq_replay_cb(req, &req->rq_replay_cb_handle);
 
  out:
         RETURN(rc);