Whamcloud - gitweb
b=18948
authorpanda <panda>
Tue, 20 Oct 2009 13:39:19 +0000 (13:39 +0000)
committerpanda <panda>
Tue, 20 Oct 2009 13:39:19 +0000 (13:39 +0000)
a=Mike Pershin
i=Nathan Rutman
i=Zhanghc

 use request refcount instead of copying

lustre/include/liblustre.h
lustre/include/lustre_net.h
lustre/ldlm/ldlm_lib.c
lustre/obdfilter/filter.c
lustre/ptlrpc/client.c
lustre/ptlrpc/events.c
lustre/ptlrpc/import.c
lustre/ptlrpc/pinger.c
lustre/ptlrpc/service.c

index 8571201..fc820e3 100644 (file)
@@ -737,6 +737,8 @@ typedef struct { volatile int counter; } atomic_t;
 #define atomic_sub(b,a)  do {(a)->counter -= b;} while (0)
 #define atomic_sub_return(n,a) ((a)->counter -= n)
 #define atomic_dec_return(a)  atomic_sub_return(1,a)
+#define atomic_add_unless(v, a, u) ((v)->counter != u ? (v)->counter += a : 0)
+#define atomic_inc_not_zero(v) atomic_add_unless((v), 1, 0)
 
 #ifndef likely
 #define likely(exp) (exp)
index 067beac..67d5ca6 100644 (file)
@@ -321,15 +321,12 @@ struct ptlrpc_request {
                 rq_no_delay:1, rq_net_err:1, rq_early:1, rq_must_unlink:1,
                 /* server-side flags */
                 rq_packed_final:1,  /* packed final reply */
-                rq_sent_final:1,    /* stop sending early replies */
                 rq_hp:1,            /* high priority RPC */
                 rq_at_linked:1,     /* link into service's srv_at_array */
                 rq_reply_truncate:1, /* reply is truncated */
                 rq_fake:1,          /* fake request - just for timeout only */
-                /* a copy of the request is queued to replay during recovery */
-                rq_copy_queued:1,
-                /* whether the rquest is a copy of another replay request */
-                rq_copy:1;
+                /* the request is queued to replay during recovery */
+                rq_copy_queued:1;
         enum rq_phase rq_phase;     /* one of RQ_PHASE_* */
         enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
         atomic_t rq_refcount;   /* client-side refcount for SENT race,
@@ -910,6 +907,7 @@ int liblustre_check_services (void *arg);
 void ptlrpc_daemonize(char *name);
 int ptlrpc_service_health_check(struct ptlrpc_service *);
 void ptlrpc_hpreq_reorder(struct ptlrpc_request *req);
+void ptlrpc_server_drop_request(struct ptlrpc_request *req);
 
 
 struct ptlrpc_svc_data {
@@ -1157,9 +1155,11 @@ int ptlrpc_obd_ping(struct obd_device *obd);
 #ifdef __KERNEL__
 void ping_evictor_start(void);
 void ping_evictor_stop(void);
+int ping_evictor_wake(struct obd_export *exp);
 #else
 #define ping_evictor_start()    do {} while (0)
 #define ping_evictor_stop()     do {} while (0)
+#define ping_evictor_wake(exp)  1
 #endif
 
 /* ptlrpc/ptlrpcd.c */
index 0faab9c..9d96387 100644 (file)
@@ -649,6 +649,7 @@ int target_recovery_check_and_stop(struct obd_device *obd)
         }
         /* always check versions now */
         obd->obd_version_recov = 1;
+        cfs_waitq_signal(&obd->obd_next_transno_waitq);
         spin_unlock_bh(&obd->obd_processing_task_lock);
         /* reset timer, recovery will proceed with versions now */
         reset_recovery_timer(obd, OBD_RECOVERY_TIME_SOFT, 1);
@@ -1165,12 +1166,26 @@ static void target_exp_dequeue_req_replay(struct ptlrpc_request *req)
         spin_unlock(&req->rq_export->exp_lock);
 }
 
