}
/*
+ * Upcoming patches will give each ptlrpc service a per-CPT request history
+ * list, because we don't want to be serialized by the current per-service
+ * history operations. We therefore need a history ID that (somehow) reflects
+ * arrival order without taking a global lock, so that userspace can sort
+ * requests by it.
+ *
+ * This is how we generate the history ID for a ptlrpc_request:
+ * ------------------------------------------------
+ * | 32 bits | 16 bits   | (16 - X) bits | X bits |
+ * ------------------------------------------------
+ * | seconds | usec / 16 | sequence      | CPT id |
+ * ------------------------------------------------
+ *
+ * It might not be precise, but it should be good enough.
+ */
+#define REQS_ALL_BITS(svcpt) ((int)(sizeof((svcpt)->scp_hist_seq) * 8))
+#define REQS_SEC_BITS 32
+#define REQS_USEC_BITS 16
+/* will soon be replaced by enough bits to cover the total number of
+ * service partitions */
+#define REQS_CPT_BITS(svcpt) 0
+#define REQS_SEQ_BITS(svcpt) (REQS_ALL_BITS(svcpt) - REQS_CPT_BITS(svcpt) -\
+ REQS_SEC_BITS - REQS_USEC_BITS)
+
+#define REQS_SEQ_SHIFT(svcpt) (REQS_CPT_BITS(svcpt))
+#define REQS_USEC_SHIFT(svcpt) (REQS_SEQ_SHIFT(svcpt) + REQS_SEQ_BITS(svcpt))
+#define REQS_SEC_SHIFT(svcpt) (REQS_USEC_SHIFT(svcpt) + REQS_USEC_BITS)
+
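+/*
+ * For example, a history ID could be decoded back into its fields as
+ * below (a sketch only; sec/usec/idx/cpt are illustrative names, not
+ * part of this patch, and userspace would hard-code the same shifts):
+ *
+ *	sec  = seq >> REQS_SEC_SHIFT(svcpt);
+ *	usec = (seq >> REQS_USEC_SHIFT(svcpt)) & ((1ULL << REQS_USEC_BITS) - 1);
+ *	idx  = (seq >> REQS_SEQ_SHIFT(svcpt)) & ((1ULL << REQS_SEQ_BITS(svcpt)) - 1);
+ *	cpt  = seq & ((1ULL << REQS_CPT_BITS(svcpt)) - 1);
+ */
+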
+static void ptlrpc_req_add_history(struct ptlrpc_service_part *svcpt,
+ struct ptlrpc_request *req)
+{
+ __u64 sec = req->rq_arrival_time.tv_sec;
+ __u32 usec = req->rq_arrival_time.tv_usec >> 4; /* usec / 16 */
+ __u64 new_seq;
+
+ /* Set a sequence ID for the request and add it to the history list.
+ * This must be called while holding svcpt::scp_lock. */
+
+ LASSERT(REQS_SEQ_BITS(svcpt) > 0);
+
+ new_seq = (sec << REQS_SEC_SHIFT(svcpt)) |
+ (usec << REQS_USEC_SHIFT(svcpt)) | svcpt->scp_cpt;
+ if (new_seq > svcpt->scp_hist_seq) {
+ /* This handles both the initial case of scp_hist_seq == 0 and
+ * the case where we just jumped into a new time window */
+ svcpt->scp_hist_seq = new_seq;
+ } else {
+ /* NB: increase the sequence number within the current usec
+ * bucket. However, if we have used up all the sequence bits,
+ * this overflows into the next usec bucket (future time); we
+ * then hope that at some point there will be fewer RPCs per
+ * bucket, so that real time catches up with the sequence again */
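+ /* As a worked example (assuming the current layout with
+ * REQS_CPT_BITS(svcpt) == 0, i.e. 16 sequence bits): one 16-usec
+ * bucket provides 2^16 = 65536 IDs, so a burst of more than 65536
+ * requests in a single bucket starts borrowing IDs from the next
+ * bucket's time range */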
+ svcpt->scp_hist_seq += (1U << REQS_CPT_BITS(svcpt));
+ new_seq = svcpt->scp_hist_seq;
+ }
+
+ req->rq_history_seq = new_seq;
+
+ cfs_list_add_tail(&req->rq_history_list, &svcpt->scp_hist_reqs);
+}
+
+/*
* Server's incoming request callback
*/
void request_in_callback(lnet_event_t *ev)
{
- struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
- struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
- struct ptlrpc_service *service = rqbd->rqbd_service;
+ struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
+ struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
+ struct ptlrpc_service_part *svcpt = rqbd->rqbd_svcpt;
+ struct ptlrpc_service *service = svcpt->scp_service;
struct ptlrpc_request *req;
ENTRY;
CDEBUG(D_RPCTRACE, "peer: %s\n", libcfs_id2str(req->rq_peer));
- cfs_spin_lock(&service->srv_lock);
+ cfs_spin_lock(&svcpt->scp_lock);
- req->rq_history_seq = service->srv_request_seq++;
- cfs_list_add_tail(&req->rq_history_list, &service->srv_request_history);
+ ptlrpc_req_add_history(svcpt, req);
- if (ev->unlinked) {
- service->srv_nrqbd_receiving--;
- CDEBUG(D_INFO, "Buffer complete: %d buffers still posted\n",
- service->srv_nrqbd_receiving);
-
- /* Normally, don't complain about 0 buffers posted; LNET won't
- * drop incoming reqs since we set the portal lazy */
- if (test_req_buffer_pressure &&
- ev->type != LNET_EVENT_UNLINK &&
- service->srv_nrqbd_receiving == 0)
+ if (ev->unlinked) {
+ svcpt->scp_nrqbds_posted--;
+ CDEBUG(D_INFO, "Buffer complete: %d buffers still posted\n",
+ svcpt->scp_nrqbds_posted);
+
+ /* Normally, don't complain about 0 buffers posted; LNET won't
+ * drop incoming reqs since we set the portal lazy */
+ if (test_req_buffer_pressure &&
+ ev->type != LNET_EVENT_UNLINK &&
+ svcpt->scp_nrqbds_posted == 0)
CWARN("All %s request buffers busy\n",
service->srv_name);
rqbd->rqbd_refcount++;
}
- cfs_list_add_tail(&req->rq_list, &service->srv_req_in_queue);
- service->srv_n_queued_reqs++;
+ cfs_list_add_tail(&req->rq_list, &svcpt->scp_req_incoming);
+ svcpt->scp_nreqs_incoming++;
- /* NB everything can disappear under us once the request
- * has been queued and we unlock, so do the wake now... */
- cfs_waitq_signal(&service->srv_waitq);
+ /* NB everything can disappear under us once the request
+ * has been queued and we unlock, so do the wake now... */
+ cfs_waitq_signal(&svcpt->scp_waitq);
- cfs_spin_unlock(&service->srv_lock);
- EXIT;
+ cfs_spin_unlock(&svcpt->scp_lock);
+ EXIT;
}
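+
+/*
+ * NB: the matching consumer side is not in this hunk. A minimal sketch
+ * of how a service thread might drain the incoming queue (illustrative
+ * only, not the actual handler code):
+ *
+ *	cfs_spin_lock(&svcpt->scp_lock);
+ *	while (!cfs_list_empty(&svcpt->scp_req_incoming)) {
+ *		req = cfs_list_entry(svcpt->scp_req_incoming.next,
+ *				     struct ptlrpc_request, rq_list);
+ *		cfs_list_del_init(&req->rq_list);
+ *		svcpt->scp_nreqs_incoming--;
+ *		cfs_spin_unlock(&svcpt->scp_lock);
+ *		(handle or re-queue req here)
+ *		cfs_spin_lock(&svcpt->scp_lock);
+ *	}
+ *	cfs_spin_unlock(&svcpt->scp_lock);
+ */
+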
/*
 * Server's outgoing reply callback
 */
void reply_out_callback(lnet_event_t *ev)
{
- struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
- struct ptlrpc_reply_state *rs = cbid->cbid_arg;
- struct ptlrpc_service *svc = rs->rs_service;
+ struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
+ struct ptlrpc_reply_state *rs = cbid->cbid_arg;
+ struct ptlrpc_service_part *svcpt = rs->rs_svcpt;
ENTRY;
LASSERT (ev->type == LNET_EVENT_SEND ||
if (ev->unlinked) {
/* Last network callback. The net's ref on 'rs' stays put
* until ptlrpc_handle_rs() is done with it */
- cfs_spin_lock(&svc->srv_rs_lock);
- cfs_spin_lock(&rs->rs_lock);
- rs->rs_on_net = 0;
- if (!rs->rs_no_ack ||
- rs->rs_transno <= rs->rs_export->exp_obd->obd_last_committed)
- ptlrpc_schedule_difficult_reply (rs);
- cfs_spin_unlock(&rs->rs_lock);
- cfs_spin_unlock(&svc->srv_rs_lock);
- }
-
- EXIT;
+ cfs_spin_lock(&svcpt->scp_rep_lock);
+ cfs_spin_lock(&rs->rs_lock);
+
+ rs->rs_on_net = 0;
+ if (!rs->rs_no_ack ||
+ rs->rs_transno <=
+ rs->rs_export->exp_obd->obd_last_committed)
+ ptlrpc_schedule_difficult_reply(rs);
+
+ cfs_spin_unlock(&rs->rs_lock);
+ cfs_spin_unlock(&svcpt->scp_rep_lock);
+ }
+ EXIT;
}
#ifdef HAVE_SERVER_SUPPORT