Whamcloud - gitweb
land 0.5.20.3 b_devel onto HEAD (b_devel will remain)
[fs/lustre-release.git] / lustre / osc / osc_request.c
index 1abd150..ea205a6 100644 (file)
@@ -29,6 +29,7 @@
 #define EXPORT_SYMTAB
 #define DEBUG_SUBSYSTEM S_OSC
 
+#ifdef __KERNEL__
 #include <linux/version.h>
 #include <linux/module.h>
 #include <linux/mm.h>
 #include <linux/lustre_dlm.h>
 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
 #include <linux/workqueue.h>
+#include <linux/smp_lock.h>
+#else
+#include <linux/locks.h>
 #endif
+#else
+#include <liblustre.h>
+#endif
+
 #include <linux/kp30.h>
 #include <linux/lustre_mds.h> /* for mds_objid */
 #include <linux/obd_ost.h>
 #include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
 #include <linux/lprocfs_status.h>
 
+/* It is important that ood_fh remain the first item in this structure: that
+ * way, we don't have to re-pack the obdo's inline data before we send it to
+ * the server, we can just send the whole struct unaltered. */
+#define OSC_OBDO_DATA_MAGIC 0xD15EA5ED
+struct osc_obdo_data {
+        struct lustre_handle ood_fh;
+        struct ptlrpc_request *ood_request;
+        __u32 ood_magic;
+};
+#include <linux/obd_lov.h> /* just for the startup assertion; is that wrong? */
+
+static int send_sync(struct obd_import *imp, struct ll_fid *rootfid,
+                          int level, int msg_flags)
+{
+        struct ptlrpc_request *req;
+        struct mds_body *body;
+        int rc, size = sizeof(*body);
+        ENTRY;
+
+        req = ptlrpc_prep_req(imp, OST_SYNCFS, 1, &size, NULL);
+        if (!req)
+                GOTO(out, rc = -ENOMEM);
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        req->rq_level = level;
+        req->rq_replen = lustre_msg_size(1, &size);
+
+        req->rq_reqmsg->flags |= msg_flags;
+        rc = ptlrpc_queue_wait(req);
+
+        if (!rc) {
+                CDEBUG(D_NET, "last_committed="LPU64
+                       ", last_xid="LPU64"\n",
+                       req->rq_repmsg->last_committed,
+                       req->rq_repmsg->last_xid);
+        }
+
+        EXIT;
+ out:
+        ptlrpc_req_finished(req);
+        return rc;
+}
+
+static int signal_completed_replay(struct obd_import *imp)
+{
+        struct ll_fid fid;
+
+        return send_sync(imp, &fid, LUSTRE_CONN_RECOVD, MSG_LAST_REPLAY);
+}
+
 static int osc_attach(struct obd_device *dev, obd_count len, void *data)
 {
         struct lprocfs_static_vars lvars;
@@ -123,7 +181,8 @@ static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
         RETURN(lsm_size);
 }
 
