1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5 * Author: PJ Kirner <pjkirner@clusterfs.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * This file is confidential source code owned by Cluster File Systems.
11 * No viewing, modification, compilation, redistribution, or any other
12 * form of use is permitted except through a signed license agreement.
14 * If you have not signed such an agreement, then you have no rights to
15 * this file. Please destroy it immediately and contact CFS.
/* Initialise an rx buffer pool: zero the whole structure, then set up
 * its spinlock and the (initially empty) list of rx buffers. */
22 kptllnd_rx_buffer_pool_init(kptl_rx_buffer_pool_t *rxbp)
24 memset(rxbp, 0, sizeof(*rxbp));
25 spin_lock_init(&rxbp->rxbp_lock);
26 INIT_LIST_HEAD(&rxbp->rxbp_list);
/* Tear down one rx buffer: unlink it from its pool's list and free
 * both the payload buffer and the descriptor.  The buffer must be
 * fully quiescent — zero refcount, no MD attached, not posted, and
 * flagged idle (asserted below). */
30 kptllnd_rx_buffer_destroy(kptl_rx_buffer_t *rxb)
32 kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
/* Sanity: any of these firing means the buffer is still in use */
34 LASSERT(rxb->rxb_refcount == 0);
35 LASSERT(PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));
36 LASSERT(!rxb->rxb_posted);
37 LASSERT(rxb->rxb_idle);
/* NOTE(review): list_del presumably needs rxbp_lock protection;
 * pool_fini drops the lock around this call — confirm the locking
 * contract against the full (unsampled) source. */
39 list_del(&rxb->rxb_list);
42 LIBCFS_FREE(rxb->rxb_buffer, kptllnd_rx_buffer_size());
43 LIBCFS_FREE(rxb, sizeof(*rxb));
/* Reserve capacity for 'count' more incoming messages.  Each buffer
 * holds 'msgs_per_buffer' maximum-sized messages; buffer/descriptor
 * pairs are allocated and posted until rxbp_count * msgs_per_buffer
 * covers the enlarged reservation.  Fails during pool shutdown. */
