Whamcloud - gitweb
mdt prototype updates: cleanup of mdt_handle()
authornikita <nikita>
Sun, 2 Apr 2006 15:21:44 +0000 (15:21 +0000)
committernikita <nikita>
Sun, 2 Apr 2006 15:21:44 +0000 (15:21 +0000)
lustre/mds/handler.c
lustre/mdt/mdt.h
lustre/mdt/mdt_handler.c

index 8312d22..823e36e 100644 (file)
@@ -1370,7 +1370,7 @@ static int mds_msg_check_version(struct lustre_msg *msg)
 int mds_handle(struct ptlrpc_request *req)
 {
         int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET;
-        int rc = 0;
+        int rc;
         struct mds_obd *mds = NULL; /* quell gcc overwarning */
         struct obd_device *obd = NULL;
         ENTRY;
@@ -1400,7 +1400,7 @@ int mds_handle(struct ptlrpc_request *req)
 
                 med = &req->rq_export->exp_mds_data;
                 obd = req->rq_export->exp_obd;
-                mds = &obd->u.mds;
+                mds = mds_req2mds(req);
 
                 /* sanity check: if the xid matches, the request must
                  * be marked as a resent or replayed */
@@ -1439,6 +1439,12 @@ int mds_handle(struct ptlrpc_request *req)
                 rc = target_handle_connect(req, mds_handle);
                 if (!rc) {
                         /* Now that we have an export, set mds. */
+                        /*
+                         * XXX nikita: these assignments are useless: mds is
+                         * never used below, and obd is only used for
+                         * MSG_LAST_REPLAY case, which never happens for
+                         * MDS_CONNECT.
+                         */
                         obd = req->rq_export->exp_obd;
                         mds = mds_req2mds(req);
                 }
index 45cad82..ae870af 100644 (file)
@@ -37,8 +37,8 @@ struct ptlrpc_service_conf {
 struct md_object;
 
 struct md_device {
-       struct lu_device             md_lu_dev;
-       struct md_device_operations *md_ops;
+        struct lu_device             md_lu_dev;
+        struct md_device_operations *md_ops;
 };
 
 struct md_device_operations {
@@ -61,7 +61,7 @@ struct mdt_device {
 };
 
 struct md_object {
-       struct lu_object mo_lu;
+        struct lu_object mo_lu;
 };
 
 static inline int lu_device_is_md(struct lu_device *d)
@@ -72,22 +72,22 @@ static inline int lu_device_is_md(struct lu_device *d)
 static inline struct md_object *lu2md(struct lu_object *o)
 {
         LASSERT(lu_device_is_md(o->lo_dev));
-       return container_of(o, struct md_object, mo_lu);
+        return container_of(o, struct md_object, mo_lu);
 }
 
 static inline struct md_device *md_device_get(struct md_object *o)
 {
         LASSERT(lu_device_is_md(o->mo_lu.lo_dev));
-       return container_of(o->mo_lu.lo_dev, struct md_device, md_lu_dev);
+        return container_of(o->mo_lu.lo_dev, struct md_device, md_lu_dev);
 }
 
 struct mdt_object {
-       struct lu_object_header mot_header;
-       struct md_object        mot_obj;
+        struct lu_object_header mot_header;
+        struct md_object        mot_obj;
 };
 
 struct mdt_lock_handle {
-       struct lustre_handle    mlh_lh;
+        struct lustre_handle    mlh_lh;
         ldlm_mode_t             mlh_mode;
 };
 
@@ -95,19 +95,19 @@ void mdt_lock_handle_init(struct mdt_lock_handle *lh);
 void mdt_lock_handle_fini(struct mdt_lock_handle *lh);
 
 struct mdd_object {
-       struct md_object  mod_obj;
+        struct md_object  mod_obj;
 };
 
 struct osd_object {
-       struct lu_object  oo_lu;
-       struct dentry    *oo_dentry;
+        struct lu_object  oo_lu;
+        struct dentry    *oo_dentry;
 };
 
 int md_device_init(struct md_device *md, struct lu_device_type *t);
 void md_device_fini(struct md_device *md);
 
 enum {
-       MDT_REP_BUF_NR_MAX = 8
+        MDT_REP_BUF_NR_MAX = 8
 };
 
 enum {
@@ -121,28 +121,28 @@ enum {
  * reduce stack consumption.
  */
 struct mdt_thread_info {
-       struct mdt_device     *mti_mdt;
-       /*
-        * number of buffers in reply message.
-        */
-       int                    mti_rep_buf_nr;
-       /*
-        * sizes of reply buffers.
-        */
-       int                    mti_rep_buf_size[MDT_REP_BUF_NR_MAX];
-       /*
-        * Body for "habeo corpus" operations.
-        */
-       struct mds_body       *mti_body;
-       /*
-        * Host object. This is released at the end of mdt_handler().
-        */
-       struct mdt_object     *mti_object;
-       /*
-        * Additional fail id that can be set by handler. Passed to
-        * target_send_reply().
-        */
-       int                    mti_fail_id;
+        struct mdt_device     *mti_mdt;
+        /*
+         * number of buffers in reply message.
+         */
+        int                    mti_rep_buf_nr;
+        /*
+         * sizes of reply buffers.
+         */
+        int                    mti_rep_buf_size[MDT_REP_BUF_NR_MAX];
+        /*
+         * Body for "habeo corpus" operations.
+         */
+        struct mds_body       *mti_body;
+        /*
+         * Host object. This is released at the end of mdt_handler().
+         */
+        struct mdt_object     *mti_object;
+        /*
+         * Additional fail id that can be set by handler. Passed to
+         * target_send_reply().
+         */
+        int                    mti_fail_id;
         /*
          * A couple of lock handles.
          */
index cd9fb32..7c62010 100644 (file)
  */
 unsigned long mdt_num_threads;
 
+static int mdt_handle(struct ptlrpc_request *req);
+
 static int mdt_getstatus(struct mdt_thread_info *info,
-                        struct ptlrpc_request *req, int offset)
+                         struct ptlrpc_request *req, int offset)
 {
         struct md_device *mdd  = info->mti_mdt->mdt_child;
-       struct mds_body  *body;
-       int               size = sizeof *body;
-       int               result;
+        struct mds_body  *body;
+        int               size = sizeof *body;
+        int               result;
 
         ENTRY;
 
         result = lustre_pack_reply(req, 1, &size, NULL);
-       if (result)
+        if (result)
                 CERROR(LUSTRE_MDT0_NAME" out of memory for message: size=%d\n",
-                      size);
+                       size);
         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
                 result = -ENOMEM;
         else {
-               body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body);
-               result = mdd->md_ops->mdo_root_get(mdd, &body->fid1);
-       }
+                body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body);
+                result = mdd->md_ops->mdo_root_get(mdd, &body->fid1);
+        }
 
         /* the last_committed and last_xid fields are filled in for all
          * replies already - no need to do so here also.
@@ -92,7 +94,7 @@ static int mdt_getstatus(struct mdt_thread_info *info,
 static int mdt_connect(struct mdt_thread_info *info,
                        struct ptlrpc_request *req, int offset)
 {
-        return -EOPNOTSUPP;
+        return target_handle_connect(req, mdt_handle);
 }
 
 static int mdt_disconnect(struct mdt_thread_info *info,
@@ -200,32 +202,32 @@ static struct lu_device_operations mdt_lu_ops;
 
 static int lu_device_is_mdt(struct lu_device *d)
 {
-       /*
-        * XXX for now. Tags in lu_device_type->ldt_something are needed.
-        */
-       return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
+        /*
+         * XXX for now. Tags in lu_device_type->ldt_something are needed.
+         */
+        return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
 }
 
 static struct mdt_object *mdt_obj(struct lu_object *o)
 {
-       LASSERT(lu_device_is_mdt(o->lo_dev));
-       return container_of(o, struct mdt_object, mot_obj.mo_lu);
+        LASSERT(lu_device_is_mdt(o->lo_dev));
+        return container_of(o, struct mdt_object, mot_obj.mo_lu);
 }
 
 struct mdt_object *mdt_object_find(struct mdt_device *d, struct ll_fid *f)
 {
-       struct lu_object *o;
+        struct lu_object *o;
 
-       o = lu_object_find(d->mdt_md_dev.md_lu_dev.ld_site, f);
-       if (IS_ERR(o))
-               return (struct mdt_object *)o;
-       else
-               return mdt_obj(o);
+        o = lu_object_find(d->mdt_md_dev.md_lu_dev.ld_site, f);
+        if (IS_ERR(o))
+                return (struct mdt_object *)o;
+        else
+                return mdt_obj(o);
 }
 
 void mdt_object_put(struct mdt_object *o)
 {
-       lu_object_put(&o->mot_obj.mo_lu);
+        lu_object_put(&o->mot_obj.mo_lu);
 }
 
 static struct ll_fid *mdt_object_fid(struct mdt_object *o)
@@ -268,19 +270,19 @@ struct mdt_object *mdt_object_find_lock(struct mdt_device *d, struct ll_fid *f,
 }
 
 struct mdt_handler {
-       const char *mh_name;
-       int         mh_fail_id;
-       __u32       mh_opc;
-       __u32       mh_flags;
-       int (*mh_act)(struct mdt_thread_info *info,
+        const char *mh_name;
+        int         mh_fail_id;
+        __u32       mh_opc;
+        __u32       mh_flags;
+        int (*mh_act)(struct mdt_thread_info *info,
                       struct ptlrpc_request *req, int offset);
 };
 
 enum mdt_handler_flags {
-       /*
-        * struct mds_body is passed in the 0-th incoming buffer.
-        */
-       HABEO_CORPUS = (1 << 0)
+        /*
+         * struct mds_body is passed in the 0-th incoming buffer.
+         */
+        HABEO_CORPUS = (1 << 0)
 };
 
 struct mdt_opc_slice {
@@ -293,66 +295,88 @@ static struct mdt_opc_slice mdt_handlers[];
 
 struct mdt_handler *mdt_handler_find(__u32 opc)
 {
-       struct mdt_opc_slice *s;
-       struct mdt_handler   *h;
+        struct mdt_opc_slice *s;
+        struct mdt_handler   *h;
+
+        h = NULL;
+        for (s = mdt_handlers; s->mos_hs != NULL; s++) {
+                if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
+                        h = s->mos_hs + (opc - s->mos_opc_start);
+                        if (h->mh_opc != 0)
+                                LASSERT(h->mh_opc == opc);
+                        else
+                                h = NULL; /* unsupported opc */
+                        break;
+                }
+        }
+        return h;
+}
 
-       h = NULL;
-       for (s = mdt_handlers; s->mos_hs != NULL; s++) {
-               if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
-                       h = s->mos_hs + (opc - s->mos_opc_start);
-                       if (h->mh_opc != 0)
-                               LASSERT(h->mh_opc == opc);
-                       else
-                               h = NULL; /* unsupported opc */
-                       break;
-               }
-       }
-       return h;
+static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
+{
+        return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid;
 }
 
+/*
+ * Invoke handler for this request opc. Also do necessary preprocessing
+ * (according to handler ->mh_flags), and post-processing (setting of
+ * ->last_{xid,committed}).
+ */
 static int mdt_req_handle(struct mdt_thread_info *info,
-                         struct mdt_handler *h, struct ptlrpc_request *req,
-                         int shift)
+                          struct mdt_handler *h, struct ptlrpc_request *req,
+                          int shift)
 {
-       int result;
+        int result;
         int off;
 
-       ENTRY;
+        ENTRY;
 
-       LASSERT(h->mh_act != NULL);
-       LASSERT(h->mh_opc == req->rq_reqmsg->opc);
+        LASSERT(h->mh_act != NULL);
+        LASSERT(h->mh_opc == req->rq_reqmsg->opc);
+        LASSERT(current->journal_info == NULL);
 
-       DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
+        DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
 
-       if (h->mh_fail_id != 0)
-               OBD_FAIL_RETURN(h->mh_fail_id, 0);
+        if (h->mh_fail_id != 0)
+                OBD_FAIL_RETURN(h->mh_fail_id, 0);
 
-       off = MDS_REQ_REC_OFF + shift;
+        off = MDS_REQ_REC_OFF + shift;
         result = 0;
-       if (h->mh_flags & HABEO_CORPUS) {
-               info->mti_body = lustre_swab_reqbuf(req, off,
+        if (h->mh_flags & HABEO_CORPUS) {
+                info->mti_body = lustre_swab_reqbuf(req, off,
                                                     sizeof *info->mti_body,
-                                                   lustre_swab_mds_body);
-               if (info->mti_body == NULL) {
-                       CERROR("Can't unpack body\n");
-                       result = req->rq_status = -EFAULT;
-               }
-               info->mti_object = mdt_object_find(info->mti_mdt,
-                                                  &info->mti_body->fid1);
-               if (IS_ERR(info->mti_object))
-                       result = PTR_ERR(info->mti_object);
-       }
-       if (result == 0)
-               result = h->mh_act(info, req, off);
-       /*
-        * XXX result value is unconditionally shoved into ->rq_status
-        * (original code sometimes placed error code into ->rq_status, and
-        * sometimes returned it to the
-        * caller). ptlrpc_server_handle_request() doesn't check return value
-        * anyway.
-        */
-       req->rq_status = result;
-       RETURN(result);
+                                                    lustre_swab_mds_body);
+                if (info->mti_body == NULL) {
+                        CERROR("Can't unpack body\n");
+                        result = req->rq_status = -EFAULT;
+                }
+                info->mti_object = mdt_object_find(info->mti_mdt,
+                                                   &info->mti_body->fid1);
+                if (IS_ERR(info->mti_object))
+                        result = PTR_ERR(info->mti_object);
+        }
+        if (result == 0)
+                /*
+                 * Process request.
+                 */
+                result = h->mh_act(info, req, off);
+        /*
+         * XXX result value is unconditionally shoved into ->rq_status
+         * (original code sometimes placed error code into ->rq_status, and
+         * sometimes returned it to the
+         * caller). ptlrpc_server_handle_request() doesn't check return value
+         * anyway.
+         */
+        req->rq_status = result;
+
+        LASSERT(current->journal_info == NULL);
+
+        /* If we're DISCONNECTing, the mds_export_data is already freed */
+        if (result == 0 && h->mh_opc != MDS_DISCONNECT) {
+                req->rq_reqmsg->last_xid = le64_to_cpu(req_exp_last_xid(req));
+                target_committed_to_req(req);
+        }
+        RETURN(result);
 }
 
 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
