Whamcloud - gitweb
Allow llrd to evict clients directly on OSTs.
[fs/lustre-release.git] / lustre / ost / ost_handler.c
index d2a8ade..c18d919 100644 (file)
@@ -660,6 +660,7 @@ static void ost_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,
 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
 {
         struct ptlrpc_bulk_desc *desc;
+        struct obd_export *exp = req->rq_export;
         struct niobuf_remote *remote_nb;
         struct niobuf_remote *pp_rnb = NULL;
         struct niobuf_local *local_nb;
@@ -681,6 +682,17 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
                          (obd_timeout + 1) / 4);
 
+        /* Check if there is eviction in progress, and if so, wait for it to
+         * finish */
+        if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
+                lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
+                rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
+                        !atomic_read(&exp->exp_obd->obd_evict_inprogress),
+                        &lwi);
+        }
+        if (exp->exp_failed)
+                GOTO(out, rc = -ENOTCONN);
+
         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
                                   lustre_swab_ost_body);
         if (body == NULL) {
@@ -743,7 +755,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (desc == NULL)
                 GOTO(out, rc = -ENOMEM);
 
-        rc = ost_brw_lock_get(LCK_PR, req->rq_export, ioo, pp_rnb, &lockh);
+        rc = ost_brw_lock_get(LCK_PR, exp, ioo, pp_rnb, &lockh);
         if (rc != 0)
                 GOTO(out_bulk, rc);
 
@@ -761,12 +773,12 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 GOTO(out_lock, rc = -ETIMEDOUT);
         }
 
-        rc = obd_preprw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
+        rc = obd_preprw(OBD_BRW_READ, exp, &body->oa, 1,
                         ioo, npages, pp_rnb, local_nb, oti, capa);
         if (rc != 0)
                 GOTO(out_lock, rc);
 
-        ost_prolong_locks(req->rq_export, ioo, pp_rnb, LCK_PW | LCK_PR);
+        ost_prolong_locks(exp, ioo, pp_rnb, LCK_PW | LCK_PR);
 
         nob = 0;
         for (i = 0; i < npages; i++) {
@@ -807,7 +819,18 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         /* Check if client was evicted while we were doing i/o before touching
            network */
         if (rc == 0) {
-                if (desc->bd_export->exp_failed)
+                /* Check if there is eviction in progress, and if so, wait for
+                 * it to finish */
+                if (unlikely(atomic_read(&exp->exp_obd->
+                                                obd_evict_inprogress))) {
+                        lwi = LWI_INTR(NULL, NULL);
+                        rc = l_wait_event(exp->exp_obd->
+                                                obd_evict_inprogress_waitq,
+                                          !atomic_read(&exp->exp_obd->
+                                                        obd_evict_inprogress),
+                                          &lwi);
+                }
+                if (exp->exp_failed)
                         rc = -ENOTCONN;
                 else {
                         sptlrpc_svc_wrap_bulk(req, desc);
@@ -820,12 +843,12 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
                                                    ost_bulk_timeout, desc);
                         rc = l_wait_event(desc->bd_waitq,
                                           !ptlrpc_bulk_active(desc) ||
-                                          desc->bd_export->exp_failed, &lwi);
+                                          exp->exp_failed, &lwi);
                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
                         if (rc == -ETIMEDOUT) {
                                 DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT");
                                 ptlrpc_abort_bulk(desc);
-                        } else if (desc->bd_export->exp_failed) {
+                        } else if (exp->exp_failed) {
                                 DEBUG_REQ(D_ERROR, req, "Eviction on bulk PUT");
                                 rc = -ENOTCONN;
                                 ptlrpc_abort_bulk(desc);
@@ -846,7 +869,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
         }
 
         /* Must commit after prep above in all cases */
-        rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
+        rc = obd_commitrw(OBD_BRW_READ, exp, &body->oa, 1,
                           ioo, npages, local_nb, oti, rc);
 
         ost_nio_pages_put(req, local_nb, npages);
@@ -880,9 +903,9 @@ out:
                 }
                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
                       "client will retry\n",
