return LDLM_ITER_CONTINUE;
}
+static int replay_lock_interpret(struct ptlrpc_request *req,
+ void * data, int rc)
+{
+ struct ldlm_lock *lock;
+ struct ldlm_reply *reply;
+
+ atomic_dec(&req->rq_import->imp_replay_inflight);
+ if (rc != ELDLM_OK)
+ GOTO(out, rc);
+
+ lock = req->rq_async_args.pointer_arg[0];
+ LASSERT(lock != NULL);
+
+ reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
+ lustre_swab_ldlm_reply);
+ if (reply == NULL) {
+ CERROR("Can't unpack ldlm_reply\n");
+ GOTO (out, rc = -EPROTO);
+ }
+
+ memcpy(&lock->l_remote_handle, &reply->lock_handle,
+ sizeof(lock->l_remote_handle));
+ LDLM_DEBUG(lock, "replayed lock:");
+ ptlrpc_import_recovery_state_machine(req->rq_import);
+ out:
+ RETURN(rc);
+}
+
static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
{
struct ptlrpc_request *req;
struct ldlm_request *body;
struct ldlm_reply *reply;
- int rc, size;
+ int size;
int flags;
/*
RETURN(-ENOMEM);
/* We're part of recovery, so don't wait for it. */
- req->rq_send_state = LUSTRE_IMP_REPLAY;
+ req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
ldlm_lock2desc(lock, &body->lock_desc);
req->rq_replen = lustre_msg_size(1, &size);
LDLM_DEBUG(lock, "replaying lock:");
- rc = ptlrpc_queue_wait(req);
- if (rc != ELDLM_OK)
- GOTO(out, rc);
- reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
- lustre_swab_ldlm_reply);
- if (reply == NULL) {
- CERROR("Can't unpack ldlm_reply\n");
- GOTO (out, rc = -EPROTO);
- }
+ atomic_inc(&req->rq_import->imp_replay_inflight);
+ req->rq_async_args.pointer_arg[0] = lock;
+ req->rq_interpret_reply = replay_lock_interpret;
+ ptlrpcd_add_req(req);
- memcpy(&lock->l_remote_handle, &reply->lock_handle,
- sizeof(lock->l_remote_handle));
- LDLM_DEBUG(lock, "replayed lock:");
- out:
- ptlrpc_req_finished(req);
- RETURN(rc);
+ RETURN(0);
}
int ldlm_replay_locks(struct obd_import *imp)
ENTRY;
INIT_LIST_HEAD(&list);
+ LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
+
+ /* ensure this doesn't fall to 0 before all have been queued */
+ atomic_inc(&imp->imp_replay_inflight);
+
l_lock(&ns->ns_lock);
(void)ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
break; /* or try to do the rest? */
}
l_unlock(&ns->ns_lock);
+
+ atomic_dec(&imp->imp_replay_inflight);
+
RETURN(rc);
}