Whamcloud - gitweb
merge b_devel into HEAD, which will become 0.7.3
[fs/lustre-release.git] / lustre / mds / mds_open.c
index 04d6ee9..2bd2f8c 100644 (file)
 
 #include "mds_internal.h"
 
-extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
-int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle,
-                       struct ptlrpc_request *req, int rc, __u32 op_data);
-extern int enqueue_ordered_locks(int lock_mode, struct obd_device *obd,
-                                 struct ldlm_res_id *p1_res_id,
-                                 struct ldlm_res_id *p2_res_id,
-                                 struct ldlm_res_id *c1_res_id,
-                                 struct ldlm_res_id *c2_res_id,
-                                 struct lustre_handle *p1_lockh,
-                                 struct lustre_handle *p2_lockh,
-                                 struct lustre_handle *c1_lockh,
-                                 struct lustre_handle *c2_lockh);
-
 struct mds_file_data *mds_dentry_open(struct dentry *dentry,
                                       struct vfsmount *mnt,
                                       int flags,
@@ -65,17 +52,16 @@ struct mds_file_data *mds_dentry_open(struct dentry *dentry,
 {
         struct mds_export_data *med = &req->rq_export->exp_mds_data;
         struct inode *inode;
-        int mode;
         struct mds_file_data *mfd;
-        int error;
+        int mode, error;
 
         mfd = mds_mfd_new();
-        if (!mfd) {
+        if (mfd == NULL) {
                 CERROR("mds: out of memory\n");
                 GOTO(cleanup_dentry, error = -ENOMEM);
         }
 
-        mode = (flags+1) & O_ACCMODE;
+        mode = (flags + 1) & O_ACCMODE;
         inode = dentry->d_inode;
 
         if (mode & FMODE_WRITE) {
@@ -107,6 +93,7 @@ void reconstruct_open(struct mds_update_record *rec, int offset,
                       struct ptlrpc_request *req,
                       struct lustre_handle *child_lockh)
 {
+        struct ptlrpc_request *oldreq = req->rq_export->exp_outstanding_reply;
         struct mds_export_data *med = &req->rq_export->exp_mds_data;
         struct mds_client_data *mcd = med->med_mcd;
         struct mds_obd *mds = mds_req2mds(req);
@@ -115,7 +102,7 @@ void reconstruct_open(struct mds_update_record *rec, int offset,
         struct dentry *parent, *child;
         struct ldlm_reply *rep;
         struct mds_body *body;
-        int disp, rc;
+        int rc;
         struct list_head *t;
         int put_child = 1;
         ENTRY;
@@ -127,14 +114,13 @@ void reconstruct_open(struct mds_update_record *rec, int offset,
         /* copy rc, transno and disp; steal locks */
         req->rq_transno = mcd->mcd_last_transno;
         req->rq_status = mcd->mcd_last_result;
-        disp = rep->lock_policy_res1 = mcd->mcd_last_data;
+        intent_set_disposition(rep, mcd->mcd_last_data);
 
-        if (req->rq_export->exp_outstanding_reply)
+        if (oldreq)
                 mds_steal_ack_locks(req->rq_export, req);
 
-        /* We never care about these. */
-        disp &= ~(IT_OPEN_LOOKUP | IT_OPEN_POS | IT_OPEN_NEG);
-        if (!disp) {
+        /* Only replay if create or open actually happened. */
+        if (!intent_disposition(rep, DISP_OPEN_CREATE | DISP_OPEN_OPEN) ) {
                 EXIT;
                 return; /* error looking up parent or child */
         }
@@ -149,11 +135,11 @@ void reconstruct_open(struct mds_update_record *rec, int offset,
                 GOTO(out_dput, 0); /* child not present to open */
         }
 
-        /* At this point, we know we have a child, which means that we'll send
-         * it back _unless_ it was open failed, _and_ we didn't create the file.
-         * I love you guys.  No, really.
+        /* At this point, we know we have a child. We'll send
+         * it back _unless_ it not created and open failed.
          */
-        if (((disp & (IT_OPEN_OPEN | IT_OPEN_CREATE)) == IT_OPEN_OPEN) &&
+        if (intent_disposition(rep, DISP_OPEN_OPEN) &&
+            !intent_disposition(rep, DISP_OPEN_CREATE) &&
             req->rq_status) {
                 GOTO(out_dput, 0);
         }
@@ -165,8 +151,14 @@ void reconstruct_open(struct mds_update_record *rec, int offset,
         if (S_ISREG(child->d_inode->i_mode)) {
                 rc = mds_pack_md(obd, req->rq_repmsg, 2, body,
                                  child->d_inode);
+
                 if (rc)
                         LASSERT(rc == req->rq_status);
+
+                /* If we have LOV EA data, the OST holds size, mtime */
+                if (!(body->valid & OBD_MD_FLEASIZE))
+                        body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
+                                        OBD_MD_FLATIME | OBD_MD_FLMTIME);
         } else {
                 /* XXX need to check this case */
         }
@@ -185,7 +177,7 @@ void reconstruct_open(struct mds_update_record *rec, int offset,
         /* If we didn't get as far as trying to open, then some locking thing
          * probably went wrong, and we'll just bail here.
          */
-        if ((disp & IT_OPEN_OPEN) == 0)
+        if (!intent_disposition(rep, DISP_OPEN_OPEN))
                 GOTO(out_dput, 0);
 
         /* If we failed, then we must have failed opening, so don't look for
@@ -197,12 +189,12 @@ void reconstruct_open(struct mds_update_record *rec, int offset,
         mfd = NULL;
         list_for_each(t, &med->med_open_head) {
                 mfd = list_entry(t, struct mds_file_data, mfd_list);
-                if (mfd->mfd_xid == req->rq_xid) 
+                if (mfd->mfd_xid == req->rq_xid)
                         break;
                 mfd = NULL;
         }
 
-        if (req->rq_export->exp_outstanding_reply) {
+        if (oldreq) {
                 /* if we're not recovering, it had better be found */
                 LASSERT(mfd);
         } else if (mfd == NULL) {
@@ -226,35 +218,180 @@ void reconstruct_open(struct mds_update_record *rec, int offset,
         EXIT;
 }
 
+int mds_pin(struct ptlrpc_request *req)
+{
+        struct mds_obd *mds = mds_req2mds(req);
+        struct inode *pending_dir = mds->mds_pending_dir->d_inode;
+        struct mds_file_data *mfd = NULL;
+        struct mds_body *body;
+        struct dentry *dchild;
+        struct obd_run_ctxt saved;
+        char fidname[LL_FID_NAMELEN];
+        int fidlen = 0, rc, cleanup_phase = 0, size = sizeof(*body);
+        ENTRY;
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
+
+        down(&pending_dir->i_sem);
+        fidlen = ll_fid2str(fidname, body->fid1.id, body->fid1.generation);
+        dchild = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
+        if (IS_ERR(dchild)) {
+                up(&pending_dir->i_sem);
+                rc = PTR_ERR(dchild);
+                CERROR("error looking up %s in PENDING: rc = %d\n",
+                       fidname, rc);
+                RETURN(rc);
+        }
+
+        cleanup_phase = 2;
+
+        if (dchild->d_inode) {
+                up(&pending_dir->i_sem);
+                mds_inode_set_orphan(dchild->d_inode);
+                mds_pack_inode2fid(&body->fid1, dchild->d_inode);
+                mds_pack_inode2body(body, dchild->d_inode);
+                GOTO(openit, rc = 0);
+        }
+        dput(dchild);
+        up(&pending_dir->i_sem);
+
+        /* We didn't find it in PENDING so it isn't an orphan.  See
+         * if it's a regular inode. */
+        dchild = mds_fid2dentry(mds, &body->fid1, NULL);
+        if (!IS_ERR(dchild)) {
+                mds_pack_inode2fid(&body->fid1, dchild->d_inode);
+                mds_pack_inode2body(body, dchild->d_inode);
+                GOTO(openit, rc = 0);
+        }
+
+        /* We didn't find this inode on disk, but we're trying to pin it.
+         * This should never happen. */
+        CERROR("ENOENT during mds_pin for fid "LPU64"/%u\n", body->fid1.id,
+               body->fid1.generation);
+        RETURN(-ENOENT);
+
+ openit:
+        /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
+        mfd = mds_dentry_open(dchild, mds->mds_vfsmnt, body->flags, req);
+        if (IS_ERR(mfd)) {
+                dchild = NULL; /* prevent a double dput in cleanup phase 2 */
+                GOTO(cleanup, rc = PTR_ERR(mfd));
+        }
+
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc) {
+                CERROR("out of memoryK\n");
+                GOTO(cleanup, rc);
+        }
+        body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
+
+        cleanup_phase = 4; /* mfd allocated */
+        body->handle.cookie = mfd->mfd_handle.h_cookie;
+        CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd,
+               mfd->mfd_handle.h_cookie);
+        GOTO(cleanup, rc = 0);
+
+ cleanup:
+        push_ctxt(&saved, &mds->mds_ctxt, NULL);
+        rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, NULL,
+                                req, rc, 0);
+        pop_ctxt(&saved, &mds->mds_ctxt, NULL);
+        /* XXX what do we do here if mds_finish_transno itself failed? */
+        switch (cleanup_phase) {
+        case 4:
+                if (rc)
+                        mds_mfd_destroy(mfd);
+        case 2:
+                if (rc || S_ISLNK(dchild->d_inode->i_mode))
+                        l_dput(dchild);
+        }
+        return rc;
+}
+
 int mds_open(struct mds_update_record *rec, int offset,
              struct ptlrpc_request *req, struct lustre_handle *child_lockh)
 {
+        /* XXX ALLOCATE _something_ - 464 bytes on stack here */
         static const char acc_table [] = {[O_RDONLY] MAY_READ,
                                           [O_WRONLY] MAY_WRITE,
                                           [O_RDWR]   MAY_READ | MAY_WRITE};
         struct mds_obd *mds = mds_req2mds(req);
         struct obd_device *obd = req->rq_export->exp_obd;
-        struct ldlm_reply *rep;
-        struct mds_body *body;
-        struct dentry *dchild = NULL, *parent;
+        struct ldlm_reply *rep = NULL;
+        struct mds_body *body = NULL;
+        struct dentry *dchild = NULL, *parent = NULL;
         struct mds_export_data *med;
         struct mds_file_data *mfd = NULL;
         struct ldlm_res_id child_res_id = { .name = {0} };
         struct lustre_handle parent_lockh;
         int rc = 0, parent_mode, child_mode = LCK_PR, lock_flags, created = 0;
-        int cleanup_phase = 0;
+        int cleanup_phase = 0, acc_mode;
         void *handle = NULL;
-        int acc_mode;
         ENTRY;
 
-        LASSERT(offset == 2);                  /* only called via intent */
-        rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
-        body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
+        if (offset == 2) { /* intent */
+                rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
+                body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
+        } else if (offset == 0) { /* non-intent reint */
+                body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
+        } else {
+                body = NULL;
+                LBUG();
+        }
 
         MDS_CHECK_RESENT(req, reconstruct_open(rec, offset, req, child_lockh));
 
+        /* Step 0: If we are passed a fid, then we assume the client already
+         * opened this file and is only replaying the RPC, so we open the
+         * inode by fid (at some large expense in security).
+         */
+        if (rec->ur_fid2->id) {
+                struct inode *pending_dir = mds->mds_pending_dir->d_inode;
+                char fidname[LL_FID_NAMELEN];
+                int fidlen = 0;
+
+                down(&pending_dir->i_sem);
+                fidlen = ll_fid2str(fidname, rec->ur_fid2->id,
+                                    rec->ur_fid2->generation);
+                dchild = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
+                if (IS_ERR(dchild)) {
+                        up(&pending_dir->i_sem);
+                        rc = PTR_ERR(dchild);
+                        CERROR("error looking up %s in PENDING: rc = %d\n",
+                               fidname, rc);
+                        RETURN(rc);
+                }
+
+                if (dchild->d_inode) {
+                        up(&pending_dir->i_sem);
+                        mds_inode_set_orphan(dchild->d_inode);
+                        mds_pack_inode2fid(&body->fid1, dchild->d_inode);
+                        mds_pack_inode2body(body, dchild->d_inode);
+                        cleanup_phase = 2;
+                        GOTO(openit, rc = 0);
+                }
+                dput(dchild);
+                up(&pending_dir->i_sem);
+
+                /* We didn't find it in PENDING so it isn't an orphan.  See
+                 * if it was a regular inode that was previously created.
+                 */
+                dchild = mds_fid2dentry(mds, rec->ur_fid2, NULL);
+                if (!IS_ERR(dchild)) {
+                        mds_pack_inode2fid(&body->fid1, dchild->d_inode);
+                        mds_pack_inode2body(body, dchild->d_inode);
+                        cleanup_phase = 2;
+                        GOTO(openit, rc = 0);
+                }
+
+                /* We didn't find the correct inode on disk either, so we
+                 * need to re-create it via a regular replay.  Do that below.
+                 */
+                LASSERT(rec->ur_flags & O_CREAT);
+        }
+        LASSERT(offset == 2); /* If we got here, we must be called via intent */
+
         med = &req->rq_export->exp_mds_data;
-        rep->lock_policy_res1 |= IT_OPEN_LOOKUP;
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
                 CERROR("test case OBD_FAIL_MDS_OPEN_PACK\n");
                 req->rq_status = -ENOMEM;
@@ -263,11 +400,12 @@ int mds_open(struct mds_update_record *rec, int offset,
 
         if ((rec->ur_flags & O_ACCMODE) >= sizeof (acc_table))
                 RETURN(-EINVAL);
-        acc_mode = acc_table [rec->ur_flags & O_ACCMODE];
+        acc_mode = acc_table[rec->ur_flags & O_ACCMODE];
         if ((rec->ur_flags & O_TRUNC) != 0)
                 acc_mode |= MAY_WRITE;
 
         /* Step 1: Find and lock the parent */
+        intent_set_disposition(rep, DISP_LOOKUP_EXECD);
         parent_mode = (rec->ur_flags & O_CREAT) ? LCK_PW : LCK_PR;
         parent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode,
                                        &parent_lockh);
@@ -288,38 +426,88 @@ int mds_open(struct mds_update_record *rec, int offset,
         cleanup_phase = 2; /* child dentry */
 
         if (dchild->d_inode)
-                rep->lock_policy_res1 |= IT_OPEN_POS;
+                intent_set_disposition(rep, DISP_LOOKUP_POS);
         else
-                rep->lock_policy_res1 |= IT_OPEN_NEG;
+                intent_set_disposition(rep, DISP_LOOKUP_NEG);
 
         /* Step 3: If the child was negative, and we're supposed to,
          * create it. */
         if (!dchild->d_inode) {
+                unsigned long ino = rec->ur_fid2->id;
+
                 if (!(rec->ur_flags & O_CREAT)) {
                         /* It's negative and we weren't supposed to create it */
                         GOTO(cleanup, rc = -ENOENT);
                 }
 
-                rep->lock_policy_res1 |= IT_OPEN_CREATE;
-                handle = fsfilt_start(obd, parent->d_inode, FSFILT_OP_CREATE);
+                intent_set_disposition(rep, DISP_OPEN_CREATE);
+                handle = fsfilt_start(obd, parent->d_inode, FSFILT_OP_CREATE,
+                                      NULL);
                 if (IS_ERR(handle)) {
                         rc = PTR_ERR(handle);
                         handle = NULL;
                         GOTO(cleanup, rc);
                 }
+                if (ino)
+                        dchild->d_fsdata = (void *)(unsigned long)ino;
+
                 rc = vfs_create(parent->d_inode, dchild, rec->ur_mode);
-                if (rc)
+                if (dchild->d_fsdata == (void *)(unsigned long)ino)
+                        dchild->d_fsdata = NULL;
+
+                if (rc) {
+                        CDEBUG(D_INODE, "error during create: %d\n", rc);
                         GOTO(cleanup, rc);
-                created = 1;
+                } else {
+                        struct iattr iattr;
+                        struct inode *inode = dchild->d_inode;
+
+                        if (ino) {
+                                LASSERT(ino == inode->i_ino);
+                                /* Written as part of setattr */
+                                inode->i_generation = rec->ur_fid2->generation;
+                                CDEBUG(D_HA, "recreated ino %lu with gen %x\n",
+                                       inode->i_ino, inode->i_generation);
+                        }
+
+                        created = 1;
+                        LTIME_S(iattr.ia_atime) = rec->ur_time;
+                        LTIME_S(iattr.ia_ctime) = rec->ur_time;
+                        LTIME_S(iattr.ia_mtime) = rec->ur_time;
+
+                        iattr.ia_uid = rec->ur_uid;
+                        if (parent->d_inode->i_mode & S_ISGID) {
+                                iattr.ia_gid = parent->d_inode->i_gid;
+                        } else
+                                iattr.ia_gid = rec->ur_gid;
+
+                        iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
+                                ATTR_MTIME | ATTR_CTIME;
+
+                        rc = fsfilt_setattr(obd, dchild, handle, &iattr, 0);
+                        if (rc) {
+                                CERROR("error on setattr: rc = %d\n", rc);
+                                /* XXX should we abort here in case of error? */
+                        }
+                }
+
                 child_mode = LCK_PW;
                 acc_mode = 0;                  /* Don't check for permissions */
         }
 
+        LASSERT(!mds_inode_is_orphan(dchild->d_inode));
+
         /* Step 4: It's positive, so lock the child */
         child_res_id.name[0] = dchild->d_inode->i_ino;
         child_res_id.name[1] = dchild->d_inode->i_generation;
  reacquire:
         lock_flags = 0;
+        /* For the open(O_CREAT) case, this would technically be a lock
+         * inversion (getting a VFS lock after starting a transaction),
+         * but in that case we cannot possibly block on this lock because
+         * we just created the child and also hold a write lock on the
+         * parent, so nobody could be holding the lock yet.
+         */
         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
                               child_res_id, LDLM_PLAIN, NULL, 0, child_mode,
                               &lock_flags, ldlm_completion_ast,
@@ -346,15 +534,19 @@ int mds_open(struct mds_update_record *rec, int offset,
 
                 /* An append-only file must be opened in append mode for
                  * writing */
-                if (IS_APPEND(dchild->d_inode) &&
-                    (acc_mode & MAY_WRITE) != 0 &&
+                if (IS_APPEND(dchild->d_inode) && (acc_mode & MAY_WRITE) != 0 &&
                     ((rec->ur_flags & O_APPEND) == 0 ||
                      (rec->ur_flags & O_TRUNC) != 0))
-                        GOTO (cleanup, rc = -EPERM);
+                        GOTO(cleanup, rc = -EPERM);
 
                 rc = mds_pack_md(obd, req->rq_repmsg, 2, body, dchild->d_inode);
                 if (rc)
                         GOTO(cleanup, rc);
+
+                /* If we have LOV EA data, the OST holds size, mtime */
+                if (!(body->valid & OBD_MD_FLEASIZE))
+                        body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
+                                        OBD_MD_FLATIME | OBD_MD_FLMTIME);
         }
 
         if (!created && (rec->ur_flags & O_CREAT) &&
@@ -364,9 +556,9 @@ int mds_open(struct mds_update_record *rec, int offset,
                 GOTO(cleanup, rc = -EEXIST); // returns a lock to the client
         }
 
-        /* If we're opening a file without an EA, the client needs a write
-         * lock. */
-        if (S_ISREG(dchild->d_inode->i_mode) &&
+        /* If we're opening a file without an EA for write, the client needs
+         * a write lock. */
+        if (S_ISREG(dchild->d_inode->i_mode) && (rec->ur_flags & O_ACCMODE) &&
             child_mode != LCK_PW && !(body->valid & OBD_MD_FLEASIZE)) {
                 ldlm_lock_decref(child_lockh, child_mode);
                 child_mode = LCK_PW;
@@ -381,15 +573,14 @@ int mds_open(struct mds_update_record *rec, int offset,
                 GOTO(cleanup, rc = -ENOTDIR);
 
         /* Step 5: mds_open it */
-        rep->lock_policy_res1 |= IT_OPEN_OPEN;
-
+        intent_set_disposition(rep, DISP_OPEN_OPEN);
+ openit:
         /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
         mfd = mds_dentry_open(dchild, mds->mds_vfsmnt,
                               rec->ur_flags & ~(O_DIRECT | O_TRUNC), req);
-        if (!mfd) {
-                CERROR("mds: out of memory\n");
-                dchild = NULL; /* prevent a double dput in step 2 */
-                GOTO(cleanup, rc = -ENOMEM);
+        if (IS_ERR(mfd)) {
+                dchild = NULL; /* prevent a double dput in cleanup phase 2 */
+                GOTO(cleanup, rc = PTR_ERR(mfd));
         }
 
         cleanup_phase = 4; /* mfd allocated */
@@ -401,6 +592,7 @@ int mds_open(struct mds_update_record *rec, int offset,
  cleanup:
         rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, handle,
                                 req, rc, rep->lock_policy_res1);
+        /* XXX what do we do here if mds_finish_transno itself failed? */
         switch (cleanup_phase) {
         case 4:
                 if (rc && !S_ISLNK(dchild->d_inode->i_mode))
@@ -410,19 +602,22 @@ int mds_open(struct mds_update_record *rec, int offset,
                  * ldlm_intent_policy: if we found the dentry, or we tried to
                  * open it (meaning that we created, if it wasn't found), then
                  * we return the lock to the caller and client. */
-                if (!(rep->lock_policy_res1 & (IT_OPEN_OPEN | IT_OPEN_POS)))
+                if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
+                    !intent_disposition(rep, DISP_OPEN_OPEN))
                         ldlm_lock_decref(child_lockh, child_mode);
         case 2:
                 if (rc || S_ISLNK(dchild->d_inode->i_mode))
                         l_dput(dchild);
         case 1:
-                l_dput(parent);
-                if (rc) {
-                        ldlm_lock_decref(&parent_lockh, parent_mode);
-                } else {
-                        memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
-                               sizeof(parent_lockh));
-                        req->rq_ack_locks[0].mode = parent_mode;
+                if (parent) {
+                        l_dput(parent);
+                        if (rc) {
+                                ldlm_lock_decref(&parent_lockh, parent_mode);
+                        } else {
+                                memcpy(&req->rq_ack_locks[0].lock,&parent_lockh,
+                                       sizeof(parent_lockh));
+                                req->rq_ack_locks[0].mode = parent_mode;
+                        }
                 }
         }
         RETURN(rc);