1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5 * Author: Eric Barton <eeb@bartonsoftware.com>
7 * This file is part of the Lustre file system, http://www.lustre.org
8 * Lustre is a trademark of Cluster File Systems, Inc.
10 * This file is confidential source code owned by Cluster File Systems.
11 * No viewing, modification, compilation, redistribution, or any other
12 * form of use is permitted except through a signed license agreement.
14 * If you have not signed such an agreement, then you have no rights to
15 * this file. Please destroy it immediately and contact CFS.
/* Render a Portals process id as text with FMT_PTLID.  The text is written
 * into one slot of strs, a static array of 8 buffers of 32 bytes each, and
 * idx selects the slot.
 * NOTE(review): the declaration of idx, the action taken when it runs off
 * the end (presumably a wrap to 0) and the return are not visible here. */
22 ptllnd_ptlid2str(ptl_process_id_t id)
24 static char strs[8][32];
/* use the current slot and advance idx to the next one */
27 char *str = strs[idx++];
/* true once idx has moved past the last slot */
29 if (idx >= sizeof(strs)/sizeof(strs[0]))
/* output is bounded by the size of a single slot */
32 snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
/* Free a peer descriptor.  The buffer sizing is first adjusted by the number
 * of messages accounted to this peer, then the peer is asserted to be
 * closing with empty txq and activeq before it is freed. */
37 ptllnd_destroy_peer(ptllnd_peer_t *peer)
39 lnet_ni_t *ni = peer->plp_ni;
40 ptllnd_ni_t *plni = ni->ni_data;
/* messages accounted to this peer: its lazy credits plus the per-peer
 * credit count of the NI */
41 int nmsg = peer->plp_lazy_credits +
42 plni->plni_peer_credits;
/* negative delta (presumably shrinks the posted buffer pool - confirm
 * against ptllnd_size_buffers) */
44 ptllnd_size_buffers(ni, -nmsg);
/* only a closing peer with nothing queued or in flight may be destroyed */
46 LASSERT (peer->plp_closing);
47 LASSERT (plni->plni_npeers > 0);
48 LASSERT (list_empty(&peer->plp_txq));
49 LASSERT (list_empty(&peer->plp_activeq));
51 LIBCFS_FREE(peer, sizeof(*peer));
/* Drain the tx queue q: every tx on it gets status -ESHUTDOWN and is moved
 * to the tail of plni->plni_zombie_txs. */
