1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lnet/ulnds/ptllnd/ptllnd_cb.c
38 * Author: Eric Barton <eeb@bartonsoftware.com>
44 ptllnd_set_tx_deadline(ptllnd_tx_t *tx)
46 ptllnd_peer_t *peer = tx->tx_peer;
47 lnet_ni_t *ni = peer->plp_ni;
48 ptllnd_ni_t *plni = ni->ni_data;
50 tx->tx_deadline = cfs_time_current_sec() + plni->plni_timeout;
54 ptllnd_post_tx(ptllnd_tx_t *tx)
56 ptllnd_peer_t *peer = tx->tx_peer;
58 ptllnd_set_tx_deadline(tx);
59 list_add_tail(&tx->tx_list, &peer->plp_txq);
60 ptllnd_check_sends(peer);
64 ptllnd_ptlid2str(ptl_process_id_t id)
66 static char strs[8][32];
69 char *str = strs[idx++];
71 if (idx >= sizeof(strs)/sizeof(strs[0]))
74 snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
79 ptllnd_destroy_peer(ptllnd_peer_t *peer)
81 lnet_ni_t *ni = peer->plp_ni;
82 ptllnd_ni_t *plni = ni->ni_data;
83 int nmsg = peer->plp_lazy_credits +
84 plni->plni_peer_credits;
86 ptllnd_size_buffers(ni, -nmsg);
88 LASSERT (peer->plp_closing);
89 LASSERT (plni->plni_npeers > 0);
90 LASSERT (list_empty(&peer->plp_txq));
91 LASSERT (list_empty(&peer->plp_activeq));
93 LIBCFS_FREE(peer, sizeof(*peer));
97 ptllnd_abort_txs(ptllnd_ni_t *plni, struct list_head *q)
99 while (!list_empty(q)) {
100 ptllnd_tx_t *tx = list_entry(q->next, ptllnd_tx_t, tx_list);
102 tx->tx_status = -ESHUTDOWN;
103 list_del(&tx->tx_list);
104 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
109 ptllnd_close_peer(ptllnd_peer_t *peer, int error)
111 lnet_ni_t *ni = peer->plp_ni;
112 ptllnd_ni_t *plni = ni->ni_data;
114 if (peer->plp_closing)
117 peer->plp_closing = 1;
119 if (!list_empty(&peer->plp_txq) ||
120 !list_empty(&peer->plp_activeq) ||
122 CWARN("Closing %s\n", libcfs_id2str(peer->plp_id));
123 if (plni->plni_debug)
124 ptllnd_dump_debug(ni, peer->plp_id);
127 ptllnd_abort_txs(plni, &peer->plp_txq);
128 ptllnd_abort_txs(plni, &peer->plp_activeq);
130 list_del(&peer->plp_list);
131 ptllnd_peer_decref(peer);
135 ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
137 ptllnd_ni_t *plni = ni->ni_data;
138 unsigned int hash = LNET_NIDADDR(id.nid) % plni->plni_peer_hash_size;
139 struct list_head *tmp;
144 LASSERT (LNET_NIDNET(id.nid) == LNET_NIDNET(ni->ni_nid));
146 list_for_each(tmp, &plni->plni_peer_hash[hash]) {
147 plp = list_entry(tmp, ptllnd_peer_t, plp_list);
149 if (plp->plp_id.nid == id.nid &&
150 plp->plp_id.pid == id.pid) {
151 ptllnd_peer_addref(plp);
159 /* New peer: check first for enough posted buffers */
161 rc = ptllnd_size_buffers(ni, plni->plni_peer_credits);
167 LIBCFS_ALLOC(plp, sizeof(*plp));
169 CERROR("Can't allocate new peer %s\n", libcfs_id2str(id));
171 ptllnd_size_buffers(ni, -plni->plni_peer_credits);
177 plp->plp_ptlid.nid = LNET_NIDADDR(id.nid);
178 plp->plp_ptlid.pid = plni->plni_ptllnd_pid;
179 plp->plp_credits = 1; /* add more later when she gives me credits */
180 plp->plp_max_msg_size = plni->plni_max_msg_size; /* until I hear from her */
181 plp->plp_sent_credits = 1; /* Implicit credit for HELLO */
182 plp->plp_outstanding_credits = plni->plni_peer_credits - 1;
183 plp->plp_lazy_credits = 0;
184 plp->plp_extra_lazy_credits = 0;
187 plp->plp_recvd_hello = 0;
188 plp->plp_closing = 0;
189 plp->plp_refcount = 1;
190 CFS_INIT_LIST_HEAD(&plp->plp_list);
191 CFS_INIT_LIST_HEAD(&plp->plp_txq);
192 CFS_INIT_LIST_HEAD(&plp->plp_activeq);
194 ptllnd_peer_addref(plp);
195 list_add_tail(&plp->plp_list, &plni->plni_peer_hash[hash]);
197 tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_HELLO, 0);
199 CERROR("Can't send HELLO to %s\n", libcfs_id2str(id));
200 ptllnd_close_peer(plp, -ENOMEM);
201 ptllnd_peer_decref(plp);
205 tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS;
206 tx->tx_msg.ptlm_u.hello.kptlhm_max_msg_size = plni->plni_max_msg_size;
208 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post hello %p", libcfs_id2str(id),
209 tx->tx_peer->plp_credits,
210 tx->tx_peer->plp_outstanding_credits,
211 tx->tx_peer->plp_sent_credits,
212 plni->plni_peer_credits +
213 tx->tx_peer->plp_lazy_credits, tx);
220 ptllnd_count_q(struct list_head *q)
225 list_for_each(e, q) {
233 ptllnd_tx_typestr(int type)
236 case PTLLND_RDMA_WRITE:
239 case PTLLND_RDMA_READ:
242 case PTLLND_MSG_TYPE_PUT:
245 case PTLLND_MSG_TYPE_GET:
248 case PTLLND_MSG_TYPE_IMMEDIATE:
251 case PTLLND_MSG_TYPE_NOOP:
254 case PTLLND_MSG_TYPE_HELLO:
263 ptllnd_debug_tx(ptllnd_tx_t *tx)
265 CDEBUG(D_WARNING, "%s %s b %ld.%06ld/%ld.%06ld"
266 " r %ld.%06ld/%ld.%06ld status %d\n",
267 ptllnd_tx_typestr(tx->tx_type),
268 libcfs_id2str(tx->tx_peer->plp_id),
269 tx->tx_bulk_posted.tv_sec, tx->tx_bulk_posted.tv_usec,
270 tx->tx_bulk_done.tv_sec, tx->tx_bulk_done.tv_usec,
271 tx->tx_req_posted.tv_sec, tx->tx_req_posted.tv_usec,
272 tx->tx_req_done.tv_sec, tx->tx_req_done.tv_usec,
277 ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
279 ptllnd_peer_t *plp = ptllnd_find_peer(ni, id, 0);
280 struct list_head *tmp;
281 ptllnd_ni_t *plni = ni->ni_data;
285 CDEBUG(D_WARNING, "No peer %s\n", libcfs_id2str(id));
289 CDEBUG(D_WARNING, "%s %s%s [%d] "LPU64".%06d m "LPU64" q %d/%d c %d/%d+%d(%d)\n",
291 plp->plp_recvd_hello ? "H" : "_",
292 plp->plp_closing ? "C" : "_",
294 plp->plp_stamp / 1000000, (int)(plp->plp_stamp % 1000000),
296 ptllnd_count_q(&plp->plp_txq),
297 ptllnd_count_q(&plp->plp_activeq),
298 plp->plp_credits, plp->plp_outstanding_credits, plp->plp_sent_credits,
299 plni->plni_peer_credits + plp->plp_lazy_credits);
301 CDEBUG(D_WARNING, "txq:\n");
302 list_for_each (tmp, &plp->plp_txq) {
303 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
308 CDEBUG(D_WARNING, "activeq:\n");
309 list_for_each (tmp, &plp->plp_activeq) {
310 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
315 CDEBUG(D_WARNING, "zombies:\n");
316 list_for_each (tmp, &plni->plni_zombie_txs) {
317 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
319 if (tx->tx_peer->plp_id.nid == id.nid &&
320 tx->tx_peer->plp_id.pid == id.pid)
324 CDEBUG(D_WARNING, "history:\n");
325 list_for_each (tmp, &plni->plni_tx_history) {
326 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
328 if (tx->tx_peer->plp_id.nid == id.nid &&
329 tx->tx_peer->plp_id.pid == id.pid)
333 ptllnd_peer_decref(plp);
337 ptllnd_dump_debug(lnet_ni_t *ni, lnet_process_id_t id)
339 ptllnd_debug_peer(ni, id);
340 ptllnd_dump_history();
344 ptllnd_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive)
346 lnet_process_id_t id;
348 time_t start = cfs_time_current_sec();
349 ptllnd_ni_t *plni = ni->ni_data;
350 int w = plni->plni_long_wait;
352 /* This is only actually used to connect to routers at startup! */
356 id.pid = LUSTRE_SRV_LNET_PID;
358 peer = ptllnd_find_peer(ni, id, 1);
362 /* wait for the peer to reply */
363 while (!peer->plp_recvd_hello) {
364 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
365 CWARN("Waited %ds to connect to %s\n",
366 (int)(cfs_time_current_sec() - start),
374 ptllnd_peer_decref(peer);
378 ptllnd_setasync(lnet_ni_t *ni, lnet_process_id_t id, int nasync)
380 ptllnd_peer_t *peer = ptllnd_find_peer(ni, id, nasync > 0);
386 LASSERT (peer->plp_lazy_credits >= 0);
387 LASSERT (peer->plp_extra_lazy_credits >= 0);
389 /* If nasync < 0, we're being told we can reduce the total message
390 * headroom. We can't do this right now because our peer might already
391 * have credits for the extra buffers, so we just account the extra
392 * headroom in case we need it later and only destroy buffers when the
395 * Note that the following condition handles this case, where it
396 * actually increases the extra lazy credit counter. */
398 if (nasync <= peer->plp_extra_lazy_credits) {
399 peer->plp_extra_lazy_credits -= nasync;
403 LASSERT (nasync > 0);
405 nasync -= peer->plp_extra_lazy_credits;
406 peer->plp_extra_lazy_credits = 0;
408 rc = ptllnd_size_buffers(ni, nasync);
410 peer->plp_lazy_credits += nasync;
411 peer->plp_outstanding_credits += nasync;
418 ptllnd_cksum (void *ptr, int nob)
424 sum = ((sum << 1) | (sum >> 31)) + *c++;
426 /* ensure I don't return 0 (== no checksum) */
427 return (sum == 0) ? 1 : sum;
431 ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
433 lnet_ni_t *ni = peer->plp_ni;
434 ptllnd_ni_t *plni = ni->ni_data;
438 CDEBUG(D_NET, "peer=%p type=%d payload=%d\n", peer, type, payload_nob);
444 case PTLLND_RDMA_WRITE:
445 case PTLLND_RDMA_READ:
446 LASSERT (payload_nob == 0);
450 case PTLLND_MSG_TYPE_PUT:
451 case PTLLND_MSG_TYPE_GET:
452 LASSERT (payload_nob == 0);
453 msgsize = offsetof(kptl_msg_t, ptlm_u) +
454 sizeof(kptl_rdma_msg_t);
457 case PTLLND_MSG_TYPE_IMMEDIATE:
458 msgsize = offsetof(kptl_msg_t,
459 ptlm_u.immediate.kptlim_payload[payload_nob]);
462 case PTLLND_MSG_TYPE_NOOP:
463 LASSERT (payload_nob == 0);
464 msgsize = offsetof(kptl_msg_t, ptlm_u);
467 case PTLLND_MSG_TYPE_HELLO:
468 LASSERT (payload_nob == 0);
469 msgsize = offsetof(kptl_msg_t, ptlm_u) +
470 sizeof(kptl_hello_msg_t);
474 msgsize = (msgsize + 7) & ~7;
475 LASSERT (msgsize <= peer->plp_max_msg_size);
477 LIBCFS_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize);
480 CERROR("Can't allocate msg type %d for %s\n",
481 type, libcfs_id2str(peer->plp_id));
485 CFS_INIT_LIST_HEAD(&tx->tx_list);
488 tx->tx_lnetmsg = tx->tx_lnetreplymsg = NULL;
491 tx->tx_reqmdh = PTL_INVALID_HANDLE;
492 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
493 tx->tx_msgsize = msgsize;
494 tx->tx_completing = 0;
497 memset(&tx->tx_bulk_posted, 0, sizeof(tx->tx_bulk_posted));
498 memset(&tx->tx_bulk_done, 0, sizeof(tx->tx_bulk_done));
499 memset(&tx->tx_req_posted, 0, sizeof(tx->tx_req_posted));
500 memset(&tx->tx_req_done, 0, sizeof(tx->tx_req_done));
503 tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC;
504 tx->tx_msg.ptlm_version = PTLLND_MSG_VERSION;
505 tx->tx_msg.ptlm_type = type;
506 tx->tx_msg.ptlm_credits = 0;
507 tx->tx_msg.ptlm_nob = msgsize;
508 tx->tx_msg.ptlm_cksum = 0;
509 tx->tx_msg.ptlm_srcnid = ni->ni_nid;
510 tx->tx_msg.ptlm_srcstamp = plni->plni_stamp;
511 tx->tx_msg.ptlm_dstnid = peer->plp_id.nid;
512 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
513 tx->tx_msg.ptlm_srcpid = the_lnet.ln_pid;
514 tx->tx_msg.ptlm_dstpid = peer->plp_id.pid;
517 ptllnd_peer_addref(peer);
520 CDEBUG(D_NET, "tx=%p\n",tx);
526 ptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh)
528 ptllnd_peer_t *peer = tx->tx_peer;
529 lnet_ni_t *ni = peer->plp_ni;
531 time_t start = cfs_time_current_sec();
532 ptllnd_ni_t *plni = ni->ni_data;
533 int w = plni->plni_long_wait;
535 while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) {
536 rc = PtlMDUnlink(*mdh);
537 #ifndef LUSTRE_PORTALS_UNLINK_SEMANTICS
538 if (rc == PTL_OK) /* unlink successful => no unlinked event */
540 LASSERT (rc == PTL_MD_IN_USE);
542 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
543 CWARN("Waited %ds to abort tx to %s\n",
544 (int)(cfs_time_current_sec() - start),
545 libcfs_id2str(peer->plp_id));
548 /* Wait for ptllnd_tx_event() to invalidate */
554 ptllnd_cull_tx_history(ptllnd_ni_t *plni)
556 int max = plni->plni_max_tx_history;
558 while (plni->plni_ntx_history > max) {
559 ptllnd_tx_t *tx = list_entry(plni->plni_tx_history.next,
560 ptllnd_tx_t, tx_list);
561 list_del(&tx->tx_list);
563 ptllnd_peer_decref(tx->tx_peer);
565 LIBCFS_FREE(tx, offsetof(ptllnd_tx_t, tx_msg) + tx->tx_msgsize);
567 LASSERT (plni->plni_ntxs > 0);
569 plni->plni_ntx_history--;
574 ptllnd_tx_done(ptllnd_tx_t *tx)
576 ptllnd_peer_t *peer = tx->tx_peer;
577 lnet_ni_t *ni = peer->plp_ni;
578 ptllnd_ni_t *plni = ni->ni_data;
580 /* CAVEAT EMPTOR: If this tx is being aborted, I'll continue to get
581 * events for this tx until it's unlinked. So I set tx_completing to
582 * flag the tx is getting handled */
584 if (tx->tx_completing)
587 tx->tx_completing = 1;
589 if (!list_empty(&tx->tx_list))
590 list_del_init(&tx->tx_list);
592 if (tx->tx_status != 0) {
593 if (plni->plni_debug) {
594 CERROR("Completing tx for %s with error %d\n",
595 libcfs_id2str(peer->plp_id), tx->tx_status);
598 ptllnd_close_peer(peer, tx->tx_status);
601 ptllnd_abort_tx(tx, &tx->tx_reqmdh);
602 ptllnd_abort_tx(tx, &tx->tx_bulkmdh);
604 if (tx->tx_niov > 0) {
605 LIBCFS_FREE(tx->tx_iov, tx->tx_niov * sizeof(*tx->tx_iov));
609 if (tx->tx_lnetreplymsg != NULL) {
610 LASSERT (tx->tx_type == PTLLND_MSG_TYPE_GET);
611 LASSERT (tx->tx_lnetmsg != NULL);
612 /* Simulate GET success always */
613 lnet_finalize(ni, tx->tx_lnetmsg, 0);
614 CDEBUG(D_NET, "lnet_finalize(tx_lnetreplymsg=%p)\n",tx->tx_lnetreplymsg);
615 lnet_finalize(ni, tx->tx_lnetreplymsg, tx->tx_status);
616 } else if (tx->tx_lnetmsg != NULL) {
617 lnet_finalize(ni, tx->tx_lnetmsg, tx->tx_status);
620 plni->plni_ntx_history++;
621 list_add_tail(&tx->tx_list, &plni->plni_tx_history);
623 ptllnd_cull_tx_history(plni);
627 ptllnd_set_txiov(ptllnd_tx_t *tx,
628 unsigned int niov, struct iovec *iov,
629 unsigned int offset, unsigned int len)
631 ptl_md_iovec_t *piov;
640 * Remove iovec's at the beginning that
641 * are skipped because of the offset.
642 * Adjust the offset accordingly
646 if (offset < iov->iov_len)
648 offset -= iov->iov_len;
654 int temp_offset = offset;
656 LIBCFS_ALLOC(piov, niov * sizeof(*piov));
660 for (npiov = 0;; npiov++) {
661 LASSERT (npiov < niov);
662 LASSERT (iov->iov_len >= temp_offset);
664 piov[npiov].iov_base = iov[npiov].iov_base + temp_offset;
665 piov[npiov].iov_len = iov[npiov].iov_len - temp_offset;
667 if (piov[npiov].iov_len >= resid) {
668 piov[npiov].iov_len = resid;
672 resid -= piov[npiov].iov_len;
682 /* Dang! The piov I allocated was too big and it's a drag to
683 * have to maintain separate 'allocated' and 'used' sizes, so
684 * I'll just do it again; NB this doesn't happen normally... */
685 LIBCFS_FREE(piov, niov * sizeof(*piov));
691 ptllnd_set_md_buffer(ptl_md_t *md, ptllnd_tx_t *tx)
693 unsigned int niov = tx->tx_niov;
694 ptl_md_iovec_t *iov = tx->tx_iov;
696 LASSERT ((md->options & PTL_MD_IOVEC) == 0);
701 } else if (niov == 1) {
702 md->start = iov[0].iov_base;
703 md->length = iov[0].iov_len;
707 md->options |= PTL_MD_IOVEC;
712 ptllnd_post_buffer(ptllnd_buffer_t *buf)
714 lnet_ni_t *ni = buf->plb_ni;
715 ptllnd_ni_t *plni = ni->ni_data;
716 ptl_process_id_t anyid = {
720 .start = buf->plb_buffer,
721 .length = plni->plni_buffer_size,
722 .threshold = PTL_MD_THRESH_INF,
723 .max_size = plni->plni_max_msg_size,
724 .options = (PTLLND_MD_OPTIONS |
725 PTL_MD_OP_PUT | PTL_MD_MAX_SIZE |
726 PTL_MD_LOCAL_ALIGN8),
727 .user_ptr = ptllnd_obj2eventarg(buf, PTLLND_EVENTARG_TYPE_BUF),
728 .eq_handle = plni->plni_eqh};
732 LASSERT (!buf->plb_posted);
734 rc = PtlMEAttach(plni->plni_nih, plni->plni_portal,
735 anyid, LNET_MSG_MATCHBITS, 0,
736 PTL_UNLINK, PTL_INS_AFTER, &meh);
738 CERROR("PtlMEAttach failed: %s(%d)\n",
739 ptllnd_errtype2str(rc), rc);
744 plni->plni_nposted_buffers++;
746 rc = PtlMDAttach(meh, md, LNET_UNLINK, &buf->plb_md);
750 CERROR("PtlMDAttach failed: %s(%d)\n",
751 ptllnd_errtype2str(rc), rc);
754 plni->plni_nposted_buffers--;
756 rc = PtlMEUnlink(meh);
757 LASSERT (rc == PTL_OK);
763 ptllnd_check_sends(ptllnd_peer_t *peer)
765 lnet_ni_t *ni = peer->plp_ni;
766 ptllnd_ni_t *plni = ni->ni_data;
772 CDEBUG(D_NET, "%s: [%d/%d+%d(%d)\n",
773 libcfs_id2str(peer->plp_id), peer->plp_credits,
774 peer->plp_outstanding_credits, peer->plp_sent_credits,
775 plni->plni_peer_credits + peer->plp_lazy_credits);
777 if (list_empty(&peer->plp_txq) &&
778 peer->plp_outstanding_credits >= PTLLND_CREDIT_HIGHWATER(plni) &&
779 peer->plp_credits != 0) {
781 tx = ptllnd_new_tx(peer, PTLLND_MSG_TYPE_NOOP, 0);
782 CDEBUG(D_NET, "NOOP tx=%p\n",tx);
784 CERROR("Can't return credits to %s\n",
785 libcfs_id2str(peer->plp_id));
787 ptllnd_set_tx_deadline(tx);
788 list_add_tail(&tx->tx_list, &peer->plp_txq);
792 while (!list_empty(&peer->plp_txq)) {
793 tx = list_entry(peer->plp_txq.next, ptllnd_tx_t, tx_list);
795 LASSERT (tx->tx_msgsize > 0);
797 LASSERT (peer->plp_outstanding_credits >= 0);
798 LASSERT (peer->plp_sent_credits >= 0);
799 LASSERT (peer->plp_outstanding_credits + peer->plp_sent_credits
800 <= plni->plni_peer_credits + peer->plp_lazy_credits);
801 LASSERT (peer->plp_credits >= 0);
803 if (peer->plp_credits == 0) { /* no credits */
804 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: no creds for %p",
805 libcfs_id2str(peer->plp_id),
807 peer->plp_outstanding_credits,
808 peer->plp_sent_credits,
809 plni->plni_peer_credits +
810 peer->plp_lazy_credits, tx);
814 if (peer->plp_credits == 1 && /* last credit reserved for */
815 peer->plp_outstanding_credits == 0) { /* returning credits */
816 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: too few creds for %p",
817 libcfs_id2str(peer->plp_id),
819 peer->plp_outstanding_credits,
820 peer->plp_sent_credits,
821 plni->plni_peer_credits +
822 peer->plp_lazy_credits, tx);
826 list_del(&tx->tx_list);
827 list_add_tail(&tx->tx_list, &peer->plp_activeq);
829 CDEBUG(D_NET, "Sending at TX=%p type=%s (%d)\n",tx,
830 ptllnd_msgtype2str(tx->tx_type),tx->tx_type);
832 if (tx->tx_type == PTLLND_MSG_TYPE_NOOP &&
833 (!list_empty(&peer->plp_txq) ||
834 peer->plp_outstanding_credits <
835 PTLLND_CREDIT_HIGHWATER(plni))) {
841 /* Set stamp at the last minute; on a new peer, I don't know it
842 * until I receive the HELLO back */
843 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
846 * Return all the credits we have
848 tx->tx_msg.ptlm_credits = peer->plp_outstanding_credits;
849 peer->plp_sent_credits += peer->plp_outstanding_credits;
850 peer->plp_outstanding_credits = 0;
857 if (plni->plni_checksum)
858 tx->tx_msg.ptlm_cksum =
859 ptllnd_cksum(&tx->tx_msg,
860 offsetof(kptl_msg_t, ptlm_u));
862 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
863 md.eq_handle = plni->plni_eqh;
865 md.options = PTLLND_MD_OPTIONS;
866 md.start = &tx->tx_msg;
867 md.length = tx->tx_msgsize;
869 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
871 CERROR("PtlMDBind for %s failed: %s(%d)\n",
872 libcfs_id2str(peer->plp_id),
873 ptllnd_errtype2str(rc), rc);
874 tx->tx_status = -EIO;
879 LASSERT (tx->tx_type != PTLLND_RDMA_WRITE &&
880 tx->tx_type != PTLLND_RDMA_READ);
883 gettimeofday(&tx->tx_req_posted, NULL);
885 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: %s %p c %d",
886 libcfs_id2str(peer->plp_id),
888 peer->plp_outstanding_credits,
889 peer->plp_sent_credits,
890 plni->plni_peer_credits +
891 peer->plp_lazy_credits,
892 ptllnd_msgtype2str(tx->tx_type), tx,
893 tx->tx_msg.ptlm_credits);
895 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
896 plni->plni_portal, 0, LNET_MSG_MATCHBITS, 0, 0);
898 CERROR("PtlPut for %s failed: %s(%d)\n",
899 libcfs_id2str(peer->plp_id),
900 ptllnd_errtype2str(rc), rc);
901 tx->tx_status = -EIO;
909 ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
910 unsigned int niov, struct iovec *iov,
911 unsigned int offset, unsigned int len)
913 lnet_ni_t *ni = peer->plp_ni;
914 ptllnd_ni_t *plni = ni->ni_data;
915 ptllnd_tx_t *tx = ptllnd_new_tx(peer, type, 0);
925 CDEBUG(D_NET, "niov=%d offset=%d len=%d\n",niov,offset,len);
927 LASSERT (type == PTLLND_MSG_TYPE_GET ||
928 type == PTLLND_MSG_TYPE_PUT);
931 CERROR("Can't allocate %s tx for %s\n",
932 type == PTLLND_MSG_TYPE_GET ? "GET" : "PUT/REPLY",
933 libcfs_id2str(peer->plp_id));
937 rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
939 CERROR ("Can't allocate iov %d for %s\n",
940 niov, libcfs_id2str(peer->plp_id));
945 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
946 md.eq_handle = plni->plni_eqh;
949 md.options = PTLLND_MD_OPTIONS;
950 if(type == PTLLND_MSG_TYPE_GET)
951 md.options |= PTL_MD_OP_PUT | PTL_MD_ACK_DISABLE;
953 md.options |= PTL_MD_OP_GET;
954 ptllnd_set_md_buffer(&md, tx);
956 start = cfs_time_current_sec();
957 w = plni->plni_long_wait;
959 while (!peer->plp_recvd_hello) { /* wait to validate plp_match */
960 if (peer->plp_closing) {
964 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
965 CWARN("Waited %ds to connect to %s\n",
966 (int)(cfs_time_current_sec() - start),
967 libcfs_id2str(peer->plp_id));
973 if (peer->plp_match < PTL_RESERVED_MATCHBITS)
974 peer->plp_match = PTL_RESERVED_MATCHBITS;
975 matchbits = peer->plp_match++;
977 rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid,
978 matchbits, 0, PTL_UNLINK, PTL_INS_BEFORE, &meh);
980 CERROR("PtlMEAttach for %s failed: %s(%d)\n",
981 libcfs_id2str(peer->plp_id),
982 ptllnd_errtype2str(rc), rc);
987 gettimeofday(&tx->tx_bulk_posted, NULL);
989 rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);
991 CERROR("PtlMDAttach for %s failed: %s(%d)\n",
992 libcfs_id2str(peer->plp_id),
993 ptllnd_errtype2str(rc), rc);
994 rc2 = PtlMEUnlink(meh);
995 LASSERT (rc2 == PTL_OK);
999 tx->tx_bulkmdh = mdh;
1002 * We need to set the stamp here because it
1003 * we could have received a HELLO above that set
1006 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
1008 tx->tx_msg.ptlm_u.rdma.kptlrm_hdr = msg->msg_hdr;
1009 tx->tx_msg.ptlm_u.rdma.kptlrm_matchbits = matchbits;
1011 if (type == PTLLND_MSG_TYPE_GET) {
1012 tx->tx_lnetreplymsg = lnet_create_reply_msg(ni, msg);
1013 if (tx->tx_lnetreplymsg == NULL) {
1014 CERROR("Can't create reply for GET to %s\n",
1015 libcfs_id2str(msg->msg_target));
1021 tx->tx_lnetmsg = msg;
1022 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post passive %s p %d %p",
1023 libcfs_id2str(msg->msg_target),
1024 peer->plp_credits, peer->plp_outstanding_credits,
1025 peer->plp_sent_credits,
1026 plni->plni_peer_credits + peer->plp_lazy_credits,
1027 lnet_msgtyp2str(msg->msg_type),
1028 (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ?
1029 le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1030 (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ?
1031 le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1042 ptllnd_active_rdma(ptllnd_peer_t *peer, int type,
1043 lnet_msg_t *msg, __u64 matchbits,
1044 unsigned int niov, struct iovec *iov,
1045 unsigned int offset, unsigned int len)
1047 lnet_ni_t *ni = peer->plp_ni;
1048 ptllnd_ni_t *plni = ni->ni_data;
1049 ptllnd_tx_t *tx = ptllnd_new_tx(peer, type, 0);
1051 ptl_handle_md_t mdh;
1054 LASSERT (type == PTLLND_RDMA_READ ||
1055 type == PTLLND_RDMA_WRITE);
1058 CERROR("Can't allocate tx for RDMA %s with %s\n",
1059 (type == PTLLND_RDMA_WRITE) ? "write" : "read",
1060 libcfs_id2str(peer->plp_id));
1061 ptllnd_close_peer(peer, -ENOMEM);
1065 rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
1067 CERROR ("Can't allocate iov %d for %s\n",
1068 niov, libcfs_id2str(peer->plp_id));
1073 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
1074 md.eq_handle = plni->plni_eqh;
1076 md.options = PTLLND_MD_OPTIONS;
1077 md.threshold = (type == PTLLND_RDMA_READ) ? 2 : 1;
1079 ptllnd_set_md_buffer(&md, tx);
1081 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
1083 CERROR("PtlMDBind for %s failed: %s(%d)\n",
1084 libcfs_id2str(peer->plp_id),
1085 ptllnd_errtype2str(rc), rc);
1090 tx->tx_bulkmdh = mdh;
1091 tx->tx_lnetmsg = msg;
1093 ptllnd_set_tx_deadline(tx);
1094 list_add_tail(&tx->tx_list, &peer->plp_activeq);
1095 gettimeofday(&tx->tx_bulk_posted, NULL);
1097 if (type == PTLLND_RDMA_READ)
1098 rc = PtlGet(mdh, peer->plp_ptlid,
1099 plni->plni_portal, 0, matchbits, 0);
1101 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
1102 plni->plni_portal, 0, matchbits, 0,
1103 (msg == NULL) ? PTLLND_RDMA_FAIL : PTLLND_RDMA_OK);
1108 CERROR("Can't initiate RDMA with %s: %s(%d)\n",
1109 libcfs_id2str(peer->plp_id),
1110 ptllnd_errtype2str(rc), rc);
1112 tx->tx_lnetmsg = NULL;
1115 ptllnd_tx_done(tx); /* this will close peer */
1120 ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
1122 ptllnd_ni_t *plni = ni->ni_data;
1128 LASSERT (!msg->msg_routing);
1129 LASSERT (msg->msg_kiov == NULL);
1131 LASSERT (msg->msg_niov <= PTL_MD_MAX_IOV); /* !!! */
1133 CDEBUG(D_NET, "%s [%d]+%d,%d -> %s%s\n",
1134 lnet_msgtyp2str(msg->msg_type),
1135 msg->msg_niov, msg->msg_offset, msg->msg_len,
1136 libcfs_nid2str(msg->msg_target.nid),
1137 msg->msg_target_is_router ? "(rtr)" : "");
1139 if ((msg->msg_target.pid & LNET_PID_USERFLAG) != 0) {
1140 CERROR("Can't send to non-kernel peer %s\n",
1141 libcfs_id2str(msg->msg_target));
1142 return -EHOSTUNREACH;
1145 plp = ptllnd_find_peer(ni, msg->msg_target, 1);
1149 switch (msg->msg_type) {
1154 LASSERT (msg->msg_len == 0);
1155 break; /* send IMMEDIATE */
1158 if (msg->msg_target_is_router)
1159 break; /* send IMMEDIATE */
1161 nob = msg->msg_md->md_length;
1162 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1163 if (nob <= plni->plni_max_msg_size)
1166 LASSERT ((msg->msg_md->md_options & LNET_MD_KIOV) == 0);
1167 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_GET, msg,
1168 msg->msg_md->md_niov,
1169 msg->msg_md->md_iov.iov,
1170 0, msg->msg_md->md_length);
1171 ptllnd_peer_decref(plp);
1174 case LNET_MSG_REPLY:
1177 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1178 if (nob <= plp->plp_max_msg_size)
1179 break; /* send IMMEDIATE */
1181 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_PUT, msg,
1182 msg->msg_niov, msg->msg_iov,
1183 msg->msg_offset, msg->msg_len);
1184 ptllnd_peer_decref(plp);
1189 * NB copy the payload so we don't have to do a fragmented send */
1191 tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_IMMEDIATE, msg->msg_len);
1193 CERROR("Can't allocate tx for lnet type %d to %s\n",
1194 msg->msg_type, libcfs_id2str(msg->msg_target));
1195 ptllnd_peer_decref(plp);
1199 lnet_copy_iov2flat(tx->tx_msgsize, &tx->tx_msg,
1200 offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1201 msg->msg_niov, msg->msg_iov, msg->msg_offset,
1203 tx->tx_msg.ptlm_u.immediate.kptlim_hdr = msg->msg_hdr;
1205 tx->tx_lnetmsg = msg;
1206 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post immediate %s p %d %p",
1207 libcfs_id2str(msg->msg_target),
1208 plp->plp_credits, plp->plp_outstanding_credits,
1209 plp->plp_sent_credits,
1210 plni->plni_peer_credits + plp->plp_lazy_credits,
1211 lnet_msgtyp2str(msg->msg_type),
1212 (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ?
1213 le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1214 (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ?
1215 le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1218 ptllnd_peer_decref(plp);
1223 ptllnd_rx_done(ptllnd_rx_t *rx)
1225 ptllnd_peer_t *plp = rx->rx_peer;
1226 lnet_ni_t *ni = plp->plp_ni;
1227 ptllnd_ni_t *plni = ni->ni_data;
1229 plp->plp_outstanding_credits++;
1231 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: rx=%p done\n",
1232 libcfs_id2str(plp->plp_id),
1233 plp->plp_credits, plp->plp_outstanding_credits,
1234 plp->plp_sent_credits,
1235 plni->plni_peer_credits + plp->plp_lazy_credits, rx);
1237 ptllnd_check_sends(rx->rx_peer);
1239 LASSERT (plni->plni_nrxs > 0);
1244 ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1245 void **new_privatep)
1247 /* Shouldn't get here; recvs only block for router buffers */
1253 ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1254 int delayed, unsigned int niov,
1255 struct iovec *iov, lnet_kiov_t *kiov,
1256 unsigned int offset, unsigned int mlen, unsigned int rlen)
1258 ptllnd_rx_t *rx = private;
1262 LASSERT (kiov == NULL);
1263 LASSERT (niov <= PTL_MD_MAX_IOV); /* !!! */
1265 switch (rx->rx_msg->ptlm_type) {
1269 case PTLLND_MSG_TYPE_IMMEDIATE:
1270 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[mlen]);
1271 if (nob > rx->rx_nob) {
1272 CERROR("Immediate message from %s too big: %d(%d)\n",
1273 libcfs_id2str(rx->rx_peer->plp_id),
1278 lnet_copy_flat2iov(niov, iov, offset,
1279 rx->rx_nob, rx->rx_msg,
1280 offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1282 lnet_finalize(ni, msg, 0);
1285 case PTLLND_MSG_TYPE_PUT:
1286 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg,
1287 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1288 niov, iov, offset, mlen);
1291 case PTLLND_MSG_TYPE_GET:
1293 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, msg,
1294 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1295 msg->msg_niov, msg->msg_iov,
1296 msg->msg_offset, msg->msg_len);
1298 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, NULL,
1299 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1309 ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
1310 kptl_msg_t *msg, unsigned int nob)
1312 ptllnd_ni_t *plni = ni->ni_data;
1313 const int basenob = offsetof(kptl_msg_t, ptlm_u);
1314 lnet_process_id_t srcid;
1323 CERROR("Very short receive from %s\n",
1324 ptllnd_ptlid2str(initiator));
1328 /* I can at least read MAGIC/VERSION */
1330 flip = msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC);
1331 if (!flip && msg->ptlm_magic != PTLLND_MSG_MAGIC) {
1332 CERROR("Bad protocol magic %08x from %s\n",
1333 msg->ptlm_magic, ptllnd_ptlid2str(initiator));
1337 msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
1339 if (msg_version != PTLLND_MSG_VERSION) {
1340 CERROR("Bad protocol version %04x from %s: %04x expected\n",
1341 (__u32)msg_version, ptllnd_ptlid2str(initiator), PTLLND_MSG_VERSION);
1343 if (plni->plni_abort_on_protocol_mismatch)
1349 if (nob < basenob) {
1350 CERROR("Short receive from %s: got %d, wanted at least %d\n",
1351 ptllnd_ptlid2str(initiator), nob, basenob);
1355 /* checksum must be computed with
1356 * 1) ptlm_cksum zero and
1357 * 2) BEFORE anything gets modified/flipped
1359 msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
1360 msg->ptlm_cksum = 0;
1361 if (msg_cksum != 0 &&
1362 msg_cksum != ptllnd_cksum(msg, offsetof(kptl_msg_t, ptlm_u))) {
1363 CERROR("Bad checksum from %s\n", ptllnd_ptlid2str(initiator));
1367 msg->ptlm_version = msg_version;
1368 msg->ptlm_cksum = msg_cksum;
1371 /* NB stamps are opaque cookies */
1372 __swab32s(&msg->ptlm_nob);
1373 __swab64s(&msg->ptlm_srcnid);
1374 __swab64s(&msg->ptlm_dstnid);
1375 __swab32s(&msg->ptlm_srcpid);
1376 __swab32s(&msg->ptlm_dstpid);
1379 srcid.nid = msg->ptlm_srcnid;
1380 srcid.pid = msg->ptlm_srcpid;
1382 if (LNET_NIDNET(msg->ptlm_srcnid) != LNET_NIDNET(ni->ni_nid)) {
1383 CERROR("Bad source id %s from %s\n",
1384 libcfs_id2str(srcid),
1385 ptllnd_ptlid2str(initiator));
1389 if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
1390 CERROR("NAK from %s (%s)\n",
1391 libcfs_id2str(srcid),
1392 ptllnd_ptlid2str(initiator));
1394 if (plni->plni_dump_on_nak)
1395 ptllnd_dump_debug(ni, srcid);
1397 if (plni->plni_abort_on_nak)
1403 if (msg->ptlm_dstnid != ni->ni_nid ||
1404 msg->ptlm_dstpid != the_lnet.ln_pid) {
1405 CERROR("Bad dstid %s (%s expected) from %s\n",
1406 libcfs_id2str((lnet_process_id_t) {
1407 .nid = msg->ptlm_dstnid,
1408 .pid = msg->ptlm_dstpid}),
1409 libcfs_id2str((lnet_process_id_t) {
1411 .pid = the_lnet.ln_pid}),
1412 libcfs_id2str(srcid));
1416 if (msg->ptlm_dststamp != plni->plni_stamp) {
1417 CERROR("Bad dststamp "LPX64"("LPX64" expected) from %s\n",
1418 msg->ptlm_dststamp, plni->plni_stamp,
1419 libcfs_id2str(srcid));
1423 PTLLND_HISTORY("RX %s: %s %d %p", libcfs_id2str(srcid),
1424 ptllnd_msgtype2str(msg->ptlm_type),
1425 msg->ptlm_credits, &rx);
1427 switch (msg->ptlm_type) {
1428 case PTLLND_MSG_TYPE_PUT:
1429 case PTLLND_MSG_TYPE_GET:
1430 if (nob < basenob + sizeof(kptl_rdma_msg_t)) {
1431 CERROR("Short rdma request from %s(%s)\n",
1432 libcfs_id2str(srcid),
1433 ptllnd_ptlid2str(initiator));
1437 __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
1440 case PTLLND_MSG_TYPE_IMMEDIATE:
1441 if (nob < offsetof(kptl_msg_t,
1442 ptlm_u.immediate.kptlim_payload)) {
1443 CERROR("Short immediate from %s(%s)\n",
1444 libcfs_id2str(srcid),
1445 ptllnd_ptlid2str(initiator));
1450 case PTLLND_MSG_TYPE_HELLO:
1451 if (nob < basenob + sizeof(kptl_hello_msg_t)) {
1452 CERROR("Short hello from %s(%s)\n",
1453 libcfs_id2str(srcid),
1454 ptllnd_ptlid2str(initiator));
1458 __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
1459 __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
1463 case PTLLND_MSG_TYPE_NOOP:
1467 CERROR("Bad message type %d from %s(%s)\n", msg->ptlm_type,
1468 libcfs_id2str(srcid),
1469 ptllnd_ptlid2str(initiator));
1473 plp = ptllnd_find_peer(ni, srcid, 0);
1475 CERROR("Can't find peer %s\n", libcfs_id2str(srcid));
1479 if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
1480 if (plp->plp_recvd_hello) {
1481 CERROR("Unexpected HELLO from %s\n",
1482 libcfs_id2str(srcid));
1483 ptllnd_peer_decref(plp);
1487 plp->plp_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1488 plp->plp_match = msg->ptlm_u.hello.kptlhm_matchbits;
1489 plp->plp_stamp = msg->ptlm_srcstamp;
1490 plp->plp_recvd_hello = 1;
1492 } else if (!plp->plp_recvd_hello) {
1494 CERROR("Bad message type %d (HELLO expected) from %s\n",
1495 msg->ptlm_type, libcfs_id2str(srcid));
1496 ptllnd_peer_decref(plp);
1499 } else if (msg->ptlm_srcstamp != plp->plp_stamp) {
1501 CERROR("Bad srcstamp "LPX64"("LPX64" expected) from %s\n",
1502 msg->ptlm_srcstamp, plp->plp_stamp,
1503 libcfs_id2str(srcid));
1504 ptllnd_peer_decref(plp);
1508 /* Check peer only sends when I've sent her credits */
1509 if (plp->plp_sent_credits == 0) {
1510 CERROR("%s[%d/%d+%d(%d)]: unexpected message\n",
1511 libcfs_id2str(plp->plp_id),
1512 plp->plp_credits, plp->plp_outstanding_credits,
1513 plp->plp_sent_credits,
1514 plni->plni_peer_credits + plp->plp_lazy_credits);
1517 plp->plp_sent_credits--;
1519 /* No check for credit overflow - the peer may post new buffers after
1520 * the startup handshake. */
1521 if (msg->ptlm_credits > 0) {
1522 plp->plp_credits += msg->ptlm_credits;
1523 ptllnd_check_sends(plp);
1526 /* All OK so far; assume the message is good... */
1533 switch (msg->ptlm_type) {
1534 default: /* message types have been checked already */
1535 ptllnd_rx_done(&rx);
1538 case PTLLND_MSG_TYPE_PUT:
1539 case PTLLND_MSG_TYPE_GET:
1540 rc = lnet_parse(ni, &msg->ptlm_u.rdma.kptlrm_hdr,
1541 msg->ptlm_srcnid, &rx, 1);
1543 ptllnd_rx_done(&rx);
1546 case PTLLND_MSG_TYPE_IMMEDIATE:
1547 rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,
1548 msg->ptlm_srcnid, &rx, 0);
1550 ptllnd_rx_done(&rx);
1554 ptllnd_peer_decref(plp);
/*
 * Event callback for a posted receive buffer MD.
 *
 * NOTE(review): this region of the file is elided — the leading numbers
 * are original source line numbers and several lines (braces, returns,
 * #else/#endif) are missing between them.  Only comments are added here.
 *
 * Fires on PTL_EVENT_PUT_END (an incoming message landed in 'buf') or
 * PTL_EVENT_UNLINK.  A successfully received message is handed to
 * ptllnd_parse_request(); the buffer is then reposted unless it was
 * unlinked (repost logic differs under LUSTRE_PORTALS_UNLINK_SEMANTICS).
 */
1558 ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
/* the MD's user pointer carries the owning buffer object */
1560 ptllnd_buffer_t *buf = ptllnd_eventarg2obj(event->md.user_ptr);
1561 ptllnd_ni_t *plni = ni->ni_data;
/* incoming message starts at 'offset' within the receive buffer */
1562 char *msg = &buf->plb_buffer[event->offset];
1564 int unlinked = event->type == PTL_EVENT_UNLINK;
/* sanity: buffer belongs to this NI; only PUT_END/UNLINK expected */
1566 LASSERT (buf->plb_ni == ni);
1567 LASSERT (event->type == PTL_EVENT_PUT_END ||
1568 event->type == PTL_EVENT_UNLINK);
/* network-level failure: log it; nothing to parse */
1570 if (event->ni_fail_type != PTL_NI_OK) {
1572 CERROR("event type %s(%d), status %s(%d) from %s\n",
1573 ptllnd_evtype2str(event->type), event->type,
1574 ptllnd_errtype2str(event->ni_fail_type),
1575 event->ni_fail_type,
1576 ptllnd_ptlid2str(event->initiator));
1578 } else if (event->type == PTL_EVENT_PUT_END) {
1579 #if (PTL_MD_LOCAL_ALIGN8 == 0)
1580 /* Portals can't force message alignment - someone sending an
1581 * odd-length message could misalign subsequent messages */
1582 if ((event->mlength & 7) != 0) {
1583 CERROR("Message from %s has odd length %llu: "
1584 "probable version incompatibility\n",
1585 ptllnd_ptlid2str(event->initiator),
/* NOTE(review): error-handling tail of this branch elided from view */
1590 LASSERT ((event->offset & 7) == 0);
/* hand the received message to the protocol parser */
1592 ptllnd_parse_request(ni, event->initiator,
1593 (kptl_msg_t *)msg, event->mlength);
/* decide whether the buffer must be reposted; 'unlinked' semantics
 * differ between Portals flavours */
1596 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1597 /* UNLINK event only on explicit unlink */
1598 repost = (event->unlinked && event->type != PTL_EVENT_UNLINK);
1599 if (event->unlinked)
1602 /* UNLINK event only on implicit unlink */
1603 repost = (event->type == PTL_EVENT_UNLINK);
/* buffer is no longer posted: update accounting before any repost */
1607 LASSERT(buf->plb_posted);
1608 buf->plb_posted = 0;
1609 plni->plni_nposted_buffers--;
/* best-effort repost; the return value is deliberately ignored here */
1613 (void) ptllnd_post_buffer(buf);
/*
 * Event callback for a TX's request and/or bulk MDs.
 *
 * NOTE(review): elided region — the leading numbers are original source
 * line numbers and intervening lines are missing; comments only here.
 *
 * Determines whether the event refers to the request MD (tx_reqmdh) or
 * the bulk MD (tx_bulkmdh), records completion timestamps, validates the
 * event type against the TX's message type, and finally moves the TX to
 * the zombie list (for ptllnd_tx_done) once both MDs are invalid or an
 * error occurred.
 */
1617 ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
1619 ptllnd_ni_t *plni = ni->ni_data;
/* the MD's user pointer carries the owning TX */
1620 ptllnd_tx_t *tx = ptllnd_eventarg2obj(event->md.user_ptr);
1621 int error = (event->ni_fail_type != PTL_NI_OK);
/* 'unlinked' detection differs between Portals flavours */
1624 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1625 int unlinked = event->unlinked;
1627 int unlinked = (event->type == PTL_EVENT_UNLINK);
/* (error-only diagnostic; guard line elided from this view) */
1631 CERROR("Error %s(%d) event %s(%d) unlinked %d, %s(%d) for %s\n",
1632 ptllnd_errtype2str(event->ni_fail_type),
1633 event->ni_fail_type,
1634 ptllnd_evtype2str(event->type), event->type,
1635 unlinked, ptllnd_msgtype2str(tx->tx_type), tx->tx_type,
1636 libcfs_id2str(tx->tx_peer->plp_id));
1638 LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE));
/* which of the TX's two MDs fired? */
1640 isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh);
1642 LASSERT (event->md.start == (void *)&tx->tx_msg);
/* request MD done: invalidate handle and timestamp completion */
1644 tx->tx_reqmdh = PTL_INVALID_HANDLE;
1645 gettimeofday(&tx->tx_req_done, NULL);
1649 isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);
1650 if ( isbulk && unlinked ) {
/* bulk MD done: invalidate handle and timestamp completion */
1651 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
1652 gettimeofday(&tx->tx_bulk_done, NULL);
1655 LASSERT (!isreq != !isbulk); /* always one and only 1 match */
1657 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: TX done %p %s%s",
1658 libcfs_id2str(tx->tx_peer->plp_id),
1659 tx->tx_peer->plp_credits,
1660 tx->tx_peer->plp_outstanding_credits,
1661 tx->tx_peer->plp_sent_credits,
1662 plni->plni_peer_credits + tx->tx_peer->plp_lazy_credits,
1663 tx, isreq ? "REQ" : "BULK", unlinked ? "(unlinked)" : "");
1665 LASSERT (!isreq != !isbulk); /* always one and only 1 match */
/* validate the event type against what this TX type may legally see */
1666 switch (tx->tx_type) {
1670 case PTLLND_MSG_TYPE_NOOP:
1671 case PTLLND_MSG_TYPE_HELLO:
1672 case PTLLND_MSG_TYPE_IMMEDIATE:
/* single-MD messages: only SEND_END (or UNLINK) possible */
1673 LASSERT (event->type == PTL_EVENT_UNLINK ||
1674 event->type == PTL_EVENT_SEND_END);
1678 case PTLLND_MSG_TYPE_GET:
1679 LASSERT (event->type == PTL_EVENT_UNLINK ||
1680 (isreq && event->type == PTL_EVENT_SEND_END) ||
1681 (isbulk && event->type == PTL_EVENT_PUT_END));
1683 if (isbulk && !error && event->type == PTL_EVENT_PUT_END) {
1684 /* Check GET matched */
1685 if (event->hdr_data == PTLLND_RDMA_OK) {
/* peer matched our GET: record the reply length for LNet */
1686 lnet_set_reply_msg_len(ni,
1687 tx->tx_lnetreplymsg,
1690 CERROR ("Unmatched GET with %s\n",
1691 libcfs_id2str(tx->tx_peer->plp_id));
1692 tx->tx_status = -EIO;
1697 case PTLLND_MSG_TYPE_PUT:
1698 LASSERT (event->type == PTL_EVENT_UNLINK ||
1699 (isreq && event->type == PTL_EVENT_SEND_END) ||
1700 (isbulk && event->type == PTL_EVENT_GET_END));
1703 case PTLLND_RDMA_READ:
1704 LASSERT (event->type == PTL_EVENT_UNLINK ||
1705 event->type == PTL_EVENT_SEND_END ||
1706 event->type == PTL_EVENT_REPLY_END);
1710 case PTLLND_RDMA_WRITE:
1711 LASSERT (event->type == PTL_EVENT_UNLINK ||
1712 event->type == PTL_EVENT_SEND_END);
1716 /* Schedule ptllnd_tx_done() on error or last completion event */
1718 (PtlHandleIsEqual(tx->tx_bulkmdh, PTL_INVALID_HANDLE) &&
1719 PtlHandleIsEqual(tx->tx_reqmdh, PTL_INVALID_HANDLE))) {
1721 tx->tx_status = -EIO;
/* move the TX from its current queue onto the zombie list */
1722 list_del(&tx->tx_list);
1723 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
/*
 * Scan a peer's queued and active TX lists for a TX whose absolute
 * deadline (seconds, set by ptllnd_set_tx_deadline) has already passed.
 *
 * NOTE(review): elided region — leading numbers are original source line
 * numbers; the return statements / loop bodies are missing from view.
 */
1728 ptllnd_find_timed_out_tx(ptllnd_peer_t *peer)
1730 time_t now = cfs_time_current_sec();
1731 struct list_head *tmp;
/* first check TXs still waiting to be sent */
1733 list_for_each(tmp, &peer->plp_txq) {
1734 ptllnd_tx_t *tx = list_entry(tmp, ptllnd_tx_t, tx_list);
1736 if (tx->tx_deadline < now)
/* then TXs already in flight */
1740 list_for_each(tmp, &peer->plp_activeq) {
1741 ptllnd_tx_t *tx = list_entry(tmp, ptllnd_tx_t, tx_list);
1743 if (tx->tx_deadline < now)
/*
 * If any of this peer's TXs has passed its deadline, log the timeout and
 * close the peer with -ETIMEDOUT.
 *
 * NOTE(review): elided region — leading numbers are original source line
 * numbers; the no-timeout early-return is missing from view.
 */
1751 ptllnd_check_peer(ptllnd_peer_t *peer)
1753 ptllnd_tx_t *tx = ptllnd_find_timed_out_tx(peer);
1758 CERROR("%s: timed out\n", libcfs_id2str(peer->plp_id));
1759 ptllnd_close_peer(peer, -ETIMEDOUT);
/*
 * Periodic timeout scan: check a proportional slice ('chunk' buckets) of
 * the peer hash table for timed-out TXs, then schedule the next scan.
 *
 * NOTE(review): elided region — leading numbers are original source line
 * numbers; the declarations of 'n' and 'i' and some braces are missing
 * from this view.
 */
1763 ptllnd_watchdog (lnet_ni_t *ni, time_t now)
1765 ptllnd_ni_t *plni = ni->ni_data;
/* p: check period (seconds); chunk defaults to the whole table */
1767 int p = plni->plni_watchdog_interval;
1768 int chunk = plni->plni_peer_hash_size;
/* seconds elapsed since the previously scheduled check time */
1769 int interval = now - (plni->plni_watchdog_nextt - p);
1771 struct list_head *hashlist;
1772 struct list_head *tmp;
1773 struct list_head *nxt;
1775 /* Time to check for RDMA timeouts on a few more peers:
1776 * I try to do checks every 'p' seconds on a proportion of the peer
1777 * table and I need to check every connection 'n' times within a
1778 * timeout interval, to ensure I detect a timeout on any connection
1779 * within (n+1)/n times the timeout interval. */
1781 LASSERT (now >= plni->plni_watchdog_nextt);
1783 if (plni->plni_timeout > n * interval) { /* Scan less than the whole table? */
1784 chunk = (chunk * n * interval) / plni->plni_timeout;
/* scan 'chunk' consecutive hash buckets, resuming where we left off */
1789 for (i = 0; i < chunk; i++) {
1790 hashlist = &plni->plni_peer_hash[plni->plni_watchdog_peeridx];
/* _safe variant: ptllnd_check_peer may close (remove) the peer */
1792 list_for_each_safe(tmp, nxt, hashlist) {
1793 ptllnd_check_peer(list_entry(tmp, ptllnd_peer_t, plp_list));
/* advance the bucket cursor, wrapping around the table */
1796 plni->plni_watchdog_peeridx = (plni->plni_watchdog_peeridx + 1) %
1797 plni->plni_peer_hash_size;
/* schedule the next scan one period from now */
1800 plni->plni_watchdog_nextt = now + p;
1804 ptllnd_wait (lnet_ni_t *ni, int milliseconds)
1806 static struct timeval prevt;
1807 static int prevt_count;
1808 static int call_count;
1810 struct timeval start;
1811 struct timeval then;
1813 struct timeval deadline;
1815 ptllnd_ni_t *plni = ni->ni_data;
1823 /* Handle any currently queued events, returning immediately if any.
1824 * Otherwise block for the timeout and handle all events queued
1827 gettimeofday(&start, NULL);
1830 if (milliseconds <= 0) {
1833 deadline.tv_sec = start.tv_sec + milliseconds/1000;
1834 deadline.tv_usec = start.tv_usec + (milliseconds % 1000)*1000;
1836 if (deadline.tv_usec >= 1000000) {
1837 start.tv_usec -= 1000000;
1843 gettimeofday(&then, NULL);
1845 rc = PtlEQPoll(&plni->plni_eqh, 1, timeout, &event, &which);
1847 gettimeofday(&now, NULL);
1849 if ((now.tv_sec*1000 + now.tv_usec/1000) -
1850 (then.tv_sec*1000 + then.tv_usec/1000) > timeout + 1000) {
1851 /* 1000 mS grace...........................^ */
1852 CERROR("SLOW PtlEQPoll(%d): %dmS elapsed\n", timeout,
1853 (int)(now.tv_sec*1000 + now.tv_usec/1000) -
1854 (int)(then.tv_sec*1000 + then.tv_usec/1000));
1857 if (rc == PTL_EQ_EMPTY) {
1858 if (found) /* handled some events */
1861 if (now.tv_sec >= plni->plni_watchdog_nextt) { /* check timeouts? */
1862 ptllnd_watchdog(ni, now.tv_sec);
1863 LASSERT (now.tv_sec < plni->plni_watchdog_nextt);
1866 if (now.tv_sec > deadline.tv_sec || /* timeout expired */
1867 (now.tv_sec == deadline.tv_sec &&
1868 now.tv_usec >= deadline.tv_usec))
1871 if (milliseconds < 0 ||
1872 plni->plni_watchdog_nextt <= deadline.tv_sec) {
1873 timeout = (plni->plni_watchdog_nextt - now.tv_sec)*1000;
1875 timeout = (deadline.tv_sec - now.tv_sec)*1000 +
1876 (deadline.tv_usec - now.tv_usec)/1000;
1882 LASSERT (rc == PTL_OK || rc == PTL_EQ_DROPPED);
1884 if (rc == PTL_EQ_DROPPED)
1885 CERROR("Event queue: size %d is too small\n",
1886 plni->plni_eq_size);
1891 switch (ptllnd_eventarg2type(event.md.user_ptr)) {
1895 case PTLLND_EVENTARG_TYPE_TX:
1896 ptllnd_tx_event(ni, &event);
1899 case PTLLND_EVENTARG_TYPE_BUF:
1900 ptllnd_buf_event(ni, &event);
1905 while (!list_empty(&plni->plni_zombie_txs)) {
1906 tx = list_entry(plni->plni_zombie_txs.next,
1907 ptllnd_tx_t, tx_list);
1908 list_del_init(&tx->tx_list);
1912 if (prevt.tv_sec == 0 ||
1913 prevt.tv_sec != now.tv_sec) {
1914 PTLLND_HISTORY("%d wait entered at %d.%06d - prev %d %d.%06d",
1915 call_count, (int)start.tv_sec, (int)start.tv_usec,
1916 prevt_count, (int)prevt.tv_sec, (int)prevt.tv_usec);