Whamcloud - gitweb
land b_eq on HEAD
[fs/lustre-release.git] / lustre / ost / ost_handler.c
index dfdcf1c..f5c5579 100644 (file)
@@ -70,7 +70,8 @@ void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
                 if (!ack_lock->mode)
                         break;
-                ldlm_put_lock_into_req(req, &ack_lock->lock, ack_lock->mode);
+                /* XXX not even calling target_send_reply in some cases... */
+                ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode);
         }
 }
 
@@ -417,7 +418,8 @@ static int ost_brw_read(struct ptlrpc_request *req)
         if (local_nb == NULL)
                 GOTO(out_pp_rnb, rc = -ENOMEM);
 
-        desc = ptlrpc_prep_bulk_exp(req, BULK_PUT_SOURCE, OST_BULK_PORTAL);
+        desc = ptlrpc_prep_bulk_exp (req, npages, 
+                                     BULK_PUT_SOURCE, OST_BULK_PORTAL);
         if (desc == NULL)
                 GOTO(out_local, rc = -ENOMEM);
 
@@ -439,11 +441,9 @@ static int ost_brw_read(struct ptlrpc_request *req)
                 nob += page_rc;
                 if (page_rc != 0) {             /* some data! */
                         LASSERT (local_nb[i].page != NULL);
-                        rc = ptlrpc_prep_bulk_page(desc, local_nb[i].page,
-                                                   pp_rnb[i].offset& ~PAGE_MASK,
-                                                   page_rc);
-                        if (rc != 0)
-                                break;
+                        ptlrpc_prep_bulk_page(desc, local_nb[i].page,
+                                              pp_rnb[i].offset & (PAGE_SIZE - 1),
+                                              page_rc);
                 }
 
                 if (page_rc != pp_rnb[i].len) { /* short read */
@@ -455,16 +455,25 @@ static int ost_brw_read(struct ptlrpc_request *req)
         }
 
         if (rc == 0) {
-                rc = ptlrpc_bulk_put(desc);
+                rc = ptlrpc_start_bulk_transfer(desc);
                 if (rc == 0) {
                         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
                                           ost_bulk_timeout, desc);
                         rc = l_wait_event(desc->bd_waitq,
-                                          ptlrpc_bulk_complete(desc), &lwi);
-                        if (rc) {
-                                LASSERT(rc == -ETIMEDOUT);
+                                          !ptlrpc_bulk_active(desc), &lwi);
+                        LASSERT(rc == 0 || rc == -ETIMEDOUT);
+                        if (rc == -ETIMEDOUT) {
                                 DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT");
                                 ptlrpc_abort_bulk(desc);
+                        } else if (!desc->bd_success ||
+                                   desc->bd_nob_transferred != desc->bd_nob) {
+                                DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",
+                                          desc->bd_success ? 
+                                          "truncated" : "network error on",
+                                          desc->bd_nob_transferred, 
+                                          desc->bd_nob);
+                                /* XXX should this be a different errno? */
+                                rc = -ETIMEDOUT;
                         }
                 } else {
                         DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d\n", rc);
@@ -502,9 +511,9 @@ static int ost_brw_read(struct ptlrpc_request *req)
                 req->rq_status = rc;
                 ptlrpc_error(req);
         } else {
-                if (req->rq_repmsg != NULL) {
+                if (req->rq_reply_state != NULL) {
                         /* reply out callback would free */
-                        OBD_FREE(req->rq_repmsg, req->rq_replen);
+                        lustre_free_reply_state (req->rq_reply_state);
                 }
                 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
                         CERROR("bulk IO comms error: "
@@ -545,7 +554,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         int                      objcount, niocount, npages;
         int                      comms_error = 0;
         int                      rc, rc2, swab, i, j;
-        char                    str[PTL_NALFMT_SIZE];
+        char                     str[PTL_NALFMT_SIZE];
         ENTRY;
 
         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
@@ -607,7 +616,8 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (local_nb == NULL)
                 GOTO(out_pp_rnb, rc = -ENOMEM);
 
-        desc = ptlrpc_prep_bulk_exp(req, BULK_GET_SINK, OST_BULK_PORTAL);
+        desc = ptlrpc_prep_bulk_exp (req, npages, 
+                                     BULK_GET_SINK, OST_BULK_PORTAL);
         if (desc == NULL)
                 GOTO(out_local, rc = -ENOMEM);
 
@@ -618,31 +628,34 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
 
         /* NB Having prepped, we must commit... */
 
-        for (i = 0; i < npages; i++) {
-                rc = ptlrpc_prep_bulk_page(desc, local_nb[i].page,
-                                           pp_rnb[i].offset & (PAGE_SIZE - 1),
-                                           pp_rnb[i].len);
-                if (rc != 0)
-                        break;
-        }
+        for (i = 0; i < npages; i++)
+                ptlrpc_prep_bulk_page(desc, local_nb[i].page, 
+                                      pp_rnb[i].offset & (PAGE_SIZE - 1),
+                                      pp_rnb[i].len);
 
+        rc = ptlrpc_start_bulk_transfer (desc);
         if (rc == 0) {
-                rc = ptlrpc_bulk_get(desc);
-                if (rc == 0) {
-                        lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
-                                          ost_bulk_timeout, desc);
-                        rc = l_wait_event(desc->bd_waitq,
-                                          ptlrpc_bulk_complete(desc), &lwi);
-                        if (rc) {
-                                LASSERT(rc == -ETIMEDOUT);
-                                DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
-                                ptlrpc_abort_bulk(desc);
-                        }
-                } else {
-                        DEBUG_REQ(D_ERROR, req, "bulk GET failed: rc %d\n", rc);
+                lwi = LWI_TIMEOUT(obd_timeout * HZ / 4,
+                                  ost_bulk_timeout, desc);
+                rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), 
+                                  &lwi);
+                LASSERT(rc == 0 || rc == -ETIMEDOUT);
+                if (rc == -ETIMEDOUT) {
+                        DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
+                        ptlrpc_abort_bulk(desc);
+                } else if (!desc->bd_success ||
+                           desc->bd_nob_transferred != desc->bd_nob) {
+                        DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
+                                  desc->bd_success ? 
+                                  "truncated" : "network error on",
+                                  desc->bd_nob_transferred, desc->bd_nob);
+                        /* XXX should this be a different errno? */
+                        rc = -ETIMEDOUT;
                 }
-                comms_error = rc != 0;
+        } else {
+                DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
         }
