From 49e3675b810ee5ac06bda1ec011bf2f09ce20dc2 Mon Sep 17 00:00:00 2001 From: pschwan Date: Sat, 13 Jul 2002 19:03:23 +0000 Subject: [PATCH] - Added match_or_enqueue helper function - fixed userspace build problem in lustre_lib.h - added intent-based lookup code - fixed intent-based setattr - added mds_fid2locked_dentry and mds_name2locked_dentry helpers - don't crash in ptlrpc_reply, just warn of API violation - update create.pl to open instead of mcreate --- lustre/include/linux/lustre_dlm.h | 21 ++++++++++-- lustre/include/linux/lustre_lib.h | 3 +- lustre/include/linux/lustre_lite.h | 3 +- lustre/include/linux/lustre_mds.h | 10 +++++- lustre/ldlm/ldlm_lock.c | 25 +++----------- lustre/ldlm/ldlm_lockd.c | 19 +++++++++++ lustre/ldlm/ldlm_request.c | 34 ++++++++++++++++-- lustre/ldlm/ldlm_resource.c | 2 +- lustre/llite/dcache.c | 10 ++++-- lustre/llite/file.c | 2 +- lustre/llite/namei.c | 47 ++++++++++++------------- lustre/llite/super.c | 3 +- lustre/mdc/mdc_request.c | 29 ++-------------- lustre/mds/handler.c | 70 ++++++++++++++++++++++++++++++++++++-- lustre/mds/mds_reint.c | 55 ++++++++++++++++++++++++------ lustre/ptlrpc/niobuf.c | 4 ++- lustre/tests/create.pl | 20 +++++++---- 17 files changed, 249 insertions(+), 108 deletions(-) diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index cb446e2..f4d7e3b 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -305,11 +305,26 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, void *data, __u32 data_len, struct lustre_handle *lockh); +int ldlm_match_or_enqueue(struct ptlrpc_client *cl, + struct ptlrpc_connection *conn, + struct lustre_handle *connh, + struct ptlrpc_request *req, + struct ldlm_namespace *ns, + struct lustre_handle *parent_lock_handle, + __u64 *res_id, + __u32 type, + void *cookie, int cookielen, + ldlm_mode_t mode, + int *flags, + ldlm_lock_callback callback, + void *data, + __u32 data_len, + struct lustre_handle *lockh); int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new, void *data, __u32 data_len); -int ldlm_cli_convert(struct ptlrpc_client *, struct lustre_handle *, struct lustre_handle *connh, - int new_mode, int *flags); -int ldlm_cli_cancel(struct lustre_handle *lockh, struct lustre_handle *connh); +int ldlm_cli_convert(struct ptlrpc_client *, struct lustre_handle *, + struct lustre_handle *connh, int new_mode, int *flags); +int ldlm_cli_cancel(struct lustre_handle *lockh); #endif /* __KERNEL__ */ diff --git a/lustre/include/linux/lustre_lib.h b/lustre/include/linux/lustre_lib.h index 535861d..251e5b2 100644 --- a/lustre/include/linux/lustre_lib.h +++ b/lustre/include/linux/lustre_lib.h @@ -29,10 +29,11 @@ #ifndef __KERNEL__ # include +#else +# include #endif #include -#include #include #ifdef __KERNEL__ diff --git a/lustre/include/linux/lustre_lite.h b/lustre/include/linux/lustre_lite.h index b63dfc9..07c37dc 100644 --- a/lustre/include/linux/lustre_lite.h +++ b/lustre/include/linux/lustre_lite.h @@ -127,7 +127,8 @@ int ll_lock(struct inode *dir, struct dentry *dentry, struct lookup_intent *it, struct lustre_handle *lockh); int ll_unlock(__u32 mode, struct lustre_handle *lockh); - +/* dcache.c */ +void ll_intent_release(struct dentry *de); /* dir.c */ extern struct file_operations ll_dir_operations; diff --git a/lustre/include/linux/lustre_mds.h b/lustre/include/linux/lustre_mds.h index 8e8a59f..e5fbfca 100644 --- a/lustre/include/linux/lustre_mds.h +++ b/lustre/include/linux/lustre_mds.h @@ -131,7 +131,15 @@ void mds_pack_inode2fid(struct ll_fid *fid, struct inode *inode); void mds_pack_inode2body(struct mds_body *body, struct inode *inode); /* mds/handler.c */ -struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt); +struct dentry *mds_name2locked_dentry(struct mds_obd *mds, struct dentry *dir, + struct vfsmount **mnt, char *name, + int namelen, int lock_mode, + struct lustre_handle *lockh); +struct dentry *mds_fid2locked_dentry(struct mds_obd *mds, struct ll_fid *fid, + struct vfsmount **mnt, int lock_mode, + struct lustre_handle *lockh); +struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, + struct vfsmount **mnt); int mds_lock_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc, void *data, int data_len, struct ptlrpc_request **req); int mds_reint(int offset, struct ptlrpc_request *req); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index dbb7b71..9861e15 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -288,6 +288,8 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, case IT_LINK: case IT_OPEN: case IT_RENAME: + case IT_SETATTR: + case IT_LOOKUP: bufcount = 3; break; case IT_UNLINK: @@ -317,23 +319,12 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, case IT_CREAT: case IT_CREAT|IT_OPEN: case IT_MKDIR: - case IT_SETATTR: case IT_SYMLINK: case IT_MKNOD: case IT_LINK: case IT_UNLINK: case IT_RMDIR: case IT_RENAME2: - if (mds_reint_p == NULL) - mds_reint_p = - inter_module_get_request - ("mds_reint", "mds"); - if (IS_ERR(mds_reint_p)) { - CERROR("MDSINTENT locks require the MDS " - "module.\n"); - LBUG(); - RETURN(-EINVAL); - } rc = mds_reint_p(2, req); if (rc || req->rq_status != 0) { rep->lock_policy_res2 = req->rq_status; @@ -344,16 +335,8 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, case IT_READDIR: case IT_RENAME: case IT_OPEN: - if (mds_getattr_name_p == NULL) - mds_getattr_name_p = - inter_module_get_request - ("mds_getattr_name", "mds"); - if (IS_ERR(mds_getattr_name_p)) { - CERROR("MDSINTENT locks require the MDS " - "module.\n"); - LBUG(); - RETURN(-EINVAL); - } + case IT_SETATTR: + case IT_LOOKUP: rc = mds_getattr_name_p(2, req); /* FIXME: we need to sit down and decide on who should * set req->rq_status, who should return negative and diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index d69128c..e5c35f0 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -391,6 +391,20 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf) GOTO(out_dec, rc = -ENOMEM); } + if (mds_reint_p == NULL) + mds_reint_p = inter_module_get_request("mds_reint", "mds"); + if (IS_ERR(mds_reint_p)) { + CERROR("MDSINTENT locks require the MDS module.\n"); + GOTO(out_dec, rc = -EINVAL); + } + if (mds_getattr_name_p == NULL) + mds_getattr_name_p = inter_module_get_request + ("mds_getattr_name", "mds"); + if (IS_ERR(mds_getattr_name_p)) { + CERROR("MDSINTENT locks require the MDS module.\n"); + GOTO(out_dec, rc = -EINVAL); + } + for (i = 0; i < LDLM_NUM_THREADS; i++) { rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service, "lustre_dlm"); @@ -407,6 +421,10 @@ out_thread: ptlrpc_stop_all_threads(ldlm->ldlm_service); ptlrpc_unregister_service(ldlm->ldlm_service); out_dec: + if (mds_reint_p != NULL) + inter_module_put("mds_reint"); + if (mds_getattr_name_p != NULL) + inter_module_put("mds_getattr_name"); MOD_DEC_USE_COUNT; return rc; } @@ -478,6 +496,7 @@ EXPORT_SYMBOL(ldlm_lock_decref); EXPORT_SYMBOL(ldlm_cli_convert); EXPORT_SYMBOL(ldlm_cli_enqueue); EXPORT_SYMBOL(ldlm_cli_cancel); +EXPORT_SYMBOL(ldlm_match_or_enqueue); EXPORT_SYMBOL(ldlm_test); EXPORT_SYMBOL(ldlm_lock_dump); EXPORT_SYMBOL(ldlm_namespace_new); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 7e47486..f632587 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -145,6 +145,37 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, return rc; } +int ldlm_match_or_enqueue(struct ptlrpc_client *cl, + struct ptlrpc_connection *conn, + struct lustre_handle *connh, + struct ptlrpc_request *req, + struct ldlm_namespace *ns, + struct lustre_handle *parent_lock_handle, + __u64 *res_id, + __u32 type, + void *cookie, int cookielen, + ldlm_mode_t mode, + int *flags, + ldlm_lock_callback callback, + void *data, + __u32 data_len, + struct lustre_handle *lockh) +{ + int rc; + ENTRY; + rc = ldlm_lock_match(ns, res_id, type, cookie, cookielen, mode, lockh); + if (rc == 0) { + rc = ldlm_cli_enqueue(cl, conn, connh, req, ns, + parent_lock_handle, res_id, type, cookie, + cookielen, mode, flags, callback, data, + data_len, lockh); + if (rc != ELDLM_OK) + CERROR("ldlm_cli_enqueue: err: %d\n", rc); + RETURN(rc); + } else + RETURN(0); +} + int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc, void *data, __u32 data_len) { @@ -254,8 +285,7 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh, return rc; } -int ldlm_cli_cancel(struct lustre_handle *lockh, - struct lustre_handle *connh) +int ldlm_cli_cancel(struct lustre_handle *lockh) { struct ptlrpc_request *req; struct ldlm_lock *lock; diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 374b10c..5a4743e 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -79,7 +79,7 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q) struct lustre_handle lockh; ldlm_lock2handle(lock, &lockh); /* can we get away without a connh here? */ - rc = ldlm_cli_cancel(&lockh, NULL); + rc = ldlm_cli_cancel(&lockh); if (rc < 0) { CERROR("ldlm_cli_cancel: %d\n", rc); LBUG(); diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c index 5d1c29d..41cb398 100644 --- a/lustre/llite/dcache.c +++ b/lustre/llite/dcache.c @@ -28,10 +28,16 @@ void ll_intent_release(struct dentry *de) EXIT; return; } - if (de->d_it->it_lock_mode) { handle = (struct lustre_handle *)de->d_it->it_lock_handle; - ldlm_lock_decref(handle, de->d_it->it_lock_mode); + if (de->d_it->it_op == IT_SETATTR) { + int rc; + ldlm_lock_decref(handle, de->d_it->it_lock_mode); + rc = ldlm_cli_cancel(handle); + if (rc < 0) + CERROR("ldlm_cli_cancel: %d\n", rc); + } else + ldlm_lock_decref(handle, de->d_it->it_lock_mode); } de->d_it = NULL; EXIT; diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 52db328..81f879d 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -220,7 +220,7 @@ static int ll_lock_callback(struct lustre_handle *lockh, invalidate_inode_pages(inode); up(&inode->i_sem); - if (ldlm_cli_cancel(lockh, NULL) < 0) + if (ldlm_cli_cancel(lockh) < 0) LBUG(); RETURN(0); } diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index ab9108f..e752e67e 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -98,23 +98,24 @@ int ll_lock(struct inode *dir, struct dentry *dentry, struct lookup_intent *it, struct lustre_handle *lockh) { struct ll_sb_info *sbi = ll_i2sbi(dir); - int err; + int err, lock_mode; if ((it->it_op & (IT_CREAT | IT_MKDIR | IT_SYMLINK | IT_SETATTR | - IT_MKNOD)) ) - err = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT, - it, LCK_PW, dir, dentry, lockh, 0, NULL, 0, - dir, sizeof(*dir)); + IT_MKNOD))) + lock_mode = LCK_PW; else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_UNLINK | IT_RMDIR | IT_RENAME | IT_RENAME2)) - err = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT, - it, LCK_PR, dir, dentry, lockh, 0, NULL, 0, - dir, sizeof(*dir)); + lock_mode = LCK_PR; + else if (it->it_op & IT_LOOKUP) + lock_mode = LCK_CR; else { LBUG(); RETURN(-1); } + err = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT, it, lock_mode, dir, + dentry, lockh, 0, NULL, 0, dir, sizeof(*dir)); + RETURN(err); } @@ -136,13 +137,14 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry, struct ll_inode_md md; struct lustre_handle lockh; int err, type, offset; + struct lookup_intent lookup_it = { IT_LOOKUP }; obd_id ino; ENTRY; if (it == NULL) { - LBUG(); - RETURN(NULL); + it = &lookup_it; + dentry->d_it = it; } CDEBUG(D_INFO, "name: %*s, intent op: %d\n", dentry->d_name.len, @@ -160,23 +162,13 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry, memcpy(it->it_lock_handle, &lockh, sizeof(lockh)); if ((it->it_op & (IT_CREAT | IT_MKDIR | IT_SYMLINK | IT_MKNOD)) && - it->it_disposition && !it->it_status) { -#if 0 - if (it->it_data) - CERROR("leaking request %p\n", it->it_data); -#endif + it->it_disposition && !it->it_status) GOTO(negative, NULL); - } - if ((it->it_op & (IT_RENAME | IT_GETATTR | IT_UNLINK | IT_RMDIR)) && - it->it_disposition && it->it_status) { -#if 0 - if (it->it_data) - CERROR("request: %p, status: %d\n", it->it_data, - it->it_status); -#endif + if ((it->it_op & (IT_RENAME | IT_GETATTR | IT_UNLINK | IT_RMDIR | + IT_SETATTR | IT_LOOKUP)) && it->it_disposition && + it->it_status) GOTO(negative, NULL); - } request = (struct ptlrpc_request *)it->it_data; if (!it->it_disposition) { @@ -215,7 +207,9 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry, inode->i_mode = S_IFDIR; inode->i_nlink = 1; GOTO(out_req, 0); - } else if (it->it_op != IT_RENAME2) { + } else if (it->it_op == IT_RENAME2) { + LBUG(); + } else { struct mds_body *body; offset = 1; @@ -251,6 +245,9 @@ static struct dentry *ll_lookup2(struct inode * dir, struct dentry *dentry, negative: dentry->d_op = &ll_d_ops; d_add(dentry, inode); + if (it->it_op == IT_LOOKUP) + ll_intent_release(dentry); + return NULL; } diff --git a/lustre/llite/super.c b/lustre/llite/super.c index b0596e3..4db2384 100644 --- a/lustre/llite/super.c +++ b/lustre/llite/super.c @@ -320,8 +320,7 @@ int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc) /* change incore inode */ ll_attr2inode(inode, attr, do_trunc); - err = mdc_setattr(&sbi->ll_mdc_conn, inode, attr, - &request); + err = mdc_setattr(&sbi->ll_mdc_conn, inode, attr, &request); if (err) CERROR("mdc_setattr fails (%d)\n", err); diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index a806055..19f3ef8 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -150,7 +150,7 @@ static int mdc_lock_callback(struct lustre_handle *lockh, invalidate_inode_pages(inode); } - rc = ldlm_cli_cancel(lockh, NULL); + rc = ldlm_cli_cancel(lockh); if (rc < 0) { CERROR("ldlm_cli_cancel: %d\n", rc); LBUG(); @@ -179,9 +179,6 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type, case IT_MKDIR: it->it_mode = (it->it_mode | S_IFDIR) & ~current->fs->umask; break; - case IT_SETATTR: - it->it_op = IT_GETATTR; - break; case (IT_CREAT|IT_OPEN): case IT_CREAT: case IT_MKNOD: @@ -275,7 +272,8 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type, size[0] = sizeof(struct ldlm_reply); req->rq_replen = lustre_msg_size(1, size); } else if (it->it_op == IT_GETATTR || it->it_op == IT_RENAME || - it->it_op == IT_OPEN) { + it->it_op == IT_OPEN || it->it_op == IT_SETATTR || + it->it_op == IT_LOOKUP) { size[2] = sizeof(struct mds_body); size[3] = de->d_name.len + 1; @@ -297,27 +295,6 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type, size[1] = sizeof(struct mds_body); size[2] = sizeof(struct obdo); req->rq_replen = lustre_msg_size(3, size); - } else if (it->it_op == IT_SETATTR) { - size[2] = sizeof(struct mds_rec_setattr); - size[3] = de->d_name.len + 1; - - req = ptlrpc_prep_req2(mdc->mdc_ldlm_client, mdc->mdc_conn, - &mdc->mdc_connh, LDLM_ENQUEUE, 4, size, - NULL); - if (!req) - RETURN(-ENOMEM); - - lit = lustre_msg_buf(req->rq_reqmsg, 1); - lit->opc = NTOH__u64((__u64)it->it_op); - - if (!it->it_iattr) - LBUG(); - - mds_setattr_pack(req, 2, dir, it->it_iattr, - de->d_name.name, de->d_name.len); - size[0] = sizeof(struct ldlm_reply); - size[1] = sizeof(struct mds_body); - req->rq_replen = lustre_msg_size(2, size); } else if (it->it_op == IT_READDIR) { req = ptlrpc_prep_req2(mdc->mdc_ldlm_client, mdc->mdc_conn, &mdc->mdc_connh, LDLM_ENQUEUE, 1, size, NULL); diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 18f342b..cf32175 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -91,6 +91,71 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, return rc; } +/* 'dir' is a inode for which a lock has already been taken */ +struct dentry *mds_name2locked_dentry(struct mds_obd *mds, struct dentry *dir, + struct vfsmount **mnt, char *name, + int namelen, int lock_mode, + struct lustre_handle *lockh) +{ + struct dentry *dchild; + int flags, rc; + __u64 res_id[3] = {0}; + ENTRY; + + down(&dir->d_inode->i_sem); + dchild = lookup_one_len(name, dir, namelen); + if (IS_ERR(dchild)) { + CERROR("child lookup error %ld\n", PTR_ERR(dchild)); + up(&dir->d_inode->i_sem); + LBUG(); + } + up(&dir->d_inode->i_sem); + + if (lock_mode == 0) + RETURN(dchild); + + res_id[0] = dchild->d_inode->i_ino; + rc = ldlm_match_or_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn, + (struct lustre_handle *)&mds->mds_connh, + NULL, mds->mds_local_namespace, NULL, + res_id, LDLM_PLAIN, NULL, 0, lock_mode, + &flags, (void *)mds_lock_callback, NULL, + 0, lockh); + if (rc != ELDLM_OK) { + l_dput(dchild); + RETURN(NULL); + } + + RETURN(dchild); +} + +struct dentry *mds_fid2locked_dentry(struct mds_obd *mds, struct ll_fid *fid, + struct vfsmount **mnt, int lock_mode, + struct lustre_handle *lockh) +{ + struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de; + int flags, rc; + __u64 res_id[3] = {0}; + ENTRY; + + if (IS_ERR(de)) + RETURN(de); + + res_id[0] = de->d_inode->i_ino; + rc = ldlm_match_or_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn, + (struct lustre_handle *)&mds->mds_connh, + NULL, mds->mds_local_namespace, NULL, + res_id, LDLM_PLAIN, NULL, 0, lock_mode, + &flags, (void *)mds_lock_callback, NULL, + 0, lockh); + if (rc != ELDLM_OK) { + l_dput(de); + retval = NULL; + } + + RETURN(retval); +} + struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt) { @@ -287,7 +352,7 @@ int mds_lock_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc, RETURN(0); } - if (ldlm_cli_cancel(lockh, NULL) < 0) + if (ldlm_cli_cancel(lockh) < 0) LBUG(); RETURN(0); } @@ -375,9 +440,8 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req) } /* now a normal case for intent locking */ rc = 0; - } else { + } else rc = -ENOENT; - } EXIT; out_create_dchild: diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index a25d074..5cc9ac5 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -94,28 +94,58 @@ int mds_update_last_rcvd(struct mds_obd *mds, void *handle, return rc; } +/* In the write-back case, the client holds a lock on a subtree. + * In the intent case, the client holds a lock on the child inode. + * In the pathname case, the client (may) hold a lock on the child inode. */ static int mds_reint_setattr(struct mds_update_record *rec, int offset, struct ptlrpc_request *req) { struct mds_obd *mds = mds_req2mds(req); struct dentry *de; void *handle; - int rc = 0; - int err; + struct lustre_handle child_lockh; + int rc = 0, err; + + if (req->rq_reqmsg->bufcount > offset + 1) { + struct dentry *dir; + struct lustre_handle dir_lockh; + char *name; + int namelen; + /* a name was supplied by the client; fid1 is the directory */ + + name = lustre_msg_buf(req->rq_reqmsg, offset + 1); + namelen = req->rq_reqmsg->buflens[offset + 1] - 1; + dir = mds_fid2locked_dentry(mds, rec->ur_fid1, NULL, LCK_PR, + &dir_lockh); + if (!dir || IS_ERR(dir)) { + l_dput(dir); + LBUG(); + GOTO(out_setattr, rc = -ESTALE); + } - de = mds_fid2dentry(mds, rec->ur_fid1, NULL); - if (IS_ERR(de) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_SETATTR)) { - GOTO(out_setattr, rc = -ESTALE); + de = mds_name2locked_dentry(mds, dir, NULL, name, namelen, + 0, &child_lockh); + l_dput(dir); + if (!de || IS_ERR(de)) { + LBUG(); + GOTO(out_setattr_de, rc = -ESTALE); + } + } else { + de = mds_fid2dentry(mds, rec->ur_fid1, NULL); + if (!de || IS_ERR(de)) { + LBUG(); + GOTO(out_setattr_de, rc = -ESTALE); + } } - CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino); OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE, de->d_inode->i_sb->s_dev); + lock_kernel(); handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR); if (!handle) - GOTO(out_setattr_de, rc = PTR_ERR(handle)); + GOTO(out_unlock, rc = PTR_ERR(handle)); rc = mds_fs_setattr(mds, de, handle, &rec->ur_iattr); if (!rc) @@ -127,10 +157,13 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset, if (!rc) rc = err; } + EXIT; -out_setattr_de: + out_unlock: + unlock_kernel(); + out_setattr_de: l_dput(de); -out_setattr: + out_setattr: req->rq_status = rc; return(0); } @@ -533,7 +566,7 @@ out_unlink_de: if (!rc) { ldlm_lock_decref(&lockh, LCK_EX); - rc = ldlm_cli_cancel(&lockh, NULL); + rc = ldlm_cli_cancel(&lockh); if (rc < 0) CERROR("failed to cancel child inode lock ino " "%Ld: %d\n", res_id[0], rc); @@ -737,7 +770,7 @@ out_rename_deold: if (!rc) { ldlm_lock_decref(&oldhandle, LCK_EX); - rc = ldlm_cli_cancel(&oldhandle, NULL); + rc = ldlm_cli_cancel(&oldhandle); if (rc < 0) CERROR("failed to cancel child inode lock ino " "%Ld: %d\n", res_id[0], rc); diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 201b4d8..a72d14e 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -215,9 +215,11 @@ int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc) int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req) { - if (req->rq_repmsg == NULL) + if (req->rq_repmsg == NULL) { CERROR("bad: someone called ptlrpc_reply when they meant " "ptlrpc_error\n"); + return -EINVAL; + } /* FIXME: we need to increment the count of handled events */ req->rq_type = PTL_RPC_TYPE_REPLY; diff --git a/lustre/tests/create.pl b/lustre/tests/create.pl index f5e6d9b7..debdcd1 100644 --- a/lustre/tests/create.pl +++ b/lustre/tests/create.pl @@ -7,6 +7,16 @@ sub usage () { exit; } +sub unused() { + my $tmp = `./mcreate $path`; + if ($tmp) { + $tmp =~ /.*error: (.*)\n/; + print "Created $path: $1\n"; + } else { + print "Created $path: Success\n"; + } +} + my $mtpt = shift || usage(); my $mount_count = shift || usage(); my $i = shift || usage(); @@ -16,13 +26,9 @@ while ($i--) { $which = int(rand() * $mount_count) + 1; $d = int(rand() * $files); $path = "$mtpt$which/$d"; - my $tmp = `./mcreate $path`; - if ($tmp) { - $tmp =~ /.*error: (.*)\n/; - print "Created $path: $1\n"; - } else { - print "Created $path: Success\n"; - } + open(FH, ">$path") || die "open($PATH): $!"; + print "Opened $path: Success\n"; + close(FH) || die; $which = int(rand() * $mount_count) + 1; $d = int(rand() * $files); -- 1.8.3.1