}
void
-ptlrpc_commit_replies (struct obd_device *obd)
+ptlrpc_commit_replies (struct obd_export *exp)
{
struct list_head *tmp;
struct list_head *nxt;
* to attend to complete them. */
/* CAVEAT EMPTOR: spinlock ordering!!! */
- spin_lock(&obd->obd_uncommitted_replies_lock);
+ spin_lock(&exp->exp_uncommitted_replies_lock);
- list_for_each_safe (tmp, nxt, &obd->obd_uncommitted_replies) {
+ list_for_each_safe(tmp, nxt, &exp->exp_uncommitted_replies) {
struct ptlrpc_reply_state *rs =
list_entry(tmp, struct ptlrpc_reply_state, rs_obd_list);
- LASSERT (rs->rs_difficult);
-
- if (rs->rs_transno <= obd->obd_last_committed) {
+ LASSERT(rs->rs_difficult);
+ /* VBR: per-export last_committed */
+ LASSERT(rs->rs_export);
+ if (rs->rs_transno <= rs->rs_export->exp_last_committed) {
struct ptlrpc_service *svc = rs->rs_service;
spin_lock (&svc->srv_lock);
}
}
- spin_unlock(&obd->obd_uncommitted_replies_lock);
+ spin_unlock(&exp->exp_uncommitted_replies_lock);
}
static int
oldest_time);
}
} else {
- if (cfs_time_current_sec() >
+ if (cfs_time_current_sec() >
(exp->exp_obd->obd_eviction_timer + extra_delay)) {
/* The evictor won't evict anyone who we've heard from
* recently, so we don't have to check before we start
static int ptlrpc_check_req(struct ptlrpc_request *req)
{
- if (lustre_msg_get_conn_cnt(req->rq_reqmsg) <
+ if (lustre_msg_get_conn_cnt(req->rq_reqmsg) <
req->rq_export->exp_conn_cnt) {
DEBUG_REQ(D_ERROR, req,
"DROPPING req from old connection %d < %d",
}
/* Set timer for closest deadline */
- rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request,
+ rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request,
rq_timed_list);
next = (__s32)(rq->rq_deadline - cfs_time_current_sec() -
at_early_margin);
- if (next <= 0)
+ if (next <= 0)
ptlrpc_at_timer((unsigned long)svc);
else
cfs_timer_arm(&svc->srv_at_timer, cfs_time_shift(next));
struct ptlrpc_request *rq;
int found = 0;
- if (AT_OFF)
+ if (AT_OFF)
return(0);
if ((lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) == 0)
return(-ENOSYS);
-
- DEBUG_REQ(D_ADAPTTO, req, "add timed %lds",
+
+ DEBUG_REQ(D_ADAPTTO, req, "add timed %lds",
req->rq_deadline - cfs_time_current_sec());
-
+
spin_lock(&svc->srv_at_lock);
if (unlikely(req->rq_sent_final)) {
ptlrpc_at_set_timer(svc);
return 0;
-}
+}
static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
int extra_time)
time_t newdl;
int rc;
ENTRY;
-
- /* deadline is when the client expects us to reply, margin is the
+
+ /* deadline is when the client expects us to reply, margin is the
difference between clients' and servers' expectations */
- DEBUG_REQ(D_ADAPTTO, req,
+ DEBUG_REQ(D_ADAPTTO, req,
"%ssending early reply (deadline %+lds, margin %+lds) for "
"%d+%d", AT_OFF ? "AT off - not " : "",
olddl, olddl - at_get(&svc->srv_at_estimate),
at_get(&svc->srv_at_estimate), extra_time);
- if (AT_OFF)
+ if (AT_OFF)
RETURN(0);
-
+
if (olddl < 0) {
CDEBUG(D_WARNING, "x"LPU64": Already past deadline (%+lds), not"
" sending early reply. Increase at_early_margin (%d)?\n",
OBD_FREE(reqcopy, sizeof *reqcopy);
RETURN(-ENOMEM);
}
-
+
*reqcopy = *req;
reqcopy->rq_reply_state = NULL;
reqcopy->rq_rep_swab_mask = 0;
/* RPC ref */
class_export_rpc_get(reqcopy->rq_export);
- if (reqcopy->rq_export->exp_obd &&
+ if (reqcopy->rq_export->exp_obd &&
reqcopy->rq_export->exp_obd->obd_fail)
GOTO(out_put, rc = -ENODEV);
DEBUG_REQ(D_ERROR, req, "Early reply send failed %d", rc);
}
- /* Free the (early) reply state from lustre_pack_reply.
+ /* Free the (early) reply state from lustre_pack_reply.
(ptlrpc_send_reply takes its own rs ref, so this is safe here) */
ptlrpc_req_drop_rs(reqcopy);
}
delay = cfs_time_sub(cfs_time_current(), svc->srv_at_checktime);
svc->srv_at_check = 0;
-
+
if (list_empty(&svc->srv_at_list)) {
spin_unlock(&svc->srv_at_lock);
- RETURN(0);
+ RETURN(0);
}
/* The timer went off, but maybe the nearest rpc already completed. */
/* We've still got plenty of time. Reset the timer. */
spin_unlock(&svc->srv_at_lock);
ptlrpc_at_set_timer(svc);
- RETURN(0);
+ RETURN(0);
}
/* We're close to a timeout, and we don't know how much longer the
CDEBUG(D_ADAPTTO, "timeout in %+ds, asking for %d secs on %d early "
"replies\n", first, at_extra, counter);
-
+
if (first < 0) {
/* We're already past request deadlines before we even get a
chance to send early replies */
rc = ptlrpc_check_req(req);
class_export_put(req->rq_export);
req->rq_export = NULL;
- if (rc)
+ if (rc)
goto err_req;
}
/* req_in handling should/must be fast */
- if (cfs_time_current_sec() - req->rq_arrival_time.tv_sec > 5)
+ if (cfs_time_current_sec() - req->rq_arrival_time.tv_sec > 5)
DEBUG_REQ(D_WARNING, req, "Slow req_in handling %lus",
cfs_time_current_sec() - req->rq_arrival_time.tv_sec);
/* Set rpc server deadline and add it to the timed list */
deadline = (lustre_msghdr_get_flags(req->rq_reqmsg) &
- MSGHDR_AT_SUPPORT) ?
+ MSGHDR_AT_SUPPORT) ?
/* The max time the client expects us to take */
lustre_msg_get_timeout(req->rq_reqmsg) : obd_timeout;
req->rq_deadline = req->rq_arrival_time.tv_sec + deadline;
DEBUG_REQ(D_ERROR, req, "Dropping request with 0 timeout");
goto err_req;
}
-
+
ptlrpc_at_add_timed(req);
/* Move it over to the request processing queue */
svc->srv_n_difficult_replies != 0 &&
#endif
svc->srv_n_active_reqs >= (svc->srv_threads_running - 1))) {
- /* Don't handle regular requests in the last thread, in order
- * remain free to handle any 'difficult' replies (that might
+ /* Don't handle regular requests in the last thread, in order
* to handle difficult replies (which might block other threads)
* as well as handle any incoming reqs, early replies, etc.
* That means we always need at least 2 service threads. */
RETURN(0);
}
- request = list_entry (svc->srv_request_queue.next,
- struct ptlrpc_request, rq_list);
+ request = list_entry(svc->srv_request_queue.next,
+ struct ptlrpc_request, rq_list);
list_del_init (&request->rq_list);
svc->srv_n_queued_reqs--;
svc->srv_n_active_reqs++;
lprocfs_counter_add(svc->srv_stats, PTLRPC_TIMEOUT,
at_get(&svc->srv_at_estimate));
}
-
+
CDEBUG(D_NET, "got req "LPD64"\n", request->rq_xid);
-
+
request->rq_svc_thread = thread;
request->rq_export = class_conn2export(
lustre_msg_get_handle(request->rq_reqmsg));
export = class_export_rpc_get(request->rq_export);
}
- /* Discard requests queued for longer than the deadline.
+ /* Discard requests queued for longer than the deadline.
The deadline is increased if we send an early reply. */
if (cfs_time_current_sec() > request->rq_deadline) {
DEBUG_REQ(D_ERROR, request, "Dropping timed-out request from %s"
OBD_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, obd_fail_val);
rc = svc->srv_handler(request);
-
+
request->rq_phase = RQ_PHASE_COMPLETE;
CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc "
cfs_timeval_sub(&work_end, &request->rq_arrival_time, NULL),
request->rq_repmsg ? lustre_msg_get_transno(request->rq_repmsg) :
request->rq_transno, request->rq_status,
- request->rq_repmsg ? lustre_msg_get_status(request->rq_repmsg):
+ request->rq_repmsg ? lustre_msg_get_status(request->rq_repmsg):
-999);
if (svc->srv_stats != NULL) {
__u32 op = lustre_msg_get_opc(request->rq_reqmsg);
}
}
if (request->rq_early_count) {
- DEBUG_REQ(D_ADAPTTO, request,
+ DEBUG_REQ(D_ADAPTTO, request,
"sent %d early replies before finishing in %lds",
request->rq_early_count,
work_end.tv_sec - request->rq_arrival_time.tv_sec);
/* Disengage from notifiers carefully (lock order - irqrestore below!)*/
spin_unlock(&svc->srv_lock);
- spin_lock (&obd->obd_uncommitted_replies_lock);
+ spin_lock (&exp->exp_uncommitted_replies_lock);
/* Noop if removed already */
list_del_init (&rs->rs_obd_list);
- spin_unlock (&obd->obd_uncommitted_replies_lock);
+ spin_unlock (&exp->exp_uncommitted_replies_lock);
spin_lock (&exp->exp_lock);
/* Noop if removed already */
/* If we see this, we should already have seen the warning
* in mds_steal_ack_locks() */
CWARN("All locks stolen from rs %p x"LPD64".t"LPD64
- " o%d NID %s\n",
- rs,
- rs->rs_xid, rs->rs_transno,
+ " o%d NID %s\n", rs, rs->rs_xid, rs->rs_transno,
lustre_msg_get_opc(rs->rs_msg),
libcfs_nid2str(exp->exp_connection->c_peer.nid));
}
&lwi);
lc_watchdog_touch_ms(watchdog, max_t(int, obd_timeout,
- AT_OFF ? 0 :
- at_get(&svc->srv_at_estimate)) *
+ AT_OFF ? 0 :
+ at_get(&svc->srv_at_estimate)) *
svc->srv_watchdog_factor);
ptlrpc_check_rqbd_pool(svc);
if (!list_empty(&svc->srv_req_in_queue)) {
/* Process all incoming reqs before handling any */
ptlrpc_server_handle_req_in(svc);
- /* but limit ourselves in case of flood */
+ /* but limit ourselves in case of flood */
if (counter++ < 1000)
continue;
counter = 0;
}
- if (svc->srv_at_check)
+ if (svc->srv_at_check)
ptlrpc_at_check_timed(svc);
/* don't handle requests in the last thread */
/* We require 2 threads min - see note in
* ptlrpc_server_handle_request() */
+
LASSERT(svc->srv_threads_min >= 2);
for (i = 0; i < svc->srv_threads_min; i++) {
rc = ptlrpc_start_thread(dev, svc);
d.thread = thread;
CDEBUG(D_RPCTRACE, "starting thread '%s'\n", name);
-
+
/* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
* just drop the VM and FILES in ptlrpc_daemonize() right away.
*/
* its 'unlink' flag set for each posted rqbd */
list_for_each(tmp, &service->srv_active_rqbds) {
struct ptlrpc_request_buffer_desc *rqbd =
- list_entry(tmp, struct ptlrpc_request_buffer_desc,
+ list_entry(tmp, struct ptlrpc_request_buffer_desc,
rqbd_list);
rc = LNetMDUnlink(rqbd->rqbd_md_h);
spin_unlock(&svc->srv_lock);
return 0;
}
-
+
/* How long has the next entry been waiting? */
request = list_entry(svc->srv_request_queue.next,
struct ptlrpc_request, rq_list);