Whamcloud - gitweb
merge b_devel into HEAD, which will become 0.7.3
[fs/lustre-release.git] / lustre / ost / ost_handler.c
index 023deb2..6801e92 100644 (file)
 #include <linux/lustre_export.h>
 #include <linux/init.h>
 #include <linux/lprocfs_status.h>
+#include <linux/lustre_commit_confd.h>
+#include <portals/list.h>
 
-inline void oti_init(struct obd_trans_info *oti,
-                           struct ptlrpc_request *req)
+void oti_init(struct obd_trans_info *oti, struct ptlrpc_request *req)
 {
-        if(oti == NULL)
+        if (oti == NULL)
                 return;
         memset(oti, 0, sizeof *oti);
 
-        
         if (req->rq_repmsg && req->rq_reqmsg != 0)
                 oti->oti_transno = req->rq_repmsg->transno;
-
-        EXIT;
 }
 
-inline void oti_to_request(struct obd_trans_info *oti,
-                           struct ptlrpc_request *req)
+void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
 {
-        int i;
         struct oti_req_ack_lock *ack_lock;
+        int i;
 
-        if(oti == NULL)
+        if (oti == NULL)
                 return;
 
         if (req->rq_repmsg)
@@ -75,7 +72,6 @@ inline void oti_to_request(struct obd_trans_info *oti,
                        sizeof(req->rq_ack_locks[i].lock));
                 req->rq_ack_locks[i].mode = ack_lock->mode;
         }
-        EXIT;
 }
 
 static int ost_destroy(struct ptlrpc_request *req, struct obd_trans_info *oti)
@@ -85,15 +81,16 @@ static int ost_destroy(struct ptlrpc_request *req, struct obd_trans_info *oti)
         int rc, size = sizeof(*body);
         ENTRY;
 
-        body = lustre_swab_reqbuf (req, 0, sizeof (*body),
-                                   lustre_swab_ost_body);
+        body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
         if (body == NULL)
-                RETURN (-EFAULT);
+                RETURN(-EFAULT);
 
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
                 RETURN(rc);
 
+        if (body->oa.o_valid & OBD_MD_FLCOOKIE)
+                oti->oti_logcookies = obdo_logcookie(&body->oa);
         req->rq_status = obd_destroy(conn, &body->oa, NULL, oti);
         RETURN(0);
 }
@@ -105,16 +102,15 @@ static int ost_getattr(struct ptlrpc_request *req)
         int rc, size = sizeof(*body);
         ENTRY;
 
-        body = lustre_swab_reqbuf (req, 0, sizeof (*body),
-                                   lustre_swab_ost_body);
+        body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
         if (body == NULL)
-                RETURN (-EFAULT);
+                RETURN(-EFAULT);
 
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
                 RETURN(rc);
 
-        repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof (*repbody));
+        repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
         req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
         RETURN(0);
@@ -130,10 +126,9 @@ static int ost_statfs(struct ptlrpc_request *req)
         if (rc)
                 RETURN(rc);
 
-        osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*osfs));
-        memset(osfs, 0, size);
+        osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*osfs));
 
-        req->rq_status = obd_statfs(req->rq_export, osfs);
+        req->rq_status = obd_statfs(req->rq_export->exp_obd, osfs, jiffies-HZ);
         if (req->rq_status != 0)
                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
 
@@ -167,16 +162,15 @@ static int ost_open(struct ptlrpc_request *req, struct obd_trans_info *oti)
         int rc, size = sizeof(*repbody);
         ENTRY;
 
-        body = lustre_swab_reqbuf (req, 0, sizeof (*body),
-                                   lustre_swab_ost_body);
+        body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
         if (body == NULL)
-                return (-EFAULT);
+                RETURN(-EFAULT);
 
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
                 RETURN(rc);
 
-        repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof (*repbody));
+        repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
         req->rq_status = obd_open(conn, &repbody->oa, NULL, oti, NULL);
         RETURN(0);
@@ -189,16 +183,15 @@ static int ost_close(struct ptlrpc_request *req, struct obd_trans_info *oti)
         int rc, size = sizeof(*repbody);
         ENTRY;
 
-        body = lustre_swab_reqbuf (req, 0, sizeof (*body),
-                                   lustre_swab_ost_body);
+        body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
         if (body == NULL)
-                RETURN (-EFAULT);
+                RETURN(-EFAULT);
 
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
                 RETURN(rc);
 
