1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lnet/ulnds/ptllnd/ptllnd_cb.c
38 * Author: Eric Barton <eeb@bartonsoftware.com>
/* Stamp a tx with its completion deadline: "now" plus the NI-wide
 * timeout (plni_timeout).  NOTE(review): this listing is elided;
 * return type, braces and some declarations are not shown. */
44 ptllnd_set_tx_deadline(ptllnd_tx_t *tx)
46 ptllnd_peer_t *peer = tx->tx_peer;
47 lnet_ni_t *ni = peer->plp_ni;
48 ptllnd_ni_t *plni = ni->ni_data;
50 tx->tx_deadline = cfs_time_current_sec() + plni->plni_timeout;
/* Queue a (non-NOOP) tx on its peer's transmit queue and kick the
 * sender.  Sets the tx deadline before queueing so the watchdog can
 * time it out. */
54 ptllnd_post_tx(ptllnd_tx_t *tx)
56 ptllnd_peer_t *peer = tx->tx_peer;
/* NOOPs are created inside ptllnd_check_sends() and go on plp_noopq,
 * never through this path. */
58 LASSERT (tx->tx_type != PTLLND_MSG_TYPE_NOOP);
60 ptllnd_set_tx_deadline(tx);
61 list_add_tail(&tx->tx_list, &peer->plp_txq);
62 ptllnd_check_sends(peer);
/* Format a Portals process id as a string using a small ring of static
 * buffers, so up to 8 results may be live in one printf call.
 * NOTE(review): static buffers + (apparently) static index make this
 * non-reentrant; fine for the single-threaded userspace LND it was
 * written for — confirm before reuse elsewhere. */
66 ptllnd_ptlid2str(ptl_process_id_t id)
68 static char strs[8][32];
71 char *str = strs[idx++];
/* Wrap the ring index (reset line elided in this listing). */
73 if (idx >= sizeof(strs)/sizeof(strs[0]))
76 snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
/* Final teardown of a peer once its refcount has dropped: give back the
 * message buffers reserved for it and free the structure.  Caller must
 * already have closed the peer (plp_closing set, queues drained). */
81 ptllnd_destroy_peer(ptllnd_peer_t *peer)
83 lnet_ni_t *ni = peer->plp_ni;
84 ptllnd_ni_t *plni = ni->ni_data;
/* Buffers reserved for this peer: the per-peer credits plus any lazy
 * credits granted via ptllnd_setasync(). */
85 int nmsg = peer->plp_lazy_credits +
86 plni->plni_peer_credits;
88 ptllnd_size_buffers(ni, -nmsg);
90 LASSERT (peer->plp_closing);
91 LASSERT (plni->plni_npeers > 0);
92 LASSERT (list_empty(&peer->plp_txq));
93 LASSERT (list_empty(&peer->plp_noopq));
94 LASSERT (list_empty(&peer->plp_activeq));
96 LIBCFS_FREE(peer, sizeof(*peer));
/* Fail every tx on queue 'q' with -ESHUTDOWN and move it to the NI's
 * zombie list, where it will be completed/reaped later. */
