Whamcloud - gitweb
LU-4942 at: per-export lock callback timeout 36/9336/9
author Vitaly Fertman <vitaly_fertman@xyratex.com>
Fri, 10 Oct 2014 14:45:45 +0000 (18:45 +0400)
committer Oleg Drokin <oleg.drokin@intel.com>
Thu, 30 Oct 2014 02:15:24 +0000 (02:15 +0000)
The lock callback timeout is calculated as an average per namespace.
This does not reflect individual client behavior.
Instead, we should calculate it on a per-export basis.

Signed-off-by: Vitaly Fertman <vitaly_fertman@xyratex.com>
Change-Id: I12e3fc5f8d261cce252fcf13f22193273dc054ee
Tested-by: Elena Gryaznova <Elena_Gryaznova@xyratex.com>
Reviewed-by: Andriy Skulysh <Andriy_Skulysh@xyratex.com>
Reviewed-by: Alexey Lyashkov <Alexey_Lyashkov@xyratex.com>
Xyratex-bug-id: MRP-417
Reviewed-on: http://review.whamcloud.com/9336
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: James Simmons <uja.ornl@gmail.com>
13 files changed:
lustre/include/lustre_dlm.h
lustre/include/lustre_export.h
lustre/include/lustre_import.h
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_reint.c
lustre/mdt/mdt_xattr.c
lustre/obdclass/genops.c
lustre/ofd/ofd_dev.c
lustre/quota/qmt_handler.c

index a8f5715..6bc0b41 100644 (file)
@@ -1218,10 +1218,12 @@ int ldlm_handle_convert0(struct ptlrpc_request *req,
                          const struct ldlm_request *dlm_req);
 int ldlm_handle_cancel(struct ptlrpc_request *req);
 int ldlm_request_cancel(struct ptlrpc_request *req,
-                        const struct ldlm_request *dlm_req, int first);
+                       const struct ldlm_request *dlm_req,
+                       int first, enum lustre_at_flags flags);
 /** @} ldlm_handlers */
 
 void ldlm_revoke_export_locks(struct obd_export *exp);
+unsigned int ldlm_bl_timeout(struct ldlm_lock *lock);
 #endif
 int ldlm_del_waiting_lock(struct ldlm_lock *lock);
 int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout);
index bb1237c..318c8dd 100644 (file)
@@ -265,6 +265,7 @@ struct obd_export {
         } u;
 
        struct nodemap            *exp_nodemap;
+       struct adaptive_timeout    exp_bl_lock_at;
 };
 
 #define exp_target_data u.eu_target_data
index f4a7b90..b5649cf 100644 (file)
@@ -70,6 +70,11 @@ struct adaptive_timeout {
        spinlock_t      at_lock;
 };
 
+enum lustre_at_flags {
+       LATF_SKIP       = 0x0, /* do not update adaptive timeout stats */
+       LATF_STATS      = 0x1, /* feed the measured delay to at_measured() */
+};
+
 struct ptlrpc_at_array {
        struct list_head *paa_reqs_array; /** array to hold requests */
         __u32             paa_size;       /** the size of array */
index d9b5d96..6d2fc61 100644 (file)
@@ -110,9 +110,7 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr,
 int ldlm_cancel_lru_local(struct ldlm_namespace *ns,
                          struct list_head *cancels, int count, int max,
                           ldlm_cancel_flags_t cancel_flags, int flags);
-extern int ldlm_enqueue_min;
-int ldlm_get_enq_timeout(struct ldlm_lock *lock);
-
+extern unsigned int ldlm_enqueue_min;
 /* ldlm_resource.c */
 int ldlm_resource_putref_locked(struct ldlm_resource *res);
 void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
index 6b81e74..d23ba79 100644 (file)
@@ -1629,7 +1629,6 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
         struct ldlm_interval *node = NULL;
         ENTRY;
 
-        lock->l_last_activity = cfs_time_current_sec();
         /* policies are not executed on the client or during replay */
         if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
             && !local && ns->ns_policy) {
index 62baaf5..bc8f6bd 100644 (file)
@@ -324,7 +324,7 @@ static void waiting_locks_callback(unsigned long unused)
                        spin_unlock_bh(&waiting_locks_spinlock);
                        LDLM_DEBUG(lock, "prolong the busy lock");
                        ldlm_refresh_waiting_lock(lock,
-                                                 ldlm_get_enq_timeout(lock));
+                                                 ldlm_bl_timeout(lock) >> 1);
                        spin_lock_bh(&waiting_locks_spinlock);
 
                         if (!cont) {
@@ -417,7 +417,7 @@ static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds)
 static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
 {
        int ret;
-       int timeout = ldlm_get_enq_timeout(lock);
+       int timeout = ldlm_bl_timeout(lock);
 
        /* NB: must be called with hold of lock_res_and_lock() */
        LASSERT(ldlm_is_res_locked(lock));
@@ -429,20 +429,21 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
        if (ldlm_is_destroyed(lock)) {
                static cfs_time_t next;
                spin_unlock_bh(&waiting_locks_spinlock);
-                LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
-                if (cfs_time_after(cfs_time_current(), next)) {
-                        next = cfs_time_shift(14400);
-                        libcfs_debug_dumpstack(NULL);
-                }
-                return 0;
-        }
+               LDLM_ERROR(lock, "not waiting on destroyed lock (bug 5653)");
+               if (cfs_time_after(cfs_time_current(), next)) {
+                       next = cfs_time_shift(14400);
+                       libcfs_debug_dumpstack(NULL);
+               }
+               return 0;
+       }
 
