int test_req_buffer_pressure = 0;
CFS_MODULE_PARM(test_req_buffer_pressure, "i", int, 0444,
"set non-zero to put pressure on request buffer pools");
-unsigned int at_min = 0;
CFS_MODULE_PARM(at_min, "i", int, 0644,
"Adaptive timeout minimum (sec)");
-unsigned int at_max = 600;
-EXPORT_SYMBOL(at_max);
CFS_MODULE_PARM(at_max, "i", int, 0644,
"Adaptive timeout maximum (sec)");
-unsigned int at_history = 600;
CFS_MODULE_PARM(at_history, "i", int, 0644,
"Adaptive timeouts remember the slowest event that took place "
"within this period (sec)");
-static int at_early_margin = 5;
CFS_MODULE_PARM(at_early_margin, "i", int, 0644,
"How soon before an RPC deadline to send an early reply");
-static int at_extra = 30;
CFS_MODULE_PARM(at_extra, "i", int, 0644,
"How much extra time to give with each early reply");
}
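/* Illustrative sketch of how the tunables above combine; the clamp to
 * [at_min, at_max] is an assumption drawn from the parameter descriptions,
 * and at_est2timeout() is the helper this patch later uses to size the AT
 * array. Not verbatim code: */
static inline time_t at_example_deadline(unsigned int measured_sec,
                                         time_t arrival)
{
        unsigned int est = measured_sec;

        if (est < at_min)
                est = at_min;
        if (at_max != 0 && est > at_max)
                est = at_max;
        /* an early reply goes out at_early_margin seconds before this
         * deadline and asks the client for about at_extra extra seconds */
        return arrival + at_est2timeout(est);
}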
void
-ptlrpc_save_lock (struct ptlrpc_request *req,
- struct lustre_handle *lock, int mode, int no_ack)
+ptlrpc_save_lock(struct ptlrpc_request *req,
+ struct lustre_handle *lock, int mode, int no_ack)
{
struct ptlrpc_reply_state *rs = req->rq_reply_state;
int idx;
LASSERT(rs != NULL);
LASSERT(rs->rs_nlocks < RS_MAX_LOCKS);
- idx = rs->rs_nlocks++;
- rs->rs_locks[idx] = *lock;
- rs->rs_modes[idx] = mode;
- rs->rs_difficult = 1;
- rs->rs_no_ack = !!no_ack;
+ if (req->rq_export->exp_disconnected) {
+ ldlm_lock_decref(lock, mode);
+ } else {
+ idx = rs->rs_nlocks++;
+ rs->rs_locks[idx] = *lock;
+ rs->rs_modes[idx] = mode;
+ rs->rs_difficult = 1;
+ rs->rs_no_ack = !!no_ack;
+ }
}
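/* Rationale (assumed): once the export is disconnected there is no client
 * left to ack this reply, so saving the lock in the reply state would only
 * pin it until the reply state is cleaned up; dropping the lock reference
 * immediately avoids that. */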
#ifdef __KERNEL__
/**
* Dispatch all replies accumulated in the batch to one of the
- * dedicated reply handing threads.
+ * dedicated reply handling threads.
*
* \param b batch
*/
rs->rs_scheduled = 1;
b->rsb_n_replies++;
}
+ rs->rs_committed = 1;
spin_unlock(&rs->rs_lock);
}
EXIT;
}
-void
-ptlrpc_commit_replies (struct obd_device *obd)
+void ptlrpc_commit_replies(struct obd_export *exp)
{
- struct list_head *tmp;
- struct list_head *nxt;
+ struct ptlrpc_reply_state *rs, *nxt;
DECLARE_RS_BATCH(batch);
ENTRY;
* to attend to complete them. */
/* CAVEAT EMPTOR: spinlock ordering!!! */
- spin_lock(&obd->obd_uncommitted_replies_lock);
- list_for_each_safe (tmp, nxt, &obd->obd_uncommitted_replies) {
- struct ptlrpc_reply_state *rs =
- list_entry(tmp, struct ptlrpc_reply_state, rs_obd_list);
-
+ spin_lock(&exp->exp_uncommitted_replies_lock);
+ list_for_each_entry_safe(rs, nxt, &exp->exp_uncommitted_replies,
+ rs_obd_list) {
LASSERT (rs->rs_difficult);
-
- if (rs->rs_transno <= obd->obd_last_committed) {
+ /* VBR: per-export last_committed */
+ LASSERT(rs->rs_export);
+ if (rs->rs_transno <= exp->exp_last_committed) {
list_del_init(&rs->rs_obd_list);
rs_batch_add(&batch, rs);
}
}
- spin_unlock(&obd->obd_uncommitted_replies_lock);
+ spin_unlock(&exp->exp_uncommitted_replies_lock);
rs_batch_fini(&batch);
EXIT;
}
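/* Usage note (assumption): this runs from the storage commit callback once
 * exp_last_committed has advanced, so every reply with
 * rs_transno <= exp_last_committed is durable and safe to ack or free. */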
char *threadname, __u32 ctx_tags,
svc_hpreq_handler_t hp_handler)
{
- int rc;
- struct ptlrpc_service *service;
+ int rc;
+ struct ptlrpc_at_array *array;
+ struct ptlrpc_service *service;
+ unsigned int size, index;
ENTRY;
LASSERT (nbufs > 0);
spin_lock_init(&service->srv_at_lock);
CFS_INIT_LIST_HEAD(&service->srv_req_in_queue);
- CFS_INIT_LIST_HEAD(&service->srv_at_list);
+
+ array = &service->srv_at_array;
+ size = at_est2timeout(at_max);
+ array->paa_size = size;
+ array->paa_count = 0;
+ array->paa_deadline = -1;
+
+ /* allocate memory for srv_at_array (ptlrpc_at_array) */
+ OBD_ALLOC(array->paa_reqs_array, sizeof(struct list_head) * size);
+ if (array->paa_reqs_array == NULL)
+ GOTO(failed, NULL);
+
+ for (index = 0; index < size; index++)
+ CFS_INIT_LIST_HEAD(&array->paa_reqs_array[index]);
+
+ OBD_ALLOC(array->paa_reqs_count, sizeof(__u32) * size);
+ if (array->paa_reqs_count == NULL)
+ GOTO(failed, NULL);
+
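/* The bucket invariant these two arrays maintain (illustrative; the same
 * indexing ptlrpc_at_add_timed() applies later in this patch):
 *
 *     index = (unsigned long)req->rq_deadline % array->paa_size;
 *     list_add_tail(&req->rq_timed_list, &array->paa_reqs_array[index]);
 *     array->paa_reqs_count[index]++;
 *     array->paa_count++;
 *
 * Since paa_size = at_est2timeout(at_max) bounds how far any deadline can
 * lie in the future, live requests never wrap onto an older bucket. */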
cfs_timer_init(&service->srv_at_timer, ptlrpc_at_timer, service);
/* At SOW, service time should be quick; 10s seems generous. If client
timeout is less than this, we'll be sending an early reply. */
* drop a reference count of the request. if it reaches 0, we either
* put it into history list, or free it immediately.
*/
-static void ptlrpc_server_drop_request(struct ptlrpc_request *req)
+void ptlrpc_server_drop_request(struct ptlrpc_request *req)
{
struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
struct ptlrpc_service *svc = rqbd->rqbd_service;
if (!atomic_dec_and_test(&req->rq_refcount))
return;
+ spin_lock(&svc->srv_at_lock);
+ if (req->rq_at_linked) {
+ struct ptlrpc_at_array *array = &svc->srv_at_array;
+ __u32 index = req->rq_at_index;
+
+ LASSERT(!list_empty(&req->rq_timed_list));
+ list_del_init(&req->rq_timed_list);
+ req->rq_at_linked = 0;
+ array->paa_reqs_count[index]--;
+ array->paa_count--;
+ } else
+ LASSERT(list_empty(&req->rq_timed_list));
+ spin_unlock(&svc->srv_at_lock);
+
+ /* finalize request */
+ if (req->rq_export) {
+ class_export_put(req->rq_export);
+ req->rq_export = NULL;
+ }
+
spin_lock(&svc->srv_lock);
svc->srv_n_active_reqs--;
*/
static void ptlrpc_server_finish_request(struct ptlrpc_request *req)
{
- struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
-
- if (req->rq_export) {
- class_export_put(req->rq_export);
- req->rq_export = NULL;
- }
-
- if (req->rq_phase != RQ_PHASE_NEW) /* incorrect message magic */
- DEBUG_REQ(D_INFO, req, "free req");
-
- spin_lock(&svc->srv_at_lock);
- req->rq_sent_final = 1;
- list_del_init(&req->rq_timed_list);
- spin_unlock(&svc->srv_at_lock);
-
ptlrpc_server_drop_request(req);
}
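/* The export put and the rq_timed_list unlinking that used to live here
 * now happen in ptlrpc_server_drop_request(), i.e. only once the last
 * reference to the request is gone. */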
exp->exp_obd->obd_eviction_timer =
cfs_time_current_sec() + 3 * PING_INTERVAL;
CDEBUG(D_HA, "%s: Think about evicting %s from "CFS_TIME_T"\n",
- exp->exp_obd->obd_name, obd_export_nid2str(exp),
- oldest_time);
+ exp->exp_obd->obd_name,
+ obd_export_nid2str(oldest_exp), oldest_time);
}
} else {
if (cfs_time_current_sec() >
static void ptlrpc_at_set_timer(struct ptlrpc_service *svc)
{
- struct ptlrpc_request *rq;
+ struct ptlrpc_at_array *array = &svc->srv_at_array;
__s32 next;
spin_lock(&svc->srv_at_lock);
- if (list_empty(&svc->srv_at_list)) {
+ if (array->paa_count == 0) {
cfs_timer_disarm(&svc->srv_at_timer);
spin_unlock(&svc->srv_at_lock);
return;
}
/* Set timer for closest deadline */
- rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request,
- rq_timed_list);
- next = (__s32)(rq->rq_deadline - cfs_time_current_sec() -
+ next = (__s32)(array->paa_deadline - cfs_time_current_sec() -
at_early_margin);
if (next <= 0)
ptlrpc_at_timer((unsigned long)svc);
static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
{
struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
- struct ptlrpc_request *rq;
+ struct ptlrpc_request *rq = NULL;
+ struct ptlrpc_at_array *array = &svc->srv_at_array;
+ __u32 index;
int found = 0;
if (AT_OFF)
return(-ENOSYS);
spin_lock(&svc->srv_at_lock);
-
- if (unlikely(req->rq_sent_final)) {
- spin_unlock(&svc->srv_at_lock);
- return 0;
- }
-
LASSERT(list_empty(&req->rq_timed_list));
- /* Add to sorted list. Presumably latest rpcs will have the latest
- deadlines, so search backward. */
- list_for_each_entry_reverse(rq, &svc->srv_at_list, rq_timed_list) {
- if (req->rq_deadline >= rq->rq_deadline) {
- list_add(&req->rq_timed_list, &rq->rq_timed_list);
- found++;
- break;
+
+ index = (unsigned long)req->rq_deadline % array->paa_size;
+ if (array->paa_reqs_count[index] > 0) {
+ /* latest rpcs will have the latest deadlines in the list,
+ * so search backward. */
+ list_for_each_entry_reverse(rq, &array->paa_reqs_array[index],
+ rq_timed_list) {
+ if (req->rq_deadline >= rq->rq_deadline) {
+ list_add(&req->rq_timed_list,
+ &rq->rq_timed_list);
+ break;
+ }
}
}
- if (!found)
- /* Add to front if shortest deadline or list empty */
- list_add(&req->rq_timed_list, &svc->srv_at_list);
- /* Check if we're the head of the list */
- found = (svc->srv_at_list.next == &req->rq_timed_list);
+ /* Add the request at the head of the list */
+ if (list_empty(&req->rq_timed_list))
+ list_add(&req->rq_timed_list, &array->paa_reqs_array[index]);
+ req->rq_at_linked = 1;
+ req->rq_at_index = index;
+ array->paa_reqs_count[index]++;
+ array->paa_count++;
+ if (array->paa_count == 1 || array->paa_deadline > req->rq_deadline) {
+ array->paa_deadline = req->rq_deadline;
+ found = 1;
+ }
spin_unlock(&svc->srv_at_lock);
if (found)
return 0;
}
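/* 'found' above signals that paa_deadline moved earlier; the (elided)
 * caller code presumably rearms srv_at_timer via ptlrpc_at_set_timer(),
 * which fires at paa_deadline - at_early_margin. */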
-static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req,
- int extra_time)
+static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
{
struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
struct ptlrpc_request *reqcopy;
"%ssending early reply (deadline %+lds, margin %+lds) for "
"%d+%d", AT_OFF ? "AT off - not " : "",
olddl, olddl - at_get(&svc->srv_at_estimate),
- at_get(&svc->srv_at_estimate), extra_time);
+ at_get(&svc->srv_at_estimate), at_extra);
if (AT_OFF)
RETURN(0);
RETURN(-ENOSYS);
}
- if (req->rq_export && req->rq_export->exp_in_recovery) {
- /* don't increase server estimates during recovery, and give
- clients the full recovery time. */
- newdl = cfs_time_current_sec() +
- req->rq_export->exp_obd->obd_recovery_timeout;
+ if (req->rq_export &&
+ lustre_msg_get_flags(req->rq_reqmsg) &
+ (MSG_REPLAY | MSG_REQ_REPLAY_DONE | MSG_LOCK_REPLAY_DONE)) {
+ /**
+ * Use at_extra as the early reply period for recovery requests, but
+ * make sure it is not bigger than the recovery time / 4
+ */
+ at_add(&svc->srv_at_estimate,
+ min(at_extra,
+ req->rq_export->exp_obd->obd_recovery_timeout / 4));
} else {
- if (extra_time) {
- /* Fake our processing time into the future to ask the
- clients for some extra amount of time */
- extra_time += cfs_time_current_sec() -
- req->rq_arrival_time.tv_sec;
- at_add(&svc->srv_at_estimate, extra_time);
- }
- newdl = req->rq_arrival_time.tv_sec +
- at_get(&svc->srv_at_estimate);
+ /* Fake our processing time into the future to ask the clients
+ * for some extra amount of time */
+ at_add(&svc->srv_at_estimate, at_extra);
}
+
+ newdl = cfs_time_current_sec() + at_get(&svc->srv_at_estimate);
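/* Worked example with hypothetical values: at_extra = 30 and
 * obd_recovery_timeout = 300 give min(30, 300 / 4) = 30, so a replay
 * request grows srv_at_estimate by 30 seconds before newdl is computed
 * from the updated estimate. */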
if (req->rq_deadline >= newdl) {
/* We're not adding any time, no need to send an early reply
(e.g. maybe at adaptive_max) */
reqcopy->rq_reqmsg = reqmsg;
memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
- if (req->rq_sent_final) {
+ LASSERT(atomic_read(&req->rq_refcount));
+ /** if this is the last refcount then an early reply isn't needed */
+ if (atomic_read(&req->rq_refcount) == 1) {
DEBUG_REQ(D_ADAPTTO, reqcopy, "Normal reply already sent out, "
"abort sending early reply\n");
- GOTO(out, rc = 0);
+ GOTO(out, rc = -EINVAL);
}
/* Connection ref */
{
struct ptlrpc_request *rq, *n;
struct list_head work_list;
+ struct ptlrpc_at_array *array = &svc->srv_at_array;
+ __u32 index, count;
+ time_t deadline;
time_t now = cfs_time_current_sec();
cfs_duration_t delay;
int first, counter = 0;
delay = cfs_time_sub(cfs_time_current(), svc->srv_at_checktime);
svc->srv_at_check = 0;
- if (list_empty(&svc->srv_at_list)) {
+ if (array->paa_count == 0) {
spin_unlock(&svc->srv_at_lock);
RETURN(0);
}
/* The timer went off, but maybe the nearest rpc already completed. */
- rq = list_entry(svc->srv_at_list.next, struct ptlrpc_request,
- rq_timed_list);
- first = (int)(rq->rq_deadline - now);
+ first = array->paa_deadline - now;
if (first > at_early_margin) {
/* We've still got plenty of time. Reset the timer. */
spin_unlock(&svc->srv_at_lock);
/* We're close to a timeout, and we don't know how much longer the
server will take. Send early replies to everyone expiring soon. */
CFS_INIT_LIST_HEAD(&work_list);
- list_for_each_entry_safe(rq, n, &svc->srv_at_list, rq_timed_list) {
- if (rq->rq_deadline <= now + at_early_margin) {
- list_move_tail(&rq->rq_timed_list, &work_list);
- counter++;
- } else {
+ deadline = -1;
+ index = (unsigned long)array->paa_deadline % array->paa_size;
+ count = array->paa_count;
+ while (count > 0) {
+ count -= array->paa_reqs_count[index];
+ list_for_each_entry_safe(rq, n, &array->paa_reqs_array[index],
+ rq_timed_list) {
+ if (rq->rq_deadline <= now + at_early_margin) {
+ list_del_init(&rq->rq_timed_list);
+ /**
+ * ptlrpc_server_drop_request() may already have
+ * dropped the refcount to 0; check for this and
+ * don't add such an entry to the work_list
+ */
+ if (likely(atomic_inc_not_zero(&rq->rq_refcount)))
+ list_add(&rq->rq_timed_list, &work_list);
+ counter++;
+ array->paa_reqs_count[index]--;
+ array->paa_count--;
+ rq->rq_at_linked = 0;
+ continue;
+ }
+
+ /* update the earliest deadline */
+ if (deadline == -1 || rq->rq_deadline < deadline)
+ deadline = rq->rq_deadline;
+
break;
}
- }
+ if (++index >= array->paa_size)
+ index = 0;
+ }
+ array->paa_deadline = deadline;
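/* The loop above starts at the bucket holding paa_deadline and walks
 * forward, wrapping at paa_size, until the remaining 'count' reaches zero;
 * 'deadline' collects the earliest deadline among the requests left
 * behind, which becomes the new paa_deadline. */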
spin_unlock(&svc->srv_at_lock);
/* we have a new earliest deadline, restart the timer */
at_get(&svc->srv_at_estimate), delay);
}
- /* ptlrpc_server_finish_request may delete an entry out of
- * the work list */
- spin_lock(&svc->srv_at_lock);
+ /* we took an additional refcount, so entries can't be deleted from the
+ * list and no locking is needed */
while (!list_empty(&work_list)) {
rq = list_entry(work_list.next, struct ptlrpc_request,
rq_timed_list);
list_del_init(&rq->rq_timed_list);
- /* if the entry is still in the worklist, it hasn't been
- deleted, and is safe to take a ref to keep the req around */
- atomic_inc(&rq->rq_refcount);
- spin_unlock(&svc->srv_at_lock);
- if (ptlrpc_at_send_early_reply(rq, at_extra) == 0)
+ if (ptlrpc_at_send_early_reply(rq) == 0)
ptlrpc_at_add_timed(rq);
ptlrpc_server_drop_request(rq);
- spin_lock(&svc->srv_at_lock);
}
- spin_unlock(&svc->srv_at_lock);
RETURN(0);
}
EXIT;
}
-/** Check if the request if a high priority one. */
+/** Check if the request is a high priority one. */
static int ptlrpc_server_hpreq_check(struct ptlrpc_request *req)
{
int opc, rc = 0;
LBUG();
}
- /* Clear request swab mask; this is a new request */
- req->rq_req_swab_mask = 0;
-
- rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
- if (rc != 0) {
- CERROR("error unpacking request: ptl %d from %s x"LPU64"\n",
- svc->srv_req_portal, libcfs_id2str(req->rq_peer),
- req->rq_xid);
- goto err_req;
+ /*
+ * for a null-flavored rpc, the msg has already been unpacked by
+ * sptlrpc, although redoing it wouldn't be harmful.
+ */
+ if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) {
+ rc = ptlrpc_unpack_req_msg(req, req->rq_reqlen);
+ if (rc != 0) {
+ CERROR("error unpacking request: ptl %d from %s "
+ "x"LPU64"\n", svc->srv_req_portal,
+ libcfs_id2str(req->rq_peer), req->rq_xid);
+ goto err_req;
+ }
}
rc = lustre_unpack_req_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
goto err_req;
}
+ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_REQ_OPC) &&
+ lustre_msg_get_opc(req->rq_reqmsg) == obd_fail_val) {
+ CERROR("drop incoming rpc opc %u, x"LPU64"\n",
+ obd_fail_val, req->rq_xid);
+ goto err_req;
+ }
+
rc = -EINVAL;
if (lustre_msg_get_type(req->rq_reqmsg) != PTL_RPC_MSG_REQUEST) {
CERROR("wrong packet type received (type=%u) from %s\n",
goto err_req;
}
- CDEBUG(D_NET, "got req "LPD64"\n", req->rq_xid);
+ switch (lustre_msg_get_opc(req->rq_reqmsg)) {
+ case MDS_WRITEPAGE:
+ case OST_WRITE:
+ req->rq_bulk_write = 1;
+ break;
+ case MDS_READPAGE:
+ case OST_READ:
+ req->rq_bulk_read = 1;
+ break;
+ }
+
+ CDEBUG(D_NET, "got req "LPU64"\n", req->rq_xid);
req->rq_export = class_conn2export(
lustre_msg_get_handle(req->rq_reqmsg));
libcfs_id2str(request->rq_peer),
lustre_msg_get_opc(request->rq_reqmsg));
- OBD_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, obd_fail_val);
+ if (lustre_msg_get_opc(request->rq_reqmsg) != OBD_PING)
+ OBD_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, obd_fail_val);
rc = svc->srv_handler(request);
ptlrpc_rqphase_move(request, RQ_PHASE_COMPLETE);
- CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc "
- "%s:%s+%d:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
- (request->rq_export ?
- (char *)request->rq_export->exp_client_uuid.uuid : "0"),
- (request->rq_export ?
- atomic_read(&request->rq_export->exp_refcount) : -99),
- lustre_msg_get_status(request->rq_reqmsg), request->rq_xid,
- libcfs_id2str(request->rq_peer),
- lustre_msg_get_opc(request->rq_reqmsg));
-
put_rpc_export:
if (export != NULL)
class_export_rpc_put(export);
do_gettimeofday(&work_end);
timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
- CDEBUG(D_RPCTRACE, "request x"LPU64" opc %u from %s processed in "
- "%ldus (%ldus total) trans "LPU64" rc %d/%d\n",
- request->rq_xid, lustre_msg_get_opc(request->rq_reqmsg),
- libcfs_id2str(request->rq_peer), timediff,
- cfs_timeval_sub(&work_end, &request->rq_arrival_time, NULL),
- request->rq_repmsg ? lustre_msg_get_transno(request->rq_repmsg) :
- request->rq_transno, request->rq_status,
- request->rq_repmsg ? lustre_msg_get_status(request->rq_repmsg):
- -999);
+ CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc "
+ "%s:%s+%d:%d:x"LPU64":%s:%d Request procesed in "
+ "%lds (%lds total) trans "LPU64" rc %d/%d\n",
+ cfs_curproc_comm(),
+ (request->rq_export ?
+ (char *)request->rq_export->exp_client_uuid.uuid : "0"),
+ (request->rq_export ?
+ atomic_read(&request->rq_export->exp_refcount) : -99),
+ lustre_msg_get_status(request->rq_reqmsg),
+ request->rq_xid,
+ libcfs_id2str(request->rq_peer),
+ lustre_msg_get_opc(request->rq_reqmsg),
+ timediff,
+ cfs_timeval_sub(&work_end, &request->rq_arrival_time, NULL),
+ (request->rq_repmsg ?
+ lustre_msg_get_transno(request->rq_repmsg) :
+ request->rq_transno),
+ request->rq_status,
+ (request->rq_repmsg ?
+ lustre_msg_get_status(request->rq_repmsg) : -999));
if (likely(svc->srv_stats != NULL && request->rq_reqmsg != NULL)) {
__u32 op = lustre_msg_get_opc(request->rq_reqmsg);
int opc = opcode_offset(op);
list_del_init (&rs->rs_exp_list);
spin_unlock (&exp->exp_lock);
- /* Avoid obd_uncommitted_replies_lock contention if we 100% sure that
- * rs has been removed from the list already */
- if (!list_empty_careful(&rs->rs_obd_list)) {
- spin_lock(&obd->obd_uncommitted_replies_lock);
+ /* The disk commit callback holds exp_uncommitted_replies_lock while it
+ * iterates over newly committed replies, removing them from
+ * exp_uncommitted_replies. It then drops this lock and schedules the
+ * replies it found for handling here.
+ *
+ * We can avoid contention for exp_uncommitted_replies_lock between the
+ * HRT threads and further commit callbacks by checking rs_committed,
+ * which is set in the commit callback while it holds both
+ * rs_lock and exp_uncommitted_replies_lock.
+ *
+ * If we see rs_committed clear, the commit callback _may_ not have
+ * handled this reply yet and we race with it to grab
+ * exp_uncommitted_replies_lock before removing the reply from
+ * exp_uncommitted_replies. Note that if we lose the race and the
+ * reply has already been removed, list_del_init() is a noop.
+ *
+ * If we see rs_committed set, we know the commit callback is handling,
+ * or has handled, this reply, though store reordering might allow us
+ * to see rs_committed set out of sequence. But since it is set while
+ * holding rs_lock, we can be sure everything has completed once we
+ * take rs_lock, which we do right next.
+ */
+ if (!rs->rs_committed) {
+ spin_lock(&exp->exp_uncommitted_replies_lock);
list_del_init(&rs->rs_obd_list);
- spin_unlock(&obd->obd_uncommitted_replies_lock);
+ spin_unlock(&exp->exp_uncommitted_replies_lock);
}
spin_lock(&rs->rs_lock);
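/* Both sides of the rs_committed handshake described above, as a sketch
 * assembled from this patch (ordering only, not verbatim code):
 *
 *   commit callback:
 *       spin_lock(&exp->exp_uncommitted_replies_lock);
 *       list_del_init(&rs->rs_obd_list);
 *       spin_lock(&rs->rs_lock);
 *       rs->rs_committed = 1;
 *       spin_unlock(&rs->rs_lock);
 *       spin_unlock(&exp->exp_uncommitted_replies_lock);
 *
 *   reply handling thread (this function):
 *       if (!rs->rs_committed) {
 *               spin_lock(&exp->exp_uncommitted_replies_lock);
 *               list_del_init(&rs->rs_obd_list);  // noop if already removed
 *               spin_unlock(&exp->exp_uncommitted_replies_lock);
 *       }
 *       spin_lock(&rs->rs_lock);  // serializes with the callback
 */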
CWARN("All locks stolen from rs %p x"LPD64".t"LPD64
" o%d NID %s\n",
rs,
- rs->rs_xid, rs->rs_transno,
- lustre_msg_get_opc(rs->rs_msg),
+ rs->rs_xid, rs->rs_transno, rs->rs_opc,
libcfs_nid2str(exp->exp_connection->c_peer.nid));
}
* and process it.
*
* \param svc a ptlrpc service
- * \retval 0 no replies processes
+ * \retval 0 no replies processed
* \retval 1 one reply processed
*/
static int
#else /* __KERNEL__ */
-/* Don't use daemonize, it removes fs struct from new thread (bug 418) */
-void ptlrpc_daemonize(char *name)
-{
- struct fs_struct *fs = current->fs;
-
- atomic_inc(&fs->count);
- cfs_daemonize(name);
- exit_fs(cfs_current());
- current->fs = fs;
- ll_set_fs_pwd(current->fs, init_task.fs->pwdmnt, init_task.fs->pwd);
-}
-
static void
ptlrpc_check_rqbd_pool(struct ptlrpc_service *svc)
{
int counter = 0, rc = 0;
ENTRY;
- ptlrpc_daemonize(data->name);
+ thread->t_pid = cfs_curproc_pid();
+ cfs_daemonize_ctxt(data->name);
#if defined(HAVE_NODE_TO_CPUMASK) && defined(CONFIG_NUMA)
/* we need to do this before any per-thread allocation is done so that
goto out_srv_fini;
}
- /* Record that the thread is running */
- thread->t_flags = SVC_RUNNING;
+ spin_lock(&svc->srv_lock);
+ /* SVC_STOPPING may already be set here if someone else is trying
+ * to stop the service while this new thread has been dynamically
+ * forked. We still set SVC_RUNNING to let our creator know that
+ * we are now running; however, we will exit as soon as possible */
+ thread->t_flags |= SVC_RUNNING;
+ spin_unlock(&svc->srv_lock);
+
/*
* wake up our creator. Note: @data is invalid after this point,
* because it's allocated on ptlrpc_start_thread() stack.
*/
cfs_waitq_signal(&thread->t_ctl_waitq);
- thread->t_watchdog = lc_watchdog_add(max_t(int, obd_timeout, AT_OFF ? 0 :
- at_get(&svc->srv_at_estimate))
- * svc->srv_watchdog_factor,
- NULL, NULL);
+ thread->t_watchdog = lc_watchdog_add(GET_TIMEOUT(svc), NULL, NULL);
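/* GET_TIMEOUT() is not shown in this patch; judging from the expression it
 * replaces here and below, it is assumed to expand to roughly:
 *
 *   #define GET_TIMEOUT(svc)                                          \
 *           (max_t(int, obd_timeout,                                  \
 *                  AT_OFF ? 0 : at_get(&(svc)->srv_at_estimate)) *    \
 *            (svc)->srv_watchdog_factor)
 */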
spin_lock(&svc->srv_lock);
svc->srv_threads_running++;
/* XXX maintain a list of all managed devices: insert here */
- while ((thread->t_flags & SVC_STOPPING) == 0) {
+ while (!(thread->t_flags & SVC_STOPPING) && !svc->srv_is_stopping) {
/* Don't exit while there are replies to be handled */
struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
ptlrpc_retry_rqbds, svc);
cond_resched();
l_wait_event_exclusive (svc->srv_waitq,
- ((thread->t_flags & SVC_STOPPING) != 0) ||
+ thread->t_flags & SVC_STOPPING ||
+ svc->srv_is_stopping ||
(!list_empty(&svc->srv_idle_rqbds) &&
svc->srv_rqbd_timeout == 0) ||
!list_empty(&svc->srv_req_in_queue) ||
svc->srv_at_check,
&lwi);
- lc_watchdog_touch_ms(thread->t_watchdog, max_t(int, obd_timeout,
- AT_OFF ? 0 :
- at_get(&svc->srv_at_estimate)) *
- svc->srv_watchdog_factor);
+ if (thread->t_flags & SVC_STOPPING || svc->srv_is_stopping)
+ break;
+
+ lc_watchdog_touch(thread->t_watchdog, GET_TIMEOUT(svc));
ptlrpc_check_rqbd_pool(svc);
- if ((svc->srv_threads_started < svc->srv_threads_max) &&
- (svc->srv_n_active_reqs >= (svc->srv_threads_started - 1))){
+ if (svc->srv_threads_started < svc->srv_threads_max &&
+ svc->srv_n_active_reqs >= (svc->srv_threads_started - 1))
/* Ignore return code - we tried... */
ptlrpc_start_thread(dev, svc);
- }
if (!list_empty(&svc->srv_req_in_queue)) {
/* Process all incoming reqs before handling any */
lu_context_fini(&env.le_ctx);
out:
- CDEBUG(D_NET, "service thread %d exiting: rc %d\n", thread->t_id, rc);
+ CDEBUG(D_RPCTRACE, "service thread [ %p : %u ] %d exiting: rc %d\n",
+ thread, thread->t_pid, thread->t_id, rc);
spin_lock(&svc->srv_lock);
svc->srv_threads_running--; /* must know immediately */
snprintf(threadname, sizeof(threadname),
"ptlrpc_hr_%d", hr_args->thread_index);
- ptlrpc_daemonize(threadname);
-#if defined(HAVE_NODE_TO_CPUMASK)
+ cfs_daemonize_ctxt(threadname);
+#if defined(CONFIG_SMP) && defined(HAVE_NODE_TO_CPUMASK)
set_cpus_allowed(cfs_current(),
node_to_cpumask(cpu_to_node(hr_args->cpu_index)));
#endif
LASSERT(hr->hr_n_threads > 0);
for (n = 0, cpu = 0; n < hr->hr_n_threads; n++) {
-#if defined(HAVE_NODE_TO_CPUMASK)
+#if defined(CONFIG_SMP) && defined(HAVE_NODE_TO_CPUMASK)
while(!cpu_online(cpu)) {
cpu++;
if (cpu >= num_possible_cpus())
struct l_wait_info lwi = { 0 };
ENTRY;
- CDEBUG(D_RPCTRACE, "Stopping thread %p\n", thread);
+ CDEBUG(D_RPCTRACE, "Stopping thread [ %p : %u ]\n",
+ thread, thread->t_pid);
+
spin_lock(&svc->srv_lock);
- thread->t_flags = SVC_STOPPING;
+ /* let the thread know that we would like it to stop asap */
+ thread->t_flags |= SVC_STOPPING;
spin_unlock(&svc->srv_lock);
cfs_waitq_broadcast(&svc->srv_waitq);
- l_wait_event(thread->t_ctl_waitq, (thread->t_flags & SVC_STOPPED),
- &lwi);
+ l_wait_event(thread->t_ctl_waitq,
+ (thread->t_flags & SVC_STOPPED), &lwi);
spin_lock(&svc->srv_lock);
list_del(&thread->t_link);
CERROR("cannot start %s thread #%d: rc %d\n",
svc->srv_thread_name, i, rc);
ptlrpc_stop_all_threads(svc);
+ break;
}
}
RETURN(rc);
CDEBUG(D_RPCTRACE, "%s started %d min %d max %d running %d\n",
svc->srv_name, svc->srv_threads_started, svc->srv_threads_min,
svc->srv_threads_max, svc->srv_threads_running);
+
+ if (unlikely(svc->srv_is_stopping))
+ RETURN(-ESRCH);
+
if (unlikely(svc->srv_threads_started >= svc->srv_threads_max) ||
(OBD_FAIL_CHECK(OBD_FAIL_TGT_TOOMANY_THREADS) &&
svc->srv_threads_started == svc->srv_threads_min - 1))
id = svc->srv_threads_started++;
spin_unlock(&svc->srv_lock);
+ thread->t_svc = svc;
thread->t_id = id;
sprintf(name, "%s_%02d", svc->srv_thread_name, id);
d.dev = dev;
CDEBUG(D_RPCTRACE, "starting thread '%s'\n", name);
- /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
- * just drop the VM and FILES in ptlrpc_daemonize() right away.
+ /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
+ * just drop the VM and FILES in cfs_daemonize_ctxt() right away.
*/
rc = cfs_kernel_thread(ptlrpc_main, &d, CLONE_VM | CLONE_FILES);
if (rc < 0) {
int n_cpus = num_online_cpus();
struct ptlrpc_hr_service *hr;
int size;
+ int rc;
ENTRY;
LASSERT(ptlrpc_hr == NULL);
hr->hr_size = size;
ptlrpc_hr = hr;
- RETURN(ptlrpc_start_hr_threads(hr));
+ rc = ptlrpc_start_hr_threads(hr);
+ if (rc) {
+ OBD_FREE(hr, hr->hr_size);
+ ptlrpc_hr = NULL;
+ }
+ RETURN(rc);
}
void ptlrpc_hr_fini(void)
struct l_wait_info lwi;
struct list_head *tmp;
struct ptlrpc_reply_state *rs, *t;
+ struct ptlrpc_at_array *array = &service->srv_at_array;
ENTRY;
service->srv_is_stopping = 1;
/* In case somebody rearmed this in the meantime */
cfs_timer_disarm(&service->srv_at_timer);
+ if (array->paa_reqs_array != NULL) {
+ OBD_FREE(array->paa_reqs_array,
+ sizeof(struct list_head) * array->paa_size);
+ array->paa_reqs_array = NULL;
+ }
+
+ if (array->paa_reqs_count != NULL) {
+ OBD_FREE(array->paa_reqs_count,
+ sizeof(__u32) * array->paa_size);
+ array->paa_reqs_count = NULL;
+ }
+
OBD_FREE_PTR(service);
RETURN(0);
}