Whamcloud - gitweb
b=17031
authorzhanghc <zhanghc>
Tue, 2 Dec 2008 06:00:27 +0000 (06:00 +0000)
committerzhanghc <zhanghc>
Tue, 2 Dec 2008 06:00:27 +0000 (06:00 +0000)
during refreshing locks waiting its I/O to complete,
take current service time into account, not only using
the timeout gotten by ldlm_get_enqueue_timeout

i=Andreas
i=Nathan.Rutman

lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_lockd.c
lustre/ost/ost_handler.c

index 3eac378..e11fcbb 100644 (file)
@@ -857,7 +857,7 @@ int ldlm_handle_cancel(struct ptlrpc_request *req);
 int ldlm_request_cancel(struct ptlrpc_request *req,
                         const struct ldlm_request *dlm_req, int first);
 int ldlm_del_waiting_lock(struct ldlm_lock *lock);
-int ldlm_refresh_waiting_lock(struct ldlm_lock *lock);
+int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout);
 void ldlm_revoke_export_locks(struct obd_export *exp);
 int ldlm_get_ref(void);
 void ldlm_put_ref(void);
index 701a2a0..13ee9e1 100644 (file)
@@ -321,7 +321,8 @@ repeat:
                         LDLM_LOCK_GET(lock);
                         spin_unlock_bh(&waiting_locks_spinlock);
                         LDLM_DEBUG(lock, "prolong the busy lock");
-                        ldlm_refresh_waiting_lock(lock);
+                        ldlm_refresh_waiting_lock(lock,
+                                                  ldlm_get_enq_timeout(lock));
                         spin_lock_bh(&waiting_locks_spinlock);
 
                         if (!cont) {
@@ -380,7 +381,7 @@ repeat:
  *
  * Called with the namespace lock held.
  */
-static int __ldlm_add_waiting_lock(struct ldlm_lock *lock)
+static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, int seconds)
 {
         cfs_time_t timeout;
         cfs_time_t timeout_rounded;
@@ -390,11 +391,9 @@ static int __ldlm_add_waiting_lock(struct ldlm_lock *lock)
 
         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
             OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
-                timeout = 2;
-        else
-                timeout = ldlm_get_enq_timeout(lock);
+                seconds = 2;
 
-        timeout = cfs_time_shift(timeout);
+        timeout = cfs_time_shift(seconds);
         if (likely(cfs_time_after(timeout, lock->l_callback_timeout)))
                 lock->l_callback_timeout = timeout;
 
@@ -429,7 +428,7 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
                 return 0;
         }
 
-        ret = __ldlm_add_waiting_lock(lock);
+        ret = __ldlm_add_waiting_lock(lock, ldlm_get_enq_timeout(lock));
         if (ret)
                 /* grab ref on the lock if it has been added to the
                  * waiting list */
@@ -503,7 +502,7 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock)
  *
  * Called with namespace lock held.
  */
-int ldlm_refresh_waiting_lock(struct ldlm_lock *lock)
+int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
 {
         if (lock->l_export == NULL) {
                 /* We don't have a "waiting locks list" on clients. */
@@ -522,7 +521,7 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock)
         /* we remove/add the lock to the waiting list, so no needs to
          * release/take a lock reference */
         __ldlm_del_waiting_lock(lock);
-        __ldlm_add_waiting_lock(lock);
+        __ldlm_add_waiting_lock(lock, timeout);
         spin_unlock_bh(&waiting_locks_spinlock);
 
         LDLM_DEBUG(lock, "refreshed");
@@ -541,7 +540,7 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock)
         RETURN(0);
 }
 
-int ldlm_refresh_waiting_lock(struct ldlm_lock *lock)
+int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, int timeout)
 {
         RETURN(0);
 }
index d76b2e2..28430a4 100644 (file)
@@ -495,6 +495,7 @@ struct ost_prolong_data {
         struct obdo *opd_oa;
         ldlm_mode_t opd_mode;
         int opd_lock_match;
+        int opd_timeout;
 };
 
 static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
@@ -539,13 +540,13 @@ static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
 
         /* OK. this is a possible lock the user holds doing I/O
          * let's refresh eviction timer for it */
-        ldlm_refresh_waiting_lock(lock);
+        ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
         opd->opd_lock_match = 1;
 
         return LDLM_ITER_CONTINUE;
 }
 