-inline void oti_from_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
+inline void oti_from_request(struct obd_trans_info *oti,
+                             struct ptlrpc_request *req)
 {
         if (oti && req->rq_repmsg)
                 oti->oti_transno = NTOH__u64(req->rq_repmsg->transno);
@@ -178,7 +237,7 @@ static int osc_open(struct lustre_handle *conn, struct obdo *oa,
         if (!request)
                 RETURN(-ENOMEM);
 
-#warning FIXME: request->rq_flags |= PTL_RPC_FL_REPLAY;
+        request->rq_flags |= PTL_RPC_FL_REPLAY;
         body = lustre_msg_buf(request->rq_reqmsg, 0);
 #warning FIXME: pack only valid fields instead of memcpy, endianness
         memcpy(&body->oa, oa, sizeof(*oa));
@@ -189,11 +248,28 @@ static int osc_open(struct lustre_handle *conn, struct obdo *oa,
         if (rc)
                 GOTO(out, rc);
 
-        body = lustre_msg_buf(request->rq_repmsg, 0);
-        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
-        if (oa)
+        if (oa) {
+                struct osc_obdo_data ood;
+                body = lustre_msg_buf(request->rq_repmsg, 0);
                 memcpy(oa, &body->oa, sizeof(*oa));
 
+                /* If the open succeeded, we better have a handle */
+                /* BlueArc OSTs don't send back (o_valid | FLHANDLE).  sigh.
+                 * Temporary workaround until fixed. -phil 24 Feb 03 */
+                //LASSERT(oa->o_valid & OBD_MD_FLHANDLE);
+                oa->o_valid |= OBD_MD_FLHANDLE;
+
+                memcpy(&ood.ood_fh, obdo_handle(oa), sizeof(ood.ood_fh));
+                ood.ood_request = ptlrpc_request_addref(request);
+                ood.ood_magic = OSC_OBDO_DATA_MAGIC;
+
+                /* Save this data in the request; it will be passed back to us
+                 * in future obdos.  This memcpy is guaranteed to be safe,
+                 * because we check at compile-time that sizeof(ood) is smaller
+                 * than oa->o_inline. */
+                memcpy(&oa->o_inline, &ood, sizeof(ood));
+        }
+
         EXIT;
  out:
         ptlrpc_req_finished(request);
@@ -203,13 +279,19 @@ static int osc_open(struct lustre_handle *conn, struct obdo *oa,
 static int osc_close(struct lustre_handle *conn, struct obdo *oa,
                      struct lov_stripe_md *md, struct obd_trans_info *oti)
 {
+        struct obd_import *import = class_conn2cliimp(conn);
         struct ptlrpc_request *request;
         struct ost_body *body;
+        struct osc_obdo_data *ood;
+        unsigned long flags;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
-                                  NULL);
+        LASSERT(oa != NULL);
+        ood = (struct osc_obdo_data *)&oa->o_inline;
+        LASSERT(ood->ood_magic == OSC_OBDO_DATA_MAGIC);
+
+        request = ptlrpc_prep_req(import, OST_CLOSE, 1, &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -220,13 +302,30 @@ static int osc_close(struct lustre_handle *conn, struct obdo *oa,
         request->rq_replen = lustre_msg_size(1, &size);
 
         rc = ptlrpc_queue_wait(request);
-        if (rc)
+        if (rc) {
+                /* FIXME: Does this mean that the file is still open locally?
+                 * If not, and I somehow suspect not, we need to cleanup
+                 * below */
                 GOTO(out, rc);
+        }
+
+        spin_lock_irqsave(&import->imp_lock, flags);
+        ood->ood_request->rq_flags &= ~PTL_RPC_FL_REPLAY;
+        /* see comments in llite/file.c:ll_mdc_close() */
+        if (ood->ood_request->rq_transno) {
+                LBUG(); /* this can't happen yet */
+                if (!request->rq_transno) {
+                        request->rq_transno = ood->ood_request->rq_transno;
+                        ptlrpc_retain_replayable_request(request, import);
+                }
+                spin_unlock_irqrestore(&import->imp_lock, flags);
+        } else {
+                spin_unlock_irqrestore(&import->imp_lock, flags);
+                ptlrpc_req_finished(ood->ood_request);
+        }
 
         body = lustre_msg_buf(request->rq_repmsg, 0);
-        CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
-        if (oa)
-                memcpy(oa, &body->oa, sizeof(*oa));
+        memcpy(oa, &body->oa, sizeof(*oa));
 
         EXIT;
  out:
@@ -401,7 +500,6 @@ static void unmap_and_decref_bulk_desc(void *data)
         struct list_head *tmp;
         ENTRY;
 
-        /* This feels wrong to me. */
         list_for_each(tmp, &desc->bd_page_list) {
                 struct ptlrpc_bulk_page *bulk;
                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
@@ -435,6 +533,27 @@ static void osc_ptl_ev_hdlr(struct ptlrpc_bulk_desc *desc)
         EXIT;
 }
 
+/*
+ * This is called when there was a bulk error return.  However, we don't know
+ * whether the bulk completed or not.  We cancel the portals bulk descriptors,
+ * so that if the OST decides to send them later we don't double free.  Then
+ * remove this descriptor from the set so that the set callback doesn't wait
+ * forever for the last CB_PHASE_FINISH to be called, and finally dump all of
+ * the bulk descriptor references.
+ */
+static void osc_ptl_ev_abort(struct ptlrpc_bulk_desc *desc)
+{
+        ENTRY;
+
+        LASSERT(desc->bd_brw_set != NULL);
+
+        ptlrpc_abort_bulk(desc);
+        obd_brw_set_del(desc);
+        unmap_and_decref_bulk_desc(desc);
+
+        EXIT;
+}
+
 static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                         obd_count page_count, struct brw_page *pga,
                         struct obd_brw_set *set)
@@ -445,12 +564,12 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
         struct ptlrpc_bulk_desc *desc = NULL;
         struct ost_body *body;
         int rc, size[3] = {sizeof(*body)}, mapped = 0;
-        unsigned long flags;
         struct obd_ioobj *iooptr;
         void *nioptr;
         __u32 xid;
         ENTRY;
 
+restart_bulk:
         size[1] = sizeof(struct obd_ioobj);
         size[2] = page_count * sizeof(struct niobuf_remote);
 
@@ -459,6 +578,7 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                 RETURN(-ENOMEM);
 
         body = lustre_msg_buf(request->rq_reqmsg, 0);
+        body->oa.o_valid = HTON__u32(OBD_MD_FLCKSUM * CHECKSUM_BULK);
 
         desc = ptlrpc_prep_bulk(connection);
         if (!desc)
@@ -472,16 +592,16 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
         ost_pack_ioo(&iooptr, lsm, page_count);
         /* end almost identical to brw_write case */
 
-        spin_lock_irqsave(&imp->imp_lock, flags);
-        xid = ++imp->imp_last_xid;       /* single xid for all pages */
-        spin_unlock_irqrestore(&imp->imp_lock, flags);
+        xid = ptlrpc_next_xid();       /* single xid for all pages */
 
         obd_kmap_get(page_count, 0);
 
         for (mapped = 0; mapped < page_count; mapped++) {
                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
-                if (bulk == NULL)
-                        GOTO(out_unmap, rc = -ENOMEM);
+                if (bulk == NULL) {
+                        unmap_and_decref_bulk_desc(desc);
+                        GOTO(out_req, rc = -ENOMEM);
+                }
 
                 bulk->bp_xid = xid;           /* single xid for all pages */
 
@@ -496,8 +616,8 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
          * Register the bulk first, because the reply could arrive out of order,
          * and we want to be ready for the bulk data.
          *
-         * One reference is released when brw_finish is complete, the other when
-         * the caller removes us from the "set" list.
+         * One reference is released when osc_ptl_ev_hdlr() is called by
+         * portals, the other when the caller removes us from the "set" list.
          *
          * On error, we never do the brw_finish, so we handle all decrefs.
          */
@@ -506,38 +626,70 @@ static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                        OBD_FAIL_OSC_BRW_READ_BULK);
         } else {
                 rc = ptlrpc_register_bulk_put(desc);
-                if (rc)
-                        GOTO(out_unmap, rc);
+                if (rc) {
+                        unmap_and_decref_bulk_desc(desc);
+                        GOTO(out_req, rc);
+                }
                 obd_brw_set_add(set, desc);
         }
 
+        request->rq_flags |= PTL_RPC_FL_NO_RESEND;
         request->rq_replen = lustre_msg_size(1, size);
         rc = ptlrpc_queue_wait(request);
 
-        /*
-         * XXX: If there is an error during the processing of the callback,
-         *      such as a timeout in a sleep that it performs, brw_finish
-         *      will never get called, and we'll leak the desc, fail to kunmap
-         *      things, cats will live with dogs.  One solution would be to
-         *      export brw_finish as osc_brw_finish, so that the timeout case
-         *      and its kin could call it for proper cleanup.  An alternative
-         *      would be for an error return from the callback to cause us to
-         *      clean up, but that doesn't help the truly async cases (like
-         *      LOV), which will immediately return from their PHASE_START
-         *      callback, before any such cleanup-requiring error condition can
-         *      be detected.
-         */
+        /* XXX bug 937 here */
+        if (rc == -ETIMEDOUT && (request->rq_flags & PTL_RPC_FL_RESEND)) {
+                DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
+                ptlrpc_req_finished(request);
+                goto restart_bulk;
+        }
+
+        if (rc) {
+                osc_ptl_ev_abort(desc);
+                GOTO(out_req, rc);
+        }
+
+#if CHECKSUM_BULK
+        body = lustre_msg_buf(request->rq_repmsg, 0);
+        if (body->oa.o_valid & NTOH__u32(OBD_MD_FLCKSUM)) {
+                static int cksum_counter;
+                __u64 server_cksum = NTOH__u64(body->oa.o_rdev);
+                __u64 cksum = 0;
+
+                for (mapped = 0; mapped < page_count; mapped++) {
+                        char *ptr = kmap(pga[mapped].pg);
+                        int   off = pga[mapped].off & (PAGE_SIZE - 1);
+                        int   len = pga[mapped].count;
+
+                        LASSERT(off + len <= PAGE_SIZE);
+                        ost_checksum(&cksum, ptr + off, len);
+                        kunmap(pga[mapped].pg);
+                }
+
+                cksum_counter++;
+                if (server_cksum != cksum) {
+                        CERROR("Bad checksum: server "LPX64", client "LPX64
+                               ", server NID "LPX64"\n", server_cksum, cksum,
+                               imp->imp_connection->c_peer.peer_nid);
+                        cksum_counter = 0;
+                } else if ((cksum_counter & (-cksum_counter)) == cksum_counter)
+                        CERROR("Checksum %u from "LPX64" OK: "LPX64"\n",
+                               cksum_counter,
+                               imp->imp_connection->c_peer.peer_nid, cksum);
+        } else {
+                static int cksum_missed;
+                cksum_missed++;
+                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
+                        CERROR("Request checksum %u from "LPX64", no reply\n",
+                               cksum_missed,
+                               imp->imp_connection->c_peer.peer_nid);
+        }
+#endif
+
+        EXIT;
  out_req:
         ptlrpc_req_finished(request);
-        RETURN(rc);
-
-        /* Clean up on error. */
-out_unmap:
-        while (mapped-- > 0)
-                kunmap(pga[mapped].pg);
-        obd_kmap_put(page_count);
-        ptlrpc_bulk_decref(desc);
-        goto out_req;
+        return rc;
 }
 
 static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *lsm,
@@ -550,12 +702,15 @@ static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *lsm,
         struct ptlrpc_bulk_desc *desc = NULL;
         struct ost_body *body;
         int rc, size[3] = {sizeof(*body)}, mapped = 0;
-        unsigned long flags;
         struct obd_ioobj *iooptr;
         void *nioptr;
         __u32 xid;
+#if CHECKSUM_BULK
+        __u64 cksum = 0;
+#endif
         ENTRY;
 
+restart_bulk:
         size[1] = sizeof(struct obd_ioobj);
         size[2] = page_count * sizeof(struct niobuf_remote);
 
@@ -577,26 +732,31 @@ static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *lsm,
         ost_pack_ioo(&iooptr, lsm, page_count);
         /* end almost identical to brw_read case */
 
-        spin_lock_irqsave(&imp->imp_lock, flags);
-        xid = ++imp->imp_last_xid;       /* single xid for all pages */
-        spin_unlock_irqrestore(&imp->imp_lock, flags);
+        xid = ptlrpc_next_xid();       /* single xid for all pages */
 
         obd_kmap_get(page_count, 0);
 
         for (mapped = 0; mapped < page_count; mapped++) {
                 struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
-                if (bulk == NULL)
-                        GOTO(out_unmap, rc = -ENOMEM);
+                if (bulk == NULL) {
+                        unmap_and_decref_bulk_desc(desc);
+                        GOTO(out_req, rc = -ENOMEM);
+                }
 
                 bulk->bp_xid = xid;           /* single xid for all pages */
 
                 bulk->bp_buf = kmap(pga[mapped].pg);
                 bulk->bp_page = pga[mapped].pg;
-                bulk->bp_buflen = PAGE_SIZE;
+                bulk->bp_buflen = pga[mapped].count;
                 ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
                                 pga[mapped].flag, bulk->bp_xid);
+                ost_checksum(&cksum, bulk->bp_buf, bulk->bp_buflen);
         }
 
+#if CHECKSUM_BULK
+        body->oa.o_rdev = HTON__u64(cksum);
+        body->oa.o_valid |= HTON__u32(OBD_MD_FLCKSUM);
+#endif
         /*
          * Register the bulk first, because the reply could arrive out of
          * order, and we want to be ready for the bulk data.
@@ -608,47 +768,363 @@ static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *lsm,
          */
         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK)) {
                 CERROR("obd_fail_loc=%x, skipping register_bulk\n",
-                OBD_FAIL_OSC_BRW_WRITE_BULK);
+                       OBD_FAIL_OSC_BRW_WRITE_BULK);
         } else {
                 rc = ptlrpc_register_bulk_get(desc);
-                if (rc)
-                        GOTO(out_unmap, rc);
+                if (rc) {
+                        unmap_and_decref_bulk_desc(desc);
+                        GOTO(out_req, rc);
+                }
                 obd_brw_set_add(set, desc);
         }
 
