Whamcloud - gitweb
land v0.9.1 on HEAD, in preparation for a 1.0.x branch
[fs/lustre-release.git] / lustre / ost / ost_handler.c
index 5f35758..74a6d1d 100644 (file)
@@ -30,7 +30,9 @@
  *  modules do not have a full method table.)
  */
 
-#define EXPORT_SYMTAB
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
 #define DEBUG_SUBSYSTEM S_OST
 
 #include <linux/module.h>
@@ -68,16 +70,14 @@ void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
                 if (!ack_lock->mode)
                         break;
-                memcpy(&req->rq_ack_locks[i].lock, &ack_lock->lock,
-                       sizeof(req->rq_ack_locks[i].lock));
-                req->rq_ack_locks[i].mode = ack_lock->mode;
+                ldlm_put_lock_into_req(req, &ack_lock->lock, ack_lock->mode);
         }
 }
 
-static int ost_destroy(struct ptlrpc_request *req, struct obd_trans_info *oti)
+static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req, 
+                       struct obd_trans_info *oti)
 {
-        struct lustre_handle *conn = &req->rq_reqmsg->handle;
-        struct ost_body *body;
+        struct ost_body *body, *repbody;
         int rc, size = sizeof(*body);
         ENTRY;
 
@@ -85,19 +85,20 @@ static int ost_destroy(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (body == NULL)
                 RETURN(-EFAULT);
 
-        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        rc = lustre_pack_reply(req, 1, &size, NULL);
         if (rc)
                 RETURN(rc);
 
         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
                 oti->oti_logcookies = obdo_logcookie(&body->oa);
-        req->rq_status = obd_destroy(conn, &body->oa, NULL, oti);
+        repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
+        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
+        req->rq_status = obd_destroy(exp, &body->oa, NULL, oti);
         RETURN(0);
 }
 
-static int ost_getattr(struct ptlrpc_request *req)
+static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
 {
-        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
         struct ost_body *body, *repbody;
         int rc, size = sizeof(*body);
         ENTRY;
@@ -106,13 +107,13 @@ static int ost_getattr(struct ptlrpc_request *req)
         if (body == NULL)
                 RETURN(-EFAULT);
 
-        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        rc = lustre_pack_reply(req, 1, &size, NULL);
         if (rc)
                 RETURN(rc);
 
         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-        req->rq_status = obd_getattr(conn, &repbody->oa, NULL);
+        req->rq_status = obd_getattr(exp, &repbody->oa, NULL);
         RETURN(0);
 }
 
@@ -122,7 +123,7 @@ static int ost_statfs(struct ptlrpc_request *req)
         int rc, size = sizeof(*osfs);
         ENTRY;
 
-        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        rc = lustre_pack_reply(req, 1, &size, NULL);
         if (rc)
                 RETURN(rc);
 
@@ -135,29 +136,9 @@ static int ost_statfs(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-static int ost_syncfs(struct ptlrpc_request *req)
-{
-        struct obd_statfs *osfs;
-        int rc, size = sizeof(*osfs);
-        ENTRY;
-
-        rc = lustre_pack_msg(0, &size, NULL, &req->rq_replen, &req->rq_repmsg);
-        if (rc)
-                RETURN(rc);
-
-        rc = obd_syncfs(req->rq_export);
-        if (rc) {
-                CERROR("ost: syncfs failed: rc %d\n", rc);
-                req->rq_status = rc;
-                RETURN(rc);
-        }
-
-        RETURN(0);
-}
-
-static int ost_open(struct ptlrpc_request *req, struct obd_trans_info *oti)
+static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
+                      struct obd_trans_info *oti)
 {
-        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
         struct ost_body *body, *repbody;
         int rc, size = sizeof(*repbody);
         ENTRY;
@@ -166,19 +147,21 @@ static int ost_open(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (body == NULL)
                 RETURN(-EFAULT);
 
-        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        rc = lustre_pack_reply(req, 1, &size, NULL);
         if (rc)
                 RETURN(rc);
 
         repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-        req->rq_status = obd_open(conn, &repbody->oa, NULL, oti, NULL);
+        oti->oti_logcookies = obdo_logcookie(&repbody->oa);
+        req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
+        //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
         RETURN(0);
 }
 
-static int ost_close(struct ptlrpc_request *req, struct obd_trans_info *oti)
+static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req, 
+                     struct obd_trans_info *oti)
 {
-        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
         struct ost_body *body, *repbody;
         int rc, size = sizeof(*repbody);
         ENTRY;
@@ -187,42 +170,23 @@ static int ost_close(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (body == NULL)
                 RETURN(-EFAULT);
 
-        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
-        if (rc)
-                RETURN(rc);
-
-        repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
-        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-        req->rq_status = obd_close(conn, &repbody->oa, NULL, oti);
-        RETURN(0);
-}
-
-static int ost_create(struct ptlrpc_request *req, struct obd_trans_info *oti)
-{
-        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
-        struct ost_body *body, *repbody;
-        int rc, size = sizeof(*repbody);
-        ENTRY;
-
-        body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
-        if (body == NULL)
-                RETURN(-EFAULT);
+        if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
+            (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
+                RETURN(-EINVAL);
 
-        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        rc = lustre_pack_reply(req, 1, &size, NULL);
         if (rc)
                 RETURN(rc);
 
-        repbody = lustre_msg_buf (req->rq_repmsg, 0, sizeof(*repbody));
+        repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-        oti->oti_logcookies = obdo_logcookie(&repbody->oa);
-        req->rq_status = obd_create(conn, &repbody->oa, NULL, oti);
-        //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
+        req->rq_status = obd_punch(exp, &repbody->oa, NULL, repbody->oa.o_size,
+                                   repbody->oa.o_blocks, oti);
         RETURN(0);
 }
 
-static int ost_punch(struct ptlrpc_request *req, struct obd_trans_info *oti)
+static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req)
 {
-        struct lustre_handle *conn = (struct lustre_handle *)req->rq_reqmsg;
         struct ost_body *body, *repbody;
         int rc, size = sizeof(*repbody);
         ENTRY;
@@ -231,24 +195,20 @@ static int ost_punch(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (body == NULL)
                 RETURN(-EFAULT);
 
-        if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
-            (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
-                RETURN(-EINVAL);
-
-        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        rc = lustre_pack_reply(req, 1, &size, NULL);
         if (rc)
                 RETURN(rc);
 
         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-        req->rq_status = obd_punch(conn, &repbody->oa, NULL, repbody->oa.o_size,
-                                   repbody->oa.o_blocks, oti);
+        req->rq_status = obd_sync(exp, &repbody->oa, NULL, repbody->oa.o_size,
+                                  repbody->oa.o_blocks);
         RETURN(0);
 }
 
-static int ost_setattr(struct ptlrpc_request *req, struct obd_trans_info *oti)
+static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req, 
+                       struct obd_trans_info *oti)
 {
-        struct lustre_handle *conn = &req->rq_reqmsg->handle;
         struct ost_body *body, *repbody;
         int rc, size = sizeof(*repbody);
         ENTRY;
@@ -257,14 +217,14 @@ static int ost_setattr(struct ptlrpc_request *req, struct obd_trans_info *oti)
         if (body == NULL)
                 RETURN(-EFAULT);
 
-        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
+        rc = lustre_pack_reply(req, 1, &size, NULL);
         if (rc)
                 RETURN(rc);
 
         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
 
-        req->rq_status = obd_setattr(conn, &repbody->oa, NULL, oti);
+        req->rq_status = obd_setattr(exp, &repbody->oa, NULL, oti);
         RETURN(0);
 }
 
@@ -377,9 +337,9 @@ static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb,
 }
 
 #if CHECKSUM_BULK
-__u64 ost_checksum_bulk (struct ptlrpc_bulk_desc *desc)
+obd_count ost_checksum_bulk(struct ptlrpc_bulk_desc *desc)
 {
-        __u64             cksum = 0;
+        obd_count cksum = 0;
         struct ptlrpc_bulk_page *bp;
 
         list_for_each_entry(bp, &desc->bd_page_list, bp_link) {
@@ -387,6 +347,8 @@ __u64 ost_checksum_bulk (struct ptlrpc_bulk_desc *desc)
                              bp->bp_buflen);
                 kunmap(bp->bp_page);
         }
+
+        return cksum;
 }
 #endif
 
@@ -400,6 +362,7 @@ static int ost_brw_read(struct ptlrpc_request *req)
         struct ost_body         *body, *repbody;
         struct l_wait_info       lwi;
         struct obd_trans_info    oti = { 0 };
+        char                     str[PTL_NALFMT_SIZE];
         int                      size[1] = { sizeof(*body) };
         int                      comms_error = 0;
         int                      niocount;
@@ -443,7 +406,7 @@ static int ost_brw_read(struct ptlrpc_request *req)
         }
 
         size[0] = sizeof(*body);
