Whamcloud - gitweb
- merge recovery from cmd2
authortappro <tappro>
Mon, 2 Oct 2006 07:30:21 +0000 (07:30 +0000)
committertappro <tappro>
Mon, 2 Oct 2006 07:30:21 +0000 (07:30 +0000)
26 files changed:
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_export.h
lustre/include/lustre_lib.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/ldlm/ldlm_lib.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/mds/handler.c
lustre/mds/mds_fs.c
lustre/mds/mds_lov.c
lustre/mds/mds_reint.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_open.c
lustre/mdt/mdt_recovery.c
lustre/mgs/mgs_handler.c
lustre/obdclass/genops.c
lustre/obdclass/obd_config.c
lustre/obdfilter/filter.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c
lustre/ptlrpc/import.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/recover.c
lustre/utils/wirecheck.c

index ce8b8cb..c6fd621 100644 (file)
@@ -409,6 +409,8 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
 #define MSG_LAST_REPLAY        1
 #define MSG_RESENT             2
 #define MSG_REPLAY             4
+#define MSG_REQ_REPLAY_DONE    8
+#define MSG_LOCK_REPLAY_DONE  16
 
 /*
  * Flags for all connect opcodes (MDS_CONNECT, OST_CONNECT)
@@ -421,7 +423,8 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
 #define MSG_CONNECT_LIBCLIENT   0x10
 #define MSG_CONNECT_INITIAL     0x20
 #define MSG_CONNECT_ASYNC       0x40
-#define MSG_CONNECT_NEXT_VER    0x80 /* use next version of lustre_msg */
+#define MSG_CONNECT_NEXT_VER    0x80  /* use next version of lustre_msg */
+#define MSG_CONNECT_TRANSNO     0x100 /* report transno */
 
 /* Connect flags */
 #define OBD_CONNECT_RDONLY         0x1ULL /* client allowed read-only access */
@@ -479,18 +482,18 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
  * almost certainly will, then perhaps we stick a union in here. */
 struct obd_connect_data {
         __u64 ocd_connect_flags;        /* OBD_CONNECT_* per above */
+        __u64 ocd_transno;              /* first transno from client to be replayed */
+        __u64 ocd_ibits_known;          /* inode bits this client understands */
         __u32 ocd_version;              /* lustre release version number */
         __u32 ocd_grant;                /* initial cache grant amount (bytes) */
         __u32 ocd_index;                /* LOV index to connect to */
         __u32 ocd_brw_size;             /* Maximum BRW size in bytes */
-        __u64 ocd_ibits_known;          /* inode bits this client understands */
         __u32 ocd_nllu;                 /* non-local-lustre-user */
         __u32 ocd_nllg;                 /* non-local-lustre-group */
         __u32 ocd_group;                /* MDS group on OST */
         __u32 padding1;                 /* also fix lustre_swab_connect */
         __u64 padding2;                 /* also fix lustre_swab_connect */
         __u64 padding3;                 /* also fix lustre_swab_connect */
-        __u64 padding4;                 /* also fix lustre_swab_connect */
 };
 
 extern void lustre_swab_connect(struct obd_connect_data *ocd);
index 896d3e8..51e8bf4 100644 (file)
@@ -80,6 +80,7 @@ struct filter_export_data {
 struct obd_export {
         struct portals_handle     exp_handle;
         atomic_t                  exp_refcount;
+        atomic_t                  exp_rpc_count;
         struct obd_uuid           exp_client_uuid;
         struct list_head          exp_obd_chain;
         /* exp_obd_chain_timed fo ping evictor, protected by obd_dev_lock */
@@ -96,9 +97,11 @@ struct obd_export {
         __u64                     exp_connect_flags;
         int                       exp_flags;
         unsigned int              exp_failed:1,
+                                  exp_connected:1,
                                   exp_disconnected:1,
                                   exp_connecting:1,
-                                  exp_replay_needed:1,
+                                  exp_req_replay_needed:1,
+                                  exp_lock_replay_needed:1,
                                   exp_libclient:1; /* liblustre client? */
         union {
                 struct mds_export_data    eu_mds_data;
index 8e865ca..4233d36 100644 (file)
@@ -51,7 +51,7 @@ struct obd_export;
 #include <lustre_net.h>
 #include <lvfs.h>
 
-int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler);
+int target_handle_connect(struct ptlrpc_request *req);
 int target_handle_disconnect(struct ptlrpc_request *req);
 void target_destroy_export(struct obd_export *exp);
 int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
@@ -71,8 +71,10 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req);
 void target_cancel_recovery_timer(struct obd_device *obd);
 
 #define OBD_RECOVERY_TIMEOUT (obd_timeout * 5 / 2) /* *waves hands* */
-void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler);
-void target_abort_recovery(void *data);
+void target_start_recovery_timer(struct obd_device *obd);
+int target_start_recovery_thread(struct obd_device *obd, 
+                                  svc_handler_t handler);
+void target_stop_recovery_thread(struct obd_device *obd);
 void target_cleanup_recovery(struct obd_device *obd);
 int target_queue_recovery_request(struct ptlrpc_request *req,
                                   struct obd_device *obd);
index 8515aae..3493fec 100644 (file)
@@ -848,6 +848,13 @@ struct obd_notify_upcall {
         void *onu_owner;
 };
 
+struct target_recovery_data {
+        svc_handler_t     trd_recovery_handler;
+        pid_t             trd_processing_task;
+        struct completion trd_starting;
+        struct completion trd_finishing;
+};
+
 /* corresponds to one of the obd's */
 #define MAX_OBD_NAME 128
 #define OBD_DEVICE_MAGIC        0XAB5CD6EF
@@ -869,6 +876,7 @@ struct obd_device {
                      obd_replayable:1,    /* recovery is enabled; inform clients */
                      obd_no_transno:1,    /* no committed-transno notification */
                      obd_no_recov:1,      /* fail instead of retry messages */
+                     obd_req_replaying:1, /* replaying requests */
                      obd_stopping:1,      /* started cleanup */
                      obd_starting:1,      /* started setup */
                      obd_force:1,         /* cleanup with > 0 obd refcount */
@@ -900,11 +908,13 @@ struct obd_device {
 
         /* XXX encapsulate all this recovery data into one struct */
         svc_handler_t                    obd_recovery_handler;
+        pid_t                            obd_processing_task;
+        
+        /* common recovery fields for b1_x and CMD2 */
         int                              obd_max_recoverable_clients;
         int                              obd_connected_clients;
         int                              obd_recoverable_clients;
         spinlock_t                       obd_processing_task_lock; /* BH lock (timer) */
-        pid_t                            obd_processing_task;
         __u64                            obd_next_recovery_transno;
         int                              obd_replayed_requests;
         int                              obd_requests_queued_for_recovery;
@@ -912,10 +922,18 @@ struct obd_device {
         struct list_head                 obd_uncommitted_replies;
         spinlock_t                       obd_uncommitted_replies_lock;
         cfs_timer_t                      obd_recovery_timer;
-        struct list_head                 obd_recovery_queue;
-        struct list_head                 obd_delayed_reply_queue;
         time_t                           obd_recovery_start;
         time_t                           obd_recovery_end;
+        
+        /* new recovery stuff from CMD2 */
+        struct target_recovery_data      obd_recovery_data;
+        int                              obd_replayed_locks;
+        atomic_t                         obd_req_replay_clients;
+        atomic_t                         obd_lock_replay_clients;
+        struct list_head                 obd_req_replay_queue;
+        struct list_head                 obd_lock_replay_queue;
+        struct list_head                 obd_final_req_queue;
+        int                              obd_recovery_stage;
 
         union {
                 struct obd_device_target obt;
index b69db25..d4ca3d1 100644 (file)
@@ -176,7 +176,8 @@ int class_connect(struct lustre_handle *conn, struct obd_device *obd,
 int class_disconnect(struct obd_export *exp);
 void class_fail_export(struct obd_export *exp);
 void class_disconnect_exports(struct obd_device *obddev);
-void class_disconnect_stale_exports(struct obd_device *obddev);
+int class_disconnect_stale_exports(struct obd_device *, 
+                                    int (*test_export)(struct obd_export *));
 int class_manual_cleanup(struct obd_device *obd);
 
 void obdo_cpy_md(struct obdo *dst, struct obdo *src, obd_flag valid);
index 0a6cb18..8400b01 100644 (file)
@@ -98,7 +98,6 @@ static int import_set_conn(struct obd_import *imp, struct obd_uuid *uuid,
         } else {
                 spin_unlock(&imp->imp_lock);
                 GOTO(out_free, rc = -ENOENT);
-
         }
 
         spin_unlock(&imp->imp_lock);
@@ -539,7 +538,7 @@ int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
         RETURN(0);
 }
 
-int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
+int target_handle_connect(struct ptlrpc_request *req)
 {
         struct obd_device *target, *targref = NULL;
         struct obd_export *export = NULL;
@@ -550,7 +549,8 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         struct obd_uuid remote_uuid;
         struct list_head *p;
         char *str, *tmp;
-        int rc = 0, abort_recovery;
+        int rc = 0;
+        int initial_conn = 0;
         struct obd_connect_data *data;
         int size[2] = { sizeof(struct ptlrpc_body), sizeof(*data) };
         ENTRY;
@@ -566,17 +566,9 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
 
         obd_str2uuid (&tgtuuid, str);
         target = class_uuid2obd(&tgtuuid);
-        /* COMPAT_146 */
-        /* old (pre 1.6) lustre_process_log tries to connect to mdsname
-           (eg. mdsA) instead of uuid. */
-        if (!target) {
-                snprintf((char *)tgtuuid.uuid, sizeof(tgtuuid), "%s_UUID", str);
-                target = class_uuid2obd(&tgtuuid);
-        }
         if (!target)
                 target = class_name2obd(str);
-        /* end COMPAT_146 */
-
+        
         if (!target || target->obd_stopping || !target->obd_set_up) {
                 DEBUG_REQ(D_ERROR, req, "UUID '%s' is not available "
                           " for connect (%s)", str,
@@ -615,12 +607,6 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
                 LBUG();
         }
 
-        spin_lock_bh(&target->obd_processing_task_lock);
-        abort_recovery = target->obd_abort_recovery;
-        spin_unlock_bh(&target->obd_processing_task_lock);
-        if (abort_recovery)
-                target_abort_recovery(target);
-
         tmp = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, sizeof conn);
         if (tmp == NULL)
                 GOTO(out, rc = -EPROTO);
@@ -629,6 +615,10 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
 
         data = lustre_swab_reqbuf(req, REQ_REC_OFF + 3, sizeof(*data),
                                   lustre_swab_connect);
+
+        if (!data)
+                GOTO(out, rc = -EPROTO);
+        
         rc = lustre_pack_reply(req, 2, size, NULL);
         if (rc)
                 GOTO(out, rc);
@@ -658,6 +648,9 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
                 }
         }
 
+        if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_INITIAL)
+                initial_conn = 1;
+
         /* lctl gets a backstage, all-access pass. */
         if (obd_uuid_equals(&cluuid, &target->obd_uuid))
                 goto dont_check_exports;
@@ -682,10 +675,25 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
                 }
                 export = NULL;
         }
+        
         /* If we found an export, we already unlocked. */
         if (!export) {
                 spin_unlock(&target->obd_dev_lock);
                 OBD_FAIL_TIMEOUT(OBD_FAIL_TGT_DELAY_CONNECT, 2 * obd_timeout);
+        } else if (req->rq_export == NULL && 
+                   atomic_read(&export->exp_rpc_count) > 0) {
+                CWARN("%s: refuse connection from %s/%s to 0x%p/%d\n",
+                      target->obd_name, cluuid.uuid,
+                      libcfs_nid2str(req->rq_peer.nid),
+                      export, atomic_read(&export->exp_refcount));
+                GOTO(out, rc = -EBUSY);
+        } else if (req->rq_export != NULL &&
+                   atomic_read(&export->exp_rpc_count) > 1) {
+                CWARN("%s: refuse reconnection from %s@%s to 0x%p/%d\n",
+                      target->obd_name, cluuid.uuid,
+                      libcfs_nid2str(req->rq_peer.nid),
+                      export, atomic_read(&export->exp_rpc_count));
+                GOTO(out, rc = -EBUSY);
         } else if (lustre_msg_get_conn_cnt(req->rq_reqmsg) == 1) {
                 CERROR("%s: NID %s (%s) reconnected with 1 conn_cnt; "
                        "cookies not random?\n", target->obd_name,
@@ -697,18 +705,22 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
 
         /* We want to handle EALREADY but *not* -EALREADY from
          * target_handle_reconnect(), return reconnection state in a flag */
+        //XXX: check this
         if (rc == EALREADY) {
                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
                 rc = 0;
         } else if (rc) {
                 GOTO(out, rc);
         }
-
         /* Tell the client if we're in recovery. */
         /* If this is the first client, start the recovery timer */
+        CWARN("%s: connection from %s@%s %st"LPU64"\n", target->obd_name,
+              cluuid.uuid, libcfs_nid2str(req->rq_peer.nid),
+              target->obd_recovering ? "recovering/" : "", data->ocd_transno);
+
         if (target->obd_recovering) {
                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
-                target_start_recovery_timer(target, handler);
+                target_start_recovery_timer(target/*, handler*/);
         }
 
         /* Tell the client if we support replayable requests */
@@ -733,17 +745,16 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         } else {
                 rc = obd_reconnect(export, target, &cluuid, data);
         }
-
         if (rc)
                 GOTO(out, rc);
-
         /* Return only the parts of obd_connect_data that we understand, so the
          * client knows that we don't understand the rest. */
-        if (data)
+        if (data) {
+                //data->ocd_connect_flags &= OBD_CONNECT_SUPPORTED;
                 memcpy(lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                      sizeof(*data)),
-                       data, sizeof(*data));
-
+                                      sizeof(*data)), data, sizeof(*data));
+        }
+        
         /* If all else goes well, this is our RPC return code. */
         req->rq_status = 0;
 