@@ -370,13 +394,13 @@ static void mdt_thread_info_init(struct mdt_thread_info *info)
 {
         int i;
 
-       memset(info, 0, sizeof *info);
-       info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
-       /*
-        * Poison size array.
-        */
-       for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
-               info->mti_rep_buf_size[i] = ~0;
+        memset(info, 0, sizeof *info);
+        info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
+        /*
+         * Poison size array.
+         */
+        for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
+                info->mti_rep_buf_size[i] = ~0;
         info->mti_rep_buf_nr = i;
         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
                 mdt_lock_handle_init(&info->mti_lh[i]);
@@ -386,10 +410,10 @@ static void mdt_thread_info_fini(struct mdt_thread_info *info)
 {
         int i;
 
-       if (info->mti_object != NULL) {
-               mdt_object_put(info->mti_object);
-               info->mti_object = NULL;
-       }
+        if (info->mti_object != NULL) {
+                mdt_object_put(info->mti_object);
+                info->mti_object = NULL;
+        }
         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
                 mdt_lock_handle_fini(&info->mti_lh[i]);
 }
@@ -462,110 +486,170 @@ static int mds_msg_check_version(struct lustre_msg *msg)
         return rc;
 }
 
-static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
+static int mdt_filter_recovery_request(struct ptlrpc_request *req,
+                                       struct obd_device *obd, int *process)
 {
-        int rc;
-        struct mds_obd *mds = NULL; /* quell gcc overwarning */
-        struct obd_device *obd = NULL;
-       struct mdt_handler *h;
+        switch (req->rq_reqmsg->opc) {
+        case MDS_CONNECT: /* This will never get here, but for completeness. */
+        case OST_CONNECT: /* This will never get here, but for completeness. */
+        case MDS_DISCONNECT:
+        case OST_DISCONNECT:
+               *process = 1;
+               RETURN(0);
+
+        case MDS_CLOSE:
+        case MDS_SYNC: /* used in unmounting */
+        case OBD_PING:
+        case MDS_REINT:
+        case LDLM_ENQUEUE:
+                *process = target_queue_recovery_request(req, obd);
+                RETURN(0);
+
+        default:
+                DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
+                *process = 0;
+                /* XXX what should we set rq_status to here? */
+                req->rq_status = -EAGAIN;
+                RETURN(ptlrpc_error(req));
+        }
+}
+
+/*
+ * Handle recovery. Return:
+ *       +ve: continue request processing;
+ *       -ve: abort immediately with given (negated) error code;
+ *         0: send reply with error code in req->rq_status;
+ */
+static int mdt_recovery(struct ptlrpc_request *req)
+{
+        int recovering;
+        int abort_recovery;
+        struct obd_device *obd;
 
         ENTRY;
 
-        OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
+        if (req->rq_reqmsg->opc == MDS_CONNECT)
+                RETURN(+1);
 
-        LASSERT(current->journal_info == NULL);
+        if (req->rq_export == NULL) {
+                CERROR("operation %d on unconnected MDS from %s\n",
+                       req->rq_reqmsg->opc,
+                       libcfs_id2str(req->rq_peer));
+                req->rq_status = -ENOTCONN;
+                RETURN(0);
+        }
 
-        rc = mds_msg_check_version(req->rq_reqmsg);
-        if (rc) {
-                CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
-                RETURN(rc);
+        /* sanity check: if the xid matches, the request must be marked as a
+         * resent or replayed */
+        LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req),
+                      lustre_msg_get_flags(req->rq_reqmsg) &
+                      (MSG_RESENT | MSG_REPLAY)),
+                 "rq_xid "LPU64" matches last_xid, "
+                 "expected RESENT flag\n", req->rq_xid);
+
+        /* else: note the opposite is not always true; a RESENT req after a
+         * failover will usually not match the last_xid, since it was likely
+         * never committed. A REPLAYed request will almost never match the
+         * last xid, however it could for a committed, but still retained,
+         * open. */
+
+        obd = req->rq_export->exp_obd;
+
+        /* Check for aborted recovery... */
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        abort_recovery = obd->obd_abort_recovery;
+        recovering = obd->obd_recovering;
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+        if (abort_recovery) {
+                target_abort_recovery(obd);
+        } else if (recovering) {
+                int rc;
+                int should_process;
+
+                rc = mdt_filter_recovery_request(req, obd, &should_process);
+                if (rc != 0 || !should_process) {
+                        LASSERT(rc < 0);
+                        RETURN(rc);
+                }
         }