+        request->rq_flags |= PTL_RPC_FL_NO_RESEND;
         request->rq_replen = lustre_msg_size(1, size);
         rc = ptlrpc_queue_wait(request);
 
-        /*
-         * XXX: If there is an error during the processing of the callback,
-         *      such as a timeout in a sleep that it performs, brw_finish
-         *      will never get called, and we'll leak the desc, fail to kunmap
-         *      things, cats will live with dogs.  One solution would be to
-         *      export brw_finish as osc_brw_finish, so that the timeout case
-         *      and its kin could call it for proper cleanup.  An alternative
-         *      would be for an error return from the callback to cause us to
-         *      clean up, but that doesn't help the truly async cases (like
-         *      LOV), which will immediately return from their PHASE_START
-         *      callback, before any such cleanup-requiring error condition can
-         *      be detected.
-         */
+        /* XXX bug 937 here */
+        if (rc == -ETIMEDOUT && (request->rq_flags & PTL_RPC_FL_RESEND)) {
+                DEBUG_REQ(D_HA, request,  "BULK TIMEOUT");
+                ptlrpc_req_finished(request);
+                goto restart_bulk;
+        }
+
+        if (rc) {
+                osc_ptl_ev_abort(desc);
+                GOTO(out_req, rc);
+        }
+
+        EXIT;
  out_req:
         ptlrpc_req_finished(request);