-static int ost_rw_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,
+static int ost_rw_prolong_locks(struct ptlrpc_request *req, struct obd_ioobj *obj,
                                 struct niobuf_remote *nb, struct obdo *oa,
                                 ldlm_mode_t mode)
 {
@@ -557,11 +558,17 @@ static int ost_rw_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,
         osc_build_res_name(obj->ioo_id, obj->ioo_gr, &res_id);
 
         opd.opd_mode = mode;
-        opd.opd_exp = exp;
+        opd.opd_exp = req->rq_export;
         opd.opd_policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
         opd.opd_policy.l_extent.end = (nb[nrbufs - 1].offset +
                                        nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
 
+        /* prolong locks for the current service time of the corresponding
+         * portal (= OST_IO_PORTAL) */
+        opd.opd_timeout = AT_OFF ? obd_timeout / 2:
+                          max(at_est2timeout(at_get(&req->rq_rqbd->
+                              rqbd_service->srv_at_estimate)), ldlm_timeout);
+
         CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
                res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
                opd.opd_policy.l_extent.end);
@@ -591,7 +598,7 @@ static int ost_rw_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,
         }
 
         opd.opd_oa = oa;
-        ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id,
+        ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
                               ost_prolong_locks_iter, &opd);
         RETURN(opd.opd_lock_match);
 }
@@ -686,7 +693,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (desc == NULL) /* XXX: check all cleanup stuff */
                 GOTO(out, rc = -ENOMEM);
 
-        ost_rw_prolong_locks(exp, ioo, remote_nb, &body->oa, LCK_PW | LCK_PR);
+        ost_rw_prolong_locks(req, ioo, remote_nb, &body->oa, LCK_PW | LCK_PR);
 
         nob = 0;
         for (i = 0; i < npages; i++) {
@@ -932,7 +939,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 GOTO(out_lock, rc = -ETIMEDOUT);
         }
 
-        ost_rw_prolong_locks(exp, ioo, remote_nb,&body->oa,  LCK_PW);
+        ost_rw_prolong_locks(req, ioo, remote_nb,&body->oa,  LCK_PW);
 
         /* obd_preprw clobbers oa->valid, so save what we need */
         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
@@ -1645,10 +1652,10 @@ static int ost_rw_hpreq_check(struct ptlrpc_request *req)
         mode = LCK_PW;
         if (opc == OST_READ)
                 mode |= LCK_PR;
-        RETURN(ost_rw_prolong_locks(req->rq_export, ioo, nb, &body->oa, mode));
+        RETURN(ost_rw_prolong_locks(req, ioo, nb, &body->oa, mode));
 }
 
-static int ost_punch_prolong_locks(struct obd_export *exp, struct obdo *oa)
+static int ost_punch_prolong_locks(struct ptlrpc_request *req, struct obdo *oa)
 {
         struct ldlm_res_id res_id = { .name = { oa->o_id } };
         struct ost_prolong_data opd = { 0 };
@@ -1659,19 +1666,25 @@ static int ost_punch_prolong_locks(struct obd_export *exp, struct obdo *oa)
         end = start + oa->o_blocks;
 
         opd.opd_mode = LCK_PW;
-        opd.opd_exp = exp;
+        opd.opd_exp = req->rq_export;
         opd.opd_policy.l_extent.start = start & CFS_PAGE_MASK;
         if (oa->o_blocks == OBD_OBJECT_EOF || end < start)
                 opd.opd_policy.l_extent.end = OBD_OBJECT_EOF;
         else
                 opd.opd_policy.l_extent.end = end | ~CFS_PAGE_MASK;
 
+        /* prolong locks for the current service time of the corresponding
+         * portal (= OST_IO_PORTAL) */
+        opd.opd_timeout = AT_OFF ? obd_timeout / 2:
+                          max(at_est2timeout(at_get(&req->rq_rqbd->
+                              rqbd_service->srv_at_estimate)), ldlm_timeout);
+
         CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
                res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
                opd.opd_policy.l_extent.end);
 
         opd.opd_oa = oa;
-        ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id,
+        ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
                               ost_prolong_locks_iter, &opd);
         RETURN(opd.opd_lock_match);
 }
@@ -1700,7 +1713,7 @@ static int ost_punch_hpreq_check(struct ptlrpc_request *req)
         LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
                 !(body->oa.o_flags & OBD_FL_TRUNCLOCK));
 
-        RETURN(ost_punch_prolong_locks(req->rq_export, &body->oa));
+        RETURN(ost_punch_prolong_locks(req, &body->oa));
 }
 
 struct ptlrpc_hpreq_ops ost_hpreq_rw = {