@@ -771,7 +782,9 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         req->rq_export = export;
 
         spin_lock(&export->exp_lock);
-        if (export->exp_conn_cnt >= lustre_msg_get_conn_cnt(req->rq_reqmsg)) {
+        if (initial_conn) {
+                lustre_msg_set_conn_cnt(req->rq_repmsg, export->exp_conn_cnt + 1);
+        } else if (export->exp_conn_cnt >= lustre_msg_get_conn_cnt(req->rq_reqmsg)) {
                 CERROR("%s: %s already connected at higher conn_cnt: %d > %d\n",
                        cluuid.uuid, libcfs_nid2str(req->rq_peer.nid),
                        export->exp_conn_cnt,
@@ -804,16 +817,33 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
                 GOTO(out, rc = 0);
         }
 
-        if (target->obd_recovering)
+        spin_lock_bh(&target->obd_processing_task_lock);
+        if (target->obd_recovering && export->exp_connected == 0) {
+                export->exp_connected = 1;
+                if ((lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_TRANSNO)
+                     && data->ocd_transno < target->obd_next_recovery_transno)
+                        target->obd_next_recovery_transno = data->ocd_transno;
                 target->obd_connected_clients++;
-
+                if (target->obd_connected_clients == target->obd_max_recoverable_clients)
+                        wake_up(&target->obd_next_transno_waitq);
+        }
+        spin_unlock_bh(&target->obd_processing_task_lock);
         memcpy(&conn,
                lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, sizeof conn),
                sizeof conn);
 
         if (export->exp_imp_reverse != NULL)
                 destroy_import(export->exp_imp_reverse);
+
+        /* for the rest part, we return -ENOTCONN in case of errors
+         * in order to let client initialize connection again.
+         */
         revimp = export->exp_imp_reverse = class_new_import(target);
+        if (!revimp) {
+                CERROR("fail to alloc new reverse import.\n");
+                GOTO(out, rc = -ENOTCONN);
+        }
+        
         revimp->imp_connection = ptlrpc_connection_addref(export->exp_connection);
         revimp->imp_client = &export->exp_obd->obd_ldlm_client;
         revimp->imp_remote_handle = conn;
@@ -875,44 +905,59 @@ void target_destroy_export(struct obd_export *exp)
  * Recovery functions
  */
 
-
-static
-struct ptlrpc_request *target_save_req(struct ptlrpc_request *src)
+struct ptlrpc_request *
+ptlrpc_clone_req( struct ptlrpc_request *orig_req) 
 {
-        struct ptlrpc_request *req;
-        struct lustre_msg *reqmsg;
+        struct ptlrpc_request *copy_req;
+        struct lustre_msg *copy_reqmsg;
 
-        OBD_ALLOC_PTR(req);
-        if (!req)
+        OBD_ALLOC_PTR(copy_req);
+        if (!copy_req)
                 return NULL;
-
-        OBD_ALLOC(reqmsg, src->rq_reqlen);
-        if (!reqmsg) {
-                OBD_FREE_PTR(req);
+        OBD_ALLOC(copy_reqmsg, orig_req->rq_reqlen);
+        if (!copy_reqmsg){
+                OBD_FREE_PTR(copy_req);
                 return NULL;
         }
 
-        *req = *src;
-        memcpy(reqmsg, src->rq_reqmsg, src->rq_reqlen);
-        req->rq_reqmsg = reqmsg;
-
-        class_export_get(req->rq_export);
-        CFS_INIT_LIST_HEAD(&req->rq_list);
-        sptlrpc_svc_ctx_addref(req);
-        if (req->rq_reply_state)
-                ptlrpc_rs_addref(req->rq_reply_state);
+        *copy_req = *orig_req;
+        memcpy(copy_reqmsg, orig_req->rq_reqmsg, orig_req->rq_reqlen);
+        orig_req->rq_svc_ctx = NULL;
+        orig_req->rq_reply_state = NULL;
+
+        copy_req->rq_reqmsg = copy_reqmsg;
+        class_export_get(copy_req->rq_export);
+        CFS_INIT_LIST_HEAD(&copy_req->rq_list);
+#if 0
+        sptlrpc_svc_ctx_addref(copy_req);
+        if (copy_req->rq_reply_state)
+               ptlrpc_rs_addref(copy_req->rq_reply_state);
+        /* the copied req takes over the reply state and security data */
+        if (orig_req->rq_reply_state) {
+                ptlrpc_rs_decref(orig_req->rq_reply_state);
+                orig_req->rq_reply_state = NULL;
+        }
+        sptlrpc_svc_ctx_decref(orig_req);
+#endif
+        return copy_req;
+}
 
-        /* repmsg have been taken over, in privacy mode this might point to
-         * invalid data. prevent further access on it.
-         */
-        src->rq_repmsg = NULL;
-        src->rq_replen = 0;
+void ptlrpc_free_clone( struct ptlrpc_request *req) 
+{
+        if (req->rq_reply_state) {
+                ptlrpc_rs_decref(req->rq_reply_state);
+                req->rq_reply_state = NULL;
+        }
 
-        return req;
+        sptlrpc_svc_ctx_decref(req);
+        class_export_put(req->rq_export);
+        list_del(&req->rq_list);
+        OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
+        OBD_FREE_PTR(req);
 }
 
-static
-void target_release_saved_req(struct ptlrpc_request *req)
+
+static void target_release_req(struct ptlrpc_request *req)
 {
         if (req->rq_reply_state) {
                 ptlrpc_rs_decref(req->rq_reply_state);
@@ -927,8 +972,6 @@ void target_release_saved_req(struct ptlrpc_request *req)
 
 static void target_finish_recovery(struct obd_device *obd)
 {
-        struct list_head *tmp, *n;
-
         CWARN("%s: sending delayed replies to recovered clients\n",
               obd->obd_name);
 
@@ -941,37 +984,52 @@ static void target_finish_recovery(struct obd_device *obd)
                       rc < 0 ? "failed" : "complete", rc);
         }
 
-        list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
-                struct ptlrpc_request *req;
+        obd->obd_recovery_end = CURRENT_SECONDS;
+}
+
+static void abort_req_replay_queue(struct obd_device *obd)
+{
+        struct ptlrpc_request *req;
+        struct list_head *tmp, *n;
+        int rc;
+
+        list_for_each_safe(tmp, n, &obd->obd_req_replay_queue) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
                 list_del(&req->rq_list);
-                DEBUG_REQ(D_WARNING, req, "delayed:");
-                ptlrpc_reply(req);
-                target_release_saved_req(req);
+                DEBUG_REQ(D_ERROR, req, "aborted:");
+                req->rq_status = -ENOTCONN;
+                req->rq_type = PTL_RPC_MSG_ERR;
+                rc = lustre_pack_reply(req, 0, NULL, NULL);
+                if (rc == 0) {
+                        ptlrpc_reply(req);
+                } else {
+                        DEBUG_REQ(D_ERROR, req,
+                                  "packing failed for abort-reply; skipping");
+                }
+                target_release_req(req);
         }
-        obd->obd_recovery_end = CURRENT_SECONDS;
 }
 
-static void abort_recovery_queue(struct obd_device *obd)
+static void abort_lock_replay_queue(struct obd_device *obd)
 {
         struct ptlrpc_request *req;
         struct list_head *tmp, *n;
         int rc;
 
-        list_for_each_safe(tmp, n, &obd->obd_recovery_queue) {
+        list_for_each_safe(tmp, n, &obd->obd_lock_replay_queue) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
                 list_del(&req->rq_list);
                 DEBUG_REQ(D_ERROR, req, "aborted:");
                 req->rq_status = -ENOTCONN;
                 req->rq_type = PTL_RPC_MSG_ERR;
-                rc = lustre_pack_reply(req, 1, NULL, NULL);
+                rc = lustre_pack_reply(req, 0, NULL, NULL);
                 if (rc == 0) {
                         ptlrpc_reply(req);
                 } else {
                         DEBUG_REQ(D_ERROR, req,
                                   "packing failed for abort-reply; skipping");
                 }
-                target_release_saved_req(req);
+                target_release_req(req);
         }
 }
 
@@ -1002,43 +1060,25 @@ void target_cleanup_recovery(struct obd_device *obd)
         target_cancel_recovery_timer(obd);
         spin_unlock_bh(&obd->obd_processing_task_lock);
 
-        list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
+        list_for_each_safe(tmp, n, &obd->obd_req_replay_queue) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
                 list_del(&req->rq_list);
-                target_release_saved_req(req);
+                LASSERT (req->rq_reply_state == 0);
+                target_release_req(req);
         }