+        return rc;
+}
+
+#ifndef min_t
+#define min_t(a,b,c) ( b<c ) ? b : c
+#endif
+
+#warning "FIXME: make values dynamic based on get_info at setup (bug 665)"
+#define OSC_BRW_MAX_SIZE 65536
+#define OSC_BRW_MAX_IOV min_t(int, PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE/PAGE_SIZE)
+
+static int osc_brw(int cmd, struct lustre_handle *conn,
+                   struct lov_stripe_md *md, obd_count page_count,
+                   struct brw_page *pga, struct obd_brw_set *set,
+                   struct obd_trans_info *oti)
+{
+        ENTRY;
+
+        while (page_count) {
+                obd_count pages_per_brw;
+                int rc;
+
+                if (page_count > OSC_BRW_MAX_IOV)
+                        pages_per_brw = OSC_BRW_MAX_IOV;
+                else
+                        pages_per_brw = page_count;
+
+                if (cmd & OBD_BRW_WRITE)
+                        rc = osc_brw_write(conn, md, pages_per_brw, pga,
+                                           set, oti);
+                else
+                        rc = osc_brw_read(conn, md, pages_per_brw, pga, set);
+
+                if (rc != 0)
+                        RETURN(rc);
+
+                page_count -= pages_per_brw;
+                pga += pages_per_brw;
+        }
+        RETURN(0);
+}
+
+#ifdef __KERNEL__
+/* Note: caller will lock/unlock, and set uptodate on the pages */
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+static int sanosc_brw_read(struct lustre_handle *conn,
+                           struct lov_stripe_md *md,
+                           obd_count page_count,
+                           struct brw_page *pga,
+                           struct obd_brw_set *set)
+{
+        struct ptlrpc_request *request = NULL;
+        struct ost_body *body;
+        struct niobuf_remote *remote, *nio_rep;
+        int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
+        struct obd_ioobj *iooptr;
+        void *nioptr;
+        ENTRY;
+
+        size[1] = sizeof(struct obd_ioobj);
+        size[2] = page_count * sizeof(*remote);
+
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_READ, 3,
+                                  size, NULL);
+        if (!request)
+                RETURN(-ENOMEM);
+
+        body = lustre_msg_buf(request->rq_reqmsg, 0);
+        iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
+        nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
+        ost_pack_ioo(&iooptr, md, page_count);
+
+        obd_kmap_get(page_count, 0);
+
+        for (mapped = 0; mapped < page_count; mapped++) {
+                LASSERT(PageLocked(pga[mapped].pg));
+
+                kmap(pga[mapped].pg);
+                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+                                pga[mapped].flag, 0);
+        }
+
+        size[1] = page_count * sizeof(*remote);
+        request->rq_replen = lustre_msg_size(2, size);
+
+        rc = ptlrpc_queue_wait(request);
+        if (rc)
+                GOTO(out_unmap, rc);
+
+        nioptr = lustre_msg_buf(request->rq_repmsg, 1);
+        if (!nioptr)
+                GOTO(out_unmap, rc = -EINVAL);
+
+        if (request->rq_repmsg->buflens[1] != size[1]) {
+                CERROR("buffer length wrong (%d vs. %d)\n",
+                       request->rq_repmsg->buflens[1], size[1]);
+                GOTO(out_unmap, rc = -EINVAL);
+        }
+
+        for (j = 0; j < page_count; j++) {
+                ost_unpack_niobuf(&nioptr, &remote);
+        }
+
+        nioptr = lustre_msg_buf(request->rq_repmsg, 1);
+        nio_rep = (struct niobuf_remote*)nioptr;
+
+        /* actual read */
+        for (j = 0; j < page_count; j++) {
+                struct page *page = pga[j].pg;
+                struct buffer_head *bh;
+                kdev_t dev;
+
+                /* got san device associated */
+                LASSERT(class_conn2obd(conn));
+                dev = class_conn2obd(conn)->u.cli.cl_sandev;
+
+                /* hole */
+                if (!nio_rep[j].offset) {
+                        CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
+                                        page->mapping->host->i_ino,
+                                        page->index);
+                        memset(page_address(page), 0, PAGE_SIZE);
+                        continue;
+                }
+
+                if (!page->buffers) {
+                        create_empty_buffers(page, dev, PAGE_SIZE);
+                        bh = page->buffers;
+
+                        clear_bit(BH_New, &bh->b_state);
+                        set_bit(BH_Mapped, &bh->b_state);
+                        bh->b_blocknr = (unsigned long)nio_rep[j].offset;
+
+                        clear_bit(BH_Uptodate, &bh->b_state);
+
+                        ll_rw_block(READ, 1, &bh);
+                } else {
+                        bh = page->buffers;
+
+                        /* if buffer already existed, it must be the
+                         * one we mapped before, check it */
+                        LASSERT(!test_bit(BH_New, &bh->b_state));
+                        LASSERT(test_bit(BH_Mapped, &bh->b_state));
+                        LASSERT(bh->b_blocknr ==
+                                (unsigned long)nio_rep[j].offset);
+
+                        /* wait it's io completion */
+                        if (test_bit(BH_Lock, &bh->b_state))
+                                wait_on_buffer(bh);
+
+                        if (!test_bit(BH_Uptodate, &bh->b_state))
+                                ll_rw_block(READ, 1, &bh);
+                }
+
+
+                /* must do syncronous write here */
+                wait_on_buffer(bh);
+                if (!buffer_uptodate(bh)) {
+                        /* I/O error */
+                        rc = -EIO;
+                        goto out_unmap;
+                }
+        }
+
+out_req:
+        ptlrpc_req_finished(request);
         RETURN(rc);
 