+        RETURN(+1);
+}
 
-        if (req->rq_reqmsg->opc != MDS_CONNECT) {
-                struct mds_export_data *med;
+static int mdt_reply(struct ptlrpc_request *req, struct mdt_thread_info *info)
+{
+        struct obd_device *obd;
+
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
+                if (req->rq_reqmsg->opc != OBD_PING)
+                        DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
 
-                if (req->rq_export == NULL) {
-                        CERROR("operation %d on unconnected MDS from %s\n",
-                               req->rq_reqmsg->opc,
-                               libcfs_id2str(req->rq_peer));
+                obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
+                if (obd && obd->obd_recovering) {
+                        DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
+                        RETURN(target_queue_final_reply(req, req->rq_status));
+                } else {
+                        /* Lost a race with recovery; let the error path
+                         * DTRT. */
                         req->rq_status = -ENOTCONN;
-                        GOTO(out, rc = -ENOTCONN);
                 }
-
-                med = &req->rq_export->exp_mds_data;
-                obd = req->rq_export->exp_obd;
-                mds = &obd->u.mds;
-
-                /* sanity check: if the xid matches, the request must
-                 * be marked as a resent or replayed */
-                LASSERTF(ergo(req->rq_xid == med->med_mcd->mcd_last_xid,
-                              lustre_msg_get_flags(req->rq_reqmsg) &
-                              (MSG_RESENT | MSG_REPLAY)),
-                         "rq_xid "LPU64" matches last_xid, "
-                         "expected RESENT flag\n", req->rq_xid);
-                /* else: note the opposite is not always true; a
-                 * RESENT req after a failover will usually not match
-                 * the last_xid, since it was likely never
-                 * committed. A REPLAYed request will almost never
-                 * match the last xid, however it could for a
-                 * committed, but still retained, open. */
-
-                /* Check for aborted recovery... */
         }
+        target_send_reply(req, req->rq_status, info->mti_fail_id);
+        RETURN(req->rq_status);
+}
+
+static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
+{
+        struct lustre_msg *msg;
+        int                result;
 
-       h = mdt_handler_find(req->rq_reqmsg->opc);
-       if (h != NULL) {
-               rc = mdt_req_handle(info, h, req, 0);
-       } else {
-                req->rq_status = -ENOTSUPP;
-                rc = ptlrpc_error(req);
-                RETURN(rc);
-       }
+        ENTRY;
 
-        LASSERT(current->journal_info == NULL);
+        OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
 
-        /* If we're DISCONNECTing, the mds_export_data is already freed */
-        if (!rc && req->rq_reqmsg->opc != MDS_DISCONNECT) {
-                struct mds_export_data *med = &req->rq_export->exp_mds_data;
-                req->rq_repmsg->last_xid =
-                        le64_to_cpu(med->med_mcd->mcd_last_xid);
+        LASSERT(current->journal_info == NULL);
 
-                target_committed_to_req(req);
+        msg = req->rq_reqmsg;
+        result = mds_msg_check_version(msg);
+        if (result != 0) {
+                CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
+                RETURN(result);
         }
 
-        EXIT;
- out:
-
-        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
-                if (obd && obd->obd_recovering) {
-                        DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
-                        RETURN(target_queue_final_reply(req, rc));
+        result = mdt_recovery(req);
+        if (result > 0) {
+                struct mdt_handler *h;
+
+                h = mdt_handler_find(msg->opc);
+                if (h != NULL)
+                        result = mdt_req_handle(info, h, req, 0);
+                else {
+                        req->rq_status = -ENOTSUPP;
+                        result = ptlrpc_error(req);
+                        RETURN(result);
                 }
-                /* Lost a race with recovery; let the error path DTRT. */
-                rc = req->rq_status = -ENOTCONN;
-        }
+        } else if (result < 0)
+                RETURN(result);
 
-        target_send_reply(req, rc, info->mti_fail_id);
-       RETURN(0);
+        RETURN(mdt_reply(req, info));
 }
 
 static struct mdt_device *mdt_dev(struct lu_device *d)
 {
-       LASSERT(lu_device_is_mdt(d));
-       return container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev);
+        LASSERT(lu_device_is_mdt(d));
+        return container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev);
 }
 
