From 2fff516c7d5ab9932b537ba42472aa977462c1e0 Mon Sep 17 00:00:00 2001 From: panda Date: Tue, 20 Oct 2009 13:39:19 +0000 Subject: [PATCH] b=18948 a=Mike Pershin i=Nathan Rutman i=Zhanghc use request refcount instead of copying --- lustre/include/liblustre.h | 2 + lustre/include/lustre_net.h | 10 ++-- lustre/ldlm/ldlm_lib.c | 135 ++++++++++++++++---------------------------- lustre/obdfilter/filter.c | 3 + lustre/ptlrpc/client.c | 23 +++----- lustre/ptlrpc/events.c | 5 ++ lustre/ptlrpc/import.c | 4 +- lustre/ptlrpc/pinger.c | 7 ++- lustre/ptlrpc/service.c | 110 ++++++++++++++++-------------------- 9 files changed, 131 insertions(+), 168 deletions(-) diff --git a/lustre/include/liblustre.h b/lustre/include/liblustre.h index 8571201..fc820e3 100644 --- a/lustre/include/liblustre.h +++ b/lustre/include/liblustre.h @@ -737,6 +737,8 @@ typedef struct { volatile int counter; } atomic_t; #define atomic_sub(b,a) do {(a)->counter -= b;} while (0) #define atomic_sub_return(n,a) ((a)->counter -= n) #define atomic_dec_return(a) atomic_sub_return(1,a) +#define atomic_add_unless(v, a, u) ((v)->counter != u ? 
(v)->counter += a : 0) +#define atomic_inc_not_zero(v) atomic_add_unless((v), 1, 0) #ifndef likely #define likely(exp) (exp) diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 067beac..67d5ca6 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -321,15 +321,12 @@ struct ptlrpc_request { rq_no_delay:1, rq_net_err:1, rq_early:1, rq_must_unlink:1, /* server-side flags */ rq_packed_final:1, /* packed final reply */ - rq_sent_final:1, /* stop sending early replies */ rq_hp:1, /* high priority RPC */ rq_at_linked:1, /* link into service's srv_at_array */ rq_reply_truncate:1, /* reply is truncated */ rq_fake:1, /* fake request - just for timeout only */ - /* a copy of the request is queued to replay during recovery */ - rq_copy_queued:1, - /* whether the rquest is a copy of another replay request */ - rq_copy:1; + /* the request is queued to replay during recovery */ + rq_copy_queued:1; enum rq_phase rq_phase; /* one of RQ_PHASE_* */ enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */ atomic_t rq_refcount; /* client-side refcount for SENT race, @@ -910,6 +907,7 @@ int liblustre_check_services (void *arg); void ptlrpc_daemonize(char *name); int ptlrpc_service_health_check(struct ptlrpc_service *); void ptlrpc_hpreq_reorder(struct ptlrpc_request *req); +void ptlrpc_server_drop_request(struct ptlrpc_request *req); struct ptlrpc_svc_data { @@ -1157,9 +1155,11 @@ int ptlrpc_obd_ping(struct obd_device *obd); #ifdef __KERNEL__ void ping_evictor_start(void); void ping_evictor_stop(void); +int ping_evictor_wake(struct obd_export *exp); #else #define ping_evictor_start() do {} while (0) #define ping_evictor_stop() do {} while (0) +#define ping_evictor_wake(exp) 1 #endif /* ptlrpc/ptlrpcd.c */ diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 0faab9c..9d96387 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -649,6 +649,7 @@ int target_recovery_check_and_stop(struct obd_device 
*obd) } /* always check versions now */ obd->obd_version_recov = 1; + cfs_waitq_signal(&obd->obd_next_transno_waitq); spin_unlock_bh(&obd->obd_processing_task_lock); /* reset timer, recovery will proceed with versions now */ reset_recovery_timer(obd, OBD_RECOVERY_TIME_SOFT, 1); @@ -1165,12 +1166,26 @@ static void target_exp_dequeue_req_replay(struct ptlrpc_request *req) spin_unlock(&req->rq_export->exp_lock); } -static void target_release_saved_req(struct ptlrpc_request *req) +static void target_request_copy_get(struct ptlrpc_request *req) +{ + /* mark that request is in recovery queue, so request handler will not + * drop rpc count in export, bug 19870*/ + LASSERT(!req->rq_copy_queued); + req->rq_copy_queued = 1; + /* increase refcount to keep request in queue */ + atomic_inc(&req->rq_refcount); +} + +static void target_request_copy_put(struct ptlrpc_request *req) { - ptlrpc_req_drop_rs(req); - class_export_put(req->rq_export); - OBD_FREE(req->rq_reqmsg, req->rq_reqlen); - OBD_FREE(req, sizeof *req); + LASSERTF(list_empty(&req->rq_replay_list), "next: %p, prev: %p\n", + req->rq_replay_list.next, req->rq_replay_list.prev); + /* class_export_rpc_get was done before handling request, + * drop it early to allow new requests, see bug 19870. 
+ */ + LASSERT(req->rq_copy_queued); + class_export_rpc_put(req->rq_export); + ptlrpc_server_drop_request(req); } static void target_send_delayed_replies(struct obd_device *obd) @@ -1195,7 +1210,7 @@ static void target_send_delayed_replies(struct obd_device *obd) list_del_init(&req->rq_list); DEBUG_REQ(D_HA, req, "delayed:"); ptlrpc_reply(req); - target_release_saved_req(req); + target_request_copy_put(req); } obd->obd_recovery_end = cfs_time_current_sec(); } @@ -1249,10 +1264,7 @@ static void abort_recovery_queue(struct obd_device *obd) else DEBUG_REQ(D_ERROR, req, "packing failed for abort-reply; skipping"); - - LASSERT(req->rq_copy); - class_export_rpc_put(req->rq_export); - target_release_saved_req(req); + target_request_copy_put(req); } } @@ -1287,7 +1299,7 @@ void target_cleanup_recovery(struct obd_device *obd) list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) { req = list_entry(tmp, struct ptlrpc_request, rq_list); list_del(&req->rq_list); - target_release_saved_req(req); + target_request_copy_put(req); } CFS_INIT_LIST_HEAD(&clean_list); @@ -1298,10 +1310,7 @@ void target_cleanup_recovery(struct obd_device *obd) req = list_entry(tmp, struct ptlrpc_request, rq_list); target_exp_dequeue_req_replay(req); list_del_init(&req->rq_list); - - LASSERT(req->rq_copy); - class_export_rpc_put(req->rq_export); - target_release_saved_req(req); + target_request_copy_put(req); } EXIT; } @@ -1361,6 +1370,21 @@ static void target_recovery_expired(unsigned long castmeharder) obd->obd_abort_recovery = 1; cfs_waitq_signal(&obd->obd_next_transno_waitq); spin_unlock_bh(&obd->obd_processing_task_lock); + + /* bug 18948: + * The recovery timer expired and target_recovery_check_and_stop() + * must be called. We cannot call it directly because we are in + * interrupt context, so we need to wake up another thread to call it. + * This may happen if there are obd->obd_next_transno_waitq waiters, + * or if we happen to handle a connect request. 
However, we cannot + * count on either of those things so we wake up the ping evictor + * and leverage its context to complete recovery. + * + * Note: HEAD has a separate recovery thread and handles this. + */ + spin_lock(&obd->obd_dev_lock); + ping_evictor_wake(obd->obd_self_export); + spin_unlock(&obd->obd_dev_lock); } /* obd_processing_task_lock should be held */ @@ -1553,14 +1577,7 @@ static void process_recovery_queue(struct obd_device *obd) obd->obd_next_recovery_transno++; spin_unlock_bh(&obd->obd_processing_task_lock); target_exp_dequeue_req_replay(req); - - LASSERT(req->rq_copy); - class_export_rpc_put(req->rq_export); - - class_export_put(req->rq_export); - ptlrpc_req_drop_rs(req); - OBD_FREE(req->rq_reqmsg, req->rq_reqlen); - OBD_FREE(req, sizeof *req); + target_request_copy_put(req); OBD_RACE(OBD_FAIL_TGT_REPLAY_DELAY); spin_lock_bh(&obd->obd_processing_task_lock); if (list_empty(&obd->obd_recovery_queue)) { @@ -1580,10 +1597,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req, struct list_head *tmp; int inserted = 0; __u64 transno = lustre_msg_get_transno(req->rq_reqmsg); - struct ptlrpc_request *saved_req, *orig_req; - struct lustre_msg *reqmsg; - int rc = 0; - + ENTRY; /* CAVEAT EMPTOR: The incoming request message has been swabbed * (i.e. buflens etc are in my own byte order), but type-dependent * buffers (eg mds_body, ost_body etc) have NOT been swabbed. */ @@ -1591,20 +1605,10 @@ int target_queue_recovery_request(struct ptlrpc_request *req, if (!transno) { CFS_INIT_LIST_HEAD(&req->rq_list); DEBUG_REQ(D_HA, req, "not queueing"); - return 1; + RETURN(1); } - /* XXX If I were a real man, these LBUGs would be sane cleanups. 
*/ - /* XXX just like the request-dup code in queue_final_reply */ - OBD_ALLOC(saved_req, sizeof *saved_req); - if (!saved_req) - LBUG(); - OBD_ALLOC(reqmsg, req->rq_reqlen); - if (!reqmsg) - LBUG(); - spin_lock_bh(&obd->obd_processing_task_lock); - /* If we're processing the queue, we want don't want to queue this * message. * @@ -1620,27 +1624,18 @@ int target_queue_recovery_request(struct ptlrpc_request *req, /* Processing the queue right now, don't re-add. */ LASSERT(list_empty(&req->rq_list)); spin_unlock_bh(&obd->obd_processing_task_lock); - GOTO(err_free, rc = 1); + RETURN(1); } if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_DROP))) { spin_unlock_bh(&obd->obd_processing_task_lock); - GOTO(err_free, rc = 0); + RETURN(0); } - memcpy(saved_req, req, sizeof *req); - memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen); - orig_req = req; - req = saved_req; - req->rq_reqmsg = reqmsg; - class_export_get(req->rq_export); - CFS_INIT_LIST_HEAD(&req->rq_list); - CFS_INIT_LIST_HEAD(&req->rq_replay_list); - if (target_exp_enqueue_req_replay(req)) { spin_unlock_bh(&obd->obd_processing_task_lock); DEBUG_REQ(D_ERROR, req, "dropping resent queued req"); - GOTO(err_exp, rc = 0); + RETURN(0); } /* XXX O(n^2) */ @@ -1660,7 +1655,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req, DEBUG_REQ(D_ERROR, req, "dropping replay: transno " "has been claimed by another client"); target_exp_dequeue_req_replay(req); - GOTO(err_exp, rc = 0); + RETURN(0); } } @@ -1668,9 +1663,8 @@ int target_queue_recovery_request(struct ptlrpc_request *req, list_add_tail(&req->rq_list, &obd->obd_recovery_queue); } + target_request_copy_get(req); obd->obd_requests_queued_for_recovery++; - orig_req->rq_copy_queued = 1; - req->rq_copy = 1; if (obd->obd_processing_task != 0) { /* Someone else is processing this queue, we'll leave it to @@ -1678,7 +1672,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req, */ cfs_waitq_signal(&obd->obd_next_transno_waitq); 
spin_unlock_bh(&obd->obd_processing_task_lock); - return 0; + RETURN(0); } /* Nobody is processing, and we know there's (at least) one to process @@ -1690,14 +1684,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req, spin_unlock_bh(&obd->obd_processing_task_lock); process_recovery_queue(obd); - return 0; - -err_exp: - class_export_put(req->rq_export); -err_free: - OBD_FREE(reqmsg, req->rq_reqlen); - OBD_FREE(saved_req, sizeof(*saved_req)); - return rc; + RETURN(0); } struct obd_device * target_req2obd(struct ptlrpc_request *req) @@ -1708,8 +1695,6 @@ struct obd_device * target_req2obd(struct ptlrpc_request *req) int target_queue_last_replay_reply(struct ptlrpc_request *req, int rc) { struct obd_device *obd = target_req2obd(req); - struct ptlrpc_request *saved_req; - struct lustre_msg *reqmsg; struct obd_export *exp = req->rq_export; int recovery_done = 0, delayed_done = 0; @@ -1725,17 +1710,6 @@ int target_queue_last_replay_reply(struct ptlrpc_request *req, int rc) LASSERT(!req->rq_reply_state->rs_difficult); LASSERT(list_empty(&req->rq_list)); - /* XXX a bit like the request-dup code in queue_recovery_request */ - OBD_ALLOC(saved_req, sizeof *saved_req); - if (!saved_req) - return -ENOMEM; - OBD_ALLOC(reqmsg, req->rq_reqlen); - if (!reqmsg) { - OBD_FREE(saved_req, sizeof *req); - return -ENOMEM; - } - *saved_req = *req; - memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen); /* Don't race cleanup */ spin_lock_bh(&obd->obd_processing_task_lock); @@ -1745,12 +1719,7 @@ int target_queue_last_replay_reply(struct ptlrpc_request *req, int rc) } if (!exp->exp_vbr_failed) { - ptlrpc_rs_addref(req->rq_reply_state); /* +1 ref for saved reply */ - req = saved_req; - req->rq_reqmsg = reqmsg; - CFS_INIT_LIST_HEAD(&req->rq_list); - CFS_INIT_LIST_HEAD(&req->rq_replay_list); - class_export_get(exp); + target_request_copy_get(req); list_add(&req->rq_list, &obd->obd_delayed_reply_queue); } @@ -1812,8 +1781,6 @@ int target_queue_last_replay_reply(struct ptlrpc_request *req, 
int rc) CWARN("%s: disconnect export %s\n", obd->obd_name, exp->exp_client_uuid.uuid); class_fail_export(exp); - OBD_FREE(reqmsg, req->rq_reqlen); - OBD_FREE(saved_req, sizeof *req); req->rq_status = 0; ptlrpc_send_reply(req, 0); } @@ -1821,8 +1788,6 @@ int target_queue_last_replay_reply(struct ptlrpc_request *req, int rc) return 1; out_noconn: - OBD_FREE(reqmsg, req->rq_reqlen); - OBD_FREE(saved_req, sizeof *req); req->rq_status = -ENOTCONN; /* rv is ignored anyhow */ return -ENOTCONN; diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 01fc2a2..56a40fb 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -2428,6 +2428,9 @@ static int filter_connect(struct lustre_handle *conn, struct obd_device *obd, if (conn == NULL || obd == NULL || cluuid == NULL) RETURN(-EINVAL); + /* Check for aborted recovery. */ + target_recovery_check_and_stop(obd); + rc = class_connect(conn, obd, cluuid); if (rc) RETURN(rc); diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 95fadeb..6722ba4 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -199,16 +199,15 @@ void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req) if (AT_OFF) { /* non-AT settings */ req->rq_timeout = req->rq_import->imp_server_timeout ? - obd_timeout / 2 : obd_timeout; - lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout); - return; + obd_timeout / 2 : obd_timeout; + } else { + at = &req->rq_import->imp_at; + idx = import_at_get_index(req->rq_import, + req->rq_request_portal); + serv_est = at_get(&at->iat_service_estimate[idx]); + req->rq_timeout = at_est2timeout(serv_est); } - at = &req->rq_import->imp_at; - idx = import_at_get_index(req->rq_import, - req->rq_request_portal); - serv_est = at_get(&at->iat_service_estimate[idx]); - req->rq_timeout = at_est2timeout(serv_est); /* We could get even fancier here, using history to predict increased loading... 
*/ @@ -225,11 +224,6 @@ static void ptlrpc_at_adj_service(struct ptlrpc_request *req, unsigned int oldse; struct imp_at *at; - /* do estimate only if is not in recovery */ - if ((req->rq_send_state != LUSTRE_IMP_FULL) && - (req->rq_send_state != LUSTRE_IMP_CONNECTING)) - return; - LASSERT(req->rq_import); at = &req->rq_import->imp_at; @@ -401,7 +395,6 @@ static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req) { /* Expecting to increase the service time estimate here */ ptlrpc_at_adj_service(req, lustre_msg_get_timeout(msg)); ptlrpc_at_adj_net_latency(req, lustre_msg_get_service_time(msg)); - /* Adjust the local timeout for this req */ ptlrpc_at_set_req_timeout(req); @@ -409,7 +402,7 @@ static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req) { /* Server assumes it now has rq_timeout from when it sent the early reply, so client should give it at least that long. */ req->rq_deadline = cfs_time_current_sec() + req->rq_timeout + - ptlrpc_at_get_net_latency(req); + ptlrpc_at_get_net_latency(req); DEBUG_REQ(D_ADAPTTO, req, "Early reply #%d, new deadline in %lds (%+lds)", diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index 3853b09..e664265 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -279,7 +279,12 @@ void request_in_callback(lnet_event_t *ev) req->rq_uid = ev->uid; #endif spin_lock_init(&req->rq_lock); + CFS_INIT_LIST_HEAD(&req->rq_list); CFS_INIT_LIST_HEAD(&req->rq_timed_list); + CFS_INIT_LIST_HEAD(&req->rq_replay_list); + CFS_INIT_LIST_HEAD(&req->rq_set_chain); + CFS_INIT_LIST_HEAD(&req->rq_history_list); + CFS_INIT_LIST_HEAD(&req->rq_exp_list); atomic_set(&req->rq_refcount, 1); if (ev->type == LNET_EVENT_PUT) DEBUG_REQ(D_RPCTRACE, req, "incoming req"); diff --git a/lustre/ptlrpc/import.c b/lustre/ptlrpc/import.c index c4d4222..d3b6a87 100644 --- a/lustre/ptlrpc/import.c +++ b/lustre/ptlrpc/import.c @@ -1216,9 +1216,11 @@ static int signal_completed_replay(struct obd_import *imp) if (imp->imp_delayed_recovery) 
lustre_msg_add_flags(req->rq_reqmsg, MSG_DELAY_REPLAY); - req->rq_timeout *= 3; req->rq_interpret_reply = completed_replay_interpret; + if (AT_OFF) + req->rq_timeout *= 3; + ptlrpcd_add_req(req); RETURN(0); } diff --git a/lustre/ptlrpc/pinger.c b/lustre/ptlrpc/pinger.c index a69e4e8..9ad3ca7 100644 --- a/lustre/ptlrpc/pinger.c +++ b/lustre/ptlrpc/pinger.c @@ -602,6 +602,11 @@ static int ping_evictor_main(void *arg) obd = pet_exp->exp_obd; spin_unlock(&pet_lock); + /* bug 18948: ensure recovery is aborted in a timely fashion */ + if (target_recovery_check_and_stop(obd) || + obd->obd_recovering /* no evictor during recovery */) + GOTO(skip, 0); + expire_time = cfs_time_current_sec() - PING_EVICT_TIMEOUT; CDEBUG(D_HA, "evicting all exports of obd %s older than %ld\n", @@ -637,7 +642,7 @@ static int ping_evictor_main(void *arg) } } spin_unlock(&obd->obd_dev_lock); - +skip: class_export_put(pet_exp); spin_lock(&pet_lock); diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index ce6482d..f1c21a2 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -428,7 +428,7 @@ static void ptlrpc_server_free_request(struct ptlrpc_request *req) * drop a reference count of the request. if it reaches 0, we either * put it into history list, or free it immediately. 
*/ -static void ptlrpc_server_drop_request(struct ptlrpc_request *req) +void ptlrpc_server_drop_request(struct ptlrpc_request *req) { struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd; struct ptlrpc_service *svc = rqbd->rqbd_service; @@ -439,6 +439,24 @@ static void ptlrpc_server_drop_request(struct ptlrpc_request *req) if (!atomic_dec_and_test(&req->rq_refcount)) return; + spin_lock(&svc->srv_at_lock); + list_del_init(&req->rq_timed_list); + if (req->rq_at_linked) { + struct ptlrpc_at_array *array = &svc->srv_at_array; + __u32 index = req->rq_at_index; + + req->rq_at_linked = 0; + array->paa_reqs_count[index]--; + array->paa_count--; + } + spin_unlock(&svc->srv_at_lock); + + /* finalize request */ + if (req->rq_export) { + class_export_put(req->rq_export); + req->rq_export = NULL; + } + spin_lock(&svc->srv_lock); svc->srv_n_active_reqs--; @@ -512,29 +530,6 @@ static void ptlrpc_server_drop_request(struct ptlrpc_request *req) */ static void ptlrpc_server_finish_request(struct ptlrpc_request *req) { - struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service; - - if (req->rq_export) { - class_export_put(req->rq_export); - req->rq_export = NULL; - } - - if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */ - DEBUG_REQ(D_INFO, req, "free req"); - - spin_lock(&svc->srv_at_lock); - req->rq_sent_final = 1; - list_del_init(&req->rq_timed_list); - if (req->rq_at_linked) { - struct ptlrpc_at_array *array = &svc->srv_at_array; - __u32 index = req->rq_at_index; - - req->rq_at_linked = 0; - array->paa_reqs_count[index]--; - array->paa_count--; - } - spin_unlock(&svc->srv_at_lock); - ptlrpc_server_drop_request(req); } @@ -681,12 +676,6 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req) return(-ENOSYS); spin_lock(&svc->srv_at_lock); - - if (unlikely(req->rq_sent_final)) { - spin_unlock(&svc->srv_at_lock); - return 0; - } - LASSERT(list_empty(&req->rq_timed_list)); index = (unsigned long)req->rq_deadline % array->paa_size; @@ -702,7 +691,7 @@ static int 
ptlrpc_at_add_timed(struct ptlrpc_request *req) } } } - + /* Add the request at the head of the list */ if (list_empty(&req->rq_timed_list)) list_add(&req->rq_timed_list, &array->paa_reqs_array[index]); @@ -723,8 +712,7 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req) return 0; } -static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req, - int extra_time) +static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req) { struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service; struct ptlrpc_request *reqcopy; @@ -740,7 +728,7 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req, "%ssending early reply (deadline %+lds, margin %+lds) for " "%d+%d", AT_OFF ? "AT off - not " : "", olddl, olddl - at_get(&svc->srv_at_estimate), - at_get(&svc->srv_at_estimate), extra_time); + at_get(&svc->srv_at_estimate), at_extra); if (AT_OFF) RETURN(0); @@ -760,22 +748,20 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req, RETURN(-ENOSYS); } - if (req->rq_export && req->rq_export->exp_in_recovery) { - /* don't increase server estimates during recovery, and give - clients the full recovery time. 
*/ - newdl = cfs_time_current_sec() + - req->rq_export->exp_obd->obd_recovery_timeout; + if (req->rq_export && + lustre_msg_get_flags(req->rq_reqmsg) & + (MSG_REPLAY | MSG_LAST_REPLAY)) { + /* Use at_extra as early reply period for recovery requests + * but keep it not longer than recovery time / 4 */ + at_add(&svc->srv_at_estimate, + min(at_extra, + req->rq_export->exp_obd->obd_recovery_timeout / 4)); } else { - if (extra_time) { - /* Fake our processing time into the future to ask the - clients for some extra amount of time */ - extra_time += cfs_time_current_sec() - - req->rq_arrival_time.tv_sec; - at_add(&svc->srv_at_estimate, extra_time); - } - newdl = req->rq_arrival_time.tv_sec + - at_get(&svc->srv_at_estimate); + /* Fake our processing time into the future to ask the + * clients for some extra amount of time */ + at_add(&svc->srv_at_estimate, at_extra); } + newdl = cfs_time_current_sec() + at_get(&svc->srv_at_estimate); if (req->rq_deadline >= newdl) { /* We're not adding any time, no need to send an early reply (e.g. maybe at adaptive_max) */ @@ -801,10 +787,12 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req, reqcopy->rq_reqmsg = reqmsg; memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen); - if (req->rq_sent_final) { + LASSERT(atomic_read(&req->rq_refcount)); + /** if it is last refcount then early reply isn't needed */ + if (atomic_read(&req->rq_refcount) == 1) { DEBUG_REQ(D_ADAPTTO, reqcopy, "Normal reply already sent out, " "abort sending early reply\n"); - GOTO(out, rc = 0); + GOTO(out, rc = -EINVAL); } /* Connection ref */ @@ -893,7 +881,14 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc) list_for_each_entry_safe(rq, n, &array->paa_reqs_array[index], rq_timed_list) { if (rq->rq_deadline <= now + at_early_margin) { - list_move(&rq->rq_timed_list, &work_list); + list_del(&rq->rq_timed_list); + /** + * ptlrpc_server_drop_request() may drop + * refcount to 0 already. 
Let's check this and + * don't add entry to work_list + */ + if (likely(atomic_inc_not_zero(&rq->rq_refcount))) + list_add(&rq->rq_timed_list, &work_list); counter++; array->paa_reqs_count[index]--; array->paa_count--; @@ -931,25 +926,18 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc) at_get(&svc->srv_at_estimate), delay); } - /* ptlrpc_server_finish_request may delete an entry out of - * the work list */ - spin_lock(&svc->srv_at_lock); + /* we took additional refcount so entries can't be deleted from list, no + * locking is needed */ while (!list_empty(&work_list)) { rq = list_entry(work_list.next, struct ptlrpc_request, rq_timed_list); list_del_init(&rq->rq_timed_list); - /* if the entry is still in the worklist, it hasn't been - deleted, and is safe to take a ref to keep the req around */ - atomic_inc(&rq->rq_refcount); - spin_unlock(&svc->srv_at_lock); - if (ptlrpc_at_send_early_reply(rq, at_extra) == 0) + if (ptlrpc_at_send_early_reply(rq) == 0) ptlrpc_at_add_timed(rq); ptlrpc_server_drop_request(rq); - spin_lock(&svc->srv_at_lock); } - spin_unlock(&svc->srv_at_lock); RETURN(0); } -- 1.8.3.1