-static void target_release_saved_req(struct ptlrpc_request *req)
+static void target_request_copy_get(struct ptlrpc_request *req)
+{
+        /* mark that request is in recovery queue, so request handler will not
+         * drop rpc count in export, bug 19870*/
+        LASSERT(!req->rq_copy_queued);
+        req->rq_copy_queued = 1;
+        /* increase refcount to keep request in queue */
+        atomic_inc(&req->rq_refcount);
+}
+
+static void target_request_copy_put(struct ptlrpc_request *req)
 {
-        ptlrpc_req_drop_rs(req);
-        class_export_put(req->rq_export);
-        OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
-        OBD_FREE(req, sizeof *req);
+        LASSERTF(list_empty(&req->rq_replay_list), "next: %p, prev: %p\n",
+                 req->rq_replay_list.next, req->rq_replay_list.prev);
+        /* class_export_rpc_get was done before handling request,
+         * drop it early to allow new requests, see bug 19870.
+         */
+        LASSERT(req->rq_copy_queued);
+        class_export_rpc_put(req->rq_export);
+        ptlrpc_server_drop_request(req);
 }
 
 static void target_send_delayed_replies(struct obd_device *obd)
@@ -1195,7 +1210,7 @@ static void target_send_delayed_replies(struct obd_device *obd)
                 list_del_init(&req->rq_list);
                 DEBUG_REQ(D_HA, req, "delayed:");
                 ptlrpc_reply(req);
-                target_release_saved_req(req);
+                target_request_copy_put(req);
         }
         obd->obd_recovery_end = cfs_time_current_sec();
 }
@@ -1249,10 +1264,7 @@ static void abort_recovery_queue(struct obd_device *obd)
                 else
                         DEBUG_REQ(D_ERROR, req,
                                   "packing failed for abort-reply; skipping");
-
-                LASSERT(req->rq_copy);
-                class_export_rpc_put(req->rq_export);
-                target_release_saved_req(req);
+                target_request_copy_put(req);
         }
 }
 
@@ -1287,7 +1299,7 @@ void target_cleanup_recovery(struct obd_device *obd)
         list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
                 list_del(&req->rq_list);
-                target_release_saved_req(req);
+                target_request_copy_put(req);
         }
 
         CFS_INIT_LIST_HEAD(&clean_list);
@@ -1298,10 +1310,7 @@ void target_cleanup_recovery(struct obd_device *obd)
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
                 target_exp_dequeue_req_replay(req);
                 list_del_init(&req->rq_list);
-
-                LASSERT(req->rq_copy);
-                class_export_rpc_put(req->rq_export);
-                target_release_saved_req(req);
+                target_request_copy_put(req);
         }
         EXIT;
 }
@@ -1361,6 +1370,21 @@ static void target_recovery_expired(unsigned long castmeharder)
         obd->obd_abort_recovery = 1;
         cfs_waitq_signal(&obd->obd_next_transno_waitq);
         spin_unlock_bh(&obd->obd_processing_task_lock);
+
+        /* bug 18948:
+         * The recovery timer expired and target_check_and_stop_recovery()
+         * must be called.  We cannot call it directly because we are in
+         * interrupt context, so we need to wake up another thread to call it.
+         * This may happen if there are obd->obd_next_transno_waitq waiters,
+         * or if we happen to handle a connect request.  However, we cannot
+         * count on either of those things so we wake up the ping evictor
+         * and leverage it's context to complete recovery.
+         *
+         * Note: HEAD has a separate recovery thread and handle this.
+         */
+        spin_lock(&obd->obd_dev_lock);
+        ping_evictor_wake(obd->obd_self_export);
+        spin_unlock(&obd->obd_dev_lock);
 }
 
 /* obd_processing_task_lock should be held */
@@ -1553,14 +1577,7 @@ static void process_recovery_queue(struct obd_device *obd)
                 obd->obd_next_recovery_transno++;
                 spin_unlock_bh(&obd->obd_processing_task_lock);
                 target_exp_dequeue_req_replay(req);