-        ret = __ldlm_add_waiting_lock(lock, timeout);
-        if (ret) {
-                /* grab ref on the lock if it has been added to the
-                 * waiting list */
-                LDLM_LOCK_GET(lock);
-        }
+       lock->l_last_activity = cfs_time_current_sec();
+       ret = __ldlm_add_waiting_lock(lock, timeout);
+       if (ret) {
+               /* grab ref on the lock if it has been added to the
+                * waiting list */
+               LDLM_LOCK_GET(lock);
+       }
        spin_unlock_bh(&waiting_locks_spinlock);
 
        if (ret) {
@@ -573,6 +574,31 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
 #ifdef HAVE_SERVER_SUPPORT
 
 /**
+ * Calculate the per-export Blocking timeout (covering BL AST, data flush,
+ * lock cancel, and their replies). Used for lock callback timeout and AST
+ * re-send period. When AT is off, obd_timeout / 2 is returned instead.
+ *
+ * \param[in] lock        lock which is getting the blocking callback
+ *
+ * \retval            timeout in seconds to wait for the client reply
+ */
+unsigned int ldlm_bl_timeout(struct ldlm_lock *lock)
+{
+       unsigned int timeout;
+
+       if (AT_OFF)
+               return obd_timeout / 2;
+
+       /* Since these are non-updating timeouts, we should be conservative:
+        * use 150% of the per-export estimate, but never less than
+        * ldlm_enqueue_min. It would be nice to have some kind of
+        * "early reply" mechanism for lock callbacks too... */
+       timeout = at_get(&lock->l_export->exp_bl_lock_at);
+       return max(timeout + (timeout >> 1), ldlm_enqueue_min);
+}
+EXPORT_SYMBOL(ldlm_bl_timeout);
+
+/**
  * Perform lock cleanup if AST sending failed.
  */
 static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
@@ -641,7 +667,7 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock,
                         }
 
                 } else {
-                       LDLM_ERROR(lock, "client (nid %s) returned %d: rc=%d "
+                       LDLM_ERROR(lock, "client (nid %s) returned %d: rc = %d "
                                   "from %s AST", libcfs_nid2str(peer.nid),
                                   (req->rq_repmsg != NULL) ?
                                   lustre_msg_get_status(req->rq_repmsg) : 0,
@@ -714,7 +740,7 @@ static void ldlm_update_resend(struct ptlrpc_request *req, void *data)
        struct ldlm_cb_async_args *ca   = data;
        struct ldlm_lock          *lock = ca->ca_lock;
 
-       ldlm_refresh_waiting_lock(lock, ldlm_get_enq_timeout(lock));
+       ldlm_refresh_waiting_lock(lock, ldlm_bl_timeout(lock));
 }
 
 static inline int ldlm_ast_fini(struct ptlrpc_request *req,
@@ -853,7 +879,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                unlock_res_and_lock(lock);
 
                /* Do not resend after lock callback timeout */
-               req->rq_delay_limit = ldlm_get_enq_timeout(lock);
+               req->rq_delay_limit = ldlm_bl_timeout(lock);
                req->rq_resend_cb = ldlm_update_resend;
        }
 
@@ -888,7 +914,6 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
         struct ldlm_request    *body;
         struct ptlrpc_request  *req;
         struct ldlm_cb_async_args *ca;
-        long                    total_enqueue_wait;
         int                     instant_cancel = 0;
         int                     rc = 0;
        int                     lvb_len;
@@ -897,9 +922,6 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
         LASSERT(lock != NULL);
         LASSERT(data != NULL);
 
-        total_enqueue_wait = cfs_time_sub(cfs_time_current_sec(),
-                                          lock->l_last_activity);
-
        if (OBD_FAIL_PRECHECK(OBD_FAIL_OST_LDLM_REPLY_NET)) {
                LDLM_DEBUG(lock, "dropping CP AST");
                RETURN(0);
@@ -957,25 +979,9 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
                }
         }
 
-        LDLM_DEBUG(lock, "server preparing completion AST (after %lds wait)",
-                   total_enqueue_wait);
-
        lock->l_last_activity = cfs_time_current_sec();
 
-        /* Server-side enqueue wait time estimate, used in
-            __ldlm_add_waiting_lock to set future enqueue timers */
-        if (total_enqueue_wait < ldlm_get_enq_timeout(lock))
-                at_measured(ldlm_lock_to_ns_at(lock),
-                            total_enqueue_wait);
-        else
-                /* bz18618. Don't add lock enqueue time we spend waiting for a
-                   previous callback to fail. Locks waiting legitimately will
-                   get extended by ldlm_refresh_waiting_lock regardless of the
-                   estimate, so it's okay to underestimate here. */
-                LDLM_DEBUG(lock, "lock completed after %lus; estimate was %ds. "
-                       "It is likely that a previous callback timed out.",
-                       total_enqueue_wait,
-                       at_get(ldlm_lock_to_ns_at(lock)));
+       LDLM_DEBUG(lock, "server preparing completion AST");
 
         ptlrpc_request_set_replen(req);
 
@@ -1010,7 +1016,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
                        /* start the lock-timeout clock */
                        ldlm_add_waiting_lock(lock);
                        /* Do not resend after lock callback timeout */
-                       req->rq_delay_limit = ldlm_get_enq_timeout(lock);
+                       req->rq_delay_limit = ldlm_bl_timeout(lock);
                        req->rq_resend_cb = ldlm_update_resend;
                }
         }
