1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5 * Author: PJ Kirner <pjkirner@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * This file is confidential source code owned by Cluster File Systems.
11 * No viewing, modification, compilation, redistribution, or any other
12 * form of use is permitted except through a signed license agreement.
14 * If you have not signed such an agreement, then you have no rights to
15 * this file. Please destroy it immediately and contact CFS.
/* Initialise an rx buffer pool to its empty state: zero every field,
 * then set up the spinlock guarding the pool and the (initially empty)
 * list that will hold the pool's rx buffers.
 * NOTE(review): listing has dropped lines (return type / braces missing). */
22 kptllnd_rx_buffer_pool_init(kptl_rx_buffer_pool_t *rxbp)
24 memset(rxbp, 0, sizeof(*rxbp));
25 spin_lock_init(&rxbp->rxbp_lock);
26 INIT_LIST_HEAD(&rxbp->rxbp_list);
/* Tear down a single rx buffer: unlink it from its pool's list and free
 * both the data buffer and the descriptor.  The buffer must already be
 * completely quiescent (asserted below); list_del suggests the caller
 * holds rxbp_lock — TODO confirm against callers (see pool_fini). */
30 kptllnd_rx_buffer_destroy(kptl_rx_buffer_t *rxb)
32 kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
/* Sanity: no outstanding refs, no Portals MD attached, not posted,
 * and explicitly marked idle. */
34 LASSERT(rxb->rxb_refcount == 0);
35 LASSERT(PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));
36 LASSERT(!rxb->rxb_posted);
37 LASSERT(rxb->rxb_idle);
39 list_del(&rxb->rxb_list);
/* Free the payload area first, then the descriptor itself. */
42 LIBCFS_FREE(rxb->rxb_buffer, kptllnd_rx_buffer_size());
43 LIBCFS_FREE(rxb, sizeof(*rxb));
/* Reserve capacity for 'count' additional incoming messages.  Each rx
 * buffer can hold msgs_per_buffer messages; if the current buffer count
 * cannot cover the new reservation, allocate, register and post extra
 * buffers until it can.  Fails (presumably with an error return — the
 * return statements are on dropped listing lines) if the pool is
 * shutting down or allocation fails. */