-        rc = lustre_pack_msg(1, size, NULL, &req->rq_replen, &req->rq_repmsg);
+        rc = lustre_pack_reply(req, 1, size, NULL);
         if (rc)
                 GOTO(out, rc);
 
@@ -508,8 +471,8 @@ static int ost_brw_read(struct ptlrpc_request *req)
                         }
                 } else {
                         DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d\n", rc);
-               }
-               comms_error = rc != 0;
+                }
+                comms_error = rc != 0;
         }
 
         /* Must commit after prep above in all cases */
@@ -521,7 +484,7 @@ static int ost_brw_read(struct ptlrpc_request *req)
 
 #if CHECKSUM_BULK
         if (rc == 0) {
-                repbody->oa.o_rdev = ost_checksum_bulk(desc);
+                repbody->oa.o_nlink = ost_checksum_bulk(desc);
                 repbody->oa.o_valid |= OBD_MD_FLCKSUM;
         }
 #endif
@@ -546,11 +509,26 @@ static int ost_brw_read(struct ptlrpc_request *req)
                         /* reply out callback would free */
                         OBD_FREE(req->rq_repmsg, req->rq_replen);
                 }
-                CERROR("bulk IO comms error: evicting %s@%s nid "LPU64"\n",
-                       req->rq_export->exp_client_uuid.uuid,
-                       req->rq_connection->c_remote_uuid.uuid,
-                       req->rq_connection->c_peer.peer_nid);
-                ptlrpc_fail_export(req->rq_export);
+                if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
+                        CERROR("bulk IO comms error: "
+                               "evicting %s@%s nid "LPX64" (%s)\n",
+                               req->rq_export->exp_client_uuid.uuid,
+                               req->rq_connection->c_remote_uuid.uuid,
+                               req->rq_connection->c_peer.peer_nid,
+                               portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number,
+                                               req->rq_connection->c_peer.peer_nid,
+                                               str));
+                        ptlrpc_fail_export(req->rq_export);
+                } else {
+                        CERROR("ignoring bulk IO comms error: "
+                               "client reconnected %s@%s nid "LPX64" (%s)\n",  
+                               req->rq_export->exp_client_uuid.uuid,
+                               req->rq_connection->c_remote_uuid.uuid,
+                               req->rq_connection->c_peer.peer_nid,
+                               portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number,
+                                               req->rq_connection->c_peer.peer_nid,
+                                               str));
+                }
         }
 
         RETURN(rc);
