Whamcloud - gitweb
use special macro for print time_t, cleanup in includes.
[fs/lustre-release.git] / lustre / mds / handler.c
index eebf615..9c33a98 100644 (file)
@@ -62,6 +62,10 @@ int mds_num_threads;
 CFS_MODULE_PARM(mds_num_threads, "i", int, 0444,
                 "number of MDS service threads to start");
 
+__u32 mds_max_ost_index=0xFFFF;
+CFS_MODULE_PARM(mds_max_ost_index, "i", int, 0444,
+                "maximal OST index");
+
 static int mds_intent_policy(struct ldlm_namespace *ns,
                              struct ldlm_lock **lockp, void *req_cookie,
                              ldlm_mode_t mode, int flags, void *data);
@@ -93,7 +97,7 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
                 tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount;
 
-                pages[i] = cfs_alloc_page(CFS_ALLOC_STD);
+                OBD_PAGE_ALLOC(pages[i], CFS_ALLOC_STD);
                 if (pages[i] == NULL)
                         GOTO(cleanup_buf, rc = -ENOMEM);
 
@@ -152,7 +156,7 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
  cleanup_buf:
         for (i = 0; i < npages; i++)
                 if (pages[i])
-                        __cfs_free_page(pages[i]);
+                        OBD_PAGE_FREE(pages[i]);
 
         ptlrpc_free_bulk(desc);
  out_free:
@@ -285,7 +289,8 @@ static int mds_connect_internal(struct obd_export *exp,
         return 0;
 }
 
-static int mds_reconnect(struct obd_export *exp, struct obd_device *obd,
+static int mds_reconnect(const struct lu_env *env,
+                         struct obd_export *exp, struct obd_device *obd,
                          struct obd_uuid *cluuid,
                          struct obd_connect_data *data)
 {
@@ -308,7 +313,8 @@ static int mds_reconnect(struct obd_export *exp, struct obd_device *obd,
  */
 static int mds_connect(const struct lu_env *env,
                        struct lustre_handle *conn, struct obd_device *obd,
-                       struct obd_uuid *cluuid, struct obd_connect_data *data)
+                       struct obd_uuid *cluuid, struct obd_connect_data *data,
+                       void *localdata)
 {
         struct obd_export *exp;
         struct mds_export_data *med;
@@ -336,6 +342,8 @@ static int mds_connect(const struct lu_env *env,
         LASSERT(exp);
         med = &exp->exp_mds_data;
 
+        exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_NULL;
+
         rc = mds_connect_internal(exp, data);
         if (rc)
                 GOTO(out, rc);
@@ -347,7 +355,7 @@ static int mds_connect(const struct lu_env *env,
         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
         med->med_mcd = mcd;
 
-        rc = mds_client_add(obd, exp, -1);
+        rc = mds_client_add(obd, exp, -1, localdata);
         GOTO(out, rc);
 
 out:
@@ -368,9 +376,9 @@ int mds_init_export(struct obd_export *exp)
 {
         struct mds_export_data *med = &exp->exp_mds_data;
 
-        INIT_LIST_HEAD(&med->med_open_head);
+        CFS_INIT_LIST_HEAD(&med->med_open_head);
         spin_lock_init(&med->med_open_lock);
-        
+
         spin_lock(&exp->exp_lock);
         exp->exp_connecting = 1;
         spin_unlock(&exp->exp_lock);
@@ -515,12 +523,11 @@ static int mds_getstatus(struct ptlrpc_request *req)
         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
         ENTRY;
 
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
+                RETURN(req->rq_status = -ENOMEM);
         rc = lustre_pack_reply(req, 2, size, NULL);
-        if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
-                CERROR("mds: out of memory for message\n");
-                req->rq_status = -ENOMEM;       /* superfluous? */
-                RETURN(-ENOMEM);
-        }
+        if (rc)
+                RETURN(req->rq_status = rc);
 
         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
@@ -831,7 +838,6 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
 
         rc = lustre_pack_reply(req, bufcount, size, NULL);
         if (rc) {
-                CERROR("lustre_pack_reply failed: rc %d\n", rc);
                 req->rq_status = rc;
                 RETURN(rc);
         }
@@ -1001,8 +1007,10 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
         default:
                 mds_exit_ucred(&uc, mds);
                 if (req->rq_reply_state == NULL) {
+                        int rc2 = lustre_pack_reply(req, 1, NULL, NULL);
+                        if (rc == 0)
+                                rc = rc2;
                         req->rq_status = rc;
-                        lustre_pack_reply(req, 1, NULL, NULL);
                 }
         }
         return rc;
@@ -1052,15 +1060,17 @@ out_pop:
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
 out_ucred:
         if (req->rq_reply_state == NULL) {
+                int rc2 = lustre_pack_reply(req, 1, NULL, NULL);
+                if (rc == 0)
+                        rc = rc2;
                 req->rq_status = rc;
-                lustre_pack_reply(req, 1, NULL, NULL);
         }
         mds_exit_ucred(&uc, mds);
         return rc;
 }
 
 static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
-                          __u64 max_age)
+                          __u64 max_age, __u32 flags)
 {
         int rc;
 
@@ -1085,16 +1095,16 @@ static int mds_statfs(struct ptlrpc_request *req)
                          (MDS_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
         OBD_COUNTER_INCREMENT(obd, statfs);
 
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK))
+                GOTO(out, rc = -ENOMEM);
         rc = lustre_pack_reply(req, 2, size, NULL);
-        if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
-                CERROR("mds: statfs lustre_pack_reply failed: rc = %d\n", rc);
+        if (rc)
                 GOTO(out, rc);
-        }
 
         /* We call this so that we can cache a bit - 1 jiffie worth */
         rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
                                                 size[REPLY_REC_OFF]),
