Whamcloud - gitweb
- Store open-request XID in the MFD, so that we can find it during reply
authorshaver <shaver>
Thu, 20 Feb 2003 07:36:15 +0000 (07:36 +0000)
committershaver <shaver>
Thu, 20 Feb 2003 07:36:15 +0000 (07:36 +0000)
  reconstruction.
- Untested reply reconstruction for GETATTR_NAME (easy) and OPEN (not easy,
  ahem).
- Don't leak dentry refs when reconstructing replies for reint_setattr or
  reint_create.
- Don't double-dput dchild if dentry_open fails.  (Not yet in test suite, bug
  894 filed to trick someone into adding it.)

lustre/mds/mds_open.c

index e02e9fd..c8c655e 100644 (file)
@@ -57,6 +57,119 @@ extern int enqueue_ordered_locks(int lock_mode, struct obd_device *obd,
                                  struct lustre_handle *c1_lockh,
                                  struct lustre_handle *c2_lockh);
 
+void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req)
+{
+        struct mds_export_data *med = &req->rq_export->exp_mds_data;
+        struct mds_client_data *mcd = med->med_mcd;
+        struct mds_obd *mds = mds_req2mds(req);
+        struct mds_file_data *mfd;
+        struct obd_device *obd = req->rq_export->exp_obd;
+        struct dentry *parent, *child;
+        struct ldlm_reply *rep = lustre_msg_buf(req->rq_repmsg, 0);
+        struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1);
+        int disp;
+
+        /* copy rc, transno and disp; steal locks */
+
+        req->rq_transno = mcd->mcd_last_transno;
+        req->rq_status = mcd->mcd_last_result;
+        disp = rep->lock_policy_res1 = mcd->mcd_last_data;
+        
+        if (med->med_outstanding_reply)
+                mds_steal_ack_locks(med, req);
+        
+        /* We never care about these. */
+        disp &= ~(IT_OPEN_LOOKUP | IT_OPEN_POS | IT_OPEN_NEG);
+        if (!disp) {
+                return; /* error looking up parent or child */
+        }
+
+        parent = mds_fid2dentry(mds, rec->ur_fid1, NULL);
+        LASSERT(!IS_ERR(parent));
+
+        child = lookup_one_len(lustre_msg_buf(req->rq_reqmsg, 3),
+                               parent, req->rq_reqmsg->buflens[3] - 1);
+        LASSERT(!IS_ERR(child));
+        
+        if (!child->d_inode) {
+                GOTO(out_dput, 0); /* child not present to open */
+        }
+
+        /* At this point, we know we have a child, which means that we'll send
+         * it back _unless_ it was open failed, _and_ we didn't create the file.
+         * I love you guys.  No, really.
+         */
+        if (((disp & (IT_OPEN_OPEN | IT_OPEN_CREATE)) == IT_OPEN_OPEN) &&
+            req->rq_status) {
+                GOTO(out_dput, 0);
+        }
+
+        mds_pack_inode2fid(&body->fid1, child->d_inode);
+        mds_pack_inode2body(body, child->d_inode);
+
+        /* If we have -EEXIST as the status, and we were asked to create
+         * exclusively, we can tell we failed because the file already existed.
+         *
+         * We can also tell that Phil had scallops for dinner.
+         */
+        if (req->rq_status == -EEXIST &&
+            ((rec->ur_flags & (O_CREAT | O_EXCL)) == (O_CREAT | O_EXCL))) {
+                GOTO(out_dput, 0);
+        }
+
+        /* If we didn't get as far as trying to open, then some locking thing
+         * probably went wrong, and we'll just bail here.
+         */
+        if ((disp & IT_OPEN_OPEN) == 0) {
+                GOTO(out_dput, 0);
+        }
+
+        /* If we failed, then we must have failed opening, so don't look for
+         * file descriptor or anything, just give the client the bad news.
+         */
+        if (req->rq_status) {
+                GOTO(out_dput, 0);
+        }
+
+        if ((obd->obd_flags & OBD_RECOVERING) == 0) {
+                struct list_head *t;
+                list_for_each(t, &med->med_open_head) {
+                        mfd = list_entry(t, struct mds_file_data, mfd_list);
+                        if (mfd->mfd_xid == req->rq_xid)
+                                break;
+                        mfd = NULL;
+                }
+                /* if we're not recovering, it had better be found */
+                LASSERT(mfd);
+        } else {
+                struct file *file;
+                mfd = kmem_cache_alloc(mds_file_cache, GFP_KERNEL);
+                if (!mfd) {
+                        CERROR("mds: out of memory\n");
+                        GOTO(out_dput, req->rq_status = -ENOMEM);
+                }
+                mntget(mds->mds_vfsmnt);
+                file = dentry_open(child, mds->mds_vfsmnt,
+                                   rec->ur_flags & ~(O_DIRECT | O_TRUNC));
+                LASSERT(!IS_ERR(file)); /* XXX -ENOMEM? */
+                file->private_data = mfd;
+                mfd->mfd_file = file;
+                mfd->mfd_xid = req->rq_xid;
+                get_random_bytes(&mfd->mfd_servercookie,
+                                 sizeof(mfd->mfd_servercookie));
+                spin_lock(&med->med_open_lock);
+                list_add(&mfd->mfd_list, &med->med_open_head);
+                spin_unlock(&med->med_open_lock);
+        }
+                
+        body->handle.addr = (__u64)(unsigned long)mfd;
+        body->handle.cookie = mfd->mfd_servercookie;
+
+ out_dput:
+        l_dput(child);
+        l_dput(parent);
+}
+
 int mds_open(struct mds_update_record *rec, int offset,
              struct ptlrpc_request *req, struct lustre_handle *child_lockh)
 {
@@ -73,25 +186,7 @@ int mds_open(struct mds_update_record *rec, int offset,
         int rc = 0, parent_mode, child_mode = LCK_PR, lock_flags, created = 0;
         ENTRY;
 
-#warning replay of open needs to be redone
-        /* was this animal open already and the client lost the reply? */
-        /* XXX need some way to detect a reopen, to avoid locked list walks */
         med = &req->rq_export->exp_mds_data;
-#if 0
-        spin_lock(&med->med_open_lock);
-        list_for_each(tmp, &med->med_open_head) {
-                mfd = list_entry(tmp, typeof(*mfd), mfd_list);
-                if (!memcmp(&mfd->mfd_clienthandle, &body->handle,
-                            sizeof(mfd->mfd_clienthandle)) &&
-                    body->fid1.id == mfd->mfd_file->f_dentry->d_inode->i_ino) {
-                        dchild = mfd->mfd_file->f_dentry;
-                        spin_unlock(&med->med_open_lock);
-                        CERROR("Re opening "LPD64"\n", body->fid1.id);
-                        GOTO(out_pack, rc = 0);
-                }
-        }
-        spin_unlock(&med->med_open_lock);
-#endif
         rep->lock_policy_res1 |= IT_OPEN_LOOKUP;
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
                 CERROR("test case OBD_FAIL_MDS_OPEN_PACK\n");
@@ -202,11 +297,14 @@ int mds_open(struct mds_update_record *rec, int offset,
         mntget(mds->mds_vfsmnt);
         file = dentry_open(dchild, mds->mds_vfsmnt,
                            rec->ur_flags & ~(O_DIRECT | O_TRUNC));
-        if (IS_ERR(file))
+        if (IS_ERR(file)) {
+                dchild = NULL; /* prevent a double dput in step 3 */
                 GOTO(out_step_5, rc = PTR_ERR(file));
+        }
 
         file->private_data = mfd;
         mfd->mfd_file = file;
+        mfd->mfd_xid = req->rq_xid;
         get_random_bytes(&mfd->mfd_servercookie, sizeof(mfd->mfd_servercookie));
         spin_lock(&med->med_open_lock);
         list_add(&mfd->mfd_list, &med->med_open_head);
@@ -226,7 +324,8 @@ int mds_open(struct mds_update_record *rec, int offset,
  out_step_4:
         ldlm_lock_decref(child_lockh, child_mode);
  out_step_3:
-        l_dput(dchild);
+        if (dchild)
+                l_dput(dchild);
  out_step_2:
         l_dput(parent);
         if (rc) {