@@ -1184,7 +1190,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
 
         LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
 
-        ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF);
+       ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF, LATF_SKIP);
        flags = ldlm_flags_from_wire(dlm_req->lock_flags);
 
         LASSERT(req->rq_export);
@@ -1271,7 +1277,6 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                GOTO(out, rc);
        }
 
-        lock->l_last_activity = cfs_time_current_sec();
         lock->l_remote_handle = dlm_req->lock_handle[0];
         LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
 
@@ -1546,7 +1551,6 @@ int ldlm_handle_convert0(struct ptlrpc_request *req,
 
                 LDLM_DEBUG(lock, "server-side convert handler START");
 
-                lock->l_last_activity = cfs_time_current_sec();
                 res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,
                                         &dlm_rep->lock_flags);
                 if (res) {
@@ -1597,7 +1601,8 @@ EXPORT_SYMBOL(ldlm_handle_convert);
  * requests.
  */
 int ldlm_request_cancel(struct ptlrpc_request *req,
-                        const struct ldlm_request *dlm_req, int first)
+                       const struct ldlm_request *dlm_req,
+                       int first, enum lustre_at_flags flags)
 {
         struct ldlm_resource *res, *pres = NULL;
         struct ldlm_lock *lock;
@@ -1647,6 +1652,14 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
                         }
                         pres = res;
                 }