@@ -570,6 +548,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         int                      objcount, niocount, npages;
         int                      comms_error = 0;
         int                      rc, rc2, swab, i, j;
+        char                    str[PTL_NALFMT_SIZE];
         ENTRY;
 
         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
@@ -619,8 +598,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
         }
 
         size[1] = niocount * sizeof(*rcs);
-        rc = lustre_pack_msg(2, size, NULL, &req->rq_replen,
-                             &req->rq_repmsg);
+        rc = lustre_pack_reply(req, 2, size, NULL);
         if (rc != 0)
                 GOTO(out, rc);
         rcs = lustre_msg_buf(req->rq_repmsg, 1, niocount * sizeof(*rcs));
@@ -667,9 +645,9 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                                 ptlrpc_abort_bulk(desc);
                         }
                 } else {
-                       DEBUG_REQ(D_ERROR, req, "bulk GET failed: rc %d\n", rc);
-               }
-               comms_error = rc != 0;
+                        DEBUG_REQ(D_ERROR, req, "bulk GET failed: rc %d\n", rc);
+                }
+                comms_error = rc != 0;
         }
 
         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
@@ -678,22 +656,24 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
 #if CHECKSUM_BULK
         if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) {
                 static int cksum_counter;
-                __u64 client_cksum = body->oa.o_rdev;
-                __u64 cksum = ost_checksum_bulk(desc);
+                obd_count client_cksum = body->oa.o_nlink;
+                obd_count cksum = ost_checksum_bulk(desc);
 
+                portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number,
+                                req->rq_connection->c_peer.peer_nid, str);
                 if (client_cksum != cksum) {
-                        CERROR("Bad checksum: client "LPX64", server "LPX64
-                               ", client NID "LPX64"\n", client_cksum, cksum,
-                               req->rq_connection->c_peer.peer_nid);
+                        CERROR("Bad checksum: client %x, server %x, client NID "
+                               LPX64" (%s)\n", client_cksum, cksum,
+                               req->rq_connection->c_peer.peer_nid, str);
                         cksum_counter = 1;
-                        repbody->oa.o_rdev = cksum;
+                        repbody->oa.o_nlink = cksum;
                 } else {
                         cksum_counter++;
                         if ((cksum_counter & (-cksum_counter)) == cksum_counter)
-                                CERROR("Checksum %d from "LPX64": "LPX64" OK\n",
-                                        cksum_counter,
-                                        req->rq_connection->c_peer.peer_nid,
-                                        cksum);
+                                CWARN("Checksum %u from "LPX64": %x OK\n",
+                                      cksum_counter,
+                                      req->rq_connection->c_peer.peer_nid,
+                                      cksum);
                 }
         }
 #endif
