1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lnet/klnds/ptllnd/ptllnd_rx_buf.c
38 * Author: PJ Kirner <pjkirner@clusterfs.com>
/* Initialise an rx buffer pool: zero every field, then set up the pool's
 * spinlock and its (initially empty) buffer list.
 * NOTE(review): listing is gappy - return type and braces not visible here. */
44 kptllnd_rx_buffer_pool_init(kptl_rx_buffer_pool_t *rxbp)
46 memset(rxbp, 0, sizeof(*rxbp));
47 spin_lock_init(&rxbp->rxbp_lock);
48 INIT_LIST_HEAD(&rxbp->rxbp_list);
/* Free one rx buffer and its backing memory.  Caller must guarantee the
 * buffer is quiescent: the asserts below require no outstanding refs, an
 * unlinked MD handle, not posted, and flagged idle.
 * NOTE(review): list_del() touches the pool list - presumably the caller
 * holds rxbp_lock (see pool_fini's lock/unlock around this call); confirm. */
52 kptllnd_rx_buffer_destroy(kptl_rx_buffer_t *rxb)
54 kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
/* Sanity: buffer must be completely idle before it can be destroyed. */
56 LASSERT(rxb->rxb_refcount == 0);
57 LASSERT(PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));
58 LASSERT(!rxb->rxb_posted);
59 LASSERT(rxb->rxb_idle);
/* Remove from the pool's buffer list, then release buffer and descriptor. */
61 list_del(&rxb->rxb_list);
64 LIBCFS_FREE(rxb->rxb_buffer, kptllnd_rx_buffer_size());
65 LIBCFS_FREE(rxb, sizeof(*rxb));
/* Reserve capacity for 'count' more messages in the pool, growing it with
 * newly allocated+posted buffers if the current buffers can't cover the
 * reservation.  Allocation happens with rxbp_lock dropped (LIBCFS_ALLOC may
 * sleep), so the shutdown flag is re-checked every time the lock is
 * re-taken. */
