struct list_head l_children;
struct list_head l_childof;
struct list_head l_res_link; /*position in one of three res lists*/
- atomic_t l_refcount;
ldlm_mode_t l_req_mode;
ldlm_mode_t l_granted_mode;
#define LDLM_DEBUG(lock, format, a...) \
do { \
- CDEBUG(D_DLMTRACE, "### " format \
- " (%s: lock %p(rc=%d/%d,%d) mode %s/%s on res %Lu" \
- "(rc=%d) type %s remote %Lx)\n" , ## a, \
- lock->l_resource->lr_namespace->ns_name, lock, \
- lock->l_refc, lock->l_readers, lock->l_writers, \
- ldlm_lockname[lock->l_granted_mode], \
- ldlm_lockname[lock->l_req_mode], \
- lock->l_resource->lr_name[0], \
- atomic_read(&lock->l_resource->lr_refcount), \
- ldlm_typename[lock->l_resource->lr_type], \
- lock->l_remote_handle.addr); \
+ if (lock->l_resource == NULL) \
+ CDEBUG(D_DLMTRACE, "### " format \
+ " (UNKNOWN: lock %p(rc=%d/%d,%d) mode %s/%s on " \
+ "res \?\? (rc=\?\?) type \?\?\? remote %Lx)\n" , \
+ ## a, lock, lock->l_refc, lock->l_readers, \
+ lock->l_writers, \
+ ldlm_lockname[lock->l_granted_mode], \
+ ldlm_lockname[lock->l_req_mode], \
+ lock->l_remote_handle.addr); \
+ else \
+ CDEBUG(D_DLMTRACE, "### " format \
+ " (%s: lock %p(rc=%d/%d,%d) mode %s/%s on res " \
+ "%Lu (rc=%d) type %s remote %Lx)\n" , ## a, \
+ lock->l_resource->lr_namespace->ns_name, lock, \
+ lock->l_refc, lock->l_readers, lock->l_writers, \
+ ldlm_lockname[lock->l_granted_mode], \
+ ldlm_lockname[lock->l_req_mode], \
+ lock->l_resource->lr_name[0], \
+ atomic_read(&lock->l_resource->lr_refcount), \
+ ldlm_typename[lock->l_resource->lr_type], \
+ lock->l_remote_handle.addr); \
} while (0)
#define LDLM_DEBUG_NOLOCK(format, a...) \
void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh);
struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle);
void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh);
+
+#define LDLM_LOCK_PUT(lock) \
+do { \
+ LDLM_DEBUG(lock, "put"); \
+ ldlm_lock_put(lock); \
+} while (0)
+
+#define LDLM_LOCK_GET(lock) \
+({ \
+ ldlm_lock_get(lock); \
+ LDLM_DEBUG(lock, "get"); \
+ lock; \
+})
+
void ldlm_lock_put(struct ldlm_lock *lock);
void ldlm_lock_destroy(struct ldlm_lock *lock);
void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc);
l_lock(nslock);
lock->l_refc--;
- //LDLM_DEBUG(lock, "after refc--");
+ LDLM_DEBUG(lock, "after refc--");
if (lock->l_refc < 0)
LBUG();
ldlm_resource_put(lock->l_resource);
if (lock->l_parent)
- ldlm_lock_put(lock->l_parent);
+ LDLM_LOCK_PUT(lock->l_parent);
if (lock->l_refc == 0 && (lock->l_flags & LDLM_FL_DESTROYED)) {
+ lock->l_resource = NULL;
+ LDLM_DEBUG(lock, "final lock_put on destroyed lock, freeing");
if (lock->l_connection)
ptlrpc_put_connection(lock->l_connection);
CDEBUG(D_MALLOC, "kfreed 'lock': %d at %p (tot 1).\n",
lock->l_flags = LDLM_FL_DESTROYED;
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- ldlm_lock_put(lock);
+ LDLM_LOCK_PUT(lock);
EXIT;
}
}
/* this is the extra refcount, to prevent the lock
evaporating */
- ldlm_lock_get(lock);
+ LDLM_LOCK_GET(lock);
RETURN(lock);
}
struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle)
{
- struct ldlm_lock *lock = NULL;
+ struct ldlm_lock *lock = NULL, *retval = NULL;
ENTRY;
if (!handle || !handle->addr)
l_lock(&lock->l_resource->lr_namespace->ns_lock);
if (lock->l_random != handle->cookie)
- GOTO(out, handle = NULL);
+ GOTO(out, NULL);
if (lock->l_flags & LDLM_FL_DESTROYED)
- GOTO(out, handle = NULL);
+ GOTO(out, NULL);
- ldlm_lock_get(lock);
+ retval = LDLM_LOCK_GET(lock);
EXIT;
out:
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- return lock;
+ return retval;
}
OBD_ALLOC(w, sizeof(*w));
if (!w) {
LBUG();
- return;
+ GOTO(out, 0);
}
if (new) {
ldlm_lock2desc(new, &w->w_desc);
}
- w->w_lock = ldlm_lock_get(lock);
+ w->w_lock = LDLM_LOCK_GET(lock);
list_add(&w->w_list, lock->l_resource->lr_tmp);
out:
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
lock = ldlm_handle2lock(lockh);
ldlm_lock_addref_internal(lock, mode);
- ldlm_lock_put(lock);
+ LDLM_LOCK_PUT(lock);
}
/* only called for local locks */
else
lock->l_writers++;
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- ldlm_lock_get(lock);
+ LDLM_LOCK_GET(lock);
LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]);
}
} else
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- ldlm_lock_put(lock); /* matches the ldlm_lock_get in addref */
- ldlm_lock_put(lock); /* matches the handle2lock above */
+ LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */
+ LDLM_LOCK_PUT(lock); /* matches the handle2lock above */
EXIT;
}
if (rc)
CERROR("Failed AST - should clean & disconnect "
"client\n");
- ldlm_lock_put(w->w_lock);
+ LDLM_LOCK_PUT(w->w_lock);
list_del(&w->w_list);
OBD_FREE(w, sizeof(*w));
}
EXIT;
}
-/* Must be called with lock and lock->l_resource unlocked */
void ldlm_lock_cancel(struct ldlm_lock *lock)
{
struct ldlm_resource *res;
ldlm_resource_unlink_lock(lock);
ldlm_lock_destroy(lock);
l_unlock(&ns->ns_lock);
+ EXIT;
}
-/* Must be called with lock and lock->l_resource unlocked */
struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
int *flags)
{
lock->l_connection = ptlrpc_connection_addref(req->rq_connection);
EXIT;
out:
- if (lock) {
- LDLM_DEBUG(lock, "server-side enqueue handler, sending reply");
- ldlm_lock_put(lock);
- }
+ if (lock)
+ LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"
+ "(err=%d)", err);
req->rq_status = err;
- CDEBUG(D_INFO, "err = %d\n", err);
if (ptlrpc_reply(req->rq_svc, req))
LBUG();
- if (!err)
- ldlm_reprocess_all(lock->l_resource);
+ if (lock) {
+ if (!err)
+ ldlm_reprocess_all(lock->l_resource);
+ LDLM_LOCK_PUT(lock);
+ }
LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p)", lock);
return 0;
if (lock) {
ldlm_reprocess_all(lock->l_resource);
- ldlm_lock_put(lock);
LDLM_DEBUG(lock, "server-side convert handler END");
+ LDLM_LOCK_PUT(lock);
} else
LDLM_DEBUG_NOLOCK("server-side convert handler END");
struct ldlm_request *dlm_req;
struct ldlm_lock *lock;
int rc;
- char *ns_name;
ENTRY;
rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
req->rq_status = ESTALE;
} else {
LDLM_DEBUG(lock, "server-side cancel handler START");
- ns_name = lock->l_resource->lr_namespace->ns_name;
ldlm_lock_cancel(lock);
req->rq_status = 0;
}
if (lock) {
ldlm_reprocess_all(lock->l_resource);
- ldlm_lock_put(lock);
- LDLM_DEBUG_NOLOCK("server-side cancel handler END (%s: lock "
- "%p)", ns_name, lock);
+ LDLM_DEBUG(lock, "server-side cancel handler END");
+ LDLM_LOCK_PUT(lock);
} else
LDLM_DEBUG_NOLOCK("server-side cancel handler END (lock %p)",
lock);
}
} else {
LDLM_DEBUG(lock, "Lock still has references, will be"
- " cancelled later");
+ " cancelled later");
}
- ldlm_lock_put(lock);
+ LDLM_LOCK_PUT(lock);
} else {
struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
wake_up(&lock->l_waitq);
lock->l_resource->lr_tmp = NULL;
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- ldlm_lock_put(lock);
+ LDLM_LOCK_PUT(lock);
ldlm_run_ast_work(&rpc_list);
}
if (rc != ELDLM_OK) {
LDLM_DEBUG(lock, "client-side enqueue END (%s)",
rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
- ldlm_lock_put(lock);
+ LDLM_LOCK_PUT(lock);
ldlm_lock_decref(lockh, mode);
/* FIXME: if we've already received a completion AST, this will
* LBUG! */
ptlrpc_free_req(req);
rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, callback,
- callback);
+ callback);
if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
LDLM_FL_BLOCK_CONV)) {
LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
}
LDLM_DEBUG(lock, "client-side enqueue END");
- ldlm_lock_put(lock);
+ LDLM_LOCK_PUT(lock);
EXIT;
out:
return rc;
ENTRY;
lock = ldlm_handle2lock(lockh);
- if (lock == NULL)
+ if (lock == NULL) {
LBUG();
+ RETURN(-EINVAL);
+ }
cl = &lock->l_resource->lr_namespace->ns_rpc_client;
req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
&size, NULL);
EXIT;
out:
- ldlm_lock_put(lock);
+ LDLM_LOCK_PUT(lock);
return rc;
}
ENTRY;
lock = ldlm_handle2lock(lockh);
- if (!lock)
- LBUG();
+ if (!lock) {
+ LBUG();
+ RETURN(-EINVAL);
+ }
*flags = 0;
LDLM_DEBUG(lock, "client-side convert");
lock->l_req_mode == lock->l_granted_mode);
CDEBUG(D_NET, "waking up, the lock must be granted.\n");
}
- ldlm_lock_put(lock);
+ LDLM_LOCK_PUT(lock);
EXIT;
out:
ptlrpc_free_req(req);
GOTO(out, rc);
ldlm_lock_cancel(lock);
- ldlm_lock_put(lock);
+ LDLM_LOCK_PUT(lock);
EXIT;
out:
return 0;
list_for_each_safe(tmp, pos, q) {
struct ldlm_lock *lock;
lock = list_entry(tmp, struct ldlm_lock, l_res_link);
- ldlm_lock_get(lock);
+ LDLM_LOCK_GET(lock);
if (client) {
struct lustre_handle lockh;
ldlm_resource_unlink_lock(lock);
ldlm_lock_destroy(lock);
}
- ldlm_lock_put(lock);
+ LDLM_LOCK_PUT(lock);
}
return;
int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req)
{
+ if (req->rq_repmsg == NULL)
+ CERROR("bad: someone called ptlrpc_reply when they meant "
+ "ptlrpc_error\n");
+
/* FIXME: we need to increment the count of handled events */
req->rq_type = PTL_RPC_TYPE_REPLY;
//req->rq_repmsg->conn = req->rq_connection->c_remote_conn;