Whamcloud - gitweb
Don't use SLAB_KERNEL (GFP_KERNEL) allocations for locks and resources.
[fs/lustre-release.git] / lustre / ldlm / ldlm_lib.c
index 1b04a74..30d3ac5 100644 (file)
@@ -19,7 +19,9 @@
  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
-#define EXPORT_SYMTAB
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
 #define DEBUG_SUBSYSTEM S_LDLM
 
 #ifdef __KERNEL__
 #else
 # include <liblustre.h>
 #endif
-#include <linux/obd_ost.h>
+#include <linux/obd.h>
+#include <linux/obd_ost.h> /* for LUSTRE_OSC_NAME */
+#include <linux/lustre_mds.h> /* for LUSTRE_MDC_NAME */
+#include <linux/lustre_mgmt.h>
 #include <linux/lustre_dlm.h>
-#include <linux/lustre_mds.h>
 #include <linux/lustre_net.h>
 
-int client_import_connect(struct lustre_handle *dlm_handle,
+int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
+{
+        struct ptlrpc_connection *conn;
+        struct lustre_cfg* lcfg = buf;
+        struct client_obd *cli = &obddev->u.cli;
+        struct obd_import *imp;
+        struct obd_uuid server_uuid;
+        int rq_portal, rp_portal, connect_op;
+        char *name = obddev->obd_type->typ_name;
+        char *mgmt_name = NULL;
+        int rc = 0;
+        struct obd_device *mgmt_obd;
+        mgmtcli_register_for_events_t register_f;
+        ENTRY;
+
+        /* In a more perfect world, we would hang a ptlrpc_client off of
+         * obd_type and just use the values from there. */
+        if (!strcmp(name, LUSTRE_OSC_NAME)) {
+                rq_portal = OST_REQUEST_PORTAL;
+                rp_portal = OSC_REPLY_PORTAL;
+                connect_op = OST_CONNECT;
+        } else if (!strcmp(name, LUSTRE_MDC_NAME)) {
+                rq_portal = MDS_REQUEST_PORTAL;
+                rp_portal = MDC_REPLY_PORTAL;
+                connect_op = MDS_CONNECT;
+        } else if (!strcmp(name, LUSTRE_MGMTCLI_NAME)) {
+                rq_portal = MGMT_REQUEST_PORTAL;
+                rp_portal = MGMT_REPLY_PORTAL;
+                connect_op = MGMT_CONNECT;
+        } else {
+                CERROR("unknown client OBD type \"%s\", can't setup\n",
+                       name);
+                RETURN(-EINVAL);
+        }
+
+        if (lcfg->lcfg_inllen1 < 1) {
+                CERROR("requires a TARGET UUID\n");
+                RETURN(-EINVAL);
+        }
+
+        if (lcfg->lcfg_inllen1 > 37) {
+                CERROR("client UUID must be less than 38 characters\n");
+                RETURN(-EINVAL);
+        }
+
+        if (lcfg->lcfg_inllen2 < 1) {
+                CERROR("setup requires a SERVER UUID\n");
+                RETURN(-EINVAL);
+        }
+
+        if (lcfg->lcfg_inllen2 > 37) {
+                CERROR("target UUID must be less than 38 characters\n");
+                RETURN(-EINVAL);
+        }
+
+        sema_init(&cli->cl_sem, 1);
+        cli->cl_conn_count = 0;
+        memcpy(server_uuid.uuid, lcfg->lcfg_inlbuf2, MIN(lcfg->lcfg_inllen2,
+                                                        sizeof(server_uuid)));
+
+        init_MUTEX(&cli->cl_dirty_sem);
+        cli->cl_dirty = 0;
+        cli->cl_dirty_granted = 0;
+        cli->cl_dirty_max = OSC_MAX_DIRTY_DEFAULT * 1024 * 1024;
+        cli->cl_ost_can_grant = 1;
+        INIT_LIST_HEAD(&cli->cl_cache_waiters);
+        INIT_LIST_HEAD(&cli->cl_loi_ready_list);
+        spin_lock_init(&cli->cl_loi_list_lock);
+        cli->cl_brw_in_flight = 0;
+        spin_lock_init(&cli->cl_read_rpc_hist.oh_lock);
+        spin_lock_init(&cli->cl_write_rpc_hist.oh_lock);
+        spin_lock_init(&cli->cl_read_page_hist.oh_lock);
+        spin_lock_init(&cli->cl_write_page_hist.oh_lock);
+        cli->cl_max_pages_per_rpc = PTL_MD_MAX_PAGES;
+        cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT;
+
+        ldlm_get_ref();
+        if (rc) {
+                CERROR("ldlm_get_ref failed: %d\n", rc);
+                GOTO(err, rc);
+        }
+
+        conn = ptlrpc_uuid_to_connection(&server_uuid);
+        if (conn == NULL)
+                GOTO(err_ldlm, rc = -ENOENT);
+
+        ptlrpc_init_client(rq_portal, rp_portal, name,
+                           &obddev->obd_ldlm_client);
+
+        imp = class_new_import();
+        if (imp == NULL) {
+                ptlrpc_put_connection(conn);
+                GOTO(err_ldlm, rc = -ENOENT);
+        }
+        imp->imp_connection = conn;
+        imp->imp_client = &obddev->obd_ldlm_client;
+        imp->imp_obd = obddev;
+        imp->imp_connect_op = connect_op;
+        imp->imp_generation = 0;
+        INIT_LIST_HEAD(&imp->imp_pinger_chain);
+        memcpy(imp->imp_target_uuid.uuid, lcfg->lcfg_inlbuf1,
+              lcfg->lcfg_inllen1);
+        class_import_put(imp);
+
+        cli->cl_import = imp;
+        cli->cl_max_mds_easize = sizeof(struct lov_mds_md);
+        cli->cl_max_mds_cookiesize = sizeof(struct llog_cookie);
+        cli->cl_sandev = to_kdev_t(0);
+
+        if (lcfg->lcfg_inllen3 != 0) {
+                if (!strcmp(lcfg->lcfg_inlbuf3, "inactive")) {
+                        CDEBUG(D_HA, "marking %s %s->%s as inactive\n",
+                               name, obddev->obd_name,
+                               imp->imp_target_uuid.uuid);
+                        imp->imp_invalid = 1;
+
+                        if (lcfg->lcfg_inllen4 != 0)
+                                mgmt_name = lcfg->lcfg_inlbuf4;
+                } else {
+                        mgmt_name = lcfg->lcfg_inlbuf3;
+                }
+        }
+
+        if (mgmt_name != NULL) {
+                /* Register with management client if we need to. */
+                CDEBUG(D_HA, "%s registering with %s for events about %s\n",
+                       obddev->obd_name, mgmt_name, server_uuid.uuid);
+
+                mgmt_obd = class_name2obd(mgmt_name);
+                if (!mgmt_obd) {
+                        CERROR("can't find mgmtcli %s to register\n",
+                               mgmt_name);
+                        GOTO(err_import, rc = -ENOSYS);
+                }
+
+                register_f = inter_module_get("mgmtcli_register_for_events");
+                if (!register_f) {
+                        CERROR("can't i_m_g mgmtcli_register_for_events\n");
+                        GOTO(err_import, rc = -ENOSYS);
+                }
+
+                rc = register_f(mgmt_obd, obddev, &imp->imp_target_uuid);
+                inter_module_put("mgmtcli_register_for_events");
+
+                if (!rc)
+                        cli->cl_mgmtcli_obd = mgmt_obd;
+        }
+
+        RETURN(rc);
+
+err_import:
+        class_destroy_import(imp);
+err_ldlm:
+        ldlm_put_ref(0);
+err:
+        RETURN(rc);
+
+}
+
+int client_obd_cleanup(struct obd_device *obddev, int flags)
+{
+        struct client_obd *cli = &obddev->u.cli;
+
+        if (!cli->cl_import)
+                RETURN(-EINVAL);
+        if (cli->cl_mgmtcli_obd) {
+                mgmtcli_deregister_for_events_t dereg_f;
+
+                dereg_f = inter_module_get("mgmtcli_deregister_for_events");
+                dereg_f(cli->cl_mgmtcli_obd, obddev);
+                inter_module_put("mgmtcli_deregister_for_events");
+        }
+        class_destroy_import(cli->cl_import);
+        cli->cl_import = NULL;
+
+        ldlm_put_ref(flags & OBD_OPT_FORCE);
+
+        RETURN(0);
+}
+
+int client_connect_import(struct lustre_handle *dlm_handle,
                           struct obd_device *obd,
                           struct obd_uuid *cluuid)
 {
         struct client_obd *cli = &obd->u.cli;
         struct obd_import *imp = cli->cl_import;
         struct obd_export *exp;
-        struct ptlrpc_request *request;
-        /* XXX maybe this is a good time to create a connect struct? */
-        int rc, size[] = {sizeof(imp->imp_target_uuid),
-                          sizeof(obd->obd_uuid),
-                          sizeof(*dlm_handle)};
-        char *tmp[] = {imp->imp_target_uuid.uuid,
-                       obd->obd_uuid.uuid,
-                       (char *)dlm_handle};
-        int msg_flags;
-
+        int rc;
         ENTRY;
+
         down(&cli->cl_sem);
         rc = class_connect(dlm_handle, obd, cluuid);
         if (rc)
@@ -58,6 +234,7 @@ int client_import_connect(struct lustre_handle *dlm_handle,
         cli->cl_conn_count++;
         if (cli->cl_conn_count > 1)
                 GOTO(out_sem, rc);
+        exp = class_conn2export(dlm_handle);
 
         if (obd->obd_namespace != NULL)
                 CERROR("already have namespace!\n");
@@ -66,78 +243,56 @@ int client_import_connect(struct lustre_handle *dlm_handle,
         if (obd->obd_namespace == NULL)
                 GOTO(out_disco, rc = -ENOMEM);
 
-        request = ptlrpc_prep_req(imp, imp->imp_connect_op, 3, size, tmp);
-        if (!request)
-                GOTO(out_ldlm, rc = -ENOMEM);
-
-        request->rq_level = LUSTRE_CONN_NEW;
-        request->rq_replen = lustre_msg_size(0, NULL);
-
-        lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_PEER);
-
         imp->imp_dlm_handle = *dlm_handle;
+        imp->imp_state = LUSTRE_IMP_DISCON;
 
-        imp->imp_level = LUSTRE_CONN_CON;
-        rc = ptlrpc_queue_wait(request);
-        if (rc) {
-                class_disconnect(dlm_handle, 0);
-                GOTO(out_req, rc);
+        rc = ptlrpc_connect_import(imp);
+        if (rc != 0) {
+                LASSERT (imp->imp_state == LUSTRE_IMP_DISCON);
+                GOTO(out_ldlm, rc);
         }
 
-        exp = class_conn2export(dlm_handle);
-        exp->exp_connection = ptlrpc_connection_addref(request->rq_connection);
-        class_export_put(exp);
+        LASSERT (imp->imp_state == LUSTRE_IMP_FULL);
+
+        exp->exp_connection = ptlrpc_connection_addref(imp->imp_connection);
 
-        msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
-        if (msg_flags & MSG_CONNECT_REPLAYABLE) {
-                imp->imp_replayable = 1;
+        if (imp->imp_replayable) {
                 CDEBUG(D_HA, "connected to replayable target: %s\n",
                        imp->imp_target_uuid.uuid);
                 ptlrpc_pinger_add_import(imp);
         }
-        imp->imp_level = LUSTRE_CONN_FULL;
-        imp->imp_remote_handle = request->rq_repmsg->handle;
+
         CDEBUG(D_HA, "local import: %p, remote handle: "LPX64"\n", imp,
                imp->imp_remote_handle.cookie);
 
         EXIT;
-out_req:
-        ptlrpc_req_finished(request);
+
         if (rc) {
 out_ldlm:
-                ldlm_namespace_free(obd->obd_namespace);
+                ldlm_namespace_free(obd->obd_namespace, 0);
                 obd->obd_namespace = NULL;
 out_disco:
                 cli->cl_conn_count--;
-                class_disconnect(dlm_handle, 0);
+                class_disconnect(exp, 0);
+        } else {
+                class_export_put(exp);
         }
 out_sem:
         up(&cli->cl_sem);
         return rc;
 }
 
-int client_import_disconnect(struct lustre_handle *dlm_handle, int failover)
+int client_disconnect_export(struct obd_export *exp, int failover)
 {
-        struct obd_device *obd = class_conn2obd(dlm_handle);
+        struct obd_device *obd = class_exp2obd(exp);
         struct client_obd *cli = &obd->u.cli;
         struct obd_import *imp = cli->cl_import;
-        struct ptlrpc_request *request = NULL;
-        int rc = 0, err, rq_opc;
+        int rc = 0, err;
         ENTRY;
 
         if (!obd) {
-                CERROR("invalid connection for disconnect: cookie "LPX64"\n",
-                       dlm_handle ? dlm_handle->cookie : -1UL);
-                RETURN(-EINVAL);
-        }
-
-        switch (imp->imp_connect_op) {
-        case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
-        case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
-        case MGMT_CONNECT:rq_opc = MGMT_DISCONNECT;break;
-        default:
-                CERROR("don't know how to disconnect from %s (connect_op %d)\n",
-                       imp->imp_target_uuid.uuid, imp->imp_connect_op);
+                CERROR("invalid export for disconnect: exp %p cookie "LPX64"\n",
+                       exp, exp ? exp->exp_handle.h_cookie : -1);
                 RETURN(-EINVAL);
         }
 
@@ -152,37 +307,30 @@ int client_import_disconnect(struct lustre_handle *dlm_handle, int failover)
         if (cli->cl_conn_count)
                 GOTO(out_no_disconnect, rc = 0);
 
+        /* Some non-replayable imports (MDS's OSCs) are pinged, so just
+         * delete it regardless.  (It's safe to delete an import that was
+         * never added.) */
+        (void)ptlrpc_pinger_del_import(imp);
+
         if (obd->obd_namespace != NULL) {
                 /* obd_no_recov == local only */
                 ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
                                        obd->obd_no_recov, NULL);
-                ldlm_namespace_free(obd->obd_namespace);
+                ldlm_namespace_free(obd->obd_namespace, obd->obd_no_recov);
                 obd->obd_namespace = NULL;
         }
 
         /* Yeah, obd_no_recov also (mainly) means "forced shutdown". */
-        if (obd->obd_no_recov) {
+        if (obd->obd_no_recov)
                 ptlrpc_set_import_active(imp, 0);
-        } else {
-                request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL);
-                if (!request)
-                        GOTO(out_req, rc = -ENOMEM);
+        else
+                rc = ptlrpc_disconnect_import(imp);
 
-                request->rq_replen = lustre_msg_size(0, NULL);
-
-                rc = ptlrpc_queue_wait(request);
-                if (rc)
-                        GOTO(out_req, rc);
-        }
-        if (imp->imp_replayable)
-                ptlrpc_pinger_del_import(imp);
+        imp->imp_state = LUSTRE_IMP_NEW;
 
         EXIT;
- out_req:
-        if (request)
-                ptlrpc_req_finished(request);
  out_no_disconnect:
-        err = class_disconnect(dlm_handle, 0);
+        err = class_disconnect(exp, 0);
         if (!rc && err)
                 rc = err;
  out_sem:
@@ -199,7 +347,7 @@ int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
 {
         if (exp->exp_connection) {
                 struct lustre_handle *hdl;
-                hdl = &exp->exp_ldlm_data.led_import->imp_remote_handle;
+                hdl = &exp->exp_imp_reverse->imp_remote_handle;
                 /* Might be a re-connect after a partition. */
                 if (!memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) {
                         CERROR("%s reconnecting\n", cluuid->uuid);
@@ -211,12 +359,7 @@ int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
                                LPX64")\n", cluuid->uuid,
                                exp->exp_connection->c_remote_uuid.uuid,
                                hdl->cookie, conn->cookie);
-                        /* XXX disconnect them here? */
                         memset(conn, 0, sizeof *conn);
-                        /* This is a little scary, but right now we build this
-                         * file separately into each server module, so I won't
-                         * go _immediately_ to hell.
-                         */
                         RETURN(-EALREADY);
                 }
         }
@@ -232,7 +375,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
 {
         struct obd_device *target;
         struct obd_export *export = NULL;
-        struct obd_import *dlmimp;
+        struct obd_import *revimp;
         struct lustre_handle conn;
         struct obd_uuid tgtuuid;
         struct obd_uuid cluuid;
@@ -251,6 +394,10 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
 
         obd_str2uuid (&tgtuuid, str);
         target = class_uuid2obd(&tgtuuid);
+        if (!target) {
+                target = class_name2obd(str);
+        }
+
         if (!target || target->obd_stopping || !target->obd_set_up) {
                 CERROR("UUID '%s' is not available for connect\n", str);
                 GOTO(out, rc = -ENODEV);
@@ -281,12 +428,12 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
 
         memcpy(&conn, tmp, sizeof conn);
 
-        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+        rc = lustre_pack_reply(req, 0, NULL, NULL);
         if (rc)
                 GOTO(out, rc);
 
         /* lctl gets a backstage, all-access pass. */
-        if (obd_uuid_equals(&cluuid, &lctl_fake_uuid))
+        if (obd_uuid_equals(&cluuid, &target->obd_uuid))
                 goto dont_check_exports;
 
         spin_lock(&target->obd_dev_lock);
@@ -302,8 +449,13 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
                 export = NULL;
         }
         /* If we found an export, we already unlocked. */
-        if (!export)
+        if (!export) {
                 spin_unlock(&target->obd_dev_lock);
+        } else if (req->rq_reqmsg->conn_cnt == 1) {
+                CERROR("%s reconnected with 1 conn_cnt; cookies not random?\n",
+                       cluuid.uuid);
+                GOTO(out, rc = -EALREADY);
+        }
 
         /* Tell the client if we're in recovery. */
         /* If this is the first client, start the recovery timer */
@@ -319,7 +471,9 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         if (export == NULL) {
                 if (target->obd_recovering) {
                         CERROR("denying connection for new client %s: "
-                               "in recovery\n", cluuid.uuid);
+                               "%d clients in recovery for %lds\n", cluuid.uuid,
+                               target->obd_recoverable_clients,
+                               (target->obd_recovery_timer.expires-jiffies)/HZ);
                         rc = -EBUSY;
                 } else {
  dont_check_exports:
@@ -333,6 +487,11 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         if (rc && rc != EALREADY)
                 GOTO(out, rc);
 
+        /* XXX track this all the time? */
+        if (target->obd_recovering) {
+                target->obd_connected_clients++;
+        }
+
         req->rq_repmsg->handle = conn;
 
         /* If the client and the server are the same node, we will already
@@ -357,6 +516,9 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
                                                        &remote_uuid);
         req->rq_connection = ptlrpc_connection_addref(export->exp_connection);
 
+        LASSERT(export->exp_conn_cnt < req->rq_reqmsg->conn_cnt);
+        export->exp_conn_cnt = req->rq_reqmsg->conn_cnt;
+
         if (rc == EALREADY) {
                 /* We indicate the reconnection in a flag, not an error code. */
                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
@@ -366,16 +528,16 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn),
                sizeof conn);
 
-        if (export->exp_ldlm_data.led_import != NULL)
-                class_destroy_import(export->exp_ldlm_data.led_import);
-        dlmimp = export->exp_ldlm_data.led_import = class_new_import();
-        dlmimp->imp_connection = ptlrpc_connection_addref(req->rq_connection);
-        dlmimp->imp_client = &export->exp_obd->obd_ldlm_client;
-        dlmimp->imp_remote_handle = conn;
-        dlmimp->imp_obd = target;
-        dlmimp->imp_dlm_fake = 1;
-        dlmimp->imp_level = LUSTRE_CONN_FULL;
-        class_import_put(dlmimp);
+        if (export->exp_imp_reverse != NULL)
+                class_destroy_import(export->exp_imp_reverse);
+        revimp = export->exp_imp_reverse = class_new_import();
+        revimp->imp_connection = ptlrpc_connection_addref(req->rq_connection);
+        revimp->imp_client = &export->exp_obd->obd_ldlm_client;
+        revimp->imp_remote_handle = conn;
+        revimp->imp_obd = target;
+        revimp->imp_dlm_fake = 1;
+        revimp->imp_state = LUSTRE_IMP_FULL;
+        class_import_put(revimp);
 out:
         if (rc)
                 req->rq_status = rc;
@@ -384,25 +546,30 @@ out:
 
 int target_handle_disconnect(struct ptlrpc_request *req)
 {
-        struct lustre_handle *conn = &req->rq_reqmsg->handle;
-        struct obd_import *dlmimp;
         int rc;
         ENTRY;
 
-        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+        rc = lustre_pack_reply(req, 0, NULL, NULL);
         if (rc)
                 RETURN(rc);
 
-        req->rq_status = obd_disconnect(conn, 0);
-
-        dlmimp = req->rq_export->exp_ldlm_data.led_import;
-        class_destroy_import(dlmimp);
-
-        class_export_put(req->rq_export);
+        req->rq_status = obd_disconnect(req->rq_export, 0);
         req->rq_export = NULL;
         RETURN(0);
 }
 
+void target_destroy_export(struct obd_export *exp)
+{
+        /* exports created from last_rcvd data, and "fake"
+           exports created by lctl don't have an import */
+        if (exp->exp_imp_reverse != NULL)
+                class_destroy_import(exp->exp_imp_reverse);
+
+        /* We cancel locks at disconnect time, but this will catch any locks
+         * granted in a race with recovery-induced disconnect. */
+        ldlm_cancel_locks_for_export(exp);
+}
+
 /*
  * Recovery functions
  */
@@ -439,8 +606,7 @@ static void abort_recovery_queue(struct obd_device *obd)
                 DEBUG_REQ(D_ERROR, req, "aborted:");
                 req->rq_status = -ENOTCONN;
                 req->rq_type = PTL_RPC_MSG_ERR;
-                rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
-                                     &req->rq_repmsg);
+                rc = lustre_pack_reply(req, 0, NULL, NULL);
                 if (rc == 0) {
                         ptlrpc_reply(req);
                 } else {
@@ -457,6 +623,7 @@ static void abort_recovery_queue(struct obd_device *obd)
 void target_abort_recovery(void *data)
 {
         struct obd_device *obd = data;
+        int rc;
 
         CERROR("disconnecting clients and aborting recovery\n");
         spin_lock_bh(&obd->obd_processing_task_lock);
@@ -467,11 +634,22 @@ void target_abort_recovery(void *data)
         }
 
         obd->obd_recovering = obd->obd_abort_recovery = 0;
-        obd->obd_recoverable_clients = 0;
+
         wake_up(&obd->obd_next_transno_waitq);
         target_cancel_recovery_timer(obd);
         spin_unlock_bh(&obd->obd_processing_task_lock);
+
         class_disconnect_exports(obd, 0);
+
+        /* when recovery was abort, cleanup orphans for mds */
+        if (OBT(obd) && OBP(obd, postrecov)) {
+                rc = OBP(obd, postrecov)(obd);
+                if (rc >= 0)
+                        CWARN("Cleanup %d orphans after recovery was aborted\n", rc);
+                else
+                        CERROR("postrecov failed %d\n", rc);
+        }
+
         abort_delayed_replies(obd);
         abort_recovery_queue(obd);
         ptlrpc_run_recovery_over_upcall(obd);
@@ -496,7 +674,8 @@ static void reset_recovery_timer(struct obd_device *obd)
 
         if (!recovering)
                 return;
-        CERROR("timer will expire in %d seconds\n", OBD_RECOVERY_TIMEOUT / HZ);
+        CDEBUG(D_HA, "timer will expire in %u seconds\n",
+               OBD_RECOVERY_TIMEOUT / HZ);
         mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT);
 }
 
@@ -509,7 +688,8 @@ void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler)
                 spin_unlock_bh(&obd->obd_processing_task_lock);
                 return;
         }
