Whamcloud - gitweb
Landing b_recovery
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
index 7e79cd8..ae9b202 100644 (file)
@@ -871,12 +871,40 @@ static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
         return LDLM_ITER_CONTINUE;
 }
 
+static int replay_lock_interpret(struct ptlrpc_request *req,
+                                    void * data, int rc)
+{
+        struct ldlm_lock *lock;
+        struct ldlm_reply *reply;
+
+        atomic_dec(&req->rq_import->imp_replay_inflight);
+        if (rc != ELDLM_OK)
+                GOTO(out, rc);
+
+        lock = req->rq_async_args.pointer_arg[0];
+        LASSERT(lock != NULL);
+
+        reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
+                                   lustre_swab_ldlm_reply);
+        if (reply == NULL) {
+                CERROR("Can't unpack ldlm_reply\n");
+                GOTO (out, rc = -EPROTO);
+        }
+
+        memcpy(&lock->l_remote_handle, &reply->lock_handle,
+               sizeof(lock->l_remote_handle));
+        LDLM_DEBUG(lock, "replayed lock:");
+        ptlrpc_import_recovery_state_machine(req->rq_import);
+ out:
+        RETURN(rc);
+}
+
 static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 {
         struct ptlrpc_request *req;
         struct ldlm_request *body;
         struct ldlm_reply *reply;
-        int rc, size;
+        int size;
         int flags;
 
         /*
@@ -908,7 +936,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
                 RETURN(-ENOMEM);
 
         /* We're part of recovery, so don't wait for it. */
-        req->rq_send_state = LUSTRE_IMP_REPLAY;
+        req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
 
         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
         ldlm_lock2desc(lock, &body->lock_desc);
@@ -919,23 +947,13 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
         req->rq_replen = lustre_msg_size(1, &size);
 
         LDLM_DEBUG(lock, "replaying lock:");
-        rc = ptlrpc_queue_wait(req);
-        if (rc != ELDLM_OK)
-                GOTO(out, rc);
 
-        reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
-                                   lustre_swab_ldlm_reply);
-        if (reply == NULL) {
-                CERROR("Can't unpack ldlm_reply\n");
-                GOTO (out, rc = -EPROTO);
-        }
+        atomic_inc(&req->rq_import->imp_replay_inflight);
+        req->rq_async_args.pointer_arg[0] = lock;
+        req->rq_interpret_reply = replay_lock_interpret;
+        ptlrpcd_add_req(req);
 
-        memcpy(&lock->l_remote_handle, &reply->lock_handle,
-               sizeof(lock->l_remote_handle));
-        LDLM_DEBUG(lock, "replayed lock:");
- out:
-        ptlrpc_req_finished(req);
-        RETURN(rc);
+        RETURN(0);
 }
 
 int ldlm_replay_locks(struct obd_import *imp)
@@ -948,6 +966,11 @@ int ldlm_replay_locks(struct obd_import *imp)
         ENTRY;
         INIT_LIST_HEAD(&list);
 
+        LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
+
+        /* ensure this doesn't fall to 0 before all have been queued */
+        atomic_inc(&imp->imp_replay_inflight);
+
         l_lock(&ns->ns_lock);
         (void)ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
 
@@ -958,5 +981,8 @@ int ldlm_replay_locks(struct obd_import *imp)
                         break; /* or try to do the rest? */
         }
         l_unlock(&ns->ns_lock);
+
+        atomic_dec(&imp->imp_replay_inflight);
+
         RETURN(rc);
 }