-int mdt_handle(struct ptlrpc_request *req)
+static int mdt_handle(struct ptlrpc_request *req)
 {
-       int result;
+        int result;
 
-       struct mdt_thread_info info; /* XXX on stack for now */
-       mdt_thread_info_init(&info);
-       info.mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
+        struct mdt_thread_info info; /* XXX on stack for now */
+        mdt_thread_info_init(&info);
+        info.mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
 
-       result = mdt_handle0(req, &info);
+        result = mdt_handle0(req, &info);
 
-       mdt_thread_info_fini(&info);
+        mdt_thread_info_fini(&info);
         return result;
 }
 
@@ -573,89 +657,89 @@ static int mdt_intent_policy(struct ldlm_namespace *ns,
                              struct ldlm_lock **lockp, void *req_cookie,
                              ldlm_mode_t mode, int flags, void *data)
 {
-       RETURN(ELDLM_LOCK_ABORTED);
+        return ELDLM_LOCK_ABORTED;
 }
 
 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
-                                           svc_handler_t h, char *name,
-                                           struct proc_dir_entry *proc_entry,
-                                           svcreq_printfn_t prntfn)
+                                            svc_handler_t h, char *name,
+                                            struct proc_dir_entry *proc_entry,
+                                            svcreq_printfn_t prntfn)
 {
-       return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
-                              c->psc_max_req_size, c->psc_max_reply_size,
-                              c->psc_req_portal, c->psc_rep_portal,
-                              c->psc_watchdog_timeout,
-                              h, name, proc_entry,
-                              prntfn, c->psc_num_threads);
+        return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
+                               c->psc_max_req_size, c->psc_max_reply_size,
+                               c->psc_req_portal, c->psc_rep_portal,
+                               c->psc_watchdog_timeout,
+                               h, name, proc_entry,
+                               prntfn, c->psc_num_threads);
 }
 
 int md_device_init(struct md_device *md, struct lu_device_type *t)
 {
-       return lu_device_init(&md->md_lu_dev, t);
+        return lu_device_init(&md->md_lu_dev, t);
 }
 
 void md_device_fini(struct md_device *md)
 {
-       lu_device_fini(&md->md_lu_dev);
+        lu_device_fini(&md->md_lu_dev);
 }
 
 static void mdt_fini(struct lu_device *d)
 {
-       struct mdt_device *m = mdt_dev(d);
-
-       if (d->ld_site != NULL) {
-               lu_site_fini(d->ld_site);
-               d->ld_site = NULL;
-       }
-       if (m->mdt_service != NULL) {
-               ptlrpc_unregister_service(m->mdt_service);
-               m->mdt_service = NULL;
-       }
-       if (m->mdt_namespace != NULL) {
-               ldlm_namespace_free(m->mdt_namespace, 0);
-               m->mdt_namespace = NULL;
-       }
-       
-       LASSERT(atomic_read(&d->ld_ref) == 0);
-       md_device_fini(&m->mdt_md_dev);
+        struct mdt_device *m = mdt_dev(d);
+
+        if (d->ld_site != NULL) {
+                lu_site_fini(d->ld_site);
+                d->ld_site = NULL;
+        }
+        if (m->mdt_service != NULL) {
+                ptlrpc_unregister_service(m->mdt_service);
+                m->mdt_service = NULL;
+        }
+        if (m->mdt_namespace != NULL) {
+                ldlm_namespace_free(m->mdt_namespace, 0);
+                m->mdt_namespace = NULL;
+        }
+
+        LASSERT(atomic_read(&d->ld_ref) == 0);
+        md_device_fini(&m->mdt_md_dev);
 }
 
 static int mdt_init0(struct mdt_device *m,
                      struct lu_device_type *t, struct lustre_cfg *cfg)
 {
-       struct lu_site *s;
+        struct lu_site *s;
         char   ns_name[48];
 
         ENTRY;
 
-       OBD_ALLOC_PTR(s);
-       if (s == NULL)
-               return -ENOMEM;
-
-       md_device_init(&m->mdt_md_dev, t);
-
-       m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
-
-       m->mdt_service_conf.psc_nbufs            = MDS_NBUFS;
-       m->mdt_service_conf.psc_bufsize          = MDS_BUFSIZE;
-       m->mdt_service_conf.psc_max_req_size     = MDS_MAXREQSIZE;
-       m->mdt_service_conf.psc_max_reply_size   = MDS_MAXREPSIZE;
-       m->mdt_service_conf.psc_req_portal       = MDS_REQUEST_PORTAL;
-       m->mdt_service_conf.psc_rep_portal       = MDC_REPLY_PORTAL;
-       m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
-       /*
-        * We'd like to have a mechanism to set this on a per-device basis,
-        * but alas...
-        */
-       m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
+        OBD_ALLOC_PTR(s);
+        if (s == NULL)
+                return -ENOMEM;
+
+        md_device_init(&m->mdt_md_dev, t);
+
+        m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
+
+        m->mdt_service_conf.psc_nbufs            = MDS_NBUFS;
+        m->mdt_service_conf.psc_bufsize          = MDS_BUFSIZE;
+        m->mdt_service_conf.psc_max_req_size     = MDS_MAXREQSIZE;
+        m->mdt_service_conf.psc_max_reply_size   = MDS_MAXREPSIZE;
+        m->mdt_service_conf.psc_req_portal       = MDS_REQUEST_PORTAL;
+        m->mdt_service_conf.psc_rep_portal       = MDC_REPLY_PORTAL;
+        m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
+        /*
+         * We'd like to have a mechanism to set this on a per-device basis,
+         * but alas...
+         */
+        m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
                                                       MDT_MIN_THREADS),
