Whamcloud - gitweb
Lproc-snmp code drop
[fs/lustre-release.git] / lustre / osc / osc_request.c
index 7a34b91..5700797 100644 (file)
@@ -1,7 +1,7 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * Copryright (C) 2001, 2002 Cluster File Systems, Inc.
+ * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
  *
  *  This code is issued under the GNU General Public License.
  *  See the file COPYING in this distribution
 #define EXPORT_SYMTAB
 #define DEBUG_SUBSYSTEM S_OSC
 
+#include <linux/version.h>
 #include <linux/module.h>
+#include <linux/mm.h>
+#include <linux/highmem.h>
 #include <linux/lustre_dlm.h>
+#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
+#include <linux/workqueue.h>
+#endif
+#include <linux/kp30.h>
 #include <linux/lustre_mds.h> /* for mds_objid */
 #include <linux/obd_ost.h>
 #include <linux/obd_lov.h>
-
-static void osc_obd2cl(struct obd_device *obd, struct ptlrpc_client **cl,
-                       struct ptlrpc_connection **connection)
-{
-        struct osc_obd *osc = &obd->u.osc;
-        *cl = osc->osc_client;
-        *connection = osc->osc_conn;
-}
-
-static void osc_con2cl(struct lustre_handle *conn, struct ptlrpc_client **cl,
-                       struct ptlrpc_connection **connection)
-{
-        struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
-        *cl = osc->osc_client;
-        *connection = osc->osc_conn;
-}
-
-static void osc_con2dlmcl(struct lustre_handle *conn, struct ptlrpc_client **cl,
-                          struct ptlrpc_connection **connection)
-{
-        struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
-        *cl = osc->osc_ldlm_client;
-        *connection = osc->osc_conn;
-}
-
-static int osc_connect(struct lustre_handle *conn, struct obd_device *obd)
-{
-        struct osc_obd *osc = &obd->u.osc;
-        struct obd_import *import;
-        struct ptlrpc_request *request;
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        char *tmp = osc->osc_target_uuid;
-        int rc, size = sizeof(osc->osc_target_uuid);
-        ENTRY;
-
-        OBD_ALLOC(import, sizeof(*import)); 
-        if (!import)
-                RETURN(-ENOMEM);
-                  
-        MOD_INC_USE_COUNT;
-        rc = class_connect(conn, obd);
-        if (rc) 
-                RETURN(rc); 
-
-        osc_obd2cl(obd, &cl, &connection);
-        request = ptlrpc_prep_req(osc->osc_client, osc->osc_conn, 
-                                  OST_CONNECT, 1, &size, &tmp);
-        if (!request)
-                GOTO(out_disco, -ENOMEM);
-
-        request->rq_replen = lustre_msg_size(0, NULL);
-
-        rc = ptlrpc_queue_wait(request);
-        rc = ptlrpc_check_status(request, rc);
-        if (rc) {
-                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
-                GOTO(out, rc);
-        }
-
-        /* XXX: Make this a handle */
-        osc->osc_connh.addr = request->rq_repmsg->addr;
-        osc->osc_connh.cookie = request->rq_repmsg->cookie;
-
-        EXIT;
- out:
-        ptlrpc_free_req(request);
- out_disco:
-        class_disconnect(conn); 
-        if (rc)
-                MOD_DEC_USE_COUNT;
-        return rc;
-}
-
-static int osc_disconnect(struct lustre_handle *conn)
+#include <linux/ctype.h>
+#include <linux/init.h>
+#include <linux/lustre_ha.h>
+#include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
+#include <linux/lustre_lite.h> /* for ll_i2info */
+#include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
+#include <linux/lprocfs_status.h>
+
+extern lprocfs_vars_t status_var_nm_1[];
+extern lprocfs_vars_t status_class_var[];
+
+static int osc_getattr(struct lustre_handle *conn, struct obdo *oa,
+                       struct lov_stripe_md *md)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
-        int rc;
-        ENTRY;
-
-        osc_con2cl(conn, &cl, &connection);
-        request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh,
-                                  OST_DISCONNECT, 0, NULL, NULL);
-        if (!request)
-                RETURN(-ENOMEM);
-        request->rq_replen = lustre_msg_size(0, NULL);
-
-        rc = ptlrpc_queue_wait(request);
-        if (rc) 
-                GOTO(out, rc);
-        rc = class_disconnect(conn);
-        if (!rc)
-                MOD_DEC_USE_COUNT;
-
- out:
-        ptlrpc_free_req(request);
-        return rc;
-}
-
-static int osc_getattr(struct lustre_handle *conn, struct obdo *oa)
-{
-        struct ptlrpc_request *request;
-        struct ptlrpc_client *cl;
-        struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
-        struct ptlrpc_connection *connection;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        osc_con2cl(conn, &cl, &connection);
-        request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
-                                   OST_GETATTR, 1, &size, NULL);
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_GETATTR, 1,
+                                  &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
         body = lustre_msg_buf(request->rq_reqmsg, 0);
+#warning FIXME: pack only valid fields instead of memcpy, endianness
         memcpy(&body->oa, oa, sizeof(*oa));
-        body->oa.o_valid = ~0;
 
         request->rq_replen = lustre_msg_size(1, &size);
 
@@ -162,29 +75,26 @@ static int osc_getattr(struct lustre_handle *conn, struct obdo *oa)
 
         EXIT;
  out:
-        ptlrpc_free_req(request);
-        return 0;
+        ptlrpc_req_finished(request);
+        return rc;
 }
 
-static int osc_open(struct lustre_handle *conn, struct obdo *oa)
+static int osc_open(struct lustre_handle *conn, struct obdo *oa,
+                    struct lov_stripe_md *md)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl;
-        struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
-        struct ptlrpc_connection *connection;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        osc_con2cl(conn, &cl, &connection);
-        request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
-                                   OST_OPEN, 1, &size, NULL);
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_OPEN, 1, &size,
+                                  NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
         body = lustre_msg_buf(request->rq_reqmsg, 0);
+#warning FIXME: pack only valid fields instead of memcpy, endianness
         memcpy(&body->oa, oa, sizeof(*oa));
