Whamcloud - gitweb
b=2776
[fs/lustre-release.git] / lustre / portals / portals / lib-move.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * lib/lib-move.c
5  * Data movement routines
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *  Copyright (c) 2001-2002 Sandia National Laboratories
9  *
10  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef __KERNEL__
27 # include <stdio.h>
28 #else
29 # define DEBUG_SUBSYSTEM S_PORTALS
30 # include <linux/kp30.h>
31 #endif
32 #include <portals/p30.h>
33 #include <portals/lib-p30.h>
34 #include <portals/arg-blocks.h>
35
36 /* forward ref */
37 static void lib_commit_md (nal_cb_t *nal, lib_md_t *md, lib_msg_t *msg);
38
39 static lib_md_t *
40 lib_match_md(nal_cb_t *nal, int index, int op_mask, 
41              ptl_nid_t src_nid, ptl_pid_t src_pid, 
42              ptl_size_t rlength, ptl_size_t roffset,
43              ptl_match_bits_t match_bits, lib_msg_t *msg,
44              ptl_size_t *mlength_out, ptl_size_t *offset_out)
45 {
46         lib_ni_t         *ni = &nal->ni;
47         struct list_head *match_list = &ni->tbl.tbl[index];
48         struct list_head *tmp;
49         lib_me_t         *me;
50         lib_md_t         *md;
51         ptl_size_t        mlength;
52         ptl_size_t        offset;
53         ENTRY;
54
55         CDEBUG (D_NET, "Request from "LPU64".%d of length %d into portal %d "
56                 "MB="LPX64"\n", src_nid, src_pid, rlength, index, match_bits);
57
58         if (index < 0 || index >= ni->tbl.size) {
59                 CERROR("Invalid portal %d not in [0-%d]\n",
60                        index, ni->tbl.size);
61                 goto failed;
62         }
63
64         list_for_each (tmp, match_list) {
65                 me = list_entry(tmp, lib_me_t, me_list);
66                 md = me->md;
67
68                  /* ME attached but MD not attached yet */
69                 if (md == NULL)
70                         continue;
71
72                 LASSERT (me == md->me);
73
74                 /* mismatched MD op */
75                 if ((md->options & op_mask) == 0)
76                         continue;
77
78                 /* MD exhausted */
79                 if (lib_md_exhausted(md))
80                         continue;
81
82                 /* mismatched ME nid/pid? */
83                 if (me->match_id.nid != PTL_NID_ANY &&
84                     me->match_id.nid != src_nid)
85                         continue;
86
87                 if (me->match_id.pid != PTL_PID_ANY &&
88                     me->match_id.pid != src_pid)
89                         continue;
90
91                 /* mismatched ME matchbits? */
92                 if (((me->match_bits ^ match_bits) & ~me->ignore_bits) != 0)
93                         continue;
94
95                 /* Hurrah! This _is_ a match; check it out... */
96
97                 if ((md->options & PTL_MD_MANAGE_REMOTE) == 0)
98                         offset = md->offset;
99                 else
100                         offset = roffset;
101
102                 if ((md->options & PTL_MD_MAX_SIZE) != 0) {
103                         mlength = md->max_size;
104                         LASSERT (md->offset + mlength <= md->length);
105                 } else {
106                         mlength = md->length - offset;
107                 }
108
109                 if (rlength <= mlength) {        /* fits in allowed space */
110                         mlength = rlength;
111                 } else if ((md->options & PTL_MD_TRUNCATE) == 0) {
112                         /* this packet _really_ is too big */
113                         CERROR("Matching packet %d too big: %d left, "
114                                "%d allowed\n", rlength, md->length - offset,
115                                mlength);
116                         goto failed;
117                 }
118
119                 /* Commit to this ME/MD */
120                 CDEBUG(D_NET, "Incoming %s index %x from "LPU64"/%u of "
121                        "length %d/%d into md "LPX64" [%d] + %d\n", 
122                        (op_mask == PTL_MD_OP_PUT) ? "put" : "get",
123                        index, src_nid, src_pid, mlength, rlength, 
124                        md->md_lh.lh_cookie, md->md_niov, offset);
125
126                 lib_commit_md(nal, md, msg);
127                 md->offset = offset + mlength;
128
129                 /* NB Caller sets ev.type and ev.hdr_data */
130                 msg->ev.initiator.nid = src_nid;
131                 msg->ev.initiator.pid = src_pid;
132                 msg->ev.portal = index;
133                 msg->ev.match_bits = match_bits;
134                 msg->ev.rlength = rlength;
135                 msg->ev.mlength = mlength;
136                 msg->ev.offset = offset;
137
138                 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
139
140                 *offset_out = offset;
141                 *mlength_out = mlength;
142
143                 /* Auto-unlink NOW, so the ME gets unlinked if required.
144                  * We bumped md->pending above so the MD just gets flagged
145                  * for unlink when it is finalized. */
146                 if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINK) != 0 &&
147                     lib_md_exhausted(md))
148                         lib_md_unlink(nal, md);
149
150                 RETURN (md);
151         }
152
153  failed:
154         CERROR (LPU64": Dropping %s from "LPU64".%d portal %d match "LPX64
155                 " offset %d length %d: no match\n",
156                 ni->nid, (op_mask == PTL_MD_OP_GET) ? "GET" : "PUT",
157                 src_nid, src_pid, index, match_bits, roffset, rlength);
158         RETURN(NULL);
159 }
160
161 int do_PtlFailNid (nal_cb_t *nal, void *private, void *v_args, void *v_ret)
162 {
163         PtlFailNid_in     *args = v_args;
164         PtlFailNid_out    *ret  = v_ret;
165         lib_test_peer_t   *tp;
166         unsigned long      flags;
167         struct list_head  *el;
168         struct list_head  *next;
169         struct list_head   cull;
170         
171         if (args->threshold != 0) {
172                 /* Adding a new entry */
173                 tp = (lib_test_peer_t *)nal->cb_malloc (nal, sizeof (*tp));
174                 if (tp == NULL)
175                         return (ret->rc = PTL_FAIL);
176                 
177                 tp->tp_nid = args->nid;
178                 tp->tp_threshold = args->threshold;
179                 
180                 state_lock (nal, &flags);
181                 list_add (&tp->tp_list, &nal->ni.ni_test_peers);
182                 state_unlock (nal, &flags);
183                 return (ret->rc = PTL_OK);
184         }
185         
186         /* removing entries */
187         INIT_LIST_HEAD (&cull);
188         
189         state_lock (nal, &flags);
190
191         list_for_each_safe (el, next, &nal->ni.ni_test_peers) {
192                 tp = list_entry (el, lib_test_peer_t, tp_list);
193                 
194                 if (tp->tp_threshold == 0 ||    /* needs culling anyway */
195                     args->nid == PTL_NID_ANY || /* removing all entries */
196                     tp->tp_nid == args->nid)    /* matched this one */
197                 {
198                         list_del (&tp->tp_list);
199                         list_add (&tp->tp_list, &cull);
200                 }
201         }
202         
203         state_unlock (nal, &flags);
204                 
205         while (!list_empty (&cull)) {
206                 tp = list_entry (cull.next, lib_test_peer_t, tp_list);
207
208                 list_del (&tp->tp_list);
209                 nal->cb_free (nal, tp, sizeof (*tp));
210         }
211         return (ret->rc = PTL_OK);
212 }
213
214 static int
215 fail_peer (nal_cb_t *nal, ptl_nid_t nid, int outgoing) 
216 {
217         lib_test_peer_t  *tp;
218         struct list_head *el;
219         struct list_head *next;
220         unsigned long     flags;
221         struct list_head  cull;
222         int               fail = 0;
223
224         INIT_LIST_HEAD (&cull);
225         
226         state_lock (nal, &flags);
227
228         list_for_each_safe (el, next, &nal->ni.ni_test_peers) {
229                 tp = list_entry (el, lib_test_peer_t, tp_list);
230
231                 if (tp->tp_threshold == 0) {
232                         /* zombie entry */
233                         if (outgoing) {
234                                 /* only cull zombies on outgoing tests,
235                                  * since we may be at interrupt priority on
236                                  * incoming messages. */
237                                 list_del (&tp->tp_list);
238                                 list_add (&tp->tp_list, &cull);
239                         }
240                         continue;
241                 }
242                         
243                 if (tp->tp_nid == PTL_NID_ANY || /* fail every peer */
244                     nid == tp->tp_nid) {        /* fail this peer */
245                         fail = 1;
246                         
247                         if (tp->tp_threshold != PTL_MD_THRESH_INF) {
248                                 tp->tp_threshold--;
249                                 if (outgoing &&
250                                     tp->tp_threshold == 0) {
251                                         /* see above */
252                                         list_del (&tp->tp_list);
253                                         list_add (&tp->tp_list, &cull);
254                                 }
255                         }
256                         break;
257                 }
258         }
259         
260         state_unlock (nal, &flags);
261
262         while (!list_empty (&cull)) {
263                 tp = list_entry (cull.next, lib_test_peer_t, tp_list);
264                 list_del (&tp->tp_list);
265                 
266                 nal->cb_free (nal, tp, sizeof (*tp));
267         }
268
269         return (fail);
270 }
271
272 ptl_size_t
273 lib_iov_nob (int niov, struct iovec *iov)
274 {
275         ptl_size_t nob = 0;
276         
277         while (niov-- > 0)
278                 nob += (iov++)->iov_len;
279         
280         return (nob);
281 }
282
283 void
284 lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, 
285                   ptl_size_t offset, ptl_size_t len)
286 {
287         ptl_size_t nob;
288
289         if (len == 0)
290                 return;
291         
292         /* skip complete frags before 'offset' */
293         LASSERT (niov > 0);
294         while (offset >= iov->iov_len) {
295                 offset -= iov->iov_len;
296                 iov++;
297                 niov--;
298                 LASSERT (niov > 0);
299         }
300                 
301         do {
302                 LASSERT (niov > 0);
303                 nob = MIN (iov->iov_len - offset, len);
304                 memcpy (dest, iov->iov_base + offset, nob);
305
306                 len -= nob;
307                 dest += nob;
308                 niov--;
309                 iov++;
310                 offset = 0;
311         } while (len > 0);
312 }
313
314 void
315 lib_copy_buf2iov (int niov, struct iovec *iov, ptl_size_t offset, 
316                   char *src, ptl_size_t len)
317 {
318         ptl_size_t nob;
319
320         if (len == 0)
321                 return;
322
323         /* skip complete frags before 'offset' */
324         LASSERT (niov > 0);
325         while (offset >= iov->iov_len) {
326                 offset -= iov->iov_len;
327                 iov++;
328                 niov--;
329                 LASSERT (niov > 0);
330         }
331         
332         do {
333                 LASSERT (niov > 0);
334                 nob = MIN (iov->iov_len - offset, len);
335                 memcpy (iov->iov_base + offset, src, nob);
336                 
337                 len -= nob;
338                 src += nob;
339                 niov--;
340                 iov++;
341                 offset = 0;
342         } while (len > 0);
343 }
344
345 int
346 lib_extract_iov (int dst_niov, struct iovec *dst,
347                  int src_niov, struct iovec *src,
348                  ptl_size_t offset, ptl_size_t len)
349 {
350         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
351          * for exactly 'len' bytes, and return the number of entries.
352          * NB not destructive to 'src' */
353         ptl_size_t      frag_len;
354         int             niov;
355
356         if (len == 0)                           /* no data => */
357                 return (0);                     /* no frags */
358
359         LASSERT (src_niov > 0);
360         while (offset >= src->iov_len) {      /* skip initial frags */
361                 offset -= src->iov_len;
362                 src_niov--;
363                 src++;
364                 LASSERT (src_niov > 0);
365         }
366
367         niov = 1;
368         for (;;) {
369                 LASSERT (src_niov > 0);
370                 LASSERT (niov <= dst_niov);
371                 
372                 frag_len = src->iov_len - offset;
373                 dst->iov_base = ((char *)src->iov_base) + offset;
374
375                 if (len <= frag_len) {
376                         dst->iov_len = len;
377                         return (niov);
378                 }
379                 
380                 dst->iov_len = frag_len;
381
382                 len -= frag_len;
383                 dst++;
384                 src++;
385                 niov++;
386                 src_niov--;
387                 offset = 0;
388         }
389 }
390
391 #ifndef __KERNEL__
392 ptl_size_t
393 lib_kiov_nob (int niov, ptl_kiov_t *kiov) 
394 {
395         LASSERT (0);
396         return (0);
397 }
398
399 void
400 lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, 
401                    ptl_size_t offset, ptl_size_t len)
402 {
403         LASSERT (0);
404 }
405
406 void
407 lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, ptl_size_t offset,
408                    char *src, ptl_size_t len)
409 {
410         LASSERT (0);
411 }
412
413 int
414 lib_extract_kiov (int dst_niov, ptl_kiov_t *dst, 
415                   int src_niov, ptl_kiov_t *src,
416                   ptl_size_t offset, ptl_size_t len)
417 {
418         LASSERT (0);
419 }
420
421 #else
422
423 ptl_size_t
424 lib_kiov_nob (int niov, ptl_kiov_t *kiov) 
425 {
426         ptl_size_t  nob = 0;
427
428         while (niov-- > 0)
429                 nob += (kiov++)->kiov_len;
430
431         return (nob);
432 }
433
434 void
435 lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, 
436                    ptl_size_t offset, ptl_size_t len)
437 {
438         ptl_size_t  nob;
439         char       *addr;
440
441         if (len == 0)
442                 return;
443         
444         LASSERT (!in_interrupt ());
445
446         LASSERT (niov > 0);
447         while (offset > kiov->kiov_len) {
448                 offset -= kiov->kiov_len;
449                 kiov++;
450                 niov--;
451                 LASSERT (niov > 0);
452         }
453         
454         do{
455                 LASSERT (niov > 0);
456                 nob = MIN (kiov->kiov_len - offset, len);
457                 
458                 addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
459                 memcpy (dest, addr, nob);
460                 kunmap (kiov->kiov_page);
461                 
462                 len -= nob;
463                 dest += nob;
464                 niov--;
465                 kiov++;
466                 offset = 0;
467         } while (len > 0);
468 }
469
470 void
471 lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, ptl_size_t offset,
472                    char *src, ptl_size_t len)
473 {
474         ptl_size_t  nob;
475         char       *addr;
476
477         if (len == 0)
478                 return;
479
480         LASSERT (!in_interrupt ());
481
482         LASSERT (niov > 0);
483         while (offset >= kiov->kiov_len) {
484                 offset -= kiov->kiov_len;
485                 kiov++;
486                 niov--;
487                 LASSERT (niov > 0);
488         }
489         
490         do {
491                 LASSERT (niov > 0);
492                 nob = MIN (kiov->kiov_len - offset, len);
493                 
494                 addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
495                 memcpy (addr, src, nob);
496                 kunmap (kiov->kiov_page);
497                 
498                 len -= nob;
499                 src += nob;
500                 niov--;
501                 kiov++;
502                 offset = 0;
503         } while (len > 0);
504 }
505
506 int
507 lib_extract_kiov (int dst_niov, ptl_kiov_t *dst, 
508                   int src_niov, ptl_kiov_t *src,
509                   ptl_size_t offset, ptl_size_t len)
510 {
511         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
512          * for exactly 'len' bytes, and return the number of entries.
513          * NB not destructive to 'src' */
514         ptl_size_t      frag_len;
515         int             niov;
516
517         if (len == 0)                           /* no data => */
518                 return (0);                     /* no frags */
519
520         LASSERT (src_niov > 0);
521         while (offset >= src->kiov_len) {      /* skip initial frags */
522                 offset -= src->kiov_len;
523                 src_niov--;
524                 src++;
525                 LASSERT (src_niov > 0);
526         }
527
528         niov = 1;
529         for (;;) {
530                 LASSERT (src_niov > 0);
531                 LASSERT (niov <= dst_niov);
532                 
533                 frag_len = src->kiov_len - offset;
534                 dst->kiov_page = src->kiov_page;
535                 dst->kiov_offset = src->kiov_offset + offset;
536
537                 if (len <= frag_len) {
538                         dst->kiov_len = len;
539                         LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
540                         return (niov);
541                 }
542
543                 dst->kiov_len = frag_len;
544                 LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
545
546                 len -= frag_len;
547                 dst++;
548                 src++;
549                 niov++;
550                 src_niov--;
551                 offset = 0;
552         }
553 }
554 #endif
555
556 ptl_err_t
557 lib_recv (nal_cb_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
558           ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen)
559 {
560         if (mlen == 0)
561                 return (nal->cb_recv(nal, private, msg,
562                                      0, NULL,
563                                      offset, mlen, rlen));
564
565         if ((md->options & PTL_MD_KIOV) == 0)
566                 return (nal->cb_recv(nal, private, msg,
567                                      md->md_niov, md->md_iov.iov, 
568                                      offset, mlen, rlen));
569
570         return (nal->cb_recv_pages(nal, private, msg, 
571                                    md->md_niov, md->md_iov.kiov,
572                                    offset, mlen, rlen));
573 }
574
575 ptl_err_t
576 lib_send (nal_cb_t *nal, void *private, lib_msg_t *msg,
577           ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
578           lib_md_t *md, ptl_size_t offset, ptl_size_t len) 
579 {
580         if (len == 0)
581                 return (nal->cb_send(nal, private, msg,
582                                      hdr, type, nid, pid,
583                                      0, NULL,
584                                      offset, len));
585         
586         if ((md->options & PTL_MD_KIOV) == 0)
587                 return (nal->cb_send(nal, private, msg, 
588                                      hdr, type, nid, pid,
589                                      md->md_niov, md->md_iov.iov,
590                                      offset, len));
591
592         return (nal->cb_send_pages(nal, private, msg, 
593                                    hdr, type, nid, pid,
594                                    md->md_niov, md->md_iov.kiov,
595                                    offset, len));
596 }
597
598 static void
599 lib_commit_md (nal_cb_t *nal, lib_md_t *md, lib_msg_t *msg)
600 {
601         /* ALWAYS called holding the state_lock */
602         lib_counters_t *counters = &nal->ni.counters;
603
604         /* Here, we commit the MD to a network OP by marking it busy and
605          * decrementing its threshold.  Come what may, the network "owns"
606          * the MD until a call to lib_finalize() signals completion. */
607         msg->md = md;
608          
609         md->pending++;
610         if (md->threshold != PTL_MD_THRESH_INF) {
611                 LASSERT (md->threshold > 0);
612                 md->threshold--;
613         }
614
615         counters->msgs_alloc++;
616         if (counters->msgs_alloc > counters->msgs_max)
617                 counters->msgs_max = counters->msgs_alloc;
618
619         list_add (&msg->msg_list, &nal->ni.ni_active_msgs);
620 }
621
622 static void
623 lib_drop_message (nal_cb_t *nal, void *private, ptl_hdr_t *hdr)
624 {
625         unsigned long flags;
626
627         /* CAVEAT EMPTOR: this only drops messages that we've not committed
628          * to receive (init_msg() not called) and therefore can't cause an
629          * event. */
630         
631         state_lock(nal, &flags);
632         nal->ni.counters.drop_count++;
633         nal->ni.counters.drop_length += hdr->payload_length;
634         state_unlock(nal, &flags);
635
636         /* NULL msg => if NAL calls lib_finalize it will be a noop */
637         (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
638 }
639
640 /*
641  * Incoming messages have a ptl_msg_t object associated with them
642  * by the library.  This object encapsulates the state of the
643  * message and allows the NAL to do non-blocking receives or sends
644  * of long messages.
645  *
646  */
647 static ptl_err_t
648 parse_put(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
649 {
650         lib_ni_t        *ni = &nal->ni;
651         ptl_size_t       mlength = 0;
652         ptl_size_t       offset = 0;
653         ptl_err_t        rc;
654         lib_md_t        *md;
655         unsigned long    flags;
656                 
657         /* Convert put fields to host byte order */
658         hdr->msg.put.match_bits = NTOH__u64 (hdr->msg.put.match_bits);
659         hdr->msg.put.ptl_index = NTOH__u32 (hdr->msg.put.ptl_index);
660         hdr->msg.put.offset = NTOH__u32 (hdr->msg.put.offset);
661
662         state_lock(nal, &flags);
663
664         md = lib_match_md(nal, hdr->msg.put.ptl_index, PTL_MD_OP_PUT,
665                           hdr->src_nid, hdr->src_pid,
666                           hdr->payload_length, hdr->msg.put.offset,
667                           hdr->msg.put.match_bits, msg,
668                           &mlength, &offset);
669         if (md == NULL) {
670                 state_unlock(nal, &flags);
671                 return (PTL_FAIL);
672         }
673
674         msg->ev.type = PTL_EVENT_PUT_END;
675         msg->ev.hdr_data = hdr->msg.put.hdr_data;
676
677         if (!ptl_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
678             !(md->options & PTL_MD_ACK_DISABLE)) {
679                 msg->ack_wmd = hdr->msg.put.ack_wmd;
680         }
681
682         ni->counters.recv_count++;
683         ni->counters.recv_length += mlength;
684
685         state_unlock(nal, &flags);
686
687         rc = lib_recv(nal, private, msg, md, offset, mlength,
688                       hdr->payload_length);
689         if (rc != PTL_OK)
690                 CERROR(LPU64": error on receiving PUT from "LPU64": %d\n",
691                        ni->nid, hdr->src_nid, rc);
692
693         return (rc);
694 }
695
696 static ptl_err_t
697 parse_get(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
698 {
699         lib_ni_t        *ni = &nal->ni;
700         ptl_size_t       mlength = 0;
701         ptl_size_t       offset = 0;
702         lib_md_t        *md;
703         ptl_hdr_t        reply;
704         unsigned long    flags;
705         int              rc;
706
707         /* Convert get fields to host byte order */
708         hdr->msg.get.match_bits = NTOH__u64 (hdr->msg.get.match_bits);
709         hdr->msg.get.ptl_index = NTOH__u32 (hdr->msg.get.ptl_index);
710         hdr->msg.get.sink_length = NTOH__u32 (hdr->msg.get.sink_length);
711         hdr->msg.get.src_offset = NTOH__u32 (hdr->msg.get.src_offset);
712
713         state_lock(nal, &flags);
714
715         md = lib_match_md(nal, hdr->msg.get.ptl_index, PTL_MD_OP_GET,
716                           hdr->src_nid, hdr->src_pid,
717                           hdr->msg.get.sink_length, hdr->msg.get.src_offset,
718                           hdr->msg.get.match_bits, msg,
719                           &mlength, &offset);
720         if (md == NULL) {
721                 state_unlock(nal, &flags);
722                 return (PTL_FAIL);
723         }
724
725         msg->ev.type = PTL_EVENT_GET_END;
726         msg->ev.hdr_data = 0;
727
728         ni->counters.send_count++;
729         ni->counters.send_length += mlength;
730
731         state_unlock(nal, &flags);
732
733         memset (&reply, 0, sizeof (reply));
734         reply.type     = HTON__u32 (PTL_MSG_REPLY);
735         reply.dest_nid = HTON__u64 (hdr->src_nid);
736         reply.src_nid  = HTON__u64 (ni->nid);
737         reply.dest_pid = HTON__u32 (hdr->src_pid);
738         reply.src_pid  = HTON__u32 (ni->pid);
739         reply.payload_length = HTON__u32 (mlength);
740
741         reply.msg.reply.dst_wmd = hdr->msg.get.return_wmd;
742
743         /* NB call lib_send() _BEFORE_ lib_recv() completes the incoming
744          * message.  Some NALs _require_ this to implement optimized GET */
745
746         rc = lib_send (nal, private, msg, &reply, PTL_MSG_REPLY, 
747                        hdr->src_nid, hdr->src_pid, md, offset, mlength);
748         if (rc != PTL_OK)
749                 CERROR(LPU64": Unable to send REPLY for GET from "LPU64": %d\n",
750                        ni->nid, hdr->src_nid, rc);
751
752         /* Discard any junk after the hdr */
753         (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
754
755         return (rc);
756 }
757
758 static ptl_err_t
759 parse_reply(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
760 {
761         lib_ni_t        *ni = &nal->ni;
762         lib_md_t        *md;
763         int              rlength;
764         int              length;
765         unsigned long    flags;
766         ptl_err_t        rc;
767
768         state_lock(nal, &flags);
769
770         /* NB handles only looked up by creator (no flips) */
771         md = ptl_wire_handle2md(&hdr->msg.reply.dst_wmd, nal);
772         if (md == NULL || md->threshold == 0) {
773                 CERROR (LPU64": Dropping REPLY from "LPU64" for %s MD "LPX64"."LPX64"\n",
774                         ni->nid, hdr->src_nid,
775                         md == NULL ? "invalid" : "inactive",
776                         hdr->msg.reply.dst_wmd.wh_interface_cookie,
777                         hdr->msg.reply.dst_wmd.wh_object_cookie);
778
779                 state_unlock(nal, &flags);
780                 return (PTL_FAIL);
781         }
782
783         LASSERT (md->offset == 0);
784
785         length = rlength = hdr->payload_length;
786
787         if (length > md->length) {
788                 if ((md->options & PTL_MD_TRUNCATE) == 0) {
789                         CERROR (LPU64": Dropping REPLY from "LPU64
790                                 " length %d for MD "LPX64" would overflow (%d)\n",
791                                 ni->nid, hdr->src_nid, length,
792                                 hdr->msg.reply.dst_wmd.wh_object_cookie,
793                                 md->length);
794                         state_unlock(nal, &flags);
795                         return (PTL_FAIL);
796                 }
797                 length = md->length;
798         }
799
800         CDEBUG(D_NET, "Reply from "LPU64" of length %d/%d into md "LPX64"\n",
801                hdr->src_nid, length, rlength, 
802                hdr->msg.reply.dst_wmd.wh_object_cookie);
803
804         lib_commit_md(nal, md, msg);
805
806         msg->ev.type = PTL_EVENT_REPLY_END;
807         msg->ev.initiator.nid = hdr->src_nid;
808         msg->ev.initiator.pid = hdr->src_pid;
809         msg->ev.rlength = rlength;
810         msg->ev.mlength = length;
811         msg->ev.offset = 0;
812
813         lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
814
815         ni->counters.recv_count++;
816         ni->counters.recv_length += length;
817
818         state_unlock(nal, &flags);
819
820         rc = lib_recv(nal, private, msg, md, 0, length, rlength);
821         if (rc != PTL_OK)
822                 CERROR(LPU64": error on receiving REPLY from "LPU64": %d\n",
823                        ni->nid, hdr->src_nid, rc);
824
825         return (rc);
826 }
827
828 static ptl_err_t
829 parse_ack(nal_cb_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
830 {
831         lib_ni_t      *ni = &nal->ni;
832         lib_md_t      *md;
833         unsigned long  flags;
834
835         /* Convert ack fields to host byte order */
836         hdr->msg.ack.match_bits = NTOH__u64 (hdr->msg.ack.match_bits);
837         hdr->msg.ack.mlength = NTOH__u32 (hdr->msg.ack.mlength);
838
839         state_lock(nal, &flags);
840
841         /* NB handles only looked up by creator (no flips) */
842         md = ptl_wire_handle2md(&hdr->msg.ack.dst_wmd, nal);
843         if (md == NULL || md->threshold == 0) {
844                 CDEBUG(D_INFO, LPU64": Dropping ACK from "LPU64" to %s MD "
845                        LPX64"."LPX64"\n", ni->nid, hdr->src_nid, 
846                        (md == NULL) ? "invalid" : "inactive",
847                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
848                        hdr->msg.ack.dst_wmd.wh_object_cookie);
849
850                 state_unlock(nal, &flags);
851                 return (PTL_FAIL);
852         }
853
854         CDEBUG(D_NET, LPU64": ACK from "LPU64" into md "LPX64"\n",
855                ni->nid, hdr->src_nid, 
856                hdr->msg.ack.dst_wmd.wh_object_cookie);
857
858         lib_commit_md(nal, md, msg);
859
860         msg->ev.type = PTL_EVENT_ACK;
861         msg->ev.initiator.nid = hdr->src_nid;
862         msg->ev.initiator.pid = hdr->src_pid;
863         msg->ev.mlength = hdr->msg.ack.mlength;
864         msg->ev.match_bits = hdr->msg.ack.match_bits;
865
866         lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
867
868         ni->counters.recv_count++;
869
870         state_unlock(nal, &flags);
871         
872         /* We have received and matched up the ack OK, create the
873          * completion event now... */
874         lib_finalize(nal, private, msg, PTL_OK);
875
876         /* ...and now discard any junk after the hdr */
877         (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
878  
879        return (PTL_OK);
880 }
881
882 static char *
883 hdr_type_string (ptl_hdr_t *hdr)
884 {
885         switch (hdr->type) {
886         case PTL_MSG_ACK:
887                 return ("ACK");
888         case PTL_MSG_PUT:
889                 return ("PUT");
890         case PTL_MSG_GET:
891                 return ("GET");
892         case PTL_MSG_REPLY:
893                 return ("REPLY");
894         case PTL_MSG_HELLO:
895                 return ("HELLO");
896         default:
897                 return ("<UNKNOWN>");
898         }
899 }
900
901 void print_hdr(nal_cb_t * nal, ptl_hdr_t * hdr)
902 {
903         char *type_str = hdr_type_string (hdr);
904
905         nal->cb_printf(nal, "P3 Header at %p of type %s\n", hdr, type_str);
906         nal->cb_printf(nal, "    From nid/pid %Lu/%Lu", hdr->src_nid,
907                        hdr->src_pid);
908         nal->cb_printf(nal, "    To nid/pid %Lu/%Lu\n", hdr->dest_nid,
909                        hdr->dest_pid);
910
911         switch (hdr->type) {
912         default:
913                 break;
914
915         case PTL_MSG_PUT:
916                 nal->cb_printf(nal,
917                                "    Ptl index %d, ack md "LPX64"."LPX64", "
918                                "match bits "LPX64"\n",
919                                hdr->msg.put.ptl_index,
920                                hdr->msg.put.ack_wmd.wh_interface_cookie,
921                                hdr->msg.put.ack_wmd.wh_object_cookie,
922                                hdr->msg.put.match_bits);
923                 nal->cb_printf(nal,
924                                "    Length %d, offset %d, hdr data "LPX64"\n",
925                                hdr->payload_length, hdr->msg.put.offset,
926                                hdr->msg.put.hdr_data);
927                 break;
928
929         case PTL_MSG_GET:
930                 nal->cb_printf(nal,
931                                "    Ptl index %d, return md "LPX64"."LPX64", "
932                                "match bits "LPX64"\n", hdr->msg.get.ptl_index,
933                                hdr->msg.get.return_wmd.wh_interface_cookie,
934                                hdr->msg.get.return_wmd.wh_object_cookie,
935                                hdr->msg.get.match_bits);
936                 nal->cb_printf(nal,
937                                "    Length %d, src offset %d\n",
938                                hdr->msg.get.sink_length,
939                                hdr->msg.get.src_offset);
940                 break;
941
942         case PTL_MSG_ACK:
943                 nal->cb_printf(nal, "    dst md "LPX64"."LPX64", "
944                                "manipulated length %d\n",
945                                hdr->msg.ack.dst_wmd.wh_interface_cookie,
946                                hdr->msg.ack.dst_wmd.wh_object_cookie,
947                                hdr->msg.ack.mlength);
948                 break;
949
950         case PTL_MSG_REPLY:
951                 nal->cb_printf(nal, "    dst md "LPX64"."LPX64", "
952                                "length %d\n",
953                                hdr->msg.reply.dst_wmd.wh_interface_cookie,
954                                hdr->msg.reply.dst_wmd.wh_object_cookie,
955                                hdr->payload_length);
956         }
957
958 }                               /* end of print_hdr() */
959
960
961 void 
962 lib_parse(nal_cb_t *nal, ptl_hdr_t *hdr, void *private)
963 {
964         unsigned long  flags;
965         ptl_err_t      rc;
966         lib_msg_t     *msg;
967         
968         /* convert common fields to host byte order */
969         hdr->dest_nid = NTOH__u64 (hdr->dest_nid);
970         hdr->src_nid = NTOH__u64 (hdr->src_nid);
971         hdr->dest_pid = NTOH__u32 (hdr->dest_pid);
972         hdr->src_pid = NTOH__u32 (hdr->src_pid);
973         hdr->type = NTOH__u32 (hdr->type);
974         hdr->payload_length = NTOH__u32(hdr->payload_length);
975 #if 0
976         nal->cb_printf(nal, "%d: lib_parse: nal=%p hdr=%p type=%d\n",
977                        nal->ni.nid, nal, hdr, hdr->type);
978         print_hdr(nal, hdr);
979 #endif
980         if (hdr->type == PTL_MSG_HELLO) {
981                 /* dest_nid is really ptl_magicversion_t */
982                 ptl_magicversion_t *mv = (ptl_magicversion_t *)&hdr->dest_nid;
983
984                 CERROR (LPU64": Dropping unexpected HELLO message: "
985                         "magic %d, version %d.%d from "LPD64"\n",
986                         nal->ni.nid, mv->magic, 
987                         mv->version_major, mv->version_minor,
988                         hdr->src_nid);
989                 lib_drop_message(nal, private, hdr);
990                 return;
991         }
992         
993         if (hdr->dest_nid != nal->ni.nid) {
994                 CERROR(LPU64": Dropping %s message from "LPU64" to "LPU64
995                        " (not me)\n", nal->ni.nid, hdr_type_string (hdr),
996                        hdr->src_nid, hdr->dest_nid);
997                 lib_drop_message(nal, private, hdr);
998                 return;
999         }
1000
1001         if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1002             fail_peer (nal, hdr->src_nid, 0))      /* shall we now? */
1003         {
1004                 CERROR(LPU64": Dropping incoming %s from "LPU64
1005                        ": simulated failure\n",
1006                        nal->ni.nid, hdr_type_string (hdr), 
1007                        hdr->src_nid);
1008                 lib_drop_message(nal, private, hdr);
1009                 return;
1010         }
1011
1012         msg = lib_msg_alloc(nal);
1013         if (msg == NULL) {
1014                 CERROR(LPU64": Dropping incoming %s from "LPU64
1015                        ": can't allocate a lib_msg_t\n",
1016                        nal->ni.nid, hdr_type_string (hdr), 
1017                        hdr->src_nid);
1018                 lib_drop_message(nal, private, hdr);
1019                 return;
1020         }
1021
1022         switch (hdr->type) {
1023         case PTL_MSG_ACK:
1024                 rc = parse_ack(nal, hdr, private, msg);
1025                 break;
1026         case PTL_MSG_PUT:
1027                 rc = parse_put(nal, hdr, private, msg);
1028                 break;
1029         case PTL_MSG_GET:
1030                 rc = parse_get(nal, hdr, private, msg);
1031                 break;
1032         case PTL_MSG_REPLY:
1033                 rc = parse_reply(nal, hdr, private, msg);
1034                 break;
1035         default:
1036                 CERROR(LPU64": Dropping <unknown> message from "LPU64
1037                        ": Bad type=0x%x\n",  nal->ni.nid, hdr->src_nid,
1038                        hdr->type);
1039                 rc = PTL_FAIL;
1040                 break;
1041         }
1042                 
1043         if (rc != PTL_OK) {
1044                 if (msg->md != NULL) {
1045                         /* committed... */
1046                         lib_finalize(nal, private, msg, rc);
1047                 } else {
1048                         state_lock(nal, &flags);
1049                         lib_msg_free(nal, msg); /* expects state_lock held */
1050                         state_unlock(nal, &flags);
1051
1052                         lib_drop_message(nal, private, hdr);
1053                 }
1054         }
1055 }
1056
1057 int 
1058 do_PtlPut(nal_cb_t *nal, void *private, void *v_args, void *v_ret)
1059 {
1060         /*
1061          * Incoming:
1062          *      ptl_handle_md_t md_in
1063          *      ptl_ack_req_t ack_req_in
1064          *      ptl_process_id_t target_in
1065          *      ptl_pt_index_t portal_in
1066          *      ptl_ac_index_t cookie_in
1067          *      ptl_match_bits_t match_bits_in
1068          *      ptl_size_t offset_in
1069          *
1070          * Outgoing:
1071          */
1072
1073         PtlPut_in        *args = v_args;
1074         ptl_process_id_t *id = &args->target_in;
1075         PtlPut_out       *ret = v_ret;
1076         lib_ni_t         *ni = &nal->ni;
1077         lib_msg_t        *msg;
1078         ptl_hdr_t         hdr;
1079         lib_md_t         *md;
1080         unsigned long     flags;
1081         int               rc;
1082         
1083         if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1084             fail_peer (nal, id->nid, 1))           /* shall we now? */
1085         {
1086                 CERROR(LPU64": Dropping PUT to "LPU64": simulated failure\n",
1087                        nal->ni.nid, id->nid);
1088                 return (ret->rc = PTL_PROCESS_INVALID);
1089         }
1090
1091         msg = lib_msg_alloc(nal);
1092         if (msg == NULL) {
1093                 CERROR(LPU64": Dropping PUT to "LPU64": ENOMEM on lib_msg_t\n",
1094                        ni->nid, id->nid);
1095                 return (ret->rc = PTL_NO_SPACE);
1096         }
1097
1098         state_lock(nal, &flags);
1099
1100         md = ptl_handle2md(&args->md_in, nal);
1101         if (md == NULL || md->threshold == 0) {
1102                 lib_msg_free(nal, msg);
1103                 state_unlock(nal, &flags);
1104         
1105                 return (ret->rc = PTL_MD_INVALID);
1106         }
1107
1108         CDEBUG(D_NET, "PtlPut -> %Lu: %lu\n", (unsigned long long)id->nid,
1109                (unsigned long)id->pid);
1110
1111         memset (&hdr, 0, sizeof (hdr));
1112         hdr.type     = HTON__u32 (PTL_MSG_PUT);
1113         hdr.dest_nid = HTON__u64 (id->nid);
1114         hdr.src_nid  = HTON__u64 (ni->nid);
1115         hdr.dest_pid = HTON__u32 (id->pid);
1116         hdr.src_pid  = HTON__u32 (ni->pid);
1117         hdr.payload_length = HTON__u32 (md->length);
1118
1119         /* NB handles only looked up by creator (no flips) */
1120         if (args->ack_req_in == PTL_ACK_REQ) {
1121                 hdr.msg.put.ack_wmd.wh_interface_cookie = ni->ni_interface_cookie;
1122                 hdr.msg.put.ack_wmd.wh_object_cookie = md->md_lh.lh_cookie;
1123         } else {
1124                 hdr.msg.put.ack_wmd = PTL_WIRE_HANDLE_NONE;
1125         }
1126
1127         hdr.msg.put.match_bits = HTON__u64 (args->match_bits_in);
1128         hdr.msg.put.ptl_index = HTON__u32 (args->portal_in);
1129         hdr.msg.put.offset = HTON__u32 (args->offset_in);
1130         hdr.msg.put.hdr_data = args->hdr_data_in;
1131
1132         lib_commit_md(nal, md, msg);
1133         
1134         msg->ev.type = PTL_EVENT_SEND_END;
1135         msg->ev.initiator.nid = ni->nid;
1136         msg->ev.initiator.pid = ni->pid;
1137         msg->ev.portal = args->portal_in;
1138         msg->ev.match_bits = args->match_bits_in;
1139         msg->ev.rlength = md->length;
1140         msg->ev.mlength = md->length;
1141         msg->ev.offset = args->offset_in;
1142         msg->ev.hdr_data = args->hdr_data_in;
1143
1144         lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
1145
1146         ni->counters.send_count++;
1147         ni->counters.send_length += md->length;
1148
1149         state_unlock(nal, &flags);
1150         
1151         rc = lib_send (nal, private, msg, &hdr, PTL_MSG_PUT,
1152                        id->nid, id->pid, md, 0, md->length);
1153         if (rc != PTL_OK) {
1154                 CERROR(LPU64": error sending PUT to "LPU64": %d\n",
1155                        ni->nid, id->nid, rc);
1156                 lib_finalize (nal, private, msg, rc);
1157         }
1158         
1159         /* completion will be signalled by an event */
1160         return ret->rc = PTL_OK;
1161 }
1162
1163 lib_msg_t * 
1164 lib_create_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, lib_msg_t *getmsg)
1165 {
1166         /* The NAL can DMA direct to the GET md (i.e. no REPLY msg).  This
1167          * returns a msg for the NAL to pass to lib_finalize() when the sink
1168          * data has been received.
1169          *
1170          * CAVEAT EMPTOR: 'getmsg' is the original GET, which is freed when
1171          * lib_finalize() is called on it, so the NAL must call this first */
1172
1173         lib_ni_t        *ni = &nal->ni;
1174         lib_msg_t       *msg = lib_msg_alloc(nal);
1175         lib_md_t        *getmd = getmsg->md;
1176         unsigned long    flags;
1177
1178         state_lock(nal, &flags);
1179
1180         LASSERT (getmd->pending > 0);
1181
1182         if (msg == NULL) {
1183                 CERROR ("Dropping REPLY from "LPU64": can't allocate msg\n",
1184                         peer_nid);
1185                 goto drop;
1186         }
1187
1188         if (getmd->threshold == 0) {
1189                 CERROR ("Dropping REPLY from "LPU64" for inactive MD %p\n",
1190                         peer_nid, getmd);
1191                 goto drop_msg;
1192         }
1193
1194         LASSERT (getmd->offset == 0);
1195
1196         CDEBUG(D_NET, "Reply from "LPU64" md %p\n", peer_nid, getmd);
1197
1198         lib_commit_md (nal, getmd, msg);
1199
1200         msg->ev.type = PTL_EVENT_REPLY_END;
1201         msg->ev.initiator.nid = peer_nid;
1202         msg->ev.initiator.pid = 0;      /* XXX FIXME!!! */
1203         msg->ev.rlength = msg->ev.mlength = getmd->length;
1204         msg->ev.offset = 0;
1205
1206         lib_md_deconstruct(nal, getmd, &msg->ev.mem_desc);
1207
1208         ni->counters.recv_count++;
1209         ni->counters.recv_length += getmd->length;
1210
1211         state_unlock(nal, &flags);
1212
1213         return msg;
1214
1215  drop_msg:
1216         lib_msg_free(nal, msg);
1217  drop:
1218         nal->ni.counters.drop_count++;
1219         nal->ni.counters.drop_length += getmd->length;
1220
1221         state_unlock (nal, &flags);
1222
1223         return NULL;
1224 }
1225
1226 int 
1227 do_PtlGet(nal_cb_t *nal, void *private, void *v_args, void *v_ret)
1228 {
1229         /*
1230          * Incoming:
1231          *      ptl_handle_md_t md_in
1232          *      ptl_process_id_t target_in
1233          *      ptl_pt_index_t portal_in
1234          *      ptl_ac_index_t cookie_in
1235          *      ptl_match_bits_t match_bits_in
1236          *      ptl_size_t offset_in
1237          *
1238          * Outgoing:
1239          */
1240
1241         PtlGet_in        *args = v_args;
1242         ptl_process_id_t *id = &args->target_in;
1243         PtlGet_out       *ret = v_ret;
1244         lib_ni_t         *ni = &nal->ni;
1245         lib_msg_t        *msg;
1246         ptl_hdr_t         hdr;
1247         lib_md_t         *md;
1248         unsigned long     flags;
1249         int               rc;
1250         
1251         if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
1252             fail_peer (nal, id->nid, 1))           /* shall we now? */
1253         {
1254                 CERROR(LPU64": Dropping PUT to "LPU64": simulated failure\n",
1255                        nal->ni.nid, id->nid);
1256                 return (ret->rc = PTL_PROCESS_INVALID);
1257         }
1258
1259         msg = lib_msg_alloc(nal);
1260         if (msg == NULL) {
1261                 CERROR(LPU64": Dropping GET to "LPU64": ENOMEM on lib_msg_t\n",
1262                        ni->nid, id->nid);
1263                 return (ret->rc = PTL_NO_SPACE);
1264         }
1265
1266         state_lock(nal, &flags);
1267
1268         md = ptl_handle2md(&args->md_in, nal);
1269         if (md == NULL || !md->threshold) {
1270                 lib_msg_free(nal, msg);
1271                 state_unlock(nal, &flags);
1272
1273                 return ret->rc = PTL_MD_INVALID;
1274         }
1275
1276         CDEBUG(D_NET, "PtlGet -> %Lu: %lu\n", (unsigned long long)id->nid,
1277                (unsigned long)id->pid);
1278
1279         memset (&hdr, 0, sizeof (hdr));
1280         hdr.type     = HTON__u32 (PTL_MSG_GET);
1281         hdr.dest_nid = HTON__u64 (id->nid);
1282         hdr.src_nid  = HTON__u64 (ni->nid);
1283         hdr.dest_pid = HTON__u32 (id->pid);
1284         hdr.src_pid  = HTON__u32 (ni->pid);
1285         hdr.payload_length = 0;
1286
1287         /* NB handles only looked up by creator (no flips) */
1288         hdr.msg.get.return_wmd.wh_interface_cookie = ni->ni_interface_cookie;
1289         hdr.msg.get.return_wmd.wh_object_cookie = md->md_lh.lh_cookie;
1290
1291         hdr.msg.get.match_bits = HTON__u64 (args->match_bits_in);
1292         hdr.msg.get.ptl_index = HTON__u32 (args->portal_in);
1293         hdr.msg.get.src_offset = HTON__u32 (args->offset_in);
1294         hdr.msg.get.sink_length = HTON__u32 (md->length);
1295
1296         lib_commit_md(nal, md, msg);
1297
1298         msg->ev.type = PTL_EVENT_SEND_END;
1299         msg->ev.initiator.nid = ni->nid;
1300         msg->ev.initiator.pid = ni->pid;
1301         msg->ev.portal = args->portal_in;
1302         msg->ev.match_bits = args->match_bits_in;
1303         msg->ev.rlength = md->length;
1304         msg->ev.mlength = md->length;
1305         msg->ev.offset = args->offset_in;
1306         msg->ev.hdr_data = 0;
1307
1308         lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
1309
1310         ni->counters.send_count++;
1311
1312         state_unlock(nal, &flags);
1313
1314         rc = lib_send (nal, private, msg, &hdr, PTL_MSG_GET,
1315                        id->nid, id->pid, NULL, 0, 0);
1316         if (rc != PTL_OK) {
1317                 CERROR(LPU64": error sending GET to "LPU64": %d\n",
1318                        ni->nid, id->nid, rc);
1319                 lib_finalize (nal, private, msg, rc);
1320         }
1321         
1322         /* completion will be signalled by an event */
1323         return ret->rc = PTL_OK;
1324 }
1325
1326 void lib_assert_wire_constants (void)
1327 {
1328         /* Wire protocol assertions generated by 'wirecheck'
1329          * running on Linux robert.bartonsoftware.com 2.4.20-18.9 #1 Thu May 29 06:54:41 EDT 2003 i68
1330          * with gcc version 3.2.2 20030222 (Red Hat Linux 3.2.2-5) */
1331
1332
1333         /* Constants... */
1334         LASSERT (PORTALS_PROTO_MAGIC == 0xeebc0ded);
1335         LASSERT (PORTALS_PROTO_VERSION_MAJOR == 0);
1336         LASSERT (PORTALS_PROTO_VERSION_MINOR == 3);
1337         LASSERT (PTL_MSG_ACK == 0);
1338         LASSERT (PTL_MSG_PUT == 1);
1339         LASSERT (PTL_MSG_GET == 2);
1340         LASSERT (PTL_MSG_REPLY == 3);
1341         LASSERT (PTL_MSG_HELLO == 4);
1342
1343         /* Checks for struct ptl_handle_wire_t */
1344         LASSERT ((int)sizeof(ptl_handle_wire_t) == 16);
1345         LASSERT (offsetof(ptl_handle_wire_t, wh_interface_cookie) == 0);
1346         LASSERT ((int)sizeof(((ptl_handle_wire_t *)0)->wh_interface_cookie) == 8);
1347         LASSERT (offsetof(ptl_handle_wire_t, wh_object_cookie) == 8);
1348         LASSERT ((int)sizeof(((ptl_handle_wire_t *)0)->wh_object_cookie) == 8);
1349
1350         /* Checks for struct ptl_magicversion_t */
1351         LASSERT ((int)sizeof(ptl_magicversion_t) == 8);
1352         LASSERT (offsetof(ptl_magicversion_t, magic) == 0);
1353         LASSERT ((int)sizeof(((ptl_magicversion_t *)0)->magic) == 4);
1354         LASSERT (offsetof(ptl_magicversion_t, version_major) == 4);
1355         LASSERT ((int)sizeof(((ptl_magicversion_t *)0)->version_major) == 2);
1356         LASSERT (offsetof(ptl_magicversion_t, version_minor) == 6);
1357         LASSERT ((int)sizeof(((ptl_magicversion_t *)0)->version_minor) == 2);
1358
1359         /* Checks for struct ptl_hdr_t */
1360         LASSERT ((int)sizeof(ptl_hdr_t) == 72);
1361         LASSERT (offsetof(ptl_hdr_t, dest_nid) == 0);
1362         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->dest_nid) == 8);
1363         LASSERT (offsetof(ptl_hdr_t, src_nid) == 8);
1364         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->src_nid) == 8);
1365         LASSERT (offsetof(ptl_hdr_t, dest_pid) == 16);
1366         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->dest_pid) == 4);
1367         LASSERT (offsetof(ptl_hdr_t, src_pid) == 20);
1368         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->src_pid) == 4);
1369         LASSERT (offsetof(ptl_hdr_t, type) == 24);
1370         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->type) == 4);
1371         LASSERT (offsetof(ptl_hdr_t, payload_length) == 28);
1372         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->payload_length) == 4);
1373         LASSERT (offsetof(ptl_hdr_t, msg) == 32);
1374         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg) == 40);
1375
1376         /* Ack */
1377         LASSERT (offsetof(ptl_hdr_t, msg.ack.dst_wmd) == 32);
1378         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.ack.dst_wmd) == 16);
1379         LASSERT (offsetof(ptl_hdr_t, msg.ack.match_bits) == 48);
1380         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.ack.match_bits) == 8);
1381         LASSERT (offsetof(ptl_hdr_t, msg.ack.mlength) == 56);
1382         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.ack.mlength) == 4);
1383
1384         /* Put */
1385         LASSERT (offsetof(ptl_hdr_t, msg.put.ack_wmd) == 32);
1386         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.ack_wmd) == 16);
1387         LASSERT (offsetof(ptl_hdr_t, msg.put.match_bits) == 48);
1388         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.match_bits) == 8);
1389         LASSERT (offsetof(ptl_hdr_t, msg.put.hdr_data) == 56);
1390         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.hdr_data) == 8);
1391         LASSERT (offsetof(ptl_hdr_t, msg.put.ptl_index) == 64);
1392         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.ptl_index) == 4);
1393         LASSERT (offsetof(ptl_hdr_t, msg.put.offset) == 68);
1394         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.offset) == 4);
1395
1396         /* Get */
1397         LASSERT (offsetof(ptl_hdr_t, msg.get.return_wmd) == 32);
1398         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.return_wmd) == 16);
1399         LASSERT (offsetof(ptl_hdr_t, msg.get.match_bits) == 48);
1400         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.match_bits) == 8);
1401         LASSERT (offsetof(ptl_hdr_t, msg.get.ptl_index) == 56);
1402         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.ptl_index) == 4);
1403         LASSERT (offsetof(ptl_hdr_t, msg.get.src_offset) == 60);
1404         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.src_offset) == 4);
1405         LASSERT (offsetof(ptl_hdr_t, msg.get.sink_length) == 64);
1406         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.sink_length) == 4);
1407
1408         /* Reply */
1409         LASSERT (offsetof(ptl_hdr_t, msg.reply.dst_wmd) == 32);
1410         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.reply.dst_wmd) == 16);
1411
1412         /* Hello */
1413         LASSERT (offsetof(ptl_hdr_t, msg.hello.incarnation) == 32);
1414         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.hello.incarnation) == 8);
1415         LASSERT (offsetof(ptl_hdr_t, msg.hello.type) == 40);
1416         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.hello.type) == 4);
1417 }