4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * This file is part of Lustre, http://www.lustre.org/
32 * Lustre is a trademark of Sun Microsystems, Inc.
34 * lnet/ulnds/ptllnd/ptllnd_cb.c
36 * Author: Eric Barton <eeb@bartonsoftware.com>
/* Stamp 'tx' with its expiry time: current wall-clock seconds plus the
 * per-NI configured timeout.  NOTE(review): this view of the file is
 * non-contiguous (original line numbers skip); code left byte-identical. */
42 ptllnd_set_tx_deadline(ptllnd_tx_t *tx)
44 ptllnd_peer_t *peer = tx->tx_peer;
45 lnet_ni_t *ni = peer->plp_ni;
46 ptllnd_ni_t *plni = ni->ni_data;
48 tx->tx_deadline = cfs_time_current_sec() + plni->plni_timeout;
/* Queue a fully-built tx on its peer's send queue and kick the sender.
 * NOOPs never come through here (they are generated inside check_sends). */
52 ptllnd_post_tx(ptllnd_tx_t *tx)
54 ptllnd_peer_t *peer = tx->tx_peer;
56 LASSERT (tx->tx_type != PTLLND_MSG_TYPE_NOOP);
58 ptllnd_set_tx_deadline(tx);
59 cfs_list_add_tail(&tx->tx_list, &peer->plp_txq);
60 ptllnd_check_sends(peer);
/* Format a portals process id into one of 8 rotating static buffers, so up
 * to 8 results may be held concurrently (e.g. in one printf).  Not
 * thread-safe: static state with no locking.  NOTE(review): the 'idx'
 * declaration and wrap-around statement are elided from this view. */
64 ptllnd_ptlid2str(ptl_process_id_t id)
66 static char strs[8][32];
69 char *str = strs[idx++];
71 if (idx >= sizeof(strs)/sizeof(strs[0]))
74 snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
/* Final teardown of a peer once its refcount has dropped: give back the
 * receive-buffer headroom this peer accounted for (its lazy credits plus the
 * per-peer credit quota), then free it.  Caller must already have marked the
 * peer closing and drained all of its tx queues (asserted below). */
79 ptllnd_destroy_peer(ptllnd_peer_t *peer)
81 lnet_ni_t *ni = peer->plp_ni;
82 ptllnd_ni_t *plni = ni->ni_data;
83 int nmsg = peer->plp_lazy_credits +
84 plni->plni_peer_credits;
86 ptllnd_size_buffers(ni, -nmsg);
88 LASSERT (peer->plp_closing);
89 LASSERT (plni->plni_npeers > 0);
90 LASSERT (cfs_list_empty(&peer->plp_txq));
91 LASSERT (cfs_list_empty(&peer->plp_noopq));
92 LASSERT (cfs_list_empty(&peer->plp_activeq));
94 LIBCFS_FREE(peer, sizeof(*peer));
/* Fail every tx on queue 'q' with -ESHUTDOWN and move it to the NI's zombie
 * list, where completion/cleanup happens later. */