+
+               if ((flags & LATF_STATS) && ldlm_is_ast_sent(lock)) {
+                       long delay = cfs_time_sub(cfs_time_current_sec(),
+                                                 lock->l_last_activity);
+                       LDLM_DEBUG(lock, "server cancels blocked lock after "
+                                  CFS_DURATION_T"s", delay);
+                       at_measured(&lock->l_export->exp_bl_lock_at, delay);
+               }
                 ldlm_lock_cancel(lock);
                 LDLM_LOCK_PUT(lock);
         }
@@ -1686,7 +1699,7 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
         if (rc)
                 RETURN(rc);
 
-        if (!ldlm_request_cancel(req, dlm_req, 0))
+       if (!ldlm_request_cancel(req, dlm_req, 0, LATF_STATS))
                req->rq_status = LUSTRE_ESTALE;
 
         RETURN(ptlrpc_reply(req));
index 533f18c..defd2ac 100644 (file)
@@ -67,8 +67,8 @@
 
 #include "ldlm_internal.h"
 
-int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
-CFS_MODULE_PARM(ldlm_enqueue_min, "i", int, 0644,
+unsigned int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
+CFS_MODULE_PARM(ldlm_enqueue_min, "i", uint, 0644,
                 "lock enqueue timeout minimum");
 
 /* in client side, whether the cached locks will be canceled before replay */
@@ -133,43 +133,55 @@ int ldlm_expired_completion_wait(void *data)
 }
 EXPORT_SYMBOL(ldlm_expired_completion_wait);
 
+/**
+ * Calculate the Completion timeout (covering enqueue, BL AST, data flush,
+ * lock cancel, and their replies). Used for lock completion timeout on the
+ * client side.
+ *
+ * \param[in] lock        lock which is waiting for the completion callback
+ *
+ * \retval            timeout in seconds to wait for the server reply
+ */
+
 /* We use the same basis for both server side and client side functions
    from a single node. */
-int ldlm_get_enq_timeout(struct ldlm_lock *lock)
+static unsigned int ldlm_cp_timeout(struct ldlm_lock *lock)
 {
-        int timeout = at_get(ldlm_lock_to_ns_at(lock));
-        if (AT_OFF)
-                return obd_timeout / 2;
-        /* Since these are non-updating timeouts, we should be conservative.
-           It would be nice to have some kind of "early reply" mechanism for
-           lock callbacks too... */
-        timeout = min_t(int, at_max, timeout + (timeout >> 1)); /* 150% */
-        return max(timeout, ldlm_enqueue_min);
+       unsigned int timeout;
+
+       if (AT_OFF)
+               return obd_timeout;
+
+       /* Wait a long time for enqueue - server may have to callback a
+        * lock from another client.  Server will evict the other client if it
+        * doesn't respond reasonably, and then give us the lock. */
+       timeout = at_get(ldlm_lock_to_ns_at(lock));
+       return max(3 * timeout, ldlm_enqueue_min);
 }
-EXPORT_SYMBOL(ldlm_get_enq_timeout);
 
 /**
  * Helper function for ldlm_completion_ast(), updating timings when lock is
  * actually granted.
  */
-static int ldlm_completion_tail(struct ldlm_lock *lock)
+static int ldlm_completion_tail(struct ldlm_lock *lock, void *data)
 {
        long delay;
-       int  result;
+       int  result = 0;
 
        if (ldlm_is_destroyed(lock) || ldlm_is_failed(lock)) {
                LDLM_DEBUG(lock, "client-side enqueue: destroyed");
                result = -EIO;
+       } else if (data == NULL) {
+               LDLM_DEBUG(lock, "client-side enqueue: granted");
        } else {
+               /* Only CP RPCs feed the AT, not immediately granted locks */
                delay = cfs_time_sub(cfs_time_current_sec(),
                                     lock->l_last_activity);
                LDLM_DEBUG(lock, "client-side enqueue: granted after "
                           CFS_DURATION_T"s", delay);
 
                /* Update our time estimate */
-               at_measured(ldlm_lock_to_ns_at(lock),
-                           delay);
-               result = 0;
+               at_measured(ldlm_lock_to_ns_at(lock), delay);
        }
        return result;
 }