-        repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
+        repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
         req->rq_status = obd_close(conn, &repbody->oa, NULL, oti);
         RETURN(0);
@@ -211,18 +204,19 @@ static int ost_create(struct ptlrpc_request *req, struct obd_trans_info *oti)
         int rc, size = sizeof(*repbody);
         ENTRY;
 
-        body = lustre_swab_reqbuf (req, 0, sizeof (*body),
-                                   lustre_swab_ost_body);
+        body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
         if (body == NULL)
-                RETURN (-EFAULT);
+                RETURN(-EFAULT);
 
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
                 RETURN(rc);
 
-        repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof (*repbody));
+        repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+        oti->oti_logcookies = obdo_logcookie(&repbody->oa);
         req->rq_status = obd_create(conn, &repbody->oa, NULL, oti);
+        //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
         RETURN(0);
 }
 
@@ -233,10 +227,9 @@ static int ost_punch(struct ptlrpc_request *req, struct obd_trans_info *oti)
         int rc, size = sizeof(*repbody);
         ENTRY;
 
-        body = lustre_swab_reqbuf (req, 0, sizeof (*body),
-                                   lustre_swab_ost_body);
+        body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
         if (body == NULL)
-                RETURN (-EFAULT);
+                RETURN(-EFAULT);
 
         if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
@@ -246,7 +239,7 @@ static int ost_punch(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (rc)
                 RETURN(rc);
 
-        repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
+        repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
         req->rq_status = obd_punch(conn, &repbody->oa, NULL, repbody->oa.o_size,
                                    repbody->oa.o_blocks, oti);
@@ -260,16 +253,15 @@ static int ost_setattr(struct ptlrpc_request *req, struct obd_trans_info *oti)
         int rc, size = sizeof(*repbody);
         ENTRY;
 
-        body = lustre_swab_reqbuf (req, 0, sizeof (*body),
-                                   lustre_swab_ost_body);
+        body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
         if (body == NULL)
-                RETURN (-EFAULT);
+                RETURN(-EFAULT);
 
         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
                 RETURN(rc);
 
-        repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
+        repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
 
         req->rq_status = obd_setattr(conn, &repbody->oa, NULL, oti);
@@ -285,9 +277,9 @@ static int ost_bulk_timeout(void *data)
         RETURN(1);
 }
 
-static int get_per_page_niobufs (struct obd_ioobj *ioo, int nioo,
-                                 struct niobuf_remote *rnb, int nrnb,
-                                 struct niobuf_remote **pp_rnbp)
+static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
+                                struct niobuf_remote *rnb, int nrnb,
+                                struct niobuf_remote **pp_rnbp)
 {
         /* Copy a remote niobuf, splitting it into page-sized chunks
          * and setting ioo[i].ioo_bufcnt accordingly */
@@ -305,14 +297,14 @@ static int get_per_page_niobufs (struct obd_ioobj *ioo, int nioo,
                         obd_off p0 = offset >> PAGE_SHIFT;
                         obd_off pn = (offset + rnb[rnbidx].len - 1)>>PAGE_SHIFT;
 
-                        LASSERT (rnbidx < nrnb);
+                        LASSERT(rnbidx < nrnb);
 
                         npages += (pn + 1 - p0);
 
                         if (rnb[rnbidx].len == 0) {
                                 CERROR("zero len BRW: obj %d objid "LPX64
                                        " buf %u\n", i, ioo[i].ioo_id, j);
-                                return (-EINVAL);
+                                return -EINVAL;
                         }
                         if (j > 0 &&
                             rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
@@ -320,20 +312,20 @@ static int get_per_page_niobufs (struct obd_ioobj *ioo, int nioo,
                                        " buf %u offset "LPX64" <= "LPX64"\n",
                                        i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
                                        rnb[rnbidx].offset);
-                                return (-EINVAL);
+                                return -EINVAL;
                         }
                 }
 
-        LASSERT (rnbidx == nrnb);
+        LASSERT(rnbidx == nrnb);
 
         if (npages == nrnb) {       /* all niobufs are for single pages */
                 *pp_rnbp = rnb;
-                return (npages);
+                return npages;
         }
 
-        OBD_ALLOC (pp_rnb, sizeof (*pp_rnb) * npages);
+        OBD_ALLOC(pp_rnb, sizeof(*pp_rnb) * npages);
         if (pp_rnb == NULL)
-                return (-ENOMEM);
+                return -ENOMEM;
 
         /* now do the actual split */
         page = rnbidx = 0;
@@ -344,35 +336,35 @@ static int get_per_page_niobufs (struct obd_ioobj *ioo, int nioo,
                         obd_off off = rnb[rnbidx].offset;
                         int     nob = rnb[rnbidx].len;
 
-                        LASSERT (rnbidx < nrnb);
+                        LASSERT(rnbidx < nrnb);
                         do {
                                 obd_off  poff = off & (PAGE_SIZE - 1);
                                 int      pnob = (poff + nob > PAGE_SIZE) ?
                                                 PAGE_SIZE - poff : nob;
 
-                                LASSERT (page < npages);
+                                LASSERT(page < npages);
                                 pp_rnb[page].len = pnob;
                                 pp_rnb[page].offset = off;
                                 pp_rnb[page].flags = rnb->flags;
 
-                                CDEBUG (D_PAGE, "   obj %d id "LPX64
-                                        "page %d(%d) "LPX64" for %d\n",
-                                        i, ioo[i].ioo_id, obj_pages, page,
-                                        pp_rnb[page].offset, pp_rnb[page].len);
+                                CDEBUG(D_PAGE, "   obj %d id "LPX64
+                                       "page %d(%d) "LPX64" for %d\n",
+                                       i, ioo[i].ioo_id, obj_pages, page,
+                                       pp_rnb[page].offset, pp_rnb[page].len);
                                 page++;
                                 obj_pages++;
 
                                 off += pnob;
                                 nob -= pnob;
                         } while (nob > 0);
-                        LASSERT (nob == 0);
+                        LASSERT(nob == 0);
                 }
                 ioo[i].ioo_bufcnt = obj_pages;
         }