-        CERROR("%s: starting recovery timer\n", obd->obd_name);
+        CWARN("%s: starting recovery timer (%us)\n", obd->obd_name,
+               OBD_RECOVERY_TIMEOUT / HZ);
         obd->obd_recovery_handler = handler;
         obd->obd_recovery_timer.function = target_recovery_expired;
         obd->obd_recovery_timer.data = (unsigned long)obd;
@@ -522,17 +702,38 @@ void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler)
 static int check_for_next_transno(struct obd_device *obd)
 {
         struct ptlrpc_request *req;
-        int wake_up;
+        int wake_up = 0, connected, completed, queue_len, max;
+        __u64 next_transno, req_transno;
 
+        spin_lock_bh(&obd->obd_processing_task_lock);
         req = list_entry(obd->obd_recovery_queue.next,
                          struct ptlrpc_request, rq_list);
-        LASSERT(req->rq_reqmsg->transno >= obd->obd_next_recovery_transno);
-
-        wake_up = req->rq_reqmsg->transno == obd->obd_next_recovery_transno ||
-                (obd->obd_recovering) == 0;
-        CDEBUG(D_HA, "check_for_next_transno: "LPD64" vs "LPD64", %d == %d\n",
-               req->rq_reqmsg->transno, obd->obd_next_recovery_transno,
-               obd->obd_recovering, wake_up);
+        max = obd->obd_max_recoverable_clients;
+        req_transno = req->rq_reqmsg->transno;
+        connected = obd->obd_connected_clients;
+        completed = max - obd->obd_recoverable_clients;
+        queue_len = obd->obd_requests_queued_for_recovery;
+        next_transno = obd->obd_next_recovery_transno;
+
+        if (obd->obd_abort_recovery) {
+                CDEBUG(D_HA, "waking for aborted recovery\n");
+                wake_up = 1;
+        } else if (!obd->obd_recovering) {
+                CDEBUG(D_HA, "waking for completed recovery (?)\n");
+                wake_up = 1;
+        } else if (req_transno == next_transno) {
+                CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno);
+                wake_up = 1;
+        } else if (queue_len + completed == max) {
+                CDEBUG(D_ERROR,
+                       "waking for skipped transno (skip: "LPD64
+                       ", ql: %d, comp: %d, conn: %d, next: "LPD64")\n",
+                       next_transno, queue_len, completed, max, req_transno);
+                obd->obd_next_recovery_transno = req_transno;
+                wake_up = 1;
+        }
+        spin_unlock_bh(&obd->obd_processing_task_lock);
+        LASSERT(req->rq_reqmsg->transno >= next_transno);
         return wake_up;
 }
 