@@ -190,7 +202,7 @@ int ldlm_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
 
        if (!(flags & LDLM_FL_BLOCKED_MASK)) {
                wake_up(&lock->l_waitq);
-               RETURN(ldlm_completion_tail(lock));
+               RETURN(ldlm_completion_tail(lock, data));
        }
 
        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
@@ -254,12 +266,10 @@ noreproc:
                 imp = obd->u.cli.cl_import;
         }
 
-        /* Wait a long time for enqueue - server may have to callback a
-           lock from another client.  Server will evict the other client if it
-           doesn't respond reasonably, and then give us the lock. */
-        timeout = ldlm_get_enq_timeout(lock) * 2;
+       timeout = ldlm_cp_timeout(lock);
 
-        lwd.lwd_lock = lock;
+       lwd.lwd_lock = lock;
+       lock->l_last_activity = cfs_time_current_sec();
 
        if (ldlm_is_no_timeout(lock)) {
                 LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
@@ -291,9 +301,9 @@ noreproc:
                 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                            rc);
                 RETURN(rc);
-        }
+       }
 
-        RETURN(ldlm_completion_tail(lock));
+       RETURN(ldlm_completion_tail(lock, data));
 }
 EXPORT_SYMBOL(ldlm_completion_ast);
 
@@ -919,6 +929,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
        lock->l_export = NULL;
        lock->l_blocking_ast = einfo->ei_cb_bl;
        lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL));
+        lock->l_last_activity = cfs_time_current_sec();
 
         /* lock not sent to server yet */
 
index c1e5ea8..569ecf6 100644 (file)
@@ -1195,7 +1195,7 @@ static int mdt_swap_layouts(struct tgt_session_info *tsi)
        info = tsi2mdt_info(tsi);
 
        if (info->mti_dlm_req != NULL)
-               ldlm_request_cancel(req, info->mti_dlm_req, 0);
+               ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
 
        if (req_capsule_get_size(info->mti_pill, &RMF_CAPA1, RCL_CLIENT))
                mdt_set_capainfo(info, 0, &info->mti_body->mbo_fid1,
index 5a762e1..f751f98 100644 (file)
@@ -665,8 +665,8 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
                   (unsigned int)ma->ma_attr.la_valid);
 
-        if (info->mti_dlm_req)
-                ldlm_request_cancel(req, info->mti_dlm_req, 0);
+       if (info->mti_dlm_req)
+               ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
 
        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
@@ -828,8 +828,9 @@ static int mdt_reint_create(struct mdt_thread_info *info,
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
                 RETURN(err_serious(-ESTALE));
 
-        if (info->mti_dlm_req)
-                ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0);
+       if (info->mti_dlm_req)
+               ldlm_request_cancel(mdt_info_req(info),
+                                   info->mti_dlm_req, 0, LATF_SKIP);
 
        if (!lu_name_is_valid(&info->mti_rr.rr_name))
                RETURN(-EPROTO);
@@ -883,8 +884,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
        DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
                  PNAME(&rr->rr_name));
 
-        if (info->mti_dlm_req)
-                ldlm_request_cancel(req, info->mti_dlm_req, 0);
+       if (info->mti_dlm_req)
+               ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
                 RETURN(err_serious(-ENOENT));
@@ -1106,8 +1107,8 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
                 RETURN(err_serious(-ENOENT));
 
-        if (info->mti_dlm_req)
-                ldlm_request_cancel(req, info->mti_dlm_req, 0);
+       if (info->mti_dlm_req)
+               ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
 
         /* Invalid case so return error immediately instead of
          * processing it */
@@ -1987,7 +1988,7 @@ static int mdt_reint_rename_or_migrate(struct mdt_thread_info *info,
        ENTRY;
 
        if (info->mti_dlm_req)
-               ldlm_request_cancel(req, info->mti_dlm_req, 0);
+               ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
 
        if (!fid_is_md_operative(rr->rr_fid1) ||
            !fid_is_md_operative(rr->rr_fid2))
index 818c52d..9678397 100644 (file)
@@ -377,7 +377,7 @@ int mdt_reint_setxattr(struct mdt_thread_info *info,
         CDEBUG(D_INODE, "setxattr for "DFID"\n", PFID(rr->rr_fid1));
 
        if (info->mti_dlm_req)
-               ldlm_request_cancel(req, info->mti_dlm_req, 0);
+               ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SETXATTR))
                 RETURN(err_serious(-ENOMEM));