-        body->oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID);
 
         request->rq_replen = lustre_msg_size(1, &size);
 
@@ -200,27 +110,25 @@ static int osc_open(struct lustre_handle *conn, struct obdo *oa)
 
         EXIT;
  out:
-        ptlrpc_free_req(request);
-        return 0;
+        ptlrpc_req_finished(request);
+        return rc;
 }
 
-static int osc_close(struct lustre_handle *conn, struct obdo *oa)
+static int osc_close(struct lustre_handle *conn, struct obdo *oa,
+                     struct lov_stripe_md *md)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl;
-        struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
-        struct ptlrpc_connection *connection;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        osc_con2cl(conn, &cl, &connection);
-        request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
-                                   OST_CLOSE, 1, &size, NULL);
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
+                                  NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
         body = lustre_msg_buf(request->rq_reqmsg, 0);
+#warning FIXME: pack only valid fields instead of memcpy, endianness
         memcpy(&body->oa, oa, sizeof(*oa));
 
         request->rq_replen = lustre_msg_size(1, &size);
@@ -237,23 +145,20 @@ static int osc_close(struct lustre_handle *conn, struct obdo *oa)
 
         EXIT;
  out:
-        ptlrpc_free_req(request);
-        return 0;
+        ptlrpc_req_finished(request);
+        return rc;
 }
 
-static int osc_setattr(struct lustre_handle *conn, struct obdo *oa)
+static int osc_setattr(struct lustre_handle *conn, struct obdo *oa,
+                       struct lov_stripe_md *md)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl;
-        struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
-        struct ptlrpc_connection *connection;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        osc_con2cl(conn, &cl, &connection);
-        request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
-                                  OST_SETATTR, 1, &size, NULL);
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SETATTR, 1,
+                                  &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -264,34 +169,36 @@ static int osc_setattr(struct lustre_handle *conn, struct obdo *oa)
 
         rc = ptlrpc_queue_wait(request);
         rc = ptlrpc_check_status(request, rc);
-        GOTO(out, rc);
 
- out:
-        ptlrpc_free_req(request);
-        return 0;
+        ptlrpc_req_finished(request);
+        return rc;
 }
 
-static int osc_create(struct lustre_handle *conn, struct obdo *oa)
+static int osc_create(struct lustre_handle *conn, struct obdo *oa,
+                      struct lov_stripe_md **ea)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
         struct ost_body *body;
-        struct mds_objid *objid;
-        struct lov_object_id *lov_id;
+        struct lov_stripe_md *lsm;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        if (!oa) {
-                CERROR("oa NULL\n");
-                RETURN(-EINVAL);
+        LASSERT(oa);
+        LASSERT(ea);
+
+        lsm = *ea;
+        if (!lsm) {
+                // XXX check oa->o_valid & OBD_MD_FLEASIZE first...
+                OBD_ALLOC(lsm, oa->o_easize);
+                if (!lsm)
+                        RETURN(-ENOMEM);
+                lsm->lsm_mds_easize = oa->o_easize;
         }
-        osc_con2cl(conn, &cl, &connection);
-        request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
-                                  OST_CREATE, 1, &size, NULL);
+
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size,
+                                  NULL);
         if (!request)
-                RETURN(-ENOMEM);
+                GOTO(out, rc = -ENOMEM);
 
         body = lustre_msg_buf(request->rq_reqmsg, 0);
         memcpy(&body->oa, oa, sizeof(*oa));
@@ -301,32 +208,28 @@ static int osc_create(struct lustre_handle *conn, struct obdo *oa)
         rc = ptlrpc_queue_wait(request);
         rc = ptlrpc_check_status(request, rc);
         if (rc)
-                GOTO(out, rc);
+                GOTO(out_req, rc);
 
         body = lustre_msg_buf(request->rq_repmsg, 0);
         memcpy(oa, &body->oa, sizeof(*oa));
 
-        memset(oa->o_inline, 0, sizeof(oa->o_inline));
-        objid = (struct mds_objid *)oa->o_inline;
-        objid->mo_lov_md.lmd_object_id = oa->o_id;
-        objid->mo_lov_md.lmd_stripe_count = 1;
-        lov_id = (struct lov_object_id *)(oa->o_inline + sizeof(*objid));
-        lov_id->l_device_id = 0;
-        lov_id->l_object_id = oa->o_id;
-
+        lsm->lsm_object_id = oa->o_id;
+        lsm->lsm_stripe_count = 0;
+        *ea = lsm;
         EXIT;
- out:
-        ptlrpc_free_req(request);
-        return 0;
+out_req:
+        ptlrpc_req_finished(request);
+out:
+        if (rc && !*ea)
+                OBD_FREE(lsm, oa->o_easize);
+        return rc;
 }
 
-static int osc_punch(struct lustre_handle *conn, struct obdo *oa, obd_size count,
-                     obd_off offset)
+static int osc_punch(struct lustre_handle *conn, struct obdo *oa,
+                     struct lov_stripe_md *md, obd_size start,
+                     obd_size end)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
@@ -335,16 +238,20 @@ static int osc_punch(struct lustre_handle *conn, struct obdo *oa, obd_size count
                 CERROR("oa NULL\n");
                 RETURN(-EINVAL);
         }
-        osc_con2cl(conn, &cl, &connection);
-        request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
-                                   OST_PUNCH, 1, &size, NULL);
+
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_PUNCH, 1, &size,
+                                  NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
         body = lustre_msg_buf(request->rq_reqmsg, 0);
+#warning FIXME: pack only valid fields instead of memcpy, endianness, valid
         memcpy(&body->oa, oa, sizeof(*oa));
