Whamcloud - gitweb
LU-10467 ldlm: convert waiting in ldlm_completion_ast()
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
index fa05c94..53791f2 100644 (file)
@@ -71,10 +71,6 @@ MODULE_PARM_DESC(ldlm_enqueue_min, "lock enqueue timeout minimum");
 /* in client side, whether the cached locks will be canceled before replay */
 unsigned int ldlm_cancel_unused_locks_before_replay = 1;
 
-static void interrupted_completion_wait(void *data)
-{
-}
-
 struct lock_wait_data {
        struct ldlm_lock *lwd_lock;
        __u32             lwd_conn_cnt;
@@ -111,9 +107,8 @@ int ldlm_request_bufsize(int count, int type)
        return sizeof(struct ldlm_request) + avail;
 }
 
-int ldlm_expired_completion_wait(void *data)
+void ldlm_expired_completion_wait(struct lock_wait_data *lwd)
 {
-       struct lock_wait_data *lwd = data;
        struct ldlm_lock *lock = lwd->lwd_lock;
        struct obd_import *imp;
        struct obd_device *obd;
@@ -135,7 +130,7 @@ int ldlm_expired_completion_wait(void *data)
                        if (last_dump == 0)
                                libcfs_debug_dumplog();
                }
-               RETURN(0);
+               RETURN_EXIT;
        }
 
        obd = lock->l_conn_export->exp_obd;
@@ -147,7 +142,7 @@ int ldlm_expired_completion_wait(void *data)
                   (s64)(ktime_get_real_seconds() - lock->l_activity),
                   obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);
 
-       RETURN(0);
+       EXIT;
 }
 
 int is_granted_or_cancelled_nolock(struct ldlm_lock *lock)
@@ -269,7 +264,6 @@ int ldlm_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
        struct lock_wait_data lwd;
        struct obd_device *obd;
        struct obd_import *imp = NULL;
-       struct l_wait_info lwi;
        time64_t timeout;
        int rc = 0;
 
@@ -300,15 +294,6 @@ noreproc:
        lwd.lwd_lock = lock;
        lock->l_activity = ktime_get_real_seconds();
 
-       if (ldlm_is_no_timeout(lock)) {
-               LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
-               lwi = LWI_INTR(interrupted_completion_wait, &lwd);
-       } else {
-               lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
-                                      ldlm_expired_completion_wait,
-                                      interrupted_completion_wait, &lwd);
-       }
-
        if (imp != NULL) {
                spin_lock(&imp->imp_lock);
                lwd.lwd_conn_cnt = imp->imp_conn_cnt;
@@ -322,8 +307,22 @@ noreproc:
                rc = -EINTR;
        } else {
                /* Go to sleep until the lock is granted or cancelled. */
-               rc = l_wait_event(lock->l_waitq,
-                                 is_granted_or_cancelled(lock), &lwi);
+               if (ldlm_is_no_timeout(lock)) {
+                       LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
+                       rc = l_wait_event_abortable(
+                               lock->l_waitq,
+                               is_granted_or_cancelled(lock));
+               } else {
+                       if (wait_event_idle_timeout(
+                                   lock->l_waitq,
+                                   is_granted_or_cancelled(lock),
+                                   cfs_time_seconds(timeout)) == 0) {
+                               ldlm_expired_completion_wait(&lwd);
+                               rc = l_wait_event_abortable(
+                                       lock->l_waitq,
+                                       is_granted_or_cancelled(lock));
+                       }
+               }
        }
 
        if (rc) {
@@ -583,6 +582,11 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
        }
 }
 