index a02d347..4398a92 100644 (file)
@@ -913,6 +913,7 @@ struct obd_export *class_new_export(struct obd_device *obd,
                 }
         }
 
+       at_init(&export->exp_bl_lock_at, obd_timeout, 0);
        spin_lock(&obd->obd_dev_lock);
         if (obd->obd_stopping) {
                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
index 077858a..928bf29 100644 (file)
@@ -1828,7 +1828,7 @@ static int ofd_destroy_hdl(struct tgt_session_info *tsi)
                dlm = req_capsule_client_get(tsi->tsi_pill, &RMF_DLM_REQ);
                if (dlm == NULL)
                        RETURN(-EFAULT);
-               ldlm_request_cancel(tgt_ses_req(tsi), dlm, 0);
+               ldlm_request_cancel(tgt_ses_req(tsi), dlm, 0, LATF_SKIP);
        }
 
        *fid = body->oa.o_oi.oi_fid;
@@ -2136,15 +2136,18 @@ static int ofd_quotactl(struct tgt_session_info *tsi)
  *
  * \retval             amount of time to extend the timeout with
  */
-static inline int prolong_timeout(struct ptlrpc_request *req)
+static inline int prolong_timeout(struct ptlrpc_request *req,
+                                 struct ldlm_lock *lock)
 {
        struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
 
        if (AT_OFF)
                return obd_timeout / 2;
 
-       return max(at_est2timeout(at_get(&svcpt->scp_at_estimate)),
-                  ldlm_timeout);
+       /* We are in the middle of the process - BL AST is sent, CANCEL
+        * is ahead. Use the service AT estimate plus half the BL timeout. */
+       return at_est2timeout(at_get(&svcpt->scp_at_estimate)) +
+               (ldlm_bl_timeout(lock) >> 1);
 }
 
 /**
@@ -2163,8 +2166,9 @@ static inline int prolong_timeout(struct ptlrpc_request *req)
  */
 static int ofd_prolong_one_lock(struct tgt_session_info *tsi,
                                struct ldlm_lock *lock,
-                               struct ldlm_extent *extent, int timeout)
+                               struct ldlm_extent *extent)
 {
+       int timeout = prolong_timeout(tgt_ses_req(tsi), lock);
 
        if (lock->l_flags & LDLM_FL_DESTROYED) /* lock already cancelled */
                return 0;
@@ -2222,7 +2226,6 @@ static int ofd_prolong_extent_locks(struct tgt_session_info *tsi,
                .end = end
        };
        struct ldlm_lock        *lock;
-       int                      timeout = prolong_timeout(tgt_ses_req(tsi));
        int                      lock_count = 0;
 
        ENTRY;
@@ -2240,7 +2243,7 @@ static int ofd_prolong_extent_locks(struct tgt_session_info *tsi,
                                /* bingo */
                                LASSERT(lock->l_export == exp);
                                lock_count = ofd_prolong_one_lock(tsi, lock,
-                                                            &extent, timeout);
+                                                                 &extent);
                                LDLM_LOCK_PUT(lock);
                                RETURN(lock_count);
                        }
@@ -2260,7 +2263,7 @@ static int ofd_prolong_extent_locks(struct tgt_session_info *tsi,
                                         &extent))
                        continue;
 
-               lock_count += ofd_prolong_one_lock(tsi, lock, &extent, timeout);
+               lock_count += ofd_prolong_one_lock(tsi, lock, &extent);
        }
        spin_unlock_bh(&exp->exp_bl_list_lock);
 
index a3edbdc..2a1bae8 100644 (file)
@@ -649,7 +649,7 @@ static int qmt_dqacq(const struct lu_env *env, struct lu_device *ld,
 
                        svc = req->rq_rqbd->rqbd_svcpt;
                        timeout = at_est2timeout(at_get(&svc->scp_at_estimate));
-                       timeout = max(timeout, ldlm_timeout);
+                       timeout += (ldlm_bl_timeout(lock) >> 1);
 
                        /* lock is being cancelled, prolong timeout */
                        ldlm_refresh_waiting_lock(lock, timeout);