@@ -567,10 +768,12 @@ static void process_recovery_queue(struct obd_device *obd)
                         continue;
                 }
                 list_del_init(&req->rq_list);
+                obd->obd_requests_queued_for_recovery--;
                 spin_unlock_bh(&obd->obd_processing_task_lock);
 
-                DEBUG_REQ(D_ERROR, req, "processing: ");
+                DEBUG_REQ(D_HA, req, "processing: ");
                 (void)obd->obd_recovery_handler(req);
+                obd->obd_replayed_requests++;
                 reset_recovery_timer(obd);
                 /* bug 1580: decide how to properly sync() in recovery */
                 //mds_fsync_super(mds->mds_sb);
@@ -659,12 +862,13 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
                 list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
         }
 
+        obd->obd_requests_queued_for_recovery++;
+
         if (obd->obd_processing_task != 0) {
                 /* Someone else is processing this queue, we'll leave it to
                  * them.
                  */
-                if (transno == obd->obd_next_recovery_transno)
-                        wake_up(&obd->obd_next_transno_waitq);
+                wake_up(&obd->obd_next_transno_waitq);
                 spin_unlock_bh(&obd->obd_processing_task_lock);
                 return 0;
         }
@@ -690,16 +894,17 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
         struct ptlrpc_request *saved_req;
         struct lustre_msg *reqmsg;
         int recovery_done = 0;