55 ptllnd_abort_txs(ptllnd_ni_t *plni, struct list_head *q)
57 while (!list_empty(q)) {
/* always take the tx at the head of the queue */
58 ptllnd_tx_t *tx = list_entry(q->next, ptllnd_tx_t, tx_list);
60 tx->tx_status = -ESHUTDOWN;
/* unlink from q, then park it on the zombie list */
61 list_del(&tx->tx_list);
62 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
/* Close a peer.  plp_closing is set, every tx on its txq and activeq is
 * aborted via ptllnd_abort_txs(), the peer is taken off the list it is
 * linked on through plp_list and one reference is dropped.
 * NOTE(review): what happens when the peer is already closing, and how the
 * error argument is used, sit on lines that are not visible here. */
67 ptllnd_close_peer(ptllnd_peer_t *peer, int error)
69 lnet_ni_t *ni = peer->plp_ni;
70 ptllnd_ni_t *plni = ni->ni_data;
/* already closing (the action taken is not visible; presumably a return) */
72 if (peer->plp_closing)
75 peer->plp_closing = 1;
/* queued or active work (or a further condition on a line not shown)
 * makes the close worth reporting, along with a dump of the peer state */
77 if (!list_empty(&peer->plp_txq) ||
78 !list_empty(&peer->plp_activeq) ||
80 CERROR("Closing %s\n", libcfs_id2str(peer->plp_id));
81 ptllnd_debug_peer(ni, peer->plp_id);
/* fail whatever is still queued or in flight */
84 ptllnd_abort_txs(plni, &peer->plp_txq);
85 ptllnd_abort_txs(plni, &peer->plp_activeq);
/* unlink the peer from its list and drop a reference */
87 list_del(&peer->plp_list);
88 ptllnd_peer_decref(peer);
/* Find the peer for id in the peer hash table of this NI.  A match gets an
 * extra reference.  The remainder of the function builds a new peer: buffers
 * are sized for plni_peer_credits more messages, the descriptor is allocated
 * and initialised, linked into its hash bucket, and a HELLO tx is created.
 * NOTE(review): the test of the create argument, the NULL/rc checks and the
 * returns are on lines not visible here. */
92 ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
94 ptllnd_ni_t *plni = ni->ni_data;
/* bucket index: address part of the NID modulo the table size */
95 unsigned int hash = LNET_NIDADDR(id.nid) % plni->plni_peer_hash_size;
96 struct list_head *tmp;
/* the peer must be on the same LNET network as this NI */
101 LASSERT (LNET_NIDNET(id.nid) == LNET_NIDNET(ni->ni_nid));
/* scan the bucket for an exact nid + pid match */
103 list_for_each(tmp, &plni->plni_peer_hash[hash]) {
104 plp = list_entry(tmp, ptllnd_peer_t, plp_list);
106 if (plp->plp_id.nid == id.nid &&
107 plp->plp_id.pid == id.pid) {
108 ptllnd_peer_addref(plp);
116 /* New peer: check first for enough posted buffers */
118 rc = ptllnd_size_buffers(ni, plni->plni_peer_credits);
124 LIBCFS_ALLOC(plp, sizeof(*plp));
126 CERROR("Can't allocate new peer %s\n", libcfs_id2str(id));
/* undo the buffer sizing requested above */
128 ptllnd_size_buffers(ni, -plni->plni_peer_credits);
/* Portals id of the peer: address part of its NID and the ptllnd pid
 * configured on this NI */
134 plp->plp_ptlid.nid = LNET_NIDADDR(id.nid);
135 plp->plp_ptlid.pid = plni->plni_ptllnd_pid;
136 plp->plp_credits = 1; /* add more later when she gives me credits */
137 plp->plp_max_msg_size = plni->plni_max_msg_size; /* until I hear from her */
138 plp->plp_sent_credits = 1; /* Implicit credit for HELLO */
/* the remaining peer credits are still to be returned to her */
139 plp->plp_outstanding_credits = plni->plni_peer_credits - 1;
140 plp->plp_lazy_credits = 0;
141 plp->plp_extra_lazy_credits = 0;
144 plp->plp_recvd_hello = 0;
145 plp->plp_closing = 0;
146 plp->plp_refcount = 1;
147 CFS_INIT_LIST_HEAD(&plp->plp_list);
148 CFS_INIT_LIST_HEAD(&plp->plp_txq);
149 CFS_INIT_LIST_HEAD(&plp->plp_activeq);
/* a second reference is taken before the peer goes into the hash table
 * (presumably one for the table and one for the caller - confirm) */
151 ptllnd_peer_addref(plp);
152 list_add_tail(&plp->plp_list, &plni->plni_peer_hash[hash]);
154 tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_HELLO, 0);
/* no HELLO tx: close the new peer again and drop a reference */
156 CERROR("Can't send HELLO to %s\n", libcfs_id2str(id));
157 ptllnd_close_peer(plp, -ENOMEM);
158 ptllnd_peer_decref(plp);
/* the HELLO carries PTL_RESERVED_MATCHBITS and my maximum message size */
162 tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS;
163 tx->tx_msg.ptlm_u.hello.kptlhm_max_msg_size = plni->plni_max_msg_size;
/* history fields: credits/outstanding+sent(total credit allowance) */
165 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post hello %p", libcfs_id2str(id),
166 tx->tx_peer->plp_credits,
167 tx->tx_peer->plp_outstanding_credits,
168 tx->tx_peer->plp_sent_credits,
169 plni->plni_peer_credits +
170 tx->tx_peer->plp_lazy_credits, tx);
/* Walk every entry of the list q with list_for_each.
 * NOTE(review): going by the name this counts the entries; the counter and
 * the return are on lines not visible here. */
177 ptllnd_count_q(struct list_head *q)
182 list_for_each(e, q) {
/* Switch over a tx type, with one case for each of the two RDMA types and
 * the PUT, GET, IMMEDIATE, NOOP and HELLO message types.
 * NOTE(review): presumably each case returns a short name string for the
 * type; the returns are on lines not visible here. */
190 ptllnd_tx_typestr(int type)
193 case PTLLND_RDMA_WRITE:
196 case PTLLND_RDMA_READ:
199 case PTLLND_MSG_TYPE_PUT:
202 case PTLLND_MSG_TYPE_GET:
205 case PTLLND_MSG_TYPE_IMMEDIATE:
208 case PTLLND_MSG_TYPE_NOOP:
211 case PTLLND_MSG_TYPE_HELLO:
/* Log one tx at D_WARNING: its type name, the id of its peer, the bulk
 * posted/done and request posted/done debug timestamps, and a status.
 * NOTE(review): there is no comma after libcfs_id2str(...) nor between the
 * DBGT_ARGS(...) uses - presumably DBGT_ARGS expands with its own leading
 * comma; confirm against its definition. */
220 ptllnd_debug_tx(ptllnd_tx_t *tx)
222 CDEBUG(D_WARNING, "%s %s b "DBGT_FMT"/"DBGT_FMT
223 " r "DBGT_FMT"/"DBGT_FMT" status %d\n",
224 ptllnd_tx_typestr(tx->tx_type),
225 libcfs_id2str(tx->tx_peer->plp_id)
226 DBGT_ARGS(tx->tx_bulk_posted) DBGT_ARGS(tx->tx_bulk_done)
227 DBGT_ARGS(tx->tx_req_posted) DBGT_ARGS(tx->tx_req_done),
/* Dump the state of the peer id at D_WARNING: a summary line, then its txq,
 * its activeq, and the entries of the zombie and history lists of the NI
 * that belong to this peer, followed by ptllnd_dump_history(). */
232 ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
/* create argument is 0: lookup only */
234 ptllnd_peer_t *plp = ptllnd_find_peer(ni, id, 0);
235 struct list_head *tmp;
236 ptllnd_ni_t *plni = ni->ni_data;
/* logged for an unknown peer (the NULL test itself is not visible) */
240 CDEBUG(D_WARNING, "No peer %s\n", libcfs_id2str(id));
/* summary: H/C flags for hello received and closing, the stamp split into
 * whole and fractional parts of 1000000, queue depths and credit counters */
244 CDEBUG(D_WARNING, "%s %s%s [%d] "LPD64".%06d m "LPD64" q %d/%d c %d/%d+%d(%d)\n",
246 plp->plp_recvd_hello ? "H" : "_",
247 plp->plp_closing ? "C" : "_",
249 plp->plp_stamp / 1000000, (int)(plp->plp_stamp % 1000000),
251 ptllnd_count_q(&plp->plp_txq),
252 ptllnd_count_q(&plp->plp_activeq),
253 plp->plp_credits, plp->plp_outstanding_credits, plp->plp_sent_credits,
254 plni->plni_peer_credits + plp->plp_lazy_credits);
256 CDEBUG(D_WARNING, "txq:\n");
257 list_for_each (tmp, &plp->plp_txq) {
258 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
263 CDEBUG(D_WARNING, "activeq:\n");
264 list_for_each (tmp, &plp->plp_activeq) {
265 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
/* the zombie and history lists are per NI, so filter on the peer id */
270 CDEBUG(D_WARNING, "zombies:\n");
271 list_for_each (tmp, &plni->plni_zombie_txs) {
272 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
274 if (tx->tx_peer->plp_id.nid == id.nid &&
275 tx->tx_peer->plp_id.pid == id.pid)
279 CDEBUG(D_WARNING, "history:\n");
280 list_for_each (tmp, &plni->plni_tx_history) {
281 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
283 if (tx->tx_peer->plp_id.nid == id.nid &&
284 tx->tx_peer->plp_id.pid == id.pid)
/* drop the reference taken by ptllnd_find_peer() */
288 ptllnd_peer_decref(plp);
289 ptllnd_dump_history();
/* Notify hook of the LND.  The visible code finds or creates the peer with
 * pid LUSTRE_SRV_LNET_PID and then waits until its HELLO has been received.
 * NOTE(review): how nid and alive are used is on lines not visible here. */
293 ptllnd_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive)
295 lnet_process_id_t id;
297 time_t start = cfs_time_current_sec();
/* w is a number of seconds: it is compared against second timestamps */
298 int w = PTLLND_WARN_LONG_WAIT;
300 /* This is only actually used to connect to routers at startup! */
307 id.pid = LUSTRE_SRV_LNET_PID;
/* create argument is 1: make the peer if it does not exist yet */
309 peer = ptllnd_find_peer(ni, id, 1);
313 /* wait for the peer to reply */
314 while (!peer->plp_recvd_hello) {
/* complain once more than w seconds have passed since start */
315 if (cfs_time_current_sec() > start + w) {
316 CWARN("Waited %ds to connect to %s\n",
317 w, libcfs_id2str(id));
/* timeout is w * 1000 (presumably milliseconds - confirm) */
321 ptllnd_wait(ni, w*1000);
324 ptllnd_peer_decref(peer);
/* Change the lazy credit headroom of the peer id by nasync messages.  The
 * peer is looked up and only created when nasync is positive.  A request
 * that fits into plp_extra_lazy_credits is absorbed there; otherwise the
 * extra credits are used up, buffers are sized for the remainder and the
 * remainder is added to plp_lazy_credits and plp_outstanding_credits.
 * NOTE(review): the NULL check on the peer, the check of rc and the returns
 * are on lines not visible here. */
328 ptllnd_setasync(lnet_ni_t *ni, lnet_process_id_t id, int nasync)
330 ptllnd_peer_t *peer = ptllnd_find_peer(ni, id, nasync > 0);
336 LASSERT (peer->plp_lazy_credits >= 0);
337 LASSERT (peer->plp_extra_lazy_credits >= 0);
339 /* If nasync < 0, we're being told we can reduce the total message
340 * headroom. We can't do this right now because our peer might already
341 * have credits for the extra buffers, so we just account the extra
342 * headroom in case we need it later and only destroy buffers when the
345 * Note that the following condition handles this case, where it
346 * actually increases the extra lazy credit counter. */
348 if (nasync <= peer->plp_extra_lazy_credits) {
349 peer->plp_extra_lazy_credits -= nasync;
/* from here on the request exceeds the spare headroom */
353 LASSERT (nasync > 0);
/* spend the spare headroom first, then grow by what is left */
355 nasync -= peer->plp_extra_lazy_credits;
356 peer->plp_extra_lazy_credits = 0;
358 rc = ptllnd_size_buffers(ni, nasync);
360 peer->plp_lazy_credits += nasync;
361 peer->plp_outstanding_credits += nasync;
/* Checksum over the nob bytes at ptr.  The result is never 0, because 0
 * stands for no checksum.
 * NOTE(review): the declarations of sum and c and the loop header are on
 * lines not visible here. */
368 ptllnd_cksum (void *ptr, int nob)
/* combine sum shifted left by 1 with sum shifted right by 31 (a rotate left
 * by one bit if sum is 32 bits wide - confirm), then add the next element */
374 sum = ((sum << 1) | (sum >> 31)) + *c++;
376 /* ensure I don't return 0 (== no checksum) */
377 return (sum == 0) ? 1 : sum;
/* Allocate and initialise a tx of the given type for peer.  The message size
 * follows from the type (and from payload_nob for IMMEDIATE), is rounded up
 * to a multiple of 8 and must fit within plp_max_msg_size of the peer.  The
 * wire header is filled in and a reference is taken on the peer.
 * NOTE(review): the switch header, the NULL check after the allocation and
 * the return are on lines not visible here. */
381 ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
383 lnet_ni_t *ni = peer->plp_ni;
384 ptllnd_ni_t *plni = ni->ni_data;
388 CDEBUG(D_NET, "peer=%p type=%d payload=%d\n", peer, type, payload_nob);
/* RDMA descriptors take no payload */
394 case PTLLND_RDMA_WRITE:
395 case PTLLND_RDMA_READ:
396 LASSERT (payload_nob == 0);
/* PUT and GET requests: common header plus the rdma message */
400 case PTLLND_MSG_TYPE_PUT:
401 case PTLLND_MSG_TYPE_GET:
402 LASSERT (payload_nob == 0);
403 msgsize = offsetof(kptl_msg_t, ptlm_u) +
404 sizeof(kptl_rdma_msg_t);
/* IMMEDIATE: everything up to the end of payload_nob payload bytes */
407 case PTLLND_MSG_TYPE_IMMEDIATE:
408 msgsize = offsetof(kptl_msg_t,
409 ptlm_u.immediate.kptlim_payload[payload_nob]);
/* NOOP: common header only */
412 case PTLLND_MSG_TYPE_NOOP:
413 LASSERT (payload_nob == 0);
414 msgsize = offsetof(kptl_msg_t, ptlm_u);
/* HELLO: common header plus the hello message */
417 case PTLLND_MSG_TYPE_HELLO:
418 LASSERT (payload_nob == 0);
419 msgsize = offsetof(kptl_msg_t, ptlm_u) +
420 sizeof(kptl_hello_msg_t);
/* round up to a multiple of 8 */
424 msgsize = (msgsize + 7) & ~7;
425 LASSERT (msgsize <= peer->plp_max_msg_size);
/* the message is stored inline, starting at tx_msg */
427 LIBCFS_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize);
430 CERROR("Can't allocate msg type %d for %s\n",
431 type, libcfs_id2str(peer->plp_id));
435 CFS_INIT_LIST_HEAD(&tx->tx_list);
438 tx->tx_lnetmsg = tx->tx_lnetreplymsg = NULL;
/* no memory descriptor is bound yet */
441 tx->tx_reqmdh = PTL_INVALID_HANDLE;
442 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
443 tx->tx_msgsize = msgsize;
444 tx->tx_completing = 0;
447 PTLLND_DBGT_INIT(tx->tx_bulk_posted);
448 PTLLND_DBGT_INIT(tx->tx_bulk_done);
449 PTLLND_DBGT_INIT(tx->tx_req_posted);
450 PTLLND_DBGT_INIT(tx->tx_req_done);
/* wire header; credits and checksum start out as 0 */
453 tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC;
454 tx->tx_msg.ptlm_version = PTLLND_MSG_VERSION;
455 tx->tx_msg.ptlm_type = type;
456 tx->tx_msg.ptlm_credits = 0;
457 tx->tx_msg.ptlm_nob = msgsize;
458 tx->tx_msg.ptlm_cksum = 0;
459 tx->tx_msg.ptlm_srcnid = ni->ni_nid;
460 tx->tx_msg.ptlm_srcstamp = plni->plni_stamp;
461 tx->tx_msg.ptlm_dstnid = peer->plp_id.nid;
462 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
463 tx->tx_msg.ptlm_srcpid = the_lnet.ln_pid;
464 tx->tx_msg.ptlm_dstpid = peer->plp_id.pid;
/* the tx holds a reference on its peer */
467 ptllnd_peer_addref(peer);
470 CDEBUG(D_NET, "tx=%p\n",tx);
/* Unlink the memory descriptor *mdh of tx, looping until the handle compares
 * equal to PTL_INVALID_HANDLE.  Each pass calls PtlMDUnlink(), warns when
 * more than w seconds have gone by, and then waits for events.
 * NOTE(review): the matching preprocessor lines and the action taken on
 * PTL_OK are not visible here. */
476 ptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh)
478 ptllnd_peer_t *peer = tx->tx_peer;
479 lnet_ni_t *ni = peer->plp_ni;
481 time_t start = cfs_time_current_sec();
482 int w = PTLLND_WARN_LONG_WAIT;
484 while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) {
485 rc = PtlMDUnlink(*mdh);
486 #ifndef LUSTRE_PORTALS_UNLINK_SEMANTICS
487 if (rc == PTL_OK) /* unlink successful => no unlinked event */
/* the only other result accepted is an MD that is still in use */
489 LASSERT (rc == PTL_MD_IN_USE);
491 if (cfs_time_current_sec() > start + w) {
492 CWARN("Waited %ds to abort tx to %s\n",
493 w, libcfs_id2str(peer->plp_id));
496 /* Wait for ptllnd_tx_event() to invalidate */
497 ptllnd_wait(ni, w*1000);
/* Trim the tx history list of the NI until it holds no more than
 * plni_max_tx_history entries, freeing from the head of the list. */
502 ptllnd_cull_tx_history(ptllnd_ni_t *plni)
504 int max = plni->plni_max_tx_history;
506 while (plni->plni_ntx_history > max) {
/* head of the history list */
507 ptllnd_tx_t *tx = list_entry(plni->plni_tx_history.next,
508 ptllnd_tx_t, tx_list);
509 list_del(&tx->tx_list);
/* drop a reference on the peer of the tx */
511 ptllnd_peer_decref(tx->tx_peer);
/* same size expression as the allocation in ptllnd_new_tx() */
513 LIBCFS_FREE(tx, offsetof(ptllnd_tx_t, tx_msg) + tx->tx_msgsize);
/* NOTE(review): the update of plni_ntxs is on a line not visible here */
515 LASSERT (plni->plni_ntxs > 0);
517 plni->plni_ntx_history--;
/* Complete a tx.  tx_completing guards against handling the tx twice.  The
 * tx is taken off whatever list it is on, the peer is closed when the tx
 * carries an error, both memory descriptors are aborted, the iov array is
 * freed, the LNET message(s) are finalized, and the tx is appended to the
 * history list, which is then culled. */
522 ptllnd_tx_done(ptllnd_tx_t *tx)
524 ptllnd_peer_t *peer = tx->tx_peer;
525 lnet_ni_t *ni = peer->plp_ni;
526 ptllnd_ni_t *plni = ni->ni_data;
528 /* CAVEAT EMPTOR: If this tx is being aborted, I'll continue to get
529 * events for this tx until it's unlinked. So I set tx_completing to
530 * flag the tx is getting handled */
/* already being handled (the action is not visible; presumably a return) */
532 if (tx->tx_completing)
535 tx->tx_completing = 1;
537 if (!list_empty(&tx->tx_list))
538 list_del_init(&tx->tx_list);
/* a failed tx takes its peer down with it */
540 if (tx->tx_status != 0) {
541 CERROR("Completing tx with error\n");
543 ptllnd_close_peer(peer, tx->tx_status);
/* make sure neither the request MD nor the bulk MD is still linked */
546 ptllnd_abort_tx(tx, &tx->tx_reqmdh);
547 ptllnd_abort_tx(tx, &tx->tx_bulkmdh);
549 if (tx->tx_niov > 0) {
550 LIBCFS_FREE(tx->tx_iov, tx->tx_niov * sizeof(*tx->tx_iov));
/* GET with a reply message: the GET itself always completes with 0 and the
 * reply carries the tx status; any other tx passes its status straight on */
554 if (tx->tx_lnetreplymsg != NULL) {
555 LASSERT (tx->tx_type == PTLLND_MSG_TYPE_GET);
556 LASSERT (tx->tx_lnetmsg != NULL);
557 /* Simulate GET success always */
558 lnet_finalize(ni, tx->tx_lnetmsg, 0);
559 CDEBUG(D_NET, "lnet_finalize(tx_lnetreplymsg=%p)\n",tx->tx_lnetreplymsg);
560 lnet_finalize(ni, tx->tx_lnetreplymsg, tx->tx_status);
561 } else if (tx->tx_lnetmsg != NULL) {
562 lnet_finalize(ni, tx->tx_lnetmsg, tx->tx_status);
/* keep the finished tx in the history list and trim that list */
565 plni->plni_ntx_history++;
566 list_add_tail(&tx->tx_list, &plni->plni_tx_history);
568 ptllnd_cull_tx_history(plni);
/* Build the ptl_md_iovec_t array for a tx from the iovec array iov of niov
 * entries, covering len bytes that start offset bytes into the data.
 * Visible steps: leading entries are skipped while offset is not smaller
 * than their iov_len, with offset reduced by each skipped length; an array
 * of niov entries is allocated; the fill loop copies base and length of
 * entry npiov adjusted by temp_offset, and clamps an entry whose length
 * reaches resid down to resid; otherwise resid shrinks by the entry length.
 * The over-sized array is freed again at the end, as the comment there
 * explains.
 * NOTE(review): the LASSERT inside the fill loop tests iov->iov_len, i.e.
 * entry 0, while the two assignments after it index iov[npiov] - it looks
 * as if iov[npiov].iov_len was intended; confirm.
 * NOTE(review): the loop headers, the reset of temp_offset, the retry and
 * the returns are on lines not visible here. */
572 ptllnd_set_txiov(ptllnd_tx_t *tx,
573 unsigned int niov, struct iovec *iov,
574 unsigned int offset, unsigned int len)
576 ptl_md_iovec_t *piov;
585 * Remove iovec's at the beginning that
586 * are skipped because of the offset.
587 * Adjust the offset accordingly
591 if (offset < iov->iov_len)
593 offset -= iov->iov_len;
599 int temp_offset = offset;
601 LIBCFS_ALLOC(piov, niov * sizeof(*piov));
605 for (npiov = 0;; npiov++) {
606 LASSERT (npiov < niov);
607 LASSERT (iov->iov_len >= temp_offset);
609 piov[npiov].iov_base = iov[npiov].iov_base + temp_offset;
610 piov[npiov].iov_len = iov[npiov].iov_len - temp_offset;
612 if (piov[npiov].iov_len >= resid) {
613 piov[npiov].iov_len = resid;
617 resid -= piov[npiov].iov_len;
627 /* Dang! The piov I allocated was too big and it's a drag to
628 * have to maintain separate 'allocated' and 'used' sizes, so
629 * I'll just do it again; NB this doesn't happen normally... */
/* the size expression matches the allocation of piov above */
630 LIBCFS_FREE(piov, niov * sizeof(*piov));
/* Point the memory descriptor md at the data of tx.  With exactly one iovec
 * entry the MD refers to that single region directly; the final visible line
 * sets PTL_MD_IOVEC in the options.
 * NOTE(review): the first branch of the if chain and the start/length
 * assignments of the iovec case are on lines not visible here. */
636 ptllnd_set_md_buffer(ptl_md_t *md, ptllnd_tx_t *tx)
638 unsigned int niov = tx->tx_niov;
639 ptl_md_iovec_t *iov = tx->tx_iov;
/* the caller must not have selected iovec mode already */
641 LASSERT ((md->options & PTL_MD_IOVEC) == 0);
646 } else if (niov == 1) {
647 md->start = iov[0].iov_base;
648 md->length = iov[0].iov_len;
/* presumably the branch for more than one entry - confirm */
652 md->options |= PTL_MD_IOVEC;
/* Post one receive buffer: a match entry for LNET_MSG_MATCHBITS is attached
 * on the portal of the NI, and a memory descriptor covering the buffer is
 * attached to it.  plni_nposted_buffers counts the buffer as posted before
 * the MD attach and the count is taken back when the attach fails.
 * NOTE(review): the initialiser of anyid (presumably a wildcard process id),
 * the declaration that owns the designated initialisers below, the rc tests
 * and the returns are on lines not visible here. */
657 ptllnd_post_buffer(ptllnd_buffer_t *buf)
659 lnet_ni_t *ni = buf->plb_ni;
660 ptllnd_ni_t *plni = ni->ni_data;
661 ptl_process_id_t anyid = {
/* the MD spans the whole buffer, never exhausts its threshold, limits a
 * single message to plni_max_msg_size and accepts PUT operations */
665 .start = buf->plb_buffer,
666 .length = plni->plni_buffer_size,
667 .threshold = PTL_MD_THRESH_INF,
668 .max_size = plni->plni_max_msg_size,
669 .options = (PTLLND_MD_OPTIONS |
670 PTL_MD_OP_PUT | PTL_MD_MAX_SIZE |
671 PTL_MD_LOCAL_ALIGN8),
672 .user_ptr = ptllnd_obj2eventarg(buf, PTLLND_EVENTARG_TYPE_BUF),
673 .eq_handle = plni->plni_eqh};
677 LASSERT (!buf->plb_posted);
/* the entry is inserted with PTL_INS_AFTER and unlink mode PTL_UNLINK */
679 rc = PtlMEAttach(plni->plni_nih, plni->plni_portal,
680 anyid, LNET_MSG_MATCHBITS, 0,
681 PTL_UNLINK, PTL_INS_AFTER, &meh);
683 CERROR("PtlMEAttach failed: %d\n", rc);
688 plni->plni_nposted_buffers++;
690 rc = PtlMDAttach(meh, md, LNET_UNLINK, &buf->plb_md);
694 CERROR("PtlMDAttach failed: %d\n", rc);
/* failure path: undo the count and remove the match entry again */
697 plni->plni_nposted_buffers--;
699 rc = PtlMEUnlink(meh);
700 LASSERT (rc == PTL_OK);
/* Send as many queued txs to peer as its credits allow.
 * First, when nothing is queued, the credits waiting to be returned have
 * reached PTLLND_CREDIT_HIGHWATER and a send credit is available, a NOOP tx
 * is queued to carry them.  Then txs are taken from the head of plp_txq one
 * at a time: the credit invariants are asserted, sending stops when there
 * are no credits or only the reserved last one, the tx moves to plp_activeq,
 * gets the current stamp and all outstanding credits, is checksummed when
 * enabled, bound to an MD and sent with PtlPut().
 * NOTE(review): the NULL/rc tests, the breaks and the update of plp_credits
 * are on lines not visible here.
 * NOTE(review): the first CDEBUG format opens a [ that is never closed,
 * unlike the other credit traces in this file. */
706 ptllnd_check_sends(ptllnd_peer_t *peer)
708 lnet_ni_t *ni = peer->plp_ni;
709 ptllnd_ni_t *plni = ni->ni_data;
715 CDEBUG(D_NET, "%s: [%d/%d+%d(%d)\n",
716 libcfs_id2str(peer->plp_id), peer->plp_credits,
717 peer->plp_outstanding_credits, peer->plp_sent_credits,
718 plni->plni_peer_credits + peer->plp_lazy_credits);
/* nothing queued that could carry credits back, enough credits have piled
 * up, and I am able to send: queue a NOOP */
720 if (list_empty(&peer->plp_txq) &&
721 peer->plp_outstanding_credits >= PTLLND_CREDIT_HIGHWATER(plni) &&
722 peer->plp_credits != 0) {
724 tx = ptllnd_new_tx(peer, PTLLND_MSG_TYPE_NOOP, 0);
725 CDEBUG(D_NET, "NOOP tx=%p\n",tx);
727 CERROR("Can't return credits to %s\n",
728 libcfs_id2str(peer->plp_id));
730 list_add_tail(&tx->tx_list, &peer->plp_txq);
734 while (!list_empty(&peer->plp_txq)) {
735 tx = list_entry(peer->plp_txq.next, ptllnd_tx_t, tx_list);
737 LASSERT (tx->tx_msgsize > 0);
/* credit invariants: nothing negative, and credits owed plus credits
 * already sent never exceed the total allowance of the peer */
739 LASSERT (peer->plp_outstanding_credits >= 0);
740 LASSERT (peer->plp_sent_credits >= 0);
741 LASSERT (peer->plp_outstanding_credits + peer->plp_sent_credits
742 <= plni->plni_peer_credits + peer->plp_lazy_credits);
743 LASSERT (peer->plp_credits >= 0);
745 if (peer->plp_credits == 0) { /* no credits */
746 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: no creds for %p",
747 libcfs_id2str(peer->plp_id),
749 peer->plp_outstanding_credits,
750 peer->plp_sent_credits,
751 plni->plni_peer_credits +
752 peer->plp_lazy_credits, tx);
756 if (peer->plp_credits == 1 && /* last credit reserved for */
757 peer->plp_outstanding_credits == 0) { /* returning credits */
758 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: too few creds for %p",
759 libcfs_id2str(peer->plp_id),
761 peer->plp_outstanding_credits,
762 peer->plp_sent_credits,
763 plni->plni_peer_credits +
764 peer->plp_lazy_credits, tx);
/* the tx is now in flight */
768 list_del(&tx->tx_list);
769 list_add_tail(&tx->tx_list, &peer->plp_activeq);
771 CDEBUG(D_NET, "Sending at TX=%p type=%s (%d)\n",tx,
772 ptllnd_msgtype2str(tx->tx_type),tx->tx_type);
/* a NOOP is pointless once other txs are queued or the credits to return
 * have dropped below high water (the action taken is not visible here) */
774 if (tx->tx_type == PTLLND_MSG_TYPE_NOOP &&
775 (!list_empty(&peer->plp_txq) ||
776 peer->plp_outstanding_credits <
777 PTLLND_CREDIT_HIGHWATER(plni))) {
783 /* Set stamp at the last minute; on a new peer, I don't know it
784 * until I receive the HELLO back */
785 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
788 * Return all the credits we have
790 tx->tx_msg.ptlm_credits = peer->plp_outstanding_credits;
791 peer->plp_sent_credits += peer->plp_outstanding_credits;
792 peer->plp_outstanding_credits = 0;
/* the checksum covers the common header only, up to ptlm_u */
799 if (plni->plni_checksum)
800 tx->tx_msg.ptlm_cksum =
801 ptllnd_cksum(&tx->tx_msg,
802 offsetof(kptl_msg_t, ptlm_u));
/* MD describing the message, with events routed back to this tx */
804 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
805 md.eq_handle = plni->plni_eqh;
807 md.options = PTLLND_MD_OPTIONS;
808 md.start = &tx->tx_msg;
809 md.length = tx->tx_msgsize;
811 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
813 CERROR("PtlMDBind for %s failed: %d\n",
814 libcfs_id2str(peer->plp_id), rc);
815 tx->tx_status = -EIO;
/* RDMA descriptors never travel through this queue */
820 LASSERT (tx->tx_type != PTLLND_RDMA_WRITE &&
821 tx->tx_type != PTLLND_RDMA_READ);
824 PTLLND_DBGT_STAMP(tx->tx_req_posted);
826 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: %s %p c %d",
827 libcfs_id2str(peer->plp_id),
829 peer->plp_outstanding_credits,
830 peer->plp_sent_credits,
831 plni->plni_peer_credits +
832 peer->plp_lazy_credits,
833 ptllnd_msgtype2str(tx->tx_type), tx,
834 tx->tx_msg.ptlm_credits);
/* the message goes to the LNET message match bits, no ACK requested */
836 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
837 plni->plni_portal, 0, LNET_MSG_MATCHBITS, 0, 0);
839 CERROR("PtlPut for %s failed: %d\n",
840 libcfs_id2str(peer->plp_id), rc);
841 tx->tx_status = -EIO;
/* Set up the passive side of a bulk transfer for msg and build the GET or
 * PUT request that announces it.  A tx of the given type is allocated, its
 * iov is built from the caller data, the function waits until the HELLO of
 * the peer has arrived, picks match bits, attaches a match entry and MD for
 * the bulk buffer, and fills in the request with the LNET header and the
 * match bits.  A GET additionally gets a reply message created for it.
 * NOTE(review): the NULL/rc tests, the error exits, the queueing of the tx
 * and the return are on lines not visible here. */
849 ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
850 unsigned int niov, struct iovec *iov,
851 unsigned int offset, unsigned int len)
853 lnet_ni_t *ni = peer->plp_ni;
854 ptllnd_ni_t *plni = ni->ni_data;
855 ptllnd_tx_t *tx = ptllnd_new_tx(peer, type, 0);
865 CDEBUG(D_NET, "niov=%d offset=%d len=%d\n",niov,offset,len);
867 LASSERT (type == PTLLND_MSG_TYPE_GET ||
868 type == PTLLND_MSG_TYPE_PUT);
871 CERROR("Can't allocate %s tx for %s\n",
872 type == PTLLND_MSG_TYPE_GET ? "GET" : "PUT/REPLY",
873 libcfs_id2str(peer->plp_id));
877 rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
879 CERROR ("Can't allocate iov %d for %s\n",
880 niov, libcfs_id2str(peer->plp_id));
885 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
886 md.eq_handle = plni->plni_eqh;
/* for a GET the peer will PUT into this buffer (ACKs disabled); the other
 * option line lets the peer GET from it (presumably the else branch) */
889 md.options = PTLLND_MD_OPTIONS;
890 if(type == PTLLND_MSG_TYPE_GET)
891 md.options |= PTL_MD_OP_PUT | PTL_MD_ACK_DISABLE;
893 md.options |= PTL_MD_OP_GET;
894 ptllnd_set_md_buffer(&md, tx);
896 start = cfs_time_current_sec();
897 w = PTLLND_WARN_LONG_WAIT;
899 while (!peer->plp_recvd_hello) { /* wait to validate plp_match */
/* the peer may get closed while waiting */
900 if (peer->plp_closing) {
904 if (cfs_time_current_sec() > start + w) {
905 CWARN("Waited %ds to connect to %s\n",
906 w, libcfs_id2str(peer->plp_id));
909 ptllnd_wait(ni, w*1000);
/* match bits never start below PTL_RESERVED_MATCHBITS; each transfer takes
 * the current value and advances the counter */
912 if (peer->plp_match < PTL_RESERVED_MATCHBITS)
913 peer->plp_match = PTL_RESERVED_MATCHBITS;
914 matchbits = peer->plp_match++;
915 CDEBUG(D_NET, "matchbits " LPX64 " %s\n", matchbits,
916 ptllnd_ptlid2str(peer->plp_ptlid));
/* the match entry names the Portals id of this peer and is inserted with
 * PTL_INS_BEFORE */
918 rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid,
919 matchbits, 0, PTL_UNLINK, PTL_INS_BEFORE, &meh);
921 CERROR("PtlMEAttach for %s failed: %d\n",
922 libcfs_id2str(peer->plp_id), rc);
927 PTLLND_DBGT_STAMP(tx->tx_bulk_posted);
929 rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);
/* MD attach failed: the match entry has to come off again */
931 CERROR("PtlMDAttach for %s failed: %d\n",
932 libcfs_id2str(peer->plp_id), rc);
933 rc2 = PtlMEUnlink(meh);
934 LASSERT (rc2 == PTL_OK);
938 tx->tx_bulkmdh = mdh;
941 * We need to set the stamp here because it
942 * we could have received a HELLO above that set
945 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
/* the request carries the LNET header and the match bits chosen above */
947 tx->tx_msg.ptlm_u.rdma.kptlrm_hdr = msg->msg_hdr;
948 tx->tx_msg.ptlm_u.rdma.kptlrm_matchbits = matchbits;
950 if (type == PTLLND_MSG_TYPE_GET) {
951 tx->tx_lnetreplymsg = lnet_create_reply_msg(ni, msg);
952 if (tx->tx_lnetreplymsg == NULL) {
953 CERROR("Can't create reply for GET to %s\n",
954 libcfs_id2str(msg->msg_target));
960 tx->tx_lnetmsg = msg;
/* NOTE(review): msg->msg_type is run through le32_to_cpu() below although
 * the same field is passed unconverted to lnet_msgtyp2str() - confirm which
 * byte order the field is kept in */
961 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post passive %s p %d %p",
962 libcfs_id2str(msg->msg_target),
963 peer->plp_credits, peer->plp_outstanding_credits,
964 peer->plp_sent_credits,
965 plni->plni_peer_credits + peer->plp_lazy_credits,
966 lnet_msgtyp2str(msg->msg_type),
967 (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ?
968 le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
969 (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ?
970 le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
/* Perform the active side of a bulk transfer with peer: an RDMA READ issues
 * PtlGet() and an RDMA WRITE issues PtlPut(), both against the match bits
 * supplied by the peer.  The tx is allocated, its iov built, an MD bound
 * over it, and the tx is queued on plp_activeq before the operation starts.
 * NOTE(review): the NULL/rc tests, the else line between PtlGet and PtlPut
 * and the returns are on lines not visible here. */
981 ptllnd_active_rdma(ptllnd_peer_t *peer, int type,
982 lnet_msg_t *msg, __u64 matchbits,
983 unsigned int niov, struct iovec *iov,
984 unsigned int offset, unsigned int len)
986 lnet_ni_t *ni = peer->plp_ni;
987 ptllnd_ni_t *plni = ni->ni_data;
988 ptllnd_tx_t *tx = ptllnd_new_tx(peer, type, 0);
993 LASSERT (type == PTLLND_RDMA_READ ||
994 type == PTLLND_RDMA_WRITE);
/* without a tx the transfer cannot be reported, so the peer is closed */
997 CERROR("Can't allocate tx for RDMA %s with %s\n",
998 (type == PTLLND_RDMA_WRITE) ? "write" : "read",
999 libcfs_id2str(peer->plp_id));
1000 ptllnd_close_peer(peer, -ENOMEM);
1004 rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
1006 CERROR ("Can't allocate iov %d for %s\n",
1007 niov, libcfs_id2str(peer->plp_id));
1012 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
1013 md.eq_handle = plni->plni_eqh;
1015 md.options = PTLLND_MD_OPTIONS;
/* a READ allows two operations on the MD, a WRITE one (presumably the
 * request and the reply of a Get - confirm) */
1016 md.threshold = (type == PTLLND_RDMA_READ) ? 2 : 1;
1018 ptllnd_set_md_buffer(&md, tx);
1020 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
1022 CERROR("PtlMDBind for %s failed: %d\n",
1023 libcfs_id2str(peer->plp_id), rc);
1028 tx->tx_bulkmdh = mdh;
1029 tx->tx_lnetmsg = msg;
/* queued as active before the operation is started */
1031 list_add_tail(&tx->tx_list, &peer->plp_activeq);
1032 PTLLND_DBGT_STAMP(tx->tx_bulk_posted);
1034 if (type == PTLLND_RDMA_READ)
1035 rc = PtlGet(mdh, peer->plp_ptlid,
1036 plni->plni_portal, 0, matchbits, 0);
/* the last PtlPut argument flags failure to the peer when there is no msg */
1038 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
1039 plni->plni_portal, 0, matchbits, 0,
1040 (msg == NULL) ? PTLLND_RDMA_FAIL : PTLLND_RDMA_OK);
1045 CERROR("Can't initiate RDMA with %s: %d\n",
1046 libcfs_id2str(peer->plp_id), rc);
/* detach the LNET message first so ptllnd_tx_done() does not finalize it */
1048 tx->tx_lnetmsg = NULL;
1051 ptllnd_tx_done(tx); /* this will close peer */
/* Send hook of the LND.  Routed messages and kiov payloads are not
 * supported.  The target peer is found or created, then the LNET message
 * type decides between an IMMEDIATE message, which copies the payload into
 * the tx, and a passive RDMA set up through ptllnd_passive_rdma().
 * In the tail of the function the IMMEDIATE tx is allocated for msg_len
 * payload bytes, the payload is copied flat behind the immediate header,
 * the LNET header is stored, and the peer reference is dropped.
 * NOTE(review): several case labels, the NULL tests, the queueing of the tx
 * and the returns are on lines not visible here. */
1056 ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
1058 ptllnd_ni_t *plni = ni->ni_data;
1064 LASSERT (!msg->msg_routing);
1065 LASSERT (msg->msg_kiov == NULL);
1067 LASSERT (msg->msg_niov <= PTL_MD_MAX_IOV); /* !!! */
1069 CDEBUG(D_NET, "%s [%d]+%d,%d -> %s%s\n",
1070 lnet_msgtyp2str(msg->msg_type),
1071 msg->msg_niov, msg->msg_offset, msg->msg_len,
1072 libcfs_nid2str(msg->msg_target.nid),
1073 msg->msg_target_is_router ? "(rtr)" : "");
/* targets whose pid has LNET_PID_USERFLAG set are refused */
1075 if ((msg->msg_target.pid & LNET_PID_USERFLAG) != 0) {
1076 CERROR("Can't send to non-kernel peer %s\n",
1077 libcfs_id2str(msg->msg_target));
1078 return -EHOSTUNREACH;
/* create argument is 1: make the peer if it does not exist yet */
1081 plp = ptllnd_find_peer(ni, msg->msg_target, 1);
1085 switch (msg->msg_type) {
/* a message type that never has payload */
1090 LASSERT (msg->msg_len == 0);
1091 break; /* send IMMEDIATE */
/* GET: immediate when the target is a router or when the reply would fit
 * into one of my messages; otherwise a passive GET over the whole MD */
1094 if (msg->msg_target_is_router)
1095 break; /* send IMMEDIATE */
1097 nob = msg->msg_md->md_length;
1098 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1099 if (nob <= plni->plni_max_msg_size)
1102 LASSERT ((msg->msg_md->md_options & LNET_MD_KIOV) == 0);
1103 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_GET, msg,
1104 msg->msg_md->md_niov,
1105 msg->msg_md->md_iov.iov,
1106 0, msg->msg_md->md_length);
1107 ptllnd_peer_decref(plp);
/* payload carrying types: immediate when the message fits into the maximum
 * message size of the peer; otherwise a passive PUT of the payload */
1110 case LNET_MSG_REPLY:
1113 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1114 if (nob <= plp->plp_max_msg_size)
1115 break; /* send IMMEDIATE */
1117 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_PUT, msg,
1118 msg->msg_niov, msg->msg_iov,
1119 msg->msg_offset, msg->msg_len);
1120 ptllnd_peer_decref(plp);
1125 * NB copy the payload so we don't have to do a fragmented send */
1127 tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_IMMEDIATE, msg->msg_len);
1129 CERROR("Can't allocate tx for lnet type %d to %s\n",
1130 msg->msg_type, libcfs_id2str(msg->msg_target));
1131 ptllnd_peer_decref(plp);
1135 lnet_copy_iov2flat(tx->tx_msgsize, &tx->tx_msg,
1136 offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1137 msg->msg_niov, msg->msg_iov, msg->msg_offset,
1139 tx->tx_msg.ptlm_u.immediate.kptlim_hdr = msg->msg_hdr;
1141 tx->tx_lnetmsg = msg;
1142 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post immediate %s p %d %p",
1143 libcfs_id2str(msg->msg_target),
1144 plp->plp_credits, plp->plp_outstanding_credits,
1145 plp->plp_sent_credits,
1146 plni->plni_peer_credits + plp->plp_lazy_credits,
1147 lnet_msgtyp2str(msg->msg_type),
1148 (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ?
1149 le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1150 (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ?
1151 le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1154 ptllnd_peer_decref(plp);
/* Finish with a received message: the peer is owed one more credit, and
 * ptllnd_check_sends() is run so the credit can travel back.
 * NOTE(review): this history format ends in a newline while the other
 * PTLLND_HISTORY formats in this file do not - confirm which is intended.
 * NOTE(review): the update of plni_nrxs is on a line not visible here. */
1159 ptllnd_rx_done(ptllnd_rx_t *rx)
1161 ptllnd_peer_t *plp = rx->rx_peer;
1162 lnet_ni_t *ni = plp->plp_ni;
1163 ptllnd_ni_t *plni = ni->ni_data;
/* one more credit to return to the peer */
1165 plp->plp_outstanding_credits++;
1167 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: rx=%p done\n",
1168 libcfs_id2str(plp->plp_id),
1169 plp->plp_credits, plp->plp_outstanding_credits,
1170 plp->plp_sent_credits,
1171 plni->plni_peer_credits + plp->plp_lazy_credits, rx);
1173 ptllnd_check_sends(rx->rx_peer);
1175 LASSERT (plni->plni_nrxs > 0);
/* Eager receive hook of the LND.  According to the comment in the body it
 * is not expected to be called.
 * NOTE(review): the statements of the body are on lines not visible here. */
1180 ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1181 void **new_privatep)
1183 /* Shouldn't get here; recvs only block for router buffers */
/* Receive hook of the LND; private is the ptllnd_rx_t of the message.
 * kiov payloads are not supported.  By message type: an IMMEDIATE message
 * has its payload copied out of the receive buffer and is finalized with 0;
 * a PUT request is answered with an RDMA READ of mlen bytes into the given
 * iov; a GET request is answered with an RDMA WRITE, either of the data
 * described by msg or with a NULL msg.
 * NOTE(review): the condition choosing between the two RDMA WRITE calls,
 * the error exits and the return are on lines not visible here. */
1189 ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1190 int delayed, unsigned int niov,
1191 struct iovec *iov, lnet_kiov_t *kiov,
1192 unsigned int offset, unsigned int mlen, unsigned int rlen)
1194 ptllnd_rx_t *rx = private;
1198 LASSERT (kiov == NULL);
1199 LASSERT (niov <= PTL_MD_MAX_IOV); /* !!! */
1201 switch (rx->rx_msg->ptlm_type) {
1205 case PTLLND_MSG_TYPE_IMMEDIATE:
/* bytes needed for a message with mlen payload bytes; it must not exceed
 * what was actually received */
1206 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[mlen]);
1207 if (nob > rx->rx_nob) {
1208 CERROR("Immediate message from %s too big: %d(%d)\n",
1209 libcfs_id2str(rx->rx_peer->plp_id),
/* copy from the payload offset of the received message into the iov */
1214 lnet_copy_flat2iov(niov, iov, offset,
1215 rx->rx_nob, rx->rx_msg,
1216 offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1218 lnet_finalize(ni, msg, 0);
/* the peer announced a buffer to read from: fetch it */
1221 case PTLLND_MSG_TYPE_PUT:
1222 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg,
1223 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1224 niov, iov, offset, mlen);
/* the peer announced a buffer to fill: write the reply data into it, or
 * start the write without a message */
1227 case PTLLND_MSG_TYPE_GET:
1229 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, msg,
1230 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1231 msg->msg_niov, msg->msg_iov,
1232 msg->msg_offset, msg->msg_len);
1234 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, NULL,
1235 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
/* Reaction to a NAK or protocol mismatch: the tx history is dumped when
 * plni_dump_on_nak is set, and plni_abort_on_nak is tested afterwards.
 * NOTE(review): the statement guarded by plni_abort_on_nak is on a line not
 * visible here (presumably an abort, going by the name). */
1245 ptllnd_abort_on_nak(lnet_ni_t *ni)
1247 ptllnd_ni_t *plni = ni->ni_data;
1249 if (plni->plni_dump_on_nak)
1250 ptllnd_dump_history();
1252 if (plni->plni_abort_on_nak)
/* Validate a message of nob bytes received from initiator and pass it on.
 * Checks, in order: enough bytes for magic and version, the magic (which
 * also detects a peer of the opposite byte order), the version, the length
 * of the common header, the checksum, the source network, NAK, destination
 * id, destination stamp, and the length required by the message type.
 * Then the peer is looked up, its HELLO state and source stamp are checked,
 * credits are accounted, and the message goes to lnet_parse() or straight
 * to ptllnd_rx_done().
 * NOTE(review): the size tests, the flip test around the byte swaps, the
 * setup of rx, the rc tests and the returns are on lines not visible here. */
1257 ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
1258 kptl_msg_t *msg, unsigned int nob)
1260 ptllnd_ni_t *plni = ni->ni_data;
/* size of the common header that precedes the per type part */
1261 const int basenob = offsetof(kptl_msg_t, ptlm_u);
1262 lnet_process_id_t srcid;
1271 CERROR("Very short receive from %s\n",
1272 ptllnd_ptlid2str(initiator));
1276 /* I can at least read MAGIC/VERSION */
/* flip is set when the magic matches only after a byte swap */
1278 flip = msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC);
1279 if (!flip && msg->ptlm_magic != PTLLND_MSG_MAGIC) {
1280 CERROR("Bad protocol magic %08x from %s\n",
1281 msg->ptlm_magic, ptllnd_ptlid2str(initiator));
1285 msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
1287 if (msg_version != PTLLND_MSG_VERSION) {
1288 CERROR("Bad protocol version %04x from %s\n",
1289 (__u32)msg_version, ptllnd_ptlid2str(initiator));
1290 ptllnd_abort_on_nak(ni);
1294 if (nob < basenob) {
1295 CERROR("Short receive from %s: got %d, wanted at least %d\n",
1296 ptllnd_ptlid2str(initiator), nob, basenob);
/* Checksum handling: the received value is saved, the field is zeroed, and
 * a non-zero received value has to equal the checksum recomputed over the
 * common header.  Afterwards version and checksum are stored back in host
 * byte order. */
1300 /* checksum must be computed with
1301 * 1) ptlm_cksum zero and
1302 * 2) BEFORE anything gets modified/flipped
1304 msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
1305 msg->ptlm_cksum = 0;
1306 if (msg_cksum != 0 &&
1307 msg_cksum != ptllnd_cksum(msg, offsetof(kptl_msg_t, ptlm_u))) {
1308 CERROR("Bad checksum from %s\n", ptllnd_ptlid2str(initiator));
1312 msg->ptlm_version = msg_version;
1313 msg->ptlm_cksum = msg_cksum;
1316 /* NB stamps are opaque cookies */
/* byte swaps of the remaining common header fields, done in place */
1317 __swab32s(&msg->ptlm_nob);
1318 __swab64s(&msg->ptlm_srcnid);
1319 __swab64s(&msg->ptlm_dstnid);
1320 __swab32s(&msg->ptlm_srcpid);
1321 __swab32s(&msg->ptlm_dstpid);
1324 srcid.nid = msg->ptlm_srcnid;
1325 srcid.pid = msg->ptlm_srcpid;
/* the sender must be on the same LNET network as this NI */
1327 if (LNET_NIDNET(msg->ptlm_srcnid) != LNET_NIDNET(ni->ni_nid)) {
1328 CERROR("Bad source id %s from %s\n",
1329 libcfs_id2str(srcid),
1330 ptllnd_ptlid2str(initiator));
1334 if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
1335 CERROR("NAK from %s (%s)\n",
1336 libcfs_id2str(srcid),
1337 ptllnd_ptlid2str(initiator));
1338 ptllnd_abort_on_nak(ni);
/* the message must be addressed to my NID and my LNET pid */
1342 if (msg->ptlm_dstnid != ni->ni_nid ||
1343 msg->ptlm_dstpid != the_lnet.ln_pid) {
1344 CERROR("Bad dstid %s (%s expected) from %s\n",
1345 libcfs_id2str((lnet_process_id_t) {
1346 .nid = msg->ptlm_dstnid,
1347 .pid = msg->ptlm_dstpid}),
1348 libcfs_id2str((lnet_process_id_t) {
1350 .pid = the_lnet.ln_pid}),
1351 libcfs_id2str(srcid));
/* the destination stamp must be the stamp of this NI */
1355 if (msg->ptlm_dststamp != plni->plni_stamp) {
1356 CERROR("Bad dststamp "LPX64"("LPX64" expected) from %s\n",
1357 msg->ptlm_dststamp, plni->plni_stamp,
1358 libcfs_id2str(srcid));
1362 PTLLND_HISTORY("RX %s: %s %d %p", libcfs_id2str(srcid),
1363 ptllnd_msgtype2str(msg->ptlm_type),
1364 msg->ptlm_credits, &rx);
/* per type checks: the message must be long enough for its type, and the
 * type specific fields are byte swapped in place */
1366 switch (msg->ptlm_type) {
1367 case PTLLND_MSG_TYPE_PUT:
1368 case PTLLND_MSG_TYPE_GET:
1369 if (nob < basenob + sizeof(kptl_rdma_msg_t)) {
1370 CERROR("Short rdma request from %s(%s)\n",
1371 libcfs_id2str(srcid),
1372 ptllnd_ptlid2str(initiator));
1376 __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
1379 case PTLLND_MSG_TYPE_IMMEDIATE:
1380 if (nob < offsetof(kptl_msg_t,
1381 ptlm_u.immediate.kptlim_payload)) {
1382 CERROR("Short immediate from %s(%s)\n",
1383 libcfs_id2str(srcid),
1384 ptllnd_ptlid2str(initiator));
1389 case PTLLND_MSG_TYPE_HELLO:
1390 if (nob < basenob + sizeof(kptl_hello_msg_t)) {
1391 CERROR("Short hello from %s(%s)\n",
1392 libcfs_id2str(srcid),
1393 ptllnd_ptlid2str(initiator));
1397 __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
1398 __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
1402 case PTLLND_MSG_TYPE_NOOP:
1406 CERROR("Bad message type %d from %s(%s)\n", msg->ptlm_type,
1407 libcfs_id2str(srcid),
1408 ptllnd_ptlid2str(initiator));
/* create argument is 0: only peers that already exist are accepted */
1412 plp = ptllnd_find_peer(ni, srcid, 0);
1414 CERROR("Can't find peer %s\n", libcfs_id2str(srcid));
/* HELLO state machine: exactly one HELLO is accepted and it has to come
 * first; it supplies the maximum message size, match bits and stamp of the
 * peer.  Every later message must carry that same source stamp. */
1418 if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
1419 if (plp->plp_recvd_hello) {
1420 CERROR("Unexpected HELLO from %s\n",
1421 libcfs_id2str(srcid));
1422 ptllnd_peer_decref(plp);
1426 plp->plp_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1427 plp->plp_match = msg->ptlm_u.hello.kptlhm_matchbits;
1428 plp->plp_stamp = msg->ptlm_srcstamp;
1429 plp->plp_recvd_hello = 1;
1431 } else if (!plp->plp_recvd_hello) {
1433 CERROR("Bad message type %d (HELLO expected) from %s\n",
1434 msg->ptlm_type, libcfs_id2str(srcid));
1435 ptllnd_peer_decref(plp);
1438 } else if (msg->ptlm_srcstamp != plp->plp_stamp) {
1440 CERROR("Bad srcstamp "LPX64"("LPX64" expected) from %s\n",
1441 msg->ptlm_srcstamp, plp->plp_stamp,
1442 libcfs_id2str(srcid));
1443 ptllnd_peer_decref(plp);
1447 /* Check peer only sends when I've sent her credits */
1448 if (plp->plp_sent_credits == 0) {
1449 CERROR("%s[%d/%d+%d(%d)]: unexpected message\n",
1450 libcfs_id2str(plp->plp_id),
1451 plp->plp_credits, plp->plp_outstanding_credits,
1452 plp->plp_sent_credits,
1453 plni->plni_peer_credits + plp->plp_lazy_credits);
/* this message used up one of the credits I had sent her */
1456 plp->plp_sent_credits--;
1458 /* No check for credit overflow - the peer may post new buffers after
1459 * the startup handshake. */
/* credits she returned may unblock queued sends */
1460 if (msg->ptlm_credits > 0) {
1461 plp->plp_credits += msg->ptlm_credits;
1462 ptllnd_check_sends(plp);
1465 /* All OK so far; assume the message is good... */
/* dispatch: PUT and GET requests and IMMEDIATE messages go to lnet_parse()
 * with their embedded LNET header (last argument 1 for the RDMA requests,
 * 0 for IMMEDIATE); every other type is done right away */
1472 switch (msg->ptlm_type) {
1473 default: /* message types have been checked already */
1474 ptllnd_rx_done(&rx);
1477 case PTLLND_MSG_TYPE_PUT:
1478 case PTLLND_MSG_TYPE_GET:
1479 rc = lnet_parse(ni, &msg->ptlm_u.rdma.kptlrm_hdr,
1480 msg->ptlm_srcnid, &rx, 1);
1482 ptllnd_rx_done(&rx);
1485 case PTLLND_MSG_TYPE_IMMEDIATE:
1486 rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,
1487 msg->ptlm_srcnid, &rx, 0);
1489 ptllnd_rx_done(&rx);
/* drop the reference taken by ptllnd_find_peer() */
1493 ptllnd_peer_decref(plp);
/*
 * Handle a Portals event delivered for one of this NI's receive buffers.
 *
 * ni:    the LNET network interface the event arrived on; its ni_data is
 *        the ptllnd per-NI state.
 * event: the Portals event; event->md.user_ptr identifies the buffer.
 *
 * Only PUT_END and UNLINK events are expected (asserted below).  A failed
 * event is logged; a successful PUT_END hands the received bytes to
 * ptllnd_parse_request().  The function then updates the buffer's posted
 * accounting and may re-post it via ptllnd_post_buffer().
 *
 * NOTE(review): several lines of this function (for example the
 * declaration of 'repost' and the conditions guarding the un-post and
 * re-post statements) are not visible in this listing; the notes below
 * that depend on them are marked as assumptions to confirm.
 */
1497 ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
/* Recover the receive buffer from the event's MD user pointer. */
1499 ptllnd_buffer_t *buf = ptllnd_eventarg2obj(event->md.user_ptr);
1500 ptllnd_ni_t *plni = ni->ni_data;
/* The message starts at the event's offset inside the buffer's storage. */
1501 char *msg = &buf->plb_buffer[event->offset];
/* Initially "unlinked" simply means "this is an UNLINK event". */
1503 int unlinked = event->type == PTL_EVENT_UNLINK;
/* The buffer must belong to this NI, and only two event types are legal. */
1505 LASSERT (buf->plb_ni == ni);
1506 LASSERT (event->type == PTL_EVENT_PUT_END ||
1507 event->type == PTL_EVENT_UNLINK);
/* Failed events are only reported; nothing is parsed from the buffer. */
1509 if (event->ni_fail_type != PTL_NI_OK) {
1511 CERROR("event type %s(%d), status %s(%d) from %s\n",
1512 ptllnd_evtype2str(event->type), event->type,
1513 ptllnd_errtype2str(event->ni_fail_type),
1514 event->ni_fail_type,
1515 ptllnd_ptlid2str(event->initiator));
1517 } else if (event->type == PTL_EVENT_PUT_END) {
1518 #if (PTL_MD_LOCAL_ALIGN8 == 0)
1519 /* Portals can't force message alignment - someone sending an
1520 * odd-length message could misalign subsequent messages */
/* "Odd" here means the length is not a multiple of 8 bytes. */
1521 if ((event->mlength & 7) != 0) {
1522 CERROR("Message from %s has odd length %llu: "
1523 "probable version incompatibility\n",
1524 ptllnd_ptlid2str(event->initiator),
/* The message must start on an 8-byte boundary within the buffer. */
1529 LASSERT ((event->offset & 7) == 0);
/* Hand the received bytes (mlength of them) to the request parser. */
1531 ptllnd_parse_request(ni, event->initiator,
1532 (kptl_msg_t *)msg, event->mlength);
/*
 * Decide whether to re-post the buffer.  Under
 * LUSTRE_PORTALS_UNLINK_SEMANTICS the event carries its own 'unlinked'
 * flag and re-posting happens when that flag is set on a non-UNLINK
 * event; the other variant re-posts on the UNLINK event itself.
 * NOTE(review): the statement controlled by "if (event->unlinked)" and
 * the #else/#endif lines are not visible here - confirm.
 */
1535 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1536 /* UNLINK event only on explicit unlink */
1537 repost = (event->unlinked && event->type != PTL_EVENT_UNLINK);
1538 if (event->unlinked)
1541 /* UNLINK event only on implicit unlink */
1542 repost = (event->type == PTL_EVENT_UNLINK);
/*
 * Mark the buffer as no longer posted and drop the NI's posted-buffer
 * count.  NOTE(review): the guard for these three statements is not
 * visible; presumably they run only when 'unlinked' is set - confirm.
 */
1546 LASSERT(buf->plb_posted);
1547 buf->plb_posted = 0;
1548 plni->plni_nposted_buffers--;
/*
 * Re-post the buffer; the result is deliberately ignored.
 * NOTE(review): presumably guarded by 'repost' on a line not shown.
 */
1552 (void) ptllnd_post_buffer(buf);
/*
 * Handle a Portals event delivered for a transmit descriptor (tx).
 *
 * ni:    the LNET network interface the event arrived on; its ni_data is
 *        the ptllnd per-NI state.
 * event: the Portals event; event->md.user_ptr identifies the tx.
 *
 * The event's MD handle is compared with the tx's request handle
 * (tx_reqmdh) and bulk handle (tx_bulkmdh); exactly one must match.  The
 * matching handle is reset to PTL_INVALID_HANDLE, the event type is
 * checked against what tx->tx_type allows, and the tx is finally moved
 * onto plni->plni_zombie_txs.
 *
 * NOTE(review): several lines of this function (the declarations of
 * 'isreq' and 'isbulk', some 'if' guards, 'break' statements and closing
 * braces) are not visible in this listing; the notes below that depend on
 * them are marked as assumptions to confirm.
 */
1556 ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
1558 ptllnd_ni_t *plni = ni->ni_data;
/* Recover the tx from the event's MD user pointer. */
1559 ptllnd_tx_t *tx = ptllnd_eventarg2obj(event->md.user_ptr);
/* Non-zero when Portals reported a failure for this event. */
1560 int error = (event->ni_fail_type != PTL_NI_OK);
/*
 * 'unlinked' comes from the event's own flag under
 * LUSTRE_PORTALS_UNLINK_SEMANTICS, and from the event type in the other
 * declaration (presumably the #else branch, whose directive lines are
 * not visible here).
 */
1563 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1564 int unlinked = event->unlinked;
1566 int unlinked = (event->type == PTL_EVENT_UNLINK);
/* NOTE(review): presumably logged only when 'error' is set - guard not shown. */
1570 CERROR("Error %s(%d) event %s(%d) unlinked %d, %s(%d) for %s\n",
1571 ptllnd_errtype2str(event->ni_fail_type),
1572 event->ni_fail_type,
1573 ptllnd_evtype2str(event->type), event->type,
1574 unlinked, ptllnd_msgtype2str(tx->tx_type), tx->tx_type,
1575 libcfs_id2str(tx->tx_peer->plp_id));
/* The event must refer to a valid MD. */
1577 LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE));
/* Does this event belong to the tx's request MD? */
1579 isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh);
/*
 * For a request event the MD must start at the tx's own message; the
 * request handle is then reset and a debug timestamp recorded.
 * NOTE(review): the 'if' guards around these statements are not visible.
 */
1581 LASSERT (event->md.start == (void *)&tx->tx_msg);
1583 tx->tx_reqmdh = PTL_INVALID_HANDLE;
1584 PTLLND_DBGT_STAMP(tx->tx_req_done);
/* Does this event belong to the tx's bulk MD?  Reset the handle once it is unlinked. */
1588 isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);
1589 if ( isbulk && unlinked ) {
1590 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
1591 PTLLND_DBGT_STAMP(tx->tx_bulk_done);
1594 LASSERT (!isreq != !isbulk); /* always one and only 1 match */
/* Record peer id, credit counters and which MD completed in the history log. */
1596 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: TX done %p %s%s",
1597 libcfs_id2str(tx->tx_peer->plp_id),
1598 tx->tx_peer->plp_credits,
1599 tx->tx_peer->plp_outstanding_credits,
1600 tx->tx_peer->plp_sent_credits,
1601 plni->plni_peer_credits + tx->tx_peer->plp_lazy_credits,
1602 tx, isreq ? "REQ" : "BULK", unlinked ? "(unlinked)" : "");
1604 LASSERT (!isreq != !isbulk); /* always one and only 1 match */
/* Each case below asserts which event types are legal for that tx type. */
1605 switch (tx->tx_type) {
1609 case PTLLND_MSG_TYPE_NOOP:
1610 case PTLLND_MSG_TYPE_HELLO:
1611 case PTLLND_MSG_TYPE_IMMEDIATE:
1612 LASSERT (event->type == PTL_EVENT_UNLINK ||
1613 event->type == PTL_EVENT_SEND_END);
1617 case PTLLND_MSG_TYPE_GET:
1618 LASSERT (event->type == PTL_EVENT_UNLINK ||
1619 (isreq && event->type == PTL_EVENT_SEND_END) ||
1620 (isbulk && event->type == PTL_EVENT_PUT_END));
/*
 * A successful PUT_END on the bulk MD carries the peer's result in
 * hdr_data: PTLLND_RDMA_OK leads to setting the reply message length,
 * while the "Unmatched GET" error and -EIO status below are presumably
 * the 'else' branch (that line is not visible here).
 */
1622 if (isbulk && !error && event->type == PTL_EVENT_PUT_END) {
1623 /* Check GET matched */
1624 if (event->hdr_data == PTLLND_RDMA_OK) {
1625 lnet_set_reply_msg_len(ni,
1626 tx->tx_lnetreplymsg,
1629 CERROR ("Unmatched GET with %s\n",
1630 libcfs_id2str(tx->tx_peer->plp_id));
1631 tx->tx_status = -EIO;
1636 case PTLLND_MSG_TYPE_PUT:
1637 LASSERT (event->type == PTL_EVENT_UNLINK ||
1638 (isreq && event->type == PTL_EVENT_SEND_END) ||
1639 (isbulk && event->type == PTL_EVENT_GET_END));
1642 case PTLLND_RDMA_READ:
1643 LASSERT (event->type == PTL_EVENT_UNLINK ||
1644 event->type == PTL_EVENT_SEND_END ||
1645 event->type == PTL_EVENT_REPLY_END);
1649 case PTLLND_RDMA_WRITE:
1650 LASSERT (event->type == PTL_EVENT_UNLINK ||
1651 event->type == PTL_EVENT_SEND_END);
1655 /* Schedule ptllnd_tx_done() on error or last completion event */
/*
 * "Last completion event" is detected by both MD handles having been
 * reset to PTL_INVALID_HANDLE.  The tx is unlinked from its current list
 * and appended to the NI's zombie list.
 * NOTE(review): the first half of this condition and the guard on the
 * -EIO assignment are not visible; presumably both test 'error'.
 */
1657 (PtlHandleIsEqual(tx->tx_bulkmdh, PTL_INVALID_HANDLE) &&
1658 PtlHandleIsEqual(tx->tx_reqmdh, PTL_INVALID_HANDLE))) {
1660 tx->tx_status = -EIO;
1661 list_del(&tx->tx_list);
1662 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
1667 ptllnd_wait (lnet_ni_t *ni, int milliseconds)
1669 static struct timeval prevt;
1670 static int prevt_count;
1671 static int call_count;
1676 ptllnd_ni_t *plni = ni->ni_data;
1685 /* Handle any currently queued events, returning immediately if any.
1686 * Otherwise block for the timeout and handle all events queued
1689 gettimeofday(&t1, NULL);
1693 time_t then = cfs_time_current_sec();
1695 rc = PtlEQPoll(&plni->plni_eqh, 1,
1696 (timeout < 0) ? PTL_TIME_FOREVER : timeout,
1700 (cfs_time_current_sec() - then)*1000 > timeout + 1000) {
1701 /* 1000 mS grace.............................^ */
1702 CERROR("SLOW PtlEQPoll(%d): %d seconds\n", timeout,
1703 (int)(cfs_time_current_sec() - then));
1708 if (rc == PTL_EQ_EMPTY) {
1709 if (found || /* handled some events */
1710 milliseconds == 0 || /* just checking */
1711 blocked) /* blocked already */
1715 timeout = (milliseconds < 0) ?
1716 PTL_TIME_FOREVER : milliseconds;
1720 LASSERT (rc == PTL_OK || rc == PTL_EQ_DROPPED);
1722 if (rc == PTL_EQ_DROPPED)
1723 CERROR("Event queue: size %d is too small\n",
1724 plni->plni_eq_size);
1727 switch (ptllnd_eventarg2type(event.md.user_ptr)) {
1731 case PTLLND_EVENTARG_TYPE_TX:
1732 ptllnd_tx_event(ni, &event);
1735 case PTLLND_EVENTARG_TYPE_BUF:
1736 ptllnd_buf_event(ni, &event);
1741 while (!list_empty(&plni->plni_zombie_txs)) {
1742 tx = list_entry(plni->plni_zombie_txs.next,
1743 ptllnd_tx_t, tx_list);
1747 gettimeofday(&t2, NULL);
1749 if (prevt.tv_sec == 0 ||
1750 prevt.tv_sec != t2.tv_sec) {
1751 PTLLND_HISTORY("%d wait entered at %d.%06d - prev %d %d.%06d",
1752 call_count, (int)t1.tv_sec, (int)t1.tv_usec,
1753 prevt_count, (int)prevt.tv_sec, (int)prevt.tv_usec);