+ struct ldlm_async_args *aa = args;
+ struct ldlm_lock *lock;
+ struct ldlm_reply *reply;
+
+ ENTRY;
+
+ lock = ldlm_handle2lock(&aa->lock_handle);
+ if (!lock) {
+ LDLM_DEBUG_NOLOCK("convert ACK for unknown local cookie %#llx",
+ aa->lock_handle.cookie);
+ RETURN(-ESTALE);
+ }
+
+ LDLM_DEBUG(lock, "CONVERTED lock:");
+
+ if (rc != ELDLM_OK)
+ GOTO(out, rc);
+
+ reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+ if (reply == NULL)
+ GOTO(out, rc = -EPROTO);
+
+ if (reply->lock_handle.cookie != aa->lock_handle.cookie) {
+ LDLM_ERROR(lock,
+ "convert ACK with wrong lock cookie %#llx but cookie %#llx from server %s id %s\n",
+ aa->lock_handle.cookie, reply->lock_handle.cookie,
+ req->rq_export->exp_client_uuid.uuid,
+ libcfs_id2str(req->rq_peer));
+ GOTO(out, rc = ELDLM_NO_LOCK_DATA);
+ }
+
+ lock_res_and_lock(lock);
+ /*
+ * Lock convert is sent for any new bits to drop, the converting flag
+ * is dropped when ibits on server are the same as on client. Meanwhile
+ * that can be so that more later convert will be replied first with
+ * and clear converting flag, so in case of such race just exit here.
+ * if lock has no converting bits then
+ */
+ if (!ldlm_is_converting(lock)) {
+ LDLM_DEBUG(lock,
+ "convert ACK for lock without converting flag, reply ibits %#llx",
+ reply->lock_desc.l_policy_data.l_inodebits.bits);
+ } else if (reply->lock_desc.l_policy_data.l_inodebits.bits !=
+ lock->l_policy_data.l_inodebits.bits) {
+ /*
+ * Compare server returned lock ibits and local lock ibits
+ * if they are the same we consider convertion is done,
+ * otherwise we have more converts inflight and keep
+ * converting flag.
+ */
+ LDLM_DEBUG(lock, "convert ACK with ibits %#llx\n",
+ reply->lock_desc.l_policy_data.l_inodebits.bits);
+ } else {
+ ldlm_clear_converting(lock);
+
+ /*
+ * Concurrent BL AST may arrive and cause another convert
+ * or cancel so just do nothing here if bl_ast is set,
+ * finish with convert otherwise.
+ */
+ if (!ldlm_is_bl_ast(lock)) {
+ struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
+
+ /*
+ * Drop cancel_bits since there are no more converts
+ * and put lock into LRU if it is still not used and
+ * is not there yet.
+ */
+ lock->l_policy_data.l_inodebits.cancel_bits = 0;
+ if (!lock->l_readers && !lock->l_writers &&
+ !ldlm_is_canceling(lock)) {
+ spin_lock(&ns->ns_lock);
+ /* there is check for list_empty() inside */
+ ldlm_lock_remove_from_lru_nolock(lock);
+ ldlm_lock_add_to_lru_nolock(lock);
+ spin_unlock(&ns->ns_lock);
+ }
+ }
+ }
+ unlock_res_and_lock(lock);
+out:
+ if (rc) {
+ int flag;
+
+ lock_res_and_lock(lock);
+ if (ldlm_is_converting(lock)) {
+ ldlm_clear_converting(lock);
+ ldlm_set_cbpending(lock);
+ ldlm_set_bl_ast(lock);
+ lock->l_policy_data.l_inodebits.cancel_bits = 0;
+ }
+ unlock_res_and_lock(lock);
+
+ /*
+ * fallback to normal lock cancel. If rc means there is no
+ * valid lock on server, do only local cancel
+ */
+ if (rc == ELDLM_NO_LOCK_DATA)
+ flag = LCF_LOCAL;
+ else
+ flag = LCF_ASYNC;
+
+ rc = ldlm_cli_cancel(&aa->lock_handle, flag);
+ if (rc < 0)
+ LDLM_DEBUG(lock, "failed to cancel lock: rc = %d\n",
+ rc);
+ }
+ LDLM_LOCK_PUT(lock);
+ RETURN(rc);