Whamcloud - gitweb
- merge 2 weeks of b1_4 fixes onto HEAD
[fs/lustre-release.git] / lustre / portals / portals / lib-move.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * lib/lib-move.c
5  * Data movement routines
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *  Copyright (c) 2001-2002 Sandia National Laboratories
9  *
10  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef __KERNEL__
27 # include <stdio.h>
28 #else
29 # define DEBUG_SUBSYSTEM S_PORTALS
30 # include <linux/kp30.h>
31 #endif
32 #include <portals/p30.h>
33 #include <portals/lib-p30.h>
34
35 /* forward ref */
36 static void lib_commit_md (lib_nal_t *nal, lib_md_t *md, lib_msg_t *msg);
37
38 static lib_md_t *
39 lib_match_md(lib_nal_t *nal, int index, int op_mask, 
40              ptl_nid_t src_nid, ptl_pid_t src_pid, 
41              ptl_size_t rlength, ptl_size_t roffset,
42              ptl_match_bits_t match_bits, lib_msg_t *msg,
43              ptl_size_t *mlength_out, ptl_size_t *offset_out)
44 {
45         lib_ni_t         *ni = &nal->libnal_ni;
46         struct list_head *match_list = &ni->ni_portals.tbl[index];
47         struct list_head *tmp;
48         lib_me_t         *me;
49         lib_md_t         *md;
50         ptl_size_t        mlength;
51         ptl_size_t        offset;
52         ENTRY;
53
54         CDEBUG (D_NET, "Request from "LPU64".%d of length %d into portal %d "
55                 "MB="LPX64"\n", src_nid, src_pid, rlength, index, match_bits);
56
57         if (index < 0 || index >= ni->ni_portals.size) {
58                 CERROR("Invalid portal %d not in [0-%d]\n",
59                        index, ni->ni_portals.size);
60                 goto failed;
61         }
62
63         list_for_each (tmp, match_list) {
64                 me = list_entry(tmp, lib_me_t, me_list);
65                 md = me->md;
66
67                  /* ME attached but MD not attached yet */
68                 if (md == NULL)
69                         continue;
70
71                 LASSERT (me == md->me);
72
73                 /* mismatched MD op */
74                 if ((md->options & op_mask) == 0)
75                         continue;
76
77                 /* MD exhausted */
78                 if (lib_md_exhausted(md))
79                         continue;
80
81                 /* mismatched ME nid/pid? */
82                 if (me->match_id.nid != PTL_NID_ANY &&
83                     me->match_id.nid != src_nid)
84                         continue;
85
86                 if (me->match_id.pid != PTL_PID_ANY &&
87                     me->match_id.pid != src_pid)
88                         continue;
89
90                 /* mismatched ME matchbits? */
91                 if (((me->match_bits ^ match_bits) & ~me->ignore_bits) != 0)
92                         continue;
93
94                 /* Hurrah! This _is_ a match; check it out... */
95
96                 if ((md->options & PTL_MD_MANAGE_REMOTE) == 0)
97                         offset = md->offset;
98                 else
99                         offset = roffset;
100
101                 if ((md->options & PTL_MD_MAX_SIZE) != 0) {
102                         mlength = md->max_size;
103                         LASSERT (md->offset + mlength <= md->length);
104                 } else {
105                         mlength = md->length - offset;
106                 }
107
108                 if (rlength <= mlength) {        /* fits in allowed space */
109                         mlength = rlength;
110                 } else if ((md->options & PTL_MD_TRUNCATE) == 0) {
111                         /* this packet _really_ is too big */
112                         CERROR("Matching packet %d too big: %d left, "
113                                "%d allowed\n", rlength, md->length - offset,
114                                mlength);
115                         goto failed;
116                 }
117
118                 /* Commit to this ME/MD */
119                 CDEBUG(D_NET, "Incoming %s index %x from "LPU64"/%u of "
120                        "length %d/%d into md "LPX64" [%d] + %d\n", 
121                        (op_mask == PTL_MD_OP_PUT) ? "put" : "get",
122                        index, src_nid, src_pid, mlength, rlength, 
123                        md->md_lh.lh_cookie, md->md_niov, offset);
124
125                 lib_commit_md(nal, md, msg);
126                 md->offset = offset + mlength;
127
128                 /* NB Caller sets ev.type and ev.hdr_data */
129                 msg->ev.initiator.nid = src_nid;
130                 msg->ev.initiator.pid = src_pid;
131                 msg->ev.portal = index;
132                 msg->ev.match_bits = match_bits;
133                 msg->ev.rlength = rlength;
134                 msg->ev.mlength = mlength;
135                 msg->ev.offset = offset;
136
137                 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
138
139                 *offset_out = offset;
140                 *mlength_out = mlength;
141
142                 /* Auto-unlink NOW, so the ME gets unlinked if required.
143                  * We bumped md->pending above so the MD just gets flagged
144                  * for unlink when it is finalized. */
145                 if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINK) != 0 &&
146                     lib_md_exhausted(md))
147                         lib_md_unlink(nal, md);
148
149                 RETURN (md);
150         }
151
152  failed:
153         CERROR (LPU64": Dropping %s from "LPU64".%d portal %d match "LPX64
154                 " offset %d length %d: no match\n",
155                 ni->ni_pid.nid, (op_mask == PTL_MD_OP_GET) ? "GET" : "PUT",
156                 src_nid, src_pid, index, match_bits, roffset, rlength);
157         RETURN(NULL);
158 }
159
160 int lib_api_fail_nid (nal_t *apinal, ptl_nid_t nid, unsigned int threshold)
161 {
162         lib_nal_t         *nal = apinal->nal_data;
163         lib_test_peer_t   *tp;
164         unsigned long      flags;
165         struct list_head  *el;
166         struct list_head  *next;
167         struct list_head   cull;
168         
169         if (threshold != 0) {
170                 /* Adding a new entry */
171                 PORTAL_ALLOC(tp, sizeof(*tp));
172                 if (tp == NULL)
173                         return PTL_NO_SPACE;
174                 
175                 tp->tp_nid = nid;
176                 tp->tp_threshold = threshold;
177                 
178                 LIB_LOCK(nal, flags);
179                 list_add_tail (&tp->tp_list, &nal->libnal_ni.ni_test_peers);
180                 LIB_UNLOCK(nal, flags);
181                 return PTL_OK;
182         }
183         
184         /* removing entries */
185         INIT_LIST_HEAD (&cull);
186         
187         LIB_LOCK(nal, flags);
188
189         list_for_each_safe (el, next, &nal->libnal_ni.ni_test_peers) {
190                 tp = list_entry (el, lib_test_peer_t, tp_list);
191                 
192                 if (tp->tp_threshold == 0 ||    /* needs culling anyway */
193                     nid == PTL_NID_ANY ||       /* removing all entries */
194                     tp->tp_nid == nid)          /* matched this one */
195                 {
196                         list_del (&tp->tp_list);
197                         list_add (&tp->tp_list, &cull);
198                 }
199         }
200         
201         LIB_UNLOCK(nal, flags);
202                 
203         while (!list_empty (&cull)) {
204                 tp = list_entry (cull.next, lib_test_peer_t, tp_list);
205
206                 list_del (&tp->tp_list);
207                 PORTAL_FREE(tp, sizeof (*tp));
208         }
209         return PTL_OK;
210 }
211
212 static int
213 fail_peer (lib_nal_t *nal, ptl_nid_t nid, int outgoing) 
214 {
215         lib_test_peer_t  *tp;
216         struct list_head *el;
217         struct list_head *next;
218         unsigned long     flags;
219         struct list_head  cull;
220         int               fail = 0;
221
222         INIT_LIST_HEAD (&cull);
223         
224         LIB_LOCK (nal, flags);
225
226         list_for_each_safe (el, next, &nal->libnal_ni.ni_test_peers) {
227                 tp = list_entry (el, lib_test_peer_t, tp_list);
228
229                 if (tp->tp_threshold == 0) {
230                         /* zombie entry */
231                         if (outgoing) {
232                                 /* only cull zombies on outgoing tests,
233                                  * since we may be at interrupt priority on
234                                  * incoming messages. */
235                                 list_del (&tp->tp_list);
236                                 list_add (&tp->tp_list, &cull);
237                         }
238                         continue;
239                 }
240                         
241                 if (tp->tp_nid == PTL_NID_ANY || /* fail every peer */
242                     nid == tp->tp_nid) {        /* fail this peer */
243                         fail = 1;
244                         
245                         if (tp->tp_threshold != PTL_MD_THRESH_INF) {
246                                 tp->tp_threshold--;
247                                 if (outgoing &&
248                                     tp->tp_threshold == 0) {
249                                         /* see above */
250                                         list_del (&tp->tp_list);
251                                         list_add (&tp->tp_list, &cull);
252                                 }
253                         }
254                         break;
255                 }
256         }
257         
258         LIB_UNLOCK (nal, flags);
259
260         while (!list_empty (&cull)) {
261                 tp = list_entry (cull.next, lib_test_peer_t, tp_list);
262                 list_del (&tp->tp_list);
263                 
264                 PORTAL_FREE(tp, sizeof (*tp));
265         }
266
267         return (fail);
268 }
269
270 ptl_size_t
271 lib_iov_nob (int niov, struct iovec *iov)
272 {
273         ptl_size_t nob = 0;
274         
275         while (niov-- > 0)
276                 nob += (iov++)->iov_len;
277         
278         return (nob);
279 }
280
281 void
282 lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, 
283                   ptl_size_t offset, ptl_size_t len)
284 {
285         ptl_size_t nob;
286
287         if (len == 0)
288                 return;
289         
290         /* skip complete frags before 'offset' */
291         LASSERT (niov > 0);
292         while (offset >= iov->iov_len) {
293                 offset -= iov->iov_len;
294                 iov++;
295                 niov--;
296                 LASSERT (niov > 0);
297         }
298                 
299         do {
300                 LASSERT (niov > 0);
301                 nob = MIN (iov->iov_len - offset, len);
302                 memcpy (dest, iov->iov_base + offset, nob);
303
304                 len -= nob;
305                 dest += nob;
306                 niov--;
307                 iov++;
308                 offset = 0;
309         } while (len > 0);
310 }
311
312 void
313 lib_copy_buf2iov (int niov, struct iovec *iov, ptl_size_t offset, 
314                   char *src, ptl_size_t len)
315 {
316         ptl_size_t nob;
317
318         if (len == 0)
319                 return;
320
321         /* skip complete frags before 'offset' */
322         LASSERT (niov > 0);
323         while (offset >= iov->iov_len) {
324                 offset -= iov->iov_len;
325                 iov++;
326                 niov--;
327                 LASSERT (niov > 0);
328         }
329         
330         do {
331                 LASSERT (niov > 0);
332                 nob = MIN (iov->iov_len - offset, len);
333                 memcpy (iov->iov_base + offset, src, nob);
334                 
335                 len -= nob;
336                 src += nob;
337                 niov--;
338                 iov++;
339                 offset = 0;
340         } while (len > 0);
341 }
342
343 int
344 lib_extract_iov (int dst_niov, struct iovec *dst,
345                  int src_niov, struct iovec *src,
346                  ptl_size_t offset, ptl_size_t len)
347 {
348         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
349          * for exactly 'len' bytes, and return the number of entries.
350          * NB not destructive to 'src' */
351         ptl_size_t      frag_len;
352         int             niov;
353
354         if (len == 0)                           /* no data => */
355                 return (0);                     /* no frags */
356
357         LASSERT (src_niov > 0);
358         while (offset >= src->iov_len) {      /* skip initial frags */
359                 offset -= src->iov_len;
360                 src_niov--;
361                 src++;
362                 LASSERT (src_niov > 0);
363         }
364
365         niov = 1;
366         for (;;) {
367                 LASSERT (src_niov > 0);
368                 LASSERT (niov <= dst_niov);
369                 
370                 frag_len = src->iov_len - offset;
371                 dst->iov_base = ((char *)src->iov_base) + offset;
372
373                 if (len <= frag_len) {
374                         dst->iov_len = len;
375                         return (niov);
376                 }
377                 
378                 dst->iov_len = frag_len;
379
380                 len -= frag_len;
381                 dst++;
382                 src++;
383                 niov++;
384                 src_niov--;
385                 offset = 0;
386         }
387 }
388
389 #ifndef __KERNEL__
390 ptl_size_t
391 lib_kiov_nob (int niov, ptl_kiov_t *kiov) 
392 {
393         LASSERT (0);
394         return (0);
395 }
396
397 void
398 lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, 
399                    ptl_size_t offset, ptl_size_t len)
400 {
401         LASSERT (0);
402 }
403
404 void
405 lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, ptl_size_t offset,
406                    char *src, ptl_size_t len)
407 {
408         LASSERT (0);
409 }
410
411 int
412 lib_extract_kiov (int dst_niov, ptl_kiov_t *dst, 
413                   int src_niov, ptl_kiov_t *src,
414                   ptl_size_t offset, ptl_size_t len)
415 {
416         LASSERT (0);
417 }
418
419 #else
420
421 ptl_size_t
422 lib_kiov_nob (int niov, ptl_kiov_t *kiov) 
423 {
424         ptl_size_t  nob = 0;
425
426         while (niov-- > 0)
427                 nob += (kiov++)->kiov_len;
428
429         return (nob);
430 }
431
432 void
433 lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, 
434                    ptl_size_t offset, ptl_size_t len)
435 {
436         ptl_size_t  nob;
437         char       *addr;
438
439         if (len == 0)
440                 return;
441         
442         LASSERT (!in_interrupt ());
443
444         LASSERT (niov > 0);
445         while (offset > kiov->kiov_len) {
446                 offset -= kiov->kiov_len;
447                 kiov++;
448                 niov--;
449                 LASSERT (niov > 0);
450         }
451         
452         do{
453                 LASSERT (niov > 0);
454                 nob = MIN (kiov->kiov_len - offset, len);
455                 
456                 addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
457                 memcpy (dest, addr, nob);
458                 kunmap (kiov->kiov_page);
459                 
460                 len -= nob;
461                 dest += nob;
462                 niov--;
463                 kiov++;
464                 offset = 0;
465         } while (len > 0);
466 }
467
468 void
469 lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, ptl_size_t offset,
470                    char *src, ptl_size_t len)
471 {
472         ptl_size_t  nob;
473         char       *addr;
474
475         if (len == 0)
476                 return;
477
478         LASSERT (!in_interrupt ());
479
480         LASSERT (niov > 0);
481         while (offset >= kiov->kiov_len) {
482                 offset -= kiov->kiov_len;
483                 kiov++;
484                 niov--;
485                 LASSERT (niov > 0);
486         }
487         
488         do {
489                 LASSERT (niov > 0);
490                 nob = MIN (kiov->kiov_len - offset, len);
491                 
492                 addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
493                 memcpy (addr, src, nob);
494                 kunmap (kiov->kiov_page);
495                 
496                 len -= nob;
497                 src += nob;
498                 niov--;
499                 kiov++;
500                 offset = 0;
501         } while (len > 0);
502 }
503
504 int
505 lib_extract_kiov (int dst_niov, ptl_kiov_t *dst, 
506                   int src_niov, ptl_kiov_t *src,
507                   ptl_size_t offset, ptl_size_t len)
508 {
509         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
510          * for exactly 'len' bytes, and return the number of entries.
511          * NB not destructive to 'src' */
512         ptl_size_t      frag_len;
513         int             niov;
514
515         if (len == 0)                           /* no data => */
516                 return (0);                     /* no frags */
517
518         LASSERT (src_niov > 0);
519         while (offset >= src->kiov_len) {      /* skip initial frags */
520                 offset -= src->kiov_len;
521                 src_niov--;
522                 src++;
523                 LASSERT (src_niov > 0);
524         }
525
526         niov = 1;
527         for (;;) {
528                 LASSERT (src_niov > 0);
529                 LASSERT (niov <= dst_niov);
530                 
531                 frag_len = src->kiov_len - offset;
532                 dst->kiov_page = src->kiov_page;
533                 dst->kiov_offset = src->kiov_offset + offset;
534
535                 if (len <= frag_len) {
536                         dst->kiov_len = len;
537                         LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
538                         return (niov);
539                 }
540
541                 dst->kiov_len = frag_len;
542                 LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
543
544                 len -= frag_len;
545                 dst++;
546                 src++;
547                 niov++;
548                 src_niov--;
549                 offset = 0;
550         }
551 }
552 #endif
553
554 ptl_err_t
555 lib_recv (lib_nal_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
556           ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen)
557 {
558         if (mlen == 0)
559                 return (nal->libnal_recv(nal, private, msg,
560                                          0, NULL,
561                                          offset, mlen, rlen));
562
563         if ((md->options & PTL_MD_KIOV) == 0)
564                 return (nal->libnal_recv(nal, private, msg,
565                                          md->md_niov, md->md_iov.iov, 
566                                          offset, mlen, rlen));
567
568         return (nal->libnal_recv_pages(nal, private, msg, 
569                                        md->md_niov, md->md_iov.kiov,
570                                        offset, mlen, rlen));
571 }
572
573 ptl_err_t
574 lib_send (lib_nal_t *nal, void *private, lib_msg_t *msg,
575           ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
576           lib_md_t *md, ptl_size_t offset, ptl_size_t len) 
577 {
578         if (len == 0)
579                 return (nal->libnal_send(nal, private, msg,
580                                          hdr, type, nid, pid,
581                                          0, NULL,
582                                          offset, len));
583         
584         if ((md->options & PTL_MD_KIOV) == 0)
585                 return (nal->libnal_send(nal, private, msg, 
586                                          hdr, type, nid, pid,
587                                          md->md_niov, md->md_iov.iov,
588                                          offset, len));
589
590         return (nal->libnal_send_pages(nal, private, msg, 
591                                        hdr, type, nid, pid,
592                                        md->md_niov, md->md_iov.kiov,
593                                        offset, len));
594 }
595
596 static void
597 lib_commit_md (lib_nal_t *nal, lib_md_t *md, lib_msg_t *msg)
598 {
599         /* ALWAYS called holding the LIB_LOCK */
600         lib_counters_t *counters = &nal->libnal_ni.ni_counters;
601
602         /* Here, we commit the MD to a network OP by marking it busy and
603          * decrementing its threshold.  Come what may, the network "owns"
604          * the MD until a call to lib_finalize() signals completion. */
605         msg->md = md;
606          
607         md->pending++;
608         if (md->threshold != PTL_MD_THRESH_INF) {
609                 LASSERT (md->threshold > 0);
610                 md->threshold--;
611         }
612
613         counters->msgs_alloc++;
614         if (counters->msgs_alloc > counters->msgs_max)
615                 counters->msgs_max = counters->msgs_alloc;
616
617         list_add (&msg->msg_list, &nal->libnal_ni.ni_active_msgs);
618 }
619
620 static void
621 lib_drop_message (lib_nal_t *nal, void *private, ptl_hdr_t *hdr)
622 {
623         unsigned long flags;
624
625         /* CAVEAT EMPTOR: this only drops messages that we've not committed
626          * to receive (init_msg() not called) and therefore can't cause an
627          * event. */
628         
629         LIB_LOCK(nal, flags);
630         nal->libnal_ni.ni_counters.drop_count++;
631         nal->libnal_ni.ni_counters.drop_length += hdr->payload_length;
632         LIB_UNLOCK(nal, flags);
633
634         /* NULL msg => if NAL calls lib_finalize it will be a noop */
635         (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
636 }
637
638 /*
639  * Incoming messages have a ptl_msg_t object associated with them
640  * by the library.  This object encapsulates the state of the
641  * message and allows the NAL to do non-blocking receives or sends
642  * of long messages.
643  *
644  */
645 static ptl_err_t
646 parse_put(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
647 {
648         lib_ni_t        *ni = &nal->libnal_ni;
649         ptl_size_t       mlength = 0;
650         ptl_size_t       offset = 0;
651         ptl_err_t        rc;
652         lib_md_t        *md;
653         unsigned long    flags;
654                 
655         /* Convert put fields to host byte order */
656         hdr->msg.put.match_bits = NTOH__u64 (hdr->msg.put.match_bits);
657         hdr->msg.put.ptl_index = NTOH__u32 (hdr->msg.put.ptl_index);
658         hdr->msg.put.offset = NTOH__u32 (hdr->msg.put.offset);
659
660         LIB_LOCK(nal, flags);
661
662         md = lib_match_md(nal, hdr->msg.put.ptl_index, PTL_MD_OP_PUT,
663                           hdr->src_nid, hdr->src_pid,
664                           hdr->payload_length, hdr->msg.put.offset,
665                           hdr->msg.put.match_bits, msg,
666                           &mlength, &offset);
667         if (md == NULL) {
668                 LIB_UNLOCK(nal, flags);
669                 return (PTL_FAIL);
670         }
671
672         msg->ev.type = PTL_EVENT_PUT_END;
673         msg->ev.hdr_data = hdr->msg.put.hdr_data;
674
675         if (!ptl_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
676             !(md->options & PTL_MD_ACK_DISABLE)) {
677                 msg->ack_wmd = hdr->msg.put.ack_wmd;
678         }
679
680         ni->ni_counters.recv_count++;
681         ni->ni_counters.recv_length += mlength;
682
683         LIB_UNLOCK(nal, flags);
684
685         rc = lib_recv(nal, private, msg, md, offset, mlength,
686                       hdr->payload_length);
687         if (rc != PTL_OK)
688                 CERROR(LPU64": error on receiving PUT from "LPU64": %d\n",
689                        ni->ni_pid.nid, hdr->src_nid, rc);
690
691         return (rc);
692 }
693
694 static ptl_err_t
695 parse_get(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
696 {
697         lib_ni_t        *ni = &nal->libnal_ni;
698         ptl_size_t       mlength = 0;
699         ptl_size_t       offset = 0;
700         lib_md_t        *md;
701         ptl_hdr_t        reply;
702         unsigned long    flags;
703         int              rc;
704
705         /* Convert get fields to host byte order */
706         hdr->msg.get.match_bits = NTOH__u64 (hdr->msg.get.match_bits);
707         hdr->msg.get.ptl_index = NTOH__u32 (hdr->msg.get.ptl_index);
708         hdr->msg.get.sink_length = NTOH__u32 (hdr->msg.get.sink_length);
709         hdr->msg.get.src_offset = NTOH__u32 (hdr->msg.get.src_offset);
710
711         LIB_LOCK(nal, flags);
712
713         md = lib_match_md(nal, hdr->msg.get.ptl_index, PTL_MD_OP_GET,
714                           hdr->src_nid, hdr->src_pid,
715                           hdr->msg.get.sink_length, hdr->msg.get.src_offset,
716                           hdr->msg.get.match_bits, msg,
717                           &mlength, &offset);
718         if (md == NULL) {
719                 LIB_UNLOCK(nal, flags);
720                 return (PTL_FAIL);
721         }
722
723         msg->ev.type = PTL_EVENT_GET_END;
724         msg->ev.hdr_data = 0;
725
726         ni->ni_counters.send_count++;
727         ni->ni_counters.send_length += mlength;
728
729         LIB_UNLOCK(nal, flags);
730
731         memset (&reply, 0, sizeof (reply));
732         reply.type     = HTON__u32 (PTL_MSG_REPLY);
733         reply.dest_nid = HTON__u64 (hdr->src_nid);
734         reply.dest_pid = HTON__u32 (hdr->src_pid);
735         reply.src_nid  = HTON__u64 (ni->ni_pid.nid);
736         reply.src_pid  = HTON__u32 (ni->ni_pid.pid);
737         reply.payload_length = HTON__u32 (mlength);
738
739         reply.msg.reply.dst_wmd = hdr->msg.get.return_wmd;
740
741         /* NB call lib_send() _BEFORE_ lib_recv() completes the incoming
742          * message.  Some NALs _require_ this to implement optimized GET */
743
744         rc = lib_send (nal, private, msg, &reply, PTL_MSG_REPLY, 
745                        hdr->src_nid, hdr->src_pid, md, offset, mlength);
746         if (rc != PTL_OK)
747                 CERROR(LPU64": Unable to send REPLY for GET from "LPU64": %d\n",
748                        ni->ni_pid.nid, hdr->src_nid, rc);
749
750         /* Discard any junk after the hdr */
751         (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
752
753         return (rc);
754 }
755
756 static ptl_err_t
757 parse_reply(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
758 {
759         lib_ni_t        *ni = &nal->libnal_ni;
760         lib_md_t        *md;
761         int              rlength;
762         int              length;
763         unsigned long    flags;
764         ptl_err_t        rc;
765
766         LIB_LOCK(nal, flags);
767
768         /* NB handles only looked up by creator (no flips) */
769         md = ptl_wire_handle2md(&hdr->msg.reply.dst_wmd, nal);
770         if (md == NULL || md->threshold == 0) {
771                 CERROR (LPU64": Dropping REPLY from "LPU64" for %s MD "LPX64"."LPX64"\n",
772                         ni->ni_pid.nid, hdr->src_nid,
773                         md == NULL ? "invalid" : "inactive",
774                         hdr->msg.reply.dst_wmd.wh_interface_cookie,
775                         hdr->msg.reply.dst_wmd.wh_object_cookie);
776
777                 LIB_UNLOCK(nal, flags);
778                 return (PTL_FAIL);
779         }
780
781         LASSERT (md->offset == 0);
782
783         length = rlength = hdr->payload_length;
784
785         if (length > md->length) {
786                 if ((md->options & PTL_MD_TRUNCATE) == 0) {
787                         CERROR (LPU64": Dropping REPLY from "LPU64
788                                 " length %d for MD "LPX64" would overflow (%d)\n",
789                                 ni->ni_pid.nid, hdr->src_nid, length,
790                                 hdr->msg.reply.dst_wmd.wh_object_cookie,
791                                 md->length);
792                         LIB_UNLOCK(nal, flags);
793                         return (PTL_FAIL);
794                 }
795                 length = md->length;
796         }
797
798         CDEBUG(D_NET, "Reply from "LPU64" of length %d/%d into md "LPX64"\n",
799                hdr->src_nid, length, rlength, 
800                hdr->msg.reply.dst_wmd.wh_object_cookie);
801
802         lib_commit_md(nal, md, msg);
803
804         msg->ev.type = PTL_EVENT_REPLY_END;
805         msg->ev.initiator.nid = hdr->src_nid;
806         msg->ev.initiator.pid = hdr->src_pid;
807         msg->ev.rlength = rlength;
808         msg->ev.mlength = length;
809         msg->ev.offset = 0;
810
811         lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
812
813         ni->ni_counters.recv_count++;
814         ni->ni_counters.recv_length += length;
815
816         LIB_UNLOCK(nal, flags);
817
818         rc = lib_recv(nal, private, msg, md, 0, length, rlength);
819         if (rc != PTL_OK)
820                 CERROR(LPU64": error on receiving REPLY from "LPU64": %d\n",
821                        ni->ni_pid.nid, hdr->src_nid, rc);
822
823         return (rc);
824 }
825
826 static ptl_err_t
827 parse_ack(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
828 {
829         lib_ni_t      *ni = &nal->libnal_ni;
830         lib_md_t      *md;
831         unsigned long  flags;
832
833         /* Convert ack fields to host byte order */
834         hdr->msg.ack.match_bits = NTOH__u64 (hdr->msg.ack.match_bits);
835         hdr->msg.ack.mlength = NTOH__u32 (hdr->msg.ack.mlength);
836
837         LIB_LOCK(nal, flags);
838
839         /* NB handles only looked up by creator (no flips) */
840         md = ptl_wire_handle2md(&hdr->msg.ack.dst_wmd, nal);
841         if (md == NULL || md->threshold == 0) {
842                 CDEBUG(D_INFO, LPU64": Dropping ACK from "LPU64" to %s MD "
843                        LPX64"."LPX64"\n", ni->ni_pid.nid, hdr->src_nid, 
844                        (md == NULL) ? "invalid" : "inactive",
845                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
846                        hdr->msg.ack.dst_wmd.wh_object_cookie);
847
848                 LIB_UNLOCK(nal, flags);
849                 return (PTL_FAIL);
850         }
851
852         CDEBUG(D_NET, LPU64": ACK from "LPU64" into md "LPX64"\n",
853                ni->ni_pid.nid, hdr->src_nid, 
854                hdr->msg.ack.dst_wmd.wh_object_cookie);
855
856         lib_commit_md(nal, md, msg);
857
858         msg->ev.type = PTL_EVENT_ACK;
859         msg->ev.initiator.nid = hdr->src_nid;
860         msg->ev.initiator.pid = hdr->src_pid;
861         msg->ev.mlength = hdr->msg.ack.mlength;
862         msg->ev.match_bits = hdr->msg.ack.match_bits;
863
864         lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
865
866         ni->ni_counters.recv_count++;
867
868         LIB_UNLOCK(nal, flags);
869         
870         /* We have received and matched up the ack OK, create the
871          * completion event now... */
872         lib_finalize(nal, private, msg, PTL_OK);
873
874         /* ...and now discard any junk after the hdr */
875         (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
876  
877        return (PTL_OK);
878 }
879
880 static char *
881 hdr_type_string (ptl_hdr_t *hdr)
882 {
883         switch (hdr->type) {
884         case PTL_MSG_ACK:
885                 return ("ACK");
886         case PTL_MSG_PUT:
887                 return ("PUT");
888         case PTL_MSG_GET:
889                 return ("GET");
890         case PTL_MSG_REPLY:
891                 return ("REPLY");
892         case PTL_MSG_HELLO:
893                 return ("HELLO");
894         default:
895                 return ("<UNKNOWN>");
896         }
897 }
898
899 void print_hdr(lib_nal_t *nal, ptl_hdr_t * hdr)
900 {
901         char *type_str = hdr_type_string (hdr);
902
903         CWARN("P3 Header at %p of type %s\n", hdr, type_str);
904         CWARN("    From nid/pid "LPX64"/%u", hdr->src_nid, hdr->src_pid);
905         CWARN("    To nid/pid "LPX64"/%u\n", hdr->dest_nid, hdr->dest_pid);
906
907         switch (hdr->type) {
908         default:
909                 break;
910
911         case PTL_MSG_PUT:
912                 CWARN("    Ptl index %d, ack md "LPX64"."LPX64", "
913                       "match bits "LPX64"\n",
914                       hdr->msg.put.ptl_index,
915                       hdr->msg.put.ack_wmd.wh_interface_cookie,
916                       hdr->msg.put.ack_wmd.wh_object_cookie,
917                       hdr->msg.put.match_bits);
918                 CWARN("    Length %d, offset %d, hdr data "LPX64"\n",
919                       hdr->payload_length, hdr->msg.put.offset,
920                       hdr->msg.put.hdr_data);
921                 break;
922
923         case PTL_MSG_GET:
924                 CWARN("    Ptl index %d, return md "LPX64"."LPX64", "
925                       "match bits "LPX64"\n", hdr->msg.get.ptl_index,
926                       hdr->msg.get.return_wmd.wh_interface_cookie,
927                       hdr->msg.get.return_wmd.wh_object_cookie,
928                       hdr->msg.get.match_bits);
929                 CWARN("    Length %d, src offset %d\n",
930                       hdr->msg.get.sink_length,
931                       hdr->msg.get.src_offset);
932                 break;
933
934         case PTL_MSG_ACK:
935                 CWARN("    dst md "LPX64"."LPX64", "
936                       "manipulated length %d\n",
937                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
938                       hdr->msg.ack.dst_wmd.wh_object_cookie,
939                       hdr->msg.ack.mlength);
940                 break;
941
942         case PTL_MSG_REPLY:
943                 CWARN("    dst md "LPX64"."LPX64", "
944                       "length %d\n",
945                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
946                       hdr->msg.reply.dst_wmd.wh_object_cookie,
947                       hdr->payload_length);
948         }
949
950 }                               /* end of print_hdr() */
951
952
953 ptl_err_t
954 lib_parse(lib_nal_t *nal, ptl_hdr_t *hdr, void *private)
955 {
956         unsigned long  flags;
957         ptl_err_t      rc;
958         lib_msg_t     *msg;
959
960         /* NB we return PTL_OK if we manage to parse the header and believe
961          * it looks OK.  Anything that goes wrong with receiving the
962          * message after that point is the responsibility of the NAL */
963         
964         /* convert common fields to host byte order */
965         hdr->type = NTOH__u32 (hdr->type);
966         hdr->src_nid = NTOH__u64 (hdr->src_nid);
967         hdr->src_pid = NTOH__u32 (hdr->src_pid);
968         hdr->dest_pid = NTOH__u32 (hdr->dest_pid);
969         hdr->payload_length = NTOH__u32(hdr->payload_length);
970
971         switch (hdr->type) {
972         case PTL_MSG_HELLO: {
973                 /* dest_nid is really ptl_magicversion_t */
974                 ptl_magicversion_t *mv = (ptl_magicversion_t *)&hdr->dest_nid;
975
976                 mv->magic = NTOH__u32(mv->magic);
977                 mv->version_major = NTOH__u16(mv->version_major);
978                 mv->version_minor = NTOH__u16(mv->version_minor);
979
980                 if (mv->magic == PORTALS_PROTO_MAGIC &&
981                     mv->version_major == PORTALS_PROTO_VERSION_MAJOR &&
982                     mv->version_minor == PORTALS_PROTO_VERSION_MINOR) {
983                         CWARN (LPU64": Dropping unexpected HELLO message: "
984                                "magic %d, version %d.%d from "LPD64"\n",
985                                nal->libnal_ni.ni_pid.nid, mv->magic, 
986                                mv->version_major, mv->version_minor,
987                                hdr->src_nid);
988
989                         /* it's good but we don't want it */
990                         lib_drop_message(nal, private, hdr);
991                         return PTL_OK;
992                 }
993
994                 /* we got garbage */
995                 CERROR (LPU64": Bad HELLO message: "
996                         "magic %d, version %d.%d from "LPD64"\n",
997                         nal->libnal_ni.ni_pid.nid, mv->magic, 
998                         mv->version_major, mv->version_minor,
999                         hdr->src_nid);
1000                 return PTL_FAIL;
1001         }
1002
1003         case PTL_MSG_ACK:
1004         case PTL_MSG_PUT:
1005         case PTL_MSG_GET:
1006         case PTL_MSG_REPLY:
1007                 hdr->dest_nid = NTOH__u64 (hdr->dest_nid);
1008                 if (hdr->dest_nid != nal->libnal_ni.ni_pid.nid) {
1009                         CERROR(LPU64": BAD dest NID in %s message from"
1010                                LPU64" to "LPU64" (not me)\n", 
1011                                nal->libnal_ni.ni_pid.nid, hdr_type_string (hdr),
1012                                hdr->src_nid, hdr->dest_nid);
1013                         return PTL_FAIL;
1014                 }
1015                 break;
1016
1017         default:
1018                 CERROR(LPU64": Bad message type 0x%x from "LPU64"\n",
1019                        nal->libnal_ni.ni_pid.nid, hdr->type, hdr->src_nid);
1020                 return PTL_FAIL;
1021         }
1022
1023         /* We've decided we're not receiving garbage since we can parse the
1024          * header.  We will return PTL_OK come what may... */
1025
1026         if (!list_empty (&nal->libnal_ni.ni_test_peers) && /* normally we don't */
1027             fail_peer (nal, hdr->src_nid, 0))      /* shall we now? */
1028         {
1029                 CERROR(LPU64": Dropping incoming %s from "LPU64
1030                        ": simulated failure\n",
1031                        nal->libnal_ni.ni_pid.nid, hdr_type_string (hdr), 
1032                        hdr->src_nid);
1033                 lib_drop_message(nal, private, hdr);
1034                 return PTL_OK;
1035         }
1036
1037         msg = lib_msg_alloc(nal);
1038         if (msg == NULL) {
1039                 CERROR(LPU64": Dropping incoming %s from "LPU64
1040                        ": can't allocate a lib_msg_t\n",
1041                        nal->libnal_ni.ni_pid.nid, hdr_type_string (hdr), 
1042                        hdr->src_nid);
1043                 lib_drop_message(nal, private, hdr);
1044                 return PTL_OK;
1045         }
1046
1047         switch (hdr->type) {
1048         case PTL_MSG_ACK:
1049                 rc = parse_ack(nal, hdr, private, msg);
1050                 break;
1051         case PTL_MSG_PUT:
1052                 rc = parse_put(nal, hdr, private, msg);
1053                 break;
1054         case PTL_MSG_GET:
1055                 rc = parse_get(nal, hdr, private, msg);
1056                 break;
1057         case PTL_MSG_REPLY:
1058                 rc = parse_reply(nal, hdr, private, msg);
1059                 break;
1060         default:
1061                 LASSERT(0);
1062                 rc = PTL_FAIL;                  /* no compiler warning please */
1063                 break;
1064         }
1065                 
1066         if (rc != PTL_OK) {
1067                 if (msg->md != NULL) {
1068                         /* committed... */
1069                         lib_finalize(nal, private, msg, rc);
1070                 } else {
1071                         LIB_LOCK(nal, flags);
1072                         lib_msg_free(nal, msg); /* expects LIB_LOCK held */
1073                         LIB_UNLOCK(nal, flags);
1074
1075                         lib_drop_message(nal, private, hdr);
1076                 }
1077         }
1078
1079         return PTL_OK;
1080         /* That's "OK I can parse it", not "OK I like it" :) */
1081 }
1082
1083 int 
1084 lib_api_put(nal_t *apinal, ptl_handle_md_t *mdh, 
1085             ptl_ack_req_t ack, ptl_process_id_t *id,
1086             ptl_pt_index_t portal, ptl_ac_index_t ac,
1087             ptl_match_bits_t match_bits, 
1088             ptl_size_t offset, ptl_hdr_data_t hdr_data)
1089 {
1090         lib_nal_t        *nal = apinal->nal_data;
1091         lib_ni_t         *ni = &nal->libnal_ni;
1092         lib_msg_t        *msg;
1093         ptl_hdr_t         hdr;
1094         lib_md_t         *md;
1095         unsigned long     flags;
1096         int               rc;
1097         
1098         if (!list_empty (&ni->ni_test_peers) && /* normally we don't */
1099             fail_peer (nal, id->nid, 1))           /* shall we now? */
1100         {
1101                 CERROR("Dropping PUT to "LPU64": simulated failure\n",
1102                        id->nid);
1103                 return PTL_PROCESS_INVALID;
1104         }
1105
1106         msg = lib_msg_alloc(nal);
1107         if (msg == NULL) {
1108                 CERROR(LPU64": Dropping PUT to "LPU64": ENOMEM on lib_msg_t\n",
1109                        ni->ni_pid.nid, id->nid);
1110                 return PTL_NO_SPACE;
1111         }
1112
1113         LIB_LOCK(nal, flags);
1114
1115         md = ptl_handle2md(mdh, nal);
1116         if (md == NULL || md->threshold == 0) {
1117                 lib_msg_free(nal, msg);
1118                 LIB_UNLOCK(nal, flags);
1119         
1120                 return PTL_MD_INVALID;
1121         }
1122
1123         CDEBUG(D_NET, "PtlPut -> "LPX64"\n", id->nid);
1124
1125         memset (&hdr, 0, sizeof (hdr));
1126         hdr.type     = HTON__u32 (PTL_MSG_PUT);
1127         hdr.dest_nid = HTON__u64 (id->nid);
1128         hdr.dest_pid = HTON__u32 (id->pid);
1129         hdr.src_nid  = HTON__u64 (ni->ni_pid.nid);
1130         hdr.src_pid  = HTON__u32 (ni->ni_pid.pid);
1131         hdr.payload_length = HTON__u32 (md->length);
1132
1133         /* NB handles only looked up by creator (no flips) */
1134         if (ack == PTL_ACK_REQ) {
1135                 hdr.msg.put.ack_wmd.wh_interface_cookie = ni->ni_interface_cookie;
1136                 hdr.msg.put.ack_wmd.wh_object_cookie = md->md_lh.lh_cookie;
1137         } else {
1138                 hdr.msg.put.ack_wmd = PTL_WIRE_HANDLE_NONE;
1139         }
1140
1141         hdr.msg.put.match_bits = HTON__u64 (match_bits);
1142         hdr.msg.put.ptl_index = HTON__u32 (portal);
1143         hdr.msg.put.offset = HTON__u32 (offset);
1144         hdr.msg.put.hdr_data = hdr_data;
1145
1146         lib_commit_md(nal, md, msg);
1147         
1148         msg->ev.type = PTL_EVENT_SEND_END;
1149         msg->ev.initiator.nid = ni->ni_pid.nid;
1150         msg->ev.initiator.pid = ni->ni_pid.pid;
1151         msg->ev.portal = portal;
1152         msg->ev.match_bits = match_bits;
1153         msg->ev.rlength = md->length;
1154         msg->ev.mlength = md->length;
1155         msg->ev.offset = offset;
1156         msg->ev.hdr_data = hdr_data;
1157
1158         lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
1159
1160         ni->ni_counters.send_count++;
1161         ni->ni_counters.send_length += md->length;
1162
1163         LIB_UNLOCK(nal, flags);
1164         
1165         rc = lib_send (nal, NULL, msg, &hdr, PTL_MSG_PUT,
1166                        id->nid, id->pid, md, 0, md->length);
1167         if (rc != PTL_OK) {
1168                 CERROR("Error sending PUT to "LPX64": %d\n",
1169                        id->nid, rc);
1170                 lib_finalize (nal, NULL, msg, rc);
1171         }
1172         
1173         /* completion will be signalled by an event */
1174         return PTL_OK;
1175 }
1176
1177 lib_msg_t * 
1178 lib_create_reply_msg (lib_nal_t *nal, ptl_nid_t peer_nid, lib_msg_t *getmsg)
1179 {
1180         /* The NAL can DMA direct to the GET md (i.e. no REPLY msg).  This
1181          * returns a msg for the NAL to pass to lib_finalize() when the sink
1182          * data has been received.
1183          *
1184          * CAVEAT EMPTOR: 'getmsg' is the original GET, which is freed when
1185          * lib_finalize() is called on it, so the NAL must call this first */
1186
1187         lib_ni_t        *ni = &nal->libnal_ni;
1188         lib_msg_t       *msg = lib_msg_alloc(nal);
1189         lib_md_t        *getmd = getmsg->md;
1190         unsigned long    flags;
1191
1192         LIB_LOCK(nal, flags);
1193
1194         LASSERT (getmd->pending > 0);
1195
1196         if (msg == NULL) {
1197                 CERROR ("Dropping REPLY from "LPU64": can't allocate msg\n",
1198                         peer_nid);
1199                 goto drop;
1200         }
1201
1202         if (getmd->threshold == 0) {
1203                 CERROR ("Dropping REPLY from "LPU64" for inactive MD %p\n",
1204                         peer_nid, getmd);
1205                 goto drop_msg;
1206         }
1207
1208         LASSERT (getmd->offset == 0);
1209
1210         CDEBUG(D_NET, "Reply from "LPU64" md %p\n", peer_nid, getmd);
1211
1212         lib_commit_md (nal, getmd, msg);
1213
1214         msg->ev.type = PTL_EVENT_REPLY_END;
1215         msg->ev.initiator.nid = peer_nid;
1216         msg->ev.initiator.pid = 0;      /* XXX FIXME!!! */
1217         msg->ev.rlength = msg->ev.mlength = getmd->length;
1218         msg->ev.offset = 0;
1219
1220         lib_md_deconstruct(nal, getmd, &msg->ev.mem_desc);
1221
1222         ni->ni_counters.recv_count++;
1223         ni->ni_counters.recv_length += getmd->length;
1224
1225         LIB_UNLOCK(nal, flags);
1226
1227         return msg;
1228
1229  drop_msg:
1230         lib_msg_free(nal, msg);
1231  drop:
1232         nal->libnal_ni.ni_counters.drop_count++;
1233         nal->libnal_ni.ni_counters.drop_length += getmd->length;
1234
1235         LIB_UNLOCK (nal, flags);
1236
1237         return NULL;
1238 }
1239
1240 int 
1241 lib_api_get(nal_t *apinal, ptl_handle_md_t *mdh, ptl_process_id_t *id,
1242             ptl_pt_index_t portal, ptl_ac_index_t ac,
1243             ptl_match_bits_t match_bits, ptl_size_t offset)
1244 {
1245         lib_nal_t        *nal = apinal->nal_data;
1246         lib_ni_t         *ni = &nal->libnal_ni;
1247         lib_msg_t        *msg;
1248         ptl_hdr_t         hdr;
1249         lib_md_t         *md;
1250         unsigned long     flags;
1251         int               rc;
1252         
1253         if (!list_empty (&ni->ni_test_peers) && /* normally we don't */
1254             fail_peer (nal, id->nid, 1))           /* shall we now? */
1255         {
1256                 CERROR("Dropping PUT to "LPX64": simulated failure\n",
1257                        id->nid);
1258                 return PTL_PROCESS_INVALID;
1259         }
1260
1261         msg = lib_msg_alloc(nal);
1262         if (msg == NULL) {
1263                 CERROR("Dropping GET to "LPU64": ENOMEM on lib_msg_t\n",
1264                        id->nid);
1265                 return PTL_NO_SPACE;
1266         }
1267
1268         LIB_LOCK(nal, flags);
1269
1270         md = ptl_handle2md(mdh, nal);
1271         if (md == NULL || !md->threshold) {
1272                 lib_msg_free(nal, msg);
1273                 LIB_UNLOCK(nal, flags);
1274
1275                 return PTL_MD_INVALID;
1276         }
1277
1278         CDEBUG(D_NET, "PtlGet -> %Lu: %lu\n", (unsigned long long)id->nid,
1279                (unsigned long)id->pid);
1280
1281         memset (&hdr, 0, sizeof (hdr));
1282         hdr.type     = HTON__u32 (PTL_MSG_GET);
1283         hdr.dest_nid = HTON__u64 (id->nid);
1284         hdr.dest_pid = HTON__u32 (id->pid);
1285         hdr.src_nid  = HTON__u64 (ni->ni_pid.nid);
1286         hdr.src_pid  = HTON__u32 (ni->ni_pid.pid);
1287         hdr.payload_length = 0;
1288
1289         /* NB handles only looked up by creator (no flips) */
1290         hdr.msg.get.return_wmd.wh_interface_cookie = ni->ni_interface_cookie;
1291         hdr.msg.get.return_wmd.wh_object_cookie = md->md_lh.lh_cookie;
1292
1293         hdr.msg.get.match_bits = HTON__u64 (match_bits);
1294         hdr.msg.get.ptl_index = HTON__u32 (portal);
1295         hdr.msg.get.src_offset = HTON__u32 (offset);
1296         hdr.msg.get.sink_length = HTON__u32 (md->length);
1297
1298         lib_commit_md(nal, md, msg);
1299
1300         msg->ev.type = PTL_EVENT_SEND_END;
1301         msg->ev.initiator = ni->ni_pid;
1302         msg->ev.portal = portal;
1303         msg->ev.match_bits = match_bits;
1304         msg->ev.rlength = md->length;
1305         msg->ev.mlength = md->length;
1306         msg->ev.offset = offset;
1307         msg->ev.hdr_data = 0;
1308
1309         lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
1310
1311         ni->ni_counters.send_count++;
1312
1313         LIB_UNLOCK(nal, flags);
1314
1315         rc = lib_send (nal, NULL, msg, &hdr, PTL_MSG_GET,
1316                        id->nid, id->pid, NULL, 0, 0);
1317         if (rc != PTL_OK) {
1318                 CERROR(LPU64": error sending GET to "LPU64": %d\n",
1319                        ni->ni_pid.nid, id->nid, rc);
1320                 lib_finalize (nal, NULL, msg, rc);
1321         }
1322         
1323         /* completion will be signalled by an event */
1324         return PTL_OK;
1325 }
1326
1327 void lib_assert_wire_constants (void)
1328 {
1329         /* Wire protocol assertions generated by 'wirecheck'
1330          * running on Linux robert.bartonsoftware.com 2.4.20-18.9 #1 Thu May 29 06:54:41 EDT 2003 i68
1331          * with gcc version 3.2.2 20030222 (Red Hat Linux 3.2.2-5) */
1332
1333
1334         /* Constants... */
1335         LASSERT (PORTALS_PROTO_MAGIC == 0xeebc0ded);
1336         LASSERT (PORTALS_PROTO_VERSION_MAJOR == 0);
1337         LASSERT (PORTALS_PROTO_VERSION_MINOR == 3);
1338         LASSERT (PTL_MSG_ACK == 0);
1339         LASSERT (PTL_MSG_PUT == 1);
1340         LASSERT (PTL_MSG_GET == 2);
1341         LASSERT (PTL_MSG_REPLY == 3);
1342         LASSERT (PTL_MSG_HELLO == 4);
1343
1344         /* Checks for struct ptl_handle_wire_t */
1345         LASSERT ((int)sizeof(ptl_handle_wire_t) == 16);
1346         LASSERT (offsetof(ptl_handle_wire_t, wh_interface_cookie) == 0);
1347         LASSERT ((int)sizeof(((ptl_handle_wire_t *)0)->wh_interface_cookie) == 8);
1348         LASSERT (offsetof(ptl_handle_wire_t, wh_object_cookie) == 8);
1349         LASSERT ((int)sizeof(((ptl_handle_wire_t *)0)->wh_object_cookie) == 8);
1350
1351         /* Checks for struct ptl_magicversion_t */
1352         LASSERT ((int)sizeof(ptl_magicversion_t) == 8);
1353         LASSERT (offsetof(ptl_magicversion_t, magic) == 0);
1354         LASSERT ((int)sizeof(((ptl_magicversion_t *)0)->magic) == 4);
1355         LASSERT (offsetof(ptl_magicversion_t, version_major) == 4);
1356         LASSERT ((int)sizeof(((ptl_magicversion_t *)0)->version_major) == 2);
1357         LASSERT (offsetof(ptl_magicversion_t, version_minor) == 6);
1358         LASSERT ((int)sizeof(((ptl_magicversion_t *)0)->version_minor) == 2);
1359
1360         /* Checks for struct ptl_hdr_t */
1361         LASSERT ((int)sizeof(ptl_hdr_t) == 72);
1362         LASSERT (offsetof(ptl_hdr_t, dest_nid) == 0);
1363         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->dest_nid) == 8);
1364         LASSERT (offsetof(ptl_hdr_t, src_nid) == 8);
1365         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->src_nid) == 8);
1366         LASSERT (offsetof(ptl_hdr_t, dest_pid) == 16);
1367         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->dest_pid) == 4);
1368         LASSERT (offsetof(ptl_hdr_t, src_pid) == 20);
1369         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->src_pid) == 4);
1370         LASSERT (offsetof(ptl_hdr_t, type) == 24);
1371         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->type) == 4);
1372         LASSERT (offsetof(ptl_hdr_t, payload_length) == 28);
1373         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->payload_length) == 4);
1374         LASSERT (offsetof(ptl_hdr_t, msg) == 32);
1375         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg) == 40);
1376
1377         /* Ack */
1378         LASSERT (offsetof(ptl_hdr_t, msg.ack.dst_wmd) == 32);
1379         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.ack.dst_wmd) == 16);
1380         LASSERT (offsetof(ptl_hdr_t, msg.ack.match_bits) == 48);
1381         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.ack.match_bits) == 8);
1382         LASSERT (offsetof(ptl_hdr_t, msg.ack.mlength) == 56);
1383         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.ack.mlength) == 4);
1384
1385         /* Put */
1386         LASSERT (offsetof(ptl_hdr_t, msg.put.ack_wmd) == 32);
1387         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.ack_wmd) == 16);
1388         LASSERT (offsetof(ptl_hdr_t, msg.put.match_bits) == 48);
1389         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.match_bits) == 8);
1390         LASSERT (offsetof(ptl_hdr_t, msg.put.hdr_data) == 56);
1391         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.hdr_data) == 8);
1392         LASSERT (offsetof(ptl_hdr_t, msg.put.ptl_index) == 64);
1393         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.ptl_index) == 4);
1394         LASSERT (offsetof(ptl_hdr_t, msg.put.offset) == 68);
1395         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.offset) == 4);
1396
1397         /* Get */
1398         LASSERT (offsetof(ptl_hdr_t, msg.get.return_wmd) == 32);
1399         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.return_wmd) == 16);
1400         LASSERT (offsetof(ptl_hdr_t, msg.get.match_bits) == 48);
1401         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.match_bits) == 8);
1402         LASSERT (offsetof(ptl_hdr_t, msg.get.ptl_index) == 56);
1403         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.ptl_index) == 4);
1404         LASSERT (offsetof(ptl_hdr_t, msg.get.src_offset) == 60);
1405         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.src_offset) == 4);
1406         LASSERT (offsetof(ptl_hdr_t, msg.get.sink_length) == 64);
1407         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.sink_length) == 4);
1408
1409         /* Reply */
1410         LASSERT (offsetof(ptl_hdr_t, msg.reply.dst_wmd) == 32);
1411         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.reply.dst_wmd) == 16);
1412
1413         /* Hello */
1414         LASSERT (offsetof(ptl_hdr_t, msg.hello.incarnation) == 32);
1415         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.hello.incarnation) == 8);
1416         LASSERT (offsetof(ptl_hdr_t, msg.hello.type) == 40);
1417         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.hello.type) == 4);
1418 }