-
-                LASSERT(req->rq_copy);
-                class_export_rpc_put(req->rq_export);
-
-                class_export_put(req->rq_export);
-                ptlrpc_req_drop_rs(req);
-                OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
-                OBD_FREE(req, sizeof *req);
+                target_request_copy_put(req);
                 OBD_RACE(OBD_FAIL_TGT_REPLAY_DELAY);
                 spin_lock_bh(&obd->obd_processing_task_lock);
                 if (list_empty(&obd->obd_recovery_queue)) {
@@ -1580,10 +1597,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
         struct list_head *tmp;
         int inserted = 0;
         __u64 transno = lustre_msg_get_transno(req->rq_reqmsg);
-        struct ptlrpc_request *saved_req, *orig_req;
-        struct lustre_msg *reqmsg;
-        int rc = 0;
-
+        ENTRY;
         /* CAVEAT EMPTOR: The incoming request message has been swabbed
          * (i.e. buflens etc are in my own byte order), but type-dependent
          * buffers (eg mds_body, ost_body etc) have NOT been swabbed. */
@@ -1591,20 +1605,10 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
         if (!transno) {
                 CFS_INIT_LIST_HEAD(&req->rq_list);
                 DEBUG_REQ(D_HA, req, "not queueing");
-                return 1;
+                RETURN(1);
         }
 
-        /* XXX If I were a real man, these LBUGs would be sane cleanups. */
-        /* XXX just like the request-dup code in queue_final_reply */
-        OBD_ALLOC(saved_req, sizeof *saved_req);
-        if (!saved_req)
-                LBUG();
-        OBD_ALLOC(reqmsg, req->rq_reqlen);
-        if (!reqmsg)
-                LBUG();
-
         spin_lock_bh(&obd->obd_processing_task_lock);
-
         /* If we're processing the queue, we want don't want to queue this
          * message.
          *
@@ -1620,27 +1624,18 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
                 /* Processing the queue right now, don't re-add. */
                 LASSERT(list_empty(&req->rq_list));
                 spin_unlock_bh(&obd->obd_processing_task_lock);
-                GOTO(err_free, rc = 1);
+                RETURN(1);
         }
 
         if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_DROP))) {
                 spin_unlock_bh(&obd->obd_processing_task_lock);
-                GOTO(err_free, rc = 0);
+                RETURN(0);
         }
 
-        memcpy(saved_req, req, sizeof *req);
-        memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
-        orig_req = req;
-        req = saved_req;
-        req->rq_reqmsg = reqmsg;
-        class_export_get(req->rq_export);
-        CFS_INIT_LIST_HEAD(&req->rq_list);
-        CFS_INIT_LIST_HEAD(&req->rq_replay_list);
-
         if (target_exp_enqueue_req_replay(req)) {
                 spin_unlock_bh(&obd->obd_processing_task_lock);
                 DEBUG_REQ(D_ERROR, req, "dropping resent queued req");
-                GOTO(err_exp, rc = 0);
+                RETURN(0);
         }
 
         /* XXX O(n^2) */
@@ -1660,7 +1655,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
                         DEBUG_REQ(D_ERROR, req, "dropping replay: transno "
                                   "has been claimed by another client");
                         target_exp_dequeue_req_replay(req);
-                        GOTO(err_exp, rc = 0);
+                        RETURN(0);
                 }
         }
 
@@ -1668,9 +1663,8 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
                 list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
         }
 
+        target_request_copy_get(req);
         obd->obd_requests_queued_for_recovery++;
