1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Data movement routines
7 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
8 * Copyright (c) 2001-2002 Sandia National Laboratories
10 * This file is part of Lustre, http://www.sf.net/projects/lustre/
12 * Lustre is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Lustre is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Lustre; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
29 # define DEBUG_SUBSYSTEM S_PORTALS
30 # include <linux/kp30.h>
32 #include <portals/p30.h>
33 #include <portals/lib-p30.h>
34 #include <portals/arg-blocks.h>
37 * Right now it does not check access control lists.
39 * We only support one MD per ME, which is how the Portals 3.1 spec is written.
40 * All previous complication is removed.
/* lib_find_me: look up an ME on portal 'index' whose attached MD accepts an
 * incoming operation (op_mask = PTL_MD_OP_PUT or PTL_MD_OP_GET) from
 * src_nid/src_pid with the given match bits.  On a match, computes the local
 * offset and mlength (honouring PTL_MD_MANAGE_REMOTE, PTL_MD_MAX_SIZE and
 * PTL_MD_TRUNCATE) and reports them via *mlength_out/*offset_out/*unlink_out.
 * NOTE(review): presumably must be called with the state lock held (callers
 * parse_put/parse_get do so) — confirm.  This excerpt is elided: several
 * original body lines are missing. */
44 lib_find_me(nal_cb_t *nal, int index, int op_mask, ptl_nid_t src_nid,
45 ptl_pid_t src_pid, ptl_size_t rlength, ptl_size_t roffset,
46 ptl_match_bits_t match_bits, ptl_size_t *mlength_out,
47 ptl_size_t *offset_out, int *unlink_out)
49 lib_ni_t *ni = &nal->ni;
50 struct list_head *match_list = &ni->tbl.tbl[index];
51 struct list_head *tmp;
59 CDEBUG (D_NET, "Request from "LPU64".%d of length %d into portal %d "
60 "MB="LPX64"\n", src_nid, src_pid, rlength, index, match_bits);
/* reject out-of-range portal indices before touching the table */
62 if (index < 0 || index >= ni->tbl.size) {
63 CERROR("Invalid portal %d not in [0-%d]\n",
68 list_for_each (tmp, match_list) {
69 me = list_entry(tmp, lib_me_t, me_list);
72 /* ME attached but MD not attached yet */
76 LASSERT (me == md->me);
/* an exhausted MD (threshold drained to 0) cannot accept new ops */
79 if (md->threshold == 0)
82 /* mismatched MD op */
83 if ((md->options & op_mask) == 0)
86 /* mismatched ME nid/pid? */
87 if (me->match_id.nid != PTL_NID_ANY &&
88 me->match_id.nid != src_nid)
91 if (me->match_id.pid != PTL_PID_ANY &&
92 me->match_id.pid != src_pid)
95 /* mismatched ME matchbits? */
96 if (((me->match_bits ^ match_bits) & ~me->ignore_bits) != 0)
99 /* Hurrah! This _is_ a match; check it out... */
/* locally-managed MDs use md->offset; remote-managed use the sender's
 * roffset (branch partly elided) */
101 if ((md->options & PTL_MD_MANAGE_REMOTE) == 0)
106 mlength = md->length - offset;
107 if ((md->options & PTL_MD_MAX_SIZE) != 0 &&
108 mlength > md->max_size)
109 mlength = md->max_size;
111 if (rlength <= mlength) { /* fits in allowed space */
113 } else if ((md->options & PTL_MD_TRUNCATE) == 0) {
114 /* this packet _really_ is too big */
115 CERROR("Matching packet %d too big: %d left, "
116 "%d allowed\n", rlength, md->length - offset,
/* advance the local offset past the data just matched */
121 md->offset = offset + mlength;
123 *offset_out = offset;
124 *mlength_out = mlength;
/* auto-unlink once the MD can no longer hold a max-size message */
125 *unlink_out = ((md->options & PTL_MD_AUTO_UNLINK) != 0 &&
126 md->offset >= (md->length - md->max_size));
131 CERROR (LPU64": Dropping %s from "LPU64".%d portal %d match "LPX64
132 " offset %d length %d: no match\n",
133 ni->nid, (op_mask == PTL_MD_OP_GET) ? "GET" : "PUT",
134 src_nid, src_pid, index, match_bits, roffset, rlength);
/* do_PtlFailNid: configure simulated failures for a peer NID.
 * threshold != 0 adds a lib_test_peer_t entry (threshold = number of
 * messages to fail, or PTL_MD_THRESH_INF for "forever"); threshold == 0
 * removes matching entries (nid == PTL_NID_ANY removes all).
 * NOTE(review): excerpt is elided — some original lines are missing. */
138 int do_PtlFailNid (nal_cb_t *nal, void *private, void *v_args, void *v_ret)
140 PtlFailNid_in *args = v_args;
141 PtlFailNid_out *ret = v_ret;
144 struct list_head *el;
145 struct list_head *next;
146 struct list_head cull;
148 if (args->threshold != 0) {
149 /* Adding a new entry */
150 tp = (lib_test_peer_t *)nal->cb_malloc (nal, sizeof (*tp));
152 return (ret->rc = PTL_FAIL);
154 tp->tp_nid = args->nid;
155 tp->tp_threshold = args->threshold;
157 state_lock (nal, &flags);
158 list_add (&tp->tp_list, &nal->ni.ni_test_peers);
159 state_unlock (nal, &flags);
160 return (ret->rc = PTL_OK);
163 /* removing entries */
164 INIT_LIST_HEAD (&cull);
166 state_lock (nal, &flags);
168 list_for_each_safe (el, next, &nal->ni.ni_test_peers) {
169 tp = list_entry (el, lib_test_peer_t, tp_list);
171 if (tp->tp_threshold == 0 || /* needs culling anyway */
172 args->nid == PTL_NID_ANY || /* removing all entries */
173 tp->tp_nid == args->nid) /* matched this one */
/* move to a private 'cull' list so freeing happens outside the lock */
175 list_del (&tp->tp_list);
176 list_add (&tp->tp_list, &cull);
180 state_unlock (nal, &flags);
182 while (!list_empty (&cull)) {
183 tp = list_entry (cull.next, lib_test_peer_t, tp_list);
185 list_del (&tp->tp_list);
186 nal->cb_free (nal, tp, sizeof (*tp));
188 return (ret->rc = PTL_OK);
/* fail_peer: decide whether to simulate a failure for a message to/from
 * 'nid'; returns non-zero when the message should be dropped.  Zombie
 * entries (threshold 0) are culled only on outgoing tests because incoming
 * tests may run at interrupt priority (see comment below).
 * NOTE(review): excerpt is elided — some original lines are missing. */
192 fail_peer (nal_cb_t *nal, ptl_nid_t nid, int outgoing)
195 struct list_head *el;
196 struct list_head *next;
198 struct list_head cull;
201 INIT_LIST_HEAD (&cull);
203 state_lock (nal, &flags);
205 list_for_each_safe (el, next, &nal->ni.ni_test_peers) {
206 tp = list_entry (el, lib_test_peer_t, tp_list);
208 if (tp->tp_threshold == 0) {
211 /* only cull zombies on outgoing tests,
212 * since we may be at interrupt priority on
213 * incoming messages. */
214 list_del (&tp->tp_list);
215 list_add (&tp->tp_list, &cull);
220 if (tp->tp_nid == PTL_NID_ANY || /* fail every peer */
221 nid == tp->tp_nid) { /* fail this peer */
/* finite thresholds count down; PTL_MD_THRESH_INF fails forever */
224 if (tp->tp_threshold != PTL_MD_THRESH_INF) {
227 tp->tp_threshold == 0) {
229 list_del (&tp->tp_list);
230 list_add (&tp->tp_list, &cull);
237 state_unlock (nal, &flags);
/* free culled entries outside the state lock */
239 while (!list_empty (&cull)) {
240 tp = list_entry (cull.next, lib_test_peer_t, tp_list);
241 list_del (&tp->tp_list);
243 nal->cb_free (nal, tp, sizeof (*tp));
/* lib_iov_nob: total number of bytes spanned by an iovec array.
 * NOTE(review): excerpt is elided — declaration/loop/return lines missing. */
250 lib_iov_nob (int niov, struct iovec *iov)
255 nob += (iov++)->iov_len;
/* lib_copy_iov2buf: gather 'len' bytes starting at byte 'offset' of the
 * iovec array into the flat buffer 'dest'.
 * NOTE(review): excerpt is elided — loop framing lines are missing. */
261 lib_copy_iov2buf (char *dest, int niov, struct iovec *iov,
262 ptl_size_t offset, ptl_size_t len)
269 /* skip complete frags before 'offset' */
271 while (offset >= iov->iov_len) {
272 offset -= iov->iov_len;
280 nob = MIN (iov->iov_len - offset, len);
281 memcpy (dest, iov->iov_base + offset, nob);
/* lib_copy_buf2iov: scatter 'len' bytes from the flat buffer 'src' into
 * the iovec array starting at byte 'offset'.
 * NOTE(review): excerpt is elided — loop framing lines are missing. */
292 lib_copy_buf2iov (int niov, struct iovec *iov, ptl_size_t offset,
293 char *src, ptl_size_t len)
300 /* skip complete frags before 'offset' */
302 while (offset >= iov->iov_len) {
303 offset -= iov->iov_len;
311 nob = MIN (iov->iov_len - offset, len);
312 memcpy (iov->iov_base + offset, src, nob);
/* lib_extract_iov: build in 'dst' the sub-iovec of 'src' covering exactly
 * [offset, offset+len) and return the number of entries used.
 * NOTE(review): excerpt is elided — some loop/return lines are missing. */
323 lib_extract_iov (int dst_niov, struct iovec *dst,
324 int src_niov, struct iovec *src,
325 ptl_size_t offset, ptl_size_t len)
327 /* Initialise 'dst' to the subset of 'src' starting at 'offset',
328 * for exactly 'len' bytes, and return the number of entries.
329 * NB not destructive to 'src' */
333 if (len == 0) /* no data => */
334 return (0); /* no frags */
336 LASSERT (src_niov > 0);
337 while (offset >= src->iov_len) { /* skip initial frags */
338 offset -= src->iov_len;
341 LASSERT (src_niov > 0);
346 LASSERT (src_niov > 0);
347 LASSERT (niov <= dst_niov);
349 frag_len = src->iov_len - offset;
350 dst->iov_base = ((char *)src->iov_base) + offset;
/* final frag: clamp to remaining 'len'; otherwise take the whole frag */
352 if (len <= frag_len) {
357 dst->iov_len = frag_len;
/* Userspace build: page-based kiov operations are not supported here, so
 * these are stub signatures.  NOTE(review): presumably guarded by
 * #ifndef __KERNEL__ with LASSERT(0)-style bodies — the guard and bodies
 * are elided from this excerpt; confirm against the full source. */
370 lib_kiov_nob (int niov, ptl_kiov_t *kiov)
377 lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov,
378 ptl_size_t offset, ptl_size_t len)
384 lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, ptl_size_t offset,
385 char *src, ptl_size_t len)
391 lib_extract_kiov (int dst_niov, ptl_kiov_t *dst,
392 int src_niov, ptl_kiov_t *src,
393 ptl_size_t offset, ptl_size_t len)
/* lib_kiov_nob: total number of bytes spanned by a page-based kiov array.
 * NOTE(review): excerpt is elided — declaration/loop/return lines missing. */
401 lib_kiov_nob (int niov, ptl_kiov_t *kiov)
406 nob += (kiov++)->kiov_len;
412 lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov,
413 ptl_size_t offset, ptl_size_t len)
421 LASSERT (!in_interrupt ());
424 while (offset > kiov->kiov_len) {
425 offset -= kiov->kiov_len;
433 nob = MIN (kiov->kiov_len - offset, len);
435 addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
436 memcpy (dest, addr, nob);
437 kunmap (kiov->kiov_page);
/* lib_copy_buf2kiov: scatter 'len' bytes from the flat buffer 'src' into
 * the page-based kiov array starting at byte 'offset'.  kmap() may sleep,
 * hence the !in_interrupt() assertion.
 * NOTE(review): excerpt is elided — loop framing lines are missing. */
448 lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, ptl_size_t offset,
449 char *src, ptl_size_t len)
457 LASSERT (!in_interrupt ());
/* skip complete frags before 'offset' */
460 while (offset >= kiov->kiov_len) {
461 offset -= kiov->kiov_len;
469 nob = MIN (kiov->kiov_len - offset, len);
471 addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
472 memcpy (addr, src, nob);
473 kunmap (kiov->kiov_page);
/* lib_extract_kiov: build in 'dst' the page-based sub-kiov of 'src'
 * covering exactly [offset, offset+len) and return the entry count.
 * NOTE(review): excerpt is elided — some loop/return lines are missing. */
484 lib_extract_kiov (int dst_niov, ptl_kiov_t *dst,
485 int src_niov, ptl_kiov_t *src,
486 ptl_size_t offset, ptl_size_t len)
488 /* Initialise 'dst' to the subset of 'src' starting at 'offset',
489 * for exactly 'len' bytes, and return the number of entries.
490 * NB not destructive to 'src' */
494 if (len == 0) /* no data => */
495 return (0); /* no frags */
497 LASSERT (src_niov > 0);
498 while (offset >= src->kiov_len) { /* skip initial frags */
499 offset -= src->kiov_len;
502 LASSERT (src_niov > 0);
507 LASSERT (src_niov > 0);
508 LASSERT (niov <= dst_niov);
510 frag_len = src->kiov_len - offset;
511 dst->kiov_page = src->kiov_page;
512 dst->kiov_offset = src->kiov_offset + offset;
/* a dst frag must never extend beyond its page */
514 if (len <= frag_len) {
516 LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
520 dst->kiov_len = frag_len;
521 LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
/* lib_recv: hand an incoming payload to the NAL, dispatching on the MD
 * type: no MD (data discarded), iovec-based MD (cb_recv) or page-based
 * kiov MD (cb_recv_pages).
 * NOTE(review): the md == NULL branch framing is elided from this excerpt. */
534 lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
535 ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen)
538 return (nal->cb_recv(nal, private, msg,
540 offset, mlen, rlen));
542 if ((md->options & PTL_MD_KIOV) == 0)
543 return (nal->cb_recv(nal, private, msg,
544 md->md_niov, md->md_iov.iov,
545 offset, mlen, rlen));
547 return (nal->cb_recv_pages(nal, private, msg,
548 md->md_niov, md->md_iov.kiov,
549 offset, mlen, rlen));
/* lib_send: emit 'hdr' (plus the payload described by 'md') to nid/pid,
 * dispatching on MD type exactly as lib_recv does: no MD, iovec MD
 * (cb_send) or page-based kiov MD (cb_send_pages).
 * NOTE(review): the md == NULL branch framing is elided from this excerpt. */
553 lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg,
554 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
555 lib_md_t *md, ptl_size_t offset, ptl_size_t len)
558 return (nal->cb_send(nal, private, msg,
563 if ((md->options & PTL_MD_KIOV) == 0)
564 return (nal->cb_send(nal, private, msg,
566 md->md_niov, md->md_iov.iov,
569 return (nal->cb_send_pages(nal, private, msg,
571 md->md_niov, md->md_iov.kiov,
/* lib_commit_md: bind 'msg' to 'md' for the duration of a network
 * operation: decrement a finite threshold, bump message counters and
 * queue the message on ni_active_msgs.  Must be called with the state
 * lock held (see comment below). */
576 lib_commit_md (nal_cb_t *nal, lib_md_t *md, lib_msg_t *msg)
578 /* ALWAYS called holding the state_lock */
579 lib_counters_t *counters = &nal->ni.counters;
581 /* Here, we commit the MD to a network OP by marking it busy and
582 * decrementing its threshold. Come what may, the network "owns"
583 * the MD until a call to lib_finalize() signals completion. */
587 if (md->threshold != PTL_MD_THRESH_INF) {
588 LASSERT (md->threshold > 0);
592 counters->msgs_alloc++;
/* track the high-water mark of in-flight messages */
593 if (counters->msgs_alloc > counters->msgs_max)
594 counters->msgs_max = counters->msgs_alloc;
596 list_add (&msg->msg_list, &nal->ni.ni_active_msgs);
/* lib_drop_message: account a dropped (uncommitted) message and let the
 * NAL consume and discard its payload via a NULL-msg lib_recv(). */
600 lib_drop_message (nal_cb_t *nal, void *private, ptl_hdr_t *hdr)
604 /* CAVEAT EMPTOR: this only drops messages that we've not committed
605 * to receive (init_msg() not called) and therefore can't cause an
608 state_lock(nal, &flags);
609 nal->ni.counters.drop_count++;
610 nal->ni.counters.drop_length += hdr->payload_length;
611 state_unlock(nal, &flags);
613 /* NULL msg => if NAL calls lib_finalize it will be a noop */
614 (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
618 * Incoming messages have a ptl_msg_t object associated with them
619 * by the library. This object encapsulates the state of the
620 * message and allows the NAL to do non-blocking receives or sends
/* parse_put: handle an incoming PTL_MSG_PUT.  Byte-swaps the put-specific
 * header fields, matches an ME/MD under the state lock, commits the MD,
 * fills in the PUT event, optionally records the ACK cookie, then hands
 * the payload to the NAL via lib_recv().
 * NOTE(review): excerpt is elided — declarations and the no-match/error
 * paths are partly missing. */
625 parse_put(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
627 lib_ni_t *ni = &nal->ni;
628 ptl_size_t mlength = 0;
629 ptl_size_t offset = 0;
636 /* Convert put fields to host byte order */
637 hdr->msg.put.match_bits = NTOH__u64 (hdr->msg.put.match_bits);
638 hdr->msg.put.ptl_index = NTOH__u32 (hdr->msg.put.ptl_index);
639 hdr->msg.put.offset = NTOH__u32 (hdr->msg.put.offset);
641 state_lock(nal, &flags);
643 me = lib_find_me(nal, hdr->msg.put.ptl_index, PTL_MD_OP_PUT,
644 hdr->src_nid, hdr->src_pid,
645 hdr->payload_length, hdr->msg.put.offset,
646 hdr->msg.put.match_bits,
647 &mlength, &offset, &unlink)
649 state_unlock(nal, &flags);
654 CDEBUG(D_NET, "Incoming put index %x from "LPU64"/%u of length %d/%d "
655 "into md "LPX64" [%d] + %d\n", hdr->msg.put.ptl_index,
656 hdr->src_nid, hdr->src_pid, mlength, hdr->payload_length,
657 md->md_lh.lh_cookie, md->md_niov, offset);
659 lib_commit_md(nal, md, msg);
661 msg->ev.type = PTL_EVENT_PUT;
662 msg->ev.initiator.nid = hdr->src_nid;
663 msg->ev.initiator.pid = hdr->src_pid;
664 msg->ev.portal = hdr->msg.put.ptl_index;
665 msg->ev.match_bits = hdr->msg.put.match_bits;
666 msg->ev.rlength = hdr->payload_length;
667 msg->ev.mlength = mlength;
668 msg->ev.offset = offset;
669 msg->ev.hdr_data = hdr->msg.put.hdr_data;
671 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
/* remember the ACK cookie only when the sender supplied one and the MD
 * doesn't suppress ACKs */
673 if (!ptl_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
674 !(md->options & PTL_MD_ACK_DISABLE)) {
675 msg->ack_wmd = hdr->msg.put.ack_wmd;
678 ni->counters.recv_count++;
679 ni->counters.recv_length += mlength;
681 /* only unlink after MD's pending count has been bumped in
682 * lib_commit_md() otherwise lib_me_unlink() will nuke it */
684 lib_me_unlink (nal, me);
686 state_unlock(nal, &flags);
688 rc = lib_recv(nal, private, msg, md, offset, mlength,
689 hdr->payload_length);
691 CERROR(LPU64": error on receiving PUT from "LPU64": %d\n",
692 ni->nid, hdr->src_nid, rc);
/* parse_get: handle an incoming PTL_MSG_GET.  Byte-swaps the get-specific
 * fields, matches an ME/MD, commits the MD, builds the GET event, then
 * constructs a REPLY header and sends it via lib_send() BEFORE the
 * incoming message is fully received (some NALs require that ordering
 * for optimized GET — see the NB comment below).
 * NOTE(review): excerpt is elided — declarations and some error-path
 * framing are missing. */
698 parse_get(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
700 lib_ni_t *ni = &nal->ni;
701 ptl_size_t mlength = 0;
702 ptl_size_t offset = 0;
710 /* Convert get fields to host byte order */
711 hdr->msg.get.match_bits = NTOH__u64 (hdr->msg.get.match_bits);
712 hdr->msg.get.ptl_index = NTOH__u32 (hdr->msg.get.ptl_index);
713 hdr->msg.get.sink_length = NTOH__u32 (hdr->msg.get.sink_length);
714 hdr->msg.get.src_offset = NTOH__u32 (hdr->msg.get.src_offset);
716 state_lock(nal, &flags);
718 me = lib_find_me(nal, hdr->msg.get.ptl_index, PTL_MD_OP_GET,
719 hdr->src_nid, hdr->src_pid,
720 hdr->msg.get.sink_length, hdr->msg.get.src_offset,
721 hdr->msg.get.match_bits,
722 &mlength, &offset, &unlink);
724 state_unlock(nal, &flags);
729 CDEBUG(D_NET, "Incoming get index %d from "LPU64".%u of length %d/%d "
730 "from md "LPX64" [%d] + %d\n", hdr->msg.get.ptl_index,
731 hdr->src_nid, hdr->src_pid, mlength, hdr->payload_length,
732 md->md_lh.lh_cookie, md->md_niov, offset);
734 lib_commit_md(nal, md, msg);
736 msg->ev.type = PTL_EVENT_GET;
737 msg->ev.initiator.nid = hdr->src_nid;
738 msg->ev.initiator.pid = hdr->src_pid;
739 msg->ev.portal = hdr->msg.get.ptl_index;
740 msg->ev.match_bits = hdr->msg.get.match_bits;
741 msg->ev.rlength = hdr->payload_length;
742 msg->ev.mlength = mlength;
743 msg->ev.offset = offset;
744 msg->ev.hdr_data = 0;
746 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
748 ni->counters.send_count++;
749 ni->counters.send_length += mlength;
751 /* only unlink after MD's refcount has been bumped in
752 * lib_commit_md() otherwise lib_me_unlink() will nuke it */
754 lib_me_unlink (nal, me);
756 state_unlock(nal, &flags);
/* build the REPLY header; wire fields written via the HTON macros */
758 memset (&reply, 0, sizeof (reply));
759 reply.type = HTON__u32 (PTL_MSG_REPLY);
760 reply.dest_nid = HTON__u64 (hdr->src_nid);
761 reply.src_nid = HTON__u64 (ni->nid);
762 reply.dest_pid = HTON__u32 (hdr->src_pid);
763 reply.src_pid = HTON__u32 (ni->pid);
764 reply.payload_length = HTON__u32 (mlength);
766 reply.msg.reply.dst_wmd = hdr->msg.get.return_wmd;
768 /* NB call lib_send() _BEFORE_ lib_recv() completes the incoming
769 * message. Some NALs _require_ this to implement optimized GET */
771 rc = lib_send (nal, private, msg, &reply, PTL_MSG_REPLY,
772 hdr->src_nid, hdr->src_pid, md, offset, mlength);
774 CERROR(LPU64": Unable to send REPLY for GET from "LPU64": %d\n",
775 ni->nid, hdr->src_nid, rc);
777 /* Discard any junk after the hdr */
778 (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
/* parse_reply: handle an incoming PTL_MSG_REPLY.  Looks up the local MD
 * by wire handle (no byte flip: handles are only interpreted by their
 * creator), validates the length against the MD (honouring
 * PTL_MD_TRUNCATE), commits the MD, fills in the REPLY event and
 * receives the payload.
 * NOTE(review): excerpt is elided — declarations and some error-path
 * framing are missing. */
784 parse_reply(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
786 lib_ni_t *ni = &nal->ni;
793 state_lock(nal, &flags);
795 /* NB handles only looked up by creator (no flips) */
796 md = ptl_wire_handle2md(&hdr->msg.reply.dst_wmd, nal);
797 if (md == NULL || md->threshold == 0) {
798 CERROR (LPU64": Dropping REPLY from "LPU64" for %s MD "LPX64"."LPX64"\n",
799 ni->nid, hdr->src_nid,
800 md == NULL ? "invalid" : "inactive",
801 hdr->msg.reply.dst_wmd.wh_interface_cookie,
802 hdr->msg.reply.dst_wmd.wh_object_cookie);
804 state_unlock(nal, &flags);
808 LASSERT (md->offset == 0);
810 length = rlength = hdr->payload_length;
/* reject over-long replies unless the MD allows truncation */
812 if (length > md->length) {
813 if ((md->options & PTL_MD_TRUNCATE) == 0) {
814 CERROR (LPU64": Dropping REPLY from "LPU64
815 " length %d for MD "LPX64" would overflow (%d)\n",
816 ni->nid, hdr->src_nid, length,
817 hdr->msg.reply.dst_wmd.wh_object_cookie,
819 state_unlock(nal, &flags);
825 CDEBUG(D_NET, "Reply from "LPU64" of length %d/%d into md "LPX64"\n",
826 hdr->src_nid, length, rlength,
827 hdr->msg.reply.dst_wmd.wh_object_cookie);
829 lib_commit_md(nal, md, msg);
831 msg->ev.type = PTL_EVENT_REPLY;
832 msg->ev.initiator.nid = hdr->src_nid;
833 msg->ev.initiator.pid = hdr->src_pid;
834 msg->ev.rlength = rlength;
835 msg->ev.mlength = length;
838 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
840 ni->counters.recv_count++;
841 ni->counters.recv_length += length;
843 state_unlock(nal, &flags);
845 rc = lib_recv(nal, private, msg, md, 0, length, rlength);
847 CERROR(LPU64": error on receiving REPLY from "LPU64": %d\n",
848 ni->nid, hdr->src_nid, rc);
/* parse_ack: handle an incoming PTL_MSG_ACK.  Byte-swaps the ACK fields,
 * looks up the local MD by wire handle, commits it, builds the ACK event,
 * finalizes immediately (the completion event is created here) and then
 * discards any trailing junk after the header. */
854 parse_ack(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
856 lib_ni_t *ni = &nal->ni;
860 /* Convert ack fields to host byte order */
861 hdr->msg.ack.match_bits = NTOH__u64 (hdr->msg.ack.match_bits);
862 hdr->msg.ack.mlength = NTOH__u32 (hdr->msg.ack.mlength);
864 state_lock(nal, &flags);
866 /* NB handles only looked up by creator (no flips) */
867 md = ptl_wire_handle2md(&hdr->msg.ack.dst_wmd, nal);
868 if (md == NULL || md->threshold == 0) {
869 CDEBUG(D_INFO, LPU64": Dropping ACK from "LPU64" to %s MD "
870 LPX64"."LPX64"\n", ni->nid, hdr->src_nid,
871 (md == NULL) ? "invalid" : "inactive",
872 hdr->msg.ack.dst_wmd.wh_interface_cookie,
873 hdr->msg.ack.dst_wmd.wh_object_cookie);
875 state_unlock(nal, &flags);
879 CDEBUG(D_NET, LPU64": ACK from "LPU64" into md "LPX64"\n",
880 ni->nid, hdr->src_nid,
881 hdr->msg.ack.dst_wmd.wh_object_cookie);
883 lib_commit_md(nal, md, msg);
885 msg->ev.type = PTL_EVENT_ACK;
886 msg->ev.initiator.nid = hdr->src_nid;
887 msg->ev.initiator.pid = hdr->src_pid;
888 msg->ev.mlength = hdr->msg.ack.mlength;
889 msg->ev.match_bits = hdr->msg.ack.match_bits;
891 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
893 ni->counters.recv_count++;
895 state_unlock(nal, &flags);
897 /* We have received and matched up the ack OK, create the
898 * completion event now... */
899 lib_finalize(nal, private, msg, PTL_OK);
901 /* ...and now discard any junk after the hdr */
902 (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
/* hdr_type_string: human-readable name for hdr->type; the switch over the
 * PTL_MSG_* values is elided from this excerpt, leaving only the fallback. */
908 hdr_type_string (ptl_hdr_t *hdr)
922 return ("<UNKNOWN>");
/* print_hdr: dump a portals header via the NAL's printf callback, with
 * per-type detail for PUT/GET/ACK/REPLY.
 * NOTE(review): the switch framing is elided from this excerpt.  Also
 * note %Lu is used for the nid/pid lines where the rest of the file
 * uses the LPU64 macro — presumably equivalent on this platform; confirm. */
926 void print_hdr(nal_cb_t * nal, ptl_hdr_t * hdr)
928 char *type_str = hdr_type_string (hdr);
930 nal->cb_printf(nal, "P3 Header at %p of type %s\n", hdr, type_str);
931 nal->cb_printf(nal, " From nid/pid %Lu/%Lu", hdr->src_nid,
933 nal->cb_printf(nal, " To nid/pid %Lu/%Lu\n", hdr->dest_nid,
942 " Ptl index %d, ack md "LPX64"."LPX64", "
943 "match bits "LPX64"\n",
944 hdr->msg.put.ptl_index,
945 hdr->msg.put.ack_wmd.wh_interface_cookie,
946 hdr->msg.put.ack_wmd.wh_object_cookie,
947 hdr->msg.put.match_bits);
949 " Length %d, offset %d, hdr data "LPX64"\n",
950 hdr->payload_length, hdr->msg.put.offset,
951 hdr->msg.put.hdr_data);
956 " Ptl index %d, return md "LPX64"."LPX64", "
957 "match bits "LPX64"\n", hdr->msg.get.ptl_index,
958 hdr->msg.get.return_wmd.wh_interface_cookie,
959 hdr->msg.get.return_wmd.wh_object_cookie,
960 hdr->msg.get.match_bits);
962 " Length %d, src offset %d\n",
963 hdr->msg.get.sink_length,
964 hdr->msg.get.src_offset);
968 nal->cb_printf(nal, " dst md "LPX64"."LPX64", "
969 "manipulated length %d\n",
970 hdr->msg.ack.dst_wmd.wh_interface_cookie,
971 hdr->msg.ack.dst_wmd.wh_object_cookie,
972 hdr->msg.ack.mlength);
976 nal->cb_printf(nal, " dst md "LPX64"."LPX64", "
978 hdr->msg.reply.dst_wmd.wh_interface_cookie,
979 hdr->msg.reply.dst_wmd.wh_object_cookie,
980 hdr->payload_length);
983 } /* end of print_hdr() */
/* lib_parse: top-level entry for an incoming message.  Converts the common
 * header fields to host byte order, filters HELLO / wrong-destination /
 * simulated-failure cases (dropping the message), allocates a lib_msg_t
 * and dispatches to the per-type parser.  On parser failure, finalizes a
 * committed msg or frees an uncommitted one, then drops the payload.
 * NOTE(review): excerpt is elided — switch/case labels and some branch
 * framing are missing. */
987 lib_parse(nal_cb_t *nal, ptl_hdr_t *hdr, void *private)
993 /* convert common fields to host byte order */
994 hdr->dest_nid = NTOH__u64 (hdr->dest_nid);
995 hdr->src_nid = NTOH__u64 (hdr->src_nid);
996 hdr->dest_pid = NTOH__u32 (hdr->dest_pid);
997 hdr->src_pid = NTOH__u32 (hdr->src_pid);
998 hdr->type = NTOH__u32 (hdr->type);
999 hdr->payload_length = NTOH__u32(hdr->payload_length);
1001 nal->cb_printf(nal, "%d: lib_parse: nal=%p hdr=%p type=%d\n",
1002 nal->ni.nid, nal, hdr, hdr->type);
1003 print_hdr(nal, hdr);
1005 if (hdr->type == PTL_MSG_HELLO) {
1006 /* dest_nid is really ptl_magicversion_t */
1007 ptl_magicversion_t *mv = (ptl_magicversion_t *)&hdr->dest_nid;
1009 CERROR (LPU64": Dropping unexpected HELLO message: "
1010 "magic %d, version %d.%d from "LPD64"\n",
1011 nal->ni.nid, mv->magic,
1012 mv->version_major, mv->version_minor,
1014 lib_drop_message(nal, private, hdr);
/* refuse traffic not addressed to this NID */
1018 if (hdr->dest_nid != nal->ni.nid) {
1019 CERROR(LPU64": Dropping %s message from "LPU64" to "LPU64
1020 " (not me)\n", nal->ni.nid, hdr_type_string (hdr),
1021 hdr->src_nid, hdr->dest_nid);
1022 lib_drop_message(nal, private, hdr);
1026 if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1027 fail_peer (nal, hdr->src_nid, 0)) /* shall we now? */
1029 CERROR(LPU64": Dropping incoming %s from "LPU64
1030 ": simulated failure\n",
1031 nal->ni.nid, hdr_type_string (hdr),
1033 lib_drop_message(nal, private, hdr);
1037 msg = lib_msg_alloc(nal);
1039 CERROR(LPU64": Dropping incoming %s from "LPU64
1040 ": can't allocate a lib_msg_t\n",
1041 nal->ni.nid, hdr_type_string (hdr),
1043 lib_drop_message(nal, private, hdr);
1047 do_gettimeofday(&msg->ev.arrival_time);
1049 switch (hdr->type) {
1051 rc = parse_ack(nal, hdr, private, msg);
1054 rc = parse_put(nal, hdr, private, msg);
1057 rc = parse_get(nal, hdr, private, msg);
1060 rc = parse_reply(nal, hdr, private, msg);
1063 CERROR(LPU64": Dropping <unknown> message from "LPU64
1064 ": Bad type=0x%x\n", nal->ni.nid, hdr->src_nid,
/* a committed msg (md set) must be finalized; otherwise just free it */
1071 if (msg->md != NULL) {
1073 lib_finalize(nal, private, msg, rc);
1075 state_lock(nal, &flags);
1076 lib_msg_free(nal, msg); /* expects state_lock held */
1077 state_unlock(nal, &flags);
1079 lib_drop_message(nal, private, hdr);
/* do_PtlPut: implement PtlPut().  Validates the MD under the state lock,
 * builds a PTL_MSG_PUT wire header (including an ACK cookie when
 * ack_req_in == PTL_ACK_REQ), commits the MD, pre-fills the SENT event
 * and hands the message to lib_send().  Completion is signalled later by
 * an event.
 * NOTE(review): excerpt is elided — some declarations and branch framing
 * are missing. */
1085 do_PtlPut(nal_cb_t *nal, void *private, void *v_args, void *v_ret)
1089 * ptl_handle_md_t md_in
1090 * ptl_ack_req_t ack_req_in
1091 * ptl_process_id_t target_in
1092 * ptl_pt_index_t portal_in
1093 * ptl_ac_index_t cookie_in
1094 * ptl_match_bits_t match_bits_in
1095 * ptl_size_t offset_in
1100 PtlPut_in *args = v_args;
1101 ptl_process_id_t *id = &args->target_in;
1102 PtlPut_out *ret = v_ret;
1103 lib_ni_t *ni = &nal->ni;
1107 unsigned long flags;
1110 if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1111 fail_peer (nal, id->nid, 1)) /* shall we now? */
1113 CERROR(LPU64": Dropping PUT to "LPU64": simulated failure\n",
1114 nal->ni.nid, id->nid);
1115 return (ret->rc = PTL_INV_PROC);
1118 msg = lib_msg_alloc(nal);
1120 CERROR(LPU64": Dropping PUT to "LPU64": ENOMEM on lib_msg_t\n",
1122 return (ret->rc = PTL_NOSPACE);
1125 state_lock(nal, &flags);
1127 md = ptl_handle2md(&args->md_in, nal);
1128 if (md == NULL || md->threshold == 0) {
1129 lib_msg_free(nal, msg);
1130 state_unlock(nal, &flags);
1132 return (ret->rc = PTL_INV_MD);
1135 CDEBUG(D_NET, "PtlPut -> %Lu: %lu\n", (unsigned long long)id->nid,
1136 (unsigned long)id->pid);
1138 memset (&hdr, 0, sizeof (hdr));
1139 hdr.type = HTON__u32 (PTL_MSG_PUT);
1140 hdr.dest_nid = HTON__u64 (id->nid);
1141 hdr.src_nid = HTON__u64 (ni->nid);
1142 hdr.dest_pid = HTON__u32 (id->pid);
1143 hdr.src_pid = HTON__u32 (ni->pid);
1144 hdr.payload_length = HTON__u32 (md->length);
1146 /* NB handles only looked up by creator (no flips) */
1147 if (args->ack_req_in == PTL_ACK_REQ) {
1148 hdr.msg.put.ack_wmd.wh_interface_cookie = ni->ni_interface_cookie;
1149 hdr.msg.put.ack_wmd.wh_object_cookie = md->md_lh.lh_cookie;
1151 hdr.msg.put.ack_wmd = PTL_WIRE_HANDLE_NONE;
1154 hdr.msg.put.match_bits = HTON__u64 (args->match_bits_in);
1155 hdr.msg.put.ptl_index = HTON__u32 (args->portal_in);
1156 hdr.msg.put.offset = HTON__u32 (args->offset_in);
1157 hdr.msg.put.hdr_data = args->hdr_data_in;
1159 lib_commit_md(nal, md, msg);
1161 msg->ev.type = PTL_EVENT_SENT;
1162 msg->ev.initiator.nid = ni->nid;
1163 msg->ev.initiator.pid = ni->pid;
1164 msg->ev.portal = args->portal_in;
1165 msg->ev.match_bits = args->match_bits_in;
1166 msg->ev.rlength = md->length;
1167 msg->ev.mlength = md->length;
1168 msg->ev.offset = args->offset_in;
1169 msg->ev.hdr_data = args->hdr_data_in;
1171 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
1173 ni->counters.send_count++;
1174 ni->counters.send_length += md->length;
1176 state_unlock(nal, &flags);
1178 rc = lib_send (nal, private, msg, &hdr, PTL_MSG_PUT,
1179 id->nid, id->pid, md, 0, md->length);
1181 CERROR(LPU64": error sending PUT to "LPU64": %d\n",
1182 ni->nid, id->nid, rc);
1183 lib_finalize (nal, private, msg, rc);
1186 /* completion will be signalled by an event */
1187 return ret->rc = PTL_OK;
/* lib_fake_reply_msg: manufacture a lib_msg_t that, when passed to
 * lib_finalize(), generates the REPLY event for a GET whose data the NAL
 * DMA'd directly into 'getmd' (so no wire REPLY message is received).
 * Caller must guarantee 'getmd' remains valid (pending > 0), i.e. call
 * this before lib_finalize() of the original GET (see comment below). */
1191 lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, lib_md_t *getmd)
1193 /* The NAL can DMA direct to the GET md (i.e. no REPLY msg). This
1194 * returns a msg the NAL can pass to lib_finalize() so that a REPLY
1195 * event still occurs.
1197 * CAVEAT EMPTOR: 'getmd' is passed by pointer so it MUST be valid.
1198 * This can only be guaranteed while a lib_msg_t holds a reference
1199 * on it (ie. pending > 0), so best call this before the
1200 * lib_finalize() of the original GET. */
1202 lib_ni_t *ni = &nal->ni;
1203 lib_msg_t *msg = lib_msg_alloc(nal);
1204 unsigned long flags;
1206 state_lock(nal, &flags);
1208 LASSERT (getmd->pending > 0);
1211 CERROR ("Dropping REPLY from "LPU64": can't allocate msg\n",
1216 if (getmd->threshold == 0) {
1217 CERROR ("Dropping REPLY from "LPU64" for inactive MD %p\n",
1222 LASSERT (getmd->offset == 0);
1224 CDEBUG(D_NET, "Reply from "LPU64" md %p\n", peer_nid, getmd);
1226 lib_commit_md (nal, getmd, msg);
1228 msg->ev.type = PTL_EVENT_REPLY;
1229 msg->ev.initiator.nid = peer_nid;
1230 msg->ev.initiator.pid = 0; /* XXX FIXME!!! */
1231 msg->ev.rlength = msg->ev.mlength = getmd->length;
1234 lib_md_deconstruct(nal, getmd, &msg->ev.mem_desc);
1236 ni->counters.recv_count++;
1237 ni->counters.recv_length += getmd->length;
1239 state_unlock(nal, &flags);
/* failure path: account the drop and free any allocated msg */
1244 lib_msg_free(nal, msg);
1246 nal->ni.counters.drop_count++;
1247 nal->ni.counters.drop_length += getmd->length;
1249 state_unlock (nal, &flags);
/* do_PtlGet: implement PtlGet().  Mirrors do_PtlPut: validates the MD,
 * builds a PTL_MSG_GET header carrying the return wire handle, commits
 * the MD, pre-fills the SENT event and sends (no payload; the data
 * arrives later in a REPLY matched via return_wmd).
 * NOTE(review): the simulated-failure CERROR below says "Dropping PUT"
 * but this is the GET path — the text looks copy-pasted from do_PtlPut
 * and should presumably read "Dropping GET".
 * NOTE(review): excerpt is elided — some declarations and branch framing
 * are missing. */
1255 do_PtlGet(nal_cb_t *nal, void *private, void *v_args, void *v_ret)
1259 * ptl_handle_md_t md_in
1260 * ptl_process_id_t target_in
1261 * ptl_pt_index_t portal_in
1262 * ptl_ac_index_t cookie_in
1263 * ptl_match_bits_t match_bits_in
1264 * ptl_size_t offset_in
1269 PtlGet_in *args = v_args;
1270 ptl_process_id_t *id = &args->target_in;
1271 PtlGet_out *ret = v_ret;
1272 lib_ni_t *ni = &nal->ni;
1276 unsigned long flags;
1279 if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1280 fail_peer (nal, id->nid, 1)) /* shall we now? */
1282 CERROR(LPU64": Dropping PUT to "LPU64": simulated failure\n",
1283 nal->ni.nid, id->nid);
1284 return (ret->rc = PTL_INV_PROC);
1287 msg = lib_msg_alloc(nal);
1289 CERROR(LPU64": Dropping GET to "LPU64": ENOMEM on lib_msg_t\n",
1291 return (ret->rc = PTL_NOSPACE);
1294 state_lock(nal, &flags);
1296 md = ptl_handle2md(&args->md_in, nal);
1297 if (md == NULL || !md->threshold) {
1298 lib_msg_free(nal, msg);
1299 state_unlock(nal, &flags);
1301 return ret->rc = PTL_INV_MD;
1304 CDEBUG(D_NET, "PtlGet -> %Lu: %lu\n", (unsigned long long)id->nid,
1305 (unsigned long)id->pid);
1307 memset (&hdr, 0, sizeof (hdr));
1308 hdr.type = HTON__u32 (PTL_MSG_GET);
1309 hdr.dest_nid = HTON__u64 (id->nid);
1310 hdr.src_nid = HTON__u64 (ni->nid);
1311 hdr.dest_pid = HTON__u32 (id->pid);
1312 hdr.src_pid = HTON__u32 (ni->pid);
1313 hdr.payload_length = 0;
1315 /* NB handles only looked up by creator (no flips) */
1316 hdr.msg.get.return_wmd.wh_interface_cookie = ni->ni_interface_cookie;
1317 hdr.msg.get.return_wmd.wh_object_cookie = md->md_lh.lh_cookie;
1319 hdr.msg.get.match_bits = HTON__u64 (args->match_bits_in);
1320 hdr.msg.get.ptl_index = HTON__u32 (args->portal_in);
1321 hdr.msg.get.src_offset = HTON__u32 (args->offset_in);
1322 hdr.msg.get.sink_length = HTON__u32 (md->length);
1324 lib_commit_md(nal, md, msg);
1326 msg->ev.type = PTL_EVENT_SENT;
1327 msg->ev.initiator.nid = ni->nid;
1328 msg->ev.initiator.pid = ni->pid;
1329 msg->ev.portal = args->portal_in;
1330 msg->ev.match_bits = args->match_bits_in;
1331 msg->ev.rlength = md->length;
1332 msg->ev.mlength = md->length;
1333 msg->ev.offset = args->offset_in;
1334 msg->ev.hdr_data = 0;
1336 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
1338 ni->counters.send_count++;
1340 state_unlock(nal, &flags);
1342 rc = lib_send (nal, private, msg, &hdr, PTL_MSG_GET,
1343 id->nid, id->pid, NULL, 0, 0);
1345 CERROR(LPU64": error sending GET to "LPU64": %d\n",
1346 ni->nid, id->nid, rc);
1347 lib_finalize (nal, private, msg, rc);
1350 /* completion will be signalled by an event */
1351 return ret->rc = PTL_OK;
1354 void lib_assert_wire_constants (void)
1356 /* Wire protocol assertions generated by 'wirecheck'
1357 * running on Linux robert.bartonsoftware.com 2.4.20-18.9 #1 Thu May 29 06:54:41 EDT 2003 i68
1358 * with gcc version 3.2.2 20030222 (Red Hat Linux 3.2.2-5) */
1362 LASSERT (PORTALS_PROTO_MAGIC == 0xeebc0ded);
1363 LASSERT (PORTALS_PROTO_VERSION_MAJOR == 0);
1364 LASSERT (PORTALS_PROTO_VERSION_MINOR == 3);
1365 LASSERT (PTL_MSG_ACK == 0);
1366 LASSERT (PTL_MSG_PUT == 1);
1367 LASSERT (PTL_MSG_GET == 2);
1368 LASSERT (PTL_MSG_REPLY == 3);
1369 LASSERT (PTL_MSG_HELLO == 4);
1371 /* Checks for struct ptl_handle_wire_t */
1372 LASSERT ((int)sizeof(ptl_handle_wire_t) == 16);
1373 LASSERT (offsetof(ptl_handle_wire_t, wh_interface_cookie) == 0);
1374 LASSERT ((int)sizeof(((ptl_handle_wire_t *)0)->wh_interface_cookie) == 8);
1375 LASSERT (offsetof(ptl_handle_wire_t, wh_object_cookie) == 8);
1376 LASSERT ((int)sizeof(((ptl_handle_wire_t *)0)->wh_object_cookie) == 8);
1378 /* Checks for struct ptl_magicversion_t */
1379 LASSERT ((int)sizeof(ptl_magicversion_t) == 8);
1380 LASSERT (offsetof(ptl_magicversion_t, magic) == 0);
1381 LASSERT ((int)sizeof(((ptl_magicversion_t *)0)->magic) == 4);
1382 LASSERT (offsetof(ptl_magicversion_t, version_major) == 4);
1383 LASSERT ((int)sizeof(((ptl_magicversion_t *)0)->version_major) == 2);
1384 LASSERT (offsetof(ptl_magicversion_t, version_minor) == 6);
1385 LASSERT ((int)sizeof(((ptl_magicversion_t *)0)->version_minor) == 2);
1387 /* Checks for struct ptl_hdr_t */
1388 LASSERT ((int)sizeof(ptl_hdr_t) == 72);
1389 LASSERT (offsetof(ptl_hdr_t, dest_nid) == 0);
1390 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->dest_nid) == 8);
1391 LASSERT (offsetof(ptl_hdr_t, src_nid) == 8);
1392 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->src_nid) == 8);
1393 LASSERT (offsetof(ptl_hdr_t, dest_pid) == 16);
1394 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->dest_pid) == 4);
1395 LASSERT (offsetof(ptl_hdr_t, src_pid) == 20);
1396 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->src_pid) == 4);
1397 LASSERT (offsetof(ptl_hdr_t, type) == 24);
1398 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->type) == 4);
1399 LASSERT (offsetof(ptl_hdr_t, payload_length) == 28);
1400 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->payload_length) == 4);
1401 LASSERT (offsetof(ptl_hdr_t, msg) == 32);
1402 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg) == 40);
1405 LASSERT (offsetof(ptl_hdr_t, msg.ack.dst_wmd) == 32);
1406 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.ack.dst_wmd) == 16);
1407 LASSERT (offsetof(ptl_hdr_t, msg.ack.match_bits) == 48);
1408 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.ack.match_bits) == 8);
1409 LASSERT (offsetof(ptl_hdr_t, msg.ack.mlength) == 56);
1410 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.ack.mlength) == 4);
1413 LASSERT (offsetof(ptl_hdr_t, msg.put.ack_wmd) == 32);
1414 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.ack_wmd) == 16);
1415 LASSERT (offsetof(ptl_hdr_t, msg.put.match_bits) == 48);
1416 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.match_bits) == 8);
1417 LASSERT (offsetof(ptl_hdr_t, msg.put.hdr_data) == 56);
1418 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.hdr_data) == 8);
1419 LASSERT (offsetof(ptl_hdr_t, msg.put.ptl_index) == 64);
1420 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.ptl_index) == 4);
1421 LASSERT (offsetof(ptl_hdr_t, msg.put.offset) == 68);
1422 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.offset) == 4);
1425 LASSERT (offsetof(ptl_hdr_t, msg.get.return_wmd) == 32);
1426 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.return_wmd) == 16);
1427 LASSERT (offsetof(ptl_hdr_t, msg.get.match_bits) == 48);
1428 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.match_bits) == 8);
1429 LASSERT (offsetof(ptl_hdr_t, msg.get.ptl_index) == 56);
1430 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.ptl_index) == 4);
1431 LASSERT (offsetof(ptl_hdr_t, msg.get.src_offset) == 60);
1432 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.src_offset) == 4);
1433 LASSERT (offsetof(ptl_hdr_t, msg.get.sink_length) == 64);
1434 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.sink_length) == 4);
1437 LASSERT (offsetof(ptl_hdr_t, msg.reply.dst_wmd) == 32);
1438 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.reply.dst_wmd) == 16);
1441 LASSERT (offsetof(ptl_hdr_t, msg.hello.incarnation) == 32);
1442 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.hello.incarnation) == 8);
1443 LASSERT (offsetof(ptl_hdr_t, msg.hello.type) == 40);
1444 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.hello.type) == 4);