1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Data movement routines
7 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
8 * Copyright (c) 2001-2002 Sandia National Laboratories
10 * This file is part of Lustre, http://www.sf.net/projects/lustre/
12 * Lustre is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Lustre is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Lustre; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
29 # define DEBUG_SUBSYSTEM S_PORTALS
30 # include <linux/kp30.h>
32 #include <portals/p30.h>
33 #include <portals/lib-p30.h>
34 #include <portals/arg-blocks.h>
37 * Right now it does not check access control lists.
39 * We only support one MD per ME, which is how the Portals 3.1 spec is written.
40 * All previous complication is removed.
/*
 * lib_find_me(): walk the match list for portal 'index' looking for an ME
 * whose attached MD will accept this incoming operation (op_mask is
 * PTL_MD_OP_PUT or PTL_MD_OP_GET) from src_nid/src_pid with the given
 * match bits.  On a match, fills in *mlength_out (bytes accepted),
 * *offset_out (offset into the MD) and *unlink_out (auto-unlink needed).
 * NOTE(review): this is an elided listing -- some lines of the original
 * function (return-type line, 'continue'/'goto' targets, closing braces)
 * are not visible here; comments describe only the visible lines.
 */
44 lib_find_me(nal_cb_t *nal, int index, int op_mask, ptl_nid_t src_nid,
45 ptl_pid_t src_pid, ptl_size_t rlength, ptl_size_t roffset,
46 ptl_match_bits_t match_bits, ptl_size_t *mlength_out,
47 ptl_size_t *offset_out, int *unlink_out)
49 lib_ni_t *ni = &nal->ni;
50 struct list_head *match_list = &ni->tbl.tbl[index];
51 struct list_head *tmp;
59 CDEBUG (D_NET, "Request from "LPU64".%d of length %d into portal %d "
60 "MB="LPX64"\n", src_nid, src_pid, rlength, index, match_bits);
/* reject out-of-range portal indices before walking the table */
62 if (index < 0 || index >= ni->tbl.size) {
63 CERROR("Invalid portal %d not in [0-%d]\n",
68 list_for_each (tmp, match_list) {
69 me = list_entry(tmp, lib_me_t, me_list);
72 /* ME attached but MD not attached yet */
76 LASSERT (me == md->me);
/* exhausted MD (threshold counted down to zero) can't match */
79 if (md->threshold == 0)
82 /* mismatched MD op */
83 if ((md->options & op_mask) == 0)
86 /* mismatched ME nid/pid? */
87 if (me->match_id.nid != PTL_NID_ANY &&
88 me->match_id.nid != src_nid)
91 if (me->match_id.pid != PTL_PID_ANY &&
92 me->match_id.pid != src_pid)
95 /* mismatched ME matchbits? */
96 if (((me->match_bits ^ match_bits) & ~me->ignore_bits) != 0)
99 /* Hurrah! This _is_ a match; check it out... */
/* remote-managed MDs take the offset from the request; otherwise
 * the MD's own running offset is used (elided lines choose this) */
101 if ((md->options & PTL_MD_MANAGE_REMOTE) == 0)
/* clip the accepted length to what's left in the MD, and to
 * max_size when PTL_MD_MAX_SIZE is set */
106 mlength = md->length - offset;
107 if ((md->options & PTL_MD_MAX_SIZE) != 0 &&
108 mlength > md->max_size)
109 mlength = md->max_size;
111 if (rlength <= mlength) { /* fits in allowed space */
113 } else if ((md->options & PTL_MD_TRUNCATE) == 0) {
114 /* this packet _really_ is too big */
115 CERROR("Matching packet %d too big: %d left, "
116 "%d allowed\n", rlength, md->length - offset,
/* commit the match: advance the MD's running offset */
121 md->offset = offset + mlength;
123 *offset_out = offset;
124 *mlength_out = mlength;
/* auto-unlink when this match leaves less than max_size free */
125 *unlink_out = ((md->options & PTL_MD_AUTO_UNLINK) != 0 &&
126 md->offset >= (md->length - md->max_size));
/* fell off the list: no ME/MD accepted this message */
131 CERROR (LPU64": Dropping %s from "LPU64".%d portal %d match "LPX64
132 " offset %d length %d: no match\n",
133 ni->nid, (op_mask == PTL_MD_OP_GET) ? "GET" : "PUT",
134 src_nid, src_pid, index, match_bits, roffset, rlength);
/*
 * do_PtlFailNid(): control the simulated-failure test-peer list.
 * threshold != 0 adds a new lib_test_peer_t for args->nid; threshold == 0
 * removes matching entries (PTL_NID_ANY removes all) plus any zombies.
 * Removed entries are moved to a private 'cull' list under the state lock
 * and freed only after the lock is dropped (cb_free may not be safe to
 * call with the lock held).
 * NOTE(review): elided listing -- the NULL check after cb_malloc and some
 * braces are among the lines not visible here.
 */
138 int do_PtlFailNid (nal_cb_t *nal, void *private, void *v_args, void *v_ret)
140 PtlFailNid_in *args = v_args;
141 PtlFailNid_out *ret = v_ret;
144 struct list_head *el;
145 struct list_head *next;
146 struct list_head cull;
148 if (args->threshold != 0) {
149 /* Adding a new entry */
150 tp = (lib_test_peer_t *)nal->cb_malloc (nal, sizeof (*tp));
/* allocation failed (check elided above this return) */
152 return (ret->rc = PTL_FAIL);
154 tp->tp_nid = args->nid;
155 tp->tp_threshold = args->threshold;
157 state_lock (nal, &flags);
158 list_add (&tp->tp_list, &nal->ni.ni_test_peers);
159 state_unlock (nal, &flags);
160 return (ret->rc = PTL_OK);
163 /* removing entries */
164 INIT_LIST_HEAD (&cull);
166 state_lock (nal, &flags);
/* safe iteration: entries may be unlinked while walking */
168 list_for_each_safe (el, next, &nal->ni.ni_test_peers) {
169 tp = list_entry (el, lib_test_peer_t, tp_list);
171 if (tp->tp_threshold == 0 || /* needs culling anyway */
172 args->nid == PTL_NID_ANY || /* removing all entries */
173 tp->tp_nid == args->nid) /* matched this one */
175 list_del (&tp->tp_list);
176 list_add (&tp->tp_list, &cull);
180 state_unlock (nal, &flags);
/* free culled entries outside the state lock */
182 while (!list_empty (&cull)) {
183 tp = list_entry (cull.next, lib_test_peer_t, tp_list);
185 list_del (&tp->tp_list);
186 nal->cb_free (nal, tp, sizeof (*tp));
188 return (ret->rc = PTL_OK);
/*
 * fail_peer(): decide whether a message to/from 'nid' should be dropped to
 * simulate failure.  Walks ni_test_peers under the state lock; decrements
 * non-infinite thresholds and culls exhausted entries.  'outgoing' gates
 * zombie culling because incoming messages may arrive at interrupt
 * priority.  (Return value and some branches are elided in this listing.)
 */
192 fail_peer (nal_cb_t *nal, ptl_nid_t nid, int outgoing)
195 struct list_head *el;
196 struct list_head *next;
198 struct list_head cull;
/* entries removed under the lock collect here; freed after unlock */
201 INIT_LIST_HEAD (&cull);
203 state_lock (nal, &flags);
205 list_for_each_safe (el, next, &nal->ni.ni_test_peers) {
206 tp = list_entry (el, lib_test_peer_t, tp_list);
208 if (tp->tp_threshold == 0) {
211 /* only cull zombies on outgoing tests,
212 * since we may be at interrupt priority on
213 * incoming messages. */
214 list_del (&tp->tp_list);
215 list_add (&tp->tp_list, &cull);
220 if (tp->tp_nid == PTL_NID_ANY || /* fail every peer */
221 nid == tp->tp_nid) { /* fail this peer */
/* finite threshold: count this failure down (decrement elided) */
224 if (tp->tp_threshold != PTL_MD_THRESH_INF) {
227 tp->tp_threshold == 0) {
229 list_del (&tp->tp_list);
230 list_add (&tp->tp_list, &cull);
237 state_unlock (nal, &flags);
239 while (!list_empty (&cull)) {
240 tp = list_entry (cull.next, lib_test_peer_t, tp_list);
241 list_del (&tp->tp_list);
243 nal->cb_free (nal, tp, sizeof (*tp));
/*
 * lib_iov_nob(): total number of bytes described by an iovec array
 * (sums iov_len over niov entries; loop header/return elided here).
 */
250 lib_iov_nob (int niov, struct iovec *iov)
255 nob += (iov++)->iov_len;
/*
 * lib_copy_iov2buf(): gather up to 'len' bytes from the iovec array into
 * the flat buffer 'dest'; each fragment copies MIN(iov_len, remaining).
 * (Loop control and pointer advancement elided in this listing.)
 */
261 lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, ptl_size_t len)
268 nob = MIN (iov->iov_len, len);
269 memcpy (dest, iov->iov_base, nob);
/*
 * lib_copy_buf2iov(): scatter up to 'len' bytes from flat buffer 'src'
 * into the iovec array; mirror image of lib_copy_iov2buf().
 * (Loop control and pointer advancement elided in this listing.)
 */
279 lib_copy_buf2iov (int niov, struct iovec *iov, char *src, ptl_size_t len)
286 nob = MIN (iov->iov_len, len);
287 memcpy (iov->iov_base, src, nob);
/*
 * lib_extract_iov(): build in 'dst' the iovec subset of the MD's iovec
 * array covering [offset, offset+len), returning the number of dst
 * entries.  Non-destructive to the source array.  (Loop structure and
 * some advance/return lines are elided in this listing.)
 */
297 lib_extract_iov (struct iovec *dst, lib_md_t *md,
298 ptl_size_t offset, ptl_size_t len)
300 /* Initialise 'dst' to the subset of 'src' starting at 'offset',
301 * for exactly 'len' bytes, and return the number of entries.
302 * NB not destructive to 'src' */
303 int src_niov = md->md_niov;
304 struct iovec *src = md->md_iov.iov;
309 LASSERT (offset >= 0);
310 LASSERT (offset + len <= md->length);
312 if (len == 0) /* no data => */
313 return (0); /* no frags */
315 LASSERT (src_niov > 0);
316 while (offset >= src->iov_len) { /* skip initial frags */
317 offset -= src->iov_len;
320 LASSERT (src_niov > 0);
325 LASSERT (src_niov > 0);
326 LASSERT (dst_niov <= PTL_MD_MAX_IOV);
/* first (possibly partial) fragment starts 'offset' bytes in */
328 frag_len = src->iov_len - offset;
329 dst->iov_base = ((char *)src->iov_base) + offset;
331 if (len <= frag_len) {
/* len spans this whole fragment; take it all and continue */
336 dst->iov_len = frag_len;
/*
 * NOTE(review): only the signatures of this group are visible in the
 * listing (original lines ~345-372).  Presumably these are the
 * non-kernel stub variants of the kiov routines (the full kernel
 * implementations follow below) -- confirm against the full source.
 */
349 lib_kiov_nob (int niov, ptl_kiov_t *kiov)
356 lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, ptl_size_t len)
362 lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, char *dest, ptl_size_t len)
368 lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md,
369 ptl_size_t offset, ptl_size_t len)
/*
 * lib_kiov_nob(): total bytes described by a page-based kiov array
 * (sums kiov_len over niov entries; loop header/return elided here).
 */
377 lib_kiov_nob (int niov, ptl_kiov_t *kiov)
382 nob += (kiov++)->kiov_len;
/*
 * lib_copy_kiov2buf(): gather up to 'len' bytes from page-based kiov
 * fragments into flat buffer 'dest'.  Each fragment's page is kmap'd,
 * copied from at kiov_offset, then kunmap'd -- hence the assertion that
 * we are not at interrupt priority (kmap may sleep).
 * (Loop control and pointer advancement elided in this listing.)
 */
388 lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, ptl_size_t len)
393 LASSERT (!in_interrupt ());
397 nob = MIN (kiov->kiov_len, len);
399 addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
400 memcpy (dest, addr, nob);
401 kunmap (kiov->kiov_page);
/*
 * lib_copy_buf2kiov(): scatter up to 'len' bytes from flat buffer 'src'
 * into page-based kiov fragments; mirror image of lib_copy_kiov2buf().
 * kmap/kunmap per fragment, so must not run at interrupt priority.
 * (Loop control and pointer advancement elided in this listing.)
 */
411 lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, char *src, ptl_size_t len)
416 LASSERT (!in_interrupt ());
420 nob = MIN (kiov->kiov_len, len);
422 addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
423 memcpy (addr, src, nob);
424 kunmap (kiov->kiov_page);
/*
 * lib_extract_kiov(): kiov analogue of lib_extract_iov() -- build in
 * 'dst' the page-fragment subset of the MD's kiov array covering
 * [offset, offset+len) and return the entry count.  Fragments must not
 * straddle a page boundary (PAGE_SIZE assertions below).
 * (Loop structure and some advance/return lines elided in this listing.)
 */
434 lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md,
435 ptl_size_t offset, ptl_size_t len)
437 /* Initialise 'dst' to the subset of 'src' starting at 'offset',
438 * for exactly 'len' bytes, and return the number of entries.
439 * NB not destructive to 'src' */
440 int src_niov = md->md_niov;
441 ptl_kiov_t *src = md->md_iov.kiov;
446 LASSERT (offset >= 0);
447 LASSERT (offset + len <= md->length);
449 if (len == 0) /* no data => */
450 return (0); /* no frags */
452 LASSERT (src_niov > 0);
453 while (offset >= src->kiov_len) { /* skip initial frags */
454 offset -= src->kiov_len;
457 LASSERT (src_niov > 0);
462 LASSERT (src_niov > 0);
463 LASSERT (dst_niov <= PTL_MD_MAX_IOV);
/* first (possibly partial) fragment: same page, offset shifted */
465 frag_len = src->kiov_len - offset;
466 dst->kiov_page = src->kiov_page;
467 dst->kiov_offset = src->kiov_offset + offset;
469 if (len <= frag_len) {
471 LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
475 dst->kiov_len = frag_len;
476 LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
/*
 * lib_recv(): hand an incoming message body to the NAL.  With no MD (or
 * mlen == 0; condition partly elided) the payload is sunk via cb_recv
 * with a NULL iovec; otherwise the MD's descriptors are extracted for
 * [offset, offset+mlen) and passed to cb_recv (iovec MDs) or
 * cb_recv_pages (PTL_MD_KIOV MDs).  rlen is the full wire length.
 */
489 lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
490 ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen)
495 nal->cb_recv (nal, private, msg, 0, NULL, 0, rlen);
496 else if ((md->options & PTL_MD_KIOV) == 0) {
497 niov = lib_extract_iov (msg->msg_iov.iov, md, offset, mlen);
498 nal->cb_recv (nal, private, msg,
499 niov, msg->msg_iov.iov, mlen, rlen);
501 niov = lib_extract_kiov (msg->msg_iov.kiov, md, offset, mlen);
502 nal->cb_recv_pages (nal, private, msg,
503 niov, msg->msg_iov.kiov, mlen, rlen);
/*
 * lib_send(): transmit 'hdr' (+ optional payload from 'md') to nid/pid
 * via the NAL.  With no MD/zero length (guard elided) only the header is
 * sent; otherwise the MD's descriptors for [offset, offset+len) go to
 * cb_send (iovec MDs) or cb_send_pages (PTL_MD_KIOV MDs).  Returns the
 * NAL callback's result.
 */
508 lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg,
509 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
510 lib_md_t *md, ptl_size_t offset, ptl_size_t len)
515 return (nal->cb_send (nal, private, msg,
519 if ((md->options & PTL_MD_KIOV) == 0) {
520 niov = lib_extract_iov (msg->msg_iov.iov, md, offset, len);
521 return (nal->cb_send (nal, private, msg,
523 niov, msg->msg_iov.iov, len));
526 niov = lib_extract_kiov (msg->msg_iov.kiov, md, offset, len);
527 return (nal->cb_send_pages (nal, private, msg,
529 niov, msg->msg_iov.kiov, len));
/*
 * get_new_msg(): allocate and zero a lib_msg_t bound to 'md', stamp its
 * arrival time, count down the MD's finite threshold, bump the message
 * counters and put the message on ni_active_msgs.  Caller MUST hold the
 * state_lock.  (NULL-check after lib_msg_alloc and the threshold
 * decrement itself are among the elided lines.)
 */
533 get_new_msg (nal_cb_t *nal, lib_md_t *md)
535 /* ALWAYS called holding the state_lock */
536 lib_counters_t *counters = &nal->ni.counters;
537 lib_msg_t *msg = lib_msg_alloc (nal);
542 memset (msg, 0, sizeof (*msg));
547 msg->ev.arrival_time = get_cycles();
549 if (md->threshold != PTL_MD_THRESH_INF) {
550 LASSERT (md->threshold > 0);
/* track high-water mark of in-flight messages */
554 counters->msgs_alloc++;
555 if (counters->msgs_alloc > counters->msgs_max)
556 counters->msgs_max = counters->msgs_alloc;
558 list_add (&msg->msg_list, &nal->ni.ni_active_msgs);
565 * Incoming messages have a ptl_msg_t object associated with them
566 * by the library. This object encapsulates the state of the
567 * message and allows the NAL to do non-blocking receives or sends
/*
 * parse_put(): handle an incoming PTL_MSG_PUT.  Byte-swaps the put
 * fields, matches the request against the portal table under the state
 * lock, allocates a lib_msg_t describing the PUT event (queuing an ACK
 * if requested and not disabled), bumps receive counters, auto-unlinks
 * the exhausted ME/MD if needed, then receives the payload outside the
 * lock.  The drop path (label elided) counts and sinks the message.
 */
571 static int parse_put(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
573 lib_ni_t *ni = &nal->ni;
574 ptl_size_t mlength = 0;
575 ptl_size_t offset = 0;
582 /* Convert put fields to host byte order */
583 hdr->msg.put.match_bits = NTOH__u64 (hdr->msg.put.match_bits);
584 hdr->msg.put.ptl_index = NTOH__u32 (hdr->msg.put.ptl_index);
585 hdr->msg.put.offset = NTOH__u32 (hdr->msg.put.offset);
587 state_lock(nal, &flags);
589 me = lib_find_me(nal, hdr->msg.put.ptl_index, PTL_MD_OP_PUT,
590 hdr->src_nid, hdr->src_pid,
591 PTL_HDR_LENGTH (hdr), hdr->msg.put.offset,
592 hdr->msg.put.match_bits,
593 &mlength, &offset, &unlink);
598 CDEBUG(D_NET, "Incoming put index %x from "LPU64"/%u of length %d/%d "
599 "into md "LPX64" [%d] + %d\n", hdr->msg.put.ptl_index,
600 hdr->src_nid, hdr->src_pid, mlength, PTL_HDR_LENGTH(hdr),
601 md->md_lh.lh_cookie, md->md_niov, offset);
603 msg = get_new_msg (nal, md);
605 CERROR(LPU64": Dropping PUT from "LPU64": can't allocate msg\n",
606 ni->nid, hdr->src_nid);
/* sender wants an ACK and the MD permits it: remember where to send it */
610 if (!ptl_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
611 !(md->options & PTL_MD_ACK_DISABLE)) {
613 msg->ack_wmd = hdr->msg.put.ack_wmd;
614 msg->nid = hdr->src_nid;
615 msg->pid = hdr->src_pid;
616 msg->ev.match_bits = hdr->msg.put.match_bits;
620 msg->ev.type = PTL_EVENT_PUT;
621 msg->ev.initiator.nid = hdr->src_nid;
622 msg->ev.initiator.pid = hdr->src_pid;
623 msg->ev.portal = hdr->msg.put.ptl_index;
624 msg->ev.match_bits = hdr->msg.put.match_bits;
625 msg->ev.rlength = PTL_HDR_LENGTH(hdr);
626 msg->ev.mlength = mlength;
627 msg->ev.offset = offset;
628 msg->ev.hdr_data = hdr->msg.put.hdr_data;
630 /* NB if this match has exhausted the MD, we can't be sure
631 * that this event will be the last one associated with
632 * this MD in the event queue (another message already
633 * matching this ME/MD could end up being last). So we
634 * remember the ME handle anyway and check again when we're
635 * allocating our slot in the event queue.
637 ptl_me2handle (&msg->ev.unlinked_me, me);
639 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
642 ni->counters.recv_count++;
643 ni->counters.recv_length += mlength;
645 /* only unlink after MD's pending count has been bumped
646 * in get_new_msg() otherwise lib_me_unlink() will nuke it */
648 md->md_flags |= PTL_MD_FLAG_AUTO_UNLINKED;
649 lib_me_unlink (nal, me);
652 state_unlock(nal, &flags);
/* receive the payload outside the state lock */
654 lib_recv (nal, private, msg, md, offset, mlength, PTL_HDR_LENGTH (hdr));
/* drop path: count the loss and sink the wire bytes */
658 nal->ni.counters.drop_count++;
659 nal->ni.counters.drop_length += PTL_HDR_LENGTH(hdr);
660 state_unlock (nal, &flags);
661 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
/*
 * parse_get(): handle an incoming PTL_MSG_GET.  Byte-swaps the get
 * fields, matches against the portal table under the state lock,
 * allocates a lib_msg_t describing the GET event, bumps send counters
 * (a GET makes us the sender of the REPLY), auto-unlinks the exhausted
 * ME/MD if needed, then builds and sends a PTL_MSG_REPLY carrying
 * 'mlength' bytes from the matched MD.  The incoming message itself has
 * no payload, hence the zero-length lib_recv() completions.
 */
665 static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
667 lib_ni_t *ni = &nal->ni;
668 ptl_size_t mlength = 0;
669 ptl_size_t offset = 0;
678 /* Convert get fields to host byte order */
679 hdr->msg.get.match_bits = NTOH__u64 (hdr->msg.get.match_bits);
680 hdr->msg.get.ptl_index = NTOH__u32 (hdr->msg.get.ptl_index);
681 hdr->msg.get.sink_length = NTOH__u32 (hdr->msg.get.sink_length);
682 hdr->msg.get.src_offset = NTOH__u32 (hdr->msg.get.src_offset);
684 /* compatibility check until field is deleted */
685 if (hdr->msg.get.return_offset != 0)
686 CERROR("Unexpected non-zero get.return_offset %x from "
687 LPU64"\n", hdr->msg.get.return_offset, hdr->src_nid);
689 state_lock(nal, &flags);
691 me = lib_find_me(nal, hdr->msg.get.ptl_index, PTL_MD_OP_GET,
692 hdr->src_nid, hdr->src_pid,
693 hdr->msg.get.sink_length, hdr->msg.get.src_offset,
694 hdr->msg.get.match_bits,
695 &mlength, &offset, &unlink);
700 CDEBUG(D_NET, "Incoming get index %d from "LPU64".%u of length %d/%d "
701 "from md "LPX64" [%d] + %d\n", hdr->msg.get.ptl_index,
702 hdr->src_nid, hdr->src_pid, mlength, PTL_HDR_LENGTH(hdr),
703 md->md_lh.lh_cookie, md->md_niov, offset);
705 msg = get_new_msg (nal, md);
707 CERROR(LPU64": Dropping GET from "LPU64": can't allocate msg\n",
708 ni->nid, hdr->src_nid);
713 msg->ev.type = PTL_EVENT_GET;
714 msg->ev.initiator.nid = hdr->src_nid;
715 msg->ev.initiator.pid = hdr->src_pid;
716 msg->ev.portal = hdr->msg.get.ptl_index;
717 msg->ev.match_bits = hdr->msg.get.match_bits;
718 msg->ev.rlength = PTL_HDR_LENGTH(hdr);
719 msg->ev.mlength = mlength;
720 msg->ev.offset = offset;
721 msg->ev.hdr_data = 0;
723 /* NB if this match has exhausted the MD, we can't be sure
724 * that this event will be the last one associated with
725 * this MD in the event queue (another message already
726 * matching this ME/MD could end up being last). So we
727 * remember the ME handle anyway and check again when we're
728 * allocating our slot in the event queue.
730 ptl_me2handle (&msg->ev.unlinked_me, me);
732 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
735 ni->counters.send_count++;
736 ni->counters.send_length += mlength;
738 /* only unlink after MD's refcount has been bumped
739 * in get_new_msg() otherwise lib_me_unlink() will nuke it */
741 md->md_flags |= PTL_MD_FLAG_AUTO_UNLINKED;
742 lib_me_unlink (nal, me);
745 state_unlock(nal, &flags);
/* build the REPLY header: wire fields in network byte order, the
 * return wire-MD handle copied verbatim (only its creator reads it) */
747 memset (&reply, 0, sizeof (reply));
748 reply.type = HTON__u32 (PTL_MSG_REPLY);
749 reply.dest_nid = HTON__u64 (hdr->src_nid);
750 reply.src_nid = HTON__u64 (ni->nid);
751 reply.dest_pid = HTON__u32 (hdr->src_pid);
752 reply.src_pid = HTON__u32 (ni->pid);
753 PTL_HDR_LENGTH(&reply) = HTON__u32 (mlength);
755 reply.msg.reply.dst_wmd = hdr->msg.get.return_wmd;
757 rc = lib_send (nal, private, msg, &reply, PTL_MSG_REPLY,
758 hdr->src_nid, hdr->src_pid, md, offset, mlength);
760 CERROR(LPU64": Dropping GET from "LPU64": send REPLY failed\n",
761 ni->nid, hdr->src_nid);
762 state_lock (nal, &flags);
766 /* Complete the incoming message */
767 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
/* drop path: account the sink length we'd have returned */
770 ni->counters.drop_count++;
771 ni->counters.drop_length += hdr->msg.get.sink_length;
772 state_unlock(nal, &flags);
773 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
/*
 * parse_reply(): handle an incoming PTL_MSG_REPLY (the data returned by
 * a GET we sent).  Looks up the destination MD directly by wire handle
 * (no byte flip: only the creator reads it), verifies it is active and
 * large enough (or truncatable), allocates the REPLY event message,
 * bumps receive counters and receives the payload at MD offset 0.
 */
777 static int parse_reply(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
779 lib_ni_t *ni = &nal->ni;
786 /* compatibility check until field is deleted */
787 if (hdr->msg.reply.dst_offset != 0)
788 CERROR("Unexpected non-zero reply.dst_offset %x from "LPU64"\n",
789 hdr->msg.reply.dst_offset, hdr->src_nid);
791 state_lock(nal, &flags);
793 /* NB handles only looked up by creator (no flips) */
794 md = ptl_wire_handle2md(&hdr->msg.reply.dst_wmd, nal);
795 if (md == NULL || md->threshold == 0) {
796 CERROR (LPU64": Dropping REPLY from "LPU64" for %s MD "LPX64"."LPX64"\n",
797 ni->nid, hdr->src_nid,
798 md == NULL ? "invalid" : "inactive",
799 hdr->msg.reply.dst_wmd.wh_interface_cookie,
800 hdr->msg.reply.dst_wmd.wh_object_cookie);
804 LASSERT (md->offset == 0);
806 length = rlength = PTL_HDR_LENGTH(hdr);
/* oversize reply: drop unless the MD allows truncation */
808 if (length > md->length) {
809 if ((md->options & PTL_MD_TRUNCATE) == 0) {
810 CERROR (LPU64": Dropping REPLY from "LPU64
811 " length %d for MD "LPX64" would overflow (%d)\n",
812 ni->nid, hdr->src_nid, length,
813 hdr->msg.reply.dst_wmd.wh_object_cookie,
820 CDEBUG(D_NET, "Reply from "LPU64" of length %d/%d into md "LPX64"\n",
821 hdr->src_nid, length, rlength,
822 hdr->msg.reply.dst_wmd.wh_object_cookie);
824 msg = get_new_msg (nal, md);
826 CERROR(LPU64": Dropping REPLY from "LPU64": can't "
827 "allocate msg\n", ni->nid, hdr->src_nid);
832 msg->ev.type = PTL_EVENT_REPLY;
833 msg->ev.initiator.nid = hdr->src_nid;
834 msg->ev.initiator.pid = hdr->src_pid;
835 msg->ev.rlength = rlength;
836 msg->ev.mlength = length;
839 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
842 ni->counters.recv_count++;
843 ni->counters.recv_length += length;
845 state_unlock(nal, &flags);
/* receive the (possibly truncated) payload outside the lock */
847 lib_recv (nal, private, msg, md, 0, length, rlength);
/* drop path: count and sink the wire bytes */
851 nal->ni.counters.drop_count++;
852 nal->ni.counters.drop_length += PTL_HDR_LENGTH(hdr);
853 state_unlock (nal, &flags);
854 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
/*
 * parse_ack(): handle an incoming PTL_MSG_ACK for a PUT we sent.  Looks
 * up the destination MD by wire handle (creator-only, no byte flip),
 * allocates an ACK event message and completes the (payload-free)
 * incoming message.  An invalid/inactive MD is only a D_INFO debug
 * condition here, not an error -- ACKs can legitimately race unlinks.
 */
858 static int parse_ack(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
860 lib_ni_t *ni = &nal->ni;
862 lib_msg_t *msg = NULL;
865 /* Convert ack fields to host byte order */
866 hdr->msg.ack.match_bits = NTOH__u64 (hdr->msg.ack.match_bits);
867 hdr->msg.ack.mlength = NTOH__u32 (hdr->msg.ack.mlength);
869 state_lock(nal, &flags);
871 /* NB handles only looked up by creator (no flips) */
872 md = ptl_wire_handle2md(&hdr->msg.ack.dst_wmd, nal);
873 if (md == NULL || md->threshold == 0) {
874 CDEBUG(D_INFO, LPU64": Dropping ACK from "LPU64" to %s MD "
875 LPX64"."LPX64"\n", ni->nid, hdr->src_nid,
876 (md == NULL) ? "invalid" : "inactive",
877 hdr->msg.ack.dst_wmd.wh_interface_cookie,
878 hdr->msg.ack.dst_wmd.wh_object_cookie);
882 CDEBUG(D_NET, LPU64": ACK from "LPU64" into md "LPX64"\n",
883 ni->nid, hdr->src_nid,
884 hdr->msg.ack.dst_wmd.wh_object_cookie);
886 msg = get_new_msg (nal, md);
888 CERROR(LPU64": Dropping ACK from "LPU64": can't allocate msg\n",
889 ni->nid, hdr->src_nid);
894 msg->ev.type = PTL_EVENT_ACK;
895 msg->ev.initiator.nid = hdr->src_nid;
896 msg->ev.initiator.pid = hdr->src_pid;
/* mlength/match_bits echo what the target actually accepted */
897 msg->ev.mlength = hdr->msg.ack.mlength;
898 msg->ev.match_bits = hdr->msg.ack.match_bits;
900 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
903 ni->counters.recv_count++;
904 state_unlock(nal, &flags);
/* ACKs carry no payload: complete the receive with zero length */
905 lib_recv (nal, private, msg, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
909 nal->ni.counters.drop_count++;
910 state_unlock (nal, &flags);
911 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
/*
 * hdr_type_string(): map hdr->type to a printable name for diagnostics;
 * falls through to "<UNKNOWN>" for unrecognised types.  (The per-type
 * switch cases are elided in this listing.)
 */
916 hdr_type_string (ptl_hdr_t *hdr)
930 return ("<UNKNOWN>");
/*
 * print_hdr(): dump a ptl_hdr_t through the NAL's cb_printf for
 * debugging -- common fields first, then the type-specific union member
 * (PUT / GET / ACK / REPLY; the switch statement itself is elided in
 * this listing).
 */
934 void print_hdr(nal_cb_t * nal, ptl_hdr_t * hdr)
936 char *type_str = hdr_type_string (hdr);
938 nal->cb_printf(nal, "P3 Header at %p of type %s\n", hdr, type_str);
939 nal->cb_printf(nal, " From nid/pid %Lu/%Lu", hdr->src_nid,
941 nal->cb_printf(nal, " To nid/pid %Lu/%Lu\n", hdr->dest_nid,
/* PUT-specific fields */
950 " Ptl index %d, ack md "LPX64"."LPX64", "
951 "match bits "LPX64"\n",
952 hdr->msg.put.ptl_index,
953 hdr->msg.put.ack_wmd.wh_interface_cookie,
954 hdr->msg.put.ack_wmd.wh_object_cookie,
955 hdr->msg.put.match_bits,
957 " Length %d, offset %d, hdr data "LPX64"\n",
958 PTL_HDR_LENGTH(hdr), hdr->msg.put.offset,
959 hdr->msg.put.hdr_data);
/* GET-specific fields */
964 " Ptl index %d, return md "LPX64"."LPX64", "
965 "match bits "LPX64"\n", hdr->msg.get.ptl_index,
966 hdr->msg.get.return_wmd.wh_interface_cookie,
967 hdr->msg.get.return_wmd.wh_object_cookie,
968 hdr->msg.get.match_bits);
970 " Length %d, src offset %d\n",
971 hdr->msg.get.sink_length,
972 hdr->msg.get.src_offset);
/* ACK-specific fields */
976 nal->cb_printf(nal, " dst md "LPX64"."LPX64", "
977 "manipulated length %d\n",
978 hdr->msg.ack.dst_wmd.wh_interface_cookie,
979 hdr->msg.ack.dst_wmd.wh_object_cookie,
980 hdr->msg.ack.mlength);
/* REPLY-specific fields */
984 nal->cb_printf(nal, " dst md "LPX64"."LPX64", "
986 hdr->msg.reply.dst_wmd.wh_interface_cookie,
987 hdr->msg.reply.dst_wmd.wh_object_cookie,
988 PTL_HDR_LENGTH(hdr));
991 } /* end of print_hdr() */
/*
 * lib_parse(): entry point for every incoming message header.  Verifies
 * (statically) that the 'length' field aliases across all union members,
 * converts common fields to host byte order, filters out HELLO messages,
 * messages not addressed to us, and simulated test failures, then
 * dispatches by type to parse_ack/put/get/reply.  All drop paths sink
 * the payload via a NULL-MD lib_recv() so the wire stays in sync.
 */
994 int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
998 /* NB static check; optimizer will elide this if it's right */
999 LASSERT (offsetof (ptl_hdr_t, msg.ack.length) ==
1000 offsetof (ptl_hdr_t, msg.put.length));
1001 LASSERT (offsetof (ptl_hdr_t, msg.ack.length) ==
1002 offsetof (ptl_hdr_t, msg.get.length));
1003 LASSERT (offsetof (ptl_hdr_t, msg.ack.length) ==
1004 offsetof (ptl_hdr_t, msg.reply.length));
1006 /* convert common fields to host byte order */
1007 hdr->dest_nid = NTOH__u64 (hdr->dest_nid);
1008 hdr->src_nid = NTOH__u64 (hdr->src_nid);
1009 hdr->dest_pid = NTOH__u32 (hdr->dest_pid);
1010 hdr->src_pid = NTOH__u32 (hdr->src_pid);
1011 hdr->type = NTOH__u32 (hdr->type);
1012 PTL_HDR_LENGTH(hdr) = NTOH__u32 (PTL_HDR_LENGTH(hdr));
/* verbose trace of every parsed header (guard, if any, elided) */
1014 nal->cb_printf(nal, "%d: lib_parse: nal=%p hdr=%p type=%d\n",
1015 nal->ni.nid, nal, hdr, hdr->type);
1016 print_hdr(nal, hdr);
1018 if (hdr->type == PTL_MSG_HELLO) {
1019 /* dest_nid is really ptl_magicversion_t */
1020 ptl_magicversion_t *mv = (ptl_magicversion_t *)&hdr->dest_nid;
1022 CERROR (LPU64": Dropping unexpected HELLO message: "
1023 "magic %d, version %d.%d from "LPD64"\n",
1024 nal->ni.nid, mv->magic,
1025 mv->version_major, mv->version_minor,
1027 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
/* misrouted: not addressed to this NI */
1031 if (hdr->dest_nid != nal->ni.nid) {
1032 CERROR(LPU64": Dropping %s message from "LPU64" to "LPU64
1033 " (not me)\n", nal->ni.nid, hdr_type_string (hdr),
1034 hdr->src_nid, hdr->dest_nid);
1036 state_lock (nal, &flags);
1037 nal->ni.counters.drop_count++;
1038 nal->ni.counters.drop_length += PTL_HDR_LENGTH(hdr);
1039 state_unlock (nal, &flags);
1041 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
/* simulated failure of incoming traffic (test-peer list) */
1045 if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1046 fail_peer (nal, hdr->src_nid, 0)) /* shall we now? */
1048 CERROR(LPU64": Dropping incoming %s from "LPU64
1049 ": simulated failure\n",
1050 nal->ni.nid, hdr_type_string (hdr),
1055 switch (hdr->type) {
1057 return (parse_ack(nal, hdr, private));
1059 return (parse_put(nal, hdr, private));
1062 return (parse_get(nal, hdr, private));
1065 return (parse_reply(nal, hdr, private));
/* unknown type: complain, sink the payload, drop */
1068 CERROR(LPU64": Dropping <unknown> message from "LPU64
1069 ": Bad type=0x%x\n", nal->ni.nid, hdr->src_nid,
1072 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
/*
 * do_PtlPut(): API-side PUT.  Validates the local MD under the state
 * lock, builds a PTL_MSG_PUT header (wire fields in network byte order),
 * allocates the SENT event message, then sends the whole MD via
 * lib_send() outside the lock.  Simulated outgoing failures are checked
 * first against the test-peer list.
 */
1078 int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
1082 * ptl_handle_md_t md_in
1083 * ptl_ack_req_t ack_req_in
1084 * ptl_process_id_t target_in
1085 * ptl_pt_index_t portal_in
1086 * ptl_ac_index_t cookie_in
1087 * ptl_match_bits_t match_bits_in
1088 * ptl_size_t offset_in
1093 PtlPut_in *args = v_args;
1094 PtlPut_out *ret = v_ret;
1097 lib_ni_t *ni = &nal->ni;
1099 lib_msg_t *msg = NULL;
1100 ptl_process_id_t *id = &args->target_in;
1101 unsigned long flags;
1103 if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1104 fail_peer (nal, id->nid, 1)) /* shall we now? */
1106 CERROR(LPU64": Dropping PUT to "LPU64": simulated failure\n",
1107 nal->ni.nid, id->nid);
1108 return (ret->rc = PTL_INV_PROC);
/* MD must exist and still have threshold left */
1112 state_lock(nal, &flags);
1113 md = ptl_handle2md(&args->md_in, nal);
1114 if (md == NULL || !md->threshold) {
1115 state_unlock(nal, &flags);
1116 return ret->rc = PTL_INV_MD;
1119 CDEBUG(D_NET, "PtlPut -> %Lu: %lu\n", (unsigned long long)id->nid,
1120 (unsigned long)id->pid);
1122 memset (&hdr, 0, sizeof (hdr));
1123 hdr.type = HTON__u32 (PTL_MSG_PUT);
1124 hdr.dest_nid = HTON__u64 (id->nid);
1125 hdr.src_nid = HTON__u64 (ni->nid);
1126 hdr.dest_pid = HTON__u32 (id->pid);
1127 hdr.src_pid = HTON__u32 (ni->pid);
1128 PTL_HDR_LENGTH(&hdr) = HTON__u32 (md->length);
1130 /* NB handles only looked up by creator (no flips) */
1131 if (args->ack_req_in == PTL_ACK_REQ) {
1132 hdr.msg.put.ack_wmd.wh_interface_cookie = ni->ni_interface_cookie;
1133 hdr.msg.put.ack_wmd.wh_object_cookie = md->md_lh.lh_cookie;
1135 hdr.msg.put.ack_wmd = PTL_WIRE_HANDLE_NONE;
1138 hdr.msg.put.match_bits = HTON__u64 (args->match_bits_in);
1139 hdr.msg.put.ptl_index = HTON__u32 (args->portal_in);
1140 hdr.msg.put.offset = HTON__u32 (args->offset_in);
1141 hdr.msg.put.hdr_data = args->hdr_data_in;
1143 ni->counters.send_count++;
1144 ni->counters.send_length += md->length;
1146 msg = get_new_msg (nal, md);
1148 CERROR("BAD: could not allocate msg!\n");
1149 state_unlock(nal, &flags);
1150 return ret->rc = PTL_NOSPACE;
1154 * If this memory descriptor has an event queue associated with
1155 * it we need to allocate a message state object and record the
1156 * information about this operation that will be recorded into
1157 * event queue once the message has been completed.
/* NOTE(review): "GET" below looks like a copy-paste from do_PtlGet();
 * this is the PUT path */
1159 * NB. We're now committed to the GET, since we just marked the MD
1160 * busy. Callers who observe this (by getting PTL_MD_INUSE from
1161 * PtlMDUnlink()) expect a completion event to tell them when the
1165 msg->ev.type = PTL_EVENT_SENT;
1166 msg->ev.initiator.nid = ni->nid;
1167 msg->ev.initiator.pid = ni->pid;
1168 msg->ev.portal = args->portal_in;
1169 msg->ev.match_bits = args->match_bits_in;
1170 msg->ev.rlength = md->length;
1171 msg->ev.mlength = md->length;
1172 msg->ev.offset = args->offset_in;
1173 msg->ev.hdr_data = args->hdr_data_in;
1175 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
1178 state_unlock(nal, &flags);
/* send the whole MD as payload, outside the state lock */
1180 lib_send (nal, private, msg, &hdr, PTL_MSG_PUT,
1181 id->nid, id->pid, md, 0, md->length);
1183 return ret->rc = PTL_OK;
/*
 * do_PtlGet(): API-side GET.  Validates the local sink MD under the
 * state lock, builds a PTL_MSG_GET header (wire fields in network byte
 * order; return_wmd identifies our MD for the REPLY), allocates the
 * SENT event message, then sends the header-only request via lib_send()
 * outside the lock.  Simulated outgoing failures are checked first
 * against the test-peer list.
 * FIX: the simulated-failure CERROR said "Dropping PUT" -- copy-paste
 * from do_PtlPut(); this is the GET path, so it now says "GET".
 */
1187 int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
1191 * ptl_handle_md_t md_in
1192 * ptl_process_id_t target_in
1193 * ptl_pt_index_t portal_in
1194 * ptl_ac_index_t cookie_in
1195 * ptl_match_bits_t match_bits_in
1196 * ptl_size_t offset_in
1201 PtlGet_in *args = v_args;
1202 PtlGet_out *ret = v_ret;
1204 lib_msg_t *msg = NULL;
1205 lib_ni_t *ni = &nal->ni;
1206 ptl_process_id_t *id = &args->target_in;
1208 unsigned long flags;
1210 if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1211 fail_peer (nal, id->nid, 1)) /* shall we now? */
1213 CERROR(LPU64": Dropping GET to "LPU64": simulated failure\n",
1214 nal->ni.nid, id->nid);
1215 return (ret->rc = PTL_INV_PROC);
/* MD must exist and still have threshold left */
1218 state_lock(nal, &flags);
1219 md = ptl_handle2md(&args->md_in, nal);
1220 if (md == NULL || !md->threshold) {
1221 state_unlock(nal, &flags);
1222 return ret->rc = PTL_INV_MD;
1225 LASSERT (md->offset == 0);
1227 CDEBUG(D_NET, "PtlGet -> %Lu: %lu\n", (unsigned long long)id->nid,
1228 (unsigned long)id->pid);
1230 memset (&hdr, 0, sizeof (hdr));
1231 hdr.type = HTON__u32 (PTL_MSG_GET);
1232 hdr.dest_nid = HTON__u64 (id->nid);
1233 hdr.src_nid = HTON__u64 (ni->nid);
1234 hdr.dest_pid = HTON__u32 (id->pid);
1235 hdr.src_pid = HTON__u32 (ni->pid);
1236 PTL_HDR_LENGTH(&hdr) = 0;
1238 /* NB handles only looked up by creator (no flips) */
1239 hdr.msg.get.return_wmd.wh_interface_cookie = ni->ni_interface_cookie;
1240 hdr.msg.get.return_wmd.wh_object_cookie = md->md_lh.lh_cookie;
1242 hdr.msg.get.match_bits = HTON__u64 (args->match_bits_in);
1243 hdr.msg.get.ptl_index = HTON__u32 (args->portal_in);
1244 hdr.msg.get.src_offset = HTON__u32 (args->offset_in);
1245 hdr.msg.get.sink_length = HTON__u32 (md->length);
1247 ni->counters.send_count++;
1249 msg = get_new_msg (nal, md);
1251 CERROR("do_PtlGet: BAD - could not allocate cookie!\n");
1252 state_unlock(nal, &flags);
1253 return ret->rc = PTL_NOSPACE;
1257 * If this memory descriptor has an event queue associated with
1258 * it we must allocate a message state object that will record
1259 * the information to be filled in once the message has been
1260 * completed. More information is in the do_PtlPut() comments.
1262 * NB. We're now committed to the GET, since we just marked the MD
1263 * busy. Callers who observe this (by getting PTL_MD_INUSE from
1264 * PtlMDUnlink()) expect a completion event to tell them when the
1268 msg->ev.type = PTL_EVENT_SENT;
1269 msg->ev.initiator.nid = ni->nid;
1270 msg->ev.initiator.pid = ni->pid;
1271 msg->ev.portal = args->portal_in;
1272 msg->ev.match_bits = args->match_bits_in;
1273 msg->ev.rlength = md->length;
1274 msg->ev.mlength = md->length;
1275 msg->ev.offset = args->offset_in;
1276 msg->ev.hdr_data = 0;
1278 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
1281 state_unlock(nal, &flags);
/* header-only request; the data arrives later as a REPLY */
1283 lib_send (nal, private, msg, &hdr, PTL_MSG_GET,
1284 id->nid, id->pid, NULL, 0, 0);
1286 return ret->rc = PTL_OK;
/*
 * lib_assert_wire_constants(): machine-generated ('wirecheck') sanity
 * checks that this build's struct layouts and protocol constants match
 * the wire protocol exactly -- sizes and offsets of ptl_handle_wire_t,
 * ptl_magicversion_t, ptl_hdr_t and each union member.  Any LASSERT
 * firing here means an ABI mismatch with peers.
 * NOTE(review): the listing is elided and this function continues past
 * the visible end; do not edit the assertions by hand -- regenerate
 * with wirecheck.
 */
1289 void lib_assert_wire_constants (void)
1291 /* Wire protocol assertions generated by 'wirecheck' */
1294 LASSERT (PORTALS_PROTO_MAGIC == 0xeebc0ded);
1295 LASSERT (PORTALS_PROTO_VERSION_MAJOR == 0);
1296 LASSERT (PORTALS_PROTO_VERSION_MINOR == 1);
1297 LASSERT (PTL_MSG_ACK == 0);
1298 LASSERT (PTL_MSG_PUT == 1);
1299 LASSERT (PTL_MSG_GET == 2);
1300 LASSERT (PTL_MSG_REPLY == 3);
1301 LASSERT (PTL_MSG_HELLO == 4);
1303 /* Checks for struct ptl_handle_wire_t */
1304 LASSERT (sizeof (ptl_handle_wire_t) == 16);
1305 LASSERT (offsetof (ptl_handle_wire_t, wh_interface_cookie) == 0);
1306 LASSERT (sizeof (((ptl_handle_wire_t *)0)->wh_interface_cookie) == 8);
1307 LASSERT (offsetof (ptl_handle_wire_t, wh_object_cookie) == 8);
1308 LASSERT (sizeof (((ptl_handle_wire_t *)0)->wh_object_cookie) == 8);
1310 /* Checks for struct ptl_magicversion_t */
1311 LASSERT (sizeof (ptl_magicversion_t) == 8);
1312 LASSERT (offsetof (ptl_magicversion_t, magic) == 0);
1313 LASSERT (sizeof (((ptl_magicversion_t *)0)->magic) == 4);
1314 LASSERT (offsetof (ptl_magicversion_t, version_major) == 4);
1315 LASSERT (sizeof (((ptl_magicversion_t *)0)->version_major) == 2);
1316 LASSERT (offsetof (ptl_magicversion_t, version_minor) == 6);
1317 LASSERT (sizeof (((ptl_magicversion_t *)0)->version_minor) == 2);
1319 /* Checks for struct ptl_hdr_t */
1320 LASSERT (sizeof (ptl_hdr_t) == 72);
1321 LASSERT (offsetof (ptl_hdr_t, dest_nid) == 0);
1322 LASSERT (sizeof (((ptl_hdr_t *)0)->dest_nid) == 8);
1323 LASSERT (offsetof (ptl_hdr_t, src_nid) == 8);
1324 LASSERT (sizeof (((ptl_hdr_t *)0)->src_nid) == 8);
1325 LASSERT (offsetof (ptl_hdr_t, dest_pid) == 16);
1326 LASSERT (sizeof (((ptl_hdr_t *)0)->dest_pid) == 4);
1327 LASSERT (offsetof (ptl_hdr_t, src_pid) == 20);
1328 LASSERT (sizeof (((ptl_hdr_t *)0)->src_pid) == 4);
1329 LASSERT (offsetof (ptl_hdr_t, type) == 24);
1330 LASSERT (sizeof (((ptl_hdr_t *)0)->type) == 4);
1333 LASSERT (offsetof (ptl_hdr_t, msg.ack.mlength) == 28);
1334 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.ack.mlength) == 4);
1335 LASSERT (offsetof (ptl_hdr_t, msg.ack.dst_wmd) == 32);
1336 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.ack.dst_wmd) == 16);
1337 LASSERT (offsetof (ptl_hdr_t, msg.ack.match_bits) == 48);
1338 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.ack.match_bits) == 8);
1339 LASSERT (offsetof (ptl_hdr_t, msg.ack.length) == 56);
1340 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.ack.length) == 4);
1343 LASSERT (offsetof (ptl_hdr_t, msg.put.ptl_index) == 28);
1344 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.ptl_index) == 4);
1345 LASSERT (offsetof (ptl_hdr_t, msg.put.ack_wmd) == 32);
1346 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.ack_wmd) == 16);
1347 LASSERT (offsetof (ptl_hdr_t, msg.put.match_bits) == 48);
1348 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.match_bits) == 8);
1349 LASSERT (offsetof (ptl_hdr_t, msg.put.length) == 56);
1350 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.length) == 4);
1351 LASSERT (offsetof (ptl_hdr_t, msg.put.offset) == 60);
1352 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.offset) == 4);
1353 LASSERT (offsetof (ptl_hdr_t, msg.put.hdr_data) == 64);
1354 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.hdr_data) == 8);
1357 LASSERT (offsetof (ptl_hdr_t, msg.get.ptl_index) == 28);
1358 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.ptl_index) == 4);
1359 LASSERT (offsetof (ptl_hdr_t, msg.get.return_wmd) == 32);
1360 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.return_wmd) == 16);
1361 LASSERT (offsetof (ptl_hdr_t, msg.get.match_bits) == 48);
1362 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.match_bits) == 8);
1363 LASSERT (offsetof (ptl_hdr_t, msg.get.length) == 56);
1364 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.length) == 4);
1365 LASSERT (offsetof (ptl_hdr_t, msg.get.src_offset) == 60);
1366 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.src_offset) == 4);
1367 LASSERT (offsetof (ptl_hdr_t, msg.get.return_offset) == 64);
1368 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.return_offset) == 4);
1369 LASSERT (offsetof (ptl_hdr_t, msg.get.sink_length) == 68);
1370 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.sink_length) == 4);
1373 LASSERT (offsetof (ptl_hdr_t, msg.reply.dst_wmd) == 32);
1374 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.reply.dst_wmd) == 16);
1375 LASSERT (offsetof (ptl_hdr_t, msg.reply.dst_offset) == 48);
1376 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.reply.dst_offset) == 4);
1377 LASSERT (offsetof (ptl_hdr_t, msg.reply.length) == 56);
1378 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.reply.length) == 4);