X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmds%2Fmds_open.c;h=d83e4ee4cfc99d2a352fbb2a9dfa51b658e405f8;hb=96ec6856f91f7f9031cfce4273c714d72cfe59ae;hp=50ca59279e3c46b81f5888d182ea213d07c5fe79;hpb=0bcac33c5aec463e9f471fba6f7d0adc26e6df9f;p=fs%2Flustre-release.git diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index 50ca592..d83e4ee 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -33,17 +33,18 @@ #include #include #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) -#include -#include +# include +# include #else -#include +# include #endif #include #include #include #include -extern kmem_cache_t *mds_file_cache; +#include "mds_internal.h" + extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req); int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle, struct ptlrpc_request *req, int rc, __u32 op_data); @@ -57,7 +58,53 @@ extern int enqueue_ordered_locks(int lock_mode, struct obd_device *obd, struct lustre_handle *c1_lockh, struct lustre_handle *c2_lockh); -void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req, +struct mds_file_data *mds_dentry_open(struct dentry *dentry, + struct vfsmount *mnt, + int flags, + struct ptlrpc_request *req) +{ + struct mds_export_data *med = &req->rq_export->exp_mds_data; + struct inode *inode; + int mode; + struct mds_file_data *mfd; + int error; + + mfd = mds_mfd_new(); + if (!mfd) { + CERROR("mds: out of memory\n"); + GOTO(cleanup_dentry, error = -ENOMEM); + } + + mode = (flags+1) & O_ACCMODE; + inode = dentry->d_inode; + + if (mode & FMODE_WRITE) { + error = get_write_access(inode); + if (error) + goto cleanup_mfd; + } + + mfd->mfd_mode = mode; + mfd->mfd_dentry = dentry; + mfd->mfd_xid = req->rq_xid; + + spin_lock(&med->med_open_lock); + list_add(&mfd->mfd_list, &med->med_open_head); + spin_unlock(&med->med_open_lock); + mds_mfd_put(mfd); + return mfd; + +cleanup_mfd: + mds_mfd_put(mfd); + mds_mfd_destroy(mfd); +cleanup_dentry: + dput(dentry); + mntput(mnt); + return ERR_PTR(error); +} + +void reconstruct_open(struct mds_update_record *rec, int offset, + struct ptlrpc_request *req, struct lustre_handle *child_lockh) { struct mds_export_data *med = &req->rq_export->exp_mds_data; @@ -66,21 +113,23 @@ void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req, struct mds_file_data *mfd; struct obd_device *obd = req->rq_export->exp_obd; struct dentry *parent, *child; - struct ldlm_reply *rep = lustre_msg_buf(req->rq_repmsg, 0); - struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1); + struct ldlm_reply *rep; + struct mds_body *body; int disp, rc; ENTRY; - ENTRY; + LASSERT(offset == 2); /* only called via intent */ + rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep)); + body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body)); /* copy rc, transno and disp; steal locks */ req->rq_transno = mcd->mcd_last_transno; req->rq_status = mcd->mcd_last_result; disp = rep->lock_policy_res1 = mcd->mcd_last_data; - - if (med->med_outstanding_reply) - mds_steal_ack_locks(med, req); - + + if (req->rq_export->exp_outstanding_reply) + mds_steal_ack_locks(req->rq_export, req); + /* We never care about these. */ disp &= ~(IT_OPEN_LOOKUP | IT_OPEN_POS | IT_OPEN_NEG); if (!disp) { @@ -91,10 +140,9 @@ void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req, parent = mds_fid2dentry(mds, rec->ur_fid1, NULL); LASSERT(!IS_ERR(parent)); - child = lookup_one_len(lustre_msg_buf(req->rq_reqmsg, 3), - parent, req->rq_reqmsg->buflens[3] - 1); + child = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1); LASSERT(!IS_ERR(child)); - + if (!child->d_inode) { GOTO(out_dput, 0); /* child not present to open */ } @@ -108,12 +156,8 @@ void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req, GOTO(out_dput, 0); } - if (!med->med_outstanding_reply) { - LBUG(); /* XXX need to get enqueue client lock */ - } - /* get lock (write for O_CREAT, read otherwise) */ - + mds_pack_inode2fid(&body->fid1, child->d_inode); mds_pack_inode2body(body, child->d_inode); if (S_ISREG(child->d_inode->i_mode)) { @@ -127,7 +171,7 @@ void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req, /* If we're opening a file without an EA, change to a write lock (unless we already have one). */ - + /* If we have -EEXIST as the status, and we were asked to create * exclusively, we can tell we failed because the file already existed. */ @@ -150,7 +194,7 @@ void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req, GOTO(out_dput, 0); } - if (med->med_outstanding_reply) { + if (req->rq_export->exp_outstanding_reply) { struct list_head *t; mfd = NULL; /* XXX can we just look in the old reply to find the handle in @@ -164,28 +208,16 @@ void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req, /* if we're not recovering, it had better be found */ LASSERT(mfd); } else { - struct file *file; - mfd = kmem_cache_alloc(mds_file_cache, GFP_KERNEL); + mntget(mds->mds_vfsmnt); + mfd = mds_dentry_open(child, mds->mds_vfsmnt, + rec->ur_flags & ~(O_DIRECT | O_TRUNC), req); if (!mfd) { CERROR("mds: out of memory\n"); GOTO(out_dput, req->rq_status = -ENOMEM); } - mntget(mds->mds_vfsmnt); - file = dentry_open(child, mds->mds_vfsmnt, - rec->ur_flags & ~(O_DIRECT | O_TRUNC)); - LASSERT(!IS_ERR(file)); /* XXX -ENOMEM? */ - file->private_data = mfd; - mfd->mfd_file = file; - mfd->mfd_xid = req->rq_xid; - get_random_bytes(&mfd->mfd_servercookie, - sizeof(mfd->mfd_servercookie)); - spin_lock(&med->med_open_lock); - list_add(&mfd->mfd_list, &med->med_open_head); - spin_unlock(&med->med_open_lock); } - - body->handle.addr = (__u64)(unsigned long)mfd; - body->handle.cookie = mfd->mfd_servercookie; + + body->handle.cookie = mfd->mfd_handle.h_cookie; out_dput: l_dput(child); @@ -196,11 +228,13 @@ void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req, int mds_open(struct mds_update_record *rec, int offset, struct ptlrpc_request *req, struct lustre_handle *child_lockh) { + static const char acc_table [] = {[O_RDONLY] MAY_READ, + [O_WRONLY] MAY_WRITE, + [O_RDWR] MAY_READ | MAY_WRITE}; struct mds_obd *mds = mds_req2mds(req); struct obd_device *obd = req->rq_export->exp_obd; - struct ldlm_reply *rep = lustre_msg_buf(req->rq_repmsg, 0); - struct file *file; - struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1); + struct ldlm_reply *rep; + struct mds_body *body; struct dentry *dchild = NULL, *parent; struct mds_export_data *med; struct mds_file_data *mfd = NULL; @@ -209,9 +243,14 @@ int mds_open(struct mds_update_record *rec, int offset, int rc = 0, parent_mode, child_mode = LCK_PR, lock_flags, created = 0; int cleanup_phase = 0; void *handle = NULL; + int acc_mode; ENTRY; - MDS_CHECK_RESENT(req, reconstruct_open(rec, req, child_lockh)); + LASSERT(offset == 2); /* only called via intent */ + rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep)); + body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body)); + + MDS_CHECK_RESENT(req, reconstruct_open(rec, offset, req, child_lockh)); med = &req->rq_export->exp_mds_data; rep->lock_policy_res1 |= IT_OPEN_LOOKUP; @@ -221,6 +260,12 @@ int mds_open(struct mds_update_record *rec, int offset, RETURN(-ENOMEM); } + if ((rec->ur_flags & O_ACCMODE) >= sizeof (acc_table)) + RETURN(-EINVAL); + acc_mode = acc_table [rec->ur_flags & O_ACCMODE]; + if ((rec->ur_flags & O_TRUNC) != 0) + acc_mode |= MAY_WRITE; + /* Step 1: Find and lock the parent */ parent_mode = (rec->ur_flags & O_CREAT) ? LCK_PW : LCK_PR; parent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode, @@ -235,8 +280,7 @@ int mds_open(struct mds_update_record *rec, int offset, cleanup_phase = 1; /* parent dentry and lock */ /* Step 2: Lookup the child */ - dchild = lookup_one_len(lustre_msg_buf(req->rq_reqmsg, 3), - parent, req->rq_reqmsg->buflens[3] - 1); + dchild = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1); if (IS_ERR(dchild)) GOTO(cleanup, rc = PTR_ERR(dchild)); @@ -267,6 +311,7 @@ int mds_open(struct mds_update_record *rec, int offset, GOTO(cleanup, rc); created = 1; child_mode = LCK_PW; + acc_mode = 0; /* Don't check for permissions */ } /* Step 4: It's positive, so lock the child */ @@ -277,7 +322,7 @@ int mds_open(struct mds_update_record *rec, int offset, rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, child_res_id, LDLM_PLAIN, NULL, 0, child_mode, &lock_flags, ldlm_completion_ast, - mds_blocking_ast, NULL, NULL, child_lockh); + mds_blocking_ast, NULL, child_lockh); if (rc != ELDLM_OK) { CERROR("ldlm_cli_enqueue: %d\n", rc); GOTO(cleanup, rc = -EIO); @@ -287,21 +332,32 @@ int mds_open(struct mds_update_record *rec, int offset, mds_pack_inode2fid(&body->fid1, dchild->d_inode); mds_pack_inode2body(body, dchild->d_inode); + if (S_ISREG(dchild->d_inode->i_mode)) { + /* Check permissions etc */ + rc = permission(dchild->d_inode, acc_mode); + if (rc != 0) + GOTO(cleanup, rc); + + /* Can't write to a read-only file */ + if (IS_RDONLY(dchild->d_inode) && (acc_mode & MAY_WRITE) != 0) + GOTO(cleanup, rc = -EPERM); + + /* An append-only file must be opened in append mode for + * writing */ + if (IS_APPEND(dchild->d_inode) && + (acc_mode & MAY_WRITE) != 0 && + ((rec->ur_flags & O_APPEND) == 0 || + (rec->ur_flags & O_TRUNC) != 0)) + GOTO (cleanup, rc = -EPERM); + rc = mds_pack_md(obd, req->rq_repmsg, 2, body, dchild->d_inode); if (rc) GOTO(cleanup, rc); - } else { - /* If this isn't a regular file, we can't open it. */ - - /* We want to drop the child dentry, because we're not returning - * failure (which would do this for us in step 2), and we're not - * handing it off to the open file in dentry_open. */ - l_dput(dchild); - GOTO(cleanup, rc = 0); /* returns the lock to the client */ } - if (!created && (rec->ur_flags & O_CREAT) && (rec->ur_flags & O_EXCL)) { + if (!created && (rec->ur_flags & O_CREAT) && + (rec->ur_flags & O_EXCL)) { /* File already exists, we didn't just create it, and we * were passed O_EXCL; err-or. */ GOTO(cleanup, rc = -EEXIST); // returns a lock to the client @@ -309,43 +365,33 @@ int mds_open(struct mds_update_record *rec, int offset, /* If we're opening a file without an EA, the client needs a write * lock. */ - if (child_mode != LCK_PW && !(body->valid & OBD_MD_FLEASIZE)) { + if (S_ISREG(dchild->d_inode->i_mode) && + child_mode != LCK_PW && !(body->valid & OBD_MD_FLEASIZE)) { ldlm_lock_decref(child_lockh, child_mode); child_mode = LCK_PW; goto reacquire; } - /* Step 5: Open it */ + /* if we are following a symlink, don't open */ + if (S_ISLNK(dchild->d_inode->i_mode)) + GOTO(cleanup, rc = 0); + + /* Step 5: mds_open it */ rep->lock_policy_res1 |= IT_OPEN_OPEN; - mfd = kmem_cache_alloc(mds_file_cache, GFP_KERNEL); + + /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */ + mfd = mds_dentry_open(dchild, mds->mds_vfsmnt, + rec->ur_flags & ~(O_DIRECT | O_TRUNC), req); if (!mfd) { CERROR("mds: out of memory\n"); + dchild = NULL; /* prevent a double dput in step 2 */ GOTO(cleanup, rc = -ENOMEM); } cleanup_phase = 4; /* mfd allocated */ - - /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */ - mntget(mds->mds_vfsmnt); - file = dentry_open(dchild, mds->mds_vfsmnt, - rec->ur_flags & ~(O_DIRECT | O_TRUNC)); - if (IS_ERR(file)) { - dchild = NULL; /* prevent a double dput in step 2 */ - GOTO(cleanup, rc = PTR_ERR(file)); - } - - file->private_data = mfd; - mfd->mfd_file = file; - mfd->mfd_xid = req->rq_xid; - get_random_bytes(&mfd->mfd_servercookie, sizeof(mfd->mfd_servercookie)); - spin_lock(&med->med_open_lock); - list_add(&mfd->mfd_list, &med->med_open_head); - spin_unlock(&med->med_open_lock); - - body->handle.addr = (__u64)(unsigned long)mfd; - body->handle.cookie = mfd->mfd_servercookie; - CDEBUG(D_INODE, "file %p: mfd %p, cookie "LPX64"\n", - mfd->mfd_file, mfd, mfd->mfd_servercookie); + body->handle.cookie = mfd->mfd_handle.h_cookie; + CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd, + mfd->mfd_handle.h_cookie); GOTO(cleanup, rc = 0); /* returns a lock to the client */ cleanup: @@ -353,18 +399,18 @@ int mds_open(struct mds_update_record *rec, int offset, req, rc, rep->lock_policy_res1); switch (cleanup_phase) { case 4: - if (rc) - kmem_cache_free(mds_file_cache, mfd); + if (rc && !S_ISLNK(dchild->d_inode->i_mode)) + mds_mfd_destroy(mfd); case 3: - /* This is the same logic as in the IT_OPEN part of + /* This is the same logic as in the IT_OPEN part of * ldlm_intent_policy: if we found the dentry, or we tried to * open it (meaning that we created, if it wasn't found), then * we return the lock to the caller and client. */ if (!(rep->lock_policy_res1 & (IT_OPEN_OPEN | IT_OPEN_POS))) ldlm_lock_decref(child_lockh, child_mode); case 2: - if (rc) - l_dput(dchild); + if (rc || S_ISLNK(dchild->d_inode->i_mode)) + l_dput(dchild); case 1: l_dput(parent); if (rc) {