+ ENTRY;
+
+ if (!res)
+ RETURN(LDLM_ITER_CONTINUE);
+
+ lock_res(res);
+ list_for_each_safe(tmp, next, &res->lr_granted) {
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+ if (iter(lock, closure) == LDLM_ITER_STOP)
+ GOTO(out, rc = LDLM_ITER_STOP);
+ }
+
+ list_for_each_safe(tmp, next, &res->lr_converting) {
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+ if (iter(lock, closure) == LDLM_ITER_STOP)
+ GOTO(out, rc = LDLM_ITER_STOP);
+ }
+
+ list_for_each_safe(tmp, next, &res->lr_waiting) {
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+ if (iter(lock, closure) == LDLM_ITER_STOP)
+ GOTO(out, rc = LDLM_ITER_STOP);
+ }
+ out:
+ unlock_res(res);
+ RETURN(rc);
+}
+
+struct iter_helper_data {
+ ldlm_iterator_t iter;
+ void *closure;
+};
+
+static int ldlm_iter_helper(struct ldlm_lock *lock, void *closure)
+{
+ struct iter_helper_data *helper = closure;
+ return helper->iter(lock, helper->closure);
+}
+
+static int ldlm_res_iter_helper(struct ldlm_resource *res, void *closure)
+{
+ return ldlm_resource_foreach(res, ldlm_iter_helper, closure);
+}
+
+int ldlm_namespace_foreach(struct ldlm_namespace *ns, ldlm_iterator_t iter,
+ void *closure)
+{
+ struct iter_helper_data helper = { iter: iter, closure: closure };
+ return ldlm_namespace_foreach_res(ns, ldlm_res_iter_helper, &helper);
+}
+
+int ldlm_namespace_foreach_res(struct ldlm_namespace *ns,
+ ldlm_res_iterator_t iter, void *closure)
+{
+ int i, rc = LDLM_ITER_CONTINUE;
+ struct ldlm_resource *res;
+ struct list_head *tmp;
+
+ ENTRY;
+ spin_lock(&ns->ns_hash_lock);
+ for (i = 0; i < RES_HASH_SIZE; i++) {
+ tmp = ns->ns_hash[i].next;
+ while (tmp != &(ns->ns_hash[i])) {
+ res = list_entry(tmp, struct ldlm_resource, lr_hash);
+ ldlm_resource_getref(res);
+ spin_unlock(&ns->ns_hash_lock);
+
+ rc = iter(res, closure);
+
+ spin_lock(&ns->ns_hash_lock);
+ tmp = tmp->next;
+ ldlm_resource_putref_locked(res);
+ if (rc == LDLM_ITER_STOP)
+ GOTO(out, rc);
+ }
+ }
+ out:
+ spin_unlock(&ns->ns_hash_lock);
+ RETURN(rc);
+}
+
+/* non-blocking function to manipulate a lock whose cb_data is being put away.*/
+void ldlm_resource_iterate(struct ldlm_namespace *ns,
+ const struct ldlm_res_id *res_id,
+ ldlm_iterator_t iter, void *data)
+{
+ struct ldlm_resource *res;
+ ENTRY;
+
+ if (ns == NULL) {
+ CERROR("must pass in namespace\n");
+ LBUG();
+ }
+
+ res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
+ if (res == NULL) {
+ EXIT;
+ return;
+ }
+
+ ldlm_resource_foreach(res, iter, data);
+ ldlm_resource_putref(res);
+ EXIT;
+}
+
+/* Lock replay */
+
+static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
+{
+ struct list_head *list = closure;
+
+ /* we use l_pending_chain here, because it's unused on clients. */
+ LASSERTF(list_empty(&lock->l_pending_chain),"lock %p next %p prev %p\n",
+ lock, &lock->l_pending_chain.next,&lock->l_pending_chain.prev);
+ /* bug 9573: don't replay locks left after eviction */
+ if (!(lock->l_flags & LDLM_FL_FAILED))
+ list_add(&lock->l_pending_chain, list);
+ return LDLM_ITER_CONTINUE;
+}
+
+static int replay_lock_interpret(struct ptlrpc_request *req,
+ struct ldlm_async_args *aa, int rc)
+{
+ struct ldlm_lock *lock;
+ struct ldlm_reply *reply;
+
+ ENTRY;
+ atomic_dec(&req->rq_import->imp_replay_inflight);
+ if (rc != ELDLM_OK)
+ GOTO(out, rc);
+
+
+ reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+ if (reply == NULL)
+ GOTO(out, rc = -EPROTO);
+
+ lock = ldlm_handle2lock(&aa->lock_handle);
+ if (!lock) {
+ CERROR("received replay ack for unknown local cookie "LPX64
+ " remote cookie "LPX64 " from server %s id %s\n",
+ aa->lock_handle.cookie, reply->lock_handle.cookie,
+ req->rq_export->exp_client_uuid.uuid,
+ libcfs_id2str(req->rq_peer));
+ GOTO(out, rc = -ESTALE);
+ }
+
+ lock->l_remote_handle = reply->lock_handle;
+ LDLM_DEBUG(lock, "replayed lock:");
+ ptlrpc_import_recovery_state_machine(req->rq_import);
+ LDLM_LOCK_PUT(lock);
+out:
+ if (rc != ELDLM_OK)
+ ptlrpc_connect_import(req->rq_import, NULL);
+
+
+ RETURN(rc);
+}
+
+static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
+{
+ struct ptlrpc_request *req;
+ struct ldlm_async_args *aa;
+ struct ldlm_request *body;
+ int flags;
+ ENTRY;
+
+
+ /* Bug 11974: Do not replay a lock which is actively being canceled */
+ if (lock->l_flags & LDLM_FL_CANCELING) {
+ LDLM_DEBUG(lock, "Not replaying canceled lock:");
+ RETURN(0);
+ }
+
+ /* If this is reply-less callback lock, we cannot replay it, since
+ * server might have long dropped it, but notification of that event was
+ * lost by network. (and server granted conflicting lock already) */
+ if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) {
+ LDLM_DEBUG(lock, "Not replaying reply-less lock:");
+ ldlm_lock_cancel(lock);
+ RETURN(0);
+ }
+ /*
+ * If granted mode matches the requested mode, this lock is granted.
+ *
+ * If they differ, but we have a granted mode, then we were granted
+ * one mode and now want another: ergo, converting.
+ *
+ * If we haven't been granted anything and are on a resource list,
+ * then we're blocked/waiting.
+ *
+ * If we haven't been granted anything and we're NOT on a resource list,
+ * then we haven't got a reply yet and don't have a known disposition.
+ * This happens whenever a lock enqueue is the request that triggers
+ * recovery.
+ */
+ if (lock->l_granted_mode == lock->l_req_mode)
+ flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED;
+ else if (lock->l_granted_mode)
+ flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_CONV;
+ else if (!list_empty(&lock->l_res_link))
+ flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT;
+ else
+ flags = LDLM_FL_REPLAY;
+
+ req = ptlrpc_request_alloc_pack(imp, &RQF_LDLM_ENQUEUE,
+ LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ /* We're part of recovery, so don't wait for it. */
+ req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
+
+ body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+ ldlm_lock2desc(lock, &body->lock_desc);
+ body->lock_flags = flags;
+
+ ldlm_lock2handle(lock, &body->lock_handle[0]);
+ if (lock->l_lvb_len != 0) {
+ req_capsule_extend(&req->rq_pill, &RQF_LDLM_ENQUEUE_LVB);
+ req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+ lock->l_lvb_len);
+ }
+ ptlrpc_request_set_replen(req);
+ /* notify the server we've replayed all requests.
+ * also, we mark the request to be put on a dedicated
+ * queue to be processed after all request replayes.
+ * bug 6063 */
+ lustre_msg_set_flags(req->rq_reqmsg, MSG_REQ_REPLAY_DONE);
+
+ LDLM_DEBUG(lock, "replaying lock:");
+
+ atomic_inc(&req->rq_import->imp_replay_inflight);
+ CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+ aa = (struct ldlm_async_args *)&req->rq_async_args;
+ aa->lock_handle = body->lock_handle[0];
+ req->rq_interpret_reply = replay_lock_interpret;
+ ptlrpcd_add_req(req);
+
+ RETURN(0);
+}
+
+int ldlm_replay_locks(struct obd_import *imp)
+{
+ struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
+ struct list_head list;
+ struct ldlm_lock *lock, *next;
+ int rc = 0;
+
+ ENTRY;
+ CFS_INIT_LIST_HEAD(&list);
+
+ LASSERT(atomic_read(&imp->imp_replay_inflight) == 0);
+
+ /* ensure this doesn't fall to 0 before all have been queued */
+ atomic_inc(&imp->imp_replay_inflight);
+
+ (void)ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
+
+ list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
+ list_del_init(&lock->l_pending_chain);
+ if (rc)
+ continue; /* or try to do the rest? */
+ rc = replay_one_lock(imp, lock);
+ }
+
+ atomic_dec(&imp->imp_replay_inflight);
+
+ RETURN(rc);