-                            cfs_time_current_64() - HZ);
+                            cfs_time_current_64() - HZ, 0);
         if (rc) {
                 CERROR("mds_obd_statfs failed: rc %d\n", rc);
                 GOTO(out, rc);
@@ -1119,36 +1129,28 @@ static int mds_sync(struct ptlrpc_request *req, int offset)
         if (body == NULL)
                 GOTO(out, rc = -EFAULT);
 
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
+                GOTO(out, rc = -ENOMEM);
         rc = lustre_pack_reply(req, 2, size, NULL);
-        if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) {
-                CERROR("fsync lustre_pack_reply failed: rc = %d\n", rc);
+        if (rc)
                 GOTO(out, rc);
-        }
 
-        if (body->fid1.id == 0) {
-                /* a fid of zero is taken to mean "sync whole filesystem" */
-                rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
-                GOTO(out, rc);
-        } else {
+        rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
+        if (rc == 0 && body->fid1.id != 0) {
                 struct dentry *de;
 
                 de = mds_fid2dentry(mds, &body->fid1, NULL);
                 if (IS_ERR(de))
                         GOTO(out, rc = PTR_ERR(de));
 
-                /* The file parameter isn't used for anything */
-                if (de->d_inode->i_fop && de->d_inode->i_fop->fsync)
-                        rc = de->d_inode->i_fop->fsync(NULL, de, 1);
-                if (rc == 0) {
-                        body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                              sizeof(*body));
-                        mds_pack_inode2fid(&body->fid1, de->d_inode);
-                        mds_pack_inode2body(body, de->d_inode);
-                }
+                body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
+                                      sizeof(*body));
+                mds_pack_inode2fid(&body->fid1, de->d_inode);
+                mds_pack_inode2body(body, de->d_inode);
 
                 l_dput(de);
-                GOTO(out, rc);
         }
+        GOTO(out, rc);
 out:
         req->rq_status = rc;
         return 0;