47 kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count)
52 kptl_rx_buffer_t *rxb;
56 bufsize = kptllnd_rx_buffer_size();
/* How many max-sized messages fit in one buffer. */
57 msgs_per_buffer = bufsize / (*kptllnd_tunables.kptl_max_msg_size);
59 CDEBUG(D_NET, "kptllnd_rx_buffer_pool_reserve(%d)\n", count);
61 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* No new reservations once shutdown has started. */
64 if (rxbp->rxbp_shutdown) {
/* Existing buffers already cover the enlarged reservation: done. */
69 if (rxbp->rxbp_reserved + count <=
70 rxbp->rxbp_count * msgs_per_buffer) {
/* Need another buffer.  Drop the lock to allocate — LIBCFS_ALLOC may
 * sleep and must not be called with a spinlock held. */
75 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
77 LIBCFS_ALLOC(rxb, sizeof(*rxb));
78 LIBCFS_ALLOC(buffer, bufsize);
80 if (rxb == NULL || buffer == NULL) {
81 CERROR("Failed to allocate rx buffer\n");
/* Free whichever allocation (if any) succeeded; LIBCFS_FREE on the
 * failing NULL pointer is presumably guarded on dropped lines. */
84 LIBCFS_FREE(rxb, sizeof(*rxb));
86 LIBCFS_FREE(buffer, bufsize);
88 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* Initialise the fresh descriptor before publishing it. */
93 memset(rxb, 0, sizeof(*rxb));
95 rxb->rxb_eventarg.eva_type = PTLLND_EVENTARG_TYPE_BUF;
96 rxb->rxb_refcount = 0;
100 rxb->rxb_buffer = buffer;
101 rxb->rxb_mdh = PTL_INVALID_HANDLE;
103 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* Re-check shutdown: it may have started while the lock was dropped
 * for allocation.  If so, discard the buffer we just built. */
105 if (rxbp->rxbp_shutdown) {
106 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
108 LIBCFS_FREE(rxb, sizeof(*rxb));
109 LIBCFS_FREE(buffer, bufsize);
111 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* Publish the buffer on the pool list, then post it to Portals with
 * the lock dropped (posting can't be done under rxbp_lock). */
116 list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list);
119 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
121 kptllnd_rx_buffer_post(rxb);
123 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* Capacity now sufficient: record the reservation. */
127 rxbp->rxbp_reserved += count;
129 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Release a previous reservation of 'count' message slots.  The surplus
 * buffers are not freed here; only the reservation counter shrinks. */
135 kptllnd_rx_buffer_pool_unreserve(kptl_rx_buffer_pool_t *rxbp,
140 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
142 CDEBUG(D_NET, "kptllnd_rx_buffer_pool_unreserve(%d)\n", count);
143 rxbp->rxbp_reserved -= count;
145 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Shut the pool down: flag rxbp_shutdown, destroy idle buffers, unlink
 * the Portals MDs of busy ones, and wait (polling once a second) until
 * every buffer has dropped its references and left the pool list. */
149 kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp)
151 kptl_rx_buffer_t *rxb;
155 struct list_head *tmp;
156 struct list_head *nxt;
159 /* CAVEAT EMPTOR: I'm racing with everything here!!!
161 * Buffers can still be posted after I set rxbp_shutdown because I
162 * can't hold rxbp_lock while I'm posting them.
164 * Calling PtlMDUnlink() here races with auto-unlinks; i.e. a buffer's
165 * MD handle could become invalid under me. I am vulnerable to portals
166 * re-using handles (i.e. make the same handle valid again, but for a
167 * different MD) from when the MD is actually unlinked, to when the
168 * event callback tells me it has been unlinked. */
170 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* From here on, posting paths will refuse new work (see rxbuffer_post
 * and pool_reserve, which both test rxbp_shutdown). */
172 rxbp->rxbp_shutdown = 1;
175 list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) {
176 rxb = list_entry (tmp, kptl_rx_buffer_t, rxb_list);
/* Idle buffer (branch condition on a dropped line — presumably
 * rxb_idle): destroy it outside the lock. */
179 spin_unlock_irqrestore(&rxbp->rxbp_lock,
181 kptllnd_rx_buffer_destroy(rxb);
182 spin_lock_irqsave(&rxbp->rxbp_lock,
/* Busy buffer with no MD attached: nothing to unlink, skip it. */
188 if (PtlHandleIsEqual(mdh, PTL_INVALID_HANDLE))
/* Unlink the MD without the lock held; see CAVEAT above for the
 * handle-reuse race this accepts. */
191 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
193 rc = PtlMDUnlink(mdh);
195 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
197 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
198 /* callback clears rxb_mdh and drops net's ref
199 * (which causes repost, but since I set
200 * shutdown, it will just set the buffer
/* Non-Lustre unlink semantics: no UNLINK event will arrive, so clear
 * the handle and drop the net's ref here ourselves. */
205 rxb->rxb_mdh = PTL_INVALID_HANDLE;
206 kptllnd_rx_buffer_decref_locked(rxb);
/* All buffers gone — shutdown complete. */
211 if (list_empty(&rxbp->rxbp_list))
214 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
216 /* Wait a bit for references to be dropped */
217 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
218 "Waiting for %d Busy RX Buffers\n",
221 cfs_pause(cfs_time_seconds(1));
223 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
226 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Attach an idle rx buffer to Portals so it can receive messages from
 * any peer: wildcard ME on the LND portal, then an MD over the whole
 * buffer with max_size limiting each message.  Takes the net's single
 * ref on the buffer; a failed attach drops it again (triggering either
 * idling or a retry — see comment at the end). */
230 kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb)
236 ptl_process_id_t any;
237 kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
/* May sleep in Portals calls; buffer must be fully quiescent. */
240 LASSERT (!in_interrupt());
241 LASSERT (rxb->rxb_refcount == 0);
242 LASSERT (!rxb->rxb_idle);
243 LASSERT (!rxb->rxb_posted);
244 LASSERT (PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));
/* Match any sender. */
246 any.nid = PTL_NID_ANY;
247 any.pid = PTL_PID_ANY;
249 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* Refuse to (re)post during shutdown. */
251 if (rxbp->rxbp_shutdown) {
253 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
257 rxb->rxb_refcount = 1; /* net's ref */
258 rxb->rxb_posted = 1; /* I'm posting */
260 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
262 rc = PtlMEAttach(kptllnd_data.kptl_nih,
263 *kptllnd_tunables.kptl_portal,
266 0, /* all matchbits are valid - ignore none */
271 CERROR("PtlMeAttach rxb failed %s(%d)\n",
272 kptllnd_errtype2str(rc), rc);
/* MD spans the whole buffer; PUTs only, delivered to the shared EQ,
 * each message capped at max_msg_size. */
279 md.start = rxb->rxb_buffer;
280 md.length = PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages;
281 md.threshold = PTL_MD_THRESH_INF;
282 md.options = PTL_MD_OP_PUT |
283 PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
284 PTL_MD_EVENT_START_DISABLE |
287 md.user_ptr = &rxb->rxb_eventarg;
288 md.max_size = *kptllnd_tunables.kptl_max_msg_size;
289 md.eq_handle = kptllnd_data.kptl_eqh;
291 rc = PtlMDAttach(meh, md, PTL_UNLINK, &mdh);
/* Record the MD handle only if the callback hasn't already seen the
 * auto-unlink (which would make the handle stale). */
293 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
294 if (rxb->rxb_posted) /* Not auto-unlinked yet!!! */
296 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* MDAttach failed: undo the ME and drop the net's ref. */
300 CERROR("PtlMDAttach rxb failed %s(%d)\n",
301 kptllnd_errtype2str(rc), rc);
302 rc = PtlMEUnlink(meh);
303 LASSERT(rc == PTL_OK);
306 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
308 /* XXX this will just try again immediately */
309 kptllnd_rx_buffer_decref_locked(rxb);
310 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Allocate and zero one rx descriptor from the slab cache (atomic
 * allocation — callable from the event callback path).  Returns the rx,
 * or presumably NULL on failure (return lines dropped from listing).
 * Honours the FAIL_RX_ALLOC fault-injection hook for testing. */
314 kptllnd_rx_alloc(void)
318 if (IS_SIMULATION_ENABLED(FAIL_RX_ALLOC)) {
319 CERROR ("FAIL_RX_ALLOC SIMULATION triggered\n");
323 rx = cfs_mem_cache_alloc(kptllnd_data.kptl_rx_cache, CFS_ALLOC_ATOMIC);
325 CERROR("Failed to allocate rx\n");
329 memset(rx, 0, sizeof(*rx));
/* Finish processing an rx: release its buffer ref, optionally return a
 * credit to the peer (post_credit), poke the peer's send path so owed
 * credits can go back out, drop the rx's peer ref, and free the rx. */
334 kptllnd_rx_done(kptl_rx_t *rx, int post_credit)
336 kptl_rx_buffer_t *rxb = rx->rx_rxb;
337 kptl_peer_t *peer = rx->rx_peer;
340 LASSERT (post_credit == PTLLND_POSTRX_NO_CREDIT ||
341 post_credit == PTLLND_POSTRX_PEER_CREDIT);
343 CDEBUG(D_NET, "rx=%p rxb %p peer %p\n", rx, rxb, peer);
/* Release the buffer before touching credits so a reposted buffer is
 * available by the time the credit is advertised. */
346 kptllnd_rx_buffer_decref(rxb);
349 /* Update credits (after I've decref-ed the buffer) */
350 spin_lock_irqsave(&peer->peer_lock, flags);
352 if (post_credit == PTLLND_POSTRX_PEER_CREDIT)
353 peer->peer_outstanding_credits++;
/* Credits owed plus credits advertised can never exceed the
 * configured per-peer total. */
355 LASSERT (peer->peer_outstanding_credits +
356 peer->peer_sent_credits <=
357 *kptllnd_tunables.kptl_peercredits);
359 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: rx %p done\n",
360 libcfs_id2str(peer->peer_id), peer->peer_credits,
361 peer->peer_outstanding_credits, peer->peer_sent_credits,
364 spin_unlock_irqrestore(&peer->peer_lock, flags);
366 /* I might have to send back credits */
367 kptllnd_peer_check_sends(peer);
368 kptllnd_peer_decref(peer);
/* Return the descriptor to the slab cache. */
371 cfs_mem_cache_free(kptllnd_data.kptl_rx_cache, rx);
/* Portals event callback for rx buffers.  On a successful PUT_END,
 * wrap the landed message in a freshly-allocated rx (copying it into
 * rx_space if Portals couldn't guarantee 8-byte alignment) and queue it
 * for the scheduler; on unlink, clear the MD handle and drop the net's
 * buffer ref.
 * FIX(review): both CERROR format strings below were missing their
 * trailing "\n" ("unlink=%dn" and "...ENOMEM") — corrected to match
 * the CDEBUG format above and the file's other log strings. */
375 kptllnd_rx_buffer_callback (ptl_event_t *ev)
377 kptl_eventarg_t *eva = ev->md.user_ptr;
378 kptl_rx_buffer_t *rxb = kptllnd_eventarg2obj(eva);
379 kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
/* Unlink detection differs between Portals flavours: a flag on the
 * event vs. a dedicated UNLINK event type. */
384 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
385 unlinked = ev->unlinked;
387 unlinked = ev->type == PTL_EVENT_UNLINK;
390 CDEBUG(D_NET, "%s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
391 kptllnd_ptlid2str(ev->initiator),
392 kptllnd_evtype2str(ev->type), ev->type, rxb,
393 kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type,
/* Sanity: event must be for this buffer, in bounds, and of a type we
 * expect on an rx MD. */
396 LASSERT (!rxb->rxb_idle);
397 LASSERT (ev->md.start == rxb->rxb_buffer);
398 LASSERT (ev->offset + ev->mlength <=
399 PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages);
400 LASSERT (ev->type == PTL_EVENT_PUT_END ||
401 ev->type == PTL_EVENT_UNLINK);
402 LASSERT (ev->type == PTL_EVENT_UNLINK ||
403 ev->match_bits == LNET_MSG_MATCHBITS);
405 if (ev->ni_fail_type != PTL_NI_OK) {
406 CERROR("Portals error from %s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
407 kptllnd_ptlid2str(ev->initiator),
408 kptllnd_evtype2str(ev->type), ev->type, rxb,
409 kptllnd_errtype2str(ev->ni_fail_type),
410 ev->ni_fail_type, unlinked);
412 } else if (ev->type == PTL_EVENT_PUT_END &&
413 !rxbp->rxbp_shutdown) {
415 /* rxbp_shutdown sampled without locking! I only treat it as a
416 * hint since shutdown can start while rx's are queued on
418 #if (PTL_MD_LOCAL_ALIGN8 == 0)
419 /* Portals can't force message alignment - someone sending an
420 * odd-length message will misalign subsequent messages and
421 * force the fixup below... */
422 if ((ev->mlength & 7) != 0)
423 CWARN("Message from %s has odd length "LPU64": "
424 "probable version incompatibility\n",
425 kptllnd_ptlid2str(ev->initiator),
428 rx = kptllnd_rx_alloc();
430 CERROR("Message from %s dropped: ENOMEM\n",
431 kptllnd_ptlid2str(ev->initiator));
/* Aligned message: reference it in place inside the buffer
 * (taking an extra buffer ref the rx now owns). */
433 if ((ev->offset & 7) == 0) {
434 kptllnd_rx_buffer_addref(rxb);
436 rx->rx_nob = ev->mlength;
437 rx->rx_msg = (kptl_msg_t *)
438 (rxb->rxb_buffer + ev->offset);
440 #if (PTL_MD_LOCAL_ALIGN8 == 0)
441 /* Portals can't force alignment - copy into
442 * rx_space (avoiding overflow) to fix */
443 int maxlen = *kptllnd_tunables.kptl_max_msg_size;
446 rx->rx_nob = MIN(maxlen, ev->mlength);
447 rx->rx_msg = (kptl_msg_t *)rx->rx_space;
448 memcpy(rx->rx_msg, rxb->rxb_buffer + ev->offset,
451 /* Portals should have forced the alignment */
456 rx->rx_initiator = ev->initiator;
457 rx->rx_treceived = jiffies;
459 rx->rx_uid = ev->uid;
461 /* Queue for attention */
462 spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
465 list_add_tail(&rx->rx_list,
466 &kptllnd_data.kptl_sched_rxq);
467 wake_up(&kptllnd_data.kptl_sched_waitq);
469 spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
/* Buffer unlinked: clear the handle and drop the net's ref (which
 * reposts or idles the buffer as appropriate). */
475 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
478 rxb->rxb_mdh = PTL_INVALID_HANDLE;
479 kptllnd_rx_buffer_decref_locked(rxb);
481 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
486 kptllnd_nak (kptl_rx_t *rx)
488 /* Fire-and-forget a stub message that will let the peer know my
489 * protocol magic/version and make her drop/refresh any peer state she
490 * might have with me. */
/* The NAK payload is the pre-built kptl_nak_msg; no EQ is attached
 * (PTL_EQ_NONE) because no completion handling is wanted. */
492 .start = kptllnd_data.kptl_nak_msg,
493 .length = kptllnd_data.kptl_nak_msg->ptlm_nob,
497 .eq_handle = PTL_EQ_NONE};
501 rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh);
503 CWARN("Can't NAK %s: bind failed %s(%d)\n",
504 kptllnd_ptlid2str(rx->rx_initiator),
505 kptllnd_errtype2str(rc), rc);
/* Send straight back to the rx's originator; no ACK requested. */
509 rc = PtlPut(mdh, PTL_NOACK_REQ, rx->rx_initiator,
510 *kptllnd_tunables.kptl_portal, 0,
511 LNET_MSG_MATCHBITS, 0, 0);
514 CWARN("Can't NAK %s: put failed %s(%d)\n",
515 kptllnd_ptlid2str(rx->rx_initiator),
516 kptllnd_errtype2str(rc), rc);
/* Validate and dispatch one received ptllnd message: check magic and
 * version (NAKing incompatible senders), unpack, verify source/dest
 * ids and incarnation stamps, account flow-control credits, then hand
 * the payload to LNet or the connection machinery by type.  On success
 * the rx carries this function's ref on the peer until kptllnd_rx_done;
 * error paths (mostly on dropped listing lines) close the peer. */
520 kptllnd_rx_parse(kptl_rx_t *rx)
522 kptl_msg_t *msg = rx->rx_msg;
523 int post_credit = PTLLND_POSTRX_PEER_CREDIT;
527 lnet_process_id_t srcid;
529 LASSERT (rx->rx_peer == NULL);
/* Reject LNet-native magic (wrong LND) and ptllnd magic with a version
 * mismatch, in either byte order. */
531 if ((rx->rx_nob >= 4 &&
532 (msg->ptlm_magic == LNET_PROTO_MAGIC ||
533 msg->ptlm_magic == __swab32(LNET_PROTO_MAGIC))) ||
535 ((msg->ptlm_magic == PTLLND_MSG_MAGIC &&
536 msg->ptlm_version != PTLLND_MSG_VERSION) ||
537 (msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC) &&
538 msg->ptlm_version != __swab16(PTLLND_MSG_VERSION))))) {
539 /* NAK incompatible versions
540 * See other LNDs for how to handle this if/when ptllnd begins
541 * to allow different versions to co-exist */
542 CERROR("Bad version: got %04x expected %04x from %s\n",
543 (__u32)(msg->ptlm_magic == PTLLND_MSG_MAGIC ?
544 msg->ptlm_version : __swab16(msg->ptlm_version)),
545 PTLLND_MSG_VERSION, kptllnd_ptlid2str(rx->rx_initiator));
/* Byte-swap (if needed) and sanity-check the message body. */
550 rc = kptllnd_msg_unpack(msg, rx->rx_nob);
552 CERROR ("Error %d unpacking rx from %s\n",
553 rc, kptllnd_ptlid2str(rx->rx_initiator));
557 srcid.nid = msg->ptlm_srcnid;
558 srcid.pid = msg->ptlm_srcpid;
560 CDEBUG(D_NETTRACE, "%s: RX %s c %d %p rxb %p queued %lu ticks (%ld s)\n",
561 libcfs_id2str(srcid), kptllnd_msgtype2str(msg->ptlm_type),
562 msg->ptlm_credits, rx, rx->rx_rxb,
563 jiffies - rx->rx_treceived,
564 cfs_duration_sec(jiffies - rx->rx_treceived));
/* The claimed LNet source nid must agree with the Portals-level
 * initiator the message actually came from. */
566 if (srcid.nid != kptllnd_ptl2lnetnid(rx->rx_initiator.nid)) {
567 CERROR("Bad source id %s from %s\n",
568 libcfs_id2str(srcid),
569 kptllnd_ptlid2str(rx->rx_initiator));
/* Peer NAKed us: log it (handling continues on dropped lines —
 * presumably closing the peer). */
573 if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
574 peer = kptllnd_id2peer(srcid);
578 CWARN("NAK from %s (%s)\n",
579 libcfs_id2str(srcid),
580 kptllnd_ptlid2str(rx->rx_initiator));
/* Message must be addressed to this NI/pid. */
586 if (msg->ptlm_dstnid != kptllnd_data.kptl_ni->ni_nid ||
587 msg->ptlm_dstpid != the_lnet.ln_pid) {
588 CERROR("Bad dstid %s (expected %s) from %s\n",
589 libcfs_id2str((lnet_process_id_t) {
590 .nid = msg->ptlm_dstnid,
591 .pid = msg->ptlm_dstpid}),
592 libcfs_id2str((lnet_process_id_t) {
593 .nid = kptllnd_data.kptl_ni->ni_nid,
594 .pid = the_lnet.ln_pid}),
595 kptllnd_ptlid2str(rx->rx_initiator));
/* HELLO establishes/refreshes the connection; anything else needs an
 * existing peer, otherwise NAK to force a reconnect. */
599 if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
600 peer = kptllnd_peer_handle_hello(rx->rx_initiator, msg);
604 peer = kptllnd_id2peer(srcid);
606 CWARN("NAK %s: no connection; peer must reconnect\n",
607 libcfs_id2str(srcid));
608 /* NAK to make the peer reconnect */
613 /* Ignore anything apart from HELLO while I'm waiting for it and
614 * any messages for a previous incarnation of the connection */
615 if (peer->peer_state == PEER_STATE_WAITING_HELLO ||
616 msg->ptlm_dststamp < peer->peer_myincarnation) {
617 kptllnd_peer_decref(peer);
/* Stale incarnation stamps mean one side restarted: reject. */
621 if (msg->ptlm_srcstamp != peer->peer_incarnation) {
622 CERROR("%s: Unexpected srcstamp "LPX64" "
623 "("LPX64" expected)\n",
624 libcfs_id2str(peer->peer_id),
626 peer->peer_incarnation);
631 if (msg->ptlm_dststamp != peer->peer_myincarnation) {
632 CERROR("%s: Unexpected dststamp "LPX64" "
633 "("LPX64" expected)\n",
634 libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
635 peer->peer_myincarnation);
641 LASSERT (msg->ptlm_srcnid == peer->peer_id.nid &&
642 msg->ptlm_srcpid == peer->peer_id.pid);
644 spin_lock_irqsave(&peer->peer_lock, flags);
646 /* Check peer only sends when I've sent her credits */
647 if (peer->peer_sent_credits == 0) {
648 int c = peer->peer_credits;
649 int oc = peer->peer_outstanding_credits;
650 int sc = peer->peer_sent_credits;
652 spin_unlock_irqrestore(&peer->peer_lock, flags);
654 CERROR("%s: buffer overrun [%d/%d+%d]\n",
655 libcfs_id2str(peer->peer_id), c, sc, oc);
/* This message consumed one of the credits we advertised. */
658 peer->peer_sent_credits--;
660 /* No check for credit overflow - the peer may post new
661 * buffers after the startup handshake. */
662 peer->peer_credits += msg->ptlm_credits;
664 /* This ensures the credit taken by NOOP can be returned */
665 if (msg->ptlm_type == PTLLND_MSG_TYPE_NOOP) {
666 peer->peer_outstanding_credits++;
667 post_credit = PTLLND_POSTRX_NO_CREDIT;
670 spin_unlock_irqrestore(&peer->peer_lock, flags);
672 /* See if something can go out now that credits have come in */
673 if (msg->ptlm_credits != 0)
674 kptllnd_peer_check_sends(peer);
676 /* ptllnd-level protocol correct - rx takes my ref on peer and increments
677 * peer_outstanding_credits when it completes */
679 kptllnd_peer_alive(peer);
681 switch (msg->ptlm_type) {
683 /* already checked by kptllnd_msg_unpack() */
686 case PTLLND_MSG_TYPE_HELLO:
687 CDEBUG(D_NET, "PTLLND_MSG_TYPE_HELLO\n");
690 case PTLLND_MSG_TYPE_NOOP:
691 CDEBUG(D_NET, "PTLLND_MSG_TYPE_NOOP\n");
694 case PTLLND_MSG_TYPE_IMMEDIATE:
695 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
696 rc = lnet_parse(kptllnd_data.kptl_ni,
697 &msg->ptlm_u.immediate.kptlim_hdr,
700 if (rc >= 0) /* kptllnd_recv owns 'rx' now */
704 case PTLLND_MSG_TYPE_PUT:
705 case PTLLND_MSG_TYPE_GET:
706 CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
707 msg->ptlm_type == PTLLND_MSG_TYPE_PUT ?
710 /* checked in kptllnd_msg_unpack() */
711 LASSERT (msg->ptlm_u.rdma.kptlrm_matchbits >=
712 PTL_RESERVED_MATCHBITS);
714 /* Update last match bits seen */
715 spin_lock_irqsave(&peer->peer_lock, flags);
717 if (msg->ptlm_u.rdma.kptlrm_matchbits >
718 rx->rx_peer->peer_last_matchbits_seen)
719 rx->rx_peer->peer_last_matchbits_seen =
720 msg->ptlm_u.rdma.kptlrm_matchbits;
722 spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);
724 rc = lnet_parse(kptllnd_data.kptl_ni,
725 &msg->ptlm_u.rdma.kptlrm_hdr,
728 if (rc >= 0) /* kptllnd_recv owns 'rx' now */
/* Error path: shut the peer down and drop our ref unless the rx
 * still owns it (rx_done will drop it then). */
734 kptllnd_peer_close(peer, rc);
735 if (rx->rx_peer == NULL) /* drop ref on peer */
736 kptllnd_peer_decref(peer); /* unless rx_done will */
738 kptllnd_rx_done(rx, post_credit);