4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * This file is part of Lustre, http://www.lustre.org/
32 * Lustre is a trademark of Sun Microsystems, Inc.
34 * lnet/klnds/ptllnd/ptllnd_rx_buf.c
36 * Author: PJ Kirner <pjkirner@clusterfs.com>
/* Initialise an rx buffer pool: zero every field, then set up the
 * pool spinlock and the (empty) list of rx buffers. */
42 kptllnd_rx_buffer_pool_init(kptl_rx_buffer_pool_t *rxbp)
44 memset(rxbp, 0, sizeof(*rxbp));
45 spin_lock_init(&rxbp->rxbp_lock);
46 CFS_INIT_LIST_HEAD(&rxbp->rxbp_list);
/* Destroy a single rx buffer: unlink it from its pool's list and free
 * both the data buffer and the descriptor.  The asserts document the
 * required state: no references, no Portals MD attached, not posted,
 * and marked idle.  NOTE(review): list removal is not visibly locked
 * here -- presumably the caller holds rxbp_lock; confirm at call sites. */
50 kptllnd_rx_buffer_destroy(kptl_rx_buffer_t *rxb)
52 kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
54 LASSERT(rxb->rxb_refcount == 0);
55 LASSERT(PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));
56 LASSERT(!rxb->rxb_posted);
57 LASSERT(rxb->rxb_idle);
59 cfs_list_del(&rxb->rxb_list);
/* Data buffer size is recomputed from tunables rather than stored per-buffer */
62 LIBCFS_FREE(rxb->rxb_buffer, kptllnd_rx_buffer_size());
63 LIBCFS_FREE(rxb, sizeof(*rxb));
/* Reserve capacity for 'count' additional incoming messages.  Each rx
 * buffer holds bufsize/max_msg_size messages; new buffers are allocated
 * and posted until the pool can cover rxbp_reserved + count messages.
 * Fails (without allocating) if the pool is already shutting down.
 * Allocation is done with rxbp_lock dropped, so shutdown is re-checked
 * after each allocation. */
