1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Data movement routines
7 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
8 * Copyright (c) 2001-2002 Sandia National Laboratories
10 * This file is part of Lustre, http://www.sf.net/projects/lustre/
12 * Lustre is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Lustre is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Lustre; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
29 # define DEBUG_SUBSYSTEM S_PORTALS
30 # include <linux/kp30.h>
32 #include <portals/p30.h>
33 #include <portals/lib-p30.h>
34 #include <portals/arg-blocks.h>
37 * Right now it does not check access control lists.
39 * We only support one MD per ME, which is how the Portals 3.1 spec is written.
40 * All previous complication is removed.
/* Search portal 'index' for an ME/MD willing to accept an incoming message
 * of 'rlength' bytes from src_nid/src_pid with the given match bits and
 * operation mask (PTL_MD_OP_PUT or PTL_MD_OP_GET).  On a match, fills in
 * *mlength_out (bytes actually accepted), *offset_out (offset into the MD)
 * and *unlink_out (whether the MD should be auto-unlinked afterwards).
 * NOTE(review): called with the state lock held, presumably — confirm at
 * call sites.  This chunk appears truncated: several original lines
 * (declarations, loop continues, return paths) are missing. */
44 lib_find_me(nal_cb_t *nal, int index, int op_mask, ptl_nid_t src_nid,
45 ptl_pid_t src_pid, ptl_size_t rlength, ptl_size_t roffset,
46 ptl_match_bits_t match_bits, ptl_size_t *mlength_out,
47 ptl_size_t *offset_out, int *unlink_out)
49 lib_ni_t *ni = &nal->ni;
50 struct list_head *match_list = &ni->tbl.tbl[index];
51 struct list_head *tmp;
59 CDEBUG (D_NET, "Request from "LPU64".%d of length %d into portal %d "
60 "MB="LPX64"\n", src_nid, src_pid, rlength, index, match_bits);
/* Reject a portal index outside the table before touching match_list. */
62 if (index < 0 || index >= ni->tbl.size) {
63 CERROR("Invalid portal %d not in [0-%d]\n",
/* Walk every ME attached to this portal looking for the first match. */
68 list_for_each (tmp, match_list) {
69 me = list_entry(tmp, lib_me_t, me_list);
72 /* ME attached but MD not attached yet */
76 LASSERT (me == md->me);
/* An exhausted MD (threshold 0) can never accept another operation. */
79 if (md->threshold == 0)
82 /* mismatched MD op */
83 if ((md->options & op_mask) == 0)
86 /* mismatched ME nid/pid? */
87 if (me->match_id.nid != PTL_NID_ANY &&
88 me->match_id.nid != src_nid)
91 if (me->match_id.pid != PTL_PID_ANY &&
92 me->match_id.pid != src_pid)
95 /* mismatched ME matchbits? */
96 if (((me->match_bits ^ match_bits) & ~me->ignore_bits) != 0)
99 /* Hurrah! This _is_ a match; check it out... */
/* Unless the peer manages offsets remotely, the MD's own running
 * offset decides where this message lands. */
101 if ((md->options & PTL_MD_MANAGE_REMOTE) == 0)
106 mlength = md->length - offset;
107 if ((md->options & PTL_MD_MAX_SIZE) != 0 &&
108 mlength > md->max_size)
109 mlength = md->max_size;
111 if (rlength <= mlength) { /* fits in allowed space */
113 } else if ((md->options & PTL_MD_TRUNCATE) == 0) {
114 /* this packet _really_ is too big */
115 CERROR("Matching packet %d too big: %d left, "
116 "%d allowed\n", rlength, md->length - offset,
/* Advance the MD's offset past the bytes we just claimed. */
121 md->offset = offset + mlength;
123 *offset_out = offset;
124 *mlength_out = mlength;
/* Auto-unlink once no further message of max_size could fit. */
125 *unlink_out = ((md->options & PTL_MD_AUTO_UNLINK) != 0 &&
126 md->offset >= (md->length - md->max_size));
131 CERROR (LPU64": Dropping %s from "LPU64".%d portal %d match "LPX64
132 " offset %d length %d: no match\n",
133 ni->nid, (op_mask == PTL_MD_OP_GET) ? "GET" : "PUT",
134 src_nid, src_pid, index, match_bits, roffset, rlength);
/* PtlFailNid API handler: with a non-zero threshold, register 'nid' as a
 * test peer whose traffic will be artificially failed 'threshold' times
 * (see fail_peer()); with threshold == 0, remove matching entries (and any
 * exhausted zombies).  Entries are moved to a private 'cull' list under
 * the state lock and freed only after the lock is dropped, so cb_free is
 * never called with the lock held.
 * NOTE(review): chunk is truncated — tp/flags declarations and some
 * closing braces are missing from this view. */
138 int do_PtlFailNid (nal_cb_t *nal, void *private, void *v_args, void *v_ret)
140 PtlFailNid_in *args = v_args;
141 PtlFailNid_out *ret = v_ret;
144 struct list_head *el;
145 struct list_head *next;
146 struct list_head cull;
148 if (args->threshold != 0) {
149 /* Adding a new entry */
150 tp = (lib_test_peer_t *)nal->cb_malloc (nal, sizeof (*tp));
152 return (ret->rc = PTL_FAIL);
154 tp->tp_nid = args->nid;
155 tp->tp_threshold = args->threshold;
157 state_lock (nal, &flags);
158 list_add (&tp->tp_list, &nal->ni.ni_test_peers);
159 state_unlock (nal, &flags);
160 return (ret->rc = PTL_OK);
163 /* removing entries */
164 INIT_LIST_HEAD (&cull);
166 state_lock (nal, &flags);
168 list_for_each_safe (el, next, &nal->ni.ni_test_peers) {
169 tp = list_entry (el, lib_test_peer_t, tp_list);
171 if (tp->tp_threshold == 0 || /* needs culling anyway */
172 args->nid == PTL_NID_ANY || /* removing all entries */
173 tp->tp_nid == args->nid) /* matched this one */
175 list_del (&tp->tp_list);
176 list_add (&tp->tp_list, &cull);
180 state_unlock (nal, &flags);
/* Free the culled entries outside the state lock. */
182 while (!list_empty (&cull)) {
183 tp = list_entry (cull.next, lib_test_peer_t, tp_list);
185 list_del (&tp->tp_list);
186 nal->cb_free (nal, tp, sizeof (*tp));
188 return (ret->rc = PTL_OK);
/* Decide whether traffic to/from 'nid' should be simulated as failed.
 * Scans the ni_test_peers list set up by do_PtlFailNid(); a matching
 * entry with a finite threshold is decremented per hit.  Exhausted
 * entries (threshold 0) are culled, but only on outgoing tests — the
 * incoming path may run at interrupt priority where freeing is unsafe.
 * Culled entries are freed after the state lock is dropped.
 * NOTE(review): truncated — the return statement and some braces are
 * not visible in this chunk; presumably returns non-zero to fail. */
192 fail_peer (nal_cb_t *nal, ptl_nid_t nid, int outgoing)
195 struct list_head *el;
196 struct list_head *next;
198 struct list_head cull;
201 INIT_LIST_HEAD (&cull);
203 state_lock (nal, &flags);
205 list_for_each_safe (el, next, &nal->ni.ni_test_peers) {
206 tp = list_entry (el, lib_test_peer_t, tp_list);
208 if (tp->tp_threshold == 0) {
211 /* only cull zombies on outgoing tests,
212 * since we may be at interrupt priority on
213 * incoming messages. */
214 list_del (&tp->tp_list);
215 list_add (&tp->tp_list, &cull);
220 if (tp->tp_nid == PTL_NID_ANY || /* fail every peer */
221 nid == tp->tp_nid) { /* fail this peer */
/* PTL_MD_THRESH_INF means "fail forever"; otherwise count down. */
224 if (tp->tp_threshold != PTL_MD_THRESH_INF) {
227 tp->tp_threshold == 0) {
229 list_del (&tp->tp_list);
230 list_add (&tp->tp_list, &cull);
237 state_unlock (nal, &flags);
/* Free culled entries outside the lock. */
239 while (!list_empty (&cull)) {
240 tp = list_entry (cull.next, lib_test_peer_t, tp_list);
241 list_del (&tp->tp_list);
243 nal->cb_free (nal, tp, sizeof (*tp));
/* Sum the byte count of an iovec array: total of iov_len over 'niov'
 * entries.  NOTE(review): loop header and return are missing from this
 * truncated view. */
250 lib_iov_nob (int niov, struct iovec *iov)
255 nob += (iov++)->iov_len;
/* Gather: copy up to 'len' bytes from the iovec array into the flat
 * buffer 'dest', fragment by fragment.  NOTE(review): the surrounding
 * loop and pointer advances are missing from this truncated view. */
261 lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, ptl_size_t len)
268 nob = MIN (iov->iov_len, len);
269 memcpy (dest, iov->iov_base, nob);
/* Scatter: copy up to 'len' bytes from the flat buffer 'src' into the
 * iovec array, fragment by fragment.  NOTE(review): the surrounding
 * loop and pointer advances are missing from this truncated view. */
279 lib_copy_buf2iov (int niov, struct iovec *iov, char *src, ptl_size_t len)
286 nob = MIN (iov->iov_len, len);
287 memcpy (iov->iov_base, src, nob);
/* Build in 'dst' the iovec subset of the MD's iovec array that covers
 * exactly [offset, offset+len), returning the number of dst entries
 * used.  Does not modify the source array.  Asserts the requested span
 * lies within the MD and never exceeds PTL_MD_MAX_IOV fragments. */
297 lib_extract_iov (struct iovec *dst, lib_md_t *md,
298 ptl_size_t offset, ptl_size_t len)
300 /* Initialise 'dst' to the subset of 'src' starting at 'offset',
301 * for exactly 'len' bytes, and return the number of entries.
302 * NB not destructive to 'src' */
303 int src_niov = md->md_niov;
304 struct iovec *src = md->md_iov.iov;
308 LASSERT (offset + len <= md->length);
310 if (len == 0) /* no data => */
311 return (0); /* no frags */
313 LASSERT (src_niov > 0);
314 while (offset >= src->iov_len) { /* skip initial frags */
315 offset -= src->iov_len;
318 LASSERT (src_niov > 0);
/* NOTE(review): the copy loop below is partially missing in this
 * truncated chunk (src/dst advances and len bookkeeping not shown). */
323 LASSERT (src_niov > 0);
324 LASSERT (dst_niov <= PTL_MD_MAX_IOV);
326 frag_len = src->iov_len - offset;
327 dst->iov_base = ((char *)src->iov_base) + offset;
329 if (len <= frag_len) {
334 dst->iov_len = frag_len;
347 lib_kiov_nob (int niov, ptl_kiov_t *kiov)
354 lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, ptl_size_t len)
360 lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, char *dest, ptl_size_t len)
366 lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md,
367 ptl_size_t offset, ptl_size_t len)
/* Sum the byte count of a kiov (page-based) array: total of kiov_len
 * over 'niov' entries.  NOTE(review): loop header and return are
 * missing from this truncated view. */
375 lib_kiov_nob (int niov, ptl_kiov_t *kiov)
380 nob += (kiov++)->kiov_len;
/* Gather from page-based fragments: kmap each page, copy up to 'len'
 * bytes into the flat buffer 'dest', kunmap again.  kmap() may sleep,
 * hence the assertion that we are not in interrupt context. */
386 lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, ptl_size_t len)
391 LASSERT (!in_interrupt ());
395 nob = MIN (kiov->kiov_len, len);
397 addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
398 memcpy (dest, addr, nob);
399 kunmap (kiov->kiov_page);
/* Scatter into page-based fragments: kmap each page, copy up to 'len'
 * bytes from the flat buffer 'src', kunmap again.  kmap() may sleep,
 * hence the assertion that we are not in interrupt context. */
409 lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, char *src, ptl_size_t len)
414 LASSERT (!in_interrupt ());
418 nob = MIN (kiov->kiov_len, len);
420 addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
421 memcpy (addr, src, nob);
422 kunmap (kiov->kiov_page);
/* Page-based analogue of lib_extract_iov(): build in 'dst' the kiov
 * subset of the MD's kiov array covering exactly [offset, offset+len),
 * returning the number of dst entries used.  Each resulting fragment is
 * asserted to stay within a single page. */
432 lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md,
433 ptl_size_t offset, ptl_size_t len)
435 /* Initialise 'dst' to the subset of 'src' starting at 'offset',
436 * for exactly 'len' bytes, and return the number of entries.
437 * NB not destructive to 'src' */
438 int src_niov = md->md_niov;
439 ptl_kiov_t *src = md->md_iov.kiov;
443 LASSERT (offset + len <= md->length);
445 if (len == 0) /* no data => */
446 return (0); /* no frags */
448 LASSERT (src_niov > 0);
449 while (offset >= src->kiov_len) { /* skip initial frags */
450 offset -= src->kiov_len;
453 LASSERT (src_niov > 0);
/* NOTE(review): the copy loop below is partially missing in this
 * truncated chunk (src/dst advances and len bookkeeping not shown). */
458 LASSERT (src_niov > 0);
459 LASSERT (dst_niov <= PTL_MD_MAX_IOV);
461 frag_len = src->kiov_len - offset;
462 dst->kiov_page = src->kiov_page;
463 dst->kiov_offset = src->kiov_offset + offset;
465 if (len <= frag_len) {
467 LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
471 dst->kiov_len = frag_len;
472 LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
/* Hand an incoming message to the NAL for reception into 'md' at
 * 'offset' for 'mlen' of the 'rlen' wire bytes.  A NULL/zero-length MD
 * becomes a zero-iov receive (drop/discard path); otherwise the MD's
 * fragment list is extracted and passed to cb_recv (iovec) or
 * cb_recv_pages (kiov) depending on PTL_MD_KIOV. */
485 lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
486 ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen)
491 nal->cb_recv (nal, private, msg, 0, NULL, 0, rlen);
492 else if ((md->options & PTL_MD_KIOV) == 0) {
493 niov = lib_extract_iov (msg->msg_iov.iov, md, offset, mlen);
494 nal->cb_recv (nal, private, msg,
495 niov, msg->msg_iov.iov, mlen, rlen);
497 niov = lib_extract_kiov (msg->msg_iov.kiov, md, offset, mlen);
498 nal->cb_recv_pages (nal, private, msg,
499 niov, msg->msg_iov.kiov, mlen, rlen);
/* Hand an outgoing message to the NAL: send 'hdr' plus 'len' payload
 * bytes taken from 'md' at 'offset' to nid/pid.  A NULL MD (e.g. a GET
 * request) sends header only; otherwise the MD's fragments go through
 * cb_send (iovec) or cb_send_pages (kiov) per PTL_MD_KIOV.  Returns the
 * NAL callback's result. */
504 lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg,
505 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
506 lib_md_t *md, ptl_size_t offset, ptl_size_t len)
511 return (nal->cb_send (nal, private, msg,
515 if ((md->options & PTL_MD_KIOV) == 0) {
516 niov = lib_extract_iov (msg->msg_iov.iov, md, offset, len);
517 return (nal->cb_send (nal, private, msg,
519 niov, msg->msg_iov.iov, len));
522 niov = lib_extract_kiov (msg->msg_iov.kiov, md, offset, len);
523 return (nal->cb_send_pages (nal, private, msg,
525 niov, msg->msg_iov.kiov, len));
/* Allocate and zero a lib_msg_t bound to 'md', stamp its arrival time,
 * consume one unit of the MD's threshold (unless infinite), bump the
 * message counters, and queue it on ni_active_msgs.  ALWAYS called
 * holding the state_lock.  NOTE(review): the threshold decrement and
 * md->pending bump are among the lines missing from this truncated
 * chunk; callers' comments reference both. */
529 get_new_msg (nal_cb_t *nal, lib_md_t *md)
531 /* ALWAYS called holding the state_lock */
532 lib_counters_t *counters = &nal->ni.counters;
533 lib_msg_t *msg = lib_msg_alloc (nal);
538 memset (msg, 0, sizeof (*msg));
543 do_gettimeofday(&msg->ev.arrival_time);
545 if (md->threshold != PTL_MD_THRESH_INF) {
546 LASSERT (md->threshold > 0);
550 counters->msgs_alloc++;
551 if (counters->msgs_alloc > counters->msgs_max)
552 counters->msgs_max = counters->msgs_alloc;
554 list_add (&msg->msg_list, &nal->ni.ni_active_msgs);
560 * Incoming messages have a ptl_msg_t object associated with them
561 * by the library. This object encapsulates the state of the
562 * message and allows the NAL to do non-blocking receives or sends
/* Handle an incoming PUT: byte-swap the put-specific header fields,
 * match against the target portal's MEs, allocate a lib_msg_t recording
 * the PTL_EVENT_PUT event (and ACK state if the sender requested one),
 * update receive counters, optionally auto-unlink the ME, then let the
 * NAL receive the payload via lib_recv().  Unmatched or unallocatable
 * messages fall through to the drop path at the bottom. */
566 static int parse_put(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
568 lib_ni_t *ni = &nal->ni;
569 ptl_size_t mlength = 0;
570 ptl_size_t offset = 0;
577 /* Convert put fields to host byte order */
578 hdr->msg.put.match_bits = NTOH__u64 (hdr->msg.put.match_bits);
579 hdr->msg.put.ptl_index = NTOH__u32 (hdr->msg.put.ptl_index);
580 hdr->msg.put.offset = NTOH__u32 (hdr->msg.put.offset);
582 state_lock(nal, &flags);
584 me = lib_find_me(nal, hdr->msg.put.ptl_index, PTL_MD_OP_PUT,
585 hdr->src_nid, hdr->src_pid,
586 hdr->payload_length, hdr->msg.put.offset,
587 hdr->msg.put.match_bits,
588 &mlength, &offset, &unlink);
593 CDEBUG(D_NET, "Incoming put index %x from "LPU64"/%u of length %d/%d "
594 "into md "LPX64" [%d] + %d\n", hdr->msg.put.ptl_index,
595 hdr->src_nid, hdr->src_pid, mlength, hdr->payload_length,
596 md->md_lh.lh_cookie, md->md_niov, offset);
598 msg = get_new_msg (nal, md);
600 CERROR(LPU64": Dropping PUT from "LPU64": can't allocate msg\n",
601 ni->nid, hdr->src_nid);
/* Record ACK state only if the sender asked for one and the MD
 * doesn't have ACKs disabled. */
605 if (!ptl_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
606 !(md->options & PTL_MD_ACK_DISABLE)) {
608 msg->ack_wmd = hdr->msg.put.ack_wmd;
609 msg->nid = hdr->src_nid;
610 msg->pid = hdr->src_pid;
611 msg->ev.match_bits = hdr->msg.put.match_bits;
615 msg->ev.type = PTL_EVENT_PUT;
616 msg->ev.initiator.nid = hdr->src_nid;
617 msg->ev.initiator.pid = hdr->src_pid;
618 msg->ev.portal = hdr->msg.put.ptl_index;
619 msg->ev.match_bits = hdr->msg.put.match_bits;
620 msg->ev.rlength = hdr->payload_length;
621 msg->ev.mlength = mlength;
622 msg->ev.offset = offset;
623 msg->ev.hdr_data = hdr->msg.put.hdr_data;
625 /* NB if this match has exhausted the MD, we can't be sure
626 * that this event will be the last one associated with
627 * this MD in the event queue (another message already
628 * matching this ME/MD could end up being last). So we
629 * remember the ME handle anyway and check again when we're
630 * allocating our slot in the event queue.
632 ptl_me2handle (&msg->ev.unlinked_me, me);
634 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
637 ni->counters.recv_count++;
638 ni->counters.recv_length += mlength;
640 /* only unlink after MD's pending count has been bumped
641 * in get_new_msg() otherwise lib_me_unlink() will nuke it */
643 md->md_flags |= PTL_MD_FLAG_AUTO_UNLINKED;
644 lib_me_unlink (nal, me);
647 state_unlock(nal, &flags);
649 lib_recv (nal, private, msg, md, offset, mlength, hdr->payload_length);
/* Drop path: count the dropped bytes and consume the payload. */
653 nal->ni.counters.drop_count++;
654 nal->ni.counters.drop_length += hdr->payload_length;
655 state_unlock (nal, &flags);
656 lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length);
/* Handle an incoming GET: byte-swap the get-specific header fields,
 * match against the target portal's MEs, allocate a lib_msg_t recording
 * the PTL_EVENT_GET event, build a REPLY header addressed back to the
 * requester and send the matched MD data via lib_send().  If the REPLY
 * send fails we still finalize the msg so a GET event is delivered.
 * Unmatched requests fall through to the drop path at the bottom. */
660 static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
662 lib_ni_t *ni = &nal->ni;
663 ptl_size_t mlength = 0;
664 ptl_size_t offset = 0;
673 /* Convert get fields to host byte order */
674 hdr->msg.get.match_bits = NTOH__u64 (hdr->msg.get.match_bits);
675 hdr->msg.get.ptl_index = NTOH__u32 (hdr->msg.get.ptl_index);
676 hdr->msg.get.sink_length = NTOH__u32 (hdr->msg.get.sink_length);
677 hdr->msg.get.src_offset = NTOH__u32 (hdr->msg.get.src_offset);
679 state_lock(nal, &flags);
681 me = lib_find_me(nal, hdr->msg.get.ptl_index, PTL_MD_OP_GET,
682 hdr->src_nid, hdr->src_pid,
683 hdr->msg.get.sink_length, hdr->msg.get.src_offset,
684 hdr->msg.get.match_bits,
685 &mlength, &offset, &unlink);
690 CDEBUG(D_NET, "Incoming get index %d from "LPU64".%u of length %d/%d "
691 "from md "LPX64" [%d] + %d\n", hdr->msg.get.ptl_index,
692 hdr->src_nid, hdr->src_pid, mlength, hdr->payload_length,
693 md->md_lh.lh_cookie, md->md_niov, offset);
695 msg = get_new_msg (nal, md);
697 CERROR(LPU64": Dropping GET from "LPU64": can't allocate msg\n",
698 ni->nid, hdr->src_nid);
703 msg->ev.type = PTL_EVENT_GET;
704 msg->ev.initiator.nid = hdr->src_nid;
705 msg->ev.initiator.pid = hdr->src_pid;
706 msg->ev.portal = hdr->msg.get.ptl_index;
707 msg->ev.match_bits = hdr->msg.get.match_bits;
708 msg->ev.rlength = hdr->payload_length;
709 msg->ev.mlength = mlength;
710 msg->ev.offset = offset;
711 msg->ev.hdr_data = 0;
713 /* NB if this match has exhausted the MD, we can't be sure
714 * that this event will be the last one associated with
715 * this MD in the event queue (another message already
716 * matching this ME/MD could end up being last). So we
717 * remember the ME handle anyway and check again when we're
718 * allocating our slot in the event queue.
720 ptl_me2handle (&msg->ev.unlinked_me, me);
722 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
725 ni->counters.send_count++;
726 ni->counters.send_length += mlength;
728 /* only unlink after MD's refcount has been bumped
729 * in get_new_msg() otherwise lib_me_unlink() will nuke it */
731 md->md_flags |= PTL_MD_FLAG_AUTO_UNLINKED;
732 lib_me_unlink (nal, me);
735 state_unlock(nal, &flags);
/* Build the REPLY header (wire byte order) carrying the matched
 * data back to the requester's return_wmd. */
737 memset (&reply, 0, sizeof (reply));
738 reply.type = HTON__u32 (PTL_MSG_REPLY);
739 reply.dest_nid = HTON__u64 (hdr->src_nid);
740 reply.src_nid = HTON__u64 (ni->nid);
741 reply.dest_pid = HTON__u32 (hdr->src_pid);
742 reply.src_pid = HTON__u32 (ni->pid);
743 reply.payload_length = HTON__u32 (mlength);
745 reply.msg.reply.dst_wmd = hdr->msg.get.return_wmd;
747 rc = lib_send (nal, private, msg, &reply, PTL_MSG_REPLY,
748 hdr->src_nid, hdr->src_pid, md, offset, mlength);
750 CERROR(LPU64": Dropping GET from "LPU64": send REPLY failed\n",
751 ni->nid, hdr->src_nid);
752 /* Hmm, this will create a GET event and make believe
753 * the reply completed, which it kind of did, only the
754 * source won't get her reply */
755 lib_finalize (nal, private, msg);
756 state_lock (nal, &flags);
760 /* Complete the incoming message */
761 lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length);
/* Drop path: GET carries no payload, so count sink_length. */
764 ni->counters.drop_count++;
765 ni->counters.drop_length += hdr->msg.get.sink_length;
766 state_unlock(nal, &flags);
767 lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length);
/* Handle an incoming REPLY (completion of one of our GETs): look up the
 * destination MD by wire handle (no byte flip — handles are only looked
 * up by their creator), validate it is still active and large enough
 * (truncating if PTL_MD_TRUNCATE allows), record a PTL_EVENT_REPLY,
 * bump receive counters, and receive the payload into the MD at offset
 * 0.  Invalid/inactive MDs or oversize replies are dropped. */
771 static int parse_reply(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
773 lib_ni_t *ni = &nal->ni;
780 state_lock(nal, &flags);
782 /* NB handles only looked up by creator (no flips) */
783 md = ptl_wire_handle2md(&hdr->msg.reply.dst_wmd, nal);
784 if (md == NULL || md->threshold == 0) {
785 CERROR (LPU64": Dropping REPLY from "LPU64" for %s MD "LPX64"."LPX64"\n",
786 ni->nid, hdr->src_nid,
787 md == NULL ? "invalid" : "inactive",
788 hdr->msg.reply.dst_wmd.wh_interface_cookie,
789 hdr->msg.reply.dst_wmd.wh_object_cookie);
793 LASSERT (md->offset == 0);
795 length = rlength = hdr->payload_length;
/* Reply bigger than the MD: only acceptable if truncation allowed. */
797 if (length > md->length) {
798 if ((md->options & PTL_MD_TRUNCATE) == 0) {
799 CERROR (LPU64": Dropping REPLY from "LPU64
800 " length %d for MD "LPX64" would overflow (%d)\n",
801 ni->nid, hdr->src_nid, length,
802 hdr->msg.reply.dst_wmd.wh_object_cookie,
809 CDEBUG(D_NET, "Reply from "LPU64" of length %d/%d into md "LPX64"\n",
810 hdr->src_nid, length, rlength,
811 hdr->msg.reply.dst_wmd.wh_object_cookie);
813 msg = get_new_msg (nal, md);
815 CERROR(LPU64": Dropping REPLY from "LPU64": can't "
816 "allocate msg\n", ni->nid, hdr->src_nid);
821 msg->ev.type = PTL_EVENT_REPLY;
822 msg->ev.initiator.nid = hdr->src_nid;
823 msg->ev.initiator.pid = hdr->src_pid;
824 msg->ev.rlength = rlength;
825 msg->ev.mlength = length;
828 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
831 ni->counters.recv_count++;
832 ni->counters.recv_length += length;
834 state_unlock(nal, &flags);
836 lib_recv (nal, private, msg, md, 0, length, rlength);
/* Drop path: count the dropped bytes and consume the payload. */
840 nal->ni.counters.drop_count++;
841 nal->ni.counters.drop_length += hdr->payload_length;
842 state_unlock (nal, &flags);
843 lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length);
/* Handle an incoming ACK (acknowledgement of one of our PUTs):
 * byte-swap the ack fields, look up the destination MD by wire handle,
 * record a PTL_EVENT_ACK carrying the manipulated length and match
 * bits, bump recv_count, and complete with a zero-length lib_recv().
 * Invalid/inactive MDs are dropped (ACKs carry no payload of interest). */
847 static int parse_ack(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
849 lib_ni_t *ni = &nal->ni;
851 lib_msg_t *msg = NULL;
854 /* Convert ack fields to host byte order */
855 hdr->msg.ack.match_bits = NTOH__u64 (hdr->msg.ack.match_bits);
856 hdr->msg.ack.mlength = NTOH__u32 (hdr->msg.ack.mlength);
858 state_lock(nal, &flags);
860 /* NB handles only looked up by creator (no flips) */
861 md = ptl_wire_handle2md(&hdr->msg.ack.dst_wmd, nal);
862 if (md == NULL || md->threshold == 0) {
863 CDEBUG(D_INFO, LPU64": Dropping ACK from "LPU64" to %s MD "
864 LPX64"."LPX64"\n", ni->nid, hdr->src_nid,
865 (md == NULL) ? "invalid" : "inactive",
866 hdr->msg.ack.dst_wmd.wh_interface_cookie,
867 hdr->msg.ack.dst_wmd.wh_object_cookie);
871 CDEBUG(D_NET, LPU64": ACK from "LPU64" into md "LPX64"\n",
872 ni->nid, hdr->src_nid,
873 hdr->msg.ack.dst_wmd.wh_object_cookie);
875 msg = get_new_msg (nal, md);
877 CERROR(LPU64": Dropping ACK from "LPU64": can't allocate msg\n",
878 ni->nid, hdr->src_nid);
883 msg->ev.type = PTL_EVENT_ACK;
884 msg->ev.initiator.nid = hdr->src_nid;
885 msg->ev.initiator.pid = hdr->src_pid;
886 msg->ev.mlength = hdr->msg.ack.mlength;
887 msg->ev.match_bits = hdr->msg.ack.match_bits;
889 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
892 ni->counters.recv_count++;
893 state_unlock(nal, &flags);
/* No MD for the receive — an ACK delivers only the event. */
894 lib_recv (nal, private, msg, NULL, 0, 0, hdr->payload_length);
/* Drop path. */
898 nal->ni.counters.drop_count++;
899 state_unlock (nal, &flags);
900 lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length);
/* Map hdr->type to a human-readable name for log messages, falling back
 * to "<UNKNOWN>".  NOTE(review): the per-type switch cases are missing
 * from this truncated chunk; only the fallback return is visible. */
905 hdr_type_string (ptl_hdr_t *hdr)
919         return ("<UNKNOWN>");
/* Pretty-print a ptl_hdr_t through the NAL's cb_printf for debugging:
 * common fields first, then the type-specific union member (PUT / GET /
 * ACK / REPLY).  NOTE(review): the switch statement and some cb_printf
 * call heads are missing from this truncated chunk. */
923 void print_hdr(nal_cb_t * nal, ptl_hdr_t * hdr)
925 char *type_str = hdr_type_string (hdr);
927 nal->cb_printf(nal, "P3 Header at %p of type %s\n", hdr, type_str);
928 nal->cb_printf(nal, " From nid/pid %Lu/%Lu", hdr->src_nid,
930 nal->cb_printf(nal, " To nid/pid %Lu/%Lu\n", hdr->dest_nid,
/* PUT-specific fields. */
939 " Ptl index %d, ack md "LPX64"."LPX64", "
940 "match bits "LPX64"\n",
941 hdr->msg.put.ptl_index,
942 hdr->msg.put.ack_wmd.wh_interface_cookie,
943 hdr->msg.put.ack_wmd.wh_object_cookie,
944 hdr->msg.put.match_bits);
946 " Length %d, offset %d, hdr data "LPX64"\n",
947 hdr->payload_length, hdr->msg.put.offset,
948 hdr->msg.put.hdr_data);
/* GET-specific fields. */
953 " Ptl index %d, return md "LPX64"."LPX64", "
954 "match bits "LPX64"\n", hdr->msg.get.ptl_index,
955 hdr->msg.get.return_wmd.wh_interface_cookie,
956 hdr->msg.get.return_wmd.wh_object_cookie,
957 hdr->msg.get.match_bits);
959 " Length %d, src offset %d\n",
960 hdr->msg.get.sink_length,
961 hdr->msg.get.src_offset);
/* ACK-specific fields. */
965 nal->cb_printf(nal, " dst md "LPX64"."LPX64", "
966 "manipulated length %d\n",
967 hdr->msg.ack.dst_wmd.wh_interface_cookie,
968 hdr->msg.ack.dst_wmd.wh_object_cookie,
969 hdr->msg.ack.mlength);
/* REPLY-specific fields. */
973 nal->cb_printf(nal, " dst md "LPX64"."LPX64", "
975 hdr->msg.reply.dst_wmd.wh_interface_cookie,
976 hdr->msg.reply.dst_wmd.wh_object_cookie,
977 hdr->payload_length);
980 } /* end of print_hdr() */
/* Top-level dispatch for every incoming message: byte-swap the common
 * header fields, reject unexpected HELLOs, messages not addressed to
 * this NID, and simulated-failure test peers (each rejection consumes
 * the payload via a NULL-MD lib_recv), then dispatch by hdr->type to
 * parse_ack/put/get/reply.  Unknown types are dropped. */
983 int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
987 /* convert common fields to host byte order */
988 hdr->dest_nid = NTOH__u64 (hdr->dest_nid);
989 hdr->src_nid = NTOH__u64 (hdr->src_nid);
990 hdr->dest_pid = NTOH__u32 (hdr->dest_pid);
991 hdr->src_pid = NTOH__u32 (hdr->src_pid);
992 hdr->type = NTOH__u32 (hdr->type);
993 hdr->payload_length = NTOH__u32(hdr->payload_length);
995 nal->cb_printf(nal, "%d: lib_parse: nal=%p hdr=%p type=%d\n",
996 nal->ni.nid, nal, hdr, hdr->type);
999 if (hdr->type == PTL_MSG_HELLO) {
1000 /* dest_nid is really ptl_magicversion_t */
1001 ptl_magicversion_t *mv = (ptl_magicversion_t *)&hdr->dest_nid;
1003 CERROR (LPU64": Dropping unexpected HELLO message: "
1004 "magic %d, version %d.%d from "LPD64"\n",
1005 nal->ni.nid, mv->magic,
1006 mv->version_major, mv->version_minor,
1008 lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length);
/* Not addressed to us: drop and account the bytes. */
1012 if (hdr->dest_nid != nal->ni.nid) {
1013 CERROR(LPU64": Dropping %s message from "LPU64" to "LPU64
1014 " (not me)\n", nal->ni.nid, hdr_type_string (hdr),
1015 hdr->src_nid, hdr->dest_nid);
1017 state_lock (nal, &flags);
1018 nal->ni.counters.drop_count++;
1019 nal->ni.counters.drop_length += hdr->payload_length;
1020 state_unlock (nal, &flags);
1022 lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length);
/* Simulated failure of incoming traffic (see do_PtlFailNid). */
1026 if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1027 fail_peer (nal, hdr->src_nid, 0)) /* shall we now? */
1029 CERROR(LPU64": Dropping incoming %s from "LPU64
1030 ": simulated failure\n",
1031 nal->ni.nid, hdr_type_string (hdr),
1033 lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length);
1037 switch (hdr->type) {
1039 return (parse_ack(nal, hdr, private));
1041 return (parse_put(nal, hdr, private));
1044 return (parse_get(nal, hdr, private));
1047 return (parse_reply(nal, hdr, private));
1050 CERROR(LPU64": Dropping <unknown> message from "LPU64
1051 ": Bad type=0x%x\n", nal->ni.nid, hdr->src_nid,
1054 lib_recv (nal, private, NULL, NULL, 0, 0, hdr->payload_length);
/* PtlPut API handler: validate the MD handle, build a PUT wire header
 * (requesting an ACK if ack_req_in says so), allocate the lib_msg_t
 * that will carry the PTL_EVENT_SENT event, then hand the MD's data to
 * the NAL via lib_send().  If the send fails we lib_finalize() anyway
 * because get_new_msg() already consumed the MD threshold — we must act
 * as though the network dropped it. */
1060 int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
1064 * ptl_handle_md_t md_in
1065 * ptl_ack_req_t ack_req_in
1066 * ptl_process_id_t target_in
1067 * ptl_pt_index_t portal_in
1068 * ptl_ac_index_t cookie_in
1069 * ptl_match_bits_t match_bits_in
1070 * ptl_size_t offset_in
1075 PtlPut_in *args = v_args;
1076 PtlPut_out *ret = v_ret;
1079 lib_ni_t *ni = &nal->ni;
1081 lib_msg_t *msg = NULL;
1082 ptl_process_id_t *id = &args->target_in;
1083 unsigned long flags;
/* Simulated failure of outgoing traffic (see do_PtlFailNid). */
1086 if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1087 fail_peer (nal, id->nid, 1)) /* shall we now? */
1089 CERROR(LPU64": Dropping PUT to "LPU64": simulated failure\n",
1090 nal->ni.nid, id->nid);
1091 return (ret->rc = PTL_INV_PROC);
1095 state_lock(nal, &flags);
1096 md = ptl_handle2md(&args->md_in, nal);
1097 if (md == NULL || !md->threshold) {
1098 state_unlock(nal, &flags);
1099 return ret->rc = PTL_INV_MD;
1102 CDEBUG(D_NET, "PtlPut -> %Lu: %lu\n", (unsigned long long)id->nid,
1103 (unsigned long)id->pid);
/* Build the PUT header in wire byte order. */
1105 memset (&hdr, 0, sizeof (hdr));
1106 hdr.type = HTON__u32 (PTL_MSG_PUT);
1107 hdr.dest_nid = HTON__u64 (id->nid);
1108 hdr.src_nid = HTON__u64 (ni->nid);
1109 hdr.dest_pid = HTON__u32 (id->pid);
1110 hdr.src_pid = HTON__u32 (ni->pid);
1111 hdr.payload_length = HTON__u32 (md->length);
1113 /* NB handles only looked up by creator (no flips) */
1114 if (args->ack_req_in == PTL_ACK_REQ) {
1115 hdr.msg.put.ack_wmd.wh_interface_cookie = ni->ni_interface_cookie;
1116 hdr.msg.put.ack_wmd.wh_object_cookie = md->md_lh.lh_cookie;
1118 hdr.msg.put.ack_wmd = PTL_WIRE_HANDLE_NONE;
1121 hdr.msg.put.match_bits = HTON__u64 (args->match_bits_in);
1122 hdr.msg.put.ptl_index = HTON__u32 (args->portal_in);
1123 hdr.msg.put.offset = HTON__u32 (args->offset_in);
1124 hdr.msg.put.hdr_data = args->hdr_data_in;
1126 ni->counters.send_count++;
1127 ni->counters.send_length += md->length;
1129 msg = get_new_msg (nal, md);
1131 CERROR("BAD: could not allocate msg!\n");
1132 state_unlock(nal, &flags);
1133 return ret->rc = PTL_NOSPACE;
1137 * If this memory descriptor has an event queue associated with
1138 * it we need to allocate a message state object and record the
1139 * information about this operation that will be recorded into
1140 * event queue once the message has been completed.
1142 * NB. We're now committed to the PUT, since we just marked the MD
1143 * busy. Callers who observe this (by getting PTL_MD_INUSE from
1144 * PtlMDUnlink()) expect a completion event to tell them when the
1148 msg->ev.type = PTL_EVENT_SENT;
1149 msg->ev.initiator.nid = ni->nid;
1150 msg->ev.initiator.pid = ni->pid;
1151 msg->ev.portal = args->portal_in;
1152 msg->ev.match_bits = args->match_bits_in;
1153 msg->ev.rlength = md->length;
1154 msg->ev.mlength = md->length;
1155 msg->ev.offset = args->offset_in;
1156 msg->ev.hdr_data = args->hdr_data_in;
1158 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
1161 state_unlock(nal, &flags);
1163 rc = lib_send (nal, private, msg, &hdr, PTL_MSG_PUT,
1164 id->nid, id->pid, md, 0, md->length);
1166 /* get_new_msg() committed us to sending by decrementing
1167 * md->threshold, so we have to act like we did send, but
1168 * the network dropped it. */
1169 lib_finalize (nal, private, msg);
1172 return ret->rc = PTL_OK;
/* Manufacture a lib_msg_t describing a PTL_EVENT_REPLY for a GET whose
 * data the NAL DMA'd directly into 'getmd' (so no REPLY message crosses
 * the wire).  The NAL passes the returned msg to lib_finalize() so the
 * REPLY event still fires.  'getmd' must be pinned by an outstanding
 * reference (pending > 0) — call before finalizing the original GET. */
1175 lib_msg_t * lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid,
1178 /* The NAL can DMA direct to the GET md (i.e. no REPLY msg). This
1179 * returns a msg the NAL can pass to lib_finalize() so that a REPLY
1180 * event still occurs.
1182 * CAVEAT EMPTOR: 'getmd' is passed by pointer so it MUST be valid.
1183 * This can only be guaranteed while a lib_msg_t holds a reference
1184 * on it (ie. pending > 0), so best call this before the
1185 * lib_finalize() of the original GET. */
1187 lib_ni_t *ni = &nal->ni;
1189 unsigned long flags;
1191 state_lock(nal, &flags);
1193 LASSERT (getmd->pending > 0);
1195 if (getmd->threshold == 0) {
1196 CERROR ("Dropping REPLY from "LPU64" for inactive MD %p\n",
1201 LASSERT (getmd->offset == 0);
1203 CDEBUG(D_NET, "Reply from "LPU64" md %p\n", peer_nid, getmd);
1205 msg = get_new_msg (nal, getmd);
1207 CERROR("Dropping REPLY from "LPU64" md %p: can't allocate msg\n",
1213 msg->ev.type = PTL_EVENT_REPLY;
1214 msg->ev.initiator.nid = peer_nid;
1215 msg->ev.initiator.pid = 0; /* XXX FIXME!!! */
1216 msg->ev.rlength = msg->ev.mlength = getmd->length;
1219 lib_md_deconstruct(nal, getmd, &msg->ev.mem_desc);
1222 ni->counters.recv_count++;
1223 ni->counters.recv_length += getmd->length;
1225 state_unlock(nal, &flags);
/* Drop path: account the bytes that will never be delivered. */
1230 nal->ni.counters.drop_count++;
1231 nal->ni.counters.drop_length += getmd->length;
1233 state_unlock (nal, &flags);
/* PtlGet API handler: validate the MD handle, build a GET wire header
 * carrying our return_wmd (so the peer's REPLY finds this MD), allocate
 * the lib_msg_t that will carry the PTL_EVENT_SENT event, then send the
 * header-only request via lib_send() (md == NULL: no payload).  If the
 * send fails we lib_finalize() anyway because get_new_msg() already
 * consumed the MD threshold.
 * Fix: the simulated-failure CERROR said "Dropping PUT" — a copy-paste
 * from do_PtlPut(); this is the GET path, so it now says "Dropping GET". */
1238 int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
1242 * ptl_handle_md_t md_in
1243 * ptl_process_id_t target_in
1244 * ptl_pt_index_t portal_in
1245 * ptl_ac_index_t cookie_in
1246 * ptl_match_bits_t match_bits_in
1247 * ptl_size_t offset_in
1252 PtlGet_in *args = v_args;
1253 PtlGet_out *ret = v_ret;
1255 lib_msg_t *msg = NULL;
1256 lib_ni_t *ni = &nal->ni;
1257 ptl_process_id_t *id = &args->target_in;
1259 unsigned long flags;
/* Simulated failure of outgoing traffic (see do_PtlFailNid). */
1262 if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1263 fail_peer (nal, id->nid, 1)) /* shall we now? */
1265 CERROR(LPU64": Dropping GET to "LPU64": simulated failure\n",
1266 nal->ni.nid, id->nid);
1267 return (ret->rc = PTL_INV_PROC);
1270 state_lock(nal, &flags);
1271 md = ptl_handle2md(&args->md_in, nal);
1272 if (md == NULL || !md->threshold) {
1273 state_unlock(nal, &flags);
1274 return ret->rc = PTL_INV_MD;
1277 LASSERT (md->offset == 0);
1279 CDEBUG(D_NET, "PtlGet -> %Lu: %lu\n", (unsigned long long)id->nid,
1280 (unsigned long)id->pid);
/* Build the GET header in wire byte order; a GET carries no payload. */
1282 memset (&hdr, 0, sizeof (hdr));
1283 hdr.type = HTON__u32 (PTL_MSG_GET);
1284 hdr.dest_nid = HTON__u64 (id->nid);
1285 hdr.src_nid = HTON__u64 (ni->nid);
1286 hdr.dest_pid = HTON__u32 (id->pid);
1287 hdr.src_pid = HTON__u32 (ni->pid);
1288 hdr.payload_length = 0;
1290 /* NB handles only looked up by creator (no flips) */
1291 hdr.msg.get.return_wmd.wh_interface_cookie = ni->ni_interface_cookie;
1292 hdr.msg.get.return_wmd.wh_object_cookie = md->md_lh.lh_cookie;
1294 hdr.msg.get.match_bits = HTON__u64 (args->match_bits_in);
1295 hdr.msg.get.ptl_index = HTON__u32 (args->portal_in);
1296 hdr.msg.get.src_offset = HTON__u32 (args->offset_in);
1297 hdr.msg.get.sink_length = HTON__u32 (md->length);
1299 ni->counters.send_count++;
1301 msg = get_new_msg (nal, md);
1303 CERROR("do_PtlGet: BAD - could not allocate cookie!\n");
1304 state_unlock(nal, &flags);
1305 return ret->rc = PTL_NOSPACE;
1309 * If this memory descriptor has an event queue associated with
1310 * it we must allocate a message state object that will record
1311 * the information to be filled in once the message has been
1312 * completed. More information is in the do_PtlPut() comments.
1314 * NB. We're now committed to the GET, since we just marked the MD
1315 * busy. Callers who observe this (by getting PTL_MD_INUSE from
1316 * PtlMDUnlink()) expect a completion event to tell them when the
1320 msg->ev.type = PTL_EVENT_SENT;
1321 msg->ev.initiator.nid = ni->nid;
1322 msg->ev.initiator.pid = ni->pid;
1323 msg->ev.portal = args->portal_in;
1324 msg->ev.match_bits = args->match_bits_in;
1325 msg->ev.rlength = md->length;
1326 msg->ev.mlength = md->length;
1327 msg->ev.offset = args->offset_in;
1328 msg->ev.hdr_data = 0;
1330 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
1333 state_unlock(nal, &flags);
1335 rc = lib_send (nal, private, msg, &hdr, PTL_MSG_GET,
1336 id->nid, id->pid, NULL, 0, 0);
1338 /* get_new_msg() committed us to sending by decrementing
1339 * md->threshold, so we have to act like we did send, but
1340 * the network dropped it. */
1341 lib_finalize (nal, private, msg);
1344 return ret->rc = PTL_OK;
1347 void lib_assert_wire_constants (void)
1349 /* Wire protocol assertions generated by 'wirecheck'
1350 * running on Linux robert.bartonsoftware.com 2.4.20-18.9 #1 Thu May 29 06:54:41 EDT 2003 i68
1351 * with gcc version 3.2.2 20030222 (Red Hat Linux 3.2.2-5) */
1355 LASSERT (PORTALS_PROTO_MAGIC == 0xeebc0ded);
1356 LASSERT (PORTALS_PROTO_VERSION_MAJOR == 0);
1357 LASSERT (PORTALS_PROTO_VERSION_MINOR == 3);
1358 LASSERT (PTL_MSG_ACK == 0);
1359 LASSERT (PTL_MSG_PUT == 1);
1360 LASSERT (PTL_MSG_GET == 2);
1361 LASSERT (PTL_MSG_REPLY == 3);
1362 LASSERT (PTL_MSG_HELLO == 4);
1364 /* Checks for struct ptl_handle_wire_t */
1365 LASSERT ((int)sizeof(ptl_handle_wire_t) == 16);
1366 LASSERT (offsetof(ptl_handle_wire_t, wh_interface_cookie) == 0);
1367 LASSERT ((int)sizeof(((ptl_handle_wire_t *)0)->wh_interface_cookie) == 8);
1368 LASSERT (offsetof(ptl_handle_wire_t, wh_object_cookie) == 8);
1369 LASSERT ((int)sizeof(((ptl_handle_wire_t *)0)->wh_object_cookie) == 8);
1371 /* Checks for struct ptl_magicversion_t */
1372 LASSERT ((int)sizeof(ptl_magicversion_t) == 8);
1373 LASSERT (offsetof(ptl_magicversion_t, magic) == 0);
1374 LASSERT ((int)sizeof(((ptl_magicversion_t *)0)->magic) == 4);
1375 LASSERT (offsetof(ptl_magicversion_t, version_major) == 4);
1376 LASSERT ((int)sizeof(((ptl_magicversion_t *)0)->version_major) == 2);
1377 LASSERT (offsetof(ptl_magicversion_t, version_minor) == 6);
1378 LASSERT ((int)sizeof(((ptl_magicversion_t *)0)->version_minor) == 2);
1380 /* Checks for struct ptl_hdr_t */
1381 LASSERT ((int)sizeof(ptl_hdr_t) == 72);
1382 LASSERT (offsetof(ptl_hdr_t, dest_nid) == 0);
1383 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->dest_nid) == 8);
1384 LASSERT (offsetof(ptl_hdr_t, src_nid) == 8);
1385 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->src_nid) == 8);
1386 LASSERT (offsetof(ptl_hdr_t, dest_pid) == 16);
1387 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->dest_pid) == 4);
1388 LASSERT (offsetof(ptl_hdr_t, src_pid) == 20);
1389 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->src_pid) == 4);
1390 LASSERT (offsetof(ptl_hdr_t, type) == 24);
1391 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->type) == 4);
1392 LASSERT (offsetof(ptl_hdr_t, payload_length) == 28);
1393 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->payload_length) == 4);
1394 LASSERT (offsetof(ptl_hdr_t, msg) == 32);
1395 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg) == 40);
1398 LASSERT (offsetof(ptl_hdr_t, msg.ack.dst_wmd) == 32);
1399 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.ack.dst_wmd) == 16);
1400 LASSERT (offsetof(ptl_hdr_t, msg.ack.match_bits) == 48);
1401 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.ack.match_bits) == 8);
1402 LASSERT (offsetof(ptl_hdr_t, msg.ack.mlength) == 56);
1403 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.ack.mlength) == 4);
1406 LASSERT (offsetof(ptl_hdr_t, msg.put.ack_wmd) == 32);
1407 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.ack_wmd) == 16);
1408 LASSERT (offsetof(ptl_hdr_t, msg.put.match_bits) == 48);
1409 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.match_bits) == 8);
1410 LASSERT (offsetof(ptl_hdr_t, msg.put.hdr_data) == 56);
1411 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.hdr_data) == 8);
1412 LASSERT (offsetof(ptl_hdr_t, msg.put.ptl_index) == 64);
1413 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.ptl_index) == 4);
1414 LASSERT (offsetof(ptl_hdr_t, msg.put.offset) == 68);
1415 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.offset) == 4);
1418 LASSERT (offsetof(ptl_hdr_t, msg.get.return_wmd) == 32);
1419 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.return_wmd) == 16);
1420 LASSERT (offsetof(ptl_hdr_t, msg.get.match_bits) == 48);
1421 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.match_bits) == 8);
1422 LASSERT (offsetof(ptl_hdr_t, msg.get.ptl_index) == 56);
1423 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.ptl_index) == 4);
1424 LASSERT (offsetof(ptl_hdr_t, msg.get.src_offset) == 60);
1425 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.src_offset) == 4);
1426 LASSERT (offsetof(ptl_hdr_t, msg.get.sink_length) == 64);
1427 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.sink_length) == 4);
1430 LASSERT (offsetof(ptl_hdr_t, msg.reply.dst_wmd) == 32);
1431 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.reply.dst_wmd) == 16);
1434 LASSERT (offsetof(ptl_hdr_t, msg.hello.incarnation) == 32);
1435 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.hello.incarnation) == 8);
1436 LASSERT (offsetof(ptl_hdr_t, msg.hello.type) == 40);
1437 LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.hello.type) == 4);