47 kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count)
52 kptl_rx_buffer_t *rxb;
/* How many max-size messages fit in a single rx buffer */
56 bufsize = kptllnd_rx_buffer_size();
57 msgs_per_buffer = bufsize / (*kptllnd_tunables.kptl_max_msg_size);
59 CDEBUG(D_NET, "kptllnd_rx_buffer_pool_reserve(%d)\n", count);
61 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* No new reservations once shutdown has started */
64 if (rxbp->rxbp_shutdown) {
/* Existing buffers already cover the enlarged reservation */
69 if (rxbp->rxbp_reserved + count <=
70 rxbp->rxbp_count * msgs_per_buffer) {
/* Drop the lock across allocation */
75 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
77 LIBCFS_ALLOC(rxb, sizeof(*rxb));
78 LIBCFS_ALLOC(buffer, bufsize);
80 if (rxb == NULL || buffer == NULL) {
81 CERROR("Failed to allocate rx buffer\n");
/* Free whichever of the two allocations succeeded */
84 LIBCFS_FREE(rxb, sizeof(*rxb));
86 LIBCFS_FREE(buffer, bufsize);
88 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* Initialise the new descriptor; no MD attached yet */
93 memset(rxb, 0, sizeof(*rxb));
95 rxb->rxb_eventarg.eva_type = PTLLND_EVENTARG_TYPE_BUF;
96 rxb->rxb_refcount = 0;
100 rxb->rxb_buffer = buffer;
101 rxb->rxb_mdh = PTL_INVALID_HANDLE;
103 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* Shutdown raced with the allocation: discard the new buffer */
105 if (rxbp->rxbp_shutdown) {
106 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
108 LIBCFS_FREE(rxb, sizeof(*rxb));
109 LIBCFS_FREE(buffer, bufsize);
111 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
116 list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list);
/* Post to Portals outside the lock (see kptllnd_rx_buffer_post) */
119 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
121 kptllnd_rx_buffer_post(rxb);
123 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* Capacity is now sufficient: record the reservation */
127 rxbp->rxbp_reserved += count;
129 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Give back 'count' previously-reserved message slots.  Buffers are
 * not freed here; only the reservation accounting shrinks. */
135 kptllnd_rx_buffer_pool_unreserve(kptl_rx_buffer_pool_t *rxbp,
140 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
142 CDEBUG(D_NET, "kptllnd_rx_buffer_pool_unreserve(%d)\n", count);
143 rxbp->rxbp_reserved -= count;
145 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Shut the pool down: set rxbp_shutdown, destroy idle buffers, unlink
 * the MDs of still-posted ones, then poll (once a second) until all
 * outstanding references drain and the list empties.  The CAVEAT
 * EMPTOR comment below describes the races this deliberately
 * tolerates. */
149 kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp)
151 kptl_rx_buffer_t *rxb;
155 struct list_head *tmp;
156 struct list_head *nxt;
159 /* CAVEAT EMPTOR: I'm racing with everything here!!!
161 * Buffers can still be posted after I set rxbp_shutdown because I
162 * can't hold rxbp_lock while I'm posting them.
164 * Calling PtlMDUnlink() here races with auto-unlinks; i.e. a buffer's
165 * MD handle could become invalid under me. I am vulnerable to portals
166 * re-using handles (i.e. make the same handle valid again, but for a
167 * different MD) from when the MD is actually unlinked, to when the
168 * event callback tells me it has been unlinked. */
170 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
172 rxbp->rxbp_shutdown = 1;
/* Walk the buffer list; safe variant since entries are removed */
175 list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) {
176 rxb = list_entry (tmp, kptl_rx_buffer_t, rxb_list);
/* Destroy drops the lock around the free (destroy may not be
 * called with rxbp_lock held) */
179 spin_unlock_irqrestore(&rxbp->rxbp_lock,
181 kptllnd_rx_buffer_destroy(rxb);
182 spin_lock_irqsave(&rxbp->rxbp_lock,
/* No MD attached: nothing to unlink for this buffer */
188 if (PtlHandleIsEqual(mdh, PTL_INVALID_HANDLE))
/* Unlink without the lock held; races with auto-unlink (above) */
191 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
193 rc = PtlMDUnlink(mdh);
195 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
197 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
198 /* callback clears rxb_mdh and drops net's ref
199 * (which causes repost, but since I set
200 * shutdown, it will just set the buffer
/* Non-Lustre semantics: clear the handle and drop the ref here */
205 rxb->rxb_mdh = PTL_INVALID_HANDLE;
206 kptllnd_rx_buffer_decref_locked(rxb);
/* All buffers gone: finished */
211 if (list_empty(&rxbp->rxbp_list))
214 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
216 /* Wait a bit for references to be dropped */
217 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
218 "Waiting for %d Busy RX Buffers\n",
221 cfs_pause(cfs_time_seconds(1));
223 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
226 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Post an rx buffer to Portals: attach a wildcard match entry
 * (any nid/pid) on the ptllnd portal, then attach an auto-unlinking MD
 * spanning the whole buffer so incoming PUTs land in it.  While posted
 * the buffer holds the "net's" reference.  May sleep; must not be
 * called in interrupt context (asserted). */
230 kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb)
236 ptl_process_id_t any;
237 kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
240 LASSERT (!in_interrupt());
241 LASSERT (rxb->rxb_refcount == 0);
242 LASSERT (!rxb->rxb_idle);
243 LASSERT (!rxb->rxb_posted);
244 LASSERT (PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));
/* Accept messages from any source */
246 any.nid = PTL_NID_ANY;
247 any.pid = PTL_PID_ANY;
249 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* Don't repost once shutdown has begun */
251 if (rxbp->rxbp_shutdown) {
253 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
257 rxb->rxb_refcount = 1; /* net's ref */
258 rxb->rxb_posted = 1; /* I'm posting */
260 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
262 rc = PtlMEAttach(kptllnd_data.kptl_nih,
263 *kptllnd_tunables.kptl_portal,
266 0, /* all matchbits are valid - ignore none */
271 CERROR("PtlMeAttach rxb failed %s(%d)\n",
272 kptllnd_errtype2str(rc), rc);
/* MD covers the entire buffer; threshold infinite so it stays
 * attached across many PUTs until explicitly unlinked */
279 md.start = rxb->rxb_buffer;
280 md.length = PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages;
281 md.threshold = PTL_MD_THRESH_INF;
282 md.options = PTL_MD_OP_PUT |
283 PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
284 PTL_MD_EVENT_START_DISABLE |
/* Event callback finds the buffer again via rxb_eventarg */
287 md.user_ptr = &rxb->rxb_eventarg;
288 md.max_size = *kptllnd_tunables.kptl_max_msg_size;
289 md.eq_handle = kptllnd_data.kptl_eqh;
291 rc = PtlMDAttach(meh, md, PTL_UNLINK, &mdh);
293 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
294 if (rxb->rxb_posted) /* Not auto-unlinked yet!!! */
296 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* MD attach failed: tear the ME back down */
300 CERROR("PtlMDAttach rxb failed %s(%d)\n",
301 kptllnd_errtype2str(rc), rc);
302 rc = PtlMEUnlink(meh);
303 LASSERT(rc == PTL_OK);
306 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
308 /* XXX this will just try again immediately */
309 kptllnd_rx_buffer_decref_locked(rxb);
310 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Allocate and zero one rx message descriptor from the rx slab cache.
 * Uses CFS_ALLOC_ATOMIC (non-blocking allocation).  Returns NULL on
 * allocation failure or when the FAIL_RX_ALLOC fault-injection
 * simulation is enabled. */
314 kptllnd_rx_alloc(void)
/* Fault injection hook for testing the ENOMEM path */
318 if (IS_SIMULATION_ENABLED(FAIL_RX_ALLOC)) {
319 CERROR ("FAIL_RX_ALLOC SIMULATION triggered\n");
323 rx = cfs_mem_cache_alloc(kptllnd_data.kptl_rx_cache, CFS_ALLOC_ATOMIC);
325 CERROR("Failed to allocate rx\n");
329 memset(rx, 0, sizeof(*rx));
/* Complete an rx: release its ref on the rx buffer, return one credit
 * to the peer (peer_outstanding_credits), kick the send path in case
 * credits now allow a transmit, drop the rx's peer ref, and free the
 * descriptor back to the slab cache. */
334 kptllnd_rx_done(kptl_rx_t *rx)
336 kptl_rx_buffer_t *rxb = rx->rx_rxb;
337 kptl_peer_t *peer = rx->rx_peer;
340 CDEBUG(D_NET, "rx=%p rxb %p peer %p\n", rx, rxb, peer);
343 kptllnd_rx_buffer_decref(rxb);
346 /* Update credits (after I've decref-ed the buffer) */
347 spin_lock_irqsave(&peer->peer_lock, flags);
349 peer->peer_outstanding_credits++;
/* Total credits owed plus sent may never exceed the tunable cap */
350 LASSERT (peer->peer_outstanding_credits +
351 peer->peer_sent_credits <=
352 *kptllnd_tunables.kptl_peercredits);
354 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: rx %p done\n",
355 libcfs_id2str(peer->peer_id), peer->peer_credits,
356 peer->peer_outstanding_credits, peer->peer_sent_credits,
359 spin_unlock_irqrestore(&peer->peer_lock, flags);
361 /* I might have to send back credits */
362 kptllnd_peer_check_sends(peer);
363 kptllnd_peer_decref(peer);
366 cfs_mem_cache_free(kptllnd_data.kptl_rx_cache, rx);
/* Portals event callback for rx buffers.  On PUT completion it
 * allocates an rx descriptor, fixes up 8-byte alignment if Portals
 * could not enforce it, and queues the rx for the scheduler thread.
 * On unlink it clears the MD handle and drops the net's ref on the
 * buffer. */
370 kptllnd_rx_buffer_callback (ptl_event_t *ev)
372 kptl_eventarg_t *eva = ev->md.user_ptr;
373 kptl_rx_buffer_t *rxb = kptllnd_eventarg2obj(eva);
374 kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
/* Lustre unlink semantics report unlink as a flag on the event;
 * otherwise it arrives as a distinct PTL_EVENT_UNLINK event */
379 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
380 unlinked = ev->unlinked;
382 unlinked = ev->type == PTL_EVENT_UNLINK;
385 CDEBUG(D_NET, "%s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
386 kptllnd_ptlid2str(ev->initiator),
387 kptllnd_evtype2str(ev->type), ev->type, rxb,
388 kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type,
391 LASSERT (!rxb->rxb_idle);
392 LASSERT (ev->md.start == rxb->rxb_buffer);
393 LASSERT (ev->offset + ev->mlength <=
394 PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages);
395 LASSERT (ev->type == PTL_EVENT_PUT_END ||
396 ev->type == PTL_EVENT_UNLINK);
397 LASSERT (ev->type == PTL_EVENT_UNLINK ||
398 ev->match_bits == LNET_MSG_MATCHBITS);
400 if (ev->ni_fail_type != PTL_NI_OK) {
/* BUGFIX: format string read "unlink=%dn" — the '\n' escape had
 * lost its backslash, printing a literal 'n' and no newline */
401 CERROR("Portals error from %s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
402 kptllnd_ptlid2str(ev->initiator),
403 kptllnd_evtype2str(ev->type), ev->type, rxb,
404 kptllnd_errtype2str(ev->ni_fail_type),
405 ev->ni_fail_type, unlinked);
407 } else if (ev->type == PTL_EVENT_PUT_END &&
408 !rxbp->rxbp_shutdown) {
410 /* rxbp_shutdown sampled without locking! I only treat it as a
411 * hint since shutdown can start while rx's are queued on
413 #if (PTL_MD_LOCAL_ALIGN8 == 0)
414 /* Portals can't force message alignment - someone sending an
415 * odd-length message will misalign subsequent messages and
416 * force the fixup below... */
417 if ((ev->mlength & 7) != 0)
418 CWARN("Message from %s has odd length "LPU64": "
419 "probable version incompatibility\n",
420 kptllnd_ptlid2str(ev->initiator),
423 rx = kptllnd_rx_alloc();
/* BUGFIX: added missing trailing newline to match every other
 * CERROR in this file */
425 CERROR("Message from %s dropped: ENOMEM\n",
426 kptllnd_ptlid2str(ev->initiator));
/* Aligned message: point the rx straight into the buffer and
 * take a buffer ref to keep it alive until rx_done */
428 if ((ev->offset & 7) == 0) {
429 kptllnd_rx_buffer_addref(rxb);
431 rx->rx_nob = ev->mlength;
432 rx->rx_msg = (kptl_msg_t *)
433 (rxb->rxb_buffer + ev->offset);
435 #if (PTL_MD_LOCAL_ALIGN8 == 0)
436 /* Portals can't force alignment - copy into
437 * rx_space (avoiding overflow) to fix */
438 int maxlen = *kptllnd_tunables.kptl_max_msg_size;
441 rx->rx_nob = MIN(maxlen, ev->mlength);
442 rx->rx_msg = (kptl_msg_t *)rx->rx_space;
443 memcpy(rx->rx_msg, rxb->rxb_buffer + ev->offset,
446 /* Portals should have forced the alignment */
451 rx->rx_initiator = ev->initiator;
452 rx->rx_treceived = jiffies;
454 rx->rx_uid = ev->uid;
456 /* Queue for attention */
457 spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
460 list_add_tail(&rx->rx_list,
461 &kptllnd_data.kptl_sched_rxq);
462 wake_up(&kptllnd_data.kptl_sched_waitq);
464 spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
/* Unlinked: clear the handle and drop the net's ref under lock */
470 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
473 rxb->rxb_mdh = PTL_INVALID_HANDLE;
474 kptllnd_rx_buffer_decref_locked(rxb);
476 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Send a NAK to the originator of 'rx'.  Best-effort: failures are
 * only logged. */
481 kptllnd_nak (kptl_rx_t *rx)
483 /* Fire-and-forget a stub message that will let the peer know my
484 * protocol magic/version and make her drop/refresh any peer state she
485 * might have with me. */
/* MD wraps the pre-built NAK message; no event queue needed since
 * nothing waits for completion */
487 .start = kptllnd_data.kptl_nak_msg,
488 .length = kptllnd_data.kptl_nak_msg->ptlm_nob,
492 .eq_handle = PTL_EQ_NONE};
496 rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh);
498 CWARN("Can't NAK %s: bind failed %s(%d)\n",
499 kptllnd_ptlid2str(rx->rx_initiator),
500 kptllnd_errtype2str(rc), rc);
/* PUT straight back to the sender; no ACK requested */
504 rc = PtlPut(mdh, PTL_NOACK_REQ, rx->rx_initiator,
505 *kptllnd_tunables.kptl_portal, 0,
506 LNET_MSG_MATCHBITS, 0, 0);
509 CWARN("Can't NAK %s: put failed %s(%d)\n",
510 kptllnd_ptlid2str(rx->rx_initiator),
511 kptllnd_errtype2str(rc), rc);
/* Validate and dispatch one received message: check protocol
 * magic/version (NAKing incompatible senders), unpack the wire header,
 * verify source/destination ids and incarnation stamps, account flow
 * control credits, then hand the payload to LNET (IMMEDIATE/PUT/GET)
 * or handle control messages (HELLO/NOOP/NAK) directly. */
515 kptllnd_rx_parse(kptl_rx_t *rx)
517 kptl_msg_t *msg = rx->rx_msg;
521 lnet_process_id_t srcid;
523 LASSERT (rx->rx_peer == NULL);
/* Reject LNET-magic senders and ptllnd senders speaking a different
 * protocol version (in either byte order) */
525 if ((rx->rx_nob >= 4 &&
526 (msg->ptlm_magic == LNET_PROTO_MAGIC ||
527 msg->ptlm_magic == __swab32(LNET_PROTO_MAGIC))) ||
529 ((msg->ptlm_magic == PTLLND_MSG_MAGIC &&
530 msg->ptlm_version != PTLLND_MSG_VERSION) ||
531 (msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC) &&
532 msg->ptlm_version != __swab16(PTLLND_MSG_VERSION))))) {
533 /* NAK incompatible versions
534 * See other LNDs for how to handle this if/when ptllnd begins
535 * to allow different versions to co-exist */
536 CERROR("Bad version: got %04x expected %04x from %s\n",
537 (__u32)(msg->ptlm_magic == PTLLND_MSG_MAGIC ?
538 msg->ptlm_version : __swab16(msg->ptlm_version)),
539 PTLLND_MSG_VERSION, kptllnd_ptlid2str(rx->rx_initiator));
/* Byte-swap and sanity-check the wire message */
544 rc = kptllnd_msg_unpack(msg, rx->rx_nob);
546 CERROR ("Error %d unpacking rx from %s\n",
547 rc, kptllnd_ptlid2str(rx->rx_initiator));
551 srcid.nid = msg->ptlm_srcnid;
552 srcid.pid = msg->ptlm_srcpid;
554 CDEBUG(D_NETTRACE, "%s: RX %s c %d %p rxb %p queued %lu ticks (%ld s)\n",
555 libcfs_id2str(srcid), kptllnd_msgtype2str(msg->ptlm_type),
556 msg->ptlm_credits, rx, rx->rx_rxb,
557 jiffies - rx->rx_treceived,
558 cfs_duration_sec(jiffies - rx->rx_treceived));
/* Claimed LNET source nid must match the actual Portals initiator */
560 if (srcid.nid != kptllnd_ptl2lnetnid(rx->rx_initiator.nid)) {
561 CERROR("Bad source id %s from %s\n",
562 libcfs_id2str(srcid),
563 kptllnd_ptlid2str(rx->rx_initiator));
/* Peer NAKed us: note it and (if known) act on the peer */
567 if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
568 peer = kptllnd_id2peer(srcid);
572 CWARN("NAK from %s (%s)\n",
573 libcfs_id2str(srcid),
574 kptllnd_ptlid2str(rx->rx_initiator));
/* Message must be addressed to this NI and pid */
580 if (msg->ptlm_dstnid != kptllnd_data.kptl_ni->ni_nid ||
581 msg->ptlm_dstpid != the_lnet.ln_pid) {
582 CERROR("Bad dstid %s (expected %s) from %s\n",
583 libcfs_id2str((lnet_process_id_t) {
584 .nid = msg->ptlm_dstnid,
585 .pid = msg->ptlm_dstpid}),
586 libcfs_id2str((lnet_process_id_t) {
587 .nid = kptllnd_data.kptl_ni->ni_nid,
588 .pid = the_lnet.ln_pid}),
589 kptllnd_ptlid2str(rx->rx_initiator));
/* HELLO establishes (or re-establishes) the peer connection */
593 if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
594 peer = kptllnd_peer_handle_hello(rx->rx_initiator, msg);
598 peer = kptllnd_id2peer(srcid);
600 CWARN("NAK %s: no connection; peer must reconnect\n",
601 libcfs_id2str(srcid));
602 /* NAK to make the peer reconnect */
607 /* Ignore anything apart from HELLO while I'm waiting for it and
608 * any messages for a previous incarnation of the connection */
609 if (peer->peer_state == PEER_STATE_WAITING_HELLO ||
610 msg->ptlm_dststamp < peer->peer_myincarnation) {
611 kptllnd_peer_decref(peer);
/* Stamps must match the current connection incarnation exactly */
615 if (msg->ptlm_srcstamp != peer->peer_incarnation) {
616 CERROR("%s: Unexpected srcstamp "LPX64" "
617 "("LPX64" expected)\n",
618 libcfs_id2str(peer->peer_id),
620 peer->peer_incarnation);
625 if (msg->ptlm_dststamp != peer->peer_myincarnation) {
626 CERROR("%s: Unexpected dststamp "LPX64" "
627 "("LPX64" expected)\n",
628 libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
629 peer->peer_myincarnation);
635 LASSERT (msg->ptlm_srcnid == peer->peer_id.nid &&
636 msg->ptlm_srcpid == peer->peer_id.pid);
638 spin_lock_irqsave(&peer->peer_lock, flags);
640 /* Check peer only sends when I've sent her credits */
641 if (peer->peer_sent_credits == 0) {
642 int c = peer->peer_credits;
643 int oc = peer->peer_outstanding_credits;
644 int sc = peer->peer_sent_credits;
646 spin_unlock_irqrestore(&peer->peer_lock, flags);
648 CERROR("%s: buffer overrun [%d/%d+%d]\n",
649 libcfs_id2str(peer->peer_id), c, sc, oc);
/* Consume one of the credits I granted the peer */
652 peer->peer_sent_credits--;
654 /* No check for credit overflow - the peer may post new
655 * buffers after the startup handshake. */
656 peer->peer_credits += msg->ptlm_credits;
658 spin_unlock_irqrestore(&peer->peer_lock, flags);
660 /* See if something can go out now that credits have come in */
661 if (msg->ptlm_credits != 0)
662 kptllnd_peer_check_sends(peer);
664 /* ptllnd-level protocol correct - rx takes my ref on peer and increments
665 * peer_outstanding_credits when it completes */
667 kptllnd_peer_alive(peer);
/* Dispatch on message type */
669 switch (msg->ptlm_type) {
671 /* already checked by kptllnd_msg_unpack() */
674 case PTLLND_MSG_TYPE_HELLO:
675 CDEBUG(D_NET, "PTLLND_MSG_TYPE_HELLO\n");
678 case PTLLND_MSG_TYPE_NOOP:
679 CDEBUG(D_NET, "PTLLND_MSG_TYPE_NOOP\n");
682 case PTLLND_MSG_TYPE_IMMEDIATE:
683 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
684 rc = lnet_parse(kptllnd_data.kptl_ni,
685 &msg->ptlm_u.immediate.kptlim_hdr,
688 if (rc >= 0) /* kptllnd_recv owns 'rx' now */
692 case PTLLND_MSG_TYPE_PUT:
693 case PTLLND_MSG_TYPE_GET:
694 CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
695 msg->ptlm_type == PTLLND_MSG_TYPE_PUT ?
698 /* checked in kptllnd_msg_unpack() */
699 LASSERT (msg->ptlm_u.rdma.kptlrm_matchbits >=
700 PTL_RESERVED_MATCHBITS);
702 /* Update last match bits seen */
703 spin_lock_irqsave(&peer->peer_lock, flags);
705 if (msg->ptlm_u.rdma.kptlrm_matchbits >
706 rx->rx_peer->peer_last_matchbits_seen)
707 rx->rx_peer->peer_last_matchbits_seen =
708 msg->ptlm_u.rdma.kptlrm_matchbits;
710 spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);
712 rc = lnet_parse(kptllnd_data.kptl_ni,
713 &msg->ptlm_u.rdma.kptlrm_hdr,
716 if (rc >= 0) /* kptllnd_recv owns 'rx' now */
/* Error path: close the peer and drop refs not handed to rx_done */
722 kptllnd_peer_close(peer, rc);
723 if (rx->rx_peer == NULL) /* drop ref on peer */
724 kptllnd_peer_decref(peer); /* unless rx_done will */