+ ldlm_resource_putref(res);
+
+ RETURN(0);
+}
+
+/* Return 1 if the namespace currently holds no resources, 0 otherwise.
+ * The counter is sampled under ns_counter_lock, so the result is only a
+ * snapshot and may be stale by the time the caller acts on it. */
+static inline int have_no_nsresource(struct ldlm_namespace *ns)
+{
+        int empty;
+
+        spin_lock(&ns->ns_counter_lock);
+        empty = (ns->ns_resources == 0);
+        spin_unlock(&ns->ns_counter_lock);
+
+        RETURN(empty);
+}
+
+/* Cancel all locks on a namespace (or a specific resource, if given)
+ * that have 0 readers/writers.
+ *
+ * If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
+ * to notify the server.
+ * If flags & LDLM_FL_NO_CALLBACK, don't run the cancel callback.
+ * If flags & LDLM_FL_WARN, print a warning if some locks are still in use.
+ * If flags & LDLM_FL_CONFIG_CHANGE, mark all locks as having a pending callback
+ */
+int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
+ struct ldlm_res_id *res_id, int flags, void *opaque)
+{
+ int i;
+ struct l_wait_info lwi = { 0 };
+ ENTRY;
+
+ /* No namespace: nothing to cancel. */
+ if (ns == NULL)
+ RETURN(ELDLM_OK);
+
+ /* Single-resource case: delegate directly and return its status. */
+ if (res_id)
+ RETURN(ldlm_cli_cancel_unused_resource(ns, *res_id, flags,
+ opaque));
+
+ l_lock(&ns->ns_lock);
+ for (i = 0; i < RES_HASH_SIZE; i++) {
+ struct list_head *tmp, *next;
+ list_for_each_safe(tmp, next, &(ns->ns_hash[i])) {
+ int rc;
+ struct ldlm_resource *res;
+ res = list_entry(tmp, struct ldlm_resource, lr_hash);
+ /* Pin the resource so it cannot be freed while the
+ * namespace lock is dropped for the cancel call. */
+ ldlm_resource_getref(res);
+ l_unlock(&ns->ns_lock);
+
+ rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name,
+ flags, opaque);
+ if (rc)
+ CERROR("cancel_unused_res ("LPU64"): %d\n",
+ res->lr_name.name[0], rc);
+
+ l_lock(&ns->ns_lock);
+ /* Re-read the next pointer under the lock before the
+ * putref below, which may free res (and hence tmp).
+ * NOTE(review): the held reference protects res only;
+ * if a neighbouring resource was freed while the lock
+ * was dropped, tmp->next could still be stale --
+ * confirm against ldlm_resource_putref semantics. */
+ next = tmp->next;
+ ldlm_resource_putref(res);
+ }
+ }
+ l_unlock(&ns->ns_lock);
+ /* On a configuration change, block until every resource in the
+ * namespace has been released (see have_no_nsresource). */
+ if (flags & LDLM_FL_CONFIG_CHANGE)
+ l_wait_event(ns->ns_waitq, have_no_nsresource(ns), &lwi);
+
+ RETURN(ELDLM_OK);
+}
+
+/* Lock iterators. */
+
+/* Call iter(lock, closure) on every lock of @res: first the granted list,
+ * then converting, then waiting, in that order.  Stops as soon as the
+ * callback returns LDLM_ITER_STOP and returns that; otherwise returns
+ * LDLM_ITER_CONTINUE.  A NULL resource is treated as empty.  The namespace
+ * lock is held across the callbacks, so iter must not block. */
+int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
+                          void *closure)
+{
+        struct list_head *tmp, *next;
+        struct ldlm_lock *lock;
+        int rc = LDLM_ITER_CONTINUE;
+        struct ldlm_namespace *ns;
+
+        ENTRY;
+
+        /* Bug fix: the NULL check must precede any use of res.  The old
+         * code read res->lr_namespace in its initializers and only then
+         * tested res, so the check could never actually protect anything. */
+        if (!res)
+                RETURN(LDLM_ITER_CONTINUE);
+
+        ns = res->lr_namespace;
+        l_lock(&ns->ns_lock);
+        list_for_each_safe(tmp, next, &res->lr_granted) {
+                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+                if (iter(lock, closure) == LDLM_ITER_STOP)
+                        GOTO(out, rc = LDLM_ITER_STOP);
+        }
+
+        list_for_each_safe(tmp, next, &res->lr_converting) {
+                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+                if (iter(lock, closure) == LDLM_ITER_STOP)
+                        GOTO(out, rc = LDLM_ITER_STOP);
+        }
+
+        list_for_each_safe(tmp, next, &res->lr_waiting) {
+                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+
+                if (iter(lock, closure) == LDLM_ITER_STOP)
+                        GOTO(out, rc = LDLM_ITER_STOP);
+        }
+ out:
+        l_unlock(&ns->ns_lock);
+        RETURN(rc);
+}
+
+/* Pairs a lock iterator with its opaque argument so both can be passed
+ * through the single void *closure of a resource-level iterator. */
+struct iter_helper_data {
+ ldlm_iterator_t iter;
+ void *closure;
+};
+
+/* Adapter: unpack an iter_helper_data closure and invoke the wrapped
+ * per-lock iterator with its original argument. */
+static int ldlm_iter_helper(struct ldlm_lock *lock, void *closure)
+{
+        struct iter_helper_data *h = closure;
+
+        return h->iter(lock, h->closure);
+}
+
+/* Resource-level adapter: walk every lock on @res with the iterator
+ * packed inside the iter_helper_data closure. */
+static int ldlm_res_iter_helper(struct ldlm_resource *res, void *closure)
+{
+        return ldlm_resource_foreach(res, ldlm_iter_helper, closure);
+}
+
+/* Apply iter(lock, closure) to every lock in the namespace by wrapping the
+ * pair in an iter_helper_data and walking each resource in turn.  Returns
+ * the final iterator status (LDLM_ITER_STOP or LDLM_ITER_CONTINUE). */
+int ldlm_namespace_foreach(struct ldlm_namespace *ns, ldlm_iterator_t iter,
+                           void *closure)
+{
+        /* Use standard C99 designated initializers; the old "field: value"
+         * form is an obsolete GNU extension. */
+        struct iter_helper_data helper = { .iter = iter, .closure = closure };
+
+        return ldlm_namespace_foreach_res(ns, ldlm_res_iter_helper, &helper);
+}
+
+/* Apply iter(res, closure) to every resource in the namespace hash table.
+ * Stops early when the callback returns LDLM_ITER_STOP and returns the
+ * last callback status.  The namespace lock is held for the whole walk,
+ * so the callback must not sleep or drop ns_lock. */
+int ldlm_namespace_foreach_res(struct ldlm_namespace *ns,
+ ldlm_res_iterator_t iter, void *closure)
+{
+ int i, rc = LDLM_ITER_CONTINUE;
+
+ l_lock(&ns->ns_lock);
+ for (i = 0; i < RES_HASH_SIZE; i++) {
+ struct list_head *tmp, *next;
+ list_for_each_safe(tmp, next, &(ns->ns_hash[i])) {
+ struct ldlm_resource *res =
+ list_entry(tmp, struct ldlm_resource, lr_hash);
+
+ /* Pin the resource across the callback in case it
+ * releases the last outstanding reference. */
+ ldlm_resource_getref(res);
+ rc = iter(res, closure);
+ ldlm_resource_putref(res);
+ if (rc == LDLM_ITER_STOP)
+ GOTO(out, rc);
+ }
+ }
+ out:
+ l_unlock(&ns->ns_lock);
+ RETURN(rc);
+}
+
+/* Non-blocking: run iter over every lock of the resource named by @res_id,
+ * used to fix up a lock's cb_data while that data is being torn down.
+ * Returns silently if the resource does not exist; LBUG()s on a NULL
+ * namespace. */
+void ldlm_change_cbdata(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
+                        ldlm_iterator_t iter, void *data)
+{
+        struct ldlm_resource *res;
+        ENTRY;
+
+        if (ns == NULL) {
+                /* Bug fix: console messages must be newline-terminated,
+                 * matching every other CERROR in this file. */
+                CERROR("must pass in namespace\n");
+                LBUG();
+        }
+
+        /* Look up an existing resource; quietly bail out if not found. */
+        res = ldlm_resource_get(ns, NULL, *res_id, 0, 0);
+        if (res == NULL) {
+                EXIT;
+                return;
+        }
+
+        l_lock(&ns->ns_lock);
+        ldlm_resource_foreach(res, iter, data);
+        l_unlock(&ns->ns_lock);
+        ldlm_resource_putref(res);
+        EXIT;
+}
+
+/* Lock replay */
+
+/* Iterator callback: chain each lock onto the caller-supplied list so the
+ * whole set can be replayed afterwards.  Never stops the iteration. */
+static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
+{
+        struct list_head *head = closure;
+
+        /* l_pending_chain is free for this use on the client side. */
+        list_add(&lock->l_pending_chain, head);
+        return LDLM_ITER_CONTINUE;
+}
+
+/* Reply callback for an asynchronous lock-replay RPC.  On success, records
+ * the server's lock handle in the local lock and advances the import
+ * recovery state machine; on failure just propagates rc.  In all cases the
+ * imp_replay_inflight count taken when the request was sent is dropped. */
+static int replay_lock_interpret(struct ptlrpc_request *req,
+ void * data, int rc)
+{
+ struct ldlm_lock *lock;
+ struct ldlm_reply *reply;
+
+ atomic_dec(&req->rq_import->imp_replay_inflight);
+ /* NOTE(review): on this error path the recovery state machine is not
+ * advanced -- presumably a failed replay is handled elsewhere in the
+ * import recovery code; confirm. */
+ if (rc != ELDLM_OK)
+ GOTO(out, rc);
+
+ lock = req->rq_async_args.pointer_arg[0];
+ LASSERT(lock != NULL);
+
+ /* Unpack (byte-swapping if needed) the server's ldlm reply body. */
+ reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
+ lustre_swab_ldlm_reply);
+ if (reply == NULL) {
+ CERROR("Can't unpack ldlm_reply\n");
+ GOTO (out, rc = -EPROTO);
+ }
+
+ /* Remember the server-side handle for the replayed lock. */
+ memcpy(&lock->l_remote_handle, &reply->lock_handle,
+ sizeof(lock->l_remote_handle));
+ LDLM_DEBUG(lock, "replayed lock:");
+ ptlrpc_import_recovery_state_machine(req->rq_import);
+ out:
+ RETURN(rc);
+}
+
+static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
+{
+ struct ptlrpc_request *req;
+ struct ldlm_request *body;
+ struct ldlm_reply *reply;
+ int buffers = 1;
+ int size[2];
+ int flags;
+
+ /*
+ * If granted mode matches the requested mode, this lock is granted.
+ *
+ * If they differ, but we have a granted mode, then we were granted
+ * one mode and now want another: ergo, converting.
+ *
+ * If we haven't been granted anything and are on a resource list,
+ * then we're blocked/waiting.
+ *
+ * If we haven't been granted anything and we're NOT on a resource list,
+ * then we haven't got a reply yet and don't have a known disposition.
+ * This happens whenever a lock enqueue is the request that triggers
+ * recovery.
+ */
+ if (lock->l_granted_mode == lock->l_req_mode)
+ flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED;
+ else if (lock->l_granted_mode)
+ flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_CONV;
+ else if (!list_empty(&lock->l_res_link))
+ flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT;
+ else
+ flags = LDLM_FL_REPLAY;
+
+ size[0] = sizeof(*body);
+ req = ptlrpc_prep_req(imp, LDLM_ENQUEUE, 1, size, NULL);
+ if (!req)
+ RETURN(-ENOMEM);
+
+ /* We're part of recovery, so don't wait for it. */
+ req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
+ ldlm_lock2desc(lock, &body->lock_desc);
+ body->lock_flags = flags;
+
+ ldlm_lock2handle(lock, &body->lock_handle1);
+ size[0] = sizeof(*reply);
+ if (lock->l_lvb_len != 0) {
+ buffers = 2;
+ size[1] = lock->l_lvb_len;
+ }
+ req->rq_replen = lustre_msg_size(buffers, size);
+
+ LDLM_DEBUG(lock, "replaying lock:");
+
+ atomic_inc(&req->rq_import->imp_replay_inflight);
+ req->rq_async_args.pointer_arg[0] = lock;
+ req->rq_interpret_reply = replay_lock_interpret;
+ ptlrpcd_add_req(req);