l_lock(nslock);
lock->l_refc--;
+ LDLM_DEBUG(lock, "after refc--");
if (lock->l_refc < 0)
LBUG();
ldlm_resource_put(lock->l_resource);
if (lock->l_parent)
- ldlm_lock_put(lock->l_parent);
+ LDLM_LOCK_PUT(lock->l_parent);
if (lock->l_refc == 0 && (lock->l_flags & LDLM_FL_DESTROYED)) {
+ lock->l_resource = NULL;
+ LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing");
if (lock->l_connection)
ptlrpc_put_connection(lock->l_connection);
CDEBUG(D_MALLOC, "kfreed 'lock': %d at %p (tot 1).\n",
}
l_unlock(nslock);
EXIT;
- return;
}
void ldlm_lock_destroy(struct ldlm_lock *lock)
l_lock(&lock->l_resource->lr_namespace->ns_lock);
if (!list_empty(&lock->l_children)) {
- CERROR("lock %p still has children (%p)!\n", lock,
- lock->l_children.next);
+ LDLM_DEBUG(lock, "still has children (%p)!",
+ lock->l_children.next);
ldlm_lock_dump(lock);
LBUG();
}
if (lock->l_readers || lock->l_writers) {
- CDEBUG(D_INFO, "lock still has references (%d readers, %d "
- "writers)\n", lock->l_readers, lock->l_writers);
+ LDLM_DEBUG(lock, "lock still has references");
+ ldlm_lock_dump(lock);
LBUG();
}
- if (!list_empty(&lock->l_res_link))
+ if (!list_empty(&lock->l_res_link)) {
+ ldlm_lock_dump(lock);
LBUG();
+ }
if (lock->l_flags & LDLM_FL_DESTROYED) {
+ l_unlock(&lock->l_resource->lr_namespace->ns_lock);
EXIT;
return;
}
lock->l_flags = LDLM_FL_DESTROYED;
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- ldlm_lock_put(lock);
+ LDLM_LOCK_PUT(lock);
EXIT;
- return;
}
+
/*
usage: pass in a resource on which you have done get
pass in a parent lock on which you have done a get
}
/* this is the extra refcount, to prevent the lock
evaporating */
- ldlm_lock_get(lock);
+ LDLM_LOCK_GET(lock);
RETURN(lock);
}
lockh->cookie = lock->l_random;
}
-
struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle)
{
- struct ldlm_lock *lock = NULL;
+ struct ldlm_lock *lock = NULL, *retval = NULL;
ENTRY;
if (!handle || !handle->addr)
l_lock(&lock->l_resource->lr_namespace->ns_lock);
if (lock->l_random != handle->cookie)
- GOTO(out, handle = NULL);
+ GOTO(out, NULL);
if (lock->l_flags & LDLM_FL_DESTROYED)
- GOTO(out, handle = NULL);
+ GOTO(out, NULL);
- ldlm_lock_get(lock);
+ retval = LDLM_LOCK_GET(lock);
EXIT;
out:
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- return lock;
+ return retval;
}
if (req->rq_reqmsg->bufcount > 1) {
/* an intent needs to be considered */
struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
+ struct mds_obd *mds= &req->rq_export->exp_obd->u.mds;
struct mds_body *mds_rep;
struct ldlm_reply *rep;
__u64 new_resid[3] = {0, 0, 0}, old_res;
int bufcount = -1, rc, size[3] = {sizeof(struct ldlm_reply),
sizeof(struct mds_body),
- sizeof(struct obdo)};
+ mds->mds_max_mdsize};
it->opc = NTOH__u64(it->opc);
* a file and its obdo */
case IT_CREAT:
case IT_CREAT|IT_OPEN:
+ case IT_LINK:
+ case IT_LOOKUP:
case IT_MKDIR:
- case IT_SYMLINK:
case IT_MKNOD:
- case IT_LINK:
case IT_OPEN:
+ case IT_READLINK:
case IT_RENAME:
- bufcount = 3;
- break;
+ case IT_RMDIR:
+ case IT_SETATTR:
+ case IT_SYMLINK:
case IT_UNLINK:
- bufcount = 2;
- size[1] = sizeof(struct obdo);
+ bufcount = 3;
break;
- case IT_RMDIR:
+ case IT_RENAME2:
bufcount = 1;
break;
default:
switch (it->opc) {
case IT_CREAT:
case IT_CREAT|IT_OPEN:
+ case IT_LINK:
case IT_MKDIR:
- case IT_SETATTR:
- case IT_SYMLINK:
case IT_MKNOD:
- case IT_LINK:
- case IT_UNLINK:
- case IT_RMDIR:
case IT_RENAME2:
- if (mds_reint_p == NULL)
- mds_reint_p =
- inter_module_get_request
- ("mds_reint", "mds");
- if (IS_ERR(mds_reint_p)) {
- CERROR("MDSINTENT locks require the MDS "
- "module.\n");
- LBUG();
- RETURN(-EINVAL);
- }
+ case IT_RMDIR:
+ case IT_SYMLINK:
+ case IT_UNLINK:
rc = mds_reint_p(2, req);
- if (rc)
- LBUG();
+ if (rc || req->rq_status != 0) {
+ rep->lock_policy_res2 = req->rq_status;
+ RETURN(ELDLM_LOCK_ABORTED);
+ }
break;
case IT_GETATTR:
+ case IT_LOOKUP:
+ case IT_OPEN:
case IT_READDIR:
+ case IT_READLINK:
case IT_RENAME:
- case IT_OPEN:
- if (mds_getattr_name_p == NULL)
- mds_getattr_name_p =
- inter_module_get_request
- ("mds_getattr_name", "mds");
- if (IS_ERR(mds_getattr_name_p)) {
- CERROR("MDSINTENT locks require the MDS "
- "module.\n");
- LBUG();
- RETURN(-EINVAL);
- }
+ case IT_SETATTR:
rc = mds_getattr_name_p(2, req);
- if (rc) {
- req->rq_status = rc;
- RETURN(rc);
+ /* FIXME: we need to sit down and decide on who should
+ * set req->rq_status, who should return negative and
+ * positive return values, and what they all mean. */
+ if (rc || req->rq_status != 0) {
+ rep->lock_policy_res2 = req->rq_status;
+ RETURN(ELDLM_LOCK_ABORTED);
}
break;
case IT_READDIR|IT_OPEN:
LBUG();
}
- if (it->opc == IT_UNLINK || it->opc == IT_RMDIR)
+ if (it->opc == IT_UNLINK || it->opc == IT_RMDIR ||
+ it->opc == IT_RENAME || it->opc == IT_RENAME2)
RETURN(ELDLM_LOCK_ABORTED);
- mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
rep->lock_policy_res2 = req->rq_status;
- new_resid[0] = mds_rep->ino;
+ mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
+ new_resid[0] = NTOH__u32(mds_rep->ino);
+ if (new_resid[0] == 0)
+ LBUG();
old_res = lock->l_resource->lr_name[0];
CDEBUG(D_INFO, "remote intent: locking %d instead of"
OBD_ALLOC(w, sizeof(*w));
if (!w) {
LBUG();
- return;
+ GOTO(out, 0);
}
if (new) {
ldlm_lock2desc(new, &w->w_desc);
}
- w->w_lock = ldlm_lock_get(lock);
+ w->w_lock = LDLM_LOCK_GET(lock);
list_add(&w->w_list, lock->l_resource->lr_tmp);
out:
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
lock = ldlm_handle2lock(lockh);
ldlm_lock_addref_internal(lock, mode);
- ldlm_lock_put(lock);
+ LDLM_LOCK_PUT(lock);
}
/* only called for local locks */
else
lock->l_writers++;
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- ldlm_lock_get(lock);
+ LDLM_LOCK_GET(lock);
+ LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}
/* Args: unlocked lock */
if (lock == NULL)
LBUG();
- LDLM_DEBUG(lock, "ldlm_lock_decref(%d)", mode);
+ LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
l_lock(&lock->l_resource->lr_namespace->ns_lock);
if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
lock->l_readers--;
LBUG();
}
- CDEBUG(D_INFO, "final decref done on cbpending lock, "
- "calling callback.\n");
+ LDLM_DEBUG(lock, "final decref done on cbpending lock");
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
ldlm_lock2handle(lock, &lockh);
- lock->l_blocking_ast(&lockh, NULL, lock->l_data,
+ /* FIXME: -1 is a really, really bad 'desc' */
+ lock->l_blocking_ast(&lockh, (void *)-1, lock->l_data,
lock->l_data_len);
} else
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- ldlm_lock_put(lock);
+ LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */
+ LDLM_LOCK_PUT(lock); /* matches the handle2lock above */
EXIT;
}
if (lock) {
ldlm_lock2handle(lock, lockh);
- wait_event_interruptible(lock->l_waitq, lock->l_req_mode ==
- lock->l_granted_mode);
+ wait_event(lock->l_waitq,
+ lock->l_req_mode == lock->l_granted_mode);
}
if (rc)
LDLM_DEBUG(lock, "matched");
}
/* Returns a referenced, lock */
-
struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
struct lustre_handle *parent_lock_handle,
__u64 *res_id, __u32 type,
if (rc)
CERROR("Failed AST - should clean & disconnect "
"client\n");
- ldlm_lock_put(w->w_lock);
+ LDLM_LOCK_PUT(w->w_lock);
list_del(&w->w_list);
OBD_FREE(w, sizeof(*w));
}
EXIT;
}
-/* Must be called with lock and lock->l_resource unlocked */
void ldlm_lock_cancel(struct ldlm_lock *lock)
{
struct ldlm_resource *res;
ldlm_resource_unlink_lock(lock);
ldlm_lock_destroy(lock);
l_unlock(&ns->ns_lock);
+ EXIT;
}
-/* Must be called with lock and lock->l_resource unlocked */
struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
int *flags)
{