+out_unmap:
         /* Clean up on error. */
+        while (mapped-- > 0)
+                kunmap(pga[mapped].pg);
+
+        obd_kmap_put(page_count);
+
+        goto out_req;
+}
+
+static int sanosc_brw_write(struct lustre_handle *conn,
+                            struct lov_stripe_md *md,
+                            obd_count page_count,
+                            struct brw_page *pga,
+                            struct obd_brw_set *set)
+{
+        struct ptlrpc_request *request = NULL;
+        struct ost_body *body;
+        struct niobuf_remote *remote, *nio_rep;
+        int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
+        struct obd_ioobj *iooptr;
+        void *nioptr;
+        ENTRY;
+
+        size[1] = sizeof(struct obd_ioobj);
+        size[2] = page_count * sizeof(*remote);
+
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_WRITE,
+                                  3, size, NULL);
+        if (!request)
+                RETURN(-ENOMEM);
+
+        body = lustre_msg_buf(request->rq_reqmsg, 0);
+        iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
+        nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
+        ost_pack_ioo(&iooptr, md, page_count);
+
+        /* map pages, and pack request */
+        obd_kmap_get(page_count, 0);
+        for (mapped = 0; mapped < page_count; mapped++) {
+                LASSERT(PageLocked(pga[mapped].pg));
+
+                kmap(pga[mapped].pg);
+                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+                                pga[mapped].flag, 0);
+        }
+
+        size[1] = page_count * sizeof(*remote);
+        request->rq_replen = lustre_msg_size(2, size);
+
+        rc = ptlrpc_queue_wait(request);
+        if (rc)
+                GOTO(out_unmap, rc);
+
+        nioptr = lustre_msg_buf(request->rq_repmsg, 1);
+        if (!nioptr)
+                GOTO(out_unmap, rc = -EINVAL);
+
+        if (request->rq_repmsg->buflens[1] != size[1]) {
+                CERROR("buffer length wrong (%d vs. %d)\n",
+                       request->rq_repmsg->buflens[1], size[1]);
+                GOTO(out_unmap, rc = -EINVAL);
+        }
+
+        for (j = 0; j < page_count; j++) {
+                ost_unpack_niobuf(&nioptr, &remote);
+        }
+
+        nioptr = lustre_msg_buf(request->rq_repmsg, 1);
+        nio_rep = (struct niobuf_remote*)nioptr;
+
+        /* actual write */
+        for (j = 0; j < page_count; j++) {
+                struct page *page = pga[j].pg;
+                struct buffer_head *bh;
+                kdev_t dev;
+
+                /* got san device associated */
+                LASSERT(class_conn2obd(conn));
+                dev = class_conn2obd(conn)->u.cli.cl_sandev;
+
+                if (!page->buffers) {
+                        create_empty_buffers(page, dev, PAGE_SIZE);
+                } else {
+                        /* checking */
+                        LASSERT(!test_bit(BH_New, &page->buffers->b_state));
+                        LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
+                        LASSERT(page->buffers->b_blocknr ==
+                                (unsigned long)nio_rep[j].offset);
+                }
+                bh = page->buffers;
+
+                LASSERT(bh);
+
+                /* if buffer locked, wait it's io completion */
+                if (test_bit(BH_Lock, &bh->b_state))
+                        wait_on_buffer(bh);
+
+                clear_bit(BH_New, &bh->b_state);
+                set_bit(BH_Mapped, &bh->b_state);
+
+                /* override the block nr */
+                bh->b_blocknr = (unsigned long)nio_rep[j].offset;
+
+                /* we are about to write it, so set it
+                 * uptodate/dirty
+                 * page lock should garentee no race condition here */
+                set_bit(BH_Uptodate, &bh->b_state);
+                set_bit(BH_Dirty, &bh->b_state);
+
+                ll_rw_block(WRITE, 1, &bh);
+
+                /* must do syncronous write here */
+                wait_on_buffer(bh);
+                if (!buffer_uptodate(bh) || test_bit(BH_Dirty, &bh->b_state)) {
+                        /* I/O error */
+                        rc = -EIO;
+                        goto out_unmap;
+                }
+        }
+
+out_req:
+        ptlrpc_req_finished(request);
+        RETURN(rc);
+
 out_unmap:
