Whamcloud - gitweb
b=17887
[fs/lustre-release.git] / lustre / ost / ost_handler.c
index d76b2e2..cb520bb 100644 (file)
@@ -495,6 +495,7 @@ struct ost_prolong_data {
         struct obdo *opd_oa;
         ldlm_mode_t opd_mode;
         int opd_lock_match;
+        int opd_timeout;
 };
 
 static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
@@ -539,13 +540,13 @@ static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
 
         /* OK. this is a possible lock the user holds doing I/O
          * let's refresh eviction timer for it */
-        ldlm_refresh_waiting_lock(lock);
+        ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
         opd->opd_lock_match = 1;
 
         return LDLM_ITER_CONTINUE;
 }
 
-static int ost_rw_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,
+static int ost_rw_prolong_locks(struct ptlrpc_request *req, struct obd_ioobj *obj,
                                 struct niobuf_remote *nb, struct obdo *oa,
                                 ldlm_mode_t mode)
 {
@@ -557,11 +558,17 @@ static int ost_rw_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,
         osc_build_res_name(obj->ioo_id, obj->ioo_gr, &res_id);
 
         opd.opd_mode = mode;
-        opd.opd_exp = exp;
+        opd.opd_exp = req->rq_export;
         opd.opd_policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
         opd.opd_policy.l_extent.end = (nb[nrbufs - 1].offset +
                                        nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
 
+        /* prolong locks for the current service time of the corresponding
+         * portal (= OST_IO_PORTAL) */
+        opd.opd_timeout = AT_OFF ? obd_timeout / 2:
+                          max(at_est2timeout(at_get(&req->rq_rqbd->
+                              rqbd_service->srv_at_estimate)), ldlm_timeout);
+
         CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
                res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
                opd.opd_policy.l_extent.end);
@@ -591,7 +598,7 @@ static int ost_rw_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,
         }
 
         opd.opd_oa = oa;
-        ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id,
+        ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
                               ost_prolong_locks_iter, &opd);
         RETURN(opd.opd_lock_match);
 }
@@ -686,7 +693,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (desc == NULL) /* XXX: check all cleanup stuff */
                 GOTO(out, rc = -ENOMEM);
 
-        ost_rw_prolong_locks(exp, ioo, remote_nb, &body->oa, LCK_PW | LCK_PR);
+        ost_rw_prolong_locks(req, ioo, remote_nb, &body->oa, LCK_PW | LCK_PR);
 
         nob = 0;
         for (i = 0; i < npages; i++) {
@@ -744,9 +751,9 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 if (exp->exp_failed)
                         rc = -ENOTCONN;
                 else {
-                        sptlrpc_svc_wrap_bulk(req, desc);
-
-                        rc = ptlrpc_start_bulk_transfer(desc);
+                        rc = sptlrpc_svc_wrap_bulk(req, desc);
+                        if (rc == 0)
+                                rc = ptlrpc_start_bulk_transfer(desc);
                 }
 
                 if (rc == 0) {
@@ -932,7 +939,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 GOTO(out_lock, rc = -ETIMEDOUT);
         }
 
-        ost_rw_prolong_locks(exp, ioo, remote_nb,&body->oa,  LCK_PW);
+        ost_rw_prolong_locks(req, ioo, remote_nb,&body->oa,  LCK_PW);
 
         /* obd_preprw clobbers oa->valid, so save what we need */
         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
@@ -971,6 +978,10 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                                       local_nb[i].offset & ~CFS_PAGE_MASK,
                                       local_nb[i].len);
 
+        rc = sptlrpc_svc_prep_bulk(req, desc);
+        if (rc != 0)
+                GOTO(out_lock, rc);
+
         /* Check if client was evicted while we were doing i/o before touching
            network */
         if (desc->bd_export->exp_failed)
@@ -1005,23 +1016,18 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                         DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
                         rc = -ENOTCONN;
                         ptlrpc_abort_bulk(desc);
-                } else if (!desc->bd_success ||
-                           desc->bd_nob_transferred != desc->bd_nob) {
-                        DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
-                                  desc->bd_success ?
-                                  "truncated" : "network error on",
-                                  desc->bd_nob_transferred, desc->bd_nob);
+                } else if (!desc->bd_success) {
+                        DEBUG_REQ(D_ERROR, req, "network error on bulk GET");
                         /* XXX should this be a different errno? */
                         rc = -ETIMEDOUT;
+                } else {
+                        rc = sptlrpc_svc_unwrap_bulk(req, desc);
                 }
         } else {
                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d", rc);
         }
         no_reply = rc != 0;
 
-        if (rc == 0)
-                sptlrpc_svc_unwrap_bulk(req, desc);
-
         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
                                  sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
@@ -1599,6 +1605,11 @@ static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
         end = (nb[ioo->ioo_bufcnt - 1].offset +
                nb[ioo->ioo_bufcnt - 1].len - 1) | ~CFS_PAGE_MASK;
 
+        LASSERT(lock->l_resource != NULL);
+        if (!osc_res_name_eq(ioo->ioo_id, ioo->ioo_gr, 
+                             &lock->l_resource->lr_name))
+                RETURN(0);
+
         if (!(lock->l_granted_mode & mode))
                 RETURN(0);
 
@@ -1645,10 +1656,10 @@ static int ost_rw_hpreq_check(struct ptlrpc_request *req)
         mode = LCK_PW;
         if (opc == OST_READ)
                 mode |= LCK_PR;
-        RETURN(ost_rw_prolong_locks(req->rq_export, ioo, nb, &body->oa, mode));
+        RETURN(ost_rw_prolong_locks(req, ioo, nb, &body->oa, mode));
 }
 
-static int ost_punch_prolong_locks(struct obd_export *exp, struct obdo *oa)
+static int ost_punch_prolong_locks(struct ptlrpc_request *req, struct obdo *oa)
 {
         struct ldlm_res_id res_id = { .name = { oa->o_id } };
         struct ost_prolong_data opd = { 0 };
@@ -1659,19 +1670,25 @@ static int ost_punch_prolong_locks(struct obd_export *exp, struct obdo *oa)
         end = start + oa->o_blocks;
 
         opd.opd_mode = LCK_PW;
-        opd.opd_exp = exp;
+        opd.opd_exp = req->rq_export;
         opd.opd_policy.l_extent.start = start & CFS_PAGE_MASK;
         if (oa->o_blocks == OBD_OBJECT_EOF || end < start)
                 opd.opd_policy.l_extent.end = OBD_OBJECT_EOF;
         else
                 opd.opd_policy.l_extent.end = end | ~CFS_PAGE_MASK;
 
+        /* prolong locks for the current service time of the corresponding
+         * portal (= OST_IO_PORTAL) */
+        opd.opd_timeout = AT_OFF ? obd_timeout / 2:
+                          max(at_est2timeout(at_get(&req->rq_rqbd->
+                              rqbd_service->srv_at_estimate)), ldlm_timeout);
+
         CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
                res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
                opd.opd_policy.l_extent.end);
 
         opd.opd_oa = oa;
-        ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id,
+        ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
                               ost_prolong_locks_iter, &opd);
         RETURN(opd.opd_lock_match);
 }
@@ -1700,7 +1717,7 @@ static int ost_punch_hpreq_check(struct ptlrpc_request *req)
         LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
                 !(body->oa.o_flags & OBD_FL_TRUNCLOCK));
 
-        RETURN(ost_punch_prolong_locks(req->rq_export, &body->oa));
+        RETURN(ost_punch_prolong_locks(req, &body->oa));
 }
 
 struct ptlrpc_hpreq_ops ost_hpreq_rw = {