Whamcloud - gitweb
merge b_devel into HEAD, which will become 0.7.3
[fs/lustre-release.git] / lnet / lnet / lib-move.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * lib/lib-move.c
5  * Data movement routines
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *  Copyright (c) 2001-2002 Sandia National Laboratories
9  *
10  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef __KERNEL__
27 # include <stdio.h>
28 #else
29 # define DEBUG_SUBSYSTEM S_PORTALS
30 # include <linux/kp30.h>
31 #endif
32 #include <portals/p30.h>
33 #include <portals/lib-p30.h>
34 #include <portals/arg-blocks.h>
35
36 /*
37  * Right now it does not check access control lists.
38  *
39  * We only support one MD per ME, which is how the Portals 3.1 spec is written.
40  * All previous complication is removed.
41  */
42
43 static lib_me_t *
44 lib_find_me(nal_cb_t *nal, int index, int op_mask, ptl_nid_t src_nid,
45             ptl_pid_t src_pid, ptl_size_t rlength, ptl_size_t roffset,
46             ptl_match_bits_t match_bits, ptl_size_t *mlength_out,
47             ptl_size_t *offset_out, int *unlink_out)
48 {
49         lib_ni_t         *ni = &nal->ni;
50         struct list_head *match_list = &ni->tbl.tbl[index];
51         struct list_head *tmp;
52         lib_me_t         *me;
53         lib_md_t         *md;
54         ptl_size_t        mlength;
55         ptl_size_t        offset;
56
57         ENTRY;
58
59         CDEBUG (D_NET, "Request from "LPU64".%d of length %d into portal %d "
60                 "MB="LPX64"\n", src_nid, src_pid, rlength, index, match_bits);
61
62         if (index < 0 || index >= ni->tbl.size) {
63                 CERROR("Invalid portal %d not in [0-%d]\n",
64                        index, ni->tbl.size);
65                 goto failed;
66         }
67
68         list_for_each (tmp, match_list) {
69                 me = list_entry(tmp, lib_me_t, me_list);
70                 md = me->md;
71
72                  /* ME attached but MD not attached yet */
73                 if (md == NULL)
74                         continue;
75
76                 LASSERT (me == md->me);
77
78                 /* MD deactivated */
79                 if (md->threshold == 0)
80                         continue;
81
82                 /* mismatched MD op */
83                 if ((md->options & op_mask) == 0)
84                         continue;
85
86                 /* mismatched ME nid/pid? */
87                 if (me->match_id.nid != PTL_NID_ANY &&
88                     me->match_id.nid != src_nid)
89                         continue;
90
91                 if (me->match_id.pid != PTL_PID_ANY &&
92                     me->match_id.pid != src_pid)
93                         continue;
94
95                 /* mismatched ME matchbits? */
96                 if (((me->match_bits ^ match_bits) & ~me->ignore_bits) != 0)
97                         continue;
98
99                 /* Hurrah! This _is_ a match; check it out... */
100
101                 if ((md->options & PTL_MD_MANAGE_REMOTE) == 0)
102                         offset = md->offset;
103                 else
104                         offset = roffset;
105
106                 mlength = md->length - offset;
107                 if ((md->options & PTL_MD_MAX_SIZE) != 0 &&
108                     mlength > md->max_size)
109                         mlength = md->max_size;
110
111                 if (rlength <= mlength) {        /* fits in allowed space */
112                         mlength = rlength;
113                 } else if ((md->options & PTL_MD_TRUNCATE) == 0) {
114                         /* this packet _really_ is too big */
115                         CERROR("Matching packet %d too big: %d left, "
116                                "%d allowed\n", rlength, md->length - offset,
117                                mlength);
118                         goto failed;
119                 }
120
121                 md->offset = offset + mlength;
122
123                 *offset_out = offset;
124                 *mlength_out = mlength;
125                 *unlink_out = ((md->options & PTL_MD_AUTO_UNLINK) != 0 &&
126                                md->offset >= (md->length - md->max_size));
127                 RETURN (me);
128         }
129
130  failed:
131         CERROR (LPU64": Dropping %s from "LPU64".%d portal %d match "LPX64
132                 " offset %d length %d: no match\n",
133                 ni->nid, (op_mask == PTL_MD_OP_GET) ? "GET" : "PUT",
134                 src_nid, src_pid, index, match_bits, roffset, rlength);
135         RETURN(NULL);
136 }
137
138 int do_PtlFailNid (nal_cb_t *nal, void *private, void *v_args, void *v_ret)
139 {
140         PtlFailNid_in     *args = v_args;
141         PtlFailNid_out    *ret  = v_ret;
142         lib_test_peer_t   *tp;
143         unsigned long      flags;
144         struct list_head  *el;
145         struct list_head  *next;
146         struct list_head   cull;
147         
148         if (args->threshold != 0) {
149                 /* Adding a new entry */
150                 tp = (lib_test_peer_t *)nal->cb_malloc (nal, sizeof (*tp));
151                 if (tp == NULL)
152                         return (ret->rc = PTL_FAIL);
153                 
154                 tp->tp_nid = args->nid;
155                 tp->tp_threshold = args->threshold;
156                 
157                 state_lock (nal, &flags);
158                 list_add (&tp->tp_list, &nal->ni.ni_test_peers);
159                 state_unlock (nal, &flags);
160                 return (ret->rc = PTL_OK);
161         }
162         
163         /* removing entries */
164         INIT_LIST_HEAD (&cull);
165         
166         state_lock (nal, &flags);
167
168         list_for_each_safe (el, next, &nal->ni.ni_test_peers) {
169                 tp = list_entry (el, lib_test_peer_t, tp_list);
170                 
171                 if (tp->tp_threshold == 0 ||    /* needs culling anyway */
172                     args->nid == PTL_NID_ANY || /* removing all entries */
173                     tp->tp_nid == args->nid)    /* matched this one */
174                 {
175                         list_del (&tp->tp_list);
176                         list_add (&tp->tp_list, &cull);
177                 }
178         }
179         
180         state_unlock (nal, &flags);
181                 
182         while (!list_empty (&cull)) {
183                 tp = list_entry (cull.next, lib_test_peer_t, tp_list);
184
185                 list_del (&tp->tp_list);
186                 nal->cb_free (nal, tp, sizeof (*tp));
187         }
188         return (ret->rc = PTL_OK);
189 }
190
191 static int
192 fail_peer (nal_cb_t *nal, ptl_nid_t nid, int outgoing) 
193 {
194         lib_test_peer_t  *tp;
195         struct list_head *el;
196         struct list_head *next;
197         unsigned long     flags;
198         struct list_head  cull;
199         int               fail = 0;
200
201         INIT_LIST_HEAD (&cull);
202         
203         state_lock (nal, &flags);
204
205         list_for_each_safe (el, next, &nal->ni.ni_test_peers) {
206                 tp = list_entry (el, lib_test_peer_t, tp_list);
207
208                 if (tp->tp_threshold == 0) {
209                         /* zombie entry */
210                         if (outgoing) {
211                                 /* only cull zombies on outgoing tests,
212                                  * since we may be at interrupt priority on
213                                  * incoming messages. */
214                                 list_del (&tp->tp_list);
215                                 list_add (&tp->tp_list, &cull);
216                         }
217                         continue;
218                 }
219                         
220                 if (tp->tp_nid == PTL_NID_ANY || /* fail every peer */
221                     nid == tp->tp_nid) {        /* fail this peer */
222                         fail = 1;
223                         
224                         if (tp->tp_threshold != PTL_MD_THRESH_INF) {
225                                 tp->tp_threshold--;
226                                 if (outgoing &&
227                                     tp->tp_threshold == 0) {
228                                         /* see above */
229                                         list_del (&tp->tp_list);
230                                         list_add (&tp->tp_list, &cull);
231                                 }
232                         }
233                         break;
234                 }
235         }
236         
237         state_unlock (nal, &flags);
238
239         while (!list_empty (&cull)) {
240                 tp = list_entry (cull.next, lib_test_peer_t, tp_list);
241                 list_del (&tp->tp_list);
242                 
243                 nal->cb_free (nal, tp, sizeof (*tp));
244         }
245
246         return (fail);
247 }
248
249 ptl_size_t
250 lib_iov_nob (int niov, struct iovec *iov)
251 {
252         ptl_size_t nob = 0;
253         
254         while (niov-- > 0)
255                 nob += (iov++)->iov_len;
256         
257         return (nob);
258 }
259
260 void
261 lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, ptl_size_t len)
262 {
263         ptl_size_t nob;
264
265         while (len > 0)
266         {
267                 LASSERT (niov > 0);
268                 nob = MIN (iov->iov_len, len);
269                 memcpy (dest, iov->iov_base, nob);
270
271                 len -= nob;
272                 dest += nob;
273                 niov--;
274                 iov++;
275         }
276 }
277
278 void
279 lib_copy_buf2iov (int niov, struct iovec *iov, char *src, ptl_size_t len)
280 {
281         ptl_size_t nob;
282
283         while (len > 0)
284         {
285                 LASSERT (niov > 0);
286                 nob = MIN (iov->iov_len, len);
287                 memcpy (iov->iov_base, src, nob);
288                 
289                 len -= nob;
290                 src += nob;
291                 niov--;
292                 iov++;
293         }
294 }
295
296 static int
297 lib_extract_iov (struct iovec *dst, lib_md_t *md,
298                  ptl_size_t offset, ptl_size_t len)
299 {
300         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
301          * for exactly 'len' bytes, and return the number of entries.
302          * NB not destructive to 'src' */
303         int             src_niov = md->md_niov;  
304         struct iovec   *src = md->md_iov.iov;
305         ptl_size_t      frag_len;
306         int             dst_niov;
307
308         LASSERT (len >= 0);
309         LASSERT (offset >= 0);
310         LASSERT (offset + len <= md->length);
311         
312         if (len == 0)                           /* no data => */
313                 return (0);                     /* no frags */
314
315         LASSERT (src_niov > 0);
316         while (offset >= src->iov_len) {      /* skip initial frags */
317                 offset -= src->iov_len;
318                 src_niov--;
319                 src++;
320                 LASSERT (src_niov > 0);
321         }
322
323         dst_niov = 1;
324         for (;;) {
325                 LASSERT (src_niov > 0);
326                 LASSERT (dst_niov <= PTL_MD_MAX_IOV);
327                 
328                 frag_len = src->iov_len - offset;
329                 dst->iov_base = ((char *)src->iov_base) + offset;
330
331                 if (len <= frag_len) {
332                         dst->iov_len = len;
333                         return (dst_niov);
334                 }
335                 
336                 dst->iov_len = frag_len;
337
338                 len -= frag_len;
339                 dst++;
340                 src++;
341                 dst_niov++;
342                 src_niov--;
343                 offset = 0;
344         }
345 }
346
347 #ifndef __KERNEL__
348 ptl_size_t
349 lib_kiov_nob (int niov, ptl_kiov_t *kiov) 
350 {
351         LASSERT (0);
352         return (0);
353 }
354
355 void
356 lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, ptl_size_t len)
357 {
358         LASSERT (0);
359 }
360
361 void
362 lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, char *dest, ptl_size_t len)
363 {
364         LASSERT (0);
365 }
366
367 static int
368 lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md,
369                   ptl_size_t offset, ptl_size_t len)
370 {
371         LASSERT (0);
372 }
373
374 #else
375
376 ptl_size_t
377 lib_kiov_nob (int niov, ptl_kiov_t *kiov) 
378 {
379         ptl_size_t  nob = 0;
380
381         while (niov-- > 0)
382                 nob += (kiov++)->kiov_len;
383
384         return (nob);
385 }
386
387 void
388 lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, ptl_size_t len)
389 {
390         ptl_size_t  nob;
391         char       *addr;
392         
393         LASSERT (!in_interrupt ());
394         while (len > 0)
395         {
396                 LASSERT (niov > 0);
397                 nob = MIN (kiov->kiov_len, len);
398                 
399                 addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
400                 memcpy (dest, addr, nob);
401                 kunmap (kiov->kiov_page);
402                 
403                 len -= nob;
404                 dest += nob;
405                 niov--;
406                 kiov++;
407         }
408 }
409
410 void
411 lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, char *src, ptl_size_t len)
412 {
413         ptl_size_t  nob;
414         char       *addr;
415
416         LASSERT (!in_interrupt ());
417         while (len > 0)
418         {
419                 LASSERT (niov > 0);
420                 nob = MIN (kiov->kiov_len, len);
421                 
422                 addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
423                 memcpy (addr, src, nob);
424                 kunmap (kiov->kiov_page);
425                 
426                 len -= nob;
427                 src += nob;
428                 niov--;
429                 kiov++;
430         }
431 }
432
433 static int
434 lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md,
435                   ptl_size_t offset, ptl_size_t len)
436 {
437         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
438          * for exactly 'len' bytes, and return the number of entries.
439          * NB not destructive to 'src' */
440         int             src_niov = md->md_niov;  
441         ptl_kiov_t     *src = md->md_iov.kiov;
442         ptl_size_t      frag_len;
443         int             dst_niov;
444
445         LASSERT (len >= 0);
446         LASSERT (offset >= 0);
447         LASSERT (offset + len <= md->length);
448         
449         if (len == 0)                           /* no data => */
450                 return (0);                     /* no frags */
451
452         LASSERT (src_niov > 0);
453         while (offset >= src->kiov_len) {      /* skip initial frags */
454                 offset -= src->kiov_len;
455                 src_niov--;
456                 src++;
457                 LASSERT (src_niov > 0);
458         }
459
460         dst_niov = 1;
461         for (;;) {
462                 LASSERT (src_niov > 0);
463                 LASSERT (dst_niov <= PTL_MD_MAX_IOV);
464                 
465                 frag_len = src->kiov_len - offset;
466                 dst->kiov_page = src->kiov_page;
467                 dst->kiov_offset = src->kiov_offset + offset;
468
469                 if (len <= frag_len) {
470                         dst->kiov_len = len;
471                         LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
472                         return (dst_niov);
473                 }
474
475                 dst->kiov_len = frag_len;
476                 LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
477
478                 len -= frag_len;
479                 dst++;
480                 src++;
481                 dst_niov++;
482                 src_niov--;
483                 offset = 0;
484         }
485 }
486 #endif
487
488 void
489 lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
490           ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen)
491 {
492         int   niov;
493
494         if (mlen == 0)
495                 nal->cb_recv (nal, private, msg, 0, NULL, 0, rlen);
496         else if ((md->options & PTL_MD_KIOV) == 0) {
497                 niov = lib_extract_iov (msg->msg_iov.iov, md, offset, mlen);
498                 nal->cb_recv (nal, private, msg,
499                               niov, msg->msg_iov.iov, mlen, rlen);
500         } else {
501                 niov = lib_extract_kiov (msg->msg_iov.kiov, md, offset, mlen);
502                 nal->cb_recv_pages (nal, private, msg, 
503                                     niov, msg->msg_iov.kiov, mlen, rlen);
504         }
505 }
506
507 int
508 lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg,
509           ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
510           lib_md_t *md, ptl_size_t offset, ptl_size_t len) 
511 {
512         int   niov;
513
514         if (len == 0)
515                 return (nal->cb_send (nal, private, msg, 
516                                       hdr, type, nid, pid,
517                                       0, NULL, 0));
518         
519         if ((md->options & PTL_MD_KIOV) == 0) {
520                 niov = lib_extract_iov (msg->msg_iov.iov, md, offset, len);
521                 return (nal->cb_send (nal, private, msg, 
522                                       hdr, type, nid, pid,
523                                       niov, msg->msg_iov.iov, len));
524         }
525
526         niov = lib_extract_kiov (msg->msg_iov.kiov, md, offset, len);
527         return (nal->cb_send_pages (nal, private, msg, 
528                                     hdr, type, nid, pid,
529                                     niov, msg->msg_iov.kiov, len));
530 }
531
532 static lib_msg_t *
533 get_new_msg (nal_cb_t *nal, lib_md_t *md)
534 {
535         /* ALWAYS called holding the state_lock */
536         lib_counters_t *counters = &nal->ni.counters;
537         lib_msg_t      *msg      = lib_msg_alloc (nal);
538
539         if (msg == NULL)
540                 return (NULL);
541
542         memset (msg, 0, sizeof (*msg));
543
544         msg->send_ack = 0;
545
546         msg->md = md;
547         do_gettimeofday(&msg->ev.arrival_time);
548         md->pending++;
549         if (md->threshold != PTL_MD_THRESH_INF) {
550                 LASSERT (md->threshold > 0);
551                 md->threshold--;
552         }
553
554         counters->msgs_alloc++;
555         if (counters->msgs_alloc > counters->msgs_max)
556                 counters->msgs_max = counters->msgs_alloc;
557
558         list_add (&msg->msg_list, &nal->ni.ni_active_msgs);
559
560         return (msg);
561 }
562
563
564 /*
565  * Incoming messages have a ptl_msg_t object associated with them
566  * by the library.  This object encapsulates the state of the
567  * message and allows the NAL to do non-blocking receives or sends
568  * of long messages.
569  *
570  */
571 static int parse_put(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
572 {
573         lib_ni_t        *ni = &nal->ni;
574         ptl_size_t       mlength = 0;
575         ptl_size_t       offset = 0;
576         int              unlink = 0;
577         lib_me_t        *me;
578         lib_md_t        *md;
579         lib_msg_t       *msg;
580         unsigned long    flags;
581
582         /* Convert put fields to host byte order */
583         hdr->msg.put.match_bits = NTOH__u64 (hdr->msg.put.match_bits);
584         hdr->msg.put.ptl_index = NTOH__u32 (hdr->msg.put.ptl_index);
585         hdr->msg.put.offset = NTOH__u32 (hdr->msg.put.offset);
586
587         state_lock(nal, &flags);
588
589         me = lib_find_me(nal, hdr->msg.put.ptl_index, PTL_MD_OP_PUT,
590                          hdr->src_nid, hdr->src_pid,
591                          PTL_HDR_LENGTH (hdr), hdr->msg.put.offset,
592                          hdr->msg.put.match_bits,
593                          &mlength, &offset, &unlink);
594         if (me == NULL)
595                 goto drop;
596
597         md = me->md;
598         CDEBUG(D_NET, "Incoming put index %x from "LPU64"/%u of length %d/%d "
599                "into md "LPX64" [%d] + %d\n", hdr->msg.put.ptl_index,
600                hdr->src_nid, hdr->src_pid, mlength, PTL_HDR_LENGTH(hdr), 
601                md->md_lh.lh_cookie, md->md_niov, offset);
602
603         msg = get_new_msg (nal, md);
604         if (msg == NULL) {
605                 CERROR(LPU64": Dropping PUT from "LPU64": can't allocate msg\n",
606                        ni->nid, hdr->src_nid);
607                 goto drop;
608         }
609
610         if (!ptl_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
611             !(md->options & PTL_MD_ACK_DISABLE)) {
612                 msg->send_ack = 1;
613                 msg->ack_wmd = hdr->msg.put.ack_wmd;
614                 msg->nid = hdr->src_nid;
615                 msg->pid = hdr->src_pid;
616                 msg->ev.match_bits = hdr->msg.put.match_bits;
617         }
618
619         if (md->eq) {
620                 msg->ev.type = PTL_EVENT_PUT;
621                 msg->ev.initiator.nid = hdr->src_nid;
622                 msg->ev.initiator.pid = hdr->src_pid;
623                 msg->ev.portal = hdr->msg.put.ptl_index;
624                 msg->ev.match_bits = hdr->msg.put.match_bits;
625                 msg->ev.rlength = PTL_HDR_LENGTH(hdr);
626                 msg->ev.mlength = mlength;
627                 msg->ev.offset = offset;
628                 msg->ev.hdr_data = hdr->msg.put.hdr_data;
629
630                 /* NB if this match has exhausted the MD, we can't be sure
631                  * that this event will the the last one associated with
632                  * this MD in the event queue (another message already
633                  * matching this ME/MD could end up being last).  So we
634                  * remember the ME handle anyway and check again when we're
635                  * allocating our slot in the event queue.
636                  */
637                 ptl_me2handle (&msg->ev.unlinked_me, me);
638
639                 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
640         }
641
642         ni->counters.recv_count++;
643         ni->counters.recv_length += mlength;
644
645         /* only unlink after MD's pending count has been bumped
646          * in get_new_msg() otherwise lib_me_unlink() will nuke it */
647         if (unlink) {
648                 md->md_flags |= PTL_MD_FLAG_AUTO_UNLINKED;
649                 lib_me_unlink (nal, me);
650         }
651
652         state_unlock(nal, &flags);
653
654         lib_recv (nal, private, msg, md, offset, mlength, PTL_HDR_LENGTH (hdr));
655         return 0;
656
657  drop:
658         nal->ni.counters.drop_count++;
659         nal->ni.counters.drop_length += PTL_HDR_LENGTH(hdr);
660         state_unlock (nal, &flags);
661         lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
662         return -1;
663 }
664
665 static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
666 {
667         lib_ni_t        *ni = &nal->ni;
668         ptl_size_t       mlength = 0;
669         ptl_size_t       offset = 0;
670         int              unlink = 0;
671         lib_me_t        *me;
672         lib_md_t        *md;
673         lib_msg_t       *msg;
674         ptl_hdr_t        reply;
675         unsigned long    flags;
676         int              rc;
677
678         /* Convert get fields to host byte order */
679         hdr->msg.get.match_bits = NTOH__u64 (hdr->msg.get.match_bits);
680         hdr->msg.get.ptl_index = NTOH__u32 (hdr->msg.get.ptl_index);
681         hdr->msg.get.sink_length = NTOH__u32 (hdr->msg.get.sink_length);
682         hdr->msg.get.src_offset = NTOH__u32 (hdr->msg.get.src_offset);
683
684         /* compatibility check until field is deleted */
685         if (hdr->msg.get.return_offset != 0)
686                 CERROR("Unexpected non-zero get.return_offset %x from "
687                        LPU64"\n", hdr->msg.get.return_offset, hdr->src_nid);
688
689         state_lock(nal, &flags);
690
691         me = lib_find_me(nal, hdr->msg.get.ptl_index, PTL_MD_OP_GET,
692                          hdr->src_nid, hdr->src_pid,
693                          hdr->msg.get.sink_length, hdr->msg.get.src_offset,
694                          hdr->msg.get.match_bits,
695                          &mlength, &offset, &unlink);
696         if (me == NULL)
697                 goto drop;
698
699         md = me->md;
700         CDEBUG(D_NET, "Incoming get index %d from "LPU64".%u of length %d/%d "
701                "from md "LPX64" [%d] + %d\n", hdr->msg.get.ptl_index,
702                hdr->src_nid, hdr->src_pid, mlength, PTL_HDR_LENGTH(hdr), 
703                md->md_lh.lh_cookie, md->md_niov, offset);
704
705         msg = get_new_msg (nal, md);
706         if (msg == NULL) {
707                 CERROR(LPU64": Dropping GET from "LPU64": can't allocate msg\n",
708                        ni->nid, hdr->src_nid);
709                 goto drop;
710         }
711
712         if (md->eq) {
713                 msg->ev.type = PTL_EVENT_GET;
714                 msg->ev.initiator.nid = hdr->src_nid;
715                 msg->ev.initiator.pid = hdr->src_pid;
716                 msg->ev.portal = hdr->msg.get.ptl_index;
717                 msg->ev.match_bits = hdr->msg.get.match_bits;
718                 msg->ev.rlength = PTL_HDR_LENGTH(hdr);
719                 msg->ev.mlength = mlength;
720                 msg->ev.offset = offset;
721                 msg->ev.hdr_data = 0;
722
723                 /* NB if this match has exhausted the MD, we can't be sure
724                  * that this event will the the last one associated with
725                  * this MD in the event queue (another message already
726                  * matching this ME/MD could end up being last).  So we
727                  * remember the ME handle anyway and check again when we're
728                  * allocating our slot in the event queue.
729                  */
730                 ptl_me2handle (&msg->ev.unlinked_me, me);
731
732                 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
733         }
734
735         ni->counters.send_count++;
736         ni->counters.send_length += mlength;
737
738         /* only unlink after MD's refcount has been bumped
739          * in get_new_msg() otherwise lib_me_unlink() will nuke it */
740         if (unlink) {
741                 md->md_flags |= PTL_MD_FLAG_AUTO_UNLINKED;
742                 lib_me_unlink (nal, me);
743         }
744
745         state_unlock(nal, &flags);
746
747         memset (&reply, 0, sizeof (reply));
748         reply.type     = HTON__u32 (PTL_MSG_REPLY);
749         reply.dest_nid = HTON__u64 (hdr->src_nid);
750         reply.src_nid  = HTON__u64 (ni->nid);
751         reply.dest_pid = HTON__u32 (hdr->src_pid);
752         reply.src_pid  = HTON__u32 (ni->pid);
753         PTL_HDR_LENGTH(&reply) = HTON__u32 (mlength);
754
755         reply.msg.reply.dst_wmd = hdr->msg.get.return_wmd;
756
757         rc = lib_send (nal, private, msg, &reply, PTL_MSG_REPLY, 
758                        hdr->src_nid, hdr->src_pid, md, offset, mlength);
759         if (rc != 0) {
760                 CERROR(LPU64": Dropping GET from "LPU64": send REPLY failed\n",
761                        ni->nid, hdr->src_nid);
762                 state_lock (nal, &flags);
763                 goto drop;
764         }
765
766         /* Complete the incoming message */
767         lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
768         return (rc);
769  drop:
770         ni->counters.drop_count++;
771         ni->counters.drop_length += hdr->msg.get.sink_length;
772         state_unlock(nal, &flags);
773         lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
774         return -1;
775 }
776
777 static int parse_reply(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
778 {
779         lib_ni_t        *ni = &nal->ni;
780         lib_md_t        *md;
781         int              rlength;
782         int              length;
783         lib_msg_t       *msg;
784         unsigned long    flags;
785
786         /* compatibility check until field is deleted */
787         if (hdr->msg.reply.dst_offset != 0)
788                 CERROR("Unexpected non-zero reply.dst_offset %x from "LPU64"\n",
789                        hdr->msg.reply.dst_offset, hdr->src_nid);
790
791         state_lock(nal, &flags);
792
793         /* NB handles only looked up by creator (no flips) */
794         md = ptl_wire_handle2md(&hdr->msg.reply.dst_wmd, nal);
795         if (md == NULL || md->threshold == 0) {
796                 CERROR (LPU64": Dropping REPLY from "LPU64" for %s MD "LPX64"."LPX64"\n",
797                         ni->nid, hdr->src_nid,
798                         md == NULL ? "invalid" : "inactive",
799                         hdr->msg.reply.dst_wmd.wh_interface_cookie,
800                         hdr->msg.reply.dst_wmd.wh_object_cookie);
801                 goto drop;
802         }
803
804         LASSERT (md->offset == 0);
805
806         length = rlength = PTL_HDR_LENGTH(hdr);
807
808         if (length > md->length) {
809                 if ((md->options & PTL_MD_TRUNCATE) == 0) {
810                         CERROR (LPU64": Dropping REPLY from "LPU64
811                                 " length %d for MD "LPX64" would overflow (%d)\n",
812                                 ni->nid, hdr->src_nid, length,
813                                 hdr->msg.reply.dst_wmd.wh_object_cookie,
814                                 md->length);
815                         goto drop;
816                 }
817                 length = md->length;
818         }
819
820         CDEBUG(D_NET, "Reply from "LPU64" of length %d/%d into md "LPX64"\n",
821                hdr->src_nid, length, rlength, 
822                hdr->msg.reply.dst_wmd.wh_object_cookie);
823
824         msg = get_new_msg (nal, md);
825         if (msg == NULL) {
826                 CERROR(LPU64": Dropping REPLY from "LPU64": can't "
827                        "allocate msg\n", ni->nid, hdr->src_nid);
828                 goto drop;
829         }
830
831         if (md->eq) {
832                 msg->ev.type = PTL_EVENT_REPLY;
833                 msg->ev.initiator.nid = hdr->src_nid;
834                 msg->ev.initiator.pid = hdr->src_pid;
835                 msg->ev.rlength = rlength;
836                 msg->ev.mlength = length;
837                 msg->ev.offset = 0;
838
839                 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
840         }
841
842         ni->counters.recv_count++;
843         ni->counters.recv_length += length;
844
845         state_unlock(nal, &flags);
846
847         lib_recv (nal, private, msg, md, 0, length, rlength);
848         return 0;
849
850  drop:
851         nal->ni.counters.drop_count++;
852         nal->ni.counters.drop_length += PTL_HDR_LENGTH(hdr);
853         state_unlock (nal, &flags);
854         lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
855         return -1;
856 }
857
858 static int parse_ack(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
859 {
860         lib_ni_t *ni = &nal->ni;
861         lib_md_t *md;
862         lib_msg_t *msg = NULL;
863         unsigned long flags;
864
865         /* Convert ack fields to host byte order */
866         hdr->msg.ack.match_bits = NTOH__u64 (hdr->msg.ack.match_bits);
867         hdr->msg.ack.mlength = NTOH__u32 (hdr->msg.ack.mlength);
868
869         state_lock(nal, &flags);
870
871         /* NB handles only looked up by creator (no flips) */
872         md = ptl_wire_handle2md(&hdr->msg.ack.dst_wmd, nal);
873         if (md == NULL || md->threshold == 0) {
874                 CDEBUG(D_INFO, LPU64": Dropping ACK from "LPU64" to %s MD "
875                        LPX64"."LPX64"\n", ni->nid, hdr->src_nid, 
876                        (md == NULL) ? "invalid" : "inactive",
877                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
878                        hdr->msg.ack.dst_wmd.wh_object_cookie);
879                 goto drop;
880         }
881
882         CDEBUG(D_NET, LPU64": ACK from "LPU64" into md "LPX64"\n",
883                ni->nid, hdr->src_nid, 
884                hdr->msg.ack.dst_wmd.wh_object_cookie);
885
886         msg = get_new_msg (nal, md);
887         if (msg == NULL) {
888                 CERROR(LPU64": Dropping ACK from "LPU64": can't allocate msg\n",
889                        ni->nid, hdr->src_nid);
890                 goto drop;
891         }
892
893         if (md->eq) {
894                 msg->ev.type = PTL_EVENT_ACK;
895                 msg->ev.initiator.nid = hdr->src_nid;
896                 msg->ev.initiator.pid = hdr->src_pid;
897                 msg->ev.mlength = hdr->msg.ack.mlength;
898                 msg->ev.match_bits = hdr->msg.ack.match_bits;
899
900                 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
901         }
902
903         ni->counters.recv_count++;
904         state_unlock(nal, &flags);
905         lib_recv (nal, private, msg, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
906         return 0;
907
908  drop:
909         nal->ni.counters.drop_count++;
910         state_unlock (nal, &flags);
911         lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
912         return -1;
913 }
914
915 static char *
916 hdr_type_string (ptl_hdr_t *hdr)
917 {
918         switch (hdr->type) {
919         case PTL_MSG_ACK:
920                 return ("ACK");
921         case PTL_MSG_PUT:
922                 return ("PUT");
923         case PTL_MSG_GET:
924                 return ("GET");
925         case PTL_MSG_REPLY:
926                 return ("REPLY");
927         case PTL_MSG_HELLO:
928                 return ("HELLO");
929         default:
930                 return ("<UNKNOWN>");
931         }
932 }
933
934 void print_hdr(nal_cb_t * nal, ptl_hdr_t * hdr)
935 {
936         char *type_str = hdr_type_string (hdr);
937
938         nal->cb_printf(nal, "P3 Header at %p of type %s\n", hdr, type_str);
939         nal->cb_printf(nal, "    From nid/pid %Lu/%Lu", hdr->src_nid,
940                        hdr->src_pid);
941         nal->cb_printf(nal, "    To nid/pid %Lu/%Lu\n", hdr->dest_nid,
942                        hdr->dest_pid);
943
944         switch (hdr->type) {
945         default:
946                 break;
947
948         case PTL_MSG_PUT:
949                 nal->cb_printf(nal,
950                                "    Ptl index %d, ack md "LPX64"."LPX64", "
951                                "match bits "LPX64"\n",
952                                hdr->msg.put.ptl_index,
953                                hdr->msg.put.ack_wmd.wh_interface_cookie,
954                                hdr->msg.put.ack_wmd.wh_object_cookie,
955                                hdr->msg.put.match_bits);
956                 nal->cb_printf(nal,
957                                "    Length %d, offset %d, hdr data "LPX64"\n",
958                                PTL_HDR_LENGTH(hdr), hdr->msg.put.offset,
959                                hdr->msg.put.hdr_data);
960                 break;
961
962         case PTL_MSG_GET:
963                 nal->cb_printf(nal,
964                                "    Ptl index %d, return md "LPX64"."LPX64", "
965                                "match bits "LPX64"\n", hdr->msg.get.ptl_index,
966                                hdr->msg.get.return_wmd.wh_interface_cookie,
967                                hdr->msg.get.return_wmd.wh_object_cookie,
968                                hdr->msg.get.match_bits);
969                 nal->cb_printf(nal,
970                                "    Length %d, src offset %d\n",
971                                hdr->msg.get.sink_length,
972                                hdr->msg.get.src_offset);
973                 break;
974
975         case PTL_MSG_ACK:
976                 nal->cb_printf(nal, "    dst md "LPX64"."LPX64", "
977                                "manipulated length %d\n",
978                                hdr->msg.ack.dst_wmd.wh_interface_cookie,
979                                hdr->msg.ack.dst_wmd.wh_object_cookie,
980                                hdr->msg.ack.mlength);
981                 break;
982
983         case PTL_MSG_REPLY:
984                 nal->cb_printf(nal, "    dst md "LPX64"."LPX64", "
985                                "length %d\n",
986                                hdr->msg.reply.dst_wmd.wh_interface_cookie,
987                                hdr->msg.reply.dst_wmd.wh_object_cookie,
988                                PTL_HDR_LENGTH(hdr));
989         }
990
991 }                               /* end of print_hdr() */
992
993
994 int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
995 {
996         unsigned long  flags;
997
998         /* NB static check; optimizer will elide this if it's right */
999         LASSERT (offsetof (ptl_hdr_t, msg.ack.length) ==
1000                  offsetof (ptl_hdr_t, msg.put.length));
1001         LASSERT (offsetof (ptl_hdr_t, msg.ack.length) ==
1002                  offsetof (ptl_hdr_t, msg.get.length));
1003         LASSERT (offsetof (ptl_hdr_t, msg.ack.length) ==
1004                  offsetof (ptl_hdr_t, msg.reply.length));
1005
1006         /* convert common fields to host byte order */
1007         hdr->dest_nid = NTOH__u64 (hdr->dest_nid);
1008         hdr->src_nid = NTOH__u64 (hdr->src_nid);
1009         hdr->dest_pid = NTOH__u32 (hdr->dest_pid);
1010         hdr->src_pid = NTOH__u32 (hdr->src_pid);
1011         hdr->type = NTOH__u32 (hdr->type);
1012         PTL_HDR_LENGTH(hdr) = NTOH__u32 (PTL_HDR_LENGTH(hdr));
1013 #if 0
1014         nal->cb_printf(nal, "%d: lib_parse: nal=%p hdr=%p type=%d\n",
1015                        nal->ni.nid, nal, hdr, hdr->type);
1016         print_hdr(nal, hdr);
1017 #endif
1018         if (hdr->type == PTL_MSG_HELLO) {
1019                 /* dest_nid is really ptl_magicversion_t */
1020                 ptl_magicversion_t *mv = (ptl_magicversion_t *)&hdr->dest_nid;
1021
1022                 CERROR (LPU64": Dropping unexpected HELLO message: "
1023                         "magic %d, version %d.%d from "LPD64"\n",
1024                         nal->ni.nid, mv->magic, 
1025                         mv->version_major, mv->version_minor,
1026                         hdr->src_nid);
1027                 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
1028                 return (-1);
1029         }
1030         
1031         if (hdr->dest_nid != nal->ni.nid) {
1032                 CERROR(LPU64": Dropping %s message from "LPU64" to "LPU64
1033                        " (not me)\n", nal->ni.nid, hdr_type_string (hdr),
1034                        hdr->src_nid, hdr->dest_nid);
1035
1036                 state_lock (nal, &flags);
1037                 nal->ni.counters.drop_count++;
1038                 nal->ni.counters.drop_length += PTL_HDR_LENGTH(hdr);
1039                 state_unlock (nal, &flags);
1040
1041                 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
1042                 return (-1);
1043         }
1044
1045         if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1046             fail_peer (nal, hdr->src_nid, 0))      /* shall we now? */
1047         {
1048                 CERROR(LPU64": Dropping incoming %s from "LPU64
1049                        ": simulated failure\n",
1050                        nal->ni.nid, hdr_type_string (hdr), 
1051                        hdr->src_nid);
1052                 return (-1);
1053         }
1054         
1055         switch (hdr->type) {
1056         case PTL_MSG_ACK:
1057                 return (parse_ack(nal, hdr, private));
1058         case PTL_MSG_PUT:
1059                 return (parse_put(nal, hdr, private));
1060                 break;
1061         case PTL_MSG_GET:
1062                 return (parse_get(nal, hdr, private));
1063                 break;
1064         case PTL_MSG_REPLY:
1065                 return (parse_reply(nal, hdr, private));
1066                 break;
1067         default:
1068                 CERROR(LPU64": Dropping <unknown> message from "LPU64
1069                        ": Bad type=0x%x\n",  nal->ni.nid, hdr->src_nid,
1070                        hdr->type);
1071
1072                 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
1073                 return (-1);
1074         }
1075 }
1076
1077
1078 int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
1079 {
1080         /*
1081          * Incoming:
1082          *      ptl_handle_md_t md_in
1083          *      ptl_ack_req_t ack_req_in
1084          *      ptl_process_id_t target_in
1085          *      ptl_pt_index_t portal_in
1086          *      ptl_ac_index_t cookie_in
1087          *      ptl_match_bits_t match_bits_in
1088          *      ptl_size_t offset_in
1089          *
1090          * Outgoing:
1091          */
1092
1093         PtlPut_in *args = v_args;
1094         PtlPut_out *ret = v_ret;
1095         ptl_hdr_t hdr;
1096
1097         lib_ni_t *ni = &nal->ni;
1098         lib_md_t *md;
1099         lib_msg_t *msg = NULL;
1100         ptl_process_id_t *id = &args->target_in;
1101         unsigned long flags;
1102
1103         if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1104             fail_peer (nal, id->nid, 1))           /* shall we now? */
1105         {
1106                 CERROR(LPU64": Dropping PUT to "LPU64": simulated failure\n",
1107                        nal->ni.nid, id->nid);
1108                 return (ret->rc = PTL_INV_PROC);
1109         }
1110         
1111         ret->rc = PTL_OK;
1112         state_lock(nal, &flags);
1113         md = ptl_handle2md(&args->md_in, nal);
1114         if (md == NULL || !md->threshold) {
1115                 state_unlock(nal, &flags);
1116                 return ret->rc = PTL_INV_MD;
1117         }
1118
1119         CDEBUG(D_NET, "PtlPut -> %Lu: %lu\n", (unsigned long long)id->nid,
1120                (unsigned long)id->pid);
1121
1122         memset (&hdr, 0, sizeof (hdr));
1123         hdr.type     = HTON__u32 (PTL_MSG_PUT);
1124         hdr.dest_nid = HTON__u64 (id->nid);
1125         hdr.src_nid  = HTON__u64 (ni->nid);
1126         hdr.dest_pid = HTON__u32 (id->pid);
1127         hdr.src_pid  = HTON__u32 (ni->pid);
1128         PTL_HDR_LENGTH(&hdr) = HTON__u32 (md->length);
1129
1130         /* NB handles only looked up by creator (no flips) */
1131         if (args->ack_req_in == PTL_ACK_REQ) {
1132                 hdr.msg.put.ack_wmd.wh_interface_cookie = ni->ni_interface_cookie;
1133                 hdr.msg.put.ack_wmd.wh_object_cookie = md->md_lh.lh_cookie;
1134         } else {
1135                 hdr.msg.put.ack_wmd = PTL_WIRE_HANDLE_NONE;
1136         }
1137
1138         hdr.msg.put.match_bits = HTON__u64 (args->match_bits_in);
1139         hdr.msg.put.ptl_index = HTON__u32 (args->portal_in);
1140         hdr.msg.put.offset = HTON__u32 (args->offset_in);
1141         hdr.msg.put.hdr_data = args->hdr_data_in;
1142
1143         ni->counters.send_count++;
1144         ni->counters.send_length += md->length;
1145
1146         msg = get_new_msg (nal, md);
1147         if (msg == NULL) {
1148                 CERROR("BAD: could not allocate msg!\n");
1149                 state_unlock(nal, &flags);
1150                 return ret->rc = PTL_NOSPACE;
1151         }
1152
1153         /*
1154          * If this memory descriptor has an event queue associated with
1155          * it we need to allocate a message state object and record the
1156          * information about this operation that will be recorded into
1157          * event queue once the message has been completed.
1158          *
1159          * NB. We're now committed to the GET, since we just marked the MD
1160          * busy.  Callers who observe this (by getting PTL_MD_INUSE from
1161          * PtlMDUnlink()) expect a completion event to tell them when the
1162          * MD becomes idle. 
1163          */
1164         if (md->eq) {
1165                 msg->ev.type = PTL_EVENT_SENT;
1166                 msg->ev.initiator.nid = ni->nid;
1167                 msg->ev.initiator.pid = ni->pid;
1168                 msg->ev.portal = args->portal_in;
1169                 msg->ev.match_bits = args->match_bits_in;
1170                 msg->ev.rlength = md->length;
1171                 msg->ev.mlength = md->length;
1172                 msg->ev.offset = args->offset_in;
1173                 msg->ev.hdr_data = args->hdr_data_in;
1174
1175                 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
1176         }
1177
1178         state_unlock(nal, &flags);
1179         
1180         lib_send (nal, private, msg, &hdr, PTL_MSG_PUT,
1181                   id->nid, id->pid, md, 0, md->length);
1182
1183         return ret->rc = PTL_OK;
1184 }
1185
1186
1187 int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
1188 {
1189         /*
1190          * Incoming:
1191          *      ptl_handle_md_t md_in
1192          *      ptl_process_id_t target_in
1193          *      ptl_pt_index_t portal_in
1194          *      ptl_ac_index_t cookie_in
1195          *      ptl_match_bits_t match_bits_in
1196          *      ptl_size_t offset_in
1197          *
1198          * Outgoing:
1199          */
1200
1201         PtlGet_in *args = v_args;
1202         PtlGet_out *ret = v_ret;
1203         ptl_hdr_t hdr;
1204         lib_msg_t *msg = NULL;
1205         lib_ni_t *ni = &nal->ni;
1206         ptl_process_id_t *id = &args->target_in;
1207         lib_md_t *md;
1208         unsigned long flags;
1209
1210         if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1211             fail_peer (nal, id->nid, 1))           /* shall we now? */
1212         {
1213                 CERROR(LPU64": Dropping PUT to "LPU64": simulated failure\n",
1214                        nal->ni.nid, id->nid);
1215                 return (ret->rc = PTL_INV_PROC);
1216         }
1217         
1218         state_lock(nal, &flags);
1219         md = ptl_handle2md(&args->md_in, nal);
1220         if (md == NULL || !md->threshold) {
1221                 state_unlock(nal, &flags);
1222                 return ret->rc = PTL_INV_MD;
1223         }
1224
1225         LASSERT (md->offset == 0);
1226
1227         CDEBUG(D_NET, "PtlGet -> %Lu: %lu\n", (unsigned long long)id->nid,
1228                (unsigned long)id->pid);
1229
1230         memset (&hdr, 0, sizeof (hdr));
1231         hdr.type     = HTON__u32 (PTL_MSG_GET);
1232         hdr.dest_nid = HTON__u64 (id->nid);
1233         hdr.src_nid  = HTON__u64 (ni->nid);
1234         hdr.dest_pid = HTON__u32 (id->pid);
1235         hdr.src_pid  = HTON__u32 (ni->pid);
1236         PTL_HDR_LENGTH(&hdr) = 0;
1237
1238         /* NB handles only looked up by creator (no flips) */
1239         hdr.msg.get.return_wmd.wh_interface_cookie = ni->ni_interface_cookie;
1240         hdr.msg.get.return_wmd.wh_object_cookie = md->md_lh.lh_cookie;
1241
1242         hdr.msg.get.match_bits = HTON__u64 (args->match_bits_in);
1243         hdr.msg.get.ptl_index = HTON__u32 (args->portal_in);
1244         hdr.msg.get.src_offset = HTON__u32 (args->offset_in);
1245         hdr.msg.get.sink_length = HTON__u32 (md->length);
1246
1247         ni->counters.send_count++;
1248
1249         msg = get_new_msg (nal, md);
1250         if (msg == NULL) {
1251                 CERROR("do_PtlGet: BAD - could not allocate cookie!\n");
1252                 state_unlock(nal, &flags);
1253                 return ret->rc = PTL_NOSPACE;
1254         }
1255
1256         /*
1257          * If this memory descriptor has an event queue associated with
1258          * it we must allocate a message state object that will record
1259          * the information to be filled in once the message has been
1260          * completed.  More information is in the do_PtlPut() comments.
1261          *
1262          * NB. We're now committed to the GET, since we just marked the MD
1263          * busy.  Callers who observe this (by getting PTL_MD_INUSE from
1264          * PtlMDUnlink()) expect a completion event to tell them when the
1265          * MD becomes idle. 
1266          */
1267         if (md->eq) {
1268                 msg->ev.type = PTL_EVENT_SENT;
1269                 msg->ev.initiator.nid = ni->nid;
1270                 msg->ev.initiator.pid = ni->pid;
1271                 msg->ev.portal = args->portal_in;
1272                 msg->ev.match_bits = args->match_bits_in;
1273                 msg->ev.rlength = md->length;
1274                 msg->ev.mlength = md->length;
1275                 msg->ev.offset = args->offset_in;
1276                 msg->ev.hdr_data = 0;
1277
1278                 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
1279         }
1280
1281         state_unlock(nal, &flags);
1282
1283         lib_send (nal, private, msg, &hdr, PTL_MSG_GET,
1284                   id->nid, id->pid, NULL, 0, 0);
1285
1286         return ret->rc = PTL_OK;
1287 }
1288
1289 void lib_assert_wire_constants (void)
1290 {
1291         /* Wire protocol assertions generated by 'wirecheck' */
1292
1293         /* Constants... */
1294         LASSERT (PORTALS_PROTO_MAGIC == 0xeebc0ded);
1295         LASSERT (PORTALS_PROTO_VERSION_MAJOR == 0);
1296         LASSERT (PORTALS_PROTO_VERSION_MINOR == 1);
1297         LASSERT (PTL_MSG_ACK == 0);
1298         LASSERT (PTL_MSG_PUT == 1);
1299         LASSERT (PTL_MSG_GET == 2);
1300         LASSERT (PTL_MSG_REPLY == 3);
1301         LASSERT (PTL_MSG_HELLO == 4);
1302
1303         /* Checks for struct ptl_handle_wire_t */
1304         LASSERT (sizeof (ptl_handle_wire_t) == 16);
1305         LASSERT (offsetof (ptl_handle_wire_t, wh_interface_cookie) == 0);
1306         LASSERT (sizeof (((ptl_handle_wire_t *)0)->wh_interface_cookie) == 8);
1307         LASSERT (offsetof (ptl_handle_wire_t, wh_object_cookie) == 8);
1308         LASSERT (sizeof (((ptl_handle_wire_t *)0)->wh_object_cookie) == 8);
1309
1310         /* Checks for struct ptl_magicversion_t */
1311         LASSERT (sizeof (ptl_magicversion_t) == 8);
1312         LASSERT (offsetof (ptl_magicversion_t, magic) == 0);
1313         LASSERT (sizeof (((ptl_magicversion_t *)0)->magic) == 4);
1314         LASSERT (offsetof (ptl_magicversion_t, version_major) == 4);
1315         LASSERT (sizeof (((ptl_magicversion_t *)0)->version_major) == 2);
1316         LASSERT (offsetof (ptl_magicversion_t, version_minor) == 6);
1317         LASSERT (sizeof (((ptl_magicversion_t *)0)->version_minor) == 2);
1318
1319         /* Checks for struct ptl_hdr_t */
1320         LASSERT (sizeof (ptl_hdr_t) == 72);
1321         LASSERT (offsetof (ptl_hdr_t, dest_nid) == 0);
1322         LASSERT (sizeof (((ptl_hdr_t *)0)->dest_nid) == 8);
1323         LASSERT (offsetof (ptl_hdr_t, src_nid) == 8);
1324         LASSERT (sizeof (((ptl_hdr_t *)0)->src_nid) == 8);
1325         LASSERT (offsetof (ptl_hdr_t, dest_pid) == 16);
1326         LASSERT (sizeof (((ptl_hdr_t *)0)->dest_pid) == 4);
1327         LASSERT (offsetof (ptl_hdr_t, src_pid) == 20);
1328         LASSERT (sizeof (((ptl_hdr_t *)0)->src_pid) == 4);
1329         LASSERT (offsetof (ptl_hdr_t, type) == 24);
1330         LASSERT (sizeof (((ptl_hdr_t *)0)->type) == 4);
1331
1332         /* Ack */
1333         LASSERT (offsetof (ptl_hdr_t, msg.ack.mlength) == 28);
1334         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.ack.mlength) == 4);
1335         LASSERT (offsetof (ptl_hdr_t, msg.ack.dst_wmd) == 32);
1336         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.ack.dst_wmd) == 16);
1337         LASSERT (offsetof (ptl_hdr_t, msg.ack.match_bits) == 48);
1338         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.ack.match_bits) == 8);
1339         LASSERT (offsetof (ptl_hdr_t, msg.ack.length) == 56);
1340         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.ack.length) == 4);
1341
1342         /* Put */
1343         LASSERT (offsetof (ptl_hdr_t, msg.put.ptl_index) == 28);
1344         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.ptl_index) == 4);
1345         LASSERT (offsetof (ptl_hdr_t, msg.put.ack_wmd) == 32);
1346         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.ack_wmd) == 16);
1347         LASSERT (offsetof (ptl_hdr_t, msg.put.match_bits) == 48);
1348         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.match_bits) == 8);
1349         LASSERT (offsetof (ptl_hdr_t, msg.put.length) == 56);
1350         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.length) == 4);
1351         LASSERT (offsetof (ptl_hdr_t, msg.put.offset) == 60);
1352         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.offset) == 4);
1353         LASSERT (offsetof (ptl_hdr_t, msg.put.hdr_data) == 64);
1354         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.hdr_data) == 8);
1355
1356         /* Get */
1357         LASSERT (offsetof (ptl_hdr_t, msg.get.ptl_index) == 28);
1358         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.ptl_index) == 4);
1359         LASSERT (offsetof (ptl_hdr_t, msg.get.return_wmd) == 32);
1360         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.return_wmd) == 16);
1361         LASSERT (offsetof (ptl_hdr_t, msg.get.match_bits) == 48);
1362         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.match_bits) == 8);
1363         LASSERT (offsetof (ptl_hdr_t, msg.get.length) == 56);
1364         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.length) == 4);
1365         LASSERT (offsetof (ptl_hdr_t, msg.get.src_offset) == 60);
1366         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.src_offset) == 4);
1367         LASSERT (offsetof (ptl_hdr_t, msg.get.return_offset) == 64);
1368         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.return_offset) == 4);
1369         LASSERT (offsetof (ptl_hdr_t, msg.get.sink_length) == 68);
1370         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.sink_length) == 4);
1371
1372         /* Reply */
1373         LASSERT (offsetof (ptl_hdr_t, msg.reply.dst_wmd) == 32);
1374         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.reply.dst_wmd) == 16);
1375         LASSERT (offsetof (ptl_hdr_t, msg.reply.dst_offset) == 48);
1376         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.reply.dst_offset) == 4);
1377         LASSERT (offsetof (ptl_hdr_t, msg.reply.length) == 56);
1378         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.reply.length) == 4);
1379 }