Whamcloud - gitweb
git://git.whamcloud.com
/
fs
/
lustre-release.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
| inline |
side by side
LU-1517 ptlrpc: throw net error to ptlrpc for bulk
[fs/lustre-release.git]
/
lustre
/
ptlrpc
/
events.c
diff --git
a/lustre/ptlrpc/events.c
b/lustre/ptlrpc/events.c
index
ab6da13
..
55cc32e
100644
(file)
--- a/
lustre/ptlrpc/events.c
+++ b/
lustre/ptlrpc/events.c
@@
-178,6
+178,7
@@
void client_bulk_callback (lnet_event_t *ev)
{
struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
struct ptlrpc_bulk_desc *desc = cbid->cbid_arg;
+ struct ptlrpc_request *req;
ENTRY;
LASSERT ((desc->bd_type == BULK_PUT_SINK &&
@@
-198,7
+199,7
@@
void client_bulk_callback (lnet_event_t *ev)
ev->type, ev->status, desc);
cfs_spin_lock(&desc->bd_lock);
-
+ req = desc->bd_req;
LASSERT(desc->bd_network_rw);
desc->bd_network_rw = 0;
@@
-206,6
+207,11
@@
void client_bulk_callback (lnet_event_t *ev)
desc->bd_success = 1;
desc->bd_nob_transferred = ev->mlength;
desc->bd_sender = ev->sender;
+ } else {
+ /* start reconnect and resend if network error hit */
+ cfs_spin_lock(&req->rq_lock);
+ req->rq_net_err = 1;
+ cfs_spin_unlock(&req->rq_lock);
}
/* release the encrypted pages for write */
@@
-214,20
+220,75
@@
void client_bulk_callback (lnet_event_t *ev)
/* NB don't unlock till after wakeup; desc can disappear under us
* otherwise */
-        ptlrpc_client_wake_req(desc->bd_req);
+ ptlrpc_client_wake_req(req);
cfs_spin_unlock(&desc->bd_lock);
EXIT;
}
/*
+ * We will have percpt request history list for ptlrpc service in upcoming
+ * patches because we don't want to be serialized by current per-service
+ * history operations. So we require history ID can (somehow) show arriving
+ * order w/o grabbing global lock, and user can sort them in userspace.
+ *
+ * This is how we generate history ID for ptlrpc_request:
+ * ----------------------------------------------------
+ * | 32 bits | 16 bits | (16 - X)bits | X bits |
+ * ----------------------------------------------------
+ * | seconds | usec / 16 | sequence | CPT id |
+ * ----------------------------------------------------
+ *
+ * it might not be precise but should be good enough.
+ */
+
+/* number of low-order bits reserved for the CPT id in a history ID */
+#define REQS_CPT_BITS(svcpt) ((svcpt)->scp_service->srv_cpt_bits)
+
+/* bit offsets of the fields of the 64-bit history ID (layout above) */
+#define REQS_SEC_SHIFT 32
+#define REQS_USEC_SHIFT 16
+#define REQS_SEQ_SHIFT(svcpt) REQS_CPT_BITS(svcpt)
+
+static void ptlrpc_req_add_history(struct ptlrpc_service_part *svcpt,
+ struct ptlrpc_request *req)
+{
+ __u64 sec = req->rq_arrival_time.tv_sec;
+ __u32 usec = req->rq_arrival_time.tv_usec >> 4; /* usec / 16 */
+ __u64 new_seq;
+
+ /* set sequence ID for request and add it to history list,
+ * it must be called with hold svcpt::scp_lock */
+
+ /* candidate ID from arrival time + CPT id, sequence field zero */
+ new_seq = (sec << REQS_SEC_SHIFT) |
+ (usec << REQS_USEC_SHIFT) | svcpt->scp_cpt;
+ if (new_seq > svcpt->scp_hist_seq) {
+ /* This handles the initial case of scp_hist_seq == 0 or
+ * we just jumped into a new time window */
+ svcpt->scp_hist_seq = new_seq;
+ } else {
+ /* sequence field must have at least one bit (CPT bits < 16) */
+ LASSERT(REQS_SEQ_SHIFT(svcpt) < REQS_USEC_SHIFT);
+ /* NB: increase sequence number in current usec bucket,
+ * however, it's possible that we used up all bits for
+ * sequence and jumped into the next usec bucket (future time),
+ * then we hope there will be less RPCs per bucket at some
+ * point, and sequence will catch up again */
+ svcpt->scp_hist_seq += (1U << REQS_SEQ_SHIFT(svcpt));
+ new_seq = svcpt->scp_hist_seq;
+ }
+
+ req->rq_history_seq = new_seq;
+
+ cfs_list_add_tail(&req->rq_history_list, &svcpt->scp_hist_reqs);
+}
+
+/*
* Server's incoming request callback
*/
void request_in_callback(lnet_event_t *ev)
{
- struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
- struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
- struct ptlrpc_service *service = rqbd->rqbd_service;
+ struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
+ struct ptlrpc_request_buffer_desc *rqbd = cbid->cbid_arg;
+ struct ptlrpc_service_part *svcpt = rqbd->rqbd_svcpt;
+ struct ptlrpc_service *service = svcpt->scp_service;
struct ptlrpc_request *req;
ENTRY;
@@
-277,11
+338,9
@@
void request_in_callback(lnet_event_t *ev)
req->rq_self = ev->target.nid;
req->rq_rqbd = rqbd;
req->rq_phase = RQ_PHASE_NEW;
-#ifdef CRAY_XT3
- req->rq_uid = ev->uid;
-#endif
cfs_spin_lock_init(&req->rq_lock);
CFS_INIT_LIST_HEAD(&req->rq_timed_list);
+ CFS_INIT_LIST_HEAD(&req->rq_exp_list);
cfs_atomic_set(&req->rq_refcount, 1);
if (ev->type == LNET_EVENT_PUT)
CDEBUG(D_INFO, "incoming req@%p x"LPU64" msgsize %u\n",
@@
-289,21
+348,20
@@
void request_in_callback(lnet_event_t *ev)
CDEBUG(D_RPCTRACE, "peer: %s\n", libcfs_id2str(req->rq_peer));
-        cfs_spin_lock(&service->srv_lock);
+        cfs_spin_lock(&svcpt->scp_lock);
- req->rq_history_seq = service->srv_request_seq++;
- cfs_list_add_tail(&req->rq_history_list, &service->srv_request_history);
+ ptlrpc_req_add_history(svcpt, req);
-        if (ev->unlinked) {
-                service->srv_nrqbd_receiving--;
-                CDEBUG(D_INFO, "Buffer complete: %d buffers still posted\n",
-                       service->srv_nrqbd_receiving);
-
-                /* Normally, don't complain about 0 buffers posted; LNET won't
-                 * drop incoming reqs since we set the portal lazy */
-                if (test_req_buffer_pressure &&
-                    ev->type != LNET_EVENT_UNLINK &&
-                    service->srv_nrqbd_receiving == 0)
+        if (ev->unlinked) {
+                svcpt->scp_nrqbds_posted--;
+                CDEBUG(D_INFO, "Buffer complete: %d buffers still posted\n",
+                       svcpt->scp_nrqbds_posted);
+
+                /* Normally, don't complain about 0 buffers posted; LNET won't
+                 * drop incoming reqs since we set the portal lazy */
+                if (test_req_buffer_pressure &&
+                    ev->type != LNET_EVENT_UNLINK &&
+                    svcpt->scp_nrqbds_posted == 0)
CWARN("All %s request buffers busy\n",
service->srv_name);
@@
-313,15
+371,15
@@
void request_in_callback(lnet_event_t *ev)
rqbd->rqbd_refcount++;
}
-        cfs_list_add_tail(&req->rq_list, &service->srv_req_in_queue);
-        service->srv_n_queued_reqs++;
+        cfs_list_add_tail(&req->rq_list, &svcpt->scp_req_incoming);
+        svcpt->scp_nreqs_incoming++;
-        /* NB everything can disappear under us once the request
-         * has been queued and we unlock, so do the wake now... */
-        cfs_waitq_signal(&service->srv_waitq);
+        /* NB everything can disappear under us once the request
+         * has been queued and we unlock, so do the wake now... */
+        cfs_waitq_signal(&svcpt->scp_waitq);
-        cfs_spin_unlock(&service->srv_lock);
-        EXIT;
+        cfs_spin_unlock(&svcpt->scp_lock);
+        EXIT;
}
/*
@@
-329,9
+387,9
@@
void request_in_callback(lnet_event_t *ev)
*/
void reply_out_callback(lnet_event_t *ev)
{
-        struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
-        struct ptlrpc_reply_state *rs = cbid->cbid_arg;
-        struct ptlrpc_service *svc = rs->rs_service;
+        struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
+        struct ptlrpc_reply_state *rs = cbid->cbid_arg;
+        struct ptlrpc_service_part *svcpt = rs->rs_svcpt;
ENTRY;
LASSERT (ev->type == LNET_EVENT_SEND ||
@@
-352,17
+410,19
@@
void reply_out_callback(lnet_event_t *ev)
if (ev->unlinked) {
/* Last network callback. The net's ref on 'rs' stays put
* until ptlrpc_handle_rs() is done with it */
- cfs_spin_lock(&svc->srv_rs_lock);
- cfs_spin_lock(&rs->rs_lock);
- rs->rs_on_net = 0;
- if (!rs->rs_no_ack ||
- rs->rs_transno <= rs->rs_export->exp_obd->obd_last_committed)
- ptlrpc_schedule_difficult_reply (rs);
- cfs_spin_unlock(&rs->rs_lock);
- cfs_spin_unlock(&svc->srv_rs_lock);
- }
-
- EXIT;
+ cfs_spin_lock(&svcpt->scp_rep_lock);
+ cfs_spin_lock(&rs->rs_lock);
+
+ rs->rs_on_net = 0;
+ if (!rs->rs_no_ack ||
+ rs->rs_transno <=
+ rs->rs_export->exp_obd->obd_last_committed)
+ ptlrpc_schedule_difficult_reply(rs);
+
+ cfs_spin_unlock(&rs->rs_lock);
+ cfs_spin_unlock(&svcpt->scp_rep_lock);
+ }
+ EXIT;
}
#ifdef HAVE_SERVER_SUPPORT