-        body->oa.o_blocks = count;
-        body->oa.o_valid |= OBD_MD_FLBLOCKS;
+
+        /* overload the size and blocks fields in the oa with start/end */
+        body->oa.o_size = HTON__u64(start);
+        body->oa.o_blocks = HTON__u64(end);
+        body->oa.o_valid |= HTON__u32(OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
 
         request->rq_replen = lustre_msg_size(1, &size);
 
@@ -358,16 +265,14 @@ static int osc_punch(struct lustre_handle *conn, struct obdo *oa, obd_size count
 
         EXIT;
  out:
-        ptlrpc_free_req(request);
-        return 0;
+        ptlrpc_req_finished(request);
+        return rc;
 }
 
-static int osc_destroy(struct lustre_handle *conn, struct obdo *oa)
+static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
+                       struct lov_stripe_md *ea)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
         struct ost_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
@@ -376,15 +281,14 @@ static int osc_destroy(struct lustre_handle *conn, struct obdo *oa)
                 CERROR("oa NULL\n");
                 RETURN(-EINVAL);
         }
-        osc_con2cl(conn, &cl, &connection);
-        request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
-                                   OST_DESTROY, 1, &size, NULL);
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_DESTROY, 1,
+                                  &size, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
         body = lustre_msg_buf(request->rq_reqmsg, 0);
+#warning FIXME: pack only valid fields instead of memcpy, endianness
         memcpy(&body->oa, oa, sizeof(*oa));
-        body->oa.o_valid = ~0;
 
         request->rq_replen = lustre_msg_size(1, &size);
 
@@ -398,347 +302,408 @@ static int osc_destroy(struct lustre_handle *conn, struct obdo *oa)
 
         EXIT;
  out:
-        ptlrpc_free_req(request);
-        return 0;
+        ptlrpc_req_finished(request);
+        return rc;
 }
 
 struct osc_brw_cb_data {
-        struct page **buf;
-        struct ptlrpc_request *req;
-        bulk_callback_t callback;
+        brw_callback_t callback;
         void *cb_data;
+        void *obd_data;
+        size_t obd_size;
 };
 
-static void brw_read_finish(struct ptlrpc_bulk_desc *desc, void *data)
+/* Our bulk-unmapping bottom half. */
+static void unmap_and_decref_bulk_desc(void *data)
 {
-        struct osc_brw_cb_data *cb_data = data;
+        struct ptlrpc_bulk_desc *desc = data;
+        struct list_head *tmp;
+        ENTRY;
+
+        /* This feels wrong to me. */
+        list_for_each(tmp, &desc->bd_page_list) {
+                struct ptlrpc_bulk_page *bulk;
+                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
+
+                kunmap(bulk->bp_page);
+        }
+
+        ptlrpc_bulk_decref(desc);
+        EXIT;
+}
 
-        if (desc->b_flags & PTL_RPC_FL_INTR)
-                CERROR("got signal\n");
+static void brw_finish(struct ptlrpc_bulk_desc *desc, void *data)
+{
+        struct osc_brw_cb_data *cb_data = data;
+        int err = 0;
+        ENTRY;
 
-        (cb_data->callback)(desc, cb_data->cb_data);
+        if (desc->bd_flags & PTL_RPC_FL_TIMEOUT) {
+                err = (desc->bd_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS :
+                       -ETIMEDOUT);
+        }
 
-        ptlrpc_free_bulk(desc);
-        ptlrpc_free_req(cb_data->req);
+        if (cb_data->callback)
+                cb_data->callback(cb_data->cb_data, err, CB_PHASE_FINISH);
 
+        if (cb_data->obd_data)
+                OBD_FREE(cb_data->obd_data, cb_data->obd_size);
         OBD_FREE(cb_data, sizeof(*cb_data));
+
+        /* We can't kunmap the desc from interrupt context, so we do it from
+         * the bottom half above. */
+        prepare_work(&desc->bd_queue, unmap_and_decref_bulk_desc, desc);
+        schedule_work(&desc->bd_queue);
+
+        EXIT;
 }
 
-static int osc_brw_read(struct lustre_handle *conn, obd_count num_oa,
-                        struct obdo **oa, obd_count *oa_bufs, struct page **buf,
-                        obd_size *count, obd_off *offset, obd_flag *flags,
-                        bulk_callback_t callback)
+static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
+                        obd_count page_count, struct brw_page *pga,
+                        brw_callback_t callback, struct io_cb_data *data)
 {
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
-        struct ptlrpc_request *request;
+        struct ptlrpc_connection *connection =
+                client_conn2cli(conn)->cl_import.imp_connection;
+        struct ptlrpc_request *request = NULL;
+        struct ptlrpc_bulk_desc *desc = NULL;
         struct ost_body *body;
-        struct list_head *tmp;
-        int pages, rc, i, j, size[3] = {sizeof(*body)};
-        void *ptr1, *ptr2;
-        struct ptlrpc_bulk_desc *desc;
+        struct osc_brw_cb_data *cb_data = NULL;
+        int rc, size[3] = {sizeof(*body)};
+        void *iooptr, *nioptr;
+        int mapped = 0;
+        __u32 xid;
         ENTRY;
 
-        size[1] = num_oa * sizeof(struct obd_ioobj);
-        pages = 0;
-        for (i = 0; i < num_oa; i++)
-                pages += oa_bufs[i];
-        size[2] = pages * sizeof(struct niobuf_remote);
+        size[1] = sizeof(struct obd_ioobj);
+        size[2] = page_count * sizeof(struct niobuf_remote);
 
-        osc_con2cl(conn, &cl, &connection);
-        request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
-                                  OST_BRW, 3, size, NULL);
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_READ, 3, size,
+                                  NULL);
         if (!request)
-                GOTO(out, rc = -ENOMEM);
+                RETURN(-ENOMEM);
 
         body = lustre_msg_buf(request->rq_reqmsg, 0);
-        body->data = OBD_BRW_READ;
 
         desc = ptlrpc_prep_bulk(connection);
         if (!desc)