@@ -1174,12 +1176,9 @@ static int mds_readpage(struct ptlrpc_request *req, int offset)
 
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
                 RETURN(-ENOMEM);
-
         rc = lustre_pack_reply(req, 2, size, NULL);
-        if (rc) {
-                CERROR("error packing readpage reply: rc %d\n", rc);
+        if (rc)
                 GOTO(out, rc);
-        }
 
         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
                                   lustre_swab_mds_body);
@@ -1301,9 +1300,8 @@ static char *reint_names[] = {
 
 static int mds_set_info_rpc(struct obd_export *exp, struct ptlrpc_request *req)
 {
-        char *key;
-        __u32 *val;
-        int keylen, rc = 0;
+        void *key, *val;
+        int keylen, vallen, rc = 0;
         ENTRY;
 
         key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
@@ -1313,25 +1311,28 @@ static int mds_set_info_rpc(struct obd_export *exp, struct ptlrpc_request *req)
         }
         keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
 
-        val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*val));
-        if (val == NULL) {
-                DEBUG_REQ(D_HA, req, "no set_info val");
-                RETURN(-EFAULT);
-        }
+        val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, 0);
+        vallen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1);
 
         rc = lustre_pack_reply(req, 1, NULL, NULL);
         if (rc)
                 RETURN(rc);
+
         lustre_msg_set_status(req->rq_repmsg, 0);
 
-        if (keylen < strlen("read-only") ||
-            memcmp(key, "read-only", keylen) != 0)
-                RETURN(-EINVAL);
+        if (KEY_IS("read-only")) {
+                if (val == NULL || vallen < sizeof(__u32)) {
+                        DEBUG_REQ(D_HA, req, "no set_info val");
+                        RETURN(-EFAULT);
+                }
 
-        if (*val)
-                exp->exp_connect_flags |= OBD_CONNECT_RDONLY;
-        else
-                exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY;
+                if (*(__u32 *)val)
+                        exp->exp_connect_flags |= OBD_CONNECT_RDONLY;
+                else
+                        exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY;
+        } else {
+                RETURN(-EINVAL);
+        }
 
         RETURN(0);
 }
@@ -1348,10 +1349,8 @@ static int mds_handle_quotacheck(struct ptlrpc_request *req)
                 RETURN(-EPROTO);
 
         rc = lustre_pack_reply(req, 1, NULL, NULL);
-        if (rc) {
-                CERROR("mds: out of memory while packing quotacheck reply\n");
+        if (rc)
                 RETURN(rc);
-        }
 
         req->rq_status = obd_quotacheck(req->rq_export, oqctl);
         RETURN(0);
@@ -1467,7 +1466,8 @@ int mds_handle(struct ptlrpc_request *req)
         struct obd_device *obd = NULL;
         ENTRY;
 
-        OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
+        if (OBD_FAIL_CHECK_ORSET(OBD_FAIL_MDS_ALL_REQUEST_NET, OBD_FAIL_ONCE))
+                RETURN(0);
 
         LASSERT(current->journal_info == NULL);
 
@@ -1533,7 +1533,8 @@ int mds_handle(struct ptlrpc_request *req)
         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
         case MDS_CONNECT:
                 DEBUG_REQ(D_INODE, req, "connect");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CONNECT_NET))
+                        RETURN(0);
                 rc = target_handle_connect(req);
                 if (!rc) {
                         /* Now that we have an export, set mds. */
@@ -1550,39 +1551,45 @@ int mds_handle(struct ptlrpc_request *req)
 
         case MDS_DISCONNECT:
                 DEBUG_REQ(D_INODE, req, "disconnect");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DISCONNECT_NET))
+                        RETURN(0);
                 rc = target_handle_disconnect(req);
                 req->rq_status = rc;            /* superfluous? */
                 break;
 
         case MDS_GETSTATUS:
                 DEBUG_REQ(D_INODE, req, "getstatus");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_NET))
