1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5 * Author: Eric Barton <eeb@bartonsoftware.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * This file is confidential source code owned by Cluster File Systems.
11 * No viewing, modification, compilation, redistribution, or any other
12 * form of use is permitted except through a signed license agreement.
14 * If you have not signed such an agreement, then you have no rights to
15 * this file. Please destroy it immediately and contact CFS.
/* Stamp 'tx' with its completion deadline: the current time (seconds) plus
 * the NI's configured timeout (plni_timeout).
 * NOTE(review): this extract has embedded line numbers and elided lines
 * (return types, braces, returns are missing throughout) -- all comments
 * below describe only what the visible lines show; verify against the
 * pristine source. */
22 ptllnd_set_tx_deadline(ptllnd_tx_t *tx)
24 ptllnd_peer_t *peer = tx->tx_peer;
25 lnet_ni_t *ni = peer->plp_ni;
26 ptllnd_ni_t *plni = ni->ni_data;
28 tx->tx_deadline = cfs_time_current_sec() + plni->plni_timeout;
/* Queue 'tx' for transmission to its peer: set its deadline, append it to
 * the peer's pending-send queue (plp_txq), then kick the send path. */
32 ptllnd_post_tx(ptllnd_tx_t *tx)
34 ptllnd_peer_t *peer = tx->tx_peer;
36 ptllnd_set_tx_deadline(tx);
37 list_add_tail(&tx->tx_list, &peer->plp_txq);
38 ptllnd_check_sends(peer);
/* Format a Portals process id as "pid.nid" text.  Uses a small ring of
 * static buffers (8 x 32 bytes) so several results can coexist in one
 * printf call; index wraps when it reaches the end of the ring.
 * NOTE(review): static ring buffer is not thread-safe -- fine only if
 * callers are single-threaded/serialised; confirm in pristine source. */
42 ptllnd_ptlid2str(ptl_process_id_t id)
44 static char strs[8][32];
47 char *str = strs[idx++];
49 if (idx >= sizeof(strs)/sizeof(strs[0]))
52 snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
/* Final teardown of a peer (last reference dropped).  Returns the message
 * buffers reserved for this peer (lazy credits + per-peer credits) to the
 * NI's buffer pool, sanity-checks that the peer is closing with empty
 * queues, and frees the peer structure. */
57 ptllnd_destroy_peer(ptllnd_peer_t *peer)
59 lnet_ni_t *ni = peer->plp_ni;
60 ptllnd_ni_t *plni = ni->ni_data;
61 int nmsg = peer->plp_lazy_credits +
62 plni->plni_peer_credits;
/* give back the buffer headroom this peer accounted for */
64 ptllnd_size_buffers(ni, -nmsg);
66 LASSERT (peer->plp_closing);
67 LASSERT (plni->plni_npeers > 0);
68 LASSERT (list_empty(&peer->plp_txq));
69 LASSERT (list_empty(&peer->plp_activeq));
71 LIBCFS_FREE(peer, sizeof(*peer));
/* Fail every tx on queue 'q' with -ESHUTDOWN and move it onto the NI's
 * zombie list for later completion/cleanup. */
