1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Data movement routines
7 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
8 * Copyright (c) 2001-2002 Sandia National Laboratories
10 * This file is part of Lustre, http://www.sf.net/projects/lustre/
12 * Lustre is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Lustre is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Lustre; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
29 # define DEBUG_SUBSYSTEM S_PORTALS
30 # include <linux/kp30.h>
32 #include <portals/p30.h>
33 #include <portals/lib-p30.h>
34 #include <portals/arg-blocks.h>
37 * Right now it does not check access control lists.
39 * We only support one MD per ME, which is how the Portals 3.1 spec is written.
40 * All previous complication is removed.
/* Walk the match list of portal 'index' looking for an ME/MD pair that
 * accepts this incoming request (op_mask is PTL_MD_OP_PUT or PTL_MD_OP_GET).
 * On a match, returns via *mlength_out the number of bytes the MD will
 * actually take, via *offset_out the offset into the MD, and via
 * *unlink_out whether the MD should be auto-unlinked after this match.
 * NB: called with the state lock held (it updates md->offset). */
44 lib_find_me(nal_cb_t *nal, int index, int op_mask, ptl_nid_t src_nid,
45 ptl_pid_t src_pid, ptl_size_t rlength, ptl_size_t roffset,
46 ptl_match_bits_t match_bits, ptl_size_t *mlength_out,
47 ptl_size_t *offset_out, int *unlink_out)
49 lib_ni_t *ni = &nal->ni;
50 struct list_head *match_list = &ni->tbl.tbl[index];
51 struct list_head *tmp;
59 CDEBUG (D_NET, "Request from "LPU64".%d of length %d into portal %d "
60 "MB="LPX64"\n", src_nid, src_pid, rlength, index, match_bits);
62 if (index < 0 || index >= ni->tbl.size) {
63 CERROR("Invalid portal %d not in [0-%d]\n",
68 list_for_each (tmp, match_list) {
69 me = list_entry(tmp, lib_me_t, me_list);
72 /* ME attached but MD not attached yet */
76 LASSERT (me == md->me);
/* exhausted MD: no more operations allowed on it */
79 if (md->threshold == 0)
82 /* mismatched MD op */
83 if ((md->options & op_mask) == 0)
86 /* mismatched ME nid/pid? */
87 if (me->match_id.nid != PTL_NID_ANY &&
88 me->match_id.nid != src_nid)
91 if (me->match_id.pid != PTL_PID_ANY &&
92 me->match_id.pid != src_pid)
95 /* mismatched ME matchbits? */
96 if (((me->match_bits ^ match_bits) & ~me->ignore_bits) != 0)
99 /* Hurrah! This _is_ a match; check it out... */
/* locally-managed offset unless the initiator manages it remotely */
101 if ((md->options & PTL_MD_MANAGE_REMOTE) == 0)
106 mlength = md->length - offset;
107 if ((md->options & PTL_MD_MAX_SIZE) != 0 &&
108 mlength > md->max_size)
109 mlength = md->max_size;
111 if (rlength <= mlength) { /* fits in allowed space */
113 } else if ((md->options & PTL_MD_TRUNCATE) == 0) {
114 /* this packet _really_ is too big */
115 CERROR("Matching packet %d too big: %d left, "
116 "%d allowed\n", rlength, md->length - offset,
/* commit the space consumed by this match */
121 md->offset = offset + mlength;
123 *offset_out = offset;
124 *mlength_out = mlength;
/* auto-unlink once the remaining space can't hold another max_size chunk */
125 *unlink_out = ((md->options & PTL_MD_AUTO_UNLINK) != 0 &&
126 md->offset >= (md->length - md->max_size));
/* fell off the list: nothing matched */
131 CERROR (LPU64": Dropping %s from "LPU64".%d portal %d match "LPX64
132 " offset %d length %d: no match\n",
133 ni->nid, (op_mask == PTL_MD_OP_GET) ? "GET" : "PUT",
134 src_nid, src_pid, index, match_bits, roffset, rlength);
/* PtlFailNid dispatcher: with a non-zero threshold, register a test-peer
 * entry so the next 'threshold' messages to/from args->nid are dropped
 * (failure simulation); with threshold == 0, remove matching entries
 * (nid == PTL_NID_ANY removes them all).  Entries are moved to a private
 * 'cull' list under the state lock and freed after it is dropped. */
138 int do_PtlFailNid (nal_cb_t *nal, void *private, void *v_args, void *v_ret)
140 PtlFailNid_in *args = v_args;
141 PtlFailNid_out *ret = v_ret;
144 struct list_head *el;
145 struct list_head *next;
146 struct list_head cull;
148 if (args->threshold != 0) {
149 /* Adding a new entry */
150 tp = (lib_test_peer_t *)nal->cb_malloc (nal, sizeof (*tp));
152 return (ret->rc = PTL_FAIL);
154 tp->tp_nid = args->nid;
155 tp->tp_threshold = args->threshold;
157 state_lock (nal, &flags);
158 list_add (&tp->tp_list, &nal->ni.ni_test_peers);
159 state_unlock (nal, &flags);
160 return (ret->rc = PTL_OK);
163 /* removing entries */
164 INIT_LIST_HEAD (&cull);
166 state_lock (nal, &flags);
168 list_for_each_safe (el, next, &nal->ni.ni_test_peers) {
169 tp = list_entry (el, lib_test_peer_t, tp_list);
171 if (tp->tp_threshold == 0 || /* needs culling anyway */
172 args->nid == PTL_NID_ANY || /* removing all entries */
173 tp->tp_nid == args->nid) /* matched this one */
175 list_del (&tp->tp_list);
176 list_add (&tp->tp_list, &cull);
180 state_unlock (nal, &flags);
/* free culled entries outside the lock */
182 while (!list_empty (&cull)) {
183 tp = list_entry (cull.next, lib_test_peer_t, tp_list);
185 list_del (&tp->tp_list);
186 nal->cb_free (nal, tp, sizeof (*tp));
188 return (ret->rc = PTL_OK);
/* Decide whether traffic with 'nid' should be dropped to simulate failure,
 * consulting the ni_test_peers list installed by do_PtlFailNid().
 * 'outgoing' is non-zero on the send path; zombie entries (threshold 0)
 * are only culled there since the receive path may run at interrupt
 * priority.  Matching entries with a finite threshold are decremented. */
192 fail_peer (nal_cb_t *nal, ptl_nid_t nid, int outgoing)
195 struct list_head *el;
196 struct list_head *next;
198 struct list_head cull;
201 INIT_LIST_HEAD (&cull);
203 state_lock (nal, &flags);
205 list_for_each_safe (el, next, &nal->ni.ni_test_peers) {
206 tp = list_entry (el, lib_test_peer_t, tp_list);
208 if (tp->tp_threshold == 0) {
211 /* only cull zombies on outgoing tests,
212 * since we may be at interrupt priority on
213 * incoming messages. */
214 list_del (&tp->tp_list);
215 list_add (&tp->tp_list, &cull);
220 if (tp->tp_nid == PTL_NID_ANY || /* fail every peer */
221 nid == tp->tp_nid) { /* fail this peer */
/* finite threshold counts down; PTL_MD_THRESH_INF fails forever */
224 if (tp->tp_threshold != PTL_MD_THRESH_INF) {
227 tp->tp_threshold == 0) {
229 list_del (&tp->tp_list);
230 list_add (&tp->tp_list, &cull);
237 state_unlock (nal, &flags);
/* free culled entries outside the lock */
239 while (!list_empty (&cull)) {
240 tp = list_entry (cull.next, lib_test_peer_t, tp_list);
241 list_del (&tp->tp_list);
243 nal->cb_free (nal, tp, sizeof (*tp));
/* Sum the byte counts of an iovec array: total number of bytes it describes. */
250 lib_iov_nob (int niov, struct iovec *iov)
255 nob += (iov++)->iov_len;
/* Gather up to 'len' bytes from the iovec array into the flat buffer 'dest',
 * fragment by fragment. */
261 lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, ptl_size_t len)
268 nob = MIN (iov->iov_len, len);
269 memcpy (dest, iov->iov_base, nob);
/* Scatter up to 'len' bytes from the flat buffer 'src' into the iovec array,
 * fragment by fragment. */
279 lib_copy_buf2iov (int niov, struct iovec *iov, char *src, ptl_size_t len)
286 nob = MIN (iov->iov_len, len);
287 memcpy (iov->iov_base, src, nob);
/* Build in 'dst' the iovec subset of the MD's iovec list covering exactly
 * [offset, offset+len); returns the number of dst entries.  The MD's own
 * iovec is left untouched. */
297 lib_extract_iov (struct iovec *dst, lib_md_t *md,
298 ptl_size_t offset, ptl_size_t len)
300 /* Initialise 'dst' to the subset of 'src' starting at 'offset',
301 * for exactly 'len' bytes, and return the number of entries.
302 * NB not destructive to 'src' */
303 int src_niov = md->md_niov;
304 struct iovec *src = md->md_iov.iov;
309 LASSERT (offset >= 0);
310 LASSERT (offset + len <= md->length);
312 if (len == 0) /* no data => */
313 return (0); /* no frags */
315 LASSERT (src_niov > 0);
316 while (offset >= src->iov_len) { /* skip initial frags */
317 offset -= src->iov_len;
320 LASSERT (src_niov > 0);
325 LASSERT (src_niov > 0);
326 LASSERT (dst_niov <= PTL_MD_MAX_IOV);
/* first (possibly partial) fragment starts 'offset' bytes in */
328 frag_len = src->iov_len - offset;
329 dst->iov_base = ((char *)src->iov_base) + offset;
331 if (len <= frag_len) {
336 dst->iov_len = frag_len;
/* Page-based (kiov) variants for builds without kmap support — bodies
 * elided here; the kmap-capable implementations follow below. */
349 lib_kiov_nob (int niov, ptl_kiov_t *kiov)
356 lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, ptl_size_t len)
362 lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, char *dest, ptl_size_t len)
368 lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md,
369 ptl_size_t offset, ptl_size_t len)
/* Sum the byte counts of a kiov (page fragment) array. */
377 lib_kiov_nob (int niov, ptl_kiov_t *kiov)
382 nob += (kiov++)->kiov_len;
/* Gather up to 'len' bytes from page fragments into the flat buffer 'dest'.
 * Each page is kmap'ed around its memcpy, so this must not run in
 * interrupt context (hence the LASSERT). */
388 lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, ptl_size_t len)
393 LASSERT (!in_interrupt ());
397 nob = MIN (kiov->kiov_len, len);
399 addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
400 memcpy (dest, addr, nob);
401 kunmap (kiov->kiov_page);
/* Scatter up to 'len' bytes from the flat buffer 'src' into page fragments.
 * kmap/kunmap per page: not callable from interrupt context. */
411 lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, char *src, ptl_size_t len)
416 LASSERT (!in_interrupt ());
420 nob = MIN (kiov->kiov_len, len);
422 addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
423 memcpy (addr, src, nob);
424 kunmap (kiov->kiov_page);
/* kiov analogue of lib_extract_iov(): build in 'dst' the page-fragment
 * subset of the MD covering exactly [offset, offset+len); returns the
 * number of dst entries.  The MD's own kiov list is left untouched. */
434 lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md,
435 ptl_size_t offset, ptl_size_t len)
437 /* Initialise 'dst' to the subset of 'src' starting at 'offset',
438 * for exactly 'len' bytes, and return the number of entries.
439 * NB not destructive to 'src' */
440 int src_niov = md->md_niov;
441 ptl_kiov_t *src = md->md_iov.kiov;
446 LASSERT (offset >= 0);
447 LASSERT (offset + len <= md->length);
449 if (len == 0) /* no data => */
450 return (0); /* no frags */
452 LASSERT (src_niov > 0);
453 while (offset >= src->kiov_len) { /* skip initial frags */
454 offset -= src->kiov_len;
457 LASSERT (src_niov > 0);
462 LASSERT (src_niov > 0);
463 LASSERT (dst_niov <= PTL_MD_MAX_IOV);
/* first (possibly partial) fragment: same page, shifted offset */
465 frag_len = src->kiov_len - offset;
466 dst->kiov_page = src->kiov_page;
467 dst->kiov_offset = src->kiov_offset + offset;
469 if (len <= frag_len) {
471 LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
475 dst->kiov_len = frag_len;
476 LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
/* Hand an incoming message body to the NAL: with no MD (drop) receive
 * nothing into a NULL iovec; otherwise extract the MD fragment list for
 * [offset, offset+mlen) and dispatch to cb_recv (iovec MDs) or
 * cb_recv_pages (kiov MDs). */
489 lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
490 ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen)
495 nal->cb_recv (nal, private, msg, 0, NULL, 0, rlen);
496 else if ((md->options & PTL_MD_KIOV) == 0) {
497 niov = lib_extract_iov (msg->msg_iov.iov, md, offset, mlen);
498 nal->cb_recv (nal, private, msg,
499 niov, msg->msg_iov.iov, mlen, rlen);
501 niov = lib_extract_kiov (msg->msg_iov.kiov, md, offset, mlen);
502 nal->cb_recv_pages (nal, private, msg,
503 niov, msg->msg_iov.kiov, mlen, rlen);
/* Send 'hdr' plus 'len' bytes of payload taken from 'md' at 'offset':
 * with no MD send header only; otherwise extract the fragment list and
 * dispatch to cb_send (iovec MDs) or cb_send_pages (kiov MDs).
 * Returns the NAL callback's result. */
508 lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg,
509 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
510 lib_md_t *md, ptl_size_t offset, ptl_size_t len)
515 return (nal->cb_send (nal, private, msg,
519 if ((md->options & PTL_MD_KIOV) == 0) {
520 niov = lib_extract_iov (msg->msg_iov.iov, md, offset, len);
521 return (nal->cb_send (nal, private, msg,
523 niov, msg->msg_iov.iov, len));
526 niov = lib_extract_kiov (msg->msg_iov.kiov, md, offset, len);
527 return (nal->cb_send_pages (nal, private, msg,
529 niov, msg->msg_iov.kiov, len));
/* Allocate and initialise a lib_msg_t bound to 'md': zeroes the message,
 * timestamps the event, charges the MD's threshold (unless infinite),
 * updates message counters and links the message on ni_active_msgs.
 * Caller MUST hold the state lock. */
533 get_new_msg (nal_cb_t *nal, lib_md_t *md)
535 /* ALWAYS called holding the state_lock */
536 lib_counters_t *counters = &nal->ni.counters;
537 lib_msg_t *msg = lib_msg_alloc (nal);
542 memset (msg, 0, sizeof (*msg));
547 do_gettimeofday(&msg->ev.arrival_time);
/* consume one unit of the MD's finite operation budget */
549 if (md->threshold != PTL_MD_THRESH_INF) {
550 LASSERT (md->threshold > 0);
554 counters->msgs_alloc++;
555 if (counters->msgs_alloc > counters->msgs_max)
556 counters->msgs_max = counters->msgs_alloc;
558 list_add (&msg->msg_list, &nal->ni.ni_active_msgs);
564 * Incoming messages have a ptl_msg_t object associated with them
565 * by the library. This object encapsulates the state of the
566 * message and allows the NAL to do non-blocking receives or sends
/* Handle an incoming PUT: byte-swap the put-specific header fields, match
 * against the target portal under the state lock, allocate the message,
 * record ack state (if requested and not disabled on the MD), fill in the
 * PTL_EVENT_PUT event, then receive the payload via lib_recv().  On any
 * failure the message is dropped and drop counters are bumped. */
570 static int parse_put(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
572 lib_ni_t *ni = &nal->ni;
573 ptl_size_t mlength = 0;
574 ptl_size_t offset = 0;
581 /* Convert put fields to host byte order */
582 hdr->msg.put.match_bits = NTOH__u64 (hdr->msg.put.match_bits);
583 hdr->msg.put.ptl_index = NTOH__u32 (hdr->msg.put.ptl_index);
584 hdr->msg.put.offset = NTOH__u32 (hdr->msg.put.offset);
586 state_lock(nal, &flags);
588 me = lib_find_me(nal, hdr->msg.put.ptl_index, PTL_MD_OP_PUT,
589 hdr->src_nid, hdr->src_pid,
590 PTL_HDR_LENGTH (hdr), hdr->msg.put.offset,
591 hdr->msg.put.match_bits,
592 &mlength, &offset, &unlink);
597 CDEBUG(D_NET, "Incoming put index %x from "LPU64"/%u of length %d/%d "
598 "into md "LPX64" [%d] + %d\n", hdr->msg.put.ptl_index,
599 hdr->src_nid, hdr->src_pid, mlength, PTL_HDR_LENGTH(hdr),
600 md->md_lh.lh_cookie, md->md_niov, offset);
602 msg = get_new_msg (nal, md);
604 CERROR(LPU64": Dropping PUT from "LPU64": can't allocate msg\n",
605 ni->nid, hdr->src_nid);
/* remember where to send the ACK, unless disabled on the MD */
609 if (!ptl_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
610 !(md->options & PTL_MD_ACK_DISABLE)) {
612 msg->ack_wmd = hdr->msg.put.ack_wmd;
613 msg->nid = hdr->src_nid;
614 msg->pid = hdr->src_pid;
615 msg->ev.match_bits = hdr->msg.put.match_bits;
619 msg->ev.type = PTL_EVENT_PUT;
620 msg->ev.initiator.nid = hdr->src_nid;
621 msg->ev.initiator.pid = hdr->src_pid;
622 msg->ev.portal = hdr->msg.put.ptl_index;
623 msg->ev.match_bits = hdr->msg.put.match_bits;
624 msg->ev.rlength = PTL_HDR_LENGTH(hdr);
625 msg->ev.mlength = mlength;
626 msg->ev.offset = offset;
627 msg->ev.hdr_data = hdr->msg.put.hdr_data;
629 /* NB if this match has exhausted the MD, we can't be sure
630 * that this event will be the last one associated with
631 * this MD in the event queue (another message already
632 * matching this ME/MD could end up being last). So we
633 * remember the ME handle anyway and check again when we're
634 * allocating our slot in the event queue.
636 ptl_me2handle (&msg->ev.unlinked_me, me);
638 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
641 ni->counters.recv_count++;
642 ni->counters.recv_length += mlength;
644 /* only unlink after MD's pending count has been bumped
645 * in get_new_msg() otherwise lib_me_unlink() will nuke it */
647 md->md_flags |= PTL_MD_FLAG_AUTO_UNLINKED;
648 lib_me_unlink (nal, me);
651 state_unlock(nal, &flags);
653 lib_recv (nal, private, msg, md, offset, mlength, PTL_HDR_LENGTH (hdr));
/* drop path: count it and consume the wire payload with no MD */
657 nal->ni.counters.drop_count++;
658 nal->ni.counters.drop_length += PTL_HDR_LENGTH(hdr);
659 state_unlock (nal, &flags);
660 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
/* Handle an incoming GET: byte-swap the get-specific header fields, match
 * against the target portal under the state lock, record a PTL_EVENT_GET
 * event, then build a REPLY header and send the matched data back via
 * lib_send().  The incoming GET carries no payload, so lib_recv() is
 * called with a NULL MD just to complete the receive. */
664 static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
666 lib_ni_t *ni = &nal->ni;
667 ptl_size_t mlength = 0;
668 ptl_size_t offset = 0;
677 /* Convert get fields to host byte order */
678 hdr->msg.get.match_bits = NTOH__u64 (hdr->msg.get.match_bits);
679 hdr->msg.get.ptl_index = NTOH__u32 (hdr->msg.get.ptl_index);
680 hdr->msg.get.sink_length = NTOH__u32 (hdr->msg.get.sink_length);
681 hdr->msg.get.src_offset = NTOH__u32 (hdr->msg.get.src_offset);
683 /* compatibility check until field is deleted */
684 if (hdr->msg.get.return_offset != 0)
685 CERROR("Unexpected non-zero get.return_offset %x from "
686 LPU64"\n", hdr->msg.get.return_offset, hdr->src_nid);
688 state_lock(nal, &flags);
690 me = lib_find_me(nal, hdr->msg.get.ptl_index, PTL_MD_OP_GET,
691 hdr->src_nid, hdr->src_pid,
692 hdr->msg.get.sink_length, hdr->msg.get.src_offset,
693 hdr->msg.get.match_bits,
694 &mlength, &offset, &unlink);
699 CDEBUG(D_NET, "Incoming get index %d from "LPU64".%u of length %d/%d "
700 "from md "LPX64" [%d] + %d\n", hdr->msg.get.ptl_index,
701 hdr->src_nid, hdr->src_pid, mlength, PTL_HDR_LENGTH(hdr),
702 md->md_lh.lh_cookie, md->md_niov, offset);
704 msg = get_new_msg (nal, md);
706 CERROR(LPU64": Dropping GET from "LPU64": can't allocate msg\n",
707 ni->nid, hdr->src_nid);
712 msg->ev.type = PTL_EVENT_GET;
713 msg->ev.initiator.nid = hdr->src_nid;
714 msg->ev.initiator.pid = hdr->src_pid;
715 msg->ev.portal = hdr->msg.get.ptl_index;
716 msg->ev.match_bits = hdr->msg.get.match_bits;
717 msg->ev.rlength = PTL_HDR_LENGTH(hdr);
718 msg->ev.mlength = mlength;
719 msg->ev.offset = offset;
720 msg->ev.hdr_data = 0;
722 /* NB if this match has exhausted the MD, we can't be sure
723 * that this event will be the last one associated with
724 * this MD in the event queue (another message already
725 * matching this ME/MD could end up being last). So we
726 * remember the ME handle anyway and check again when we're
727 * allocating our slot in the event queue.
729 ptl_me2handle (&msg->ev.unlinked_me, me);
731 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
734 ni->counters.send_count++;
735 ni->counters.send_length += mlength;
737 /* only unlink after MD's refcount has been bumped
738 * in get_new_msg() otherwise lib_me_unlink() will nuke it */
740 md->md_flags |= PTL_MD_FLAG_AUTO_UNLINKED;
741 lib_me_unlink (nal, me);
744 state_unlock(nal, &flags);
/* build the REPLY that carries the matched data back to the initiator */
746 memset (&reply, 0, sizeof (reply));
747 reply.type = HTON__u32 (PTL_MSG_REPLY);
748 reply.dest_nid = HTON__u64 (hdr->src_nid);
749 reply.src_nid = HTON__u64 (ni->nid);
750 reply.dest_pid = HTON__u32 (hdr->src_pid);
751 reply.src_pid = HTON__u32 (ni->pid);
752 PTL_HDR_LENGTH(&reply) = HTON__u32 (mlength);
754 reply.msg.reply.dst_wmd = hdr->msg.get.return_wmd;
756 rc = lib_send (nal, private, msg, &reply, PTL_MSG_REPLY,
757 hdr->src_nid, hdr->src_pid, md, offset, mlength);
759 CERROR(LPU64": Dropping GET from "LPU64": send REPLY failed\n",
760 ni->nid, hdr->src_nid);
761 /* Hmm, this will create a GET event and make believe
762 * the reply completed, which it kind of did, only the
763 * source won't get her reply */
764 lib_finalize (nal, private, msg);
765 state_lock (nal, &flags);
769 /* Complete the incoming message */
770 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
/* drop path: no match or no msg; count and complete the receive */
773 ni->counters.drop_count++;
774 ni->counters.drop_length += hdr->msg.get.sink_length;
775 state_unlock(nal, &flags);
776 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
/* Handle an incoming REPLY (response to a GET we sent): look up the
 * destination MD directly via the wire handle (no match list — handles
 * are only looked up by their creator, so no byte-flips needed), check
 * length against the MD (honouring PTL_MD_TRUNCATE), record a
 * PTL_EVENT_REPLY event and receive the payload into the MD. */
780 static int parse_reply(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
782 lib_ni_t *ni = &nal->ni;
789 /* compatibility check until field is deleted */
790 if (hdr->msg.reply.dst_offset != 0)
791 CERROR("Unexpected non-zero reply.dst_offset %x from "LPU64"\n",
792 hdr->msg.reply.dst_offset, hdr->src_nid);
794 state_lock(nal, &flags);
796 /* NB handles only looked up by creator (no flips) */
797 md = ptl_wire_handle2md(&hdr->msg.reply.dst_wmd, nal);
798 if (md == NULL || md->threshold == 0) {
799 CERROR (LPU64": Dropping REPLY from "LPU64" for %s MD "LPX64"."LPX64"\n",
800 ni->nid, hdr->src_nid,
801 md == NULL ? "invalid" : "inactive",
802 hdr->msg.reply.dst_wmd.wh_interface_cookie,
803 hdr->msg.reply.dst_wmd.wh_object_cookie);
807 LASSERT (md->offset == 0);
809 length = rlength = PTL_HDR_LENGTH(hdr);
/* oversized reply: only acceptable if the MD allows truncation */
811 if (length > md->length) {
812 if ((md->options & PTL_MD_TRUNCATE) == 0) {
813 CERROR (LPU64": Dropping REPLY from "LPU64
814 " length %d for MD "LPX64" would overflow (%d)\n",
815 ni->nid, hdr->src_nid, length,
816 hdr->msg.reply.dst_wmd.wh_object_cookie,
823 CDEBUG(D_NET, "Reply from "LPU64" of length %d/%d into md "LPX64"\n",
824 hdr->src_nid, length, rlength,
825 hdr->msg.reply.dst_wmd.wh_object_cookie);
827 msg = get_new_msg (nal, md);
829 CERROR(LPU64": Dropping REPLY from "LPU64": can't "
830 "allocate msg\n", ni->nid, hdr->src_nid);
835 msg->ev.type = PTL_EVENT_REPLY;
836 msg->ev.initiator.nid = hdr->src_nid;
837 msg->ev.initiator.pid = hdr->src_pid;
838 msg->ev.rlength = rlength;
839 msg->ev.mlength = length;
842 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
845 ni->counters.recv_count++;
846 ni->counters.recv_length += length;
848 state_unlock(nal, &flags);
850 lib_recv (nal, private, msg, md, 0, length, rlength);
/* drop path: count it and consume the wire payload with no MD */
854 nal->ni.counters.drop_count++;
855 nal->ni.counters.drop_length += PTL_HDR_LENGTH(hdr);
856 state_unlock (nal, &flags);
857 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
/* Handle an incoming ACK (acknowledging a PUT we sent): byte-swap the ack
 * fields, look up the destination MD via the wire handle and record a
 * PTL_EVENT_ACK event.  ACKs carry no payload, so lib_recv() completes
 * the receive with a NULL MD. */
861 static int parse_ack(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
863 lib_ni_t *ni = &nal->ni;
865 lib_msg_t *msg = NULL;
868 /* Convert ack fields to host byte order */
869 hdr->msg.ack.match_bits = NTOH__u64 (hdr->msg.ack.match_bits);
870 hdr->msg.ack.mlength = NTOH__u32 (hdr->msg.ack.mlength);
872 state_lock(nal, &flags);
874 /* NB handles only looked up by creator (no flips) */
875 md = ptl_wire_handle2md(&hdr->msg.ack.dst_wmd, nal);
876 if (md == NULL || md->threshold == 0) {
877 CDEBUG(D_INFO, LPU64": Dropping ACK from "LPU64" to %s MD "
878 LPX64"."LPX64"\n", ni->nid, hdr->src_nid,
879 (md == NULL) ? "invalid" : "inactive",
880 hdr->msg.ack.dst_wmd.wh_interface_cookie,
881 hdr->msg.ack.dst_wmd.wh_object_cookie);
885 CDEBUG(D_NET, LPU64": ACK from "LPU64" into md "LPX64"\n",
886 ni->nid, hdr->src_nid,
887 hdr->msg.ack.dst_wmd.wh_object_cookie);
889 msg = get_new_msg (nal, md);
891 CERROR(LPU64": Dropping ACK from "LPU64": can't allocate msg\n",
892 ni->nid, hdr->src_nid);
897 msg->ev.type = PTL_EVENT_ACK;
898 msg->ev.initiator.nid = hdr->src_nid;
899 msg->ev.initiator.pid = hdr->src_pid;
900 msg->ev.mlength = hdr->msg.ack.mlength;
901 msg->ev.match_bits = hdr->msg.ack.match_bits;
903 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
906 ni->counters.recv_count++;
907 state_unlock(nal, &flags);
908 lib_recv (nal, private, msg, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
/* drop path: count it and complete the receive */
912 nal->ni.counters.drop_count++;
913 state_unlock (nal, &flags);
914 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
/* Map hdr->type to a human-readable name for logging; "<UNKNOWN>" for
 * unrecognised types. */
919 hdr_type_string (ptl_hdr_t *hdr)
933 return ("<UNKNOWN>");
/* Debug helper: pretty-print a ptl_hdr_t through the NAL's cb_printf,
 * with per-type detail for PUT/GET/ACK/REPLY headers. */
937 void print_hdr(nal_cb_t * nal, ptl_hdr_t * hdr)
939 char *type_str = hdr_type_string (hdr);
941 nal->cb_printf(nal, "P3 Header at %p of type %s\n", hdr, type_str);
942 nal->cb_printf(nal, " From nid/pid %Lu/%Lu", hdr->src_nid,
944 nal->cb_printf(nal, " To nid/pid %Lu/%Lu\n", hdr->dest_nid,
953 " Ptl index %d, ack md "LPX64"."LPX64", "
954 "match bits "LPX64"\n",
955 hdr->msg.put.ptl_index,
956 hdr->msg.put.ack_wmd.wh_interface_cookie,
957 hdr->msg.put.ack_wmd.wh_object_cookie,
958 hdr->msg.put.match_bits);
960 " Length %d, offset %d, hdr data "LPX64"\n",
961 PTL_HDR_LENGTH(hdr), hdr->msg.put.offset,
962 hdr->msg.put.hdr_data);
967 " Ptl index %d, return md "LPX64"."LPX64", "
968 "match bits "LPX64"\n", hdr->msg.get.ptl_index,
969 hdr->msg.get.return_wmd.wh_interface_cookie,
970 hdr->msg.get.return_wmd.wh_object_cookie,
971 hdr->msg.get.match_bits);
973 " Length %d, src offset %d\n",
974 hdr->msg.get.sink_length,
975 hdr->msg.get.src_offset);
979 nal->cb_printf(nal, " dst md "LPX64"."LPX64", "
980 "manipulated length %d\n",
981 hdr->msg.ack.dst_wmd.wh_interface_cookie,
982 hdr->msg.ack.dst_wmd.wh_object_cookie,
983 hdr->msg.ack.mlength);
987 nal->cb_printf(nal, " dst md "LPX64"."LPX64", "
989 hdr->msg.reply.dst_wmd.wh_interface_cookie,
990 hdr->msg.reply.dst_wmd.wh_object_cookie,
991 PTL_HDR_LENGTH(hdr));
994 } /* end of print_hdr() */
/* Entry point for every incoming message: byte-swap the common header
 * fields, reject unexpected HELLOs and messages not addressed to this
 * NID, apply simulated failures from the test-peer list, then dispatch
 * to parse_ack/parse_put/parse_get/parse_reply by hdr->type.  Every drop
 * path still calls lib_recv() with a NULL MD to consume the payload. */
997 int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
1001 /* NB static check; optimizer will elide this if it's right */
1002 LASSERT (offsetof (ptl_hdr_t, msg.ack.length) ==
1003 offsetof (ptl_hdr_t, msg.put.length));
1004 LASSERT (offsetof (ptl_hdr_t, msg.ack.length) ==
1005 offsetof (ptl_hdr_t, msg.get.length));
1006 LASSERT (offsetof (ptl_hdr_t, msg.ack.length) ==
1007 offsetof (ptl_hdr_t, msg.reply.length));
1009 /* convert common fields to host byte order */
1010 hdr->dest_nid = NTOH__u64 (hdr->dest_nid);
1011 hdr->src_nid = NTOH__u64 (hdr->src_nid);
1012 hdr->dest_pid = NTOH__u32 (hdr->dest_pid);
1013 hdr->src_pid = NTOH__u32 (hdr->src_pid);
1014 hdr->type = NTOH__u32 (hdr->type);
1015 PTL_HDR_LENGTH(hdr) = NTOH__u32 (PTL_HDR_LENGTH(hdr));
1017 nal->cb_printf(nal, "%d: lib_parse: nal=%p hdr=%p type=%d\n",
1018 nal->ni.nid, nal, hdr, hdr->type);
1019 print_hdr(nal, hdr);
1021 if (hdr->type == PTL_MSG_HELLO) {
1022 /* dest_nid is really ptl_magicversion_t */
1023 ptl_magicversion_t *mv = (ptl_magicversion_t *)&hdr->dest_nid;
1025 CERROR (LPU64": Dropping unexpected HELLO message: "
1026 "magic %d, version %d.%d from "LPD64"\n",
1027 nal->ni.nid, mv->magic,
1028 mv->version_major, mv->version_minor,
1030 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
1034 if (hdr->dest_nid != nal->ni.nid) {
1035 CERROR(LPU64": Dropping %s message from "LPU64" to "LPU64
1036 " (not me)\n", nal->ni.nid, hdr_type_string (hdr),
1037 hdr->src_nid, hdr->dest_nid);
1039 state_lock (nal, &flags);
1040 nal->ni.counters.drop_count++;
1041 nal->ni.counters.drop_length += PTL_HDR_LENGTH(hdr);
1042 state_unlock (nal, &flags);
1044 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
1048 if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1049 fail_peer (nal, hdr->src_nid, 0)) /* shall we now? */
1051 CERROR(LPU64": Dropping incoming %s from "LPU64
1052 ": simulated failure\n",
1053 nal->ni.nid, hdr_type_string (hdr),
1058 switch (hdr->type) {
1060 return (parse_ack(nal, hdr, private));
1062 return (parse_put(nal, hdr, private));
1065 return (parse_get(nal, hdr, private));
1068 return (parse_reply(nal, hdr, private));
1071 CERROR(LPU64": Dropping <unknown> message from "LPU64
1072 ": Bad type=0x%x\n", nal->ni.nid, hdr->src_nid,
1075 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
/* PtlPut dispatcher: validate the MD handle under the state lock, build a
 * PUT wire header (requesting an ACK if asked and the MD allows it),
 * charge the MD via get_new_msg(), record a PTL_EVENT_SENT event and send
 * via lib_send().  On send failure we still lib_finalize() because
 * get_new_msg() already committed the MD. */
1081 int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
1085 * ptl_handle_md_t md_in
1086 * ptl_ack_req_t ack_req_in
1087 * ptl_process_id_t target_in
1088 * ptl_pt_index_t portal_in
1089 * ptl_ac_index_t cookie_in
1090 * ptl_match_bits_t match_bits_in
1091 * ptl_size_t offset_in
1096 PtlPut_in *args = v_args;
1097 PtlPut_out *ret = v_ret;
1100 lib_ni_t *ni = &nal->ni;
1102 lib_msg_t *msg = NULL;
1103 ptl_process_id_t *id = &args->target_in;
1104 unsigned long flags;
1107 if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1108 fail_peer (nal, id->nid, 1)) /* shall we now? */
1110 CERROR(LPU64": Dropping PUT to "LPU64": simulated failure\n",
1111 nal->ni.nid, id->nid);
1112 return (ret->rc = PTL_INV_PROC);
1116 state_lock(nal, &flags);
1117 md = ptl_handle2md(&args->md_in, nal);
1118 if (md == NULL || !md->threshold) {
1119 state_unlock(nal, &flags);
1120 return ret->rc = PTL_INV_MD;
1123 CDEBUG(D_NET, "PtlPut -> %Lu: %lu\n", (unsigned long long)id->nid,
1124 (unsigned long)id->pid);
1126 memset (&hdr, 0, sizeof (hdr));
1127 hdr.type = HTON__u32 (PTL_MSG_PUT);
1128 hdr.dest_nid = HTON__u64 (id->nid);
1129 hdr.src_nid = HTON__u64 (ni->nid);
1130 hdr.dest_pid = HTON__u32 (id->pid);
1131 hdr.src_pid = HTON__u32 (ni->pid);
1132 PTL_HDR_LENGTH(&hdr) = HTON__u32 (md->length);
1134 /* NB handles only looked up by creator (no flips) */
1135 if (args->ack_req_in == PTL_ACK_REQ) {
1136 hdr.msg.put.ack_wmd.wh_interface_cookie = ni->ni_interface_cookie;
1137 hdr.msg.put.ack_wmd.wh_object_cookie = md->md_lh.lh_cookie;
1139 hdr.msg.put.ack_wmd = PTL_WIRE_HANDLE_NONE;
1142 hdr.msg.put.match_bits = HTON__u64 (args->match_bits_in);
1143 hdr.msg.put.ptl_index = HTON__u32 (args->portal_in);
1144 hdr.msg.put.offset = HTON__u32 (args->offset_in);
1145 hdr.msg.put.hdr_data = args->hdr_data_in;
1147 ni->counters.send_count++;
1148 ni->counters.send_length += md->length;
1150 msg = get_new_msg (nal, md);
1152 CERROR("BAD: could not allocate msg!\n");
1153 state_unlock(nal, &flags);
1154 return ret->rc = PTL_NOSPACE;
1158 * If this memory descriptor has an event queue associated with
1159 * it we need to allocate a message state object and record the
1160 * information about this operation that will be recorded into
1161 * event queue once the message has been completed.
1163 * NB. We're now committed to the GET, since we just marked the MD
1164 * busy. Callers who observe this (by getting PTL_MD_INUSE from
1165 * PtlMDUnlink()) expect a completion event to tell them when the
1169 msg->ev.type = PTL_EVENT_SENT;
1170 msg->ev.initiator.nid = ni->nid;
1171 msg->ev.initiator.pid = ni->pid;
1172 msg->ev.portal = args->portal_in;
1173 msg->ev.match_bits = args->match_bits_in;
1174 msg->ev.rlength = md->length;
1175 msg->ev.mlength = md->length;
1176 msg->ev.offset = args->offset_in;
1177 msg->ev.hdr_data = args->hdr_data_in;
1179 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
1182 state_unlock(nal, &flags);
1184 rc = lib_send (nal, private, msg, &hdr, PTL_MSG_PUT,
1185 id->nid, id->pid, md, 0, md->length);
1187 /* get_new_msg() committed us to sending by decrementing
1188 * md->threshold, so we have to act like we did send, but
1189 * the network dropped it. */
1190 lib_finalize (nal, private, msg);
1193 return ret->rc = PTL_OK;
/* PtlGet dispatcher: validate the MD handle under the state lock, build a
 * GET wire header carrying our return_wmd so the target's REPLY can find
 * this MD, charge the MD via get_new_msg(), record a PTL_EVENT_SENT event
 * and send the (payload-less) GET via lib_send().  On send failure we
 * still lib_finalize() because get_new_msg() already committed the MD. */
1197 int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
1201 * ptl_handle_md_t md_in
1202 * ptl_process_id_t target_in
1203 * ptl_pt_index_t portal_in
1204 * ptl_ac_index_t cookie_in
1205 * ptl_match_bits_t match_bits_in
1206 * ptl_size_t offset_in
1211 PtlGet_in *args = v_args;
1212 PtlGet_out *ret = v_ret;
1214 lib_msg_t *msg = NULL;
1215 lib_ni_t *ni = &nal->ni;
1216 ptl_process_id_t *id = &args->target_in;
1218 unsigned long flags;
1221 if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1222 fail_peer (nal, id->nid, 1)) /* shall we now? */
/* NOTE(review): message says "PUT" but this is the GET path — looks like
 * a copy/paste from do_PtlPut(); confirm before changing the string */
1224 CERROR(LPU64": Dropping PUT to "LPU64": simulated failure\n",
1225 nal->ni.nid, id->nid);
1226 return (ret->rc = PTL_INV_PROC);
1229 state_lock(nal, &flags);
1230 md = ptl_handle2md(&args->md_in, nal);
1231 if (md == NULL || !md->threshold) {
1232 state_unlock(nal, &flags);
1233 return ret->rc = PTL_INV_MD;
1236 LASSERT (md->offset == 0);
1238 CDEBUG(D_NET, "PtlGet -> %Lu: %lu\n", (unsigned long long)id->nid,
1239 (unsigned long)id->pid);
1241 memset (&hdr, 0, sizeof (hdr));
1242 hdr.type = HTON__u32 (PTL_MSG_GET);
1243 hdr.dest_nid = HTON__u64 (id->nid);
1244 hdr.src_nid = HTON__u64 (ni->nid);
1245 hdr.dest_pid = HTON__u32 (id->pid);
1246 hdr.src_pid = HTON__u32 (ni->pid);
1247 PTL_HDR_LENGTH(&hdr) = 0;
1249 /* NB handles only looked up by creator (no flips) */
1250 hdr.msg.get.return_wmd.wh_interface_cookie = ni->ni_interface_cookie;
1251 hdr.msg.get.return_wmd.wh_object_cookie = md->md_lh.lh_cookie;
1253 hdr.msg.get.match_bits = HTON__u64 (args->match_bits_in);
1254 hdr.msg.get.ptl_index = HTON__u32 (args->portal_in);
1255 hdr.msg.get.src_offset = HTON__u32 (args->offset_in);
1256 hdr.msg.get.sink_length = HTON__u32 (md->length);
1258 ni->counters.send_count++;
1260 msg = get_new_msg (nal, md);
1262 CERROR("do_PtlGet: BAD - could not allocate cookie!\n");
1263 state_unlock(nal, &flags);
1264 return ret->rc = PTL_NOSPACE;
1268 * If this memory descriptor has an event queue associated with
1269 * it we must allocate a message state object that will record
1270 * the information to be filled in once the message has been
1271 * completed. More information is in the do_PtlPut() comments.
1273 * NB. We're now committed to the GET, since we just marked the MD
1274 * busy. Callers who observe this (by getting PTL_MD_INUSE from
1275 * PtlMDUnlink()) expect a completion event to tell them when the
1279 msg->ev.type = PTL_EVENT_SENT;
1280 msg->ev.initiator.nid = ni->nid;
1281 msg->ev.initiator.pid = ni->pid;
1282 msg->ev.portal = args->portal_in;
1283 msg->ev.match_bits = args->match_bits_in;
1284 msg->ev.rlength = md->length;
1285 msg->ev.mlength = md->length;
1286 msg->ev.offset = args->offset_in;
1287 msg->ev.hdr_data = 0;
1289 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
1292 state_unlock(nal, &flags);
/* GET carries no payload: send header only (NULL MD) */
1294 rc = lib_send (nal, private, msg, &hdr, PTL_MSG_GET,
1295 id->nid, id->pid, NULL, 0, 0);
1297 /* get_new_msg() committed us to sending by decrementing
1298 * md->threshold, so we have to act like we did send, but
1299 * the network dropped it. */
1300 lib_finalize (nal, private, msg);
1303 return ret->rc = PTL_OK;
/* Sanity checks, generated by the 'wirecheck' tool, that the compiled
 * layout of every on-the-wire structure (sizes and field offsets) matches
 * the protocol definition — catches ABI drift between peers built with
 * different compilers or architectures. */
1306 void lib_assert_wire_constants (void)
1308 /* Wire protocol assertions generated by 'wirecheck' */
1311 LASSERT (PORTALS_PROTO_MAGIC == 0xeebc0ded);
1312 LASSERT (PORTALS_PROTO_VERSION_MAJOR == 0);
1313 LASSERT (PORTALS_PROTO_VERSION_MINOR == 1);
1314 LASSERT (PTL_MSG_ACK == 0);
1315 LASSERT (PTL_MSG_PUT == 1);
1316 LASSERT (PTL_MSG_GET == 2);
1317 LASSERT (PTL_MSG_REPLY == 3);
1318 LASSERT (PTL_MSG_HELLO == 4);
1320 /* Checks for struct ptl_handle_wire_t */
1321 LASSERT (sizeof (ptl_handle_wire_t) == 16);
1322 LASSERT (offsetof (ptl_handle_wire_t, wh_interface_cookie) == 0);
1323 LASSERT (sizeof (((ptl_handle_wire_t *)0)->wh_interface_cookie) == 8);
1324 LASSERT (offsetof (ptl_handle_wire_t, wh_object_cookie) == 8);
1325 LASSERT (sizeof (((ptl_handle_wire_t *)0)->wh_object_cookie) == 8);
1327 /* Checks for struct ptl_magicversion_t */
1328 LASSERT (sizeof (ptl_magicversion_t) == 8);
1329 LASSERT (offsetof (ptl_magicversion_t, magic) == 0);
1330 LASSERT (sizeof (((ptl_magicversion_t *)0)->magic) == 4);
1331 LASSERT (offsetof (ptl_magicversion_t, version_major) == 4);
1332 LASSERT (sizeof (((ptl_magicversion_t *)0)->version_major) == 2);
1333 LASSERT (offsetof (ptl_magicversion_t, version_minor) == 6);
1334 LASSERT (sizeof (((ptl_magicversion_t *)0)->version_minor) == 2);
1336 /* Checks for struct ptl_hdr_t */
1337 LASSERT (sizeof (ptl_hdr_t) == 72);
1338 LASSERT (offsetof (ptl_hdr_t, dest_nid) == 0);
1339 LASSERT (sizeof (((ptl_hdr_t *)0)->dest_nid) == 8);
1340 LASSERT (offsetof (ptl_hdr_t, src_nid) == 8);
1341 LASSERT (sizeof (((ptl_hdr_t *)0)->src_nid) == 8);
1342 LASSERT (offsetof (ptl_hdr_t, dest_pid) == 16);
1343 LASSERT (sizeof (((ptl_hdr_t *)0)->dest_pid) == 4);
1344 LASSERT (offsetof (ptl_hdr_t, src_pid) == 20);
1345 LASSERT (sizeof (((ptl_hdr_t *)0)->src_pid) == 4);
1346 LASSERT (offsetof (ptl_hdr_t, type) == 24);
1347 LASSERT (sizeof (((ptl_hdr_t *)0)->type) == 4);
1350 LASSERT (offsetof (ptl_hdr_t, msg.ack.mlength) == 28);
1351 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.ack.mlength) == 4);
1352 LASSERT (offsetof (ptl_hdr_t, msg.ack.dst_wmd) == 32);
1353 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.ack.dst_wmd) == 16);
1354 LASSERT (offsetof (ptl_hdr_t, msg.ack.match_bits) == 48);
1355 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.ack.match_bits) == 8);
1356 LASSERT (offsetof (ptl_hdr_t, msg.ack.length) == 56);
1357 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.ack.length) == 4);
1360 LASSERT (offsetof (ptl_hdr_t, msg.put.ptl_index) == 28);
1361 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.ptl_index) == 4);
1362 LASSERT (offsetof (ptl_hdr_t, msg.put.ack_wmd) == 32);
1363 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.ack_wmd) == 16);
1364 LASSERT (offsetof (ptl_hdr_t, msg.put.match_bits) == 48);
1365 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.match_bits) == 8);
1366 LASSERT (offsetof (ptl_hdr_t, msg.put.length) == 56);
1367 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.length) == 4);
1368 LASSERT (offsetof (ptl_hdr_t, msg.put.offset) == 60);
1369 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.offset) == 4);
1370 LASSERT (offsetof (ptl_hdr_t, msg.put.hdr_data) == 64);
1371 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.hdr_data) == 8);
1374 LASSERT (offsetof (ptl_hdr_t, msg.get.ptl_index) == 28);
1375 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.ptl_index) == 4);
1376 LASSERT (offsetof (ptl_hdr_t, msg.get.return_wmd) == 32);
1377 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.return_wmd) == 16);
1378 LASSERT (offsetof (ptl_hdr_t, msg.get.match_bits) == 48);
1379 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.match_bits) == 8);
1380 LASSERT (offsetof (ptl_hdr_t, msg.get.length) == 56);
1381 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.length) == 4);
1382 LASSERT (offsetof (ptl_hdr_t, msg.get.src_offset) == 60);
1383 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.src_offset) == 4);
1384 LASSERT (offsetof (ptl_hdr_t, msg.get.return_offset) == 64);
1385 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.return_offset) == 4);
1386 LASSERT (offsetof (ptl_hdr_t, msg.get.sink_length) == 68);
1387 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.sink_length) == 4);
1390 LASSERT (offsetof (ptl_hdr_t, msg.reply.dst_wmd) == 32);
1391 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.reply.dst_wmd) == 16);
1392 LASSERT (offsetof (ptl_hdr_t, msg.reply.dst_offset) == 48);
1393 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.reply.dst_offset) == 4);
1394 LASSERT (offsetof (ptl_hdr_t, msg.reply.length) == 56);
1395 LASSERT (sizeof (((ptl_hdr_t *)0)->msg.reply.length) == 4);