ldlm_cli_enqueue returns a lock, but it misses one refcount.
struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *handle);
void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh);
void ldlm_lock_put(struct ldlm_lock *lock);
-void ldlm_lock_free(struct ldlm_lock *lock);
+void ldlm_lock_destroy(struct ldlm_lock *lock);
void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc);
void ldlm_lock_addref(struct ldlm_lock *lock, __u32 mode);
void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode);
int cookie_len, int *flags,
ldlm_lock_callback completion,
ldlm_lock_callback blocking);
-struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
+struct ldlm_resource *ldlm_convert(struct ldlm_lock *lock, int new_mode,
int *flags);
void ldlm_lock_cancel(struct ldlm_lock *lock);
void ldlm_reprocess_all(struct ldlm_resource *res);
struct ldlm_lock *lock = NULL;
ENTRY;
- if (!handle)
+ if (!handle || !handle->addr)
RETURN(NULL);
- lock = (struct ldlm_lock *)(unsigned long)(handle->addr);
+ lock = (struct ldlm_lock *)(unsigned long)(handle->addr);
if (!kmem_cache_validate(ldlm_lock_slab, (void *)lock))
RETURN(NULL);
if (!list_empty(&lock->l_res_link))
LBUG();
+ if (lock->l_flags & LDLM_FL_DESTROYED) {
+ EXIT;
+ return;
+ }
+
lock->l_flags = LDLM_FL_DESTROYED;
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
ldlm_lock_put(lock);
RETURN(NULL);
memset(lock, 0, sizeof(*lock));
- get_random_bytes(&lock->l_cookie, sizeof(__u64));
+ get_random_bytes(&lock->l_random, sizeof(__u64));
lock->l_resource = resource;
lock->l_refc = 1;
return rc;
}
-/* Must be called without the resource lock held. Returns a referenced,
- * unlocked ldlm_lock. */
+/* Returns a referenced, lock */
+
struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
struct lustre_handle *parent_lock_handle,
__u64 *res_id, __u32 type,
struct ldlm_resource *res, *parent_res = NULL;
struct ldlm_lock *lock, *parent_lock;
- parent_lock = lustre_handle2object(parent_lock_handle);
+ parent_lock = ldlm_handle2lock(parent_lock_handle);
if (parent_lock)
parent_res = parent_lock->l_resource;
if (!local && (policy = ldlm_res_policy_table[res->lr_type])) {
int rc;
- /* We do this dancing with refcounts and locks because the
- * policy function could send an RPC */
ldlm_resource_getref(res);
rc = policy(lock, cookie, lock->l_req_mode, NULL);
- if (ldlm_resource_put(res) && rc != ELDLM_LOCK_CHANGED)
- /* ldlm_resource_put() should not destroy 'res' unless
- * 'res' is no longer the resource for this lock. */
- LBUG();
-
if (rc == ELDLM_LOCK_CHANGED) {
res = lock->l_resource;
*flags |= LDLM_FL_LOCK_CHANGED;
}
/* If this is a local resource, put it on the appropriate list. */
+ /* FIXME: don't like this: can we call ldlm_resource_unlink_lock? */
list_del_init(&lock->l_res_link);
if (local) {
if (*flags & LDLM_FL_BLOCK_CONV)
}
/* Must be called with lock and lock->l_resource unlocked */
-struct ldlm_resource *ldlm_convert(struct lustre_handle *lockh, int new_mode, int *flags)
+struct ldlm_resource *ldlm_convert(struct ldlm_lock *lock, int new_mode, int *flags)
{
- struct ldlm_lock *lock;
struct ldlm_resource *res;
struct ldlm_namespace *ns;
ENTRY;
- lock = lustre_handle2object(lockh);
res = lock->l_resource;
ns = res->lr_namespace;
req->rq_status = EINVAL;
} else {
LDLM_DEBUG(lock, "server-side convert handler START");
- ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
+ ldlm_convert(lock, dlm_req->lock_desc.l_req_mode,
&dlm_rep->lock_flags);
req->rq_status = 0;
}
EXPORT_SYMBOL(ldlm_cli_convert);
EXPORT_SYMBOL(ldlm_cli_enqueue);
EXPORT_SYMBOL(ldlm_cli_cancel);
-EXPORT_SYMBOL(lustre_handle2object);
EXPORT_SYMBOL(ldlm_test);
EXPORT_SYMBOL(ldlm_lock_dump);
EXPORT_SYMBOL(ldlm_namespace_new);
data, data_len);
if (lock == NULL)
GOTO(out, rc = -ENOMEM);
+ ldlm_lock2handle(lock, lockh);
LDLM_DEBUG(lock, "client-side enqueue START");
/* Dump all of this data into the request buffer */
body = lustre_msg_buf(req->rq_reqmsg, 0);
- body->lock_desc.l_resource.lr_type = type;
- memcpy(body->lock_desc.l_resource.lr_name, res_id,
- sizeof(body->lock_desc.l_resource.lr_name));
-
- body->lock_desc.l_req_mode = mode;
+ ldlm_lock2desc(lock, &body->lock_desc);
+ /* Phil: make this part of ldlm_lock2desc */
if (type == LDLM_EXTENT)
memcpy(&body->lock_desc.l_extent, cookie,
sizeof(body->lock_desc.l_extent));
lock->l_client = cl;
rc = ptlrpc_queue_wait(req);
+ /* FIXME: status check here? */
rc = ptlrpc_check_status(req, rc);
if (rc != ELDLM_OK) {
GOTO(out, rc);
reply = lustre_msg_buf(req->rq_repmsg, 0);
- res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
+ res = ldlm_convert(lock, new_mode, &reply->lock_flags);
if (res != NULL)
ldlm_reprocess_all(res);
if (lock->l_req_mode != lock->l_granted_mode) {
CERROR("Freeing a lock still held by a client node.\n");
ldlm_resource_unlink_lock(lock);
- ldlm_lock_free(lock);
+ ldlm_lock_destroy(lock);
rc = ldlm_resource_put(res);
}
RETURN(res);
}
-struct ldlm_resource *ldlm_resource_addref(struct ldlm_resource *res)
+struct ldlm_resource *ldlm_resource_getref(struct ldlm_resource *res)
{
atomic_inc(&res->lr_refcount);
return res;
LBUG();
ldlm_resource_dump(res);
- res = ldlm_lock_convert(lock1, LCK_NL, &flags);
+ res = ldlm_convert(lock1, LCK_NL, &flags);
if (res != NULL)
ldlm_reprocess_all(res);
/* Convert/cancel blocking locks */
flags = 0;
- res = ldlm_lock_convert(lock1, LCK_NL, &flags);
+ res = ldlm_convert(lock1, LCK_NL, &flags);
if (res != NULL)
ldlm_reprocess_all(res);