75 ptllnd_abort_txs(ptllnd_ni_t *plni, struct list_head *q)
77 while (!list_empty(q)) {
78 ptllnd_tx_t *tx = list_entry(q->next, ptllnd_tx_t, tx_list);
80 tx->tx_status = -ESHUTDOWN;
81 list_del(&tx->tx_list);
82 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
/* Begin shutting a peer down (idempotent: returns early if already
 * closing).  Warns if there is still queued work, optionally dumps debug
 * state, aborts all pending and active txs, unhashes the peer and drops
 * the hash-table reference. */
87 ptllnd_close_peer(ptllnd_peer_t *peer, int error)
89 lnet_ni_t *ni = peer->plp_ni;
90 ptllnd_ni_t *plni = ni->ni_data;
/* already being torn down -- nothing to do */
92 if (peer->plp_closing)
95 peer->plp_closing = 1;
97 if (!list_empty(&peer->plp_txq) ||
98 !list_empty(&peer->plp_activeq) ||
100 CWARN("Closing %s\n", libcfs_id2str(peer->plp_id));
101 if (plni->plni_debug)
102 ptllnd_dump_debug(ni, peer->plp_id);
/* fail everything still queued (moves txs to the zombie list) */
105 ptllnd_abort_txs(plni, &peer->plp_txq);
106 ptllnd_abort_txs(plni, &peer->plp_activeq);
108 list_del(&peer->plp_list);
109 ptllnd_peer_decref(peer);
/* Look up the peer for 'id' in the NI's peer hash; return it with a
 * reference held.  If not found and 'create' is set, reserve buffer
 * headroom, allocate and initialise a new peer, hash it, and post the
 * initial HELLO handshake message.
 * NOTE(review): the create-vs-lookup branching and error returns are
 * elided in this extract -- confirm control flow in the pristine source. */
113 ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
115 ptllnd_ni_t *plni = ni->ni_data;
116 unsigned int hash = LNET_NIDADDR(id.nid) % plni->plni_peer_hash_size;
117 struct list_head *tmp;
122 LASSERT (LNET_NIDNET(id.nid) == LNET_NIDNET(ni->ni_nid));
/* fast path: existing peer in the hash chain */
124 list_for_each(tmp, &plni->plni_peer_hash[hash]) {
125 plp = list_entry(tmp, ptllnd_peer_t, plp_list);
127 if (plp->plp_id.nid == id.nid &&
128 plp->plp_id.pid == id.pid) {
129 ptllnd_peer_addref(plp);
137 /* New peer: check first for enough posted buffers */
139 rc = ptllnd_size_buffers(ni, plni->plni_peer_credits);
145 LIBCFS_ALLOC(plp, sizeof(*plp));
147 CERROR("Can't allocate new peer %s\n", libcfs_id2str(id));
/* undo the buffer reservation made above on allocation failure */
149 ptllnd_size_buffers(ni, -plni->plni_peer_credits);
155 plp->plp_ptlid.nid = LNET_NIDADDR(id.nid);
156 plp->plp_ptlid.pid = plni->plni_ptllnd_pid;
157 plp->plp_credits = 1; /* add more later when she gives me credits */
158 plp->plp_max_msg_size = plni->plni_max_msg_size; /* until I hear from her */
159 plp->plp_sent_credits = 1; /* Implicit credit for HELLO */
160 plp->plp_outstanding_credits = plni->plni_peer_credits - 1;
161 plp->plp_lazy_credits = 0;
162 plp->plp_extra_lazy_credits = 0;
165 plp->plp_recvd_hello = 0;
166 plp->plp_closing = 0;
167 plp->plp_refcount = 1;
168 CFS_INIT_LIST_HEAD(&plp->plp_list);
169 CFS_INIT_LIST_HEAD(&plp->plp_txq);
170 CFS_INIT_LIST_HEAD(&plp->plp_activeq);
/* one ref for the hash table, one (the initial refcount) for the caller */
172 ptllnd_peer_addref(plp);
173 list_add_tail(&plp->plp_list, &plni->plni_peer_hash[hash]);
/* kick off the connection handshake */
175 tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_HELLO, 0);
177 CERROR("Can't send HELLO to %s\n", libcfs_id2str(id));
178 ptllnd_close_peer(plp, -ENOMEM);
179 ptllnd_peer_decref(plp);
183 tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS;
184 tx->tx_msg.ptlm_u.hello.kptlhm_max_msg_size = plni->plni_max_msg_size;
186 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post hello %p", libcfs_id2str(id),
187 tx->tx_peer->plp_credits,
188 tx->tx_peer->plp_outstanding_credits,
189 tx->tx_peer->plp_sent_credits,
190 plni->plni_peer_credits +
191 tx->tx_peer->plp_lazy_credits, tx);
/* Count the entries on list 'q' by walking it (counter lines elided in
 * this extract). */
198 ptllnd_count_q(struct list_head *q)
203 list_for_each(e, q) {
/* Map a tx type code to a human-readable name for debug output (the
 * returned string literals are elided in this extract). */
211 ptllnd_tx_typestr(int type)
214 case PTLLND_RDMA_WRITE:
217 case PTLLND_RDMA_READ:
220 case PTLLND_MSG_TYPE_PUT:
223 case PTLLND_MSG_TYPE_GET:
226 case PTLLND_MSG_TYPE_IMMEDIATE:
229 case PTLLND_MSG_TYPE_NOOP:
232 case PTLLND_MSG_TYPE_HELLO:
/* Print one tx's debug line: type, peer, bulk posted/done and request
 * posted/done timestamps (sec.usec pairs), and final status. */
241 ptllnd_debug_tx(ptllnd_tx_t *tx)
243 CDEBUG(D_WARNING, "%s %s b %ld.%06ld/%ld.%06ld"
244 " r %ld.%06ld/%ld.%06ld status %d\n",
245 ptllnd_tx_typestr(tx->tx_type),
246 libcfs_id2str(tx->tx_peer->plp_id),
247 tx->tx_bulk_posted.tv_sec, tx->tx_bulk_posted.tv_usec,
248 tx->tx_bulk_done.tv_sec, tx->tx_bulk_done.tv_usec,
249 tx->tx_req_posted.tv_sec, tx->tx_req_posted.tv_usec,
250 tx->tx_req_done.tv_sec, tx->tx_req_done.tv_usec,
/* Dump the full debug state for peer 'id': summary line (flags, stamp,
 * matchbits, queue depths, credit counters), then each tx on the txq and
 * activeq, plus any zombie and history txs that belong to this peer.
 * Takes a lookup reference and drops it at the end. */
255 ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
257 ptllnd_peer_t *plp = ptllnd_find_peer(ni, id, 0);
258 struct list_head *tmp;
259 ptllnd_ni_t *plni = ni->ni_data;
263 CDEBUG(D_WARNING, "No peer %s\n", libcfs_id2str(id));
267 CDEBUG(D_WARNING, "%s %s%s [%d] "LPU64".%06d m "LPU64" q %d/%d c %d/%d+%d(%d)\n",
269 plp->plp_recvd_hello ? "H" : "_",
270 plp->plp_closing ? "C" : "_",
272 plp->plp_stamp / 1000000, (int)(plp->plp_stamp % 1000000),
274 ptllnd_count_q(&plp->plp_txq),
275 ptllnd_count_q(&plp->plp_activeq),
276 plp->plp_credits, plp->plp_outstanding_credits, plp->plp_sent_credits,
277 plni->plni_peer_credits + plp->plp_lazy_credits);
279 CDEBUG(D_WARNING, "txq:\n");
280 list_for_each (tmp, &plp->plp_txq) {
281 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
286 CDEBUG(D_WARNING, "activeq:\n");
287 list_for_each (tmp, &plp->plp_activeq) {
288 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
/* zombie/history lists are global, so filter for this peer's id */
293 CDEBUG(D_WARNING, "zombies:\n");
294 list_for_each (tmp, &plni->plni_zombie_txs) {
295 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
297 if (tx->tx_peer->plp_id.nid == id.nid &&
298 tx->tx_peer->plp_id.pid == id.pid)
302 CDEBUG(D_WARNING, "history:\n");
303 list_for_each (tmp, &plni->plni_tx_history) {
304 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
306 if (tx->tx_peer->plp_id.nid == id.nid &&
307 tx->tx_peer->plp_id.pid == id.pid)
311 ptllnd_peer_decref(plp);
/* Convenience wrapper: dump one peer's state then the global tx history. */
315 ptllnd_dump_debug(lnet_ni_t *ni, lnet_process_id_t id)
317 ptllnd_debug_peer(ni, id);
318 ptllnd_dump_history();
/* LND notify hook: create/find the peer for 'nid' (server PID) and busy-
 * wait until its HELLO reply arrives, warning periodically after the
 * configured long-wait interval.  Drops the peer ref when done.
 * NOTE(review): 'w' is plni_long_wait and is compared as start + w/1000,
 * so it appears to be in milliseconds -- confirm units in pristine
 * source. */
322 ptllnd_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive)
324 lnet_process_id_t id;
326 time_t start = cfs_time_current_sec();
327 ptllnd_ni_t *plni = ni->ni_data;
328 int w = plni->plni_long_wait;
330 /* This is only actually used to connect to routers at startup! */
334 id.pid = LUSTRE_SRV_LNET_PID;
336 peer = ptllnd_find_peer(ni, id, 1);
340 /* wait for the peer to reply */
341 while (!peer->plp_recvd_hello) {
342 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
343 CWARN("Waited %ds to connect to %s\n",
344 (int)(cfs_time_current_sec() - start),
352 ptllnd_peer_decref(peer);
/* Adjust the message headroom ("lazy credits") reserved for a peer by
 * 'nasync'.  Decreases are only accounted (extra_lazy_credits) because the
 * peer may already hold credits for those buffers; increases first consume
 * any banked extra credits, then grow the buffer pool and the peer's lazy
 * and outstanding credit counts. */
356 ptllnd_setasync(lnet_ni_t *ni, lnet_process_id_t id, int nasync)
358 ptllnd_peer_t *peer = ptllnd_find_peer(ni, id, nasync > 0);
364 LASSERT (peer->plp_lazy_credits >= 0);
365 LASSERT (peer->plp_extra_lazy_credits >= 0);
367 /* If nasync < 0, we're being told we can reduce the total message
368 * headroom. We can't do this right now because our peer might already
369 * have credits for the extra buffers, so we just account the extra
370 * headroom in case we need it later and only destroy buffers when the
373 * Note that the following condition handles this case, where it
374 * actually increases the extra lazy credit counter. */
376 if (nasync <= peer->plp_extra_lazy_credits) {
377 peer->plp_extra_lazy_credits -= nasync;
381 LASSERT (nasync > 0);
/* consume the banked extra credits before growing the pool */
383 nasync -= peer->plp_extra_lazy_credits;
384 peer->plp_extra_lazy_credits = 0;
386 rc = ptllnd_size_buffers(ni, nasync);
388 peer->plp_lazy_credits += nasync;
389 peer->plp_outstanding_credits += nasync;
/* Simple rotate-and-add checksum over 'nob' bytes at 'ptr'.  A result of
 * zero means "no checksum" on the wire, so zero is mapped to one. */
396 ptllnd_cksum (void *ptr, int nob)
/* rotate accumulator left by 1 and add the next byte */
402 sum = ((sum << 1) | (sum >> 31)) + *c++;
404 /* ensure I don't return 0 (== no checksum) */
405 return (sum == 0) ? 1 : sum;
/* Allocate and initialise a tx descriptor for 'peer'.  The wire message
 * size depends on 'type': RDMA descriptors carry no wire message; PUT/GET
 * carry a kptl_rdma_msg_t; IMMEDIATE carries the payload inline; NOOP is
 * header-only; HELLO carries a kptl_hello_msg_t.  The size is rounded up
 * to 8 bytes and must fit the peer's negotiated max message size.  Fills
 * in the common message header and takes a peer reference for the tx. */
409 ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
411 lnet_ni_t *ni = peer->plp_ni;
412 ptllnd_ni_t *plni = ni->ni_data;
416 CDEBUG(D_NET, "peer=%p type=%d payload=%d\n", peer, type, payload_nob);
422 case PTLLND_RDMA_WRITE:
423 case PTLLND_RDMA_READ:
424 LASSERT (payload_nob == 0);
428 case PTLLND_MSG_TYPE_PUT:
429 case PTLLND_MSG_TYPE_GET:
430 LASSERT (payload_nob == 0);
431 msgsize = offsetof(kptl_msg_t, ptlm_u) +
432 sizeof(kptl_rdma_msg_t);
435 case PTLLND_MSG_TYPE_IMMEDIATE:
436 msgsize = offsetof(kptl_msg_t,
437 ptlm_u.immediate.kptlim_payload[payload_nob]);
440 case PTLLND_MSG_TYPE_NOOP:
441 LASSERT (payload_nob == 0);
442 msgsize = offsetof(kptl_msg_t, ptlm_u);
445 case PTLLND_MSG_TYPE_HELLO:
446 LASSERT (payload_nob == 0);
447 msgsize = offsetof(kptl_msg_t, ptlm_u) +
448 sizeof(kptl_hello_msg_t);
/* round up to 8-byte alignment for the wire */
452 msgsize = (msgsize + 7) & ~7;
453 LASSERT (msgsize <= peer->plp_max_msg_size);
/* tx struct and wire message are allocated in one chunk */
455 LIBCFS_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize);
458 CERROR("Can't allocate msg type %d for %s\n",
459 type, libcfs_id2str(peer->plp_id));
463 CFS_INIT_LIST_HEAD(&tx->tx_list);
466 tx->tx_lnetmsg = tx->tx_lnetreplymsg = NULL;
469 tx->tx_reqmdh = PTL_INVALID_HANDLE;
470 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
471 tx->tx_msgsize = msgsize;
472 tx->tx_completing = 0;
/* clear all four event timestamps used by ptllnd_debug_tx */
475 memset(&tx->tx_bulk_posted, 0, sizeof(tx->tx_bulk_posted));
476 memset(&tx->tx_bulk_done, 0, sizeof(tx->tx_bulk_done));
477 memset(&tx->tx_req_posted, 0, sizeof(tx->tx_req_posted));
478 memset(&tx->tx_req_done, 0, sizeof(tx->tx_req_done));
/* common wire-message header */
481 tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC;
482 tx->tx_msg.ptlm_version = PTLLND_MSG_VERSION;
483 tx->tx_msg.ptlm_type = type;
484 tx->tx_msg.ptlm_credits = 0;
485 tx->tx_msg.ptlm_nob = msgsize;
486 tx->tx_msg.ptlm_cksum = 0;
487 tx->tx_msg.ptlm_srcnid = ni->ni_nid;
488 tx->tx_msg.ptlm_srcstamp = plni->plni_stamp;
489 tx->tx_msg.ptlm_dstnid = peer->plp_id.nid;
490 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
491 tx->tx_msg.ptlm_srcpid = the_lnet.ln_pid;
492 tx->tx_msg.ptlm_dstpid = peer->plp_id.pid;
/* tx holds a reference on its peer until it is destroyed */
495 ptllnd_peer_addref(peer);
498 CDEBUG(D_NET, "tx=%p\n",tx);
/* Unlink the MD referenced by '*mdh' and wait until it is invalidated.
 * Without Lustre unlink semantics, PtlMDUnlink may return PTL_MD_IN_USE,
 * in which case we loop until the event handler invalidates the handle,
 * warning periodically after the long-wait interval. */
504 ptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh)
506 ptllnd_peer_t *peer = tx->tx_peer;
507 lnet_ni_t *ni = peer->plp_ni;
509 time_t start = cfs_time_current_sec();
510 ptllnd_ni_t *plni = ni->ni_data;
511 int w = plni->plni_long_wait;
513 while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) {
514 rc = PtlMDUnlink(*mdh);
515 #ifndef LUSTRE_PORTALS_UNLINK_SEMANTICS
516 if (rc == PTL_OK) /* unlink successful => no unlinked event */
518 LASSERT (rc == PTL_MD_IN_USE);
520 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
521 CWARN("Waited %ds to abort tx to %s\n",
522 (int)(cfs_time_current_sec() - start),
523 libcfs_id2str(peer->plp_id));
526 /* Wait for ptllnd_tx_event() to invalidate */
/* Trim the completed-tx history list down to plni_max_tx_history entries,
 * freeing the oldest txs (and their peer references) first. */
532 ptllnd_cull_tx_history(ptllnd_ni_t *plni)
534 int max = plni->plni_max_tx_history;
536 while (plni->plni_ntx_history > max) {
537 ptllnd_tx_t *tx = list_entry(plni->plni_tx_history.next,
538 ptllnd_tx_t, tx_list);
539 list_del(&tx->tx_list);
541 ptllnd_peer_decref(tx->tx_peer);
/* tx and its inline wire message were allocated as one chunk */
543 LIBCFS_FREE(tx, offsetof(ptllnd_tx_t, tx_msg) + tx->tx_msgsize);
545 LASSERT (plni->plni_ntxs > 0);
547 plni->plni_ntx_history--;
/* Complete a tx: guard against re-entry via tx_completing, remove it from
 * whichever queue it is on, close the peer on error, abort any outstanding
 * request/bulk MDs, free the iov, finalize the associated LNet message(s),
 * and park the tx on the history list (culled to a bounded length). */
552 ptllnd_tx_done(ptllnd_tx_t *tx)
554 ptllnd_peer_t *peer = tx->tx_peer;
555 lnet_ni_t *ni = peer->plp_ni;
556 ptllnd_ni_t *plni = ni->ni_data;
558 /* CAVEAT EMPTOR: If this tx is being aborted, I'll continue to get
559 * events for this tx until it's unlinked. So I set tx_completing to
560 * flag the tx is getting handled */
562 if (tx->tx_completing)
565 tx->tx_completing = 1;
567 if (!list_empty(&tx->tx_list))
568 list_del_init(&tx->tx_list);
570 if (tx->tx_status != 0) {
571 if (plni->plni_debug) {
572 CERROR("Completing tx for %s with error %d\n",
573 libcfs_id2str(peer->plp_id), tx->tx_status);
/* any tx error is fatal for the connection */
576 ptllnd_close_peer(peer, tx->tx_status);
/* make sure both MDs are unlinked before freeing anything */
579 ptllnd_abort_tx(tx, &tx->tx_reqmdh);
580 ptllnd_abort_tx(tx, &tx->tx_bulkmdh);
582 if (tx->tx_niov > 0) {
583 LIBCFS_FREE(tx->tx_iov, tx->tx_niov * sizeof(*tx->tx_iov));
587 if (tx->tx_lnetreplymsg != NULL) {
588 LASSERT (tx->tx_type == PTLLND_MSG_TYPE_GET);
589 LASSERT (tx->tx_lnetmsg != NULL);
590 /* Simulate GET success always */
591 lnet_finalize(ni, tx->tx_lnetmsg, 0);
592 CDEBUG(D_NET, "lnet_finalize(tx_lnetreplymsg=%p)\n",tx->tx_lnetreplymsg);
593 lnet_finalize(ni, tx->tx_lnetreplymsg, tx->tx_status);
594 } else if (tx->tx_lnetmsg != NULL) {
595 lnet_finalize(ni, tx->tx_lnetmsg, tx->tx_status);
/* keep completed txs around for debug history, bounded by the cull */
598 plni->plni_ntx_history++;
599 list_add_tail(&tx->tx_list, &plni->plni_tx_history);
601 ptllnd_cull_tx_history(plni);
/* Build the tx's Portals iovec (tx_iov) covering 'len' bytes of 'iov'
 * starting at 'offset': skip wholly-consumed leading iovecs, allocate a
 * ptl_md_iovec_t array, then copy/trim entries until 'len' is covered.
 * If the allocation turns out larger than the entries actually used, it
 * is redone at the exact size (the retry path is partly elided here). */
605 ptllnd_set_txiov(ptllnd_tx_t *tx,
606 unsigned int niov, struct iovec *iov,
607 unsigned int offset, unsigned int len)
609 ptl_md_iovec_t *piov;
618 * Remove iovec's at the beginning that
619 * are skipped because of the offset.
620 * Adjust the offset accordingly
624 if (offset < iov->iov_len)
626 offset -= iov->iov_len;
632 int temp_offset = offset;
634 LIBCFS_ALLOC(piov, niov * sizeof(*piov));
638 for (npiov = 0;; npiov++) {
639 LASSERT (npiov < niov);
640 LASSERT (iov->iov_len >= temp_offset);
/* first entry starts part-way in; temp_offset is zeroed after it
 * (zeroing line elided in this extract -- verify) */
642 piov[npiov].iov_base = iov[npiov].iov_base + temp_offset;
643 piov[npiov].iov_len = iov[npiov].iov_len - temp_offset;
645 if (piov[npiov].iov_len >= resid) {
646 piov[npiov].iov_len = resid;
650 resid -= piov[npiov].iov_len;
660 /* Dang! The piov I allocated was too big and it's a drag to
661 * have to maintain separate 'allocated' and 'used' sizes, so
662 * I'll just do it again; NB this doesn't happen normally... */
663 LIBCFS_FREE(piov, niov * sizeof(*piov));
/* Point MD 'md' at the tx's buffer: a single iovec is flattened to a
 * plain start/length; multiple iovecs set PTL_MD_IOVEC (the zero-iov and
 * multi-iov assignment lines are partly elided in this extract). */
669 ptllnd_set_md_buffer(ptl_md_t *md, ptllnd_tx_t *tx)
671 unsigned int niov = tx->tx_niov;
672 ptl_md_iovec_t *iov = tx->tx_iov;
674 LASSERT ((md->options & PTL_MD_IOVEC) == 0);
679 } else if (niov == 1) {
680 md->start = iov[0].iov_base;
681 md->length = iov[0].iov_len;
685 md->options |= PTL_MD_IOVEC;
/* Post a receive buffer: attach a wildcard ME on the NI's portal matching
 * LNET_MSG_MATCHBITS, then attach an infinite-threshold, max-size-limited
 * MD over the buffer.  On MD attach failure the ME is unlinked and the
 * posted-buffer count rolled back. */
690 ptllnd_post_buffer(ptllnd_buffer_t *buf)
692 lnet_ni_t *ni = buf->plb_ni;
693 ptllnd_ni_t *plni = ni->ni_data;
694 ptl_process_id_t anyid = {
698 .start = buf->plb_buffer,
699 .length = plni->plni_buffer_size,
700 .threshold = PTL_MD_THRESH_INF,
701 .max_size = plni->plni_max_msg_size,
702 .options = (PTLLND_MD_OPTIONS |
703 PTL_MD_OP_PUT | PTL_MD_MAX_SIZE |
704 PTL_MD_LOCAL_ALIGN8),
705 .user_ptr = ptllnd_obj2eventarg(buf, PTLLND_EVENTARG_TYPE_BUF),
706 .eq_handle = plni->plni_eqh};
710 LASSERT (!buf->plb_posted);
712 rc = PtlMEAttach(plni->plni_nih, plni->plni_portal,
713 anyid, LNET_MSG_MATCHBITS, 0,
714 PTL_UNLINK, PTL_INS_AFTER, &meh);
716 CERROR("PtlMEAttach failed: %s(%d)\n",
717 ptllnd_errtype2str(rc), rc);
722 plni->plni_nposted_buffers++;
724 rc = PtlMDAttach(meh, md, LNET_UNLINK, &buf->plb_md);
728 CERROR("PtlMDAttach failed: %s(%d)\n",
729 ptllnd_errtype2str(rc), rc);
/* roll back: the ME was attached but the MD never made it */
732 plni->plni_nposted_buffers--;
734 rc = PtlMEUnlink(meh);
735 LASSERT (rc == PTL_OK);
/* Drive the peer's send queue under the credit protocol.  First, if the
 * queue is idle and enough receive credits have accumulated (credit
 * high-water mark), queue a NOOP purely to return credits.  Then drain
 * plp_txq: each send consumes one peer credit; the last credit is
 * reserved for credit return unless we have credits to piggy-back; a
 * NOOP is dropped if newer traffic can carry the credits instead.  Each
 * sent message carries all accumulated outstanding credits, is optionally
 * checksummed, bound to an MD and PtlPut to the peer's request portal.
 * NOTE(review): many control-flow lines (breaks, tx_done calls, loop
 * structure) are elided in this extract -- verify against pristine
 * source. */
741 ptllnd_check_sends(ptllnd_peer_t *peer)
743 lnet_ni_t *ni = peer->plp_ni;
744 ptllnd_ni_t *plni = ni->ni_data;
750 CDEBUG(D_NET, "%s: [%d/%d+%d(%d)\n",
751 libcfs_id2str(peer->plp_id), peer->plp_credits,
752 peer->plp_outstanding_credits, peer->plp_sent_credits,
753 plni->plni_peer_credits + peer->plp_lazy_credits);
/* idle but holding many receive credits: send a NOOP to return them */
755 if (list_empty(&peer->plp_txq) &&
756 peer->plp_outstanding_credits >= PTLLND_CREDIT_HIGHWATER(plni) &&
757 peer->plp_credits != 0) {
759 tx = ptllnd_new_tx(peer, PTLLND_MSG_TYPE_NOOP, 0);
760 CDEBUG(D_NET, "NOOP tx=%p\n",tx);
762 CERROR("Can't return credits to %s\n",
763 libcfs_id2str(peer->plp_id));
765 ptllnd_set_tx_deadline(tx);
766 list_add_tail(&tx->tx_list, &peer->plp_txq);
770 while (!list_empty(&peer->plp_txq)) {
771 tx = list_entry(peer->plp_txq.next, ptllnd_tx_t, tx_list);
773 LASSERT (tx->tx_msgsize > 0);
775 LASSERT (peer->plp_outstanding_credits >= 0);
776 LASSERT (peer->plp_sent_credits >= 0);
777 LASSERT (peer->plp_outstanding_credits + peer->plp_sent_credits
778 <= plni->plni_peer_credits + peer->plp_lazy_credits);
779 LASSERT (peer->plp_credits >= 0);
781 if (peer->plp_credits == 0) { /* no credits */
782 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: no creds for %p",
783 libcfs_id2str(peer->plp_id),
785 peer->plp_outstanding_credits,
786 peer->plp_sent_credits,
787 plni->plni_peer_credits +
788 peer->plp_lazy_credits, tx);
792 if (peer->plp_credits == 1 && /* last credit reserved for */
793 peer->plp_outstanding_credits == 0) { /* returning credits */
794 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: too few creds for %p",
795 libcfs_id2str(peer->plp_id),
797 peer->plp_outstanding_credits,
798 peer->plp_sent_credits,
799 plni->plni_peer_credits +
800 peer->plp_lazy_credits, tx);
/* committed: move tx from pending to active */
804 list_del(&tx->tx_list);
805 list_add_tail(&tx->tx_list, &peer->plp_activeq);
807 CDEBUG(D_NET, "Sending at TX=%p type=%s (%d)\n",tx,
808 ptllnd_msgtype2str(tx->tx_type),tx->tx_type);
/* a NOOP is redundant if real traffic will carry the credits */
810 if (tx->tx_type == PTLLND_MSG_TYPE_NOOP &&
811 (!list_empty(&peer->plp_txq) ||
812 peer->plp_outstanding_credits <
813 PTLLND_CREDIT_HIGHWATER(plni))) {
819 /* Set stamp at the last minute; on a new peer, I don't know it
820 * until I receive the HELLO back */
821 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
824 * Return all the credits we have
826 tx->tx_msg.ptlm_credits = peer->plp_outstanding_credits;
827 peer->plp_sent_credits += peer->plp_outstanding_credits;
828 peer->plp_outstanding_credits = 0;
/* checksum covers only the fixed header (up to ptlm_u) */
835 if (plni->plni_checksum)
836 tx->tx_msg.ptlm_cksum =
837 ptllnd_cksum(&tx->tx_msg,
838 offsetof(kptl_msg_t, ptlm_u));
840 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
841 md.eq_handle = plni->plni_eqh;
843 md.options = PTLLND_MD_OPTIONS;
844 md.start = &tx->tx_msg;
845 md.length = tx->tx_msgsize;
847 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
849 CERROR("PtlMDBind for %s failed: %s(%d)\n",
850 libcfs_id2str(peer->plp_id),
851 ptllnd_errtype2str(rc), rc);
852 tx->tx_status = -EIO;
857 LASSERT (tx->tx_type != PTLLND_RDMA_WRITE &&
858 tx->tx_type != PTLLND_RDMA_READ);
861 gettimeofday(&tx->tx_req_posted, NULL);
863 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: %s %p c %d",
864 libcfs_id2str(peer->plp_id),
866 peer->plp_outstanding_credits,
867 peer->plp_sent_credits,
868 plni->plni_peer_credits +
869 peer->plp_lazy_credits,
870 ptllnd_msgtype2str(tx->tx_type), tx,
871 tx->tx_msg.ptlm_credits);
873 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
874 plni->plni_portal, 0, LNET_MSG_MATCHBITS, 0, 0);
876 CERROR("PtlPut for %s failed: %s(%d)\n",
877 libcfs_id2str(peer->plp_id),
878 ptllnd_errtype2str(rc), rc);
879 tx->tx_status = -EIO;
/* Set up a passive-side RDMA for an LNet PUT or GET: build the tx iovec,
 * configure an MD (sink for GET's reply PUT, source for PUT's peer GET),
 * wait for the HELLO handshake to validate matchbits, attach ME+MD under
 * fresh matchbits, fill in the rdma request message (peer will initiate
 * the data movement), create the LNet reply message for GET, and post the
 * request.  Error-unwind lines are partly elided in this extract. */
887 ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
888 unsigned int niov, struct iovec *iov,
889 unsigned int offset, unsigned int len)
891 lnet_ni_t *ni = peer->plp_ni;
892 ptllnd_ni_t *plni = ni->ni_data;
893 ptllnd_tx_t *tx = ptllnd_new_tx(peer, type, 0);
903 CDEBUG(D_NET, "niov=%d offset=%d len=%d\n",niov,offset,len);
905 LASSERT (type == PTLLND_MSG_TYPE_GET ||
906 type == PTLLND_MSG_TYPE_PUT);
909 CERROR("Can't allocate %s tx for %s\n",
910 type == PTLLND_MSG_TYPE_GET ? "GET" : "PUT/REPLY",
911 libcfs_id2str(peer->plp_id));
915 rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
917 CERROR ("Can't allocate iov %d for %s\n",
918 niov, libcfs_id2str(peer->plp_id));
923 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
924 md.eq_handle = plni->plni_eqh;
927 md.options = PTLLND_MD_OPTIONS;
/* GET: peer PUTs the data to us; PUT: peer GETs the data from us */
928 if(type == PTLLND_MSG_TYPE_GET)
929 md.options |= PTL_MD_OP_PUT | PTL_MD_ACK_DISABLE;
931 md.options |= PTL_MD_OP_GET;
932 ptllnd_set_md_buffer(&md, tx);
934 start = cfs_time_current_sec();
935 w = plni->plni_long_wait;
937 while (!peer->plp_recvd_hello) { /* wait to validate plp_match */
938 if (peer->plp_closing) {
942 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
943 CWARN("Waited %ds to connect to %s\n",
944 (int)(cfs_time_current_sec() - start),
945 libcfs_id2str(peer->plp_id));
/* matchbits below PTL_RESERVED_MATCHBITS are reserved for the protocol */
951 if (peer->plp_match < PTL_RESERVED_MATCHBITS)
952 peer->plp_match = PTL_RESERVED_MATCHBITS;
953 matchbits = peer->plp_match++;
955 rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid,
956 matchbits, 0, PTL_UNLINK, PTL_INS_BEFORE, &meh);
958 CERROR("PtlMEAttach for %s failed: %s(%d)\n",
959 libcfs_id2str(peer->plp_id),
960 ptllnd_errtype2str(rc), rc);
965 gettimeofday(&tx->tx_bulk_posted, NULL);
967 rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);
969 CERROR("PtlMDAttach for %s failed: %s(%d)\n",
970 libcfs_id2str(peer->plp_id),
971 ptllnd_errtype2str(rc), rc);
972 rc2 = PtlMEUnlink(meh);
973 LASSERT (rc2 == PTL_OK);
977 tx->tx_bulkmdh = mdh;
980 * We need to set the stamp here because it
981 * we could have received a HELLO above that set
984 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
986 tx->tx_msg.ptlm_u.rdma.kptlrm_hdr = msg->msg_hdr;
987 tx->tx_msg.ptlm_u.rdma.kptlrm_matchbits = matchbits;
989 if (type == PTLLND_MSG_TYPE_GET) {
990 tx->tx_lnetreplymsg = lnet_create_reply_msg(ni, msg);
991 if (tx->tx_lnetreplymsg == NULL) {
992 CERROR("Can't create reply for GET to %s\n",
993 libcfs_id2str(msg->msg_target));
999 tx->tx_lnetmsg = msg;
1000 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post passive %s p %d %p",
1001 libcfs_id2str(msg->msg_target),
1002 peer->plp_credits, peer->plp_outstanding_credits,
1003 peer->plp_sent_credits,
1004 plni->plni_peer_credits + peer->plp_lazy_credits,
1005 lnet_msgtyp2str(msg->msg_type),
1006 (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ?
1007 le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1008 (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ?
1009 le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
/* Initiate the active side of an RDMA the peer requested: build the tx
 * iovec, bind an MD (threshold 2 for READ: send + reply events), queue
 * the tx on the activeq, then PtlGet (READ) or PtlPut (WRITE) against the
 * peer's matchbits.  The PtlPut hdr_data signals RDMA_OK / RDMA_FAIL to
 * the peer.  On failure the tx is completed, which closes the peer. */
1020 ptllnd_active_rdma(ptllnd_peer_t *peer, int type,
1021 lnet_msg_t *msg, __u64 matchbits,
1022 unsigned int niov, struct iovec *iov,
1023 unsigned int offset, unsigned int len)
1025 lnet_ni_t *ni = peer->plp_ni;
1026 ptllnd_ni_t *plni = ni->ni_data;
1027 ptllnd_tx_t *tx = ptllnd_new_tx(peer, type, 0);
1029 ptl_handle_md_t mdh;
1032 LASSERT (type == PTLLND_RDMA_READ ||
1033 type == PTLLND_RDMA_WRITE);
1036 CERROR("Can't allocate tx for RDMA %s with %s\n",
1037 (type == PTLLND_RDMA_WRITE) ? "write" : "read",
1038 libcfs_id2str(peer->plp_id));
1039 ptllnd_close_peer(peer, -ENOMEM);
1043 rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
1045 CERROR ("Can't allocate iov %d for %s\n",
1046 niov, libcfs_id2str(peer->plp_id));
1051 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
1052 md.eq_handle = plni->plni_eqh;
1054 md.options = PTLLND_MD_OPTIONS;
/* READ generates both a SEND and a REPLY event on this MD */
1055 md.threshold = (type == PTLLND_RDMA_READ) ? 2 : 1;
1057 ptllnd_set_md_buffer(&md, tx);
1059 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
1061 CERROR("PtlMDBind for %s failed: %s(%d)\n",
1062 libcfs_id2str(peer->plp_id),
1063 ptllnd_errtype2str(rc), rc);
1068 tx->tx_bulkmdh = mdh;
1069 tx->tx_lnetmsg = msg;
1071 ptllnd_set_tx_deadline(tx);
1072 list_add_tail(&tx->tx_list, &peer->plp_activeq);
1073 gettimeofday(&tx->tx_bulk_posted, NULL);
1075 if (type == PTLLND_RDMA_READ)
1076 rc = PtlGet(mdh, peer->plp_ptlid,
1077 plni->plni_portal, 0, matchbits, 0);
1079 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
1080 plni->plni_portal, 0, matchbits, 0,
1081 (msg == NULL) ? PTLLND_RDMA_FAIL : PTLLND_RDMA_OK);
1086 CERROR("Can't initiate RDMA with %s: %s(%d)\n",
1087 libcfs_id2str(peer->plp_id),
1088 ptllnd_errtype2str(rc), rc);
1090 tx->tx_lnetmsg = NULL;
1093 ptllnd_tx_done(tx); /* this will close peer */
/* LND send entry point.  Rejects userspace peers, finds/creates the
 * target peer, then chooses a strategy per LNet message type: small
 * payloads go as an inline IMMEDIATE message (payload copied into the
 * wire message to avoid a fragmented send); large GETs become passive
 * RDMA sinks; large PUT/REPLY become passive RDMA sources.  The peer ref
 * taken by find_peer is dropped on every visible path. */
1098 ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
1100 ptllnd_ni_t *plni = ni->ni_data;
1106 LASSERT (!msg->msg_routing);
1107 LASSERT (msg->msg_kiov == NULL);
1109 LASSERT (msg->msg_niov <= PTL_MD_MAX_IOV); /* !!! */
1111 CDEBUG(D_NET, "%s [%d]+%d,%d -> %s%s\n",
1112 lnet_msgtyp2str(msg->msg_type),
1113 msg->msg_niov, msg->msg_offset, msg->msg_len,
1114 libcfs_nid2str(msg->msg_target.nid),
1115 msg->msg_target_is_router ? "(rtr)" : "");
/* this LND only talks to kernel-space peers */
1117 if ((msg->msg_target.pid & LNET_PID_USERFLAG) != 0) {
1118 CERROR("Can't send to non-kernel peer %s\n",
1119 libcfs_id2str(msg->msg_target));
1120 return -EHOSTUNREACH;
1123 plp = ptllnd_find_peer(ni, msg->msg_target, 1);
1127 switch (msg->msg_type) {
1132 LASSERT (msg->msg_len == 0);
1133 break; /* send IMMEDIATE */
1136 if (msg->msg_target_is_router)
1137 break; /* send IMMEDIATE */
/* GET: go passive-RDMA unless the reply would fit inline */
1139 nob = msg->msg_md->md_length;
1140 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1141 if (nob <= plni->plni_max_msg_size)
1144 LASSERT ((msg->msg_md->md_options & LNET_MD_KIOV) == 0);
1145 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_GET, msg,
1146 msg->msg_md->md_niov,
1147 msg->msg_md->md_iov.iov,
1148 0, msg->msg_md->md_length);
1149 ptllnd_peer_decref(plp);
1152 case LNET_MSG_REPLY:
/* PUT/REPLY: go passive-RDMA unless the payload fits inline */
1155 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1156 if (nob <= plp->plp_max_msg_size)
1157 break; /* send IMMEDIATE */
1159 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_PUT, msg,
1160 msg->msg_niov, msg->msg_iov,
1161 msg->msg_offset, msg->msg_len);
1162 ptllnd_peer_decref(plp);
1167 * NB copy the payload so we don't have to do a fragmented send */
1169 tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_IMMEDIATE, msg->msg_len);
1171 CERROR("Can't allocate tx for lnet type %d to %s\n",
1172 msg->msg_type, libcfs_id2str(msg->msg_target));
1173 ptllnd_peer_decref(plp);
1177 lnet_copy_iov2flat(tx->tx_msgsize, &tx->tx_msg,
1178 offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1179 msg->msg_niov, msg->msg_iov, msg->msg_offset,
1181 tx->tx_msg.ptlm_u.immediate.kptlim_hdr = msg->msg_hdr;
1183 tx->tx_lnetmsg = msg;
1184 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post immediate %s p %d %p",
1185 libcfs_id2str(msg->msg_target),
1186 plp->plp_credits, plp->plp_outstanding_credits,
1187 plp->plp_sent_credits,
1188 plni->plni_peer_credits + plp->plp_lazy_credits,
1189 lnet_msgtyp2str(msg->msg_type),
1190 (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ?
1191 le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1192 (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ?
1193 le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1196 ptllnd_peer_decref(plp);
/* Finish handling a received message: bank a credit to return to the
 * peer (outstanding_credits), log, and kick the send path so the credit
 * can be piggy-backed or NOOP-returned.  Decrements the NI rx count
 * (decrement line elided in this extract). */
1201 ptllnd_rx_done(ptllnd_rx_t *rx)
1203 ptllnd_peer_t *plp = rx->rx_peer;
1204 lnet_ni_t *ni = plp->plp_ni;
1205 ptllnd_ni_t *plni = ni->ni_data;
1207 plp->plp_outstanding_credits++;
1209 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: rx=%p done\n",
1210 libcfs_id2str(plp->plp_id),
1211 plp->plp_credits, plp->plp_outstanding_credits,
1212 plp->plp_sent_credits,
1213 plni->plni_peer_credits + plp->plp_lazy_credits, rx);
1215 ptllnd_check_sends(rx->rx_peer);
1217 LASSERT (plni->plni_nrxs > 0);
/* LND eager-recv hook: not expected to be called in this LND (recvs only
 * block for router buffers, which this userspace LND does not do). */
1222 ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1223 void **new_privatep)
1225 /* Shouldn't get here; recvs only block for router buffers */
/* LND receive entry point.  For IMMEDIATE messages, validate the payload
 * fits the received buffer and copy it straight into the caller's iovec,
 * then finalize.  For PUT requests, actively READ the payload from the
 * peer; for GET requests, actively WRITE the reply data (or signal
 * failure with a NULL msg via the RDMA_FAIL path). */
1231 ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1232 int delayed, unsigned int niov,
1233 struct iovec *iov, lnet_kiov_t *kiov,
1234 unsigned int offset, unsigned int mlen, unsigned int rlen)
1236 ptllnd_rx_t *rx = private;
1240 LASSERT (kiov == NULL);
1241 LASSERT (niov <= PTL_MD_MAX_IOV); /* !!! */
1243 switch (rx->rx_msg->ptlm_type) {
1247 case PTLLND_MSG_TYPE_IMMEDIATE:
1248 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[mlen]);
1249 if (nob > rx->rx_nob) {
1250 CERROR("Immediate message from %s too big: %d(%d)\n",
1251 libcfs_id2str(rx->rx_peer->plp_id),
1256 lnet_copy_flat2iov(niov, iov, offset,
1257 rx->rx_nob, rx->rx_msg,
1258 offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1260 lnet_finalize(ni, msg, 0);
/* PUT request: we are the sink, so pull the data with an RDMA READ */
1263 case PTLLND_MSG_TYPE_PUT:
1264 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg,
1265 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1266 niov, iov, offset, mlen);
/* GET request: we are the source, push the reply with an RDMA WRITE;
 * the NULL-msg variant below signals failure to the peer */
1269 case PTLLND_MSG_TYPE_GET:
1271 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, msg,
1272 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1273 msg->msg_niov, msg->msg_iov,
1274 msg->msg_offset, msg->msg_len);
1276 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, NULL,
1277 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1287 ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
1288 kptl_msg_t *msg, unsigned int nob)
1290 ptllnd_ni_t *plni = ni->ni_data;
1291 const int basenob = offsetof(kptl_msg_t, ptlm_u);
1292 lnet_process_id_t srcid;
1301 CERROR("Very short receive from %s\n",
1302 ptllnd_ptlid2str(initiator));
1306 /* I can at least read MAGIC/VERSION */
1308 flip = msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC);
1309 if (!flip && msg->ptlm_magic != PTLLND_MSG_MAGIC) {
1310 CERROR("Bad protocol magic %08x from %s\n",
1311 msg->ptlm_magic, ptllnd_ptlid2str(initiator));
1315 msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
1317 if (msg_version != PTLLND_MSG_VERSION) {
1318 CERROR("Bad protocol version %04x from %s: %04x expected\n",
1319 (__u32)msg_version, ptllnd_ptlid2str(initiator), PTLLND_MSG_VERSION);
1321 if (plni->plni_abort_on_protocol_mismatch)
1327 if (nob < basenob) {
1328 CERROR("Short receive from %s: got %d, wanted at least %d\n",
1329 ptllnd_ptlid2str(initiator), nob, basenob);
1333 /* checksum must be computed with
1334 * 1) ptlm_cksum zero and
1335 * 2) BEFORE anything gets modified/flipped
1337 msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
1338 msg->ptlm_cksum = 0;
1339 if (msg_cksum != 0 &&
1340 msg_cksum != ptllnd_cksum(msg, offsetof(kptl_msg_t, ptlm_u))) {
1341 CERROR("Bad checksum from %s\n", ptllnd_ptlid2str(initiator));
1345 msg->ptlm_version = msg_version;
1346 msg->ptlm_cksum = msg_cksum;
1349 /* NB stamps are opaque cookies */
1350 __swab32s(&msg->ptlm_nob);
1351 __swab64s(&msg->ptlm_srcnid);
1352 __swab64s(&msg->ptlm_dstnid);
1353 __swab32s(&msg->ptlm_srcpid);
1354 __swab32s(&msg->ptlm_dstpid);
1357 srcid.nid = msg->ptlm_srcnid;
1358 srcid.pid = msg->ptlm_srcpid;
1360 if (LNET_NIDNET(msg->ptlm_srcnid) != LNET_NIDNET(ni->ni_nid)) {
1361 CERROR("Bad source id %s from %s\n",
1362 libcfs_id2str(srcid),
1363 ptllnd_ptlid2str(initiator));
1367 if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
1368 CERROR("NAK from %s (%s)\n",
1369 libcfs_id2str(srcid),
1370 ptllnd_ptlid2str(initiator));
1372 if (plni->plni_dump_on_nak)
1373 ptllnd_dump_debug(ni, srcid);
1375 if (plni->plni_abort_on_nak)
1381 if (msg->ptlm_dstnid != ni->ni_nid ||
1382 msg->ptlm_dstpid != the_lnet.ln_pid) {
1383 CERROR("Bad dstid %s (%s expected) from %s\n",
1384 libcfs_id2str((lnet_process_id_t) {
1385 .nid = msg->ptlm_dstnid,
1386 .pid = msg->ptlm_dstpid}),
1387 libcfs_id2str((lnet_process_id_t) {
1389 .pid = the_lnet.ln_pid}),
1390 libcfs_id2str(srcid));
1394 if (msg->ptlm_dststamp != plni->plni_stamp) {
1395 CERROR("Bad dststamp "LPX64"("LPX64" expected) from %s\n",
1396 msg->ptlm_dststamp, plni->plni_stamp,
1397 libcfs_id2str(srcid));
1401 PTLLND_HISTORY("RX %s: %s %d %p", libcfs_id2str(srcid),
1402 ptllnd_msgtype2str(msg->ptlm_type),
1403 msg->ptlm_credits, &rx);
1405 switch (msg->ptlm_type) {
1406 case PTLLND_MSG_TYPE_PUT:
1407 case PTLLND_MSG_TYPE_GET:
1408 if (nob < basenob + sizeof(kptl_rdma_msg_t)) {
1409 CERROR("Short rdma request from %s(%s)\n",
1410 libcfs_id2str(srcid),
1411 ptllnd_ptlid2str(initiator));
1415 __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
1418 case PTLLND_MSG_TYPE_IMMEDIATE:
1419 if (nob < offsetof(kptl_msg_t,
1420 ptlm_u.immediate.kptlim_payload)) {
1421 CERROR("Short immediate from %s(%s)\n",
1422 libcfs_id2str(srcid),
1423 ptllnd_ptlid2str(initiator));
1428 case PTLLND_MSG_TYPE_HELLO:
1429 if (nob < basenob + sizeof(kptl_hello_msg_t)) {
1430 CERROR("Short hello from %s(%s)\n",
1431 libcfs_id2str(srcid),
1432 ptllnd_ptlid2str(initiator));
1436 __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
1437 __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
1441 case PTLLND_MSG_TYPE_NOOP:
1445 CERROR("Bad message type %d from %s(%s)\n", msg->ptlm_type,
1446 libcfs_id2str(srcid),
1447 ptllnd_ptlid2str(initiator));
1451 plp = ptllnd_find_peer(ni, srcid, 0);
1453 CERROR("Can't find peer %s\n", libcfs_id2str(srcid));
1457 if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
1458 if (plp->plp_recvd_hello) {
1459 CERROR("Unexpected HELLO from %s\n",
1460 libcfs_id2str(srcid));
1461 ptllnd_peer_decref(plp);
1465 plp->plp_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1466 plp->plp_match = msg->ptlm_u.hello.kptlhm_matchbits;
1467 plp->plp_stamp = msg->ptlm_srcstamp;
1468 plp->plp_recvd_hello = 1;
1470 } else if (!plp->plp_recvd_hello) {
1472 CERROR("Bad message type %d (HELLO expected) from %s\n",
1473 msg->ptlm_type, libcfs_id2str(srcid));
1474 ptllnd_peer_decref(plp);
1477 } else if (msg->ptlm_srcstamp != plp->plp_stamp) {
1479 CERROR("Bad srcstamp "LPX64"("LPX64" expected) from %s\n",
1480 msg->ptlm_srcstamp, plp->plp_stamp,
1481 libcfs_id2str(srcid));
1482 ptllnd_peer_decref(plp);
1486 /* Check peer only sends when I've sent her credits */
1487 if (plp->plp_sent_credits == 0) {
1488 CERROR("%s[%d/%d+%d(%d)]: unexpected message\n",
1489 libcfs_id2str(plp->plp_id),
1490 plp->plp_credits, plp->plp_outstanding_credits,
1491 plp->plp_sent_credits,
1492 plni->plni_peer_credits + plp->plp_lazy_credits);
1495 plp->plp_sent_credits--;
1497 /* No check for credit overflow - the peer may post new buffers after
1498 * the startup handshake. */
1499 if (msg->ptlm_credits > 0) {
1500 plp->plp_credits += msg->ptlm_credits;
1501 ptllnd_check_sends(plp);
1504 /* All OK so far; assume the message is good... */
1511 switch (msg->ptlm_type) {
1512 default: /* message types have been checked already */
1513 ptllnd_rx_done(&rx);
1516 case PTLLND_MSG_TYPE_PUT:
1517 case PTLLND_MSG_TYPE_GET:
1518 rc = lnet_parse(ni, &msg->ptlm_u.rdma.kptlrm_hdr,
1519 msg->ptlm_srcnid, &rx, 1);
1521 ptllnd_rx_done(&rx);
1524 case PTLLND_MSG_TYPE_IMMEDIATE:
1525 rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,
1526 msg->ptlm_srcnid, &rx, 0);
1528 ptllnd_rx_done(&rx);
1532 ptllnd_peer_decref(plp);
/*
 * Portals event callback for a posted receive-buffer MD.
 * Only PUT_END (an incoming message landed in the buffer) and UNLINK
 * events are expected (asserted below).  On a clean PUT_END the message
 * is handed to ptllnd_parse_request(); once the MD has been unlinked the
 * buffer is marked un-posted and reposted via ptllnd_post_buffer().
 *
 * NOTE(review): the leading decimal on each line is an original line
 * number fused in by extraction; some original lines (braces, blank
 * lines) are elided from this view.
 */
1536 ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
1538 ptllnd_buffer_t *buf = ptllnd_eventarg2obj(event->md.user_ptr);
1539 ptllnd_ni_t *plni = ni->ni_data;
     /* start of the just-arrived message within the receive buffer */
1540 char *msg = &buf->plb_buffer[event->offset];
1542 int unlinked = event->type == PTL_EVENT_UNLINK;
1544 LASSERT (buf->plb_ni == ni);
1545 LASSERT (event->type == PTL_EVENT_PUT_END ||
1546 event->type == PTL_EVENT_UNLINK);
     /* Network-level failure: log it; the message is not parsed */
1548 if (event->ni_fail_type != PTL_NI_OK) {
1550 CERROR("event type %s(%d), status %s(%d) from %s\n",
1551 ptllnd_evtype2str(event->type), event->type,
1552 ptllnd_errtype2str(event->ni_fail_type),
1553 event->ni_fail_type,
1554 ptllnd_ptlid2str(event->initiator));
1556 } else if (event->type == PTL_EVENT_PUT_END) {
1557 #if (PTL_MD_LOCAL_ALIGN8 == 0)
1558 /* Portals can't force message alignment - someone sending an
1559 * odd-length message could misalign subsequent messages */
1560 if ((event->mlength & 7) != 0) {
1561 CERROR("Message from %s has odd length %llu: "
1562 "probable version incompatibility\n",
1563 ptllnd_ptlid2str(event->initiator),
1568 LASSERT ((event->offset & 7) == 0);
     /* good message: hand it up for protocol parsing */
1570 ptllnd_parse_request(ni, event->initiator,
1571 (kptl_msg_t *)msg, event->mlength);
     /* Decide whether the buffer must be reposted, which depends on
      * whether this Portals flavour raises an UNLINK event only on
      * explicit unlink (LUSTRE_PORTALS_UNLINK_SEMANTICS) or only on
      * implicit unlink. */
1574 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1575 /* UNLINK event only on explicit unlink */
1576 repost = (event->unlinked && event->type != PTL_EVENT_UNLINK);
1577 if (event->unlinked)
1580 /* UNLINK event only on implicit unlink */
1581 repost = (event->type == PTL_EVENT_UNLINK);
     /* buffer is no longer attached to the network: fix the accounting */
1585 LASSERT(buf->plb_posted);
1586 buf->plb_posted = 0;
1587 plni->plni_nposted_buffers--;
     /* re-arm the buffer; errors are handled inside ptllnd_post_buffer() */
1591 (void) ptllnd_post_buffer(buf);
/*
 * Portals event callback for a transmit.  A tx may own up to two MDs:
 * the request MD (tx_reqmdh, carrying the message itself) and a bulk
 * MD (tx_bulkmdh, for RDMA payload).  This handler works out which MD
 * the event refers to, timestamps its completion, sanity-checks the
 * event type against the message type, and finally moves the tx onto
 * the NI's zombie list for ptllnd_tx_done() once there was an error or
 * both MDs have completed.
 *
 * NOTE(review): the leading decimal on each line is an original line
 * number fused in by extraction; some conditional/brace lines are
 * elided from this view (e.g. between 1620 and 1622).
 */
1595 ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
1597 ptllnd_ni_t *plni = ni->ni_data;
1598 ptllnd_tx_t *tx = ptllnd_eventarg2obj(event->md.user_ptr);
1599 int error = (event->ni_fail_type != PTL_NI_OK);
     /* How "this MD was unlinked" is reported differs between Portals
      * flavours: a per-event flag vs. a distinct UNLINK event type. */
1602 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1603 int unlinked = event->unlinked;
1605 int unlinked = (event->type == PTL_EVENT_UNLINK);
1609 CERROR("Error %s(%d) event %s(%d) unlinked %d, %s(%d) for %s\n",
1610 ptllnd_errtype2str(event->ni_fail_type),
1611 event->ni_fail_type,
1612 ptllnd_evtype2str(event->type), event->type,
1613 unlinked, ptllnd_msgtype2str(tx->tx_type), tx->tx_type,
1614 libcfs_id2str(tx->tx_peer->plp_id));
1616 LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE));
     /* Which of the tx's two MDs does this event belong to? */
1618 isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh);
1620 LASSERT (event->md.start == (void *)&tx->tx_msg);
     /* request MD done: invalidate the handle and stamp completion time
      * (NOTE(review): the guarding condition line is elided in this view) */
1622 tx->tx_reqmdh = PTL_INVALID_HANDLE;
1623 gettimeofday(&tx->tx_req_done, NULL);
1627 isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);
1628 if ( isbulk && unlinked ) {
1629 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
1630 gettimeofday(&tx->tx_bulk_done, NULL);
1633 LASSERT (!isreq != !isbulk); /* always one and only 1 match */
1635 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: TX done %p %s%s",
1636 libcfs_id2str(tx->tx_peer->plp_id),
1637 tx->tx_peer->plp_credits,
1638 tx->tx_peer->plp_outstanding_credits,
1639 tx->tx_peer->plp_sent_credits,
1640 plni->plni_peer_credits + tx->tx_peer->plp_lazy_credits,
1641 tx, isreq ? "REQ" : "BULK", unlinked ? "(unlinked)" : "");
1643 LASSERT (!isreq != !isbulk); /* always one and only 1 match */
     /* Validate the event type against what this message type may
      * legitimately generate. */
1644 switch (tx->tx_type) {
1648 case PTLLND_MSG_TYPE_NOOP:
1649 case PTLLND_MSG_TYPE_HELLO:
1650 case PTLLND_MSG_TYPE_IMMEDIATE:
     /* single-MD messages: only SEND_END (or UNLINK) is possible */
1651 LASSERT (event->type == PTL_EVENT_UNLINK ||
1652 event->type == PTL_EVENT_SEND_END);
1656 case PTLLND_MSG_TYPE_GET:
1657 LASSERT (event->type == PTL_EVENT_UNLINK ||
1658 (isreq && event->type == PTL_EVENT_SEND_END) ||
1659 (isbulk && event->type == PTL_EVENT_PUT_END));
     /* For a GET, PUT_END on the bulk MD means the peer pushed the
      * reply data; hdr_data tells us whether the GET matched. */
1661 if (isbulk && !error && event->type == PTL_EVENT_PUT_END) {
1662 /* Check GET matched */
1663 if (event->hdr_data == PTLLND_RDMA_OK) {
1664 lnet_set_reply_msg_len(ni,
1665 tx->tx_lnetreplymsg,
1668 CERROR ("Unmatched GET with %s\n",
1669 libcfs_id2str(tx->tx_peer->plp_id));
1670 tx->tx_status = -EIO;
1675 case PTLLND_MSG_TYPE_PUT:
1676 LASSERT (event->type == PTL_EVENT_UNLINK ||
1677 (isreq && event->type == PTL_EVENT_SEND_END) ||
1678 (isbulk && event->type == PTL_EVENT_GET_END));
1681 case PTLLND_RDMA_READ:
1682 LASSERT (event->type == PTL_EVENT_UNLINK ||
1683 event->type == PTL_EVENT_SEND_END ||
1684 event->type == PTL_EVENT_REPLY_END);
1688 case PTLLND_RDMA_WRITE:
1689 LASSERT (event->type == PTL_EVENT_UNLINK ||
1690 event->type == PTL_EVENT_SEND_END);
1694 /* Schedule ptllnd_tx_done() on error or last completion event */
1696 (PtlHandleIsEqual(tx->tx_bulkmdh, PTL_INVALID_HANDLE) &&
1697 PtlHandleIsEqual(tx->tx_reqmdh, PTL_INVALID_HANDLE))) {
1699 tx->tx_status = -EIO;
1700 list_del(&tx->tx_list);
1701 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
/*
 * Scan this peer's transmits for one whose deadline has already
 * passed.  Both the queue of not-yet-posted txs (plp_txq) and the
 * queue of txs with network activity in flight (plp_activeq) are
 * checked; the watchdog uses a hit to declare the peer timed out.
 *
 * NOTE(review): the return statements are elided from this view (fused
 * line numbers 1714->1718 and 1721 onward); presumably the matching tx
 * is returned on a hit and NULL otherwise — confirm against the full
 * source.
 */
1706 ptllnd_find_timed_out_tx(ptllnd_peer_t *peer)
1708 time_t now = cfs_time_current_sec();
1709 struct list_head *tmp;
     /* transmits still queued waiting for credits/buffers */
1711 list_for_each(tmp, &peer->plp_txq) {
1712 ptllnd_tx_t *tx = list_entry(tmp, ptllnd_tx_t, tx_list);
1714 if (tx->tx_deadline < now)
     /* transmits already handed to the network */
1718 list_for_each(tmp, &peer->plp_activeq) {
1719 ptllnd_tx_t *tx = list_entry(tmp, ptllnd_tx_t, tx_list);
1721 if (tx->tx_deadline < now)
/*
 * Watchdog check for a single peer: if any of its transmits has passed
 * its deadline, report the timeout and close the connection with
 * -ETIMEDOUT.
 *
 * NOTE(review): the NULL-check guard between finding tx and closing the
 * peer is elided from this view (fused line numbers 1731->1736).
 */
1729 ptllnd_check_peer(ptllnd_peer_t *peer)
1731 ptllnd_tx_t *tx = ptllnd_find_timed_out_tx(peer);
1736 CERROR("%s: timed out\n", libcfs_id2str(peer->plp_id));
1737 ptllnd_close_peer(peer, -ETIMEDOUT);
/*
 * Periodic timeout scan, driven from ptllnd_wait().  Walks a chunk of
 * the peer hash table each invocation (sized so the whole table is
 * covered within the timeout interval) and runs ptllnd_check_peer() on
 * every peer in the visited buckets, then schedules the next scan.
 */
1741 ptllnd_watchdog (lnet_ni_t *ni, time_t now)
1743 ptllnd_ni_t *plni = ni->ni_data;
     /* p: seconds between scans; chunk: buckets per scan (may shrink below);
      * interval: wall time actually elapsed since the previous scan */
1745 int p = plni->plni_watchdog_interval;
1746 int chunk = plni->plni_peer_hash_size;
1747 int interval = now - (plni->plni_watchdog_nextt - p);
1749 struct list_head *hashlist;
1750 struct list_head *tmp;
1751 struct list_head *nxt;
1753 /* Time to check for RDMA timeouts on a few more peers:
1754 * I try to do checks every 'p' seconds on a proportion of the peer
1755 * table and I need to check every connection 'n' times within a
1756 * timeout interval, to ensure I detect a timeout on any connection
1757 * within (n+1)/n times the timeout interval. */
1759 LASSERT (now >= plni->plni_watchdog_nextt);
1761 if (plni->plni_timeout > n * interval) { /* Scan less than the whole table? */
1762 chunk = (chunk * n * interval) / plni->plni_timeout;
     /* visit 'chunk' buckets, resuming where the previous scan stopped */
1767 for (i = 0; i < chunk; i++) {
1768 hashlist = &plni->plni_peer_hash[plni->plni_watchdog_peeridx];
     /* _safe: ptllnd_check_peer() may close (and unlink) the peer */
1770 list_for_each_safe(tmp, nxt, hashlist) {
1771 ptllnd_check_peer(list_entry(tmp, ptllnd_peer_t, plp_list));
1774 plni->plni_watchdog_peeridx = (plni->plni_watchdog_peeridx + 1) %
1775 plni->plni_peer_hash_size;
     /* next scan is due 'p' seconds from now */
1778 plni->plni_watchdog_nextt = now + p;
1782 ptllnd_wait (lnet_ni_t *ni, int milliseconds)
1784 static struct timeval prevt;
1785 static int prevt_count;
1786 static int call_count;
1788 struct timeval start;
1789 struct timeval then;
1791 struct timeval deadline;
1793 ptllnd_ni_t *plni = ni->ni_data;
1801 /* Handle any currently queued events, returning immediately if any.
1802 * Otherwise block for the timeout and handle all events queued
1805 gettimeofday(&start, NULL);
1808 if (milliseconds <= 0) {
1811 deadline.tv_sec = start.tv_sec + milliseconds/1000;
1812 deadline.tv_usec = start.tv_usec + (milliseconds % 1000)*1000;
1814 if (deadline.tv_usec >= 1000000) {
1815 start.tv_usec -= 1000000;
1821 gettimeofday(&then, NULL);
1823 rc = PtlEQPoll(&plni->plni_eqh, 1, timeout, &event, &which);
1825 gettimeofday(&now, NULL);
1827 if ((now.tv_sec*1000 + now.tv_usec/1000) -
1828 (then.tv_sec*1000 + then.tv_usec/1000) > timeout + 1000) {
1829 /* 1000 mS grace...........................^ */
1830 CERROR("SLOW PtlEQPoll(%d): %dmS elapsed\n", timeout,
1831 (int)(now.tv_sec*1000 + now.tv_usec/1000) -
1832 (int)(then.tv_sec*1000 + then.tv_usec/1000));
1835 if (rc == PTL_EQ_EMPTY) {
1836 if (found) /* handled some events */
1839 if (now.tv_sec >= plni->plni_watchdog_nextt) { /* check timeouts? */
1840 ptllnd_watchdog(ni, now.tv_sec);
1841 LASSERT (now.tv_sec < plni->plni_watchdog_nextt);
1844 if (now.tv_sec > deadline.tv_sec || /* timeout expired */
1845 (now.tv_sec == deadline.tv_sec &&
1846 now.tv_usec >= deadline.tv_usec))
1849 if (milliseconds < 0 ||
1850 plni->plni_watchdog_nextt <= deadline.tv_sec) {
1851 timeout = (plni->plni_watchdog_nextt - now.tv_sec)*1000;
1853 timeout = (deadline.tv_sec - now.tv_sec)*1000 +
1854 (deadline.tv_usec - now.tv_usec)/1000;
1860 LASSERT (rc == PTL_OK || rc == PTL_EQ_DROPPED);
1862 if (rc == PTL_EQ_DROPPED)
1863 CERROR("Event queue: size %d is too small\n",
1864 plni->plni_eq_size);
1869 switch (ptllnd_eventarg2type(event.md.user_ptr)) {
1873 case PTLLND_EVENTARG_TYPE_TX:
1874 ptllnd_tx_event(ni, &event);
1877 case PTLLND_EVENTARG_TYPE_BUF:
1878 ptllnd_buf_event(ni, &event);
1883 while (!list_empty(&plni->plni_zombie_txs)) {
1884 tx = list_entry(plni->plni_zombie_txs.next,
1885 ptllnd_tx_t, tx_list);
1886 list_del_init(&tx->tx_list);
1890 if (prevt.tv_sec == 0 ||
1891 prevt.tv_sec != now.tv_sec) {
1892 PTLLND_HISTORY("%d wait entered at %d.%06d - prev %d %d.%06d",
1893 call_count, (int)start.tv_sec, (int)start.tv_usec,
1894 prevt_count, (int)prevt.tv_sec, (int)prevt.tv_usec);