Whamcloud - gitweb
- merge 0.7rc1 from b_devel to HEAD (20030612 merge point)
[fs/lustre-release.git] / lustre / mds / mds_open.c
index 50ca592..d83e4ee 100644 (file)
 #include <linux/obd_class.h>
 #include <linux/random.h>
 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
-#include <linux/buffer_head.h>
-#include <linux/workqueue.h>
+# include <linux/buffer_head.h>
+# include <linux/workqueue.h>
 #else
-#include <linux/locks.h>
+# include <linux/locks.h>
 #endif
 #include <linux/obd_lov.h>
 #include <linux/lustre_mds.h>
 #include <linux/lustre_fsfilt.h>
 #include <linux/lprocfs_status.h>
 
-extern kmem_cache_t *mds_file_cache;
+#include "mds_internal.h"
+
 extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
 int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle,
                        struct ptlrpc_request *req, int rc, __u32 op_data);
@@ -57,7 +58,53 @@ extern int enqueue_ordered_locks(int lock_mode, struct obd_device *obd,
                                  struct lustre_handle *c1_lockh,
                                  struct lustre_handle *c2_lockh);
 
-void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req,
+struct mds_file_data *mds_dentry_open(struct dentry *dentry,
+                                      struct vfsmount *mnt,
+                                      int flags,
+                                      struct ptlrpc_request *req)
+{
+        struct mds_export_data *med = &req->rq_export->exp_mds_data;
+        struct inode *inode;
+        int mode;
+        struct mds_file_data *mfd;
+        int error;
+
+        mfd = mds_mfd_new();
+        if (!mfd) {
+                CERROR("mds: out of memory\n");
+                GOTO(cleanup_dentry, error = -ENOMEM);
+        }
+
+        mode = (flags+1) & O_ACCMODE;
+        inode = dentry->d_inode;
+
+        if (mode & FMODE_WRITE) {
+                error = get_write_access(inode);
+                if (error)
+                        goto cleanup_mfd;
+        }
+
+        mfd->mfd_mode = mode;
+        mfd->mfd_dentry = dentry;
+        mfd->mfd_xid = req->rq_xid;
+
+        spin_lock(&med->med_open_lock);
+        list_add(&mfd->mfd_list, &med->med_open_head);
+        spin_unlock(&med->med_open_lock);
+        mds_mfd_put(mfd);
+        return mfd;
+
+cleanup_mfd:
+        mds_mfd_put(mfd);
+        mds_mfd_destroy(mfd);
+cleanup_dentry:
+        dput(dentry);
+        mntput(mnt);
+        return ERR_PTR(error);
+}
+
+void reconstruct_open(struct mds_update_record *rec, int offset,
+                      struct ptlrpc_request *req,
                       struct lustre_handle *child_lockh)
 {
         struct mds_export_data *med = &req->rq_export->exp_mds_data;
@@ -66,21 +113,23 @@ void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req,
         struct mds_file_data *mfd;
         struct obd_device *obd = req->rq_export->exp_obd;
         struct dentry *parent, *child;
-        struct ldlm_reply *rep = lustre_msg_buf(req->rq_repmsg, 0);
-        struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1);
+        struct ldlm_reply *rep;
+        struct mds_body *body;
         int disp, rc;
         ENTRY;
 
-        ENTRY;
+        LASSERT(offset == 2);                  /* only called via intent */
+        rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
+        body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
 
         /* copy rc, transno and disp; steal locks */
         req->rq_transno = mcd->mcd_last_transno;
         req->rq_status = mcd->mcd_last_result;
         disp = rep->lock_policy_res1 = mcd->mcd_last_data;
-        
-        if (med->med_outstanding_reply)
-                mds_steal_ack_locks(med, req);
-        
+
+        if (req->rq_export->exp_outstanding_reply)
+                mds_steal_ack_locks(req->rq_export, req);
+
         /* We never care about these. */
         disp &= ~(IT_OPEN_LOOKUP | IT_OPEN_POS | IT_OPEN_NEG);
         if (!disp) {
@@ -91,10 +140,9 @@ void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req,
         parent = mds_fid2dentry(mds, rec->ur_fid1, NULL);
         LASSERT(!IS_ERR(parent));
 
-        child = lookup_one_len(lustre_msg_buf(req->rq_reqmsg, 3),
-                               parent, req->rq_reqmsg->buflens[3] - 1);
+        child = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
         LASSERT(!IS_ERR(child));
-        
+
         if (!child->d_inode) {
                 GOTO(out_dput, 0); /* child not present to open */
         }
@@ -108,12 +156,8 @@ void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req,
                 GOTO(out_dput, 0);
         }
 