-
-        list_for_each_safe(tmp, n, &obd->obd_recovery_queue) {
+        list_for_each_safe(tmp, n, &obd->obd_lock_replay_queue) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
                 list_del(&req->rq_list);
-                target_release_saved_req(req);
+                LASSERT (req->rq_reply_state == 0);
+                target_release_req(req);
         }
-        EXIT;
-}
-
-void target_abort_recovery(void *data)
-{
-        struct obd_device *obd = data;
-
-        ENTRY;
-        spin_lock_bh(&obd->obd_processing_task_lock);
-        if (!obd->obd_recovering) {
-                spin_unlock_bh(&obd->obd_processing_task_lock);
-                EXIT;
-                return;
+        list_for_each_safe(tmp, n, &obd->obd_final_req_queue) {
+                req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                list_del(&req->rq_list);
+                LASSERT (req->rq_reply_state == 0);
+                target_release_req(req);
         }
-        obd->obd_recovering = obd->obd_abort_recovery = 0;
-        obd->obd_recoverable_clients = 0;
-        target_cancel_recovery_timer(obd);
-        spin_unlock_bh(&obd->obd_processing_task_lock);
-
-        LCONSOLE_WARN("%s: recovery period over; disconnecting unfinished "
-                      "clients.\n", obd->obd_name);
-        class_disconnect_stale_exports(obd);
-        abort_recovery_queue(obd);
-
-        target_finish_recovery(obd);
-        CDEBUG(D_HA, "%s: recovery complete\n", obd_uuid2str(&obd->obd_uuid));
+        
         EXIT;
 }
 
@@ -1079,116 +1119,431 @@ static void reset_recovery_timer(struct obd_device *obd)
 
 
 /* Only start it the first time called */
-void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler)
+void target_start_recovery_timer(struct obd_device *obd)
 {
         spin_lock_bh(&obd->obd_processing_task_lock);
-        if (obd->obd_recovery_handler) {
+        if (obd->obd_recovery_handler
+            || timer_pending(&obd->obd_recovery_timer)) {
                 spin_unlock_bh(&obd->obd_processing_task_lock);
                 return;
         }
         CWARN("%s: starting recovery timer (%us)\n", obd->obd_name,
               OBD_RECOVERY_TIMEOUT);
-        obd->obd_recovery_handler = handler;
         cfs_timer_init(&obd->obd_recovery_timer, target_recovery_expired, obd);
         spin_unlock_bh(&obd->obd_processing_task_lock);
 
         reset_recovery_timer(obd);
 }
 
+#ifdef __KERNEL__
 static int check_for_next_transno(struct obd_device *obd)
 {
-        struct ptlrpc_request *req;
+        struct ptlrpc_request *req = NULL;
         int wake_up = 0, connected, completed, queue_len, max;
         __u64 next_transno, req_transno;
-
+        ENTRY;
         spin_lock_bh(&obd->obd_processing_task_lock);
-        req = list_entry(obd->obd_recovery_queue.next,
-                         struct ptlrpc_request, rq_list);
+        
+        if (!list_empty(&obd->obd_req_replay_queue)) {
+                req = list_entry(obd->obd_req_replay_queue.next,
+                                 struct ptlrpc_request, rq_list);
+                req_transno = lustre_msg_get_transno(req->rq_reqmsg);
+        } else {
+                req_transno = 0;
+        }
+
         max = obd->obd_max_recoverable_clients;
-        req_transno = lustre_msg_get_transno(req->rq_reqmsg);
         connected = obd->obd_connected_clients;
         completed = max - obd->obd_recoverable_clients;
         queue_len = obd->obd_requests_queued_for_recovery;
         next_transno = obd->obd_next_recovery_transno;
 
-        CDEBUG(D_HA,"max: %d, connected: %d, completed: %d, queue_len: %d, "
+        CWARN("max: %d, connected: %d, completed: %d, queue_len: %d, "
                "req_transno: "LPU64", next_transno: "LPU64"\n",
                max, connected, completed, queue_len, req_transno, next_transno);
         if (obd->obd_abort_recovery) {
                 CDEBUG(D_HA, "waking for aborted recovery\n");
                 wake_up = 1;
-        } else if (!obd->obd_recovering) {
-                CDEBUG(D_HA, "waking for completed recovery (?)\n");
+        } else if (atomic_read(&obd->obd_req_replay_clients) == 0) {
+                CDEBUG(D_HA, "waking for completed recovery\n");
                 wake_up = 1;
         } else if (req_transno == next_transno) {
                 CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno);
                 wake_up = 1;
         } else if (queue_len + completed == max) {
-                CDEBUG(D_ERROR,
+                LASSERT(lustre_msg_get_transno(req->rq_reqmsg) >= next_transno);
+                CDEBUG(req_transno > obd->obd_last_committed ? D_ERROR : D_HA,
                        "waking for skipped transno (skip: "LPD64
                        ", ql: %d, comp: %d, conn: %d, next: "LPD64")\n",
                        next_transno, queue_len, completed, max, req_transno);
                 obd->obd_next_recovery_transno = req_transno;
                 wake_up = 1;
+        } else if (queue_len == atomic_read(&obd->obd_req_replay_clients)) {
+                /* some clients haven't connected in time, but we can try
+                 * to replay requests that demand on already committed ones
+                 * also, we can replay first non-committed transation */
+                LASSERT(req_transno != 0);
+                if (req_transno == obd->obd_last_committed + 1) {
+                        obd->obd_next_recovery_transno = req_transno;
+                } else if (req_transno > obd->obd_last_committed) {
+                        /* can't continue recovery: have no needed transno */
+                        obd->obd_abort_recovery = 1;
+                        CDEBUG(D_ERROR, "abort due to missed clients. max: %d, "
+                               "connected: %d, completed: %d, queue_len: %d, "
+                               "req_transno: "LPU64", next_transno: "LPU64"\n",
+                               max, connected, completed, queue_len,
+                               req_transno, next_transno);
+                }
+                wake_up = 1;
         }
+        
         spin_unlock_bh(&obd->obd_processing_task_lock);
-        LASSERT(lustre_msg_get_transno(req->rq_reqmsg) >= next_transno);
         return wake_up;
 }
 
-static void process_recovery_queue(struct obd_device *obd)
+static struct ptlrpc_request *target_next_replay_req(struct obd_device *obd)
 {
+        struct l_wait_info lwi = { 0 };
         struct ptlrpc_request *req;
-        int abort_recovery = 0;
+        
+        CDEBUG(D_HA, "Waiting for transno "LPD64"\n",
+               obd->obd_next_recovery_transno);
+        l_wait_event(obd->obd_next_transno_waitq,
+                     check_for_next_transno(obd), &lwi);
+        
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        if (obd->obd_abort_recovery) {
+                req = NULL;
+        } else if (!list_empty(&obd->obd_req_replay_queue)) {
+                req = list_entry(obd->obd_req_replay_queue.next,
+                                 struct ptlrpc_request, rq_list);
+                list_del_init(&req->rq_list);
+                obd->obd_requests_queued_for_recovery--;
+        } else {
+                req = NULL;
+        }
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+        RETURN(req);
+}
+
+static int check_for_next_lock(struct obd_device *obd)
+{
+        struct ptlrpc_request *req = NULL;
+        int wake_up = 0;
+
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        if (!list_empty(&obd->obd_lock_replay_queue)) {
+                req = list_entry(obd->obd_lock_replay_queue.next,
+                                 struct ptlrpc_request, rq_list);
+                CDEBUG(D_HA, "waking for next lock\n");
+                wake_up = 1;
+        } else if (atomic_read(&obd->obd_lock_replay_clients) == 0) {
+                CDEBUG(D_HA, "waking for completed lock replay\n");
+                wake_up = 1;
+        } else if (obd->obd_abort_recovery) {
+                CDEBUG(D_HA, "waking for aborted recovery\n");
+                wake_up = 1;
+        }
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+        
+        return wake_up;
+}
+
+static struct ptlrpc_request *target_next_replay_lock(struct obd_device *obd)
+{
         struct l_wait_info lwi = { 0 };
-        ENTRY;
+        struct ptlrpc_request *req;
 
-        for (;;) {
-                spin_lock_bh(&obd->obd_processing_task_lock);
-                LASSERT(obd->obd_processing_task == cfs_curproc_pid());
-                req = list_entry(obd->obd_recovery_queue.next,
+        CDEBUG(D_HA, "Waiting for lock\n");
+        l_wait_event(obd->obd_next_transno_waitq,
+                     check_for_next_lock(obd), &lwi);
+        
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        if (obd->obd_abort_recovery) {
+                req = NULL;
+        } else if (!list_empty(&obd->obd_lock_replay_queue)) {
+                req = list_entry(obd->obd_lock_replay_queue.next,
                                  struct ptlrpc_request, rq_list);
+                list_del_init(&req->rq_list);
+        } else {
+                req = NULL;
+        }
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+        return req;
+}
 
-                if (lustre_msg_get_transno(req->rq_reqmsg) !=
-                    obd->obd_next_recovery_transno) {
-                        spin_unlock_bh(&obd->obd_processing_task_lock);
-                        CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is "
-                               LPD64")\n",
-                               obd->obd_next_recovery_transno,
-                               lustre_msg_get_transno(req->rq_reqmsg));
-                        l_wait_event(obd->obd_next_transno_waitq,
-                                     check_for_next_transno(obd), &lwi);
-                        spin_lock_bh(&obd->obd_processing_task_lock);
-                        abort_recovery = obd->obd_abort_recovery;
-                        spin_unlock_bh(&obd->obd_processing_task_lock);
-                        if (abort_recovery) {
-                                target_abort_recovery(obd);
-                                return;
-                        }
-                        continue;
-                }
+static struct ptlrpc_request *target_next_final_ping(struct obd_device *obd)
+{
+        struct ptlrpc_request *req;
+
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        if (!list_empty(&obd->obd_final_req_queue)) {
+                req = list_entry(obd->obd_final_req_queue.next,
+                                 struct ptlrpc_request, rq_list);
                 list_del_init(&req->rq_list);
-                obd->obd_requests_queued_for_recovery--;
-                spin_unlock_bh(&obd->obd_processing_task_lock);
+        } else {
+                req = NULL;
+        }
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+        return req;
+}
+
+static inline int req_replay_done(struct obd_export *exp)
+{
+        return (exp->exp_req_replay_needed == 0);
+}
+
+static inline int lock_replay_done(struct obd_export *exp)
+{
+        return (exp->exp_lock_replay_needed == 0);
+}
+
+static inline int connect_done(struct obd_export *exp)
+{
+        return (exp->exp_connected != 0);
+}
+
+static int check_for_clients(struct obd_device *obd)
+{
+        if (obd->obd_abort_recovery)
+                return 1;
+        LASSERT(obd->obd_connected_clients <= obd->obd_max_recoverable_clients);
+        if (obd->obd_connected_clients == obd->obd_max_recoverable_clients)
+                return 1;
+        return 0;
+}
+
+static int target_recovery_thread(void *arg)
+{
+        struct obd_device *obd = arg;
+        struct ptlrpc_request *req;
+        struct target_recovery_data *trd = &obd->obd_recovery_data;
+        struct l_wait_info lwi = { 0 };
+        unsigned long delta;
+        unsigned long flags;
+        struct lu_env env;
+        struct ptlrpc_thread fake_svc_thread, *thread = &fake_svc_thread;
+        __u32 recov_ctx_tags = LCT_MD_THREAD;
+        int rc = 0;
+        ENTRY;
+
+        cfs_daemonize("tgt_recov");
+
+        SIGNAL_MASK_LOCK(current, flags);
+        sigfillset(&current->blocked);
+        RECALC_SIGPENDING;
+        SIGNAL_MASK_UNLOCK(current, flags);
+
+        rc = lu_context_init(&env.le_ctx, recov_ctx_tags);
+        if (rc)
+                return rc;
+
+        thread->t_env = &env;
+        env.le_ctx.lc_thread = thread;
+
+        CERROR("%s: started recovery thread pid %d\n", obd->obd_name, 
+               current->pid);
+        trd->trd_processing_task = current->pid;
+
+        obd->obd_recovering = 1;
+        complete(&trd->trd_starting);
+
+        /* first of all, we have to know the first transno to replay */
+        obd->obd_abort_recovery = 0;
+        l_wait_event(obd->obd_next_transno_waitq,
+                     check_for_clients(obd), &lwi);
+
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        target_cancel_recovery_timer(obd);
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+
+        /* If some clients haven't connected in time, evict them */
+        if (obd->obd_abort_recovery) {
+                int stale;
+                CDEBUG(D_ERROR, "few clients haven't connect in time (%d/%d),"
+                       "evict them ...\n", obd->obd_connected_clients,
+                       obd->obd_max_recoverable_clients);
+                obd->obd_abort_recovery = 0;
+                stale = class_disconnect_stale_exports(obd, connect_done);
+                atomic_sub(stale, &obd->obd_req_replay_clients);
+                atomic_sub(stale, &obd->obd_lock_replay_clients);
+        }
+        /* next stage: replay requests */
+        delta = jiffies;
+        obd->obd_req_replaying = 1;
+        CDEBUG(D_ERROR, "1: request replay stage - %d clients from t"LPU64"\n",
+              atomic_read(&obd->obd_req_replay_clients),
+              obd->obd_next_recovery_transno);
+        while ((req = target_next_replay_req(obd))) {
+                LASSERT(trd->trd_processing_task == current->pid);
+                DEBUG_REQ(D_HA, req, "processing t"LPD64" from %s", 
+                          lustre_msg_get_transno(req->rq_reqmsg), 
+                          libcfs_nid2str(req->rq_peer.nid));
+
+                rc = lu_context_init(&req->rq_session, LCT_SESSION);
+                if (rc) {
+                        CERROR("Failure to initialize session: %d\n", rc);
+                        break;
+                }
+                req->rq_session.lc_thread = thread;
+                lu_context_enter(&req->rq_session);
+                req->rq_svc_thread = thread;
+                req->rq_svc_thread->t_env->le_ses = &req->rq_session;
+                
+                (void)trd->trd_recovery_handler(req);
+                
+                lu_context_exit(&req->rq_session);
+                lu_context_fini(&req->rq_session);
 
-                DEBUG_REQ(D_HA, req, "processing: ");
-                (void)obd->obd_recovery_handler(req);
                 obd->obd_replayed_requests++;
                 reset_recovery_timer(obd);
-                /* bug 1580: decide how to properly sync() in recovery */
-                //mds_fsync_super(obd->u.obt.obt_sb);
-                target_release_saved_req(req);
-
+                /* bug 1580: decide how to properly sync() in recovery*/
+                //mds_fsync_super(mds->mds_sb);
+                ptlrpc_free_clone(req);
                 spin_lock_bh(&obd->obd_processing_task_lock);
                 obd->obd_next_recovery_transno++;
-                if (list_empty(&obd->obd_recovery_queue)) {
-                        obd->obd_processing_task = 0;
-                        spin_unlock_bh(&obd->obd_processing_task_lock);
-                        break;
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+        }
+
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        target_cancel_recovery_timer(obd);
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+        /* If some clients haven't replayed requests in time, evict them */
+        if (obd->obd_abort_recovery) {
+                int stale;
+                CDEBUG(D_ERROR, "req replay timed out, aborting ...\n");
+                obd->obd_abort_recovery = 0;
+                stale = class_disconnect_stale_exports(obd, req_replay_done);
+                atomic_sub(stale, &obd->obd_lock_replay_clients);
+                abort_req_replay_queue(obd);
+                /* XXX for debuggin tests 11 and 17 */
+                /* LBUG(); */
+        }
+        /* The second stage: replay locks */
+        CDEBUG(D_ERROR, "2: lock replay stage - %d clients\n",
+              atomic_read(&obd->obd_lock_replay_clients));
+        while ((req = target_next_replay_lock(obd))) {
+                LASSERT(trd->trd_processing_task == current->pid);
+                DEBUG_REQ(D_HA, req, "processing lock from %s: ", 
+                          libcfs_nid2str(req->rq_peer.nid));
+                (void)trd->trd_recovery_handler(req);
+                reset_recovery_timer(obd);
+                ptlrpc_free_clone(req);
+                obd->obd_replayed_locks++;
+        }
+        
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        target_cancel_recovery_timer(obd);
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+        /* If some clients haven't replayed requests in time, evict them */
+        if (obd->obd_abort_recovery) {
+                int stale;
+                CERROR("lock replay timed out, aborting ...\n");
+                obd->obd_abort_recovery = 0;
+                stale = class_disconnect_stale_exports(obd, lock_replay_done);
+                abort_lock_replay_queue(obd);
+        }
+
+        /* We drop recoverying flag to forward all new requests
+         * to regular mds_handle() since now */
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        obd->obd_recovering = 0;
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+        /* The third stage: reply on final pings */
+        CDEBUG(D_ERROR, "3: final stage - process recovery completion pings\n");
+        while ((req = target_next_final_ping(obd))) {
+                LASSERT(trd->trd_processing_task == current->pid);
+                DEBUG_REQ(D_HA, req, "processing final ping from %s: ", 
+                          libcfs_nid2str(req->rq_peer.nid));
+                (void)trd->trd_recovery_handler(req);
+                ptlrpc_free_clone(req);
+        }
+       
+        delta = (jiffies - delta) / HZ;
+        CDEBUG(D_ERROR,"4: recovery completed in %lus - %d/%d reqs/locks\n",
+              delta, obd->obd_replayed_requests, obd->obd_replayed_locks);
+        if (delta > obd_timeout * 2) {
+                CWARN("too long recovery - read logs\n");
+                libcfs_debug_dumplog();
+        }
+        target_finish_recovery(obd);
+
+        lu_env_fini(&env);
+        trd->trd_processing_task = 0;
+        complete(&trd->trd_finishing);
+        return rc;
+}
+
+int target_start_recovery_thread(struct obd_device *obd, svc_handler_t handler)
+{
+        int rc = 0;
+        struct target_recovery_data *trd = &obd->obd_recovery_data;
+
+        memset(trd, 0, sizeof(*trd));
+        init_completion(&trd->trd_starting);
+        init_completion(&trd->trd_finishing);
+        trd->trd_recovery_handler = handler;
+
+        if (kernel_thread(target_recovery_thread, obd, 0) > 0) {
+                wait_for_completion(&trd->trd_starting);
+                LASSERT(obd->obd_recovering != 0);
+        } else
+                rc = -ECHILD;
+
+        return rc;
+}
+
+void target_stop_recovery_thread(struct obd_device *obd)
+{
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        if (obd->obd_recovery_data.trd_processing_task > 0) {
+                struct target_recovery_data *trd = &obd->obd_recovery_data;
+                CERROR("%s: aborting recovery\n", obd->obd_name);
+                obd->obd_abort_recovery = 1;
+                wake_up(&obd->obd_next_transno_waitq);
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+                wait_for_completion(&trd->trd_finishing);
+        } else {
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+        }
+}
+#endif
+
+int target_process_req_flags(struct obd_device *obd, struct ptlrpc_request *req)
+{
+        struct obd_export *exp = req->rq_export;
+        LASSERT(exp != NULL);
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REQ_REPLAY_DONE) {
+                /* client declares he's ready to replay locks */
+                spin_lock_bh(&obd->obd_processing_task_lock);
+                if (exp->exp_req_replay_needed) {
+                        LASSERT(atomic_read(&obd->obd_req_replay_clients) > 0);
+                        exp->exp_req_replay_needed = 0;
+                        atomic_dec(&obd->obd_req_replay_clients);
+                        obd->obd_recoverable_clients--;
+                        if (atomic_read(&obd->obd_req_replay_clients) == 0)
+                                CDEBUG(D_HA, "all clients have replayed reqs\n");
+                        wake_up(&obd->obd_next_transno_waitq);
                 }
                 spin_unlock_bh(&obd->obd_processing_task_lock);
         }
-        EXIT;
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LOCK_REPLAY_DONE) {
+                /* client declares he's ready to complete recovery 
+                 * so, we put the request on th final queue */
+                spin_lock_bh(&obd->obd_processing_task_lock);
+                if (exp->exp_lock_replay_needed) {
+                        LASSERT(atomic_read(&obd->obd_lock_replay_clients) > 0);
+                        exp->exp_lock_replay_needed = 0;
+                        atomic_dec(&obd->obd_lock_replay_clients);
+                        if (atomic_read(&obd->obd_lock_replay_clients) == 0)
+                                CDEBUG(D_HA, "all clients have replayed locks\n");
+                        wake_up(&obd->obd_next_transno_waitq);
+                }
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+        }
+
+        return 0;
 }
 
 int target_queue_recovery_request(struct ptlrpc_request *req,
@@ -1197,7 +1552,40 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
         struct list_head *tmp;
         int inserted = 0;
         __u64 transno = lustre_msg_get_transno(req->rq_reqmsg);
-        struct ptlrpc_request *saved_req;
+
+        ENTRY;
+
+        if (obd->obd_recovery_data.trd_processing_task == current->pid) {
+                /* Processing the queue right now, don't re-add. */
+                return 1;
+        }
+
+        target_process_req_flags(obd, req);
+
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LOCK_REPLAY_DONE) {
+                /* client declares he's ready to complete recovery 
+                 * so, we put the request on th final queue */
+                req = ptlrpc_clone_req(req);
+                if (req == NULL)
+                        return -ENOMEM;
+                DEBUG_REQ(D_HA, req, "queue final req");
+                spin_lock_bh(&obd->obd_processing_task_lock);
+                list_add_tail(&req->rq_list, &obd->obd_final_req_queue);
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+                return 0;
+        }
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REQ_REPLAY_DONE) {
+                /* client declares he's ready to replay locks */
+                req = ptlrpc_clone_req(req);
+                if (req == NULL)
+                        return -ENOMEM;
+                DEBUG_REQ(D_HA, req, "queue lock replay req");
+                spin_lock_bh(&obd->obd_processing_task_lock);
+                list_add_tail(&req->rq_list, &obd->obd_lock_replay_queue);
+                spin_unlock_bh(&obd->obd_processing_task_lock);
+                wake_up(&obd->obd_next_transno_waitq);
+                return 0;
+        }
 
         /* CAVEAT EMPTOR: The incoming request message has been swabbed
          * (i.e. buflens etc are in my own byte order), but type-dependent
@@ -1209,48 +1597,43 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
                 return 1;
         }
 
-        /* XXX If I were a real man, these LBUGs would be sane cleanups. */
-        saved_req = target_save_req(req);
-        if (!saved_req)
-                LBUG();
-
         spin_lock_bh(&obd->obd_processing_task_lock);
 
-        /*
-         * If we're processing the queue, we don't want to queue this message.
+        /* If we're processing the queue, we want don't want to queue this
+         * message.
          *
          * Also, if this request has a transno less than the one we're waiting
          * for, we should process it now.  It could (and currently always will)
          * be an open request for a descriptor that was opened some time ago.
          *
-         * Also, a resent, replayed request that has already been handled will
-         * pass through here and be processed immediately.
+         * Also, a resent, replayed request that has already been
+         * handled will pass through here and be processed immediately.
          */
-        if (obd->obd_processing_task == cfs_curproc_pid() ||
-            transno < obd->obd_next_recovery_transno) {
+        CWARN("Next recovery transno: "LPX64", current: "LPX64", replaying: %i\n",
+              obd->obd_next_recovery_transno, transno, obd->obd_req_replaying);
+        if (transno <= obd->obd_next_recovery_transno && obd->obd_req_replaying) {
                 /* Processing the queue right now, don't re-add. */
                 LASSERT(list_empty(&req->rq_list));
                 spin_unlock_bh(&obd->obd_processing_task_lock);
-
-                target_release_saved_req(saved_req);
                 return 1;
         }
-
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+        
         /* A resent, replayed request that is still on the queue; just drop it.
            The queued request will handle this. */
-        if ((lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT|MSG_REPLAY)) ==
-            (MSG_RESENT | MSG_REPLAY)) {
+        if ((lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT|MSG_REPLAY))
+            == (MSG_RESENT | MSG_REPLAY)) {
                 DEBUG_REQ(D_ERROR, req, "dropping resent queued req");
-                spin_unlock_bh(&obd->obd_processing_task_lock);
-
-                target_release_saved_req(saved_req);
                 return 0;
         }
 
-        req = saved_req;
+        req = ptlrpc_clone_req(req);
+        if (req == NULL)
+                return -ENOMEM;
 
+        spin_lock_bh(&obd->obd_processing_task_lock);
         /* XXX O(n^2) */
-        list_for_each(tmp, &obd->obd_recovery_queue) {
+        list_for_each(tmp, &obd->obd_req_replay_queue) {
                 struct ptlrpc_request *reqiter =
                         list_entry(tmp, struct ptlrpc_request, rq_list);
 
@@ -1261,28 +1644,12 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
                 }
         }
 
-        if (!inserted) {
-                list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
-        }
+        if (!inserted)
+                list_add_tail(&req->rq_list, &obd->obd_req_replay_queue);
 
         obd->obd_requests_queued_for_recovery++;
-
-        if (obd->obd_processing_task != 0) {
-                /* Someone else is processing this queue, we'll leave it to
-                 * them.
-                 */
-                cfs_waitq_signal(&obd->obd_next_transno_waitq);
-                spin_unlock_bh(&obd->obd_processing_task_lock);
-                return 0;
-        }
-
-        /* Nobody is processing, and we know there's (at least) one to process
-         * now, so we'll do the honours.
-         */
-        obd->obd_processing_task = cfs_curproc_pid();
+        wake_up(&obd->obd_next_transno_waitq);
         spin_unlock_bh(&obd->obd_processing_task_lock);
-
-        process_recovery_queue(obd);
         return 0;
 }
 