@@ -740,22 +720,36 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                         /* reply out callback would free */
                         OBD_FREE (req->rq_repmsg, req->rq_replen);
                 }
-                CERROR("bulk IO comms error: evicting %s@%s nid "LPU64"\n",
-                       req->rq_export->exp_client_uuid.uuid,
-                       req->rq_connection->c_remote_uuid.uuid,
-                       req->rq_connection->c_peer.peer_nid);
-                ptlrpc_fail_export(req->rq_export);
+                if (req->rq_reqmsg->conn_cnt == req->rq_export->exp_conn_cnt) {
+                        CERROR("bulk IO comms error: "
+                               "evicting %s@%s nid "LPX64" (%s)\n",
+                               req->rq_export->exp_client_uuid.uuid,
+                               req->rq_connection->c_remote_uuid.uuid,
+                               req->rq_connection->c_peer.peer_nid,
+                               portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number,
+                                               req->rq_connection->c_peer.peer_nid,
+                                               str));
+                        ptlrpc_fail_export(req->rq_export);
+                } else {
+                        CERROR("ignoring bulk IO comms error: "
+                               "client reconnected %s@%s nid "LPX64" (%s)\n",
+                               req->rq_export->exp_client_uuid.uuid,
+                               req->rq_connection->c_remote_uuid.uuid,
+                               req->rq_connection->c_peer.peer_nid,
+                               portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number,
+                                               req->rq_connection->c_peer.peer_nid,
+                                               str));
+                }        
         }
         RETURN(rc);
 }
 
 static int ost_san_brw(struct ptlrpc_request *req, int cmd)
 {
-        struct niobuf_remote *remote_nb, *res_nb;
+        struct niobuf_remote *remote_nb, *res_nb, *pp_rnb;
         struct obd_ioobj *ioo;
         struct ost_body *body, *repbody;
-        int rc, i, j, objcount, niocount, size[2] = {sizeof(*body)};
-        int n;
+        int rc, i, objcount, niocount, size[2] = {sizeof(*body)}, npages;
         int swab;
         ENTRY;
 
@@ -792,33 +786,21 @@ static int ost_san_brw(struct ptlrpc_request *req, int cmd)
                         lustre_swab_niobuf_remote (&remote_nb[i]);
         }
 
