Whamcloud - gitweb
b=13766
[fs/lustre-release.git] / lustre / ldlm / ldlm_lib.c
index c15e088..fe890ca 100644 (file)
@@ -37,6 +37,8 @@
 #include <lustre_dlm.h>
 #include <lustre_net.h>
 #include <lustre_sec.h>
+#include "ldlm_internal.h"
+
 
 /* @priority: if non-zero, move the selected to the list head
  * @create: if zero, only search in existed connections
@@ -178,11 +180,10 @@ out:
 static void destroy_import(struct obd_import *imp)
 {
         /* drop security policy instance after all rpc finished/aborted
-         * to let all busy credentials be released.
-         */
+         * to let all busy contexts be released. */
         class_import_get(imp);
         class_destroy_import(imp);
-        sptlrpc_import_put_sec(imp);
+        sptlrpc_import_sec_put(imp);
         class_import_put(imp);
 }
 
@@ -245,10 +246,8 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
 
         sema_init(&cli->cl_sem, 1);
         sema_init(&cli->cl_mgc_sem, 1);
-        cli->cl_sec_conf.sfc_rpc_flavor = SPTLRPC_FLVR_NULL;
-        cli->cl_sec_conf.sfc_bulk_csum = BULK_CSUM_ALG_NULL;
-        cli->cl_sec_conf.sfc_bulk_priv = BULK_PRIV_ALG_NULL;
-        cli->cl_sec_conf.sfc_flags = 0;
+        sptlrpc_rule_set_init(&cli->cl_sptlrpc_rset);
+        cli->cl_sec_part = LUSTRE_SP_ANY;
         cli->cl_conn_count = 0;
         memcpy(server_uuid.uuid, lustre_cfg_buf(lcfg, 2),
                min_t(unsigned int, LUSTRE_CFG_BUFLEN(lcfg, 2),
@@ -274,6 +273,8 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
         spin_lock_init(&cli->cl_write_page_hist.oh_lock);
         spin_lock_init(&cli->cl_read_offset_hist.oh_lock);
         spin_lock_init(&cli->cl_write_offset_hist.oh_lock);
+        cfs_waitq_init(&cli->cl_destroy_waitq);
+        atomic_set(&cli->cl_destroy_in_flight, 0);
 #ifdef ENABLE_CHECKSUM
         cli->cl_checksum = 1;
 #endif
@@ -296,7 +297,7 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
                 cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT;
         }
 
-        rc = ldlm_get_ref(LDLM_NAMESPACE_CLIENT);
+        rc = ldlm_get_ref();
         if (rc) {
                 CERROR("ldlm_get_ref failed: %d\n", rc);
                 GOTO(err, rc);
@@ -346,7 +347,7 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
 err_import:
         class_destroy_import(imp);
 err_ldlm:
-        ldlm_put_ref(LDLM_NAMESPACE_CLIENT, 0);
+        ldlm_put_ref(0);
 err:
         RETURN(rc);
 
@@ -355,7 +356,8 @@ err:
 int client_obd_cleanup(struct obd_device *obddev)
 {
         ENTRY;
-        ldlm_put_ref(LDLM_NAMESPACE_CLIENT, obddev->obd_force);
+        sptlrpc_rule_set_free(&obddev->u.cli.cl_sptlrpc_rset);
+        ldlm_put_ref(obddev->obd_force);
         RETURN(0);
 }
 
@@ -369,6 +371,7 @@ int client_connect_import(const struct lu_env *env,
         struct obd_import *imp = cli->cl_import;
         struct obd_export *exp;
         struct obd_connect_data *ocd;
+        struct ldlm_namespace *to_be_freed = NULL;
         int rc;
         ENTRY;
 
@@ -395,11 +398,6 @@ int client_connect_import(const struct lu_env *env,
         if (rc != 0)
                 GOTO(out_ldlm, rc);
 
-        rc = sptlrpc_import_get_sec(imp, NULL, cli->cl_sec_conf.sfc_rpc_flavor,
-                                    cli->cl_sec_conf.sfc_flags);
-        if (rc)
-                GOTO(out_ldlm, rc);
-
         ocd = &imp->imp_connect_data;
         if (data) {
                 *ocd = *data;
@@ -425,7 +423,8 @@ int client_connect_import(const struct lu_env *env,
 
         if (rc) {
 out_ldlm:
-                ldlm_namespace_free(obd->obd_namespace, 0);
+                ldlm_namespace_free_prior(obd->obd_namespace);
+                to_be_freed = obd->obd_namespace;
                 obd->obd_namespace = NULL;
 out_disco:
                 cli->cl_conn_count--;
@@ -435,6 +434,9 @@ out_disco:
         }
 out_sem:
         mutex_up(&cli->cl_sem);
+        if (to_be_freed)
+                ldlm_namespace_free_post(to_be_freed, 1);
+
         return rc;
 }
 
@@ -444,6 +446,7 @@ int client_disconnect_export(struct obd_export *exp)
         struct client_obd *cli;
         struct obd_import *imp;
         int rc = 0, err;
+        struct ldlm_namespace *to_be_freed = NULL;
         ENTRY;
 
         if (!obd) {
@@ -472,7 +475,7 @@ int client_disconnect_export(struct obd_export *exp)
         spin_lock(&imp->imp_lock);
         imp->imp_deactive = 1;
         spin_unlock(&imp->imp_lock);
-        
+
         /* Some non-replayable imports (MDS's OSCs) are pinged, so just
          * delete it regardless.  (It's safe to delete an import that was
          * never added.) */
@@ -483,14 +486,18 @@ int client_disconnect_export(struct obd_export *exp)
                 ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
                                        obd->obd_force ? LDLM_FL_LOCAL_ONLY:0,
                                        NULL);
-                ldlm_namespace_free(obd->obd_namespace, obd->obd_force);
-                obd->obd_namespace = NULL;
+                ldlm_namespace_free_prior(obd->obd_namespace);
+                to_be_freed = obd->obd_namespace;
         }
 
-        if (!obd->obd_force)
-                rc = ptlrpc_disconnect_import(imp, 0);
+        rc = ptlrpc_disconnect_import(imp, 0);
 
         ptlrpc_invalidate_import(imp);
+        /* set obd_namespace to NULL only after invalidate, because we can have
+         * some connect requests in flight, and his need store a connect flags
+         * in obd_namespace. bug 14260 */
+        obd->obd_namespace = NULL;
+
         ptlrpc_free_rq_pool(imp->imp_rq_pool);
         destroy_import(imp);
         cli->cl_import = NULL;
@@ -502,6 +509,9 @@ int client_disconnect_export(struct obd_export *exp)
                 rc = err;
  out_sem:
         mutex_up(&cli->cl_sem);
+        if (to_be_freed)
+                ldlm_namespace_free_post(to_be_freed, obd->obd_force);
+
         RETURN(rc);
 }
 
@@ -565,27 +575,25 @@ int target_handle_connect(struct ptlrpc_request *req)
         struct obd_export *export = NULL;
         struct obd_import *revimp;
         struct lustre_handle conn;
+        struct lustre_handle *tmp;
         struct obd_uuid tgtuuid;
         struct obd_uuid cluuid;
         struct obd_uuid remote_uuid;
-        struct list_head *p;
-        char *str, *tmp;
+        char *str;
         int rc = 0;
         int initial_conn = 0;
-        struct obd_connect_data *data;
-        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*data) };
+        struct obd_connect_data *data, *tmpdata;
         ENTRY;
 
         OBD_RACE(OBD_FAIL_TGT_CONN_RACE);
 
-        LASSERT_REQSWAB(req, REQ_REC_OFF);
-        str = lustre_msg_string(req->rq_reqmsg, REQ_REC_OFF, sizeof(tgtuuid)-1);
+        str = req_capsule_client_get(&req->rq_pill, &RMF_TGTUUID);
         if (str == NULL) {
                 DEBUG_REQ(D_ERROR, req, "bad target UUID for connect");
                 GOTO(out, rc = -EINVAL);
         }
 
-        obd_str2uuid (&tgtuuid, str);
+        obd_str2uuid(&tgtuuid, str);
         target = class_uuid2obd(&tgtuuid);
         if (!target)
                 target = class_name2obd(str);
@@ -601,7 +609,7 @@ int target_handle_connect(struct ptlrpc_request *req)
 
         if (target->obd_no_conn) {
                 LCONSOLE_WARN("%s: temporarily refusing client connection "
-                              "from %s\n", target->obd_name, 
+                              "from %s\n", target->obd_name,
                               libcfs_nid2str(req->rq_peer.nid));
                 GOTO(out, rc = -EAGAIN);
         }
@@ -611,15 +619,14 @@ int target_handle_connect(struct ptlrpc_request *req)
            Really, class_uuid2obd should take the ref. */
         targref = class_incref(target);
 
-        LASSERT_REQSWAB(req, REQ_REC_OFF + 1);
-        str = lustre_msg_string(req->rq_reqmsg, REQ_REC_OFF + 1,
-                                sizeof(cluuid) - 1);
+
+        str = req_capsule_client_get(&req->rq_pill, &RMF_CLUUID);
         if (str == NULL) {
                 DEBUG_REQ(D_ERROR, req, "bad client UUID for connect");
                 GOTO(out, rc = -EINVAL);
         }
 
-        obd_str2uuid (&cluuid, str);
+        obd_str2uuid(&cluuid, str);
 
         /* XXX extract a nettype and format accordingly */
         switch (sizeof(lnet_nid_t)) {
@@ -636,19 +643,17 @@ int target_handle_connect(struct ptlrpc_request *req)
                 LBUG();
         }
 
-        tmp = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, sizeof conn);
+        tmp = req_capsule_client_get(&req->rq_pill, &RMF_CONN);
         if (tmp == NULL)
                 GOTO(out, rc = -EPROTO);
 
