Whamcloud - gitweb
Remove no-longer-needed inode operations (they previously had extN EA VFS
[fs/lustre-release.git] / lustre / ost / ost_handler.c
index ba71208..67ca61a 100644 (file)
@@ -252,10 +252,11 @@ static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req)
 
 static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
 {
-        struct ptlrpc_bulk_desc **bulk_vec = NULL, *bulk = NULL;
+        struct ptlrpc_bulk_desc *desc;
         struct obd_conn conn;
         void *tmp1, *tmp2, *end2;
-        struct niobuf *nb, *dst, *res = NULL;
+        struct niobuf_remote *remote_nb;
+        struct niobuf_local *local_nb = NULL;
         struct obd_ioobj *ioo;
         struct ost_body *body;
         int rc, cmd, i, j, objcount, niocount, size = sizeof(*body);
@@ -266,7 +267,7 @@ static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
-        niocount = req->rq_reqmsg->buflens[2] / sizeof(*nb);
+        niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
         cmd = body->data;
 
         conn.oc_id = body->connid;
@@ -276,79 +277,71 @@ static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req)
                 ost_unpack_ioo(&tmp1, &ioo);
                 if (tmp2 + ioo->ioo_bufcnt > end2) {
                         LBUG();
-                        rc = -EFAULT;
-                        break;
+                        GOTO(out, rc = -EFAULT);
                 }
                 for (j = 0; j < ioo->ioo_bufcnt; j++)
-                        ost_unpack_niobuf(&tmp2, &nb);
+                        ost_unpack_niobuf(&tmp2, &remote_nb);
         }
 
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
                 RETURN(rc);
-        OBD_ALLOC(res, sizeof(*res) * niocount);
-        if (res == NULL)
+        OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount);
+        if (local_nb == NULL)
                 RETURN(-ENOMEM);
 
         /* The unpackers move tmp1 and tmp2, so reset them before using */
         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
         req->rq_status = obd_preprw(cmd, &conn, objcount,
-                                    tmp1, niocount, tmp2, res);
+                                    tmp1, niocount, tmp2, local_nb);
 
         if (req->rq_status)
-                GOTO(out, 0);
+                GOTO(out_local, 0);
 
-        for (i = 0; i < niocount; i++) {
-                bulk = ptlrpc_prep_bulk(req->rq_connection);
-                if (bulk == NULL) {
-                        CERROR("cannot alloc bulk desc\n");
-                        GOTO(out, rc = -ENOMEM);
-                }
+        desc = ptlrpc_prep_bulk(req->rq_connection);
+        if (desc == NULL)
+                GOTO(out_local, rc = -ENOMEM);
+        desc->b_portal = OST_BULK_PORTAL;
 
-                dst = &(((struct niobuf *)tmp2)[i]);
-                bulk->b_xid = dst->xid;
-                bulk->b_buf = (void *)(unsigned long)res[i].addr;
+        for (i = 0; i < niocount; i++) {
+                struct ptlrpc_bulk_page *bulk;
+                bulk = ptlrpc_prep_bulk_page(desc);
+                if (bulk == NULL)
+                        GOTO(out_bulk, rc = -ENOMEM);
+                remote_nb = &(((struct niobuf_remote *)tmp2)[i]);
+                bulk->b_xid = remote_nb->xid;
+                bulk->b_buf = (void *)(unsigned long)local_nb[i].addr;
                 bulk->b_buflen = PAGE_SIZE;
-                rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL);
-                if (rc)
-                        GOTO(out, rc);
-                wait_event_interruptible(bulk->b_waitq,
-                                         ptlrpc_check_bulk_sent(bulk));
+        }
 
-                if (bulk->b_flags & PTL_RPC_FL_INTR)
-                        GOTO(out, 0);
+        rc = ptlrpc_send_bulk(desc);
+        if (rc)
+                GOTO(out_bulk, rc);
 
