1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5 * Author: Eric Barton <eeb@bartonsoftware.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * This file is confidential source code owned by Cluster File Systems.
11 * No viewing, modification, compilation, redistribution, or any other
12 * form of use is permitted except through a signed license agreement.
14 * If you have not signed such an agreement, then you have no rights to
15 * this file. Please destroy it immediately and contact CFS.
/* Format a Portals process id as text (FMT_PTLID applied to id.pid, id.nid).
 * The text is written into one of 8 static 32-byte slots selected by idx, so
 * a previously returned string is overwritten once its slot is reused.
 * NOTE(review): the declaration of idx, its reset and the return statement
 * are not visible in this excerpt - confirm against the full file. */
22 ptllnd_ptlid2str(ptl_process_id_t id)
24 static char strs[8][32];
27 char *str = strs[idx++];
/* true once idx has moved past the last slot */
29 if (idx >= sizeof(strs)/sizeof(strs[0]))
/* output is bounded to the size of a single slot */
32 snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
/* Release the memory of a peer.  The assertions require that the peer has
 * already been marked closing, that the NI still accounts for at least one
 * peer, and that both of the peer's tx queues are empty. */
37 ptllnd_destroy_peer(ptllnd_peer_t *peer)
39 lnet_ni_t *ni = peer->plp_ni;
40 ptllnd_ni_t *plni = ni->ni_data;
42 LASSERT (peer->plp_closing);
43 LASSERT (plni->plni_npeers > 0);
44 LASSERT (list_empty(&peer->plp_txq));
45 LASSERT (list_empty(&peer->plp_activeq));
47 LIBCFS_FREE(peer, sizeof(*peer));
/* Drain queue q: each tx found at its head is marked failed with -ESHUTDOWN,
 * unlinked from q and appended to the NI's zombie list (plni_zombie_txs). */
