Whamcloud - gitweb
0114c42a621b095d5ffcfbccd7ae7852698bcfb8
[fs/lustre-release.git] / lnet / ulnds / ptllnd / ptllnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: Eric Barton <eeb@bartonsoftware.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   This file is confidential source code owned by Cluster File Systems.
11  *   No viewing, modification, compilation, redistribution, or any other
12  *   form of use is permitted except through a signed license agreement.
13  *
14  *   If you have not signed such an agreement, then you have no rights to
15  *   this file.  Please destroy it immediately and contact CFS.
16  *
17  */
18
19 #include "ptllnd.h"
20
21 char *
22 ptllnd_ptlid2str(ptl_process_id_t id)
23 {
24         static char strs[8][32];
25         static int  idx = 0;
26
27         char   *str = strs[idx++];
28         
29         if (idx >= sizeof(strs)/sizeof(strs[0]))
30                 idx = 0;
31
32         snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
33         return str;
34 }
35
36 void
37 ptllnd_destroy_peer(ptllnd_peer_t *peer)
38 {
39         lnet_ni_t         *ni = peer->plp_ni;
40         ptllnd_ni_t       *plni = ni->ni_data;
41
42         LASSERT (peer->plp_closing);
43         LASSERT (plni->plni_npeers > 0);
44         LASSERT (list_empty(&peer->plp_txq));
45         LASSERT (list_empty(&peer->plp_activeq));
46         plni->plni_npeers--;
47         LIBCFS_FREE(peer, sizeof(*peer));
48 }
49
50 void
51 ptllnd_abort_txs(ptllnd_ni_t *plni, struct list_head *q)
52 {
53         while (!list_empty(q)) {
54                 ptllnd_tx_t *tx = list_entry(q->next, ptllnd_tx_t, tx_list);
55
56                 tx->tx_status = -ESHUTDOWN;
57                 list_del(&tx->tx_list);
58                 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
59         }
60 }
61
62 void
63 ptllnd_close_peer(ptllnd_peer_t *peer, int error)
64 {
65         lnet_ni_t   *ni = peer->plp_ni;
66         ptllnd_ni_t *plni = ni->ni_data;
67
68         if (peer->plp_closing)
69                 return;
70
71         peer->plp_closing = 1;
72
73         if (!list_empty(&peer->plp_txq) ||
74             !list_empty(&peer->plp_activeq) ||
75             error != 0) {
76                 CERROR("Closing %s\n", libcfs_id2str(peer->plp_id));
77                 ptllnd_debug_peer(ni, peer->plp_id);
78         }
79         
80         ptllnd_abort_txs(plni, &peer->plp_txq);
81         ptllnd_abort_txs(plni, &peer->plp_activeq);
82
83         list_del(&peer->plp_list);
84         ptllnd_peer_decref(peer);
85 }
86
87 ptllnd_peer_t *
88 ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
89 {
90         ptllnd_ni_t       *plni = ni->ni_data;
91         unsigned int       hash = LNET_NIDADDR(id.nid) % plni->plni_peer_hash_size;
92         struct list_head  *tmp;
93         ptllnd_peer_t     *plp;
94         ptllnd_tx_t       *tx;
95         int                rc;
96
97         LASSERT (LNET_NIDNET(id.nid) == LNET_NIDNET(ni->ni_nid));
98
99         list_for_each(tmp, &plni->plni_peer_hash[hash]) {
100                 plp = list_entry(tmp, ptllnd_peer_t, plp_list);
101
102                 if (plp->plp_id.nid == id.nid &&
103                     plp->plp_id.pid == id.pid) {
104                         ptllnd_peer_addref(plp);
105                         return plp;
106                 }
107         }
108
109         if (!create)
110                 return NULL;
111
112         /* New peer: check first for enough posted buffers */
113         plni->plni_npeers++;
114         rc = ptllnd_grow_buffers(ni);
115         if (rc != 0) {
116                 plni->plni_npeers--;
117                 return NULL;
118         }
119
120         LIBCFS_ALLOC(plp, sizeof(*plp));
121         if (plp == NULL) {
122                 CERROR("Can't allocate new peer %s\n", libcfs_id2str(id));
123                 plni->plni_npeers--;
124                 return NULL;
125         }
126
127         CDEBUG(D_NET, "new peer=%p\n",plp);
128
129         plp->plp_ni = ni;
130         plp->plp_id = id;
131         plp->plp_ptlid.nid = LNET_NIDADDR(id.nid);
132         plp->plp_ptlid.pid = plni->plni_ptllnd_pid;
133         plp->plp_max_credits =
134         plp->plp_credits = 1; /* add more later when she gives me credits */
135         plp->plp_max_msg_size = plni->plni_max_msg_size; /* until I hear from her */
136         plp->plp_outstanding_credits = plni->plni_peer_credits - 1;
137         plp->plp_match = 0;
138         plp->plp_stamp = 0;
139         plp->plp_recvd_hello = 0;
140         plp->plp_closing = 0;
141         plp->plp_refcount = 1;
142         CFS_INIT_LIST_HEAD(&plp->plp_list);
143         CFS_INIT_LIST_HEAD(&plp->plp_txq);
144         CFS_INIT_LIST_HEAD(&plp->plp_activeq);
145
146         ptllnd_peer_addref(plp);
147         list_add_tail(&plp->plp_list, &plni->plni_peer_hash[hash]);
148
149         tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_HELLO, 0);
150         if (tx == NULL) {
151                 CERROR("Can't send HELLO to %s\n", libcfs_id2str(id));
152                 ptllnd_close_peer(plp, -ENOMEM);
153                 ptllnd_peer_decref(plp);
154                 return NULL;
155         }
156
157         tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS;
158         tx->tx_msg.ptlm_u.hello.kptlhm_max_msg_size = plni->plni_max_msg_size;
159
160         ptllnd_post_tx(tx);
161
162         return plp;
163 }
164
165 int
166 ptllnd_count_q(struct list_head *q)
167 {
168         struct list_head *e;
169         int               n = 0;
170         
171         list_for_each(e, q) {
172                 n++;
173         }
174         
175         return n;
176 }
177
178 const char *
179 ptllnd_tx_typestr(int type) 
180 {
181         switch (type) {
182         case PTLLND_RDMA_WRITE:
183                 return "rdma_write";
184                 
185         case PTLLND_RDMA_READ:
186                 return "rdma_read";
187
188         case PTLLND_MSG_TYPE_PUT:
189                 return "put_req";
190                 
191         case PTLLND_MSG_TYPE_GET:
192                 return "get_req";
193
194         case PTLLND_MSG_TYPE_IMMEDIATE:
195                 return "immediate";
196
197         case PTLLND_MSG_TYPE_NOOP:
198                 return "noop";
199
200         case PTLLND_MSG_TYPE_HELLO:
201                 return "hello";
202
203         default:
204                 return "<unknown>";
205         }
206 }
207
208 void
209 ptllnd_debug_tx(ptllnd_tx_t *tx) 
210 {
211         CDEBUG(D_WARNING, "%s %s b "DBGT_FMT"/"DBGT_FMT
212                " r "DBGT_FMT"/"DBGT_FMT" status %d\n",
213                ptllnd_tx_typestr(tx->tx_type),
214                libcfs_id2str(tx->tx_peer->plp_id)
215                DBGT_ARGS(tx->tx_bulk_posted) DBGT_ARGS(tx->tx_bulk_done)
216                DBGT_ARGS(tx->tx_req_posted) DBGT_ARGS(tx->tx_req_done),
217                tx->tx_status);
218 }
219
220 void
221 ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
222 {
223         ptllnd_peer_t    *plp = ptllnd_find_peer(ni, id, 0);
224         struct list_head *tmp;
225         ptllnd_ni_t      *plni = ni->ni_data;
226         ptllnd_tx_t      *tx;
227         
228         if (plp == NULL) {
229                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_id2str(id));
230                 return;
231         }
232         
233         CDEBUG(D_WARNING, "%s %s%s [%d] "LPD64".%06d m "LPD64" q %d/%d c %d/%d(%d)\n",
234                libcfs_id2str(id), 
235                plp->plp_recvd_hello ? "H" : "_",
236                plp->plp_closing     ? "C" : "_",
237                plp->plp_refcount,
238                plp->plp_stamp / 1000000, (int)(plp->plp_stamp % 1000000),
239                plp->plp_match,
240                ptllnd_count_q(&plp->plp_txq),
241                ptllnd_count_q(&plp->plp_activeq),
242                plp->plp_credits, plp->plp_outstanding_credits, plp->plp_max_credits);
243
244         CDEBUG(D_WARNING, "txq:\n");
245         list_for_each (tmp, &plp->plp_txq) {
246                 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
247                 
248                 ptllnd_debug_tx(tx);
249         }
250
251         CDEBUG(D_WARNING, "activeq:\n");
252         list_for_each (tmp, &plp->plp_activeq) {
253                 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
254                 
255                 ptllnd_debug_tx(tx);
256         }
257
258         CDEBUG(D_WARNING, "zombies:\n");
259         list_for_each (tmp, &plni->plni_zombie_txs) {
260                 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
261                 
262                 if (tx->tx_peer->plp_id.nid == id.nid &&
263                     tx->tx_peer->plp_id.pid == id.pid)
264                         ptllnd_debug_tx(tx);
265         }
266         
267         CDEBUG(D_WARNING, "history:\n");
268         list_for_each (tmp, &plni->plni_tx_history) {
269                 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
270                 
271                 if (tx->tx_peer->plp_id.nid == id.nid &&
272                     tx->tx_peer->plp_id.pid == id.pid)
273                         ptllnd_debug_tx(tx);
274         }
275         
276         ptllnd_peer_decref(plp);
277 }
278
279 void
280 ptllnd_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive)
281 {
282         lnet_process_id_t  id;
283         ptllnd_peer_t     *peer;
284         time_t             start = cfs_time_current_sec();
285         int                w = PTLLND_WARN_LONG_WAIT;
286         
287         /* This is only actually used to connect to routers at startup! */
288         if (!alive) {
289                 LBUG();
290                 return;
291         }
292
293         id.nid = nid;
294         id.pid = LUSTRE_SRV_LNET_PID;
295         
296         peer = ptllnd_find_peer(ni, id, 1);
297         if (peer == NULL)
298                 return;
299
300         /* wait for the peer to reply */
301         while (!peer->plp_recvd_hello) {
302                 if (cfs_time_current_sec() > start + w) {
303                         CWARN("Waited %ds to connect to %s\n",
304                               w, libcfs_id2str(id));
305                         w *= 2;
306                 }
307                 
308                 ptllnd_wait(ni, w*1000);
309         }
310         
311         ptllnd_peer_decref(peer);
312 }
313
314 __u32
315 ptllnd_cksum (void *ptr, int nob)
316 {
317         char  *c  = ptr;
318         __u32  sum = 0;
319
320         while (nob-- > 0)
321                 sum = ((sum << 1) | (sum >> 31)) + *c++;
322
323         /* ensure I don't return 0 (== no checksum) */
324         return (sum == 0) ? 1 : sum;
325 }
326
327 ptllnd_tx_t *
328 ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
329 {
330         lnet_ni_t   *ni = peer->plp_ni;
331         ptllnd_ni_t *plni = ni->ni_data;
332         ptllnd_tx_t *tx;
333         int          msgsize;
334
335         CDEBUG(D_NET, "peer=%p type=%d payload=%d\n",peer,type,payload_nob);
336
337         switch (type) {
338         default:
339                 LBUG();
340
341         case PTLLND_RDMA_WRITE:
342         case PTLLND_RDMA_READ:
343                 LASSERT (payload_nob == 0);
344                 msgsize = 0;
345                 break;
346
347         case PTLLND_MSG_TYPE_PUT:
348         case PTLLND_MSG_TYPE_GET:
349                 LASSERT (payload_nob == 0);
350                 msgsize = offsetof(kptl_msg_t, ptlm_u) + 
351                           sizeof(kptl_rdma_msg_t);
352                 break;
353
354         case PTLLND_MSG_TYPE_IMMEDIATE:
355                 msgsize = offsetof(kptl_msg_t,
356                                    ptlm_u.immediate.kptlim_payload[payload_nob]);
357                 break;
358
359         case PTLLND_MSG_TYPE_NOOP:
360                 LASSERT (payload_nob == 0);
361                 msgsize = offsetof(kptl_msg_t, ptlm_u);
362                 break;
363
364         case PTLLND_MSG_TYPE_HELLO:
365                 LASSERT (payload_nob == 0);
366                 msgsize = offsetof(kptl_msg_t, ptlm_u) +
367                           sizeof(kptl_hello_msg_t);
368                 break;
369         }
370
371         msgsize = (msgsize + 7) & ~7;
372         LASSERT (msgsize <= peer->plp_max_msg_size);
373
374         CDEBUG(D_NET, "msgsize=%d\n",msgsize);
375
376         LIBCFS_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize);
377
378         if (tx == NULL) {
379                 CERROR("Can't allocate msg type %d for %s\n",
380                        type, libcfs_id2str(peer->plp_id));
381                 return NULL;
382         }
383
384         CFS_INIT_LIST_HEAD(&tx->tx_list);
385         tx->tx_peer = peer;
386         tx->tx_type = type;
387         tx->tx_lnetmsg = tx->tx_lnetreplymsg = NULL;
388         tx->tx_niov = 0;
389         tx->tx_iov = NULL;
390         tx->tx_reqmdh = PTL_INVALID_HANDLE;
391         tx->tx_bulkmdh = PTL_INVALID_HANDLE;
392         tx->tx_msgsize = msgsize;
393         tx->tx_completing = 0;
394         tx->tx_status = 0;
395
396         PTLLND_DBGT_INIT(tx->tx_bulk_posted);
397         PTLLND_DBGT_INIT(tx->tx_bulk_done);
398         PTLLND_DBGT_INIT(tx->tx_req_posted);
399         PTLLND_DBGT_INIT(tx->tx_req_done);
400
401         if (msgsize != 0) {
402                 tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC;
403                 tx->tx_msg.ptlm_version = PTLLND_MSG_VERSION;
404                 tx->tx_msg.ptlm_type = type;
405                 tx->tx_msg.ptlm_credits = 0;
406                 tx->tx_msg.ptlm_nob = msgsize;
407                 tx->tx_msg.ptlm_cksum = 0;
408                 tx->tx_msg.ptlm_srcnid = ni->ni_nid;
409                 tx->tx_msg.ptlm_srcstamp = plni->plni_stamp;
410                 tx->tx_msg.ptlm_dstnid = peer->plp_id.nid;
411                 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
412                 tx->tx_msg.ptlm_srcpid = the_lnet.ln_pid;
413                 tx->tx_msg.ptlm_dstpid = peer->plp_id.pid;
414         }
415
416         ptllnd_peer_addref(peer);
417         plni->plni_ntxs++;
418
419         CDEBUG(D_NET, "tx=%p\n",tx);
420
421         return tx;
422 }
423
424 void
425 ptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh)
426 {
427         ptllnd_peer_t   *peer = tx->tx_peer;
428         lnet_ni_t       *ni = peer->plp_ni;
429         int              rc;
430         time_t           start = cfs_time_current_sec();
431         int              w = PTLLND_WARN_LONG_WAIT;
432
433         while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) {
434                 rc = PtlMDUnlink(*mdh);
435 #ifndef LUSTRE_PORTALS_UNLINK_SEMANTICS
436                 if (rc == PTL_OK) /* unlink successful => no unlinked event */
437                         return;
438                 LASSERT (rc == PTL_MD_IN_USE);
439 #endif
440                 if (cfs_time_current_sec() > start + w) {
441                         CWARN("Waited %ds to abort tx to %s\n",
442                               w, libcfs_id2str(peer->plp_id));
443                         w *= 2;
444                 }
445                 /* Wait for ptllnd_tx_event() to invalidate */
446                 ptllnd_wait(ni, w*1000);
447         }
448 }
449
450 void
451 ptllnd_cull_tx_history(ptllnd_ni_t *plni)
452 {
453         int max = plni->plni_max_tx_history;
454
455         while (plni->plni_ntx_history > max) {
456                 ptllnd_tx_t *tx = list_entry(plni->plni_tx_history.next, 
457                                              ptllnd_tx_t, tx_list);
458                 list_del(&tx->tx_list);
459
460                 ptllnd_peer_decref(tx->tx_peer);
461
462                 LIBCFS_FREE(tx, offsetof(ptllnd_tx_t, tx_msg) + tx->tx_msgsize);
463
464                 LASSERT (plni->plni_ntxs > 0);
465                 plni->plni_ntxs--;
466                 plni->plni_ntx_history--;
467         }
468 }
469
470 void
471 ptllnd_tx_done(ptllnd_tx_t *tx)
472 {
473         ptllnd_peer_t   *peer = tx->tx_peer;
474         lnet_ni_t       *ni = peer->plp_ni;
475         ptllnd_ni_t     *plni = ni->ni_data;
476
477         /* CAVEAT EMPTOR: If this tx is being aborted, I'll continue to get
478          * events for this tx until it's unlinked.  So I set tx_completing to
479          * flag the tx is getting handled */
480
481         if (tx->tx_completing)
482                 return;
483
484         tx->tx_completing = 1;
485
486         if (!list_empty(&tx->tx_list))
487                 list_del_init(&tx->tx_list);
488
489         if (tx->tx_status != 0) {
490                 CERROR("Completing tx with error\n");
491                 ptllnd_debug_tx(tx);
492                 ptllnd_close_peer(peer, tx->tx_status);
493         }
494         
495         ptllnd_abort_tx(tx, &tx->tx_reqmdh);
496         ptllnd_abort_tx(tx, &tx->tx_bulkmdh);
497
498         if (tx->tx_niov > 0) {
499                 LIBCFS_FREE(tx->tx_iov, tx->tx_niov * sizeof(*tx->tx_iov));
500                 tx->tx_niov = 0;
501         }
502
503         if (tx->tx_lnetreplymsg != NULL) {
504                 LASSERT (tx->tx_type == PTLLND_MSG_TYPE_GET);
505                 LASSERT (tx->tx_lnetmsg != NULL);
506                 /* Simulate GET success always  */
507                 lnet_finalize(ni, tx->tx_lnetmsg, 0);
508                 CDEBUG(D_NET, "lnet_finalize(tx_lnetreplymsg=%p)\n",tx->tx_lnetreplymsg);
509                 lnet_finalize(ni, tx->tx_lnetreplymsg, tx->tx_status);
510         } else if (tx->tx_lnetmsg != NULL) {
511                 lnet_finalize(ni, tx->tx_lnetmsg, tx->tx_status);
512         }
513
514         plni->plni_ntx_history++;
515         list_add_tail(&tx->tx_list, &plni->plni_tx_history);
516         
517         ptllnd_cull_tx_history(plni);
518 }
519
520 int
521 ptllnd_set_txiov(ptllnd_tx_t *tx,
522                  unsigned int niov, struct iovec *iov,
523                  unsigned int offset, unsigned int len)
524 {
525         ptl_md_iovec_t *piov;
526         int             npiov;
527
528         if (len == 0) {
529                 tx->tx_niov = 0;
530                 return 0;
531         }
532
533         CDEBUG(D_NET, "niov  =%d\n",niov);
534         CDEBUG(D_NET, "offset=%d\n",offset);
535         CDEBUG(D_NET, "len   =%d\n",len);
536
537
538         /*
539          * Remove iovec's at the beginning that
540          * are skipped because of the offset.
541          * Adjust the offset accordingly
542          */
543         for (;;) {
544                 LASSERT (niov > 0);
545                 if (offset < iov->iov_len)
546                         break;
547                 offset -= iov->iov_len;
548                 niov--;
549                 iov++;
550         }
551
552         CDEBUG(D_NET, "niov  =%d (after)\n",niov);
553         CDEBUG(D_NET, "offset=%d (after)\n",offset);
554         CDEBUG(D_NET, "len   =%d (after)\n",len);
555
556         for (;;) {
557                 int temp_offset = offset;
558                 int resid = len;
559                 LIBCFS_ALLOC(piov, niov * sizeof(*piov));
560                 if (piov == NULL)
561                         return -ENOMEM;
562
563                 for (npiov = 0;; npiov++) {
564                         CDEBUG(D_NET, "npiov=%d\n",npiov);
565                         CDEBUG(D_NET, "offset=%d\n",temp_offset);
566                         CDEBUG(D_NET, "len=%d\n",resid);
567                         CDEBUG(D_NET, "iov[npiov].iov_len=%d\n",iov[npiov].iov_len);
568
569                         LASSERT (npiov < niov);
570                         LASSERT (iov->iov_len >= temp_offset);
571
572                         piov[npiov].iov_base = iov[npiov].iov_base + temp_offset;
573                         piov[npiov].iov_len = iov[npiov].iov_len - temp_offset;
574                         
575                         if (piov[npiov].iov_len >= resid) {
576                                 piov[npiov].iov_len = resid;
577                                 npiov++;
578                                 break;
579                         }
580                         resid -= piov[npiov].iov_len;
581                         temp_offset = 0;
582                 }
583
584                 if (npiov == niov) {
585                         tx->tx_niov = niov;
586                         tx->tx_iov = piov;
587                         CDEBUG(D_NET, "tx->tx_iov=%p\n",tx->tx_iov);
588                         CDEBUG(D_NET, "tx->tx_niov=%d\n",tx->tx_niov);
589                         return 0;
590                 }
591
592                 /* Dang! The piov I allocated was too big and it's a drag to
593                  * have to maintain separate 'allocated' and 'used' sizes, so
594                  * I'll just do it again; NB this doesn't happen normally... */
595                 LIBCFS_FREE(piov, niov * sizeof(*piov));
596                 niov = npiov;
597         }
598 }
599
600 void
601 ptllnd_set_md_buffer(ptl_md_t *md, ptllnd_tx_t *tx)
602 {
603         unsigned int    niov = tx->tx_niov;
604         ptl_md_iovec_t *iov = tx->tx_iov;
605
606         LASSERT ((md->options & PTL_MD_IOVEC) == 0);
607
608         if (niov == 0) {
609                 md->start = NULL;
610                 md->length = 0;
611         } else if (niov == 1) {
612                 md->start = iov[0].iov_base;
613                 md->length = iov[0].iov_len;
614         } else {
615                 md->start = iov;
616                 md->length = niov;
617                 md->options |= PTL_MD_IOVEC;
618         }
619 }
620
621 int
622 ptllnd_post_buffer(ptllnd_buffer_t *buf)
623 {
624         lnet_ni_t        *ni = buf->plb_ni;
625         ptllnd_ni_t      *plni = ni->ni_data;
626         ptl_process_id_t  anyid = {
627                 .nid       = PTL_NID_ANY,
628                 .pid       = PTL_PID_ANY};
629         ptl_md_t          md = {
630                 .start     = buf->plb_buffer,
631                 .length    = plni->plni_buffer_size,
632                 .threshold = PTL_MD_THRESH_INF,
633                 .max_size  = plni->plni_max_msg_size,
634                 .options   = (PTLLND_MD_OPTIONS |
635                               PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | 
636                               PTL_MD_LOCAL_ALIGN8),
637                 .user_ptr  = ptllnd_obj2eventarg(buf, PTLLND_EVENTARG_TYPE_BUF),
638                 .eq_handle = plni->plni_eqh};
639         ptl_handle_me_t meh;
640         int             rc;
641
642         LASSERT (!buf->plb_posted);
643
644         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal,
645                          anyid, LNET_MSG_MATCHBITS, 0,
646                          PTL_UNLINK, PTL_INS_AFTER, &meh);
647         if (rc != PTL_OK) {
648                 CERROR("PtlMEAttach failed: %d\n", rc);
649                 return -ENOMEM;
650         }
651
652         buf->plb_posted = 1;
653         plni->plni_nposted_buffers++;
654
655         rc = PtlMDAttach(meh, md, LNET_UNLINK, &buf->plb_md);
656         if (rc == PTL_OK)
657                 return 0;
658
659         CERROR("PtlMDAttach failed: %d\n", rc);
660
661         buf->plb_posted = 0;
662         plni->plni_nposted_buffers--;
663
664         rc = PtlMEUnlink(meh);
665         LASSERT (rc == PTL_OK);
666
667         return -ENOMEM;
668 }
669
670 void
671 ptllnd_check_sends(ptllnd_peer_t *peer)
672 {
673         lnet_ni_t      *ni = peer->plp_ni;
674         ptllnd_ni_t    *plni = ni->ni_data;
675         ptllnd_tx_t    *tx;
676         ptl_md_t        md;
677         ptl_handle_md_t mdh;
678         int             rc;
679
680         CDEBUG(D_NET, "plp_outstanding_credits=%d\n",peer->plp_outstanding_credits);
681
682         if (list_empty(&peer->plp_txq) &&
683             peer->plp_outstanding_credits >=
684             PTLLND_CREDIT_HIGHWATER(plni)) {
685
686                 tx = ptllnd_new_tx(peer, PTLLND_MSG_TYPE_NOOP, 0);
687                 CDEBUG(D_NET, "NOOP tx=%p\n",tx);
688                 if (tx == NULL) {
689                         CERROR("Can't return credits to %s\n",
690                                libcfs_id2str(peer->plp_id));
691                 } else {
692                         list_add_tail(&tx->tx_list, &peer->plp_txq);
693                 }
694         }
695
696         while (!list_empty(&peer->plp_txq)) {
697                 tx = list_entry(peer->plp_txq.next, ptllnd_tx_t, tx_list);
698
699                 CDEBUG(D_NET, "Looking at TX=%p\n",tx);
700                 CDEBUG(D_NET, "plp_credits=%d\n",peer->plp_credits);
701                 CDEBUG(D_NET, "plp_outstanding_credits=%d\n",peer->plp_outstanding_credits);
702
703                 LASSERT (tx->tx_msgsize > 0);
704
705                 LASSERT (peer->plp_outstanding_credits >= 0);
706                 LASSERT (peer->plp_outstanding_credits <=
707                          plni->plni_peer_credits);
708                 LASSERT (peer->plp_credits >= 0);
709                 LASSERT (peer->plp_credits <= peer->plp_max_credits);
710
711                 if (peer->plp_credits == 0)     /* no credits */
712                         break;
713
714                 if (peer->plp_credits == 1 &&   /* last credit reserved for */
715                     peer->plp_outstanding_credits == 0) /* returning credits */
716                         break;
717
718                 list_del(&tx->tx_list);
719                 list_add_tail(&tx->tx_list, &peer->plp_activeq);
720
721                 CDEBUG(D_NET, "Sending at TX=%p type=%s (%d)\n",tx,
722                         ptllnd_msgtype2str(tx->tx_type),tx->tx_type);
723
724                 if (tx->tx_type == PTLLND_MSG_TYPE_NOOP &&
725                     (!list_empty(&peer->plp_txq) ||
726                      peer->plp_outstanding_credits <
727                      PTLLND_CREDIT_HIGHWATER(plni))) {
728                         /* redundant NOOP */
729                         ptllnd_tx_done(tx);
730                         continue;
731                 }
732
733                 /* Set stamp at the last minute; on a new peer, I don't know it
734                  * until I receive the HELLO back */
735                 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
736
737                 CDEBUG(D_NET, "Returning %d to peer\n",peer->plp_outstanding_credits);
738
739                 /*
740                  * Return all the credits we have
741                  */
742                 tx->tx_msg.ptlm_credits = peer->plp_outstanding_credits;
743                 peer->plp_outstanding_credits = 0;
744
745                 /*
746                  * One less credit
747                  */
748                 peer->plp_credits--;
749
750                 if (plni->plni_checksum)
751                         tx->tx_msg.ptlm_cksum = 
752                                 ptllnd_cksum(&tx->tx_msg,
753                                              offsetof(kptl_msg_t, ptlm_u));
754
755                 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
756                 md.eq_handle = plni->plni_eqh;
757                 md.threshold = 1;
758                 md.options = PTLLND_MD_OPTIONS;
759                 md.start = &tx->tx_msg;
760                 md.length = tx->tx_msgsize;
761
762                 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
763                 if (rc != PTL_OK) {
764                         CERROR("PtlMDBind for %s failed: %d\n",
765                                libcfs_id2str(peer->plp_id), rc);
766                         tx->tx_status = -EIO;
767                         ptllnd_tx_done(tx);
768                         break;
769                 }
770
771                 tx->tx_reqmdh = mdh;
772                 PTLLND_DBGT_STAMP(tx->tx_req_posted);
773
774                 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
775                             plni->plni_portal, 0, LNET_MSG_MATCHBITS, 0, 0);
776                 if (rc != PTL_OK) {
777                         CERROR("PtlPut for %s failed: %d\n",
778                                libcfs_id2str(peer->plp_id), rc);
779                         tx->tx_status = -EIO;
780                         ptllnd_tx_done(tx);
781                         break;
782                 }
783         }
784 }
785
786 int
787 ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
788                     unsigned int niov, struct iovec *iov,
789                     unsigned int offset, unsigned int len)
790 {
791         lnet_ni_t      *ni = peer->plp_ni;
792         ptllnd_ni_t    *plni = ni->ni_data;
793         ptllnd_tx_t    *tx = ptllnd_new_tx(peer, type, 0);
794         __u64           matchbits;
795         ptl_md_t        md;
796         ptl_handle_md_t mdh;
797         ptl_handle_me_t meh;
798         int             rc;
799         int             rc2;
800         time_t          start;
801         int             w;
802
803         CDEBUG(D_NET, "niov=%d offset=%d len=%d\n",niov,offset,len);
804
805         LASSERT (type == PTLLND_MSG_TYPE_GET ||
806                  type == PTLLND_MSG_TYPE_PUT);
807
808         if (tx == NULL) {
809                 CERROR("Can't allocate %s tx for %s\n",
810                        type == PTLLND_MSG_TYPE_GET ? "GET" : "PUT/REPLY",
811                        libcfs_id2str(peer->plp_id));
812                 return -ENOMEM;
813         }
814
815         rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
816         if (rc != 0) {
817                 CERROR ("Can't allocate iov %d for %s\n",
818                         niov, libcfs_id2str(peer->plp_id));
819                 rc = -ENOMEM;
820                 goto failed;
821         }
822
823         md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
824         md.eq_handle = plni->plni_eqh;
825         md.threshold = 1;
826         md.max_size = 0;
827         md.options = PTLLND_MD_OPTIONS;
828         if(type == PTLLND_MSG_TYPE_GET)
829                 md.options |= PTL_MD_OP_PUT | PTL_MD_ACK_DISABLE;
830         else
831                 md.options |= PTL_MD_OP_GET;
832         ptllnd_set_md_buffer(&md, tx);
833
834         start = cfs_time_current_sec();
835         w = PTLLND_WARN_LONG_WAIT;
836
837         while (!peer->plp_recvd_hello) {        /* wait to validate plp_match */
838                 if (peer->plp_closing) {
839                         rc = -EIO;
840                         goto failed;
841                 }
842                 if (cfs_time_current_sec() > start + w) {
843                         CWARN("Waited %ds to connect to %s\n",
844                               w, libcfs_id2str(peer->plp_id));
845                         w *= 2;
846                 }
847                 ptllnd_wait(ni, w*1000);
848         }
849
850         if (peer->plp_match < PTL_RESERVED_MATCHBITS)
851                 peer->plp_match = PTL_RESERVED_MATCHBITS;
852         matchbits = peer->plp_match++;
853         CDEBUG(D_NET, "matchbits " LPX64 " %s\n", matchbits,
854                ptllnd_ptlid2str(peer->plp_ptlid));
855
856         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid,
857                          matchbits, 0, PTL_UNLINK, PTL_INS_BEFORE, &meh);
858         if (rc != PTL_OK) {
859                 CERROR("PtlMEAttach for %s failed: %d\n",
860                        libcfs_id2str(peer->plp_id), rc);
861                 rc = -EIO;
862                 goto failed;
863         }
864
865         CDEBUG(D_NET, "md.start=%p\n",md.start);
866         CDEBUG(D_NET, "md.length=%d\n",md.length);
867         CDEBUG(D_NET, "md.threshold=%d\n",md.threshold);
868         CDEBUG(D_NET, "md.max_size=%d\n",md.max_size);
869         CDEBUG(D_NET, "md.options=0x%x\n",md.options);
870         CDEBUG(D_NET, "md.user_ptr=%p\n",md.user_ptr);
871
872         PTLLND_DBGT_STAMP(tx->tx_bulk_posted);
873
874         rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);
875         if (rc != PTL_OK) {
876                 CERROR("PtlMDAttach for %s failed: %d\n",
877                        libcfs_id2str(peer->plp_id), rc);
878                 rc2 = PtlMEUnlink(meh);
879                 LASSERT (rc2 == PTL_OK);
880                 rc = -EIO;
881                 goto failed;
882         }
883         tx->tx_bulkmdh = mdh;
884
885         /*
886          * We need to set the stamp here because it
887          * we could have received a HELLO above that set
888          * peer->plp_stamp
889          */
890         tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
891
892         tx->tx_msg.ptlm_u.rdma.kptlrm_hdr = msg->msg_hdr;
893         tx->tx_msg.ptlm_u.rdma.kptlrm_matchbits = matchbits;
894
895         if (type == PTLLND_MSG_TYPE_GET) {
896                 tx->tx_lnetreplymsg = lnet_create_reply_msg(ni, msg);
897                 if (tx->tx_lnetreplymsg == NULL) {
898                         CERROR("Can't create reply for GET to %s\n",
899                                libcfs_id2str(msg->msg_target));
900                         rc = -ENOMEM;
901                         goto failed;
902                 }
903         }
904
905         tx->tx_lnetmsg = msg;
906         ptllnd_post_tx(tx);
907         return 0;
908
909  failed:
910         ptllnd_tx_done(tx);
911         return rc;
912 }
913
914 int
915 ptllnd_active_rdma(ptllnd_peer_t *peer, int type,
916                    lnet_msg_t *msg, __u64 matchbits,
917                    unsigned int niov, struct iovec *iov,
918                    unsigned int offset, unsigned int len)
919 {
920         lnet_ni_t       *ni = peer->plp_ni;
921         ptllnd_ni_t     *plni = ni->ni_data;
922         ptllnd_tx_t     *tx = ptllnd_new_tx(peer, type, 0);
923         ptl_md_t         md;
924         ptl_handle_md_t  mdh;
925         int              rc;
926
927         LASSERT (type == PTLLND_RDMA_READ ||
928                  type == PTLLND_RDMA_WRITE);
929
930         if (tx == NULL) {
931                 CERROR("Can't allocate tx for RDMA %s with %s\n",
932                        (type == PTLLND_RDMA_WRITE) ? "write" : "read",
933                        libcfs_id2str(peer->plp_id));
934                 ptllnd_close_peer(peer, -ENOMEM);
935                 return -ENOMEM;
936         }
937
938         rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
939         if (rc != 0) {
940                 CERROR ("Can't allocate iov %d for %s\n",
941                         niov, libcfs_id2str(peer->plp_id));
942                 rc = -ENOMEM;
943                 goto failed;
944         }
945
946         md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
947         md.eq_handle = plni->plni_eqh;
948         md.max_size = 0;
949         md.options = PTLLND_MD_OPTIONS;
950         md.threshold = (type == PTLLND_RDMA_READ) ? 2 : 1;
951
952         ptllnd_set_md_buffer(&md, tx);
953
954         rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
955         if (rc != PTL_OK) {
956                 CERROR("PtlMDBind for %s failed: %d\n",
957                        libcfs_id2str(peer->plp_id), rc);
958                 rc = -EIO;
959                 goto failed;
960         }
961
962         tx->tx_bulkmdh = mdh;
963         tx->tx_lnetmsg = msg;
964
965         list_add_tail(&tx->tx_list, &peer->plp_activeq);
966         PTLLND_DBGT_STAMP(tx->tx_bulk_posted);
967
968         if (type == PTLLND_RDMA_READ)
969                 rc = PtlGet(mdh, peer->plp_ptlid,
970                             plni->plni_portal, 0, matchbits, 0);
971         else
972                 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
973                             plni->plni_portal, 0, matchbits, 0, 
974                             (msg == NULL) ? PTLLND_RDMA_FAIL : PTLLND_RDMA_OK);
975
976         if (rc == PTL_OK)
977                 return 0;
978
979         CERROR("Can't initiate RDMA with %s: %d\n",
980                libcfs_id2str(peer->plp_id), rc);
981
982         tx->tx_lnetmsg = NULL;
983  failed:
984         tx->tx_status = rc;
985         ptllnd_tx_done(tx);    /* this will close peer */
986         return rc;
987 }
988
989 int
990 ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
991 {
992         ptllnd_ni_t    *plni = ni->ni_data;
993         ptllnd_peer_t  *plp;
994         ptllnd_tx_t    *tx;
995         int             nob;
996         int             rc;
997
998         LASSERT (!msg->msg_routing);
999         LASSERT (msg->msg_kiov == NULL);
1000
1001         LASSERT (msg->msg_niov <= PTL_MD_MAX_IOV); /* !!! */
1002
1003         CDEBUG(D_NET, "%s [%d]+%d,%d -> %s%s\n", 
1004                lnet_msgtyp2str(msg->msg_type),
1005                msg->msg_niov, msg->msg_offset, msg->msg_len,
1006                libcfs_nid2str(msg->msg_target.nid),
1007                msg->msg_target_is_router ? "(rtr)" : "");
1008
1009         if ((msg->msg_target.pid & LNET_PID_USERFLAG) != 0) {
1010                 CERROR("Can't send to non-kernel peer %s\n",
1011                        libcfs_id2str(msg->msg_target));
1012                 return -EHOSTUNREACH;
1013         }
1014         
1015         plp = ptllnd_find_peer(ni, msg->msg_target, 1);
1016         if (plp == NULL)
1017                 return -ENOMEM;
1018
1019         switch (msg->msg_type) {
1020         default:
1021                 LBUG();
1022
1023         case LNET_MSG_ACK:
1024                 CDEBUG(D_NET, "LNET_MSG_ACK\n");
1025
1026                 LASSERT (msg->msg_len == 0);
1027                 break;                          /* send IMMEDIATE */
1028
1029         case LNET_MSG_GET:
1030                 CDEBUG(D_NET, "LNET_MSG_GET nob=%d\n",msg->msg_md->md_length);
1031
1032                 if (msg->msg_target_is_router)
1033                         break;                  /* send IMMEDIATE */
1034
1035                 nob = msg->msg_md->md_length;
1036                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1037                 if (nob <= plni->plni_max_msg_size)
1038                         break;
1039
1040                 LASSERT ((msg->msg_md->md_options & LNET_MD_KIOV) == 0);
1041                 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_GET, msg,
1042                                          msg->msg_md->md_niov,
1043                                          msg->msg_md->md_iov.iov,
1044                                          0, msg->msg_md->md_length);
1045                 ptllnd_peer_decref(plp);
1046                 return rc;
1047
1048         case LNET_MSG_REPLY:
1049         case LNET_MSG_PUT:
1050                 CDEBUG(D_NET, "LNET_MSG_PUT nob=%d\n",msg->msg_len);
1051                 nob = msg->msg_len;
1052                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1053                 CDEBUG(D_NET, "msg_size=%d max=%d\n",msg->msg_len,plp->plp_max_msg_size);
1054                 if (nob <= plp->plp_max_msg_size)
1055                         break;                  /* send IMMEDIATE */
1056
1057                 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_PUT, msg,
1058                                          msg->msg_niov, msg->msg_iov,
1059                                          msg->msg_offset, msg->msg_len);
1060                 ptllnd_peer_decref(plp);
1061                 return rc;
1062         }
1063
1064         /* send IMMEDIATE
1065          * NB copy the payload so we don't have to do a fragmented send */
1066
1067         CDEBUG(D_NET, "IMMEDIATE len=%d\n", msg->msg_len);
1068         tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_IMMEDIATE, msg->msg_len);
1069         if (tx == NULL) {
1070                 CERROR("Can't allocate tx for lnet type %d to %s\n",
1071                        msg->msg_type, libcfs_id2str(msg->msg_target));
1072                 ptllnd_peer_decref(plp);
1073                 return -ENOMEM;
1074         }
1075
1076         lnet_copy_iov2flat(tx->tx_msgsize, &tx->tx_msg,
1077                            offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1078                            msg->msg_niov, msg->msg_iov, msg->msg_offset,
1079                            msg->msg_len);
1080         tx->tx_msg.ptlm_u.immediate.kptlim_hdr = msg->msg_hdr;
1081
1082         tx->tx_lnetmsg = msg;
1083         ptllnd_post_tx(tx);
1084         ptllnd_peer_decref(plp);
1085         return 0;
1086 }
1087
1088 void
1089 ptllnd_rx_done(ptllnd_rx_t *rx)
1090 {
1091         ptllnd_peer_t *plp = rx->rx_peer;
1092         lnet_ni_t     *ni = plp->plp_ni;
1093         ptllnd_ni_t   *plni = ni->ni_data;
1094
1095         CDEBUG(D_NET, "rx=%p\n", rx);
1096
1097         plp->plp_outstanding_credits++;
1098         ptllnd_check_sends(rx->rx_peer);
1099
1100         LASSERT (plni->plni_nrxs > 0);
1101         plni->plni_nrxs--;
1102 }
1103
1104 int
1105 ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1106                   void **new_privatep)
1107 {
1108         /* Shouldn't get here; recvs only block for router buffers */
1109         LBUG();
1110         return 0;
1111 }
1112
1113 int
1114 ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1115             int delayed, unsigned int niov,
1116             struct iovec *iov, lnet_kiov_t *kiov,
1117             unsigned int offset, unsigned int mlen, unsigned int rlen)
1118 {
1119         ptllnd_rx_t    *rx = private;
1120         int             rc = 0;
1121         int             nob;
1122
1123         LASSERT (kiov == NULL);
1124         LASSERT (niov <= PTL_MD_MAX_IOV);       /* !!! */
1125
1126         switch (rx->rx_msg->ptlm_type) {
1127         default:
1128                 LBUG();
1129
1130         case PTLLND_MSG_TYPE_IMMEDIATE:
1131                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[mlen]);
1132                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE nob=%d\n",nob);
1133                 if (nob > rx->rx_nob) {
1134                         CERROR("Immediate message from %s too big: %d(%d)\n",
1135                                libcfs_id2str(rx->rx_peer->plp_id),
1136                                nob, rx->rx_nob);
1137                         rc = -EPROTO;
1138                         break;
1139                 }
1140                 lnet_copy_flat2iov(niov, iov, offset,
1141                                    rx->rx_nob, rx->rx_msg,
1142                                    offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1143                                    mlen);
1144                 lnet_finalize(ni, msg, 0);
1145                 break;
1146
1147         case PTLLND_MSG_TYPE_PUT:
1148                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_PUT offset=%d mlen=%d\n",offset,mlen);
1149                 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg,
1150                                         rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1151                                         niov, iov, offset, mlen);
1152                 break;
1153
1154         case PTLLND_MSG_TYPE_GET:
1155                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_GET\n");
1156                 if (msg != NULL)
1157                         rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, msg,
1158                                                 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1159                                                 msg->msg_niov, msg->msg_iov,
1160                                                 msg->msg_offset, msg->msg_len);
1161                 else
1162                         rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, NULL,
1163                                                 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1164                                                 0, NULL, 0, 0);
1165                 break;
1166         }
1167
1168         ptllnd_rx_done(rx);
1169         return rc;
1170 }
1171
1172 void
1173 ptllnd_abort_on_nak(lnet_ni_t *ni)
1174 {
1175         ptllnd_ni_t      *plni = ni->ni_data;
1176
1177         if (plni->plni_abort_on_nak)
1178                 abort();
1179 }
1180
1181 void
1182 ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
1183                      kptl_msg_t *msg, unsigned int nob)
1184 {
1185         ptllnd_ni_t      *plni = ni->ni_data;
1186         const int         basenob = offsetof(kptl_msg_t, ptlm_u);
1187         lnet_process_id_t srcid;
1188         ptllnd_rx_t       rx;
1189         int               flip;
1190         __u16             msg_version;
1191         __u32             msg_cksum;
1192         ptllnd_peer_t    *plp;
1193         int               rc;
1194
1195         if (nob < 6) {
1196                 CERROR("Very short receive from %s\n",
1197                        ptllnd_ptlid2str(initiator));
1198                 return;
1199         }
1200
1201         /* I can at least read MAGIC/VERSION */
1202
1203         flip = msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC);
1204         if (!flip && msg->ptlm_magic != PTLLND_MSG_MAGIC) {
1205                 CERROR("Bad protocol magic %08x from %s\n", 
1206                        msg->ptlm_magic, ptllnd_ptlid2str(initiator));
1207                 return;
1208         }
1209
1210         msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
1211
1212         if (msg_version != PTLLND_MSG_VERSION) {
1213                 CERROR("Bad protocol version %04x from %s\n", 
1214                        (__u32)msg_version, ptllnd_ptlid2str(initiator));
1215                 ptllnd_abort_on_nak(ni);
1216                 return;
1217         }
1218
1219         if (nob < basenob) {
1220                 CERROR("Short receive from %s: got %d, wanted at least %d\n",
1221                        ptllnd_ptlid2str(initiator), nob, basenob);
1222                 return;
1223         }
1224
1225         /* checksum must be computed with
1226          * 1) ptlm_cksum zero and
1227          * 2) BEFORE anything gets modified/flipped
1228          */
1229         msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
1230         msg->ptlm_cksum = 0;
1231         if (msg_cksum != 0 &&
1232             msg_cksum != ptllnd_cksum(msg, offsetof(kptl_msg_t, ptlm_u))) {
1233                 CERROR("Bad checksum from %s\n", ptllnd_ptlid2str(initiator));
1234                 return;
1235         }
1236
1237         msg->ptlm_version = msg_version;
1238         msg->ptlm_cksum = msg_cksum;
1239         
1240         if (flip) {
1241                 /* NB stamps are opaque cookies */
1242                 __swab32s(&msg->ptlm_nob);
1243                 __swab64s(&msg->ptlm_srcnid);
1244                 __swab64s(&msg->ptlm_dstnid);
1245                 __swab32s(&msg->ptlm_srcpid);
1246                 __swab32s(&msg->ptlm_dstpid);
1247         }
1248         
1249         srcid.nid = msg->ptlm_srcnid;
1250         srcid.pid = msg->ptlm_srcpid;
1251
1252         if (LNET_NIDNET(msg->ptlm_srcnid) != LNET_NIDNET(ni->ni_nid)) {
1253                 CERROR("Bad source id %s from %s\n",
1254                        libcfs_id2str(srcid),
1255                        ptllnd_ptlid2str(initiator));
1256                 return;
1257         }
1258
1259         if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
1260                 CERROR("NAK from %s (%s)\n", 
1261                        libcfs_id2str(srcid),
1262                        ptllnd_ptlid2str(initiator));
1263                 ptllnd_abort_on_nak(ni);
1264                 return;
1265         }
1266         
1267         if (msg->ptlm_dstnid != ni->ni_nid ||
1268             msg->ptlm_dstpid != the_lnet.ln_pid) {
1269                 CERROR("Bad dstid %s (%s expected) from %s\n",
1270                        libcfs_id2str((lnet_process_id_t) {
1271                                .nid = msg->ptlm_dstnid,
1272                                .pid = msg->ptlm_dstpid}),
1273                        libcfs_id2str((lnet_process_id_t) {
1274                                .nid = ni->ni_nid,
1275                                .pid = the_lnet.ln_pid}),
1276                        libcfs_id2str(srcid));
1277                 return;
1278         }
1279
1280         if (msg->ptlm_dststamp != plni->plni_stamp) {
1281                 CERROR("Bad dststamp "LPX64"("LPX64" expected) from %s\n",
1282                        msg->ptlm_dststamp, plni->plni_stamp,
1283                        libcfs_id2str(srcid));
1284                 return;
1285         }
1286        
1287         switch (msg->ptlm_type) {
1288         case PTLLND_MSG_TYPE_PUT:
1289         case PTLLND_MSG_TYPE_GET:
1290                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
1291                         msg->ptlm_type==PTLLND_MSG_TYPE_PUT ? "PUT" : "GET");
1292                 if (nob < basenob + sizeof(kptl_rdma_msg_t)) {
1293                         CERROR("Short rdma request from %s(%s)\n",
1294                                libcfs_id2str(srcid),
1295                                ptllnd_ptlid2str(initiator));
1296                         return;
1297                 }
1298                 if (flip)
1299                         __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
1300                 break;
1301
1302         case PTLLND_MSG_TYPE_IMMEDIATE:
1303                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
1304                 if (nob < offsetof(kptl_msg_t,
1305                                    ptlm_u.immediate.kptlim_payload)) {
1306                         CERROR("Short immediate from %s(%s)\n",
1307                                libcfs_id2str(srcid),
1308                                ptllnd_ptlid2str(initiator));
1309                         return;
1310                 }
1311                 break;
1312
1313         case PTLLND_MSG_TYPE_HELLO:
1314                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_HELLO from %s(%s)\n",
1315                                libcfs_id2str(srcid),
1316                                ptllnd_ptlid2str(initiator));
1317                 if (nob < basenob + sizeof(kptl_hello_msg_t)) {
1318                         CERROR("Short hello from %s(%s)\n",
1319                                libcfs_id2str(srcid),
1320                                ptllnd_ptlid2str(initiator));
1321                         return;
1322                 }
1323                 if(flip){
1324                         __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
1325                         __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
1326                 }
1327                 break;
1328                 
1329         case PTLLND_MSG_TYPE_NOOP:
1330                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_NOOP from %s(%s)\n",
1331                                libcfs_id2str(srcid),
1332                                ptllnd_ptlid2str(initiator));        
1333                 break;
1334
1335         default:
1336                 CERROR("Bad message type %d from %s(%s)\n", msg->ptlm_type,
1337                        libcfs_id2str(srcid),
1338                        ptllnd_ptlid2str(initiator));
1339                 return;
1340         }
1341
1342         plp = ptllnd_find_peer(ni, srcid,
1343                                msg->ptlm_type == PTLLND_MSG_TYPE_HELLO);
1344         if (plp == NULL) {
1345                 CERROR("Can't find peer %s\n", libcfs_id2str(srcid));
1346                 return;
1347         }
1348
1349         if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
1350                 if (plp->plp_recvd_hello) {
1351                         CERROR("Unexpected HELLO from %s\n",
1352                                libcfs_id2str(srcid));
1353                         ptllnd_peer_decref(plp);
1354                         return;
1355                 }
1356
1357                 CDEBUG(D_NET, "maxsz %d match "LPX64" stamp "LPX64"\n",
1358                        msg->ptlm_u.hello.kptlhm_max_msg_size,
1359                        msg->ptlm_u.hello.kptlhm_matchbits,
1360                        msg->ptlm_srcstamp);
1361
1362                 plp->plp_max_msg_size = MAX(plni->plni_max_msg_size,
1363                         msg->ptlm_u.hello.kptlhm_max_msg_size);
1364                 plp->plp_match = msg->ptlm_u.hello.kptlhm_matchbits;
1365                 plp->plp_stamp = msg->ptlm_srcstamp;
1366                 plp->plp_max_credits += msg->ptlm_credits;
1367                 plp->plp_recvd_hello = 1;
1368
1369                 CDEBUG(D_NET, "plp_max_msg_size=%d\n",plp->plp_max_msg_size);
1370
1371         } else if (!plp->plp_recvd_hello) {
1372
1373                 CERROR("Bad message type %d (HELLO expected) from %s\n",
1374                        msg->ptlm_type, libcfs_id2str(srcid));
1375                 ptllnd_peer_decref(plp);
1376                 return;
1377
1378         } else if (msg->ptlm_srcstamp != plp->plp_stamp) {
1379
1380                 CERROR("Bad srcstamp "LPX64"("LPX64" expected) from %s\n",
1381                        msg->ptlm_srcstamp, plp->plp_stamp,
1382                        libcfs_id2str(srcid));
1383                 ptllnd_peer_decref(plp);
1384                 return;
1385         }
1386
1387         if (msg->ptlm_credits > 0) {
1388                 CDEBUG(D_NET, "Getting back %d credits from peer\n",msg->ptlm_credits);
1389                 if (plp->plp_credits + msg->ptlm_credits >
1390                     plp->plp_max_credits) {
1391                         CWARN("Too many credits from %s: %d + %d > %d\n",
1392                               libcfs_id2str(srcid),
1393                               plp->plp_credits, msg->ptlm_credits,
1394                               plp->plp_max_credits);
1395                         plp->plp_credits = plp->plp_max_credits;
1396                 } else {
1397                         plp->plp_credits += msg->ptlm_credits;
1398                 }
1399                 ptllnd_check_sends(plp);
1400         }
1401
1402         /* All OK so far; assume the message is good... */
1403
1404         rx.rx_peer      = plp;
1405         rx.rx_msg       = msg;
1406         rx.rx_nob       = nob;
1407         plni->plni_nrxs++;
1408
1409         CDEBUG(D_NET, "rx=%p type=%d\n",&rx,msg->ptlm_type);
1410
1411         switch (msg->ptlm_type) {
1412         default: /* message types have been checked already */
1413                 ptllnd_rx_done(&rx);
1414                 break;
1415
1416         case PTLLND_MSG_TYPE_PUT:
1417         case PTLLND_MSG_TYPE_GET:
1418                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
1419                         msg->ptlm_type==PTLLND_MSG_TYPE_PUT ? "PUT" : "GET");
1420                 rc = lnet_parse(ni, &msg->ptlm_u.rdma.kptlrm_hdr,
1421                                 msg->ptlm_srcnid, &rx, 1);
1422                 CDEBUG(D_NET, "lnet_parse rc=%d\n",rc);
1423                 if (rc < 0)
1424                         ptllnd_rx_done(&rx);
1425                 break;
1426
1427         case PTLLND_MSG_TYPE_IMMEDIATE:
1428                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
1429                 rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,
1430                                 msg->ptlm_srcnid, &rx, 0);
1431                 CDEBUG(D_NET, "lnet_parse rc=%d\n",rc);
1432                 if (rc < 0)
1433                         ptllnd_rx_done(&rx);
1434                 break;
1435         }
1436
1437         ptllnd_peer_decref(plp);
1438 }
1439
1440 void
1441 ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
1442 {
1443         ptllnd_buffer_t *buf = ptllnd_eventarg2obj(event->md.user_ptr);
1444         ptllnd_ni_t     *plni = ni->ni_data;
1445         char            *msg = &buf->plb_buffer[event->offset];
1446         int              repost;
1447         int              unlinked = event->type == PTL_EVENT_UNLINK;
1448
1449         LASSERT (buf->plb_ni == ni);
1450         LASSERT (event->type == PTL_EVENT_PUT_END ||
1451                  event->type == PTL_EVENT_UNLINK);
1452
1453         CDEBUG(D_NET, "buf=%p event=%d\n",buf,event->type);
1454
1455         if (event->ni_fail_type != PTL_NI_OK) {
1456
1457                 CERROR("event type %d, status %d from %s\n",
1458                        event->type, event->ni_fail_type,
1459                        ptllnd_ptlid2str(event->initiator));
1460
1461         } else if (event->type == PTL_EVENT_PUT_END) {
1462 #if (PTL_MD_LOCAL_ALIGN8 == 0)
1463                 /* Portals can't force message alignment - someone sending an
1464                  * odd-length message could misalign subsequent messages */
1465                 if ((event->mlength & 7) != 0) {
1466                         CERROR("Message from %s has odd length %d: "
1467                                "probable version incompatibility\n",
1468                                ptllnd_ptlid2str(event->initiator),
1469                                event->mlength);
1470                         LBUG();
1471                 }
1472 #endif
1473                 LASSERT ((event->offset & 7) == 0);
1474
1475                 ptllnd_parse_request(ni, event->initiator,
1476                                      (kptl_msg_t *)msg, event->mlength);
1477         }
1478
1479 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1480         /* UNLINK event only on explicit unlink */
1481         repost = (event->unlinked && event->type != PTL_EVENT_UNLINK);
1482         if (event->unlinked)
1483                 unlinked = 1;
1484 #else
1485         /* UNLINK event only on implicit unlink */
1486         repost = (event->type == PTL_EVENT_UNLINK);
1487 #endif
1488
1489         CDEBUG(D_NET, "repost=%d unlinked=%d\n",repost,unlinked);
1490
1491         if (unlinked) {
1492                 LASSERT(buf->plb_posted);
1493                 buf->plb_posted = 0;
1494                 plni->plni_nposted_buffers--;
1495         }
1496
1497         if (repost)
1498                 (void) ptllnd_post_buffer(buf);
1499 }
1500
1501 void
1502 ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
1503 {
1504         ptllnd_ni_t *plni = ni->ni_data;
1505         ptllnd_tx_t *tx = ptllnd_eventarg2obj(event->md.user_ptr);
1506         int          error = (event->ni_fail_type != PTL_NI_OK);
1507         int          isreq;
1508         int          isbulk;
1509 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1510         int          unlinked = event->unlinked;
1511 #else
1512         int          unlinked = (event->type == PTL_EVENT_UNLINK);
1513 #endif
1514
1515         if (error)
1516                 CERROR("Error event type %d for %s for %s\n",
1517                        event->type, ptllnd_msgtype2str(tx->tx_type),
1518                        libcfs_id2str(tx->tx_peer->plp_id));
1519
1520         LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE));
1521
1522         CDEBUG(D_NET, "tx=%p type=%s (%d)\n",tx,
1523                 ptllnd_msgtype2str(tx->tx_type),tx->tx_type);
1524         CDEBUG(D_NET, "unlinked=%d\n",unlinked);
1525         CDEBUG(D_NET, "error=%d\n",error);
1526
1527         isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh);
1528         CDEBUG(D_NET, "isreq=%d\n",isreq);
1529         if (isreq) {
1530                 LASSERT (event->md.start == (void *)&tx->tx_msg);
1531                 if (unlinked) {
1532                         tx->tx_reqmdh = PTL_INVALID_HANDLE;
1533                         PTLLND_DBGT_STAMP(tx->tx_req_done);
1534                 }
1535         }
1536
1537         isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);
1538         CDEBUG(D_NET, "isbulk=%d\n",isbulk);
1539         if ( isbulk && unlinked ) {
1540                 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
1541                 PTLLND_DBGT_STAMP(tx->tx_bulk_done);
1542         }
1543
1544         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
1545
1546         switch (tx->tx_type) {
1547         default:
1548                 LBUG();
1549
1550         case PTLLND_MSG_TYPE_NOOP:
1551         case PTLLND_MSG_TYPE_HELLO:
1552         case PTLLND_MSG_TYPE_IMMEDIATE:
1553                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1554                          event->type == PTL_EVENT_SEND_END);
1555                 LASSERT (isreq);
1556                 break;
1557
1558         case PTLLND_MSG_TYPE_GET:
1559                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1560                          (isreq && event->type == PTL_EVENT_SEND_END) ||
1561                          (isbulk && event->type == PTL_EVENT_PUT_END));
1562
1563                 if (isbulk && !error && event->type == PTL_EVENT_PUT_END) {
1564                         /* Check GET matched */
1565                         if (event->hdr_data == PTLLND_RDMA_OK) {
1566                                 lnet_set_reply_msg_len(ni, 
1567                                                        tx->tx_lnetreplymsg,
1568                                                        event->mlength);
1569                         } else {
1570                                 CERROR ("Unmatched GET with %s\n",
1571                                         libcfs_id2str(tx->tx_peer->plp_id));
1572                                 tx->tx_status = -EIO;
1573                         }
1574                 }
1575                 break;
1576
1577         case PTLLND_MSG_TYPE_PUT:
1578                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1579                          (isreq && event->type == PTL_EVENT_SEND_END) ||
1580                          (isbulk && event->type == PTL_EVENT_GET_END));
1581                 break;
1582
1583         case PTLLND_RDMA_READ:
1584                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1585                          event->type == PTL_EVENT_SEND_END ||
1586                          event->type == PTL_EVENT_REPLY_END);
1587                 LASSERT (isbulk);
1588                 break;
1589
1590         case PTLLND_RDMA_WRITE:
1591                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1592                          event->type == PTL_EVENT_SEND_END);
1593                 LASSERT (isbulk);
1594         }
1595
1596         /* Schedule ptllnd_tx_done() on error or last completion event */
1597         if (error ||
1598             (PtlHandleIsEqual(tx->tx_bulkmdh, PTL_INVALID_HANDLE) &&
1599              PtlHandleIsEqual(tx->tx_reqmdh, PTL_INVALID_HANDLE))) {
1600                 if (error)
1601                         tx->tx_status = -EIO;
1602                 list_del(&tx->tx_list);
1603                 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
1604                 CDEBUG(D_NET, "tx=%p ONTO ZOMBIE LIST\n",tx);
1605         }
1606 }
1607
1608 void
1609 ptllnd_wait (lnet_ni_t *ni, int milliseconds)
1610 {
1611         ptllnd_ni_t   *plni = ni->ni_data;
1612         ptllnd_tx_t   *tx;
1613         ptl_event_t    event;
1614         int            which;
1615         int            rc;
1616         int            blocked = 0;
1617         int            found = 0;
1618         int            timeout = 0;
1619
1620         /* Handle any currently queued events, returning immediately if any.
1621          * Otherwise block for the timeout and handle all events queued
1622          * then. */
1623
1624         for (;;) {
1625                 time_t  then = cfs_time_current_sec();
1626
1627                 CDEBUG(D_NET, "Poll(%d)\n", timeout);
1628                 
1629                 rc = PtlEQPoll(&plni->plni_eqh, 1,
1630                                (timeout < 0) ? PTL_TIME_FOREVER : timeout,
1631                                &event, &which);
1632
1633                 if (timeout >= 0 &&
1634                     (cfs_time_current_sec() - then)*1000 > timeout + 1000) {
1635                         /* 1000 mS grace.............................^ */
1636                         CERROR("SLOW PtlEQPoll(%d): %d seconds\n", timeout,
1637                                (int)(cfs_time_current_sec() - then));
1638                 }
1639                 
1640                 CDEBUG(D_NET, "PtlEQPoll rc=%d\n",rc);
1641                 timeout = 0;
1642
1643                 if (rc == PTL_EQ_EMPTY) {
1644                         if (found ||            /* handled some events */
1645                             milliseconds == 0 || /* just checking */
1646                             blocked)            /* blocked already */
1647                                 break;
1648
1649                         blocked = 1;
1650                         timeout = milliseconds;
1651                         continue;
1652                 }
1653
1654                 LASSERT (rc == PTL_OK || rc == PTL_EQ_DROPPED);
1655
1656                 if (rc == PTL_EQ_DROPPED)
1657                         CERROR("Event queue: size %d is too small\n",
1658                                plni->plni_eq_size);
1659
1660                 CDEBUG(D_NET, "event.type=%s(%d)\n",
1661                        ptllnd_evtype2str(event.type),event.type);
1662
1663                 found = 1;
1664                 switch (ptllnd_eventarg2type(event.md.user_ptr)) {
1665                 default:
1666                         LBUG();
1667
1668                 case PTLLND_EVENTARG_TYPE_TX:
1669                         ptllnd_tx_event(ni, &event);
1670                         break;
1671
1672                 case PTLLND_EVENTARG_TYPE_BUF:
1673                         ptllnd_buf_event(ni, &event);
1674                         break;
1675                 }
1676         }
1677
1678         while (!list_empty(&plni->plni_zombie_txs)) {
1679                 tx = list_entry(plni->plni_zombie_txs.next,
1680                                 ptllnd_tx_t, tx_list);
1681                 CDEBUG(D_NET, "Process ZOMBIE tx=%p\n",tx);
1682                 ptllnd_tx_done(tx);
1683         }
1684 }