-                GOTO(out2, rc = -ENOMEM);
-        desc->b_portal = OST_BULK_PORTAL;
-
-        ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
-        ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
-        for (pages = 0, i = 0; i < num_oa; i++) {
-                ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
-                /* FIXME: this inner loop is wrong for multiple OAs */
-                for (j = 0; j < oa_bufs[i]; j++, pages++) {
-                        struct ptlrpc_bulk_page *bulk;
-                        bulk = ptlrpc_prep_bulk_page(desc);
-                        if (bulk == NULL)
-                                GOTO(out3, rc = -ENOMEM);
-
-                        spin_lock(&connection->c_lock);
-                        bulk->b_xid = ++connection->c_xid_out;
-                        spin_unlock(&connection->c_lock);
-
-                        bulk->b_buf = kmap(buf[pages]);
-                        bulk->b_page = buf[pages];
-                        bulk->b_buflen = PAGE_SIZE;
-                        ost_pack_niobuf(&ptr2, offset[pages], count[pages],
-                                        flags[pages], bulk->b_xid);
-                }
+                GOTO(out_req, rc = -ENOMEM);
+        desc->bd_portal = OST_BULK_PORTAL;
+        desc->bd_cb = brw_finish;
+        OBD_ALLOC(cb_data, sizeof(*cb_data));
+        if (!cb_data)
+                GOTO(out_desc, rc = -ENOMEM);
+
+        cb_data->callback = callback;
+        cb_data->cb_data = data;
+        CDEBUG(D_PAGE, "data(%p)->desc = %p\n", data, desc);
+        data->desc = desc;
+        desc->bd_cb_data = cb_data;
+
+        iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
+        nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
+        ost_pack_ioo(&iooptr, lsm, page_count);
+        /* end almost identical to brw_write case */
+
+        spin_lock(&connection->c_lock);
+        xid = ++connection->c_xid_out;       /* single xid for all pages */
+        spin_unlock(&connection->c_lock);
+
+        for (mapped = 0; mapped < page_count; mapped++) {
+                struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
+                if (bulk == NULL)
+                        GOTO(out_unmap, rc = -ENOMEM);
+
+                bulk->bp_xid = xid;           /* single xid for all pages */
+
+                bulk->bp_buf = kmap(pga[mapped].pg);
+                bulk->bp_page = pga[mapped].pg;
+                bulk->bp_buflen = PAGE_SIZE;
+                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+                                pga[mapped].flag, bulk->bp_xid);
         }
 
-        rc = ptlrpc_register_bulk(desc);
-        if (rc)
-                GOTO(out3, rc);
+        /*
+         * Register the bulk first, because the reply could arrive out of order,
+         * and we want to be ready for the bulk data.
+         *
+         * The reference is released when brw_finish is complete.
+         *
+         * On error, we never do the brw_finish, so we handle all decrefs.
+         */
+        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_READ_BULK)) {
+                CERROR("obd_fail_loc=%x, skipping register_bulk\n",
+                       OBD_FAIL_OSC_BRW_READ_BULK);
+        } else {
+                rc = ptlrpc_register_bulk(desc);
+                if (rc)
+                        GOTO(out_unmap, rc);
+        }
 
         request->rq_replen = lustre_msg_size(1, size);
         rc = ptlrpc_queue_wait(request);
         rc = ptlrpc_check_status(request, rc);
-        if (rc)
-                ptlrpc_abort_bulk(desc);
-        GOTO(out3, rc);
 
- out3:
-        list_for_each(tmp, &desc->b_page_list) {
-                struct ptlrpc_bulk_page *bulk;
-                bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link);
-                if (bulk->b_page != NULL)
-                        kunmap(bulk->b_page);
-        }
-        ptlrpc_free_bulk(desc);
- out2:
-        ptlrpc_free_req(request);
- out:
-        return rc;
-}
-
-static void brw_write_finish(struct ptlrpc_bulk_desc *desc, void *data)
-{
-        struct osc_brw_cb_data *cb_data = data;
-        int i;
-        ENTRY;
-
-        if (desc->b_flags & PTL_RPC_FL_INTR)
-                CERROR("got signal\n");
-
-        for (i = 0; i < desc->b_page_count; i++)
-                kunmap(cb_data->buf[i]);
+        /*
+         * XXX: If there is an error during the processing of the callback,
+         *      such as a timeout in a sleep that it performs, brw_finish
+         *      will never get called, and we'll leak the desc, fail to kunmap
+         *      things, cats will live with dogs.  One solution would be to
+         *      export brw_finish as osc_brw_finish, so that the timeout case
+         *      and its kin could call it for proper cleanup.  An alternative
+         *      would be for an error return from the callback to cause us to
+         *      clean up, but that doesn't help the truly async cases (like
+         *      LOV), which will immediately return from their PHASE_START
+         *      callback, before any such cleanup-requiring error condition can
+         *      be detected.
+         */
+        if (rc)
+                GOTO(out_req, rc);
 
-        (cb_data->callback)(desc, cb_data->cb_data);
+        /* Callbacks cause asynchronous handling. */
+        rc = callback(data, 0, CB_PHASE_START);
 
-        ptlrpc_free_bulk(desc);
-        ptlrpc_free_req(cb_data->req);
+out_req:
+        ptlrpc_req_finished(request);
+        RETURN(rc);
 
+        /* Clean up on error. */
+out_unmap:
+        while (mapped-- > 0)
+                kunmap(pga[mapped].pg);
         OBD_FREE(cb_data, sizeof(*cb_data));
-        EXIT;
+out_desc:
+        ptlrpc_bulk_decref(desc);
+        goto out_req;
 }
 