-        LASSERT (page == npages);
+        LASSERT(page == npages);
 
         *pp_rnbp = pp_rnb;
-        return (npages);
+        return npages;
 }
 
 static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
@@ -381,23 +373,19 @@ static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
         if (pp_rnb == rnb)                      /* didn't allocate above */
                 return;
 
-        OBD_FREE (pp_rnb, sizeof (*pp_rnb) * npages);
+        OBD_FREE(pp_rnb, sizeof(*pp_rnb) * npages);
 }
 
 #if CHECKSUM_BULK
 __u64 ost_checksum_bulk (struct ptlrpc_bulk_desc *desc)
 {
         __u64             cksum = 0;
-        struct list_head *tmp;
-        char             *ptr;
+        struct ptlrpc_bulk_page *bp;
 
-        list_for_each (tmp, &desc->bd_page_list) {
-                struct ptlrpc_bulk_page *bp;
-
-                bp = list_entry (tmp, struct ptlrpc_bulk_page, bp_link);
-                ptr = kmap (bp->bp_page);
-                ost_checksum (&cksum, ptr + bp->bp_pageoffset, bp->bp_buflen);
-                kunmap (bp->bp_page);
+        list_for_each_entry(bp, &desc->bd_page_list, bp_link) {
+                ost_checksum(&cksum, kmap(bp->bp_page) + bp->bp_pageoffset,
+                             bp->bp_buflen);
+                kunmap(bp->bp_page);
         }
 }
 #endif
@@ -409,9 +397,9 @@ static int ost_brw_read(struct ptlrpc_request *req)
         struct niobuf_remote    *pp_rnb;
         struct niobuf_local     *local_nb;
         struct obd_ioobj        *ioo;
-        struct ost_body         *body;
+        struct ost_body         *body, *repbody;
         struct l_wait_info       lwi;
-        void                    *desc_priv = NULL;
+        struct obd_trans_info    oti = { 0 };
         int                      size[1] = { sizeof(*body) };
         int                      comms_error = 0;
         int                      niocount;
@@ -426,35 +414,36 @@ static int ost_brw_read(struct ptlrpc_request *req)
 
         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
         if (body == NULL) {
-                CERROR ("Missing/short ost_body\n");
-                GOTO (out, rc = -EFAULT);
+                CERROR("Missing/short ost_body\n");
+                GOTO(out, rc = -EFAULT);
         }
 
-        ioo = lustre_swab_reqbuf (req, 1, sizeof (*ioo),
-                                  lustre_swab_obd_ioobj);
+        ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
         if (ioo == NULL) {
-                CERROR ("Missing/short ioobj\n");
-                GOTO (out, rc = -EFAULT);
+                CERROR("Missing/short ioobj\n");
+                GOTO(out, rc = -EFAULT);
         }
 
         niocount = ioo->ioo_bufcnt;
-        remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof (*remote_nb),
+        remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
                                        lustre_swab_niobuf_remote);
         if (remote_nb == NULL) {
-                CERROR ("Missing/short niobuf\n");
-                GOTO (out, rc = -EFAULT);
+                CERROR("Missing/short niobuf\n");
+                GOTO(out, rc = -EFAULT);
         }