@@ -1291,84 +1658,15 @@ struct obd_device * target_req2obd(struct ptlrpc_request *req)
         return req->rq_export->exp_obd;
 }
 
-int target_queue_final_reply(struct ptlrpc_request *req, int rc)
-{
-        struct obd_device *obd = target_req2obd(req);
-        struct ptlrpc_request *saved_req;
-        int recovery_done = 0;
-
-        LASSERT ((rc == 0) == (req->rq_reply_state != NULL));
-
-        if (rc) {
-                /* Just like ptlrpc_error, but without the sending. */
-                rc = lustre_pack_reply(req, 1, NULL, NULL);
-                LASSERT(rc == 0); /* XXX handle this */
-                req->rq_type = PTL_RPC_MSG_ERR;
-        }
-
-        LASSERT (!req->rq_reply_state->rs_difficult);
-        LASSERT(list_empty(&req->rq_list));
-
-        saved_req = target_save_req(req);
-        if (!saved_req)
-                LBUG();
-
-        /* Don't race cleanup */
-        spin_lock_bh(&obd->obd_processing_task_lock);
-        if (obd->obd_stopping) {
-                spin_unlock_bh(&obd->obd_processing_task_lock);
-                target_release_saved_req(saved_req);
-                req->rq_status = -ENOTCONN;
-                /* rv is ignored anyhow */
-                return -ENOTCONN;
-        }
-
-        req = saved_req;
-        list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
-
-        /* only count the first "replay over" request from each
-           export */
-        if (req->rq_export->exp_replay_needed) {
-                --obd->obd_recoverable_clients;
-                req->rq_export->exp_replay_needed = 0;
-        }
-        recovery_done = (obd->obd_recoverable_clients == 0);
-        spin_unlock_bh(&obd->obd_processing_task_lock);
-
-        OBD_RACE(OBD_FAIL_LDLM_RECOV_CLIENTS);
-        if (recovery_done) {
-                spin_lock_bh(&obd->obd_processing_task_lock);
-                obd->obd_recovering = obd->obd_abort_recovery = 0;
-                target_cancel_recovery_timer(obd);
-                spin_unlock_bh(&obd->obd_processing_task_lock);
-
-                target_finish_recovery(obd);
-                CDEBUG(D_HA, "%s: recovery complete\n",
-                       obd_uuid2str(&obd->obd_uuid));
-        } else {
-                CWARN("%s: %d recoverable clients remain\n",
-                       obd->obd_name, obd->obd_recoverable_clients);
-                cfs_waitq_signal(&obd->obd_next_transno_waitq);
-        }
-
-        return 1;
-}
-
-int
-target_send_reply_msg(struct ptlrpc_request *req, int rc, int fail_id)
+int target_send_reply_msg(struct ptlrpc_request *req, int rc, int fail_id)
 {
         if (OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) {
                 obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
-                DEBUG_REQ(D_ERROR, req, "dropping reply");
+                DEBUG_REQ(D_ERROR, req, "dropping reply"); 
                 return (-ECOMM);
         }
 
-        if (rc || req->rq_reply_state == NULL) {
-                if (rc == 0) {
-                        DEBUG_REQ(D_ERROR, req, "no reply message packed");
-                        rc = -ENOMEM;
-                } else
-                        DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
+        if (rc) {
                 req->rq_status = rc;
                 return (ptlrpc_error(req));
         }
@@ -1377,8 +1675,7 @@ target_send_reply_msg(struct ptlrpc_request *req, int rc, int fail_id)
         return (ptlrpc_send_reply(req, 1));
 }
 
-void
-target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
+void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
 {
         int                        netrc;
         struct ptlrpc_reply_state *rs;
@@ -1486,8 +1783,8 @@ void target_committed_to_req(struct ptlrpc_request *req)
         else
                 DEBUG_REQ(D_IOCTL, req, "not sending last_committed update");
 
-        CDEBUG(D_INFO, "last_committed "LPU64", xid "LPU64"\n",
-               obd->obd_last_committed, req->rq_xid);
+        CDEBUG(D_INFO, "last_committed "LPU64", transno "LPU64", xid "LPU64"\n",
+               obd->obd_last_committed, req->rq_transno, req->rq_xid);
 }
 
 EXPORT_SYMBOL(target_committed_to_req);
index 88821ec..0f63667 100644 (file)
@@ -1928,16 +1928,16 @@ EXPORT_SYMBOL(client_obd_setup);
 EXPORT_SYMBOL(client_obd_cleanup);
 EXPORT_SYMBOL(client_connect_import);
 EXPORT_SYMBOL(client_disconnect_export);
-EXPORT_SYMBOL(target_abort_recovery);
-EXPORT_SYMBOL(target_cleanup_recovery);
+EXPORT_SYMBOL(target_start_recovery_thread);
+EXPORT_SYMBOL(target_stop_recovery_thread);
 EXPORT_SYMBOL(target_handle_connect);
+EXPORT_SYMBOL(target_cleanup_recovery);
 EXPORT_SYMBOL(target_destroy_export);
 EXPORT_SYMBOL(target_cancel_recovery_timer);
 EXPORT_SYMBOL(target_send_reply);
 EXPORT_SYMBOL(target_queue_recovery_request);
 EXPORT_SYMBOL(target_handle_ping);
 EXPORT_SYMBOL(target_handle_disconnect);
-EXPORT_SYMBOL(target_queue_final_reply);
 
 /* l_lock.c */
 EXPORT_SYMBOL(lock_res_and_lock);
index 25a319c..a934e5d 100644 (file)
@@ -1282,6 +1282,11 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
                 size[DLM_REPLY_REC_OFF] = lock->l_lvb_len;
         }
         ptlrpc_req_set_repsize(req, buffers, size);