+        comms_error = rc != 0;
 
         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
@@ -710,9 +723,9 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 req->rq_status = rc;
                 ptlrpc_error(req);
         } else {
-                if (req->rq_repmsg != NULL) {
+                if (req->rq_reply_state != NULL) {
                         /* reply out callback would free */
-                        OBD_FREE (req->rq_repmsg, req->rq_replen);
+                        lustre_free_reply_state (req->rq_reply_state);
                 }
                 if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
                         CERROR("bulk IO comms error: "
@@ -806,8 +819,6 @@ out_pp_rnb:
         free_per_page_niobufs(npages, pp_rnb, remote_nb);
 out:
         if (rc) {
-                OBD_FREE(req->rq_repmsg, req->rq_replen);
-                req->rq_repmsg = NULL;
                 req->rq_status = rc;
                 ptlrpc_error(req);
         } else
@@ -1122,11 +1133,11 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
         if (rc < 0)
                 RETURN(rc);
 
-        ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
-                                           OST_BUFSIZE, OST_MAXREQSIZE,
-                                           OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
-                                           ost_handle, "ost", 
-                                           obddev->obd_proc_entry);
+        ost->ost_service = 
+                ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
+                                OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
+                                ost_handle, "ost",
+                                obddev->obd_proc_entry);
         if (ost->ost_service == NULL) {
                 CERROR("failed to start service\n");
                 RETURN(-ENOMEM);
@@ -1138,9 +1149,9 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
                 GOTO(out, rc = -EINVAL);
 
         ost->ost_create_service =
-                ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS, OST_BUFSIZE,
-                                OST_MAXREQSIZE, OST_CREATE_PORTAL,
-                                OSC_REPLY_PORTAL, ost_handle, "ost_create",
+                ptlrpc_init_svc(OST_NBUFS, OST_BUFSIZE, OST_MAXREQSIZE,
+                                OST_CREATE_PORTAL, OSC_REPLY_PORTAL,
+                                ost_handle, "ost_create",
                                 obddev->obd_proc_entry);
         if (ost->ost_create_service == NULL) {
                 CERROR("failed to start OST create service\n");