-        if (!med->med_outstanding_reply) {
-                LBUG(); /* XXX need to get enqueue client lock */
-        }
-
         /* get lock (write for O_CREAT, read otherwise) */
-        
+
         mds_pack_inode2fid(&body->fid1, child->d_inode);
         mds_pack_inode2body(body, child->d_inode);
         if (S_ISREG(child->d_inode->i_mode)) {
@@ -127,7 +171,7 @@ void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req,
 
         /* If we're opening a file without an EA, change to a write
            lock (unless we already have one). */
-                   
+
         /* If we have -EEXIST as the status, and we were asked to create
          * exclusively, we can tell we failed because the file already existed.
          */
@@ -150,7 +194,7 @@ void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req,
                 GOTO(out_dput, 0);
         }
 
-        if (med->med_outstanding_reply) {
+        if (req->rq_export->exp_outstanding_reply) {
                 struct list_head *t;
                 mfd = NULL;
                 /* XXX can we just look in the old reply to find the handle in
@@ -164,28 +208,16 @@ void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req,
                 /* if we're not recovering, it had better be found */
                 LASSERT(mfd);
         } else {
-                struct file *file;
-                mfd = kmem_cache_alloc(mds_file_cache, GFP_KERNEL);
+                mntget(mds->mds_vfsmnt);
+                mfd = mds_dentry_open(child, mds->mds_vfsmnt,
+                                   rec->ur_flags & ~(O_DIRECT | O_TRUNC), req);
                 if (!mfd) {
                         CERROR("mds: out of memory\n");
                         GOTO(out_dput, req->rq_status = -ENOMEM);
                 }
-                mntget(mds->mds_vfsmnt);
-                file = dentry_open(child, mds->mds_vfsmnt,
-                                   rec->ur_flags & ~(O_DIRECT | O_TRUNC));
-                LASSERT(!IS_ERR(file)); /* XXX -ENOMEM? */
-                file->private_data = mfd;
-                mfd->mfd_file = file;
-                mfd->mfd_xid = req->rq_xid;
-                get_random_bytes(&mfd->mfd_servercookie,
-                                 sizeof(mfd->mfd_servercookie));
-                spin_lock(&med->med_open_lock);
-                list_add(&mfd->mfd_list, &med->med_open_head);
-                spin_unlock(&med->med_open_lock);
         }
-                
-        body->handle.addr = (__u64)(unsigned long)mfd;
-        body->handle.cookie = mfd->mfd_servercookie;
+
+        body->handle.cookie = mfd->mfd_handle.h_cookie;
 
  out_dput:
         l_dput(child);
@@ -196,11 +228,13 @@ void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req,
 int mds_open(struct mds_update_record *rec, int offset,
              struct ptlrpc_request *req, struct lustre_handle *child_lockh)
 {
+        static const char acc_table [] = {[O_RDONLY] MAY_READ,
+                                          [O_WRONLY] MAY_WRITE,
+                                          [O_RDWR]   MAY_READ | MAY_WRITE};
         struct mds_obd *mds = mds_req2mds(req);
         struct obd_device *obd = req->rq_export->exp_obd;
-        struct ldlm_reply *rep = lustre_msg_buf(req->rq_repmsg, 0);
-        struct file *file;
-        struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1);
+        struct ldlm_reply *rep;
+        struct mds_body *body;
         struct dentry *dchild = NULL, *parent;
         struct mds_export_data *med;
         struct mds_file_data *mfd = NULL;
@@ -209,9 +243,14 @@ int mds_open(struct mds_update_record *rec, int offset,
         int rc = 0, parent_mode, child_mode = LCK_PR, lock_flags, created = 0;
         int cleanup_phase = 0;
         void *handle = NULL;
+        int acc_mode;
         ENTRY;
 
-        MDS_CHECK_RESENT(req, reconstruct_open(rec, req, child_lockh));
+        LASSERT(offset == 2);                  /* only called via intent */
+        rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
+        body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
+
+        MDS_CHECK_RESENT(req, reconstruct_open(rec, offset, req, child_lockh));
 
         med = &req->rq_export->exp_mds_data;
         rep->lock_policy_res1 |= IT_OPEN_LOOKUP;
@@ -221,6 +260,12 @@ int mds_open(struct mds_update_record *rec, int offset,
                 RETURN(-ENOMEM);
         }
 
+        if ((rec->ur_flags & O_ACCMODE) >= sizeof (acc_table))
+                RETURN(-EINVAL);
+        acc_mode = acc_table [rec->ur_flags & O_ACCMODE];
+        if ((rec->ur_flags & O_TRUNC) != 0)
+                acc_mode |= MAY_WRITE;
+
         /* Step 1: Find and lock the parent */
         parent_mode = (rec->ur_flags & O_CREAT) ? LCK_PW : LCK_PR;
         parent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode,
@@ -235,8 +280,7 @@ int mds_open(struct mds_update_record *rec, int offset,
         cleanup_phase = 1; /* parent dentry and lock */
 
         /* Step 2: Lookup the child */
-        dchild = lookup_one_len(lustre_msg_buf(req->rq_reqmsg, 3),
-                                parent, req->rq_reqmsg->buflens[3] - 1);
+        dchild = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
         if (IS_ERR(dchild))
                 GOTO(cleanup, rc = PTR_ERR(dchild));
 
@@ -267,6 +311,7 @@ int mds_open(struct mds_update_record *rec, int offset,
                         GOTO(cleanup, rc);
                 created = 1;
                 child_mode = LCK_PW;
+                acc_mode = 0;                  /* Don't check for permissions */
         }
 
         /* Step 4: It's positive, so lock the child */
@@ -277,7 +322,7 @@ int mds_open(struct mds_update_record *rec, int offset,
         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
                               child_res_id, LDLM_PLAIN, NULL, 0, child_mode,
                               &lock_flags, ldlm_completion_ast,
-                              mds_blocking_ast, NULL, NULL, child_lockh);
+                              mds_blocking_ast, NULL, child_lockh);
         if (rc != ELDLM_OK) {
                 CERROR("ldlm_cli_enqueue: %d\n", rc);
                 GOTO(cleanup, rc = -EIO);
@@ -287,21 +332,32 @@ int mds_open(struct mds_update_record *rec, int offset,
 
         mds_pack_inode2fid(&body->fid1, dchild->d_inode);
         mds_pack_inode2body(body, dchild->d_inode);
+
         if (S_ISREG(dchild->d_inode->i_mode)) {
+                /* Check permissions etc */
+                rc = permission(dchild->d_inode, acc_mode);
+                if (rc != 0)
+                        GOTO(cleanup, rc);
+
+                /* Can't write to a read-only file */
+                if (IS_RDONLY(dchild->d_inode) && (acc_mode & MAY_WRITE) != 0)
+                        GOTO(cleanup, rc = -EPERM);
+
+                /* An append-only file must be opened in append mode for
+                 * writing */
+                if (IS_APPEND(dchild->d_inode) &&
+                    (acc_mode & MAY_WRITE) != 0 &&
+                    ((rec->ur_flags & O_APPEND) == 0 ||
+                     (rec->ur_flags & O_TRUNC) != 0))
+                        GOTO (cleanup, rc = -EPERM);
+
                 rc = mds_pack_md(obd, req->rq_repmsg, 2, body, dchild->d_inode);
                 if (rc)
                         GOTO(cleanup, rc);
-        } else {
-                /* If this isn't a regular file, we can't open it. */
-
-                /* We want to drop the child dentry, because we're not returning
-                 * failure (which would do this for us in step 2), and we're not
-                 * handing it off to the open file in dentry_open. */
-                l_dput(dchild);
-                GOTO(cleanup, rc = 0); /* returns the lock to the client */
         }
 
-        if (!created && (rec->ur_flags & O_CREAT) && (rec->ur_flags & O_EXCL)) {
+        if (!created && (rec->ur_flags & O_CREAT) &&
+            (rec->ur_flags & O_EXCL)) {
                 /* File already exists, we didn't just create it, and we
                  * were passed O_EXCL; err-or. */
                 GOTO(cleanup, rc = -EEXIST); // returns a lock to the client
@@ -309,43 +365,33 @@ int mds_open(struct mds_update_record *rec, int offset,
 
         /* If we're opening a file without an EA, the client needs a write
          * lock. */
-        if (child_mode != LCK_PW && !(body->valid & OBD_MD_FLEASIZE)) {
+        if (S_ISREG(dchild->d_inode->i_mode) &&
+            child_mode != LCK_PW && !(body->valid & OBD_MD_FLEASIZE)) {
                 ldlm_lock_decref(child_lockh, child_mode);
                 child_mode = LCK_PW;
                 goto reacquire;
         }
 
-        /* Step 5: Open it */
+        /* if we are following a symlink, don't open */
+        if (S_ISLNK(dchild->d_inode->i_mode))
+                GOTO(cleanup, rc = 0);
+
+        /* Step 5: mds_open it */
         rep->lock_policy_res1 |= IT_OPEN_OPEN;
-        mfd = kmem_cache_alloc(mds_file_cache, GFP_KERNEL);
+
+        /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
+        mfd = mds_dentry_open(dchild, mds->mds_vfsmnt,
+                              rec->ur_flags & ~(O_DIRECT | O_TRUNC), req);
         if (!mfd) {
                 CERROR("mds: out of memory\n");
+                dchild = NULL; /* prevent a double dput in step 2 */
                 GOTO(cleanup, rc = -ENOMEM);
         }
 
         cleanup_phase = 4; /* mfd allocated */
-
-        /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
-        mntget(mds->mds_vfsmnt);
-        file = dentry_open(dchild, mds->mds_vfsmnt,
-                           rec->ur_flags & ~(O_DIRECT | O_TRUNC));
-        if (IS_ERR(file)) {
-                dchild = NULL; /* prevent a double dput in step 2 */
-                GOTO(cleanup, rc = PTR_ERR(file));
-        }
-
-        file->private_data = mfd;
-        mfd->mfd_file = file;
-        mfd->mfd_xid = req->rq_xid;
-        get_random_bytes(&mfd->mfd_servercookie, sizeof(mfd->mfd_servercookie));
-        spin_lock(&med->med_open_lock);
-        list_add(&mfd->mfd_list, &med->med_open_head);
-        spin_unlock(&med->med_open_lock);
-
-        body->handle.addr = (__u64)(unsigned long)mfd;
-        body->handle.cookie = mfd->mfd_servercookie;
-        CDEBUG(D_INODE, "file %p: mfd %p, cookie "LPX64"\n",
-               mfd->mfd_file, mfd, mfd->mfd_servercookie);
+        body->handle.cookie = mfd->mfd_handle.h_cookie;
+        CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd,
+               mfd->mfd_handle.h_cookie);
         GOTO(cleanup, rc = 0); /* returns a lock to the client */
 
  cleanup:
@@ -353,18 +399,18 @@ int mds_open(struct mds_update_record *rec, int offset,
                                 req, rc, rep->lock_policy_res1);
         switch (cleanup_phase) {
         case 4:
-                if (rc)
-                        kmem_cache_free(mds_file_cache, mfd);
+                if (rc && !S_ISLNK(dchild->d_inode->i_mode))
+                        mds_mfd_destroy(mfd);
         case 3:
-                /* This is the same logic as in the IT_OPEN part of 
+                /* This is the same logic as in the IT_OPEN part of
                  * ldlm_intent_policy: if we found the dentry, or we tried to
                  * open it (meaning that we created, if it wasn't found), then
                  * we return the lock to the caller and client. */
                 if (!(rep->lock_policy_res1 & (IT_OPEN_OPEN | IT_OPEN_POS)))
                         ldlm_lock_decref(child_lockh, child_mode);
         case 2:
-                if (rc
-                    l_dput(dchild);
+                if (rc || S_ISLNK(dchild->d_inode->i_mode))
+                        l_dput(dchild);
         case 1:
                 l_dput(parent);
                 if (rc) {