-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
{
struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
+ struct ptlrpc_request *req;
ENTRY;
LASSERT ((desc->bd_type == BULK_PUT_SINK &&
ev->type, ev->status, desc);
cfs_spin_lock(&desc->bd_lock);
-
+ req = desc->bd_req;
LASSERT(desc->bd_network_rw);
desc->bd_network_rw = 0;
desc->bd_success = 1;
desc->bd_nob_transferred = ev->mlength;
desc->bd_sender = ev->sender;
+ } else {
+ /* start reconnect and resend if we hit a network error */
+ cfs_spin_lock(&req->rq_lock);
+ req->rq_net_err = 1;
+ cfs_spin_unlock(&req->rq_lock);
}
/* release the encrypted pages for write */
/* NB don't unlock till after wakeup; desc can disappear under us
* otherwise */
- ptlrpc_client_wake_req(desc->bd_req);
+ ptlrpc_client_wake_req(req);
cfs_spin_unlock(&desc->bd_lock);
EXIT;
}
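
Illustrative aside, not part of the patch: the error path added above follows a set-flag-under-lock-then-wake pattern, so the waiter (not the event callback) decides to reconnect and resend. Below is a minimal sketch of that pattern with hypothetical names; pthread primitives stand in for the kernel spinlock and wait queue.

/* Sketch only -- not Lustre code; all names here are hypothetical. */
#include <pthread.h>
#include <stdbool.h>

struct fake_req {
	pthread_mutex_t	rq_lock;
	pthread_cond_t	rq_waitq;
	bool		rq_net_err;	/* set by the event callback on error */
	bool		rq_done;	/* set on successful completion */
};

static void fake_bulk_callback(struct fake_req *req, bool network_ok)
{
	pthread_mutex_lock(&req->rq_lock);
	if (network_ok)
		req->rq_done = true;
	else
		req->rq_net_err = true;	/* owner will reconnect and resend */
	/* wake while we still hold a valid reference; once the owner sees
	 * the flag and we unlock, req may disappear under us */
	pthread_cond_signal(&req->rq_waitq);
	pthread_mutex_unlock(&req->rq_lock);
}
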
/*
+ * Upcoming patches will give each ptlrpc service a per-CPT (percpt) request
+ * history list, so that history handling is no longer serialized by the
+ * current per-service operations. The history ID therefore must (somehow)
+ * reflect arrival order without grabbing a global lock, so that userspace
+ * can sort the entries itself.
+ *
+ * This is how we generate history ID for ptlrpc_request:
+ * ----------------------------------------------------
+ * | 32 bits | 16 bits | (16 - X)bits | X bits |
+ * ----------------------------------------------------
+ * | seconds | usec / 16 | sequence | CPT id |
+ * ----------------------------------------------------
+ *
+ * It might not be precise, but it should be good enough.
+ */
+
+#define REQS_CPT_BITS(svcpt) ((svcpt)->scp_service->srv_cpt_bits)
+
+#define REQS_SEC_SHIFT 32
+#define REQS_USEC_SHIFT 16
+#define REQS_SEQ_SHIFT(svcpt) REQS_CPT_BITS(svcpt)
+
+static void ptlrpc_req_add_history(struct ptlrpc_service_part *svcpt,
+ struct ptlrpc_request *req)
+{
+ __u64 sec = req->rq_arrival_time.tv_sec;
+ __u32 usec = req->rq_arrival_time.tv_usec >> 4; /* usec / 16 */
+ __u64 new_seq;
+
+ /* set sequence ID for request and add it to history list;
+ * the caller must hold svcpt::scp_lock */
+
+ new_seq = (sec << REQS_SEC_SHIFT) |
+ (usec << REQS_USEC_SHIFT) | svcpt->scp_cpt;
+ if (new_seq > svcpt->scp_hist_seq) {
+ /* This handles both the initial case (scp_hist_seq == 0) and the
+ * case where we have just jumped into a new time window */
+ svcpt->scp_hist_seq = new_seq;
+ } else {
+ LASSERT(REQS_SEQ_SHIFT(svcpt) < REQS_USEC_SHIFT);
+ /* NB: increase the sequence number within the current usec bucket.
+ * It's possible that we use up all the sequence bits and spill into
+ * the next usec bucket (future time); in that case we hope there will
+ * be fewer RPCs per bucket at some point, so the sequence can catch
+ * up with real time again */
+ svcpt->scp_hist_seq += (1U << REQS_SEQ_SHIFT(svcpt));
+ new_seq = svcpt->scp_hist_seq;
+ }
+
+ req->rq_history_seq = new_seq;
+
+ cfs_list_add_tail(&req->rq_history_list, &svcpt->scp_hist_reqs);
+}
+
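
Illustrative aside, not part of the patch: given the layout documented above, userspace can split a history ID back into its fields with plain shifts and masks. A minimal sketch follows, assuming 4 CPT bits for the example (the real width is the service's srv_cpt_bits, and the ID value shown is made up).

/* Hypothetical decoder for the 64-bit history ID layout described above.
 * CPT_BITS is an assumption for this example only. */
#include <stdio.h>
#include <stdint.h>

#define CPT_BITS	4	/* assumed; (16 - CPT_BITS) bits are left for the sequence */

int main(void)
{
	uint64_t id     = 0x4f1d2c3a00150002ULL;		/* example value only */
	uint64_t sec    = id >> 32;				/* arrival seconds */
	uint32_t usec16 = (uint32_t)(id >> 16) & 0xffff;	/* usec / 16 bucket */
	uint32_t seq    = ((uint32_t)id & 0xffff) >> CPT_BITS;	/* per-bucket sequence */
	uint32_t cpt    = (uint32_t)id & ((1U << CPT_BITS) - 1); /* CPT id */

	printf("sec=%llu usec~=%u seq=%u cpt=%u\n",
	       (unsigned long long)sec, usec16 << 4, seq, cpt);
	return 0;
}
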
+/*
* Server's incoming request callback
*/
void request_in_callback(lnet_event_t *ev)
{
- struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
- struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
- struct ptlrpc_service *service = rqbd->rqbd_service;
+ struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
+ struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
+ struct ptlrpc_service_part *svcpt = rqbd->rqbd_svcpt;
+ struct ptlrpc_service *service = svcpt->scp_service;
struct ptlrpc_request *req;
ENTRY;
req->rq_self = ev->target.nid;
req->rq_rqbd = rqbd;
req->rq_phase = RQ_PHASE_NEW;
-#ifdef CRAY_XT3
- req->rq_uid = ev->uid;
-#endif
cfs_spin_lock_init(&req->rq_lock);
CFS_INIT_LIST_HEAD(&req->rq_timed_list);
+ CFS_INIT_LIST_HEAD(&req->rq_exp_list);
cfs_atomic_set(&req->rq_refcount, 1);
if (ev->type == LNET_EVENT_PUT)
CDEBUG(D_INFO, "incoming req@%p x"LPU64" msgsize %u\n",
CDEBUG(D_RPCTRACE, "peer: %s\n", libcfs_id2str(req->rq_peer));
- cfs_spin_lock(&service->srv_lock);
+ cfs_spin_lock(&svcpt->scp_lock);
- req->rq_history_seq = service->srv_request_seq++;
- cfs_list_add_tail(&req->rq_history_list, &service->srv_request_history);
+ ptlrpc_req_add_history(svcpt, req);
- if (ev->unlinked) {
- service->srv_nrqbd_receiving--;
- CDEBUG(D_INFO, "Buffer complete: %d buffers still posted\n",
- service->srv_nrqbd_receiving);
-
- /* Normally, don't complain about 0 buffers posted; LNET won't
- * drop incoming reqs since we set the portal lazy */
- if (test_req_buffer_pressure &&
- ev->type != LNET_EVENT_UNLINK &&
- service->srv_nrqbd_receiving == 0)
+ if (ev->unlinked) {
+ svcpt->scp_nrqbds_posted--;
+ CDEBUG(D_INFO, "Buffer complete: %d buffers still posted\n",
+ svcpt->scp_nrqbds_posted);
+
+ /* Normally, don't complain about 0 buffers posted; LNET won't
+ * drop incoming reqs since we set the portal lazy */
+ if (test_req_buffer_pressure &&
+ ev->type != LNET_EVENT_UNLINK &&
+ svcpt->scp_nrqbds_posted == 0)
CWARN("All %s request buffers busy\n",
service->srv_name);
rqbd->rqbd_refcount++;
}
- cfs_list_add_tail(&req->rq_list, &service->srv_req_in_queue);
- service->srv_n_queued_reqs++;
+ cfs_list_add_tail(&req->rq_list, &svcpt->scp_req_incoming);
+ svcpt->scp_nreqs_incoming++;
- /* NB everything can disappear under us once the request
- * has been queued and we unlock, so do the wake now... */
- cfs_waitq_signal(&service->srv_waitq);
+ /* NB everything can disappear under us once the request
+ * has been queued and we unlock, so do the wake now... */
+ cfs_waitq_signal(&svcpt->scp_waitq);
- cfs_spin_unlock(&service->srv_lock);
- EXIT;
+ cfs_spin_unlock(&svcpt->scp_lock);
+ EXIT;
}
/*
*/
void reply_out_callback(lnet_event_t *ev)
{
- struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
- struct ptlrpc_reply_state *rs = cbid->cbid_arg;
- struct ptlrpc_service *svc = rs->rs_service;
+ struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
+ struct ptlrpc_reply_state *rs = cbid->cbid_arg;
+ struct ptlrpc_service_part *svcpt = rs->rs_svcpt;
ENTRY;
LASSERT (ev->type == LNET_EVENT_SEND ||
if (ev->unlinked) {
/* Last network callback. The net's ref on 'rs' stays put
* until ptlrpc_handle_rs() is done with it */
- cfs_spin_lock(&svc->srv_rs_lock);
- cfs_spin_lock(&rs->rs_lock);
- rs->rs_on_net = 0;
- if (!rs->rs_no_ack ||
- rs->rs_transno <= rs->rs_export->exp_obd->obd_last_committed)
- ptlrpc_schedule_difficult_reply (rs);
- cfs_spin_unlock(&rs->rs_lock);
- cfs_spin_unlock(&svc->srv_rs_lock);
- }
-
- EXIT;
+ cfs_spin_lock(&svcpt->scp_rep_lock);
+ cfs_spin_lock(&rs->rs_lock);
+
+ rs->rs_on_net = 0;
+ if (!rs->rs_no_ack ||
+ rs->rs_transno <=
+ rs->rs_export->exp_obd->obd_last_committed)
+ ptlrpc_schedule_difficult_reply(rs);
+
+ cfs_spin_unlock(&rs->rs_lock);
+ cfs_spin_unlock(&svcpt->scp_rep_lock);
+ }
+ EXIT;
}
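
Illustrative aside, not part of the patch: the test above can be read as a small predicate; the names below are hypothetical and only restate what the code checks.

/* Hypothetical restatement of the test in reply_out_callback above. */
#include <stdbool.h>
#include <stdint.h>

static bool schedule_difficult_reply_now(bool rs_no_ack, uint64_t rs_transno,
					 uint64_t obd_last_committed)
{
	/* hand the reply state to the difficult-reply handler now if an ACK
	 * is still expected (!rs_no_ack) or its transno is already committed;
	 * otherwise it is left alone until the transaction commits */
	return !rs_no_ack || rs_transno <= obd_last_committed;
}
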
+#ifdef HAVE_SERVER_SUPPORT
/*
* Server's bulk completion callback
*/
cfs_spin_unlock(&desc->bd_lock);
EXIT;
}
+#endif
static void ptlrpc_master_callback(lnet_event_t *ev)
{
callback == reply_in_callback ||
callback == client_bulk_callback ||
callback == request_in_callback ||
- callback == reply_out_callback ||
- callback == server_bulk_callback);
+ callback == reply_out_callback
+#ifdef HAVE_SERVER_SUPPORT
+ || callback == server_bulk_callback
+#endif
+ );
callback (ev);
}
/* CAVEAT EMPTOR: how we process portals events is _radically_
* different depending on... */
#ifdef __KERNEL__
- /* kernel portals calls our master callback when events are added to
- * the event queue. In fact lustre never pulls events off this queue,
- * so it's only sized for some debug history. */
- rc = LNetEQAlloc(1024, ptlrpc_master_callback, &ptlrpc_eq_h);
+ /* kernel LNet calls our master callback when there is a new event;
+ * since we are guaranteed to get every event via the callback,
+ * we just set the EQ size to 0 to avoid the overhead of serializing
+ * enqueue/dequeue operations in LNet. */
+ rc = LNetEQAlloc(0, ptlrpc_master_callback, &ptlrpc_eq_h);
#else
/* liblustre calls the master callback when it removes events from the
* event queue. The event queue has to be big enough not to drop