-        if (lustre_msg_swabbed (req->rq_reqmsg)) { /* swab remaining niobufs */
+        if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */
                 for (i = 1; i < niocount; i++)
                         lustre_swab_niobuf_remote (&remote_nb[i]);
         }
 
+        size[0] = sizeof(*body);
         rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
                 GOTO(out, rc);
 
+        /* FIXME all niobuf splitting should be done in obdfilter if needed */
         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
-        npages = get_per_page_niobufs (ioo, 1, remote_nb, niocount, &pp_rnb);
+        npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);
         if (npages < 0)
                 GOTO(out, rc = npages);
 
@@ -462,12 +451,12 @@ static int ost_brw_read(struct ptlrpc_request *req)
         if (local_nb == NULL)
                 GOTO(out_pp_rnb, rc = -ENOMEM);
 
-        desc = ptlrpc_prep_bulk_exp (req, BULK_PUT_SOURCE, OST_BULK_PORTAL);
+        desc = ptlrpc_prep_bulk_exp(req, BULK_PUT_SOURCE, OST_BULK_PORTAL);
         if (desc == NULL)
                 GOTO(out_local, rc = -ENOMEM);
 
-        rc = obd_preprw(OBD_BRW_READ, req->rq_export, NULL, 1, ioo, npages,
-                        pp_rnb, local_nb, &desc_priv, NULL);
+        rc = obd_preprw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
+                        ioo, npages, pp_rnb, local_nb, &oti);
         if (rc != 0)
                 GOTO(out_bulk, rc);
 
@@ -480,7 +469,7 @@ static int ost_brw_read(struct ptlrpc_request *req)
                         break;
                 }
 
-                LASSERT (page_rc <= pp_rnb[i].len);
+                LASSERT(page_rc <= pp_rnb[i].len);
                 nob += page_rc;
                 if (page_rc != 0) {             /* some data! */
                         LASSERT (local_nb[i].page != NULL);
@@ -493,8 +482,8 @@ static int ost_brw_read(struct ptlrpc_request *req)
 
                 if (page_rc != pp_rnb[i].len) { /* short read */
                         /* All subsequent pages should be 0 */
-                        while (++i < npages)
-                                LASSERT (local_nb[i].rc == 0);
+                        while(++i < npages)
+                                LASSERT(local_nb[i].rc == 0);
                         break;
                 }
         }
@@ -509,7 +498,7 @@ static int ost_brw_read(struct ptlrpc_request *req)
                         if (rc) {
                                 LASSERT(rc == -ETIMEDOUT);
                                 CERROR ("timeout waiting for bulk PUT\n");
-                                ptlrpc_abort_bulk (desc);
+                                ptlrpc_abort_bulk(desc);
                         }
                 } else {
                         CERROR("ptlrpc_bulk_put failed RC: %d\n", rc);
@@ -518,25 +507,27 @@ static int ost_brw_read(struct ptlrpc_request *req)
         }
 
         /* Must commit after prep above in all cases */
-        rc = obd_commitrw(OBD_BRW_READ, req->rq_export, 1, ioo, npages,
-                          local_nb, desc_priv, NULL);
+        rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1,
+                          ioo, npages, local_nb, &oti);
+
+        repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
+        memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
 
 #if CHECKSUM_BULK
         if (rc == 0) {
-                body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
-                body->oa.o_rdev = ost_checksum_bulk (desc);
-                body->oa.o_valid |= OBD_MD_FLCKSUM;
+                repbody->oa.o_rdev = ost_checksum_bulk(desc);
+                repbody->oa.o_valid |= OBD_MD_FLCKSUM;
         }
 #endif
 
  out_bulk:
-        ptlrpc_free_bulk (desc);
+        ptlrpc_free_bulk(desc);
  out_local:
         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
  out_pp_rnb:
-        free_per_page_niobufs (npages, pp_rnb, remote_nb);
+        free_per_page_niobufs(npages, pp_rnb, remote_nb);
  out:
