#define LDLM_DEBUG(lock, format, a...) \
do { \
CDEBUG(D_DLMTRACE, "### " format \
- " (%s: lock %p(rc=%d) mode %s/%s on res %Lu(rc=%d) " \
- " type %s remote %Lx)\n" , ## a, \
+ " (%s: lock %p(rc=%d/%d,%d) mode %s/%s on res %Lu" \
+ "(rc=%d) type %s remote %Lx)\n" , ## a, \
lock->l_resource->lr_namespace->ns_name, lock, \
- lock->l_refc, ldlm_lockname[lock->l_granted_mode], \
+ lock->l_refc, lock->l_readers, lock->l_writers, \
+ ldlm_lockname[lock->l_granted_mode], \
ldlm_lockname[lock->l_req_mode], \
lock->l_resource->lr_name[0], \
atomic_read(&lock->l_resource->lr_refcount), \
RETURN(-EINVAL);
}
rc = mds_getattr_name_p(2, req);
- if (rc) {
- req->rq_status = rc;
- RETURN(rc);
+ /* FIXME: we need to sit down and decide on who should
+ * set req->rq_status, who should return negative and
+ * positive return values, and what they all mean. */
+ if (rc || req->rq_status != 0) {
+ mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
+ rep->lock_policy_res2 = req->rq_status;
+ RETURN(ELDLM_LOCK_ABORTED);
}
break;
case IT_READDIR|IT_OPEN:
mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
rep->lock_policy_res2 = req->rq_status;
- new_resid[0] = mds_rep->ino;
+ new_resid[0] = NTOH__u32(mds_rep->ino);
+ if (new_resid[0] == 0)
+ LBUG();
old_res = lock->l_resource->lr_name[0];
CDEBUG(D_INFO, "remote intent: locking %d instead of"
lock->l_writers++;
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
ldlm_lock_get(lock);
+ LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}
/* Args: unlocked lock */
if (lock == NULL)
LBUG();
- LDLM_DEBUG(lock, "ldlm_lock_decref(%d)", mode);
+ LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
l_lock(&lock->l_resource->lr_namespace->ns_lock);
if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
lock->l_readers--;
LBUG();
}
- CDEBUG(D_INFO, "final decref done on cbpending lock, "
- "calling callback.\n");
+ LDLM_DEBUG(lock, "final decref done on cbpending lock");
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
ldlm_lock2handle(lock, &lockh);
- lock->l_blocking_ast(&lockh, NULL, lock->l_data,
+ /* FIXME: -1 is a really, really bad 'desc' */
+ lock->l_blocking_ast(&lockh, (void *)-1, lock->l_data,
lock->l_data_len);
} else
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
EXIT;
out:
- if (lock)
+ if (lock) {
+ LDLM_DEBUG(lock, "server-side enqueue handler, sending reply");
ldlm_lock_put(lock);
+ }
req->rq_status = err;
CDEBUG(D_INFO, "err = %d\n", err);
if (ptlrpc_reply(req->rq_svc, req))
LBUG();
- if (err)
- LDLM_DEBUG_NOLOCK("server-side enqueue handler END");
- else {
+ if (!err)
ldlm_reprocess_all(lock->l_resource);
- LDLM_DEBUG(lock, "server-side enqueue handler END");
- }
+ LDLM_DEBUG_NOLOCK("server-side enqueue handler END");
return 0;
}
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
if (do_ast) {
- CDEBUG(D_INFO, "Lock already unused, calling "
- "callback (%p).\n", lock->l_blocking_ast);
+ LDLM_DEBUG(lock, "already unused, calling "
+ "callback (%p)", lock->l_blocking_ast);
if (lock->l_blocking_ast != NULL) {
struct lustre_handle lockh;
ldlm_lock2handle(lock, &lockh);
#include <linux/lustre_dlm.h>
-int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, struct lustre_handle *connh,
+int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn,
+ struct lustre_handle *connh,
struct ptlrpc_request *req,
struct ldlm_namespace *ns,
struct lustre_handle *parent_lock_handle,
data, data_len);
if (lock == NULL)
GOTO(out, rc = -ENOMEM);
+ LDLM_DEBUG(lock, "client-side enqueue START");
/* for the local lock, add the reference */
ldlm_lock_addref_internal(lock, mode);
ldlm_lock2handle(lock, lockh);
- LDLM_DEBUG(lock, "client-side enqueue START");
-
if (req == NULL) {
req = ptlrpc_prep_req2(cl, conn, connh,
LDLM_ENQUEUE, 1, &size, NULL);
rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, callback,
callback);
- LDLM_DEBUG(lock, "client-side enqueue END");
if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
LDLM_FL_BLOCK_CONV)) {
/* Go to sleep until the lock is granted. */
/* FIXME: or cancelled. */
- CDEBUG(D_NET, "enqueue returned a blocked lock (%p), "
- "going to sleep.\n", lock);
+ LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock,"
+ " sleeping");
ldlm_lock_dump(lock);
wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
lock->l_granted_mode);
- CDEBUG(D_NET, "waking up, the lock must be granted.\n");
+ LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
}
+ LDLM_DEBUG(lock, "client-side enqueue END");
ldlm_lock_put(lock);
EXIT;
out:
RETURN(ERR_PTR(-abs(err)));
}
offset = 0;
- } else if (it->it_op == IT_UNLINK) {
+ } else if (it->it_op == IT_UNLINK) {
struct obdo *obdo;
obdo = lustre_msg_buf(request->rq_repmsg, 1);
inode = new_inode(dir->i_sb);
/* XXX fix mem allocation error */
memcpy(ll_i2info(inode)->lli_obdo, obdo, sizeof(*obdo));
- if (!inode)
+ if (!inode)
GOTO(out_req, -ENOMEM);
inode->i_mode= S_IFREG;
inode->i_nlink = 1;
GOTO(out_req, 0);
- } else if (it->it_op == IT_RMDIR) {
+ } else if (it->it_op == IT_RMDIR) {
inode = new_inode(dir->i_sb);
- if (!inode)
+ if (!inode)
GOTO(out_req, -ENOMEM);
ll_i2info(inode)->lli_obdo = NULL;
inode->i_mode= S_IFDIR;
struct page * page;
int err = -ENOENT;
- if (dentry->d_it && dentry->d_it->it_disposition) {
+ if (dentry->d_it && dentry->d_it->it_disposition) {
inode->i_nlink = 0;
- GOTO(out, err=0);
+ GOTO(out, err = dentry->d_it->it_status);
}
de = ext2_find_entry (dir, dentry, &page);
int err = 0;
int intent_did = dentry->d_it && dentry->d_it->it_disposition;
- if (!intent_did) {
+ if (!intent_did) {
if (!ext2_empty_dir(inode))
LBUG();
err = ll_unlink(dir, dentry);
- if (err)
+ if (err)
RETURN(err);
- }
+ } else
+ err = dentry->d_it->it_status;
inode->i_size = 0;
ext2_dec_count(inode);
ext2_dec_count(dir);
RETURN(err);
}
-static int ll_rename (struct inode * old_dir, struct dentry * old_dentry,
- struct inode * new_dir, struct dentry * new_dentry )
+static int ll_rename(struct inode * old_dir, struct dentry * old_dentry,
+ struct inode * new_dir, struct dentry * new_dentry)
{
struct inode * old_inode = old_dentry->d_inode;
struct inode * new_inode = new_dentry->d_inode;
struct ext2_dir_entry_2 * old_de;
int err = -ENOENT;
- if (new_dentry->d_it) {
- struct ptlrpc_request *req = new_dentry->d_it->it_data;
- err = req->rq_status;
- goto out;
- }
+ if (new_dentry->d_it)
+ GOTO(out, err = new_dentry->d_it->it_status);
err = ll_mdc_rename(old_dir, new_dir, old_dentry, new_dentry);
if (err)
return rc;
}
-static int mdc_lock_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc,
- void *data, int data_len,
- struct ptlrpc_request **req)
+static int mdc_lock_callback(struct lustre_handle *lockh,
+ struct ldlm_lock_desc *desc, void *data,
+ int data_len, struct ptlrpc_request **req)
{
int rc;
struct inode *inode = data;
size[4] = tgtlen + 1;
req = ptlrpc_prep_req2(mdc->mdc_ldlm_client, mdc->mdc_conn,
&mdc->mdc_connh,
- LDLM_ENQUEUE, 5, size, NULL);
+ LDLM_ENQUEUE, 5, size, NULL);
if (!req)
RETURN(-ENOMEM);
size[0] = sizeof(struct ldlm_reply);
req->rq_replen = lustre_msg_size(1, size);
} else if ( it->it_op == IT_GETATTR || it->it_op == IT_RENAME ||
- it->it_op == IT_OPEN ) {
+ it->it_op == IT_OPEN ) {
size[2] = sizeof(struct mds_body);
size[3] = de->d_name.len + 1;
req = ptlrpc_prep_req2(mdc->mdc_ldlm_client, mdc->mdc_conn,
- &mdc->mdc_connh, LDLM_ENQUEUE, 4, size, NULL);
+ &mdc->mdc_connh, LDLM_ENQUEUE, 4, size,
+ NULL);
if (!req)
RETURN(-ENOMEM);
EXIT;
out:
ptlrpc_free_req(request);
- return 0;
+ return rc;
}
static int osc_close(struct lustre_handle *conn, struct obdo *oa)
static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
struct ptlrpc_thread *thread)
{
+ spin_lock(&svc->srv_lock);
thread->t_flags = SVC_STOPPING;
+ spin_unlock(&svc->srv_lock);
wake_up(&svc->srv_waitq);
wait_event_interruptible(thread->t_ctl_waitq,
#!/usr/bin/perl
-$mtpt = shift || die;
-$mount_count = shift || die;
-$i = shift || die;
+my $mtpt = shift || die;
+my $mount_count = shift || die;
+my $i = shift || die;
+my $size = 2;
while ($i--) {
$which = int(rand() * $mount_count) + 1;
$path = "$mtpt$which/";
- $d = int(rand() * 5);
+ $d = int(rand() * $size);
print `./mcreate $path$d`;
$which = int(rand() * $mount_count) + 1;
$path = "$mtpt$which/";
- $d = int(rand() * 5);
+ $d = int(rand() * $size);
unlink("$path$d") || print "unlink($path$d): $!\n"
}
+print "Done.\n";