+        /* notify the server we've replayed all requests.
+         * also, we mark the request to be put on a dedicated
+         * queue to be processed after all request replayes.
+         * bug 6063 */
+        lustre_msg_set_flags(req->rq_reqmsg, MSG_REQ_REPLAY_DONE);
 
         LDLM_DEBUG(lock, "replaying lock:");
 
index 1be79c3..fba849d 100644 (file)
@@ -319,19 +319,12 @@ static int mds_connect(const struct lu_env *env,
         struct obd_export *exp;
         struct mds_export_data *med;
         struct mds_client_data *mcd = NULL;
-        int rc, abort_recovery;
+        int rc;
         ENTRY;
 
         if (!conn || !obd || !cluuid)
                 RETURN(-EINVAL);
 
-        /* Check for aborted recovery. */
-        spin_lock_bh(&obd->obd_processing_task_lock);
-        abort_recovery = obd->obd_abort_recovery;
-        spin_unlock_bh(&obd->obd_processing_task_lock);
-        if (abort_recovery)
-                target_abort_recovery(obd);
-
         /* XXX There is a small race between checking the list and adding a
          * new connection for the same UUID, but the real threat (list
          * corruption when multiple different clients connect) is solved.
@@ -1444,7 +1437,7 @@ int mds_handle(struct ptlrpc_request *req)
         /* XXX identical to OST */
         if (lustre_msg_get_opc(req->rq_reqmsg) != MDS_CONNECT) {
                 struct mds_export_data *med;
-                int recovering, abort_recovery;
+                int recovering;
 
                 if (req->rq_export == NULL) {
                         CERROR("operation %d on unconnected MDS from %s\n",
@@ -1479,16 +1472,18 @@ int mds_handle(struct ptlrpc_request *req)
 
                 /* Check for aborted recovery. */
                 spin_lock_bh(&obd->obd_processing_task_lock);
-                abort_recovery = obd->obd_abort_recovery;
                 recovering = obd->obd_recovering;
                 spin_unlock_bh(&obd->obd_processing_task_lock);
-                if (abort_recovery) {
-                        target_abort_recovery(obd);
-                } else if (recovering) {
+                if (recovering) {
                         rc = mds_filter_recovery_request(req, obd,
                                                          &should_process);
                         if (rc || !should_process)
                                 RETURN(rc);
+                        else if (should_process < 0) {
+                                req->rq_status = should_process;
+                                rc = ptlrpc_error(req);
+                                RETURN(rc);
+                        }
                 }
         }
 
@@ -1496,7 +1491,7 @@ int mds_handle(struct ptlrpc_request *req)
         case MDS_CONNECT:
                 DEBUG_REQ(D_INODE, req, "connect");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
-                rc = target_handle_connect(req, mds_handle);
+                rc = target_handle_connect(req);
                 if (!rc) {
                         /* Now that we have an export, set mds. */
                         /*
@@ -1747,15 +1742,6 @@ int mds_handle(struct ptlrpc_request *req)
         EXIT;
  out:
 
-        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
-                if (obd && obd->obd_recovering) {
-                        DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
-                        return target_queue_final_reply(req, rc);
-                }
-                /* Lost a race with recovery; let the error path DTRT. */
-                rc = req->rq_status = -ENOTCONN;
-        }
-
         target_send_reply(req, rc, fail);
         return 0;
 }
index b3ce673..198e0cb 100644 (file)
@@ -365,7 +365,7 @@ static int mds_init_server_data(struct obd_device *obd, struct file *file)
 
 
                 mcd = NULL;
-                exp->exp_replay_needed = 1;
+                exp->exp_req_replay_needed = 1;
                 exp->exp_connecting = 0;
                 obd->obd_recoverable_clients++;
                 obd->obd_max_recoverable_clients++;
@@ -384,6 +384,8 @@ static int mds_init_server_data(struct obd_device *obd, struct file *file)
         obd->obd_last_committed = mds->mds_last_transno;
 
         if (obd->obd_recoverable_clients) {
+                /* shouldn't happen in b_new_cmd */
+                LBUG();
                 CWARN("RECOVERY: service %s, %d recoverable clients, "
                       "last_transno "LPU64"\n", obd->obd_name,
                       obd->obd_recoverable_clients, mds->mds_last_transno);
index 892efa1..028d290 100644 (file)
@@ -614,7 +614,7 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
 
         case OBD_IOC_ABORT_RECOVERY:
                 CERROR("aborting recovery for device %s\n", obd->obd_name);
-                target_abort_recovery(obd);
+                target_stop_recovery_thread(obd);
                 RETURN(0);
 
         default:
index 82f3a9f..6d13fbe 100644 (file)
@@ -359,7 +359,7 @@ void mds_steal_ack_locks(struct ptlrpc_request *req)
         }
         spin_unlock(&exp->exp_lock);
 }
-
+EXPORT_SYMBOL(mds_steal_ack_locks);
 void mds_req_from_mcd(struct ptlrpc_request *req, struct mds_client_data *mcd)
 {
         if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
index 54e9b51..db7f3d3 100644 (file)
@@ -125,7 +125,6 @@ static struct mdt_opc_slice mdt_fld_handlers[];
 
 static struct mdt_device *mdt_dev(struct lu_device *d);
 static int mdt_regular_handle(struct ptlrpc_request *req);
-static int mdt_recovery_handle(struct ptlrpc_request *req);
 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
 
 static struct lu_object_operations mdt_obj_ops;
@@ -675,7 +674,7 @@ static int mdt_connect(struct mdt_thread_info *info)
         struct ptlrpc_request *req;
 
         req = mdt_info_req(info);
-        rc = target_handle_connect(req, mdt_recovery_handle);
+        rc = target_handle_connect(req);
         if (rc == 0) {
                 LASSERT(req->rq_export != NULL);
                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
@@ -1643,7 +1642,7 @@ static int mdt_req_handle(struct mdt_thread_info *info,
         }
 
         /* If we're DISCONNECTing, the mdt_export_data is already freed */
-        if (rc == 0 && h->mh_opc != MDS_DISCONNECT)
+        if (h->mh_opc != MDS_DISCONNECT)
                 target_committed_to_req(req);
 
         RETURN(rc);
@@ -1665,6 +1664,7 @@ static void mdt_thread_info_init(struct ptlrpc_request *req,
 {
         int i;
 
+        LASSERT(info->mti_env != req->rq_svc_thread->t_env);
         memset(info, 0, sizeof(*info));
 
         info->mti_rep_buf_nr = ARRAY_SIZE(info->mti_rep_buf_size);
@@ -1696,6 +1696,7 @@ static void mdt_thread_info_fini(struct mdt_thread_info *info)
         }
         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
                 mdt_lock_handle_fini(&info->mti_lh[i]);
+        info->mti_env = NULL;
 }
 
 /* mds/handler.c */
@@ -1711,7 +1712,6 @@ static int mdt_recovery(struct mdt_thread_info *info)
 {
         struct ptlrpc_request *req = mdt_info_req(info);
         int recovering;
-        int abort_recovery;
         struct obd_device *obd;
 
         ENTRY;
@@ -1730,7 +1730,8 @@ static int mdt_recovery(struct mdt_thread_info *info)
                        lustre_msg_get_opc(req->rq_reqmsg),
                        libcfs_id2str(req->rq_peer));
                 req->rq_status = -ENOTCONN;
-                RETURN(-ENOTCONN);
+                target_send_reply(req, -ENOTCONN, info->mti_fail_id);
+                RETURN(0);
         }
 
         /* sanity check: if the xid matches, the request must be marked as a
@@ -1741,6 +1742,7 @@ static int mdt_recovery(struct mdt_thread_info *info)
                       (MSG_RESENT | MSG_REPLAY))) {
                         CERROR("rq_xid "LPU64" matches last_xid, "
                                 "expected RESENT flag\n", req->rq_xid);
+                        LBUG();
                         req->rq_status = -ENOTCONN;
                         RETURN(-ENOTCONN);
                 }
@@ -1756,18 +1758,20 @@ static int mdt_recovery(struct mdt_thread_info *info)
 
         /* Check for aborted recovery... */
         spin_lock_bh(&obd->obd_processing_task_lock);
-        abort_recovery = obd->obd_abort_recovery;
         recovering = obd->obd_recovering;
         spin_unlock_bh(&obd->obd_processing_task_lock);
-        if (abort_recovery) {
-                target_abort_recovery(obd);
-        } else if (recovering) {
+        if (recovering) {
                 int rc;
                 int should_process;
-
+                DEBUG_REQ(D_WARNING, req, "Got new replay");
                 rc = mds_filter_recovery_request(req, obd, &should_process);
                 if (rc != 0 || !should_process)
                         RETURN(rc);
+                else if (should_process < 0) {
+                        req->rq_status = should_process;
+                        rc = ptlrpc_error(req);
+                        RETURN(rc);
+                }
         }
         RETURN(+1);
 }
@@ -1775,24 +1779,7 @@ static int mdt_recovery(struct mdt_thread_info *info)
 static int mdt_reply(struct ptlrpc_request *req, int rc,
                      struct mdt_thread_info *info)
 {
-        struct obd_device *obd;
         ENTRY;
-
-        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
-                if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING)
-                        DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
-
-                obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
-                if (obd && obd->obd_recovering) {
-                        DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
-                        RETURN(target_queue_final_reply(req, rc));
-                } else {
-                        /*
-                         * Lost a race with recovery; let the error path DTRT.
-                         */
-                        rc = req->rq_status = -ENOTCONN;
-                }
-        }
         target_send_reply(req, rc, info->mti_fail_id);
         RETURN(0);
 }
@@ -1868,7 +1855,7 @@ static int mdt_handle_common(struct ptlrpc_request *req,
  * This is called from recovery code as handler of _all_ RPC types, FLD and SEQ
  * as well.
  */
-static int mdt_recovery_handle(struct ptlrpc_request *req)
+int mdt_recovery_handle(struct ptlrpc_request *req)
 {
         int rc;
         ENTRY;
@@ -3105,9 +3092,15 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
         ENTRY;
         target_cleanup_recovery(m->mdt_md_dev.md_lu_dev.ld_obd);
 
+        mdt_fs_cleanup(env, m);
+
         ping_evictor_stop();
         mdt_stop_ptlrpc_service(m);
 
+        cleanup_capas(CAPA_SITE_SERVER);
+        del_timer(&m->mdt_ck_timer);
+        mdt_ck_thread_stop(m);
+
         upcall_cache_cleanup(m->mdt_rmtacl_cache);
         m->mdt_rmtacl_cache = NULL;
 
@@ -3128,12 +3121,6 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
                 m->mdt_rootsquash_info = NULL;
         }
 
-        cleanup_capas(CAPA_SITE_SERVER);
-        del_timer(&m->mdt_ck_timer);
-        mdt_ck_thread_stop(m);
-
-        mdt_fs_cleanup(env, m);
-
         /* finish the stack */
         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
 
@@ -3273,6 +3260,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
                 GOTO(err_capa, rc);
 
         ping_evictor_start();
+
         rc = mdt_fs_setup(env, m, obd);
         if (rc)
                 GOTO(err_stop_service, rc);
@@ -3810,7 +3798,7 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
 
         case OBD_IOC_ABORT_RECOVERY:
                 CERROR("aborting recovery for device %s\n", obd->obd_name);
-                target_abort_recovery(obd);
+                target_stop_recovery_thread(obd);
                 break;
 
         default:
index 447019a..0c35829 100644 (file)
@@ -434,6 +434,8 @@ int mdt_client_new(const struct lu_env *env,
                    struct mdt_device *mdt,
                    struct mdt_export_data *med);
 
+int mdt_recovery_handle(struct ptlrpc_request *);
+
 int mdt_pin(struct mdt_thread_info* info);
 
 int mdt_lock_new_child(struct mdt_thread_info *info,
index ce7a46a..0e3e404 100644 (file)
@@ -286,16 +286,14 @@ static void mdt_open_transno(struct mdt_thread_info* info)
         struct mdt_device *mdt = info->mti_mdt;
         struct ptlrpc_request *req = mdt_info_req(info);
 
-        if (info->mti_transno != 0) {
-                CDEBUG(D_INODE, "(open | create) | replay: transno = %llu,"
-                                " last_committed = %llu\n",
-                                info->mti_transno,
-                                req->rq_export->exp_obd->obd_last_committed);
-                return;
-        }
-
         spin_lock(&mdt->mdt_transno_lock);
-        info->mti_transno = ++ mdt->mdt_last_transno;
+        if (info->mti_transno == 0) {
+                info->mti_transno = ++ mdt->mdt_last_transno;
+        } else {
+                /* should be replay */
+                if (info->mti_transno > mdt->mdt_last_transno)
+                        mdt->mdt_last_transno = info->mti_transno;
+        }
         spin_unlock(&mdt->mdt_transno_lock);
 
         CDEBUG(D_INODE, "open only: transno = %llu, last_committed = %llu\n",
@@ -304,7 +302,6 @@ static void mdt_open_transno(struct mdt_thread_info* info)
 
         req->rq_transno = info->mti_transno;
         lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);
-        target_committed_to_req(req);
         lustre_msg_set_last_xid(req->rq_repmsg, req->rq_xid);
 }
 
index bd91703..feeba27 100644 (file)
@@ -385,10 +385,14 @@ static int mdt_clients_data_init(const struct lu_env *env,
                 rc = mdt_client_add(env, mdt, med, cl_idx);
                 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
                 mcd = NULL;
-                exp->exp_replay_needed = 1;
                 exp->exp_connecting = 0;
                 obd->obd_recoverable_clients++;
                 obd->obd_max_recoverable_clients++;
+                atomic_inc(&obd->obd_req_replay_clients);
+                exp->exp_req_replay_needed = 1;
+                atomic_inc(&obd->obd_lock_replay_clients);
+                exp->exp_lock_replay_needed = 1;
+                
                 class_export_put(exp);
 
                 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
@@ -537,7 +541,7 @@ static int mdt_server_data_init(const struct lu_env *env,
                       "last_transno "LPU64"\n", obd->obd_name,
                       obd->obd_recoverable_clients, mdt->mdt_last_transno);
                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
-                obd->obd_recovering = 1;
+                target_start_recovery_thread(obd, mdt_recovery_handle);
                 obd->obd_recovery_start = CURRENT_SECONDS;
                 /* Only used for lprocfs_status */
                 obd->obd_recovery_end = obd->obd_recovery_start +
@@ -780,7 +784,7 @@ static int mdt_last_rcvd_update(struct mdt_thread_info *mti,
 
         off = med->med_lr_off;
         mutex_down(&med->med_mcd_lock);
-        if(lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
+        if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
                 mcd->mcd_last_close_transno = mti->mti_transno;
                 mcd->mcd_last_close_xid = req->rq_xid;
                 mcd->mcd_last_close_result = rc;
@@ -869,7 +873,7 @@ static int mdt_txn_stop_cb(const struct lu_env *env,
 
         req->rq_transno = mti->mti_transno;
         lustre_msg_set_transno(req->rq_repmsg, mti->mti_transno);
-        target_committed_to_req(req);
+        //target_committed_to_req(req);
         lustre_msg_set_last_xid(req->rq_repmsg, req_exp_last_xid(req));
         /* save transno for the commit callback */
         txi->txi_transno = mti->mti_transno;
@@ -980,6 +984,7 @@ void mdt_fs_cleanup(const struct lu_env *env, struct mdt_device *mdt)
 }
 
 /* reconstruction code */
+extern void mds_steal_ack_locks(struct ptlrpc_request *req);
 void mdt_req_from_mcd(struct ptlrpc_request *req,
                       struct mdt_client_data *mcd)
 {
@@ -997,7 +1002,7 @@ void mdt_req_from_mcd(struct ptlrpc_request *req,
                 lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
                 lustre_msg_set_status(req->rq_repmsg, req->rq_status);
         }
-        //mds_steal_ack_locks(req);
+        mds_steal_ack_locks(req);
 }
 
 static void mdt_reconstruct_generic(struct mdt_thread_info *mti,
index 4ff9bbd..cf1f9f5 100644 (file)
@@ -475,7 +475,7 @@ int mgs_handle(struct ptlrpc_request *req)
         switch (opc) {
         case MGS_CONNECT:
                 DEBUG_REQ(D_MGS, req, "connect");
-                rc = target_handle_connect(req, mgs_handle);
+                rc = target_handle_connect(req);
                 if (!rc && (lustre_msg_get_conn_cnt(req->rq_reqmsg) > 1))
                         /* Make clients trying to reconnect after a MGS restart
                            happy; also requires obd_replayable */
index 86dbd59..0d89aea 100644 (file)
@@ -942,7 +942,8 @@ EXPORT_SYMBOL(class_disconnect_exports);
 
 /* Remove exports that have not completed recovery.
  */
-void class_disconnect_stale_exports(struct obd_device *obd)
+int class_disconnect_stale_exports(struct obd_device *obd,
+                                   int (*test_export)(struct obd_export *))
 {
         struct list_head work_list;
         struct list_head *pos, *n;
@@ -954,18 +955,23 @@ void class_disconnect_stale_exports(struct obd_device *obd)
         spin_lock(&obd->obd_dev_lock);
         list_for_each_safe(pos, n, &obd->obd_exports) {
                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
-                if (exp->exp_replay_needed) {
-                        list_del(&exp->exp_obd_chain);
-                        list_add(&exp->exp_obd_chain, &work_list);
-                        cnt++;
-                }
+                if (test_export(exp))
+                        continue;
+                
+                list_del(&exp->exp_obd_chain);
+                list_add(&exp->exp_obd_chain, &work_list);
+                cnt++;
+                CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
+                       obd->obd_name, exp->exp_client_uuid.uuid,
+                       exp->exp_connection == NULL ? "<unknown>" :
+                       libcfs_nid2str(exp->exp_connection->c_peer.nid));
         }
         spin_unlock(&obd->obd_dev_lock);
 
         CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
                obd->obd_name, cnt);
         class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
-        EXIT;
+        RETURN(cnt);
 }
 EXPORT_SYMBOL(class_disconnect_stale_exports);
 
index 6e8bf9f..ab35ec4 100644 (file)
@@ -198,8 +198,9 @@ int class_attach(struct lustre_cfg *lcfg)
         cfs_init_timer(&obd->obd_recovery_timer);
         spin_lock_init(&obd->obd_processing_task_lock);
         cfs_waitq_init(&obd->obd_next_transno_waitq);
-        CFS_INIT_LIST_HEAD(&obd->obd_recovery_queue);
-        CFS_INIT_LIST_HEAD(&obd->obd_delayed_reply_queue);
+        CFS_INIT_LIST_HEAD(&obd->obd_req_replay_queue);
+        CFS_INIT_LIST_HEAD(&obd->obd_lock_replay_queue);
+        CFS_INIT_LIST_HEAD(&obd->obd_final_req_queue);
 
         spin_lock_init(&obd->obd_uncommitted_replies_lock);
         CFS_INIT_LIST_HEAD(&obd->obd_uncommitted_replies);
index a322154..17446d1 100644 (file)
@@ -541,7 +541,7 @@ int filter_update_last_objid(struct obd_device *obd, obd_gr group,
                        group, rc);
         RETURN(rc);
 }
-
+extern int ost_handle(struct ptlrpc_request *req);
 /* assumes caller has already in kernel ctxt */
 static int filter_init_server_data(struct obd_device *obd, struct file * filp)
 {
@@ -702,8 +702,11 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                         LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
 
                         fcd = NULL;
-                        exp->exp_replay_needed = 1;
+                        exp->exp_req_replay_needed = 1;
+                        exp->exp_lock_replay_needed = 1;
                         exp->exp_connecting = 0;
+                        atomic_inc(&obd->obd_req_replay_clients);
+                        atomic_inc(&obd->obd_lock_replay_clients);
                         obd->obd_recoverable_clients++;
                         obd->obd_max_recoverable_clients++;
                         class_export_put(exp);
@@ -728,7 +731,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                       obd->obd_recoverable_clients,
                       le64_to_cpu(fsd->lsd_last_transno));
                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
-                obd->obd_recovering = 1;
+                target_start_recovery_thread(obd, ost_handle);
                 obd->obd_recovery_start = CURRENT_SECONDS;
                 /* Only used for lprocfs_status */
                 obd->obd_recovery_end = obd->obd_recovery_start +
@@ -3768,7 +3771,7 @@ int filter_iocontrol(unsigned int cmd, struct obd_export *exp,
         switch (cmd) {
         case OBD_IOC_ABORT_RECOVERY: {
                 CERROR("aborting recovery for device %s\n", obd->obd_name);
-                target_abort_recovery(obd);
+                target_stop_recovery_thread(obd);
                 RETURN(0);
         }
 
index 8def3b1..e55c527 100644 (file)
@@ -1313,7 +1313,7 @@ int ost_msg_check_version(struct lustre_msg *msg)
         return rc;
 }
 
-static int ost_handle(struct ptlrpc_request *req)
+int ost_handle(struct ptlrpc_request *req)
 {
         struct obd_trans_info trans_info = { 0, };
         struct obd_trans_info *oti = &trans_info;
@@ -1333,7 +1333,7 @@ static int ost_handle(struct ptlrpc_request *req)
 
         /* XXX identical to MDS */
         if (lustre_msg_get_opc(req->rq_reqmsg) != OST_CONNECT) {
-                int abort_recovery, recovering;
+                int recovering;
 
                 if (req->rq_export == NULL) {
                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
@@ -1347,16 +1347,18 @@ static int ost_handle(struct ptlrpc_request *req)
 
                 /* Check for aborted recovery. */
                 spin_lock_bh(&obd->obd_processing_task_lock);
-                abort_recovery = obd->obd_abort_recovery;
                 recovering = obd->obd_recovering;
                 spin_unlock_bh(&obd->obd_processing_task_lock);
-                if (abort_recovery) {
-                        target_abort_recovery(obd);
-                } else if (recovering) {
+                if (recovering) {
                         rc = ost_filter_recovery_request(req, obd,
                                                          &should_process);
                         if (rc || !should_process)
                                 RETURN(rc);
+                        else if (should_process < 0) {
+                                req->rq_status = should_process;
+                                rc = ptlrpc_error(req);
+                                RETURN(rc);
+                        }
                 }
         }
 
@@ -1369,7 +1371,7 @@ static int ost_handle(struct ptlrpc_request *req)
         case OST_CONNECT: {
                 CDEBUG(D_INODE, "connect\n");
                 OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0);
-                rc = target_handle_connect(req, ost_handle);
+                rc = target_handle_connect(req);
                 if (!rc)
                         obd = req->rq_export->exp_obd;
                 break;
@@ -1519,22 +1521,13 @@ static int ost_handle(struct ptlrpc_request *req)
                 target_committed_to_req(req);
 
 out:
-        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
-                if (obd && obd->obd_recovering) {
-                        DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
-                        return target_queue_final_reply(req, rc);
-                }
-                /* Lost a race with recovery; let the error path DTRT. */
-                rc = req->rq_status = -ENOTCONN;
-        }
-
         if (!rc)
                 oti_to_request(oti, req);
 
         target_send_reply(req, rc, fail);
         return 0;
 }
-
+EXPORT_SYMBOL(ost_handle);
 /*
  * free per-thread pool created by ost_thread_init().
  */
index 4e37dea..371a48a 100644 (file)
@@ -703,9 +703,15 @@ static int after_reply(struct ptlrpc_request *req)
                 }
 
                 /* Replay-enabled imports return commit-status information. */
-                if (lustre_msg_get_last_committed(req->rq_repmsg))
+                if (lustre_msg_get_last_committed(req->rq_repmsg)) {
+                        if (imp->imp_peer_committed_transno >
+                                lustre_msg_get_last_committed(req->rq_repmsg))
+                                CERROR(" Import has "LPU64", receive "LPU64" committed\n",
+                                       imp->imp_peer_committed_transno, 
+                                       lustre_msg_get_last_committed(req->rq_repmsg));
                         imp->imp_peer_committed_transno =
                                 lustre_msg_get_last_committed(req->rq_repmsg);
+                }
                 ptlrpc_free_committed(imp);
                 spin_unlock(&imp->imp_lock);
         }
@@ -1808,6 +1814,7 @@ static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
         }
 
         spin_lock(&imp->imp_lock);
+        LASSERT(req->rq_transno <= imp->imp_last_replay_transno);
         imp->imp_last_replay_transno = req->rq_transno;
         spin_unlock(&imp->imp_lock);
 
index 740450c..46bb13c 100644 (file)
@@ -327,10 +327,27 @@ static int import_select_connection(struct obd_import *imp)
         RETURN(0);
 }
 
+/*
+ * must be called under imp_lock
+ */
+int ptlrpc_first_transno(struct obd_import *imp, __u64 *transno)
+{
+        struct ptlrpc_request *req;
+        struct list_head *tmp;
+        
+        if (list_empty(&imp->imp_replay_list))
+                return 0;
+        tmp = imp->imp_replay_list.next;
+        req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
+        *transno = req->rq_transno;
+        return 1;
+}
+
 int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
 {
         struct obd_device *obd = imp->imp_obd;
         int initial_connect = 0;
+        int set_transno = 0;
         int rc;
         __u64 committed_before_reconnect = 0;
         struct ptlrpc_request *request;
@@ -373,6 +390,7 @@ int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
         else
                 committed_before_reconnect = imp->imp_peer_committed_transno;
 
+        set_transno = ptlrpc_first_transno(imp, &imp->imp_connect_data.ocd_transno);
         spin_unlock(&imp->imp_lock);
 
         if (new_uuid) {
@@ -444,8 +462,15 @@ int ptlrpc_connect_import(struct obd_import *imp, char *new_uuid)
                 /* On an initial connect, we don't know which one of a
                    failover server pair is up.  Don't wait long. */
                 request->rq_timeout = max((int)(obd_timeout / 20), 5);
+                lustre_msg_add_op_flags(request->rq_reqmsg, 
+                                        MSG_CONNECT_INITIAL);
+
         }
 
+        if (set_transno)
+                lustre_msg_add_op_flags(request->rq_reqmsg, 
+                                        MSG_CONNECT_TRANSNO);
+
         DEBUG_REQ(D_RPCTRACE, request, "(re)connect request");
         ptlrpcd_add_req(request);
         rc = 0;
@@ -803,7 +828,8 @@ static int signal_completed_replay(struct obd_import *imp)
 
         ptlrpc_req_set_repsize(req, 1, NULL);
         req->rq_send_state = LUSTRE_IMP_REPLAY_WAIT;
-        lustre_msg_add_flags(req->rq_reqmsg, MSG_LAST_REPLAY);
+        lustre_msg_add_flags(req->rq_reqmsg, 
+                             MSG_LOCK_REPLAY_DONE | MSG_REQ_REPLAY_DONE);
         req->rq_timeout *= 3;
         req->rq_interpret_reply = completed_replay_interpret;
 
index 9177ae1..2e23555 100644 (file)
@@ -1501,18 +1501,18 @@ void lustre_swab_ptlrpc_body(struct ptlrpc_body *b)
 void lustre_swab_connect(struct obd_connect_data *ocd)
 {
         __swab64s(&ocd->ocd_connect_flags);
+        __swab64s(&ocd->ocd_transno);
+        __swab64s(&ocd->ocd_ibits_known);
         __swab32s(&ocd->ocd_version);
         __swab32s(&ocd->ocd_grant);
         __swab32s(&ocd->ocd_index);
         __swab32s(&ocd->ocd_brw_size);
-        __swab64s(&ocd->ocd_ibits_known);
         __swab32s(&ocd->ocd_nllu);
         __swab32s(&ocd->ocd_nllg);
         __swab32s(&ocd->ocd_group);
         CLASSERT(offsetof(typeof(*ocd), padding1) != 0);
         CLASSERT(offsetof(typeof(*ocd), padding2) != 0);
         CLASSERT(offsetof(typeof(*ocd), padding3) != 0);
-        CLASSERT(offsetof(typeof(*ocd), padding4) != 0);
 }
 
 void lustre_swab_obdo (struct obdo  *o)
index 1849656..26ac533 100644 (file)
@@ -75,7 +75,6 @@ int ptlrpc_replay_next(struct obd_import *imp, int *inflight)
         ptlrpc_free_committed(imp);
         last_transno = imp->imp_last_replay_transno;
         spin_unlock(&imp->imp_lock);
-
         CDEBUG(D_HA, "import %p from %s committed "LPU64" last "LPU64"\n",
                imp, obd2cli_tgt(imp->imp_obd),
                imp->imp_peer_committed_transno, last_transno);
@@ -97,7 +96,6 @@ int ptlrpc_replay_next(struct obd_import *imp, int *inflight)
          */
         list_for_each_safe(tmp, pos, &imp->imp_replay_list) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
-
                 /* If need to resend the last sent transno (because a
                    reconnect has occurred), then stop on the matching
                    req and send it again. If, however, the last sent
index b97130f..d4955c3 100644 (file)
@@ -135,17 +135,17 @@ static void check_obd_connect_data(void)
         BLANK_LINE();
         CHECK_STRUCT(obd_connect_data);
         CHECK_MEMBER(obd_connect_data, ocd_connect_flags);
+        CHECK_MEMBER(obd_connect_data, ocd_transno);
+        CHECK_MEMBER(obd_connect_data, ocd_ibits_known);
         CHECK_MEMBER(obd_connect_data, ocd_version);
         CHECK_MEMBER(obd_connect_data, ocd_grant);
         CHECK_MEMBER(obd_connect_data, ocd_index);
         CHECK_MEMBER(obd_connect_data, ocd_brw_size);
-        CHECK_MEMBER(obd_connect_data, ocd_ibits_known);
         CHECK_MEMBER(obd_connect_data, ocd_nllu);
         CHECK_MEMBER(obd_connect_data, ocd_nllg);
         CHECK_MEMBER(obd_connect_data, padding1);
         CHECK_MEMBER(obd_connect_data, padding2);
         CHECK_MEMBER(obd_connect_data, padding3);
-        CHECK_MEMBER(obd_connect_data, padding4);
 
         CHECK_CDEFINE(OBD_CONNECT_RDONLY);
         CHECK_CDEFINE(OBD_CONNECT_INDEX);