-        for (i = n = 0; i < objcount; i++) {
-                for (j = 0; j < ioo[i].ioo_bufcnt; j++, n++) {
-                        if (remote_nb[n].len == 0) {
-                                CERROR("zero len BRW: objid "LPX64" buf %u\n",
-                                       ioo[i].ioo_id, j);
-                                GOTO(out, rc = -EINVAL);
-                        }
-                        if (j && remote_nb[n].offset <= remote_nb[n-1].offset) {
-                                CERROR("unordered BRW: objid "LPX64
-                                       " buf %u offset "LPX64" <= "LPX64"\n",
-                                       ioo[i].ioo_id, j, remote_nb[n].offset,
-                                       remote_nb[n-1].offset);
-                                GOTO(out, rc = -EINVAL);
-                        }
-                }
-        }
-
-        size[1] = niocount * sizeof(*remote_nb);
-        rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
+        /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
+        npages = get_per_page_niobufs(ioo, objcount,remote_nb,niocount,&pp_rnb);
+        if (npages < 0)
+                GOTO (out, rc = npages);
+        size[1] = npages * sizeof(*pp_rnb);
+        rc = lustre_pack_reply(req, 2, size, NULL);
         if (rc)
-                GOTO(out, rc);
+                GOTO(out_pp_rnb, rc);
 
         req->rq_status = obd_san_preprw(cmd, req->rq_export, &body->oa,
-                                        objcount, ioo, niocount, remote_nb);
+                                        objcount, ioo, npages, pp_rnb);
 
         if (req->rq_status)
-                GOTO(out, rc = 0);
+                GOTO(out_pp_rnb, rc = 0);
 
         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
@@ -826,6 +808,8 @@ static int ost_san_brw(struct ptlrpc_request *req, int cmd)
         res_nb = lustre_msg_buf(req->rq_repmsg, 1, size[1]);
         memcpy(res_nb, remote_nb, size[1]);
         rc = 0;
+out_pp_rnb:
+        free_per_page_niobufs(npages, pp_rnb, remote_nb);
 out:
         if (rc) {
                 OBD_FREE(req->rq_repmsg, req->rq_replen);
@@ -838,59 +822,58 @@ out:
         return rc;
 }
 
-static int ost_log_cancel(struct ptlrpc_request *req)
+
+static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
 {
-        struct lustre_handle *conn;
-        struct llog_cookie *logcookies;
-        int num_cookies, rc = 0;
+        char *key;
+        int keylen, rc = 0;
         ENTRY;
 
-        logcookies = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*logcookies));
-        if (logcookies == NULL) {
-                DEBUG_REQ(D_HA, req, "no cookies sent");
+        key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
+        if (key == NULL) {
+                DEBUG_REQ(D_HA, req, "no set_info key");
                 RETURN(-EFAULT);
         }
-        num_cookies = req->rq_reqmsg->buflens[0] / sizeof(*logcookies);
+        keylen = req->rq_reqmsg->buflens[0];
 
-        /* workaround until we don't need to send replies */
-        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+        rc = lustre_pack_reply(req, 0, NULL, NULL);
         if (rc)
                 RETURN(rc);
-        req->rq_repmsg->status = 0;
-        /* end workaround */
-
-        conn = (struct lustre_handle *)&req->rq_reqmsg->handle;
-        rc = obd_log_cancel(conn, NULL, num_cookies, logcookies, 0);
 
+        rc = obd_set_info(exp, keylen, key, 0, NULL);
+        req->rq_repmsg->status = 0;
         RETURN(rc);
 }
 