-        memcpy(&conn, tmp, sizeof conn);
-
-        data = lustre_swab_reqbuf(req, REQ_REC_OFF + 3, sizeof(*data),
-                                  lustre_swab_connect);
+        conn = *tmp;
 
+        data = req_capsule_client_get(&req->rq_pill, &RMF_CONNECT_DATA);
         if (!data)
                 GOTO(out, rc = -EPROTO);
 
-        rc = lustre_pack_reply(req, 2, size, NULL);
+        rc = req_capsule_server_pack(&req->rq_pill);
         if (rc)
                 GOTO(out, rc);
 
@@ -669,10 +674,10 @@ int target_handle_connect(struct ptlrpc_request *req)
                                   OBD_OCD_VERSION_MINOR(data->ocd_version),
                                   OBD_OCD_VERSION_PATCH(data->ocd_version),
                                   OBD_OCD_VERSION_FIX(data->ocd_version));
-                        data = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                              offsetof(typeof(*data),
-                                                       ocd_version) +
-                                              sizeof(data->ocd_version));
+                        data = req_capsule_server_sized_get(&req->rq_pill,
+                                                        &RMF_CONNECT_DATA,
+                                    offsetof(typeof(*data), ocd_version) +
+                                             sizeof(data->ocd_version));
                         if (data) {
                                 data->ocd_connect_flags = OBD_CONNECT_VERSION;
                                 data->ocd_version = LUSTRE_VERSION_CODE;
@@ -689,43 +694,35 @@ int target_handle_connect(struct ptlrpc_request *req)
                 goto dont_check_exports;
 
         spin_lock(&target->obd_dev_lock);
-        list_for_each(p, &target->obd_exports) {
-                export = list_entry(p, struct obd_export, exp_obd_chain);
-                if (obd_uuid_equals(&cluuid, &export->exp_client_uuid)) {
-                        if (export->exp_connecting) { /* bug 9635, et. al. */
-                                CWARN("%s: exp %p already connecting\n",
-                                      export->exp_obd->obd_name, export);
-                                export = NULL;
-                                rc = -EALREADY;
-                                break;
-                        }
-
-                        /* make darn sure this is coming from the same peer
-                         * if the UUIDs matched */
-                        if ((export->exp_connection != NULL) &&
-                            (strcmp(libcfs_nid2str(req->rq_peer.nid),
-                                    libcfs_nid2str(export->exp_connection->c_peer.nid)))) {
-                                CWARN("%s: cookie %s seen on new NID %s when "
-                                      "existing NID %s is already connected\n",
-                                      target->obd_name, cluuid.uuid,
-                                      libcfs_nid2str(req->rq_peer.nid),
-                                      libcfs_nid2str(export->exp_connection->c_peer.nid));
-                                export = NULL;
-                                rc = -EALREADY;
-                                break;
-                        }
+        export = lustre_hash_get_object_by_key(target->obd_uuid_hash_body, &cluuid);
 
-                        spin_lock(&export->exp_lock);
-                        export->exp_connecting = 1;
-                        spin_unlock(&export->exp_lock);
-                        spin_unlock(&target->obd_dev_lock);
-                        LASSERT(export->exp_obd == target);
-
-                        rc = target_handle_reconnect(&conn, export, &cluuid,
-                                                     initial_conn);
-                        break;
-                }
+        if (export != NULL && export->exp_connecting) { /* bug 9635, et. al. */
+                CWARN("%s: exp %p already connecting\n",
+                      export->exp_obd->obd_name, export);
+                class_export_put(export);
                 export = NULL;
+                rc = -EALREADY;
+        } else if (export != NULL && export->exp_connection != NULL &&
+                   req->rq_peer.nid != export->exp_connection->c_peer.nid) {
+                /* make darn sure this is coming from the same peer
+                 * if the UUIDs matched */
+                  CWARN("%s: cookie %s seen on new NID %s when "
+                          "existing NID %s is already connected\n",
+                        target->obd_name, cluuid.uuid,
+                  libcfs_nid2str(req->rq_peer.nid),
+                  libcfs_nid2str(export->exp_connection->c_peer.nid));
+                  class_export_put(export);
+                  export = NULL;
+                  rc = -EALREADY;
+        } else if (export != NULL) {
+                spin_lock(&export->exp_lock);
+                export->exp_connecting = 1;
+                spin_unlock(&export->exp_lock);
+                class_export_put(export);
+                spin_unlock(&target->obd_dev_lock);
+                LASSERT(export->exp_obd == target);
+
+                rc = target_handle_reconnect(&conn, export, &cluuid, initial_conn);
         }
 
         /* If we found an export, we already unlocked. */
@@ -755,7 +752,7 @@ int target_handle_connect(struct ptlrpc_request *req)
         } else {
                 OBD_FAIL_TIMEOUT(OBD_FAIL_TGT_DELAY_RECONNECT, 2 * obd_timeout);
                 if (req->rq_export == NULL && initial_conn)
-                       export->exp_last_request_time = 
+                       export->exp_last_request_time =
                                max(export->exp_last_request_time,
                                    (time_t)CURRENT_SECONDS);
         }
@@ -773,7 +770,7 @@ int target_handle_connect(struct ptlrpc_request *req)
         CWARN("%s: connection from %s@%s %st"LPU64" exp %p cur %ld last %ld\n",
                target->obd_name, cluuid.uuid, libcfs_nid2str(req->rq_peer.nid),
               target->obd_recovering ? "recovering/" : "", data->ocd_transno,
-              export, (long)CURRENT_SECONDS, 
+              export, (long)CURRENT_SECONDS,
               export ? (long)export->exp_last_request_time : 0);
 
 
@@ -788,13 +785,16 @@ int target_handle_connect(struct ptlrpc_request *req)
 
         if (export == NULL) {
                 if (target->obd_recovering) {
+                        cfs_time_t t;
+
+                        t = cfs_timer_deadline(&target->obd_recovery_timer);
+                        t = cfs_time_sub(t, cfs_time_current());
                         CERROR("%s: denying connection for new client %s (%s): "
                                "%d clients in recovery for %lds\n",
                                target->obd_name,
                                libcfs_nid2str(req->rq_peer.nid), cluuid.uuid,
                                target->obd_recoverable_clients,
-                               cfs_duration_sec(cfs_time_sub(cfs_timer_deadline(&target->obd_recovery_timer),
-                                                             cfs_time_current())));
+                               cfs_duration_sec(t));
                         rc = -EBUSY;
                 } else {
 dont_check_exports:
@@ -802,17 +802,20 @@ dont_check_exports:
                                          &conn, target, &cluuid, data);
                 }
         } else {
-                rc = obd_reconnect(export, target, &cluuid, data);
+                rc = obd_reconnect(req->rq_svc_thread->t_env,
+                                   export, target, &cluuid, data);
         }
         if (rc)
                 GOTO(out, rc);
         /* Return only the parts of obd_connect_data that we understand, so the
          * client knows that we don't understand the rest. */