69 kptllnd_rx_buffer_pool_reserve(kptl_rx_buffer_pool_t *rxbp, int count)
74 kptl_rx_buffer_t *rxb;
78 bufsize = kptllnd_rx_buffer_size();
/* Each buffer can hold several maximum-sized messages. */
79 msgs_per_buffer = bufsize / (*kptllnd_tunables.kptl_max_msg_size);
81 CDEBUG(D_NET, "kptllnd_rx_buffer_pool_reserve(%d)\n", count);
83 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* No new reservations once shutdown has begun. */
86 if (rxbp->rxbp_shutdown) {
/* Enough capacity already?  Skip allocation and just account below. */
91 if (rxbp->rxbp_reserved + count <=
92 rxbp->rxbp_count * msgs_per_buffer) {
/* Drop the lock to allocate a new buffer + descriptor. */
97 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
99 LIBCFS_ALLOC(rxb, sizeof(*rxb));
100 LIBCFS_ALLOC(buffer, bufsize);
102 if (rxb == NULL || buffer == NULL) {
103 CERROR("Failed to allocate rx buffer\n");
/* NOTE(review): one of these may be NULL here - assumes LIBCFS_FREE
 * tolerates NULL; confirm against libcfs. */
106 LIBCFS_FREE(rxb, sizeof(*rxb));
108 LIBCFS_FREE(buffer, bufsize);
110 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* Initialise the new buffer descriptor before publishing it. */
115 memset(rxb, 0, sizeof(*rxb));
117 rxb->rxb_eventarg.eva_type = PTLLND_EVENTARG_TYPE_BUF;
118 rxb->rxb_refcount = 0;
119 rxb->rxb_pool = rxbp;
122 rxb->rxb_buffer = buffer;
123 rxb->rxb_mdh = PTL_INVALID_HANDLE;
125 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* Shutdown may have started while the lock was dropped: undo and bail. */
127 if (rxbp->rxbp_shutdown) {
128 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
130 LIBCFS_FREE(rxb, sizeof(*rxb));
131 LIBCFS_FREE(buffer, bufsize);
133 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* Publish the buffer on the pool list ... */
138 list_add_tail(&rxb->rxb_list, &rxbp->rxbp_list);
141 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* ... and post it to Portals with the lock dropped. */
143 kptllnd_rx_buffer_post(rxb);
145 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* Capacity is now sufficient: record the reservation. */
149 rxbp->rxbp_reserved += count;
151 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Return 'count' previously reserved message slots to the pool.  Buffers
 * are not freed here - only the reservation accounting is decremented,
 * under rxbp_lock. */
157 kptllnd_rx_buffer_pool_unreserve(kptl_rx_buffer_pool_t *rxbp,
162 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
164 CDEBUG(D_NET, "kptllnd_rx_buffer_pool_unreserve(%d)\n", count);
165 rxbp->rxbp_reserved -= count;
167 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Tear down the rx buffer pool: flag shutdown, destroy idle buffers,
 * unlink the MDs of busy ones, then poll until every buffer has dropped
 * its references and left the list.  The original CAVEAT EMPTOR comment
 * below describes the races this code knowingly tolerates. */
171 kptllnd_rx_buffer_pool_fini(kptl_rx_buffer_pool_t *rxbp)
173 kptl_rx_buffer_t *rxb;
177 struct list_head *tmp;
178 struct list_head *nxt;
181 /* CAVEAT EMPTOR: I'm racing with everything here!!!
183 * Buffers can still be posted after I set rxbp_shutdown because I
184 * can't hold rxbp_lock while I'm posting them.
186 * Calling PtlMDUnlink() here races with auto-unlinks; i.e. a buffer's
187 * MD handle could become invalid under me. I am vulnerable to portals
188 * re-using handles (i.e. make the same handle valid again, but for a
189 * different MD) from when the MD is actually unlinked, to when the
190 * event callback tells me it has been unlinked. */
192 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* From here on, reserve/post paths refuse new work. */
194 rxbp->rxbp_shutdown = 1;
/* _safe iteration: entries may be deleted while walking. */
197 list_for_each_safe(tmp, nxt, &rxbp->rxbp_list) {
198 rxb = list_entry (tmp, kptl_rx_buffer_t, rxb_list);
/* Destroy may not be called under the pool lock - drop/retake. */
201 spin_unlock_irqrestore(&rxbp->rxbp_lock,
203 kptllnd_rx_buffer_destroy(rxb);
204 spin_lock_irqsave(&rxbp->rxbp_lock,
/* No MD attached: nothing to unlink for this buffer. */
210 if (PtlHandleIsEqual(mdh, PTL_INVALID_HANDLE))
213 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
215 rc = PtlMDUnlink(mdh);
217 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
219 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
220 /* callback clears rxb_mdh and drops net's ref
221 * (which causes repost, but since I set
222 * shutdown, it will just set the buffer
/* Non-Lustre unlink semantics: clear the handle and drop the
 * net's ref ourselves (no unlink event will arrive). */
227 rxb->rxb_mdh = PTL_INVALID_HANDLE;
228 kptllnd_rx_buffer_decref_locked(rxb);
/* Loop until every buffer has been destroyed off the list. */
233 if (list_empty(&rxbp->rxbp_list))
236 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
238 /* Wait a bit for references to be dropped */
239 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
240 "Waiting for %d Busy RX Buffers\n",
243 cfs_pause(cfs_time_seconds(1));
245 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
248 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Attach an ME+MD for this buffer so Portals can deliver incoming PUTs
 * into it.  Takes the net's reference (rxb_refcount = 1) before posting;
 * on any failure the ref is dropped again, which (per the XXX below)
 * retries the post. */
252 kptllnd_rx_buffer_post(kptl_rx_buffer_t *rxb)
258 ptl_process_id_t any;
259 kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
/* Must be in sleepable context and the buffer completely quiescent. */
262 LASSERT (!in_interrupt());
263 LASSERT (rxb->rxb_refcount == 0);
264 LASSERT (!rxb->rxb_idle);
265 LASSERT (!rxb->rxb_posted);
266 LASSERT (PtlHandleIsEqual(rxb->rxb_mdh, PTL_INVALID_HANDLE));
/* Accept messages from any NID/PID. */
268 any.nid = PTL_NID_ANY;
269 any.pid = PTL_PID_ANY;
271 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* Pool shutting down: don't repost. */
273 if (rxbp->rxbp_shutdown) {
275 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
279 rxb->rxb_refcount = 1; /* net's ref */
280 rxb->rxb_posted = 1; /* I'm posting */
282 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
284 rc = PtlMEAttach(kptllnd_data.kptl_nih,
285 *kptllnd_tunables.kptl_portal,
288 0, /* all matchbits are valid - ignore none */
293 CERROR("PtlMeAttach rxb failed %s(%d)\n",
294 kptllnd_errtype2str(rc), rc);
/* Describe the buffer: accept PUTs of up to max_msg_size each,
 * unlimited operations (threshold INF), events to the shared EQ. */
301 md.start = rxb->rxb_buffer;
302 md.length = PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages;
303 md.threshold = PTL_MD_THRESH_INF;
304 md.options = PTL_MD_OP_PUT |
305 PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
306 PTL_MD_EVENT_START_DISABLE |
/* eventarg lets the EQ callback recover the rxb from the event. */
309 md.user_ptr = &rxb->rxb_eventarg;
310 md.max_size = *kptllnd_tunables.kptl_max_msg_size;
311 md.eq_handle = kptllnd_data.kptl_eqh;
313 rc = PtlMDAttach(meh, md, PTL_UNLINK, &mdh);
315 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
316 if (rxb->rxb_posted) /* Not auto-unlinked yet!!! */
318 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* MD attach failed: tear the ME back down ... */
322 CERROR("PtlMDAttach rxb failed %s(%d)\n",
323 kptllnd_errtype2str(rc), rc);
324 rc = PtlMEUnlink(meh);
325 LASSERT(rc == PTL_OK);
328 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/* ... and drop the net's ref (which reposts the buffer). */
330 /* XXX this will just try again immediately */
331 kptllnd_rx_buffer_decref_locked(rxb);
332 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Allocate and zero an rx descriptor from the slab cache.  Uses an atomic
 * allocation (callable from the event callback path); honours the
 * FAIL_RX_ALLOC fault-injection hook.  Returns the rx, or (presumably)
 * NULL on failure - the failure return itself is not visible in this
 * gappy listing. */
336 kptllnd_rx_alloc(void)
/* Fault injection: pretend the allocation failed. */
340 if (IS_SIMULATION_ENABLED(FAIL_RX_ALLOC)) {
341 CERROR ("FAIL_RX_ALLOC SIMULATION triggered\n");
345 rx = cfs_mem_cache_alloc(kptllnd_data.kptl_rx_cache, CFS_ALLOC_ATOMIC);
347 CERROR("Failed to allocate rx\n");
351 memset(rx, 0, sizeof(*rx));
/* Complete an rx: release its buffer ref, update the peer's credit
 * accounting (optionally returning a peer credit), kick the send path in
 * case returned credits unblock it, drop the rx's peer ref, and free the
 * rx back to the slab cache. */
356 kptllnd_rx_done(kptl_rx_t *rx, int post_credit)
358 kptl_rx_buffer_t *rxb = rx->rx_rxb;
359 kptl_peer_t *peer = rx->rx_peer;
362 LASSERT (post_credit == PTLLND_POSTRX_NO_CREDIT ||
363 post_credit == PTLLND_POSTRX_PEER_CREDIT);
365 CDEBUG(D_NET, "rx=%p rxb %p peer %p\n", rx, rxb, peer);
/* Release the rx buffer before touching credits (see comment below). */
368 kptllnd_rx_buffer_decref(rxb);
371 /* Update credits (after I've decref-ed the buffer) */
372 spin_lock_irqsave(&peer->peer_lock, flags);
374 if (post_credit == PTLLND_POSTRX_PEER_CREDIT)
375 peer->peer_outstanding_credits++;
/* Credits owed + already granted must never exceed the tunable cap. */
377 LASSERT (peer->peer_outstanding_credits +
378 peer->peer_sent_credits <=
379 *kptllnd_tunables.kptl_peertxcredits);
381 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: rx %p done\n",
382 libcfs_id2str(peer->peer_id), peer->peer_credits,
383 peer->peer_outstanding_credits, peer->peer_sent_credits,
386 spin_unlock_irqrestore(&peer->peer_lock, flags);
388 /* I might have to send back credits */
389 kptllnd_peer_check_sends(peer);
390 kptllnd_peer_decref(peer);
393 cfs_mem_cache_free(kptllnd_data.kptl_rx_cache, rx);
/* Portals event-queue callback for rx buffers.  Recovers the rxb from the
 * event's user_ptr, validates the event, and on a successful PUT_END
 * allocates an rx, points it at (or copies, if misaligned) the received
 * message, and queues it for the scheduler.  On unlink it clears the MD
 * handle and drops the net's ref under the pool lock.
 *
 * FIX(review): two log format strings corrected - "unlink=%dn" was missing
 * the backslash in its trailing "\n", and the ENOMEM message lacked the
 * trailing "\n" every other CERROR in this file carries.  No code change. */
397 kptllnd_rx_buffer_callback (ptl_event_t *ev)
399 kptl_eventarg_t *eva = ev->md.user_ptr;
400 kptl_rx_buffer_t *rxb = kptllnd_eventarg2obj(eva);
401 kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
/* 'unlinked' detection differs between Lustre and vanilla Portals
 * unlink semantics. */
406 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
407 unlinked = ev->unlinked;
409 unlinked = ev->type == PTL_EVENT_UNLINK;
412 CDEBUG(D_NET, "%s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
413 kptllnd_ptlid2str(ev->initiator),
414 kptllnd_evtype2str(ev->type), ev->type, rxb,
415 kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type,
/* Sanity-check the event against the buffer we think it targets. */
418 LASSERT (!rxb->rxb_idle);
419 LASSERT (ev->md.start == rxb->rxb_buffer);
420 LASSERT (ev->offset + ev->mlength <=
421 PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages);
422 LASSERT (ev->type == PTL_EVENT_PUT_END ||
423 ev->type == PTL_EVENT_UNLINK);
424 LASSERT (ev->type == PTL_EVENT_UNLINK ||
425 ev->match_bits == LNET_MSG_MATCHBITS);
427 if (ev->ni_fail_type != PTL_NI_OK) {
428 CERROR("Portals error from %s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
429 kptllnd_ptlid2str(ev->initiator),
430 kptllnd_evtype2str(ev->type), ev->type, rxb,
431 kptllnd_errtype2str(ev->ni_fail_type),
432 ev->ni_fail_type, unlinked);
433 kptllnd_schedule_ptltrace_dump();
434 } else if (ev->type == PTL_EVENT_PUT_END &&
435 !rxbp->rxbp_shutdown) {
437 /* rxbp_shutdown sampled without locking! I only treat it as a
438 * hint since shutdown can start while rx's are queued on
440 #if (PTL_MD_LOCAL_ALIGN8 == 0)
441 /* Portals can't force message alignment - someone sending an
442 * odd-length message will misalign subsequent messages and
443 * force the fixup below... */
444 if ((ev->mlength & 7) != 0)
445 CWARN("Message from %s has odd length "LPU64": "
446 "probable version incompatibility\n",
447 kptllnd_ptlid2str(ev->initiator),
450 rx = kptllnd_rx_alloc();
452 CERROR("Message from %s dropped: ENOMEM\n",
453 kptllnd_ptlid2str(ev->initiator));
/* Aligned message: reference it in place inside the buffer. */
455 if ((ev->offset & 7) == 0) {
456 kptllnd_rx_buffer_addref(rxb);
458 rx->rx_nob = ev->mlength;
459 rx->rx_msg = (kptl_msg_t *)
460 (rxb->rxb_buffer + ev->offset);
462 #if (PTL_MD_LOCAL_ALIGN8 == 0)
463 /* Portals can't force alignment - copy into
464 * rx_space (avoiding overflow) to fix */
465 int maxlen = *kptllnd_tunables.kptl_max_msg_size;
468 rx->rx_nob = MIN(maxlen, ev->mlength);
469 rx->rx_msg = (kptl_msg_t *)rx->rx_space;
470 memcpy(rx->rx_msg, rxb->rxb_buffer + ev->offset,
473 /* Portals should have forced the alignment */
478 rx->rx_initiator = ev->initiator;
479 rx->rx_treceived = jiffies;
481 rx->rx_uid = ev->uid;
483 /* Queue for attention */
484 spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
487 list_add_tail(&rx->rx_list,
488 &kptllnd_data.kptl_sched_rxq);
489 wake_up(&kptllnd_data.kptl_sched_waitq);
491 spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
/* Unlinked: clear the MD handle and drop the net's ref under lock. */
497 spin_lock_irqsave(&rxbp->rxbp_lock, flags);
500 rxb->rxb_mdh = PTL_INVALID_HANDLE;
501 kptllnd_rx_buffer_decref_locked(rxb);
503 spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/* Send a fire-and-forget NAK (the pre-built kptl_nak_msg stub) back to the
 * originator of 'rx', telling the peer our protocol magic/version so it
 * drops/refreshes its state for us.  Failures are logged but not retried. */
508 kptllnd_nak (kptl_rx_t *rx)
510 /* Fire-and-forget a stub message that will let the peer know my
511 * protocol magic/version and make her drop/refresh any peer state she
512 * might have with me. */
/* MD over the canned NAK message; no EQ - we never see completion. */
514 .start = kptllnd_data.kptl_nak_msg,
515 .length = kptllnd_data.kptl_nak_msg->ptlm_nob,
519 .eq_handle = PTL_EQ_NONE};
523 rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh);
525 CWARN("Can't NAK %s: bind failed %s(%d)\n",
526 kptllnd_ptlid2str(rx->rx_initiator),
527 kptllnd_errtype2str(rc), rc);
/* PUT straight back to the sender, no ACK requested. */
531 rc = PtlPut(mdh, PTL_NOACK_REQ, rx->rx_initiator,
532 *kptllnd_tunables.kptl_portal, 0,
533 LNET_MSG_MATCHBITS, 0, 0);
536 CWARN("Can't NAK %s: put failed %s(%d)\n",
537 kptllnd_ptlid2str(rx->rx_initiator),
538 kptllnd_errtype2str(rc), rc);
539 kptllnd_schedule_ptltrace_dump();
544 kptllnd_rx_parse(kptl_rx_t *rx)
546 kptl_msg_t *msg = rx->rx_msg;
548 int post_credit = PTLLND_POSTRX_PEER_CREDIT;
550 struct list_head txs;
552 lnet_process_id_t srcid;
554 LASSERT (rx->rx_peer == NULL);
556 INIT_LIST_HEAD(&txs);
558 if ((rx->rx_nob >= 4 &&
559 (msg->ptlm_magic == LNET_PROTO_MAGIC ||
560 msg->ptlm_magic == __swab32(LNET_PROTO_MAGIC))) ||
562 ((msg->ptlm_magic == PTLLND_MSG_MAGIC &&
563 msg->ptlm_version != PTLLND_MSG_VERSION) ||
564 (msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC) &&
565 msg->ptlm_version != __swab16(PTLLND_MSG_VERSION))))) {
566 /* NAK incompatible versions
567 * See other LNDs for how to handle this if/when ptllnd begins
568 * to allow different versions to co-exist */
569 CERROR("Bad version: got %04x expected %04x from %s\n",
570 (__u32)(msg->ptlm_magic == PTLLND_MSG_MAGIC ?
571 msg->ptlm_version : __swab16(msg->ptlm_version)),
572 PTLLND_MSG_VERSION, kptllnd_ptlid2str(rx->rx_initiator));
577 rc = kptllnd_msg_unpack(msg, rx->rx_nob);
579 CERROR ("Error %d unpacking rx from %s\n",
580 rc, kptllnd_ptlid2str(rx->rx_initiator));
584 srcid.nid = msg->ptlm_srcnid;
585 srcid.pid = msg->ptlm_srcpid;
587 CDEBUG(D_NETTRACE, "%s: RX %s c %d %p rxb %p queued %lu ticks (%ld s)\n",
588 libcfs_id2str(srcid), kptllnd_msgtype2str(msg->ptlm_type),
589 msg->ptlm_credits, rx, rx->rx_rxb,
590 jiffies - rx->rx_treceived,
591 cfs_duration_sec(jiffies - rx->rx_treceived));
593 if (srcid.nid != kptllnd_ptl2lnetnid(rx->rx_initiator.nid)) {
594 CERROR("Bad source id %s from %s\n",
595 libcfs_id2str(srcid),
596 kptllnd_ptlid2str(rx->rx_initiator));
600 if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
601 peer = kptllnd_id2peer(srcid);
605 CWARN("NAK from %s (%d:%s)\n",
606 libcfs_id2str(srcid), peer->peer_state,
607 kptllnd_ptlid2str(rx->rx_initiator));
609 /* NB can't nuke new peer - bug 17546 comment 31 */
610 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
611 CDEBUG(D_NET, "Stale NAK from %s(%s): WAITING_HELLO\n",
612 libcfs_id2str(srcid),
613 kptllnd_ptlid2str(rx->rx_initiator));
614 kptllnd_peer_decref(peer);
622 if (msg->ptlm_dstnid != kptllnd_data.kptl_ni->ni_nid ||
623 msg->ptlm_dstpid != the_lnet.ln_pid) {
624 CERROR("Bad dstid %s (expected %s) from %s\n",
625 libcfs_id2str((lnet_process_id_t) {
626 .nid = msg->ptlm_dstnid,
627 .pid = msg->ptlm_dstpid}),
628 libcfs_id2str((lnet_process_id_t) {
629 .nid = kptllnd_data.kptl_ni->ni_nid,
630 .pid = the_lnet.ln_pid}),
631 kptllnd_ptlid2str(rx->rx_initiator));
635 if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
636 peer = kptllnd_peer_handle_hello(rx->rx_initiator, msg);
640 peer = kptllnd_id2peer(srcid);
642 CWARN("NAK %s: no connection, %s must reconnect\n",
643 kptllnd_msgtype2str(msg->ptlm_type),
644 libcfs_id2str(srcid));
645 /* NAK to make the peer reconnect */
650 /* Ignore any messages for a previous incarnation of me */
651 if (msg->ptlm_dststamp < peer->peer_myincarnation) {
652 kptllnd_peer_decref(peer);
656 if (msg->ptlm_dststamp != peer->peer_myincarnation) {
657 CERROR("%s: Unexpected dststamp "LPX64" "
658 "("LPX64" expected)\n",
659 libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,
660 peer->peer_myincarnation);
665 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
666 /* recoverable error - restart txs */
667 spin_lock_irqsave(&peer->peer_lock, flags);
668 kptllnd_cancel_txlist(&peer->peer_sendq, &txs);
669 spin_unlock_irqrestore(&peer->peer_lock, flags);
671 CWARN("NAK %s: Unexpected %s message\n",
672 libcfs_id2str(srcid),
673 kptllnd_msgtype2str(msg->ptlm_type));
679 if (msg->ptlm_srcstamp != peer->peer_incarnation) {
680 CERROR("%s: Unexpected srcstamp "LPX64" "
681 "("LPX64" expected)\n",
682 libcfs_id2str(srcid),
684 peer->peer_incarnation);
690 LASSERT (msg->ptlm_srcnid == peer->peer_id.nid &&
691 msg->ptlm_srcpid == peer->peer_id.pid);
693 spin_lock_irqsave(&peer->peer_lock, flags);
695 /* Check peer only sends when I've sent her credits */
696 if (peer->peer_sent_credits == 0) {
697 int c = peer->peer_credits;
698 int oc = peer->peer_outstanding_credits;
699 int sc = peer->peer_sent_credits;
701 spin_unlock_irqrestore(&peer->peer_lock, flags);
703 CERROR("%s: buffer overrun [%d/%d+%d]\n",
704 libcfs_id2str(peer->peer_id), c, sc, oc);
708 peer->peer_sent_credits--;
710 /* No check for credit overflow - the peer may post new
711 * buffers after the startup handshake. */
712 peer->peer_credits += msg->ptlm_credits;
714 /* This ensures the credit taken by NOOP can be returned */
715 if (msg->ptlm_type == PTLLND_MSG_TYPE_NOOP) {
716 peer->peer_outstanding_credits++;
717 post_credit = PTLLND_POSTRX_NO_CREDIT;
720 spin_unlock_irqrestore(&peer->peer_lock, flags);
722 /* See if something can go out now that credits have come in */
723 if (msg->ptlm_credits != 0)
724 kptllnd_peer_check_sends(peer);
726 /* ptllnd-level protocol correct - rx takes my ref on peer and increments
727 * peer_outstanding_credits when it completes */
729 kptllnd_peer_alive(peer);
731 switch (msg->ptlm_type) {
733 /* already checked by kptllnd_msg_unpack() */
736 case PTLLND_MSG_TYPE_HELLO:
737 CDEBUG(D_NET, "PTLLND_MSG_TYPE_HELLO\n");
740 case PTLLND_MSG_TYPE_NOOP:
741 CDEBUG(D_NET, "PTLLND_MSG_TYPE_NOOP\n");
744 case PTLLND_MSG_TYPE_IMMEDIATE:
745 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
746 rc = lnet_parse(kptllnd_data.kptl_ni,
747 &msg->ptlm_u.immediate.kptlim_hdr,
750 if (rc >= 0) /* kptllnd_recv owns 'rx' now */
754 case PTLLND_MSG_TYPE_PUT:
755 case PTLLND_MSG_TYPE_GET:
756 CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
757 msg->ptlm_type == PTLLND_MSG_TYPE_PUT ?
760 /* checked in kptllnd_msg_unpack() */
761 LASSERT (msg->ptlm_u.rdma.kptlrm_matchbits >=
762 PTL_RESERVED_MATCHBITS);
764 /* Update last match bits seen */
765 spin_lock_irqsave(&peer->peer_lock, flags);
767 if (msg->ptlm_u.rdma.kptlrm_matchbits >
768 rx->rx_peer->peer_last_matchbits_seen)
769 rx->rx_peer->peer_last_matchbits_seen =
770 msg->ptlm_u.rdma.kptlrm_matchbits;
772 spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);
774 rc = lnet_parse(kptllnd_data.kptl_ni,
775 &msg->ptlm_u.rdma.kptlrm_hdr,
778 if (rc >= 0) /* kptllnd_recv owns 'rx' now */
785 kptllnd_peer_close(peer, rc);
786 if (rx->rx_peer == NULL) /* drop ref on peer */
787 kptllnd_peer_decref(peer); /* unless rx_done will */
788 if (!list_empty(&txs))
789 kptllnd_restart_txs(srcid, &txs);
791 kptllnd_rx_done(rx, post_credit);