X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=sidebyside;f=lustre%2Fmds%2Fmds_open.c;h=50ca59279e3c46b81f5888d182ea213d07c5fe79;hb=040033cef24c5aca2967daf2da7a862abcd074cf;hp=2f6538424552950e1bb8066edf9e24bfeebd6342;hpb=93acd158c57c4c5d0fc751d46741231490c04707;p=fs%2Flustre-release.git diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index 2f65384..50ca592 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -1,10 +1,7 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * lustre/mds/handler.c - * Lustre Metadata Server (mds) request handler - * - * Copyright (c) 2001, 2002 Cluster File Systems, Inc. + * Copyright (c) 2003 Cluster File Systems, Inc. * Author: Peter Braam * Author: Andreas Dilger * Author: Phil Schwan @@ -35,10 +32,11 @@ #include #include #include -#include #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) #include #include +#else +#include #endif #include #include @@ -47,9 +45,8 @@ extern kmem_cache_t *mds_file_cache; extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req); -extern void mds_start_transno(struct mds_obd *mds); -extern int mds_finish_transno(struct mds_obd *mds, void *handle, - struct ptlrpc_request *req, int rc); +int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle, + struct ptlrpc_request *req, int rc, __u32 op_data); extern int enqueue_ordered_locks(int lock_mode, struct obd_device *obd, struct ldlm_res_id *p1_res_id, struct ldlm_res_id *p2_res_id, @@ -60,6 +57,142 @@ extern int enqueue_ordered_locks(int lock_mode, struct obd_device *obd, struct lustre_handle *c1_lockh, struct lustre_handle *c2_lockh); +void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req, + struct lustre_handle *child_lockh) +{ + struct mds_export_data *med = &req->rq_export->exp_mds_data; + struct mds_client_data *mcd = med->med_mcd; + struct mds_obd *mds = mds_req2mds(req); + struct mds_file_data *mfd; + struct obd_device *obd = req->rq_export->exp_obd; + struct dentry *parent, *child; + struct ldlm_reply *rep = lustre_msg_buf(req->rq_repmsg, 0); + struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1); + int disp, rc; + ENTRY; + + ENTRY; + + /* copy rc, transno and disp; steal locks */ + req->rq_transno = mcd->mcd_last_transno; + req->rq_status = mcd->mcd_last_result; + disp = rep->lock_policy_res1 = mcd->mcd_last_data; + + if (med->med_outstanding_reply) + mds_steal_ack_locks(med, req); + + /* We never care about these. */ + disp &= ~(IT_OPEN_LOOKUP | IT_OPEN_POS | IT_OPEN_NEG); + if (!disp) { + EXIT; + return; /* error looking up parent or child */ + } + + parent = mds_fid2dentry(mds, rec->ur_fid1, NULL); + LASSERT(!IS_ERR(parent)); + + child = lookup_one_len(lustre_msg_buf(req->rq_reqmsg, 3), + parent, req->rq_reqmsg->buflens[3] - 1); + LASSERT(!IS_ERR(child)); + + if (!child->d_inode) { + GOTO(out_dput, 0); /* child not present to open */ + } + + /* At this point, we know we have a child, which means that we'll send + * it back _unless_ it was open failed, _and_ we didn't create the file. + * I love you guys. No, really. + */ + if (((disp & (IT_OPEN_OPEN | IT_OPEN_CREATE)) == IT_OPEN_OPEN) && + req->rq_status) { + GOTO(out_dput, 0); + } + + if (!med->med_outstanding_reply) { + LBUG(); /* XXX need to get enqueue client lock */ + } + + /* get lock (write for O_CREAT, read otherwise) */ + + mds_pack_inode2fid(&body->fid1, child->d_inode); + mds_pack_inode2body(body, child->d_inode); + if (S_ISREG(child->d_inode->i_mode)) { + rc = mds_pack_md(obd, req->rq_repmsg, 2, body, + child->d_inode); + if (rc) + LASSERT(rc == req->rq_status); + } else { + /* XXX need to check this case */ + } + + /* If we're opening a file without an EA, change to a write + lock (unless we already have one). */ + + /* If we have -EEXIST as the status, and we were asked to create + * exclusively, we can tell we failed because the file already existed. + */ + if (req->rq_status == -EEXIST && + ((rec->ur_flags & (O_CREAT | O_EXCL)) == (O_CREAT | O_EXCL))) { + GOTO(out_dput, 0); + } + + /* If we didn't get as far as trying to open, then some locking thing + * probably went wrong, and we'll just bail here. + */ + if ((disp & IT_OPEN_OPEN) == 0) { + GOTO(out_dput, 0); + } + + /* If we failed, then we must have failed opening, so don't look for + * file descriptor or anything, just give the client the bad news. + */ + if (req->rq_status) { + GOTO(out_dput, 0); + } + + if (med->med_outstanding_reply) { + struct list_head *t; + mfd = NULL; + /* XXX can we just look in the old reply to find the handle in + * XXX O(1) here? */ + list_for_each(t, &med->med_open_head) { + mfd = list_entry(t, struct mds_file_data, mfd_list); + if (mfd->mfd_xid == req->rq_xid) + break; + mfd = NULL; + } + /* if we're not recovering, it had better be found */ + LASSERT(mfd); + } else { + struct file *file; + mfd = kmem_cache_alloc(mds_file_cache, GFP_KERNEL); + if (!mfd) { + CERROR("mds: out of memory\n"); + GOTO(out_dput, req->rq_status = -ENOMEM); + } + mntget(mds->mds_vfsmnt); + file = dentry_open(child, mds->mds_vfsmnt, + rec->ur_flags & ~(O_DIRECT | O_TRUNC)); + LASSERT(!IS_ERR(file)); /* XXX -ENOMEM? */ + file->private_data = mfd; + mfd->mfd_file = file; + mfd->mfd_xid = req->rq_xid; + get_random_bytes(&mfd->mfd_servercookie, + sizeof(mfd->mfd_servercookie)); + spin_lock(&med->med_open_lock); + list_add(&mfd->mfd_list, &med->med_open_head); + spin_unlock(&med->med_open_lock); + } + + body->handle.addr = (__u64)(unsigned long)mfd; + body->handle.cookie = mfd->mfd_servercookie; + + out_dput: + l_dput(child); + l_dput(parent); + EXIT; +} + int mds_open(struct mds_update_record *rec, int offset, struct ptlrpc_request *req, struct lustre_handle *child_lockh) { @@ -68,33 +201,19 @@ int mds_open(struct mds_update_record *rec, int offset, struct ldlm_reply *rep = lustre_msg_buf(req->rq_repmsg, 0); struct file *file; struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1); - struct dentry *dchild, *parent; + struct dentry *dchild = NULL, *parent; struct mds_export_data *med; struct mds_file_data *mfd = NULL; struct ldlm_res_id child_res_id = { .name = {0} }; struct lustre_handle parent_lockh; int rc = 0, parent_mode, child_mode = LCK_PR, lock_flags, created = 0; + int cleanup_phase = 0; + void *handle = NULL; ENTRY; -#warning replay of open needs to be redone - /* was this animal open already and the client lost the reply? */ - /* XXX need some way to detect a reopen, to avoid locked list walks */ + MDS_CHECK_RESENT(req, reconstruct_open(rec, req, child_lockh)); + med = &req->rq_export->exp_mds_data; -#if 0 - spin_lock(&med->med_open_lock); - list_for_each(tmp, &med->med_open_head) { - mfd = list_entry(tmp, typeof(*mfd), mfd_list); - if (!memcmp(&mfd->mfd_clienthandle, &body->handle, - sizeof(mfd->mfd_clienthandle)) && - body->fid1.id == mfd->mfd_file->f_dentry->d_inode->i_ino) { - dchild = mfd->mfd_file->f_dentry; - spin_unlock(&med->med_open_lock); - CERROR("Re opening "LPD64"\n", body->fid1.id); - GOTO(out_pack, rc = 0); - } - } - spin_unlock(&med->med_open_lock); -#endif rep->lock_policy_res1 |= IT_OPEN_LOOKUP; if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) { CERROR("test case OBD_FAIL_MDS_OPEN_PACK\n"); @@ -109,16 +228,19 @@ int mds_open(struct mds_update_record *rec, int offset, if (IS_ERR(parent)) { rc = PTR_ERR(parent); CERROR("parent lookup error %d\n", rc); - LBUG(); - RETURN(rc); + GOTO(cleanup, rc); } LASSERT(parent->d_inode); + cleanup_phase = 1; /* parent dentry and lock */ + /* Step 2: Lookup the child */ dchild = lookup_one_len(lustre_msg_buf(req->rq_reqmsg, 3), parent, req->rq_reqmsg->buflens[3] - 1); if (IS_ERR(dchild)) - GOTO(out_step_2, rc = PTR_ERR(dchild)); + GOTO(cleanup, rc = PTR_ERR(dchild)); + + cleanup_phase = 2; /* child dentry */ if (dchild->d_inode) rep->lock_policy_res1 |= IT_OPEN_POS; @@ -127,31 +249,24 @@ int mds_open(struct mds_update_record *rec, int offset, /* Step 3: If the child was negative, and we're supposed to, * create it. */ - if ((rec->ur_flags & O_CREAT) && !dchild->d_inode) { - int err; - void *handle; - mds_start_transno(mds); + if (!dchild->d_inode) { + if (!(rec->ur_flags & O_CREAT)) { + /* It's negative and we weren't supposed to create it */ + GOTO(cleanup, rc = -ENOENT); + } + rep->lock_policy_res1 |= IT_OPEN_CREATE; handle = fsfilt_start(obd, parent->d_inode, FSFILT_OP_CREATE); if (IS_ERR(handle)) { rc = PTR_ERR(handle); - mds_finish_transno(mds, handle, req, rc); - GOTO(out_step_3, rc); + handle = NULL; + GOTO(cleanup, rc); } rc = vfs_create(parent->d_inode, dchild, rec->ur_mode); - rc = mds_finish_transno(mds, handle, req, rc); - err = fsfilt_commit(obd, parent->d_inode, handle); - if (rc || err) { - CERROR("error on commit: err = %d\n", err); - if (!rc) - rc = err; - GOTO(out_step_3, rc); - } + if (rc) + GOTO(cleanup, rc); created = 1; child_mode = LCK_PW; - } else if (!dchild->d_inode) { - /* It's negative and we weren't supposed to create it */ - GOTO(out_step_3, rc = -ENOENT); } /* Step 4: It's positive, so lock the child */ @@ -165,30 +280,36 @@ int mds_open(struct mds_update_record *rec, int offset, mds_blocking_ast, NULL, NULL, child_lockh); if (rc != ELDLM_OK) { CERROR("ldlm_cli_enqueue: %d\n", rc); - GOTO(out_step_3, rc = -EIO); + GOTO(cleanup, rc = -EIO); } + cleanup_phase = 3; /* child lock */ + mds_pack_inode2fid(&body->fid1, dchild->d_inode); mds_pack_inode2body(body, dchild->d_inode); if (S_ISREG(dchild->d_inode->i_mode)) { rc = mds_pack_md(obd, req->rq_repmsg, 2, body, dchild->d_inode); if (rc) - GOTO(out_step_4, rc); + GOTO(cleanup, rc); } else { /* If this isn't a regular file, we can't open it. */ - GOTO(out_step_3, rc = 0); /* returns the lock to the client */ + + /* We want to drop the child dentry, because we're not returning + * failure (which would do this for us in step 2), and we're not + * handing it off to the open file in dentry_open. */ + l_dput(dchild); + GOTO(cleanup, rc = 0); /* returns the lock to the client */ } if (!created && (rec->ur_flags & O_CREAT) && (rec->ur_flags & O_EXCL)) { /* File already exists, we didn't just create it, and we * were passed O_EXCL; err-or. */ - GOTO(out_step_3, rc = -EEXIST); // returns a lock to the client + GOTO(cleanup, rc = -EEXIST); // returns a lock to the client } /* If we're opening a file without an EA, the client needs a write * lock. */ - if (child_mode != LCK_PW && S_ISREG(dchild->d_inode->i_mode) && - !(body->valid & OBD_MD_FLEASIZE)) { + if (child_mode != LCK_PW && !(body->valid & OBD_MD_FLEASIZE)) { ldlm_lock_decref(child_lockh, child_mode); child_mode = LCK_PW; goto reacquire; @@ -199,18 +320,23 @@ int mds_open(struct mds_update_record *rec, int offset, mfd = kmem_cache_alloc(mds_file_cache, GFP_KERNEL); if (!mfd) { CERROR("mds: out of memory\n"); - GOTO(out_step_4, req->rq_status = -ENOMEM); + GOTO(cleanup, rc = -ENOMEM); } + cleanup_phase = 4; /* mfd allocated */ + /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */ mntget(mds->mds_vfsmnt); - file = dentry_open(dchild,mds->mds_vfsmnt, + file = dentry_open(dchild, mds->mds_vfsmnt, rec->ur_flags & ~(O_DIRECT | O_TRUNC)); - if (IS_ERR(file)) - GOTO(out_step_5, rc = PTR_ERR(file)); + if (IS_ERR(file)) { + dchild = NULL; /* prevent a double dput in step 2 */ + GOTO(cleanup, rc = PTR_ERR(file)); + } file->private_data = mfd; mfd->mfd_file = file; + mfd->mfd_xid = req->rq_xid; get_random_bytes(&mfd->mfd_servercookie, sizeof(mfd->mfd_servercookie)); spin_lock(&med->med_open_lock); list_add(&mfd->mfd_list, &med->med_open_head); @@ -220,19 +346,34 @@ int mds_open(struct mds_update_record *rec, int offset, body->handle.cookie = mfd->mfd_servercookie; CDEBUG(D_INODE, "file %p: mfd %p, cookie "LPX64"\n", mfd->mfd_file, mfd, mfd->mfd_servercookie); - GOTO(out_step_2, rc = 0); /* returns a lock to the client */ + GOTO(cleanup, rc = 0); /* returns a lock to the client */ - out_step_5: - if (mfd != NULL) { - kmem_cache_free(mds_file_cache, mfd); - mfd = NULL; + cleanup: + rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, handle, + req, rc, rep->lock_policy_res1); + switch (cleanup_phase) { + case 4: + if (rc) + kmem_cache_free(mds_file_cache, mfd); + case 3: + /* This is the same logic as in the IT_OPEN part of + * ldlm_intent_policy: if we found the dentry, or we tried to + * open it (meaning that we created, if it wasn't found), then + * we return the lock to the caller and client. */ + if (!(rep->lock_policy_res1 & (IT_OPEN_OPEN | IT_OPEN_POS))) + ldlm_lock_decref(child_lockh, child_mode); + case 2: + if (rc) + l_dput(dchild); + case 1: + l_dput(parent); + if (rc) { + ldlm_lock_decref(&parent_lockh, parent_mode); + } else { + memcpy(&req->rq_ack_locks[0].lock, &parent_lockh, + sizeof(parent_lockh)); + req->rq_ack_locks[0].mode = parent_mode; + } } - out_step_4: - ldlm_lock_decref(child_lockh, child_mode); - out_step_3: - l_dput(dchild); - out_step_2: - l_dput(parent); - ldlm_lock_decref(&parent_lockh, parent_mode); RETURN(rc); }