-        if (data)
-                memcpy(lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                      sizeof(*data)),
-                       data, sizeof(*data));
-
+        if (data) {
+                 tmpdata = req_capsule_server_get(&req->rq_pill,
+                                                  &RMF_CONNECT_DATA);
+                  //data->ocd_connect_flags &= OBD_CONNECT_SUPPORTED;
+                 *tmpdata = *data;
+        }
+                  
         /* If all else goes well, this is our RPC return code. */
         req->rq_status = 0;
 
@@ -857,7 +860,7 @@ dont_check_exports:
         if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_LIBCLIENT) {
                 export->exp_libclient = 1;
                 spin_unlock(&export->exp_lock);
-                
+
                 spin_lock(&target->obd_dev_lock);
                 list_del_init(&export->exp_obd_chain_timed);
                 spin_unlock(&target->obd_dev_lock);
@@ -871,12 +874,20 @@ dont_check_exports:
                                                        req->rq_self,
                                                        &remote_uuid);
 
+        spin_lock(&target->obd_dev_lock);
+        /* Export might be hashed already, e.g. if this is reconnect */
+        if (hlist_unhashed(&export->exp_nid_hash))
+                lustre_hash_additem(export->exp_obd->obd_nid_hash_body,
+                                    &export->exp_connection->c_peer.nid,
+                                    &export->exp_nid_hash);
+        spin_unlock(&target->obd_dev_lock);
+
         spin_lock_bh(&target->obd_processing_task_lock);
         if (target->obd_recovering && !export->exp_in_recovery) {
                 spin_lock(&export->exp_lock);
                 export->exp_in_recovery = 1;
                 export->exp_req_replay_needed = 1;
-                export->exp_lock_replay_needed = 1;                
+                export->exp_lock_replay_needed = 1;
                 spin_unlock(&export->exp_lock);
                 if ((lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_TRANSNO)
                      && data->ocd_transno < target->obd_next_recovery_transno)
@@ -886,19 +897,32 @@ dont_check_exports:
                 target->obd_recoverable_clients++;
                 atomic_inc(&target->obd_req_replay_clients);
                 atomic_inc(&target->obd_lock_replay_clients);
-                if (target->obd_connected_clients == 
+                if (target->obd_connected_clients ==
                     target->obd_max_recoverable_clients)
                         wake_up(&target->obd_next_transno_waitq);
         }
         spin_unlock_bh(&target->obd_processing_task_lock);
-        memcpy(&conn,
-               lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, sizeof conn),
-               sizeof conn);
+        tmp = req_capsule_client_get(&req->rq_pill, &RMF_CONN);
+        conn = *tmp;
 
         if (export->exp_imp_reverse != NULL) {
                 /* destroyed import can be still referenced in ctxt */
-                obd_set_info_async(export, strlen(KEY_REVIMP_UPD), 
+                obd_set_info_async(export, strlen(KEY_REVIMP_UPD),
                                    KEY_REVIMP_UPD, 0, NULL, NULL);
+
+                /* in some recovery senarios, previous ctx init rpc handled
+                 * in sptlrpc_target_export_check() might be used to install
+                 * a reverse ctx in this reverse import, and later OBD_CONNECT
+                 * using the same gss ctx could reach here and following new
+                 * reverse import. note all reverse ctx in new/old import are
+                 * actually based on the same gss ctx. so we invalidate ctx
+                 * here before destroy import, otherwise flush old import will
+                 * lead to remote reverse ctx be destroied, thus the reverse
+                 * ctx of new import will lost its peer.
+                 * there might be a better way to deal with this???
+                 */
+                sptlrpc_import_inval_all_ctx(export->exp_imp_reverse);
+
                 destroy_import(export->exp_imp_reverse);
         }
 
@@ -922,8 +946,8 @@ dont_check_exports:
                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_NEXT_VER);
         }
 