-static int osc_brw_write(struct lustre_handle *conn, obd_count num_oa,
-                         struct obdo **oa, obd_count *oa_bufs,
-                         struct page **pagearray, obd_size *count,
-                         obd_off *offset, obd_flag *flags,
-                         bulk_callback_t callback)
+static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *md,
+                         obd_count page_count, struct brw_page *pga,
+                         brw_callback_t callback, struct io_cb_data *data)
 {
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct ptlrpc_request *request;
-        struct ptlrpc_bulk_desc *desc;
-        struct osc_obd *osc = &class_conn2obd(conn)->u.osc;
-        struct obd_ioobj ioo;
+        struct ptlrpc_connection *connection =
+                client_conn2cli(conn)->cl_import.imp_connection;
+        struct ptlrpc_request *request = NULL;
+        struct ptlrpc_bulk_desc *desc = NULL;
         struct ost_body *body;
-        struct niobuf_local *local;
+        struct niobuf_local *local = NULL;
         struct niobuf_remote *remote;
-        struct osc_brw_cb_data *cb_data;
-        long pages;
-        int rc, i, j, size[3] = {sizeof(*body)};
-        void *ptr1, *ptr2;
+        struct osc_brw_cb_data *cb_data = NULL;
+        int rc, j, size[3] = {sizeof(*body)};
+        void *iooptr, *nioptr;
+        int mapped = 0;
         ENTRY;
 
-        size[1] = num_oa * sizeof(ioo);
-        pages = 0;
-        for (i = 0; i < num_oa; i++)
-                pages += oa_bufs[i];
-        size[2] = pages * sizeof(*remote);
+        size[1] = sizeof(struct obd_ioobj);
+        size[2] = page_count * sizeof(*remote);
 
-        OBD_ALLOC(local, pages * sizeof(*local));
-        if (local == NULL)
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_WRITE, 3, size,
+                                  NULL);
+        if (!request)
                 RETURN(-ENOMEM);
 
-        osc_con2cl(conn, &cl, &connection);
-        request = ptlrpc_prep_req2(cl, connection, &osc->osc_connh, 
-                                  OST_BRW, 3, size, NULL);
-        if (!request)
-                GOTO(out, rc = -ENOMEM);
         body = lustre_msg_buf(request->rq_reqmsg, 0);
-        body->data = OBD_BRW_WRITE;
-
-        ptr1 = lustre_msg_buf(request->rq_reqmsg, 1);
-        ptr2 = lustre_msg_buf(request->rq_reqmsg, 2);
-        for (pages = 0, i = 0; i < num_oa; i++) {
-                ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]);
-                for (j = 0; j < oa_bufs[i]; j++, pages++) {
-                        local[pages].addr = kmap(pagearray[pages]);
-                        local[pages].offset = offset[pages];
-                        local[pages].len = count[pages];
-                        ost_pack_niobuf(&ptr2, offset[pages], count[pages],
-                                        flags[pages], 0);
-                }
+
+        desc = ptlrpc_prep_bulk(connection);
+        if (!desc)
+                GOTO(out_req, rc = -ENOMEM);
+        desc->bd_portal = OSC_BULK_PORTAL;
+        desc->bd_cb = brw_finish;
+        OBD_ALLOC(cb_data, sizeof(*cb_data));
+        if (!cb_data)
+                GOTO(out_desc, rc = -ENOMEM);
+
+        cb_data->callback = callback;
+        cb_data->cb_data = data;
+        CDEBUG(D_PAGE, "data(%p)->desc = %p\n", data, desc);
+        data->desc = desc;
+        desc->bd_cb_data = cb_data;
+
+        iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
+        nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
+        ost_pack_ioo(&iooptr, md, page_count);
+        /* end almost identical to brw_read case */
+
+        OBD_ALLOC(local, page_count * sizeof(*local));
+        if (!local)
+                GOTO(out_cb, rc = -ENOMEM);
+
+        cb_data->obd_data = local;
+        cb_data->obd_size = page_count * sizeof(*local);
+
+        for (mapped = 0; mapped < page_count; mapped++) {
+                local[mapped].addr = kmap(pga[mapped].pg);
+
+                CDEBUG(D_INFO, "kmap(pg) = %p ; pg->flags = %lx ; pg->count = "
+                       "%d ; page %d of %d\n",
+                       local[mapped].addr, pga[mapped].pg->flags,
+                       page_count(pga[mapped].pg),
+                       mapped, page_count - 1);
+
+                local[mapped].offset = pga[mapped].off;
+                local[mapped].len = pga[mapped].count;
+                ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+                                pga[mapped].flag, 0);
         }
 
-        size[1] = pages * sizeof(struct niobuf_remote);
+        size[1] = page_count * sizeof(*remote);
         request->rq_replen = lustre_msg_size(2, size);
-
         rc = ptlrpc_queue_wait(request);
         rc = ptlrpc_check_status(request, rc);
         if (rc)
-                GOTO(out2, rc);
-
-        ptr2 = lustre_msg_buf(request->rq_repmsg, 1);
-        if (ptr2 == NULL)
-                GOTO(out2, rc = -EINVAL);
-
-        if (request->rq_repmsg->buflens[1] !=
-            pages * sizeof(struct niobuf_remote)) {
-                CERROR("buffer length wrong (%d vs. %ld)\n",
-                       request->rq_repmsg->buflens[1],
-                       pages * sizeof(struct niobuf_remote));
-                GOTO(out2, rc = -EINVAL);
-        }
+                GOTO(out_unmap, rc);
 
-        desc = ptlrpc_prep_bulk(connection);
-        desc->b_portal = OSC_BULK_PORTAL;
-        if (callback) {
-                desc->b_cb = brw_write_finish;
-                OBD_ALLOC(cb_data, sizeof(*cb_data));
-                cb_data->buf = pagearray;
-                cb_data->callback = callback;
-                desc->b_cb_data = cb_data;
+        nioptr = lustre_msg_buf(request->rq_repmsg, 1);
+        if (!nioptr)
+                GOTO(out_unmap, rc = -EINVAL);
+
+        if (request->rq_repmsg->buflens[1] != size[1]) {
+                CERROR("buffer length wrong (%d vs. %d)\n",
+                       request->rq_repmsg->buflens[1], size[1]);
+                GOTO(out_unmap, rc = -EINVAL);
         }
 
-        for (pages = 0, i = 0; i < num_oa; i++) {
-                for (j = 0; j < oa_bufs[i]; j++, pages++) {
-                        struct ptlrpc_bulk_page *page;
+        for (j = 0; j < page_count; j++) {
+                struct ptlrpc_bulk_page *bulk;
 
-                        ost_unpack_niobuf(&ptr2, &remote);
+                ost_unpack_niobuf(&nioptr, &remote);
 
-                        page = ptlrpc_prep_bulk_page(desc);
-                        if (page == NULL)
-                                GOTO(out3, rc = -ENOMEM);
+                bulk = ptlrpc_prep_bulk_page(desc);
+                if (!bulk)
+                        GOTO(out_unmap, rc = -ENOMEM);
 
-                        page->b_buf = (void *)(unsigned long)local[pages].addr;
-                        page->b_buflen = local[pages].len;
-                        page->b_xid = remote->xid;
-                }
+                bulk->bp_buf = (void *)(unsigned long)local[j].addr;
+                bulk->bp_buflen = local[j].len;
+                bulk->bp_xid = remote->xid;
+                bulk->bp_page = pga[j].pg;
         }
 
-        if (desc->b_page_count != pages)
+        if (desc->bd_page_count != page_count)
                 LBUG();
 
+        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK))
+                GOTO(out_unmap, rc = 0);
+
+        /* Our reference is released when brw_finish is complete. */
         rc = ptlrpc_send_bulk(desc);