-        orig_req->rq_copy_queued = 1;
-        req->rq_copy = 1;
 
         if (obd->obd_processing_task != 0) {
                 /* Someone else is processing this queue, we'll leave it to
@@ -1678,7 +1672,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
                  */
                 cfs_waitq_signal(&obd->obd_next_transno_waitq);
                 spin_unlock_bh(&obd->obd_processing_task_lock);
-                return 0;
+                RETURN(0);
         }
 
         /* Nobody is processing, and we know there's (at least) one to process
@@ -1690,14 +1684,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req,
         spin_unlock_bh(&obd->obd_processing_task_lock);
 
         process_recovery_queue(obd);
-        return 0;
-
-err_exp:
-        class_export_put(req->rq_export);
-err_free:
-        OBD_FREE(reqmsg, req->rq_reqlen);
-        OBD_FREE(saved_req, sizeof(*saved_req));
-        return rc;
+        RETURN(0);
 }
 
 struct obd_device * target_req2obd(struct ptlrpc_request *req)
@@ -1708,8 +1695,6 @@ struct obd_device * target_req2obd(struct ptlrpc_request *req)
 int target_queue_last_replay_reply(struct ptlrpc_request *req, int rc)
 {
         struct obd_device *obd = target_req2obd(req);
-        struct ptlrpc_request *saved_req;
-        struct lustre_msg *reqmsg;
         struct obd_export *exp = req->rq_export;
         int recovery_done = 0, delayed_done = 0;
 
@@ -1725,17 +1710,6 @@ int target_queue_last_replay_reply(struct ptlrpc_request *req, int rc)
 
         LASSERT(!req->rq_reply_state->rs_difficult);
         LASSERT(list_empty(&req->rq_list));
-        /* XXX a bit like the request-dup code in queue_recovery_request */
-        OBD_ALLOC(saved_req, sizeof *saved_req);
-        if (!saved_req)
-                return -ENOMEM;
-        OBD_ALLOC(reqmsg, req->rq_reqlen);
-        if (!reqmsg) {
-                OBD_FREE(saved_req, sizeof *req);
-                return -ENOMEM;
-        }
-        *saved_req = *req;
-        memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
 
         /* Don't race cleanup */
         spin_lock_bh(&obd->obd_processing_task_lock);
@@ -1745,12 +1719,7 @@ int target_queue_last_replay_reply(struct ptlrpc_request *req, int rc)
         }
 
         if (!exp->exp_vbr_failed) {
-                ptlrpc_rs_addref(req->rq_reply_state);  /* +1 ref for saved reply */
-                req = saved_req;
-                req->rq_reqmsg = reqmsg;
-                CFS_INIT_LIST_HEAD(&req->rq_list);
-                CFS_INIT_LIST_HEAD(&req->rq_replay_list);
-                class_export_get(exp);
+                target_request_copy_get(req);
                 list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
         }
 
@@ -1812,8 +1781,6 @@ int target_queue_last_replay_reply(struct ptlrpc_request *req, int rc)
                 CWARN("%s: disconnect export %s\n", obd->obd_name,
                       exp->exp_client_uuid.uuid);
                 class_fail_export(exp);
-                OBD_FREE(reqmsg, req->rq_reqlen);
-                OBD_FREE(saved_req, sizeof *req);
                 req->rq_status = 0;
                 ptlrpc_send_reply(req, 0);
         }
@@ -1821,8 +1788,6 @@ int target_queue_last_replay_reply(struct ptlrpc_request *req, int rc)
         return 1;
 
 out_noconn:
-        OBD_FREE(reqmsg, req->rq_reqlen);
-        OBD_FREE(saved_req, sizeof *req);
         req->rq_status = -ENOTCONN;
         /* rv is ignored anyhow */
         return -ENOTCONN;
index 01fc2a2..56a40fb 100644 (file)
@@ -2428,6 +2428,9 @@ static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
         if (conn == NULL || obd == NULL || cluuid == NULL)
                 RETURN(-EINVAL);
 
+        /* Check for aborted recovery. */
+        target_recovery_check_and_stop(obd);
+
         rc = class_connect(conn, obd, cluuid);
         if (rc)
                 RETURN(rc);
index 95fadeb..6722ba4 100644 (file)
@@ -199,16 +199,15 @@ void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
         if (AT_OFF) {
                 /* non-AT settings */
                 req->rq_timeout = req->rq_import->imp_server_timeout ?
-                        obd_timeout / 2 : obd_timeout;
-                lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
-                return;
+                                  obd_timeout / 2 : obd_timeout;
+        } else {
+                at = &req->rq_import->imp_at;
+                idx = import_at_get_index(req->rq_import,
+                                          req->rq_request_portal);
+                serv_est = at_get(&at->iat_service_estimate[idx]);
+                req->rq_timeout = at_est2timeout(serv_est);
         }
 