+                        RETURN(0);
                 rc = mds_getstatus(req);
                 break;
 
         case MDS_GETATTR:
                 DEBUG_REQ(D_INODE, req, "getattr");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_NET))
+                        RETURN(0);
                 rc = mds_getattr(req, REQ_REC_OFF);
                 break;
 
         case MDS_SETXATTR:
                 DEBUG_REQ(D_INODE, req, "setxattr");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_SETXATTR_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SETXATTR_NET))
+                        RETURN(0);
                 rc = mds_setxattr(req);
                 break;
 
         case MDS_GETXATTR:
                 DEBUG_REQ(D_INODE, req, "getxattr");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_GETXATTR_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETXATTR_NET))
+                        RETURN(0);
                 rc = mds_getxattr(req);
                 break;
 
         case MDS_GETATTR_NAME: {
                 struct lustre_handle lockh = { 0 };
                 DEBUG_REQ(D_INODE, req, "getattr_name");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NAME_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_NAME_NET))
+                        RETURN(0);
 
                 /* If this request gets a reconstructed reply, we won't be
                  * acquiring any new locks in mds_getattr_lock, so we don't
@@ -1598,16 +1605,18 @@ int mds_handle(struct ptlrpc_request *req)
         }
         case MDS_STATFS:
                 DEBUG_REQ(D_INODE, req, "statfs");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_NET))
+                        RETURN(0);
                 rc = mds_statfs(req);
                 break;
 
         case MDS_READPAGE:
                 DEBUG_REQ(D_INODE, req, "readpage");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_NET))
+                        RETURN(0);
                 rc = mds_readpage(req, REQ_REC_OFF);
 
-                if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_SENDPAGE)) {
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
                         RETURN(0);
                 }
 
@@ -1617,6 +1626,7 @@ int mds_handle(struct ptlrpc_request *req)
                 __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
                                              sizeof(*opcp));
                 __u32  opc;
+                int op = 0;
                 int size[4] = { sizeof(struct ptlrpc_body),
                                 sizeof(struct mds_body),
                                 mds->mds_max_mdsize,
@@ -1637,8 +1647,36 @@ int mds_handle(struct ptlrpc_request *req)
                           (opc < sizeof(reint_names) / sizeof(reint_names[0]) ||
                            reint_names[opc] == NULL) ? reint_names[opc] :
                                                        "unknown opcode");
+                switch (opc) {
+                case REINT_CREATE:
+                        op = PTLRPC_LAST_CNTR + MDS_REINT_CREATE;
+                        break;
+                case REINT_LINK:
+                        op = PTLRPC_LAST_CNTR + MDS_REINT_LINK;
+                        break;
+                case REINT_OPEN:
+                        op = PTLRPC_LAST_CNTR + MDS_REINT_OPEN;
+                        break;
+                case REINT_SETATTR:
+                        op = PTLRPC_LAST_CNTR + MDS_REINT_SETATTR;
+                        break;
+                case REINT_RENAME:
+                        op = PTLRPC_LAST_CNTR + MDS_REINT_RENAME;
+                        break;
+                case REINT_UNLINK:
+                        op = PTLRPC_LAST_CNTR + MDS_REINT_UNLINK;
+                        break;
+                default:
+                        op = 0;
+                        break;
+                }
+
+                if (op && req->rq_rqbd->rqbd_service->srv_stats)
+                        lprocfs_counter_incr(
+                                req->rq_rqbd->rqbd_service->srv_stats, op);
 
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_NET))
+                        RETURN(0);
 
                 if (opc == REINT_UNLINK || opc == REINT_RENAME)
                         bufcount = 4;
@@ -1658,25 +1696,30 @@ int mds_handle(struct ptlrpc_request *req)
 
         case MDS_CLOSE:
                 DEBUG_REQ(D_INODE, req, "close");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_NET))
+                        RETURN(0);
                 rc = mds_close(req, REQ_REC_OFF);
+                fail = OBD_FAIL_MDS_CLOSE_NET_REP;
                 break;
 
         case MDS_DONE_WRITING:
                 DEBUG_REQ(D_INODE, req, "done_writing");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_DONE_WRITING_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DONE_WRITING_NET))
+                        RETURN(0);
                 rc = mds_done_writing(req, REQ_REC_OFF);
                 break;
 
         case MDS_PIN:
                 DEBUG_REQ(D_INODE, req, "pin");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_PIN_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_PIN_NET))
+                        RETURN(0);
                 rc = mds_pin(req, REQ_REC_OFF);
                 break;
 
         case MDS_SYNC:
                 DEBUG_REQ(D_INODE, req, "sync");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_SYNC_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_NET))
+                        RETURN(0);
                 rc = mds_sync(req, REQ_REC_OFF);
                 break;
 
@@ -1687,13 +1730,15 @@ int mds_handle(struct ptlrpc_request *req)
 
         case MDS_QUOTACHECK:
                 DEBUG_REQ(D_INODE, req, "quotacheck");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_QUOTACHECK_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_QUOTACHECK_NET))
+                        RETURN(0);
                 rc = mds_handle_quotacheck(req);
                 break;
 
         case MDS_QUOTACTL:
                 DEBUG_REQ(D_INODE, req, "quotactl");
-                OBD_FAIL_RETURN(OBD_FAIL_MDS_QUOTACTL_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_QUOTACTL_NET))
+                        RETURN(0);
                 rc = mds_handle_quotactl(req);
                 break;
 
@@ -1704,20 +1749,23 @@ int mds_handle(struct ptlrpc_request *req)
 
         case OBD_LOG_CANCEL:
                 CDEBUG(D_INODE, "log cancel\n");
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
+                        RETURN(0);
                 rc = -ENOTSUPP; /* la la la */
                 break;
 
         case LDLM_ENQUEUE:
                 DEBUG_REQ(D_INODE, req, "enqueue");