98 ptllnd_abort_txs(ptllnd_ni_t *plni, cfs_list_t *q)
100 while (!cfs_list_empty(q)) {
101 ptllnd_tx_t *tx = cfs_list_entry(q->next, ptllnd_tx_t, tx_list);
103 tx->tx_status = -ESHUTDOWN;
104 cfs_list_del(&tx->tx_list);
105 cfs_list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
/* Begin shutting a peer down.  Idempotent: plp_closing guards re-entry.
 * Warns (and optionally dumps debug state) if there is still queued traffic,
 * aborts all three tx queues, unhashes the peer and drops the hash-table
 * reference — the peer is freed when the last ref goes (destroy_peer). */
110 ptllnd_close_peer(ptllnd_peer_t *peer, int error)
112 lnet_ni_t *ni = peer->plp_ni;
113 ptllnd_ni_t *plni = ni->ni_data;
115 if (peer->plp_closing)
118 peer->plp_closing = 1;
120 if (!cfs_list_empty(&peer->plp_txq) ||
121 !cfs_list_empty(&peer->plp_noopq) ||
122 !cfs_list_empty(&peer->plp_activeq) ||
124 CWARN("Closing %s: %d\n", libcfs_id2str(peer->plp_id), error);
125 if (plni->plni_debug)
126 ptllnd_dump_debug(ni, peer->plp_id);
129 ptllnd_abort_txs(plni, &peer->plp_txq);
130 ptllnd_abort_txs(plni, &peer->plp_noopq);
131 ptllnd_abort_txs(plni, &peer->plp_activeq);
133 cfs_list_del(&peer->plp_list);
134 ptllnd_peer_decref(peer);
/* Look up the peer for 'id' in the NI's hash table, taking a reference on a
 * hit.  On miss, when 'create' is set: reserve receive-buffer headroom for
 * the new peer's credits, allocate and initialise it, hash it, and post the
 * initial HELLO tx (which carries our matchbits base and max message size).
 * NOTE(review): error-return paths and the hello-tx failure cleanup are
 * partially elided from this view — code left byte-identical. */
138 ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
140 ptllnd_ni_t *plni = ni->ni_data;
141 unsigned int hash = LNET_NIDADDR(id.nid) % plni->plni_peer_hash_size;
146 LASSERT (LNET_NIDNET(id.nid) == LNET_NIDNET(ni->ni_nid));
148 cfs_list_for_each_entry (plp, &plni->plni_peer_hash[hash], plp_list) {
149 if (plp->plp_id.nid == id.nid &&
150 plp->plp_id.pid == id.pid) {
151 ptllnd_peer_addref(plp);
159 /* New peer: check first for enough posted buffers */
161 rc = ptllnd_size_buffers(ni, plni->plni_peer_credits);
167 LIBCFS_ALLOC(plp, sizeof(*plp));
169 CERROR("Can't allocate new peer %s\n", libcfs_id2str(id));
171 ptllnd_size_buffers(ni, -plni->plni_peer_credits);
177 plp->plp_ptlid.nid = LNET_NIDADDR(id.nid);
178 plp->plp_ptlid.pid = plni->plni_ptllnd_pid;
179 plp->plp_credits = 1; /* add more later when she gives me credits */
180 plp->plp_max_msg_size = plni->plni_max_msg_size; /* until I hear from her */
181 plp->plp_sent_credits = 1; /* Implicit credit for HELLO */
182 plp->plp_outstanding_credits = plni->plni_peer_credits - 1;
183 plp->plp_lazy_credits = 0;
184 plp->plp_extra_lazy_credits = 0;
187 plp->plp_sent_hello = 0;
188 plp->plp_recvd_hello = 0;
189 plp->plp_closing = 0;
190 plp->plp_refcount = 1;
191 CFS_INIT_LIST_HEAD(&plp->plp_list);
192 CFS_INIT_LIST_HEAD(&plp->plp_txq);
193 CFS_INIT_LIST_HEAD(&plp->plp_noopq);
194 CFS_INIT_LIST_HEAD(&plp->plp_activeq);
196 ptllnd_peer_addref(plp);
197 cfs_list_add_tail(&plp->plp_list, &plni->plni_peer_hash[hash]);
199 tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_HELLO, 0);
201 CERROR("Can't send HELLO to %s\n", libcfs_id2str(id));
202 ptllnd_close_peer(plp, -ENOMEM);
203 ptllnd_peer_decref(plp);
207 tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS;
208 tx->tx_msg.ptlm_u.hello.kptlhm_max_msg_size = plni->plni_max_msg_size;
210 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post hello %p", libcfs_id2str(id),
211 tx->tx_peer->plp_credits,
212 tx->tx_peer->plp_outstanding_credits,
213 tx->tx_peer->plp_sent_credits,
214 plni->plni_peer_credits +
215 tx->tx_peer->plp_lazy_credits, tx);
/* Count the entries on list 'q' by walking it (counter lines elided here). */
222 ptllnd_count_q(cfs_list_t *q)
227 cfs_list_for_each(e, q) {
/* Map a tx type code to a human-readable name for debug output.
 * NOTE(review): the returned string literals are elided from this view;
 * only the switch labels are visible. */
235 ptllnd_tx_typestr(int type)
238 case PTLLND_RDMA_WRITE:
241 case PTLLND_RDMA_READ:
244 case PTLLND_MSG_TYPE_PUT:
247 case PTLLND_MSG_TYPE_GET:
250 case PTLLND_MSG_TYPE_IMMEDIATE:
253 case PTLLND_MSG_TYPE_NOOP:
256 case PTLLND_MSG_TYPE_HELLO:
/* Dump one tx: type, peer, the bulk/request post+done timestamps
 * (sec.usec pairs) and its status. */
265 ptllnd_debug_tx(ptllnd_tx_t *tx)
267 CDEBUG(D_WARNING, "%s %s b %ld.%06ld/%ld.%06ld"
268 " r %ld.%06ld/%ld.%06ld status %d\n",
269 ptllnd_tx_typestr(tx->tx_type),
270 libcfs_id2str(tx->tx_peer->plp_id),
271 tx->tx_bulk_posted.tv_sec, tx->tx_bulk_posted.tv_usec,
272 tx->tx_bulk_done.tv_sec, tx->tx_bulk_done.tv_usec,
273 tx->tx_req_posted.tv_sec, tx->tx_req_posted.tv_usec,
274 tx->tx_req_done.tv_sec, tx->tx_req_done.tv_usec,
/* Dump full debug state for peer 'id': connection flags, credit counters,
 * queue depths, then every tx on its txq/noopq/activeq plus any of its txs
 * on the NI-wide zombie and history lists.  No-op (with a warning) if the
 * peer does not exist.  Takes and drops a peer ref around the dump. */
279 ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
281 ptllnd_peer_t *plp = ptllnd_find_peer(ni, id, 0);
282 ptllnd_ni_t *plni = ni->ni_data;
286 CDEBUG(D_WARNING, "No peer %s\n", libcfs_id2str(id));
290 CWARN("%s %s%s [%d] "LPU64".%06d m "LPU64" q %d/%d/%d c %d/%d+%d(%d)\n",
292 plp->plp_recvd_hello ? "H" : "_",
293 plp->plp_closing ? "C" : "_",
295 plp->plp_stamp / 1000000, (int)(plp->plp_stamp % 1000000),
297 ptllnd_count_q(&plp->plp_txq),
298 ptllnd_count_q(&plp->plp_noopq),
299 ptllnd_count_q(&plp->plp_activeq),
300 plp->plp_credits, plp->plp_outstanding_credits, plp->plp_sent_credits,
301 plni->plni_peer_credits + plp->plp_lazy_credits);
303 CDEBUG(D_WARNING, "txq:\n");
304 cfs_list_for_each_entry (tx, &plp->plp_txq, tx_list) {
308 CDEBUG(D_WARNING, "noopq:\n");
309 cfs_list_for_each_entry (tx, &plp->plp_noopq, tx_list) {
313 CDEBUG(D_WARNING, "activeq:\n");
314 cfs_list_for_each_entry (tx, &plp->plp_activeq, tx_list) {
318 CDEBUG(D_WARNING, "zombies:\n");
319 cfs_list_for_each_entry (tx, &plni->plni_zombie_txs, tx_list) {
320 if (tx->tx_peer->plp_id.nid == id.nid &&
321 tx->tx_peer->plp_id.pid == id.pid)
325 CDEBUG(D_WARNING, "history:\n");
326 cfs_list_for_each_entry (tx, &plni->plni_tx_history, tx_list) {
327 if (tx->tx_peer->plp_id.nid == id.nid &&
328 tx->tx_peer->plp_id.pid == id.pid)
332 ptllnd_peer_decref(plp);
/* Convenience: dump one peer's state then the global message history. */
336 ptllnd_dump_debug(lnet_ni_t *ni, lnet_process_id_t id)
338 ptllnd_debug_peer(ni, id);
339 ptllnd_dump_history();
/* Adjust per-peer "lazy" message headroom by 'nasync'.  A negative delta is
 * never applied to buffers immediately: it is banked in
 * plp_extra_lazy_credits because the peer may already hold credits for the
 * buffers (see comment below).  A positive delta first consumes any banked
 * surplus, then grows the posted buffers and grants the peer the new
 * credits.  NOTE(review): the nasync==0 early-return and peer-decref paths
 * are elided from this view. */
343 ptllnd_setasync(lnet_ni_t *ni, lnet_process_id_t id, int nasync)
345 ptllnd_peer_t *peer = ptllnd_find_peer(ni, id, nasync > 0);
351 LASSERT (peer->plp_lazy_credits >= 0);
352 LASSERT (peer->plp_extra_lazy_credits >= 0);
354 /* If nasync < 0, we're being told we can reduce the total message
355 * headroom. We can't do this right now because our peer might already
356 * have credits for the extra buffers, so we just account the extra
357 * headroom in case we need it later and only destroy buffers when the
360 * Note that the following condition handles this case, where it
361 * actually increases the extra lazy credit counter. */
363 if (nasync <= peer->plp_extra_lazy_credits) {
364 peer->plp_extra_lazy_credits -= nasync;
368 LASSERT (nasync > 0);
370 nasync -= peer->plp_extra_lazy_credits;
371 peer->plp_extra_lazy_credits = 0;
373 rc = ptllnd_size_buffers(ni, nasync);
375 peer->plp_lazy_credits += nasync;
376 peer->plp_outstanding_credits += nasync;
/* Simple rotate-left-and-add checksum over 'nob' bytes at 'ptr'.
 * 0 is reserved to mean "no checksum", so a zero sum is mapped to 1. */
383 ptllnd_cksum (void *ptr, int nob)
389 sum = ((sum << 1) | (sum >> 31)) + *c++;
391 /* ensure I don't return 0 (== no checksum) */
392 return (sum == 0) ? 1 : sum;
/* Allocate and initialise a tx descriptor for 'type' with 'payload_nob'
 * bytes of immediate payload.  The wire message size depends on type
 * (rdma/hello carry fixed sub-messages, immediate carries the payload,
 * noop just the header) and is rounded up to 8 bytes; it must fit in the
 * peer's advertised max.  Takes a peer ref owned by the tx.
 * NOTE(review): NULL-return path after the allocation failure and some
 * field initialisers are elided from this view. */
396 ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
398 lnet_ni_t *ni = peer->plp_ni;
399 ptllnd_ni_t *plni = ni->ni_data;
403 CDEBUG(D_NET, "peer=%p type=%d payload=%d\n", peer, type, payload_nob);
409 case PTLLND_RDMA_WRITE:
410 case PTLLND_RDMA_READ:
411 LASSERT (payload_nob == 0);
415 case PTLLND_MSG_TYPE_PUT:
416 case PTLLND_MSG_TYPE_GET:
417 LASSERT (payload_nob == 0);
418 msgsize = offsetof(kptl_msg_t, ptlm_u) +
419 sizeof(kptl_rdma_msg_t);
422 case PTLLND_MSG_TYPE_IMMEDIATE:
423 msgsize = offsetof(kptl_msg_t,
424 ptlm_u.immediate.kptlim_payload[payload_nob]);
427 case PTLLND_MSG_TYPE_NOOP:
428 LASSERT (payload_nob == 0);
429 msgsize = offsetof(kptl_msg_t, ptlm_u);
432 case PTLLND_MSG_TYPE_HELLO:
433 LASSERT (payload_nob == 0);
434 msgsize = offsetof(kptl_msg_t, ptlm_u) +
435 sizeof(kptl_hello_msg_t);
439 msgsize = (msgsize + 7) & ~7;
440 LASSERT (msgsize <= peer->plp_max_msg_size);
442 LIBCFS_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize);
445 CERROR("Can't allocate msg type %d for %s\n",
446 type, libcfs_id2str(peer->plp_id));
450 CFS_INIT_LIST_HEAD(&tx->tx_list);
453 tx->tx_lnetmsg = tx->tx_lnetreplymsg = NULL;
456 tx->tx_reqmdh = PTL_INVALID_HANDLE;
457 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
458 tx->tx_msgsize = msgsize;
459 tx->tx_completing = 0;
462 memset(&tx->tx_bulk_posted, 0, sizeof(tx->tx_bulk_posted));
463 memset(&tx->tx_bulk_done, 0, sizeof(tx->tx_bulk_done));
464 memset(&tx->tx_req_posted, 0, sizeof(tx->tx_req_posted));
465 memset(&tx->tx_req_done, 0, sizeof(tx->tx_req_done));
468 tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC;
469 tx->tx_msg.ptlm_version = PTLLND_MSG_VERSION;
470 tx->tx_msg.ptlm_type = type;
471 tx->tx_msg.ptlm_credits = 0;
472 tx->tx_msg.ptlm_nob = msgsize;
473 tx->tx_msg.ptlm_cksum = 0;
474 tx->tx_msg.ptlm_srcnid = ni->ni_nid;
475 tx->tx_msg.ptlm_srcstamp = plni->plni_stamp;
476 tx->tx_msg.ptlm_dstnid = peer->plp_id.nid;
477 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
478 tx->tx_msg.ptlm_srcpid = the_lnet.ln_pid;
479 tx->tx_msg.ptlm_dstpid = peer->plp_id.pid;
482 ptllnd_peer_addref(peer);
485 CDEBUG(D_NET, "tx=%p\n", tx);
/* Unlink the MD referenced by *mdh, spinning until the handle is
 * invalidated.  Without Lustre unlink semantics a PTL_OK unlink produces no
 * unlinked event, so we break out; PTL_MD_IN_USE means an event is still
 * pending and ptllnd_tx_event() will invalidate *mdh — wait (with a
 * periodic long-wait warning every plni_long_wait ms).
 * NOTE(review): the event-wait call and loop exits are elided here. */
491 ptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh)
493 ptllnd_peer_t *peer = tx->tx_peer;
494 lnet_ni_t *ni = peer->plp_ni;
496 time_t start = cfs_time_current_sec();
497 ptllnd_ni_t *plni = ni->ni_data;
498 int w = plni->plni_long_wait;
500 while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) {
501 rc = PtlMDUnlink(*mdh);
502 #ifndef LUSTRE_PORTALS_UNLINK_SEMANTICS
503 if (rc == PTL_OK) /* unlink successful => no unlinked event */
505 LASSERT (rc == PTL_MD_IN_USE);
507 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
508 CWARN("Waited %ds to abort tx to %s\n",
509 (int)(cfs_time_current_sec() - start),
510 libcfs_id2str(peer->plp_id));
513 /* Wait for ptllnd_tx_event() to invalidate */
/* Trim the completed-tx history list down to plni_max_tx_history entries,
 * freeing the oldest (head) entries and dropping their peer refs. */
519 ptllnd_cull_tx_history(ptllnd_ni_t *plni)
521 int max = plni->plni_max_tx_history;
523 while (plni->plni_ntx_history > max) {
524 ptllnd_tx_t *tx = cfs_list_entry(plni->plni_tx_history.next,
525 ptllnd_tx_t, tx_list);
526 cfs_list_del(&tx->tx_list);
528 ptllnd_peer_decref(tx->tx_peer);
530 LIBCFS_FREE(tx, offsetof(ptllnd_tx_t, tx_msg) + tx->tx_msgsize);
532 LASSERT (plni->plni_ntxs > 0);
534 plni->plni_ntx_history--;
/* Complete a tx: guard against re-entry via tx_completing (events keep
 * arriving while MDs are being aborted), detach it from any queue, close
 * the peer on error, abort its request/bulk MDs, free its iov, finalize the
 * associated lnet message(s) — a GET's originating message is always
 * finalized with success, the REPLY with the real status — then park the tx
 * on the history list and cull. */
539 ptllnd_tx_done(ptllnd_tx_t *tx)
541 ptllnd_peer_t *peer = tx->tx_peer;
542 lnet_ni_t *ni = peer->plp_ni;
543 ptllnd_ni_t *plni = ni->ni_data;
545 /* CAVEAT EMPTOR: If this tx is being aborted, I'll continue to get
546 * events for this tx until it's unlinked. So I set tx_completing to
547 * flag the tx is getting handled */
549 if (tx->tx_completing)
552 tx->tx_completing = 1;
554 if (!cfs_list_empty(&tx->tx_list))
555 cfs_list_del_init(&tx->tx_list);
557 if (tx->tx_status != 0) {
558 if (plni->plni_debug) {
559 CERROR("Completing tx for %s with error %d\n",
560 libcfs_id2str(peer->plp_id), tx->tx_status);
563 ptllnd_close_peer(peer, tx->tx_status);
566 ptllnd_abort_tx(tx, &tx->tx_reqmdh);
567 ptllnd_abort_tx(tx, &tx->tx_bulkmdh);
569 if (tx->tx_niov > 0) {
570 LIBCFS_FREE(tx->tx_iov, tx->tx_niov * sizeof(*tx->tx_iov));
574 if (tx->tx_lnetreplymsg != NULL) {
575 LASSERT (tx->tx_type == PTLLND_MSG_TYPE_GET);
576 LASSERT (tx->tx_lnetmsg != NULL);
577 /* Simulate GET success always */
578 lnet_finalize(ni, tx->tx_lnetmsg, 0);
579 CDEBUG(D_NET, "lnet_finalize(tx_lnetreplymsg=%p)\n",tx->tx_lnetreplymsg);
580 lnet_finalize(ni, tx->tx_lnetreplymsg, tx->tx_status);
581 } else if (tx->tx_lnetmsg != NULL) {
582 lnet_finalize(ni, tx->tx_lnetmsg, tx->tx_status);
585 plni->plni_ntx_history++;
586 cfs_list_add_tail(&tx->tx_list, &plni->plni_tx_history);
588 ptllnd_cull_tx_history(plni);
/* Build the tx's portals iovec from (niov, iov, offset, len): skip whole
 * iovecs consumed by 'offset', allocate a ptl_md_iovec_t array, and copy
 * entries clipping the first by the residual offset and the last to 'len'.
 * If the allocation turned out larger than the entries actually used, free
 * and redo it at the exact size (see comment below).
 * NOTE(review): the retry loop structure and 'resid' setup are partially
 * elided from this view. */
592 ptllnd_set_txiov(ptllnd_tx_t *tx,
593 unsigned int niov, struct iovec *iov,
594 unsigned int offset, unsigned int len)
596 ptl_md_iovec_t *piov;
605 * Remove iovec's at the beginning that
606 * are skipped because of the offset.
607 * Adjust the offset accordingly
611 if (offset < iov->iov_len)
613 offset -= iov->iov_len;
619 int temp_offset = offset;
621 LIBCFS_ALLOC(piov, niov * sizeof(*piov));
625 for (npiov = 0;; npiov++) {
626 LASSERT (npiov < niov);
627 LASSERT (iov->iov_len >= temp_offset);
629 piov[npiov].iov_base = iov[npiov].iov_base + temp_offset;
630 piov[npiov].iov_len = iov[npiov].iov_len - temp_offset;
632 if (piov[npiov].iov_len >= resid) {
633 piov[npiov].iov_len = resid;
637 resid -= piov[npiov].iov_len;
647 /* Dang! The piov I allocated was too big and it's a drag to
648 * have to maintain separate 'allocated' and 'used' sizes, so
649 * I'll just do it again; NB this doesn't happen normally... */
650 LIBCFS_FREE(piov, niov * sizeof(*piov));
/* Point 'md' at the tx's buffer: a single iovec is described directly,
 * multiple iovecs set PTL_MD_IOVEC (the niov==0 arm is elided here). */
656 ptllnd_set_md_buffer(ptl_md_t *md, ptllnd_tx_t *tx)
658 unsigned int niov = tx->tx_niov;
659 ptl_md_iovec_t *iov = tx->tx_iov;
661 LASSERT ((md->options & PTL_MD_IOVEC) == 0);
666 } else if (niov == 1) {
667 md->start = iov[0].iov_base;
668 md->length = iov[0].iov_len;
672 md->options |= PTL_MD_IOVEC;
/* Post a receive buffer: attach a wildcard match entry on the LND portal
 * accepting PUTs from anyone with LNET_MSG_MATCHBITS, then attach the MD
 * (infinite threshold, max_size-limited, 8-byte aligned).  On MD failure the
 * ME is unlinked and the posted count rolled back. */
677 ptllnd_post_buffer(ptllnd_buffer_t *buf)
679 lnet_ni_t *ni = buf->plb_ni;
680 ptllnd_ni_t *plni = ni->ni_data;
681 ptl_process_id_t anyid = {
685 .start = buf->plb_buffer,
686 .length = plni->plni_buffer_size,
687 .threshold = PTL_MD_THRESH_INF,
688 .max_size = plni->plni_max_msg_size,
689 .options = (PTLLND_MD_OPTIONS |
690 PTL_MD_OP_PUT | PTL_MD_MAX_SIZE |
691 PTL_MD_LOCAL_ALIGN8),
692 .user_ptr = ptllnd_obj2eventarg(buf, PTLLND_EVENTARG_TYPE_BUF),
693 .eq_handle = plni->plni_eqh};
697 LASSERT (!buf->plb_posted);
699 rc = PtlMEAttach(plni->plni_nih, plni->plni_portal,
700 anyid, LNET_MSG_MATCHBITS, 0,
701 PTL_UNLINK, PTL_INS_AFTER, &meh);
703 CERROR("PtlMEAttach failed: %s(%d)\n",
704 ptllnd_errtype2str(rc), rc);
709 plni->plni_nposted_buffers++;
711 rc = PtlMDAttach(meh, md, LNET_UNLINK, &buf->plb_md);
715 CERROR("PtlMDAttach failed: %s(%d)\n",
716 ptllnd_errtype2str(rc), rc);
719 plni->plni_nposted_buffers--;
721 rc = PtlMEUnlink(meh);
722 LASSERT (rc == PTL_OK);
/* Decide whether an explicit NOOP is needed to return credits to 'peer':
 * only after HELLO, with a send credit available, no NOOP already queued,
 * and outstanding credits at/above the high-water mark; even then only if
 * there is no queued tx to piggyback on (or only the reserved last credit
 * remains, which ordinary txs may not consume). */
728 ptllnd_peer_send_noop (ptllnd_peer_t *peer)
730 ptllnd_ni_t *plni = peer->plp_ni->ni_data;
732 if (!peer->plp_sent_hello ||
733 peer->plp_credits == 0 ||
734 !cfs_list_empty(&peer->plp_noopq) ||
735 peer->plp_outstanding_credits < PTLLND_CREDIT_HIGHWATER(plni))
738 /* No tx to piggyback NOOP onto or no credit to send a tx */
739 return (cfs_list_empty(&peer->plp_txq) || peer->plp_credits == 1);
/* The send engine for one peer.  Generate a NOOP if credits must be
 * returned; pick the next tx (noopq before txq); enforce the credit
 * discipline: no send with zero credits, and the last credit is reserved
 * for NOOP/HELLO.  Move the tx to the activeq, piggyback all returnable
 * credits (capped at PTLLND_MSG_MAX_CREDITS), set the destination stamp at
 * the last moment (unknown on a new peer until its HELLO arrives),
 * optionally checksum the header, then PtlMDBind + PtlPut the message.
 * NOTE(review): this is the most heavily elided block in this view (loop
 * structure, credit decrement, error unwinding are missing lines); code
 * kept byte-identical. */
743 ptllnd_check_sends(ptllnd_peer_t *peer)
745 ptllnd_ni_t *plni = peer->plp_ni->ni_data;
751 CDEBUG(D_NET, "%s: [%d/%d+%d(%d)\n",
752 libcfs_id2str(peer->plp_id), peer->plp_credits,
753 peer->plp_outstanding_credits, peer->plp_sent_credits,
754 plni->plni_peer_credits + peer->plp_lazy_credits);
756 if (ptllnd_peer_send_noop(peer)) {
757 tx = ptllnd_new_tx(peer, PTLLND_MSG_TYPE_NOOP, 0);
758 CDEBUG(D_NET, "NOOP tx=%p\n",tx);
760 CERROR("Can't return credits to %s\n",
761 libcfs_id2str(peer->plp_id));
763 ptllnd_set_tx_deadline(tx);
764 cfs_list_add_tail(&tx->tx_list, &peer->plp_noopq);
769 if (!cfs_list_empty(&peer->plp_noopq)) {
770 LASSERT (peer->plp_sent_hello);
771 tx = cfs_list_entry(peer->plp_noopq.next,
772 ptllnd_tx_t, tx_list);
773 } else if (!cfs_list_empty(&peer->plp_txq)) {
774 tx = cfs_list_entry(peer->plp_txq.next,
775 ptllnd_tx_t, tx_list);
777 /* nothing to send right now */
781 LASSERT (tx->tx_msgsize > 0);
783 LASSERT (peer->plp_outstanding_credits >= 0);
784 LASSERT (peer->plp_sent_credits >= 0);
785 LASSERT (peer->plp_outstanding_credits + peer->plp_sent_credits
786 <= plni->plni_peer_credits + peer->plp_lazy_credits);
787 LASSERT (peer->plp_credits >= 0);
789 /* say HELLO first */
790 if (!peer->plp_sent_hello) {
791 LASSERT (cfs_list_empty(&peer->plp_noopq));
792 LASSERT (tx->tx_type == PTLLND_MSG_TYPE_HELLO);
794 peer->plp_sent_hello = 1;
797 if (peer->plp_credits == 0) { /* no credits */
798 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: no creds for %p",
799 libcfs_id2str(peer->plp_id),
801 peer->plp_outstanding_credits,
802 peer->plp_sent_credits,
803 plni->plni_peer_credits +
804 peer->plp_lazy_credits, tx);
808 /* Last/Initial credit reserved for NOOP/HELLO */
809 if (peer->plp_credits == 1 &&
810 tx->tx_type != PTLLND_MSG_TYPE_NOOP &&
811 tx->tx_type != PTLLND_MSG_TYPE_HELLO) {
812 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: too few creds for %p",
813 libcfs_id2str(peer->plp_id),
815 peer->plp_outstanding_credits,
816 peer->plp_sent_credits,
817 plni->plni_peer_credits +
818 peer->plp_lazy_credits, tx);
822 cfs_list_del(&tx->tx_list);
823 cfs_list_add_tail(&tx->tx_list, &peer->plp_activeq);
825 CDEBUG(D_NET, "Sending at TX=%p type=%s (%d)\n",tx,
826 ptllnd_msgtype2str(tx->tx_type),tx->tx_type);
828 if (tx->tx_type == PTLLND_MSG_TYPE_NOOP &&
829 !ptllnd_peer_send_noop(peer)) {
835 /* Set stamp at the last minute; on a new peer, I don't know it
836 * until I receive the HELLO back */
837 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
840 * Return all the credits we have
842 tx->tx_msg.ptlm_credits = MIN(PTLLND_MSG_MAX_CREDITS,
843 peer->plp_outstanding_credits);
844 peer->plp_sent_credits += tx->tx_msg.ptlm_credits;
845 peer->plp_outstanding_credits -= tx->tx_msg.ptlm_credits;
852 if (plni->plni_checksum)
853 tx->tx_msg.ptlm_cksum =
854 ptllnd_cksum(&tx->tx_msg,
855 offsetof(kptl_msg_t, ptlm_u));
857 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
858 md.eq_handle = plni->plni_eqh;
860 md.options = PTLLND_MD_OPTIONS;
861 md.start = &tx->tx_msg;
862 md.length = tx->tx_msgsize;
864 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
866 CERROR("PtlMDBind for %s failed: %s(%d)\n",
867 libcfs_id2str(peer->plp_id),
868 ptllnd_errtype2str(rc), rc);
869 tx->tx_status = -EIO;
874 LASSERT (tx->tx_type != PTLLND_RDMA_WRITE &&
875 tx->tx_type != PTLLND_RDMA_READ);
878 gettimeofday(&tx->tx_req_posted, NULL);
880 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: %s %p c %d",
881 libcfs_id2str(peer->plp_id),
883 peer->plp_outstanding_credits,
884 peer->plp_sent_credits,
885 plni->plni_peer_credits +
886 peer->plp_lazy_credits,
887 ptllnd_msgtype2str(tx->tx_type), tx,
888 tx->tx_msg.ptlm_credits);
890 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
891 plni->plni_portal, 0, LNET_MSG_MATCHBITS, 0, 0);
893 CERROR("PtlPut for %s failed: %s(%d)\n",
894 libcfs_id2str(peer->plp_id),
895 ptllnd_errtype2str(rc), rc);
896 tx->tx_status = -EIO;
/* Set up the passive side of an RDMA: for a PUT request the sink buffer
 * (peer will GET... actually: PUT into it, PTL_MD_OP_PUT), for a GET
 * request the source buffer (peer will GET from it).  Busy-waits for the
 * peer's HELLO to validate plp_match, checking the tx deadline itself
 * because the tx is not yet on plp_txq where the watchdog could expire it.
 * Allocates fresh matchbits (skipping the reserved range), attaches ME+MD,
 * fills in the rdma sub-message, and for GET creates the REPLY message up
 * front.  NOTE(review): failure/exit paths and the final post are elided
 * from this view. */
904 ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
905 unsigned int niov, struct iovec *iov,
906 unsigned int offset, unsigned int len)
908 lnet_ni_t *ni = peer->plp_ni;
909 ptllnd_ni_t *plni = ni->ni_data;
910 ptllnd_tx_t *tx = ptllnd_new_tx(peer, type, 0);
920 CDEBUG(D_NET, "niov=%d offset=%d len=%d\n",niov,offset,len);
922 LASSERT (type == PTLLND_MSG_TYPE_GET ||
923 type == PTLLND_MSG_TYPE_PUT);
926 CERROR("Can't allocate %s tx for %s\n",
927 ptllnd_msgtype2str(type), libcfs_id2str(peer->plp_id));
931 rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
933 CERROR("Can't allocate iov %d for %s\n",
934 niov, libcfs_id2str(peer->plp_id));
939 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
940 md.eq_handle = plni->plni_eqh;
943 md.options = PTLLND_MD_OPTIONS;
944 if(type == PTLLND_MSG_TYPE_GET)
945 md.options |= PTL_MD_OP_PUT | PTL_MD_ACK_DISABLE;
947 md.options |= PTL_MD_OP_GET;
948 ptllnd_set_md_buffer(&md, tx);
950 start = cfs_time_current_sec();
951 w = plni->plni_long_wait;
952 ptllnd_set_tx_deadline(tx);
954 while (!peer->plp_recvd_hello) { /* wait to validate plp_match */
955 if (peer->plp_closing) {
960 /* NB must check here to avoid unbounded wait - tx not yet
961 * on peer->plp_txq, so ptllnd_watchdog can't expire it */
962 if (tx->tx_deadline < cfs_time_current_sec()) {
963 CERROR("%s tx for %s timed out\n",
964 ptllnd_msgtype2str(type),
965 libcfs_id2str(peer->plp_id));
970 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
971 CWARN("Waited %ds to connect to %s\n",
972 (int)(cfs_time_current_sec() - start),
973 libcfs_id2str(peer->plp_id));
979 if (peer->plp_match < PTL_RESERVED_MATCHBITS)
980 peer->plp_match = PTL_RESERVED_MATCHBITS;
981 matchbits = peer->plp_match++;
983 rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid,
984 matchbits, 0, PTL_UNLINK, PTL_INS_BEFORE, &meh);
986 CERROR("PtlMEAttach for %s failed: %s(%d)\n",
987 libcfs_id2str(peer->plp_id),
988 ptllnd_errtype2str(rc), rc);
993 gettimeofday(&tx->tx_bulk_posted, NULL);
995 rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);
997 CERROR("PtlMDAttach for %s failed: %s(%d)\n",
998 libcfs_id2str(peer->plp_id),
999 ptllnd_errtype2str(rc), rc);
1000 rc2 = PtlMEUnlink(meh);
1001 LASSERT (rc2 == PTL_OK);
1005 tx->tx_bulkmdh = mdh;
1008 * We need to set the stamp here because it
1009 * we could have received a HELLO above that set
1012 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
1014 tx->tx_msg.ptlm_u.rdma.kptlrm_hdr = msg->msg_hdr;
1015 tx->tx_msg.ptlm_u.rdma.kptlrm_matchbits = matchbits;
1017 if (type == PTLLND_MSG_TYPE_GET) {
1018 tx->tx_lnetreplymsg = lnet_create_reply_msg(ni, msg);
1019 if (tx->tx_lnetreplymsg == NULL) {
1020 CERROR("Can't create reply for GET to %s\n",
1021 libcfs_id2str(msg->msg_target));
1027 tx->tx_lnetmsg = msg;
1028 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post passive %s p %d %p",
1029 libcfs_id2str(msg->msg_target),
1030 peer->plp_credits, peer->plp_outstanding_credits,
1031 peer->plp_sent_credits,
1032 plni->plni_peer_credits + peer->plp_lazy_credits,
1033 lnet_msgtyp2str(msg->msg_type),
1034 (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ?
1035 le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1036 (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ?
1037 le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
/* Active side of an RDMA: bind an MD over the local buffer and issue PtlGet
 * (READ) or PtlPut (WRITE) against the matchbits the passive peer
 * advertised.  A READ's MD threshold is 2 (reply + send events), a WRITE's
 * is 1.  On failure to launch, strip the lnet msg from the tx and complete
 * it, which closes the peer. */
1049 ptllnd_active_rdma(ptllnd_peer_t *peer, int type,
1050 lnet_msg_t *msg, __u64 matchbits,
1051 unsigned int niov, struct iovec *iov,
1052 unsigned int offset, unsigned int len)
1054 lnet_ni_t *ni = peer->plp_ni;
1055 ptllnd_ni_t *plni = ni->ni_data;
1056 ptllnd_tx_t *tx = ptllnd_new_tx(peer, type, 0);
1058 ptl_handle_md_t mdh;
1061 LASSERT (type == PTLLND_RDMA_READ ||
1062 type == PTLLND_RDMA_WRITE);
1065 CERROR("Can't allocate tx for RDMA %s with %s\n",
1066 (type == PTLLND_RDMA_WRITE) ? "write" : "read",
1067 libcfs_id2str(peer->plp_id));
1068 ptllnd_close_peer(peer, -ENOMEM);
1072 rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
1074 CERROR("Can't allocate iov %d for %s\n",
1075 niov, libcfs_id2str(peer->plp_id));
1080 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
1081 md.eq_handle = plni->plni_eqh;
1083 md.options = PTLLND_MD_OPTIONS;
1084 md.threshold = (type == PTLLND_RDMA_READ) ? 2 : 1;
1086 ptllnd_set_md_buffer(&md, tx);
1088 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
1090 CERROR("PtlMDBind for %s failed: %s(%d)\n",
1091 libcfs_id2str(peer->plp_id),
1092 ptllnd_errtype2str(rc), rc);
1097 tx->tx_bulkmdh = mdh;
1098 tx->tx_lnetmsg = msg;
1100 ptllnd_set_tx_deadline(tx);
1101 cfs_list_add_tail(&tx->tx_list, &peer->plp_activeq);
1102 gettimeofday(&tx->tx_bulk_posted, NULL);
1104 if (type == PTLLND_RDMA_READ)
1105 rc = PtlGet(mdh, peer->plp_ptlid,
1106 plni->plni_portal, 0, matchbits, 0);
1108 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
1109 plni->plni_portal, 0, matchbits, 0,
1110 (msg == NULL) ? PTLLND_RDMA_FAIL : PTLLND_RDMA_OK);
1115 CERROR("Can't initiate RDMA with %s: %s(%d)\n",
1116 libcfs_id2str(peer->plp_id),
1117 ptllnd_errtype2str(rc), rc);
1119 tx->tx_lnetmsg = NULL;
1122 ptllnd_tx_done(tx); /* this will close peer */
/* lnd_send() entry point.  Refuses userspace peers, finds/creates the
 * target peer, then routes by lnet message type: ACKs and small messages go
 * IMMEDIATE (payload copied into the message to avoid a fragmented send);
 * large GETs become passive-RDMA sinks, large PUT/REPLY payloads become
 * passive-RDMA sources; everything falls through to building and posting an
 * IMMEDIATE tx.  NOTE(review): case labels for ACK/GET/PUT and some
 * early-return lines are elided from this view. */
1127 ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
1129 ptllnd_ni_t *plni = ni->ni_data;
1135 LASSERT (!msg->msg_routing);
1136 LASSERT (msg->msg_kiov == NULL);
1138 LASSERT (msg->msg_niov <= PTL_MD_MAX_IOV); /* !!! */
1140 CDEBUG(D_NET, "%s [%d]+%d,%d -> %s%s\n",
1141 lnet_msgtyp2str(msg->msg_type),
1142 msg->msg_niov, msg->msg_offset, msg->msg_len,
1143 libcfs_nid2str(msg->msg_target.nid),
1144 msg->msg_target_is_router ? "(rtr)" : "");
1146 if ((msg->msg_target.pid & LNET_PID_USERFLAG) != 0) {
1147 CERROR("Can't send to non-kernel peer %s\n",
1148 libcfs_id2str(msg->msg_target));
1149 return -EHOSTUNREACH;
1152 plp = ptllnd_find_peer(ni, msg->msg_target, 1);
1156 switch (msg->msg_type) {
1161 LASSERT (msg->msg_len == 0);
1162 break; /* send IMMEDIATE */
1165 if (msg->msg_target_is_router)
1166 break; /* send IMMEDIATE */
1168 nob = msg->msg_md->md_length;
1169 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1170 if (nob <= plni->plni_max_msg_size)
1173 LASSERT ((msg->msg_md->md_options & LNET_MD_KIOV) == 0);
1174 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_GET, msg,
1175 msg->msg_md->md_niov,
1176 msg->msg_md->md_iov.iov,
1177 0, msg->msg_md->md_length);
1178 ptllnd_peer_decref(plp);
1181 case LNET_MSG_REPLY:
1184 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1185 if (nob <= plp->plp_max_msg_size)
1186 break; /* send IMMEDIATE */
1188 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_PUT, msg,
1189 msg->msg_niov, msg->msg_iov,
1190 msg->msg_offset, msg->msg_len);
1191 ptllnd_peer_decref(plp);
1196 * NB copy the payload so we don't have to do a fragmented send */
1198 tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_IMMEDIATE, msg->msg_len);
1200 CERROR("Can't allocate tx for lnet type %d to %s\n",
1201 msg->msg_type, libcfs_id2str(msg->msg_target));
1202 ptllnd_peer_decref(plp);
1206 lnet_copy_iov2flat(tx->tx_msgsize, &tx->tx_msg,
1207 offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1208 msg->msg_niov, msg->msg_iov, msg->msg_offset,
1210 tx->tx_msg.ptlm_u.immediate.kptlim_hdr = msg->msg_hdr;
1212 tx->tx_lnetmsg = msg;
1213 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post immediate %s p %d %p",
1214 libcfs_id2str(msg->msg_target),
1215 plp->plp_credits, plp->plp_outstanding_credits,
1216 plp->plp_sent_credits,
1217 plni->plni_peer_credits + plp->plp_lazy_credits,
1218 lnet_msgtyp2str(msg->msg_type),
1219 (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ?
1220 le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1221 (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ?
1222 le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1225 ptllnd_peer_decref(plp);
/* Finish a receive: the buffer slot becomes returnable credit for the peer
 * (plp_outstanding_credits++), so kick check_sends in case a credit-return
 * NOOP is now due; then account the rx gone (decrement line elided). */
1230 ptllnd_rx_done(ptllnd_rx_t *rx)
1232 ptllnd_peer_t *plp = rx->rx_peer;
1233 ptllnd_ni_t *plni = plp->plp_ni->ni_data;
1235 plp->plp_outstanding_credits++;
1237 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: rx=%p done\n",
1238 libcfs_id2str(plp->plp_id),
1239 plp->plp_credits, plp->plp_outstanding_credits,
1240 plp->plp_sent_credits,
1241 plni->plni_peer_credits + plp->plp_lazy_credits, rx);
1243 ptllnd_check_sends(plp);
1245 LASSERT (plni->plni_nrxs > 0);
/* lnd_eager_recv() stub: never used in this LND (see comment). */
1250 ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1251 void **new_privatep)
1253 /* Shouldn't get here; recvs only block for router buffers */
/* lnd_recv() entry point.  Dispatch on the received message type:
 * IMMEDIATE payloads (bounds-checked against rx_nob) are copied straight
 * into the caller's iov and finalized; a PUT request triggers an active
 * RDMA READ of the sender's buffer; a GET request triggers an active RDMA
 * WRITE of the reply data (or a FAIL write when there is no msg).
 * NOTE(review): several case bodies and the common exit are elided. */
1259 ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1260 int delayed, unsigned int niov,
1261 struct iovec *iov, lnet_kiov_t *kiov,
1262 unsigned int offset, unsigned int mlen, unsigned int rlen)
1264 ptllnd_rx_t *rx = private;
1268 LASSERT (kiov == NULL);
1269 LASSERT (niov <= PTL_MD_MAX_IOV); /* !!! */
1271 switch (rx->rx_msg->ptlm_type) {
1275 case PTLLND_MSG_TYPE_IMMEDIATE:
1276 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[mlen]);
1277 if (nob > rx->rx_nob) {
1278 CERROR("Immediate message from %s too big: %d(%d)\n",
1279 libcfs_id2str(rx->rx_peer->plp_id),
1284 lnet_copy_flat2iov(niov, iov, offset,
1285 rx->rx_nob, rx->rx_msg,
1286 offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1288 lnet_finalize(ni, msg, 0);
1291 case PTLLND_MSG_TYPE_PUT:
1292 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg,
1293 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1294 niov, iov, offset, mlen);
1297 case PTLLND_MSG_TYPE_GET:
1299 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, msg,
1300 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1301 msg->msg_niov, msg->msg_iov,
1302 msg->msg_offset, msg->msg_len);
1304 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, NULL,
1305 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1315 ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
1316 kptl_msg_t *msg, unsigned int nob)
1318 ptllnd_ni_t *plni = ni->ni_data;
1319 const int basenob = offsetof(kptl_msg_t, ptlm_u);
1320 lnet_process_id_t srcid;
1329 CERROR("Very short receive from %s\n",
1330 ptllnd_ptlid2str(initiator));
1334 /* I can at least read MAGIC/VERSION */
1336 flip = msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC);
1337 if (!flip && msg->ptlm_magic != PTLLND_MSG_MAGIC) {
1338 CERROR("Bad protocol magic %08x from %s\n",
1339 msg->ptlm_magic, ptllnd_ptlid2str(initiator));
1343 msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
1345 if (msg_version != PTLLND_MSG_VERSION) {
1346 CERROR("Bad protocol version %04x from %s: %04x expected\n",
1347 (__u32)msg_version, ptllnd_ptlid2str(initiator), PTLLND_MSG_VERSION);
1349 if (plni->plni_abort_on_protocol_mismatch)
1355 if (nob < basenob) {
1356 CERROR("Short receive from %s: got %d, wanted at least %d\n",
1357 ptllnd_ptlid2str(initiator), nob, basenob);
1361 /* checksum must be computed with
1362 * 1) ptlm_cksum zero and
1363 * 2) BEFORE anything gets modified/flipped
1365 msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
1366 msg->ptlm_cksum = 0;
1367 if (msg_cksum != 0 &&
1368 msg_cksum != ptllnd_cksum(msg, offsetof(kptl_msg_t, ptlm_u))) {
1369 CERROR("Bad checksum from %s\n", ptllnd_ptlid2str(initiator));
1373 msg->ptlm_version = msg_version;
1374 msg->ptlm_cksum = msg_cksum;
1377 /* NB stamps are opaque cookies */
1378 __swab32s(&msg->ptlm_nob);
1379 __swab64s(&msg->ptlm_srcnid);
1380 __swab64s(&msg->ptlm_dstnid);
1381 __swab32s(&msg->ptlm_srcpid);
1382 __swab32s(&msg->ptlm_dstpid);
1385 srcid.nid = msg->ptlm_srcnid;
1386 srcid.pid = msg->ptlm_srcpid;
1388 if (LNET_NIDNET(msg->ptlm_srcnid) != LNET_NIDNET(ni->ni_nid)) {
1389 CERROR("Bad source id %s from %s\n",
1390 libcfs_id2str(srcid),
1391 ptllnd_ptlid2str(initiator));
1395 if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
1396 CERROR("NAK from %s (%s)\n",
1397 libcfs_id2str(srcid),
1398 ptllnd_ptlid2str(initiator));
1400 if (plni->plni_dump_on_nak)
1401 ptllnd_dump_debug(ni, srcid);
1403 if (plni->plni_abort_on_nak)
1406 plp = ptllnd_find_peer(ni, srcid, 0);
1408 CERROR("Ignore NAK from %s: no peer\n", libcfs_id2str(srcid));
1411 ptllnd_close_peer(plp, -EPROTO);
1412 ptllnd_peer_decref(plp);
1416 if (msg->ptlm_dstnid != ni->ni_nid ||
1417 msg->ptlm_dstpid != the_lnet.ln_pid) {
1418 CERROR("Bad dstid %s (%s expected) from %s\n",
1419 libcfs_id2str((lnet_process_id_t) {
1420 .nid = msg->ptlm_dstnid,
1421 .pid = msg->ptlm_dstpid}),
1422 libcfs_id2str((lnet_process_id_t) {
1424 .pid = the_lnet.ln_pid}),
1425 libcfs_id2str(srcid));
1429 if (msg->ptlm_dststamp != plni->plni_stamp) {
1430 CERROR("Bad dststamp "LPX64"("LPX64" expected) from %s\n",
1431 msg->ptlm_dststamp, plni->plni_stamp,
1432 libcfs_id2str(srcid));
1436 PTLLND_HISTORY("RX %s: %s %d %p", libcfs_id2str(srcid),
1437 ptllnd_msgtype2str(msg->ptlm_type),
1438 msg->ptlm_credits, &rx);
1440 switch (msg->ptlm_type) {
1441 case PTLLND_MSG_TYPE_PUT:
1442 case PTLLND_MSG_TYPE_GET:
1443 if (nob < basenob + sizeof(kptl_rdma_msg_t)) {
1444 CERROR("Short rdma request from %s(%s)\n",
1445 libcfs_id2str(srcid),
1446 ptllnd_ptlid2str(initiator));
1450 __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
1453 case PTLLND_MSG_TYPE_IMMEDIATE:
1454 if (nob < offsetof(kptl_msg_t,
1455 ptlm_u.immediate.kptlim_payload)) {
1456 CERROR("Short immediate from %s(%s)\n",
1457 libcfs_id2str(srcid),
1458 ptllnd_ptlid2str(initiator));
1463 case PTLLND_MSG_TYPE_HELLO:
1464 if (nob < basenob + sizeof(kptl_hello_msg_t)) {
1465 CERROR("Short hello from %s(%s)\n",
1466 libcfs_id2str(srcid),
1467 ptllnd_ptlid2str(initiator));
1471 __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
1472 __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
1476 case PTLLND_MSG_TYPE_NOOP:
1480 CERROR("Bad message type %d from %s(%s)\n", msg->ptlm_type,
1481 libcfs_id2str(srcid),
1482 ptllnd_ptlid2str(initiator));
1486 plp = ptllnd_find_peer(ni, srcid, 0);
1488 CERROR("Can't find peer %s\n", libcfs_id2str(srcid));
1492 if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
1493 if (plp->plp_recvd_hello) {
1494 CERROR("Unexpected HELLO from %s\n",
1495 libcfs_id2str(srcid));
1496 ptllnd_peer_decref(plp);
1500 plp->plp_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1501 plp->plp_match = msg->ptlm_u.hello.kptlhm_matchbits;
1502 plp->plp_stamp = msg->ptlm_srcstamp;
1503 plp->plp_recvd_hello = 1;
1505 } else if (!plp->plp_recvd_hello) {
1507 CERROR("Bad message type %d (HELLO expected) from %s\n",
1508 msg->ptlm_type, libcfs_id2str(srcid));
1509 ptllnd_peer_decref(plp);
1512 } else if (msg->ptlm_srcstamp != plp->plp_stamp) {
1514 CERROR("Bad srcstamp "LPX64"("LPX64" expected) from %s\n",
1515 msg->ptlm_srcstamp, plp->plp_stamp,
1516 libcfs_id2str(srcid));
1517 ptllnd_peer_decref(plp);
1521 /* Check peer only sends when I've sent her credits */
1522 if (plp->plp_sent_credits == 0) {
1523 CERROR("%s[%d/%d+%d(%d)]: unexpected message\n",
1524 libcfs_id2str(plp->plp_id),
1525 plp->plp_credits, plp->plp_outstanding_credits,
1526 plp->plp_sent_credits,
1527 plni->plni_peer_credits + plp->plp_lazy_credits);
1530 plp->plp_sent_credits--;
1532 /* No check for credit overflow - the peer may post new buffers after
1533 * the startup handshake. */
1534 plp->plp_credits += msg->ptlm_credits;
1536 /* All OK so far; assume the message is good... */
1543 switch (msg->ptlm_type) {
1544 default: /* message types have been checked already */
1545 ptllnd_rx_done(&rx);
1548 case PTLLND_MSG_TYPE_PUT:
1549 case PTLLND_MSG_TYPE_GET:
1550 rc = lnet_parse(ni, &msg->ptlm_u.rdma.kptlrm_hdr,
1551 msg->ptlm_srcnid, &rx, 1);
1553 ptllnd_rx_done(&rx);
1556 case PTLLND_MSG_TYPE_IMMEDIATE:
1557 rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,
1558 msg->ptlm_srcnid, &rx, 0);
1560 ptllnd_rx_done(&rx);
1564 if (msg->ptlm_credits > 0)
1565 ptllnd_check_sends(plp);
1567 ptllnd_peer_decref(plp);
/*
 * Portals event callback for a posted receive buffer.
 *
 * Handles PUT_END (an incoming message landed in the buffer) and UNLINK
 * events.  On a successful PUT_END the message is handed to
 * ptllnd_parse_request(); once the buffer is fully unlinked it is
 * accounted as no longer posted and reposted via ptllnd_post_buffer().
 *
 * NOTE(review): this extraction has gaps (missing braces / #else arms /
 * blank lines); comments below describe only the visible logic.
 */
1571 ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
/* Recover the buffer object stashed in the MD's user pointer. */
1573 ptllnd_buffer_t *buf = ptllnd_eventarg2obj(event->md.user_ptr);
1574 ptllnd_ni_t *plni = ni->ni_data;
/* Message starts at the event's offset within the buffer. */
1575 char *msg = &buf->plb_buffer[event->offset];
1577 int unlinked = event->type == PTL_EVENT_UNLINK;
1579 LASSERT (buf->plb_ni == ni);
1580 LASSERT (event->type == PTL_EVENT_PUT_END ||
1581 event->type == PTL_EVENT_UNLINK);
/* NI-level failure: log and drop the message. */
1583 if (event->ni_fail_type != PTL_NI_OK) {
1585 CERROR("event type %s(%d), status %s(%d) from %s\n",
1586 ptllnd_evtype2str(event->type), event->type,
1587 ptllnd_errtype2str(event->ni_fail_type),
1588 event->ni_fail_type,
1589 ptllnd_ptlid2str(event->initiator));
1591 } else if (event->type == PTL_EVENT_PUT_END) {
1592 #if (PTL_MD_LOCAL_ALIGN8 == 0)
1593 /* Portals can't force message alignment - someone sending an
1594 * odd-length message could misalign subsequent messages */
1595 if ((event->mlength & 7) != 0) {
1596 CERROR("Message from %s has odd length "LPU64
1597 " probable version incompatibility\n",
1598 ptllnd_ptlid2str(event->initiator),
/* All accepted messages must be 8-byte aligned in the buffer. */
1603 LASSERT ((event->offset & 7) == 0);
/* Hand the well-formed inbound message to the protocol parser. */
1605 ptllnd_parse_request(ni, event->initiator,
1606 (kptl_msg_t *)msg, event->mlength);
1609 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1610 /* UNLINK event only on explicit unlink */
1611 repost = (event->unlinked && event->type != PTL_EVENT_UNLINK);
1612 if (event->unlinked)
/* Cray semantics (no event->unlinked field): */
1615 /* UNLINK event only on implicit unlink */
1616 repost = (event->type == PTL_EVENT_UNLINK);
/* Buffer is now off the wire: update posted accounting ... */
1620 LASSERT(buf->plb_posted);
1621 buf->plb_posted = 0;
1622 plni->plni_nposted_buffers--;
/* ... and repost it; failure is ignored here (best effort). */
1626 (void) ptllnd_post_buffer(buf);
/*
 * Portals event callback for an outgoing tx descriptor.
 *
 * A tx can own up to two MDs: the request MD (tx_reqmdh, the message
 * itself) and an optional bulk MD (tx_bulkmdh, RDMA payload).  Exactly
 * one of them matches each event.  When an MD is unlinked its handle is
 * invalidated and a completion timestamp recorded; once both handles are
 * invalid (or on error) the tx is moved to the NI's zombie list so
 * ptllnd_tx_done() runs later from ptllnd_wait().
 *
 * NOTE(review): extraction gaps (missing braces, 'break's, #else arm);
 * comments describe only the visible logic.
 */
1630 ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
1632 ptllnd_ni_t *plni = ni->ni_data;
1633 ptllnd_tx_t *tx = ptllnd_eventarg2obj(event->md.user_ptr);
1634 int error = (event->ni_fail_type != PTL_NI_OK);
1637 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1638 int unlinked = event->unlinked;
/* Cray semantics: unlink is signalled by a distinct event type. */
1640 int unlinked = (event->type == PTL_EVENT_UNLINK);
1644 CERROR("Error %s(%d) event %s(%d) unlinked %d, %s(%d) for %s\n",
1645 ptllnd_errtype2str(event->ni_fail_type),
1646 event->ni_fail_type,
1647 ptllnd_evtype2str(event->type), event->type,
1648 unlinked, ptllnd_msgtype2str(tx->tx_type), tx->tx_type,
1649 libcfs_id2str(tx->tx_peer->plp_id));
1651 LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE));
/* Which of the tx's two MDs does this event belong to? */
1653 isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh);
1655 LASSERT (event->md.start == (void *)&tx->tx_msg);
/* Request MD gone: invalidate handle and stamp completion time. */
1657 tx->tx_reqmdh = PTL_INVALID_HANDLE;
1658 gettimeofday(&tx->tx_req_done, NULL);
1662 isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);
1663 if ( isbulk && unlinked ) {
1664 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
1665 gettimeofday(&tx->tx_bulk_done, NULL);
1668 LASSERT (!isreq != !isbulk); /* always one and only 1 match */
1670 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: TX done %p %s%s",
1671 libcfs_id2str(tx->tx_peer->plp_id),
1672 tx->tx_peer->plp_credits,
1673 tx->tx_peer->plp_outstanding_credits,
1674 tx->tx_peer->plp_sent_credits,
1675 plni->plni_peer_credits + tx->tx_peer->plp_lazy_credits,
1676 tx, isreq ? "REQ" : "BULK", unlinked ? "(unlinked)" : "");
1678 LASSERT (!isreq != !isbulk); /* always one and only 1 match */
/* Sanity-check the event type against what each tx type expects. */
1679 switch (tx->tx_type) {
1683 case PTLLND_MSG_TYPE_NOOP:
1684 case PTLLND_MSG_TYPE_HELLO:
1685 case PTLLND_MSG_TYPE_IMMEDIATE:
/* Message-only tx: just the send of the request MD. */
1686 LASSERT (event->type == PTL_EVENT_UNLINK ||
1687 event->type == PTL_EVENT_SEND_END);
1691 case PTLLND_MSG_TYPE_GET:
/* GET: request send, then peer PUTs the reply into the bulk MD. */
1692 LASSERT (event->type == PTL_EVENT_UNLINK ||
1693 (isreq && event->type == PTL_EVENT_SEND_END) ||
1694 (isbulk && event->type == PTL_EVENT_PUT_END));
1696 if (isbulk && !error && event->type == PTL_EVENT_PUT_END) {
1697 /* Check GET matched */
1698 if (event->hdr_data == PTLLND_RDMA_OK) {
1699 lnet_set_reply_msg_len(ni,
1700 tx->tx_lnetreplymsg,
/* Peer signalled mismatch in hdr_data: fail the tx. */
1703 CERROR ("Unmatched GET with %s\n",
1704 libcfs_id2str(tx->tx_peer->plp_id));
1705 tx->tx_status = -EIO;
1710 case PTLLND_MSG_TYPE_PUT:
/* PUT: request send, then peer GETs the payload from the bulk MD. */
1711 LASSERT (event->type == PTL_EVENT_UNLINK ||
1712 (isreq && event->type == PTL_EVENT_SEND_END) ||
1713 (isbulk && event->type == PTL_EVENT_GET_END));
1716 case PTLLND_RDMA_READ:
1717 LASSERT (event->type == PTL_EVENT_UNLINK ||
1718 event->type == PTL_EVENT_SEND_END ||
1719 event->type == PTL_EVENT_REPLY_END);
1723 case PTLLND_RDMA_WRITE:
1724 LASSERT (event->type == PTL_EVENT_UNLINK ||
1725 event->type == PTL_EVENT_SEND_END);
1729 /* Schedule ptllnd_tx_done() on error or last completion event */
1731 (PtlHandleIsEqual(tx->tx_bulkmdh, PTL_INVALID_HANDLE) &&
1732 PtlHandleIsEqual(tx->tx_reqmdh, PTL_INVALID_HANDLE))) {
1734 tx->tx_status = -EIO;
/* Move off the peer's active list onto the NI zombie list;
 * ptllnd_wait() reaps zombies and calls ptllnd_tx_done(). */
1735 cfs_list_del(&tx->tx_list);
1736 cfs_list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
/*
 * Scan all three of the peer's tx lists (queued, noop, active) for the
 * first tx whose deadline has passed.
 * NOTE(review): the 'return tx' / 'return NULL' lines fall in extraction
 * gaps — presumably each match returns the tx and the fall-through
 * returns NULL; verify against the full source.
 */
1741 ptllnd_find_timed_out_tx(ptllnd_peer_t *peer)
1743 time_t now = cfs_time_current_sec();
/* Messages still queued waiting for credits. */
1746 cfs_list_for_each_entry (tx, &peer->plp_txq, tx_list) {
1747 if (tx->tx_deadline < now)
/* NOOP (credit-return) messages. */
1751 cfs_list_for_each_entry (tx, &peer->plp_noopq, tx_list) {
1752 if (tx->tx_deadline < now)
/* Messages posted to Portals and awaiting completion. */
1756 cfs_list_for_each_entry (tx, &peer->plp_activeq, tx_list) {
1757 if (tx->tx_deadline < now)
/*
 * Timeout check for one peer: if any of its txs has exceeded its
 * deadline, log full credit state, dump the tx, and close the peer
 * with -ETIMEDOUT.
 * NOTE(review): the early-return for tx == NULL lies in an extraction
 * gap (orig lines 1768-1771) — confirm against the full source.
 */
1765 ptllnd_check_peer(ptllnd_peer_t *peer)
1767 ptllnd_tx_t *tx = ptllnd_find_timed_out_tx(peer);
1772 CERROR("%s (sent %d recvd %d, credits %d/%d/%d/%d/%d): timed out %p %p\n",
1773 libcfs_id2str(peer->plp_id), peer->plp_sent_hello, peer->plp_recvd_hello,
1774 peer->plp_credits, peer->plp_outstanding_credits,
1775 peer->plp_sent_credits, peer->plp_lazy_credits,
1776 peer->plp_extra_lazy_credits, tx, tx->tx_lnetmsg);
1777 ptllnd_debug_tx(tx);
1778 ptllnd_close_peer(peer, -ETIMEDOUT);
/*
 * Periodic timeout sweep over the peer hash table.
 *
 * Rather than scanning every peer each pass, each invocation checks a
 * 'chunk' of hash buckets sized so the whole table is covered n times
 * per timeout interval (see the comment at orig line 1794), then
 * advances plni_watchdog_peeridx and schedules the next pass.
 */
1782 ptllnd_watchdog (lnet_ni_t *ni, time_t now)
1784 ptllnd_ni_t *plni = ni->ni_data;
/* p: seconds between watchdog passes. */
1786 int p = plni->plni_watchdog_interval;
1787 int chunk = plni->plni_peer_hash_size;
/* Actual elapsed time since the previous scheduled pass. */
1788 int interval = now - (plni->plni_watchdog_nextt - p);
1790 cfs_list_t *hashlist;
1794 /* Time to check for RDMA timeouts on a few more peers:
1795 * I try to do checks every 'p' seconds on a proportion of the peer
1796 * table and I need to check every connection 'n' times within a
1797 * timeout interval, to ensure I detect a timeout on any connection
1798 * within (n+1)/n times the timeout interval. */
1800 LASSERT (now >= plni->plni_watchdog_nextt);
1802 if (plni->plni_timeout > n * interval) { /* Scan less than the whole table? */
1803 chunk = (chunk * n * interval) / plni->plni_timeout;
/* Check every peer in the chosen buckets; _safe because
 * ptllnd_check_peer() may close (and unhash) a peer. */
1808 for (i = 0; i < chunk; i++) {
1809 hashlist = &plni->plni_peer_hash[plni->plni_watchdog_peeridx];
1811 cfs_list_for_each_safe(tmp, nxt, hashlist) {
1812 ptllnd_check_peer(cfs_list_entry(tmp, ptllnd_peer_t,
/* Advance to the next bucket, wrapping around the table. */
1816 plni->plni_watchdog_peeridx = (plni->plni_watchdog_peeridx + 1) %
1817 plni->plni_peer_hash_size;
/* Next pass due in p seconds. */
1820 plni->plni_watchdog_nextt = now + p;
1824 ptllnd_wait (lnet_ni_t *ni, int milliseconds)
1826 static struct timeval prevt;
1827 static int prevt_count;
1828 static int call_count;
1830 struct timeval start;
1831 struct timeval then;
1833 struct timeval deadline;
1835 ptllnd_ni_t *plni = ni->ni_data;
1843 /* Handle any currently queued events, returning immediately if any.
1844 * Otherwise block for the timeout and handle all events queued
1847 gettimeofday(&start, NULL);
1850 if (milliseconds <= 0) {
1853 deadline.tv_sec = start.tv_sec + milliseconds/1000;
1854 deadline.tv_usec = start.tv_usec + (milliseconds % 1000)*1000;
1856 if (deadline.tv_usec >= 1000000) {
1857 start.tv_usec -= 1000000;
1863 gettimeofday(&then, NULL);
1865 rc = PtlEQPoll(&plni->plni_eqh, 1, timeout, &event, &which);
1867 gettimeofday(&now, NULL);
1869 if ((now.tv_sec*1000 + now.tv_usec/1000) -
1870 (then.tv_sec*1000 + then.tv_usec/1000) > timeout + 1000) {
1871 /* 1000 mS grace...........................^ */
1872 CERROR("SLOW PtlEQPoll(%d): %dmS elapsed\n", timeout,
1873 (int)(now.tv_sec*1000 + now.tv_usec/1000) -
1874 (int)(then.tv_sec*1000 + then.tv_usec/1000));
1877 if (rc == PTL_EQ_EMPTY) {
1878 if (found) /* handled some events */
1881 if (now.tv_sec >= plni->plni_watchdog_nextt) { /* check timeouts? */
1882 ptllnd_watchdog(ni, now.tv_sec);
1883 LASSERT (now.tv_sec < plni->plni_watchdog_nextt);
1886 if (now.tv_sec > deadline.tv_sec || /* timeout expired */
1887 (now.tv_sec == deadline.tv_sec &&
1888 now.tv_usec >= deadline.tv_usec))
1891 if (milliseconds < 0 ||
1892 plni->plni_watchdog_nextt <= deadline.tv_sec) {
1893 timeout = (plni->plni_watchdog_nextt - now.tv_sec)*1000;
1895 timeout = (deadline.tv_sec - now.tv_sec)*1000 +
1896 (deadline.tv_usec - now.tv_usec)/1000;
1902 LASSERT (rc == PTL_OK || rc == PTL_EQ_DROPPED);
1904 if (rc == PTL_EQ_DROPPED)
1905 CERROR("Event queue: size %d is too small\n",
1906 plni->plni_eq_size);
1911 switch (ptllnd_eventarg2type(event.md.user_ptr)) {
1915 case PTLLND_EVENTARG_TYPE_TX:
1916 ptllnd_tx_event(ni, &event);
1919 case PTLLND_EVENTARG_TYPE_BUF:
1920 ptllnd_buf_event(ni, &event);
1925 while (!cfs_list_empty(&plni->plni_zombie_txs)) {
1926 tx = cfs_list_entry(plni->plni_zombie_txs.next,
1927 ptllnd_tx_t, tx_list);
1928 cfs_list_del_init(&tx->tx_list);
1932 if (prevt.tv_sec == 0 ||
1933 prevt.tv_sec != now.tv_sec) {
1934 PTLLND_HISTORY("%d wait entered at %d.%06d - prev %d %d.%06d",
1935 call_count, (int)start.tv_sec, (int)start.tv_usec,
1936 prevt_count, (int)prevt.tv_sec, (int)prevt.tv_usec);