-        at = &req->rq_import->imp_at;
-        idx = import_at_get_index(req->rq_import,
-                                  req->rq_request_portal);
-        serv_est = at_get(&at->iat_service_estimate[idx]);
-        req->rq_timeout = at_est2timeout(serv_est);
         /* We could get even fancier here, using history to predict increased
            loading... */
 
@@ -225,11 +224,6 @@ static void ptlrpc_at_adj_service(struct ptlrpc_request *req,
         unsigned int oldse;
         struct imp_at *at;
 
-        /* do estimate only if is not in recovery */
-        if ((req->rq_send_state != LUSTRE_IMP_FULL) &&
-             (req->rq_send_state != LUSTRE_IMP_CONNECTING))
-                return;
-
         LASSERT(req->rq_import);
         at = &req->rq_import->imp_at;
 
@@ -401,7 +395,6 @@ static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req) {
         /* Expecting to increase the service time estimate here */
         ptlrpc_at_adj_service(req, lustre_msg_get_timeout(msg));
         ptlrpc_at_adj_net_latency(req, lustre_msg_get_service_time(msg));
-
         /* Adjust the local timeout for this req */
         ptlrpc_at_set_req_timeout(req);
 
@@ -409,7 +402,7 @@ static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req) {
         /* Server assumes it now has rq_timeout from when it sent the
            early reply, so client should give it at least that long. */
         req->rq_deadline = cfs_time_current_sec() + req->rq_timeout +
-                    ptlrpc_at_get_net_latency(req);
+                           ptlrpc_at_get_net_latency(req);
 
         DEBUG_REQ(D_ADAPTTO, req,
                   "Early reply #%d, new deadline in %lds (%+lds)",
index 3853b09..e664265 100644 (file)
@@ -279,7 +279,12 @@ void request_in_callback(lnet_event_t *ev)
         req->rq_uid = ev->uid;
 #endif
         spin_lock_init(&req->rq_lock);
+        CFS_INIT_LIST_HEAD(&req->rq_list);
         CFS_INIT_LIST_HEAD(&req->rq_timed_list);
+        CFS_INIT_LIST_HEAD(&req->rq_replay_list);
+        CFS_INIT_LIST_HEAD(&req->rq_set_chain);
+        CFS_INIT_LIST_HEAD(&req->rq_history_list);
+        CFS_INIT_LIST_HEAD(&req->rq_exp_list);
         atomic_set(&req->rq_refcount, 1);
         if (ev->type == LNET_EVENT_PUT)
                 DEBUG_REQ(D_RPCTRACE, req, "incoming req");
index c4d4222..d3b6a87 100644 (file)
@@ -1216,9 +1216,11 @@ static int signal_completed_replay(struct obd_import *imp)
 
         if (imp->imp_delayed_recovery)
                 lustre_msg_add_flags(req->rq_reqmsg, MSG_DELAY_REPLAY);
-        req->rq_timeout *= 3;
         req->rq_interpret_reply = completed_replay_interpret;
 
+        if (AT_OFF)
+                req->rq_timeout *= 3;
+
         ptlrpcd_add_req(req);
         RETURN(0);
 }
index a69e4e8..9ad3ca7 100644 (file)
@@ -602,6 +602,11 @@ static int ping_evictor_main(void *arg)
                 obd = pet_exp->exp_obd;
                 spin_unlock(&pet_lock);
 
+                /* bug 18948: ensure recovery is aborted in a timely fashion */
+                if (target_recovery_check_and_stop(obd) ||
+                    obd->obd_recovering /* no evictor during recovery */)
+                       GOTO(skip, 0);
+
                 expire_time = cfs_time_current_sec() - PING_EVICT_TIMEOUT;
 
                 CDEBUG(D_HA, "evicting all exports of obd %s older than %ld\n",
@@ -637,7 +642,7 @@ static int ping_evictor_main(void *arg)
                         }
                 }
                 spin_unlock(&obd->obd_dev_lock);
