Whamcloud - gitweb
merge b_multinet into HEAD
[fs/lustre-release.git] / lnet / lnet / lib-move.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * lib/lib-move.c
5  * Data movement routines
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *  Copyright (c) 2001-2002 Sandia National Laboratories
9  *
10  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef __KERNEL__
27 # include <stdio.h>
28 #else
29 # define DEBUG_SUBSYSTEM S_PORTALS
30 # include <linux/kp30.h>
31 #endif
32 #include <portals/p30.h>
33 #include <portals/lib-p30.h>
34 #include <portals/arg-blocks.h>
35
36 /*
37  * Right now it does not check access control lists.
38  *
39  * We only support one MD per ME, which is how the Portals 3.1 spec is written.
40  * All previous complication is removed.
41  */
42
43 static lib_me_t *
44 lib_find_me(nal_cb_t *nal, int index, int op_mask, ptl_nid_t src_nid,
45             ptl_pid_t src_pid, ptl_size_t rlength, ptl_size_t roffset,
46             ptl_match_bits_t match_bits, ptl_size_t *mlength_out,
47             ptl_size_t *offset_out, int *unlink_out)
48 {
49         lib_ni_t         *ni = &nal->ni;
50         struct list_head *match_list = &ni->tbl.tbl[index];
51         struct list_head *tmp;
52         lib_me_t         *me;
53         lib_md_t         *md;
54         ptl_size_t        mlength;
55         ptl_size_t        offset;
56
57         ENTRY;
58
59         CDEBUG (D_NET, "Request from "LPU64".%d of length %d into portal %d "
60                 "MB="LPX64"\n", src_nid, src_pid, rlength, index, match_bits);
61
62         if (index < 0 || index >= ni->tbl.size) {
63                 CERROR("Invalid portal %d not in [0-%d]\n",
64                        index, ni->tbl.size);
65                 goto failed;
66         }
67
68         list_for_each (tmp, match_list) {
69                 me = list_entry(tmp, lib_me_t, me_list);
70                 md = me->md;
71
72                  /* ME attached but MD not attached yet */
73                 if (md == NULL)
74                         continue;
75
76                 LASSERT (me == md->me);
77
78                 /* MD deactivated */
79                 if (md->threshold == 0)
80                         continue;
81
82                 /* mismatched MD op */
83                 if ((md->options & op_mask) == 0)
84                         continue;
85
86                 /* mismatched ME nid/pid? */
87                 if (me->match_id.nid != PTL_NID_ANY &&
88                     me->match_id.nid != src_nid)
89                         continue;
90
91                 if (me->match_id.pid != PTL_PID_ANY &&
92                     me->match_id.pid != src_pid)
93                         continue;
94
95                 /* mismatched ME matchbits? */
96                 if (((me->match_bits ^ match_bits) & ~me->ignore_bits) != 0)
97                         continue;
98
99                 /* Hurrah! This _is_ a match; check it out... */
100
101                 if ((md->options & PTL_MD_MANAGE_REMOTE) == 0)
102                         offset = md->offset;
103                 else
104                         offset = roffset;
105
106                 mlength = md->length - offset;
107                 if ((md->options & PTL_MD_MAX_SIZE) != 0 &&
108                     mlength > md->max_size)
109                         mlength = md->max_size;
110
111                 if (rlength <= mlength) {        /* fits in allowed space */
112                         mlength = rlength;
113                 } else if ((md->options & PTL_MD_TRUNCATE) == 0) {
114                         /* this packet _really_ is too big */
115                         CERROR("Matching packet %d too big: %d left, "
116                                "%d allowed\n", rlength, md->length - offset,
117                                mlength);
118                         goto failed;
119                 }
120
121                 md->offset = offset + mlength;
122
123                 *offset_out = offset;
124                 *mlength_out = mlength;
125                 *unlink_out = ((md->options & PTL_MD_AUTO_UNLINK) != 0 &&
126                                md->offset >= (md->length - md->max_size));
127                 RETURN (me);
128         }
129
130  failed:
131         CERROR (LPU64": Dropping %s from "LPU64".%d portal %d match "LPX64
132                 " offset %d length %d: no match\n",
133                 ni->nid, (op_mask == PTL_MD_OP_GET) ? "GET" : "PUT",
134                 src_nid, src_pid, index, match_bits, roffset, rlength);
135         RETURN(NULL);
136 }
137
138 int do_PtlFailNid (nal_cb_t *nal, void *private, void *v_args, void *v_ret)
139 {
140         PtlFailNid_in     *args = v_args;
141         PtlFailNid_out    *ret  = v_ret;
142         lib_test_peer_t   *tp;
143         unsigned long      flags;
144         struct list_head  *el;
145         struct list_head  *next;
146         struct list_head   cull;
147         
148         if (args->threshold != 0) {
149                 /* Adding a new entry */
150                 tp = (lib_test_peer_t *)nal->cb_malloc (nal, sizeof (*tp));
151                 if (tp == NULL)
152                         return (ret->rc = PTL_FAIL);
153                 
154                 tp->tp_nid = args->nid;
155                 tp->tp_threshold = args->threshold;
156                 
157                 state_lock (nal, &flags);
158                 list_add (&tp->tp_list, &nal->ni.ni_test_peers);
159                 state_unlock (nal, &flags);
160                 return (ret->rc = PTL_OK);
161         }
162         
163         /* removing entries */
164         INIT_LIST_HEAD (&cull);
165         
166         state_lock (nal, &flags);
167
168         list_for_each_safe (el, next, &nal->ni.ni_test_peers) {
169                 tp = list_entry (el, lib_test_peer_t, tp_list);
170                 
171                 if (tp->tp_threshold == 0 ||    /* needs culling anyway */
172                     args->nid == PTL_NID_ANY || /* removing all entries */
173                     tp->tp_nid == args->nid)    /* matched this one */
174                 {
175                         list_del (&tp->tp_list);
176                         list_add (&tp->tp_list, &cull);
177                 }
178         }
179         
180         state_unlock (nal, &flags);
181                 
182         while (!list_empty (&cull)) {
183                 tp = list_entry (cull.next, lib_test_peer_t, tp_list);
184
185                 list_del (&tp->tp_list);
186                 nal->cb_free (nal, tp, sizeof (*tp));
187         }
188         return (ret->rc = PTL_OK);
189 }
190
191 static int
192 fail_peer (nal_cb_t *nal, ptl_nid_t nid, int outgoing) 
193 {
194         lib_test_peer_t  *tp;
195         struct list_head *el;
196         struct list_head *next;
197         unsigned long     flags;
198         struct list_head  cull;
199         int               fail = 0;
200
201         INIT_LIST_HEAD (&cull);
202         
203         state_lock (nal, &flags);
204
205         list_for_each_safe (el, next, &nal->ni.ni_test_peers) {
206                 tp = list_entry (el, lib_test_peer_t, tp_list);
207
208                 if (tp->tp_threshold == 0) {
209                         /* zombie entry */
210                         if (outgoing) {
211                                 /* only cull zombies on outgoing tests,
212                                  * since we may be at interrupt priority on
213                                  * incoming messages. */
214                                 list_del (&tp->tp_list);
215                                 list_add (&tp->tp_list, &cull);
216                         }
217                         continue;
218                 }
219                         
220                 if (tp->tp_nid == PTL_NID_ANY || /* fail every peer */
221                     nid == tp->tp_nid) {        /* fail this peer */
222                         fail = 1;
223                         
224                         if (tp->tp_threshold != PTL_MD_THRESH_INF) {
225                                 tp->tp_threshold--;
226                                 if (outgoing &&
227                                     tp->tp_threshold == 0) {
228                                         /* see above */
229                                         list_del (&tp->tp_list);
230                                         list_add (&tp->tp_list, &cull);
231                                 }
232                         }
233                         break;
234                 }
235         }
236         
237         state_unlock (nal, &flags);
238
239         while (!list_empty (&cull)) {
240                 tp = list_entry (cull.next, lib_test_peer_t, tp_list);
241                 list_del (&tp->tp_list);
242                 
243                 nal->cb_free (nal, tp, sizeof (*tp));
244         }
245
246         return (fail);
247 }
248
249 ptl_size_t
250 lib_iov_nob (int niov, struct iovec *iov)
251 {
252         ptl_size_t nob = 0;
253         
254         while (niov-- > 0)
255                 nob += (iov++)->iov_len;
256         
257         return (nob);
258 }
259
260 void
261 lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, ptl_size_t len)
262 {
263         ptl_size_t nob;
264
265         while (len > 0)
266         {
267                 LASSERT (niov > 0);
268                 nob = MIN (iov->iov_len, len);
269                 memcpy (dest, iov->iov_base, nob);
270
271                 len -= nob;
272                 dest += nob;
273                 niov--;
274                 iov++;
275         }
276 }
277
278 void
279 lib_copy_buf2iov (int niov, struct iovec *iov, char *src, ptl_size_t len)
280 {
281         ptl_size_t nob;
282
283         while (len > 0)
284         {
285                 LASSERT (niov > 0);
286                 nob = MIN (iov->iov_len, len);
287                 memcpy (iov->iov_base, src, nob);
288                 
289                 len -= nob;
290                 src += nob;
291                 niov--;
292                 iov++;
293         }
294 }
295
296 static int
297 lib_extract_iov (struct iovec *dst, lib_md_t *md,
298                  ptl_size_t offset, ptl_size_t len)
299 {
300         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
301          * for exactly 'len' bytes, and return the number of entries.
302          * NB not destructive to 'src' */
303         int             src_niov = md->md_niov;  
304         struct iovec   *src = md->md_iov.iov;
305         ptl_size_t      frag_len;
306         int             dst_niov;
307
308         LASSERT (len >= 0);
309         LASSERT (offset >= 0);
310         LASSERT (offset + len <= md->length);
311         
312         if (len == 0)                           /* no data => */
313                 return (0);                     /* no frags */
314
315         LASSERT (src_niov > 0);
316         while (offset >= src->iov_len) {      /* skip initial frags */
317                 offset -= src->iov_len;
318                 src_niov--;
319                 src++;
320                 LASSERT (src_niov > 0);
321         }
322
323         dst_niov = 1;
324         for (;;) {
325                 LASSERT (src_niov > 0);
326                 LASSERT (dst_niov <= PTL_MD_MAX_IOV);
327                 
328                 frag_len = src->iov_len - offset;
329                 dst->iov_base = ((char *)src->iov_base) + offset;
330
331                 if (len <= frag_len) {
332                         dst->iov_len = len;
333                         return (dst_niov);
334                 }
335                 
336                 dst->iov_len = frag_len;
337
338                 len -= frag_len;
339                 dst++;
340                 src++;
341                 dst_niov++;
342                 src_niov--;
343                 offset = 0;
344         }
345 }
346
347 #ifndef __KERNEL__
348 ptl_size_t
349 lib_kiov_nob (int niov, ptl_kiov_t *kiov) 
350 {
351         LASSERT (0);
352         return (0);
353 }
354
355 void
356 lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, ptl_size_t len)
357 {
358         LASSERT (0);
359 }
360
361 void
362 lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, char *dest, ptl_size_t len)
363 {
364         LASSERT (0);
365 }
366
367 static int
368 lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md,
369                   ptl_size_t offset, ptl_size_t len)
370 {
371         LASSERT (0);
372 }
373
374 #else
375
376 ptl_size_t
377 lib_kiov_nob (int niov, ptl_kiov_t *kiov) 
378 {
379         ptl_size_t  nob = 0;
380
381         while (niov-- > 0)
382                 nob += (kiov++)->kiov_len;
383
384         return (nob);
385 }
386
387 void
388 lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, ptl_size_t len)
389 {
390         ptl_size_t  nob;
391         char       *addr;
392         
393         LASSERT (!in_interrupt ());
394         while (len > 0)
395         {
396                 LASSERT (niov > 0);
397                 nob = MIN (kiov->kiov_len, len);
398                 
399                 addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
400                 memcpy (dest, addr, nob);
401                 kunmap (kiov->kiov_page);
402                 
403                 len -= nob;
404                 dest += nob;
405                 niov--;
406                 kiov++;
407         }
408 }
409
410 void
411 lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, char *src, ptl_size_t len)
412 {
413         ptl_size_t  nob;
414         char       *addr;
415
416         LASSERT (!in_interrupt ());
417         while (len > 0)
418         {
419                 LASSERT (niov > 0);
420                 nob = MIN (kiov->kiov_len, len);
421                 
422                 addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
423                 memcpy (addr, src, nob);
424                 kunmap (kiov->kiov_page);
425                 
426                 len -= nob;
427                 src += nob;
428                 niov--;
429                 kiov++;
430         }
431 }
432
433 static int
434 lib_extract_kiov (ptl_kiov_t *dst, lib_md_t *md,
435                   ptl_size_t offset, ptl_size_t len)
436 {
437         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
438          * for exactly 'len' bytes, and return the number of entries.
439          * NB not destructive to 'src' */
440         int             src_niov = md->md_niov;  
441         ptl_kiov_t     *src = md->md_iov.kiov;
442         ptl_size_t      frag_len;
443         int             dst_niov;
444
445         LASSERT (len >= 0);
446         LASSERT (offset >= 0);
447         LASSERT (offset + len <= md->length);
448         
449         if (len == 0)                           /* no data => */
450                 return (0);                     /* no frags */
451
452         LASSERT (src_niov > 0);
453         while (offset >= src->kiov_len) {      /* skip initial frags */
454                 offset -= src->kiov_len;
455                 src_niov--;
456                 src++;
457                 LASSERT (src_niov > 0);
458         }
459
460         dst_niov = 1;
461         for (;;) {
462                 LASSERT (src_niov > 0);
463                 LASSERT (dst_niov <= PTL_MD_MAX_IOV);
464                 
465                 frag_len = src->kiov_len - offset;
466                 dst->kiov_page = src->kiov_page;
467                 dst->kiov_offset = src->kiov_offset + offset;
468
469                 if (len <= frag_len) {
470                         dst->kiov_len = len;
471                         LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
472                         return (dst_niov);
473                 }
474
475                 dst->kiov_len = frag_len;
476                 LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
477
478                 len -= frag_len;
479                 dst++;
480                 src++;
481                 dst_niov++;
482                 src_niov--;
483                 offset = 0;
484         }
485 }
486 #endif
487
488 void
489 lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
490           ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen)
491 {
492         int   niov;
493
494         if (mlen == 0)
495                 nal->cb_recv (nal, private, msg, 0, NULL, 0, rlen);
496         else if ((md->options & PTL_MD_KIOV) == 0) {
497                 niov = lib_extract_iov (msg->msg_iov.iov, md, offset, mlen);
498                 nal->cb_recv (nal, private, msg,
499                               niov, msg->msg_iov.iov, mlen, rlen);
500         } else {
501                 niov = lib_extract_kiov (msg->msg_iov.kiov, md, offset, mlen);
502                 nal->cb_recv_pages (nal, private, msg, 
503                                     niov, msg->msg_iov.kiov, mlen, rlen);
504         }
505 }
506
507 int
508 lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg,
509           ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
510           lib_md_t *md, ptl_size_t offset, ptl_size_t len) 
511 {
512         int   niov;
513
514         if (len == 0)
515                 return (nal->cb_send (nal, private, msg, 
516                                       hdr, type, nid, pid,
517                                       0, NULL, 0));
518         
519         if ((md->options & PTL_MD_KIOV) == 0) {
520                 niov = lib_extract_iov (msg->msg_iov.iov, md, offset, len);
521                 return (nal->cb_send (nal, private, msg, 
522                                       hdr, type, nid, pid,
523                                       niov, msg->msg_iov.iov, len));
524         }
525
526         niov = lib_extract_kiov (msg->msg_iov.kiov, md, offset, len);
527         return (nal->cb_send_pages (nal, private, msg, 
528                                     hdr, type, nid, pid,
529                                     niov, msg->msg_iov.kiov, len));
530 }
531
532 static lib_msg_t *
533 get_new_msg (nal_cb_t *nal, lib_md_t *md)
534 {
535         /* ALWAYS called holding the state_lock */
536         lib_counters_t *counters = &nal->ni.counters;
537         lib_msg_t      *msg      = lib_msg_alloc (nal);
538
539         if (msg == NULL)
540                 return (NULL);
541
542         memset (msg, 0, sizeof (*msg));
543
544         msg->send_ack = 0;
545
546         msg->md = md;
547         do_gettimeofday(&msg->ev.arrival_time);
548         md->pending++;
549         if (md->threshold != PTL_MD_THRESH_INF) {
550                 LASSERT (md->threshold > 0);
551                 md->threshold--;
552         }
553
554         counters->msgs_alloc++;
555         if (counters->msgs_alloc > counters->msgs_max)
556                 counters->msgs_max = counters->msgs_alloc;
557
558         list_add (&msg->msg_list, &nal->ni.ni_active_msgs);
559
560         return (msg);
561 }
562
563 /*
564  * Incoming messages have a ptl_msg_t object associated with them
565  * by the library.  This object encapsulates the state of the
566  * message and allows the NAL to do non-blocking receives or sends
567  * of long messages.
568  *
569  */
570 static int parse_put(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
571 {
572         lib_ni_t        *ni = &nal->ni;
573         ptl_size_t       mlength = 0;
574         ptl_size_t       offset = 0;
575         int              unlink = 0;
576         lib_me_t        *me;
577         lib_md_t        *md;
578         lib_msg_t       *msg;
579         unsigned long    flags;
580
581         /* Convert put fields to host byte order */
582         hdr->msg.put.match_bits = NTOH__u64 (hdr->msg.put.match_bits);
583         hdr->msg.put.ptl_index = NTOH__u32 (hdr->msg.put.ptl_index);
584         hdr->msg.put.offset = NTOH__u32 (hdr->msg.put.offset);
585
586         state_lock(nal, &flags);
587
588         me = lib_find_me(nal, hdr->msg.put.ptl_index, PTL_MD_OP_PUT,
589                          hdr->src_nid, hdr->src_pid,
590                          PTL_HDR_LENGTH (hdr), hdr->msg.put.offset,
591                          hdr->msg.put.match_bits,
592                          &mlength, &offset, &unlink);
593         if (me == NULL)
594                 goto drop;
595
596         md = me->md;
597         CDEBUG(D_NET, "Incoming put index %x from "LPU64"/%u of length %d/%d "
598                "into md "LPX64" [%d] + %d\n", hdr->msg.put.ptl_index,
599                hdr->src_nid, hdr->src_pid, mlength, PTL_HDR_LENGTH(hdr), 
600                md->md_lh.lh_cookie, md->md_niov, offset);
601
602         msg = get_new_msg (nal, md);
603         if (msg == NULL) {
604                 CERROR(LPU64": Dropping PUT from "LPU64": can't allocate msg\n",
605                        ni->nid, hdr->src_nid);
606                 goto drop;
607         }
608
609         if (!ptl_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
610             !(md->options & PTL_MD_ACK_DISABLE)) {
611                 msg->send_ack = 1;
612                 msg->ack_wmd = hdr->msg.put.ack_wmd;
613                 msg->nid = hdr->src_nid;
614                 msg->pid = hdr->src_pid;
615                 msg->ev.match_bits = hdr->msg.put.match_bits;
616         }
617
618         if (md->eq) {
619                 msg->ev.type = PTL_EVENT_PUT;
620                 msg->ev.initiator.nid = hdr->src_nid;
621                 msg->ev.initiator.pid = hdr->src_pid;
622                 msg->ev.portal = hdr->msg.put.ptl_index;
623                 msg->ev.match_bits = hdr->msg.put.match_bits;
624                 msg->ev.rlength = PTL_HDR_LENGTH(hdr);
625                 msg->ev.mlength = mlength;
626                 msg->ev.offset = offset;
627                 msg->ev.hdr_data = hdr->msg.put.hdr_data;
628
629                 /* NB if this match has exhausted the MD, we can't be sure
630                  * that this event will the the last one associated with
631                  * this MD in the event queue (another message already
632                  * matching this ME/MD could end up being last).  So we
633                  * remember the ME handle anyway and check again when we're
634                  * allocating our slot in the event queue.
635                  */
636                 ptl_me2handle (&msg->ev.unlinked_me, me);
637
638                 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
639         }
640
641         ni->counters.recv_count++;
642         ni->counters.recv_length += mlength;
643
644         /* only unlink after MD's pending count has been bumped
645          * in get_new_msg() otherwise lib_me_unlink() will nuke it */
646         if (unlink) {
647                 md->md_flags |= PTL_MD_FLAG_AUTO_UNLINKED;
648                 lib_me_unlink (nal, me);
649         }
650
651         state_unlock(nal, &flags);
652
653         lib_recv (nal, private, msg, md, offset, mlength, PTL_HDR_LENGTH (hdr));
654         return 0;
655
656  drop:
657         nal->ni.counters.drop_count++;
658         nal->ni.counters.drop_length += PTL_HDR_LENGTH(hdr);
659         state_unlock (nal, &flags);
660         lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
661         return -1;
662 }
663
664 static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
665 {
666         lib_ni_t        *ni = &nal->ni;
667         ptl_size_t       mlength = 0;
668         ptl_size_t       offset = 0;
669         int              unlink = 0;
670         lib_me_t        *me;
671         lib_md_t        *md;
672         lib_msg_t       *msg;
673         ptl_hdr_t        reply;
674         unsigned long    flags;
675         int              rc;
676
677         /* Convert get fields to host byte order */
678         hdr->msg.get.match_bits = NTOH__u64 (hdr->msg.get.match_bits);
679         hdr->msg.get.ptl_index = NTOH__u32 (hdr->msg.get.ptl_index);
680         hdr->msg.get.sink_length = NTOH__u32 (hdr->msg.get.sink_length);
681         hdr->msg.get.src_offset = NTOH__u32 (hdr->msg.get.src_offset);
682
683         /* compatibility check until field is deleted */
684         if (hdr->msg.get.return_offset != 0)
685                 CERROR("Unexpected non-zero get.return_offset %x from "
686                        LPU64"\n", hdr->msg.get.return_offset, hdr->src_nid);
687
688         state_lock(nal, &flags);
689
690         me = lib_find_me(nal, hdr->msg.get.ptl_index, PTL_MD_OP_GET,
691                          hdr->src_nid, hdr->src_pid,
692                          hdr->msg.get.sink_length, hdr->msg.get.src_offset,
693                          hdr->msg.get.match_bits,
694                          &mlength, &offset, &unlink);
695         if (me == NULL)
696                 goto drop;
697
698         md = me->md;
699         CDEBUG(D_NET, "Incoming get index %d from "LPU64".%u of length %d/%d "
700                "from md "LPX64" [%d] + %d\n", hdr->msg.get.ptl_index,
701                hdr->src_nid, hdr->src_pid, mlength, PTL_HDR_LENGTH(hdr), 
702                md->md_lh.lh_cookie, md->md_niov, offset);
703
704         msg = get_new_msg (nal, md);
705         if (msg == NULL) {
706                 CERROR(LPU64": Dropping GET from "LPU64": can't allocate msg\n",
707                        ni->nid, hdr->src_nid);
708                 goto drop;
709         }
710
711         if (md->eq) {
712                 msg->ev.type = PTL_EVENT_GET;
713                 msg->ev.initiator.nid = hdr->src_nid;
714                 msg->ev.initiator.pid = hdr->src_pid;
715                 msg->ev.portal = hdr->msg.get.ptl_index;
716                 msg->ev.match_bits = hdr->msg.get.match_bits;
717                 msg->ev.rlength = PTL_HDR_LENGTH(hdr);
718                 msg->ev.mlength = mlength;
719                 msg->ev.offset = offset;
720                 msg->ev.hdr_data = 0;
721
722                 /* NB if this match has exhausted the MD, we can't be sure
723                  * that this event will the the last one associated with
724                  * this MD in the event queue (another message already
725                  * matching this ME/MD could end up being last).  So we
726                  * remember the ME handle anyway and check again when we're
727                  * allocating our slot in the event queue.
728                  */
729                 ptl_me2handle (&msg->ev.unlinked_me, me);
730
731                 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
732         }
733
734         ni->counters.send_count++;
735         ni->counters.send_length += mlength;
736
737         /* only unlink after MD's refcount has been bumped
738          * in get_new_msg() otherwise lib_me_unlink() will nuke it */
739         if (unlink) {
740                 md->md_flags |= PTL_MD_FLAG_AUTO_UNLINKED;
741                 lib_me_unlink (nal, me);
742         }
743
744         state_unlock(nal, &flags);
745
746         memset (&reply, 0, sizeof (reply));
747         reply.type     = HTON__u32 (PTL_MSG_REPLY);
748         reply.dest_nid = HTON__u64 (hdr->src_nid);
749         reply.src_nid  = HTON__u64 (ni->nid);
750         reply.dest_pid = HTON__u32 (hdr->src_pid);
751         reply.src_pid  = HTON__u32 (ni->pid);
752         PTL_HDR_LENGTH(&reply) = HTON__u32 (mlength);
753
754         reply.msg.reply.dst_wmd = hdr->msg.get.return_wmd;
755
756         rc = lib_send (nal, private, msg, &reply, PTL_MSG_REPLY, 
757                        hdr->src_nid, hdr->src_pid, md, offset, mlength);
758         if (rc != PTL_OK) {
759                 CERROR(LPU64": Dropping GET from "LPU64": send REPLY failed\n",
760                        ni->nid, hdr->src_nid);
761                 /* Hmm, this will create a GET event and make believe
762                  * the reply completed, which it kind of did, only the
763                  * source won't get her reply */
764                 lib_finalize (nal, private, msg);
765                 state_lock (nal, &flags);
766                 goto drop;
767         }
768
769         /* Complete the incoming message */
770         lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
771         return (rc);
772  drop:
773         ni->counters.drop_count++;
774         ni->counters.drop_length += hdr->msg.get.sink_length;
775         state_unlock(nal, &flags);
776         lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
777         return -1;
778 }
779
780 static int parse_reply(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
781 {
782         lib_ni_t        *ni = &nal->ni;
783         lib_md_t        *md;
784         int              rlength;
785         int              length;
786         lib_msg_t       *msg;
787         unsigned long    flags;
788
789         /* compatibility check until field is deleted */
790         if (hdr->msg.reply.dst_offset != 0)
791                 CERROR("Unexpected non-zero reply.dst_offset %x from "LPU64"\n",
792                        hdr->msg.reply.dst_offset, hdr->src_nid);
793
794         state_lock(nal, &flags);
795
796         /* NB handles only looked up by creator (no flips) */
797         md = ptl_wire_handle2md(&hdr->msg.reply.dst_wmd, nal);
798         if (md == NULL || md->threshold == 0) {
799                 CERROR (LPU64": Dropping REPLY from "LPU64" for %s MD "LPX64"."LPX64"\n",
800                         ni->nid, hdr->src_nid,
801                         md == NULL ? "invalid" : "inactive",
802                         hdr->msg.reply.dst_wmd.wh_interface_cookie,
803                         hdr->msg.reply.dst_wmd.wh_object_cookie);
804                 goto drop;
805         }
806
807         LASSERT (md->offset == 0);
808
809         length = rlength = PTL_HDR_LENGTH(hdr);
810
811         if (length > md->length) {
812                 if ((md->options & PTL_MD_TRUNCATE) == 0) {
813                         CERROR (LPU64": Dropping REPLY from "LPU64
814                                 " length %d for MD "LPX64" would overflow (%d)\n",
815                                 ni->nid, hdr->src_nid, length,
816                                 hdr->msg.reply.dst_wmd.wh_object_cookie,
817                                 md->length);
818                         goto drop;
819                 }
820                 length = md->length;
821         }
822
823         CDEBUG(D_NET, "Reply from "LPU64" of length %d/%d into md "LPX64"\n",
824                hdr->src_nid, length, rlength, 
825                hdr->msg.reply.dst_wmd.wh_object_cookie);
826
827         msg = get_new_msg (nal, md);
828         if (msg == NULL) {
829                 CERROR(LPU64": Dropping REPLY from "LPU64": can't "
830                        "allocate msg\n", ni->nid, hdr->src_nid);
831                 goto drop;
832         }
833
834         if (md->eq) {
835                 msg->ev.type = PTL_EVENT_REPLY;
836                 msg->ev.initiator.nid = hdr->src_nid;
837                 msg->ev.initiator.pid = hdr->src_pid;
838                 msg->ev.rlength = rlength;
839                 msg->ev.mlength = length;
840                 msg->ev.offset = 0;
841
842                 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
843         }
844
845         ni->counters.recv_count++;
846         ni->counters.recv_length += length;
847
848         state_unlock(nal, &flags);
849
850         lib_recv (nal, private, msg, md, 0, length, rlength);
851         return 0;
852
853  drop:
854         nal->ni.counters.drop_count++;
855         nal->ni.counters.drop_length += PTL_HDR_LENGTH(hdr);
856         state_unlock (nal, &flags);
857         lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
858         return -1;
859 }
860
861 static int parse_ack(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
862 {
863         lib_ni_t *ni = &nal->ni;
864         lib_md_t *md;
865         lib_msg_t *msg = NULL;
866         unsigned long flags;
867
868         /* Convert ack fields to host byte order */
869         hdr->msg.ack.match_bits = NTOH__u64 (hdr->msg.ack.match_bits);
870         hdr->msg.ack.mlength = NTOH__u32 (hdr->msg.ack.mlength);
871
872         state_lock(nal, &flags);
873
874         /* NB handles only looked up by creator (no flips) */
875         md = ptl_wire_handle2md(&hdr->msg.ack.dst_wmd, nal);
876         if (md == NULL || md->threshold == 0) {
877                 CDEBUG(D_INFO, LPU64": Dropping ACK from "LPU64" to %s MD "
878                        LPX64"."LPX64"\n", ni->nid, hdr->src_nid, 
879                        (md == NULL) ? "invalid" : "inactive",
880                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
881                        hdr->msg.ack.dst_wmd.wh_object_cookie);
882                 goto drop;
883         }
884
885         CDEBUG(D_NET, LPU64": ACK from "LPU64" into md "LPX64"\n",
886                ni->nid, hdr->src_nid, 
887                hdr->msg.ack.dst_wmd.wh_object_cookie);
888
889         msg = get_new_msg (nal, md);
890         if (msg == NULL) {
891                 CERROR(LPU64": Dropping ACK from "LPU64": can't allocate msg\n",
892                        ni->nid, hdr->src_nid);
893                 goto drop;
894         }
895
896         if (md->eq) {
897                 msg->ev.type = PTL_EVENT_ACK;
898                 msg->ev.initiator.nid = hdr->src_nid;
899                 msg->ev.initiator.pid = hdr->src_pid;
900                 msg->ev.mlength = hdr->msg.ack.mlength;
901                 msg->ev.match_bits = hdr->msg.ack.match_bits;
902
903                 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
904         }
905
906         ni->counters.recv_count++;
907         state_unlock(nal, &flags);
908         lib_recv (nal, private, msg, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
909         return 0;
910
911  drop:
912         nal->ni.counters.drop_count++;
913         state_unlock (nal, &flags);
914         lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
915         return -1;
916 }
917
918 static char *
919 hdr_type_string (ptl_hdr_t *hdr)
920 {
921         switch (hdr->type) {
922         case PTL_MSG_ACK:
923                 return ("ACK");
924         case PTL_MSG_PUT:
925                 return ("PUT");
926         case PTL_MSG_GET:
927                 return ("GET");
928         case PTL_MSG_REPLY:
929                 return ("REPLY");
930         case PTL_MSG_HELLO:
931                 return ("HELLO");
932         default:
933                 return ("<UNKNOWN>");
934         }
935 }
936
937 void print_hdr(nal_cb_t * nal, ptl_hdr_t * hdr)
938 {
939         char *type_str = hdr_type_string (hdr);
940
941         nal->cb_printf(nal, "P3 Header at %p of type %s\n", hdr, type_str);
942         nal->cb_printf(nal, "    From nid/pid %Lu/%Lu", hdr->src_nid,
943                        hdr->src_pid);
944         nal->cb_printf(nal, "    To nid/pid %Lu/%Lu\n", hdr->dest_nid,
945                        hdr->dest_pid);
946
947         switch (hdr->type) {
948         default:
949                 break;
950
951         case PTL_MSG_PUT:
952                 nal->cb_printf(nal,
953                                "    Ptl index %d, ack md "LPX64"."LPX64", "
954                                "match bits "LPX64"\n",
955                                hdr->msg.put.ptl_index,
956                                hdr->msg.put.ack_wmd.wh_interface_cookie,
957                                hdr->msg.put.ack_wmd.wh_object_cookie,
958                                hdr->msg.put.match_bits);
959                 nal->cb_printf(nal,
960                                "    Length %d, offset %d, hdr data "LPX64"\n",
961                                PTL_HDR_LENGTH(hdr), hdr->msg.put.offset,
962                                hdr->msg.put.hdr_data);
963                 break;
964
965         case PTL_MSG_GET:
966                 nal->cb_printf(nal,
967                                "    Ptl index %d, return md "LPX64"."LPX64", "
968                                "match bits "LPX64"\n", hdr->msg.get.ptl_index,
969                                hdr->msg.get.return_wmd.wh_interface_cookie,
970                                hdr->msg.get.return_wmd.wh_object_cookie,
971                                hdr->msg.get.match_bits);
972                 nal->cb_printf(nal,
973                                "    Length %d, src offset %d\n",
974                                hdr->msg.get.sink_length,
975                                hdr->msg.get.src_offset);
976                 break;
977
978         case PTL_MSG_ACK:
979                 nal->cb_printf(nal, "    dst md "LPX64"."LPX64", "
980                                "manipulated length %d\n",
981                                hdr->msg.ack.dst_wmd.wh_interface_cookie,
982                                hdr->msg.ack.dst_wmd.wh_object_cookie,
983                                hdr->msg.ack.mlength);
984                 break;
985
986         case PTL_MSG_REPLY:
987                 nal->cb_printf(nal, "    dst md "LPX64"."LPX64", "
988                                "length %d\n",
989                                hdr->msg.reply.dst_wmd.wh_interface_cookie,
990                                hdr->msg.reply.dst_wmd.wh_object_cookie,
991                                PTL_HDR_LENGTH(hdr));
992         }
993
994 }                               /* end of print_hdr() */
995
996
997 int lib_parse(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
998 {
999         unsigned long  flags;
1000
1001         /* NB static check; optimizer will elide this if it's right */
1002         LASSERT (offsetof (ptl_hdr_t, msg.ack.length) ==
1003                  offsetof (ptl_hdr_t, msg.put.length));
1004         LASSERT (offsetof (ptl_hdr_t, msg.ack.length) ==
1005                  offsetof (ptl_hdr_t, msg.get.length));
1006         LASSERT (offsetof (ptl_hdr_t, msg.ack.length) ==
1007                  offsetof (ptl_hdr_t, msg.reply.length));
1008
1009         /* convert common fields to host byte order */
1010         hdr->dest_nid = NTOH__u64 (hdr->dest_nid);
1011         hdr->src_nid = NTOH__u64 (hdr->src_nid);
1012         hdr->dest_pid = NTOH__u32 (hdr->dest_pid);
1013         hdr->src_pid = NTOH__u32 (hdr->src_pid);
1014         hdr->type = NTOH__u32 (hdr->type);
1015         PTL_HDR_LENGTH(hdr) = NTOH__u32 (PTL_HDR_LENGTH(hdr));
1016 #if 0
1017         nal->cb_printf(nal, "%d: lib_parse: nal=%p hdr=%p type=%d\n",
1018                        nal->ni.nid, nal, hdr, hdr->type);
1019         print_hdr(nal, hdr);
1020 #endif
1021         if (hdr->type == PTL_MSG_HELLO) {
1022                 /* dest_nid is really ptl_magicversion_t */
1023                 ptl_magicversion_t *mv = (ptl_magicversion_t *)&hdr->dest_nid;
1024
1025                 CERROR (LPU64": Dropping unexpected HELLO message: "
1026                         "magic %d, version %d.%d from "LPD64"\n",
1027                         nal->ni.nid, mv->magic, 
1028                         mv->version_major, mv->version_minor,
1029                         hdr->src_nid);
1030                 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
1031                 return (-1);
1032         }
1033         
1034         if (hdr->dest_nid != nal->ni.nid) {
1035                 CERROR(LPU64": Dropping %s message from "LPU64" to "LPU64
1036                        " (not me)\n", nal->ni.nid, hdr_type_string (hdr),
1037                        hdr->src_nid, hdr->dest_nid);
1038
1039                 state_lock (nal, &flags);
1040                 nal->ni.counters.drop_count++;
1041                 nal->ni.counters.drop_length += PTL_HDR_LENGTH(hdr);
1042                 state_unlock (nal, &flags);
1043
1044                 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
1045                 return (-1);
1046         }
1047
1048         if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1049             fail_peer (nal, hdr->src_nid, 0))      /* shall we now? */
1050         {
1051                 CERROR(LPU64": Dropping incoming %s from "LPU64
1052                        ": simulated failure\n",
1053                        nal->ni.nid, hdr_type_string (hdr), 
1054                        hdr->src_nid);
1055                 return (-1);
1056         }
1057         
1058         switch (hdr->type) {
1059         case PTL_MSG_ACK:
1060                 return (parse_ack(nal, hdr, private));
1061         case PTL_MSG_PUT:
1062                 return (parse_put(nal, hdr, private));
1063                 break;
1064         case PTL_MSG_GET:
1065                 return (parse_get(nal, hdr, private));
1066                 break;
1067         case PTL_MSG_REPLY:
1068                 return (parse_reply(nal, hdr, private));
1069                 break;
1070         default:
1071                 CERROR(LPU64": Dropping <unknown> message from "LPU64
1072                        ": Bad type=0x%x\n",  nal->ni.nid, hdr->src_nid,
1073                        hdr->type);
1074
1075                 lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
1076                 return (-1);
1077         }
1078 }
1079
1080
1081 int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
1082 {
1083         /*
1084          * Incoming:
1085          *      ptl_handle_md_t md_in
1086          *      ptl_ack_req_t ack_req_in
1087          *      ptl_process_id_t target_in
1088          *      ptl_pt_index_t portal_in
1089          *      ptl_ac_index_t cookie_in
1090          *      ptl_match_bits_t match_bits_in
1091          *      ptl_size_t offset_in
1092          *
1093          * Outgoing:
1094          */
1095
1096         PtlPut_in *args = v_args;
1097         PtlPut_out *ret = v_ret;
1098         ptl_hdr_t hdr;
1099
1100         lib_ni_t *ni = &nal->ni;
1101         lib_md_t *md;
1102         lib_msg_t *msg = NULL;
1103         ptl_process_id_t *id = &args->target_in;
1104         unsigned long flags;
1105         int           rc;
1106         
1107         if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1108             fail_peer (nal, id->nid, 1))           /* shall we now? */
1109         {
1110                 CERROR(LPU64": Dropping PUT to "LPU64": simulated failure\n",
1111                        nal->ni.nid, id->nid);
1112                 return (ret->rc = PTL_INV_PROC);
1113         }
1114         
1115         ret->rc = PTL_OK;
1116         state_lock(nal, &flags);
1117         md = ptl_handle2md(&args->md_in, nal);
1118         if (md == NULL || !md->threshold) {
1119                 state_unlock(nal, &flags);
1120                 return ret->rc = PTL_INV_MD;
1121         }
1122
1123         CDEBUG(D_NET, "PtlPut -> %Lu: %lu\n", (unsigned long long)id->nid,
1124                (unsigned long)id->pid);
1125
1126         memset (&hdr, 0, sizeof (hdr));
1127         hdr.type     = HTON__u32 (PTL_MSG_PUT);
1128         hdr.dest_nid = HTON__u64 (id->nid);
1129         hdr.src_nid  = HTON__u64 (ni->nid);
1130         hdr.dest_pid = HTON__u32 (id->pid);
1131         hdr.src_pid  = HTON__u32 (ni->pid);
1132         PTL_HDR_LENGTH(&hdr) = HTON__u32 (md->length);
1133
1134         /* NB handles only looked up by creator (no flips) */
1135         if (args->ack_req_in == PTL_ACK_REQ) {
1136                 hdr.msg.put.ack_wmd.wh_interface_cookie = ni->ni_interface_cookie;
1137                 hdr.msg.put.ack_wmd.wh_object_cookie = md->md_lh.lh_cookie;
1138         } else {
1139                 hdr.msg.put.ack_wmd = PTL_WIRE_HANDLE_NONE;
1140         }
1141
1142         hdr.msg.put.match_bits = HTON__u64 (args->match_bits_in);
1143         hdr.msg.put.ptl_index = HTON__u32 (args->portal_in);
1144         hdr.msg.put.offset = HTON__u32 (args->offset_in);
1145         hdr.msg.put.hdr_data = args->hdr_data_in;
1146
1147         ni->counters.send_count++;
1148         ni->counters.send_length += md->length;
1149
1150         msg = get_new_msg (nal, md);
1151         if (msg == NULL) {
1152                 CERROR("BAD: could not allocate msg!\n");
1153                 state_unlock(nal, &flags);
1154                 return ret->rc = PTL_NOSPACE;
1155         }
1156
1157         /*
1158          * If this memory descriptor has an event queue associated with
1159          * it we need to allocate a message state object and record the
1160          * information about this operation that will be recorded into
1161          * event queue once the message has been completed.
1162          *
1163          * NB. We're now committed to the GET, since we just marked the MD
1164          * busy.  Callers who observe this (by getting PTL_MD_INUSE from
1165          * PtlMDUnlink()) expect a completion event to tell them when the
1166          * MD becomes idle. 
1167          */
1168         if (md->eq) {
1169                 msg->ev.type = PTL_EVENT_SENT;
1170                 msg->ev.initiator.nid = ni->nid;
1171                 msg->ev.initiator.pid = ni->pid;
1172                 msg->ev.portal = args->portal_in;
1173                 msg->ev.match_bits = args->match_bits_in;
1174                 msg->ev.rlength = md->length;
1175                 msg->ev.mlength = md->length;
1176                 msg->ev.offset = args->offset_in;
1177                 msg->ev.hdr_data = args->hdr_data_in;
1178
1179                 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
1180         }
1181
1182         state_unlock(nal, &flags);
1183         
1184         rc = lib_send (nal, private, msg, &hdr, PTL_MSG_PUT,
1185                        id->nid, id->pid, md, 0, md->length);
1186         if (rc != PTL_OK) {
1187                 /* get_new_msg() committed us to sending by decrementing
1188                  * md->threshold, so we have to act like we did send, but
1189                  * the network dropped it. */
1190                 lib_finalize (nal, private, msg);
1191         }
1192         
1193         return ret->rc = PTL_OK;
1194 }
1195
1196
1197 int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
1198 {
1199         /*
1200          * Incoming:
1201          *      ptl_handle_md_t md_in
1202          *      ptl_process_id_t target_in
1203          *      ptl_pt_index_t portal_in
1204          *      ptl_ac_index_t cookie_in
1205          *      ptl_match_bits_t match_bits_in
1206          *      ptl_size_t offset_in
1207          *
1208          * Outgoing:
1209          */
1210
1211         PtlGet_in *args = v_args;
1212         PtlGet_out *ret = v_ret;
1213         ptl_hdr_t hdr;
1214         lib_msg_t *msg = NULL;
1215         lib_ni_t *ni = &nal->ni;
1216         ptl_process_id_t *id = &args->target_in;
1217         lib_md_t *md;
1218         unsigned long flags;
1219         int           rc;
1220         
1221         if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1222             fail_peer (nal, id->nid, 1))           /* shall we now? */
1223         {
1224                 CERROR(LPU64": Dropping PUT to "LPU64": simulated failure\n",
1225                        nal->ni.nid, id->nid);
1226                 return (ret->rc = PTL_INV_PROC);
1227         }
1228         
1229         state_lock(nal, &flags);
1230         md = ptl_handle2md(&args->md_in, nal);
1231         if (md == NULL || !md->threshold) {
1232                 state_unlock(nal, &flags);
1233                 return ret->rc = PTL_INV_MD;
1234         }
1235
1236         LASSERT (md->offset == 0);
1237
1238         CDEBUG(D_NET, "PtlGet -> %Lu: %lu\n", (unsigned long long)id->nid,
1239                (unsigned long)id->pid);
1240
1241         memset (&hdr, 0, sizeof (hdr));
1242         hdr.type     = HTON__u32 (PTL_MSG_GET);
1243         hdr.dest_nid = HTON__u64 (id->nid);
1244         hdr.src_nid  = HTON__u64 (ni->nid);
1245         hdr.dest_pid = HTON__u32 (id->pid);
1246         hdr.src_pid  = HTON__u32 (ni->pid);
1247         PTL_HDR_LENGTH(&hdr) = 0;
1248
1249         /* NB handles only looked up by creator (no flips) */
1250         hdr.msg.get.return_wmd.wh_interface_cookie = ni->ni_interface_cookie;
1251         hdr.msg.get.return_wmd.wh_object_cookie = md->md_lh.lh_cookie;
1252
1253         hdr.msg.get.match_bits = HTON__u64 (args->match_bits_in);
1254         hdr.msg.get.ptl_index = HTON__u32 (args->portal_in);
1255         hdr.msg.get.src_offset = HTON__u32 (args->offset_in);
1256         hdr.msg.get.sink_length = HTON__u32 (md->length);
1257
1258         ni->counters.send_count++;
1259
1260         msg = get_new_msg (nal, md);
1261         if (msg == NULL) {
1262                 CERROR("do_PtlGet: BAD - could not allocate cookie!\n");
1263                 state_unlock(nal, &flags);
1264                 return ret->rc = PTL_NOSPACE;
1265         }
1266
1267         /*
1268          * If this memory descriptor has an event queue associated with
1269          * it we must allocate a message state object that will record
1270          * the information to be filled in once the message has been
1271          * completed.  More information is in the do_PtlPut() comments.
1272          *
1273          * NB. We're now committed to the GET, since we just marked the MD
1274          * busy.  Callers who observe this (by getting PTL_MD_INUSE from
1275          * PtlMDUnlink()) expect a completion event to tell them when the
1276          * MD becomes idle. 
1277          */
1278         if (md->eq) {
1279                 msg->ev.type = PTL_EVENT_SENT;
1280                 msg->ev.initiator.nid = ni->nid;
1281                 msg->ev.initiator.pid = ni->pid;
1282                 msg->ev.portal = args->portal_in;
1283                 msg->ev.match_bits = args->match_bits_in;
1284                 msg->ev.rlength = md->length;
1285                 msg->ev.mlength = md->length;
1286                 msg->ev.offset = args->offset_in;
1287                 msg->ev.hdr_data = 0;
1288
1289                 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
1290         }
1291
1292         state_unlock(nal, &flags);
1293
1294         rc = lib_send (nal, private, msg, &hdr, PTL_MSG_GET,
1295                        id->nid, id->pid, NULL, 0, 0);
1296         if (rc != PTL_OK) {
1297                 /* get_new_msg() committed us to sending by decrementing
1298                  * md->threshold, so we have to act like we did send, but
1299                  * the network dropped it. */
1300                 lib_finalize (nal, private, msg);
1301         }
1302         
1303         return ret->rc = PTL_OK;
1304 }
1305
1306 void lib_assert_wire_constants (void)
1307 {
1308         /* Wire protocol assertions generated by 'wirecheck' */
1309
1310         /* Constants... */
1311         LASSERT (PORTALS_PROTO_MAGIC == 0xeebc0ded);
1312         LASSERT (PORTALS_PROTO_VERSION_MAJOR == 0);
1313         LASSERT (PORTALS_PROTO_VERSION_MINOR == 1);
1314         LASSERT (PTL_MSG_ACK == 0);
1315         LASSERT (PTL_MSG_PUT == 1);
1316         LASSERT (PTL_MSG_GET == 2);
1317         LASSERT (PTL_MSG_REPLY == 3);
1318         LASSERT (PTL_MSG_HELLO == 4);
1319
1320         /* Checks for struct ptl_handle_wire_t */
1321         LASSERT (sizeof (ptl_handle_wire_t) == 16);
1322         LASSERT (offsetof (ptl_handle_wire_t, wh_interface_cookie) == 0);
1323         LASSERT (sizeof (((ptl_handle_wire_t *)0)->wh_interface_cookie) == 8);
1324         LASSERT (offsetof (ptl_handle_wire_t, wh_object_cookie) == 8);
1325         LASSERT (sizeof (((ptl_handle_wire_t *)0)->wh_object_cookie) == 8);
1326
1327         /* Checks for struct ptl_magicversion_t */
1328         LASSERT (sizeof (ptl_magicversion_t) == 8);
1329         LASSERT (offsetof (ptl_magicversion_t, magic) == 0);
1330         LASSERT (sizeof (((ptl_magicversion_t *)0)->magic) == 4);
1331         LASSERT (offsetof (ptl_magicversion_t, version_major) == 4);
1332         LASSERT (sizeof (((ptl_magicversion_t *)0)->version_major) == 2);
1333         LASSERT (offsetof (ptl_magicversion_t, version_minor) == 6);
1334         LASSERT (sizeof (((ptl_magicversion_t *)0)->version_minor) == 2);
1335
1336         /* Checks for struct ptl_hdr_t */
1337         LASSERT (sizeof (ptl_hdr_t) == 72);
1338         LASSERT (offsetof (ptl_hdr_t, dest_nid) == 0);
1339         LASSERT (sizeof (((ptl_hdr_t *)0)->dest_nid) == 8);
1340         LASSERT (offsetof (ptl_hdr_t, src_nid) == 8);
1341         LASSERT (sizeof (((ptl_hdr_t *)0)->src_nid) == 8);
1342         LASSERT (offsetof (ptl_hdr_t, dest_pid) == 16);
1343         LASSERT (sizeof (((ptl_hdr_t *)0)->dest_pid) == 4);
1344         LASSERT (offsetof (ptl_hdr_t, src_pid) == 20);
1345         LASSERT (sizeof (((ptl_hdr_t *)0)->src_pid) == 4);
1346         LASSERT (offsetof (ptl_hdr_t, type) == 24);
1347         LASSERT (sizeof (((ptl_hdr_t *)0)->type) == 4);
1348
1349         /* Ack */
1350         LASSERT (offsetof (ptl_hdr_t, msg.ack.mlength) == 28);
1351         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.ack.mlength) == 4);
1352         LASSERT (offsetof (ptl_hdr_t, msg.ack.dst_wmd) == 32);
1353         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.ack.dst_wmd) == 16);
1354         LASSERT (offsetof (ptl_hdr_t, msg.ack.match_bits) == 48);
1355         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.ack.match_bits) == 8);
1356         LASSERT (offsetof (ptl_hdr_t, msg.ack.length) == 56);
1357         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.ack.length) == 4);
1358
1359         /* Put */
1360         LASSERT (offsetof (ptl_hdr_t, msg.put.ptl_index) == 28);
1361         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.ptl_index) == 4);
1362         LASSERT (offsetof (ptl_hdr_t, msg.put.ack_wmd) == 32);
1363         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.ack_wmd) == 16);
1364         LASSERT (offsetof (ptl_hdr_t, msg.put.match_bits) == 48);
1365         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.match_bits) == 8);
1366         LASSERT (offsetof (ptl_hdr_t, msg.put.length) == 56);
1367         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.length) == 4);
1368         LASSERT (offsetof (ptl_hdr_t, msg.put.offset) == 60);
1369         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.offset) == 4);
1370         LASSERT (offsetof (ptl_hdr_t, msg.put.hdr_data) == 64);
1371         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.put.hdr_data) == 8);
1372
1373         /* Get */
1374         LASSERT (offsetof (ptl_hdr_t, msg.get.ptl_index) == 28);
1375         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.ptl_index) == 4);
1376         LASSERT (offsetof (ptl_hdr_t, msg.get.return_wmd) == 32);
1377         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.return_wmd) == 16);
1378         LASSERT (offsetof (ptl_hdr_t, msg.get.match_bits) == 48);
1379         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.match_bits) == 8);
1380         LASSERT (offsetof (ptl_hdr_t, msg.get.length) == 56);
1381         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.length) == 4);
1382         LASSERT (offsetof (ptl_hdr_t, msg.get.src_offset) == 60);
1383         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.src_offset) == 4);
1384         LASSERT (offsetof (ptl_hdr_t, msg.get.return_offset) == 64);
1385         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.return_offset) == 4);
1386         LASSERT (offsetof (ptl_hdr_t, msg.get.sink_length) == 68);
1387         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.get.sink_length) == 4);
1388
1389         /* Reply */
1390         LASSERT (offsetof (ptl_hdr_t, msg.reply.dst_wmd) == 32);
1391         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.reply.dst_wmd) == 16);
1392         LASSERT (offsetof (ptl_hdr_t, msg.reply.dst_offset) == 48);
1393         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.reply.dst_offset) == 4);
1394         LASSERT (offsetof (ptl_hdr_t, msg.reply.length) == 56);
1395         LASSERT (sizeof (((ptl_hdr_t *)0)->msg.reply.length) == 4);
1396 }