-                                                 MDT_MAX_THREADS);
-       lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
+                                                  MDT_MAX_THREADS);
+        lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
 
         snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
         m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
         if (m->mdt_namespace == NULL)
-               return -ENOMEM;
+                return -ENOMEM;
         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
 
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
@@ -666,54 +750,54 @@ static int mdt_init0(struct mdt_device *m,
                                      LUSTRE_MDT0_NAME,
                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
                                      NULL);
-       if (m->mdt_service == NULL)
-               return -ENOMEM;
+        if (m->mdt_service == NULL)
+                return -ENOMEM;
 
-       return ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
+        return ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
 }
 
 struct lu_object *mdt_object_alloc(struct lu_device *d)
 {
-       struct mdt_object *mo;
+        struct mdt_object *mo;
 
-       OBD_ALLOC_PTR(mo);
-       if (mo != NULL) {
-               struct lu_object *o;
-               struct lu_object_header *h;
+        OBD_ALLOC_PTR(mo);
+        if (mo != NULL) {
+                struct lu_object *o;
+                struct lu_object_header *h;
 
-               o = &mo->mot_obj.mo_lu;
-               h = &mo->mot_header;
-               lu_object_header_init(h);
-               lu_object_init(o, h, d);
-               /* ->lo_depth and ->lo_flags are automatically 0 */
-               lu_object_add_top(h, o);
-               return o;
-       } else
-               return NULL;
+                o = &mo->mot_obj.mo_lu;
+                h = &mo->mot_header;
+                lu_object_header_init(h);
+                lu_object_init(o, h, d);
+                /* ->lo_depth and ->lo_flags are automatically 0 */
+                lu_object_add_top(h, o);
+                return o;
+        } else
+                return NULL;
 }
 
 int mdt_object_init(struct lu_object *o)
 {
-       struct mdt_device *d = mdt_dev(o->lo_dev);
-       struct lu_device  *under;
-       struct lu_object  *below;
+        struct mdt_device *d = mdt_dev(o->lo_dev);
+        struct lu_device  *under;
+        struct lu_object  *below;
 
-       under = &d->mdt_child->md_lu_dev;
-       below = under->ld_ops->ldo_object_alloc(under);
-       if (below != NULL) {
-               lu_object_add(o, below);
-               return 0;
-       } else
-               return -ENOMEM;
+        under = &d->mdt_child->md_lu_dev;
+        below = under->ld_ops->ldo_object_alloc(under);
+        if (below != NULL) {
+                lu_object_add(o, below);
+                return 0;
+        } else
+                return -ENOMEM;
 }
 
 void mdt_object_free(struct lu_object *o)
 {
-       struct lu_object_header *h;
+        struct lu_object_header *h;
 
-       h = o->lo_header;
-       lu_object_fini(o);
-       lu_object_header_fini(h);
+        h = o->lo_header;
+        lu_object_fini(o);
+        lu_object_header_fini(h);
 }
 
 void mdt_object_release(struct lu_object *o)