+
+        /* XXX: Mike, same question as in osc_brw_read. */
         if (rc)
-                GOTO(out3, rc);
-        if (callback)
-                GOTO(out, rc);
+                GOTO(out_req, rc);
 
-        /* If there's no callback function, sleep here until complete. */
-        wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
-        if (desc->b_flags & PTL_RPC_FL_INTR)
-                rc = -EINTR;
+        /* Callbacks cause asynchronous handling. */
+        rc = callback(data, 0, CB_PHASE_START);
 
-        GOTO(out3, rc);
+out_req:
+        ptlrpc_req_finished(request);
+        RETURN(rc);
 
- out3:
-        ptlrpc_free_bulk(desc);
- out2:
-        ptlrpc_free_req(request);
-        for (pages = 0, i = 0; i < num_oa; i++)
-                for (j = 0; j < oa_bufs[i]; j++, pages++)
-                        kunmap(pagearray[pages]);
- out:
-        OBD_FREE(local, pages * sizeof(*local));
+        /* Clean up on error. */
+out_unmap:
+        while (mapped-- > 0)
+                kunmap(pga[mapped].pg);
 
-        return rc;
+        OBD_FREE(local, page_count * sizeof(*local));
+out_cb:
+        OBD_FREE(cb_data, sizeof(*cb_data));
+out_desc:
+        ptlrpc_bulk_decref(desc);
+        goto out_req;
 }
 
-static int osc_brw(int cmd, struct lustre_handle *conn, obd_count num_oa,
-                   struct obdo **oa, obd_count *oa_bufs, struct page **buf,
-                   obd_size *count, obd_off *offset, obd_flag *flags,
-                   void *callback)
+static int osc_brw(int cmd, struct lustre_handle *conn,
+                   struct lov_stripe_md *md, obd_count page_count,
+                   struct brw_page *pga, brw_callback_t callback,
+                   struct io_cb_data *data)
 {
-        if (num_oa != 1)
-                LBUG();
+        ENTRY;
+
+        while (page_count) {
+                obd_count pages_per_brw;
+                int rc;
+
+                if (page_count > PTL_MD_MAX_IOV)
+                        pages_per_brw = PTL_MD_MAX_IOV;
+                else
+                        pages_per_brw = page_count;
+
+                if (cmd & OBD_BRW_WRITE)
+                        rc = osc_brw_write(conn, md, pages_per_brw, pga,
+                                           callback, data);
+                else
+                        rc = osc_brw_read(conn, md, pages_per_brw, pga,
+                                          callback, data);
 
-        if (cmd & OBD_BRW_WRITE)
-                return osc_brw_write(conn, num_oa, oa, oa_bufs, buf, count,
-                                     offset, flags, (bulk_callback_t)callback);
-        else
-                return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count,
-                                    offset, flags, (bulk_callback_t)callback);
+                if (rc != 0)
+                        RETURN(rc);
+
+                page_count -= pages_per_brw;
+                pga += pages_per_brw;
+        }
+        RETURN(0);
 }
 
-static int osc_enqueue(struct lustre_handle *oconn,
-                       struct lustre_handle *parent_lock, __u64 *res_id,
+static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
+                       struct lustre_handle *parent_lock,
                        __u32 type, void *extentp, int extent_len, __u32 mode,
                        int *flags, void *callback, void *data, int datalen,
                        struct lustre_handle *lockh)
 {
-        struct obd_device *obddev = class_conn2obd(oconn);
-        struct osc_obd *osc = &obddev->u.osc;
-        struct ptlrpc_connection *conn;
-        struct ptlrpc_client *cl;
+        __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
+        struct obd_device *obddev = class_conn2obd(connh);
         struct ldlm_extent *extent = extentp;
         int rc;
-        __u32 mode2;
+        ENTRY;
 
-        /* Filesystem locks are given a bit of special treatment: first we
+        /* Filesystem locks are given a bit of special treatment: if
+         * this is not a file size lock (which has end == -1), we
          * fixup the lock to start and end on page boundaries. */
-        extent->start &= PAGE_MASK;
-        extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
+        if (extent->end != OBD_OBJECT_EOF) {
+                extent->start &= PAGE_MASK;
+                extent->end = (extent->end + PAGE_SIZE - 1) & PAGE_MASK;
+        }
 
         /* Next, search for already existing extent locks that will cover us */
-        osc_con2dlmcl(oconn, &cl, &conn);
         rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
                              sizeof(extent), mode, lockh);
