Whamcloud - gitweb
b=4552
[fs/lustre-release.git] / lnet / lnet / lib-move.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * lib/lib-move.c
5  * Data movement routines
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *  Copyright (c) 2001-2002 Sandia National Laboratories
9  *
10  *   This file is part of Lustre, http://www.sf.net/projects/lustre/
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #ifndef __KERNEL__
27 # include <stdio.h>
28 #else
29 # define DEBUG_SUBSYSTEM S_PORTALS
30 # include <linux/kp30.h>
31 #endif
32 #include <portals/p30.h>
33 #include <portals/lib-p30.h>
34
35 /* forward ref */
36 static void lib_commit_md (lib_nal_t *nal, lib_md_t *md, lib_msg_t *msg);
37
38 static lib_md_t *
39 lib_match_md(lib_nal_t *nal, int index, int op_mask, 
40              ptl_nid_t src_nid, ptl_pid_t src_pid, 
41              ptl_size_t rlength, ptl_size_t roffset,
42              ptl_match_bits_t match_bits, lib_msg_t *msg,
43              ptl_size_t *mlength_out, ptl_size_t *offset_out)
44 {
45         lib_ni_t         *ni = &nal->libnal_ni;
46         struct list_head *match_list = &ni->ni_portals.tbl[index];
47         struct list_head *tmp;
48         lib_me_t         *me;
49         lib_md_t         *md;
50         ptl_size_t        mlength;
51         ptl_size_t        offset;
52         ENTRY;
53
54         CDEBUG (D_NET, "Request from "LPU64".%d of length %d into portal %d "
55                 "MB="LPX64"\n", src_nid, src_pid, rlength, index, match_bits);
56
57         if (index < 0 || index >= ni->ni_portals.size) {
58                 CERROR("Invalid portal %d not in [0-%d]\n",
59                        index, ni->ni_portals.size);
60                 goto failed;
61         }
62
63         list_for_each (tmp, match_list) {
64                 me = list_entry(tmp, lib_me_t, me_list);
65                 md = me->md;
66
67                  /* ME attached but MD not attached yet */
68                 if (md == NULL)
69                         continue;
70
71                 LASSERT (me == md->me);
72
73                 /* mismatched MD op */
74                 if ((md->options & op_mask) == 0)
75                         continue;
76
77                 /* MD exhausted */
78                 if (lib_md_exhausted(md))
79                         continue;
80
81                 /* mismatched ME nid/pid? */
82                 if (me->match_id.nid != PTL_NID_ANY &&
83                     me->match_id.nid != src_nid)
84                         continue;
85                 
86                 CDEBUG(D_NET,"match_id.pid [%x], src_pid [%x]\n", me->match_id.pid, src_pid);
87
88                 if (me->match_id.pid != PTL_PID_ANY &&
89                     me->match_id.pid != src_pid)
90                         continue;
91
92                 /* mismatched ME matchbits? */
93                 if (((me->match_bits ^ match_bits) & ~me->ignore_bits) != 0)
94                         continue;
95
96                 /* Hurrah! This _is_ a match; check it out... */
97
98                 if ((md->options & PTL_MD_MANAGE_REMOTE) == 0)
99                         offset = md->offset;
100                 else
101                         offset = roffset;
102
103                 if ((md->options & PTL_MD_MAX_SIZE) != 0) {
104                         mlength = md->max_size;
105                         LASSERT (md->offset + mlength <= md->length);
106                 } else {
107                         mlength = md->length - offset;
108                 }
109
110                 if (rlength <= mlength) {        /* fits in allowed space */
111                         mlength = rlength;
112                 } else if ((md->options & PTL_MD_TRUNCATE) == 0) {
113                         /* this packet _really_ is too big */
114                         CERROR("Matching packet %d too big: %d left, "
115                                "%d allowed\n", rlength, md->length - offset,
116                                mlength);
117                         goto failed;
118                 }
119
120                 /* Commit to this ME/MD */
121                 CDEBUG(D_NET, "Incoming %s index %x from "LPU64"/%u of "
122                        "length %d/%d into md "LPX64" [%d] + %d\n", 
123                        (op_mask == PTL_MD_OP_PUT) ? "put" : "get",
124                        index, src_nid, src_pid, mlength, rlength, 
125                        md->md_lh.lh_cookie, md->md_niov, offset);
126
127                 lib_commit_md(nal, md, msg);
128                 md->offset = offset + mlength;
129
130                 /* NB Caller sets ev.type and ev.hdr_data */
131                 msg->ev.initiator.nid = src_nid;
132                 msg->ev.initiator.pid = src_pid;
133                 msg->ev.portal = index;
134                 msg->ev.match_bits = match_bits;
135                 msg->ev.rlength = rlength;
136                 msg->ev.mlength = mlength;
137                 msg->ev.offset = offset;
138
139                 lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
140
141                 *offset_out = offset;
142                 *mlength_out = mlength;
143
144                 /* Auto-unlink NOW, so the ME gets unlinked if required.
145                  * We bumped md->pending above so the MD just gets flagged
146                  * for unlink when it is finalized. */
147                 if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINK) != 0 &&
148                     lib_md_exhausted(md))
149                         lib_md_unlink(nal, md);
150
151                 RETURN (md);
152         }
153
154  failed:
155         CERROR (LPU64": Dropping %s from "LPU64".%d portal %d match "LPX64
156                 " offset %d length %d: no match\n",
157                 ni->ni_pid.nid, (op_mask == PTL_MD_OP_GET) ? "GET" : "PUT",
158                 src_nid, src_pid, index, match_bits, roffset, rlength);
159         RETURN(NULL);
160 }
161
162 int lib_api_fail_nid (nal_t *apinal, ptl_nid_t nid, unsigned int threshold)
163 {
164         lib_nal_t         *nal = apinal->nal_data;
165         lib_test_peer_t   *tp;
166         unsigned long      flags;
167         struct list_head  *el;
168         struct list_head  *next;
169         struct list_head   cull;
170         
171         if (threshold != 0) {
172                 /* Adding a new entry */
173                 PORTAL_ALLOC(tp, sizeof(*tp));
174                 if (tp == NULL)
175                         return PTL_NO_SPACE;
176                 
177                 tp->tp_nid = nid;
178                 tp->tp_threshold = threshold;
179                 
180                 LIB_LOCK(nal, flags);
181                 list_add_tail (&tp->tp_list, &nal->libnal_ni.ni_test_peers);
182                 LIB_UNLOCK(nal, flags);
183                 return PTL_OK;
184         }
185         
186         /* removing entries */
187         INIT_LIST_HEAD (&cull);
188         
189         LIB_LOCK(nal, flags);
190
191         list_for_each_safe (el, next, &nal->libnal_ni.ni_test_peers) {
192                 tp = list_entry (el, lib_test_peer_t, tp_list);
193                 
194                 if (tp->tp_threshold == 0 ||    /* needs culling anyway */
195                     nid == PTL_NID_ANY ||       /* removing all entries */
196                     tp->tp_nid == nid)          /* matched this one */
197                 {
198                         list_del (&tp->tp_list);
199                         list_add (&tp->tp_list, &cull);
200                 }
201         }
202         
203         LIB_UNLOCK(nal, flags);
204                 
205         while (!list_empty (&cull)) {
206                 tp = list_entry (cull.next, lib_test_peer_t, tp_list);
207
208                 list_del (&tp->tp_list);
209                 PORTAL_FREE(tp, sizeof (*tp));
210         }
211         return PTL_OK;
212 }
213
214 static int
215 fail_peer (lib_nal_t *nal, ptl_nid_t nid, int outgoing) 
216 {
217         lib_test_peer_t  *tp;
218         struct list_head *el;
219         struct list_head *next;
220         unsigned long     flags;
221         struct list_head  cull;
222         int               fail = 0;
223
224         INIT_LIST_HEAD (&cull);
225         
226         LIB_LOCK (nal, flags);
227
228         list_for_each_safe (el, next, &nal->libnal_ni.ni_test_peers) {
229                 tp = list_entry (el, lib_test_peer_t, tp_list);
230
231                 if (tp->tp_threshold == 0) {
232                         /* zombie entry */
233                         if (outgoing) {
234                                 /* only cull zombies on outgoing tests,
235                                  * since we may be at interrupt priority on
236                                  * incoming messages. */
237                                 list_del (&tp->tp_list);
238                                 list_add (&tp->tp_list, &cull);
239                         }
240                         continue;
241                 }
242                         
243                 if (tp->tp_nid == PTL_NID_ANY || /* fail every peer */
244                     nid == tp->tp_nid) {        /* fail this peer */
245                         fail = 1;
246                         
247                         if (tp->tp_threshold != PTL_MD_THRESH_INF) {
248                                 tp->tp_threshold--;
249                                 if (outgoing &&
250                                     tp->tp_threshold == 0) {
251                                         /* see above */
252                                         list_del (&tp->tp_list);
253                                         list_add (&tp->tp_list, &cull);
254                                 }
255                         }
256                         break;
257                 }
258         }
259         
260         LIB_UNLOCK (nal, flags);
261
262         while (!list_empty (&cull)) {
263                 tp = list_entry (cull.next, lib_test_peer_t, tp_list);
264                 list_del (&tp->tp_list);
265                 
266                 PORTAL_FREE(tp, sizeof (*tp));
267         }
268
269         return (fail);
270 }
271
272 ptl_size_t
273 lib_iov_nob (int niov, struct iovec *iov)
274 {
275         ptl_size_t nob = 0;
276         
277         while (niov-- > 0)
278                 nob += (iov++)->iov_len;
279         
280         return (nob);
281 }
282
283 void
284 lib_copy_iov2buf (char *dest, int niov, struct iovec *iov, 
285                   ptl_size_t offset, ptl_size_t len)
286 {
287         ptl_size_t nob;
288
289         if (len == 0)
290                 return;
291         
292         /* skip complete frags before 'offset' */
293         LASSERT (niov > 0);
294         while (offset >= iov->iov_len) {
295                 offset -= iov->iov_len;
296                 iov++;
297                 niov--;
298                 LASSERT (niov > 0);
299         }
300                 
301         do {
302                 LASSERT (niov > 0);
303                 nob = MIN (iov->iov_len - offset, len);
304                 memcpy (dest, iov->iov_base + offset, nob);
305
306                 len -= nob;
307                 dest += nob;
308                 niov--;
309                 iov++;
310                 offset = 0;
311         } while (len > 0);
312 }
313
314 void
315 lib_copy_buf2iov (int niov, struct iovec *iov, ptl_size_t offset, 
316                   char *src, ptl_size_t len)
317 {
318         ptl_size_t nob;
319
320         if (len == 0)
321                 return;
322
323         /* skip complete frags before 'offset' */
324         LASSERT (niov > 0);
325         while (offset >= iov->iov_len) {
326                 offset -= iov->iov_len;
327                 iov++;
328                 niov--;
329                 LASSERT (niov > 0);
330         }
331         
332         do {
333                 LASSERT (niov > 0);
334                 nob = MIN (iov->iov_len - offset, len);
335                 memcpy (iov->iov_base + offset, src, nob);
336                 
337                 len -= nob;
338                 src += nob;
339                 niov--;
340                 iov++;
341                 offset = 0;
342         } while (len > 0);
343 }
344
345 int
346 lib_extract_iov (int dst_niov, struct iovec *dst,
347                  int src_niov, struct iovec *src,
348                  ptl_size_t offset, ptl_size_t len)
349 {
350         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
351          * for exactly 'len' bytes, and return the number of entries.
352          * NB not destructive to 'src' */
353         ptl_size_t      frag_len;
354         int             niov;
355
356         if (len == 0)                           /* no data => */
357                 return (0);                     /* no frags */
358
359         LASSERT (src_niov > 0);
360         while (offset >= src->iov_len) {      /* skip initial frags */
361                 offset -= src->iov_len;
362                 src_niov--;
363                 src++;
364                 LASSERT (src_niov > 0);
365         }
366
367         niov = 1;
368         for (;;) {
369                 LASSERT (src_niov > 0);
370                 LASSERT (niov <= dst_niov);
371                 
372                 frag_len = src->iov_len - offset;
373                 dst->iov_base = ((char *)src->iov_base) + offset;
374
375                 if (len <= frag_len) {
376                         dst->iov_len = len;
377                         return (niov);
378                 }
379                 
380                 dst->iov_len = frag_len;
381
382                 len -= frag_len;
383                 dst++;
384                 src++;
385                 niov++;
386                 src_niov--;
387                 offset = 0;
388         }
389 }
390
391 #ifndef __KERNEL__
392 ptl_size_t
393 lib_kiov_nob (int niov, ptl_kiov_t *kiov) 
394 {
395         LASSERT (0);
396         return (0);
397 }
398
399 void
400 lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, 
401                    ptl_size_t offset, ptl_size_t len)
402 {
403         LASSERT (0);
404 }
405
406 void
407 lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, ptl_size_t offset,
408                    char *src, ptl_size_t len)
409 {
410         LASSERT (0);
411 }
412
413 int
414 lib_extract_kiov (int dst_niov, ptl_kiov_t *dst, 
415                   int src_niov, ptl_kiov_t *src,
416                   ptl_size_t offset, ptl_size_t len)
417 {
418         LASSERT (0);
419 }
420
421 #else
422
423 ptl_size_t
424 lib_kiov_nob (int niov, ptl_kiov_t *kiov) 
425 {
426         ptl_size_t  nob = 0;
427
428         while (niov-- > 0)
429                 nob += (kiov++)->kiov_len;
430
431         return (nob);
432 }
433
434 void
435 lib_copy_kiov2buf (char *dest, int niov, ptl_kiov_t *kiov, 
436                    ptl_size_t offset, ptl_size_t len)
437 {
438         ptl_size_t  nob;
439         char       *addr;
440
441         if (len == 0)
442                 return;
443         
444         LASSERT (!in_interrupt ());
445
446         LASSERT (niov > 0);
447         while (offset > kiov->kiov_len) {
448                 offset -= kiov->kiov_len;
449                 kiov++;
450                 niov--;
451                 LASSERT (niov > 0);
452         }
453         
454         do{
455                 LASSERT (niov > 0);
456                 nob = MIN (kiov->kiov_len - offset, len);
457                 
458                 addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
459                 memcpy (dest, addr, nob);
460                 kunmap (kiov->kiov_page);
461                 
462                 len -= nob;
463                 dest += nob;
464                 niov--;
465                 kiov++;
466                 offset = 0;
467         } while (len > 0);
468 }
469
470 void
471 lib_copy_buf2kiov (int niov, ptl_kiov_t *kiov, ptl_size_t offset,
472                    char *src, ptl_size_t len)
473 {
474         ptl_size_t  nob;
475         char       *addr;
476
477         if (len == 0)
478                 return;
479
480         LASSERT (!in_interrupt ());
481
482         LASSERT (niov > 0);
483         while (offset >= kiov->kiov_len) {
484                 offset -= kiov->kiov_len;
485                 kiov++;
486                 niov--;
487                 LASSERT (niov > 0);
488         }
489         
490         do {
491                 LASSERT (niov > 0);
492                 nob = MIN (kiov->kiov_len - offset, len);
493                 
494                 addr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
495                 memcpy (addr, src, nob);
496                 kunmap (kiov->kiov_page);
497                 
498                 len -= nob;
499                 src += nob;
500                 niov--;
501                 kiov++;
502                 offset = 0;
503         } while (len > 0);
504 }
505
506 int
507 lib_extract_kiov (int dst_niov, ptl_kiov_t *dst, 
508                   int src_niov, ptl_kiov_t *src,
509                   ptl_size_t offset, ptl_size_t len)
510 {
511         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
512          * for exactly 'len' bytes, and return the number of entries.
513          * NB not destructive to 'src' */
514         ptl_size_t      frag_len;
515         int             niov;
516
517         if (len == 0)                           /* no data => */
518                 return (0);                     /* no frags */
519
520         LASSERT (src_niov > 0);
521         while (offset >= src->kiov_len) {      /* skip initial frags */
522                 offset -= src->kiov_len;
523                 src_niov--;
524                 src++;
525                 LASSERT (src_niov > 0);
526         }
527
528         niov = 1;
529         for (;;) {
530                 LASSERT (src_niov > 0);
531                 LASSERT (niov <= dst_niov);
532                 
533                 frag_len = src->kiov_len - offset;
534                 dst->kiov_page = src->kiov_page;
535                 dst->kiov_offset = src->kiov_offset + offset;
536
537                 if (len <= frag_len) {
538                         dst->kiov_len = len;
539                         LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
540                         return (niov);
541                 }
542
543                 dst->kiov_len = frag_len;
544                 LASSERT (dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
545
546                 len -= frag_len;
547                 dst++;
548                 src++;
549                 niov++;
550                 src_niov--;
551                 offset = 0;
552         }
553 }
554 #endif
555
556 ptl_err_t
557 lib_recv (lib_nal_t *nal, void *private, lib_msg_t *msg, lib_md_t *md,
558           ptl_size_t offset, ptl_size_t mlen, ptl_size_t rlen)
559 {
560         if (mlen == 0)
561                 return (nal->libnal_recv(nal, private, msg,
562                                          0, NULL,
563                                          offset, mlen, rlen));
564
565         if ((md->options & PTL_MD_KIOV) == 0)
566                 return (nal->libnal_recv(nal, private, msg,
567                                          md->md_niov, md->md_iov.iov, 
568                                          offset, mlen, rlen));
569
570         return (nal->libnal_recv_pages(nal, private, msg, 
571                                        md->md_niov, md->md_iov.kiov,
572                                        offset, mlen, rlen));
573 }
574
575 ptl_err_t
576 lib_send (lib_nal_t *nal, void *private, lib_msg_t *msg,
577           ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
578           lib_md_t *md, ptl_size_t offset, ptl_size_t len) 
579 {
580         if (len == 0)
581                 return (nal->libnal_send(nal, private, msg,
582                                          hdr, type, nid, pid,
583                                          0, NULL,
584                                          offset, len));
585         
586         if ((md->options & PTL_MD_KIOV) == 0)
587                 return (nal->libnal_send(nal, private, msg, 
588                                          hdr, type, nid, pid,
589                                          md->md_niov, md->md_iov.iov,
590                                          offset, len));
591
592         return (nal->libnal_send_pages(nal, private, msg, 
593                                        hdr, type, nid, pid,
594                                        md->md_niov, md->md_iov.kiov,
595                                        offset, len));
596 }
597
598 static void
599 lib_commit_md (lib_nal_t *nal, lib_md_t *md, lib_msg_t *msg)
600 {
601         /* ALWAYS called holding the LIB_LOCK */
602         lib_counters_t *counters = &nal->libnal_ni.ni_counters;
603
604         /* Here, we commit the MD to a network OP by marking it busy and
605          * decrementing its threshold.  Come what may, the network "owns"
606          * the MD until a call to lib_finalize() signals completion. */
607         msg->md = md;
608          
609         md->pending++;
610         if (md->threshold != PTL_MD_THRESH_INF) {
611                 LASSERT (md->threshold > 0);
612                 md->threshold--;
613         }
614
615         counters->msgs_alloc++;
616         if (counters->msgs_alloc > counters->msgs_max)
617                 counters->msgs_max = counters->msgs_alloc;
618
619         list_add (&msg->msg_list, &nal->libnal_ni.ni_active_msgs);
620 }
621
622 static void
623 lib_drop_message (lib_nal_t *nal, void *private, ptl_hdr_t *hdr)
624 {
625         unsigned long flags;
626
627         /* CAVEAT EMPTOR: this only drops messages that we've not committed
628          * to receive (init_msg() not called) and therefore can't cause an
629          * event. */
630         
631         LIB_LOCK(nal, flags);
632         nal->libnal_ni.ni_counters.drop_count++;
633         nal->libnal_ni.ni_counters.drop_length += hdr->payload_length;
634         LIB_UNLOCK(nal, flags);
635
636         /* NULL msg => if NAL calls lib_finalize it will be a noop */
637         (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
638 }
639
640 /*
641  * Incoming messages have a ptl_msg_t object associated with them
642  * by the library.  This object encapsulates the state of the
643  * message and allows the NAL to do non-blocking receives or sends
644  * of long messages.
645  *
646  */
647 static ptl_err_t
648 parse_put(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
649 {
650         lib_ni_t        *ni = &nal->libnal_ni;
651         ptl_size_t       mlength = 0;
652         ptl_size_t       offset = 0;
653         ptl_err_t        rc;
654         lib_md_t        *md;
655         unsigned long    flags;
656                 
657         /* Convert put fields to host byte order */
658         hdr->msg.put.match_bits = NTOH__u64 (hdr->msg.put.match_bits);
659         hdr->msg.put.ptl_index = NTOH__u32 (hdr->msg.put.ptl_index);
660         hdr->msg.put.offset = NTOH__u32 (hdr->msg.put.offset);
661
662         LIB_LOCK(nal, flags);
663
664         md = lib_match_md(nal, hdr->msg.put.ptl_index, PTL_MD_OP_PUT,
665                           hdr->src_nid, hdr->src_pid,
666                           hdr->payload_length, hdr->msg.put.offset,
667                           hdr->msg.put.match_bits, msg,
668                           &mlength, &offset);
669         if (md == NULL) {
670                 LIB_UNLOCK(nal, flags);
671                 return (PTL_FAIL);
672         }
673
674         msg->ev.type = PTL_EVENT_PUT_END;
675         msg->ev.hdr_data = hdr->msg.put.hdr_data;
676
677         if (!ptl_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
678             !(md->options & PTL_MD_ACK_DISABLE)) {
679                 msg->ack_wmd = hdr->msg.put.ack_wmd;
680         }
681
682         ni->ni_counters.recv_count++;
683         ni->ni_counters.recv_length += mlength;
684
685         LIB_UNLOCK(nal, flags);
686
687         rc = lib_recv(nal, private, msg, md, offset, mlength,
688                       hdr->payload_length);
689         if (rc != PTL_OK)
690                 CERROR(LPU64": error on receiving PUT from "LPU64": %d\n",
691                        ni->ni_pid.nid, hdr->src_nid, rc);
692
693         return (rc);
694 }
695
696 static ptl_err_t
697 parse_get(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
698 {
699         lib_ni_t        *ni = &nal->libnal_ni;
700         ptl_size_t       mlength = 0;
701         ptl_size_t       offset = 0;
702         lib_md_t        *md;
703         ptl_hdr_t        reply;
704         unsigned long    flags;
705         int              rc;
706
707         /* Convert get fields to host byte order */
708         hdr->msg.get.match_bits = NTOH__u64 (hdr->msg.get.match_bits);
709         hdr->msg.get.ptl_index = NTOH__u32 (hdr->msg.get.ptl_index);
710         hdr->msg.get.sink_length = NTOH__u32 (hdr->msg.get.sink_length);
711         hdr->msg.get.src_offset = NTOH__u32 (hdr->msg.get.src_offset);
712
713         LIB_LOCK(nal, flags);
714
715         md = lib_match_md(nal, hdr->msg.get.ptl_index, PTL_MD_OP_GET,
716                           hdr->src_nid, hdr->src_pid,
717                           hdr->msg.get.sink_length, hdr->msg.get.src_offset,
718                           hdr->msg.get.match_bits, msg,
719                           &mlength, &offset);
720         if (md == NULL) {
721                 LIB_UNLOCK(nal, flags);
722                 return (PTL_FAIL);
723         }
724
725         msg->ev.type = PTL_EVENT_GET_END;
726         msg->ev.hdr_data = 0;
727
728         ni->ni_counters.send_count++;
729         ni->ni_counters.send_length += mlength;
730
731         LIB_UNLOCK(nal, flags);
732
733         memset (&reply, 0, sizeof (reply));
734         reply.type     = HTON__u32 (PTL_MSG_REPLY);
735         reply.dest_nid = HTON__u64 (hdr->src_nid);
736         reply.dest_pid = HTON__u32 (hdr->src_pid);
737         reply.src_nid  = HTON__u64 (ni->ni_pid.nid);
738         reply.src_pid  = HTON__u32 (ni->ni_pid.pid);
739         reply.payload_length = HTON__u32 (mlength);
740
741         reply.msg.reply.dst_wmd = hdr->msg.get.return_wmd;
742
743         /* NB call lib_send() _BEFORE_ lib_recv() completes the incoming
744          * message.  Some NALs _require_ this to implement optimized GET */
745
746         rc = lib_send (nal, private, msg, &reply, PTL_MSG_REPLY, 
747                        hdr->src_nid, hdr->src_pid, md, offset, mlength);
748         if (rc != PTL_OK)
749                 CERROR(LPU64": Unable to send REPLY for GET from "LPU64": %d\n",
750                        ni->ni_pid.nid, hdr->src_nid, rc);
751
752         /* Discard any junk after the hdr */
753         (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
754
755         return (rc);
756 }
757
758 static ptl_err_t
759 parse_reply(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
760 {
761         lib_ni_t        *ni = &nal->libnal_ni;
762         lib_md_t        *md;
763         int              rlength;
764         int              length;
765         unsigned long    flags;
766         ptl_err_t        rc;
767
768         LIB_LOCK(nal, flags);
769
770         /* NB handles only looked up by creator (no flips) */
771         md = ptl_wire_handle2md(&hdr->msg.reply.dst_wmd, nal);
772         if (md == NULL || md->threshold == 0) {
773                 CERROR (LPU64": Dropping REPLY from "LPU64" for %s MD "LPX64"."LPX64"\n",
774                         ni->ni_pid.nid, hdr->src_nid,
775                         md == NULL ? "invalid" : "inactive",
776                         hdr->msg.reply.dst_wmd.wh_interface_cookie,
777                         hdr->msg.reply.dst_wmd.wh_object_cookie);
778
779                 LIB_UNLOCK(nal, flags);
780                 return (PTL_FAIL);
781         }
782
783         LASSERT (md->offset == 0);
784
785         length = rlength = hdr->payload_length;
786
787         if (length > md->length) {
788                 if ((md->options & PTL_MD_TRUNCATE) == 0) {
789                         CERROR (LPU64": Dropping REPLY from "LPU64
790                                 " length %d for MD "LPX64" would overflow (%d)\n",
791                                 ni->ni_pid.nid, hdr->src_nid, length,
792                                 hdr->msg.reply.dst_wmd.wh_object_cookie,
793                                 md->length);
794                         LIB_UNLOCK(nal, flags);
795                         return (PTL_FAIL);
796                 }
797                 length = md->length;
798         }
799
800         CDEBUG(D_NET, "Reply from "LPU64" of length %d/%d into md "LPX64"\n",
801                hdr->src_nid, length, rlength, 
802                hdr->msg.reply.dst_wmd.wh_object_cookie);
803
804         lib_commit_md(nal, md, msg);
805
806         msg->ev.type = PTL_EVENT_REPLY_END;
807         msg->ev.initiator.nid = hdr->src_nid;
808         msg->ev.initiator.pid = hdr->src_pid;
809         msg->ev.rlength = rlength;
810         msg->ev.mlength = length;
811         msg->ev.offset = 0;
812
813         lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
814
815         ni->ni_counters.recv_count++;
816         ni->ni_counters.recv_length += length;
817
818         LIB_UNLOCK(nal, flags);
819
820         rc = lib_recv(nal, private, msg, md, 0, length, rlength);
821         if (rc != PTL_OK)
822                 CERROR(LPU64": error on receiving REPLY from "LPU64": %d\n",
823                        ni->ni_pid.nid, hdr->src_nid, rc);
824
825         return (rc);
826 }
827
828 static ptl_err_t
829 parse_ack(lib_nal_t *nal, ptl_hdr_t *hdr, void *private, lib_msg_t *msg)
830 {
831         lib_ni_t      *ni = &nal->libnal_ni;
832         lib_md_t      *md;
833         unsigned long  flags;
834
835         /* Convert ack fields to host byte order */
836         hdr->msg.ack.match_bits = NTOH__u64 (hdr->msg.ack.match_bits);
837         hdr->msg.ack.mlength = NTOH__u32 (hdr->msg.ack.mlength);
838
839         LIB_LOCK(nal, flags);
840
841         /* NB handles only looked up by creator (no flips) */
842         md = ptl_wire_handle2md(&hdr->msg.ack.dst_wmd, nal);
843         if (md == NULL || md->threshold == 0) {
844                 CDEBUG(D_INFO, LPU64": Dropping ACK from "LPU64" to %s MD "
845                        LPX64"."LPX64"\n", ni->ni_pid.nid, hdr->src_nid, 
846                        (md == NULL) ? "invalid" : "inactive",
847                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
848                        hdr->msg.ack.dst_wmd.wh_object_cookie);
849
850                 LIB_UNLOCK(nal, flags);
851                 return (PTL_FAIL);
852         }
853
854         CDEBUG(D_NET, LPU64": ACK from "LPU64" into md "LPX64"\n",
855                ni->ni_pid.nid, hdr->src_nid, 
856                hdr->msg.ack.dst_wmd.wh_object_cookie);
857
858         lib_commit_md(nal, md, msg);
859
860         msg->ev.type = PTL_EVENT_ACK;
861         msg->ev.initiator.nid = hdr->src_nid;
862         msg->ev.initiator.pid = hdr->src_pid;
863         msg->ev.mlength = hdr->msg.ack.mlength;
864         msg->ev.match_bits = hdr->msg.ack.match_bits;
865
866         lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
867
868         ni->ni_counters.recv_count++;
869
870         LIB_UNLOCK(nal, flags);
871         
872         /* We have received and matched up the ack OK, create the
873          * completion event now... */
874         lib_finalize(nal, private, msg, PTL_OK);
875
876         /* ...and now discard any junk after the hdr */
877         (void) lib_recv(nal, private, NULL, NULL, 0, 0, hdr->payload_length);
878  
879        return (PTL_OK);
880 }
881
882 static char *
883 hdr_type_string (ptl_hdr_t *hdr)
884 {
885         switch (hdr->type) {
886         case PTL_MSG_ACK:
887                 return ("ACK");
888         case PTL_MSG_PUT:
889                 return ("PUT");
890         case PTL_MSG_GET:
891                 return ("GET");
892         case PTL_MSG_REPLY:
893                 return ("REPLY");
894         case PTL_MSG_HELLO:
895                 return ("HELLO");
896         default:
897                 return ("<UNKNOWN>");
898         }
899 }
900
901 void print_hdr(lib_nal_t *nal, ptl_hdr_t * hdr)
902 {
903         char *type_str = hdr_type_string (hdr);
904
905         CWARN("P3 Header at %p of type %s\n", hdr, type_str);
906         CWARN("    From nid/pid "LPX64"/%u", hdr->src_nid, hdr->src_pid);
907         CWARN("    To nid/pid "LPX64"/%u\n", hdr->dest_nid, hdr->dest_pid);
908
909         switch (hdr->type) {
910         default:
911                 break;
912
913         case PTL_MSG_PUT:
914                 CWARN("    Ptl index %d, ack md "LPX64"."LPX64", "
915                       "match bits "LPX64"\n",
916                       hdr->msg.put.ptl_index,
917                       hdr->msg.put.ack_wmd.wh_interface_cookie,
918                       hdr->msg.put.ack_wmd.wh_object_cookie,
919                       hdr->msg.put.match_bits);
920                 CWARN("    Length %d, offset %d, hdr data "LPX64"\n",
921                       hdr->payload_length, hdr->msg.put.offset,
922                       hdr->msg.put.hdr_data);
923                 break;
924
925         case PTL_MSG_GET:
926                 CWARN("    Ptl index %d, return md "LPX64"."LPX64", "
927                       "match bits "LPX64"\n", hdr->msg.get.ptl_index,
928                       hdr->msg.get.return_wmd.wh_interface_cookie,
929                       hdr->msg.get.return_wmd.wh_object_cookie,
930                       hdr->msg.get.match_bits);
931                 CWARN("    Length %d, src offset %d\n",
932                       hdr->msg.get.sink_length,
933                       hdr->msg.get.src_offset);
934                 break;
935
936         case PTL_MSG_ACK:
937                 CWARN("    dst md "LPX64"."LPX64", "
938                       "manipulated length %d\n",
939                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
940                       hdr->msg.ack.dst_wmd.wh_object_cookie,
941                       hdr->msg.ack.mlength);
942                 break;
943
944         case PTL_MSG_REPLY:
945                 CWARN("    dst md "LPX64"."LPX64", "
946                       "length %d\n",
947                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
948                       hdr->msg.reply.dst_wmd.wh_object_cookie,
949                       hdr->payload_length);
950         }
951
952 }                               /* end of print_hdr() */
953
954
955 ptl_err_t
956 lib_parse(lib_nal_t *nal, ptl_hdr_t *hdr, void *private)
957 {
958         unsigned long  flags;
959         ptl_err_t      rc;
960         lib_msg_t     *msg;
961
962         /* NB we return PTL_OK if we manage to parse the header and believe
963          * it looks OK.  Anything that goes wrong with receiving the
964          * message after that point is the responsibility of the NAL */
965         
966         /* convert common fields to host byte order */
967         hdr->type = NTOH__u32 (hdr->type);
968         hdr->src_nid = NTOH__u64 (hdr->src_nid);
969         hdr->src_pid = NTOH__u32 (hdr->src_pid);
970         hdr->dest_pid = NTOH__u32 (hdr->dest_pid);
971         hdr->payload_length = NTOH__u32(hdr->payload_length);
972
973         switch (hdr->type) {
974         case PTL_MSG_HELLO: {
975                 /* dest_nid is really ptl_magicversion_t */
976                 ptl_magicversion_t *mv = (ptl_magicversion_t *)&hdr->dest_nid;
977
978                 mv->magic = NTOH__u32(mv->magic);
979                 mv->version_major = NTOH__u16(mv->version_major);
980                 mv->version_minor = NTOH__u16(mv->version_minor);
981
982                 if (mv->magic == PORTALS_PROTO_MAGIC &&
983                     mv->version_major == PORTALS_PROTO_VERSION_MAJOR &&
984                     mv->version_minor == PORTALS_PROTO_VERSION_MINOR) {
985                         CWARN (LPU64": Dropping unexpected HELLO message: "
986                                "magic %d, version %d.%d from "LPD64"\n",
987                                nal->libnal_ni.ni_pid.nid, mv->magic, 
988                                mv->version_major, mv->version_minor,
989                                hdr->src_nid);
990
991                         /* it's good but we don't want it */
992                         lib_drop_message(nal, private, hdr);
993                         return PTL_OK;
994                 }
995
996                 /* we got garbage */
997                 CERROR (LPU64": Bad HELLO message: "
998                         "magic %d, version %d.%d from "LPD64"\n",
999                         nal->libnal_ni.ni_pid.nid, mv->magic, 
1000                         mv->version_major, mv->version_minor,
1001                         hdr->src_nid);
1002                 return PTL_FAIL;
1003         }
1004
1005         case PTL_MSG_ACK:
1006         case PTL_MSG_PUT:
1007         case PTL_MSG_GET:
1008         case PTL_MSG_REPLY:
1009                 hdr->dest_nid = NTOH__u64 (hdr->dest_nid);
1010                 if (hdr->dest_nid != nal->libnal_ni.ni_pid.nid) {
1011                         CERROR(LPU64": BAD dest NID in %s message from"
1012                                LPU64" to "LPU64" (not me)\n", 
1013                                nal->libnal_ni.ni_pid.nid, hdr_type_string (hdr),
1014                                hdr->src_nid, hdr->dest_nid);
1015                         return PTL_FAIL;
1016                 }
1017                 break;
1018
1019         default:
1020                 CERROR(LPU64": Bad message type 0x%x from "LPU64"\n",
1021                        nal->libnal_ni.ni_pid.nid, hdr->type, hdr->src_nid);
1022                 return PTL_FAIL;
1023         }
1024
1025         /* We've decided we're not receiving garbage since we can parse the
1026          * header.  We will return PTL_OK come what may... */
1027
1028         if (!list_empty (&nal->libnal_ni.ni_test_peers) && /* normally we don't */
1029             fail_peer (nal, hdr->src_nid, 0))      /* shall we now? */
1030         {
1031                 CERROR(LPU64": Dropping incoming %s from "LPU64
1032                        ": simulated failure\n",
1033                        nal->libnal_ni.ni_pid.nid, hdr_type_string (hdr), 
1034                        hdr->src_nid);
1035                 lib_drop_message(nal, private, hdr);
1036                 return PTL_OK;
1037         }
1038
1039         msg = lib_msg_alloc(nal);
1040         if (msg == NULL) {
1041                 CERROR(LPU64": Dropping incoming %s from "LPU64
1042                        ": can't allocate a lib_msg_t\n",
1043                        nal->libnal_ni.ni_pid.nid, hdr_type_string (hdr), 
1044                        hdr->src_nid);
1045                 lib_drop_message(nal, private, hdr);
1046                 return PTL_OK;
1047         }
1048
1049         switch (hdr->type) {
1050         case PTL_MSG_ACK:
1051                 rc = parse_ack(nal, hdr, private, msg);
1052                 break;
1053         case PTL_MSG_PUT:
1054                 rc = parse_put(nal, hdr, private, msg);
1055                 break;
1056         case PTL_MSG_GET:
1057                 rc = parse_get(nal, hdr, private, msg);
1058                 break;
1059         case PTL_MSG_REPLY:
1060                 rc = parse_reply(nal, hdr, private, msg);
1061                 break;
1062         default:
1063                 LASSERT(0);
1064                 rc = PTL_FAIL;                  /* no compiler warning please */
1065                 break;
1066         }
1067                 
1068         if (rc != PTL_OK) {
1069                 if (msg->md != NULL) {
1070                         /* committed... */
1071                         lib_finalize(nal, private, msg, rc);
1072                 } else {
1073                         LIB_LOCK(nal, flags);
1074                         lib_msg_free(nal, msg); /* expects LIB_LOCK held */
1075                         LIB_UNLOCK(nal, flags);
1076
1077                         lib_drop_message(nal, private, hdr);
1078                 }
1079         }
1080
1081         return PTL_OK;
1082         /* That's "OK I can parse it", not "OK I like it" :) */
1083 }
1084
1085 int 
1086 lib_api_put(nal_t *apinal, ptl_handle_md_t *mdh, 
1087             ptl_ack_req_t ack, ptl_process_id_t *id,
1088             ptl_pt_index_t portal, ptl_ac_index_t ac,
1089             ptl_match_bits_t match_bits, 
1090             ptl_size_t offset, ptl_hdr_data_t hdr_data)
1091 {
1092         lib_nal_t        *nal = apinal->nal_data;
1093         lib_ni_t         *ni = &nal->libnal_ni;
1094         lib_msg_t        *msg;
1095         ptl_hdr_t         hdr;
1096         lib_md_t         *md;
1097         unsigned long     flags;
1098         int               rc;
1099         
1100         if (!list_empty (&ni->ni_test_peers) && /* normally we don't */
1101             fail_peer (nal, id->nid, 1))           /* shall we now? */
1102         {
1103                 CERROR("Dropping PUT to "LPU64": simulated failure\n",
1104                        id->nid);
1105                 return PTL_PROCESS_INVALID;
1106         }
1107
1108         msg = lib_msg_alloc(nal);
1109         if (msg == NULL) {
1110                 CERROR(LPU64": Dropping PUT to "LPU64": ENOMEM on lib_msg_t\n",
1111                        ni->ni_pid.nid, id->nid);
1112                 return PTL_NO_SPACE;
1113         }
1114
1115         LIB_LOCK(nal, flags);
1116
1117         md = ptl_handle2md(mdh, nal);
1118         if (md == NULL || md->threshold == 0) {
1119                 lib_msg_free(nal, msg);
1120                 LIB_UNLOCK(nal, flags);
1121         
1122                 return PTL_MD_INVALID;
1123         }
1124
1125         CDEBUG(D_NET, "PtlPut -> "LPX64"\n", id->nid);
1126
1127         memset (&hdr, 0, sizeof (hdr));
1128         hdr.type     = HTON__u32 (PTL_MSG_PUT);
1129         hdr.dest_nid = HTON__u64 (id->nid);
1130         hdr.dest_pid = HTON__u32 (id->pid);
1131         hdr.src_nid  = HTON__u64 (ni->ni_pid.nid);
1132         hdr.src_pid  = HTON__u32 (ni->ni_pid.pid);
1133         hdr.payload_length = HTON__u32 (md->length);
1134
1135         /* NB handles only looked up by creator (no flips) */
1136         if (ack == PTL_ACK_REQ) {
1137                 hdr.msg.put.ack_wmd.wh_interface_cookie = ni->ni_interface_cookie;
1138                 hdr.msg.put.ack_wmd.wh_object_cookie = md->md_lh.lh_cookie;
1139         } else {
1140                 hdr.msg.put.ack_wmd = PTL_WIRE_HANDLE_NONE;
1141         }
1142
1143         hdr.msg.put.match_bits = HTON__u64 (match_bits);
1144         hdr.msg.put.ptl_index = HTON__u32 (portal);
1145         hdr.msg.put.offset = HTON__u32 (offset);
1146         hdr.msg.put.hdr_data = hdr_data;
1147
1148         lib_commit_md(nal, md, msg);
1149         
1150         msg->ev.type = PTL_EVENT_SEND_END;
1151         msg->ev.initiator.nid = ni->ni_pid.nid;
1152         msg->ev.initiator.pid = ni->ni_pid.pid;
1153         msg->ev.portal = portal;
1154         msg->ev.match_bits = match_bits;
1155         msg->ev.rlength = md->length;
1156         msg->ev.mlength = md->length;
1157         msg->ev.offset = offset;
1158         msg->ev.hdr_data = hdr_data;
1159
1160         lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
1161
1162         ni->ni_counters.send_count++;
1163         ni->ni_counters.send_length += md->length;
1164
1165         LIB_UNLOCK(nal, flags);
1166         
1167         rc = lib_send (nal, NULL, msg, &hdr, PTL_MSG_PUT,
1168                        id->nid, id->pid, md, 0, md->length);
1169         if (rc != PTL_OK) {
1170                 CERROR("Error sending PUT to "LPX64": %d\n",
1171                        id->nid, rc);
1172                 lib_finalize (nal, NULL, msg, rc);
1173         }
1174         
1175         /* completion will be signalled by an event */
1176         return PTL_OK;
1177 }
1178
1179 lib_msg_t * 
1180 lib_create_reply_msg (lib_nal_t *nal, ptl_nid_t peer_nid, lib_msg_t *getmsg)
1181 {
1182         /* The NAL can DMA direct to the GET md (i.e. no REPLY msg).  This
1183          * returns a msg for the NAL to pass to lib_finalize() when the sink
1184          * data has been received.
1185          *
1186          * CAVEAT EMPTOR: 'getmsg' is the original GET, which is freed when
1187          * lib_finalize() is called on it, so the NAL must call this first */
1188
1189         lib_ni_t        *ni = &nal->libnal_ni;
1190         lib_msg_t       *msg = lib_msg_alloc(nal);
1191         lib_md_t        *getmd = getmsg->md;
1192         unsigned long    flags;
1193
1194         LIB_LOCK(nal, flags);
1195
1196         LASSERT (getmd->pending > 0);
1197
1198         if (msg == NULL) {
1199                 CERROR ("Dropping REPLY from "LPU64": can't allocate msg\n",
1200                         peer_nid);
1201                 goto drop;
1202         }
1203
1204         if (getmd->threshold == 0) {
1205                 CERROR ("Dropping REPLY from "LPU64" for inactive MD %p\n",
1206                         peer_nid, getmd);
1207                 goto drop_msg;
1208         }
1209
1210         LASSERT (getmd->offset == 0);
1211
1212         CDEBUG(D_NET, "Reply from "LPU64" md %p\n", peer_nid, getmd);
1213
1214         lib_commit_md (nal, getmd, msg);
1215
1216         msg->ev.type = PTL_EVENT_REPLY_END;
1217         msg->ev.initiator.nid = peer_nid;
1218         msg->ev.initiator.pid = 0;      /* XXX FIXME!!! */
1219         msg->ev.rlength = msg->ev.mlength = getmd->length;
1220         msg->ev.offset = 0;
1221
1222         lib_md_deconstruct(nal, getmd, &msg->ev.mem_desc);
1223
1224         ni->ni_counters.recv_count++;
1225         ni->ni_counters.recv_length += getmd->length;
1226
1227         LIB_UNLOCK(nal, flags);
1228
1229         return msg;
1230
1231  drop_msg:
1232         lib_msg_free(nal, msg);
1233  drop:
1234         nal->libnal_ni.ni_counters.drop_count++;
1235         nal->libnal_ni.ni_counters.drop_length += getmd->length;
1236
1237         LIB_UNLOCK (nal, flags);
1238
1239         return NULL;
1240 }
1241
1242 int 
1243 lib_api_get(nal_t *apinal, ptl_handle_md_t *mdh, ptl_process_id_t *id,
1244             ptl_pt_index_t portal, ptl_ac_index_t ac,
1245             ptl_match_bits_t match_bits, ptl_size_t offset)
1246 {
1247         lib_nal_t        *nal = apinal->nal_data;
1248         lib_ni_t         *ni = &nal->libnal_ni;
1249         lib_msg_t        *msg;
1250         ptl_hdr_t         hdr;
1251         lib_md_t         *md;
1252         unsigned long     flags;
1253         int               rc;
1254         
1255         if (!list_empty (&ni->ni_test_peers) && /* normally we don't */
1256             fail_peer (nal, id->nid, 1))           /* shall we now? */
1257         {
1258                 CERROR("Dropping PUT to "LPX64": simulated failure\n",
1259                        id->nid);
1260                 return PTL_PROCESS_INVALID;
1261         }
1262
1263         msg = lib_msg_alloc(nal);
1264         if (msg == NULL) {
1265                 CERROR("Dropping GET to "LPU64": ENOMEM on lib_msg_t\n",
1266                        id->nid);
1267                 return PTL_NO_SPACE;
1268         }
1269
1270         LIB_LOCK(nal, flags);
1271
1272         md = ptl_handle2md(mdh, nal);
1273         if (md == NULL || !md->threshold) {
1274                 lib_msg_free(nal, msg);
1275                 LIB_UNLOCK(nal, flags);
1276
1277                 return PTL_MD_INVALID;
1278         }
1279
1280         CDEBUG(D_NET, "PtlGet -> %Lu: %lu\n", (unsigned long long)id->nid,
1281                (unsigned long)id->pid);
1282
1283         memset (&hdr, 0, sizeof (hdr));
1284         hdr.type     = HTON__u32 (PTL_MSG_GET);
1285         hdr.dest_nid = HTON__u64 (id->nid);
1286         hdr.dest_pid = HTON__u32 (id->pid);
1287         hdr.src_nid  = HTON__u64 (ni->ni_pid.nid);
1288         hdr.src_pid  = HTON__u32 (ni->ni_pid.pid);
1289         hdr.payload_length = 0;
1290
1291         /* NB handles only looked up by creator (no flips) */
1292         hdr.msg.get.return_wmd.wh_interface_cookie = ni->ni_interface_cookie;
1293         hdr.msg.get.return_wmd.wh_object_cookie = md->md_lh.lh_cookie;
1294
1295         hdr.msg.get.match_bits = HTON__u64 (match_bits);
1296         hdr.msg.get.ptl_index = HTON__u32 (portal);
1297         hdr.msg.get.src_offset = HTON__u32 (offset);
1298         hdr.msg.get.sink_length = HTON__u32 (md->length);
1299
1300         lib_commit_md(nal, md, msg);
1301
1302         msg->ev.type = PTL_EVENT_SEND_END;
1303         msg->ev.initiator = ni->ni_pid;
1304         msg->ev.portal = portal;
1305         msg->ev.match_bits = match_bits;
1306         msg->ev.rlength = md->length;
1307         msg->ev.mlength = md->length;
1308         msg->ev.offset = offset;
1309         msg->ev.hdr_data = 0;
1310
1311         lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
1312
1313         ni->ni_counters.send_count++;
1314
1315         LIB_UNLOCK(nal, flags);
1316
1317         rc = lib_send (nal, NULL, msg, &hdr, PTL_MSG_GET,
1318                        id->nid, id->pid, NULL, 0, 0);
1319         if (rc != PTL_OK) {
1320                 CERROR(LPU64": error sending GET to "LPU64": %d\n",
1321                        ni->ni_pid.nid, id->nid, rc);
1322                 lib_finalize (nal, NULL, msg, rc);
1323         }
1324         
1325         /* completion will be signalled by an event */
1326         return PTL_OK;
1327 }
1328
1329 void lib_assert_wire_constants (void)
1330 {
1331         /* Wire protocol assertions generated by 'wirecheck'
1332          * running on Linux robert.bartonsoftware.com 2.4.20-18.9 #1 Thu May 29 06:54:41 EDT 2003 i68
1333          * with gcc version 3.2.2 20030222 (Red Hat Linux 3.2.2-5) */
1334
1335
1336         /* Constants... */
1337         LASSERT (PORTALS_PROTO_MAGIC == 0xeebc0ded);
1338         LASSERT (PORTALS_PROTO_VERSION_MAJOR == 0);
1339         LASSERT (PORTALS_PROTO_VERSION_MINOR == 3);
1340         LASSERT (PTL_MSG_ACK == 0);
1341         LASSERT (PTL_MSG_PUT == 1);
1342         LASSERT (PTL_MSG_GET == 2);
1343         LASSERT (PTL_MSG_REPLY == 3);
1344         LASSERT (PTL_MSG_HELLO == 4);
1345
1346         /* Checks for struct ptl_handle_wire_t */
1347         LASSERT ((int)sizeof(ptl_handle_wire_t) == 16);
1348         LASSERT (offsetof(ptl_handle_wire_t, wh_interface_cookie) == 0);
1349         LASSERT ((int)sizeof(((ptl_handle_wire_t *)0)->wh_interface_cookie) == 8);
1350         LASSERT (offsetof(ptl_handle_wire_t, wh_object_cookie) == 8);
1351         LASSERT ((int)sizeof(((ptl_handle_wire_t *)0)->wh_object_cookie) == 8);
1352
1353         /* Checks for struct ptl_magicversion_t */
1354         LASSERT ((int)sizeof(ptl_magicversion_t) == 8);
1355         LASSERT (offsetof(ptl_magicversion_t, magic) == 0);
1356         LASSERT ((int)sizeof(((ptl_magicversion_t *)0)->magic) == 4);
1357         LASSERT (offsetof(ptl_magicversion_t, version_major) == 4);
1358         LASSERT ((int)sizeof(((ptl_magicversion_t *)0)->version_major) == 2);
1359         LASSERT (offsetof(ptl_magicversion_t, version_minor) == 6);
1360         LASSERT ((int)sizeof(((ptl_magicversion_t *)0)->version_minor) == 2);
1361
1362         /* Checks for struct ptl_hdr_t */
1363         LASSERT ((int)sizeof(ptl_hdr_t) == 72);
1364         LASSERT (offsetof(ptl_hdr_t, dest_nid) == 0);
1365         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->dest_nid) == 8);
1366         LASSERT (offsetof(ptl_hdr_t, src_nid) == 8);
1367         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->src_nid) == 8);
1368         LASSERT (offsetof(ptl_hdr_t, dest_pid) == 16);
1369         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->dest_pid) == 4);
1370         LASSERT (offsetof(ptl_hdr_t, src_pid) == 20);
1371         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->src_pid) == 4);
1372         LASSERT (offsetof(ptl_hdr_t, type) == 24);
1373         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->type) == 4);
1374         LASSERT (offsetof(ptl_hdr_t, payload_length) == 28);
1375         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->payload_length) == 4);
1376         LASSERT (offsetof(ptl_hdr_t, msg) == 32);
1377         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg) == 40);
1378
1379         /* Ack */
1380         LASSERT (offsetof(ptl_hdr_t, msg.ack.dst_wmd) == 32);
1381         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.ack.dst_wmd) == 16);
1382         LASSERT (offsetof(ptl_hdr_t, msg.ack.match_bits) == 48);
1383         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.ack.match_bits) == 8);
1384         LASSERT (offsetof(ptl_hdr_t, msg.ack.mlength) == 56);
1385         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.ack.mlength) == 4);
1386
1387         /* Put */
1388         LASSERT (offsetof(ptl_hdr_t, msg.put.ack_wmd) == 32);
1389         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.ack_wmd) == 16);
1390         LASSERT (offsetof(ptl_hdr_t, msg.put.match_bits) == 48);
1391         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.match_bits) == 8);
1392         LASSERT (offsetof(ptl_hdr_t, msg.put.hdr_data) == 56);
1393         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.hdr_data) == 8);
1394         LASSERT (offsetof(ptl_hdr_t, msg.put.ptl_index) == 64);
1395         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.ptl_index) == 4);
1396         LASSERT (offsetof(ptl_hdr_t, msg.put.offset) == 68);
1397         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.put.offset) == 4);
1398
1399         /* Get */
1400         LASSERT (offsetof(ptl_hdr_t, msg.get.return_wmd) == 32);
1401         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.return_wmd) == 16);
1402         LASSERT (offsetof(ptl_hdr_t, msg.get.match_bits) == 48);
1403         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.match_bits) == 8);
1404         LASSERT (offsetof(ptl_hdr_t, msg.get.ptl_index) == 56);
1405         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.ptl_index) == 4);
1406         LASSERT (offsetof(ptl_hdr_t, msg.get.src_offset) == 60);
1407         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.src_offset) == 4);
1408         LASSERT (offsetof(ptl_hdr_t, msg.get.sink_length) == 64);
1409         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.get.sink_length) == 4);
1410
1411         /* Reply */
1412         LASSERT (offsetof(ptl_hdr_t, msg.reply.dst_wmd) == 32);
1413         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.reply.dst_wmd) == 16);
1414
1415         /* Hello */
1416         LASSERT (offsetof(ptl_hdr_t, msg.hello.incarnation) == 32);
1417         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.hello.incarnation) == 8);
1418         LASSERT (offsetof(ptl_hdr_t, msg.hello.type) == 40);
1419         LASSERT ((int)sizeof(((ptl_hdr_t *)0)->msg.hello.type) == 4);
1420 }