+static bool ldlm_request_slot_needed(enum ldlm_type type)
+{
+       return type == LDLM_FLOCK || type == LDLM_IBITS;
+}
+
 /**
  * Finishing portion of client lock enqueue code.
  *
@@ -603,6 +607,11 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
 
        ENTRY;
 
+       if (ldlm_request_slot_needed(type))
+               obd_put_request_slot(&req->rq_import->imp_obd->u.cli);
+
+       ptlrpc_put_mod_rpc_slot(req);
+
        if (req && req->rq_svc_thread)
                env = req->rq_svc_thread->t_env;
 
@@ -725,6 +734,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
 
        if ((*flags) & LDLM_FL_AST_SENT) {
                lock_res_and_lock(lock);
+               ldlm_bl_desc2lock(&reply->lock_desc, lock);
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
                unlock_res_and_lock(lock);
                LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
@@ -1072,6 +1082,24 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                                             LDLM_GLIMPSE_ENQUEUE);
        }
 
+       /* It is important to obtain modify RPC slot first (if applicable), so
+        * that threads that are waiting for a modify RPC slot are not polluting
+        * our rpcs in flight counter. */
+
+       if (einfo->ei_enq_slot)
+               ptlrpc_get_mod_rpc_slot(req);
+
+       if (ldlm_request_slot_needed(einfo->ei_type)) {
+               rc = obd_get_request_slot(&req->rq_import->imp_obd->u.cli);
+               if (rc) {
+                       if (einfo->ei_enq_slot)
+                               ptlrpc_put_mod_rpc_slot(req);
+                       failed_lock_cleanup(ns, lock, einfo->ei_mode);
+                       LDLM_LOCK_RELEASE(lock);
+                       GOTO(out, rc);
+               }
+       }
+
        if (async) {
                LASSERT(reqp != NULL);
                RETURN(0);
@@ -1094,6 +1122,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
        else
                rc = err;
 
+out:
        if (!req_passed_in && req != NULL) {
                ptlrpc_req_finished(req);
                if (reqp)
@@ -1105,128 +1134,6 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 EXPORT_SYMBOL(ldlm_cli_enqueue);
 
 /**
- * Client-side lock convert reply handling.
- *
- * Finish client lock converting, checks for concurrent converts
- * and clear 'converting' flag so lock can be placed back into LRU.
- */
-static int lock_convert_interpret(const struct lu_env *env,
-                                 struct ptlrpc_request *req,
-                                 void *args, int rc)
-{
-       struct ldlm_async_args *aa = args;
-       struct ldlm_lock *lock;
-       struct ldlm_reply *reply;
-
-       ENTRY;
-
-       lock = ldlm_handle2lock(&aa->lock_handle);
-       if (!lock) {
-               LDLM_DEBUG_NOLOCK("convert ACK for unknown local cookie %#llx",
-                       aa->lock_handle.cookie);
-               RETURN(-ESTALE);
-       }
-
-       LDLM_DEBUG(lock, "CONVERTED lock:");
-
-       if (rc != ELDLM_OK)
-               GOTO(out, rc);
-
-       reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
-       if (reply == NULL)
-               GOTO(out, rc = -EPROTO);
-
-       if (reply->lock_handle.cookie != aa->lock_handle.cookie) {
-               LDLM_ERROR(lock,
-                          "convert ACK with wrong lock cookie %#llx but cookie %#llx from server %s id %s\n",
-                          aa->lock_handle.cookie, reply->lock_handle.cookie,
-                          req->rq_export->exp_client_uuid.uuid,
-                          libcfs_id2str(req->rq_peer));
-               GOTO(out, rc = ELDLM_NO_LOCK_DATA);
-       }
-
-       lock_res_and_lock(lock);
-       /*
-        * Lock convert is sent for any new bits to drop, the converting flag
-        * is dropped when ibits on server are the same as on client. Meanwhile
-        * that can be so that more later convert will be replied first with
-        * and clear converting flag, so in case of such race just exit here.
-        * if lock has no converting bits then
-        */
-       if (!ldlm_is_converting(lock)) {
-               LDLM_DEBUG(lock,
-                          "convert ACK for lock without converting flag, reply ibits %#llx",
-                          reply->lock_desc.l_policy_data.l_inodebits.bits);
-       } else if (reply->lock_desc.l_policy_data.l_inodebits.bits !=
-                  lock->l_policy_data.l_inodebits.bits) {
-               /*
-                * Compare server returned lock ibits and local lock ibits
-                * if they are the same we consider convertion is done,
-                * otherwise we have more converts inflight and keep
-                * converting flag.
-                */
-               LDLM_DEBUG(lock, "convert ACK with ibits %#llx\n",
-                          reply->lock_desc.l_policy_data.l_inodebits.bits);
-       } else {
-               ldlm_clear_converting(lock);
-
-               /*
-                * Concurrent BL AST may arrive and cause another convert
-                * or cancel so just do nothing here if bl_ast is set,
-                * finish with convert otherwise.
-                */
-               if (!ldlm_is_bl_ast(lock)) {
-                       struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
-
-                       /*
-                        * Drop cancel_bits since there are no more converts
-                        * and put lock into LRU if it is still not used and
-                        * is not there yet.
-                        */
-                       lock->l_policy_data.l_inodebits.cancel_bits = 0;
-                       if (!lock->l_readers && !lock->l_writers &&
-                           !ldlm_is_canceling(lock)) {
-                               spin_lock(&ns->ns_lock);
-                               /* there is check for list_empty() inside */
-                               ldlm_lock_remove_from_lru_nolock(lock);
-                               ldlm_lock_add_to_lru_nolock(lock);
-                               spin_unlock(&ns->ns_lock);
-                       }
-               }
-       }
-       unlock_res_and_lock(lock);
-out:
-       if (rc) {
-               int flag;
-
-               lock_res_and_lock(lock);
-               if (ldlm_is_converting(lock)) {
-                       ldlm_clear_converting(lock);
-                       ldlm_set_cbpending(lock);
-                       ldlm_set_bl_ast(lock);
-                       lock->l_policy_data.l_inodebits.cancel_bits = 0;
-               }
-               unlock_res_and_lock(lock);
-
-               /*
-                * fallback to normal lock cancel. If rc means there is no
-                * valid lock on server, do only local cancel
-                */
-               if (rc == ELDLM_NO_LOCK_DATA)
-                       flag = LCF_LOCAL;
-               else
-                       flag = LCF_ASYNC;
-
-               rc = ldlm_cli_cancel(&aa->lock_handle, flag);
-               if (rc < 0)
-                       LDLM_DEBUG(lock, "failed to cancel lock: rc = %d\n",
-                                  rc);
-       }
-       LDLM_LOCK_PUT(lock);
-       RETURN(rc);
-}
-
-/**
  * Client-side IBITS lock convert.
  *
  * Inform server that lock has been converted instead of canceling.
@@ -1238,19 +1145,15 @@ out:
  * is made asynchronous.
  *
  */
-int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
+int ldlm_cli_convert_req(struct ldlm_lock *lock, __u32 *flags, __u64 new_bits)
 {
        struct ldlm_request *body;
        struct ptlrpc_request *req;
-       struct ldlm_async_args *aa;
        struct obd_export *exp = lock->l_conn_export;
 
        ENTRY;
 
-       if (exp == NULL) {
-               LDLM_ERROR(lock, "convert must not be called on local locks.");
-               RETURN(-EINVAL);
-       }
+       LASSERT(exp != NULL);
 
        /*
         * this is better to check earlier and it is done so already,
@@ -1281,8 +1184,7 @@ int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
        body->lock_desc.l_req_mode = lock->l_req_mode;
        body->lock_desc.l_granted_mode = lock->l_granted_mode;
 
-       body->lock_desc.l_policy_data.l_inodebits.bits =
-                                       lock->l_policy_data.l_inodebits.bits;
+       body->lock_desc.l_policy_data.l_inodebits.bits = new_bits;
        body->lock_desc.l_policy_data.l_inodebits.cancel_bits = 0;
 
        body->lock_flags = ldlm_flags_to_wire(*flags);
@@ -1302,10 +1204,6 @@ int ldlm_cli_convert(struct ldlm_lock *lock, __u32 *flags)
                lprocfs_counter_incr(exp->exp_obd->obd_svc_stats,
                                     LDLM_CONVERT - LDLM_FIRST_OPC);
 
-       aa = ptlrpc_req_async_args(aa, req);
-       ldlm_lock2handle(lock, &aa->lock_handle);
-       req->rq_interpret_reply = lock_convert_interpret;
-
        ptlrpcd_add_req(req);
        RETURN(0);
 }
@@ -1327,6 +1225,9 @@ static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
                bool local_only;
 
                LDLM_DEBUG(lock, "client-side cancel");
+               OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL_LOCAL,
+                                cfs_fail_val);
+
                /* Set this flag to prevent others from getting new references*/
                lock_res_and_lock(lock);
                ldlm_set_cbpending(lock);
@@ -1558,6 +1459,27 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
        RETURN(0);
 }
 
+int ldlm_cli_convert(struct ldlm_lock *lock,
+                    enum ldlm_cancel_flags cancel_flags)
+{
+       int rc = -EINVAL;
+
+       LASSERT(!lock->l_readers && !lock->l_writers);
+       LDLM_DEBUG(lock, "client lock convert START");
+
+       if (lock->l_resource->lr_type == LDLM_IBITS) {
+               lock_res_and_lock(lock);
+               do {
+                       rc = ldlm_cli_inodebits_convert(lock, cancel_flags);
+               } while (rc == -EAGAIN);
+               unlock_res_and_lock(lock);
+       }
+
+       LDLM_DEBUG(lock, "client lock convert END");
+       RETURN(rc);
+}
+EXPORT_SYMBOL(ldlm_cli_convert);
+
 /**
  * Client side lock cancel.
  *
@@ -1582,20 +1504,9 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
                RETURN(0);
        }
 
-       /* Convert lock bits instead of cancel for IBITS locks */
-       if (cancel_flags & LCF_CONVERT) {
-               LASSERT(lock->l_resource->lr_type == LDLM_IBITS);
-               LASSERT(lock->l_policy_data.l_inodebits.cancel_bits != 0);
-
-               rc = ldlm_cli_dropbits(lock,
-                               lock->l_policy_data.l_inodebits.cancel_bits);
-               if (rc == 0) {
-                       LDLM_LOCK_RELEASE(lock);
-                       RETURN(0);
-               }
-       }
-
        lock_res_and_lock(lock);
+       LASSERT(!ldlm_is_converting(lock));
+
        /* Lock is being canceled and the caller doesn't want to wait */
        if (ldlm_is_canceling(lock)) {
                if (cancel_flags & LCF_ASYNC) {
@@ -1608,16 +1519,6 @@ int ldlm_cli_cancel(const struct lustre_handle *lockh,
                RETURN(0);
        }
 
-       /*
-        * Lock is being converted, cancel it immediately.
-        * When convert will end, it releases lock and it will be gone.
-        */
-       if (ldlm_is_converting(lock)) {
-               /* set back flags removed by convert */
-               ldlm_set_cbpending(lock);
-               ldlm_set_bl_ast(lock);
-       }
-
        ldlm_set_canceling(lock);
        unlock_res_and_lock(lock);
 
@@ -1989,8 +1890,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
                        /* No locks which got blocking requests. */
                        LASSERT(!ldlm_is_bl_ast(lock));
 
-                       if (!ldlm_is_canceling(lock) &&
-                           !ldlm_is_converting(lock))
+                       if (!ldlm_is_canceling(lock))
                                break;
 
                        /*
@@ -2048,7 +1948,7 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
 
                lock_res_and_lock(lock);
                /* Check flags again under the lock. */
-               if (ldlm_is_canceling(lock) || ldlm_is_converting(lock) ||
+               if (ldlm_is_canceling(lock) ||
                    ldlm_lock_remove_from_lru_check(lock, last_use) == 0) {
                        /*
                         * Another thread is removing lock from LRU, or
@@ -2178,11 +2078,10 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
                        continue;
 
                /*
-                * If somebody is already doing CANCEL, or blocking AST came,
-                * or lock is being converted then skip this lock.
+                * If somebody is already doing CANCEL, or blocking AST came
+                * then skip this lock.
                 */
-               if (ldlm_is_bl_ast(lock) || ldlm_is_canceling(lock) ||
-                   ldlm_is_converting(lock))
+               if (ldlm_is_bl_ast(lock) || ldlm_is_canceling(lock))
                        continue;
 
                if (lockmode_compat(lock->l_granted_mode, mode))
@@ -2208,7 +2107,6 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
                /* See CBPENDING comment in ldlm_cancel_lru */
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
                                 lock_flags;
-
                LASSERT(list_empty(&lock->l_bl_ast));
                list_add(&lock->l_bl_ast, cancels);
                LDLM_LOCK_GET(lock);