-        if (rc == 1) {
+        if (rc == 1)
                 /* We already have a lock, and it's referenced */
-                return 0;
-        }
-
-        /* Next, search for locks that we can upgrade (if we're trying to write)
-         * or are more than we need (if we're trying to read).  Because the VFS
-         * and page cache already protect us locally, lots of readers/writers
-         * can share a single PW lock. */
-        if (mode == LCK_PW)
-                mode2 = LCK_PR;
-        else
-                mode2 = LCK_PW;
-
-        rc = ldlm_lock_match(obddev->obd_namespace, res_id, type, extent,
-                             sizeof(extent), mode2, lockh);
-        if (rc == 1) {
-                int flags;
-                /* FIXME: This is not incredibly elegant, but it might
-                 * be more elegant than adding another parameter to
-                 * lock_match.  I want a second opinion. */
-                ldlm_lock_addref(lockh, mode);
-                ldlm_lock_decref(lockh, mode2);
-
-                if (mode == LCK_PR)
-                        return 0;
-
-                rc = ldlm_cli_convert(cl, lockh, &osc->osc_connh, 
-                                      mode, &flags);
-                if (rc)
-                        LBUG();
-
-                return rc;
+                RETURN(ELDLM_OK);
+
+        /* If we're trying to read, we also search for an existing PW lock.  The
+         * VFS and page cache already protect us locally, so lots of readers/
+         * writers can share a single PW lock.
+         *
+         * There are problems with conversion deadlocks, so instead of
+         * converting a read lock to a write lock, we'll just enqueue a new
+         * one.
+         *
+         * At some point we should cancel the read lock instead of making them
+         * send us a blocking callback, but there are problems with canceling
+         * locks out from other users right now, too. */
+
+        if (mode == LCK_PR) {
+                rc = ldlm_lock_match(obddev->obd_namespace, res_id, type,
+                                     extent, sizeof(extent), LCK_PW, lockh);
+                if (rc == 1) {
+                        /* FIXME: This is not incredibly elegant, but it might
+                         * be more elegant than adding another parameter to
+                         * lock_match.  I want a second opinion. */
+                        ldlm_lock_addref(lockh, LCK_PR);
+                        ldlm_lock_decref(lockh, LCK_PW);
+
+                        RETURN(ELDLM_OK);
+                }
         }
 
-        rc = ldlm_cli_enqueue(cl, conn, &osc->osc_connh, 
-                              NULL, obddev->obd_namespace,
-                              parent_lock, res_id, type, extent, sizeof(extent),
-                              mode, flags, callback, data, datalen, lockh);
-        return rc;
+        rc = ldlm_cli_enqueue(connh, NULL, obddev->obd_namespace, parent_lock,
+                              res_id, type, extent, sizeof(extent), mode, flags,
+                              ldlm_completion_ast, callback, data, datalen,
+                              lockh);
+        RETURN(rc);
 }
 
-static int osc_cancel(struct lustre_handle *oconn, __u32 mode,
-                      struct lustre_handle *lockh)
+static int osc_cancel(struct lustre_handle *oconn, struct lov_stripe_md *md,
+                      __u32 mode, struct lustre_handle *lockh)
 {
         ENTRY;
 
@@ -747,102 +712,23 @@ static int osc_cancel(struct lustre_handle *oconn, __u32 mode,
         RETURN(0);
 }
 
-static int osc_setup(struct obd_device *obddev, obd_count len, void *buf)
+static int osc_cancel_unused(struct lustre_handle *connh,
+                             struct lov_stripe_md *lsm, int flags)
 {
-        struct obd_ioctl_data* data = buf;
-        struct osc_obd *osc = &obddev->u.osc;
-        char server_uuid[37];
-        int rc;
-        ENTRY;
+        struct obd_device *obddev = class_conn2obd(connh);
+        __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id };
 
-        if (data->ioc_inllen1 < 1) {
-                CERROR("osc setup requires a TARGET UUID\n");
-                RETURN(-EINVAL);
-        }
-
-        if (data->ioc_inllen1 > 37) {
-                CERROR("osc TARGET UUID must be less than 38 characters\n");
-                RETURN(-EINVAL);
-        }
-
-        if (data->ioc_inllen2 < 1) {
-                CERROR("osc setup requires a SERVER UUID\n");
-                RETURN(-EINVAL);
-        }
-
-        if (data->ioc_inllen2 > 37) {
-                CERROR("osc SERVER UUID must be less than 38 characters\n");
-                RETURN(-EINVAL);
-        }
-
-        memcpy(osc->osc_target_uuid, data->ioc_inlbuf1, data->ioc_inllen1);
-        memcpy(server_uuid, data->ioc_inlbuf2, MIN(data->ioc_inllen2,
-                                                   sizeof(server_uuid)));
-
-        osc->osc_conn = ptlrpc_uuid_to_connection(server_uuid);
-        if (!osc->osc_conn)
-                RETURN(-ENOENT);
-
-        obddev->obd_namespace =
-                ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
-        if (obddev->obd_namespace == NULL)
-                GOTO(out_conn, rc = -ENOMEM);
-
-        OBD_ALLOC(osc->osc_client, sizeof(*osc->osc_client));
-        if (osc->osc_client == NULL)
-                GOTO(out_ns, rc = -ENOMEM);
-
-        OBD_ALLOC(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
-        if (osc->osc_ldlm_client == NULL)
-                GOTO(out_client, rc = -ENOMEM);
-
-        ptlrpc_init_client(NULL, NULL, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
-                           osc->osc_client);
-        ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
-                           osc->osc_ldlm_client);
-        osc->osc_client->cli_name = "osc";
-        osc->osc_ldlm_client->cli_name = "ldlm";
-
-        MOD_INC_USE_COUNT;
-        RETURN(0);
-
- out_client:
-        OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
- out_ns:
-        ldlm_namespace_free(obddev->obd_namespace);
- out_conn:
-        ptlrpc_put_connection(osc->osc_conn);
-        return rc;
+        return ldlm_cli_cancel_unused(obddev->obd_namespace, res_id, flags);
 }
 