-static int ost_set_info(struct ptlrpc_request *req)
+static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
 {
-        struct lustre_handle *conn;
         char *key;
-        int keylen, rc = 0;
+        int keylen, rc = 0, size = sizeof(obd_id);
+        obd_id *reply;
         ENTRY;
 
         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
         if (key == NULL) {
-                DEBUG_REQ(D_HA, req, "no set_info key");
+                DEBUG_REQ(D_HA, req, "no get_info key");
                 RETURN(-EFAULT);
         }
         keylen = req->rq_reqmsg->buflens[0];
 
-        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+        if (keylen < strlen("last_id") || memcmp(key, "last_id", 7) != 0)
+                RETURN(-EPROTO);
+
+        rc = lustre_pack_reply(req, 1, &size, NULL);
         if (rc)
                 RETURN(rc);
 
-        conn = (struct lustre_handle *)&req->rq_reqmsg->handle;
-        rc = obd_set_info(conn, keylen, key, 0, NULL);
+        reply = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*reply));
+        rc = obd_get_info(exp, keylen, key, &size, reply);
         req->rq_repmsg->status = 0;
         RETURN(rc);
 }
 
-static int filter_recovery_request(struct ptlrpc_request *req,
-                                   struct obd_device *obd, int *process)
+static int ost_filter_recovery_request(struct ptlrpc_request *req,
+                                       struct obd_device *obd, int *process)
 {
         switch (req->rq_reqmsg->opc) {
         case OST_CONNECT: /* This will never get here, but for completeness. */
@@ -899,13 +882,11 @@ static int filter_recovery_request(struct ptlrpc_request *req,
                RETURN(0);
 
         case OBD_PING:
-        case OST_CLOSE:
         case OST_CREATE:
         case OST_DESTROY:
-        case OST_OPEN:
         case OST_PUNCH:
         case OST_SETATTR:
-        case OST_SYNCFS:
+        case OST_SYNC:
         case OST_WRITE:
         case OBD_LOG_CANCEL:
         case LDLM_ENQUEUE:
@@ -928,6 +909,7 @@ static int ost_handle(struct ptlrpc_request *req)
         struct obd_trans_info trans_info = { 0, };
         struct obd_trans_info *oti = &trans_info;
         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
+        struct obd_export *exp = NULL;
         ENTRY;
 
         LASSERT(current->journal_info == NULL);
@@ -936,14 +918,16 @@ static int ost_handle(struct ptlrpc_request *req)
                 struct obd_device *obd;
                 int abort_recovery, recovering;
 
-                if (req->rq_export == NULL) {
+                exp = req->rq_export;
+
+                if (exp == NULL) {
                         CDEBUG(D_HA, "operation %d on unconnected OST\n",
                                req->rq_reqmsg->opc);
                         req->rq_status = -ENOTCONN;
                         GOTO(out, rc = -ENOTCONN);
                 }
 
-                obd = req->rq_export->exp_obd;
+                obd = exp->exp_obd;
 
                 /* Check for aborted recovery. */
                 spin_lock_bh(&obd->obd_processing_task_lock);
@@ -953,7 +937,8 @@ static int ost_handle(struct ptlrpc_request *req)
                 if (abort_recovery) {
                         target_abort_recovery(obd);
                 } else if (recovering) {
-                        rc = filter_recovery_request(req, obd, &should_process);
+                        rc = ost_filter_recovery_request(req, obd,
+                                                         &should_process);
                         if (rc || !should_process)
                                 RETURN(rc);
                 }
@@ -978,32 +963,22 @@ static int ost_handle(struct ptlrpc_request *req)
         case OST_CREATE:
                 CDEBUG(D_INODE, "create\n");
                 OBD_FAIL_RETURN(OBD_FAIL_OST_CREATE_NET, 0);
-                rc = ost_create(req, oti);
+                rc = ost_create(exp, req, oti);
                 break;
         case OST_DESTROY:
                 CDEBUG(D_INODE, "destroy\n");
                 OBD_FAIL_RETURN(OBD_FAIL_OST_DESTROY_NET, 0);
-                rc = ost_destroy(req, oti);
+                rc = ost_destroy(exp, req, oti);
                 break;
         case OST_GETATTR:
                 CDEBUG(D_INODE, "getattr\n");
                 OBD_FAIL_RETURN(OBD_FAIL_OST_GETATTR_NET, 0);
-                rc = ost_getattr(req);
+                rc = ost_getattr(exp, req);
                 break;
         case OST_SETATTR:
                 CDEBUG(D_INODE, "setattr\n");
                 OBD_FAIL_RETURN(OBD_FAIL_OST_SETATTR_NET, 0);
-                rc = ost_setattr(req, oti);
-                break;
-        case OST_OPEN:
-                CDEBUG(D_INODE, "open\n");
-                OBD_FAIL_RETURN(OBD_FAIL_OST_OPEN_NET, 0);
-                rc = ost_open(req, oti);
-                break;
-        case OST_CLOSE:
-                CDEBUG(D_INODE, "close\n");
-                OBD_FAIL_RETURN(OBD_FAIL_OST_CLOSE_NET, 0);
-                rc = ost_close(req, oti);
+                rc = ost_setattr(exp, req, oti);
                 break;
         case OST_WRITE:
                 CDEBUG(D_INODE, "write\n");
@@ -1034,30 +1009,52 @@ static int ost_handle(struct ptlrpc_request *req)
         case OST_PUNCH:
                 CDEBUG(D_INODE, "punch\n");
                 OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0);
-                rc = ost_punch(req, oti);
+                rc = ost_punch(exp, req, oti);
                 break;
         case OST_STATFS:
                 CDEBUG(D_INODE, "statfs\n");
                 OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0);
                 rc = ost_statfs(req);
                 break;
-        case OST_SYNCFS:
+        case OST_SYNC:
                 CDEBUG(D_INODE, "sync\n");
-                OBD_FAIL_RETURN(OBD_FAIL_OST_SYNCFS_NET, 0);
-                rc = ost_syncfs(req);
+                OBD_FAIL_RETURN(OBD_FAIL_OST_SYNC_NET, 0);
+                rc = ost_sync(exp, req);
                 break;
         case OST_SET_INFO:
                 DEBUG_REQ(D_INODE, req, "set_info");
-                rc = ost_set_info(req);
+                rc = ost_set_info(exp, req);
+                break;
+        case OST_GET_INFO:
+                DEBUG_REQ(D_INODE, req, "get_info");
+                rc = ost_get_info(exp, req);
+                break;
         case OBD_PING:
                 DEBUG_REQ(D_INODE, req, "ping");
                 rc = target_handle_ping(req);
                 break;
+#ifdef ENABLE_ORPHANS
+        /* FIXME - just reply status */
+        case LLOG_ORIGIN_CONNECT:
+                DEBUG_REQ(D_INODE, req, "log connect\n");
+                rc = llog_handle_connect(req); 
+                req->rq_status = rc;
+                rc = lustre_pack_reply(req, 0, NULL, NULL);
+                if (rc)
+                        RETURN(rc);
+                RETURN(ptlrpc_reply(req));
+                //break;
         case OBD_LOG_CANCEL:
                 CDEBUG(D_INODE, "log cancel\n");
                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
-                rc = ost_log_cancel(req);
-                break;
+                rc = llog_origin_handle_cancel(req);
+                req->rq_status = rc;
+                rc = lustre_pack_reply(req, 0, NULL, NULL);
+                if (rc)
+                        RETURN(rc);
+                RETURN(ptlrpc_reply(req));
+                //break;
+#endif
         case LDLM_ENQUEUE:
                 CDEBUG(D_INODE, "enqueue\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
@@ -1126,35 +1123,54 @@ out:
 static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
 {
         struct ost_obd *ost = &obddev->u.ost;
-        int err, i;
+        int rc;
         ENTRY;
 
-#ifdef ENABLE_ORPHANS
-        err = llog_start_commit_thread();
-        if (err < 0)
-                RETURN(err);
-#endif
+        /* Get rid of unneeded supplementary groups */
+        current->ngroups = 0;
+        memset(current->groups, 0, sizeof(current->groups));
+
+        rc = llog_start_commit_thread();
+        if (rc < 0)
+                RETURN(rc);
 
         ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
                                            OST_BUFSIZE, OST_MAXREQSIZE,
                                            OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
-                                           ost_handle, "ost", obddev);
-        if (!ost->ost_service) {
+                                           ost_handle, "ost", 
+                                           obddev->obd_proc_entry);
+        if (ost->ost_service == NULL) {
                 CERROR("failed to start service\n");
                 RETURN(-ENOMEM);
         }
+        
+        rc = ptlrpc_start_n_threads(obddev, ost->ost_service, OST_NUM_THREADS, 
+                                 "ll_ost");
+        if (rc)
+                GOTO(out, rc = -EINVAL);
 
-        for (i = 0; i < OST_NUM_THREADS; i++) {
-                char name[32];
-                sprintf(name, "ll_ost_%02d", i);
-                err = ptlrpc_start_thread(obddev, ost->ost_service, name);
-                if (err) {
-                        CERROR("error starting thread #%d: rc %d\n", i, err);
-                        RETURN(-EINVAL);
-                }
+        ost->ost_create_service =
+                ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS, OST_BUFSIZE,
+                                OST_MAXREQSIZE, OST_CREATE_PORTAL,
+                                OSC_REPLY_PORTAL, ost_handle, "ost_create",
+                                obddev->obd_proc_entry);
+        if (ost->ost_create_service == NULL) {
+                CERROR("failed to start OST create service\n");
+                GOTO(out, rc = -ENOMEM);
         }
 
+        rc = ptlrpc_start_n_threads(obddev, ost->ost_create_service, 1,
+                                    "ll_ost_create");
+        if (rc) 
+                GOTO(out_create, rc = -EINVAL);
+
         RETURN(0);
+
+out_create:
+        ptlrpc_unregister_service(ost->ost_create_service);
+out:
+        ptlrpc_unregister_service(ost->ost_service);
+        RETURN(rc);
 }
 
 static int ost_cleanup(struct obd_device *obddev, int flags)
@@ -1169,6 +1185,9 @@ static int ost_cleanup(struct obd_device *obddev, int flags)
         ptlrpc_stop_all_threads(ost->ost_service);
         ptlrpc_unregister_service(ost->ost_service);
 
+        ptlrpc_stop_all_threads(ost->ost_create_service);
+        ptlrpc_unregister_service(ost->ost_create_service);
+
         RETURN(err);
 }
 
@@ -1185,29 +1204,6 @@ int ost_detach(struct obd_device *dev)
         return lprocfs_obd_detach(dev);
 }
 
-/* I don't think this function is ever used, since nothing
- * connects directly to this module.
- */
-static int ost_connect(struct lustre_handle *conn,
-                       struct obd_device *obd, struct obd_uuid *cluuid)
-{
-        struct obd_export *exp;
-        int rc;
-        ENTRY;
-
-        if (!conn || !obd || !cluuid)
-                RETURN(-EINVAL);
-
-        rc = class_connect(conn, obd, cluuid);
-        if (rc)
-                RETURN(rc);
-        exp = class_conn2export(conn);
-        LASSERT(exp);
-        class_export_put(exp);
-
-        RETURN(0);
-}
-
 /* use obd ops to offer management infrastructure */
 static struct obd_ops ost_obd_ops = {
         o_owner:        THIS_MODULE,
@@ -1215,7 +1211,6 @@ static struct obd_ops ost_obd_ops = {
         o_detach:       ost_detach,
         o_setup:        ost_setup,
         o_cleanup:      ost_cleanup,
-        o_connect:      ost_connect,
 };
 
 static int __init ost_init(void)