From 8fd7009502daf2507da5e3349fb90f00df3f2bf3 Mon Sep 17 00:00:00 2001 From: pschwan Date: Tue, 29 Oct 2002 05:27:13 +0000 Subject: [PATCH] b=256 abstract ll_lookup2; make ll_lookup2 and ll_revalidate2 both use this function. For the first time, we might almost be properly cache coherant. --- lustre/ChangeLog | 1 + lustre/include/linux/lustre_lite.h | 4 + lustre/llite/dcache.c | 28 ++++- lustre/llite/dir.c | 8 +- lustre/llite/namei.c | 235 ++++++++++++++++++++++++------------- lustre/tests/lov.xml | 18 +-- 6 files changed, 197 insertions(+), 97 deletions(-) diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 53f9548..bae803f 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -2,6 +2,7 @@ TBA * version v0_5_16 * bug fixes: - limit Lustre IOVs to PTL_MD_MAX_IOV (611336) + - abstract ll_lookup2, fix ll_revalidate2 to use abstraction (256) * protocol change to lustre_msg: move |version| and add |flags| * added replay of create, unlink, link and rename operations during MDS failover; recovery should be much more robust now diff --git a/lustre/include/linux/lustre_lite.h b/lustre/include/linux/lustre_lite.h index 825b53d..13c1e5d 100644 --- a/lustre/include/linux/lustre_lite.h +++ b/lustre/include/linux/lustre_lite.h @@ -167,6 +167,8 @@ int ll_save_intent(struct dentry * de, struct lookup_intent * it); struct lookup_intent * ll_get_intent(struct dentry * de); ****/ +#define IT_RELEASED_MAGIC 0xDEADCAFE + #define LL_SAVE_INTENT(de, it) \ do { \ LASSERT(ll_d2d(de) != NULL); \ @@ -183,10 +185,12 @@ do { \ it = de->d_it; \ \ LASSERT(ll_d2d(de) != NULL); \ + LASSERT(it->it_op != IT_RELEASED_MAGIC); \ \ CDEBUG(D_DENTRY, "D_IT UP dentry %p fsdata %p intent: %s\n", \ de, ll_d2d(de), ldlm_it2str(de->d_it->it_op)); \ de->d_it = NULL; \ + it->it_op = IT_RELEASED_MAGIC; \ up(&ll_d2d(de)->lld_it_sem); \ } while(0) diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c index 02e78ae..0990504 100644 --- a/lustre/llite/dcache.c +++ b/lustre/llite/dcache.c @@ -63,12 +63,37 @@ void ll_intent_release(struct dentry *de, struct lookup_intent *it) } else ldlm_lock_decref(handle, it->it_lock_mode); } - //up(&ll_d2d(de)->lld_it_sem); + + if (it->it_op != IT_RELEASED_MAGIC) { + up(&ll_d2d(de)->lld_it_sem); + it->it_op = IT_RELEASED_MAGIC; + } EXIT; } int ll_revalidate2(struct dentry *de, int flags, struct lookup_intent *it) { + int ll_intent_lock(struct inode *parent, struct dentry *dentry, + struct lookup_intent *it, void *); + int rc; + ENTRY; + + rc = ll_intent_lock(de->d_parent->d_inode, de, it, NULL); + if (rc < 0) { + /* Something bad happened; overwrite it_status? */ + } + + if (it != NULL && it->it_status == 0) { + LL_SAVE_INTENT(de, it); + } else if (it != NULL) { + de->d_it = NULL; + CDEBUG(D_DENTRY, + "D_IT dentry %p fsdata %p intent: %s status %d\n", + de, ll_d2d(de), ldlm_it2str(it->it_op), it->it_status); + } + + RETURN(1); +#if 0 struct ll_sb_info *sbi = ll_s2sbi(de->d_sb); struct lustre_handle lockh; __u64 res_id[RES_NAME_SIZE] = {0}; @@ -125,6 +150,7 @@ int ll_revalidate2(struct dentry *de, int flags, struct lookup_intent *it) de->d_it = it; RETURN(rc); +#endif } int ll_set_dd(struct dentry *de) diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index d824376..32a4e32 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -71,7 +71,7 @@ static int ll_dir_readpage(struct file *file, struct page *page) struct ptlrpc_request *request; struct lustre_handle lockh; struct mds_body *body; - struct lookup_intent it = {IT_READDIR}; + struct lookup_intent it = { .it_op = IT_READDIR }; ENTRY; @@ -81,9 +81,11 @@ static int ll_dir_readpage(struct file *file, struct page *page) GOTO(readpage_out, rc); } - rc = ll_lock(inode, NULL, &it, &lockh); + rc = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT, &it, LCK_PR, + inode, NULL, &lockh, NULL, 0, inode, sizeof(*inode)); request = (struct ptlrpc_request *)it.it_data; - ptlrpc_req_finished(request); + if (request) + ptlrpc_req_finished(request); if (rc != ELDLM_OK) { CERROR("lock enqueue: err: %d\n", rc); unlock_page(page); diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 1dcc8f1..4ff4c4e 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -109,38 +109,6 @@ static int ll_test_inode(struct inode *inode, void *opaque) extern struct dentry_operations ll_d_ops; -int ll_lock(struct inode *dir, struct dentry *dentry, - struct lookup_intent *it, struct lustre_handle *lockh) -{ - struct ll_sb_info *sbi = ll_i2sbi(dir); - char *tgt = NULL; - int tgtlen = 0; - int err, lock_mode; - - /* CREAT needs to be tested before open (both could be set) */ - if ((it->it_op & (IT_CREAT | IT_MKDIR | IT_SETATTR | IT_MKNOD))) { - lock_mode = LCK_PW; - } else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_UNLINK | - IT_RMDIR | IT_RENAME | IT_RENAME2 | IT_READLINK| - IT_LINK | IT_LINK2 | IT_LOOKUP)) { - /* XXXphil PW for LINK2/RENAME2? */ - lock_mode = LCK_PR; - } else if (it->it_op & IT_SYMLINK) { - lock_mode = LCK_PW; - tgt = it->it_data; - tgtlen = strlen(tgt); - it->it_data = NULL; - } else { - LBUG(); - RETURN(-EINVAL); - } - - err = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT, it, lock_mode, - dir, dentry, lockh, tgt, tgtlen, dir, sizeof(*dir)); - - RETURN(err); -} - int ll_unlock(__u32 mode, struct lustre_handle *lockh) { ENTRY; @@ -180,16 +148,38 @@ struct inode *ll_iget(struct super_block *sb, ino_t hash, } #endif -static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry, - struct lookup_intent *it) +static int ll_intent_to_lock_mode(struct lookup_intent *it) { - struct ptlrpc_request *request = NULL; - struct inode * inode = NULL; - struct ll_sb_info *sbi = ll_i2sbi(dir); - struct ll_read_inode2_cookie lic; + /* CREAT needs to be tested before open (both could be set) */ + if ((it->it_op & (IT_CREAT | IT_MKDIR | IT_SETATTR | IT_MKNOD))) { + return LCK_PW; + } else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_UNLINK | + IT_RMDIR | IT_RENAME | IT_RENAME2 | IT_READLINK| + IT_LINK | IT_LINK2 | IT_LOOKUP | IT_SYMLINK)) { + return LCK_PW; + } + + LBUG(); + RETURN(-EINVAL); +} + +#define LL_LOOKUP_POSITIVE 1 +#define LL_LOOKUP_NEGATIVE 2 + +typedef int (*intent_finish_cb)(int flag, struct ptlrpc_request *, + struct dentry *, struct lookup_intent *, + int offset, obd_id ino); + +int ll_intent_lock(struct inode *parent, struct dentry *dentry, + struct lookup_intent *it, + intent_finish_cb intent_finish) +{ + struct ll_sb_info *sbi = ll_i2sbi(parent); struct lustre_handle lockh; - struct lookup_intent lookup_it = { IT_LOOKUP }; - int rc, offset; + struct lookup_intent lookup_it = { .it_op = IT_LOOKUP }; + struct ptlrpc_request *request = NULL; + char *tgt = NULL; + int rc, lock_mode, tgtlen = 0, offset, flag = LL_LOOKUP_POSITIVE; obd_id ino = 0; ENTRY; @@ -201,35 +191,69 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry, dentry->d_name.name, ldlm_it2str(it->it_op)); if (dentry->d_name.len > EXT2_NAME_LEN) - RETURN(ERR_PTR(-ENAMETOOLONG)); + RETURN(-ENAMETOOLONG); - rc = ll_lock(dir, dentry, it, &lockh); + lock_mode = ll_intent_to_lock_mode(it); + if (it->it_op & IT_SYMLINK) { + tgt = it->it_data; + tgtlen = strlen(tgt); + it->it_data = NULL; + } + + rc = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT, it, lock_mode, + parent, dentry, &lockh, tgt, tgtlen, parent, + sizeof(*parent)); if (rc < 0) - RETURN(ERR_PTR(rc)); + RETURN(rc); memcpy(it->it_lock_handle, &lockh, sizeof(lockh)); request = (struct ptlrpc_request *)it->it_data; if (it->it_disposition) { + struct mds_body *mds_body; int mode, symlen = 0; obd_flag valid; + /* it_disposition == 1 indicates that the server performed the + * intent on our behalf. This long block is all about fixing up + * the local state so that it is correct as of the moment + * _before_ the operation was applied; that way, the VFS will + * think that everything is normal and call Lustre's regular + * FS function. + * + * If we're performing a creation, that means that unless the + * creation failed with EEXIST, we should fake up a negative + * dentry. Likewise for the target of a hard link. + * + * For everything else, we want to lookup to succeed. */ + + /* One additional note: we add an extra reference to the request + * because we need to keep it around until ll_create gets + * called. For anything else which results in + * LL_LOOKUP_POSITIVE, we can do the iget() immediately with the + * contents of the reply (in the intent_finish callback). In + * the create case, however, we need to wait until + * ll_create_node to do the iget() or the VFS will abort with + * -EEXISTS. */ + offset = 1; - lic.lic_body = lustre_msg_buf(request->rq_repmsg, offset); - ino = lic.lic_body->fid1.id; - mode = lic.lic_body->mode; + mds_body = lustre_msg_buf(request->rq_repmsg, offset); + ino = mds_body->fid1.id; + mode = mds_body->mode; if (it->it_op & (IT_CREAT | IT_MKDIR | IT_SYMLINK | IT_MKNOD)) { - mdc_store_create_replay_data(request, dir->i_sb); + mdc_store_create_replay_data(request, parent->i_sb); /* For create ops, we want the lookup to be negative, * unless the create failed in a way that indicates * that the file is already there */ - if (it->it_status != -EEXIST) - GOTO(negative, NULL); + if (it->it_status != -EEXIST) { + atomic_inc(&request->rq_refcount); + GOTO(out, flag = LL_LOOKUP_NEGATIVE); + } } else if (it->it_op & (IT_GETATTR | IT_SETATTR | IT_LOOKUP | IT_READLINK)) { /* For check ops, we want the lookup to succeed */ it->it_data = NULL; if (it->it_status) - GOTO(neg_req, NULL); + GOTO(out, flag = LL_LOOKUP_NEGATIVE); } else if (it->it_op & (IT_RENAME | IT_LINK)) { /* For rename, we want the source lookup to succeed */ if (it->it_status) { @@ -242,25 +266,27 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry, * the file truly doesn't exist */ it->it_data = NULL; if (it->it_status == -ENOENT) - GOTO(neg_req, NULL); - goto iget; + GOTO(out, flag = LL_LOOKUP_NEGATIVE); + GOTO(out, flag = LL_LOOKUP_POSITIVE); } else if (it->it_op == IT_OPEN) { it->it_data = NULL; if (it->it_status && it->it_status != -EEXIST) - GOTO(neg_req, NULL); + GOTO(out, flag = LL_LOOKUP_NEGATIVE); } else if (it->it_op & (IT_RENAME2 | IT_LINK2)) { + struct mds_body *body = + lustre_msg_buf(request->rq_repmsg, offset); it->it_data = NULL; /* This means the target lookup is negative */ - if (lic.lic_body->valid == 0) - GOTO(neg_req, NULL); - goto iget; + if (body->valid == 0) + GOTO(out, flag = LL_LOOKUP_NEGATIVE); + GOTO(out, flag = LL_LOOKUP_POSITIVE); } /* Do a getattr now that we have the lock */ valid = OBD_MD_FLNOTOBD | OBD_MD_FLEASIZE; if (it->it_op == IT_READLINK) { valid |= OBD_MD_LINKNAME; - symlen = lic.lic_body->size; + symlen = mds_body->size; } ptlrpc_req_finished(request); request = NULL; @@ -272,13 +298,17 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry, } offset = 0; } else { - struct ll_inode_info *lli = ll_i2info(dir); + struct ll_inode_info *lli = ll_i2info(parent); int mode; + /* it_disposition == 0 indicates that it just did a simple lock + * request, for which we are very thankful. move along with + * the local lookup then. */ + memcpy(&lli->lli_intent_lock_handle, &lockh, sizeof(lockh)); offset = 0; - ino = ll_inode_by_name(dir, dentry, &mode); + ino = ll_inode_by_name(parent, dentry, &mode); if (!ino) { CERROR("inode %*s not found by name\n", dentry->d_name.len, dentry->d_name.name); @@ -293,28 +323,60 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry, } } - iget: - lic.lic_body = lustre_msg_buf(request->rq_repmsg, offset); - if (S_ISREG(lic.lic_body->mode) && - lic.lic_body->valid & OBD_MD_FLEASIZE) { - LASSERT(request->rq_repmsg->bufcount > offset); - lic.lic_lmm = lustre_msg_buf(request->rq_repmsg, offset + 1); - } else - lic.lic_lmm = NULL; + EXIT; + out: + if (intent_finish != NULL) + rc = intent_finish(flag, request, dentry, it, offset, ino); + else + ptlrpc_req_finished(request); - /* No rpc's happen during iget4, -ENOMEM's are possible */ - LASSERT(ino != 0); - inode = ll_iget(dir->i_sb, ino, &lic); - if (!inode) { - ptlrpc_free_req(request); + if (it->it_op == IT_LOOKUP || rc < 0) ll_intent_release(dentry, it); - RETURN(ERR_PTR(-ENOMEM)); + + return rc; + + drop_req: + ptlrpc_free_req(request); + drop_lock: +#warning FIXME: must release lock here + return rc; +} + +static int lookup2_finish(int flag, struct ptlrpc_request *request, + struct dentry *dentry, struct lookup_intent *it, + int offset, obd_id ino) +{ + struct inode *inode = NULL; + struct ll_read_inode2_cookie lic; + + if (flag == LL_LOOKUP_POSITIVE) { + ENTRY; + lic.lic_body = lustre_msg_buf(request->rq_repmsg, offset); + + if (S_ISREG(lic.lic_body->mode) && + lic.lic_body->valid & OBD_MD_FLEASIZE) { + LASSERT(request->rq_repmsg->bufcount > offset); + lic.lic_lmm = lustre_msg_buf(request->rq_repmsg, + offset + 1); + } else { + lic.lic_lmm = NULL; + } + + /* No rpc's happen during iget4, -ENOMEM's are possible */ + LASSERT(ino != 0); + inode = ll_iget(dentry->d_sb, ino, &lic); + if (!inode) { + /* XXX make sure that request is freed in this case; + * I think it is, but double-check refcounts. -phil */ + RETURN(-ENOMEM); + } + EXIT; + } else { + ENTRY; } - EXIT; - neg_req: ptlrpc_req_finished(request); - negative: + dentry->d_op = &ll_d_ops; if (ll_d2d(dentry) == NULL) { ll_set_dd(dentry); @@ -333,16 +395,21 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry, it->it_status); } - if (it->it_op == IT_LOOKUP) - ll_intent_release(dentry, it); + RETURN(0); +} - return NULL; +static struct dentry *ll_lookup2(struct inode *parent, struct dentry *dentry, + struct lookup_intent *it) +{ + int rc; - drop_req: - ptlrpc_free_req(request); - drop_lock: -#warning FIXME: must release lock here - return ERR_PTR(rc); + rc = ll_intent_lock(parent, dentry, it, lookup2_finish); + if (rc < 0) { + CERROR("ll_intent_lock: %d\n", rc); + return ERR_PTR(rc); + } + + return 0; } static struct inode *ll_create_node(struct inode *dir, const char *name, diff --git a/lustre/tests/lov.xml b/lustre/tests/lov.xml index b768f5d..532c1ec2 100644 --- a/lustre/tests/lov.xml +++ b/lustre/tests/lov.xml @@ -2,7 +2,7 @@ - + @@ -14,7 +14,7 @@ - + localhost 988 @@ -26,17 +26,17 @@ - + - + - + - + extN /tmp/ost1 no @@ -49,16 +49,16 @@ - + extN /tmp/ost2 no - + - + -- 1.8.3.1