Whamcloud - gitweb
- macroize and share check-for-resent-req code.
authorshaver <shaver>
Mon, 24 Feb 2003 17:05:30 +0000 (17:05 +0000)
committershaver <shaver>
Mon, 24 Feb 2003 17:05:30 +0000 (17:05 +0000)
- defensive assertion in mds_close_mfd to help catch dentry mismanagement
- transnos for getattr_name and open, always.
- don't free the repmsg too early when we're dropping replies for testing
- ldlm_intent_policy knows how to fix up lock handles for reconstructed requests,
  and not give the lock away to the client twice
- tchmod exits with errno on failure.

lustre/mds/mds_open.c

index b28ae50..8e15586 100644 (file)
@@ -57,7 +57,8 @@ extern int enqueue_ordered_locks(int lock_mode, struct obd_device *obd,
                                  struct lustre_handle *c1_lockh,
                                  struct lustre_handle *c2_lockh);
 
-void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req)
+void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req,
+                      struct lustre_handle *child_lockh)
 {
         struct mds_export_data *med = &req->rq_export->exp_mds_data;
         struct mds_client_data *mcd = med->med_mcd;
@@ -67,11 +68,12 @@ void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req)
         struct dentry *parent, *child;
         struct ldlm_reply *rep = lustre_msg_buf(req->rq_repmsg, 0);
         struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1);
-        int disp;
+        int disp, rc;
         ENTRY;
 
-        /* copy rc, transno and disp; steal locks */
+        ENTRY;
 
+        /* copy rc, transno and disp; steal locks */
         req->rq_transno = mcd->mcd_last_transno;
         req->rq_status = mcd->mcd_last_result;
         disp = rep->lock_policy_res1 = mcd->mcd_last_data;
@@ -82,6 +84,7 @@ void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req)
         /* We never care about these. */
         disp &= ~(IT_OPEN_LOOKUP | IT_OPEN_POS | IT_OPEN_NEG);
         if (!disp) {
+                EXIT;
                 return; /* error looking up parent or child */
         }
 
@@ -105,13 +108,28 @@ void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req)
                 GOTO(out_dput, 0);
         }
 
+        if (!med->med_outstanding_reply) {
+                LBUG(); /* XXX need to get enqueue client lock */
+        }
+
+        /* get lock (write for O_CREAT, read otherwise) */
+        
         mds_pack_inode2fid(&body->fid1, child->d_inode);
         mds_pack_inode2body(body, child->d_inode);
+        if (S_ISREG(child->d_inode->i_mode)) {
+                rc = mds_pack_md(obd, req->rq_repmsg, 2, body,
+                                 child->d_inode);
+                if (rc)
+                        LASSERT(rc == req->rq_status);
+        } else {
+                /* XXX need to check this case */
+        }
 
+        /* If we're opening a file without an EA, change to a write
+           lock (unless we already have one). */
+                   
         /* If we have -EEXIST as the status, and we were asked to create
          * exclusively, we can tell we failed because the file already existed.
-         *
-         * We can also tell that Phil had scallops for dinner.
          */
         if (req->rq_status == -EEXIST &&
             ((rec->ur_flags & (O_CREAT | O_EXCL)) == (O_CREAT | O_EXCL))) {
@@ -132,9 +150,11 @@ void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req)
                 GOTO(out_dput, 0);
         }
 