-
+skip:
                 class_export_put(pet_exp);
 
                 spin_lock(&pet_lock);
index ce6482d..f1c21a2 100644 (file)
@@ -428,7 +428,7 @@ static void ptlrpc_server_free_request(struct ptlrpc_request *req)
  * drop a reference count of the request. if it reaches 0, we either
  * put it into history list, or free it immediately.
  */
-static void ptlrpc_server_drop_request(struct ptlrpc_request *req)
+void ptlrpc_server_drop_request(struct ptlrpc_request *req)
 {
         struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
         struct ptlrpc_service             *svc = rqbd->rqbd_service;
@@ -439,6 +439,24 @@ static void ptlrpc_server_drop_request(struct ptlrpc_request *req)
         if (!atomic_dec_and_test(&req->rq_refcount))
                 return;
 
+        spin_lock(&svc->srv_at_lock);
+        list_del_init(&req->rq_timed_list);
+        if (req->rq_at_linked) {
+                struct ptlrpc_at_array *array = &svc->srv_at_array;
+                __u32 index = req->rq_at_index;
+
+                req->rq_at_linked = 0;
+                array->paa_reqs_count[index]--;
+                array->paa_count--;
+        }
+        spin_unlock(&svc->srv_at_lock);
+
+        /* finalize request */
+        if (req->rq_export) {
+                class_export_put(req->rq_export);
+                req->rq_export = NULL;
+        }
+
         spin_lock(&svc->srv_lock);
 
         svc->srv_n_active_reqs--;
@@ -512,29 +530,6 @@ static void ptlrpc_server_drop_request(struct ptlrpc_request *req)
  */
 static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
 {
-        struct ptlrpc_service  *svc = req->rq_rqbd->rqbd_service;
-
-        if (req->rq_export) {
-                class_export_put(req->rq_export);
-                req->rq_export = NULL;
-        }
-
-        if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
-                DEBUG_REQ(D_INFO, req, "free req");
-
-        spin_lock(&svc->srv_at_lock);
-        req->rq_sent_final = 1;
-        list_del_init(&req->rq_timed_list);
-        if (req->rq_at_linked) {
-                struct ptlrpc_at_array *array = &svc->srv_at_array;
-                __u32 index = req->rq_at_index;
-
-                req->rq_at_linked = 0;
-                array->paa_reqs_count[index]--;
-                array->paa_count--;
-        }
-        spin_unlock(&svc->srv_at_lock);
-
         ptlrpc_server_drop_request(req);
 }
 
@@ -681,12 +676,6 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
                 return(-ENOSYS);
 
         spin_lock(&svc->srv_at_lock);
-
-        if (unlikely(req->rq_sent_final)) {
-                spin_unlock(&svc->srv_at_lock);
-                return 0;
-        }
-
         LASSERT(list_empty(&req->rq_timed_list));
 
         index = (unsigned long)req->rq_deadline % array->paa_size;
@@ -702,7 +691,7 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
                         }
                 }
         }
-        
+
         /* Add the request at the head of the list */
         if (list_empty(&req->rq_timed_list))
                 list_add(&req->rq_timed_list, &array->paa_reqs_array[index]);
@@ -723,8 +712,7 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
         return 0;
 }
 
-static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
-                                      int extra_time)
+static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
 {
         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
         struct ptlrpc_request *reqcopy;
@@ -740,7 +728,7 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
                   "%ssending early reply (deadline %+lds, margin %+lds) for "
                   "%d+%d", AT_OFF ? "AT off - not " : "",
                   olddl, olddl - at_get(&svc->srv_at_estimate),
-                  at_get(&svc->srv_at_estimate), extra_time);
+                  at_get(&svc->srv_at_estimate), at_extra);
 
         if (AT_OFF)
                 RETURN(0);
@@ -760,22 +748,20 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
                 RETURN(-ENOSYS);
         }
 