-                ptlrpc_free_bulk(bulk);
-        }
-        bulk = NULL;
+        ptlrpc_free_bulk(desc);
 
         /* The unpackers move tmp1 and tmp2, so reset them before using */
         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
         req->rq_status = obd_commitrw(cmd, &conn, objcount,
-                                      tmp1, niocount, res);
+                                      tmp1, niocount, local_nb);
 
-        EXIT;
- out:
-        if (res != NULL)
-                OBD_FREE(res, sizeof(*res) * niocount);
-        if (bulk != NULL)
-                ptlrpc_free_bulk(bulk);
-        if (bulk_vec != NULL) {
-                for (i = 0; i < niocount; i++)
-                        if (bulk_vec[i] != NULL)
-                                ptlrpc_free_bulk(bulk_vec[i]);
-                OBD_FREE(bulk_vec, niocount * sizeof(*bulk_vec));
-        }
+        RETURN(rc);
 
+ out_bulk:
+        ptlrpc_free_bulk(desc);
+ out_local:
+        if (local_nb != NULL)
+                OBD_FREE(local_nb, sizeof(*local_nb) * niocount);
+ out:
         return 0;
 }
 
 static int ost_commit_page(struct obd_conn *conn, struct page *page)
 {
         struct obd_ioobj obj;
-        struct niobuf buf;
+        struct niobuf_local buf;
         int rc;
         ENTRY;
 
@@ -362,23 +355,43 @@ static int ost_commit_page(struct obd_conn *conn, struct page *page)
         RETURN(rc);
 }
 
-static int ost_brw_write_cb(struct ptlrpc_bulk_desc *bulk, void *data)
+static int ost_brw_write_cb(struct ptlrpc_bulk_page *bulk)
 {
+        void *journal_save;
         int rc;
-
         ENTRY;
 
-        rc = ost_commit_page(&bulk->b_conn, bulk->b_page);
+        /* Restore the filesystem journal context when we do the commit.
+         * This is needed for ext3 and reiserfs, but can't really hurt
+         * other filesystems.
+         */
+        journal_save = current->journal_info;
+        current->journal_info = bulk->b_desc->b_journal_info;
+        CDEBUG(D_BUFFS, "journal_info: saved %p->%p, restored %p\n", current,
+               journal_save, bulk->b_desc->b_journal_info);
+        rc = ost_commit_page(&bulk->b_desc->b_conn, bulk->b_page);
+        current->journal_info = journal_save;
+        CDEBUG(D_BUFFS, "journal_info: restored %p->%p\n", current,
+               journal_save);
         if (rc)
                 CERROR("ost_commit_page failed: %d\n", rc);
 
         RETURN(rc);
 }
 
+static int ost_brw_write_finished_cb(struct ptlrpc_bulk_desc *desc)
+{
+        ptlrpc_free_bulk(desc);
+
+        return 0;
+}
+
 static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
 {
+        struct ptlrpc_bulk_desc *desc;
         struct obd_conn conn;
-        struct niobuf *nb, *res;
+        struct niobuf_remote *remote_nb;
+        struct niobuf_local *local_nb, *lnb;
         struct obd_ioobj *ioo;
         struct ost_body *body;
         int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
@@ -390,7 +403,7 @@ static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
         end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2];
         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
-        niocount = req->rq_reqmsg->buflens[2] / sizeof(*nb);
+        niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb);
         cmd = body->data;
 
         conn.oc_id = body->connid;
@@ -403,54 +416,78 @@ static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req)
                         break;
                 }
                 for (j = 0; j < ioo->ioo_bufcnt; j++)
-                        ost_unpack_niobuf((void *)&tmp2, &nb);
+                        ost_unpack_niobuf((void *)&tmp2, &remote_nb);
         }
 
-        size[1] = niocount * sizeof(*nb);
+        size[1] = niocount * sizeof(*remote_nb);
         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