+        /* Clean up on error. */
         while (mapped-- > 0)
                 kunmap(pga[mapped].pg);
+
         obd_kmap_put(page_count);
-        ptlrpc_bulk_decref(desc);
+
         goto out_req;
 }
+#else
+static int sanosc_brw_read(struct lustre_handle *conn,
+                           struct lov_stripe_md *md,
+                           obd_count page_count,
+                           struct brw_page *pga,
+                           struct obd_brw_set *set)
+{
+        LBUG();
+        return 0;
+}
 
-static int osc_brw(int cmd, struct lustre_handle *conn,
-                   struct lov_stripe_md *md, obd_count page_count,
-                   struct brw_page *pga, struct obd_brw_set *set, 
-                   struct obd_trans_info *oti)
+static int sanosc_brw_write(struct lustre_handle *conn,
+                            struct lov_stripe_md *md,
+                            obd_count page_count,
+                            struct brw_page *pga,
+                            struct obd_brw_set *set)
+{
+        LBUG();
+        return 0;
+}
+#endif
+
+static int sanosc_brw(int cmd, struct lustre_handle *conn,
+                      struct lov_stripe_md *md, obd_count page_count,
+                      struct brw_page *pga, struct obd_brw_set *set,
+                      struct obd_trans_info *oti)
 {
         ENTRY;
 
@@ -656,15 +1132,16 @@ static int osc_brw(int cmd, struct lustre_handle *conn,
                 obd_count pages_per_brw;
                 int rc;
 
-                if (page_count > PTL_MD_MAX_IOV)
-                        pages_per_brw = PTL_MD_MAX_IOV;
+                if (page_count > OSC_BRW_MAX_IOV)
+                        pages_per_brw = OSC_BRW_MAX_IOV;
                 else
                         pages_per_brw = page_count;
 
                 if (cmd & OBD_BRW_WRITE)
-                        rc = osc_brw_write(conn, md, pages_per_brw, pga, set, oti);
+                        rc = sanosc_brw_write(conn, md, pages_per_brw,
+                                              pga, set);
                 else
-                        rc = osc_brw_read(conn, md, pages_per_brw, pga, set);
+                        rc = sanosc_brw_read(conn, md, pages_per_brw, pga, set);
 
                 if (rc != 0)
                         RETURN(rc);
@@ -674,6 +1151,7 @@ static int osc_brw(int cmd, struct lustre_handle *conn,
         }
         RETURN(0);
 }
+#endif
 
 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
                        struct lustre_handle *parent_lock,
@@ -906,7 +1384,7 @@ static int osc_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
                         GOTO(out, err = -EINVAL);
                 }
 