-        if (req->rq_export && req->rq_export->exp_in_recovery) {
-                /* don't increase server estimates during recovery, and give
-                   clients the full recovery time. */
-                newdl = cfs_time_current_sec() +
-                        req->rq_export->exp_obd->obd_recovery_timeout;
+        if (req->rq_export &&
+            lustre_msg_get_flags(req->rq_reqmsg) &
+            (MSG_REPLAY | MSG_LAST_REPLAY)) {
+                /* Use at_extra as early reply period for recovery requests
+                 * but keep it not longer than recovery time / 4 */
+                at_add(&svc->srv_at_estimate,
+                       min(at_extra,
+                           req->rq_export->exp_obd->obd_recovery_timeout / 4));
         } else {
-                if (extra_time) {
-                        /* Fake our processing time into the future to ask the
-                           clients for some extra amount of time */
-                        extra_time += cfs_time_current_sec() -
-                                      req->rq_arrival_time.tv_sec;
-                        at_add(&svc->srv_at_estimate, extra_time);
-                }
-                newdl = req->rq_arrival_time.tv_sec +
-                        at_get(&svc->srv_at_estimate);
+                /* Fake our processing time into the future to ask the
+                 * clients for some extra amount of time */
+                at_add(&svc->srv_at_estimate, at_extra);
         }
+        newdl = cfs_time_current_sec() + at_get(&svc->srv_at_estimate);
         if (req->rq_deadline >= newdl) {
                 /* We're not adding any time, no need to send an early reply
                    (e.g. maybe at adaptive_max) */
@@ -801,10 +787,12 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
         reqcopy->rq_reqmsg = reqmsg;
         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
 
-        if (req->rq_sent_final) {
+        LASSERT(atomic_read(&req->rq_refcount));
+        /** if it is last refcount then early reply isn't needed */
+        if (atomic_read(&req->rq_refcount) == 1) {
                 DEBUG_REQ(D_ADAPTTO, reqcopy, "Normal reply already sent out, "
                           "abort sending early reply\n");
-                GOTO(out, rc = 0);
+                GOTO(out, rc = -EINVAL);
         }
 
         /* Connection ref */
@@ -893,7 +881,14 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
                 list_for_each_entry_safe(rq, n, &array->paa_reqs_array[index],
                                          rq_timed_list) {
                         if (rq->rq_deadline <= now + at_early_margin) {
-                                list_move(&rq->rq_timed_list, &work_list);
+                                list_del(&rq->rq_timed_list);
+                                /**
+                                 * ptlrpc_server_drop_request() may drop
+                                 * refcount to 0 already. Let's check this and
+                                 * don't add entry to work_list
+                                 */
+                                if (likely(atomic_inc_not_zero(&rq->rq_refcount)))
+                                        list_add(&rq->rq_timed_list, &work_list);
                                 counter++;
                                 array->paa_reqs_count[index]--;
                                 array->paa_count--;
@@ -931,25 +926,18 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc)
                       at_get(&svc->srv_at_estimate), delay);
         }
 
-        /* ptlrpc_server_finish_request may delete an entry out of
-         * the work list */
-        spin_lock(&svc->srv_at_lock);
+        /* we took additional refcount so entries can't be deleted from list, no
+         * locking is needed */
         while (!list_empty(&work_list)) {
                 rq = list_entry(work_list.next, struct ptlrpc_request,
                                 rq_timed_list);
                 list_del_init(&rq->rq_timed_list);
-                /* if the entry is still in the worklist, it hasn't been
-                   deleted, and is safe to take a ref to keep the req around */
-                atomic_inc(&rq->rq_refcount);
-                spin_unlock(&svc->srv_at_lock);
 
-                if (ptlrpc_at_send_early_reply(rq, at_extra) == 0)
+                if (ptlrpc_at_send_early_reply(rq) == 0)
                         ptlrpc_at_add_timed(rq);
 
                 ptlrpc_server_drop_request(rq);
-                spin_lock(&svc->srv_at_lock);
         }
-        spin_unlock(&svc->srv_at_lock);
 
         RETURN(0);
 }