/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * Copyright (C) 2002 Cluster File Systems, Inc.
+ * GPL HEADER START
*
- * This file is part of the Lustre file system, http://www.lustre.org
- * Lustre is a trademark of Cluster File Systems, Inc.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
- * You may have signed or agreed to another license before downloading
- * this software. If so, you are bound by the terms and conditions
- * of that agreement, and the following does not apply to you. See the
- * LICENSE file included with this distribution for more information.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
*
- * If you did not agree to a different license, then this copy of Lustre
- * is open source software; you can redistribute it and/or modify it
- * under the terms of version 2 of the GNU General Public License as
- * published by the Free Software Foundation.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
*
- * In either case, Lustre is distributed in the hope that it will be
- * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * license text for more details.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
*
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
*/
#define DEBUG_SUBSYSTEM S_RPC
}
void
-ptlrpc_commit_replies (struct obd_device *obd)
+ptlrpc_commit_replies (struct obd_export *exp)
{
struct list_head *tmp;
struct list_head *nxt;
* to attend to complete them. */
/* CAVEAT EMPTOR: spinlock ordering!!! */
- spin_lock(&obd->obd_uncommitted_replies_lock);
+ spin_lock(&exp->exp_uncommitted_replies_lock);
- list_for_each_safe (tmp, nxt, &obd->obd_uncommitted_replies) {
+ list_for_each_safe(tmp, nxt, &exp->exp_uncommitted_replies) {
struct ptlrpc_reply_state *rs =
list_entry(tmp, struct ptlrpc_reply_state, rs_obd_list);
- LASSERT (rs->rs_difficult);
-
- if (rs->rs_transno <= obd->obd_last_committed) {
+ LASSERT(rs->rs_difficult);
+ /* VBR: per-export last_committed */
+ LASSERT(rs->rs_export);
+ if (rs->rs_transno <= rs->rs_export->exp_last_committed) {
struct ptlrpc_service *svc = rs->rs_service;
spin_lock (&svc->srv_lock);
}
}
- spin_unlock(&obd->obd_uncommitted_replies_lock);
+ spin_unlock(&exp->exp_uncommitted_replies_lock);
}
static int
static void ptlrpc_at_timer(unsigned long castmeharder)
{
struct ptlrpc_service *svc = (struct ptlrpc_service *)castmeharder;
- CDEBUG(D_INFO, "at timer %s hit at %ld%s\n",
- svc->srv_name, cfs_time_current_sec(),
- list_empty(&svc->srv_at_list) ? ", empty" : "");
svc->srv_at_check = 1;
svc->srv_at_checktime = cfs_time_current();
cfs_waitq_signal(&svc->srv_waitq);
int req_portal, int rep_portal, int watchdog_factor,
svc_handler_t handler, char *name,
cfs_proc_dir_entry_t *proc_entry,
- svcreq_printfn_t svcreq_printfn,
- int min_threads, int max_threads, char *threadname)
+ svcreq_printfn_t svcreq_printfn,
+ int min_threads, int max_threads, char *threadname,
+ svc_hpreq_handler_t hp_handler)
{
int rc;
struct ptlrpc_service *service;
LASSERT (nbufs > 0);
LASSERT (bufsize >= max_req_size);
-
+
OBD_ALLOC(service, sizeof(*service));
if (service == NULL)
RETURN(NULL);
service->srv_threads_min = min_threads;
service->srv_threads_max = max_threads;
service->srv_thread_name = threadname;
+ service->srv_hpreq_handler = hp_handler;
+ service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO;
+ service->srv_hpreq_count = 0;
+ service->srv_n_hpreq = 0;
rc = LNetSetLazyPortal(service->srv_req_portal);
LASSERT (rc == 0);
CFS_INIT_LIST_HEAD(&service->srv_request_queue);
+ CFS_INIT_LIST_HEAD(&service->srv_request_hpq);
CFS_INIT_LIST_HEAD(&service->srv_idle_rqbds);
CFS_INIT_LIST_HEAD(&service->srv_active_rqbds);
CFS_INIT_LIST_HEAD(&service->srv_history_rqbds);
CFS_INIT_LIST_HEAD(&service->srv_req_in_queue);
CFS_INIT_LIST_HEAD(&service->srv_at_list);
cfs_timer_init(&service->srv_at_timer, ptlrpc_at_timer, service);
- /* At SOW, service time should be quick; 10s seems generous. If client
+ /* At SOW, service time should be quick; 10s seems generous. If client
timeout is less than this, we'll be sending an early reply. */
at_init(&service->srv_at_estimate, 10, 0);
spin_lock (&ptlrpc_all_services_lock);
list_add (&service->srv_list, &ptlrpc_all_services);
spin_unlock (&ptlrpc_all_services_lock);
-
+
/* Now allocate the request buffers */
rc = ptlrpc_grow_req_bufs(service);
/* We shouldn't be under memory pressure at startup, so
return NULL;
}
-static void ptlrpc_server_req_decref(struct ptlrpc_request *req)
+/**
+ * to actually free the request, must be called without holding svc_lock.
+ * note it's caller's responsibility to unlink req->rq_list.
+ */
+static void ptlrpc_server_free_request(struct ptlrpc_request *req)
{
- struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
+ LASSERT(atomic_read(&req->rq_refcount) == 0);
+ LASSERT(list_empty(&req->rq_timed_list));
- if (!atomic_dec_and_test(&req->rq_refcount))
- return;
+ /* DEBUG_REQ() assumes the reply state of a request with a valid
+ * ref will not be destroyed until that reference is dropped. */
+ ptlrpc_req_drop_rs(req);
- LASSERT(list_empty(&req->rq_timed_list));
- if (req != &rqbd->rqbd_req) {
+ if (req != &req->rq_rqbd->rqbd_req) {
/* NB request buffers use an embedded
* req if the incoming req unlinked the
* MD; this isn't one of them! */
OBD_FREE(req, sizeof(*req));
- } else {
- struct ptlrpc_service *svc = rqbd->rqbd_service;
- /* schedule request buffer for re-use.
- * NB I can only do this after I've disposed of their
- * reqs; particularly the embedded req */
- spin_lock(&svc->srv_lock);
- list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
- spin_unlock(&svc->srv_lock);
}
}
-static void __ptlrpc_server_free_request(struct ptlrpc_request *req)
-{
- list_del(&req->rq_list);
- ptlrpc_req_drop_rs(req);
- ptlrpc_server_req_decref(req);
-}
-
-static void ptlrpc_server_free_request(struct ptlrpc_request *req)
+/**
+ * drop a reference count of the request. if it reaches 0, we either
+ * put it into history list, or free it immediately.
+ */
+static void ptlrpc_server_drop_request(struct ptlrpc_request *req)
{
struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
struct ptlrpc_service *svc = rqbd->rqbd_service;
struct list_head *tmp;
struct list_head *nxt;
- if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
- DEBUG_REQ(D_INFO, req, "free req");
- spin_lock(&svc->srv_at_lock);
- req->rq_sent_final = 1;
- list_del_init(&req->rq_timed_list);
- spin_unlock(&svc->srv_at_lock);
+ if (!atomic_dec_and_test(&req->rq_refcount))
+ return;
spin_lock(&svc->srv_lock);
req = list_entry(rqbd->rqbd_reqs.next,
struct ptlrpc_request,
rq_list);
- __ptlrpc_server_free_request(req);
+ list_del(&req->rq_list);
+ ptlrpc_server_free_request(req);
}
spin_lock(&svc->srv_lock);
+ /*
+ * now all reqs including the embedded req have been
+ * disposed, schedule request buffer for re-use.
+ */
+ LASSERT(atomic_read(&rqbd->rqbd_req.rq_refcount) == 0);
+ list_add_tail(&rqbd->rqbd_list, &svc->srv_idle_rqbds);
}
+
+ spin_unlock(&svc->srv_lock);
} else if (req->rq_reply_state && req->rq_reply_state->rs_prealloc) {
- /* If we are low on memory, we are not interested in
- history */
- list_del(&req->rq_history_list);
- __ptlrpc_server_free_request(req);
+ /* If we are low on memory, we are not interested in history */
+ list_del(&req->rq_list);
+ list_del_init(&req->rq_history_list);
+ spin_unlock(&svc->srv_lock);
+
+ ptlrpc_server_free_request(req);
+ } else {
+ spin_unlock(&svc->srv_lock);
}
+}
- spin_unlock(&svc->srv_lock);
+/**
+ * to finish a request: stop sending more early replies, and release
+ * the request. should be called after we finished handling the request.
+ */
+static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
+{
+ struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
+
+ /* Release the export reference held for the duration of handling. */
+ if (req->rq_export) {
+ class_export_put(req->rq_export);
+ req->rq_export = NULL;
+ }
+
+ if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
+ DEBUG_REQ(D_INFO, req, "free req");
+
+ /* Setting rq_sent_final under srv_at_lock stops the adaptive-timeout
+ * code from sending any further early replies for this request, and
+ * the list_del_init() drops it from the timed (early reply) list. */
+ spin_lock(&svc->srv_at_lock);
+ req->rq_sent_final = 1;
+ list_del_init(&req->rq_timed_list);
+ spin_unlock(&svc->srv_at_lock);
+
+ /* Drop the final service-side reference; this may free the request
+ * or schedule its buffer descriptor for re-use. */
+ ptlrpc_server_drop_request(req);
}
/* This function makes sure dead exports are evicted in a timely manner.
static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
{
struct obd_export *oldest_exp;
- time_t oldest_time;
+ time_t oldest_time, new_time;
ENTRY;
of the list, we can be really lazy here - we don't have to evict
at the exact right moment. Eventually, all silent exports
will make it to the top of the list. */
- exp->exp_last_request_time = max(exp->exp_last_request_time,
- cfs_time_current_sec() + extra_delay);
+ /* Do not pay attention on 1sec or smaller renewals. */
+ new_time = cfs_time_current_sec() + extra_delay;
+ if (exp->exp_last_request_time + 1 /*second */ >= new_time)
+ RETURN_EXIT;
+
+ exp->exp_last_request_time = new_time;
CDEBUG(D_INFO, "updating export %s at %ld\n",
exp->exp_client_uuid.uuid,
exp->exp_last_request_time);
if (list_empty(&exp->exp_obd_chain_timed)) {
/* this one is not timed */
spin_unlock(&exp->exp_obd->obd_dev_lock);
- EXIT;
- return;
+ RETURN_EXIT;
}
list_move_tail(&exp->exp_obd_chain_timed,
oldest_time);
}
} else {
- if (cfs_time_current_sec() >
+ if (cfs_time_current_sec() >
(exp->exp_obd->obd_eviction_timer + extra_delay)) {
/* The evictor won't evict anyone who we've heard from
* recently, so we don't have to check before we start
static int ptlrpc_check_req(struct ptlrpc_request *req)
{
- if (lustre_msg_get_conn_cnt(req->rq_reqmsg) <
+ if (lustre_msg_get_conn_cnt(req->rq_reqmsg) <
req->rq_export->exp_conn_cnt) {
DEBUG_REQ(D_ERROR, req,
"DROPPING req from old connection %d < %d",
}
/* Set timer for closest deadline */
- rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request,
+ rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request,
rq_timed_list);
next = (__s32)(rq->rq_deadline - cfs_time_current_sec() -
at_early_margin);
- if (next <= 0)
+ if (next <= 0)
ptlrpc_at_timer((unsigned long)svc);
else
cfs_timer_arm(&svc->srv_at_timer, cfs_time_shift(next));
spin_unlock(&svc->srv_at_lock);
- CDEBUG(D_INFO, "armed %s at %+lds\n", svc->srv_name, next);
+ CDEBUG(D_INFO, "armed %s at %+ds\n", svc->srv_name, next);
}
/* Add rpc to early reply check list */
struct ptlrpc_request *rq;
int found = 0;
- if (AT_OFF)
+ if (AT_OFF)
return(0);
if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0)
return(-ENOSYS);
-
- DEBUG_REQ(D_ADAPTTO, req, "add timed %lds",
- req->rq_deadline - cfs_time_current_sec());
-
+
spin_lock(&svc->srv_at_lock);
if (unlikely(req->rq_sent_final)) {
/* Add to sorted list. Presumably latest rpcs will have the latest
deadlines, so search backward. */
list_for_each_entry_reverse(rq, &svc->srv_at_list, rq_timed_list) {
- if (req->rq_deadline > rq->rq_deadline) {
+ if (req->rq_deadline >= rq->rq_deadline) {
list_add(&req->rq_timed_list, &rq->rq_timed_list);
found++;
break;
ptlrpc_at_set_timer(svc);
return 0;
-}
+}
-static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
+static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
int extra_time)
{
struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
time_t newdl;
int rc;
ENTRY;
-
- /* deadline is when the client expects us to reply, margin is the
+
+ /* deadline is when the client expects us to reply, margin is the
difference between clients' and servers' expectations */
- DEBUG_REQ(D_ADAPTTO, req,
+ DEBUG_REQ(D_ADAPTTO, req,
"%ssending early reply (deadline %+lds, margin %+lds) for "
"%d+%d", AT_OFF ? "AT off - not " : "",
olddl, olddl - at_get(&svc->srv_at_estimate),
at_get(&svc->srv_at_estimate), extra_time);
- if (AT_OFF)
+ if (AT_OFF)
RETURN(0);
-
+
if (olddl < 0) {
- CDEBUG(D_WARNING, "x"LPU64": Already past deadline (%+lds), not"
- " sending early reply. Increase at_early_margin (%d)?\n",
- req->rq_xid, olddl, at_early_margin);
+ DEBUG_REQ(D_WARNING, req, "Already past deadline (%+lds), "
+ "not sending early reply. Consider increasing "
+ "at_early_margin (%d)?", olddl, at_early_margin);
+
/* Return an error so we're not re-added to the timed list. */
RETURN(-ETIMEDOUT);
}
if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0){
- CDEBUG(D_INFO, "Wanted to ask client for more time, but no AT "
- "support\n");
+ DEBUG_REQ(D_INFO, req, "Wanted to ask client for more time, "
+ "but no AT support");
RETURN(-ENOSYS);
}
- if (extra_time) {
- /* Fake our processing time into the future to ask the
- clients for some extra amount of time */
- extra_time += cfs_time_current_sec() -
- req->rq_arrival_time.tv_sec;
- at_add(&svc->srv_at_estimate, extra_time);
+ if (req->rq_export && req->rq_export->exp_in_recovery) {
+ /* don't increase server estimates during recovery, and give
+ clients the full recovery time. */
+ newdl = cfs_time_current_sec() +
+ req->rq_export->exp_obd->obd_recovery_timeout;
+ } else {
+ if (extra_time) {
+ /* Fake our processing time into the future to ask the
+ clients for some extra amount of time */
+ extra_time += cfs_time_current_sec() -
+ req->rq_arrival_time.tv_sec;
+ at_add(&svc->srv_at_estimate, extra_time);
+ }
+ newdl = req->rq_arrival_time.tv_sec +
+ at_get(&svc->srv_at_estimate);
}
-
- newdl = req->rq_arrival_time.tv_sec + at_get(&svc->srv_at_estimate);
if (req->rq_deadline >= newdl) {
/* We're not adding any time, no need to send an early reply
(e.g. maybe at adaptive_max) */
- CDEBUG(D_ADAPTTO, "x"LPU64": Couldn't add any time (%ld/%ld), "
- "not sending early reply\n", req->rq_xid, olddl,
- newdl - cfs_time_current_sec());
+ DEBUG_REQ(D_WARNING, req, "Couldn't add any time "
+ "(%ld/%ld), not sending early reply\n",
+ olddl, newdl - cfs_time_current_sec());
RETURN(-ETIMEDOUT);
}
OBD_FREE(reqcopy, sizeof *reqcopy);
RETURN(-ENOMEM);
}
-
+
*reqcopy = *req;
reqcopy->rq_reply_state = NULL;
reqcopy->rq_rep_swab_mask = 0;
memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
if (req->rq_sent_final) {
- CDEBUG(D_ADAPTTO, "x"LPU64": normal reply already sent out, "
- "abort sending early reply\n", req->rq_xid);
+ DEBUG_REQ(D_ADAPTTO, reqcopy, "Normal reply already sent out, "
+ "abort sending early reply\n");
GOTO(out, rc = 0);
}
/* RPC ref */
class_export_rpc_get(reqcopy->rq_export);
- if (reqcopy->rq_export->exp_obd &&
+ if (reqcopy->rq_export->exp_obd &&
reqcopy->rq_export->exp_obd->obd_fail)
GOTO(out_put, rc = -ENODEV);
rc = lustre_pack_reply_flags(reqcopy, 1, NULL, NULL, LPRFL_EARLY_REPLY);
- if (rc)
+ if (rc)
GOTO(out_put, rc);
rc = ptlrpc_send_reply(reqcopy, PTLRPC_REPLY_EARLY);
DEBUG_REQ(D_ERROR, req, "Early reply send failed %d", rc);
}
- /* Free the (early) reply state from lustre_pack_reply.
+ /* Free the (early) reply state from lustre_pack_reply.
(ptlrpc_send_reply takes it's own rs ref, so this is safe here) */
ptlrpc_req_drop_rs(reqcopy);
}
delay = cfs_time_sub(cfs_time_current(), svc->srv_at_checktime);
svc->srv_at_check = 0;
-
+
if (list_empty(&svc->srv_at_list)) {
spin_unlock(&svc->srv_at_lock);
- RETURN(0);
+ RETURN(0);
}
/* The timer went off, but maybe the nearest rpc already completed. */
/* We've still got plenty of time. Reset the timer. */
spin_unlock(&svc->srv_at_lock);
ptlrpc_at_set_timer(svc);
- RETURN(0);
+ RETURN(0);
}
- /* We're close to a timeout, and we don't know how much longer the
+ /* We're close to a timeout, and we don't know how much longer the
server will take. Send early replies to everyone expiring soon. */
CFS_INIT_LIST_HEAD(&work_list);
list_for_each_entry_safe(rq, n, &svc->srv_at_list, rq_timed_list) {
CDEBUG(D_ADAPTTO, "timeout in %+ds, asking for %d secs on %d early "
"replies\n", first, at_extra, counter);
-
+
if (first < 0) {
- /* We're already past request deadlines before we even get a
+ /* We're already past request deadlines before we even get a
chance to send early replies */
LCONSOLE_WARN("%s: This server is not able to keep up with "
"request traffic (cpu-bound).\n", svc->srv_name);
at_get(&svc->srv_at_estimate), delay);
}
- /* ptlrpc_server_free_request may delete an entry out of the work
- list */
+ /* ptlrpc_server_finish_request may delete an entry out of
+ * the work list */
spin_lock(&svc->srv_at_lock);
while (!list_empty(&work_list)) {
rq = list_entry(work_list.next, struct ptlrpc_request,
if (ptlrpc_at_send_early_reply(rq, at_extra) == 0)
ptlrpc_at_add_timed(rq);
- ptlrpc_server_req_decref(rq);
+ ptlrpc_server_drop_request(rq);
spin_lock(&svc->srv_at_lock);
}
spin_unlock(&svc->srv_at_lock);
RETURN(0);
}
+/**
+ * Put the request to the export list if the request may become
+ * a high priority one.
+ *
+ * The service-specific handler (srv_hpreq_handler) runs first and may
+ * veto the request by returning non-zero; that value is propagated to
+ * the caller and the request is not linked anywhere.  If the request
+ * carries per-request HP ops (rq_ops) and has an export, it is added
+ * to the export's exp_queued_rpc list under exp_lock so it can later
+ * be promoted via ptlrpc_hpreq_reorder().
+ *
+ * \retval 0 on success; otherwise the non-zero value returned by
+ * srv_hpreq_handler.
+ */
+static int ptlrpc_hpreq_init(struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
+{
+ int rc;
+ ENTRY;
+
+ if (svc->srv_hpreq_handler) {
+ rc = svc->srv_hpreq_handler(req);
+ if (rc)
+ RETURN(rc);
+ }
+ if (req->rq_export && req->rq_ops) {
+ /* Undone by ptlrpc_hpreq_fini() once the request is taken
+ * for processing. */
+ spin_lock(&req->rq_export->exp_lock);
+ list_add(&req->rq_exp_list, &req->rq_export->exp_queued_rpc);
+ spin_unlock(&req->rq_export->exp_lock);
+ }
+
+ RETURN(0);
+}
+
+/** Remove the request from the export list. */
+static void ptlrpc_hpreq_fini(struct ptlrpc_request *req)
+{
+ ENTRY;
+ /* Counterpart of ptlrpc_hpreq_init(): unlink from the export's
+ * exp_queued_rpc list.  list_del_init() leaves rq_exp_list
+ * self-linked, so a repeated call is harmless. */
+ if (req->rq_export && req->rq_ops) {
+ spin_lock(&req->rq_export->exp_lock);
+ list_del_init(&req->rq_exp_list);
+ spin_unlock(&req->rq_export->exp_lock);
+ }
+ EXIT;
+}
+
+/**
+ * Make the request a high priority one.
+ *
+ * All the high priority requests are queued in a separate FIFO
+ * ptlrpc_service::srv_request_hpq list which is parallel to
+ * ptlrpc_service::srv_request_queue list but has a higher priority
+ * for handling.
+ *
+ * NOTE(review): callers take svc->srv_lock before calling (see
+ * ptlrpc_hpreq_reorder()), hence the _nolock suffix; rq_lock here
+ * only guards the rq_hp flag.
+ *
+ * \see ptlrpc_server_handle_request().
+ */
+static void ptlrpc_hpreq_reorder_nolock(struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
+{
+ ENTRY;
+ LASSERT(svc != NULL);
+ spin_lock(&req->rq_lock);
+ if (req->rq_hp == 0) {
+ int opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+ /* Add to the high priority queue. */
+ list_move_tail(&req->rq_list, &svc->srv_request_hpq);
+ req->rq_hp = 1;
+ /* Pings are routinely HP (see ptlrpc_server_hpreq_check());
+ * don't spam the debug log for them. */
+ if (opc != OBD_PING)
+ DEBUG_REQ(D_NET, req, "high priority req");
+ }
+ spin_unlock(&req->rq_lock);
+ EXIT;
+}
+
+/**
+ * Promote an already-queued request to the high priority queue.
+ *
+ * Takes svc->srv_lock and delegates to ptlrpc_hpreq_reorder_nolock();
+ * the phase check keeps a request that a service thread has already
+ * dequeued from being re-linked into the HP list.
+ */
+void ptlrpc_hpreq_reorder(struct ptlrpc_request *req)
+{
+ struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
+ ENTRY;
+
+ spin_lock(&svc->srv_lock);
+ /* It may happen that the request is already taken for the processing
+ * but still in the export list, do not re-add it into the HP list. */
+ if (req->rq_phase == RQ_PHASE_NEW)
+ ptlrpc_hpreq_reorder_nolock(svc, req);
+ spin_unlock(&svc->srv_lock);
+ EXIT;
+}
+
+/**
+ * Check if the request is a high priority one.
+ *
+ * \retval 1 high priority (OBD_PINGs always are),
+ * \retval 0 normal priority,
+ * \retval <0 error from the request's own hpreq_check hook (treated
+ * as a failure by ptlrpc_server_request_add()).
+ */
+static int ptlrpc_server_hpreq_check(struct ptlrpc_request *req)
+{
+ int opc, rc = 0;
+ ENTRY;
+
+ /* Check by request opc. */
+ opc = lustre_msg_get_opc(req->rq_reqmsg);
+ if (opc == OBD_PING)
+ RETURN(1);
+
+ /* Perform request specific check. */
+ if (req->rq_ops && req->rq_ops->hpreq_check)
+ rc = req->rq_ops->hpreq_check(req);
+ RETURN(rc);
+}
+
+/**
+ * Queue an incoming request for processing.
+ *
+ * Classifies the request via ptlrpc_server_hpreq_check() and appends
+ * it to either the high priority or the normal request queue.
+ *
+ * \retval 0 on success (including the benign already-queued case),
+ * \retval <0 classification error from the hpreq_check hook.
+ */
+static int ptlrpc_server_request_add(struct ptlrpc_service *svc,
+ struct ptlrpc_request *req)
+{
+ int rc;
+ ENTRY;
+
+ rc = ptlrpc_server_hpreq_check(req);
+ if (rc < 0)
+ RETURN(rc);
+
+ spin_lock(&svc->srv_lock);
+ /* Before inserting the request into the queue, check if it is not
+ * inserted yet, or even already handled -- it may happen due to
+ * a racing ldlm_server_blocking_ast(). */
+ if (req->rq_phase == RQ_PHASE_NEW && list_empty(&req->rq_list)) {
+ if (rc)
+ ptlrpc_hpreq_reorder_nolock(svc, req);
+ else
+ list_add_tail(&req->rq_list, &svc->srv_request_queue);
+ }
+ spin_unlock(&svc->srv_lock);
+
+ RETURN(0);
+}
+
+/* Only allow normal priority requests on a service that has a high-priority
+ * queue if forced (i.e. cleanup), if there are other high priority requests
+ * already being processed (i.e. those threads can service more high-priority
+ * requests), or if there are enough idle threads that a later thread can do
+ * a high priority request.
+ *
+ * NOTE(review): the "- 2" reserves the last two running threads for HP
+ * work; compare the "at least 2 service threads" rule noted in
+ * ptlrpc_server_handle_request(). */
+static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
+{
+ return force || !svc->srv_hpreq_handler || svc->srv_n_hpreq > 0 ||
+ svc->srv_n_active_reqs < svc->srv_threads_running - 2;
+}
+
+/**
+ * Dequeue the next request to service, honouring priority.
+ *
+ * A normal-queue request is taken only when ptlrpc_server_allow_normal()
+ * permits and either no HP request is waiting or srv_hpreq_ratio HP
+ * requests have been served in a row (tracked in srv_hpreq_count) --
+ * this prevents starvation of the normal queue.  Otherwise the head of
+ * the HP queue is returned.  Callers hold svc->srv_lock (see
+ * ptlrpc_server_handle_request()).
+ *
+ * \retval the chosen request (still linked on its queue; the caller
+ * unlinks it), or NULL if nothing is eligible.
+ */
+static struct ptlrpc_request *
+ptlrpc_server_request_get(struct ptlrpc_service *svc, int force)
+{
+ struct ptlrpc_request *req = NULL;
+ ENTRY;
+
+ if (ptlrpc_server_allow_normal(svc, force) &&
+ !list_empty(&svc->srv_request_queue) &&
+ (list_empty(&svc->srv_request_hpq) ||
+ svc->srv_hpreq_count >= svc->srv_hpreq_ratio)) {
+ req = list_entry(svc->srv_request_queue.next,
+ struct ptlrpc_request, rq_list);
+ /* Serving a normal request resets the anti-starvation count. */
+ svc->srv_hpreq_count = 0;
+ } else if (!list_empty(&svc->srv_request_hpq)) {
+ req = list_entry(svc->srv_request_hpq.next,
+ struct ptlrpc_request, rq_list);
+ svc->srv_hpreq_count++;
+ }
+ RETURN(req);
+}
+
+/**
+ * Check whether there is any request this thread may take right now:
+ * a normal-queue request when ptlrpc_server_allow_normal() permits it,
+ * or any high priority request.
+ */
+static int ptlrpc_server_request_pending(struct ptlrpc_service *svc, int force)
+{
+ return ((ptlrpc_server_allow_normal(svc, force) &&
+ !list_empty(&svc->srv_request_queue)) ||
+ !list_empty(&svc->srv_request_hpq));
+}
+
/* Handle freshly incoming reqs, add to timed early reply list,
pass on to regular request queue */
static int
lustre_msg_get_handle(req->rq_reqmsg));
if (req->rq_export) {
rc = ptlrpc_check_req(req);
- class_export_put(req->rq_export);
- req->rq_export = NULL;
- if (rc)
+ if (rc)
goto err_req;
+ ptlrpc_update_export_timer(req->rq_export, 0);
}
/* req_in handling should/must be fast */
- if (cfs_time_current_sec() - req->rq_arrival_time.tv_sec > 5)
+ if (cfs_time_current_sec() - req->rq_arrival_time.tv_sec > 5)
DEBUG_REQ(D_WARNING, req, "Slow req_in handling %lus",
cfs_time_current_sec() - req->rq_arrival_time.tv_sec);
/* Set rpc server deadline and add it to the timed list */
deadline = (lustre_msghdr_get_flags(req->rq_reqmsg) &
- MSGHDR_AT_SUPPORT) ?
+ MSGHDR_AT_SUPPORT) ?
/* The max time the client expects us to take */
lustre_msg_get_timeout(req->rq_reqmsg) : obd_timeout;
req->rq_deadline = req->rq_arrival_time.tv_sec + deadline;
DEBUG_REQ(D_ERROR, req, "Dropping request with 0 timeout");
goto err_req;
}
-
+
ptlrpc_at_add_timed(req);
+ rc = ptlrpc_hpreq_init(svc, req);
+ if (rc)
+ GOTO(err_req, rc);
/* Move it over to the request processing queue */
- spin_lock(&svc->srv_lock);
- list_add_tail(&req->rq_list, &svc->srv_request_queue);
+ rc = ptlrpc_server_request_add(svc, req);
+ if (rc)
+ GOTO(err_req, rc);
cfs_waitq_signal(&svc->srv_waitq);
- spin_unlock(&svc->srv_lock);
RETURN(1);
err_req:
svc->srv_n_queued_reqs--;
svc->srv_n_active_reqs++;
spin_unlock(&svc->srv_lock);
- ptlrpc_server_free_request(req);
+ ptlrpc_server_finish_request(req);
RETURN(1);
}
struct timeval work_start;
struct timeval work_end;
long timediff;
- int rc;
+ int opc, rc;
+ int fail_opc = 0;
ENTRY;
LASSERT(svc);
spin_lock(&svc->srv_lock);
- if (list_empty (&svc->srv_request_queue) ||
+ if (!ptlrpc_server_request_pending(svc, 0) ||
(
#ifndef __KERNEL__
/* !@%$# liblustre only has 1 thread */
svc->srv_n_difficult_replies != 0 &&
#endif
svc->srv_n_active_reqs >= (svc->srv_threads_running - 1))) {
- /* Don't handle regular requests in the last thread, in order * remain free to handle any 'difficult' replies (that might
+ /* Don't handle regular requests in the last thread, in order
* to handle difficult replies (which might block other threads)
- * as well as handle any incoming reqs, early replies, etc.
+ * as well as handle any incoming reqs, early replies, etc.
* That means we always need at least 2 service threads. */
spin_unlock(&svc->srv_lock);
RETURN(0);
}
- request = list_entry (svc->srv_request_queue.next,
- struct ptlrpc_request, rq_list);
- list_del_init (&request->rq_list);
+ request = ptlrpc_server_request_get(svc, 0);
+ if (request == NULL) {
+ spin_unlock(&svc->srv_lock);
+ RETURN(0);
+ }
+
+ opc = lustre_msg_get_opc(request->rq_reqmsg);
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT))
+ fail_opc = OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT;
+ else if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
+ fail_opc = OBD_FAIL_PTLRPC_HPREQ_TIMEOUT;
+
+ if (unlikely(fail_opc)) {
+ if (request->rq_export && request->rq_ops) {
+ spin_unlock(&svc->srv_lock);
+ OBD_FAIL_TIMEOUT(fail_opc, 4);
+ spin_lock(&svc->srv_lock);
+ request = ptlrpc_server_request_get(svc, 0);
+ if (request == NULL) {
+ spin_unlock(&svc->srv_lock);
+ RETURN(0);
+ }
+ LASSERT(ptlrpc_server_request_pending(svc, 0));
+ }
+ }
+
+ list_del_init(&request->rq_list);
svc->srv_n_queued_reqs--;
svc->srv_n_active_reqs++;
+ if (request->rq_hp)
+ svc->srv_n_hpreq++;
+
+ /* The phase is changed under the lock here because we need to know
+ * the request is under processing (see ptlrpc_hpreq_reorder()). */
+ ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
spin_unlock(&svc->srv_lock);
+ ptlrpc_hpreq_fini(request);
+
+ if(OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG))
+ libcfs_debug_dumplog();
+
do_gettimeofday(&work_start);
timediff = cfs_timeval_sub(&work_start, &request->rq_arrival_time,NULL);
if (svc->srv_stats != NULL) {
lprocfs_counter_add(svc->srv_stats, PTLRPC_TIMEOUT,
at_get(&svc->srv_at_estimate));
}
-
+
CDEBUG(D_NET, "got req "LPD64"\n", request->rq_xid);
-
- request->rq_svc_thread = thread;
- request->rq_export = class_conn2export(
- lustre_msg_get_handle(request->rq_reqmsg));
+ request->rq_svc_thread = thread;
if (request->rq_export) {
if (ptlrpc_check_req(request))
goto put_conn;
export = class_export_rpc_get(request->rq_export);
}
- /* Discard requests queued for longer than the deadline.
+ /* Discard requests queued for longer than the deadline.
The deadline is increased if we send an early reply. */
if (cfs_time_current_sec() > request->rq_deadline) {
DEBUG_REQ(D_ERROR, request, "Dropping timed-out request from %s"
goto put_rpc_export;
}
- request->rq_phase = RQ_PHASE_INTERPRET;
-
CDEBUG(D_RPCTRACE, "Handling RPC pname:cluuid+ref:pid:xid:nid:opc "
"%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
(request->rq_export ?
OBD_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, obd_fail_val);
rc = svc->srv_handler(request);
-
- request->rq_phase = RQ_PHASE_COMPLETE;
+
+ ptlrpc_rqphase_move(request, RQ_PHASE_COMPLETE);
CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc "
"%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
class_export_rpc_put(export);
put_conn:
- if (request->rq_export != NULL)
- class_export_put(request->rq_export);
-
if (cfs_time_current_sec() > request->rq_deadline) {
DEBUG_REQ(D_WARNING, request, "Request x"LPU64" took longer "
"than estimated (%ld%+lds); client may timeout.",
cfs_timeval_sub(&work_end, &request->rq_arrival_time, NULL),
request->rq_repmsg ? lustre_msg_get_transno(request->rq_repmsg) :
request->rq_transno, request->rq_status,
- request->rq_repmsg ? lustre_msg_get_status(request->rq_repmsg):
+ request->rq_repmsg ? lustre_msg_get_status(request->rq_repmsg):
-999);
if (svc->srv_stats != NULL) {
__u32 op = lustre_msg_get_opc(request->rq_reqmsg);
}
}
if (request->rq_early_count) {
- DEBUG_REQ(D_ADAPTTO, request,
+ DEBUG_REQ(D_ADAPTTO, request,
"sent %d early replies before finishing in %lds",
request->rq_early_count,
work_end.tv_sec - request->rq_arrival_time.tv_sec);
}
- ptlrpc_server_free_request(request);
+ spin_lock(&svc->srv_lock);
+ if (request->rq_hp)
+ svc->srv_n_hpreq--;
+ spin_unlock(&svc->srv_lock);
+ ptlrpc_server_finish_request(request);
RETURN(1);
}
/* Disengage from notifiers carefully (lock order - irqrestore below!)*/
spin_unlock(&svc->srv_lock);
- spin_lock (&obd->obd_uncommitted_replies_lock);
+ spin_lock (&exp->exp_uncommitted_replies_lock);
/* Noop if removed already */
list_del_init (&rs->rs_obd_list);
- spin_unlock (&obd->obd_uncommitted_replies_lock);
+ spin_unlock (&exp->exp_uncommitted_replies_lock);
spin_lock (&exp->exp_lock);
/* Noop if removed already */
/* If we see this, we should already have seen the warning
* in mds_steal_ack_locks() */
CWARN("All locks stolen from rs %p x"LPD64".t"LPD64
- " o%d NID %s\n",
- rs,
- rs->rs_xid, rs->rs_transno,
+ " o%d NID %s\n", rs, rs->rs_xid, rs->rs_transno,
lustre_msg_get_opc(rs->rs_msg),
libcfs_nid2str(exp->exp_connection->c_peer.nid));
}
struct ptlrpc_thread *thread = data->thread;
struct obd_device *dev = data->dev;
struct ptlrpc_reply_state *rs;
- struct lc_watchdog *watchdog;
#ifdef WITH_GROUP_INFO
struct group_info *ginfo = NULL;
#endif
*/
cfs_waitq_signal(&thread->t_ctl_waitq);
- watchdog = lc_watchdog_add(max_t(int, obd_timeout, AT_OFF ? 0 :
- at_get(&svc->srv_at_estimate)) *
- svc->srv_watchdog_factor, NULL, NULL);
+ thread->t_watchdog = lc_watchdog_add(max_t(int, obd_timeout, AT_OFF ? 0 :
+ at_get(&svc->srv_at_estimate))
+ * svc->srv_watchdog_factor,
+ NULL, NULL);
spin_lock(&svc->srv_lock);
svc->srv_threads_running++;
struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
ptlrpc_retry_rqbds, svc);
- lc_watchdog_disable(watchdog);
+ lc_watchdog_disable(thread->t_watchdog);
cond_resched();
svc->srv_rqbd_timeout == 0) ||
!list_empty(&svc->srv_req_in_queue) ||
!list_empty(&svc->srv_reply_queue) ||
- (!list_empty(&svc->srv_request_queue) &&
+ (ptlrpc_server_request_pending(svc, 0) &&
(svc->srv_n_active_reqs <
(svc->srv_threads_running - 1))) ||
svc->srv_at_check,
&lwi);
- lc_watchdog_touch_ms(watchdog, max_t(int, obd_timeout,
- AT_OFF ? 0 :
- at_get(&svc->srv_at_estimate)) *
+ lc_watchdog_touch_ms(thread->t_watchdog, max_t(int, obd_timeout,
+ AT_OFF ? 0 :
+ at_get(&svc->srv_at_estimate)) *
svc->srv_watchdog_factor);
ptlrpc_check_rqbd_pool(svc);
if (!list_empty(&svc->srv_req_in_queue)) {
/* Process all incoming reqs before handling any */
ptlrpc_server_handle_req_in(svc);
- /* but limit ourselves in case of flood */
+ /* but limit ourselves in case of flood */
if (counter++ < 1000)
continue;
counter = 0;
}
- if (svc->srv_at_check)
+ if (svc->srv_at_check)
ptlrpc_at_check_timed(svc);
/* don't handle requests in the last thread */
- if (!list_empty (&svc->srv_request_queue) &&
+ if (ptlrpc_server_request_pending(svc, 0) &&
(svc->srv_n_active_reqs < (svc->srv_threads_running - 1)))
ptlrpc_server_handle_request(svc, thread);
}
}
- lc_watchdog_delete(watchdog);
+ lc_watchdog_delete(thread->t_watchdog);
+ thread->t_watchdog = NULL;
out_srv_init:
/*
/* We require 2 threads min - see note in
* ptlrpc_server_handle_request() */
+
LASSERT(svc->srv_threads_min >= 2);
for (i = 0; i < svc->srv_threads_min; i++) {
rc = ptlrpc_start_thread(dev, svc);
d.thread = thread;
CDEBUG(D_RPCTRACE, "starting thread '%s'\n", name);
-
+
/* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
* just drop the VM and FILES in ptlrpc_daemonize() right away.
*/
* its 'unlink' flag set for each posted rqbd */
list_for_each(tmp, &service->srv_active_rqbds) {
struct ptlrpc_request_buffer_desc *rqbd =
- list_entry(tmp, struct ptlrpc_request_buffer_desc,
+ list_entry(tmp, struct ptlrpc_request_buffer_desc,
rqbd_list);
rc = LNetMDUnlink(rqbd->rqbd_md_h);
/* Network access will complete in finite time but the HUGE
* timeout lets us CWARN for visibility of sluggish NALs */
- lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);
+ lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
+ cfs_time_seconds(1), NULL, NULL);
rc = l_wait_event(service->srv_waitq,
service->srv_nrqbd_receiving == 0,
&lwi);
list_del(&req->rq_list);
service->srv_n_queued_reqs--;
service->srv_n_active_reqs++;
- ptlrpc_server_free_request(req);
+ ptlrpc_server_finish_request(req);
}
- while (!list_empty(&service->srv_request_queue)) {
- struct ptlrpc_request *req =
- list_entry(service->srv_request_queue.next,
- struct ptlrpc_request,
- rq_list);
+ while (ptlrpc_server_request_pending(service, 1)) {
+ struct ptlrpc_request *req;
+ req = ptlrpc_server_request_get(service, 1);
list_del(&req->rq_list);
service->srv_n_queued_reqs--;
service->srv_n_active_reqs++;
- ptlrpc_server_free_request(req);
+ ptlrpc_hpreq_fini(req);
+ ptlrpc_server_finish_request(req);
}
LASSERT(service->srv_n_queued_reqs == 0);
LASSERT(service->srv_n_active_reqs == 0);
do_gettimeofday(&right_now);
spin_lock(&svc->srv_lock);
- if (list_empty(&svc->srv_request_queue)) {
+ if (!ptlrpc_server_request_pending(svc, 1)) {
spin_unlock(&svc->srv_lock);
return 0;
}
-
+
/* How long has the next entry been waiting? */
- request = list_entry(svc->srv_request_queue.next,
- struct ptlrpc_request, rq_list);
+ if (list_empty(&svc->srv_request_queue))
+ request = list_entry(svc->srv_request_hpq.next,
+ struct ptlrpc_request, rq_list);
+ else
+ request = list_entry(svc->srv_request_queue.next,
+ struct ptlrpc_request, rq_list);
timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL);
spin_unlock(&svc->srv_lock);
- if ((timediff / ONE_MILLION) > (AT_OFF ? obd_timeout * 3/2 :
+ if ((timediff / ONE_MILLION) > (AT_OFF ? obd_timeout * 3/2 :
at_max)) {
CERROR("%s: unhealthy - request has been waiting %lds\n",
svc->srv_name, timediff / ONE_MILLION);