Whamcloud - gitweb
Severity : major
[fs/lustre-release.git] / lnet / ulnds / ptllnd / ptllnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: Eric Barton <eeb@bartonsoftware.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   This file is confidential source code owned by Cluster File Systems.
11  *   No viewing, modification, compilation, redistribution, or any other
12  *   form of use is permitted except through a signed license agreement.
13  *
14  *   If you have not signed such an agreement, then you have no rights to
15  *   this file.  Please destroy it immediately and contact CFS.
16  *
17  */
18
19 #include "ptllnd.h"
20
21 char *
22 ptllnd_ptlid2str(ptl_process_id_t id)
23 {
24         static char strs[8][32];
25         static int  idx = 0;
26
27         char   *str = strs[idx++];
28         
29         if (idx >= sizeof(strs)/sizeof(strs[0]))
30                 idx = 0;
31
32         snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
33         return str;
34 }
35
36 void
37 ptllnd_destroy_peer(ptllnd_peer_t *peer)
38 {
39         lnet_ni_t         *ni = peer->plp_ni;
40         ptllnd_ni_t       *plni = ni->ni_data;
41         int                nmsg = peer->plp_lazy_credits +
42                                   plni->plni_peer_credits;
43
44         ptllnd_size_buffers(ni, -nmsg);
45
46         LASSERT (peer->plp_closing);
47         LASSERT (plni->plni_npeers > 0);
48         LASSERT (list_empty(&peer->plp_txq));
49         LASSERT (list_empty(&peer->plp_activeq));
50         plni->plni_npeers--;
51         LIBCFS_FREE(peer, sizeof(*peer));
52 }
53
54 void
55 ptllnd_abort_txs(ptllnd_ni_t *plni, struct list_head *q)
56 {
57         while (!list_empty(q)) {
58                 ptllnd_tx_t *tx = list_entry(q->next, ptllnd_tx_t, tx_list);
59
60                 tx->tx_status = -ESHUTDOWN;
61                 list_del(&tx->tx_list);
62                 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
63         }
64 }
65
66 void
67 ptllnd_close_peer(ptllnd_peer_t *peer, int error)
68 {
69         lnet_ni_t   *ni = peer->plp_ni;
70         ptllnd_ni_t *plni = ni->ni_data;
71
72         if (peer->plp_closing)
73                 return;
74
75         peer->plp_closing = 1;
76
77         if (!list_empty(&peer->plp_txq) ||
78             !list_empty(&peer->plp_activeq) ||
79             error != 0) {
80                 CERROR("Closing %s\n", libcfs_id2str(peer->plp_id));
81                 ptllnd_debug_peer(ni, peer->plp_id);
82         }
83         
84         ptllnd_abort_txs(plni, &peer->plp_txq);
85         ptllnd_abort_txs(plni, &peer->plp_activeq);
86
87         list_del(&peer->plp_list);
88         ptllnd_peer_decref(peer);
89 }
90
91 ptllnd_peer_t *
92 ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
93 {
94         ptllnd_ni_t       *plni = ni->ni_data;
95         unsigned int       hash = LNET_NIDADDR(id.nid) % plni->plni_peer_hash_size;
96         struct list_head  *tmp;
97         ptllnd_peer_t     *plp;
98         ptllnd_tx_t       *tx;
99         int                rc;
100
101         LASSERT (LNET_NIDNET(id.nid) == LNET_NIDNET(ni->ni_nid));
102
103         list_for_each(tmp, &plni->plni_peer_hash[hash]) {
104                 plp = list_entry(tmp, ptllnd_peer_t, plp_list);
105
106                 if (plp->plp_id.nid == id.nid &&
107                     plp->plp_id.pid == id.pid) {
108                         ptllnd_peer_addref(plp);
109                         return plp;
110                 }
111         }
112
113         if (!create)
114                 return NULL;
115
116         /* New peer: check first for enough posted buffers */
117         plni->plni_npeers++;
118         rc = ptllnd_size_buffers(ni, plni->plni_peer_credits);
119         if (rc != 0) {
120                 plni->plni_npeers--;
121                 return NULL;
122         }
123
124         LIBCFS_ALLOC(plp, sizeof(*plp));
125         if (plp == NULL) {
126                 CERROR("Can't allocate new peer %s\n", libcfs_id2str(id));
127                 plni->plni_npeers--;
128                 ptllnd_size_buffers(ni, -plni->plni_peer_credits);
129                 return NULL;
130         }
131
132         plp->plp_ni = ni;
133         plp->plp_id = id;
134         plp->plp_ptlid.nid = LNET_NIDADDR(id.nid);
135         plp->plp_ptlid.pid = plni->plni_ptllnd_pid;
136         plp->plp_credits = 1; /* add more later when she gives me credits */
137         plp->plp_max_msg_size = plni->plni_max_msg_size; /* until I hear from her */
138         plp->plp_sent_credits = 1;              /* Implicit credit for HELLO */
139         plp->plp_outstanding_credits = plni->plni_peer_credits - 1;
140         plp->plp_lazy_credits = 0;
141         plp->plp_extra_lazy_credits = 0;
142         plp->plp_match = 0;
143         plp->plp_stamp = 0;
144         plp->plp_recvd_hello = 0;
145         plp->plp_closing = 0;
146         plp->plp_refcount = 1;
147         CFS_INIT_LIST_HEAD(&plp->plp_list);
148         CFS_INIT_LIST_HEAD(&plp->plp_txq);
149         CFS_INIT_LIST_HEAD(&plp->plp_activeq);
150
151         ptllnd_peer_addref(plp);
152         list_add_tail(&plp->plp_list, &plni->plni_peer_hash[hash]);
153
154         tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_HELLO, 0);
155         if (tx == NULL) {
156                 CERROR("Can't send HELLO to %s\n", libcfs_id2str(id));
157                 ptllnd_close_peer(plp, -ENOMEM);
158                 ptllnd_peer_decref(plp);
159                 return NULL;
160         }
161
162         tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS;
163         tx->tx_msg.ptlm_u.hello.kptlhm_max_msg_size = plni->plni_max_msg_size;
164
165         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post hello %p", libcfs_id2str(id),
166                        tx->tx_peer->plp_credits,
167                        tx->tx_peer->plp_outstanding_credits,
168                        tx->tx_peer->plp_sent_credits,
169                        plni->plni_peer_credits + 
170                        tx->tx_peer->plp_lazy_credits, tx);
171         ptllnd_post_tx(tx);
172
173         return plp;
174 }
175
176 int
177 ptllnd_count_q(struct list_head *q)
178 {
179         struct list_head *e;
180         int               n = 0;
181         
182         list_for_each(e, q) {
183                 n++;
184         }
185         
186         return n;
187 }
188
189 const char *
190 ptllnd_tx_typestr(int type) 
191 {
192         switch (type) {
193         case PTLLND_RDMA_WRITE:
194                 return "rdma_write";
195                 
196         case PTLLND_RDMA_READ:
197                 return "rdma_read";
198
199         case PTLLND_MSG_TYPE_PUT:
200                 return "put_req";
201                 
202         case PTLLND_MSG_TYPE_GET:
203                 return "get_req";
204
205         case PTLLND_MSG_TYPE_IMMEDIATE:
206                 return "immediate";
207
208         case PTLLND_MSG_TYPE_NOOP:
209                 return "noop";
210
211         case PTLLND_MSG_TYPE_HELLO:
212                 return "hello";
213
214         default:
215                 return "<unknown>";
216         }
217 }
218
219 void
220 ptllnd_debug_tx(ptllnd_tx_t *tx) 
221 {
222         CDEBUG(D_WARNING, "%s %s b "DBGT_FMT"/"DBGT_FMT
223                " r "DBGT_FMT"/"DBGT_FMT" status %d\n",
224                ptllnd_tx_typestr(tx->tx_type),
225                libcfs_id2str(tx->tx_peer->plp_id)
226                DBGT_ARGS(tx->tx_bulk_posted) DBGT_ARGS(tx->tx_bulk_done)
227                DBGT_ARGS(tx->tx_req_posted) DBGT_ARGS(tx->tx_req_done),
228                tx->tx_status);
229 }
230
231 void
232 ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
233 {
234         ptllnd_peer_t    *plp = ptllnd_find_peer(ni, id, 0);
235         struct list_head *tmp;
236         ptllnd_ni_t      *plni = ni->ni_data;
237         ptllnd_tx_t      *tx;
238         
239         if (plp == NULL) {
240                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_id2str(id));
241                 return;
242         }
243         
244         CDEBUG(D_WARNING, "%s %s%s [%d] "LPD64".%06d m "LPD64" q %d/%d c %d/%d+%d(%d)\n",
245                libcfs_id2str(id), 
246                plp->plp_recvd_hello ? "H" : "_",
247                plp->plp_closing     ? "C" : "_",
248                plp->plp_refcount,
249                plp->plp_stamp / 1000000, (int)(plp->plp_stamp % 1000000),
250                plp->plp_match,
251                ptllnd_count_q(&plp->plp_txq),
252                ptllnd_count_q(&plp->plp_activeq),
253                plp->plp_credits, plp->plp_outstanding_credits, plp->plp_sent_credits,
254                plni->plni_peer_credits + plp->plp_lazy_credits);
255
256         CDEBUG(D_WARNING, "txq:\n");
257         list_for_each (tmp, &plp->plp_txq) {
258                 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
259                 
260                 ptllnd_debug_tx(tx);
261         }
262
263         CDEBUG(D_WARNING, "activeq:\n");
264         list_for_each (tmp, &plp->plp_activeq) {
265                 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
266                 
267                 ptllnd_debug_tx(tx);
268         }
269
270         CDEBUG(D_WARNING, "zombies:\n");
271         list_for_each (tmp, &plni->plni_zombie_txs) {
272                 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
273                 
274                 if (tx->tx_peer->plp_id.nid == id.nid &&
275                     tx->tx_peer->plp_id.pid == id.pid)
276                         ptllnd_debug_tx(tx);
277         }
278         
279         CDEBUG(D_WARNING, "history:\n");
280         list_for_each (tmp, &plni->plni_tx_history) {
281                 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
282                 
283                 if (tx->tx_peer->plp_id.nid == id.nid &&
284                     tx->tx_peer->plp_id.pid == id.pid)
285                         ptllnd_debug_tx(tx);
286         }
287         
288         ptllnd_peer_decref(plp);
289         ptllnd_dump_history();
290 }
291
292 void
293 ptllnd_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive)
294 {
295         lnet_process_id_t  id;
296         ptllnd_peer_t     *peer;
297         time_t             start = cfs_time_current_sec();
298         int                w = PTLLND_WARN_LONG_WAIT;
299
300         /* This is only actually used to connect to routers at startup! */
301         if (!alive) {
302                 LBUG();
303                 return;
304         }
305
306         id.nid = nid;
307         id.pid = LUSTRE_SRV_LNET_PID;
308         
309         peer = ptllnd_find_peer(ni, id, 1);
310         if (peer == NULL)
311                 return;
312
313         /* wait for the peer to reply */
314         while (!peer->plp_recvd_hello) {
315                 if (cfs_time_current_sec() > start + w) {
316                         CWARN("Waited %ds to connect to %s\n",
317                               w, libcfs_id2str(id));
318                         w *= 2;
319                 }
320                 
321                 ptllnd_wait(ni, w*1000);
322         }
323         
324         ptllnd_peer_decref(peer);
325 }
326
327 int
328 ptllnd_setasync(lnet_ni_t *ni, lnet_process_id_t id, int nasync)
329 {
330         ptllnd_peer_t *peer = ptllnd_find_peer(ni, id, nasync > 0);
331         int            rc;
332         
333         if (peer == NULL)
334                 return -ENOMEM;
335
336         LASSERT (peer->plp_lazy_credits >= 0);
337         LASSERT (peer->plp_extra_lazy_credits >= 0);
338
339         /* If nasync < 0, we're being told we can reduce the total message
340          * headroom.  We can't do this right now because our peer might already
341          * have credits for the extra buffers, so we just account the extra
342          * headroom in case we need it later and only destroy buffers when the
343          * peer closes.
344          *
345          * Note that the following condition handles this case, where it
346          * actually increases the extra lazy credit counter. */
347
348         if (nasync <= peer->plp_extra_lazy_credits) {
349                 peer->plp_extra_lazy_credits -= nasync;
350                 return 0;
351         }
352
353         LASSERT (nasync > 0);
354
355         nasync -= peer->plp_extra_lazy_credits;
356         peer->plp_extra_lazy_credits = 0;
357         
358         rc = ptllnd_size_buffers(ni, nasync);
359         if (rc == 0) {
360                 peer->plp_lazy_credits += nasync;
361                 peer->plp_outstanding_credits += nasync;
362         }
363
364         return rc;
365 }
366
367 __u32
368 ptllnd_cksum (void *ptr, int nob)
369 {
370         char  *c  = ptr;
371         __u32  sum = 0;
372
373         while (nob-- > 0)
374                 sum = ((sum << 1) | (sum >> 31)) + *c++;
375
376         /* ensure I don't return 0 (== no checksum) */
377         return (sum == 0) ? 1 : sum;
378 }
379
380 ptllnd_tx_t *
381 ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
382 {
383         lnet_ni_t   *ni = peer->plp_ni;
384         ptllnd_ni_t *plni = ni->ni_data;
385         ptllnd_tx_t *tx;
386         int          msgsize;
387
388         CDEBUG(D_NET, "peer=%p type=%d payload=%d\n", peer, type, payload_nob);
389
390         switch (type) {
391         default:
392                 LBUG();
393
394         case PTLLND_RDMA_WRITE:
395         case PTLLND_RDMA_READ:
396                 LASSERT (payload_nob == 0);
397                 msgsize = 0;
398                 break;
399
400         case PTLLND_MSG_TYPE_PUT:
401         case PTLLND_MSG_TYPE_GET:
402                 LASSERT (payload_nob == 0);
403                 msgsize = offsetof(kptl_msg_t, ptlm_u) + 
404                           sizeof(kptl_rdma_msg_t);
405                 break;
406
407         case PTLLND_MSG_TYPE_IMMEDIATE:
408                 msgsize = offsetof(kptl_msg_t,
409                                    ptlm_u.immediate.kptlim_payload[payload_nob]);
410                 break;
411
412         case PTLLND_MSG_TYPE_NOOP:
413                 LASSERT (payload_nob == 0);
414                 msgsize = offsetof(kptl_msg_t, ptlm_u);
415                 break;
416
417         case PTLLND_MSG_TYPE_HELLO:
418                 LASSERT (payload_nob == 0);
419                 msgsize = offsetof(kptl_msg_t, ptlm_u) +
420                           sizeof(kptl_hello_msg_t);
421                 break;
422         }
423
424         msgsize = (msgsize + 7) & ~7;
425         LASSERT (msgsize <= peer->plp_max_msg_size);
426
427         LIBCFS_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize);
428
429         if (tx == NULL) {
430                 CERROR("Can't allocate msg type %d for %s\n",
431                        type, libcfs_id2str(peer->plp_id));
432                 return NULL;
433         }
434
435         CFS_INIT_LIST_HEAD(&tx->tx_list);
436         tx->tx_peer = peer;
437         tx->tx_type = type;
438         tx->tx_lnetmsg = tx->tx_lnetreplymsg = NULL;
439         tx->tx_niov = 0;
440         tx->tx_iov = NULL;
441         tx->tx_reqmdh = PTL_INVALID_HANDLE;
442         tx->tx_bulkmdh = PTL_INVALID_HANDLE;
443         tx->tx_msgsize = msgsize;
444         tx->tx_completing = 0;
445         tx->tx_status = 0;
446
447         PTLLND_DBGT_INIT(tx->tx_bulk_posted);
448         PTLLND_DBGT_INIT(tx->tx_bulk_done);
449         PTLLND_DBGT_INIT(tx->tx_req_posted);
450         PTLLND_DBGT_INIT(tx->tx_req_done);
451
452         if (msgsize != 0) {
453                 tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC;
454                 tx->tx_msg.ptlm_version = PTLLND_MSG_VERSION;
455                 tx->tx_msg.ptlm_type = type;
456                 tx->tx_msg.ptlm_credits = 0;
457                 tx->tx_msg.ptlm_nob = msgsize;
458                 tx->tx_msg.ptlm_cksum = 0;
459                 tx->tx_msg.ptlm_srcnid = ni->ni_nid;
460                 tx->tx_msg.ptlm_srcstamp = plni->plni_stamp;
461                 tx->tx_msg.ptlm_dstnid = peer->plp_id.nid;
462                 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
463                 tx->tx_msg.ptlm_srcpid = the_lnet.ln_pid;
464                 tx->tx_msg.ptlm_dstpid = peer->plp_id.pid;
465         }
466
467         ptllnd_peer_addref(peer);
468         plni->plni_ntxs++;
469
470         CDEBUG(D_NET, "tx=%p\n",tx);
471
472         return tx;
473 }
474
475 void
476 ptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh)
477 {
478         ptllnd_peer_t   *peer = tx->tx_peer;
479         lnet_ni_t       *ni = peer->plp_ni;
480         int              rc;
481         time_t           start = cfs_time_current_sec();
482         int              w = PTLLND_WARN_LONG_WAIT;
483
484         while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) {
485                 rc = PtlMDUnlink(*mdh);
486 #ifndef LUSTRE_PORTALS_UNLINK_SEMANTICS
487                 if (rc == PTL_OK) /* unlink successful => no unlinked event */
488                         return;
489                 LASSERT (rc == PTL_MD_IN_USE);
490 #endif
491                 if (cfs_time_current_sec() > start + w) {
492                         CWARN("Waited %ds to abort tx to %s\n",
493                               w, libcfs_id2str(peer->plp_id));
494                         w *= 2;
495                 }
496                 /* Wait for ptllnd_tx_event() to invalidate */
497                 ptllnd_wait(ni, w*1000);
498         }
499 }
500
501 void
502 ptllnd_cull_tx_history(ptllnd_ni_t *plni)
503 {
504         int max = plni->plni_max_tx_history;
505
506         while (plni->plni_ntx_history > max) {
507                 ptllnd_tx_t *tx = list_entry(plni->plni_tx_history.next, 
508                                              ptllnd_tx_t, tx_list);
509                 list_del(&tx->tx_list);
510
511                 ptllnd_peer_decref(tx->tx_peer);
512
513                 LIBCFS_FREE(tx, offsetof(ptllnd_tx_t, tx_msg) + tx->tx_msgsize);
514
515                 LASSERT (plni->plni_ntxs > 0);
516                 plni->plni_ntxs--;
517                 plni->plni_ntx_history--;
518         }
519 }
520
521 void
522 ptllnd_tx_done(ptllnd_tx_t *tx)
523 {
524         ptllnd_peer_t   *peer = tx->tx_peer;
525         lnet_ni_t       *ni = peer->plp_ni;
526         ptllnd_ni_t     *plni = ni->ni_data;
527
528         /* CAVEAT EMPTOR: If this tx is being aborted, I'll continue to get
529          * events for this tx until it's unlinked.  So I set tx_completing to
530          * flag the tx is getting handled */
531
532         if (tx->tx_completing)
533                 return;
534
535         tx->tx_completing = 1;
536
537         if (!list_empty(&tx->tx_list))
538                 list_del_init(&tx->tx_list);
539
540         if (tx->tx_status != 0) {
541                 CERROR("Completing tx with error\n");
542                 ptllnd_debug_tx(tx);
543                 ptllnd_close_peer(peer, tx->tx_status);
544         }
545         
546         ptllnd_abort_tx(tx, &tx->tx_reqmdh);
547         ptllnd_abort_tx(tx, &tx->tx_bulkmdh);
548
549         if (tx->tx_niov > 0) {
550                 LIBCFS_FREE(tx->tx_iov, tx->tx_niov * sizeof(*tx->tx_iov));
551                 tx->tx_niov = 0;
552         }
553
554         if (tx->tx_lnetreplymsg != NULL) {
555                 LASSERT (tx->tx_type == PTLLND_MSG_TYPE_GET);
556                 LASSERT (tx->tx_lnetmsg != NULL);
557                 /* Simulate GET success always  */
558                 lnet_finalize(ni, tx->tx_lnetmsg, 0);
559                 CDEBUG(D_NET, "lnet_finalize(tx_lnetreplymsg=%p)\n",tx->tx_lnetreplymsg);
560                 lnet_finalize(ni, tx->tx_lnetreplymsg, tx->tx_status);
561         } else if (tx->tx_lnetmsg != NULL) {
562                 lnet_finalize(ni, tx->tx_lnetmsg, tx->tx_status);
563         }
564
565         plni->plni_ntx_history++;
566         list_add_tail(&tx->tx_list, &plni->plni_tx_history);
567         
568         ptllnd_cull_tx_history(plni);
569 }
570
571 int
572 ptllnd_set_txiov(ptllnd_tx_t *tx,
573                  unsigned int niov, struct iovec *iov,
574                  unsigned int offset, unsigned int len)
575 {
576         ptl_md_iovec_t *piov;
577         int             npiov;
578
579         if (len == 0) {
580                 tx->tx_niov = 0;
581                 return 0;
582         }
583
584         /*
585          * Remove iovec's at the beginning that
586          * are skipped because of the offset.
587          * Adjust the offset accordingly
588          */
589         for (;;) {
590                 LASSERT (niov > 0);
591                 if (offset < iov->iov_len)
592                         break;
593                 offset -= iov->iov_len;
594                 niov--;
595                 iov++;
596         }
597
598         for (;;) {
599                 int temp_offset = offset;
600                 int resid = len;
601                 LIBCFS_ALLOC(piov, niov * sizeof(*piov));
602                 if (piov == NULL)
603                         return -ENOMEM;
604
605                 for (npiov = 0;; npiov++) {
606                         LASSERT (npiov < niov);
607                         LASSERT (iov->iov_len >= temp_offset);
608
609                         piov[npiov].iov_base = iov[npiov].iov_base + temp_offset;
610                         piov[npiov].iov_len = iov[npiov].iov_len - temp_offset;
611                         
612                         if (piov[npiov].iov_len >= resid) {
613                                 piov[npiov].iov_len = resid;
614                                 npiov++;
615                                 break;
616                         }
617                         resid -= piov[npiov].iov_len;
618                         temp_offset = 0;
619                 }
620
621                 if (npiov == niov) {
622                         tx->tx_niov = niov;
623                         tx->tx_iov = piov;
624                         return 0;
625                 }
626
627                 /* Dang! The piov I allocated was too big and it's a drag to
628                  * have to maintain separate 'allocated' and 'used' sizes, so
629                  * I'll just do it again; NB this doesn't happen normally... */
630                 LIBCFS_FREE(piov, niov * sizeof(*piov));
631                 niov = npiov;
632         }
633 }
634
635 void
636 ptllnd_set_md_buffer(ptl_md_t *md, ptllnd_tx_t *tx)
637 {
638         unsigned int    niov = tx->tx_niov;
639         ptl_md_iovec_t *iov = tx->tx_iov;
640
641         LASSERT ((md->options & PTL_MD_IOVEC) == 0);
642
643         if (niov == 0) {
644                 md->start = NULL;
645                 md->length = 0;
646         } else if (niov == 1) {
647                 md->start = iov[0].iov_base;
648                 md->length = iov[0].iov_len;
649         } else {
650                 md->start = iov;
651                 md->length = niov;
652                 md->options |= PTL_MD_IOVEC;
653         }
654 }
655
656 int
657 ptllnd_post_buffer(ptllnd_buffer_t *buf)
658 {
659         lnet_ni_t        *ni = buf->plb_ni;
660         ptllnd_ni_t      *plni = ni->ni_data;
661         ptl_process_id_t  anyid = {
662                 .nid       = PTL_NID_ANY,
663                 .pid       = PTL_PID_ANY};
664         ptl_md_t          md = {
665                 .start     = buf->plb_buffer,
666                 .length    = plni->plni_buffer_size,
667                 .threshold = PTL_MD_THRESH_INF,
668                 .max_size  = plni->plni_max_msg_size,
669                 .options   = (PTLLND_MD_OPTIONS |
670                               PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | 
671                               PTL_MD_LOCAL_ALIGN8),
672                 .user_ptr  = ptllnd_obj2eventarg(buf, PTLLND_EVENTARG_TYPE_BUF),
673                 .eq_handle = plni->plni_eqh};
674         ptl_handle_me_t meh;
675         int             rc;
676
677         LASSERT (!buf->plb_posted);
678
679         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal,
680                          anyid, LNET_MSG_MATCHBITS, 0,
681                          PTL_UNLINK, PTL_INS_AFTER, &meh);
682         if (rc != PTL_OK) {
683                 CERROR("PtlMEAttach failed: %d\n", rc);
684                 return -ENOMEM;
685         }
686
687         buf->plb_posted = 1;
688         plni->plni_nposted_buffers++;
689
690         rc = PtlMDAttach(meh, md, LNET_UNLINK, &buf->plb_md);
691         if (rc == PTL_OK)
692                 return 0;
693
694         CERROR("PtlMDAttach failed: %d\n", rc);
695
696         buf->plb_posted = 0;
697         plni->plni_nposted_buffers--;
698
699         rc = PtlMEUnlink(meh);
700         LASSERT (rc == PTL_OK);
701
702         return -ENOMEM;
703 }
704
705 void
706 ptllnd_check_sends(ptllnd_peer_t *peer)
707 {
708         lnet_ni_t      *ni = peer->plp_ni;
709         ptllnd_ni_t    *plni = ni->ni_data;
710         ptllnd_tx_t    *tx;
711         ptl_md_t        md;
712         ptl_handle_md_t mdh;
713         int             rc;
714
715         CDEBUG(D_NET, "%s: [%d/%d+%d(%d)\n",
716                libcfs_id2str(peer->plp_id), peer->plp_credits,
717                peer->plp_outstanding_credits, peer->plp_sent_credits,
718                plni->plni_peer_credits + peer->plp_lazy_credits);
719
720         if (list_empty(&peer->plp_txq) &&
721             peer->plp_outstanding_credits >= PTLLND_CREDIT_HIGHWATER(plni) &&
722             peer->plp_credits != 0) {
723
724                 tx = ptllnd_new_tx(peer, PTLLND_MSG_TYPE_NOOP, 0);
725                 CDEBUG(D_NET, "NOOP tx=%p\n",tx);
726                 if (tx == NULL) {
727                         CERROR("Can't return credits to %s\n",
728                                libcfs_id2str(peer->plp_id));
729                 } else {
730                         list_add_tail(&tx->tx_list, &peer->plp_txq);
731                 }
732         }
733
734         while (!list_empty(&peer->plp_txq)) {
735                 tx = list_entry(peer->plp_txq.next, ptllnd_tx_t, tx_list);
736
737                 LASSERT (tx->tx_msgsize > 0);
738
739                 LASSERT (peer->plp_outstanding_credits >= 0);
740                 LASSERT (peer->plp_sent_credits >= 0);
741                 LASSERT (peer->plp_outstanding_credits + peer->plp_sent_credits
742                          <= plni->plni_peer_credits + peer->plp_lazy_credits);
743                 LASSERT (peer->plp_credits >= 0);
744
745                 if (peer->plp_credits == 0) {   /* no credits */
746                         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: no creds for %p",
747                                        libcfs_id2str(peer->plp_id),
748                                        peer->plp_credits,
749                                        peer->plp_outstanding_credits,
750                                        peer->plp_sent_credits,
751                                        plni->plni_peer_credits +
752                                        peer->plp_lazy_credits, tx);
753                         break;
754                 }
755                 
756                 if (peer->plp_credits == 1 &&   /* last credit reserved for */
757                     peer->plp_outstanding_credits == 0) { /* returning credits */
758                         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: too few creds for %p",
759                                        libcfs_id2str(peer->plp_id),
760                                        peer->plp_credits,
761                                        peer->plp_outstanding_credits,
762                                        peer->plp_sent_credits,
763                                        plni->plni_peer_credits +
764                                        peer->plp_lazy_credits, tx);
765                         break;
766                 }
767                 
768                 list_del(&tx->tx_list);
769                 list_add_tail(&tx->tx_list, &peer->plp_activeq);
770
771                 CDEBUG(D_NET, "Sending at TX=%p type=%s (%d)\n",tx,
772                         ptllnd_msgtype2str(tx->tx_type),tx->tx_type);
773
774                 if (tx->tx_type == PTLLND_MSG_TYPE_NOOP &&
775                     (!list_empty(&peer->plp_txq) ||
776                      peer->plp_outstanding_credits <
777                      PTLLND_CREDIT_HIGHWATER(plni))) {
778                         /* redundant NOOP */
779                         ptllnd_tx_done(tx);
780                         continue;
781                 }
782
783                 /* Set stamp at the last minute; on a new peer, I don't know it
784                  * until I receive the HELLO back */
785                 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
786
787                 /*
788                  * Return all the credits we have
789                  */
790                 tx->tx_msg.ptlm_credits = peer->plp_outstanding_credits;
791                 peer->plp_sent_credits += peer->plp_outstanding_credits;
792                 peer->plp_outstanding_credits = 0;
793
794                 /*
795                  * One less credit
796                  */
797                 peer->plp_credits--;
798
799                 if (plni->plni_checksum)
800                         tx->tx_msg.ptlm_cksum = 
801                                 ptllnd_cksum(&tx->tx_msg,
802                                              offsetof(kptl_msg_t, ptlm_u));
803
804                 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
805                 md.eq_handle = plni->plni_eqh;
806                 md.threshold = 1;
807                 md.options = PTLLND_MD_OPTIONS;
808                 md.start = &tx->tx_msg;
809                 md.length = tx->tx_msgsize;
810
811                 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
812                 if (rc != PTL_OK) {
813                         CERROR("PtlMDBind for %s failed: %d\n",
814                                libcfs_id2str(peer->plp_id), rc);
815                         tx->tx_status = -EIO;
816                         ptllnd_tx_done(tx);
817                         break;
818                 }
819
820                 LASSERT (tx->tx_type != PTLLND_RDMA_WRITE &&
821                          tx->tx_type != PTLLND_RDMA_READ);
822                 
823                 tx->tx_reqmdh = mdh;
824                 PTLLND_DBGT_STAMP(tx->tx_req_posted);
825
826                 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: %s %p c %d",
827                                libcfs_id2str(peer->plp_id),
828                                peer->plp_credits,
829                                peer->plp_outstanding_credits,
830                                peer->plp_sent_credits,
831                                plni->plni_peer_credits +
832                                peer->plp_lazy_credits,
833                                ptllnd_msgtype2str(tx->tx_type), tx,
834                                tx->tx_msg.ptlm_credits);
835
836                 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
837                             plni->plni_portal, 0, LNET_MSG_MATCHBITS, 0, 0);
838                 if (rc != PTL_OK) {
839                         CERROR("PtlPut for %s failed: %d\n",
840                                libcfs_id2str(peer->plp_id), rc);
841                         tx->tx_status = -EIO;
842                         ptllnd_tx_done(tx);
843                         break;
844                 }
845         }
846 }
847
848 int
849 ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
850                     unsigned int niov, struct iovec *iov,
851                     unsigned int offset, unsigned int len)
852 {
853         lnet_ni_t      *ni = peer->plp_ni;
854         ptllnd_ni_t    *plni = ni->ni_data;
855         ptllnd_tx_t    *tx = ptllnd_new_tx(peer, type, 0);
856         __u64           matchbits;
857         ptl_md_t        md;
858         ptl_handle_md_t mdh;
859         ptl_handle_me_t meh;
860         int             rc;
861         int             rc2;
862         time_t          start;
863         int             w;
864
865         CDEBUG(D_NET, "niov=%d offset=%d len=%d\n",niov,offset,len);
866
867         LASSERT (type == PTLLND_MSG_TYPE_GET ||
868                  type == PTLLND_MSG_TYPE_PUT);
869
870         if (tx == NULL) {
871                 CERROR("Can't allocate %s tx for %s\n",
872                        type == PTLLND_MSG_TYPE_GET ? "GET" : "PUT/REPLY",
873                        libcfs_id2str(peer->plp_id));
874                 return -ENOMEM;
875         }
876
877         rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
878         if (rc != 0) {
879                 CERROR ("Can't allocate iov %d for %s\n",
880                         niov, libcfs_id2str(peer->plp_id));
881                 rc = -ENOMEM;
882                 goto failed;
883         }
884
885         md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
886         md.eq_handle = plni->plni_eqh;
887         md.threshold = 1;
888         md.max_size = 0;
889         md.options = PTLLND_MD_OPTIONS;
890         if(type == PTLLND_MSG_TYPE_GET)
891                 md.options |= PTL_MD_OP_PUT | PTL_MD_ACK_DISABLE;
892         else
893                 md.options |= PTL_MD_OP_GET;
894         ptllnd_set_md_buffer(&md, tx);
895
896         start = cfs_time_current_sec();
897         w = PTLLND_WARN_LONG_WAIT;
898
899         while (!peer->plp_recvd_hello) {        /* wait to validate plp_match */
900                 if (peer->plp_closing) {
901                         rc = -EIO;
902                         goto failed;
903                 }
904                 if (cfs_time_current_sec() > start + w) {
905                         CWARN("Waited %ds to connect to %s\n",
906                               w, libcfs_id2str(peer->plp_id));
907                         w *= 2;
908                 }
909                 ptllnd_wait(ni, w*1000);
910         }
911
912         if (peer->plp_match < PTL_RESERVED_MATCHBITS)
913                 peer->plp_match = PTL_RESERVED_MATCHBITS;
914         matchbits = peer->plp_match++;
915         CDEBUG(D_NET, "matchbits " LPX64 " %s\n", matchbits,
916                ptllnd_ptlid2str(peer->plp_ptlid));
917
918         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid,
919                          matchbits, 0, PTL_UNLINK, PTL_INS_BEFORE, &meh);
920         if (rc != PTL_OK) {
921                 CERROR("PtlMEAttach for %s failed: %d\n",
922                        libcfs_id2str(peer->plp_id), rc);
923                 rc = -EIO;
924                 goto failed;
925         }
926
927         PTLLND_DBGT_STAMP(tx->tx_bulk_posted);
928
929         rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);
930         if (rc != PTL_OK) {
931                 CERROR("PtlMDAttach for %s failed: %d\n",
932                        libcfs_id2str(peer->plp_id), rc);
933                 rc2 = PtlMEUnlink(meh);
934                 LASSERT (rc2 == PTL_OK);
935                 rc = -EIO;
936                 goto failed;
937         }
938         tx->tx_bulkmdh = mdh;
939
940         /*
941          * We need to set the stamp here because it
942          * we could have received a HELLO above that set
943          * peer->plp_stamp
944          */
945         tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
946
947         tx->tx_msg.ptlm_u.rdma.kptlrm_hdr = msg->msg_hdr;
948         tx->tx_msg.ptlm_u.rdma.kptlrm_matchbits = matchbits;
949
950         if (type == PTLLND_MSG_TYPE_GET) {
951                 tx->tx_lnetreplymsg = lnet_create_reply_msg(ni, msg);
952                 if (tx->tx_lnetreplymsg == NULL) {
953                         CERROR("Can't create reply for GET to %s\n",
954                                libcfs_id2str(msg->msg_target));
955                         rc = -ENOMEM;
956                         goto failed;
957                 }
958         }
959
960         tx->tx_lnetmsg = msg;
961         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post passive %s p %d %p",
962                        libcfs_id2str(msg->msg_target),
963                        peer->plp_credits, peer->plp_outstanding_credits,
964                        peer->plp_sent_credits,
965                        plni->plni_peer_credits + peer->plp_lazy_credits,
966                        lnet_msgtyp2str(msg->msg_type),
967                        (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ? 
968                        le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
969                        (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ? 
970                        le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
971                        tx);
972         ptllnd_post_tx(tx);
973         return 0;
974
975  failed:
976         ptllnd_tx_done(tx);
977         return rc;
978 }
979
980 int
981 ptllnd_active_rdma(ptllnd_peer_t *peer, int type,
982                    lnet_msg_t *msg, __u64 matchbits,
983                    unsigned int niov, struct iovec *iov,
984                    unsigned int offset, unsigned int len)
985 {
986         lnet_ni_t       *ni = peer->plp_ni;
987         ptllnd_ni_t     *plni = ni->ni_data;
988         ptllnd_tx_t     *tx = ptllnd_new_tx(peer, type, 0);
989         ptl_md_t         md;
990         ptl_handle_md_t  mdh;
991         int              rc;
992
993         LASSERT (type == PTLLND_RDMA_READ ||
994                  type == PTLLND_RDMA_WRITE);
995
996         if (tx == NULL) {
997                 CERROR("Can't allocate tx for RDMA %s with %s\n",
998                        (type == PTLLND_RDMA_WRITE) ? "write" : "read",
999                        libcfs_id2str(peer->plp_id));
1000                 ptllnd_close_peer(peer, -ENOMEM);
1001                 return -ENOMEM;
1002         }
1003
1004         rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
1005         if (rc != 0) {
1006                 CERROR ("Can't allocate iov %d for %s\n",
1007                         niov, libcfs_id2str(peer->plp_id));
1008                 rc = -ENOMEM;
1009                 goto failed;
1010         }
1011
1012         md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
1013         md.eq_handle = plni->plni_eqh;
1014         md.max_size = 0;
1015         md.options = PTLLND_MD_OPTIONS;
1016         md.threshold = (type == PTLLND_RDMA_READ) ? 2 : 1;
1017
1018         ptllnd_set_md_buffer(&md, tx);
1019
1020         rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
1021         if (rc != PTL_OK) {
1022                 CERROR("PtlMDBind for %s failed: %d\n",
1023                        libcfs_id2str(peer->plp_id), rc);
1024                 rc = -EIO;
1025                 goto failed;
1026         }
1027
1028         tx->tx_bulkmdh = mdh;
1029         tx->tx_lnetmsg = msg;
1030
1031         list_add_tail(&tx->tx_list, &peer->plp_activeq);
1032         PTLLND_DBGT_STAMP(tx->tx_bulk_posted);
1033
1034         if (type == PTLLND_RDMA_READ)
1035                 rc = PtlGet(mdh, peer->plp_ptlid,
1036                             plni->plni_portal, 0, matchbits, 0);
1037         else
1038                 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
1039                             plni->plni_portal, 0, matchbits, 0, 
1040                             (msg == NULL) ? PTLLND_RDMA_FAIL : PTLLND_RDMA_OK);
1041
1042         if (rc == PTL_OK)
1043                 return 0;
1044
1045         CERROR("Can't initiate RDMA with %s: %d\n",
1046                libcfs_id2str(peer->plp_id), rc);
1047
1048         tx->tx_lnetmsg = NULL;
1049  failed:
1050         tx->tx_status = rc;
1051         ptllnd_tx_done(tx);    /* this will close peer */
1052         return rc;
1053 }
1054
1055 int
1056 ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
1057 {
1058         ptllnd_ni_t    *plni = ni->ni_data;
1059         ptllnd_peer_t  *plp;
1060         ptllnd_tx_t    *tx;
1061         int             nob;
1062         int             rc;
1063
1064         LASSERT (!msg->msg_routing);
1065         LASSERT (msg->msg_kiov == NULL);
1066
1067         LASSERT (msg->msg_niov <= PTL_MD_MAX_IOV); /* !!! */
1068
1069         CDEBUG(D_NET, "%s [%d]+%d,%d -> %s%s\n", 
1070                lnet_msgtyp2str(msg->msg_type),
1071                msg->msg_niov, msg->msg_offset, msg->msg_len,
1072                libcfs_nid2str(msg->msg_target.nid),
1073                msg->msg_target_is_router ? "(rtr)" : "");
1074
1075         if ((msg->msg_target.pid & LNET_PID_USERFLAG) != 0) {
1076                 CERROR("Can't send to non-kernel peer %s\n",
1077                        libcfs_id2str(msg->msg_target));
1078                 return -EHOSTUNREACH;
1079         }
1080         
1081         plp = ptllnd_find_peer(ni, msg->msg_target, 1);
1082         if (plp == NULL)
1083                 return -ENOMEM;
1084
1085         switch (msg->msg_type) {
1086         default:
1087                 LBUG();
1088
1089         case LNET_MSG_ACK:
1090                 LASSERT (msg->msg_len == 0);
1091                 break;                          /* send IMMEDIATE */
1092
1093         case LNET_MSG_GET:
1094                 if (msg->msg_target_is_router)
1095                         break;                  /* send IMMEDIATE */
1096
1097                 nob = msg->msg_md->md_length;
1098                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1099                 if (nob <= plni->plni_max_msg_size)
1100                         break;
1101
1102                 LASSERT ((msg->msg_md->md_options & LNET_MD_KIOV) == 0);
1103                 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_GET, msg,
1104                                          msg->msg_md->md_niov,
1105                                          msg->msg_md->md_iov.iov,
1106                                          0, msg->msg_md->md_length);
1107                 ptllnd_peer_decref(plp);
1108                 return rc;
1109
1110         case LNET_MSG_REPLY:
1111         case LNET_MSG_PUT:
1112                 nob = msg->msg_len;
1113                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1114                 if (nob <= plp->plp_max_msg_size)
1115                         break;                  /* send IMMEDIATE */
1116
1117                 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_PUT, msg,
1118                                          msg->msg_niov, msg->msg_iov,
1119                                          msg->msg_offset, msg->msg_len);
1120                 ptllnd_peer_decref(plp);
1121                 return rc;
1122         }
1123
1124         /* send IMMEDIATE
1125          * NB copy the payload so we don't have to do a fragmented send */
1126
1127         tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_IMMEDIATE, msg->msg_len);
1128         if (tx == NULL) {
1129                 CERROR("Can't allocate tx for lnet type %d to %s\n",
1130                        msg->msg_type, libcfs_id2str(msg->msg_target));
1131                 ptllnd_peer_decref(plp);
1132                 return -ENOMEM;
1133         }
1134
1135         lnet_copy_iov2flat(tx->tx_msgsize, &tx->tx_msg,
1136                            offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1137                            msg->msg_niov, msg->msg_iov, msg->msg_offset,
1138                            msg->msg_len);
1139         tx->tx_msg.ptlm_u.immediate.kptlim_hdr = msg->msg_hdr;
1140
1141         tx->tx_lnetmsg = msg;
1142         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post immediate %s p %d %p",
1143                        libcfs_id2str(msg->msg_target),
1144                        plp->plp_credits, plp->plp_outstanding_credits,
1145                        plp->plp_sent_credits,
1146                        plni->plni_peer_credits + plp->plp_lazy_credits,
1147                        lnet_msgtyp2str(msg->msg_type),
1148                        (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ? 
1149                        le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1150                        (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ? 
1151                        le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1152                        tx);
1153         ptllnd_post_tx(tx);
1154         ptllnd_peer_decref(plp);
1155         return 0;
1156 }
1157
1158 void
1159 ptllnd_rx_done(ptllnd_rx_t *rx)
1160 {
1161         ptllnd_peer_t *plp = rx->rx_peer;
1162         lnet_ni_t     *ni = plp->plp_ni;
1163         ptllnd_ni_t   *plni = ni->ni_data;
1164
1165         plp->plp_outstanding_credits++;
1166
1167         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: rx=%p done\n",
1168                        libcfs_id2str(plp->plp_id),
1169                        plp->plp_credits, plp->plp_outstanding_credits, 
1170                        plp->plp_sent_credits,
1171                        plni->plni_peer_credits + plp->plp_lazy_credits, rx);
1172
1173         ptllnd_check_sends(rx->rx_peer);
1174
1175         LASSERT (plni->plni_nrxs > 0);
1176         plni->plni_nrxs--;
1177 }
1178
1179 int
1180 ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1181                   void **new_privatep)
1182 {
1183         /* Shouldn't get here; recvs only block for router buffers */
1184         LBUG();
1185         return 0;
1186 }
1187
1188 int
1189 ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1190             int delayed, unsigned int niov,
1191             struct iovec *iov, lnet_kiov_t *kiov,
1192             unsigned int offset, unsigned int mlen, unsigned int rlen)
1193 {
1194         ptllnd_rx_t    *rx = private;
1195         int             rc = 0;
1196         int             nob;
1197
1198         LASSERT (kiov == NULL);
1199         LASSERT (niov <= PTL_MD_MAX_IOV);       /* !!! */
1200
1201         switch (rx->rx_msg->ptlm_type) {
1202         default:
1203                 LBUG();
1204
1205         case PTLLND_MSG_TYPE_IMMEDIATE:
1206                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[mlen]);
1207                 if (nob > rx->rx_nob) {
1208                         CERROR("Immediate message from %s too big: %d(%d)\n",
1209                                libcfs_id2str(rx->rx_peer->plp_id),
1210                                nob, rx->rx_nob);
1211                         rc = -EPROTO;
1212                         break;
1213                 }
1214                 lnet_copy_flat2iov(niov, iov, offset,
1215                                    rx->rx_nob, rx->rx_msg,
1216                                    offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1217                                    mlen);
1218                 lnet_finalize(ni, msg, 0);
1219                 break;
1220
1221         case PTLLND_MSG_TYPE_PUT:
1222                 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg,
1223                                         rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1224                                         niov, iov, offset, mlen);
1225                 break;
1226
1227         case PTLLND_MSG_TYPE_GET:
1228                 if (msg != NULL)
1229                         rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, msg,
1230                                                 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1231                                                 msg->msg_niov, msg->msg_iov,
1232                                                 msg->msg_offset, msg->msg_len);
1233                 else
1234                         rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, NULL,
1235                                                 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1236                                                 0, NULL, 0, 0);
1237                 break;
1238         }
1239
1240         ptllnd_rx_done(rx);
1241         return rc;
1242 }
1243
1244 void
1245 ptllnd_abort_on_nak(lnet_ni_t *ni)
1246 {
1247         ptllnd_ni_t      *plni = ni->ni_data;
1248
1249         if (plni->plni_dump_on_nak)
1250                 ptllnd_dump_history();
1251
1252         if (plni->plni_abort_on_nak)
1253                 abort();
1254 }
1255
1256 void
1257 ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
1258                      kptl_msg_t *msg, unsigned int nob)
1259 {
1260         ptllnd_ni_t      *plni = ni->ni_data;
1261         const int         basenob = offsetof(kptl_msg_t, ptlm_u);
1262         lnet_process_id_t srcid;
1263         ptllnd_rx_t       rx;
1264         int               flip;
1265         __u16             msg_version;
1266         __u32             msg_cksum;
1267         ptllnd_peer_t    *plp;
1268         int               rc;
1269
1270         if (nob < 6) {
1271                 CERROR("Very short receive from %s\n",
1272                        ptllnd_ptlid2str(initiator));
1273                 return;
1274         }
1275
1276         /* I can at least read MAGIC/VERSION */
1277
1278         flip = msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC);
1279         if (!flip && msg->ptlm_magic != PTLLND_MSG_MAGIC) {
1280                 CERROR("Bad protocol magic %08x from %s\n", 
1281                        msg->ptlm_magic, ptllnd_ptlid2str(initiator));
1282                 return;
1283         }
1284
1285         msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
1286
1287         if (msg_version != PTLLND_MSG_VERSION) {
1288                 CERROR("Bad protocol version %04x from %s\n", 
1289                        (__u32)msg_version, ptllnd_ptlid2str(initiator));
1290                 ptllnd_abort_on_nak(ni);
1291                 return;
1292         }
1293
1294         if (nob < basenob) {
1295                 CERROR("Short receive from %s: got %d, wanted at least %d\n",
1296                        ptllnd_ptlid2str(initiator), nob, basenob);
1297                 return;
1298         }
1299
1300         /* checksum must be computed with
1301          * 1) ptlm_cksum zero and
1302          * 2) BEFORE anything gets modified/flipped
1303          */
1304         msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
1305         msg->ptlm_cksum = 0;
1306         if (msg_cksum != 0 &&
1307             msg_cksum != ptllnd_cksum(msg, offsetof(kptl_msg_t, ptlm_u))) {
1308                 CERROR("Bad checksum from %s\n", ptllnd_ptlid2str(initiator));
1309                 return;
1310         }
1311
1312         msg->ptlm_version = msg_version;
1313         msg->ptlm_cksum = msg_cksum;
1314         
1315         if (flip) {
1316                 /* NB stamps are opaque cookies */
1317                 __swab32s(&msg->ptlm_nob);
1318                 __swab64s(&msg->ptlm_srcnid);
1319                 __swab64s(&msg->ptlm_dstnid);
1320                 __swab32s(&msg->ptlm_srcpid);
1321                 __swab32s(&msg->ptlm_dstpid);
1322         }
1323         
1324         srcid.nid = msg->ptlm_srcnid;
1325         srcid.pid = msg->ptlm_srcpid;
1326
1327         if (LNET_NIDNET(msg->ptlm_srcnid) != LNET_NIDNET(ni->ni_nid)) {
1328                 CERROR("Bad source id %s from %s\n",
1329                        libcfs_id2str(srcid),
1330                        ptllnd_ptlid2str(initiator));
1331                 return;
1332         }
1333
1334         if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
1335                 CERROR("NAK from %s (%s)\n", 
1336                        libcfs_id2str(srcid),
1337                        ptllnd_ptlid2str(initiator));
1338                 ptllnd_abort_on_nak(ni);
1339                 return;
1340         }
1341         
1342         if (msg->ptlm_dstnid != ni->ni_nid ||
1343             msg->ptlm_dstpid != the_lnet.ln_pid) {
1344                 CERROR("Bad dstid %s (%s expected) from %s\n",
1345                        libcfs_id2str((lnet_process_id_t) {
1346                                .nid = msg->ptlm_dstnid,
1347                                .pid = msg->ptlm_dstpid}),
1348                        libcfs_id2str((lnet_process_id_t) {
1349                                .nid = ni->ni_nid,
1350                                .pid = the_lnet.ln_pid}),
1351                        libcfs_id2str(srcid));
1352                 return;
1353         }
1354
1355         if (msg->ptlm_dststamp != plni->plni_stamp) {
1356                 CERROR("Bad dststamp "LPX64"("LPX64" expected) from %s\n",
1357                        msg->ptlm_dststamp, plni->plni_stamp,
1358                        libcfs_id2str(srcid));
1359                 return;
1360         }
1361
1362         PTLLND_HISTORY("RX %s: %s %d %p", libcfs_id2str(srcid), 
1363                        ptllnd_msgtype2str(msg->ptlm_type),
1364                        msg->ptlm_credits, &rx);
1365
1366         switch (msg->ptlm_type) {
1367         case PTLLND_MSG_TYPE_PUT:
1368         case PTLLND_MSG_TYPE_GET:
1369                 if (nob < basenob + sizeof(kptl_rdma_msg_t)) {
1370                         CERROR("Short rdma request from %s(%s)\n",
1371                                libcfs_id2str(srcid),
1372                                ptllnd_ptlid2str(initiator));
1373                         return;
1374                 }
1375                 if (flip)
1376                         __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
1377                 break;
1378
1379         case PTLLND_MSG_TYPE_IMMEDIATE:
1380                 if (nob < offsetof(kptl_msg_t,
1381                                    ptlm_u.immediate.kptlim_payload)) {
1382                         CERROR("Short immediate from %s(%s)\n",
1383                                libcfs_id2str(srcid),
1384                                ptllnd_ptlid2str(initiator));
1385                         return;
1386                 }
1387                 break;
1388
1389         case PTLLND_MSG_TYPE_HELLO:
1390                 if (nob < basenob + sizeof(kptl_hello_msg_t)) {
1391                         CERROR("Short hello from %s(%s)\n",
1392                                libcfs_id2str(srcid),
1393                                ptllnd_ptlid2str(initiator));
1394                         return;
1395                 }
1396                 if(flip){
1397                         __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
1398                         __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
1399                 }
1400                 break;
1401                 
1402         case PTLLND_MSG_TYPE_NOOP:
1403                 break;
1404
1405         default:
1406                 CERROR("Bad message type %d from %s(%s)\n", msg->ptlm_type,
1407                        libcfs_id2str(srcid),
1408                        ptllnd_ptlid2str(initiator));
1409                 return;
1410         }
1411
1412         plp = ptllnd_find_peer(ni, srcid, 0);
1413         if (plp == NULL) {
1414                 CERROR("Can't find peer %s\n", libcfs_id2str(srcid));
1415                 return;
1416         }
1417
1418         if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
1419                 if (plp->plp_recvd_hello) {
1420                         CERROR("Unexpected HELLO from %s\n",
1421                                libcfs_id2str(srcid));
1422                         ptllnd_peer_decref(plp);
1423                         return;
1424                 }
1425
1426                 plp->plp_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1427                 plp->plp_match = msg->ptlm_u.hello.kptlhm_matchbits;
1428                 plp->plp_stamp = msg->ptlm_srcstamp;
1429                 plp->plp_recvd_hello = 1;
1430
1431         } else if (!plp->plp_recvd_hello) {
1432
1433                 CERROR("Bad message type %d (HELLO expected) from %s\n",
1434                        msg->ptlm_type, libcfs_id2str(srcid));
1435                 ptllnd_peer_decref(plp);
1436                 return;
1437
1438         } else if (msg->ptlm_srcstamp != plp->plp_stamp) {
1439
1440                 CERROR("Bad srcstamp "LPX64"("LPX64" expected) from %s\n",
1441                        msg->ptlm_srcstamp, plp->plp_stamp,
1442                        libcfs_id2str(srcid));
1443                 ptllnd_peer_decref(plp);
1444                 return;
1445         }
1446
1447         /* Check peer only sends when I've sent her credits */
1448         if (plp->plp_sent_credits == 0) {
1449                 CERROR("%s[%d/%d+%d(%d)]: unexpected message\n",
1450                        libcfs_id2str(plp->plp_id),
1451                        plp->plp_credits, plp->plp_outstanding_credits, 
1452                        plp->plp_sent_credits,
1453                        plni->plni_peer_credits + plp->plp_lazy_credits);
1454                 return;
1455         }
1456         plp->plp_sent_credits--;
1457         
1458         /* No check for credit overflow - the peer may post new buffers after
1459          * the startup handshake. */
1460         if (msg->ptlm_credits > 0) {
1461                 plp->plp_credits += msg->ptlm_credits;
1462                 ptllnd_check_sends(plp);
1463         }
1464
1465         /* All OK so far; assume the message is good... */
1466
1467         rx.rx_peer      = plp;
1468         rx.rx_msg       = msg;
1469         rx.rx_nob       = nob;
1470         plni->plni_nrxs++;
1471
1472         switch (msg->ptlm_type) {
1473         default: /* message types have been checked already */
1474                 ptllnd_rx_done(&rx);
1475                 break;
1476
1477         case PTLLND_MSG_TYPE_PUT:
1478         case PTLLND_MSG_TYPE_GET:
1479                 rc = lnet_parse(ni, &msg->ptlm_u.rdma.kptlrm_hdr,
1480                                 msg->ptlm_srcnid, &rx, 1);
1481                 if (rc < 0)
1482                         ptllnd_rx_done(&rx);
1483                 break;
1484
1485         case PTLLND_MSG_TYPE_IMMEDIATE:
1486                 rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,
1487                                 msg->ptlm_srcnid, &rx, 0);
1488                 if (rc < 0)
1489                         ptllnd_rx_done(&rx);
1490                 break;
1491         }
1492
1493         ptllnd_peer_decref(plp);
1494 }
1495
1496 void
1497 ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
1498 {
1499         ptllnd_buffer_t *buf = ptllnd_eventarg2obj(event->md.user_ptr);
1500         ptllnd_ni_t     *plni = ni->ni_data;
1501         char            *msg = &buf->plb_buffer[event->offset];
1502         int              repost;
1503         int              unlinked = event->type == PTL_EVENT_UNLINK;
1504
1505         LASSERT (buf->plb_ni == ni);
1506         LASSERT (event->type == PTL_EVENT_PUT_END ||
1507                  event->type == PTL_EVENT_UNLINK);
1508
1509         if (event->ni_fail_type != PTL_NI_OK) {
1510
1511                 CERROR("event type %s(%d), status %s(%d) from %s\n",
1512                        ptllnd_evtype2str(event->type), event->type,
1513                        ptllnd_errtype2str(event->ni_fail_type), 
1514                        event->ni_fail_type,
1515                        ptllnd_ptlid2str(event->initiator));
1516
1517         } else if (event->type == PTL_EVENT_PUT_END) {
1518 #if (PTL_MD_LOCAL_ALIGN8 == 0)
1519                 /* Portals can't force message alignment - someone sending an
1520                  * odd-length message could misalign subsequent messages */
1521                 if ((event->mlength & 7) != 0) {
1522                         CERROR("Message from %s has odd length %llu: "
1523                                "probable version incompatibility\n",
1524                                ptllnd_ptlid2str(event->initiator),
1525                                event->mlength);
1526                         LBUG();
1527                 }
1528 #endif
1529                 LASSERT ((event->offset & 7) == 0);
1530
1531                 ptllnd_parse_request(ni, event->initiator,
1532                                      (kptl_msg_t *)msg, event->mlength);
1533         }
1534
1535 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1536         /* UNLINK event only on explicit unlink */
1537         repost = (event->unlinked && event->type != PTL_EVENT_UNLINK);
1538         if (event->unlinked)
1539                 unlinked = 1;
1540 #else
1541         /* UNLINK event only on implicit unlink */
1542         repost = (event->type == PTL_EVENT_UNLINK);
1543 #endif
1544
1545         if (unlinked) {
1546                 LASSERT(buf->plb_posted);
1547                 buf->plb_posted = 0;
1548                 plni->plni_nposted_buffers--;
1549         }
1550
1551         if (repost)
1552                 (void) ptllnd_post_buffer(buf);
1553 }
1554
1555 void
1556 ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
1557 {
1558         ptllnd_ni_t *plni = ni->ni_data;
1559         ptllnd_tx_t *tx = ptllnd_eventarg2obj(event->md.user_ptr);
1560         int          error = (event->ni_fail_type != PTL_NI_OK);
1561         int          isreq;
1562         int          isbulk;
1563 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1564         int          unlinked = event->unlinked;
1565 #else
1566         int          unlinked = (event->type == PTL_EVENT_UNLINK);
1567 #endif
1568
1569         if (error)
1570                 CERROR("Error %s(%d) event %s(%d) unlinked %d, %s(%d) for %s\n",
1571                        ptllnd_errtype2str(event->ni_fail_type),
1572                        event->ni_fail_type,
1573                        ptllnd_evtype2str(event->type), event->type,
1574                        unlinked, ptllnd_msgtype2str(tx->tx_type), tx->tx_type,
1575                        libcfs_id2str(tx->tx_peer->plp_id));
1576
1577         LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE));
1578
1579         isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh);
1580         if (isreq) {
1581                 LASSERT (event->md.start == (void *)&tx->tx_msg);
1582                 if (unlinked) {
1583                         tx->tx_reqmdh = PTL_INVALID_HANDLE;
1584                         PTLLND_DBGT_STAMP(tx->tx_req_done);
1585                 }
1586         }
1587
1588         isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);
1589         if ( isbulk && unlinked ) {
1590                 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
1591                 PTLLND_DBGT_STAMP(tx->tx_bulk_done);
1592         }
1593
1594         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
1595
1596         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: TX done %p %s%s",
1597                        libcfs_id2str(tx->tx_peer->plp_id), 
1598                        tx->tx_peer->plp_credits,
1599                        tx->tx_peer->plp_outstanding_credits,
1600                        tx->tx_peer->plp_sent_credits,
1601                        plni->plni_peer_credits + tx->tx_peer->plp_lazy_credits,
1602                        tx, isreq ? "REQ" : "BULK", unlinked ? "(unlinked)" : "");
1603
1604         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
1605         switch (tx->tx_type) {
1606         default:
1607                 LBUG();
1608
1609         case PTLLND_MSG_TYPE_NOOP:
1610         case PTLLND_MSG_TYPE_HELLO:
1611         case PTLLND_MSG_TYPE_IMMEDIATE:
1612                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1613                          event->type == PTL_EVENT_SEND_END);
1614                 LASSERT (isreq);
1615                 break;
1616
1617         case PTLLND_MSG_TYPE_GET:
1618                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1619                          (isreq && event->type == PTL_EVENT_SEND_END) ||
1620                          (isbulk && event->type == PTL_EVENT_PUT_END));
1621
1622                 if (isbulk && !error && event->type == PTL_EVENT_PUT_END) {
1623                         /* Check GET matched */
1624                         if (event->hdr_data == PTLLND_RDMA_OK) {
1625                                 lnet_set_reply_msg_len(ni, 
1626                                                        tx->tx_lnetreplymsg,
1627                                                        event->mlength);
1628                         } else {
1629                                 CERROR ("Unmatched GET with %s\n",
1630                                         libcfs_id2str(tx->tx_peer->plp_id));
1631                                 tx->tx_status = -EIO;
1632                         }
1633                 }
1634                 break;
1635
1636         case PTLLND_MSG_TYPE_PUT:
1637                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1638                          (isreq && event->type == PTL_EVENT_SEND_END) ||
1639                          (isbulk && event->type == PTL_EVENT_GET_END));
1640                 break;
1641
1642         case PTLLND_RDMA_READ:
1643                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1644                          event->type == PTL_EVENT_SEND_END ||
1645                          event->type == PTL_EVENT_REPLY_END);
1646                 LASSERT (isbulk);
1647                 break;
1648
1649         case PTLLND_RDMA_WRITE:
1650                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1651                          event->type == PTL_EVENT_SEND_END);
1652                 LASSERT (isbulk);
1653         }
1654
1655         /* Schedule ptllnd_tx_done() on error or last completion event */
1656         if (error ||
1657             (PtlHandleIsEqual(tx->tx_bulkmdh, PTL_INVALID_HANDLE) &&
1658              PtlHandleIsEqual(tx->tx_reqmdh, PTL_INVALID_HANDLE))) {
1659                 if (error)
1660                         tx->tx_status = -EIO;
1661                 list_del(&tx->tx_list);
1662                 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
1663         }
1664 }
1665
1666 void
1667 ptllnd_wait (lnet_ni_t *ni, int milliseconds)
1668 {
1669         static struct timeval  prevt;
1670         static int             prevt_count;
1671         static int             call_count;
1672
1673         struct timeval         t1;
1674         struct timeval         t2;
1675         
1676         ptllnd_ni_t   *plni = ni->ni_data;
1677         ptllnd_tx_t   *tx;
1678         ptl_event_t    event;
1679         int            which;
1680         int            rc;
1681         int            blocked = 0;
1682         int            found = 0;
1683         int            timeout = 0;
1684
1685         /* Handle any currently queued events, returning immediately if any.
1686          * Otherwise block for the timeout and handle all events queued
1687          * then. */
1688
1689         gettimeofday(&t1, NULL);
1690         call_count++;
1691
1692         for (;;) {
1693                 time_t  then = cfs_time_current_sec();
1694
1695                 rc = PtlEQPoll(&plni->plni_eqh, 1,
1696                                (timeout < 0) ? PTL_TIME_FOREVER : timeout,
1697                                &event, &which);
1698
1699                 if (timeout >= 0 &&
1700                     (cfs_time_current_sec() - then)*1000 > timeout + 1000) {
1701                         /* 1000 mS grace.............................^ */
1702                         CERROR("SLOW PtlEQPoll(%d): %d seconds\n", timeout,
1703                                (int)(cfs_time_current_sec() - then));
1704                 }
1705                 
1706                 timeout = 0;
1707
1708                 if (rc == PTL_EQ_EMPTY) {
1709                         if (found ||            /* handled some events */
1710                             milliseconds == 0 || /* just checking */
1711                             blocked)            /* blocked already */
1712                                 break;
1713
1714                         blocked = 1;
1715                         timeout = (milliseconds < 0) ?
1716                                   PTL_TIME_FOREVER : milliseconds;
1717                         continue;
1718                 }
1719
1720                 LASSERT (rc == PTL_OK || rc == PTL_EQ_DROPPED);
1721
1722                 if (rc == PTL_EQ_DROPPED)
1723                         CERROR("Event queue: size %d is too small\n",
1724                                plni->plni_eq_size);
1725
1726                 found = 1;
1727                 switch (ptllnd_eventarg2type(event.md.user_ptr)) {
1728                 default:
1729                         LBUG();
1730
1731                 case PTLLND_EVENTARG_TYPE_TX:
1732                         ptllnd_tx_event(ni, &event);
1733                         break;
1734
1735                 case PTLLND_EVENTARG_TYPE_BUF:
1736                         ptllnd_buf_event(ni, &event);
1737                         break;
1738                 }
1739         }
1740
1741         while (!list_empty(&plni->plni_zombie_txs)) {
1742                 tx = list_entry(plni->plni_zombie_txs.next,
1743                                 ptllnd_tx_t, tx_list);
1744                 ptllnd_tx_done(tx);
1745         }
1746
1747         gettimeofday(&t2, NULL);
1748                 
1749         if (prevt.tv_sec == 0 ||
1750             prevt.tv_sec != t2.tv_sec) {
1751                 PTLLND_HISTORY("%d wait entered at %d.%06d - prev %d %d.%06d", 
1752                                call_count, (int)t1.tv_sec, (int)t1.tv_usec,
1753                                prevt_count, (int)prevt.tv_sec, (int)prevt.tv_usec);
1754                 prevt = t2;
1755         }
1756 }