-        LASSERT (rc <= 0);
+        LASSERT(rc <= 0);
         if (rc == 0) {
                 req->rq_status = nob;
                 ptlrpc_reply(req);
@@ -547,7 +538,7 @@ static int ost_brw_read(struct ptlrpc_request *req)
         } else {
                 if (req->rq_repmsg != NULL) {
                         /* reply out callback would free */
-                        OBD_FREE (req->rq_repmsg, req->rq_replen);
+                        OBD_FREE(req->rq_repmsg, req->rq_replen);
                 }
                 CERROR("bulk IO comms error: evicting %s@%s nid "LPU64"\n",
                        req->rq_export->exp_client_uuid.uuid,
@@ -566,11 +557,10 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         struct niobuf_remote    *pp_rnb;
         struct niobuf_local     *local_nb;
         struct obd_ioobj        *ioo;
-        struct ost_body         *body;
+        struct ost_body         *body, *repbody;
         struct l_wait_info       lwi;
-        void                    *desc_priv = NULL;
         __u32                   *rcs;
-        int                      size[2] = { sizeof (*body) };
+        int                      size[2] = { sizeof(*body) };
         int                      objcount, niocount, npages;
         int                      comms_error = 0;
         int                      rc, rc2, swab, i, j;
@@ -580,39 +570,38 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 GOTO(out, rc = -EIO);
 
         /* pause before transaction has been started */
-        OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE, 
+        OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK | OBD_FAIL_ONCE,
                          obd_timeout +1);
 
-        swab = lustre_msg_swabbed (req->rq_reqmsg);
-        body = lustre_swab_reqbuf (req, 0, sizeof (*body),
-                                   lustre_swab_ost_body);
+        swab = lustre_msg_swabbed(req->rq_reqmsg);
+        body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
         if (body == NULL) {
-                CERROR ("Missing/short ost_body\n");
+                CERROR("Missing/short ost_body\n");
                 GOTO(out, rc = -EFAULT);
         }
 
-        LASSERT_REQSWAB (req, 1);
+        LASSERT_REQSWAB(req, 1);
         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
         if (objcount == 0) {
-                CERROR ("Missing/short ioobj\n");
-                GOTO (out, rc = -EFAULT);
+                CERROR("Missing/short ioobj\n");
+                GOTO(out, rc = -EFAULT);
         }
-        ioo = lustre_msg_buf (req->rq_reqmsg, 1, objcount * sizeof (*ioo));
+        ioo = lustre_msg_buf (req->rq_reqmsg, 1, objcount * sizeof(*ioo));
         LASSERT (ioo != NULL);
         for (niocount = i = 0; i < objcount; i++) {
                 if (swab)
                         lustre_swab_obd_ioobj (&ioo[i]);
                 if (ioo[i].ioo_bufcnt == 0) {
-                        CERROR ("ioo[%d] has zero bufcnt\n", i);
-                        GOTO (out, rc = -EFAULT);
+                        CERROR("ioo[%d] has zero bufcnt\n", i);
+                        GOTO(out, rc = -EFAULT);
                 }
                 niocount += ioo[i].ioo_bufcnt;
         }
 
-        remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof (*remote_nb),
+        remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
                                        lustre_swab_niobuf_remote);
         if (remote_nb == NULL) {
-                CERROR ("Missing/short niobuf\n");
+                CERROR("Missing/short niobuf\n");
                 GOTO(out, rc = -EFAULT);
         }
         if (swab) {                             /* swab the remaining niobufs */
@@ -620,30 +609,31 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                         lustre_swab_niobuf_remote (&remote_nb[i]);
         }
 
-        size[1] = niocount * sizeof (*rcs);
+        size[1] = niocount * sizeof(*rcs);
         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen,
                              &req->rq_repmsg);
         if (rc != 0)
-                GOTO (out, rc);
-        rcs = lustre_msg_buf (req->rq_repmsg, 1, niocount * sizeof (*rcs));
+                GOTO(out, rc);
+        rcs = lustre_msg_buf(req->rq_repmsg, 1, niocount * sizeof(*rcs));
 
+        /* FIXME all niobuf splitting should be done in obdfilter if needed */
         /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
         npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
         if (npages < 0)
-                GOTO (out, rc = npages);
+                GOTO(out, rc = npages);
 
         OBD_ALLOC(local_nb, sizeof(*local_nb) * npages);
         if (local_nb == NULL)
                 GOTO(out_pp_rnb, rc = -ENOMEM);
 