-                OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE))
+                        RETURN(0);
                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
                                          ldlm_server_blocking_ast, NULL);
                 fail = OBD_FAIL_LDLM_REPLY;
                 break;
         case LDLM_CONVERT:
                 DEBUG_REQ(D_INODE, req, "convert");
-                OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CONVERT))
+                        RETURN(0);
                 rc = ldlm_handle_convert(req);
                 break;
         case LDLM_BL_CALLBACK:
@@ -1725,41 +1773,49 @@ int mds_handle(struct ptlrpc_request *req)
                 DEBUG_REQ(D_INODE, req, "callback");
                 CERROR("callbacks should not happen on MDS\n");
                 LBUG();
-                OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK))
+                        RETURN(0);
                 break;
         case LLOG_ORIGIN_HANDLE_CREATE:
                 DEBUG_REQ(D_INODE, req, "llog_init");
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+                        RETURN(0);
                 rc = llog_origin_handle_create(req);
                 break;
         case LLOG_ORIGIN_HANDLE_DESTROY:
                 DEBUG_REQ(D_INODE, req, "llog_init");
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+                        RETURN(0);
                 rc = llog_origin_handle_destroy(req);
                 break;
         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
                 DEBUG_REQ(D_INODE, req, "llog next block");
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+                        RETURN(0);
                 rc = llog_origin_handle_next_block(req);
                 break;
         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
                 DEBUG_REQ(D_INODE, req, "llog prev block");
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+                        RETURN(0);
                 rc = llog_origin_handle_prev_block(req);
                 break;
         case LLOG_ORIGIN_HANDLE_READ_HEADER:
                 DEBUG_REQ(D_INODE, req, "llog read header");
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+                        RETURN(0);
                 rc = llog_origin_handle_read_header(req);
                 break;
         case LLOG_ORIGIN_HANDLE_CLOSE:
                 DEBUG_REQ(D_INODE, req, "llog close");
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+                        RETURN(0);
                 rc = llog_origin_handle_close(req);
                 break;
         case LLOG_CATINFO:
                 DEBUG_REQ(D_INODE, req, "llog catinfo");
