From 12e9f2d09890be33b4be95748c8cc93d85dff764 Mon Sep 17 00:00:00 2001 From: shaver Date: Thu, 20 Feb 2003 07:36:15 +0000 Subject: [PATCH] - Store open-request XID in the MFD, so that we can find it during reply reconstruction. - Untested reply reconstruction for GETATTR_NAME (easy) and OPEN (not easy, ahem). - Don't leak dentry refs when reconstructing replies for reint_setattr or reint_create. - Don't double-dput dchild if dentry_open fails. (Not yet in test suite, bug 894 filed to trick someone into adding it.) --- lustre/mds/mds_open.c | 139 ++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 119 insertions(+), 20 deletions(-) diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index e02e9fd..c8c655e 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -57,6 +57,119 @@ extern int enqueue_ordered_locks(int lock_mode, struct obd_device *obd, struct lustre_handle *c1_lockh, struct lustre_handle *c2_lockh); +void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req) +{ + struct mds_export_data *med = &req->rq_export->exp_mds_data; + struct mds_client_data *mcd = med->med_mcd; + struct mds_obd *mds = mds_req2mds(req); + struct mds_file_data *mfd; + struct obd_device *obd = req->rq_export->exp_obd; + struct dentry *parent, *child; + struct ldlm_reply *rep = lustre_msg_buf(req->rq_repmsg, 0); + struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1); + int disp; + + /* copy rc, transno and disp; steal locks */ + + req->rq_transno = mcd->mcd_last_transno; + req->rq_status = mcd->mcd_last_result; + disp = rep->lock_policy_res1 = mcd->mcd_last_data; + + if (med->med_outstanding_reply) + mds_steal_ack_locks(med, req); + + /* We never care about these. */ + disp &= ~(IT_OPEN_LOOKUP | IT_OPEN_POS | IT_OPEN_NEG); + if (!disp) { + return; /* error looking up parent or child */ + } + + parent = mds_fid2dentry(mds, rec->ur_fid1, NULL); + LASSERT(!IS_ERR(parent)); + + child = lookup_one_len(lustre_msg_buf(req->rq_reqmsg, 3), + parent, req->rq_reqmsg->buflens[3] - 1); + LASSERT(!IS_ERR(child)); + + if (!child->d_inode) { + GOTO(out_dput, 0); /* child not present to open */ + } + + /* At this point, we know we have a child, which means that we'll send + * it back _unless_ it was open failed, _and_ we didn't create the file. + * I love you guys. No, really. + */ + if (((disp & (IT_OPEN_OPEN | IT_OPEN_CREATE)) == IT_OPEN_OPEN) && + req->rq_status) { + GOTO(out_dput, 0); + } + + mds_pack_inode2fid(&body->fid1, child->d_inode); + mds_pack_inode2body(body, child->d_inode); + + /* If we have -EEXIST as the status, and we were asked to create + * exclusively, we can tell we failed because the file already existed. + * + * We can also tell that Phil had scallops for dinner. + */ + if (req->rq_status == -EEXIST && + ((rec->ur_flags & (O_CREAT | O_EXCL)) == (O_CREAT | O_EXCL))) { + GOTO(out_dput, 0); + } + + /* If we didn't get as far as trying to open, then some locking thing + * probably went wrong, and we'll just bail here. + */ + if ((disp & IT_OPEN_OPEN) == 0) { + GOTO(out_dput, 0); + } + + /* If we failed, then we must have failed opening, so don't look for + * file descriptor or anything, just give the client the bad news. + */ + if (req->rq_status) { + GOTO(out_dput, 0); + } + + if ((obd->obd_flags & OBD_RECOVERING) == 0) { + struct list_head *t; + list_for_each(t, &med->med_open_head) { + mfd = list_entry(t, struct mds_file_data, mfd_list); + if (mfd->mfd_xid == req->rq_xid) + break; + mfd = NULL; + } + /* if we're not recovering, it had better be found */ + LASSERT(mfd); + } else { + struct file *file; + mfd = kmem_cache_alloc(mds_file_cache, GFP_KERNEL); + if (!mfd) { + CERROR("mds: out of memory\n"); + GOTO(out_dput, req->rq_status = -ENOMEM); + } + mntget(mds->mds_vfsmnt); + file = dentry_open(child, mds->mds_vfsmnt, + rec->ur_flags & ~(O_DIRECT | O_TRUNC)); + LASSERT(!IS_ERR(file)); /* XXX -ENOMEM? */ + file->private_data = mfd; + mfd->mfd_file = file; + mfd->mfd_xid = req->rq_xid; + get_random_bytes(&mfd->mfd_servercookie, + sizeof(mfd->mfd_servercookie)); + spin_lock(&med->med_open_lock); + list_add(&mfd->mfd_list, &med->med_open_head); + spin_unlock(&med->med_open_lock); + } + + body->handle.addr = (__u64)(unsigned long)mfd; + body->handle.cookie = mfd->mfd_servercookie; + + out_dput: + l_dput(child); + l_dput(parent); +} + int mds_open(struct mds_update_record *rec, int offset, struct ptlrpc_request *req, struct lustre_handle *child_lockh) { @@ -73,25 +186,7 @@ int mds_open(struct mds_update_record *rec, int offset, int rc = 0, parent_mode, child_mode = LCK_PR, lock_flags, created = 0; ENTRY; -#warning replay of open needs to be redone - /* was this animal open already and the client lost the reply? */ - /* XXX need some way to detect a reopen, to avoid locked list walks */ med = &req->rq_export->exp_mds_data; -#if 0 - spin_lock(&med->med_open_lock); - list_for_each(tmp, &med->med_open_head) { - mfd = list_entry(tmp, typeof(*mfd), mfd_list); - if (!memcmp(&mfd->mfd_clienthandle, &body->handle, - sizeof(mfd->mfd_clienthandle)) && - body->fid1.id == mfd->mfd_file->f_dentry->d_inode->i_ino) { - dchild = mfd->mfd_file->f_dentry; - spin_unlock(&med->med_open_lock); - CERROR("Re opening "LPD64"\n", body->fid1.id); - GOTO(out_pack, rc = 0); - } - } - spin_unlock(&med->med_open_lock); -#endif rep->lock_policy_res1 |= IT_OPEN_LOOKUP; if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) { CERROR("test case OBD_FAIL_MDS_OPEN_PACK\n"); @@ -202,11 +297,14 @@ int mds_open(struct mds_update_record *rec, int offset, mntget(mds->mds_vfsmnt); file = dentry_open(dchild, mds->mds_vfsmnt, rec->ur_flags & ~(O_DIRECT | O_TRUNC)); - if (IS_ERR(file)) + if (IS_ERR(file)) { + dchild = NULL; /* prevent a double dput in step 3 */ GOTO(out_step_5, rc = PTR_ERR(file)); + } file->private_data = mfd; mfd->mfd_file = file; + mfd->mfd_xid = req->rq_xid; get_random_bytes(&mfd->mfd_servercookie, sizeof(mfd->mfd_servercookie)); spin_lock(&med->med_open_lock); list_add(&mfd->mfd_list, &med->med_open_head); @@ -226,7 +324,8 @@ int mds_open(struct mds_update_record *rec, int offset, out_step_4: ldlm_lock_decref(child_lockh, child_mode); out_step_3: - l_dput(dchild); + if (dchild) + l_dput(dchild); out_step_2: l_dput(parent); if (rc) { -- 1.8.3.1