-        desc = ptlrpc_prep_bulk_exp (req, BULK_GET_SINK, OST_BULK_PORTAL);
+        desc = ptlrpc_prep_bulk_exp(req, BULK_GET_SINK, OST_BULK_PORTAL);
         if (desc == NULL)
                 GOTO(out_local, rc = -ENOMEM);
 
-        rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, NULL, objcount, ioo,
-                        npages, pp_rnb, local_nb, &desc_priv, oti);
+        rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, &body->oa, objcount,
+                        ioo, npages, pp_rnb, local_nb, oti);
         if (rc != 0)
-                GOTO (out_bulk, rc);
+                GOTO(out_bulk, rc);
 
         /* NB Having prepped, we must commit... */
 
@@ -664,8 +654,8 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                                           ptlrpc_bulk_complete(desc), &lwi);
                         if (rc) {
                                 LASSERT(rc == -ETIMEDOUT);
-                                CERROR ("timeout waiting for bulk GET\n");
-                                ptlrpc_abort_bulk (desc);
+                                CERROR("timeout waiting for bulk GET\n");
+                                ptlrpc_abort_bulk(desc);
                         }
                 } else {
                        CERROR("ptlrpc_bulk_get failed RC: %d\n", rc);
@@ -673,17 +663,21 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                comms_error = rc != 0;
         }
 
+        repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
+        memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
+
 #if CHECKSUM_BULK
         if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) {
                 static int cksum_counter;
                 __u64 client_cksum = body->oa.o_rdev;
-                __u64 cksum = ost_checksum_bulk (desc);
+                __u64 cksum = ost_checksum_bulk(desc);
 
                 if (client_cksum != cksum) {
                         CERROR("Bad checksum: client "LPX64", server "LPX64
                                ", client NID "LPX64"\n", client_cksum, cksum,
                                req->rq_connection->c_peer.peer_nid);
                         cksum_counter = 1;
+                        repbody->oa.o_rdev = cksum;
                 } else {
                         cksum_counter++;
                         if ((cksum_counter & (-cksum_counter)) == cksum_counter)
@@ -695,8 +689,8 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         }
 #endif
         /* Must commit after prep above in all cases */
-        rc2 = obd_commitrw(OBD_BRW_WRITE, req->rq_export, objcount, ioo,
-                           npages, local_nb, desc_priv, oti);
+        rc2 = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa,
+                           objcount, ioo, npages, local_nb, oti);
 
         if (rc == 0) {
                 /* set per-requested niobuf return codes */
@@ -705,25 +699,25 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
 
                         rcs[i] = 0;
                         do {
-                                LASSERT (j < npages);
+                                LASSERT(j < npages);
                                 if (local_nb[j].rc < 0)
                                         rcs[i] = local_nb[j].rc;
                                 nob -= pp_rnb[j].len;
                                 j++;
                         } while (nob > 0);
-                        LASSERT (nob == 0);
+                        LASSERT(nob == 0);
                 }
-                LASSERT (j == npages);
+                LASSERT(j == npages);
         }
         if (rc == 0)
                 rc = rc2;
 
  out_bulk:
-        ptlrpc_free_bulk (desc);
+        ptlrpc_free_bulk(desc);
  out_local:
         OBD_FREE(local_nb, sizeof(*local_nb) * npages);
  out_pp_rnb:
-        free_per_page_niobufs (npages, pp_rnb, remote_nb);
+        free_per_page_niobufs(npages, pp_rnb, remote_nb);
  out:
         if (rc == 0) {
                 oti_to_request(oti, req);
@@ -748,10 +742,9 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
 
 static int ost_san_brw(struct ptlrpc_request *req, int cmd)
 {
-        struct lustre_handle *conn = &req->rq_reqmsg->handle;
         struct niobuf_remote *remote_nb, *res_nb;
         struct obd_ioobj *ioo;
-        struct ost_body *body;
+        struct ost_body *body, *repbody;
         int rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
         int n;
         int swab;
@@ -759,19 +752,17 @@ static int ost_san_brw(struct ptlrpc_request *req, int cmd)
 
         /* XXX not set to use latest protocol */
 
-        swab = lustre_msg_swabbed (req->rq_reqmsg);
-        body = lustre_swab_reqbuf (req, 0, sizeof (*body),
-                                   lustre_swab_ost_body);
+        swab = lustre_msg_swabbed(req->rq_reqmsg);
+        body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
         if (body == NULL) {
-                CERROR ("Missing/short ost_body\n");
-                GOTO (out, rc = -EFAULT);
+                CERROR("Missing/short ost_body\n");
+                GOTO(out, rc = -EFAULT);
         }
 
-        ioo = lustre_swab_reqbuf(req, 1, sizeof (*ioo),
-                                 lustre_swab_obd_ioobj);
+        ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj);
         if (ioo == NULL) {
-                CERROR ("Missing/short ioobj\n");
-                GOTO (out, rc = -EFAULT);
+                CERROR("Missing/short ioobj\n");
+                GOTO(out, rc = -EFAULT);
         }
         objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo);
         niocount = ioo[0].ioo_bufcnt;