-                OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
+                if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
+                        RETURN(0);
                 rc = llog_catinfo(req);
                 break;
         default:
@@ -1864,13 +1920,9 @@ static void fsoptions_to_mds_flags(struct mds_obd *mds, char *options)
 }
 static int mds_lov_presetup (struct mds_obd *mds, struct lustre_cfg *lcfg)
 {
-        int rc;
+        int rc = 0;
         ENTRY;
 
-        rc = llog_start_commit_thread();
-        if (rc < 0)
-                RETURN(rc);
-
         if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
                 class_uuid_t uuid;
 
@@ -1926,7 +1978,7 @@ static int mds_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
 
         /* We mounted in lustre_fill_super.
            lcfg bufs 1, 2, 4 (device, fstype, mount opts) are ignored.*/
-                
+
         lsi = s2lsi(lmi->lmi_sb);
         fsoptions_to_mds_flags(mds, lsi->lsi_ldd->ldd_mount_opts);
         fsoptions_to_mds_flags(mds, lsi->lsi_lmd->lmd_opts);
@@ -1955,13 +2007,20 @@ static int mds_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
         }
         ldlm_register_intent(obd->obd_namespace, mds_intent_policy);
 
-        lprocfs_init_vars(mds, &lvars);
+        lprocfs_mds_init_vars(&lvars);
         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 &&
             lprocfs_alloc_obd_stats(obd, LPROC_MDS_LAST) == 0) {
                 /* Init private stats here */
                 mds_stats_counter_init(obd->obd_stats);
-                obd->obd_proc_exports = proc_mkdir("exports",
-                                                   obd->obd_proc_entry);
+                obd->obd_proc_exports_entry = lprocfs_register("exports",
+                                                         obd->obd_proc_entry,
+                                                         NULL, NULL);
+                if (IS_ERR(obd->obd_proc_exports_entry)) {
+                        rc = PTR_ERR(obd->obd_proc_exports_entry);
+                        CERROR("error %d setting up lprocfs for %s\n",
+                               rc, "exports");
+                        obd->obd_proc_exports_entry = NULL;
+                }
         }
 
         rc = mds_fs_setup(obd, mnt);
@@ -1971,6 +2030,11 @@ static int mds_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
                 GOTO(err_ns, rc);
         }
 
+        if (obd->obd_proc_exports_entry)
+                lprocfs_add_simple(obd->obd_proc_exports_entry,
+                                   "clear", lprocfs_nid_stats_clear_read,
+                                   lprocfs_nid_stats_clear_write, obd);
+
         rc = mds_lov_presetup(mds, lcfg);
         if (rc < 0)
                 GOTO(err_fs, rc);
@@ -2048,8 +2112,8 @@ err_fs:
         mds->mds_group_hash = NULL;
 #endif
 err_ns:
-        lprocfs_obd_cleanup(obd);
         lprocfs_free_obd_stats(obd);
+        lprocfs_obd_cleanup(obd);
         ldlm_namespace_free(obd->obd_namespace, 0);
         obd->obd_namespace = NULL;
 err_ops:
@@ -2098,12 +2162,12 @@ static int mds_postsetup(struct obd_device *obd)
         int rc = 0;
         ENTRY;
 
-        rc = llog_setup(obd, NULL, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
+        rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
                         &llog_lvfs_ops);
         if (rc)
                 RETURN(rc);
 
