void *data,
__u32 data_len,
struct lustre_handle *lockh);
+int ldlm_match_or_enqueue(struct ptlrpc_client *cl,
+ struct ptlrpc_connection *conn,
+ struct lustre_handle *connh,
+ struct ptlrpc_request *req,
+ struct ldlm_namespace *ns,
+ struct lustre_handle *parent_lock_handle,
+ __u64 *res_id,
+ __u32 type,
+ void *cookie, int cookielen,
+ ldlm_mode_t mode,
+ int *flags,
+ ldlm_lock_callback callback,
+ void *data,
+ __u32 data_len,
+ struct lustre_handle *lockh);
int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new,
void *data, __u32 data_len);
-int ldlm_cli_convert(struct ptlrpc_client *, struct lustre_handle *, struct lustre_handle *connh,
- int new_mode, int *flags);
-int ldlm_cli_cancel(struct lustre_handle *lockh, struct lustre_handle *connh);
+int ldlm_cli_convert(struct ptlrpc_client *, struct lustre_handle *,
+ struct lustre_handle *connh, int new_mode, int *flags);
+int ldlm_cli_cancel(struct lustre_handle *lockh);
#endif /* __KERNEL__ */
#ifndef __KERNEL__
# include <string.h>
+#else
+# include <asm/semaphore.h>
#endif
#include <linux/portals_lib.h>
-#include <asm/semaphore.h>
#include <linux/lustre_idl.h>
#ifdef __KERNEL__
struct lookup_intent *it, struct lustre_handle *lockh);
int ll_unlock(__u32 mode, struct lustre_handle *lockh);
-
+/* dcache.c */
+void ll_intent_release(struct dentry *de);
/* dir.c */
extern struct file_operations ll_dir_operations;
void mds_pack_inode2body(struct mds_body *body, struct inode *inode);
/* mds/handler.c */
-struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt);
+struct dentry *mds_name2locked_dentry(struct mds_obd *mds, struct dentry *dir,
+ struct vfsmount **mnt, char *name,
+ int namelen, int lock_mode,
+ struct lustre_handle *lockh);
+struct dentry *mds_fid2locked_dentry(struct mds_obd *mds, struct ll_fid *fid,
+ struct vfsmount **mnt, int lock_mode,
+ struct lustre_handle *lockh);
+struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
+ struct vfsmount **mnt);
int mds_lock_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
void *data, int data_len, struct ptlrpc_request **req);
int mds_reint(int offset, struct ptlrpc_request *req);
case IT_LINK:
case IT_OPEN:
case IT_RENAME:
+ case IT_SETATTR:
+ case IT_LOOKUP:
bufcount = 3;
break;
case IT_UNLINK:
case IT_CREAT:
case IT_CREAT|IT_OPEN:
case IT_MKDIR:
- case IT_SETATTR:
case IT_SYMLINK:
case IT_MKNOD:
case IT_LINK:
case IT_UNLINK:
case IT_RMDIR:
case IT_RENAME2:
- if (mds_reint_p == NULL)
- mds_reint_p =
- inter_module_get_request
- ("mds_reint", "mds");
- if (IS_ERR(mds_reint_p)) {
- CERROR("MDSINTENT locks require the MDS "
- "module.\n");
- LBUG();
- RETURN(-EINVAL);
- }
rc = mds_reint_p(2, req);
if (rc || req->rq_status != 0) {
rep->lock_policy_res2 = req->rq_status;
case IT_READDIR:
case IT_RENAME:
case IT_OPEN:
- if (mds_getattr_name_p == NULL)
- mds_getattr_name_p =
- inter_module_get_request
- ("mds_getattr_name", "mds");
- if (IS_ERR(mds_getattr_name_p)) {
- CERROR("MDSINTENT locks require the MDS "
- "module.\n");
- LBUG();
- RETURN(-EINVAL);
- }
+ case IT_SETATTR:
+ case IT_LOOKUP:
rc = mds_getattr_name_p(2, req);
/* FIXME: we need to sit down and decide on who should
* set req->rq_status, who should return negative and
GOTO(out_dec, rc = -ENOMEM);
}
+ if (mds_reint_p == NULL)
+ mds_reint_p = inter_module_get_request("mds_reint", "mds");
+ if (IS_ERR(mds_reint_p)) {
+ CERROR("MDSINTENT locks require the MDS module.\n");
+ GOTO(out_dec, rc = -EINVAL);
+ }
+ if (mds_getattr_name_p == NULL)
+ mds_getattr_name_p = inter_module_get_request
+ ("mds_getattr_name", "mds");
+ if (IS_ERR(mds_getattr_name_p)) {
+ CERROR("MDSINTENT locks require the MDS module.\n");
+ GOTO(out_dec, rc = -EINVAL);
+ }
+
for (i = 0; i < LDLM_NUM_THREADS; i++) {
rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service,
"lustre_dlm");
ptlrpc_stop_all_threads(ldlm->ldlm_service);
ptlrpc_unregister_service(ldlm->ldlm_service);
out_dec:
+ if (mds_reint_p != NULL)
+ inter_module_put("mds_reint");
+ if (mds_getattr_name_p != NULL)
+ inter_module_put("mds_getattr_name");
MOD_DEC_USE_COUNT;
return rc;
}
EXPORT_SYMBOL(ldlm_cli_convert);
EXPORT_SYMBOL(ldlm_cli_enqueue);
EXPORT_SYMBOL(ldlm_cli_cancel);
+EXPORT_SYMBOL(ldlm_match_or_enqueue);
EXPORT_SYMBOL(ldlm_test);
EXPORT_SYMBOL(ldlm_lock_dump);
EXPORT_SYMBOL(ldlm_namespace_new);
return rc;
}
+int ldlm_match_or_enqueue(struct ptlrpc_client *cl,
+ struct ptlrpc_connection *conn,
+ struct lustre_handle *connh,
+ struct ptlrpc_request *req,
+ struct ldlm_namespace *ns,
+ struct lustre_handle *parent_lock_handle,
+ __u64 *res_id,
+ __u32 type,
+ void *cookie, int cookielen,
+ ldlm_mode_t mode,
+ int *flags,
+ ldlm_lock_callback callback,
+ void *data,
+ __u32 data_len,
+ struct lustre_handle *lockh)
+{
+ int rc;
+ ENTRY;
+ rc = ldlm_lock_match(ns, res_id, type, cookie, cookielen, mode, lockh);
+ if (rc == 0) {
+ rc = ldlm_cli_enqueue(cl, conn, connh, req, ns,
+ parent_lock_handle, res_id, type, cookie,
+ cookielen, mode, flags, callback, data,
+ data_len, lockh);
+ if (rc != ELDLM_OK)
+ CERROR("ldlm_cli_enqueue: err: %d\n", rc);
+ RETURN(rc);
+ } else
+ RETURN(0);
+}
+
int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
void *data, __u32 data_len)
{
return rc;
}
-int ldlm_cli_cancel(struct lustre_handle *lockh,
- struct lustre_handle *connh)
+int ldlm_cli_cancel(struct lustre_handle *lockh)
{
struct ptlrpc_request *req;
struct ldlm_lock *lock;
struct lustre_handle lockh;
ldlm_lock2handle(lock, &lockh);
/* can we get away without a connh here? */
- rc = ldlm_cli_cancel(&lockh, NULL);
+ rc = ldlm_cli_cancel(&lockh);
if (rc < 0) {
CERROR("ldlm_cli_cancel: %d\n", rc);
LBUG();
EXIT;
return;
}
-
if (de->d_it->it_lock_mode) {
handle = (struct lustre_handle *)de->d_it->it_lock_handle;
- ldlm_lock_decref(handle, de->d_it->it_lock_mode);
+ if (de->d_it->it_op == IT_SETATTR) {
+ int rc;
+ ldlm_lock_decref(handle, de->d_it->it_lock_mode);
+ rc = ldlm_cli_cancel(handle);
+ if (rc < 0)
+ CERROR("ldlm_cli_cancel: %d\n", rc);
+ } else
+ ldlm_lock_decref(handle, de->d_it->it_lock_mode);
}
de->d_it = NULL;
EXIT;
invalidate_inode_pages(inode);
up(&inode->i_sem);
- if (ldlm_cli_cancel(lockh, NULL) < 0)
+ if (ldlm_cli_cancel(lockh) < 0)
LBUG();
RETURN(0);
}
struct lookup_intent *it, struct lustre_handle *lockh)
{
struct ll_sb_info *sbi = ll_i2sbi(dir);
- int err;
+ int err, lock_mode;
if ((it->it_op & (IT_CREAT | IT_MKDIR | IT_SYMLINK | IT_SETATTR |
- IT_MKNOD)) )
- err = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT,
- it, LCK_PW, dir, dentry, lockh, 0, NULL, 0,
- dir, sizeof(*dir));
+ IT_MKNOD)))
+ lock_mode = LCK_PW;
else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_UNLINK |
IT_RMDIR | IT_RENAME | IT_RENAME2))
- err = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT,
- it, LCK_PR, dir, dentry, lockh, 0, NULL, 0,
- dir, sizeof(*dir));
+ lock_mode = LCK_PR;
+ else if (it->it_op & IT_LOOKUP)
+ lock_mode = LCK_CR;
else {
LBUG();
RETURN(-1);
}
+ err = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT, it, lock_mode, dir,
+ dentry, lockh, 0, NULL, 0, dir, sizeof(*dir));
+
RETURN(err);
}
struct ll_inode_md md;
struct lustre_handle lockh;
int err, type, offset;
+ struct lookup_intent lookup_it = { IT_LOOKUP };
obd_id ino;
ENTRY;
if (it == NULL) {
- LBUG();
- RETURN(NULL);
+ it = &lookup_it;
+ dentry->d_it = it;
}
CDEBUG(D_INFO, "name: %*s, intent op: %d\n", dentry->d_name.len,
memcpy(it->it_lock_handle, &lockh, sizeof(lockh));
if ((it->it_op & (IT_CREAT | IT_MKDIR | IT_SYMLINK | IT_MKNOD)) &&
- it->it_disposition && !it->it_status) {
-#if 0
- if (it->it_data)
- CERROR("leaking request %p\n", it->it_data);
-#endif
+ it->it_disposition && !it->it_status)
GOTO(negative, NULL);
- }
- if ((it->it_op & (IT_RENAME | IT_GETATTR | IT_UNLINK | IT_RMDIR)) &&
- it->it_disposition && it->it_status) {
-#if 0
- if (it->it_data)
- CERROR("request: %p, status: %d\n", it->it_data,
- it->it_status);
-#endif
+ if ((it->it_op & (IT_RENAME | IT_GETATTR | IT_UNLINK | IT_RMDIR |
+ IT_SETATTR | IT_LOOKUP)) && it->it_disposition &&
+ it->it_status)
GOTO(negative, NULL);
- }
request = (struct ptlrpc_request *)it->it_data;
if (!it->it_disposition) {
inode->i_mode = S_IFDIR;
inode->i_nlink = 1;
GOTO(out_req, 0);
- } else if (it->it_op != IT_RENAME2) {
+ } else if (it->it_op == IT_RENAME2) {
+ LBUG();
+ } else {
struct mds_body *body;
offset = 1;
negative:
dentry->d_op = &ll_d_ops;
d_add(dentry, inode);
+ if (it->it_op == IT_LOOKUP)
+ ll_intent_release(dentry);
+
return NULL;
}
/* change incore inode */
ll_attr2inode(inode, attr, do_trunc);
- err = mdc_setattr(&sbi->ll_mdc_conn, inode, attr,
- &request);
+ err = mdc_setattr(&sbi->ll_mdc_conn, inode, attr, &request);
if (err)
CERROR("mdc_setattr fails (%d)\n", err);
invalidate_inode_pages(inode);
}
- rc = ldlm_cli_cancel(lockh, NULL);
+ rc = ldlm_cli_cancel(lockh);
if (rc < 0) {
CERROR("ldlm_cli_cancel: %d\n", rc);
LBUG();
case IT_MKDIR:
it->it_mode = (it->it_mode | S_IFDIR) & ~current->fs->umask;
break;
- case IT_SETATTR:
- it->it_op = IT_GETATTR;
- break;
case (IT_CREAT|IT_OPEN):
case IT_CREAT:
case IT_MKNOD:
size[0] = sizeof(struct ldlm_reply);
req->rq_replen = lustre_msg_size(1, size);
} else if (it->it_op == IT_GETATTR || it->it_op == IT_RENAME ||
- it->it_op == IT_OPEN) {
+ it->it_op == IT_OPEN || it->it_op == IT_SETATTR ||
+ it->it_op == IT_LOOKUP) {
size[2] = sizeof(struct mds_body);
size[3] = de->d_name.len + 1;
size[1] = sizeof(struct mds_body);
size[2] = sizeof(struct obdo);
req->rq_replen = lustre_msg_size(3, size);
- } else if (it->it_op == IT_SETATTR) {
- size[2] = sizeof(struct mds_rec_setattr);
- size[3] = de->d_name.len + 1;
-
- req = ptlrpc_prep_req2(mdc->mdc_ldlm_client, mdc->mdc_conn,
- &mdc->mdc_connh, LDLM_ENQUEUE, 4, size,
- NULL);
- if (!req)
- RETURN(-ENOMEM);
-
- lit = lustre_msg_buf(req->rq_reqmsg, 1);
- lit->opc = NTOH__u64((__u64)it->it_op);
-
- if (!it->it_iattr)
- LBUG();
-
- mds_setattr_pack(req, 2, dir, it->it_iattr,
- de->d_name.name, de->d_name.len);
- size[0] = sizeof(struct ldlm_reply);
- size[1] = sizeof(struct mds_body);
- req->rq_replen = lustre_msg_size(2, size);
} else if (it->it_op == IT_READDIR) {
req = ptlrpc_prep_req2(mdc->mdc_ldlm_client, mdc->mdc_conn,
&mdc->mdc_connh, LDLM_ENQUEUE, 1, size, NULL);
return rc;
}
+/* 'dir' is a inode for which a lock has already been taken */
+struct dentry *mds_name2locked_dentry(struct mds_obd *mds, struct dentry *dir,
+ struct vfsmount **mnt, char *name,
+ int namelen, int lock_mode,
+ struct lustre_handle *lockh)
+{
+ struct dentry *dchild;
+ int flags, rc;
+ __u64 res_id[3] = {0};
+ ENTRY;
+
+ down(&dir->d_inode->i_sem);
+ dchild = lookup_one_len(name, dir, namelen);
+ if (IS_ERR(dchild)) {
+ CERROR("child lookup error %ld\n", PTR_ERR(dchild));
+ up(&dir->d_inode->i_sem);
+ LBUG();
+ }
+ up(&dir->d_inode->i_sem);
+
+ if (lock_mode == 0)
+ RETURN(dchild);
+
+ res_id[0] = dchild->d_inode->i_ino;
+ rc = ldlm_match_or_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
+ (struct lustre_handle *)&mds->mds_connh,
+ NULL, mds->mds_local_namespace, NULL,
+ res_id, LDLM_PLAIN, NULL, 0, lock_mode,
+ &flags, (void *)mds_lock_callback, NULL,
+ 0, lockh);
+ if (rc != ELDLM_OK) {
+ l_dput(dchild);
+ RETURN(NULL);
+ }
+
+ RETURN(dchild);
+}
+
+struct dentry *mds_fid2locked_dentry(struct mds_obd *mds, struct ll_fid *fid,
+ struct vfsmount **mnt, int lock_mode,
+ struct lustre_handle *lockh)
+{
+ struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
+ int flags, rc;
+ __u64 res_id[3] = {0};
+ ENTRY;
+
+ if (IS_ERR(de))
+ RETURN(de);
+
+ res_id[0] = de->d_inode->i_ino;
+ rc = ldlm_match_or_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
+ (struct lustre_handle *)&mds->mds_connh,
+ NULL, mds->mds_local_namespace, NULL,
+ res_id, LDLM_PLAIN, NULL, 0, lock_mode,
+ &flags, (void *)mds_lock_callback, NULL,
+ 0, lockh);
+ if (rc != ELDLM_OK) {
+ l_dput(de);
+ retval = NULL;
+ }
+
+ RETURN(retval);
+}
+
struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
struct vfsmount **mnt)
{
RETURN(0);
}
- if (ldlm_cli_cancel(lockh, NULL) < 0)
+ if (ldlm_cli_cancel(lockh) < 0)
LBUG();
RETURN(0);
}
}
/* now a normal case for intent locking */
rc = 0;
- } else {
+ } else
rc = -ENOENT;
- }
EXIT;
out_create_dchild:
return rc;
}
+/* In the write-back case, the client holds a lock on a subtree.
+ * In the intent case, the client holds a lock on the child inode.
+ * In the pathname case, the client (may) hold a lock on the child inode. */
static int mds_reint_setattr(struct mds_update_record *rec, int offset,
struct ptlrpc_request *req)
{
struct mds_obd *mds = mds_req2mds(req);
struct dentry *de;
void *handle;
- int rc = 0;
- int err;
+ struct lustre_handle child_lockh;
+ int rc = 0, err;
+
+ if (req->rq_reqmsg->bufcount > offset + 1) {
+ struct dentry *dir;
+ struct lustre_handle dir_lockh;
+ char *name;
+ int namelen;
+ /* a name was supplied by the client; fid1 is the directory */
+
+ name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
+ namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
+ dir = mds_fid2locked_dentry(mds, rec->ur_fid1, NULL, LCK_PR,
+ &dir_lockh);
+ if (!dir || IS_ERR(dir)) {
+ l_dput(dir);
+ LBUG();
+ GOTO(out_setattr, rc = -ESTALE);
+ }
- de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
- if (IS_ERR(de) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_SETATTR)) {
- GOTO(out_setattr, rc = -ESTALE);
+ de = mds_name2locked_dentry(mds, dir, NULL, name, namelen,
+ 0, &child_lockh);
+ l_dput(dir);
+ if (!de || IS_ERR(de)) {
+ LBUG();
+ GOTO(out_setattr_de, rc = -ESTALE);
+ }
+ } else {
+ de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
+ if (!de || IS_ERR(de)) {
+ LBUG();
+ GOTO(out_setattr_de, rc = -ESTALE);
+ }
}
-
CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
de->d_inode->i_sb->s_dev);
+ lock_kernel();
handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR);
if (!handle)
- GOTO(out_setattr_de, rc = PTR_ERR(handle));
+ GOTO(out_unlock, rc = PTR_ERR(handle));
rc = mds_fs_setattr(mds, de, handle, &rec->ur_iattr);
if (!rc)
if (!rc)
rc = err;
}
+
EXIT;
-out_setattr_de:
+ out_unlock:
+ unlock_kernel();
+ out_setattr_de:
l_dput(de);
-out_setattr:
+ out_setattr:
req->rq_status = rc;
return(0);
}
if (!rc) {
ldlm_lock_decref(&lockh, LCK_EX);
- rc = ldlm_cli_cancel(&lockh, NULL);
+ rc = ldlm_cli_cancel(&lockh);
if (rc < 0)
CERROR("failed to cancel child inode lock ino "
"%Ld: %d\n", res_id[0], rc);
if (!rc) {
ldlm_lock_decref(&oldhandle, LCK_EX);
- rc = ldlm_cli_cancel(&oldhandle, NULL);
+ rc = ldlm_cli_cancel(&oldhandle);
if (rc < 0)
CERROR("failed to cancel child inode lock ino "
"%Ld: %d\n", res_id[0], rc);
int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req)
{
- if (req->rq_repmsg == NULL)
+ if (req->rq_repmsg == NULL) {
CERROR("bad: someone called ptlrpc_reply when they meant "
"ptlrpc_error\n");
+ return -EINVAL;
+ }
/* FIXME: we need to increment the count of handled events */
req->rq_type = PTL_RPC_TYPE_REPLY;
exit;
}
+sub unused() {
+ my $tmp = `./mcreate $path`;
+ if ($tmp) {
+ $tmp =~ /.*error: (.*)\n/;
+ print "Created $path: $1\n";
+ } else {
+ print "Created $path: Success\n";
+ }
+}
+
my $mtpt = shift || usage();
my $mount_count = shift || usage();
my $i = shift || usage();
$which = int(rand() * $mount_count) + 1;
$d = int(rand() * $files);
$path = "$mtpt$which/$d";
- my $tmp = `./mcreate $path`;
- if ($tmp) {
- $tmp =~ /.*error: (.*)\n/;
- print "Created $path: $1\n";
- } else {
- print "Created $path: Success\n";
- }
+ open(FH, ">$path") || die "open($PATH): $!";
+ print "Opened $path: Success\n";
+ close(FH) || die;
$which = int(rand() * $mount_count) + 1;
$d = int(rand() * $files);