67 kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count)
72 kptl_rx_buffer_t *rxb;
76 bufsize = kptllnd_rx_buffer_size();
/* How many maximum-sized messages fit in one buffer */
77 msgs_per_buffer = bufsize / (*kptllnd_tunables.kptl_max_msg_size);
79 CDEBUG(D_NET, "kptllnd_rx_buffer_pool_reserve(%d)\n", count);
81 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* No new reservations once shutdown has begun */
84 if (rxbp->rxbp_shutdown) {
/* Existing buffers already cover the requested reservation? */
89 if (rxbp->rxbp_reserved + count <=
90 rxbp->rxbp_count * msgs_per_buffer) {
/* Drop the lock to allocate; LIBCFS_ALLOC may sleep */
95 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
97 LIBCFS_ALLOC(rxb, sizeof(*rxb));
98 LIBCFS_ALLOC(buffer, bufsize);
100 if (rxb == NULL || buffer == NULL) {
101 CERROR("Failed to allocate rx buffer\n");
/* Free whichever of the pair was successfully allocated */
104 LIBCFS_FREE(rxb, sizeof(*rxb));
106 LIBCFS_FREE(buffer, bufsize);
108 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
113 memset(rxb, 0, sizeof(*rxb));
115 rxb->rxb_eventarg.eva_type = PTLLND_EVENTARG_TYPE_BUF;
116 rxb->rxb_refcount = 0;
117 rxb->rxb_pool = rxbp;
120 rxb->rxb_buffer = buffer;
121 rxb->rxb_mdh = PTL_INVALID_HANDLE;
123 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* Shutdown may have started while the lock was dropped: back out */
125 if (rxbp->rxbp_shutdown) {
126 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
128 LIBCFS_FREE(rxb, sizeof(*rxb));
129 LIBCFS_FREE(buffer, bufsize);
131 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
136 cfs_list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list);
139 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Attach the buffer to Portals so it can receive (lock dropped) */
141 kptllnd_rx_buffer_post(rxb);
143 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* Capacity now sufficient: record the reservation */
147 rxbp->rxbp_reserved += count;
149 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Release a previous reservation of 'count' message slots.  Buffers are
 * not freed here; only the reserved counter is decremented under lock. */
155 kptllnd_rx_buffer_pool_unreserve(kptl_rx_buffer_pool_t *rxbp,
160 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
162 CDEBUG(D_NET, "kptllnd_rx_buffer_pool_unreserve(%d)\n", count);
163 rxbp->rxbp_reserved -= count;
165 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Tear down the rx buffer pool: flag shutdown, destroy idle buffers,
 * force-unlink any still-attached MDs, then poll (sleeping a second per
 * iteration) until every buffer has dropped its references and the list
 * drains.  The original author's caveat below explains the races. */
169 kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp)
171 kptl_rx_buffer_t *rxb;
179 /* CAVEAT EMPTOR: I'm racing with everything here!!!
181 * Buffers can still be posted after I set rxbp_shutdown because I
182 * can't hold rxbp_lock while I'm posting them.
184 * Calling PtlMDUnlink() here races with auto-unlinks; i.e. a buffer's
185 * MD handle could become invalid under me. I am vulnerable to portals
186 * re-using handles (i.e. make the same handle valid again, but for a
187 * different MD) from when the MD is actually unlinked, to when the
188 * event callback tells me it has been unlinked. */
190 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* From here on, reserve/post refuse to add new work */
192 rxbp->rxbp_shutdown = 1;
195 cfs_list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) {
196 rxb = cfs_list_entry (tmp, kptl_rx_buffer_t, rxb_list);
/* destroy may sleep/free: must drop the spinlock around it */
199 spin_unlock_irqrestore(&rxbp->rxbp_lock,
201 kptllnd_rx_buffer_destroy(rxb);
202 spin_lock_irqsave(&rxbp->rxbp_lock,
/* No MD attached: nothing to unlink for this buffer */
208 if (PtlHandleIsEqual(mdh, PTL_INVALID_HANDLE))
211 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
213 rc = PtlMDUnlink(mdh);
215 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
217 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
218 /* callback clears rxb_mdh and drops net's ref
219 * (which causes repost, but since I set
220 * shutdown, it will just set the buffer
/* Lustre unlink semantics: no UNLINK event will arrive, so clear the
 * handle and drop the net's reference here ourselves */
225 rxb->rxb_mdh = PTL_INVALID_HANDLE;
226 kptllnd_rx_buffer_decref_locked(rxb);
231 if (cfs_list_empty(&rxbp->rxbp_list))
234 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
236 /* Wait a bit for references to be dropped */
237 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
238 "Waiting for %d Busy RX Buffers\n",
241 cfs_pause(cfs_time_seconds(1));
243 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
246 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Post an rx buffer to Portals so it can receive messages from any
 * peer: attach a wildcard ME (any nid/pid, all matchbits ignored) and
 * an infinite-threshold MD over the buffer.  On any failure the net's
 * reference is dropped, which recycles the buffer.  Must not be called
 * from interrupt context (asserted). */
250 kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb)
256 ptl_process_id_t any;
257 kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
260 LASSERT (!cfs_in_interrupt());
261 LASSERT (rxb->rxb_refcount == 0);
262 LASSERT (!rxb->rxb_idle);
263 LASSERT (!rxb->rxb_posted);
264 LASSERT (PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));
/* Wildcard source: accept from any nid/pid */
266 any.nid = PTL_NID_ANY;
267 any.pid = PTL_PID_ANY;
269 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* Pool going down: don't post anything new */
271 if (rxbp->rxbp_shutdown) {
273 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
277 rxb->rxb_refcount = 1; /* net's ref */
278 rxb->rxb_posted = 1; /* I'm posting */
280 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
282 rc = PtlMEAttach(kptllnd_data.kptl_nih,
283 *kptllnd_tunables.kptl_portal,
286 0, /* all matchbits are valid - ignore none */
291 CERROR("PtlMeAttach rxb failed %s(%d)\n",
292 kptllnd_errtype2str(rc), rc);
/* Describe the whole buffer; PUT-only, infinite reuse until unlink */
299 md.start = rxb->rxb_buffer;
300 md.length = kptllnd_rx_buffer_size();
301 md.threshold = PTL_MD_THRESH_INF;
302 md.options = PTL_MD_OP_PUT |
303 PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
304 PTL_MD_EVENT_START_DISABLE |
/* Event callback demultiplexes via this eventarg back to the rxb */
307 md.user_ptr = &rxb->rxb_eventarg;
308 md.max_size = *kptllnd_tunables.kptl_max_msg_size;
309 md.eq_handle = kptllnd_data.kptl_eqh;
311 rc = PtlMDAttach(meh, md, PTL_UNLINK, &mdh);
313 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
314 if (rxb->rxb_posted) /* Not auto-unlinked yet!!! */
316 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* MD attach failed: undo the ME attach before bailing out */
320 CERROR("PtlMDAttach rxb failed %s(%d)\n",
321 kptllnd_errtype2str(rc), rc);
322 rc = PtlMEUnlink(meh);
323 LASSERT(rc == PTL_OK);
326 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
328 /* XXX this will just try again immediately */
329 kptllnd_rx_buffer_decref_locked(rxb);
330 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Allocate a zeroed rx descriptor from the rx slab cache (atomic
 * allocation, so callable from the event callback path).  A fault-
 * injection hook can force the allocation to fail for testing. */
334 kptllnd_rx_alloc(void)
338 if (IS_SIMULATION_ENABLED(FAIL_RX_ALLOC)) {
339 CERROR ("FAIL_RX_ALLOC SIMULATION triggered\n");
343 rx = cfs_mem_cache_alloc(kptllnd_data.kptl_rx_cache, CFS_ALLOC_ATOMIC);
345 CERROR("Failed to allocate rx\n");
349 memset(rx, 0, sizeof(*rx));
/* Complete an rx: release its buffer, optionally return a credit to
 * the peer (post_credit selects NO_CREDIT vs PEER_CREDIT), kick the
 * peer's send path in case returned credits unblock it, drop the rx's
 * peer reference and free the rx descriptor back to the cache. */
354 kptllnd_rx_done(kptl_rx_t *rx, int post_credit)
356 kptl_rx_buffer_t *rxb = rx->rx_rxb;
357 kptl_peer_t *peer = rx->rx_peer;
360 LASSERT (post_credit == PTLLND_POSTRX_NO_CREDIT ||
361 post_credit == PTLLND_POSTRX_PEER_CREDIT);
363 CDEBUG(D_NET, "rx=%p rxb %p peer %p\n", rx, rxb, peer);
366 kptllnd_rx_buffer_decref(rxb);
369 /* Update credits (after I've decref-ed the buffer) */
370 spin_lock_irqsave(&peer->peer_lock, flags);
372 if (post_credit == PTLLND_POSTRX_PEER_CREDIT)
373 peer->peer_outstanding_credits++;
/* Sanity: never owe the peer more credits than its tx-credit quota */
375 LASSERT (peer->peer_outstanding_credits +
376 peer->peer_sent_credits <=
377 *kptllnd_tunables.kptl_peertxcredits);
379 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: rx %p done\n",
380 libcfs_id2str(peer->peer_id), peer->peer_credits,
381 peer->peer_outstanding_credits, peer->peer_sent_credits,
384 spin_unlock_irqrestore(&peer->peer_lock, flags);
386 /* I might have to send back credits */
387 kptllnd_peer_check_sends(peer);
388 kptllnd_peer_decref(peer);
391 cfs_mem_cache_free(kptllnd_data.kptl_rx_cache, rx);
/* Portals event callback for rx buffers.  Recovers the rx buffer from
 * the event's user_ptr, validates the event, and for a successful
 * PUT_END (while not shutting down) allocates an rx descriptor, fixes
 * up 8-byte misalignment if Portals could not enforce it, and queues
 * the rx for the scheduler.  On unlink, clears the MD handle and drops
 * the net's reference (which may repost or idle the buffer). */
395 kptllnd_rx_buffer_callback (ptl_event_t *ev)
397 kptl_eventarg_t *eva = ev->md.user_ptr;
398 kptl_rx_buffer_t *rxb = kptllnd_eventarg2obj(eva);
399 kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
/* The two unlink semantics report unlink differently */
404 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
405 unlinked = ev->unlinked;
407 unlinked = ev->type == PTL_EVENT_UNLINK;
410 CDEBUG(D_NET, "%s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
411 kptllnd_ptlid2str(ev->initiator),
412 kptllnd_evtype2str(ev->type), ev->type, rxb,
413 kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type,
416 LASSERT (!rxb->rxb_idle);
417 LASSERT (ev->md.start == rxb->rxb_buffer);
418 LASSERT (ev->offset + ev->mlength <=
419 PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages);
420 LASSERT (ev->type == PTL_EVENT_PUT_END ||
421 ev->type == PTL_EVENT_UNLINK);
422 LASSERT (ev->type == PTL_EVENT_UNLINK ||
423 ev->match_bits == LNET_MSG_MATCHBITS);
425 if (ev->ni_fail_type != PTL_NI_OK) {
/* Fixed: format string was missing the backslash in the trailing
 * newline ("%dn" -> "%d\n"), which ran log lines together */
426 CERROR("Portals error from %s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
427 kptllnd_ptlid2str(ev->initiator),
428 kptllnd_evtype2str(ev->type), ev->type, rxb,
429 kptllnd_errtype2str(ev->ni_fail_type),
430 ev->ni_fail_type, unlinked);
431 } else if (ev->type == PTL_EVENT_PUT_END &&
432 !rxbp->rxbp_shutdown) {
434 /* rxbp_shutdown sampled without locking! I only treat it as a
435 * hint since shutdown can start while rx's are queued on
437 #if (PTL_MD_LOCAL_ALIGN8 == 0)
438 /* Portals can't force message alignment - someone sending an
439 * odd-length message will misalign subsequent messages and
440 * force the fixup below... */
441 if ((ev->mlength & 7) != 0)
442 CWARN("Message from %s has odd length "LPU64": "
443 "probable version incompatibility\n",
444 kptllnd_ptlid2str(ev->initiator),
447 rx = kptllnd_rx_alloc();
449 CERROR("Message from %s dropped: ENOMEM",
450 kptllnd_ptlid2str(ev->initiator));
/* Aligned: point the rx at the message in place and pin the buffer */
452 if ((ev->offset & 7) == 0) {
453 kptllnd_rx_buffer_addref(rxb);
455 rx->rx_nob = ev->mlength;
456 rx->rx_msg = (kptl_msg_t *)
457 (rxb->rxb_buffer + ev->offset);
459 #if (PTL_MD_LOCAL_ALIGN8 == 0)
460 /* Portals can't force alignment - copy into
461 * rx_space (avoiding overflow) to fix */
462 int maxlen = *kptllnd_tunables.kptl_max_msg_size;
465 rx->rx_nob = MIN(maxlen, ev->mlength);
466 rx->rx_msg = (kptl_msg_t *)rx->rx_space;
467 memcpy(rx->rx_msg, rxb->rxb_buffer + ev->offset,
470 /* Portals should have forced the alignment */
475 rx->rx_initiator = ev->initiator;
476 rx->rx_treceived = jiffies;
477 /* Queue for attention */
478 spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
481 cfs_list_add_tail(&rx->rx_list,
482 &kptllnd_data.kptl_sched_rxq);
483 cfs_waitq_signal(&kptllnd_data.kptl_sched_waitq);
485 spin_unlock_irqrestore(&kptllnd_data. \
486 kptl_sched_lock, flags);
/* Buffer was unlinked: clear the handle and drop the net's ref */
491 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
494 rxb->rxb_mdh = PTL_INVALID_HANDLE;
495 kptllnd_rx_buffer_decref_locked(rxb);
497 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Send a NAK to 'dest': a fire-and-forget stub message (no EQ, no ACK)
 * carrying my protocol magic/version so the peer drops or refreshes
 * any stale state it holds for me.  Failures are only warned about. */
502 kptllnd_nak (ptl_process_id_t dest)
504 /* Fire-and-forget a stub message that will let the peer know my
505 * protocol magic/version and make her drop/refresh any peer state she
506 * might have with me. */
508 .start = kptllnd_data.kptl_nak_msg,
509 .length = kptllnd_data.kptl_nak_msg->ptlm_nob,
/* No event queue: we never hear about this message again */
513 .eq_handle = PTL_EQ_NONE};
517 rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh);
519 CWARN("Can't NAK %s: bind failed %s(%d)\n",
520 kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc);
524 rc = PtlPut(mdh, PTL_NOACK_REQ, dest,
525 *kptllnd_tunables.kptl_portal, 0,
526 LNET_MSG_MATCHBITS, 0, 0);
528 CWARN("Can't NAK %s: put failed %s(%d)\n",
529 kptllnd_ptlid2str(dest), kptllnd_errtype2str(rc), rc);
/* Look up the kptl_net_t serving 'nid' under the nets read lock.
 * On a match, takes a reference on the net before returning it;
 * ownership of that reference transfers to the caller. */
534 kptllnd_find_net (lnet_nid_t nid)
538 read_lock(&kptllnd_data.kptl_net_rw_lock);
539 cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
540 LASSERT (!net->net_shutdown);
542 if (net->net_ni->ni_nid == nid) {
543 kptllnd_net_addref(net);
544 read_unlock(&kptllnd_data.kptl_net_rw_lock);
548 read_unlock(&kptllnd_data.kptl_net_rw_lock);
/* Parse and dispatch a received message.  Validates protocol magic and
 * version (NAKing incompatible senders), unpacks the header, checks the
 * source/destination ids and incarnation stamps, enforces the credit
 * protocol, then hands IMMEDIATE/PUT/GET payloads to lnet_parse().
 * Takes (and eventually releases) references on the net and peer. */
554 kptllnd_rx_parse(kptl_rx_t *rx)
556 kptl_msg_t *msg = rx->rx_msg;
558 int post_credit = PTLLND_POSTRX_PEER_CREDIT;
559 kptl_net_t *net = NULL;
563 lnet_process_id_t srcid;
565 LASSERT (!cfs_in_interrupt());
566 LASSERT (rx->rx_peer == NULL);
568 CFS_INIT_LIST_HEAD(&txs);
/* Reject LNET-native magic (either endianness) and any ptllnd message
 * whose version doesn't match mine */
570 if ((rx->rx_nob >= 4 &&
571 (msg->ptlm_magic == LNET_PROTO_MAGIC ||
572 msg->ptlm_magic == __swab32(LNET_PROTO_MAGIC))) ||
574 ((msg->ptlm_magic == PTLLND_MSG_MAGIC &&
575 msg->ptlm_version != PTLLND_MSG_VERSION) ||
576 (msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC) &&
577 msg->ptlm_version != __swab16(PTLLND_MSG_VERSION))))) {
578 /* NAK incompatible versions
579 * See other LNDs for how to handle this if/when ptllnd begins
580 * to allow different versions to co-exist */
581 CERROR("Bad version: got %04x expected %04x from %s\n",
582 (__u32)(msg->ptlm_magic == PTLLND_MSG_MAGIC ?
583 msg->ptlm_version : __swab16(msg->ptlm_version)),
584 PTLLND_MSG_VERSION, kptllnd_ptlid2str(rx->rx_initiator));
585 /* NB backward compatibility */
586 kptllnd_nak(rx->rx_initiator);
/* Byte-swap and sanity-check the message header */
590 rc = kptllnd_msg_unpack(msg, rx->rx_nob);
592 CERROR ("Error %d unpacking rx from %s\n",
593 rc, kptllnd_ptlid2str(rx->rx_initiator));
597 srcid.nid = msg->ptlm_srcnid;
598 srcid.pid = msg->ptlm_srcpid;
600 CDEBUG(D_NETTRACE, "%s: RX %s c %d %p rxb %p queued %lu ticks (%ld s)\n",
601 libcfs_id2str(srcid), kptllnd_msgtype2str(msg->ptlm_type),
602 msg->ptlm_credits, rx, rx->rx_rxb,
603 jiffies - rx->rx_treceived,
604 cfs_duration_sec(jiffies - rx->rx_treceived));
/* The claimed LNET source must map to the actual Portals initiator */
606 if (kptllnd_lnet2ptlnid(srcid.nid) != rx->rx_initiator.nid) {
607 CERROR("Bad source nid %s from %s\n",
608 libcfs_id2str(srcid),
609 kptllnd_ptlid2str(rx->rx_initiator));
613 if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
614 peer = kptllnd_id2peer(srcid);
618 CWARN("NAK from %s (%d:%s)\n",
619 libcfs_id2str(srcid), peer->peer_state,
620 kptllnd_ptlid2str(rx->rx_initiator));
622 /* NB can't nuke new peer - bug 17546 comment 31 */
623 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
624 CDEBUG(D_NET, "Stale NAK from %s(%s): WAITING_HELLO\n",
625 libcfs_id2str(srcid),
626 kptllnd_ptlid2str(rx->rx_initiator));
627 kptllnd_peer_decref(peer);
/* Validate the destination net/pid is one of mine */
635 net = kptllnd_find_net(msg->ptlm_dstnid);
636 if (net == NULL || msg->ptlm_dstpid != the_lnet.ln_pid) {
637 CERROR("Bad dstid %s from %s\n",
638 libcfs_id2str((lnet_process_id_t) {
639 .nid = msg->ptlm_dstnid,
640 .pid = msg->ptlm_dstpid}),
641 kptllnd_ptlid2str(rx->rx_initiator));
/* Source must be on the same LNET network as the receiving NI */
645 if (LNET_NIDNET(srcid.nid) != LNET_NIDNET(net->net_ni->ni_nid)) {
646 lnet_nid_t nid = LNET_MKNID(LNET_NIDNET(net->net_ni->ni_nid),
647 LNET_NIDADDR(srcid.nid));
648 CERROR("Bad source nid %s from %s, %s expected.\n",
649 libcfs_id2str(srcid),
650 kptllnd_ptlid2str(rx->rx_initiator),
651 libcfs_nid2str(nid));
/* HELLO establishes the peer; anything else needs an existing one */
655 if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
656 peer = kptllnd_peer_handle_hello(net, rx->rx_initiator, msg);
660 peer = kptllnd_id2peer(srcid);
662 CWARN("NAK %s: no connection, %s must reconnect\n",
663 kptllnd_msgtype2str(msg->ptlm_type),
664 libcfs_id2str(srcid));
665 /* NAK to make the peer reconnect */
666 kptllnd_nak(rx->rx_initiator);
670 /* Ignore any messages for a previous incarnation of me */
671 if (msg->ptlm_dststamp < peer->peer_myincarnation) {
672 kptllnd_peer_decref(peer);
676 if (msg->ptlm_dststamp != peer->peer_myincarnation) {
677 CERROR("%s: Unexpected dststamp "LPX64" "
678 "("LPX64" expected)\n",
679 libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
680 peer->peer_myincarnation);
685 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
686 /* recoverable error - restart txs */
687 spin_lock_irqsave(&peer->peer_lock, flags);
688 kptllnd_cancel_txlist(&peer->peer_sendq, &txs);
689 spin_unlock_irqrestore(&peer->peer_lock, flags);
691 CWARN("NAK %s: Unexpected %s message\n",
692 libcfs_id2str(srcid),
693 kptllnd_msgtype2str(msg->ptlm_type));
694 kptllnd_nak(rx->rx_initiator);
699 if (msg->ptlm_srcstamp != peer->peer_incarnation) {
700 CERROR("%s: Unexpected srcstamp "LPX64" "
701 "("LPX64" expected)\n",
702 libcfs_id2str(srcid),
704 peer->peer_incarnation);
/* Peer identity must match the message's claimed source exactly */
710 LASSERTF (LNET_NIDADDR(msg->ptlm_srcnid) ==
711 LNET_NIDADDR(peer->peer_id.nid), "m %s p %s\n",
712 libcfs_nid2str(msg->ptlm_srcnid),
713 libcfs_nid2str(peer->peer_id.nid));
714 LASSERTF (msg->ptlm_srcpid == peer->peer_id.pid, "m %u p %u\n",
715 msg->ptlm_srcpid, peer->peer_id.pid);
717 spin_lock_irqsave(&peer->peer_lock, flags);
719 /* Check peer only sends when I've sent her credits */
720 if (peer->peer_sent_credits == 0) {
721 int c = peer->peer_credits;
722 int oc = peer->peer_outstanding_credits;
723 int sc = peer->peer_sent_credits;
725 spin_unlock_irqrestore(&peer->peer_lock, flags);
727 CERROR("%s: buffer overrun [%d/%d+%d]\n",
728 libcfs_id2str(peer->peer_id), c, sc, oc);
/* This message consumed one of the credits I gave the peer */
732 peer->peer_sent_credits--;
734 /* No check for credit overflow - the peer may post new
735 * buffers after the startup handshake. */
736 peer->peer_credits += msg->ptlm_credits;
738 /* This ensures the credit taken by NOOP can be returned */
739 if (msg->ptlm_type == PTLLND_MSG_TYPE_NOOP) {
740 peer->peer_outstanding_credits++;
741 post_credit = PTLLND_POSTRX_NO_CREDIT;
744 spin_unlock_irqrestore(&peer->peer_lock, flags);
746 /* See if something can go out now that credits have come in */
747 if (msg->ptlm_credits != 0)
748 kptllnd_peer_check_sends(peer);
750 /* ptllnd-level protocol correct - rx takes my ref on peer and increments
751 * peer_outstanding_credits when it completes */
753 kptllnd_peer_alive(peer);
755 switch (msg->ptlm_type) {
757 /* already checked by kptllnd_msg_unpack() */
760 case PTLLND_MSG_TYPE_HELLO:
761 CDEBUG(D_NET, "PTLLND_MSG_TYPE_HELLO\n");
764 case PTLLND_MSG_TYPE_NOOP:
765 CDEBUG(D_NET, "PTLLND_MSG_TYPE_NOOP\n");
768 case PTLLND_MSG_TYPE_IMMEDIATE:
769 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
770 rc = lnet_parse(net->net_ni,
771 &msg->ptlm_u.immediate.kptlim_hdr,
774 if (rc >= 0) { /* kptllnd_recv owns 'rx' now */
775 kptllnd_net_decref(net);
780 case PTLLND_MSG_TYPE_PUT:
781 case PTLLND_MSG_TYPE_GET:
782 CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
783 msg->ptlm_type == PTLLND_MSG_TYPE_PUT ?
786 /* checked in kptllnd_msg_unpack() */
787 LASSERT (msg->ptlm_u.rdma.kptlrm_matchbits >=
788 PTL_RESERVED_MATCHBITS);
790 /* Update last match bits seen */
791 spin_lock_irqsave(&peer->peer_lock, flags);
793 if (msg->ptlm_u.rdma.kptlrm_matchbits >
794 rx->rx_peer->peer_last_matchbits_seen)
795 rx->rx_peer->peer_last_matchbits_seen =
796 msg->ptlm_u.rdma.kptlrm_matchbits;
798 spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);
800 rc = lnet_parse(net->net_ni,
801 &msg->ptlm_u.rdma.kptlrm_hdr,
804 if (rc >= 0) { /* kptllnd_recv owns 'rx' now */
805 kptllnd_net_decref(net);
/* Error path: close the peer and restart any cancelled txs */
813 kptllnd_peer_close(peer, rc);
814 if (rx->rx_peer == NULL) /* drop ref on peer */
815 kptllnd_peer_decref(peer); /* unless rx_done will */
816 if (!cfs_list_empty(&txs)) {
817 LASSERT (net != NULL);
818 kptllnd_restart_txs(net, srcid, &txs);
822 kptllnd_net_decref(net);
823 kptllnd_rx_done(rx, post_credit);