-        rc = llog_setup(obd, NULL, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
+        rc = llog_setup(obd, &obd->obd_olg, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
                         &llog_lvfs_ops);
         if (rc)
                 RETURN(rc);
@@ -2136,23 +2200,14 @@ err_cleanup:
 
 int mds_postrecov(struct obd_device *obd)
 {
-        int rc;
+        int rc = 0;
         ENTRY;
 
         if (obd->obd_fail)
                 RETURN(0);
 
         LASSERT(!obd->obd_recovering);
-        LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
-
-        /* FIXME why not put this in the synchronize? */
-        /* set nextid first, so we are sure it happens */
-        rc = mds_lov_set_nextid(obd);
-        if (rc) {
-                CERROR("%s: mds_lov_set_nextid failed %d\n",
-                       obd->obd_name, rc);
-                GOTO(out, rc);
-        }
+        LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
 
         /* clean PENDING dir */
         if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
@@ -2232,14 +2287,17 @@ static int mds_cleanup(struct obd_device *obd)
                    we just need to drop our ref */
                 class_export_put(mds->mds_osc_exp);
 
-        lprocfs_obd_cleanup(obd);
+        lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry);
+        lprocfs_free_per_client_stats(obd);
         lprocfs_free_obd_stats(obd);
+        lprocfs_obd_cleanup(obd);
 
         lquota_cleanup(mds_quota_interface_ref, obd);
 
         mds_update_server_data(obd, 1);
-        if (mds->mds_lov_objids != NULL)
-                OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size);
+        /* XXX
+        mds_lov_destroy_objids(obd);
+        */
         mds_fs_cleanup(obd);
 
 #if 0
@@ -2358,7 +2416,8 @@ static int mds_intent_policy(struct ldlm_namespace *ns,
         if (lustre_msg_bufcount(req->rq_reqmsg) <= DLM_INTENT_IT_OFF) {
                 /* No intent was provided */
                 rc = lustre_pack_reply(req, 2, repsize, NULL);
-                LASSERT(rc == 0);
+                if (rc)
+                        RETURN(rc);
                 RETURN(0);
         }
 
@@ -2521,7 +2580,7 @@ static int mdt_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         int rc = 0;
         ENTRY;
 
-        lprocfs_init_vars(mdt, &lvars);
+        lprocfs_mdt_init_vars(&lvars);
         lprocfs_obd_setup(obd, lvars.obd_vars);
 
         sema_init(&mds->mds_health_sem, 1);
@@ -2535,12 +2594,12 @@ static int mdt_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 mds_max_threads = mds_min_threads = mds_num_threads;
         } else {
                 /* Base min threads on memory and cpus */
-                mds_min_threads = num_possible_cpus() * num_physpages >> 
+                mds_min_threads = num_possible_cpus() * num_physpages >>
                         (27 - CFS_PAGE_SHIFT);
                 if (mds_min_threads < MDS_THREADS_MIN)
                         mds_min_threads = MDS_THREADS_MIN;
                 /* Largest auto threads start value */
-                if (mds_min_threads > 32) 
+                if (mds_min_threads > 32)
                         mds_min_threads = 32;
                 mds_max_threads = min(MDS_THREADS_MAX, mds_min_threads * 4);
         }
@@ -2550,7 +2609,7 @@ static int mdt_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                                 MDS_MAXREPSIZE, MDS_REQUEST_PORTAL,
                                 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT,
                                 mds_handle, LUSTRE_MDS_NAME,
-                                obd->obd_proc_entry, NULL, 
+                                obd->obd_proc_entry, NULL,
                                 mds_min_threads, mds_max_threads, "ll_mdt", 0);
 
         if (!mds->mds_service) {
@@ -2584,7 +2643,7 @@ static int mdt_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                                 MDS_MAXREPSIZE, MDS_READPAGE_PORTAL,
                                 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT,
                                 mds_handle, "mds_readpage",
-                                obd->obd_proc_entry, NULL, 
+                                obd->obd_proc_entry, NULL,
                                 MDS_THREADS_MIN_READPAGE, mds_max_threads,
                                 "ll_mdt_rdpg", 0);
         if (!mds->mds_readpage_service) {
@@ -2598,7 +2657,7 @@ static int mdt_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 GOTO(err_thread3, rc);
 
         ping_evictor_start();
-        
+
         RETURN(0);
 
 err_thread3:
@@ -2691,7 +2750,7 @@ static int mds_process_config(struct obd_device *obd, obd_count len, void *buf)
         struct lprocfs_static_vars lvars;
         int rc;
 
-        lprocfs_init_vars(mds, &lvars);
+        lprocfs_mds_init_vars(&lvars);
 
         rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars, lcfg, obd);
         return(rc);