-        rc = sptlrpc_import_get_sec(revimp, req->rq_svc_ctx,
-                                    req->rq_sec_flavor, 0);
+        rc = sptlrpc_import_sec_adapt(revimp, req->rq_svc_ctx,
+                                      req->rq_flvr.sf_rpc);
         if (rc) {
                 CERROR("Failed to get sec for reverse import: %d\n", rc);
                 export->exp_imp_reverse = NULL;
@@ -949,13 +973,13 @@ int target_handle_disconnect(struct ptlrpc_request *req)
         int rc;
         ENTRY;
 
-        rc = lustre_pack_reply(req, 1, NULL, NULL);
+        rc = req_capsule_server_pack(&req->rq_pill);
         if (rc)
                 RETURN(rc);
 
         /* keep the rq_export around so we can send the reply */
         req->rq_status = obd_disconnect(class_export_get(req->rq_export));
-        
+
         RETURN(0);
 }
 
@@ -1085,7 +1109,7 @@ static void abort_lock_replay_queue(struct obd_device *obd)
         list_for_each_entry_safe(req, n, &obd->obd_lock_replay_queue, rq_list){
                 DEBUG_REQ(D_ERROR, req, "aborted:");
                 req->rq_status = -ENOTCONN;
-                if (ptlrpc_error(req)) { 
+                if (ptlrpc_error(req)) {
                         DEBUG_REQ(D_ERROR, req,
                                   "failed abort_lock_reply; skipping");
                 }
@@ -1157,18 +1181,22 @@ void target_cancel_recovery_timer(struct obd_device *obd)
 
 static void reset_recovery_timer(struct obd_device *obd)
 {
+        time_t timeout_shift = OBD_RECOVERY_TIMEOUT;
         spin_lock_bh(&obd->obd_processing_task_lock);
         if (!obd->obd_recovering) {
                 spin_unlock_bh(&obd->obd_processing_task_lock);
                 return;
         }
-        cfs_timer_arm(&obd->obd_recovery_timer,
-                      cfs_time_shift(OBD_RECOVERY_TIMEOUT));
+        if (cfs_time_current_sec() + OBD_RECOVERY_TIMEOUT > 
+            obd->obd_recovery_start + obd->obd_recovery_max_time)
+                timeout_shift = obd->obd_recovery_start + 
+                        obd->obd_recovery_max_time - cfs_time_current_sec();
+        cfs_timer_arm(&obd->obd_recovery_timer, cfs_time_shift(timeout_shift));
         spin_unlock_bh(&obd->obd_processing_task_lock);
         CDEBUG(D_HA, "%s: timer will expire in %u seconds\n", obd->obd_name,
-               OBD_RECOVERY_TIMEOUT);
+               (unsigned int)timeout_shift);
         /* Only used for lprocfs_status */
-        obd->obd_recovery_end = CURRENT_SECONDS + OBD_RECOVERY_TIMEOUT;
+        obd->obd_recovery_end = CURRENT_SECONDS + timeout_shift;
 }
 
 
@@ -1590,7 +1618,7 @@ void target_recovery_init(struct obd_device *obd, svc_handler_t handler)
 {
         if (obd->obd_max_recoverable_clients == 0)
                 return;
-        
+
         CWARN("RECOVERY: service %s, %d recoverable clients, "
               "last_transno "LPU64"\n", obd->obd_name,
               obd->obd_max_recoverable_clients, obd->obd_last_committed);
@@ -1599,6 +1627,8 @@ void target_recovery_init(struct obd_device *obd, svc_handler_t handler)
         obd->obd_recovery_start = CURRENT_SECONDS;
         /* Only used for lprocfs_status */
         obd->obd_recovery_end = obd->obd_recovery_start + OBD_RECOVERY_TIMEOUT;
+        /* bz13079: this should be set to desired value for ost but not for mds */
+        obd->obd_recovery_max_time = OBD_RECOVERY_MAX_TIME;
 }
 EXPORT_SYMBOL(target_recovery_init);
 
@@ -1795,18 +1825,18 @@ int target_pack_pool_reply(struct ptlrpc_request *req)
         struct ldlm_pool *pl;
         ENTRY;
    
-        if (req->rq_export == NULL) {
+        if (!req->rq_export || !req->rq_export->exp_obd ||
+            !req->rq_export->exp_obd->obd_namespace ||
+            !exp_connect_lru_resize(req->rq_export)) {
                 lustre_msg_set_slv(req->rq_repmsg, 0);
                 lustre_msg_set_limit(req->rq_repmsg, 0);
                 RETURN(0);
         }
-        if (!exp_connect_lru_resize(req->rq_export))
-                RETURN(0);
-        
+
         pl = ldlm_exp2pl(req->rq_export);
 
         spin_lock(&pl->pl_lock);
+        LASSERT(ldlm_pool_get_slv(pl) != 0 && ldlm_pool_get_limit(pl) != 0);
         lustre_msg_set_slv(req->rq_repmsg, ldlm_pool_get_slv(pl));
         lustre_msg_set_limit(req->rq_repmsg, ldlm_pool_get_limit(pl));
         spin_unlock(&pl->pl_lock);
@@ -1816,8 +1846,7 @@ int target_pack_pool_reply(struct ptlrpc_request *req)
 
 int target_send_reply_msg(struct ptlrpc_request *req, int rc, int fail_id)
 {
-        if (OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) {
-                obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
+        if (OBD_FAIL_CHECK_ORSET(fail_id & ~OBD_FAIL_ONCE, OBD_FAIL_ONCE)) {
                 DEBUG_REQ(D_ERROR, req, "dropping reply");
                 return (-ECOMM);
         }
@@ -1830,7 +1859,6 @@ int target_send_reply_msg(struct ptlrpc_request *req, int rc, int fail_id)
                 DEBUG_REQ(D_NET, req, "sending reply");
         }
 
-        target_pack_pool_reply(req);
         return (ptlrpc_send_reply(req, 1));
 }
 
@@ -1925,7 +1953,7 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
 int target_handle_ping(struct ptlrpc_request *req)
 {
         obd_ping(req->rq_export);
-        return lustre_pack_reply(req, 1, NULL, NULL);
+        return req_capsule_server_pack(&req->rq_pill);
 }
 
 void target_committed_to_req(struct ptlrpc_request *req)
@@ -1957,12 +1985,9 @@ int target_handle_qc_callback(struct ptlrpc_request *req)
         struct obd_quotactl *oqctl;
         struct client_obd *cli = &req->rq_export->exp_obd->u.cli;
 
-        oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
-                                   lustre_swab_obd_quotactl);
-        if (oqctl == NULL) {
-                CERROR("Can't unpack obd_quotactl\n");
+        oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
+        if (oqctl == NULL)
                 RETURN(-EPROTO);
-        }
 
         cli->cl_qchk_stat = oqctl->qc_stat;
 
@@ -1979,40 +2004,38 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req)
         void* rep;
         struct qunit_data_old *qdata_old;
         int rc = 0;