100 ptllnd_abort_txs(ptllnd_ni_t *plni, struct list_head *q)
102 while (!list_empty(q)) {
103 ptllnd_tx_t *tx = list_entry(q->next, ptllnd_tx_t, tx_list);
105 tx->tx_status = -ESHUTDOWN;
106 list_del(&tx->tx_list);
107 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
/* Begin shutting a peer down: mark it closing (idempotent — early
 * return if already closing), abort all queued/active txs onto the
 * zombie list, unhash it and drop the hash-table reference. */
112 ptllnd_close_peer(ptllnd_peer_t *peer, int error)
114 lnet_ni_t *ni = peer->plp_ni;
115 ptllnd_ni_t *plni = ni->ni_data;
117 if (peer->plp_closing)
120 peer->plp_closing = 1;
/* Only warn (and optionally dump debug state) when there is something
 * in flight or an error to report; condition tail elided in listing. */
122 if (!list_empty(&peer->plp_txq) ||
123 !list_empty(&peer->plp_noopq) ||
124 !list_empty(&peer->plp_activeq) ||
126 CWARN("Closing %s: %d\n", libcfs_id2str(peer->plp_id), error);
127 if (plni->plni_debug)
128 ptllnd_dump_debug(ni, peer->plp_id);
131 ptllnd_abort_txs(plni, &peer->plp_txq);
132 ptllnd_abort_txs(plni, &peer->plp_noopq);
133 ptllnd_abort_txs(plni, &peer->plp_activeq);
/* Remove from the peer hash and drop the reference the hash held. */
135 list_del(&peer->plp_list);
136 ptllnd_peer_decref(peer);
/* Look up the peer for 'id' in the NI's peer hash, taking a reference
 * on it.  If not found and 'create' is set, allocate a new peer,
 * reserve receive buffers for it, initialise its credit state, hash
 * it, and post the initial HELLO.  Returns a referenced peer (caller
 * must decref) — failure paths elided in this listing presumably
 * return NULL. */
140 ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
142 ptllnd_ni_t *plni = ni->ni_data;
143 unsigned int hash = LNET_NIDADDR(id.nid) % plni->plni_peer_hash_size;
148 LASSERT (LNET_NIDNET(id.nid) == LNET_NIDNET(ni->ni_nid));
150 list_for_each_entry (plp, &plni->plni_peer_hash[hash], plp_list) {
151 if (plp->plp_id.nid == id.nid &&
152 plp->plp_id.pid == id.pid) {
153 ptllnd_peer_addref(plp);
161 /* New peer: check first for enough posted buffers */
163 rc = ptllnd_size_buffers(ni, plni->plni_peer_credits);
169 LIBCFS_ALLOC(plp, sizeof(*plp));
171 CERROR("Can't allocate new peer %s\n", libcfs_id2str(id));
/* Allocation failed: release the buffer reservation made above. */
173 ptllnd_size_buffers(ni, -plni->plni_peer_credits);
179 plp->plp_ptlid.nid = LNET_NIDADDR(id.nid);
180 plp->plp_ptlid.pid = plni->plni_ptllnd_pid;
181 plp->plp_credits = 1; /* add more later when she gives me credits */
182 plp->plp_max_msg_size = plni->plni_max_msg_size; /* until I hear from her */
183 plp->plp_sent_credits = 1; /* Implicit credit for HELLO */
184 plp->plp_outstanding_credits = plni->plni_peer_credits - 1;
185 plp->plp_lazy_credits = 0;
186 plp->plp_extra_lazy_credits = 0;
189 plp->plp_sent_hello = 0;
190 plp->plp_recvd_hello = 0;
191 plp->plp_closing = 0;
/* One reference owned by the hash table... */
192 plp->plp_refcount = 1;
193 CFS_INIT_LIST_HEAD(&plp->plp_list);
194 CFS_INIT_LIST_HEAD(&plp->plp_txq);
195 CFS_INIT_LIST_HEAD(&plp->plp_noopq);
196 CFS_INIT_LIST_HEAD(&plp->plp_activeq);
/* ...and a second one for the caller. */
198 ptllnd_peer_addref(plp);
199 list_add_tail(&plp->plp_list, &plni->plni_peer_hash[hash]);
/* Post the HELLO that opens the connection; its payload advertises
 * our matchbits base and max message size. */
201 tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_HELLO, 0);
203 CERROR("Can't send HELLO to %s\n", libcfs_id2str(id));
204 ptllnd_close_peer(plp, -ENOMEM);
205 ptllnd_peer_decref(plp);
209 tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS;
210 tx->tx_msg.ptlm_u.hello.kptlhm_max_msg_size = plni->plni_max_msg_size;
212 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post hello %p", libcfs_id2str(id),
213 tx->tx_peer->plp_credits,
214 tx->tx_peer->plp_outstanding_credits,
215 tx->tx_peer->plp_sent_credits,
216 plni->plni_peer_credits +
217 tx->tx_peer->plp_lazy_credits, tx);
/* Count the entries on list 'q' by walking it (O(n)); used only for
 * debug printing. */
224 ptllnd_count_q(struct list_head *q)
229 list_for_each(e, q) {
/* Map a tx type code (RDMA pseudo-types and PTLLND message types) to a
 * human-readable name for debug output; return statements elided in
 * this listing. */
237 ptllnd_tx_typestr(int type)
240 case PTLLND_RDMA_WRITE:
243 case PTLLND_RDMA_READ:
246 case PTLLND_MSG_TYPE_PUT:
249 case PTLLND_MSG_TYPE_GET:
252 case PTLLND_MSG_TYPE_IMMEDIATE:
255 case PTLLND_MSG_TYPE_NOOP:
258 case PTLLND_MSG_TYPE_HELLO:
/* Dump one tx for debugging: type, peer, the bulk and request
 * post/done timestamps (gettimeofday-style sec/usec pairs) and its
 * status. */
267 ptllnd_debug_tx(ptllnd_tx_t *tx)
269 CDEBUG(D_WARNING, "%s %s b %ld.%06ld/%ld.%06ld"
270 " r %ld.%06ld/%ld.%06ld status %d\n",
271 ptllnd_tx_typestr(tx->tx_type),
272 libcfs_id2str(tx->tx_peer->plp_id),
273 tx->tx_bulk_posted.tv_sec, tx->tx_bulk_posted.tv_usec,
274 tx->tx_bulk_done.tv_sec, tx->tx_bulk_done.tv_usec,
275 tx->tx_req_posted.tv_sec, tx->tx_req_posted.tv_usec,
276 tx->tx_req_done.tv_sec, tx->tx_req_done.tv_usec,
/* Dump the full debug state for peer 'id': flags, stamp, queue depths,
 * credit counters, then every tx on its txq/noopq/activeq plus any of
 * its txs on the NI-wide zombie and history lists. */
283 ptllnd_peer_t *plp = ptllnd_find_peer(ni, id, 0);
284 ptllnd_ni_t *plni = ni->ni_data;
288 CDEBUG(D_WARNING, "No peer %s\n", libcfs_id2str(id));
292 CWARN("%s %s%s [%d] "LPU64".%06d m "LPU64" q %d/%d/%d c %d/%d+%d(%d)\n",
294 plp->plp_recvd_hello ? "H" : "_",
295 plp->plp_closing ? "C" : "_",
297 plp->plp_stamp / 1000000, (int)(plp->plp_stamp % 1000000),
299 ptllnd_count_q(&plp->plp_txq),
300 ptllnd_count_q(&plp->plp_noopq),
301 ptllnd_count_q(&plp->plp_activeq),
302 plp->plp_credits, plp->plp_outstanding_credits, plp->plp_sent_credits,
303 plni->plni_peer_credits + plp->plp_lazy_credits);
305 CDEBUG(D_WARNING, "txq:\n");
306 list_for_each_entry (tx, &plp->plp_txq, tx_list) {
310 CDEBUG(D_WARNING, "noopq:\n");
311 list_for_each_entry (tx, &plp->plp_noopq, tx_list) {
315 CDEBUG(D_WARNING, "activeq:\n");
316 list_for_each_entry (tx, &plp->plp_activeq, tx_list) {
/* Zombie/history lists are NI-wide: filter on this peer's id. */
320 CDEBUG(D_WARNING, "zombies:\n");
321 list_for_each_entry (tx, &plni->plni_zombie_txs, tx_list) {
322 if (tx->tx_peer->plp_id.nid == id.nid &&
323 tx->tx_peer->plp_id.pid == id.pid)
327 CDEBUG(D_WARNING, "history:\n");
328 list_for_each_entry (tx, &plni->plni_tx_history, tx_list) {
329 if (tx->tx_peer->plp_id.nid == id.nid &&
330 tx->tx_peer->plp_id.pid == id.pid)
/* Drop the lookup reference taken by ptllnd_find_peer(). */
334 ptllnd_peer_decref(plp);
/* Convenience wrapper: dump one peer's state then the global tx
 * history. */
338 ptllnd_dump_debug(lnet_ni_t *ni, lnet_process_id_t id)
340 ptllnd_debug_peer(ni, id);
341 ptllnd_dump_history();
/* Adjust the "lazy credit" headroom for peer 'id' by 'nasync' extra
 * async messages.  Positive nasync reserves extra buffers (and grants
 * the peer extra outstanding credits); negative nasync only records
 * the reduction in plp_extra_lazy_credits — buffers are reclaimed
 * later, since the peer may already hold credits against them. */
345 ptllnd_setasync(lnet_ni_t *ni, lnet_process_id_t id, int nasync)
/* Only create the peer when we are adding headroom. */
347 ptllnd_peer_t *peer = ptllnd_find_peer(ni, id, nasync > 0);
353 LASSERT (peer->plp_lazy_credits >= 0);
354 LASSERT (peer->plp_extra_lazy_credits >= 0);
356 /* If nasync < 0, we're being told we can reduce the total message
357 * headroom. We can't do this right now because our peer might already
358 * have credits for the extra buffers, so we just account the extra
359 * headroom in case we need it later and only destroy buffers when the
362 * Note that the following condition handles this case, where it
363 * actually increases the extra lazy credit counter. */
365 if (nasync <= peer->plp_extra_lazy_credits) {
366 peer->plp_extra_lazy_credits -= nasync;
370 LASSERT (nasync > 0);
/* Consume banked extra-lazy credits first, then grow real buffers
 * for the remainder. */
372 nasync -= peer->plp_extra_lazy_credits;
373 peer->plp_extra_lazy_credits = 0;
375 rc = ptllnd_size_buffers(ni, nasync);
377 peer->plp_lazy_credits += nasync;
378 peer->plp_outstanding_credits += nasync;
/* Simple rotate-and-add checksum over 'nob' bytes at 'ptr'; never
 * returns 0 because 0 means "no checksum" on the wire. */
385 ptllnd_cksum (void *ptr, int nob)
391 sum = ((sum << 1) | (sum >> 31)) + *c++;
393 /* ensure I don't return 0 (== no checksum) */
394 return (sum == 0) ? 1 : sum;
/* Allocate and initialise a tx of the given type for 'peer'.
 * payload_nob is only meaningful for IMMEDIATE messages (asserted 0
 * elsewhere).  Sizes the wire message per type, rounds it to 8 bytes,
 * fills in the common message header and takes a peer reference that
 * is dropped when the tx is finally freed (see
 * ptllnd_cull_tx_history). */
398 ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
400 lnet_ni_t *ni = peer->plp_ni;
401 ptllnd_ni_t *plni = ni->ni_data;
405 CDEBUG(D_NET, "peer=%p type=%d payload=%d\n", peer, type, payload_nob);
/* RDMA pseudo-types carry no wire message of their own. */
411 case PTLLND_RDMA_WRITE:
412 case PTLLND_RDMA_READ:
413 LASSERT (payload_nob == 0);
417 case PTLLND_MSG_TYPE_PUT:
418 case PTLLND_MSG_TYPE_GET:
419 LASSERT (payload_nob == 0);
420 msgsize = offsetof(kptl_msg_t, ptlm_u) +
421 sizeof(kptl_rdma_msg_t);
424 case PTLLND_MSG_TYPE_IMMEDIATE:
425 msgsize = offsetof(kptl_msg_t,
426 ptlm_u.immediate.kptlim_payload[payload_nob]);
429 case PTLLND_MSG_TYPE_NOOP:
430 LASSERT (payload_nob == 0);
431 msgsize = offsetof(kptl_msg_t, ptlm_u);
434 case PTLLND_MSG_TYPE_HELLO:
435 LASSERT (payload_nob == 0);
436 msgsize = offsetof(kptl_msg_t, ptlm_u) +
437 sizeof(kptl_hello_msg_t);
/* Round the wire size up to an 8-byte boundary. */
441 msgsize = (msgsize + 7) & ~7;
442 LASSERT (msgsize <= peer->plp_max_msg_size);
/* The message buffer is allocated inline at the tail of the tx. */
444 LIBCFS_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize);
447 CERROR("Can't allocate msg type %d for %s\n",
448 type, libcfs_id2str(peer->plp_id));
452 CFS_INIT_LIST_HEAD(&tx->tx_list);
455 tx->tx_lnetmsg = tx->tx_lnetreplymsg = NULL;
458 tx->tx_reqmdh = PTL_INVALID_HANDLE;
459 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
460 tx->tx_msgsize = msgsize;
461 tx->tx_completing = 0;
464 memset(&tx->tx_bulk_posted, 0, sizeof(tx->tx_bulk_posted));
465 memset(&tx->tx_bulk_done, 0, sizeof(tx->tx_bulk_done));
466 memset(&tx->tx_req_posted, 0, sizeof(tx->tx_req_posted));
467 memset(&tx->tx_req_done, 0, sizeof(tx->tx_req_done));
/* Common wire header.  ptlm_dststamp is provisional here; it is
 * re-set just before sending (see ptllnd_check_sends) once the
 * peer's stamp is known from its HELLO. */
470 tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC;
471 tx->tx_msg.ptlm_version = PTLLND_MSG_VERSION;
472 tx->tx_msg.ptlm_type = type;
473 tx->tx_msg.ptlm_credits = 0;
474 tx->tx_msg.ptlm_nob = msgsize;
475 tx->tx_msg.ptlm_cksum = 0;
476 tx->tx_msg.ptlm_srcnid = ni->ni_nid;
477 tx->tx_msg.ptlm_srcstamp = plni->plni_stamp;
478 tx->tx_msg.ptlm_dstnid = peer->plp_id.nid;
479 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
480 tx->tx_msg.ptlm_srcpid = the_lnet.ln_pid;
481 tx->tx_msg.ptlm_dstpid = peer->plp_id.pid;
/* tx holds a peer ref until it is reaped from the history list. */
484 ptllnd_peer_addref(peer);
487 CDEBUG(D_NET, "tx=%p\n", tx);
/* Unlink the MD referenced by *mdh, spinning until the handle becomes
 * PTL_INVALID_HANDLE.  Without Lustre unlink semantics PtlMDUnlink may
 * return PTL_MD_IN_USE, in which case we wait for the tx event handler
 * to invalidate the handle; warns after plni_long_wait (ms) elapses. */
493 ptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh)
495 ptllnd_peer_t *peer = tx->tx_peer;
496 lnet_ni_t *ni = peer->plp_ni;
498 time_t start = cfs_time_current_sec();
499 ptllnd_ni_t *plni = ni->ni_data;
500 int w = plni->plni_long_wait;
502 while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) {
503 rc = PtlMDUnlink(*mdh);
504 #ifndef LUSTRE_PORTALS_UNLINK_SEMANTICS
505 if (rc == PTL_OK) /* unlink successful => no unlinked event */
507 LASSERT (rc == PTL_MD_IN_USE);
/* NB: w is in milliseconds, so the wait threshold is w/1000 s. */
509 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
510 CWARN("Waited %ds to abort tx to %s\n",
511 (int)(cfs_time_current_sec() - start),
512 libcfs_id2str(peer->plp_id));
515 /* Wait for ptllnd_tx_event() to invalidate */
/* Trim the tx history list down to plni_max_tx_history entries,
 * freeing the oldest txs and dropping the peer reference each one
 * holds. */
521 ptllnd_cull_tx_history(ptllnd_ni_t *plni)
523 int max = plni->plni_max_tx_history;
525 while (plni->plni_ntx_history > max) {
526 ptllnd_tx_t *tx = list_entry(plni->plni_tx_history.next,
527 ptllnd_tx_t, tx_list);
528 list_del(&tx->tx_list);
530 ptllnd_peer_decref(tx->tx_peer);
/* Free size must match the allocation in ptllnd_new_tx(). */
532 LIBCFS_FREE(tx, offsetof(ptllnd_tx_t, tx_msg) + tx->tx_msgsize);
534 LASSERT (plni->plni_ntxs > 0);
536 plni->plni_ntx_history--;
/* Complete a tx: guard against re-entry via tx_completing, abort any
 * still-live MDs, free the iov, finalize the associated LNet
 * message(s), and park the tx on the history list (culled to a
 * bounded length).  A non-zero tx_status also closes the peer. */
541 ptllnd_tx_done(ptllnd_tx_t *tx)
543 ptllnd_peer_t *peer = tx->tx_peer;
544 lnet_ni_t *ni = peer->plp_ni;
545 ptllnd_ni_t *plni = ni->ni_data;
547 /* CAVEAT EMPTOR: If this tx is being aborted, I'll continue to get
548 * events for this tx until it's unlinked. So I set tx_completing to
549 * flag the tx is getting handled */
551 if (tx->tx_completing)
554 tx->tx_completing = 1;
556 if (!list_empty(&tx->tx_list))
557 list_del_init(&tx->tx_list);
559 if (tx->tx_status != 0) {
560 if (plni->plni_debug) {
561 CERROR("Completing tx for %s with error %d\n",
562 libcfs_id2str(peer->plp_id), tx->tx_status);
/* Any tx error is fatal for the connection. */
565 ptllnd_close_peer(peer, tx->tx_status);
/* Make sure both MDs are unlinked before touching tx memory. */
568 ptllnd_abort_tx(tx, &tx->tx_reqmdh);
569 ptllnd_abort_tx(tx, &tx->tx_bulkmdh);
571 if (tx->tx_niov > 0) {
572 LIBCFS_FREE(tx->tx_iov, tx->tx_niov * sizeof(*tx->tx_iov));
576 if (tx->tx_lnetreplymsg != NULL) {
577 LASSERT (tx->tx_type == PTLLND_MSG_TYPE_GET);
578 LASSERT (tx->tx_lnetmsg != NULL);
579 /* Simulate GET success always */
580 lnet_finalize(ni, tx->tx_lnetmsg, 0);
581 CDEBUG(D_NET, "lnet_finalize(tx_lnetreplymsg=%p)\n",tx->tx_lnetreplymsg);
582 lnet_finalize(ni, tx->tx_lnetreplymsg, tx->tx_status);
583 } else if (tx->tx_lnetmsg != NULL) {
584 lnet_finalize(ni, tx->tx_lnetmsg, tx->tx_status);
/* Retire to history rather than freeing immediately (debug aid). */
587 plni->plni_ntx_history++;
588 list_add_tail(&tx->tx_list, &plni->plni_tx_history);
590 ptllnd_cull_tx_history(plni);
/* Build the tx's ptl_md_iovec_t array describing [offset, offset+len)
 * within the caller's iovec list: skip whole iovecs consumed by the
 * offset, then copy/trim fragments until 'len' is covered.  The array
 * is allocated at the worst-case size 'niov' and re-done smaller if
 * fewer fragments were actually used. */
594 ptllnd_set_txiov(ptllnd_tx_t *tx,
595 unsigned int niov, struct iovec *iov,
596 unsigned int offset, unsigned int len)
598 ptl_md_iovec_t *piov;
607 * Remove iovec's at the beginning that
608 * are skipped because of the offset.
609 * Adjust the offset accordingly
613 if (offset < iov->iov_len)
615 offset -= iov->iov_len;
/* Remaining offset lands inside the first used iovec. */
621 int temp_offset = offset;
623 LIBCFS_ALLOC(piov, niov * sizeof(*piov));
627 for (npiov = 0;; npiov++) {
628 LASSERT (npiov < niov);
629 LASSERT (iov->iov_len >= temp_offset);
/* NOTE(review): temp_offset appears to apply only to the first
 * fragment in the full source (reset lines elided here) —
 * confirm against the unelided file. */
631 piov[npiov].iov_base = iov[npiov].iov_base + temp_offset;
632 piov[npiov].iov_len = iov[npiov].iov_len - temp_offset;
634 if (piov[npiov].iov_len >= resid) {
635 piov[npiov].iov_len = resid;
639 resid -= piov[npiov].iov_len;
649 /* Dang! The piov I allocated was too big and it's a drag to
650 * have to maintain separate 'allocated' and 'used' sizes, so
651 * I'll just do it again; NB this doesn't happen normally... */
652 LIBCFS_FREE(piov, niov * sizeof(*piov));
/* Point an MD at the tx's iov: no fragments → empty MD (elided),
 * exactly one fragment → plain start/length, multiple fragments →
 * iovec MD (PTL_MD_IOVEC). */
658 ptllnd_set_md_buffer(ptl_md_t *md, ptllnd_tx_t *tx)
660 unsigned int niov = tx->tx_niov;
661 ptl_md_iovec_t *iov = tx->tx_iov;
663 LASSERT ((md->options & PTL_MD_IOVEC) == 0);
668 } else if (niov == 1) {
669 md->start = iov[0].iov_base;
670 md->length = iov[0].iov_len;
674 md->options |= PTL_MD_IOVEC;
/* Post a receive buffer: attach a wildcard ME (any initiator, message
 * matchbits) on the LND portal and attach the buffer's MD to it.
 * The MD accepts PUTs only, with max_size slicing (one incoming
 * message per plni_max_msg_size chunk).  On MD attach failure the ME
 * is unlinked and the posted count rolled back. */
679 ptllnd_post_buffer(ptllnd_buffer_t *buf)
681 lnet_ni_t *ni = buf->plb_ni;
682 ptllnd_ni_t *plni = ni->ni_data;
/* Match any sender: the initiator id is wildcarded (elided). */
683 ptl_process_id_t anyid = {
687 .start = buf->plb_buffer,
688 .length = plni->plni_buffer_size,
689 .threshold = PTL_MD_THRESH_INF,
690 .max_size = plni->plni_max_msg_size,
691 .options = (PTLLND_MD_OPTIONS |
692 PTL_MD_OP_PUT | PTL_MD_MAX_SIZE |
693 PTL_MD_LOCAL_ALIGN8),
694 .user_ptr = ptllnd_obj2eventarg(buf, PTLLND_EVENTARG_TYPE_BUF),
695 .eq_handle = plni->plni_eqh};
699 LASSERT (!buf->plb_posted);
701 rc = PtlMEAttach(plni->plni_nih, plni->plni_portal,
702 anyid, LNET_MSG_MATCHBITS, 0,
703 PTL_UNLINK, PTL_INS_AFTER, &meh);
705 CERROR("PtlMEAttach failed: %s(%d)\n",
706 ptllnd_errtype2str(rc), rc);
711 plni->plni_nposted_buffers++;
713 rc = PtlMDAttach(meh, md, LNET_UNLINK, &buf->plb_md);
717 CERROR("PtlMDAttach failed: %s(%d)\n",
718 ptllnd_errtype2str(rc), rc);
/* Roll back: undo the posted count and free the ME. */
721 plni->plni_nposted_buffers--;
723 rc = PtlMEUnlink(meh);
724 LASSERT (rc == PTL_OK);
/* Decide whether to send an explicit NOOP to return credits: only
 * after HELLO has been sent, with a send credit available, no NOOP
 * already queued, and enough credits owed to the peer (highwater).
 * Even then, prefer piggybacking on a queued tx when possible. */
730 ptllnd_peer_send_noop (ptllnd_peer_t *peer)
732 ptllnd_ni_t *plni = peer->plp_ni->ni_data;
734 if (!peer->plp_sent_hello ||
735 peer->plp_credits == 0 ||
736 !list_empty(&peer->plp_noopq) ||
737 peer->plp_outstanding_credits < PTLLND_CREDIT_HIGHWATER(plni))
740 /* No tx to piggyback NOOP onto or no credit to send a tx */
741 return (list_empty(&peer->plp_txq) || peer->plp_credits == 1);
/* The transmit engine for one peer: (1) queue a NOOP if credits must
 * be returned, (2) pick the next tx (noopq has priority over txq),
 * (3) enforce credit-flow rules (HELLO first; keep the last credit in
 * reserve for NOOP/HELLO), (4) piggyback outstanding credits, bind an
 * MD over the message and PtlPut it to the peer.  Apparently looped
 * until nothing is sendable (loop construct elided in this listing). */
745 ptllnd_check_sends(ptllnd_peer_t *peer)
747 ptllnd_ni_t *plni = peer->plp_ni->ni_data;
753 CDEBUG(D_NET, "%s: [%d/%d+%d(%d)\n",
754 libcfs_id2str(peer->plp_id), peer->plp_credits,
755 peer->plp_outstanding_credits, peer->plp_sent_credits,
756 plni->plni_peer_credits + peer->plp_lazy_credits);
/* Phase 1: create a NOOP if we owe the peer credits and can't
 * piggyback them. */
758 if (ptllnd_peer_send_noop(peer)) {
759 tx = ptllnd_new_tx(peer, PTLLND_MSG_TYPE_NOOP, 0);
760 CDEBUG(D_NET, "NOOP tx=%p\n",tx);
762 CERROR("Can't return credits to %s\n",
763 libcfs_id2str(peer->plp_id));
765 ptllnd_set_tx_deadline(tx);
766 list_add_tail(&tx->tx_list, &peer->plp_noopq);
/* Phase 2: choose the next tx; NOOPs go first. */
771 if (!list_empty(&peer->plp_noopq)) {
772 LASSERT (peer->plp_sent_hello);
773 tx = list_entry(peer->plp_noopq.next,
774 ptllnd_tx_t, tx_list);
775 } else if (!list_empty(&peer->plp_txq)) {
776 tx = list_entry(peer->plp_txq.next,
777 ptllnd_tx_t, tx_list);
779 /* nothing to send right now */
783 LASSERT (tx->tx_msgsize > 0);
785 LASSERT (peer->plp_outstanding_credits >= 0);
786 LASSERT (peer->plp_sent_credits >= 0);
787 LASSERT (peer->plp_outstanding_credits + peer->plp_sent_credits
788 <= plni->plni_peer_credits + peer->plp_lazy_credits);
789 LASSERT (peer->plp_credits >= 0);
791 /* say HELLO first */
792 if (!peer->plp_sent_hello) {
793 LASSERT (list_empty(&peer->plp_noopq));
794 LASSERT (tx->tx_type == PTLLND_MSG_TYPE_HELLO);
796 peer->plp_sent_hello = 1;
/* Phase 3: credit flow control. */
799 if (peer->plp_credits == 0) { /* no credits */
800 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: no creds for %p",
801 libcfs_id2str(peer->plp_id),
803 peer->plp_outstanding_credits,
804 peer->plp_sent_credits,
805 plni->plni_peer_credits +
806 peer->plp_lazy_credits, tx);
810 /* Last/Initial credit reserved for NOOP/HELLO */
811 if (peer->plp_credits == 1 &&
812 tx->tx_type != PTLLND_MSG_TYPE_NOOP &&
813 tx->tx_type != PTLLND_MSG_TYPE_HELLO) {
814 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: too few creds for %p",
815 libcfs_id2str(peer->plp_id),
817 peer->plp_outstanding_credits,
818 peer->plp_sent_credits,
819 plni->plni_peer_credits +
820 peer->plp_lazy_credits, tx);
/* Commit: move the tx to the active queue. */
824 list_del(&tx->tx_list);
825 list_add_tail(&tx->tx_list, &peer->plp_activeq);
827 CDEBUG(D_NET, "Sending at TX=%p type=%s (%d)\n",tx,
828 ptllnd_msgtype2str(tx->tx_type),tx->tx_type);
/* A queued NOOP may have become unnecessary by the time it is
 * dequeued (e.g. a credit got piggybacked meanwhile). */
830 if (tx->tx_type == PTLLND_MSG_TYPE_NOOP &&
831 !ptllnd_peer_send_noop(peer)) {
837 /* Set stamp at the last minute; on a new peer, I don't know it
838 * until I receive the HELLO back */
839 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
842 * Return all the credits we have
844 tx->tx_msg.ptlm_credits = MIN(PTLLND_MSG_MAX_CREDITS,
845 peer->plp_outstanding_credits);
846 peer->plp_sent_credits += tx->tx_msg.ptlm_credits;
847 peer->plp_outstanding_credits -= tx->tx_msg.ptlm_credits;
/* Checksum covers the fixed header only (ptlm_u excluded). */
854 if (plni->plni_checksum)
855 tx->tx_msg.ptlm_cksum =
856 ptllnd_cksum(&tx->tx_msg,
857 offsetof(kptl_msg_t, ptlm_u));
859 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
860 md.eq_handle = plni->plni_eqh;
862 md.options = PTLLND_MD_OPTIONS;
863 md.start = &tx->tx_msg;
864 md.length = tx->tx_msgsize;
866 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
868 CERROR("PtlMDBind for %s failed: %s(%d)\n",
869 libcfs_id2str(peer->plp_id),
870 ptllnd_errtype2str(rc), rc);
871 tx->tx_status = -EIO;
876 LASSERT (tx->tx_type != PTLLND_RDMA_WRITE &&
877 tx->tx_type != PTLLND_RDMA_READ);
880 gettimeofday(&tx->tx_req_posted, NULL);
882 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: %s %p c %d",
883 libcfs_id2str(peer->plp_id),
885 peer->plp_outstanding_credits,
886 peer->plp_sent_credits,
887 plni->plni_peer_credits +
888 peer->plp_lazy_credits,
889 ptllnd_msgtype2str(tx->tx_type), tx,
890 tx->tx_msg.ptlm_credits);
892 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
893 plni->plni_portal, 0, LNET_MSG_MATCHBITS, 0, 0);
895 CERROR("PtlPut for %s failed: %s(%d)\n",
896 libcfs_id2str(peer->plp_id),
897 ptllnd_errtype2str(rc), rc);
898 tx->tx_status = -EIO;
/* Post a passive-side RDMA (PUT or GET request message): build a tx,
 * describe the local buffer, wait (bounded by the tx deadline) for the
 * peer's HELLO so plp_match is valid, attach an ME/MD at fresh
 * matchbits for the peer to PtlGet/PtlPut against, then queue the
 * request message.  For GET, an LNet reply message is created so the
 * eventual inbound data can be finalized. */
906 ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
907 unsigned int niov, struct iovec *iov,
908 unsigned int offset, unsigned int len)
910 lnet_ni_t *ni = peer->plp_ni;
911 ptllnd_ni_t *plni = ni->ni_data;
912 ptllnd_tx_t *tx = ptllnd_new_tx(peer, type, 0);
922 CDEBUG(D_NET, "niov=%d offset=%d len=%d\n",niov,offset,len);
924 LASSERT (type == PTLLND_MSG_TYPE_GET ||
925 type == PTLLND_MSG_TYPE_PUT);
928 CERROR("Can't allocate %s tx for %s\n",
929 ptllnd_msgtype2str(type), libcfs_id2str(peer->plp_id));
933 rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
935 CERROR("Can't allocate iov %d for %s\n",
936 niov, libcfs_id2str(peer->plp_id));
941 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
942 md.eq_handle = plni->plni_eqh;
945 md.options = PTLLND_MD_OPTIONS;
/* For my GET, the peer PUTs the data to me; for my PUT, the peer
 * GETs the data from me. */
946 if(type == PTLLND_MSG_TYPE_GET)
947 md.options |= PTL_MD_OP_PUT | PTL_MD_ACK_DISABLE;
949 md.options |= PTL_MD_OP_GET;
950 ptllnd_set_md_buffer(&md, tx);
952 start = cfs_time_current_sec();
953 w = plni->plni_long_wait;
954 ptllnd_set_tx_deadline(tx);
956 while (!peer->plp_recvd_hello) { /* wait to validate plp_match */
957 if (peer->plp_closing) {
962 /* NB must check here to avoid unbounded wait - tx not yet
963 * on peer->plp_txq, so ptllnd_watchdog can't expire it */
964 if (tx->tx_deadline < cfs_time_current_sec()) {
965 CERROR("%s tx for %s timed out\n",
966 ptllnd_msgtype2str(type),
967 libcfs_id2str(peer->plp_id));
972 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
973 CWARN("Waited %ds to connect to %s\n",
974 (int)(cfs_time_current_sec() - start),
975 libcfs_id2str(peer->plp_id));
/* Allocate matchbits for this RDMA, skipping the reserved range. */
981 if (peer->plp_match < PTL_RESERVED_MATCHBITS)
982 peer->plp_match = PTL_RESERVED_MATCHBITS;
983 matchbits = peer->plp_match++;
985 rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid,
986 matchbits, 0, PTL_UNLINK, PTL_INS_BEFORE, &meh);
988 CERROR("PtlMEAttach for %s failed: %s(%d)\n",
989 libcfs_id2str(peer->plp_id),
990 ptllnd_errtype2str(rc), rc);
995 gettimeofday(&tx->tx_bulk_posted, NULL);
997 rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);
999 CERROR("PtlMDAttach for %s failed: %s(%d)\n",
1000 libcfs_id2str(peer->plp_id),
1001 ptllnd_errtype2str(rc), rc);
1002 rc2 = PtlMEUnlink(meh);
1003 LASSERT (rc2 == PTL_OK);
1007 tx->tx_bulkmdh = mdh;
1010 * We need to set the stamp here because it
1011 * we could have received a HELLO above that set
1014 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
1016 tx->tx_msg.ptlm_u.rdma.kptlrm_hdr = msg->msg_hdr;
1017 tx->tx_msg.ptlm_u.rdma.kptlrm_matchbits = matchbits;
1019 if (type == PTLLND_MSG_TYPE_GET) {
1020 tx->tx_lnetreplymsg = lnet_create_reply_msg(ni, msg);
1021 if (tx->tx_lnetreplymsg == NULL) {
1022 CERROR("Can't create reply for GET to %s\n",
1023 libcfs_id2str(msg->msg_target));
1029 tx->tx_lnetmsg = msg;
1030 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post passive %s p %d %p",
1031 libcfs_id2str(msg->msg_target),
1032 peer->plp_credits, peer->plp_outstanding_credits,
1033 peer->plp_sent_credits,
1034 plni->plni_peer_credits + peer->plp_lazy_credits,
1035 lnet_msgtyp2str(msg->msg_type),
1036 (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ?
1037 le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1038 (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ?
1039 le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
/* Perform the active side of an RDMA the peer requested: bind an MD
 * over the local iov and issue PtlGet (peer's PUT request → we READ)
 * or PtlPut (peer's GET request → we WRITE) against the matchbits the
 * peer advertised.  The hdr_data of the PtlPut signals RDMA_OK or
 * RDMA_FAIL.  Allocation failure closes the peer. */
1051 ptllnd_active_rdma(ptllnd_peer_t *peer, int type,
1052 lnet_msg_t *msg, __u64 matchbits,
1053 unsigned int niov, struct iovec *iov,
1054 unsigned int offset, unsigned int len)
1056 lnet_ni_t *ni = peer->plp_ni;
1057 ptllnd_ni_t *plni = ni->ni_data;
1058 ptllnd_tx_t *tx = ptllnd_new_tx(peer, type, 0);
1060 ptl_handle_md_t mdh;
1063 LASSERT (type == PTLLND_RDMA_READ ||
1064 type == PTLLND_RDMA_WRITE);
1067 CERROR("Can't allocate tx for RDMA %s with %s\n",
1068 (type == PTLLND_RDMA_WRITE) ? "write" : "read",
1069 libcfs_id2str(peer->plp_id));
1070 ptllnd_close_peer(peer, -ENOMEM);
1074 rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
1076 CERROR("Can't allocate iov %d for %s\n",
1077 niov, libcfs_id2str(peer->plp_id));
1082 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
1083 md.eq_handle = plni->plni_eqh;
1085 md.options = PTLLND_MD_OPTIONS;
/* READ expects two events (send of the GET + reply); WRITE one. */
1086 md.threshold = (type == PTLLND_RDMA_READ) ? 2 : 1;
1088 ptllnd_set_md_buffer(&md, tx);
1090 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
1092 CERROR("PtlMDBind for %s failed: %s(%d)\n",
1093 libcfs_id2str(peer->plp_id),
1094 ptllnd_errtype2str(rc), rc);
1099 tx->tx_bulkmdh = mdh;
1100 tx->tx_lnetmsg = msg;
1102 ptllnd_set_tx_deadline(tx);
1103 list_add_tail(&tx->tx_list, &peer->plp_activeq);
1104 gettimeofday(&tx->tx_bulk_posted, NULL);
1106 if (type == PTLLND_RDMA_READ)
1107 rc = PtlGet(mdh, peer->plp_ptlid,
1108 plni->plni_portal, 0, matchbits, 0);
1110 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
1111 plni->plni_portal, 0, matchbits, 0,
/* msg == NULL means we are failing the peer's GET. */
1112 (msg == NULL) ? PTLLND_RDMA_FAIL : PTLLND_RDMA_OK);
1117 CERROR("Can't initiate RDMA with %s: %s(%d)\n",
1118 libcfs_id2str(peer->plp_id),
1119 ptllnd_errtype2str(rc), rc);
1121 tx->tx_lnetmsg = NULL;
1124 ptllnd_tx_done(tx); /* this will close peer */
/* LND send entry point.  Dispatch by LNet message type: small messages
 * (ACK, small GET/PUT/REPLY payloads that fit plni_max_msg_size) go as
 * IMMEDIATE with the payload copied inline; larger GETs become passive
 * RDMA GET requests and larger PUT/REPLYs become passive RDMA PUT
 * requests.  Userspace peers (LNET_PID_USERFLAG) are rejected. */
1129 ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
1131 ptllnd_ni_t *plni = ni->ni_data;
1137 LASSERT (!msg->msg_routing);
1138 LASSERT (msg->msg_kiov == NULL);
1140 LASSERT (msg->msg_niov <= PTL_MD_MAX_IOV); /* !!! */
1142 CDEBUG(D_NET, "%s [%d]+%d,%d -> %s%s\n",
1143 lnet_msgtyp2str(msg->msg_type),
1144 msg->msg_niov, msg->msg_offset, msg->msg_len,
1145 libcfs_nid2str(msg->msg_target.nid),
1146 msg->msg_target_is_router ? "(rtr)" : "");
1148 if ((msg->msg_target.pid & LNET_PID_USERFLAG) != 0) {
1149 CERROR("Can't send to non-kernel peer %s\n",
1150 libcfs_id2str(msg->msg_target));
1151 return -EHOSTUNREACH;
1154 plp = ptllnd_find_peer(ni, msg->msg_target, 1);
1158 switch (msg->msg_type) {
1163 LASSERT (msg->msg_len == 0);
1164 break; /* send IMMEDIATE */
/* GET: if the reply would fit in an immediate message (or the
 * target is a router), fall through to IMMEDIATE; else passive
 * RDMA. */
1167 if (msg->msg_target_is_router)
1168 break; /* send IMMEDIATE */
1170 nob = msg->msg_md->md_length;
1171 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1172 if (nob <= plni->plni_max_msg_size)
1175 LASSERT ((msg->msg_md->md_options & LNET_MD_KIOV) == 0);
1176 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_GET, msg,
1177 msg->msg_md->md_niov,
1178 msg->msg_md->md_iov.iov,
1179 0, msg->msg_md->md_length);
1180 ptllnd_peer_decref(plp);
1183 case LNET_MSG_REPLY:
/* PUT/REPLY: inline if it fits the peer's max message size. */
1186 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1187 if (nob <= plp->plp_max_msg_size)
1188 break; /* send IMMEDIATE */
1190 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_PUT, msg,
1191 msg->msg_niov, msg->msg_iov,
1192 msg->msg_offset, msg->msg_len);
1193 ptllnd_peer_decref(plp);
1198 * NB copy the payload so we don't have to do a fragmented send */
1200 tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_IMMEDIATE, msg->msg_len);
1202 CERROR("Can't allocate tx for lnet type %d to %s\n",
1203 msg->msg_type, libcfs_id2str(msg->msg_target));
1204 ptllnd_peer_decref(plp);
1208 lnet_copy_iov2flat(tx->tx_msgsize, &tx->tx_msg,
1209 offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1210 msg->msg_niov, msg->msg_iov, msg->msg_offset,
1212 tx->tx_msg.ptlm_u.immediate.kptlim_hdr = msg->msg_hdr;
1214 tx->tx_lnetmsg = msg;
1215 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post immediate %s p %d %p",
1216 libcfs_id2str(msg->msg_target),
1217 plp->plp_credits, plp->plp_outstanding_credits,
1218 plp->plp_sent_credits,
1219 plni->plni_peer_credits + plp->plp_lazy_credits,
1220 lnet_msgtyp2str(msg->msg_type),
1221 (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ?
1222 le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1223 (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ?
1224 le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1227 ptllnd_peer_decref(plp);
/* Finish with a receive: the buffer slot becomes a credit we owe the
 * peer (plp_outstanding_credits++), so kick the sender in case a NOOP
 * or piggyback is now due. */
1232 ptllnd_rx_done(ptllnd_rx_t *rx)
1234 ptllnd_peer_t *plp = rx->rx_peer;
1235 ptllnd_ni_t *plni = plp->plp_ni->ni_data;
1237 plp->plp_outstanding_credits++;
1239 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: rx=%p done\n",
1240 libcfs_id2str(plp->plp_id),
1241 plp->plp_credits, plp->plp_outstanding_credits,
1242 plp->plp_sent_credits,
1243 plni->plni_peer_credits + plp->plp_lazy_credits, rx);
1245 ptllnd_check_sends(plp);
1247 LASSERT (plni->plni_nrxs > 0);
/* LND eager_recv entry point — a no-op here; see comment below. */
1252 ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1253 void **new_privatep)
1255 /* Shouldn't get here; recvs only block for router buffers */
/* LND recv entry point.  IMMEDIATE: bounds-check then copy the inline
 * payload into the caller's iovs and finalize.  PUT request: start an
 * active RDMA READ of the sender's buffer.  GET request: start an
 * active RDMA WRITE of the reply data, or (else branch) a WRITE with
 * msg == NULL to signal failure to the requester. */
1261 ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1262 int delayed, unsigned int niov,
1263 struct iovec *iov, lnet_kiov_t *kiov,
1264 unsigned int offset, unsigned int mlen, unsigned int rlen)
1266 ptllnd_rx_t *rx = private;
1270 LASSERT (kiov == NULL);
1271 LASSERT (niov <= PTL_MD_MAX_IOV); /* !!! */
1273 switch (rx->rx_msg->ptlm_type) {
1277 case PTLLND_MSG_TYPE_IMMEDIATE:
1278 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[mlen]);
1279 if (nob > rx->rx_nob) {
1280 CERROR("Immediate message from %s too big: %d(%d)\n",
1281 libcfs_id2str(rx->rx_peer->plp_id),
1286 lnet_copy_flat2iov(niov, iov, offset,
1287 rx->rx_nob, rx->rx_msg,
1288 offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1290 lnet_finalize(ni, msg, 0);
1293 case PTLLND_MSG_TYPE_PUT:
1294 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg,
1295 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1296 niov, iov, offset, mlen);
1299 case PTLLND_MSG_TYPE_GET:
1301 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, msg,
1302 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1303 msg->msg_niov, msg->msg_iov,
1304 msg->msg_offset, msg->msg_len);
/* msg unavailable: complete the peer's GET with RDMA_FAIL. */
1306 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, NULL,
1307 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
/* Validate and byte-swap an incoming request message.  Checks, in
 * order: minimum length, magic (detecting byte-flipped peers),
 * protocol version, checksum (computed over the fixed header with
 * ptlm_cksum zeroed, before any flipping), source/destination
 * nid/pid, dststamp, and per-type payload length, swabbing per-type
 * fields as needed.  NAKs close the offending peer.  HELLO handling
 * records the peer's advertised matchbits/max_msg_size/stamp.
 * NOTE(review): this function continues past the end of the visible
 * listing. */
1317 ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
1318 kptl_msg_t *msg, unsigned int nob)
1320 ptllnd_ni_t *plni = ni->ni_data;
1321 const int basenob = offsetof(kptl_msg_t, ptlm_u);
1322 lnet_process_id_t srcid;
1331 CERROR("Very short receive from %s\n",
1332 ptllnd_ptlid2str(initiator));
1336 /* I can at least read MAGIC/VERSION */
1338 flip = msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC);
1339 if (!flip && msg->ptlm_magic != PTLLND_MSG_MAGIC) {
1340 CERROR("Bad protocol magic %08x from %s\n",
1341 msg->ptlm_magic, ptllnd_ptlid2str(initiator));
1345 msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
1347 if (msg_version != PTLLND_MSG_VERSION) {
1348 CERROR("Bad protocol version %04x from %s: %04x expected\n",
1349 (__u32)msg_version, ptllnd_ptlid2str(initiator), PTLLND_MSG_VERSION);
1351 if (plni->plni_abort_on_protocol_mismatch)
1357 if (nob < basenob) {
1358 CERROR("Short receive from %s: got %d, wanted at least %d\n",
1359 ptllnd_ptlid2str(initiator), nob, basenob);
1363 /* checksum must be computed with
1364 * 1) ptlm_cksum zero and
1365 * 2) BEFORE anything gets modified/flipped
1367 msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
1368 msg->ptlm_cksum = 0;
1369 if (msg_cksum != 0 &&
1370 msg_cksum != ptllnd_cksum(msg, offsetof(kptl_msg_t, ptlm_u))) {
1371 CERROR("Bad checksum from %s\n", ptllnd_ptlid2str(initiator));
/* Restore the in-place-cleared fields in host order. */
1375 msg->ptlm_version = msg_version;
1376 msg->ptlm_cksum = msg_cksum;
1379 /* NB stamps are opaque cookies */
1380 __swab32s(&msg->ptlm_nob);
1381 __swab64s(&msg->ptlm_srcnid);
1382 __swab64s(&msg->ptlm_dstnid);
1383 __swab32s(&msg->ptlm_srcpid);
1384 __swab32s(&msg->ptlm_dstpid);
1387 srcid.nid = msg->ptlm_srcnid;
1388 srcid.pid = msg->ptlm_srcpid;
1390 if (LNET_NIDNET(msg->ptlm_srcnid) != LNET_NIDNET(ni->ni_nid)) {
1391 CERROR("Bad source id %s from %s\n",
1392 libcfs_id2str(srcid),
1393 ptllnd_ptlid2str(initiator));
1397 if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
1398 CERROR("NAK from %s (%s)\n",
1399 libcfs_id2str(srcid),
1400 ptllnd_ptlid2str(initiator));
1402 if (plni->plni_dump_on_nak)
1403 ptllnd_dump_debug(ni, srcid);
1405 if (plni->plni_abort_on_nak)
/* A NAK is a protocol error: close the peer if we know it. */
1408 plp = ptllnd_find_peer(ni, srcid, 0);
1410 CERROR("Ignore NAK from %s: no peer\n", libcfs_id2str(srcid));
1413 ptllnd_close_peer(plp, -EPROTO);
1414 ptllnd_peer_decref(plp);
1418 if (msg->ptlm_dstnid != ni->ni_nid ||
1419 msg->ptlm_dstpid != the_lnet.ln_pid) {
1420 CERROR("Bad dstid %s (%s expected) from %s\n",
1421 libcfs_id2str((lnet_process_id_t) {
1422 .nid = msg->ptlm_dstnid,
1423 .pid = msg->ptlm_dstpid}),
1424 libcfs_id2str((lnet_process_id_t) {
1426 .pid = the_lnet.ln_pid}),
1427 libcfs_id2str(srcid));
1431 if (msg->ptlm_dststamp != plni->plni_stamp) {
1432 CERROR("Bad dststamp "LPX64"("LPX64" expected) from %s\n",
1433 msg->ptlm_dststamp, plni->plni_stamp,
1434 libcfs_id2str(srcid));
1438 PTLLND_HISTORY("RX %s: %s %d %p", libcfs_id2str(srcid),
1439 ptllnd_msgtype2str(msg->ptlm_type),
1440 msg->ptlm_credits, &rx);
/* Per-type payload length checks and field swabbing. */
1442 switch (msg->ptlm_type) {
1443 case PTLLND_MSG_TYPE_PUT:
1444 case PTLLND_MSG_TYPE_GET:
1445 if (nob < basenob + sizeof(kptl_rdma_msg_t)) {
1446 CERROR("Short rdma request from %s(%s)\n",
1447 libcfs_id2str(srcid),
1448 ptllnd_ptlid2str(initiator));
1452 __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
1455 case PTLLND_MSG_TYPE_IMMEDIATE:
1456 if (nob < offsetof(kptl_msg_t,
1457 ptlm_u.immediate.kptlim_payload)) {
1458 CERROR("Short immediate from %s(%s)\n",
1459 libcfs_id2str(srcid),
1460 ptllnd_ptlid2str(initiator));
1465 case PTLLND_MSG_TYPE_HELLO:
1466 if (nob < basenob + sizeof(kptl_hello_msg_t)) {
1467 CERROR("Short hello from %s(%s)\n",
1468 libcfs_id2str(srcid),
1469 ptllnd_ptlid2str(initiator));
1473 __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
1474 __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
1478 case PTLLND_MSG_TYPE_NOOP:
1482 CERROR("Bad message type %d from %s(%s)\n", msg->ptlm_type,
1483 libcfs_id2str(srcid),
1484 ptllnd_ptlid2str(initiator));
1488 plp = ptllnd_find_peer(ni, srcid, 0);
1490 CERROR("Can't find peer %s\n", libcfs_id2str(srcid));
1494 if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
1495 if (plp->plp_recvd_hello) {
1496 CERROR("Unexpected HELLO from %s\n",
1497 libcfs_id2str(srcid));
1498 ptllnd_peer_decref(plp);
/* Record the peer's advertised RDMA matchbits base, message
 * size limit and incarnation stamp from its HELLO. */
1502 plp->plp_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1503 plp->plp_match = msg->ptlm_u.hello.kptlhm_matchbits;
1504 plp->plp_stamp = msg->ptlm_srcstamp;
1505 plp->plp_recvd_hello = 1;
1507 } else if (!plp->plp_recvd_hello) {
1509 CERROR("Bad message type %d (HELLO expected) from %s\n",
1510 msg->ptlm_type, libcfs_id2str(srcid));
1511 ptllnd_peer_decref(plp);
1514 } else if (msg->ptlm_srcstamp != plp->plp_stamp) {
1516 CERROR("Bad srcstamp "LPX64"("LPX64" expected) from %s\n",
1517 msg->ptlm_srcstamp, plp->plp_stamp,
1518 libcfs_id2str(srcid));
1519 ptllnd_peer_decref(plp);
1523 /* Check peer only sends when I've sent her credits */
1524 if (plp->plp_sent_credits == 0) {
1525 CERROR("%s[%d/%d+%d(%d)]: unexpected message\n",
1526 libcfs_id2str(plp->plp_id),
1527 plp->plp_credits, plp->plp_outstanding_credits,
1528 plp->plp_sent_credits,
1529 plni->plni_peer_credits + plp->plp_lazy_credits);
1532 plp->plp_sent_credits--;
1534 /* No check for credit overflow - the peer may post new buffers after
1535 * the startup handshake. */
1536 plp->plp_credits += msg->ptlm_credits;
1538 /* All OK so far; assume the message is good... */
1545 switch (msg->ptlm_type) {
1546 default: /* message types have been checked already */
1547 ptllnd_rx_done(&rx);
1550 case PTLLND_MSG_TYPE_PUT:
1551 case PTLLND_MSG_TYPE_GET:
1552 rc = lnet_parse(ni, &msg->ptlm_u.rdma.kptlrm_hdr,
1553 msg->ptlm_srcnid, &rx, 1);
1555 ptllnd_rx_done(&rx);
1558 case PTLLND_MSG_TYPE_IMMEDIATE:
1559 rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,
1560 msg->ptlm_srcnid, &rx, 0);
1562 ptllnd_rx_done(&rx);
1566 if (msg->ptlm_credits > 0)
1567 ptllnd_check_sends(plp);
1569 ptllnd_peer_decref(plp);
/* Event callback for a posted receive buffer.
 * Handles PTL_EVENT_PUT_END (an incoming message landed in the buffer)
 * and PTL_EVENT_UNLINK; on unlink it clears the posted state and reposts
 * the buffer via ptllnd_post_buffer().
 * NOTE(review): this listing is an elided excerpt - several original
 * lines (closing braces, #else arms, early returns) are not visible
 * here; comments below describe only the visible statements. */
1573 ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
1575 ptllnd_buffer_t *buf = ptllnd_eventarg2obj(event->md.user_ptr);
1576 ptllnd_ni_t *plni = ni->ni_data;
/* Incoming message starts at the event's offset within the buffer. */
1577 char *msg = &buf->plb_buffer[event->offset];
1579 int unlinked = event->type == PTL_EVENT_UNLINK;
/* The buffer must belong to this NI and only the two expected event
 * types can arrive here. */
1581 LASSERT (buf->plb_ni == ni);
1582 LASSERT (event->type == PTL_EVENT_PUT_END ||
1583 event->type == PTL_EVENT_UNLINK);
/* Network-level failure: log it; the message (if any) is not parsed. */
1585 if (event->ni_fail_type != PTL_NI_OK) {
1587 CERROR("event type %s(%d), status %s(%d) from %s\n",
1588 ptllnd_evtype2str(event->type), event->type,
1589 ptllnd_errtype2str(event->ni_fail_type),
1590 event->ni_fail_type,
1591 ptllnd_ptlid2str(event->initiator));
1593 } else if (event->type == PTL_EVENT_PUT_END) {
1594 #if (PTL_MD_LOCAL_ALIGN8 == 0)
1595 /* Portals can't force message alignment - someone sending an
1596 * odd-length message could misalign subsequent messages */
1597 if ((event->mlength & 7) != 0) {
1598 CERROR("Message from %s has odd length "LPU64
1599 " probable version incompatibility\n",
1600 ptllnd_ptlid2str(event->initiator),
1605 LASSERT ((event->offset & 7) == 0);
/* Good PUT: hand the raw message to the protocol parser. */
1607 ptllnd_parse_request(ni, event->initiator,
1608 (kptl_msg_t *)msg, event->mlength);
1611 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1612 /* UNLINK event only on explicit unlink */
1613 repost = (event->unlinked && event->type != PTL_EVENT_UNLINK);
1614 if (event->unlinked)
1617 /* UNLINK event only on implicit unlink */
1618 repost = (event->type == PTL_EVENT_UNLINK);
/* Buffer has been unlinked: account for it and repost. */
1622 LASSERT(buf->plb_posted);
1623 buf->plb_posted = 0;
1624 plni->plni_nposted_buffers--;
/* Return value deliberately ignored - repost is best-effort here. */
1628 (void) ptllnd_post_buffer(buf);
/* Event callback for an active transmit descriptor (tx).
 * A tx can own up to two memory descriptors - the request MD
 * (tx_reqmdh) and the bulk MD (tx_bulkmdh). Exactly one of them
 * matches each event; the tx is finalized (moved to the zombie list)
 * on error or once both handles have become PTL_INVALID_HANDLE.
 * NOTE(review): elided excerpt - some lines (returns, braces,
 * #else/#endif arms) are not visible; comments cover only what is. */
1632 ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
1634 ptllnd_ni_t *plni = ni->ni_data;
1635 ptllnd_tx_t *tx = ptllnd_eventarg2obj(event->md.user_ptr);
1636 int error = (event->ni_fail_type != PTL_NI_OK);
1639 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
/* Lustre-patched Portals flags unlink on any event ... */
1640 int unlinked = event->unlinked;
/* ... stock Portals delivers a separate UNLINK event instead. */
1642 int unlinked = (event->type == PTL_EVENT_UNLINK);
1646 CERROR("Error %s(%d) event %s(%d) unlinked %d, %s(%d) for %s\n",
1647 ptllnd_errtype2str(event->ni_fail_type),
1648 event->ni_fail_type,
1649 ptllnd_evtype2str(event->type), event->type,
1650 unlinked, ptllnd_msgtype2str(tx->tx_type), tx->tx_type,
1651 libcfs_id2str(tx->tx_peer->plp_id));
1653 LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE));
/* Which of the tx's two MDs did this event fire on? */
1655 isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh);
1657 LASSERT (event->md.start == (void *)&tx->tx_msg);
/* Request MD unlinked: invalidate the handle and timestamp it. */
1659 tx->tx_reqmdh = PTL_INVALID_HANDLE;
1660 gettimeofday(&tx->tx_req_done, NULL);
1664 isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);
1665 if ( isbulk && unlinked ) {
/* Bulk MD unlinked: same bookkeeping for the bulk side. */
1666 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
1667 gettimeofday(&tx->tx_bulk_done, NULL);
1670 LASSERT (!isreq != !isbulk); /* always one and only 1 match */
1672 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: TX done %p %s%s",
1673 libcfs_id2str(tx->tx_peer->plp_id),
1674 tx->tx_peer->plp_credits,
1675 tx->tx_peer->plp_outstanding_credits,
1676 tx->tx_peer->plp_sent_credits,
1677 plni->plni_peer_credits + tx->tx_peer->plp_lazy_credits,
1678 tx, isreq ? "REQ" : "BULK", unlinked ? "(unlinked)" : "");
1680 LASSERT (!isreq != !isbulk); /* always one and only 1 match */
/* Sanity-check the event type against what each tx type may see. */
1681 switch (tx->tx_type) {
1685 case PTLLND_MSG_TYPE_NOOP:
1686 case PTLLND_MSG_TYPE_HELLO:
1687 case PTLLND_MSG_TYPE_IMMEDIATE:
1688 LASSERT (event->type == PTL_EVENT_UNLINK ||
1689 event->type == PTL_EVENT_SEND_END);
1693 case PTLLND_MSG_TYPE_GET:
1694 LASSERT (event->type == PTL_EVENT_UNLINK ||
1695 (isreq && event->type == PTL_EVENT_SEND_END) ||
1696 (isbulk && event->type == PTL_EVENT_PUT_END));
1698 if (isbulk && !error && event->type == PTL_EVENT_PUT_END) {
1699 /* Check GET matched */
1700 if (event->hdr_data == PTLLND_RDMA_OK) {
1701 lnet_set_reply_msg_len(ni,
1702 tx->tx_lnetreplymsg,
1705 CERROR ("Unmatched GET with %s\n",
1706 libcfs_id2str(tx->tx_peer->plp_id));
1707 tx->tx_status = -EIO;
1712 case PTLLND_MSG_TYPE_PUT:
1713 LASSERT (event->type == PTL_EVENT_UNLINK ||
1714 (isreq && event->type == PTL_EVENT_SEND_END) ||
1715 (isbulk && event->type == PTL_EVENT_GET_END));
1718 case PTLLND_RDMA_READ:
1719 LASSERT (event->type == PTL_EVENT_UNLINK ||
1720 event->type == PTL_EVENT_SEND_END ||
1721 event->type == PTL_EVENT_REPLY_END);
1725 case PTLLND_RDMA_WRITE:
1726 LASSERT (event->type == PTL_EVENT_UNLINK ||
1727 event->type == PTL_EVENT_SEND_END);
1731 /* Schedule ptllnd_tx_done() on error or last completion event */
1733 (PtlHandleIsEqual(tx->tx_bulkmdh, PTL_INVALID_HANDLE) &&
1734 PtlHandleIsEqual(tx->tx_reqmdh, PTL_INVALID_HANDLE))) {
1736 tx->tx_status = -EIO;
/* Move tx to the zombie list; ptllnd_wait() reaps and completes it. */
1737 list_del(&tx->tx_list);
1738 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
/* Scan all three per-peer tx lists (queued, NOOP, active) for the
 * first tx whose deadline has already passed.
 * NOTE(review): elided excerpt - the `return tx` / `return NULL`
 * statements are not visible here, but the loop structure shows a
 * first-match search across the three queues. */
1743 ptllnd_find_timed_out_tx(ptllnd_peer_t *peer)
1745 time_t now = cfs_time_current_sec();
/* Messages still waiting to be sent. */
1748 list_for_each_entry (tx, &peer->plp_txq, tx_list) {
1749 if (tx->tx_deadline < now)
/* Pending NOOP (credit-return) messages. */
1753 list_for_each_entry (tx, &peer->plp_noopq, tx_list) {
1754 if (tx->tx_deadline < now)
/* Messages already on the wire awaiting completion. */
1758 list_for_each_entry (tx, &peer->plp_activeq, tx_list) {
1759 if (tx->tx_deadline < now)
/* Check a single peer for a timed-out tx; if one is found, log the
 * peer's credit state, dump the offending tx and close the connection
 * with -ETIMEDOUT.
 * NOTE(review): elided excerpt - the early-return taken when no tx has
 * timed out is not visible in this listing. */
1767 ptllnd_check_peer(ptllnd_peer_t *peer)
1769 ptllnd_tx_t *tx = ptllnd_find_timed_out_tx(peer);
1774 CERROR("%s (sent %d recvd %d, credits %d/%d/%d/%d/%d): timed out %p %p\n",
1775 libcfs_id2str(peer->plp_id), peer->plp_sent_hello, peer->plp_recvd_hello,
1776 peer->plp_credits, peer->plp_outstanding_credits,
1777 peer->plp_sent_credits, peer->plp_lazy_credits,
1778 peer->plp_extra_lazy_credits, tx, tx->tx_lnetmsg);
1779 ptllnd_debug_tx(tx);
/* Timeout is fatal for the connection: tear the peer down. */
1780 ptllnd_close_peer(peer, -ETIMEDOUT);
/* Periodic timeout scan: checks a proportional chunk of the peer hash
 * table each time it runs, so every peer is examined 'n' times per
 * timeout interval without scanning the whole table on each call.
 * Advances plni_watchdog_nextt to schedule the next scan.
 * NOTE(review): elided excerpt - the declarations of 'n' and 'i' and
 * some closing braces are not visible in this listing. */
1784 ptllnd_watchdog (lnet_ni_t *ni, time_t now)
1786 ptllnd_ni_t *plni = ni->ni_data;
/* p = seconds between watchdog runs; chunk defaults to the full table. */
1788 int p = plni->plni_watchdog_interval;
1789 int chunk = plni->plni_peer_hash_size;
/* Time actually elapsed since the previous scheduled scan. */
1790 int interval = now - (plni->plni_watchdog_nextt - p);
1792 struct list_head *hashlist;
1793 struct list_head *tmp;
1794 struct list_head *nxt;
1796 /* Time to check for RDMA timeouts on a few more peers:
1797 * I try to do checks every 'p' seconds on a proportion of the peer
1798 * table and I need to check every connection 'n' times within a
1799 * timeout interval, to ensure I detect a timeout on any connection
1800 * within (n+1)/n times the timeout interval. */
1802 LASSERT (now >= plni->plni_watchdog_nextt);
1804 if (plni->plni_timeout > n * interval) { /* Scan less than the whole table? */
1805 chunk = (chunk * n * interval) / plni->plni_timeout;
/* Walk 'chunk' hash buckets, resuming where the last scan stopped. */
1810 for (i = 0; i < chunk; i++) {
1811 hashlist = &plni->plni_peer_hash[plni->plni_watchdog_peeridx];
/* _safe variant: ptllnd_check_peer() may close (unlink) the peer. */
1813 list_for_each_safe(tmp, nxt, hashlist) {
1814 ptllnd_check_peer(list_entry(tmp, ptllnd_peer_t, plp_list));
1817 plni->plni_watchdog_peeridx = (plni->plni_watchdog_peeridx + 1) %
1818 plni->plni_peer_hash_size;
/* Schedule the next scan one interval from now. */
1821 plni->plni_watchdog_nextt = now + p;
1825 ptllnd_wait (lnet_ni_t *ni, int milliseconds)
1827 static struct timeval prevt;
1828 static int prevt_count;
1829 static int call_count;
1831 struct timeval start;
1832 struct timeval then;
1834 struct timeval deadline;
1836 ptllnd_ni_t *plni = ni->ni_data;
1844 /* Handle any currently queued events, returning immediately if any.
1845 * Otherwise block for the timeout and handle all events queued
1848 gettimeofday(&start, NULL);
1851 if (milliseconds <= 0) {
1854 deadline.tv_sec = start.tv_sec + milliseconds/1000;
1855 deadline.tv_usec = start.tv_usec + (milliseconds % 1000)*1000;
1857 if (deadline.tv_usec >= 1000000) {
1858 start.tv_usec -= 1000000;
1864 gettimeofday(&then, NULL);
1866 rc = PtlEQPoll(&plni->plni_eqh, 1, timeout, &event, &which);
1868 gettimeofday(&now, NULL);
1870 if ((now.tv_sec*1000 + now.tv_usec/1000) -
1871 (then.tv_sec*1000 + then.tv_usec/1000) > timeout + 1000) {
1872 /* 1000 mS grace...........................^ */
1873 CERROR("SLOW PtlEQPoll(%d): %dmS elapsed\n", timeout,
1874 (int)(now.tv_sec*1000 + now.tv_usec/1000) -
1875 (int)(then.tv_sec*1000 + then.tv_usec/1000));
1878 if (rc == PTL_EQ_EMPTY) {
1879 if (found) /* handled some events */
1882 if (now.tv_sec >= plni->plni_watchdog_nextt) { /* check timeouts? */
1883 ptllnd_watchdog(ni, now.tv_sec);
1884 LASSERT (now.tv_sec < plni->plni_watchdog_nextt);
1887 if (now.tv_sec > deadline.tv_sec || /* timeout expired */
1888 (now.tv_sec == deadline.tv_sec &&
1889 now.tv_usec >= deadline.tv_usec))
1892 if (milliseconds < 0 ||
1893 plni->plni_watchdog_nextt <= deadline.tv_sec) {
1894 timeout = (plni->plni_watchdog_nextt - now.tv_sec)*1000;
1896 timeout = (deadline.tv_sec - now.tv_sec)*1000 +
1897 (deadline.tv_usec - now.tv_usec)/1000;
1903 LASSERT (rc == PTL_OK || rc == PTL_EQ_DROPPED);
1905 if (rc == PTL_EQ_DROPPED)
1906 CERROR("Event queue: size %d is too small\n",
1907 plni->plni_eq_size);
1912 switch (ptllnd_eventarg2type(event.md.user_ptr)) {
1916 case PTLLND_EVENTARG_TYPE_TX:
1917 ptllnd_tx_event(ni, &event);
1920 case PTLLND_EVENTARG_TYPE_BUF:
1921 ptllnd_buf_event(ni, &event);
1926 while (!list_empty(&plni->plni_zombie_txs)) {
1927 tx = list_entry(plni->plni_zombie_txs.next,
1928 ptllnd_tx_t, tx_list);
1929 list_del_init(&tx->tx_list);
1933 if (prevt.tv_sec == 0 ||
1934 prevt.tv_sec != now.tv_sec) {
1935 PTLLND_HISTORY("%d wait entered at %d.%06d - prev %d %d.%06d",
1936 call_count, (int)start.tv_sec, (int)start.tv_usec,
1937 prevt_count, (int)prevt.tv_sec, (int)prevt.tv_usec);