-static int osc_cleanup(struct obd_device * obddev)
-{
-        struct osc_obd *osc = &obddev->u.osc;
-
-        ldlm_namespace_free(obddev->obd_namespace);
-
-        ptlrpc_cleanup_client(osc->osc_client);
-        OBD_FREE(osc->osc_client, sizeof(*osc->osc_client));
-        ptlrpc_cleanup_client(osc->osc_ldlm_client);
-        OBD_FREE(osc->osc_ldlm_client, sizeof(*osc->osc_ldlm_client));
-        ptlrpc_put_connection(osc->osc_conn);
-
-        MOD_DEC_USE_COUNT;
-        return 0;
-}
-
-#if 0
-static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs);
+static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
 {
         struct ptlrpc_request *request;
-        struct ptlrpc_client *cl;
-        struct ptlrpc_connection *connection;
-        struct obd_statfs *osfs;
         int rc, size = sizeof(*osfs);
         ENTRY;
 
-        osc_con2cl(conn, &cl, &connection);
-        request = ptlrpc_prep_req(cl, connection, OST_STATFS, 0, NULL, NULL);
+        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
+                                  NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -850,39 +736,137 @@ static int osc_statfs(struct lustre_handle *conn, struct statfs *sfs);
 
         rc = ptlrpc_queue_wait(request);
         rc = ptlrpc_check_status(request, rc);
-        if (rc)
+        if (rc) {
+                CERROR("%s failed: rc = %d\n", __FUNCTION__, rc);
                 GOTO(out, rc);
+        }
 
-        osfs = lustre_msg_buf(request->rq_repmsg, 0);
-        obd_statfs_unpack(sfs, osfs);
+        obd_statfs_unpack(osfs, lustre_msg_buf(request->rq_repmsg, 0));
 
         EXIT;
  out:
-        ptlrpc_free_req(request);
-        return 0;
+        ptlrpc_req_finished(request);
+        return rc;
 }
-#endif
 
+static int osc_iocontrol(long cmd, struct lustre_handle *conn, int len,
+                         void *karg, void *uarg)
+{
+        struct obd_device *obddev = class_conn2obd(conn);
+        struct obd_ioctl_data *data = karg;
+        int err = 0;
+        ENTRY;
+
+        switch (cmd) {
+        case IOC_LDLM_TEST: {
+                err = ldlm_test(obddev, conn);
+                CERROR("-- done err %d\n", err);
+                GOTO(out, err);
+        }
+        case IOC_LDLM_REGRESS_START: {
+                unsigned int numthreads = 1;
+                unsigned int numheld = 10;
+                unsigned int numres = 10;
+                unsigned int numext = 10;
+                char *parse;
+
+                if (data->ioc_inllen1) {
+                        parse = data->ioc_inlbuf1;
+                        if (*parse != '\0') {
+                                while(isspace(*parse)) parse++;
+                                numthreads = simple_strtoul(parse, &parse, 0);
+                                while(isspace(*parse)) parse++;
+                        }
+                        if (*parse != '\0') {
+                                while(isspace(*parse)) parse++;
+                                numheld = simple_strtoul(parse, &parse, 0);
+                                while(isspace(*parse)) parse++;
+                        }
+                        if (*parse != '\0') {
+                                while(isspace(*parse)) parse++;
+                                numres = simple_strtoul(parse, &parse, 0);
+                                while(isspace(*parse)) parse++;
+                        }
+                        if (*parse != '\0') {
+                                while(isspace(*parse)) parse++;
+                                numext = simple_strtoul(parse, &parse, 0);
+                                while(isspace(*parse)) parse++;
+                        }
+                }
+
+                err = ldlm_regression_start(obddev, conn, numthreads,
+                                numheld, numres, numext);
+
+                CERROR("-- done err %d\n", err);
+                GOTO(out, err);
+        }
+        case IOC_LDLM_REGRESS_STOP: {
+                err = ldlm_regression_stop();
+                CERROR("-- done err %d\n", err);
+                GOTO(out, err);
+        }
+        case IOC_OSC_REGISTER_LOV: {
+                if (obddev->u.cli.cl_containing_lov)
+                        GOTO(out, err = -EALREADY);
+                obddev->u.cli.cl_containing_lov = (struct obd_device *)karg;
+                GOTO(out, err);
+        }
+
+        default:
+                GOTO(out, err = -ENOTTY);
+        }
+out:
+        return err;
+}
+
+int osc_attach(struct obd_device *dev, 
+                   obd_count len, void *data)
+{
+        int rc;
+        rc = lprocfs_reg_obd(dev, (lprocfs_vars_t*)status_var_nm_1, (void*)dev);
+        return rc; 
+}
+
+int osc_detach(struct obd_device *dev)
+{
+        int rc;
+        rc = lprocfs_dereg_obd(dev);
+        return rc;
+
+}
 struct obd_ops osc_obd_ops = {
-        o_setup:   osc_setup,
-        o_cleanup: osc_cleanup,
-        o_create: osc_create,
-        o_destroy: osc_destroy,
-        o_getattr: osc_getattr,
-        o_setattr: osc_setattr,
-        o_open: osc_open,
-        o_close: osc_close,
-        o_connect: osc_connect,
-        o_disconnect: osc_disconnect,
-        o_brw: osc_brw,
-        o_punch: osc_punch,
-        o_enqueue: osc_enqueue,
-        o_cancel: osc_cancel
+        o_attach:       osc_attach,
+        o_detach:       osc_detach,
+        o_setup:        client_obd_setup,
+        o_cleanup:      client_obd_cleanup,
+        o_statfs:       osc_statfs,
+        o_create:       osc_create,
+        o_destroy:      osc_destroy,
+        o_getattr:      osc_getattr,
+        o_setattr:      osc_setattr,
+        o_open:         osc_open,
+        o_close:        osc_close,
+        o_connect:      client_obd_connect,
+        o_disconnect:   client_obd_disconnect,
+        o_brw:          osc_brw,
+        o_punch:        osc_punch,
+        o_enqueue:      osc_enqueue,
+        o_cancel:       osc_cancel,
+        o_cancel_unused: osc_cancel_unused,
+        o_iocontrol:    osc_iocontrol
 };
 
 static int __init osc_init(void)
 {
-        return class_register_type(&osc_obd_ops, LUSTRE_OSC_NAME);
+        int rc;
+        
+        rc = class_register_type(&osc_obd_ops,
+                                 (lprocfs_vars_t*)status_class_var, 
+                                 LUSTRE_OSC_NAME);
+        if (rc)
+                RETURN(rc);
+        return 0;
+       
 }
 
 static void __exit osc_exit(void)
@@ -890,7 +874,7 @@ static void __exit osc_exit(void)
         class_unregister_type(LUSTRE_OSC_NAME);
 }
 
-MODULE_AUTHOR("Peter J. Braam <braam@clusterfs.com>");
+MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC) v1.0");
 MODULE_LICENSE("GPL");