@@ -722,15 +806,15 @@ void mdt_object_release(struct lu_object *o)
 
 int mdt_object_print(struct seq_file *f, const struct lu_object *o)
 {
-       return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
+        return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
 }
 
 static struct lu_device_operations mdt_lu_ops = {
-       .ldo_object_alloc   = mdt_object_alloc,
-       .ldo_object_init    = mdt_object_init,
-       .ldo_object_free    = mdt_object_free,
-       .ldo_object_release = mdt_object_release,
-       .ldo_object_print   = mdt_object_print
+        .ldo_object_alloc   = mdt_object_alloc,
+        .ldo_object_init    = mdt_object_init,
+        .ldo_object_free    = mdt_object_free,
+        .ldo_object_release = mdt_object_release,
+        .ldo_object_print   = mdt_object_print
 };
 
 struct md_object *mdt_object_child(struct mdt_object *o)
@@ -746,16 +830,16 @@ static inline struct md_device_operations *mdt_child_ops(struct mdt_device *d)
 int mdt_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
               struct ll_fid *pfid, const char *name, struct ll_fid *cfid)
 {
-       struct mdt_object      *o;
-       struct mdt_object      *child;
+        struct mdt_object      *o;
+        struct mdt_object      *child;
         struct mdt_lock_handle *lh;
 
-       int result;
+        int result;
 
         (lh = &info->mti_lh[MDT_LH_PARENT])->mlh_mode = LCK_PW;
 
-       o = mdt_object_find_lock(d, pfid, lh);
-       if (IS_ERR(o))
+        o = mdt_object_find_lock(d, pfid, lh);
+        if (IS_ERR(o))
                 return PTR_ERR(o);
 
         child = mdt_object_find(d, cfid);
@@ -767,7 +851,7 @@ int mdt_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
                 result = PTR_ERR(child);
         mdt_object_unlock(o, lh);
         mdt_object_put(o);
-       return result;
+        return result;
 }
 
 static struct obd_ops mdt_obd_device_ops = {
@@ -852,7 +936,7 @@ static int __init mdt_mod_init(void)
                 if (result != 0)
                         class_unregister_type(LUSTRE_MDT0_NAME);
         }