-                RETURN(rc);
+                GOTO(fail, rc);
+        remote_nb = lustre_msg_buf(req->rq_repmsg, 1);
 
-        res = lustre_msg_buf(req->rq_repmsg, 1);
+        OBD_ALLOC(local_nb, niocount * sizeof(*local_nb));
+        if (local_nb == NULL)
+                GOTO(fail, rc = -ENOMEM);
 
         /* The unpackers move tmp1 and tmp2, so reset them before using */
         tmp1 = lustre_msg_buf(req->rq_reqmsg, 1);
         tmp2 = lustre_msg_buf(req->rq_reqmsg, 2);
         req->rq_status = obd_preprw(cmd, &conn, objcount,
-                                    tmp1, niocount, tmp2, res);
-
+                                    tmp1, niocount, tmp2, local_nb);
         if (req->rq_status)
-                GOTO(out, 0);
+                GOTO(success, 0);
+
+        desc = ptlrpc_prep_bulk(req->rq_connection);
+        if (desc == NULL)
+                GOTO(fail_preprw, rc = -ENOMEM);
+        desc->b_cb = ost_brw_write_finished_cb;
+        desc->b_portal = OSC_BULK_PORTAL;
+        memcpy(&(desc->b_conn), &conn, sizeof(conn));
+
+        /* Save journal context for commit callbacks */
+        CDEBUG(D_BUFFS, "journal_info: saved %p->%p\n", current,
+               current->journal_info);
+        desc->b_journal_info = current->journal_info;
 
-        for (i = 0; i < niocount; i++, res++) {
-                struct ptlrpc_bulk_desc *bulk;
+        for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) {
                 struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service;
+                struct ptlrpc_bulk_page *bulk;
 
-                bulk = ptlrpc_prep_bulk(req->rq_connection);
+                bulk = ptlrpc_prep_bulk_page(desc);
                 if (bulk == NULL)
-                        GOTO(out, rc = -ENOMEM);
+                        GOTO(fail_bulk, rc = -ENOMEM);
 
                 spin_lock(&srv->srv_lock);
                 bulk->b_xid = srv->srv_xid++;
                 spin_unlock(&srv->srv_lock);
 
-                res->xid = HTON__u32(bulk->b_xid);
-
-                bulk->b_buf = (void *)(unsigned long)res->addr;
-                bulk->b_cb = ost_brw_write_cb;
-                bulk->b_page = res->page;
-                memcpy(&(bulk->b_conn), &conn, sizeof(conn));
+                bulk->b_buf = (void *)(unsigned long)lnb->addr;
+                bulk->b_page = lnb->page;
                 bulk->b_buflen = PAGE_SIZE;
-                bulk->b_portal = OSC_BULK_PORTAL;
-                rc = ptlrpc_register_bulk(bulk);
-                if (rc)
-                        GOTO(out, rc);
+                bulk->b_cb = ost_brw_write_cb;
+
+                /* this advances remote_nb */
+                ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0,
+                                bulk->b_xid);
         }
 
+        rc = ptlrpc_register_bulk(desc);
+        current->journal_info = NULL; /* kind of scary */
+        if (rc)
+                GOTO(fail_bulk, rc);
+
         EXIT;
out:
-        /* FIXME: should we return 'rc' here? */
success:
+        OBD_FREE(local_nb, niocount * sizeof(*local_nb));
         return 0;
+
+ fail_bulk:
+        ptlrpc_free_bulk(desc);
+ fail_preprw:
+        OBD_FREE(local_nb, niocount * sizeof(*local_nb));
+        /* FIXME: how do we undo the preprw? */
+ fail:
+        return rc;
 }
 
 static int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req)
@@ -598,11 +635,9 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
         if (err)
                 GOTO(error_disc, err = -EINVAL);
-#if 0
         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
         if (err)
                 GOTO(error_disc, err = -EINVAL);
-#endif
 
         RETURN(0);