+        int rc2;
 
         if (rc) {
                 /* Just like ptlrpc_error, but without the sending. */
-                lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
-                                &req->rq_repmsg);
+                rc = lustre_pack_reply(req, 0, NULL, NULL);
+                LASSERT(rc == 0); /* XXX handle this */
                 req->rq_type = PTL_RPC_MSG_ERR;
         }
 
         LASSERT(list_empty(&req->rq_list));
-        /* XXX just like the request-dup code in queue_recovery_request */
+        /* XXX a bit like the request-dup code in queue_recovery_request */
         OBD_ALLOC(saved_req, sizeof *saved_req);
         if (!saved_req)
                 LBUG();
@@ -720,9 +925,20 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
         if (recovery_done) {
                 struct list_head *tmp, *n;
                 ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace);
-                CERROR("%s: all clients recovered, sending delayed replies\n",
+                CWARN("%s: all clients recovered, sending delayed replies\n",
                        obd->obd_name);
                 obd->obd_recovering = 0;
+
+                /* when recovering finished, cleanup orphans for mds */
+                if (OBT(obd) && OBP(obd, postrecov)) {
+                        rc2 = OBP(obd, postrecov)(obd);
+                        if (rc2 >= 0)
+                                CWARN("%s: all clients recovered, %d MDS orphans "
+                                       "deleted\n", obd->obd_name, rc2);
+                        else
+                                CERROR("postrecov failed %d\n", rc2);
+                }
+
                 list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
                         DEBUG_REQ(D_ERROR, req, "delayed:");
@@ -733,8 +949,9 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
                 }
                 target_cancel_recovery_timer(obd);
         } else {
-                CERROR("%s: %d recoverable clients remain\n",
+                CWARN("%s: %d recoverable clients remain\n",
                        obd->obd_name, obd->obd_recoverable_clients);
+                wake_up(&obd->obd_next_transno_waitq);
         }
 
         return 1;
@@ -789,7 +1006,7 @@ static void ptlrpc_abort_reply (struct ptlrpc_request *req)
                  * has finished.  Note that if the ACK does arrive, its
                  * callback wakes us in short order. --eeb */
                 lwi = LWI_TIMEOUT (HZ/4, NULL, NULL);
-                rc = l_wait_event(req->rq_wait_for_rep, !req->rq_want_ack,
+                rc = l_wait_event(req->rq_reply_waitq, !req->rq_want_ack,
                                   &lwi);
                 CDEBUG (D_HA, "Retrying req %p: %d\n", req, rc);
                 /* NB go back and test rq_want_ack with locking, to ensure
@@ -809,9 +1026,16 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
         wait_queue_t commit_wait;
         struct obd_device *obd =
                 req->rq_export ? req->rq_export->exp_obd : NULL;
-        struct obd_export *exp =
-                (req->rq_export && req->rq_ack_locks[0].mode) ?
-                req->rq_export : NULL;
+        struct obd_export *exp = NULL;
+
+        if (req->rq_export) {
+                for (i = 0; i < REQ_MAX_ACK_LOCKS; i++) {
+                        if (req->rq_ack_locks[i].mode) {
+                                exp = req->rq_export;
+                                break;
+                        }
+                }
+        }
 
         if (exp) {
                 exp->exp_outstanding_reply = req;
@@ -834,11 +1058,11 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
         } else {
                 obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
                 DEBUG_REQ(D_ERROR, req, "dropping reply");
-                if (!exp && req->rq_repmsg) {
+                if (req->rq_repmsg) {
                         OBD_FREE(req->rq_repmsg, req->rq_replen);
                         req->rq_repmsg = NULL;
                 }
-                init_waitqueue_head(&req->rq_wait_for_rep);
+                init_waitqueue_head(&req->rq_reply_waitq);
                 netrc = 0;
         }
 
@@ -852,7 +1076,7 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
 
         init_waitqueue_entry(&commit_wait, current);
         add_wait_queue(&obd->obd_commit_waitq, &commit_wait);
-        rc = l_wait_event(req->rq_wait_for_rep,
+        rc = l_wait_event(req->rq_reply_waitq,
                           !req->rq_want_ack || req->rq_resent ||
                           req->rq_transno <= obd->obd_last_committed, &lwi);
         remove_wait_queue(&obd->obd_commit_waitq, &commit_wait);
@@ -879,14 +1103,33 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
 
         exp->exp_outstanding_reply = NULL;
 
-        for (ack_lock = req->rq_ack_locks, i = 0; i < 4; i++, ack_lock++) {
+        for (ack_lock = req->rq_ack_locks, i = 0;
+             i < REQ_MAX_ACK_LOCKS; i++, ack_lock++) {
                 if (!ack_lock->mode)
-                        break;
+                        continue;
                 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
         }
 }
 
 int target_handle_ping(struct ptlrpc_request *req)
 {
-        return lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
+        return lustre_pack_reply(req, 0, NULL, NULL);
+}
+
+void *ldlm_put_lock_into_req(struct ptlrpc_request *req,
+                                struct lustre_handle *lock, int mode)
+{
+        int i;
+
+        for (i = 0; i < REQ_MAX_ACK_LOCKS; i++) {
+                if (req->rq_ack_locks[i].mode)
+                        continue;
+                memcpy(&req->rq_ack_locks[i].lock, lock, sizeof(*lock));
+                req->rq_ack_locks[i].mode = mode;
+                return &req->rq_ack_locks[i];
+        }
+        CERROR("no space for lock in struct ptlrpc_request\n");
+        LBUG();
+        return NULL;
 }
+