51 ptllnd_abort_txs(ptllnd_ni_t *plni, struct list_head *q)
53 while (!list_empty(q)) {
54 ptllnd_tx_t *tx = list_entry(q->next, ptllnd_tx_t, tx_list);
56 tx->tx_status = -ESHUTDOWN;
57 list_del(&tx->tx_list);
58 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
/* Start closing a peer: set plp_closing, abort everything on its txq and
 * activeq, remove it from the peer hash list and drop one reference.
 * NOTE(review): what happens when the peer is already closing, and how the
 * 'error' argument is used, is not visible in this excerpt. */
63 ptllnd_close_peer(ptllnd_peer_t *peer, int error)
65 lnet_ni_t *ni = peer->plp_ni;
66 ptllnd_ni_t *plni = ni->ni_data;
68 if (peer->plp_closing)
71 peer->plp_closing = 1;
/* Log and dump the peer state when work is still queued (the last operand
 * of this condition is not visible in this excerpt). */
73 if (!list_empty(&peer->plp_txq) ||
74 !list_empty(&peer->plp_activeq) ||
76 CERROR("Closing %s\n", libcfs_id2str(peer->plp_id));
77 ptllnd_debug_peer(ni, peer->plp_id);
/* fail all queued and in-flight txs */
80 ptllnd_abort_txs(plni, &peer->plp_txq);
81 ptllnd_abort_txs(plni, &peer->plp_activeq);
83 list_del(&peer->plp_list);
84 ptllnd_peer_decref(peer);
/* Look up the peer with process id 'id' in the NI's peer hash table; a match
 * gets an extra reference.  The visible remainder builds a new peer: it
 * grows the receive buffers, allocates and initialises the peer, inserts it
 * in the hash table and prepares a HELLO tx for it.
 * NOTE(review): the test of 'create', the return statements and the queuing
 * of the HELLO tx are not visible in this excerpt. */
88 ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
90 ptllnd_ni_t *plni = ni->ni_data;
/* bucket index derived from the address part of the NID */
91 unsigned int hash = LNET_NIDADDR(id.nid) % plni->plni_peer_hash_size;
92 struct list_head *tmp;
/* the peer must be on the same network as this NI */
97 LASSERT (LNET_NIDNET(id.nid) == LNET_NIDNET(ni->ni_nid));
99 list_for_each(tmp, &plni->plni_peer_hash[hash]) {
100 plp = list_entry(tmp, ptllnd_peer_t, plp_list);
102 if (plp->plp_id.nid == id.nid &&
103 plp->plp_id.pid == id.pid) {
104 ptllnd_peer_addref(plp);
112 /* New peer: check first for enough posted buffers */
114 rc = ptllnd_grow_buffers(ni);
120 LIBCFS_ALLOC(plp, sizeof(*plp));
122 CERROR("Can't allocate new peer %s\n", libcfs_id2str(id));
127 CDEBUG(D_NET, "new peer=%p\n",plp);
/* Portals-level id: NID address part plus the configured ptllnd pid */
131 plp->plp_ptlid.nid = LNET_NIDADDR(id.nid);
132 plp->plp_ptlid.pid = plni->plni_ptllnd_pid;
133 plp->plp_max_credits =
134 plp->plp_credits = 1; /* add more later when she gives me credits */
135 plp->plp_max_msg_size = plni->plni_max_msg_size; /* until I hear from her */
136 plp->plp_outstanding_credits = plni->plni_peer_credits - 1;
139 plp->plp_recvd_hello = 0;
140 plp->plp_closing = 0;
141 plp->plp_refcount = 1;
142 CFS_INIT_LIST_HEAD(&plp->plp_list);
143 CFS_INIT_LIST_HEAD(&plp->plp_txq);
144 CFS_INIT_LIST_HEAD(&plp->plp_activeq);
/* second reference taken before the peer is linked into the hash table */
146 ptllnd_peer_addref(plp);
147 list_add_tail(&plp->plp_list, &plni->plni_peer_hash[hash]);
149 tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_HELLO, 0);
/* HELLO tx could not be created: close the new peer and drop a reference */
151 CERROR("Can't send HELLO to %s\n", libcfs_id2str(id));
152 ptllnd_close_peer(plp, -ENOMEM);
153 ptllnd_peer_decref(plp);
/* HELLO payload: reserved matchbits and our maximum message size */
157 tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS;
158 tx->tx_msg.ptlm_u.hello.kptlhm_max_msg_size = plni->plni_max_msg_size;
160 PTLLND_HISTORY("%s[%d/%d]: post hello %p", libcfs_id2str(id),
161 tx->tx_peer->plp_credits,
162 tx->tx_peer->plp_outstanding_credits, tx);
/* Walk list q with list_for_each.
 * NOTE(review): the loop body and return statement are not visible in this
 * excerpt; presumably this returns the number of entries on q - confirm. */
169 ptllnd_count_q(struct list_head *q)
174 list_for_each(e, q) {
/* Map a tx type code to a value via a switch over the RDMA and message
 * types listed below.
 * NOTE(review): the per-case results are not visible in this excerpt;
 * presumably each case returns a short type name - confirm. */
182 ptllnd_tx_typestr(int type)
185 case PTLLND_RDMA_WRITE:
188 case PTLLND_RDMA_READ:
191 case PTLLND_MSG_TYPE_PUT:
194 case PTLLND_MSG_TYPE_GET:
197 case PTLLND_MSG_TYPE_IMMEDIATE:
200 case PTLLND_MSG_TYPE_NOOP:
203 case PTLLND_MSG_TYPE_HELLO:
/* Log one tx at D_WARNING level: its type string, the peer id, the bulk
 * posted/done and request posted/done debug timestamps, and a status.
 * NOTE(review): there is no comma between libcfs_id2str(...) and the first
 * DBGT_ARGS(...), so that macro presumably supplies its own - confirm.  The
 * final argument (the status value) is not visible in this excerpt. */
212 ptllnd_debug_tx(ptllnd_tx_t *tx)
214 CDEBUG(D_WARNING, "%s %s b "DBGT_FMT"/"DBGT_FMT
215 " r "DBGT_FMT"/"DBGT_FMT" status %d\n",
216 ptllnd_tx_typestr(tx->tx_type),
217 libcfs_id2str(tx->tx_peer->plp_id)
218 DBGT_ARGS(tx->tx_bulk_posted) DBGT_ARGS(tx->tx_bulk_done)
219 DBGT_ARGS(tx->tx_req_posted) DBGT_ARGS(tx->tx_req_done),
/* Dump diagnostic state for the peer with id 'id' at D_WARNING level: a
 * summary line, then the peer's txq and activeq, then the zombie and history
 * tx lists filtered by peer id, and finally the global history dump.
 * The peer is looked up without creating it (create == 0).
 * NOTE(review): the per-tx action inside each loop is not visible in this
 * excerpt (presumably a call to ptllnd_debug_tx). */
224 ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
226 ptllnd_peer_t *plp = ptllnd_find_peer(ni, id, 0);
227 struct list_head *tmp;
228 ptllnd_ni_t *plni = ni->ni_data;
232 CDEBUG(D_WARNING, "No peer %s\n", libcfs_id2str(id));
/* Summary: hello/closing flags, stamp split at 1000000, queue lengths and
 * credits/outstanding credits(max credits). */
236 CDEBUG(D_WARNING, "%s %s%s [%d] "LPD64".%06d m "LPD64" q %d/%d c %d/%d(%d)\n",
238 plp->plp_recvd_hello ? "H" : "_",
239 plp->plp_closing ? "C" : "_",
241 plp->plp_stamp / 1000000, (int)(plp->plp_stamp % 1000000),
243 ptllnd_count_q(&plp->plp_txq),
244 ptllnd_count_q(&plp->plp_activeq),
245 plp->plp_credits, plp->plp_outstanding_credits, plp->plp_max_credits);
247 CDEBUG(D_WARNING, "txq:\n");
248 list_for_each (tmp, &plp->plp_txq) {
249 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
254 CDEBUG(D_WARNING, "activeq:\n");
255 list_for_each (tmp, &plp->plp_activeq) {
256 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
/* the zombie list is per-NI, so select only this peer's entries */
261 CDEBUG(D_WARNING, "zombies:\n");
262 list_for_each (tmp, &plni->plni_zombie_txs) {
263 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
265 if (tx->tx_peer->plp_id.nid == id.nid &&
266 tx->tx_peer->plp_id.pid == id.pid)
/* the tx history list is per-NI as well */
270 CDEBUG(D_WARNING, "history:\n");
271 list_for_each (tmp, &plni->plni_tx_history) {
272 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
274 if (tx->tx_peer->plp_id.nid == id.nid &&
275 tx->tx_peer->plp_id.pid == id.pid)
/* drop the reference obtained from ptllnd_find_peer() */
279 ptllnd_peer_decref(plp);
280 ptllnd_dump_history();
/* Connect to a peer: find or create the peer whose pid is
 * LUSTRE_SRV_LNET_PID, then call ptllnd_wait() repeatedly until its HELLO
 * has been received, warning when the wait exceeds w seconds.
 * NOTE(review): the assignment of id.nid, the use of 'alive' and the
 * statements that follow the warning are not visible in this excerpt. */
284 ptllnd_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive)
286 lnet_process_id_t id;
288 time_t start = cfs_time_current_sec();
289 int w = PTLLND_WARN_LONG_WAIT;
291 /* This is only actually used to connect to routers at startup! */
298 id.pid = LUSTRE_SRV_LNET_PID;
/* create == 1: a missing peer is created */
300 peer = ptllnd_find_peer(ni, id, 1);
304 /* wait for the peer to reply */
305 while (!peer->plp_recvd_hello) {
306 if (cfs_time_current_sec() > start + w) {
307 CWARN("Waited %ds to connect to %s\n",
308 w, libcfs_id2str(id));
/* w is in seconds; the factor 1000 suggests ptllnd_wait takes ms - confirm */
312 ptllnd_wait(ni, w*1000);
315 ptllnd_peer_decref(peer);
/* Compute a simple checksum: for each element read through c, sum is
 * rotated left by one bit (32-bit rotate) and the element is added.
 * A result of 0 is replaced by 1 because 0 means "no checksum".
 * NOTE(review): the declarations of sum and c and the loop header are not
 * visible here; presumably c walks the nob bytes starting at ptr. */
319 ptllnd_cksum (void *ptr, int nob)
325 sum = ((sum << 1) | (sum >> 31)) + *c++;
327 /* ensure I don't return 0 (== no checksum) */
328 return (sum == 0) ? 1 : sum;
/* Allocate and initialise a tx of the given type for 'peer'.
 * The message size depends on the type (only IMMEDIATE may carry
 * payload_nob bytes of payload; all other types assert payload_nob == 0),
 * is rounded up to a multiple of 8 and must not exceed the peer's
 * plp_max_msg_size.  The wire header is filled in and a reference is taken
 * on the peer.
 * NOTE(review): the failure return, the RDMA-type size and several field
 * initialisations are not visible in this excerpt. */
332 ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
334 lnet_ni_t *ni = peer->plp_ni;
335 ptllnd_ni_t *plni = ni->ni_data;
339 CDEBUG(D_NET, "peer=%p type=%d payload=%d\n",peer,type,payload_nob);
345 case PTLLND_RDMA_WRITE:
346 case PTLLND_RDMA_READ:
347 LASSERT (payload_nob == 0);
/* PUT/GET requests carry an RDMA descriptor after the common header */
351 case PTLLND_MSG_TYPE_PUT:
352 case PTLLND_MSG_TYPE_GET:
353 LASSERT (payload_nob == 0);
354 msgsize = offsetof(kptl_msg_t, ptlm_u) +
355 sizeof(kptl_rdma_msg_t);
/* IMMEDIATE: size extends to the end of payload_nob payload bytes */
358 case PTLLND_MSG_TYPE_IMMEDIATE:
359 msgsize = offsetof(kptl_msg_t,
360 ptlm_u.immediate.kptlim_payload[payload_nob]);
/* NOOP: common header only */
363 case PTLLND_MSG_TYPE_NOOP:
364 LASSERT (payload_nob == 0);
365 msgsize = offsetof(kptl_msg_t, ptlm_u);
368 case PTLLND_MSG_TYPE_HELLO:
369 LASSERT (payload_nob == 0);
370 msgsize = offsetof(kptl_msg_t, ptlm_u) +
371 sizeof(kptl_hello_msg_t);
/* round up to the next multiple of 8 */
375 msgsize = (msgsize + 7) & ~7;
376 LASSERT (msgsize <= peer->plp_max_msg_size);
378 CDEBUG(D_NET, "msgsize=%d\n",msgsize);
/* the message is stored inline at the end of the tx structure */
380 LIBCFS_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize);
383 CERROR("Can't allocate msg type %d for %s\n",
384 type, libcfs_id2str(peer->plp_id));
388 CFS_INIT_LIST_HEAD(&tx->tx_list);
391 tx->tx_lnetmsg = tx->tx_lnetreplymsg = NULL;
394 tx->tx_reqmdh = PTL_INVALID_HANDLE;
395 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
396 tx->tx_msgsize = msgsize;
397 tx->tx_completing = 0;
400 PTLLND_DBGT_INIT(tx->tx_bulk_posted);
401 PTLLND_DBGT_INIT(tx->tx_bulk_done);
402 PTLLND_DBGT_INIT(tx->tx_req_posted);
403 PTLLND_DBGT_INIT(tx->tx_req_done);
/* common wire header; credits and checksum start at 0 */
406 tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC;
407 tx->tx_msg.ptlm_version = PTLLND_MSG_VERSION;
408 tx->tx_msg.ptlm_type = type;
409 tx->tx_msg.ptlm_credits = 0;
410 tx->tx_msg.ptlm_nob = msgsize;
411 tx->tx_msg.ptlm_cksum = 0;
412 tx->tx_msg.ptlm_srcnid = ni->ni_nid;
413 tx->tx_msg.ptlm_srcstamp = plni->plni_stamp;
414 tx->tx_msg.ptlm_dstnid = peer->plp_id.nid;
415 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
416 tx->tx_msg.ptlm_srcpid = the_lnet.ln_pid;
417 tx->tx_msg.ptlm_dstpid = peer->plp_id.pid;
/* the tx holds a reference on its peer */
420 ptllnd_peer_addref(peer);
423 CDEBUG(D_NET, "tx=%p\n",tx);
/* Unlink the memory descriptor *mdh belonging to tx and wait until the
 * handle has become PTL_INVALID_HANDLE.  Each pass calls PtlMDUnlink(); when
 * the MD is still in use it waits in ptllnd_wait() and warns once the wait
 * exceeds w seconds.
 * NOTE(review): the statements taken on PTL_OK and after the warning are not
 * visible in this excerpt. */
429 ptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh)
431 ptllnd_peer_t *peer = tx->tx_peer;
432 lnet_ni_t *ni = peer->plp_ni;
434 time_t start = cfs_time_current_sec();
435 int w = PTLLND_WARN_LONG_WAIT;
437 while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) {
438 rc = PtlMDUnlink(*mdh);
439 #ifndef LUSTRE_PORTALS_UNLINK_SEMANTICS
440 if (rc == PTL_OK) /* unlink successful => no unlinked event */
/* otherwise the only accepted result is "MD still in use" */
442 LASSERT (rc == PTL_MD_IN_USE);
444 if (cfs_time_current_sec() > start + w) {
445 CWARN("Waited %ds to abort tx to %s\n",
446 w, libcfs_id2str(peer->plp_id));
449 /* Wait for ptllnd_tx_event() to invalidate */
450 ptllnd_wait(ni, w*1000);
/* Trim the NI's tx history down to plni_max_tx_history entries.  While the
 * history is too long, the entry at the head of plni_tx_history is unlinked,
 * its peer reference is dropped and the tx (header plus inline message) is
 * freed.
 * NOTE(review): a statement following the plni_ntxs assertion is not visible
 * in this excerpt. */
455 ptllnd_cull_tx_history(ptllnd_ni_t *plni)
457 int max = plni->plni_max_tx_history;
459 while (plni->plni_ntx_history > max) {
460 ptllnd_tx_t *tx = list_entry(plni->plni_tx_history.next,
461 ptllnd_tx_t, tx_list);
462 list_del(&tx->tx_list);
464 ptllnd_peer_decref(tx->tx_peer);
/* same size expression as used when the tx was allocated */
466 LIBCFS_FREE(tx, offsetof(ptllnd_tx_t, tx_msg) + tx->tx_msgsize);
468 LASSERT (plni->plni_ntxs > 0);
470 plni->plni_ntx_history--;
/* Complete a tx.  It is flagged as completing, removed from whatever list
 * holds it, its peer is closed when tx_status is non-zero, both of its MDs
 * are unlinked, its iov array is freed, and the attached LNet message(s) are
 * finalized.  The tx is then appended to the NI's tx history, which is
 * culled to its maximum length.
 * NOTE(review): the action taken when tx_completing is already set is not
 * visible in this excerpt (presumably an early return). */
475 ptllnd_tx_done(ptllnd_tx_t *tx)
477 ptllnd_peer_t *peer = tx->tx_peer;
478 lnet_ni_t *ni = peer->plp_ni;
479 ptllnd_ni_t *plni = ni->ni_data;
481 /* CAVEAT EMPTOR: If this tx is being aborted, I'll continue to get
482 * events for this tx until it's unlinked. So I set tx_completing to
483 * flag the tx is getting handled */
485 if (tx->tx_completing)
488 tx->tx_completing = 1;
490 if (!list_empty(&tx->tx_list))
491 list_del_init(&tx->tx_list);
/* a failed tx takes the whole peer down */
493 if (tx->tx_status != 0) {
494 CERROR("Completing tx with error\n");
496 ptllnd_close_peer(peer, tx->tx_status);
/* make sure neither MD is still linked */
499 ptllnd_abort_tx(tx, &tx->tx_reqmdh);
500 ptllnd_abort_tx(tx, &tx->tx_bulkmdh);
502 if (tx->tx_niov > 0) {
503 LIBCFS_FREE(tx->tx_iov, tx->tx_niov * sizeof(*tx->tx_iov));
/* GET with a reply message: the GET itself is always reported as
 * successful and the real status goes to the reply message */
507 if (tx->tx_lnetreplymsg != NULL) {
508 LASSERT (tx->tx_type == PTLLND_MSG_TYPE_GET);
509 LASSERT (tx->tx_lnetmsg != NULL);
510 /* Simulate GET success always */
511 lnet_finalize(ni, tx->tx_lnetmsg, 0);
512 CDEBUG(D_NET, "lnet_finalize(tx_lnetreplymsg=%p)\n",tx->tx_lnetreplymsg);
513 lnet_finalize(ni, tx->tx_lnetreplymsg, tx->tx_status);
514 } else if (tx->tx_lnetmsg != NULL) {
515 lnet_finalize(ni, tx->tx_lnetmsg, tx->tx_status);
/* keep the completed tx in the history list instead of freeing it now */
518 plni->plni_ntx_history++;
519 list_add_tail(&tx->tx_list, &plni->plni_tx_history);
521 ptllnd_cull_tx_history(plni);
/* Build the Portals iovec array for a tx from the caller's iov/offset/len.
 * Leading iov entries wholly covered by 'offset' are skipped, then a piov
 * array of niov entries is allocated and filled.  The first fragment is
 * advanced by the remaining offset and the last is clamped to the remaining
 * length (resid).
 * NOTE(review): the loop headers, the initialisation of resid, the
 * assignments to tx->tx_iov/tx->tx_niov and the return values are not
 * visible in this excerpt. */
525 ptllnd_set_txiov(ptllnd_tx_t *tx,
526 unsigned int niov, struct iovec *iov,
527 unsigned int offset, unsigned int len)
529 ptl_md_iovec_t *piov;
537 CDEBUG(D_NET, "niov =%d\n",niov);
538 CDEBUG(D_NET, "offset=%d\n",offset);
539 CDEBUG(D_NET, "len =%d\n",len);
543 * Remove iovec's at the beginning that
544 * are skipped because of the offset.
545 * Adjust the offset accordingly
549 if (offset < iov->iov_len)
551 offset -= iov->iov_len;
556 CDEBUG(D_NET, "niov =%d (after)\n",niov);
557 CDEBUG(D_NET, "offset=%d (after)\n",offset);
558 CDEBUG(D_NET, "len =%d (after)\n",len);
561 int temp_offset = offset;
563 LIBCFS_ALLOC(piov, niov * sizeof(*piov));
567 for (npiov = 0;; npiov++) {
568 CDEBUG(D_NET, "npiov=%d\n",npiov);
569 CDEBUG(D_NET, "offset=%d\n",temp_offset);
570 CDEBUG(D_NET, "len=%d\n",resid);
571 CDEBUG(D_NET, "iov[npiov].iov_len=%d\n",iov[npiov].iov_len);
573 LASSERT (npiov < niov);
/* NOTE(review): this checks iov->iov_len (entry 0) rather than
 * iov[npiov].iov_len - confirm that is intended */
574 LASSERT (iov->iov_len >= temp_offset);
576 piov[npiov].iov_base = iov[npiov].iov_base + temp_offset;
577 piov[npiov].iov_len = iov[npiov].iov_len - temp_offset;
/* this fragment covers the remainder: clamp it to resid */
579 if (piov[npiov].iov_len >= resid) {
580 piov[npiov].iov_len = resid;
584 resid -= piov[npiov].iov_len;
591 CDEBUG(D_NET, "tx->tx_iov=%p\n",tx->tx_iov);
592 CDEBUG(D_NET, "tx->tx_niov=%d\n",tx->tx_niov);
596 /* Dang! The piov I allocated was too big and it's a drag to
597 * have to maintain separate 'allocated' and 'used' sizes, so
598 * I'll just do it again; NB this doesn't happen normally... */
599 LIBCFS_FREE(piov, niov * sizeof(*piov));
/* Point a Portals MD at the tx's data buffer.  The MD must not already have
 * PTL_MD_IOVEC set.  With exactly one fragment the MD addresses it directly
 * through start/length; in the final branch PTL_MD_IOVEC is set in the MD
 * options.
 * NOTE(review): the first branch of the if-chain and the start/length
 * assignments of the final branch are not visible in this excerpt. */
605 ptllnd_set_md_buffer(ptl_md_t *md, ptllnd_tx_t *tx)
607 unsigned int niov = tx->tx_niov;
608 ptl_md_iovec_t *iov = tx->tx_iov;
610 LASSERT ((md->options & PTL_MD_IOVEC) == 0);
615 } else if (niov == 1) {
616 md->start = iov[0].iov_base;
617 md->length = iov[0].iov_len;
621 md->options |= PTL_MD_IOVEC;
/* Post a receive buffer.  A match entry accepting 'anyid' with
 * LNET_MSG_MATCHBITS is attached on the NI's portal, then an MD covering the
 * buffer is attached to it with an infinite threshold, PUT operations
 * enabled, and a max_size of plni_max_msg_size.  On MD attach failure the
 * posted-buffer count is restored and the match entry is unlinked.
 * NOTE(review): the initialiser of anyid, the 'md' declaration line, the
 * updates of buf->plb_posted and the return values are not visible in this
 * excerpt. */
626 ptllnd_post_buffer(ptllnd_buffer_t *buf)
628 lnet_ni_t *ni = buf->plb_ni;
629 ptllnd_ni_t *plni = ni->ni_data;
630 ptl_process_id_t anyid = {
634 .start = buf->plb_buffer,
635 .length = plni->plni_buffer_size,
636 .threshold = PTL_MD_THRESH_INF,
637 .max_size = plni->plni_max_msg_size,
638 .options = (PTLLND_MD_OPTIONS |
639 PTL_MD_OP_PUT | PTL_MD_MAX_SIZE |
640 PTL_MD_LOCAL_ALIGN8),
641 .user_ptr = ptllnd_obj2eventarg(buf, PTLLND_EVENTARG_TYPE_BUF),
642 .eq_handle = plni->plni_eqh};
646 LASSERT (!buf->plb_posted);
648 rc = PtlMEAttach(plni->plni_nih, plni->plni_portal,
649 anyid, LNET_MSG_MATCHBITS, 0,
650 PTL_UNLINK, PTL_INS_AFTER, &meh);
652 CERROR("PtlMEAttach failed: %d\n", rc);
/* counted as posted before the MD attach; undone below on failure */
657 plni->plni_nposted_buffers++;
659 rc = PtlMDAttach(meh, md, LNET_UNLINK, &buf->plb_md);
663 CERROR("PtlMDAttach failed: %d\n", rc);
666 plni->plni_nposted_buffers--;
/* the match entry has no MD, so unlinking it is asserted to succeed */
668 rc = PtlMEUnlink(meh);
669 LASSERT (rc == PTL_OK);
/* Send as many queued txs to 'peer' as its credits allow.
 * First, when nothing is queued, enough credits are owed to the peer
 * (>= PTLLND_CREDIT_HIGHWATER) and a send credit is available, a NOOP tx is
 * queued purely to return credits.  Then txs are taken from the head of
 * plp_txq: each is moved to plp_activeq, stamped, loaded with all
 * outstanding credits, optionally checksummed, bound to an MD and sent with
 * PtlPut().  PtlMDBind/PtlPut failures set tx_status to -EIO.
 * NOTE(review): the loop exits, the credit decrement, the handling of a
 * redundant NOOP and the error-path completions are not visible in this
 * excerpt. */
675 ptllnd_check_sends(ptllnd_peer_t *peer)
677 lnet_ni_t *ni = peer->plp_ni;
678 ptllnd_ni_t *plni = ni->ni_data;
684 CDEBUG(D_NET, "plp_outstanding_credits=%d\n",peer->plp_outstanding_credits);
686 if (list_empty(&peer->plp_txq) &&
687 peer->plp_outstanding_credits >= PTLLND_CREDIT_HIGHWATER(plni) &&
688 peer->plp_credits != 0) {
690 tx = ptllnd_new_tx(peer, PTLLND_MSG_TYPE_NOOP, 0);
691 CDEBUG(D_NET, "NOOP tx=%p\n",tx);
693 CERROR("Can't return credits to %s\n",
694 libcfs_id2str(peer->plp_id));
696 list_add_tail(&tx->tx_list, &peer->plp_txq);
700 while (!list_empty(&peer->plp_txq)) {
701 tx = list_entry(peer->plp_txq.next, ptllnd_tx_t, tx_list);
703 CDEBUG(D_NET, "Looking at TX=%p\n",tx);
704 CDEBUG(D_NET, "plp_credits=%d\n",peer->plp_credits);
705 CDEBUG(D_NET, "plp_outstanding_credits=%d\n",peer->plp_outstanding_credits);
707 LASSERT (tx->tx_msgsize > 0);
/* credit counters must stay within their configured bounds */
709 LASSERT (peer->plp_outstanding_credits >= 0);
710 LASSERT (peer->plp_outstanding_credits <=
711 plni->plni_peer_credits);
712 LASSERT (peer->plp_credits >= 0);
713 LASSERT (peer->plp_credits <= peer->plp_max_credits);
715 if (peer->plp_credits == 0) { /* no credits */
716 PTLLND_HISTORY("%s[%d/%d]: no creds for %p",
717 libcfs_id2str(peer->plp_id),
719 peer->plp_outstanding_credits, tx);
723 if (peer->plp_credits == 1 && /* last credit reserved for */
724 peer->plp_outstanding_credits == 0) { /* returning credits */
725 PTLLND_HISTORY("%s[%d/%d]: too few creds for %p",
726 libcfs_id2str(peer->plp_id),
728 peer->plp_outstanding_credits, tx);
/* the tx is now considered in flight */
732 list_del(&tx->tx_list);
733 list_add_tail(&tx->tx_list, &peer->plp_activeq);
735 CDEBUG(D_NET, "Sending at TX=%p type=%s (%d)\n",tx,
736 ptllnd_msgtype2str(tx->tx_type),tx->tx_type);
/* a NOOP is pointless when other txs are queued behind it or too few
 * credits are owed; what is done with it is not visible here */
738 if (tx->tx_type == PTLLND_MSG_TYPE_NOOP &&
739 (!list_empty(&peer->plp_txq) ||
740 peer->plp_outstanding_credits <
741 PTLLND_CREDIT_HIGHWATER(plni))) {
747 /* Set stamp at the last minute; on a new peer, I don't know it
748 * until I receive the HELLO back */
749 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
751 CDEBUG(D_NET, "Returning %d to peer\n",peer->plp_outstanding_credits);
754 * Return all the credits we have
756 tx->tx_msg.ptlm_credits = peer->plp_outstanding_credits;
757 peer->plp_outstanding_credits = 0;
/* the checksum covers only the common header (up to ptlm_u) */
764 if (plni->plni_checksum)
765 tx->tx_msg.ptlm_cksum =
766 ptllnd_cksum(&tx->tx_msg,
767 offsetof(kptl_msg_t, ptlm_u));
769 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
770 md.eq_handle = plni->plni_eqh;
772 md.options = PTLLND_MD_OPTIONS;
773 md.start = &tx->tx_msg;
774 md.length = tx->tx_msgsize;
776 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
778 CERROR("PtlMDBind for %s failed: %d\n",
779 libcfs_id2str(peer->plp_id), rc);
780 tx->tx_status = -EIO;
786 PTLLND_DBGT_STAMP(tx->tx_req_posted);
788 PTLLND_HISTORY("%s[%d/%d]: %s %p c %d", libcfs_id2str(peer->plp_id),
789 peer->plp_credits, peer->plp_outstanding_credits,
790 ptllnd_msgtype2str(tx->tx_type), tx,
791 tx->tx_msg.ptlm_credits);
/* the request goes to the peer's message portal without asking for an ACK */
793 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
794 plni->plni_portal, 0, LNET_MSG_MATCHBITS, 0, 0);
796 CERROR("PtlPut for %s failed: %d\n",
797 libcfs_id2str(peer->plp_id), rc);
798 tx->tx_status = -EIO;
/* Set up the passive side of an RDMA for an LNet GET or PUT.
 * A tx of the given type is created and its iov described; the call then
 * waits until the peer's HELLO has arrived, allocates the next matchbits
 * value for the peer, attaches an ME/MD pair on the portal for the peer to
 * RDMA against, and fills the RDMA request (LNet header plus matchbits).
 * For a GET, an LNet reply message is also created.
 * NOTE(review): the threshold/max_size setup of md, the queuing of the
 * request, the error-path cleanup and the return values are not visible in
 * this excerpt. */
806 ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
807 unsigned int niov, struct iovec *iov,
808 unsigned int offset, unsigned int len)
810 lnet_ni_t *ni = peer->plp_ni;
811 ptllnd_ni_t *plni = ni->ni_data;
812 ptllnd_tx_t *tx = ptllnd_new_tx(peer, type, 0);
822 CDEBUG(D_NET, "niov=%d offset=%d len=%d\n",niov,offset,len);
824 LASSERT (type == PTLLND_MSG_TYPE_GET ||
825 type == PTLLND_MSG_TYPE_PUT);
828 CERROR("Can't allocate %s tx for %s\n",
829 type == PTLLND_MSG_TYPE_GET ? "GET" : "PUT/REPLY",
830 libcfs_id2str(peer->plp_id));
834 rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
836 CERROR ("Can't allocate iov %d for %s\n",
837 niov, libcfs_id2str(peer->plp_id));
842 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
843 md.eq_handle = plni->plni_eqh;
/* GET: the peer PUTs data into this MD; otherwise the peer GETs from it */
846 md.options = PTLLND_MD_OPTIONS;
847 if(type == PTLLND_MSG_TYPE_GET)
848 md.options |= PTL_MD_OP_PUT | PTL_MD_ACK_DISABLE;
850 md.options |= PTL_MD_OP_GET;
851 ptllnd_set_md_buffer(&md, tx);
853 start = cfs_time_current_sec();
854 w = PTLLND_WARN_LONG_WAIT;
856 while (!peer->plp_recvd_hello) { /* wait to validate plp_match */
857 if (peer->plp_closing) {
861 if (cfs_time_current_sec() > start + w) {
862 CWARN("Waited %ds to connect to %s\n",
863 w, libcfs_id2str(peer->plp_id));
866 ptllnd_wait(ni, w*1000);
/* never hand out matchbits below the reserved range */
869 if (peer->plp_match < PTL_RESERVED_MATCHBITS)
870 peer->plp_match = PTL_RESERVED_MATCHBITS;
871 matchbits = peer->plp_match++;
872 CDEBUG(D_NET, "matchbits " LPX64 " %s\n", matchbits,
873 ptllnd_ptlid2str(peer->plp_ptlid));
/* the match entry accepts only this peer with exactly these matchbits */
875 rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid,
876 matchbits, 0, PTL_UNLINK, PTL_INS_BEFORE, &meh);
878 CERROR("PtlMEAttach for %s failed: %d\n",
879 libcfs_id2str(peer->plp_id), rc);
884 CDEBUG(D_NET, "md.start=%p\n",md.start);
885 CDEBUG(D_NET, "md.length=%d\n",md.length);
886 CDEBUG(D_NET, "md.threshold=%d\n",md.threshold);
887 CDEBUG(D_NET, "md.max_size=%d\n",md.max_size);
888 CDEBUG(D_NET, "md.options=0x%x\n",md.options);
889 CDEBUG(D_NET, "md.user_ptr=%p\n",md.user_ptr);
891 PTLLND_DBGT_STAMP(tx->tx_bulk_posted);
893 rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);
895 CERROR("PtlMDAttach for %s failed: %d\n",
896 libcfs_id2str(peer->plp_id), rc);
897 rc2 = PtlMEUnlink(meh);
898 LASSERT (rc2 == PTL_OK);
902 tx->tx_bulkmdh = mdh;
905 * We need to set the stamp here because it
906 * we could have received a HELLO above that set
909 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
/* the request carries the LNet header and the matchbits to RDMA against */
911 tx->tx_msg.ptlm_u.rdma.kptlrm_hdr = msg->msg_hdr;
912 tx->tx_msg.ptlm_u.rdma.kptlrm_matchbits = matchbits;
914 if (type == PTLLND_MSG_TYPE_GET) {
915 tx->tx_lnetreplymsg = lnet_create_reply_msg(ni, msg);
916 if (tx->tx_lnetreplymsg == NULL) {
917 CERROR("Can't create reply for GET to %s\n",
918 libcfs_id2str(msg->msg_target));
924 tx->tx_lnetmsg = msg;
925 PTLLND_HISTORY("%s[%d/%d]: post passive %s p %d %p",
926 libcfs_id2str(msg->msg_target),
927 peer->plp_credits, peer->plp_outstanding_credits,
928 lnet_msgtyp2str(msg->msg_type),
929 (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ?
930 le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
931 (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ?
932 le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
/* Perform the active side of an RDMA with 'peer' using 'matchbits'.
 * A tx of type PTLLND_RDMA_READ or PTLLND_RDMA_WRITE is created, its iov is
 * described and bound to an MD, and the tx is placed on the peer's active
 * queue.  READ issues PtlGet(); WRITE issues PtlPut() whose last argument is
 * PTLLND_RDMA_FAIL when msg is NULL and PTLLND_RDMA_OK otherwise.
 * If the tx cannot be allocated the peer is closed with -ENOMEM.
 * NOTE(review): the return values and parts of the failure paths are not
 * visible in this excerpt. */
943 ptllnd_active_rdma(ptllnd_peer_t *peer, int type,
944 lnet_msg_t *msg, __u64 matchbits,
945 unsigned int niov, struct iovec *iov,
946 unsigned int offset, unsigned int len)
948 lnet_ni_t *ni = peer->plp_ni;
949 ptllnd_ni_t *plni = ni->ni_data;
950 ptllnd_tx_t *tx = ptllnd_new_tx(peer, type, 0);
955 LASSERT (type == PTLLND_RDMA_READ ||
956 type == PTLLND_RDMA_WRITE);
959 CERROR("Can't allocate tx for RDMA %s with %s\n",
960 (type == PTLLND_RDMA_WRITE) ? "write" : "read",
961 libcfs_id2str(peer->plp_id));
962 ptllnd_close_peer(peer, -ENOMEM);
966 rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
968 CERROR ("Can't allocate iov %d for %s\n",
969 niov, libcfs_id2str(peer->plp_id));
974 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
975 md.eq_handle = plni->plni_eqh;
977 md.options = PTLLND_MD_OPTIONS;
/* the MD threshold is 2 for a READ and 1 for a WRITE */
978 md.threshold = (type == PTLLND_RDMA_READ) ? 2 : 1;
980 ptllnd_set_md_buffer(&md, tx);
982 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
984 CERROR("PtlMDBind for %s failed: %d\n",
985 libcfs_id2str(peer->plp_id), rc);
990 tx->tx_bulkmdh = mdh;
991 tx->tx_lnetmsg = msg;
993 list_add_tail(&tx->tx_list, &peer->plp_activeq);
994 PTLLND_DBGT_STAMP(tx->tx_bulk_posted);
996 if (type == PTLLND_RDMA_READ)
997 rc = PtlGet(mdh, peer->plp_ptlid,
998 plni->plni_portal, 0, matchbits, 0);
1000 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
1001 plni->plni_portal, 0, matchbits, 0,
1002 (msg == NULL) ? PTLLND_RDMA_FAIL : PTLLND_RDMA_OK);
1007 CERROR("Can't initiate RDMA with %s: %d\n",
1008 libcfs_id2str(peer->plp_id), rc);
/* detach the LNet message from the tx before the tx is completed below */
1010 tx->tx_lnetmsg = NULL;
1013 ptllnd_tx_done(tx); /* this will close peer */
/* LND send entry point.  Messages addressed to a pid with LNET_PID_USERFLAG
 * set are refused with -EHOSTUNREACH.  Otherwise the target peer is found or
 * created and the message is sent by type: ACKs and anything that fits in
 * one message go out as an IMMEDIATE message with the payload copied in;
 * larger GETs and PUT/REPLYs go through ptllnd_passive_rdma().
 * NOTE(review): several case labels, the queuing of the IMMEDIATE tx and the
 * return statements are not visible in this excerpt. */
1018 ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
1020 ptllnd_ni_t *plni = ni->ni_data;
/* this LND handles neither routed messages nor kiov payloads */
1026 LASSERT (!msg->msg_routing);
1027 LASSERT (msg->msg_kiov == NULL);
1029 LASSERT (msg->msg_niov <= PTL_MD_MAX_IOV); /* !!! */
1031 CDEBUG(D_NET, "%s [%d]+%d,%d -> %s%s\n",
1032 lnet_msgtyp2str(msg->msg_type),
1033 msg->msg_niov, msg->msg_offset, msg->msg_len,
1034 libcfs_nid2str(msg->msg_target.nid),
1035 msg->msg_target_is_router ? "(rtr)" : "");
1037 if ((msg->msg_target.pid & LNET_PID_USERFLAG) != 0) {
1038 CERROR("Can't send to non-kernel peer %s\n",
1039 libcfs_id2str(msg->msg_target));
1040 return -EHOSTUNREACH;
1043 plp = ptllnd_find_peer(ni, msg->msg_target, 1);
1047 switch (msg->msg_type) {
1052 CDEBUG(D_NET, "LNET_MSG_ACK\n");
1054 LASSERT (msg->msg_len == 0);
1055 break; /* send IMMEDIATE */
1058 CDEBUG(D_NET, "LNET_MSG_GET nob=%d\n",msg->msg_md->md_length);
1060 if (msg->msg_target_is_router)
1061 break; /* send IMMEDIATE */
/* size the reply would have as an IMMEDIATE message, checked against the
 * local maximum message size */
1063 nob = msg->msg_md->md_length;
1064 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1065 if (nob <= plni->plni_max_msg_size)
1068 LASSERT ((msg->msg_md->md_options & LNET_MD_KIOV) == 0);
1069 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_GET, msg,
1070 msg->msg_md->md_niov,
1071 msg->msg_md->md_iov.iov,
1072 0, msg->msg_md->md_length);
1073 ptllnd_peer_decref(plp);
1076 case LNET_MSG_REPLY:
1078 CDEBUG(D_NET, "LNET_MSG_PUT nob=%d\n",msg->msg_len);
/* here the limit is the peer's maximum message size */
1080 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1081 CDEBUG(D_NET, "msg_size=%d max=%d\n",msg->msg_len,plp->plp_max_msg_size);
1082 if (nob <= plp->plp_max_msg_size)
1083 break; /* send IMMEDIATE */
1085 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_PUT, msg,
1086 msg->msg_niov, msg->msg_iov,
1087 msg->msg_offset, msg->msg_len);
1088 ptllnd_peer_decref(plp);
1093 * NB copy the payload so we don't have to do a fragmented send */
1095 CDEBUG(D_NET, "IMMEDIATE len=%d\n", msg->msg_len);
1096 tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_IMMEDIATE, msg->msg_len);
1098 CERROR("Can't allocate tx for lnet type %d to %s\n",
1099 msg->msg_type, libcfs_id2str(msg->msg_target));
1100 ptllnd_peer_decref(plp);
/* copy the payload into the message, directly after the immediate header */
1104 lnet_copy_iov2flat(tx->tx_msgsize, &tx->tx_msg,
1105 offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1106 msg->msg_niov, msg->msg_iov, msg->msg_offset,
1108 tx->tx_msg.ptlm_u.immediate.kptlim_hdr = msg->msg_hdr;
1110 tx->tx_lnetmsg = msg;
1111 PTLLND_HISTORY("%s[%d/%d]: post immediate %s p %d %p",
1112 libcfs_id2str(msg->msg_target),
1113 plp->plp_credits, plp->plp_outstanding_credits,
1114 lnet_msgtyp2str(msg->msg_type),
1115 (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ?
1116 le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1117 (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ?
1118 le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1121 ptllnd_peer_decref(plp);
/* Finish handling a received message: one more credit is owed back to the
 * sending peer, so plp_outstanding_credits is incremented and
 * ptllnd_check_sends() is run for that peer.
 * NOTE(review): the statements after the plni_nrxs assertion are not visible
 * in this excerpt. */
1126 ptllnd_rx_done(ptllnd_rx_t *rx)
1128 ptllnd_peer_t *plp = rx->rx_peer;
1129 lnet_ni_t *ni = plp->plp_ni;
1130 ptllnd_ni_t *plni = ni->ni_data;
1132 plp->plp_outstanding_credits++;
1134 PTLLND_HISTORY("%s[%d/%d]: rx=%p done\n", libcfs_id2str(plp->plp_id),
1135 plp->plp_credits, plp->plp_outstanding_credits, rx);
1137 ptllnd_check_sends(rx->rx_peer);
1139 LASSERT (plni->plni_nrxs > 0);
/* LND eager-receive hook.  Per the comment below this is not expected to be
 * called.
 * NOTE(review): the body is not visible in this excerpt. */
1144 ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1145 void **new_privatep)
1147 /* Shouldn't get here; recvs only block for router buffers */
/* LND receive entry point; 'private' is the ptllnd_rx_t of the incoming
 * message.  IMMEDIATE: the payload is copied from the receive buffer into
 * the caller's iov and the LNet message is finalized with status 0.
 * PUT: the data is fetched from the sender with an active RDMA READ.
 * GET: the reply data is pushed with an active RDMA WRITE; a second WRITE
 * variant passes a NULL message.
 * NOTE(review): the condition choosing between the two GET variants, the
 * trailing arguments of some calls and the return paths are not visible in
 * this excerpt. */
1153 ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1154 int delayed, unsigned int niov,
1155 struct iovec *iov, lnet_kiov_t *kiov,
1156 unsigned int offset, unsigned int mlen, unsigned int rlen)
1158 ptllnd_rx_t *rx = private;
1162 LASSERT (kiov == NULL);
1163 LASSERT (niov <= PTL_MD_MAX_IOV); /* !!! */
1165 switch (rx->rx_msg->ptlm_type) {
1169 case PTLLND_MSG_TYPE_IMMEDIATE:
/* bytes needed to hold mlen payload bytes; must fit in what was received */
1170 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[mlen]);
1171 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE nob=%d\n",nob);
1172 if (nob > rx->rx_nob) {
1173 CERROR("Immediate message from %s too big: %d(%d)\n",
1174 libcfs_id2str(rx->rx_peer->plp_id),
1179 lnet_copy_flat2iov(niov, iov, offset,
1180 rx->rx_nob, rx->rx_msg,
1181 offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1183 lnet_finalize(ni, msg, 0);
1186 case PTLLND_MSG_TYPE_PUT:
1187 CDEBUG(D_NET, "PTLLND_MSG_TYPE_PUT offset=%d mlen=%d\n",offset,mlen);
1188 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg,
1189 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1190 niov, iov, offset, mlen);
1193 case PTLLND_MSG_TYPE_GET:
1194 CDEBUG(D_NET, "PTLLND_MSG_TYPE_GET\n");
/* the reply data described by msg is written to the requester */
1196 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, msg,
1197 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1198 msg->msg_niov, msg->msg_iov,
1199 msg->msg_offset, msg->msg_len);
1201 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, NULL,
1202 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
/* Act on the NI's plni_abort_on_nak setting.
 * NOTE(review): the statement guarded by the test is not visible in this
 * excerpt; judging by the name it presumably aborts the process - confirm. */
1212 ptllnd_abort_on_nak(lnet_ni_t *ni)
1214 ptllnd_ni_t *plni = ni->ni_data;
1216 if (plni->plni_abort_on_nak)
/* Validate and dispatch a message of 'nob' bytes received from Portals
 * process 'initiator'.
 * Checks, in order: minimum length, protocol magic (which also detects a
 * byte-swapped sender), version, header length, checksum, source network,
 * NAK, destination id and destination stamp.  Header fields are byte-swapped
 * as needed, followed by type-specific length checks and swabbing.  The
 * sending peer is then looked up (and created only for a HELLO), HELLO state
 * or the source stamp is verified, returned credits are applied, and the
 * message is handed to lnet_parse() or completed via ptllnd_rx_done().
 * NOTE(review): the return statements after each error, the condition that
 * guards the swabbing, and the initialisation of the local rx are not
 * visible in this excerpt. */
1221 ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
1222 kptl_msg_t *msg, unsigned int nob)
1224 ptllnd_ni_t *plni = ni->ni_data;
/* size of the common header that precedes the type-specific union */
1225 const int basenob = offsetof(kptl_msg_t, ptlm_u);
1226 lnet_process_id_t srcid;
1235 CERROR("Very short receive from %s\n",
1236 ptllnd_ptlid2str(initiator));
1240 /* I can at least read MAGIC/VERSION */
/* flip is set when the magic matches the byte-swapped constant */
1242 flip = msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC);
1243 if (!flip && msg->ptlm_magic != PTLLND_MSG_MAGIC) {
1244 CERROR("Bad protocol magic %08x from %s\n",
1245 msg->ptlm_magic, ptllnd_ptlid2str(initiator));
1249 msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
1251 if (msg_version != PTLLND_MSG_VERSION) {
1252 CERROR("Bad protocol version %04x from %s\n",
1253 (__u32)msg_version, ptllnd_ptlid2str(initiator));
1254 ptllnd_abort_on_nak(ni);
1258 if (nob < basenob) {
1259 CERROR("Short receive from %s: got %d, wanted at least %d\n",
1260 ptllnd_ptlid2str(initiator), nob, basenob);
/* Below, the received checksum is saved and the field zeroed; a non-zero
 * checksum is compared with one recomputed over the common header, and the
 * version and checksum fields are then stored back in host byte order. */
1264 /* checksum must be computed with
1265 * 1) ptlm_cksum zero and
1266 * 2) BEFORE anything gets modified/flipped
1268 msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
1269 msg->ptlm_cksum = 0;
1270 if (msg_cksum != 0 &&
1271 msg_cksum != ptllnd_cksum(msg, offsetof(kptl_msg_t, ptlm_u))) {
1272 CERROR("Bad checksum from %s\n", ptllnd_ptlid2str(initiator));
1276 msg->ptlm_version = msg_version;
1277 msg->ptlm_cksum = msg_cksum;
1280 /* NB stamps are opaque cookies */
1281 __swab32s(&msg->ptlm_nob);
1282 __swab64s(&msg->ptlm_srcnid);
1283 __swab64s(&msg->ptlm_dstnid);
1284 __swab32s(&msg->ptlm_srcpid);
1285 __swab32s(&msg->ptlm_dstpid);
1288 srcid.nid = msg->ptlm_srcnid;
1289 srcid.pid = msg->ptlm_srcpid;
/* the sender must be on the same network as this NI */
1291 if (LNET_NIDNET(msg->ptlm_srcnid) != LNET_NIDNET(ni->ni_nid)) {
1292 CERROR("Bad source id %s from %s\n",
1293 libcfs_id2str(srcid),
1294 ptllnd_ptlid2str(initiator));
1298 if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
1299 CERROR("NAK from %s (%s)\n",
1300 libcfs_id2str(srcid),
1301 ptllnd_ptlid2str(initiator));
1302 ptllnd_abort_on_nak(ni);
/* the message must be addressed to this NI's NID and this process's pid */
1306 if (msg->ptlm_dstnid != ni->ni_nid ||
1307 msg->ptlm_dstpid != the_lnet.ln_pid) {
1308 CERROR("Bad dstid %s (%s expected) from %s\n",
1309 libcfs_id2str((lnet_process_id_t) {
1310 .nid = msg->ptlm_dstnid,
1311 .pid = msg->ptlm_dstpid}),
1312 libcfs_id2str((lnet_process_id_t) {
1314 .pid = the_lnet.ln_pid}),
1315 libcfs_id2str(srcid));
/* the destination stamp must match this NI's current stamp */
1319 if (msg->ptlm_dststamp != plni->plni_stamp) {
1320 CERROR("Bad dststamp "LPX64"("LPX64" expected) from %s\n",
1321 msg->ptlm_dststamp, plni->plni_stamp,
1322 libcfs_id2str(srcid));
1326 PTLLND_HISTORY("RX %s: %s %d %p", libcfs_id2str(srcid),
1327 ptllnd_msgtype2str(msg->ptlm_type), msg->ptlm_credits, &rx);
/* type-specific length checks and byte swapping */
1329 switch (msg->ptlm_type) {
1330 case PTLLND_MSG_TYPE_PUT:
1331 case PTLLND_MSG_TYPE_GET:
1332 CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
1333 msg->ptlm_type==PTLLND_MSG_TYPE_PUT ? "PUT" : "GET");
1334 if (nob < basenob + sizeof(kptl_rdma_msg_t)) {
1335 CERROR("Short rdma request from %s(%s)\n",
1336 libcfs_id2str(srcid),
1337 ptllnd_ptlid2str(initiator));
1341 __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
1344 case PTLLND_MSG_TYPE_IMMEDIATE:
1345 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
1346 if (nob < offsetof(kptl_msg_t,
1347 ptlm_u.immediate.kptlim_payload)) {
1348 CERROR("Short immediate from %s(%s)\n",
1349 libcfs_id2str(srcid),
1350 ptllnd_ptlid2str(initiator));
1355 case PTLLND_MSG_TYPE_HELLO:
1356 CDEBUG(D_NET, "PTLLND_MSG_TYPE_HELLO from %s(%s)\n",
1357 libcfs_id2str(srcid),
1358 ptllnd_ptlid2str(initiator));
1359 if (nob < basenob + sizeof(kptl_hello_msg_t)) {
1360 CERROR("Short hello from %s(%s)\n",
1361 libcfs_id2str(srcid),
1362 ptllnd_ptlid2str(initiator));
1366 __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
1367 __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
1371 case PTLLND_MSG_TYPE_NOOP:
1372 CDEBUG(D_NET, "PTLLND_MSG_TYPE_NOOP from %s(%s)\n",
1373 libcfs_id2str(srcid),
1374 ptllnd_ptlid2str(initiator));
1378 CERROR("Bad message type %d from %s(%s)\n", msg->ptlm_type,
1379 libcfs_id2str(srcid),
1380 ptllnd_ptlid2str(initiator));
/* only a HELLO may create a previously unknown peer */
1384 plp = ptllnd_find_peer(ni, srcid,
1385 msg->ptlm_type == PTLLND_MSG_TYPE_HELLO);
1387 CERROR("Can't find peer %s\n", libcfs_id2str(srcid));
1391 if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
1392 if (plp->plp_recvd_hello) {
1393 CERROR("Unexpected HELLO from %s\n",
1394 libcfs_id2str(srcid));
1395 ptllnd_peer_decref(plp);
1399 CDEBUG(D_NET, "maxsz %d match "LPX64" stamp "LPX64"\n",
1400 msg->ptlm_u.hello.kptlhm_max_msg_size,
1401 msg->ptlm_u.hello.kptlhm_matchbits,
1402 msg->ptlm_srcstamp);
/* record what the peer announced; the larger of the local and the
 * advertised maximum message size is kept */
1404 plp->plp_max_msg_size = MAX(plni->plni_max_msg_size,
1405 msg->ptlm_u.hello.kptlhm_max_msg_size);
1406 plp->plp_match = msg->ptlm_u.hello.kptlhm_matchbits;
1407 plp->plp_stamp = msg->ptlm_srcstamp;
1408 plp->plp_max_credits += msg->ptlm_credits;
1409 plp->plp_recvd_hello = 1;
1411 CDEBUG(D_NET, "plp_max_msg_size=%d\n",plp->plp_max_msg_size);
1413 } else if (!plp->plp_recvd_hello) {
1415 CERROR("Bad message type %d (HELLO expected) from %s\n",
1416 msg->ptlm_type, libcfs_id2str(srcid));
1417 ptllnd_peer_decref(plp);
1420 } else if (msg->ptlm_srcstamp != plp->plp_stamp) {
1422 CERROR("Bad srcstamp "LPX64"("LPX64" expected) from %s\n",
1423 msg->ptlm_srcstamp, plp->plp_stamp,
1424 libcfs_id2str(srcid));
1425 ptllnd_peer_decref(plp);
/* apply credits returned by the peer, capped at plp_max_credits */
1429 if (msg->ptlm_credits > 0) {
1430 CDEBUG(D_NET, "Getting back %d credits from peer\n",msg->ptlm_credits);
1431 if (plp->plp_credits + msg->ptlm_credits >
1432 plp->plp_max_credits) {
1433 CWARN("Too many credits from %s: %d + %d > %d\n",
1434 libcfs_id2str(srcid),
1435 plp->plp_credits, msg->ptlm_credits,
1436 plp->plp_max_credits);
1437 plp->plp_credits = plp->plp_max_credits;
1439 plp->plp_credits += msg->ptlm_credits;
/* new credits may allow queued txs to go out */
1441 ptllnd_check_sends(plp);
1444 /* All OK so far; assume the message is good... */
1451 CDEBUG(D_NET, "rx=%p type=%d\n",&rx,msg->ptlm_type);
1453 switch (msg->ptlm_type) {
1454 default: /* message types have been checked already */
1455 ptllnd_rx_done(&rx);
/* PUT/GET requests: lnet_parse is called with last argument 1 */
1458 case PTLLND_MSG_TYPE_PUT:
1459 case PTLLND_MSG_TYPE_GET:
1460 CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
1461 msg->ptlm_type==PTLLND_MSG_TYPE_PUT ? "PUT" : "GET");
1462 rc = lnet_parse(ni, &msg->ptlm_u.rdma.kptlrm_hdr,
1463 msg->ptlm_srcnid, &rx, 1);
1464 CDEBUG(D_NET, "lnet_parse rc=%d\n",rc);
1466 ptllnd_rx_done(&rx);
/* IMMEDIATE: lnet_parse is called with last argument 0 */
1469 case PTLLND_MSG_TYPE_IMMEDIATE:
1470 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
1471 rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,
1472 msg->ptlm_srcnid, &rx, 0);
1473 CDEBUG(D_NET, "lnet_parse rc=%d\n",rc);
1475 ptllnd_rx_done(&rx);
/* drop the reference obtained from ptllnd_find_peer() */
1479 ptllnd_peer_decref(plp);
/* Handle a Portals event delivered for one of this NI's receive buffers.
 *
 * ni:    the LNET network interface the buffer belongs to (asserted below).
 * event: the Portals event; its md.user_ptr identifies the buffer.
 *
 * Only PUT_END and UNLINK events are accepted (asserted).  An event whose
 * ni_fail_type is not PTL_NI_OK is only logged.  A successful PUT_END hands
 * the received bytes to ptllnd_parse_request().  Afterwards the buffer's
 * posted state is updated and the buffer may be handed back to
 * ptllnd_post_buffer(). */
1483 ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
1485 ptllnd_buffer_t *buf = ptllnd_eventarg2obj(event->md.user_ptr);
1486 ptllnd_ni_t *plni = ni->ni_data;
/* The incoming message lives inside the buffer at the event's offset */
1487 char *msg = &buf->plb_buffer[event->offset];
1489 int unlinked = event->type == PTL_EVENT_UNLINK;
1491 LASSERT (buf->plb_ni == ni);
1492 LASSERT (event->type == PTL_EVENT_PUT_END ||
1493 event->type == PTL_EVENT_UNLINK);
1495 CDEBUG(D_NET, "buf=%p event=%d\n",buf,event->type);
1497 if (event->ni_fail_type != PTL_NI_OK) {
1499 CERROR("event type %d, status %d from %s\n",
1500 event->type, event->ni_fail_type,
1501 ptllnd_ptlid2str(event->initiator));
1503 } else if (event->type == PTL_EVENT_PUT_END) {
1504 #if (PTL_MD_LOCAL_ALIGN8 == 0)
1505 /* Portals can't force message alignment - someone sending an
1506 * odd-length message could misalign subsequent messages */
1507 if ((event->mlength & 7) != 0) {
1508 CERROR("Message from %s has odd length %d: "
1509 "probable version incompatibility\n",
1510 ptllnd_ptlid2str(event->initiator),
/* Every message must start on an 8-byte boundary within the buffer */
1515 LASSERT ((event->offset & 7) == 0);
1517 ptllnd_parse_request(ni, event->initiator,
1518 (kptl_msg_t *)msg, event->mlength);
/* Work out whether the buffer must be reposted; which events signal an
 * unlink depends on the Portals flavour selected at build time */
1521 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1522 /* UNLINK event only on explicit unlink */
1523 repost = (event->unlinked && event->type != PTL_EVENT_UNLINK);
1524 if (event->unlinked)
1527 /* UNLINK event only on implicit unlink */
1528 repost = (event->type == PTL_EVENT_UNLINK);
1531 CDEBUG(D_NET, "repost=%d unlinked=%d\n",repost,unlinked);
/* The buffer is no longer posted: clear its flag and drop the NI's count
 * of posted buffers.  NOTE(review): presumably done only when 'unlinked'
 * is set - the guarding condition is not visible here; confirm. */
1534 LASSERT(buf->plb_posted);
1535 buf->plb_posted = 0;
1536 plni->plni_nposted_buffers--;
/* The result of reposting is deliberately discarded (note the (void) cast).
 * NOTE(review): presumably done only when 'repost' is set - confirm. */
1540 (void) ptllnd_post_buffer(buf);
/* Handle a Portals event delivered for a transmit descriptor (tx).
 *
 * ni:    the LNET network interface the tx was sent on.
 * event: the Portals event; its md.user_ptr identifies the tx.
 *
 * The event's MD handle is matched against the tx's request MD (tx_reqmdh)
 * and bulk MD (tx_bulkmdh); exactly one must match (asserted).  The event
 * type is then sanity-checked against the tx's message type.  Once an
 * error has been seen, or both MD handles have been invalidated, the tx is
 * moved onto plni->plni_zombie_txs. */
1544 ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
1546 ptllnd_ni_t *plni = ni->ni_data;
1547 ptllnd_tx_t *tx = ptllnd_eventarg2obj(event->md.user_ptr);
/* Non-zero when Portals reports a failure for this event */
1548 int error = (event->ni_fail_type != PTL_NI_OK);
1551 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1552 int unlinked = event->unlinked;
1554 int unlinked = (event->type == PTL_EVENT_UNLINK);
1558 CERROR("Error event type %d for %s for %s\n",
1559 event->type, ptllnd_msgtype2str(tx->tx_type),
1560 libcfs_id2str(tx->tx_peer->plp_id));
1562 LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE));
1564 CDEBUG(D_NET, "tx=%p type=%s (%d)\n",tx,
1565 ptllnd_msgtype2str(tx->tx_type),tx->tx_type);
1566 CDEBUG(D_NET, "unlinked=%d\n",unlinked);
1567 CDEBUG(D_NET, "error=%d\n",error);
/* Does this event belong to the MD that carried the request message?
 * NOTE(review): the conditions guarding the check and the handle reset
 * below are not visible here - presumably isreq and isreq && unlinked. */
1569 isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh);
1570 CDEBUG(D_NET, "isreq=%d\n",isreq);
1572 LASSERT (event->md.start == (void *)&tx->tx_msg);
1574 tx->tx_reqmdh = PTL_INVALID_HANDLE;
1575 PTLLND_DBGT_STAMP(tx->tx_req_done);
/* ...or to the MD used for the bulk transfer?  Its handle is invalidated
 * once that MD has been unlinked. */
1579 isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);
1580 CDEBUG(D_NET, "isbulk=%d\n",isbulk);
1581 if ( isbulk && unlinked ) {
1582 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
1583 PTLLND_DBGT_STAMP(tx->tx_bulk_done);
1586 LASSERT (!isreq != !isbulk); /* always one and only 1 match */
1588 PTLLND_HISTORY("%s[%d/%d]: TX done %p %s%s",
1589 libcfs_id2str(tx->tx_peer->plp_id),
1590 tx->tx_peer->plp_credits,
1591 tx->tx_peer->plp_outstanding_credits,
1592 tx, isreq ? "REQ" : "BULK", unlinked ? "(unlinked)" : "");
/* NOTE(review): this repeats the assertion made just before the
 * PTLLND_HISTORY() call above */
1594 LASSERT (!isreq != !isbulk); /* always one and only 1 match */
/* Check that the event type is one that is legal for this kind of tx */
1595 switch (tx->tx_type) {
1599 case PTLLND_MSG_TYPE_NOOP:
1600 case PTLLND_MSG_TYPE_HELLO:
1601 case PTLLND_MSG_TYPE_IMMEDIATE:
1602 LASSERT (event->type == PTL_EVENT_UNLINK ||
1603 event->type == PTL_EVENT_SEND_END);
1607 case PTLLND_MSG_TYPE_GET:
1608 LASSERT (event->type == PTL_EVENT_UNLINK ||
1609 (isreq && event->type == PTL_EVENT_SEND_END) ||
1610 (isbulk && event->type == PTL_EVENT_PUT_END));
/* A successful PUT_END on the bulk MD of a GET: hdr_data says whether the
 * GET matched.  If it is PTLLND_RDMA_OK the reply message length is set;
 * the "Unmatched GET" branch marks the tx failed with -EIO. */
1612 if (isbulk && !error && event->type == PTL_EVENT_PUT_END) {
1613 /* Check GET matched */
1614 if (event->hdr_data == PTLLND_RDMA_OK) {
1615 lnet_set_reply_msg_len(ni,
1616 tx->tx_lnetreplymsg,
1619 CERROR ("Unmatched GET with %s\n",
1620 libcfs_id2str(tx->tx_peer->plp_id));
1621 tx->tx_status = -EIO;
1626 case PTLLND_MSG_TYPE_PUT:
1627 LASSERT (event->type == PTL_EVENT_UNLINK ||
1628 (isreq && event->type == PTL_EVENT_SEND_END) ||
1629 (isbulk && event->type == PTL_EVENT_GET_END));
1632 case PTLLND_RDMA_READ:
1633 LASSERT (event->type == PTL_EVENT_UNLINK ||
1634 event->type == PTL_EVENT_SEND_END ||
1635 event->type == PTL_EVENT_REPLY_END);
1639 case PTLLND_RDMA_WRITE:
1640 LASSERT (event->type == PTL_EVENT_UNLINK ||
1641 event->type == PTL_EVENT_SEND_END);
1645 /* Schedule ptllnd_tx_done() on error or last completion event */
1647 (PtlHandleIsEqual(tx->tx_bulkmdh, PTL_INVALID_HANDLE) &&
1648 PtlHandleIsEqual(tx->tx_reqmdh, PTL_INVALID_HANDLE))) {
/* NOTE(review): presumably the status is forced to -EIO only when 'error'
 * is set - the guarding condition is not visible here; confirm. */
1650 tx->tx_status = -EIO;
/* Take the tx off whatever list it is on and queue it on the NI's zombie
 * list; ptllnd_wait() loops over plni_zombie_txs */
1651 list_del(&tx->tx_list);
1652 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
1653 CDEBUG(D_NET, "tx=%p ONTO ZOMBIE LIST\n",tx);
1658 ptllnd_wait (lnet_ni_t *ni, int milliseconds)
1660 static struct timeval prevt;
1661 static int prevt_count;
1662 static int call_count;
1667 ptllnd_ni_t *plni = ni->ni_data;
1676 /* Handle any currently queued events, returning immediately if any.
1677 * Otherwise block for the timeout and handle all events queued
1680 gettimeofday(&t1, NULL);
1684 time_t then = cfs_time_current_sec();
1686 CDEBUG(D_NET, "Poll(%d)\n", timeout);
1688 rc = PtlEQPoll(&plni->plni_eqh, 1,
1689 (timeout < 0) ? PTL_TIME_FOREVER : timeout,
1693 (cfs_time_current_sec() - then)*1000 > timeout + 1000) {
1694 /* 1000 mS grace.............................^ */
1695 CERROR("SLOW PtlEQPoll(%d): %d seconds\n", timeout,
1696 (int)(cfs_time_current_sec() - then));
1699 CDEBUG(D_NET, "PtlEQPoll rc=%d\n",rc);
1702 if (rc == PTL_EQ_EMPTY) {
1703 if (found || /* handled some events */
1704 milliseconds == 0 || /* just checking */
1705 blocked) /* blocked already */
1709 timeout = (milliseconds < 0) ?
1710 PTL_TIME_FOREVER : milliseconds;
1714 LASSERT (rc == PTL_OK || rc == PTL_EQ_DROPPED);
1716 if (rc == PTL_EQ_DROPPED)
1717 CERROR("Event queue: size %d is too small\n",
1718 plni->plni_eq_size);
1720 CDEBUG(D_NET, "event.type=%s(%d)\n",
1721 ptllnd_evtype2str(event.type),event.type);
1724 switch (ptllnd_eventarg2type(event.md.user_ptr)) {
1728 case PTLLND_EVENTARG_TYPE_TX:
1729 ptllnd_tx_event(ni, &event);
1732 case PTLLND_EVENTARG_TYPE_BUF:
1733 ptllnd_buf_event(ni, &event);
1738 while (!list_empty(&plni->plni_zombie_txs)) {
1739 tx = list_entry(plni->plni_zombie_txs.next,
1740 ptllnd_tx_t, tx_list);
1741 CDEBUG(D_NET, "Process ZOMBIE tx=%p\n",tx);
1745 gettimeofday(&t2, NULL);
1747 if (prevt.tv_sec == 0 ||
1748 prevt.tv_sec != t2.tv_sec) {
1749 PTLLND_HISTORY("%d wait entered at %d.%06d - prev %d %d.%06d",
1750 call_count, (int)t1.tv_sec, (int)t1.tv_usec,
1751 prevt_count, (int)prevt.tv_sec, (int)prevt.tv_usec);