-                      req->rq_export->exp_obd->obd_name,
-                      req->rq_export->exp_client_uuid.uuid,
-                      req->rq_export->exp_connection->c_remote_uuid.uuid,
+                      exp->exp_obd->obd_name,
+                      exp->exp_client_uuid.uuid,
+                      exp->exp_connection->c_remote_uuid.uuid,
                       libcfs_id2str(req->rq_peer));
         }
 
@@ -892,6 +915,7 @@ out:
 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
 {
         struct ptlrpc_bulk_desc *desc;
+        struct obd_export       *exp = req->rq_export;
         struct niobuf_remote    *remote_nb;
         struct niobuf_remote    *pp_rnb;
         struct niobuf_local     *local_nb;
@@ -917,6 +941,17 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
                          (obd_timeout + 1) / 4);
 
+        /* Check if there is eviction in progress, and if so, wait for it to
+         * finish */
+        if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
+                lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
+                rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
+                        !atomic_read(&exp->exp_obd->obd_evict_inprogress),
+                        &lwi);
+        }
+        if (exp->exp_failed)
+                GOTO(out, rc = -ENOTCONN);
+
         swab = lustre_msg_swabbed(req->rq_reqmsg);
         body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
                                   lustre_swab_ost_body);
@@ -1000,7 +1035,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (desc == NULL)
                 GOTO(out, rc = -ENOMEM);
 
-        rc = ost_brw_lock_get(LCK_PW, req->rq_export, ioo, pp_rnb, &lockh);
+        rc = ost_brw_lock_get(LCK_PW, exp, ioo, pp_rnb, &lockh);
         if (rc != 0)
                 GOTO(out_bulk, rc);
 
@@ -1018,7 +1053,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 GOTO(out_lock, rc = -ETIMEDOUT);
         }
 
-        ost_prolong_locks(req->rq_export, ioo, pp_rnb, LCK_PW);
+        ost_prolong_locks(exp, ioo, pp_rnb, LCK_PW);
 
         /* obd_preprw clobbers oa->valid, so save what we need */
         client_cksum = body->oa.o_valid & OBD_MD_FLCKSUM ? body->oa.o_cksum : 0;
@@ -1031,7 +1066,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 body->oa.o_valid &= ~OBD_MD_FLGRANT;
         }
 
-        rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, &body->oa, objcount,
+        rc = obd_preprw(OBD_BRW_WRITE, exp, &body->oa, objcount,
                         ioo, npages, pp_rnb, local_nb, oti, capa);
         if (rc != 0)
                 GOTO(out_lock, rc);
@@ -1099,8 +1134,19 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
 
         sptlrpc_svc_unwrap_bulk(req, desc);
 
+        /* Check if there is eviction in progress, and if so, wait for
+         * it to finish */
+        if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
+                lwi = LWI_INTR(NULL, NULL);
+                rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
+                        !atomic_read(&exp->exp_obd->obd_evict_inprogress),
+                        &lwi);
+        }
+        if (rc == 0 && exp->exp_failed)
+                rc = -ENOTCONN;
+
         /* Must commit after prep above in all cases */
-        rc = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa,
+        rc = obd_commitrw(OBD_BRW_WRITE, exp, &repbody->oa,
                            objcount, ioo, npages, local_nb, oti, rc);
 
         if (unlikely(client_cksum != server_cksum && rc == 0)) {
@@ -1126,7 +1172,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 LCONSOLE_ERROR_MSG(0x168, "%s: BAD WRITE CHECKSUM: %s from "
                                    "%s%s%s inum "LPU64"/"LPU64" object "
                                    LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
-                                   req->rq_export->exp_obd->obd_name, msg,
+                                   exp->exp_obd->obd_name, msg,
                                    libcfs_id2str(req->rq_peer),
                                    via, router,
                                    body->oa.o_valid & OBD_MD_FLFID ?
@@ -1186,9 +1232,9 @@ out:
                 }
                 CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
                       "client will retry\n",
-                      req->rq_export->exp_obd->obd_name,
-                      req->rq_export->exp_client_uuid.uuid,
-                      req->rq_export->exp_connection->c_remote_uuid.uuid,
+                      exp->exp_obd->obd_name,
+                      exp->exp_client_uuid.uuid,
+                      exp->exp_connection->c_remote_uuid.uuid,
                       libcfs_id2str(req->rq_peer));
         }
         RETURN(rc);