@@ -2749,10 +2808,10 @@ static __attribute__((unused)) int __init mds_init(void)
         }
         init_obd_quota_ops(mds_quota_interface_ref, &mds_obd_ops);
 
-        lprocfs_init_vars(mds, &lvars);
+        lprocfs_mds_init_vars(&lvars);
         class_register_type(&mds_obd_ops, NULL,
                             lvars.module_vars, LUSTRE_MDS_NAME, NULL);
-        lprocfs_init_vars(mdt, &lvars);
+        lprocfs_mds_init_vars(&lvars);
         mdt_obd_ops = mdt_obd_ops; //make compiler happy
 //        class_register_type(&mdt_obd_ops, NULL,
 //                            lvars.module_vars, LUSTRE_MDT_NAME, NULL);
@@ -2779,7 +2838,6 @@ static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         struct lustre_sb_info *lsi;
         struct lustre_mount_info *lmi;
         struct dentry  *dentry;
-        struct file *file;
         int rc = 0;
         ENTRY;
 
@@ -2830,19 +2888,10 @@ static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
                 GOTO(err_fid, rc);
         }
-
-        /* open and test the lov objd file */
-        file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
-        if (IS_ERR(file)) {
-                rc = PTR_ERR(file);
-                CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
-                GOTO(err_fid, rc = PTR_ERR(file));
-        }
-        mds->mds_lov_objid_filp = file;
-        if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
-                CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
-                       file->f_dentry->d_inode->i_mode);
-                GOTO(err_lov_objid, rc = -ENOENT);
+        rc = mds_lov_init_objids(obd);
+        if (rc != 0) {
+               CERROR("cannot init lov objid rc = %d\n", rc);
+               GOTO(err_fid, rc );
         }
 
         rc = mds_lov_presetup(mds, lcfg);
@@ -2866,10 +2915,6 @@ static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 err_pop:
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         RETURN(rc);
-err_lov_objid:
-        if (mds->mds_lov_objid_filp &&
-                filp_close((struct file *)mds->mds_lov_objid_filp, 0))
-                CERROR("can't close %s after error\n", LOV_OBJID);
 err_fid:
         dput(mds->mds_fid_de);
 err_objects:
@@ -2891,20 +2936,14 @@ static int mds_cmd_cleanup(struct obd_device *obd)
                               "will be preserved.\n", obd->obd_name);
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        if (mds->mds_lov_objid_filp) {
-                rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
-                mds->mds_lov_objid_filp = NULL;
-                if (rc)
-                        CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
-        }
+
+        mds_lov_destroy_objids(obd);
+
         if (mds->mds_objects_dir != NULL) {
                 l_dput(mds->mds_objects_dir);
                 mds->mds_objects_dir = NULL;
         }
 
-        if (mds->mds_lov_objids != NULL)
-                OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size);
-
         shrink_dcache_parent(mds->mds_fid_de);
         dput(mds->mds_fid_de);
         LL_DQUOT_OFF(obd->u.obt.obt_sb);
@@ -2938,7 +2977,7 @@ static int __init mds_cmd_init(void)
 {
         struct lprocfs_static_vars lvars;
 
-        lprocfs_init_vars(mds, &lvars);
+        lprocfs_mds_init_vars(&lvars);
         class_register_type(&mds_cmd_obd_ops, NULL, lvars.module_vars,
                             LUSTRE_MDS_NAME, NULL);