-       return result;
+        return result;
 }
 
 static void __exit mdt_mod_exit(void)
@@ -861,35 +945,35 @@ static void __exit mdt_mod_exit(void)
 }
 
 
-#define DEF_HNDL(prefix, base, flags, opc, fn)                 \
-[prefix ## _ ## opc - prefix ## _ ## base] = {                 \
-       .mh_name    = #opc,                                     \
-       .mh_fail_id = OBD_FAIL_ ## prefix ## _  ## opc ## _NET, \
-       .mh_opc     = prefix ## _  ## opc,                      \
-       .mh_flags   = flags,                                    \
-       .mh_act     = fn                                        \
+#define DEF_HNDL(prefix, base, flags, opc, fn)                        \
+[prefix ## _ ## opc - prefix ## _ ## base] = {                        \
+        .mh_name    = #opc,                                        \
+        .mh_fail_id = OBD_FAIL_ ## prefix ## _  ## opc ## _NET,        \
+        .mh_opc     = prefix ## _  ## opc,                        \
+        .mh_flags   = flags,                                        \
+        .mh_act     = fn                                        \
 }
 
 #define DEF_MDT_HNDL(flags, name, fn) DEF_HNDL(MDS, GETATTR, flags, name, fn)
 
 static struct mdt_handler mdt_mds_ops[] = {
-       DEF_MDT_HNDL(0,            CONNECT,        mdt_connect),
-       DEF_MDT_HNDL(0,            DISCONNECT,     mdt_disconnect),
-       DEF_MDT_HNDL(0,            GETSTATUS,      mdt_getstatus),
-       DEF_MDT_HNDL(HABEO_CORPUS, GETATTR,        mdt_getattr),
-       DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME,   mdt_getattr_name),
-       DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR,       mdt_setxattr),
-       DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR,       mdt_getxattr),
-       DEF_MDT_HNDL(0,            STATFS,         mdt_statfs),
-       DEF_MDT_HNDL(HABEO_CORPUS, READPAGE,       mdt_readpage),
-       DEF_MDT_HNDL(0,            REINT,          mdt_reint),
-       DEF_MDT_HNDL(HABEO_CORPUS, CLOSE,          mdt_close),
-       DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING,   mdt_done_writing),
-       DEF_MDT_HNDL(0,            PIN,            mdt_pin),
-       DEF_MDT_HNDL(HABEO_CORPUS, SYNC,           mdt_sync),
-       DEF_MDT_HNDL(0,            SET_INFO,       mdt_set_info),
-       DEF_MDT_HNDL(0,            QUOTACHECK,     mdt_handle_quotacheck),
-       DEF_MDT_HNDL(0,            QUOTACTL,       mdt_handle_quotactl),
+        DEF_MDT_HNDL(0,            CONNECT,        mdt_connect),
+        DEF_MDT_HNDL(0,            DISCONNECT,     mdt_disconnect),
+        DEF_MDT_HNDL(0,            GETSTATUS,      mdt_getstatus),
+        DEF_MDT_HNDL(HABEO_CORPUS, GETATTR,        mdt_getattr),
+        DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME,   mdt_getattr_name),
+        DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR,       mdt_setxattr),
+        DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR,       mdt_getxattr),
+        DEF_MDT_HNDL(0,            STATFS,         mdt_statfs),
+        DEF_MDT_HNDL(HABEO_CORPUS, READPAGE,       mdt_readpage),
+        DEF_MDT_HNDL(0,            REINT,          mdt_reint),
+        DEF_MDT_HNDL(HABEO_CORPUS, CLOSE,          mdt_close),
+        DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING,   mdt_done_writing),
+        DEF_MDT_HNDL(0,            PIN,            mdt_pin),
+        DEF_MDT_HNDL(HABEO_CORPUS, SYNC,           mdt_sync),
+        DEF_MDT_HNDL(0,            SET_INFO,       mdt_set_info),
+        DEF_MDT_HNDL(0,            QUOTACHECK,     mdt_handle_quotacheck),
+        DEF_MDT_HNDL(0,            QUOTACTL,       mdt_handle_quotactl),
 };
 
 static struct mdt_handler mdt_obd_ops[] = {
@@ -902,28 +986,28 @@ static struct mdt_handler mdt_llog_ops[] = {
 };
 
 static struct mdt_opc_slice mdt_handlers[] = {
-       {
-               .mos_opc_start = MDS_GETATTR,
-               .mos_opc_end   = MDS_LAST_OPC,
-               .mos_hs        = mdt_mds_ops
-       },
-       {
-               .mos_opc_start = OBD_PING,
-               .mos_opc_end   = OBD_LAST_OPC,
-               .mos_hs        = mdt_obd_ops
-       },
-       {
-               .mos_opc_start = LDLM_ENQUEUE,
-               .mos_opc_end   = LDLM_LAST_OPC,
-               .mos_hs        = mdt_dlm_ops
-       },
-       {
-               .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
-               .mos_opc_end   = LLOG_LAST_OPC,
-               .mos_hs        = mdt_llog_ops
-       },
         {
-               .mos_hs        = NULL
+                .mos_opc_start = MDS_GETATTR,
+                .mos_opc_end   = MDS_LAST_OPC,
+                .mos_hs        = mdt_mds_ops
+        },
+        {
+                .mos_opc_start = OBD_PING,
+                .mos_opc_end   = OBD_LAST_OPC,
+                .mos_hs        = mdt_obd_ops
+        },
+        {
+                .mos_opc_start = LDLM_ENQUEUE,
+                .mos_opc_end   = LDLM_LAST_OPC,
+                .mos_hs        = mdt_dlm_ops
+        },
+        {
+                .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
+                .mos_opc_end   = LLOG_LAST_OPC,
+                .mos_hs        = mdt_llog_ops
+        },
+        {
+                .mos_hs        = NULL
         }
 };