-        if ((obd->obd_flags & OBD_RECOVERING) == 0) {
+        if (med->med_outstanding_reply) {
                 struct list_head *t;
                 mfd = NULL;
+                /* XXX can we just look in the old reply to find the handle in
+                 * XXX O(1) here? */
                 list_for_each(t, &med->med_open_head) {
                         mfd = list_entry(t, struct mds_file_data, mfd_list);
                         if (mfd->mfd_xid == req->rq_xid)
@@ -170,6 +190,7 @@ void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req)
  out_dput:
         l_dput(child);
         l_dput(parent);
+        EXIT;
 }
 
 int mds_open(struct mds_update_record *rec, int offset,
@@ -186,18 +207,11 @@ int mds_open(struct mds_update_record *rec, int offset,
         struct ldlm_res_id child_res_id = { .name = {0} };
         struct lustre_handle parent_lockh;
         int rc = 0, parent_mode, child_mode = LCK_PR, lock_flags, created = 0;
+        int cleanup_phase = 0;
+        void *handle = NULL;
         ENTRY;
 
-        /* XXX macroize with reint_create, etc. */
-        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
-                struct mds_client_data *mcd = 
-                        req->rq_export->exp_mds_data.med_mcd;
-                if (mcd->mcd_last_xid == req->rq_xid) {
-                        reconstruct_open(rec, req);
-                        RETURN(0);
-                }
-                DEBUG_REQ(D_HA, req, "no reply for resent request");
-        }
+        MDS_CHECK_RESENT(req, reconstruct_open(rec, req, child_lockh));
 
         med = &req->rq_export->exp_mds_data;
         rep->lock_policy_res1 |= IT_OPEN_LOOKUP;
@@ -214,15 +228,19 @@ int mds_open(struct mds_update_record *rec, int offset,
         if (IS_ERR(parent)) {
                 rc = PTR_ERR(parent);
                 CERROR("parent lookup error %d\n", rc);
-                RETURN(rc);
+                GOTO(cleanup, rc);
         }
         LASSERT(parent->d_inode);
 
+        cleanup_phase = 1; /* parent dentry and lock */
+
         /* Step 2: Lookup the child */
         dchild = lookup_one_len(lustre_msg_buf(req->rq_reqmsg, 3),
                                 parent, req->rq_reqmsg->buflens[3] - 1);
         if (IS_ERR(dchild))
-                GOTO(out_step_2, rc = PTR_ERR(dchild));
+                GOTO(cleanup, rc = PTR_ERR(dchild));
+
+        cleanup_phase = 2; /* child dentry */
 
         if (dchild->d_inode)
                 rep->lock_policy_res1 |= IT_OPEN_POS;
@@ -231,31 +249,24 @@ int mds_open(struct mds_update_record *rec, int offset,
 
         /* Step 3: If the child was negative, and we're supposed to,
          * create it. */
-        if ((rec->ur_flags & O_CREAT) && !dchild->d_inode) {
-                int err;
-                void *handle;
+        if (!dchild->d_inode) {
+                if (!(rec->ur_flags & O_CREAT)) {
+                        /* It's negative and we weren't supposed to create it */
+                        GOTO(cleanup, rc = -ENOENT);
+                }
+
                 rep->lock_policy_res1 |= IT_OPEN_CREATE;
                 handle = fsfilt_start(obd, parent->d_inode, FSFILT_OP_CREATE);
                 if (IS_ERR(handle)) {
                         rc = PTR_ERR(handle);
-                        mds_finish_transno(mds, parent->d_inode, handle, req,
-                                           rc, rep->lock_policy_res1);
-                        GOTO(out_step_3, rc);
+                        handle = NULL;
+                        GOTO(cleanup, rc);
                 }
                 rc = vfs_create(parent->d_inode, dchild, rec->ur_mode);
-                err = mds_finish_transno(mds, parent->d_inode, handle, req, rc,
-                                        rep->lock_policy_res1);
-                if (err) {
-                        CERROR("error on commit: err = %d\n", err);
-                        if (!rc)
-                                rc = err;
-                        GOTO(out_step_3, rc);
-                }
+                if (rc)
+                        GOTO(cleanup, rc);
                 created = 1;
                 child_mode = LCK_PW;
-        } else if (!dchild->d_inode) {
-                /* It's negative and we weren't supposed to create it */
-                GOTO(out_step_3, rc = -ENOENT);
         }
 
         /* Step 4: It's positive, so lock the child */
@@ -269,30 +280,36 @@ int mds_open(struct mds_update_record *rec, int offset,
                               mds_blocking_ast, NULL, NULL, child_lockh);
         if (rc != ELDLM_OK) {
                 CERROR("ldlm_cli_enqueue: %d\n", rc);
-                GOTO(out_step_3, rc = -EIO);
+                GOTO(cleanup, rc = -EIO);
         }
 
+        cleanup_phase = 3; /* child lock */
+
         mds_pack_inode2fid(&body->fid1, dchild->d_inode);
         mds_pack_inode2body(body, dchild->d_inode);
         if (S_ISREG(dchild->d_inode->i_mode)) {
                 rc = mds_pack_md(obd, req->rq_repmsg, 2, body, dchild->d_inode);
                 if (rc)
-                        GOTO(out_step_4, rc);
+                        GOTO(cleanup, rc);
         } else {
                 /* If this isn't a regular file, we can't open it. */
-                GOTO(out_step_3, rc = 0); /* returns the lock to the client */
+
+                /* We want to drop the child dentry, because we're not returning
+                 * failure (which would do this for us in step 2), and we're not
+                 * handing it off to the open file in dentry_open. */
+                l_dput(dchild);
+                GOTO(cleanup, rc = 0); /* returns the lock to the client */
         }
 
         if (!created && (rec->ur_flags & O_CREAT) && (rec->ur_flags & O_EXCL)) {
                 /* File already exists, we didn't just create it, and we
                  * were passed O_EXCL; err-or. */
-                GOTO(out_step_3, rc = -EEXIST); // returns a lock to the client
+                GOTO(cleanup, rc = -EEXIST); // returns a lock to the client
         }
 
         /* If we're opening a file without an EA, the client needs a write
          * lock. */
-        if (child_mode != LCK_PW && S_ISREG(dchild->d_inode->i_mode) &&
-            !(body->valid & OBD_MD_FLEASIZE)) {
+        if (child_mode != LCK_PW && !(body->valid & OBD_MD_FLEASIZE)) {
                 ldlm_lock_decref(child_lockh, child_mode);
                 child_mode = LCK_PW;
                 goto reacquire;
@@ -303,16 +320,18 @@ int mds_open(struct mds_update_record *rec, int offset,
         mfd = kmem_cache_alloc(mds_file_cache, GFP_KERNEL);
         if (!mfd) {
                 CERROR("mds: out of memory\n");
-                GOTO(out_step_4, rc = -ENOMEM);
+                GOTO(cleanup, rc = -ENOMEM);
         }
 
+        cleanup_phase = 4; /* mfd allocated */
+
         /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
         mntget(mds->mds_vfsmnt);
         file = dentry_open(dchild, mds->mds_vfsmnt,
                            rec->ur_flags & ~(O_DIRECT | O_TRUNC));
         if (IS_ERR(file)) {
-                dchild = NULL; /* prevent a double dput in step 3 */
-                GOTO(out_step_5, rc = PTR_ERR(file));
+                dchild = NULL; /* prevent a double dput in step 2 */
+                GOTO(cleanup, rc = PTR_ERR(file));
         }
 
         file->private_data = mfd;
@@ -327,26 +346,34 @@ int mds_open(struct mds_update_record *rec, int offset,
         body->handle.cookie = mfd->mfd_servercookie;
         CDEBUG(D_INODE, "file %p: mfd %p, cookie "LPX64"\n",
                mfd->mfd_file, mfd, mfd->mfd_servercookie);
-        GOTO(out_step_2, rc = 0); /* returns a lock to the client */
+        GOTO(cleanup, rc = 0); /* returns a lock to the client */
 
- out_step_5:
-        if (mfd != NULL) {
-                kmem_cache_free(mds_file_cache, mfd);
-                mfd = NULL;
-        }
- out_step_4:
-        ldlm_lock_decref(child_lockh, child_mode);
- out_step_3:
-        if (dchild)
-                l_dput(dchild);
- out_step_2:
-        l_dput(parent);
-        if (rc) {
-                ldlm_lock_decref(&parent_lockh, parent_mode);
-        } else {
-                memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
-                       sizeof(parent_lockh));
-                req->rq_ack_locks[0].mode = parent_mode;
+ cleanup:
+        rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, handle,
+                                req, rc, rep->lock_policy_res1);
+        switch (cleanup_phase) {
+        case 4:
+                if (rc)
+                        kmem_cache_free(mds_file_cache, mfd);
+        case 3:
+                /* This is the same logic as in the IT_OPEN part of 
+                 * ldlm_intent_policy: if we found the dentry, or we tried to
+                 * open it (meaning that we created, if it wasn't found), then
+                 * we return the lock to the caller and client. */
+                if (!(rep->lock_policy_res1 & (IT_OPEN_OPEN | IT_OPEN_POS)))
+                        ldlm_lock_decref(child_lockh, child_mode);
+        case 2:
+                if (rc) 
+                    l_dput(dchild);
+        case 1:
+                l_dput(parent);
+                if (rc) {
+                        ldlm_lock_decref(&parent_lockh, parent_mode);
+                } else {
+                        memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
+                               sizeof(parent_lockh));
+                        req->rq_ack_locks[0].mode = parent_mode;
+                }
         }
         RETURN(rc);
 }