-                if (data->ioc_inllen2 < sizeof(uuid.uuid)) {
+                if (data->ioc_inllen2 < sizeof(uuid)) {
                         OBD_FREE(buf, len);
                         GOTO(out, err = -EINVAL);
                 }
@@ -918,10 +1396,9 @@ static int osc_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
                 desc->ld_default_stripe_size = 0;
                 desc->ld_default_stripe_offset = 0;
                 desc->ld_pattern = 0;
-                memcpy(desc->ld_uuid.uuid,  obddev->obd_uuid.uuid, sizeof(uuid.uuid));
+                memcpy(&desc->ld_uuid, &obddev->obd_uuid, sizeof(uuid));
 
-                memcpy(data->ioc_inlbuf2,  obddev->obd_uuid.uuid, 
-                       sizeof(uuid.uuid));
+                memcpy(data->ioc_inlbuf2, &obddev->obd_uuid, sizeof(uuid));
 
                 err = copy_to_user((void *)uarg, buf, len);
                 if (err)
@@ -967,15 +1444,15 @@ static void set_osc_active(struct obd_import *imp, int active)
 
                 fakeconn.addr = (__u64)(unsigned long)exp;
                 fakeconn.cookie = exp->exp_cookie;
-                ioc_data.ioc_inlbuf1 = &imp->imp_obd->u.cli.cl_target_uuid;
+                ioc_data.ioc_inlbuf1 =
+                        (char *)&imp->imp_obd->u.cli.cl_target_uuid;
                 ioc_data.ioc_offset = active;
                 rc = obd_iocontrol(IOC_LOV_SET_OSC_ACTIVE, &fakeconn,
                                    sizeof ioc_data, &ioc_data, NULL);
-                if (rc) {
-                        CERROR("disabling %s on LOV %p/%s: %d\n",
+                if (rc)
+                        CERROR("error disabling %s on LOV %p/%s: %d\n",
                                imp->imp_obd->u.cli.cl_target_uuid.uuid,
                                notify_obd, notify_obd->obd_uuid.uuid, rc);
-                }
         } else {
                 CDEBUG(D_HA, "No exports for obd %p/%s, can't notify about "
                        "%p\n", notify_obd, notify_obd->obd_uuid.uuid,
@@ -987,30 +1464,86 @@ static int osc_recover(struct obd_import *imp, int phase)
 {
         int rc;
         unsigned long flags;
+        int msg_flags;
         struct ptlrpc_request *req;
+        struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
         ENTRY;
 
+        CDEBUG(D_HA, "%s: entering phase: %d\n",
+               imp->imp_obd->obd_name, phase);
         switch(phase) {
 
             case PTLRPC_RECOVD_PHASE_PREPARE: {
-                struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
-                ldlm_namespace_cleanup(ns, 1 /* no network ops */);
-                ptlrpc_abort_inflight(imp, 0);
-                set_osc_active(imp, 0 /* inactive */);
+                if (imp->imp_flags & IMP_REPLAYABLE) {
+                        CDEBUG(D_HA, "failover OST\n");
+                        /* If we're a failover OSC/OST, just cancel unused
+                         * locks to simplify lock replay.
+                         */
+                        ldlm_cli_cancel_unused(ns, NULL, LDLM_FL_LOCAL_ONLY);
+                } else {
+                        CDEBUG(D_HA, "non-failover OST\n");
+                        /* Non-failover OSTs (LLNL scenario) disable the OSC
+                         * and invalidate local state.
+                         */
+                        ldlm_namespace_cleanup(ns, 1 /* no network ops */);
+                        ptlrpc_abort_inflight(imp, 0);
+                        set_osc_active(imp, 0 /* inactive */);
+                }
                 RETURN(0);
             }
 
-            case PTLRPC_RECOVD_PHASE_RECOVER:
+        case PTLRPC_RECOVD_PHASE_RECOVER: {
+        reconnect:
                 imp->imp_flags &= ~IMP_INVALID;
                 rc = ptlrpc_reconnect_import(imp, OST_CONNECT, &req);
-                ptlrpc_req_finished(req);
+
+                msg_flags = req->rq_repmsg
+                        ? lustre_msg_get_op_flags(req->rq_repmsg)
+                        : 0;
+
+                if (rc == -EBUSY && (msg_flags & MSG_CONNECT_RECOVERING))
+                        CERROR("reconnect denied by recovery; should retry\n");
+
                 if (rc) {
+                        if (phase != PTLRPC_RECOVD_PHASE_NOTCONN) {
+                                CERROR("can't reconnect, invalidating\n");
+                                ldlm_namespace_cleanup(ns, 1);
+                                ptlrpc_abort_inflight(imp, 0);
+                        }
                         imp->imp_flags |= IMP_INVALID;
+                        ptlrpc_req_finished(req);
                         RETURN(rc);
                 }
 
+                if (msg_flags & MSG_CONNECT_RECOVERING) {
+                        /* Replay if they want it. */
+                        DEBUG_REQ(D_HA, req, "OST wants replay");
+                        rc = ptlrpc_replay(imp);
+                        if (rc)
+                                GOTO(check_rc, rc);
+
+                        rc = ldlm_replay_locks(imp);
+                        if (rc)
+                                GOTO(check_rc, rc);
+
+                        rc = signal_completed_replay(imp);
+                        if (rc)
+                                GOTO(check_rc, rc);
+                } else if (msg_flags & MSG_CONNECT_RECONNECT) {
+                        DEBUG_REQ(D_HA, req, "reconnecting to MDS\n");
+                        /* Nothing else to do here. */
+                } else {
+                        DEBUG_REQ(D_HA, req, "evicted: invalidating\n");
+                        /* Otherwise, clean everything up. */
+                        ldlm_namespace_cleanup(ns, 1);
+                        ptlrpc_abort_inflight(imp, 0);
+                }
+
+                ptlrpc_req_finished(req);
+
                 spin_lock_irqsave(&imp->imp_lock, flags);
                 imp->imp_level = LUSTRE_CONN_FULL;
+                imp->imp_flags &= ~IMP_INVALID;
                 spin_unlock_irqrestore(&imp->imp_lock, flags);
 
                 /* Is this the right place?  Should we do this in _PREPARE
@@ -1018,9 +1551,21 @@ static int osc_recover(struct obd_import *imp, int phase)
                  */
                 ptlrpc_wake_delayed(imp);
 
+                rc = ptlrpc_resend(imp);
+                if (rc)
+                        GOTO(check_rc, rc);
+
                 set_osc_active(imp, 1 /* active */);
                 RETURN(0);
 
+        check_rc:
+                /* If we get disconnected in the middle, recovery has probably
+                 * failed.  Reconnect and find out.
+                 */
+                if (rc == -ENOTCONN)
+                        goto reconnect;
+                RETURN(rc);
+        }
             case PTLRPC_RECOVD_PHASE_NOTCONN:
                 osc_recover(imp, PTLRPC_RECOVD_PHASE_PREPARE);
                 RETURN(osc_recover(imp, PTLRPC_RECOVD_PHASE_RECOVER));
@@ -1064,23 +1609,67 @@ struct obd_ops osc_obd_ops = {
         o_iocontrol:    osc_iocontrol
 };
 
-static int __init osc_init(void)
+struct obd_ops sanosc_obd_ops = {
+        o_owner:        THIS_MODULE,
+        o_attach:       osc_attach,
+        o_detach:       osc_detach,
+        o_cleanup:      client_obd_cleanup,
+        o_connect:      osc_connect,
+        o_disconnect:   client_obd_disconnect,
+        o_statfs:       osc_statfs,
+        o_packmd:       osc_packmd,
+        o_unpackmd:     osc_unpackmd,
+        o_create:       osc_create,
+        o_destroy:      osc_destroy,
+        o_getattr:      osc_getattr,
+        o_setattr:      osc_setattr,
+        o_open:         osc_open,
+        o_close:        osc_close,
+#ifdef __KERNEL__
+        o_setup:        client_sanobd_setup,
+        o_brw:          sanosc_brw,
+#endif
+        o_punch:        osc_punch,
+        o_enqueue:      osc_enqueue,
+        o_cancel:       osc_cancel,
+        o_cancel_unused: osc_cancel_unused,
+        o_iocontrol:    osc_iocontrol,
+};
+
+int __init osc_init(void)
 {
         struct lprocfs_static_vars lvars;
+        int rc;
+        ENTRY;
+
+        LASSERT(sizeof(struct osc_obdo_data) <= FD_OSTDATA_SIZE);
 
         lprocfs_init_vars(&lvars);
-        RETURN(class_register_type(&osc_obd_ops, lvars.module_vars,
-                                   LUSTRE_OSC_NAME));
+
+        rc = class_register_type(&osc_obd_ops, lvars.module_vars,
+                                 LUSTRE_OSC_NAME);
+        if (rc)
+                RETURN(rc);
+
+        rc = class_register_type(&sanosc_obd_ops, lvars.module_vars,
+                                 LUSTRE_SANOSC_NAME);
+        if (rc)
+                class_unregister_type(LUSTRE_OSC_NAME);
+
+        RETURN(rc);
 }
 
 static void __exit osc_exit(void)
 {
+        class_unregister_type(LUSTRE_SANOSC_NAME);
         class_unregister_type(LUSTRE_OSC_NAME);
 }
 
+#ifdef __KERNEL__
 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
 MODULE_LICENSE("GPL");
 
 module_init(osc_init);
 module_exit(osc_exit);
+#endif