@@ -781,11 +772,11 @@ static int ost_san_brw(struct ptlrpc_request *req, int cmd)
                 niocount += ioo[i].ioo_bufcnt;
         }
 
-        remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof (*remote_nb),
+        remote_nb = lustre_swab_reqbuf(req, 2, niocount * sizeof(*remote_nb),
                                        lustre_swab_niobuf_remote);
         if (remote_nb == NULL) {
-                CERROR ("Missing/short niobuf\n");
-                GOTO (out, rc = -EFAULT);
+                CERROR("Missing/short niobuf\n");
+                GOTO(out, rc = -EFAULT);
         }
         if (swab) {                             /* swab the remaining niobufs */
                 for (i = 1; i < niocount; i++)
@@ -814,14 +805,17 @@ static int ost_san_brw(struct ptlrpc_request *req, int cmd)
         if (rc)
                 GOTO(out, rc);
 
-        req->rq_status = obd_san_preprw(cmd, conn, objcount, ioo,
-                                        niocount, remote_nb);
+        req->rq_status = obd_san_preprw(cmd, req->rq_export, &body->oa,
+                                        objcount, ioo, niocount, remote_nb);
 
         if (req->rq_status)
-                GOTO (out, rc = 0);
+                GOTO(out, rc = 0);
+
+        repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
+        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
 
         res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
-        memcpy (res_nb, remote_nb, size[1]);
+        memcpy(res_nb, remote_nb, size[1]);
         rc = 0;
 out:
         if (rc) {
@@ -835,6 +829,57 @@ out:
         return rc;
 }
 
+static int ost_log_cancel(struct ptlrpc_request *req)
+{
+        struct lustre_handle *conn;
+        struct llog_cookie *logcookies;
+        int num_cookies, rc = 0;
+        ENTRY;
+
+        logcookies = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*logcookies));
+        if (logcookies == NULL) {
+                DEBUG_REQ(D_HA, req, "no cookies sent");
+                RETURN(-EFAULT);
+        }
+        num_cookies = req->rq_reqmsg->buflens[0] / sizeof(*logcookies);
+
+        /* workaround until we don't need to send replies */
+        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(rc);
+        req->rq_repmsg->status = 0;
+        /* end workaround */
+
+        conn = (struct lustre_handle *)&req->rq_reqmsg->handle;
+        rc = obd_log_cancel(conn, NULL, num_cookies, logcookies, 0);
+
+        RETURN(rc);
+}
+
+static int ost_set_info(struct ptlrpc_request *req)
+{
+        struct lustre_handle *conn;
+        char *key;
+        int keylen, rc = 0;
+        ENTRY;
+
+        key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
+        if (key == NULL) {
+                DEBUG_REQ(D_HA, req, "no set_info key");
+                RETURN(-EFAULT);
+        }
+        keylen = req->rq_reqmsg->buflens[0];
+
+        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (rc)
+                RETURN(rc);
+
+        conn = (struct lustre_handle *)&req->rq_reqmsg->handle;
+        rc = obd_set_info(conn, keylen, key, 0, NULL);
+        req->rq_repmsg->status = 0;
+        RETURN(rc);
+}
+
 static int filter_recovery_request(struct ptlrpc_request *req,
                                    struct obd_device *obd, int *process)
 {
@@ -850,9 +895,10 @@ static int filter_recovery_request(struct ptlrpc_request *req,
         case OST_DESTROY:
         case OST_OPEN:
         case OST_PUNCH:
-        case OST_SETATTR: 
+        case OST_SETATTR:
         case OST_SYNCFS:
         case OST_WRITE:
+        case OBD_LOG_CANCEL:
         case LDLM_ENQUEUE:
                 *process = target_queue_recovery_request(req, obd);
                 RETURN(0);
@@ -881,7 +927,7 @@ static int ost_handle(struct ptlrpc_request *req)
                 int abort_recovery, recovering;
 
                 if (req->rq_export == NULL) {
-                        CERROR("lustre_ost: operation %d on unconnected OST\n",
+                        CDEBUG(D_HA, "operation %d on unconnected OST\n",
                                req->rq_reqmsg->opc);
                         req->rq_status = -ENOTCONN;
                         GOTO(out, rc = -ENOTCONN);
@@ -901,7 +947,7 @@ static int ost_handle(struct ptlrpc_request *req)
                         if (rc || !should_process)
                                 RETURN(rc);
                 }
-        } 
+        }
 
         if (strcmp(req->rq_obd->obd_type->typ_name, "ost") != 0)
                 GOTO(out, rc = -EINVAL);
@@ -988,10 +1034,18 @@ static int ost_handle(struct ptlrpc_request *req)
                 OBD_FAIL_RETURN(OBD_FAIL_OST_SYNCFS_NET, 0);
                 rc = ost_syncfs(req);
                 break;
+        case OST_SET_INFO:
+                DEBUG_REQ(D_INODE, req, "set_info");
+                rc = ost_set_info(req);
         case OBD_PING:
                 DEBUG_REQ(D_INODE, req, "ping");
                 rc = target_handle_ping(req);
                 break;
+        case OBD_LOG_CANCEL:
+                CDEBUG(D_INODE, "log cancel\n");
+                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
+                rc = ost_log_cancel(req);
+                break;
         case LDLM_ENQUEUE:
                 CDEBUG(D_INODE, "enqueue\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
@@ -1058,17 +1112,22 @@ out:
 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
 {
         struct ost_obd *ost = &obddev->u.ost;
-        int err;
-        int i;
+        int err, i;
         ENTRY;
 
+#ifdef ENABLE_ORPHANS
+        err = llog_start_commit_thread();
+        if (err < 0)
+                RETURN(err);
+#endif
+
         ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
                                            OST_BUFSIZE, OST_MAXREQSIZE,
                                            OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
                                            ost_handle, "ost", obddev);
         if (!ost->ost_service) {
                 CERROR("failed to start service\n");
-                GOTO(error_disc, err = -ENOMEM);
+                RETURN(-ENOMEM);
         }
 
         for (i = 0; i < OST_NUM_THREADS; i++) {
@@ -1077,17 +1136,14 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
                 err = ptlrpc_start_thread(obddev, ost->ost_service, name);
                 if (err) {
                         CERROR("error starting thread #%d: rc %d\n", i, err);
-                        GOTO(error_disc, err = -EINVAL);
+                        RETURN(-EINVAL);
                 }
         }
 
         RETURN(0);
-
-error_disc:
-        RETURN(err);
 }
 
-static int ost_cleanup(struct obd_device *obddev, int force, int failover)
+static int ost_cleanup(struct obd_device *obddev, int flags)
 {
         struct ost_obd *ost = &obddev->u.ost;
         int err = 0;
@@ -1106,7 +1162,7 @@ int ost_attach(struct obd_device *dev, obd_count len, void *data)
 {
         struct lprocfs_static_vars lvars;
 
-        lprocfs_init_vars(&lvars);
+        lprocfs_init_vars(ost,&lvars);
         return lprocfs_obd_attach(dev, lvars.obd_vars);
 }
 
@@ -1115,7 +1171,7 @@ int ost_detach(struct obd_device *dev)
         return lprocfs_obd_detach(dev);
 }
 
-/* I don't think this function is ever used, since nothing 
+/* I don't think this function is ever used, since nothing
  * connects directly to this module.
  */
 static int ost_connect(struct lustre_handle *conn,
@@ -1153,12 +1209,12 @@ static int __init ost_init(void)
         struct lprocfs_static_vars lvars;
         ENTRY;
 
-        lprocfs_init_vars(&lvars);
+        lprocfs_init_vars(ost,&lvars);
         RETURN(class_register_type(&ost_obd_ops, lvars.module_vars,
                                    LUSTRE_OST_NAME));
 }
 
-static void __exit ost_exit(void)
+static void /*__exit*/ ost_exit(void)
 {
         class_unregister_type(LUSTRE_OST_NAME);
 }