#define atomic_sub(b,a) do {(a)->counter -= b;} while (0)
#define atomic_sub_return(n,a) ((a)->counter -= n)
#define atomic_dec_return(a) atomic_sub_return(1,a)
+#define atomic_add_unless(v, a, u) ((v)->counter != u ? (v)->counter += a : 0)
+#define atomic_inc_not_zero(v) atomic_add_unless((v), 1, 0)
#ifndef likely
#define likely(exp) (exp)
rq_no_delay:1, rq_net_err:1, rq_early:1, rq_must_unlink:1,
/* server-side flags */
rq_packed_final:1, /* packed final reply */
- rq_sent_final:1, /* stop sending early replies */
rq_hp:1, /* high priority RPC */
rq_at_linked:1, /* link into service's srv_at_array */
rq_reply_truncate:1, /* reply is truncated */
rq_fake:1, /* fake request - just for timeout only */
- /* a copy of the request is queued to replay during recovery */
- rq_copy_queued:1,
- /* whether the rquest is a copy of another replay request */
- rq_copy:1;
+ /* the request is queued to replay during recovery */
+ rq_copy_queued:1;
enum rq_phase rq_phase; /* one of RQ_PHASE_* */
enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
atomic_t rq_refcount; /* client-side refcount for SENT race,
void ptlrpc_daemonize(char *name);
int ptlrpc_service_health_check(struct ptlrpc_service *);
void ptlrpc_hpreq_reorder(struct ptlrpc_request *req);
+void ptlrpc_server_drop_request(struct ptlrpc_request *req);
struct ptlrpc_svc_data {
#ifdef __KERNEL__
void ping_evictor_start(void);
void ping_evictor_stop(void);
+int ping_evictor_wake(struct obd_export *exp);
#else
#define ping_evictor_start() do {} while (0)
#define ping_evictor_stop() do {} while (0)
+#define ping_evictor_wake(exp) 1
#endif
/* ptlrpc/ptlrpcd.c */
}
/* always check versions now */
obd->obd_version_recov = 1;
+ cfs_waitq_signal(&obd->obd_next_transno_waitq);
spin_unlock_bh(&obd->obd_processing_task_lock);
/* reset timer, recovery will proceed with versions now */
reset_recovery_timer(obd, OBD_RECOVERY_TIME_SOFT, 1);
spin_unlock(&req->rq_export->exp_lock);
}
-static void target_release_saved_req(struct ptlrpc_request *req)
+static void target_request_copy_get(struct ptlrpc_request *req)
+{
+ /* mark that request is in recovery queue, so request handler will not
+ * drop rpc count in export, bug 19870*/
+ LASSERT(!req->rq_copy_queued);
+ req->rq_copy_queued = 1;
+ /* increase refcount to keep request in queue */
+ atomic_inc(&req->rq_refcount);
+}
+
+static void target_request_copy_put(struct ptlrpc_request *req)
{
- ptlrpc_req_drop_rs(req);
- class_export_put(req->rq_export);
- OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
- OBD_FREE(req, sizeof *req);
+ LASSERTF(list_empty(&req->rq_replay_list), "next: %p, prev: %p\n",
+ req->rq_replay_list.next, req->rq_replay_list.prev);
+ /* class_export_rpc_get was done before handling request,
+ * drop it early to allow new requests, see bug 19870.
+ */
+ LASSERT(req->rq_copy_queued);
+ class_export_rpc_put(req->rq_export);
+ ptlrpc_server_drop_request(req);
}
static void target_send_delayed_replies(struct obd_device *obd)
list_del_init(&req->rq_list);
DEBUG_REQ(D_HA, req, "delayed:");
ptlrpc_reply(req);
- target_release_saved_req(req);
+ target_request_copy_put(req);
}
obd->obd_recovery_end = cfs_time_current_sec();
}
else
DEBUG_REQ(D_ERROR, req,
"packing failed for abort-reply; skipping");
-
- LASSERT(req->rq_copy);
- class_export_rpc_put(req->rq_export);
- target_release_saved_req(req);
+ target_request_copy_put(req);
}
}
list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
req = list_entry(tmp, struct ptlrpc_request, rq_list);
list_del(&req->rq_list);
- target_release_saved_req(req);
+ target_request_copy_put(req);
}
CFS_INIT_LIST_HEAD(&clean_list);
req = list_entry(tmp, struct ptlrpc_request, rq_list);
target_exp_dequeue_req_replay(req);
list_del_init(&req->rq_list);
-
- LASSERT(req->rq_copy);
- class_export_rpc_put(req->rq_export);
- target_release_saved_req(req);
+ target_request_copy_put(req);
}
EXIT;
}
obd->obd_abort_recovery = 1;
cfs_waitq_signal(&obd->obd_next_transno_waitq);
spin_unlock_bh(&obd->obd_processing_task_lock);
+
+ /* bug 18948:
+ * The recovery timer expired and target_check_and_stop_recovery()
+ * must be called. We cannot call it directly because we are in
+ * interrupt context, so we need to wake up another thread to call it.
+ * This may happen if there are obd->obd_next_transno_waitq waiters,
+ * or if we happen to handle a connect request. However, we cannot
+ * count on either of those things so we wake up the ping evictor
+ * and leverage it's context to complete recovery.
+ *
+ * Note: HEAD has a separate recovery thread and handle this.
+ */
+ spin_lock(&obd->obd_dev_lock);
+ ping_evictor_wake(obd->obd_self_export);
+ spin_unlock(&obd->obd_dev_lock);
}
/* obd_processing_task_lock should be held */
obd->obd_next_recovery_transno++;
spin_unlock_bh(&obd->obd_processing_task_lock);
target_exp_dequeue_req_replay(req);
-
- LASSERT(req->rq_copy);
- class_export_rpc_put(req->rq_export);
-
- class_export_put(req->rq_export);
- ptlrpc_req_drop_rs(req);
- OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
- OBD_FREE(req, sizeof *req);
+ target_request_copy_put(req);
OBD_RACE(OBD_FAIL_TGT_REPLAY_DELAY);
spin_lock_bh(&obd->obd_processing_task_lock);
if (list_empty(&obd->obd_recovery_queue)) {
struct list_head *tmp;
int inserted = 0;
__u64 transno = lustre_msg_get_transno(req->rq_reqmsg);
- struct ptlrpc_request *saved_req, *orig_req;
- struct lustre_msg *reqmsg;
- int rc = 0;
-
+ ENTRY;
/* CAVEAT EMPTOR: The incoming request message has been swabbed
* (i.e. buflens etc are in my own byte order), but type-dependent
* buffers (eg mds_body, ost_body etc) have NOT been swabbed. */
if (!transno) {
CFS_INIT_LIST_HEAD(&req->rq_list);
DEBUG_REQ(D_HA, req, "not queueing");
- return 1;
+ RETURN(1);
}
- /* XXX If I were a real man, these LBUGs would be sane cleanups. */
- /* XXX just like the request-dup code in queue_final_reply */
- OBD_ALLOC(saved_req, sizeof *saved_req);
- if (!saved_req)
- LBUG();
- OBD_ALLOC(reqmsg, req->rq_reqlen);
- if (!reqmsg)
- LBUG();
-
spin_lock_bh(&obd->obd_processing_task_lock);
-
/* If we're processing the queue, we want don't want to queue this
* message.
*
/* Processing the queue right now, don't re-add. */
LASSERT(list_empty(&req->rq_list));
spin_unlock_bh(&obd->obd_processing_task_lock);
- GOTO(err_free, rc = 1);
+ RETURN(1);
}
if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_TGT_REPLAY_DROP))) {
spin_unlock_bh(&obd->obd_processing_task_lock);
- GOTO(err_free, rc = 0);
+ RETURN(0);
}
- memcpy(saved_req, req, sizeof *req);
- memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
- orig_req = req;
- req = saved_req;
- req->rq_reqmsg = reqmsg;
- class_export_get(req->rq_export);
- CFS_INIT_LIST_HEAD(&req->rq_list);
- CFS_INIT_LIST_HEAD(&req->rq_replay_list);
-
if (target_exp_enqueue_req_replay(req)) {
spin_unlock_bh(&obd->obd_processing_task_lock);
DEBUG_REQ(D_ERROR, req, "dropping resent queued req");
- GOTO(err_exp, rc = 0);
+ RETURN(0);
}
/* XXX O(n^2) */
DEBUG_REQ(D_ERROR, req, "dropping replay: transno "
"has been claimed by another client");
target_exp_dequeue_req_replay(req);
- GOTO(err_exp, rc = 0);
+ RETURN(0);
}
}
list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
}
+ target_request_copy_get(req);
obd->obd_requests_queued_for_recovery++;
- orig_req->rq_copy_queued = 1;
- req->rq_copy = 1;
if (obd->obd_processing_task != 0) {
/* Someone else is processing this queue, we'll leave it to
*/
cfs_waitq_signal(&obd->obd_next_transno_waitq);
spin_unlock_bh(&obd->obd_processing_task_lock);
- return 0;
+ RETURN(0);
}
/* Nobody is processing, and we know there's (at least) one to process
spin_unlock_bh(&obd->obd_processing_task_lock);
process_recovery_queue(obd);
- return 0;
-
-err_exp:
- class_export_put(req->rq_export);
-err_free:
- OBD_FREE(reqmsg, req->rq_reqlen);
- OBD_FREE(saved_req, sizeof(*saved_req));
- return rc;
+ RETURN(0);
}
struct obd_device * target_req2obd(struct ptlrpc_request *req)
int target_queue_last_replay_reply(struct ptlrpc_request *req, int rc)
{
struct obd_device *obd = target_req2obd(req);
- struct ptlrpc_request *saved_req;
- struct lustre_msg *reqmsg;
struct obd_export *exp = req->rq_export;
int recovery_done = 0, delayed_done = 0;
LASSERT(!req->rq_reply_state->rs_difficult);
LASSERT(list_empty(&req->rq_list));
- /* XXX a bit like the request-dup code in queue_recovery_request */
- OBD_ALLOC(saved_req, sizeof *saved_req);
- if (!saved_req)
- return -ENOMEM;
- OBD_ALLOC(reqmsg, req->rq_reqlen);
- if (!reqmsg) {
- OBD_FREE(saved_req, sizeof *req);
- return -ENOMEM;
- }
- *saved_req = *req;
- memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
/* Don't race cleanup */
spin_lock_bh(&obd->obd_processing_task_lock);
}
if (!exp->exp_vbr_failed) {
- ptlrpc_rs_addref(req->rq_reply_state); /* +1 ref for saved reply */
- req = saved_req;
- req->rq_reqmsg = reqmsg;
- CFS_INIT_LIST_HEAD(&req->rq_list);
- CFS_INIT_LIST_HEAD(&req->rq_replay_list);
- class_export_get(exp);
+ target_request_copy_get(req);
list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
}
CWARN("%s: disconnect export %s\n", obd->obd_name,
exp->exp_client_uuid.uuid);
class_fail_export(exp);
- OBD_FREE(reqmsg, req->rq_reqlen);
- OBD_FREE(saved_req, sizeof *req);
req->rq_status = 0;
ptlrpc_send_reply(req, 0);
}
return 1;
out_noconn:
- OBD_FREE(reqmsg, req->rq_reqlen);
- OBD_FREE(saved_req, sizeof *req);
req->rq_status = -ENOTCONN;
/* rv is ignored anyhow */
return -ENOTCONN;
if (conn == NULL || obd == NULL || cluuid == NULL)
RETURN(-EINVAL);
+ /* Check for aborted recovery. */
+ target_recovery_check_and_stop(obd);
+
rc = class_connect(conn, obd, cluuid);
if (rc)
RETURN(rc);
if (AT_OFF) {
/* non-AT settings */
req->rq_timeout = req->rq_import->imp_server_timeout ?
- obd_timeout / 2 : obd_timeout;
- lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
- return;
+ obd_timeout / 2 : obd_timeout;
+ } else {
+ at = &req->rq_import->imp_at;
+ idx = import_at_get_index(req->rq_import,
+ req->rq_request_portal);
+ serv_est = at_get(&at->iat_service_estimate[idx]);
+ req->rq_timeout = at_est2timeout(serv_est);
}
- at = &req->rq_import->imp_at;
- idx = import_at_get_index(req->rq_import,
- req->rq_request_portal);
- serv_est = at_get(&at->iat_service_estimate[idx]);
- req->rq_timeout = at_est2timeout(serv_est);
/* We could get even fancier here, using history to predict increased
loading... */
unsigned int oldse;
struct imp_at *at;
- /* do estimate only if is not in recovery */
- if ((req->rq_send_state != LUSTRE_IMP_FULL) &&
- (req->rq_send_state != LUSTRE_IMP_CONNECTING))
- return;
-
LASSERT(req->rq_import);
at = &req->rq_import->imp_at;
/* Expecting to increase the service time estimate here */
ptlrpc_at_adj_service(req, lustre_msg_get_timeout(msg));
ptlrpc_at_adj_net_latency(req, lustre_msg_get_service_time(msg));
-
/* Adjust the local timeout for this req */
ptlrpc_at_set_req_timeout(req);
/* Server assumes it now has rq_timeout from when it sent the
early reply, so client should give it at least that long. */
req->rq_deadline = cfs_time_current_sec() + req->rq_timeout +
- ptlrpc_at_get_net_latency(req);
+ ptlrpc_at_get_net_latency(req);
DEBUG_REQ(D_ADAPTTO, req,
"Early reply #%d, new deadline in %lds (%+lds)",
req->rq_uid = ev->uid;
#endif
spin_lock_init(&req->rq_lock);
+ CFS_INIT_LIST_HEAD(&req->rq_list);
CFS_INIT_LIST_HEAD(&req->rq_timed_list);
+ CFS_INIT_LIST_HEAD(&req->rq_replay_list);
+ CFS_INIT_LIST_HEAD(&req->rq_set_chain);
+ CFS_INIT_LIST_HEAD(&req->rq_history_list);
+ CFS_INIT_LIST_HEAD(&req->rq_exp_list);
atomic_set(&req->rq_refcount, 1);
if (ev->type == LNET_EVENT_PUT)
DEBUG_REQ(D_RPCTRACE, req, "incoming req");
if (imp->imp_delayed_recovery)
lustre_msg_add_flags(req->rq_reqmsg, MSG_DELAY_REPLAY);
- req->rq_timeout *= 3;
req->rq_interpret_reply = completed_replay_interpret;
+ if (AT_OFF)
+ req->rq_timeout *= 3;
+
ptlrpcd_add_req(req);
RETURN(0);
}
obd = pet_exp->exp_obd;
spin_unlock(&pet_lock);
+ /* bug 18948: ensure recovery is aborted in a timely fashion */
+ if (target_recovery_check_and_stop(obd) ||
+ obd->obd_recovering /* no evictor during recovery */)
+ GOTO(skip, 0);
+
expire_time = cfs_time_current_sec() - PING_EVICT_TIMEOUT;
CDEBUG(D_HA, "evicting all exports of obd %s older than %ld\n",
}
}
spin_unlock(&obd->obd_dev_lock);
-
+skip:
class_export_put(pet_exp);
spin_lock(&pet_lock);
* drop a reference count of the request. if it reaches 0, we either
* put it into history list, or free it immediately.
*/
-static void ptlrpc_server_drop_request(struct ptlrpc_request *req)
+void ptlrpc_server_drop_request(struct ptlrpc_request *req)
{
struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
struct ptlrpc_service *svc = rqbd->rqbd_service;
if (!atomic_dec_and_test(&req->rq_refcount))
return;
+ spin_lock(&svc->srv_at_lock);
+ list_del_init(&req->rq_timed_list);
+ if (req->rq_at_linked) {
+ struct ptlrpc_at_array *array = &svc->srv_at_array;
+ __u32 index = req->rq_at_index;
+
+ req->rq_at_linked = 0;
+ array->paa_reqs_count[index]--;
+ array->paa_count--;
+ }
+ spin_unlock(&svc->srv_at_lock);
+
+ /* finalize request */
+ if (req->rq_export) {
+ class_export_put(req->rq_export);
+ req->rq_export = NULL;
+ }
+
spin_lock(&svc->srv_lock);
svc->srv_n_active_reqs--;
*/
static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
{
- struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
-
- if (req->rq_export) {
- class_export_put(req->rq_export);
- req->rq_export = NULL;
- }
-
- if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
- DEBUG_REQ(D_INFO, req, "free req");
-
- spin_lock(&svc->srv_at_lock);
- req->rq_sent_final = 1;
- list_del_init(&req->rq_timed_list);
- if (req->rq_at_linked) {
- struct ptlrpc_at_array *array = &svc->srv_at_array;
- __u32 index = req->rq_at_index;
-
- req->rq_at_linked = 0;
- array->paa_reqs_count[index]--;
- array->paa_count--;
- }
- spin_unlock(&svc->srv_at_lock);
-
ptlrpc_server_drop_request(req);
}
return(-ENOSYS);
spin_lock(&svc->srv_at_lock);
-
- if (unlikely(req->rq_sent_final)) {
- spin_unlock(&svc->srv_at_lock);
- return 0;
- }
-
LASSERT(list_empty(&req->rq_timed_list));
index = (unsigned long)req->rq_deadline % array->paa_size;
}
}
}
-
+
/* Add the request at the head of the list */
if (list_empty(&req->rq_timed_list))
list_add(&req->rq_timed_list, &array->paa_reqs_array[index]);
return 0;
}
-static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
- int extra_time)
+static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
{
struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
struct ptlrpc_request *reqcopy;
"%ssending early reply (deadline %+lds, margin %+lds) for "
"%d+%d", AT_OFF ? "AT off - not " : "",
olddl, olddl - at_get(&svc->srv_at_estimate),
- at_get(&svc->srv_at_estimate), extra_time);
+ at_get(&svc->srv_at_estimate), at_extra);
if (AT_OFF)
RETURN(0);
RETURN(-ENOSYS);
}
- if (req->rq_export && req->rq_export->exp_in_recovery) {
- /* don't increase server estimates during recovery, and give
- clients the full recovery time. */
- newdl = cfs_time_current_sec() +
- req->rq_export->exp_obd->obd_recovery_timeout;
+ if (req->rq_export &&
+ lustre_msg_get_flags(req->rq_reqmsg) &
+ (MSG_REPLAY | MSG_LAST_REPLAY)) {
+ /* Use at_extra as early reply period for recovery requests
+ * but keep it not longer than recovery time / 4 */
+ at_add(&svc->srv_at_estimate,
+ min(at_extra,
+ req->rq_export->exp_obd->obd_recovery_timeout / 4));
} else {
- if (extra_time) {
- /* Fake our processing time into the future to ask the
- clients for some extra amount of time */
- extra_time += cfs_time_current_sec() -
- req->rq_arrival_time.tv_sec;
- at_add(&svc->srv_at_estimate, extra_time);
- }
- newdl = req->rq_arrival_time.tv_sec +
- at_get(&svc->srv_at_estimate);
+ /* Fake our processing time into the future to ask the
+ * clients for some extra amount of time */
+ at_add(&svc->srv_at_estimate, at_extra);
}
+ newdl = cfs_time_current_sec() + at_get(&svc->srv_at_estimate);
if (req->rq_deadline >= newdl) {
/* We're not adding any time, no need to send an early reply
(e.g. maybe at adaptive_max) */
reqcopy->rq_reqmsg = reqmsg;
memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
- if (req->rq_sent_final) {
+ LASSERT(atomic_read(&req->rq_refcount));
+ /** if it is last refcount then early reply isn't needed */
+ if (atomic_read(&req->rq_refcount) == 1) {
DEBUG_REQ(D_ADAPTTO, reqcopy, "Normal reply already sent out, "
"abort sending early reply\n");
- GOTO(out, rc = 0);
+ GOTO(out, rc = -EINVAL);
}
/* Connection ref */
list_for_each_entry_safe(rq, n, &array->paa_reqs_array[index],
rq_timed_list) {
if (rq->rq_deadline <= now + at_early_margin) {
- list_move(&rq->rq_timed_list, &work_list);
+ list_del(&rq->rq_timed_list);
+ /**
+ * ptlrpc_server_drop_request() may drop
+ * refcount to 0 already. Let's check this and
+ * don't add entry to work_list
+ */
+ if (likely(atomic_inc_not_zero(&rq->rq_refcount)))
+ list_add(&rq->rq_timed_list, &work_list);
counter++;
array->paa_reqs_count[index]--;
array->paa_count--;
at_get(&svc->srv_at_estimate), delay);
}
- /* ptlrpc_server_finish_request may delete an entry out of
- * the work list */
- spin_lock(&svc->srv_at_lock);
+ /* we took additional refcount so entries can't be deleted from list, no
+ * locking is needed */
while (!list_empty(&work_list)) {
rq = list_entry(work_list.next, struct ptlrpc_request,
rq_timed_list);
list_del_init(&rq->rq_timed_list);
- /* if the entry is still in the worklist, it hasn't been
- deleted, and is safe to take a ref to keep the req around */
- atomic_inc(&rq->rq_refcount);
- spin_unlock(&svc->srv_at_lock);
- if (ptlrpc_at_send_early_reply(rq, at_extra) == 0)
+ if (ptlrpc_at_send_early_reply(rq) == 0)
ptlrpc_at_add_timed(rq);
ptlrpc_server_drop_request(rq);
- spin_lock(&svc->srv_at_lock);
}
- spin_unlock(&svc->srv_at_lock);
RETURN(0);
}