-        int repsize[2] = { sizeof(struct ptlrpc_body),
-                           sizeof(struct qunit_data) };
         ENTRY;
 
-        rc = lustre_pack_reply(req, 2, repsize, NULL);
+        rc = req_capsule_server_pack(&req->rq_pill);
         if (rc) {
                 CERROR("packing reply failed!: rc = %d\n", rc);
                 RETURN(rc);
         }
+
         LASSERT(req->rq_export);
 
         /* fixed for bug10707 */
         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_QUOTA64) &&
             !OBD_FAIL_CHECK(OBD_FAIL_QUOTA_QD_COUNT_32BIT)) {
                 CDEBUG(D_QUOTA, "qd_count is 64bit!\n");
-                rep = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                     sizeof(struct qunit_data));
+                rep = req_capsule_server_get(&req->rq_pill,
+                                             &RMF_QUNIT_DATA);
                 LASSERT(rep);
-                qdata = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*qdata),
-                                           lustre_swab_qdata);
+                qdata = req_capsule_client_swab_get(&req->rq_pill,
+                                                    &RMF_QUNIT_DATA,
+                                          (void*)lustre_swab_qdata);
         } else {
                 CDEBUG(D_QUOTA, "qd_count is 32bit!\n");
-                rep = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                     sizeof(struct qunit_data_old));
+                rep = req_capsule_server_get(&req->rq_pill, &RMF_QUNIT_DATA);
                 LASSERT(rep);
