extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
-/* _ldlm_callback and local_callback setup the variables then call this common
- * code */
-static int common_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
- ldlm_mode_t mode, void *data, __u32 data_len,
- struct ptlrpc_request **reqp)
-{
- ENTRY;
-
- if (!lock)
- LBUG();
- if (!lock->l_resource)
- LBUG();
-
- ldlm_lock_dump(lock);
-
- spin_lock(&lock->l_resource->lr_lock);
- spin_lock(&lock->l_lock);
- if (!new) {
- CDEBUG(D_INFO, "Got local completion AST for lock %p.\n", lock);
- lock->l_req_mode = mode;
-
- /* FIXME: the API is flawed if I have to do these refcount
- * acrobatics (along with the _put() below). */
- lock->l_resource->lr_refcount++;
-
- /* _del_lock is safe for half-created locks that are not yet on
- * a list. */
- ldlm_resource_del_lock(lock);
- ldlm_grant_lock(lock->l_resource, lock);
-
- ldlm_resource_put(lock->l_resource);
-
- wake_up(&lock->l_waitq);
- spin_unlock(&lock->l_lock);
- spin_unlock(&lock->l_resource->lr_lock);
- } else {
- CDEBUG(D_INFO, "Got local blocking AST for lock %p.\n", lock);
- lock->l_flags |= LDLM_FL_DYING;
- spin_unlock(&lock->l_lock);
- spin_unlock(&lock->l_resource->lr_lock);
- if (!lock->l_readers && !lock->l_writers) {
- CDEBUG(D_INFO, "Lock already unused, calling "
- "callback (%p).\n", lock->l_blocking_ast);
- if (lock->l_blocking_ast != NULL)
- lock->l_blocking_ast(lock, new, lock->l_data,
- lock->l_data_len, reqp);
- } else {
- CDEBUG(D_INFO, "Lock still has references; lock will be"
- " cancelled later.\n");
- }
- }
- RETURN(0);
-}
-
static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc,
struct ptlrpc_request *req)
{
struct ptlrpc_request *req)
{
struct ldlm_request *dlm_req;
- struct ldlm_lock *lock1, *lock2;
+ struct ldlm_lock *lock, *new;
int rc;
ENTRY;
if (rc != 0)
RETURN(rc);
- lock1 = lustre_handle2object(&dlm_req->lock_handle1);
- lock2 = lustre_handle2object(&dlm_req->lock_handle2);
+ lock = lustre_handle2object(&dlm_req->lock_handle1);
+ new = lustre_handle2object(&dlm_req->lock_handle2);
+
+ LDLM_DEBUG(lock, "client %s callback handler START",
+ new == NULL ? "completion" : "blocked");
+
+ spin_lock(&lock->l_resource->lr_lock);
+ spin_lock(&lock->l_lock);
+ if (!new) {
+ CDEBUG(D_INFO, "Got local completion AST for lock %p.\n", lock);
+ lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
+
+ /* If we receive the completion AST before the actual enqueue
+ * returned, then we might need to switch resources. */
+ if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
+ lock->l_resource->lr_name,
+ sizeof(__u64) * RES_NAME_SIZE) != 0) {
+ struct ldlm_namespace *ns =
+ lock->l_resource->lr_namespace;
+ int type = lock->l_resource->lr_type;
+
+ if (!ldlm_resource_put(lock->l_resource))
+ spin_unlock(&lock->l_resource->lr_lock);
+
+ lock->l_resource = ldlm_resource_get(ns, NULL, dlm_req->lock_desc.l_resource.lr_name, type, 1);
+ if (lock->l_resource == NULL) {
+ LBUG();
+ RETURN(-ENOMEM);
+ }
+ spin_lock(&lock->l_resource->lr_lock);
+ LDLM_DEBUG(lock, "completion AST, new resource");
+ }
+
+ /* FIXME: the API is flawed if I have to do these refcount
+ * acrobatics (along with the _put() below). */
+ lock->l_resource->lr_refcount++;
+
+ /* _del_lock is safe for half-created locks that are not yet on
+ * a list. */
+ ldlm_resource_del_lock(lock);
+ ldlm_grant_lock(lock->l_resource, lock);
- LDLM_DEBUG(lock1, "client %s callback handler START",
- lock2 == NULL ? "completion" : "blocked");
+ ldlm_resource_put(lock->l_resource);
- common_callback(lock1, lock2, dlm_req->lock_desc.l_granted_mode, NULL,
- 0, NULL);
+ wake_up(&lock->l_waitq);
+ spin_unlock(&lock->l_lock);
+ spin_unlock(&lock->l_resource->lr_lock);
+ } else {
+ CDEBUG(D_INFO, "Got local blocking AST for lock %p.\n", lock);
+ lock->l_flags |= LDLM_FL_DYING;
+ spin_unlock(&lock->l_lock);
+ spin_unlock(&lock->l_resource->lr_lock);
+ if (!lock->l_readers && !lock->l_writers) {
+ CDEBUG(D_INFO, "Lock already unused, calling "
+ "callback (%p).\n", lock->l_blocking_ast);
+ if (lock->l_blocking_ast != NULL)
+ lock->l_blocking_ast(lock, new, lock->l_data,
+ lock->l_data_len, NULL);
+ } else {
+ CDEBUG(D_INFO, "Lock still has references; lock will be"
+ " cancelled later.\n");
+ }
+ }
LDLM_DEBUG_NOLOCK("client %s callback handler END (lock: %p)",
- lock2 == NULL ? "completion" : "blocked", lock1);
+ new == NULL ? "completion" : "blocked", lock);
RETURN(0);
}
req->rq_replen = lustre_msg_size(1, &size);
}
+ lock->l_connection = conn;
+ lock->l_client = cl;
+
rc = ptlrpc_queue_wait(req);
rc = ptlrpc_check_status(req, rc);
+
+ spin_lock(&lock->l_lock);
if (rc != ELDLM_OK) {
spin_lock(&lock->l_resource->lr_lock);
ldlm_resource_put(lock->l_resource);
GOTO(out, rc);
}
- lock->l_connection = conn;
- lock->l_client = cl;
reply = lustre_msg_buf(req->rq_repmsg, 0);
memcpy(&lock->l_remote_handle, &reply->lock_handle,
sizeof(lock->l_remote_handle));
(unsigned long long)reply->lock_extent.start,
(unsigned long long)reply->lock_extent.end);
- if (*flags & LDLM_FL_LOCK_CHANGED) {
+ /* If enqueue returned a blocked lock but the completion handler has
+ * already run, then it fixed up the resource and we don't need to do it
+ * again. */
+ if (*flags & LDLM_FL_LOCK_CHANGED &&
+ lock->l_req_mode != lock->l_granted_mode) {
CDEBUG(D_INFO, "remote intent success, locking %ld instead of"
"%ld\n", (long)reply->lock_resource_name[0],
(long)lock->l_resource->lr_name[0]);
- ldlm_resource_put(lock->l_resource);
+ spin_lock(&lock->l_resource->lr_lock);
+ if (!ldlm_resource_put(lock->l_resource))
+ spin_unlock(&lock->l_resource->lr_lock);
lock->l_resource =
ldlm_resource_get(ns, NULL, reply->lock_resource_name,
if (!req_passed_in)
ptlrpc_free_req(req);
+ spin_unlock(&lock->l_lock);
rc = ldlm_local_lock_enqueue(lockh, cookie, cookielen, flags, callback,
callback);