-                qdata_old = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*qdata_old),
-                                               lustre_swab_qdata_old);
+                qdata_old = req_capsule_client_swab_get(&req->rq_pill,
+                                                        &RMF_QUNIT_DATA,
+                                           (void*)lustre_swab_qdata_old);
                 qdata = lustre_quota_old_to_new(qdata_old);
         }
 
-        if (qdata == NULL) {
-                CERROR("Can't unpack qunit_data\n");
+        if (qdata == NULL)
                 RETURN(-EPROTO);
-        }
 
         /* we use the observer */
         LASSERT(obd->obd_observer && obd->obd_observer->obd_observer);
@@ -2029,10 +2052,10 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req)
         /* the qd_count might be changed in lqc_handler */
         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_QUOTA64) &&
             !OBD_FAIL_CHECK(OBD_FAIL_QUOTA_QD_COUNT_32BIT)) {
-                memcpy(rep,qdata,sizeof(*qdata));
+                memcpy(rep, qdata, sizeof(*qdata));
         } else {
                 qdata_old = lustre_quota_new_to_old(qdata);
-                memcpy(rep,qdata_old,sizeof(*qdata_old));
+                memcpy(rep, qdata_old, sizeof(*qdata_old));
         }
         req->rq_status = rc;
         rc = ptlrpc_reply(req);