Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lnet / ulnds / ptllnd / ptllnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see [sun.com URL with a
20  * copy of GPLv2].
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/ulnds/ptllnd/ptllnd_cb.c
37  *
38  * Author: Eric Barton <eeb@bartonsoftware.com>
39  */
40
41 #include "ptllnd.h"
42
43 void
44 ptllnd_set_tx_deadline(ptllnd_tx_t *tx)
45 {
46         ptllnd_peer_t  *peer = tx->tx_peer;
47         lnet_ni_t      *ni = peer->plp_ni;
48         ptllnd_ni_t    *plni = ni->ni_data;
49
50         tx->tx_deadline = cfs_time_current_sec() + plni->plni_timeout;
51 }
52
53 void
54 ptllnd_post_tx(ptllnd_tx_t *tx)
55 {
56         ptllnd_peer_t  *peer = tx->tx_peer;
57
58         ptllnd_set_tx_deadline(tx);
59         list_add_tail(&tx->tx_list, &peer->plp_txq);
60         ptllnd_check_sends(peer);
61 }
62
63 char *
64 ptllnd_ptlid2str(ptl_process_id_t id)
65 {
66         static char strs[8][32];
67         static int  idx = 0;
68
69         char   *str = strs[idx++];
70         
71         if (idx >= sizeof(strs)/sizeof(strs[0]))
72                 idx = 0;
73
74         snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
75         return str;
76 }
77
78 void
79 ptllnd_destroy_peer(ptllnd_peer_t *peer)
80 {
81         lnet_ni_t         *ni = peer->plp_ni;
82         ptllnd_ni_t       *plni = ni->ni_data;
83         int                nmsg = peer->plp_lazy_credits +
84                                   plni->plni_peer_credits;
85
86         ptllnd_size_buffers(ni, -nmsg);
87
88         LASSERT (peer->plp_closing);
89         LASSERT (plni->plni_npeers > 0);
90         LASSERT (list_empty(&peer->plp_txq));
91         LASSERT (list_empty(&peer->plp_activeq));
92         plni->plni_npeers--;
93         LIBCFS_FREE(peer, sizeof(*peer));
94 }
95
96 void
97 ptllnd_abort_txs(ptllnd_ni_t *plni, struct list_head *q)
98 {
99         while (!list_empty(q)) {
100                 ptllnd_tx_t *tx = list_entry(q->next, ptllnd_tx_t, tx_list);
101
102                 tx->tx_status = -ESHUTDOWN;
103                 list_del(&tx->tx_list);
104                 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
105         }
106 }
107
108 void
109 ptllnd_close_peer(ptllnd_peer_t *peer, int error)
110 {
111         lnet_ni_t   *ni = peer->plp_ni;
112         ptllnd_ni_t *plni = ni->ni_data;
113
114         if (peer->plp_closing)
115                 return;
116
117         peer->plp_closing = 1;
118
119         if (!list_empty(&peer->plp_txq) ||
120             !list_empty(&peer->plp_activeq) ||
121             error != 0) {
122                 CWARN("Closing %s\n", libcfs_id2str(peer->plp_id));
123                 if (plni->plni_debug)
124                         ptllnd_dump_debug(ni, peer->plp_id);
125         }
126         
127         ptllnd_abort_txs(plni, &peer->plp_txq);
128         ptllnd_abort_txs(plni, &peer->plp_activeq);
129
130         list_del(&peer->plp_list);
131         ptllnd_peer_decref(peer);
132 }
133
134 ptllnd_peer_t *
135 ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
136 {
137         ptllnd_ni_t       *plni = ni->ni_data;
138         unsigned int       hash = LNET_NIDADDR(id.nid) % plni->plni_peer_hash_size;
139         struct list_head  *tmp;
140         ptllnd_peer_t     *plp;
141         ptllnd_tx_t       *tx;
142         int                rc;
143
144         LASSERT (LNET_NIDNET(id.nid) == LNET_NIDNET(ni->ni_nid));
145
146         list_for_each(tmp, &plni->plni_peer_hash[hash]) {
147                 plp = list_entry(tmp, ptllnd_peer_t, plp_list);
148
149                 if (plp->plp_id.nid == id.nid &&
150                     plp->plp_id.pid == id.pid) {
151                         ptllnd_peer_addref(plp);
152                         return plp;
153                 }
154         }
155
156         if (!create)
157                 return NULL;
158
159         /* New peer: check first for enough posted buffers */
160         plni->plni_npeers++;
161         rc = ptllnd_size_buffers(ni, plni->plni_peer_credits);
162         if (rc != 0) {
163                 plni->plni_npeers--;
164                 return NULL;
165         }
166
167         LIBCFS_ALLOC(plp, sizeof(*plp));
168         if (plp == NULL) {
169                 CERROR("Can't allocate new peer %s\n", libcfs_id2str(id));
170                 plni->plni_npeers--;
171                 ptllnd_size_buffers(ni, -plni->plni_peer_credits);
172                 return NULL;
173         }
174
175         plp->plp_ni = ni;
176         plp->plp_id = id;
177         plp->plp_ptlid.nid = LNET_NIDADDR(id.nid);
178         plp->plp_ptlid.pid = plni->plni_ptllnd_pid;
179         plp->plp_credits = 1; /* add more later when she gives me credits */
180         plp->plp_max_msg_size = plni->plni_max_msg_size; /* until I hear from her */
181         plp->plp_sent_credits = 1;              /* Implicit credit for HELLO */
182         plp->plp_outstanding_credits = plni->plni_peer_credits - 1;
183         plp->plp_lazy_credits = 0;
184         plp->plp_extra_lazy_credits = 0;
185         plp->plp_match = 0;
186         plp->plp_stamp = 0;
187         plp->plp_recvd_hello = 0;
188         plp->plp_closing = 0;
189         plp->plp_refcount = 1;
190         CFS_INIT_LIST_HEAD(&plp->plp_list);
191         CFS_INIT_LIST_HEAD(&plp->plp_txq);
192         CFS_INIT_LIST_HEAD(&plp->plp_activeq);
193
194         ptllnd_peer_addref(plp);
195         list_add_tail(&plp->plp_list, &plni->plni_peer_hash[hash]);
196
197         tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_HELLO, 0);
198         if (tx == NULL) {
199                 CERROR("Can't send HELLO to %s\n", libcfs_id2str(id));
200                 ptllnd_close_peer(plp, -ENOMEM);
201                 ptllnd_peer_decref(plp);
202                 return NULL;
203         }
204
205         tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS;
206         tx->tx_msg.ptlm_u.hello.kptlhm_max_msg_size = plni->plni_max_msg_size;
207
208         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post hello %p", libcfs_id2str(id),
209                        tx->tx_peer->plp_credits,
210                        tx->tx_peer->plp_outstanding_credits,
211                        tx->tx_peer->plp_sent_credits,
212                        plni->plni_peer_credits + 
213                        tx->tx_peer->plp_lazy_credits, tx);
214         ptllnd_post_tx(tx);
215
216         return plp;
217 }
218
219 int
220 ptllnd_count_q(struct list_head *q)
221 {
222         struct list_head *e;
223         int               n = 0;
224         
225         list_for_each(e, q) {
226                 n++;
227         }
228         
229         return n;
230 }
231
232 const char *
233 ptllnd_tx_typestr(int type) 
234 {
235         switch (type) {
236         case PTLLND_RDMA_WRITE:
237                 return "rdma_write";
238                 
239         case PTLLND_RDMA_READ:
240                 return "rdma_read";
241
242         case PTLLND_MSG_TYPE_PUT:
243                 return "put_req";
244                 
245         case PTLLND_MSG_TYPE_GET:
246                 return "get_req";
247
248         case PTLLND_MSG_TYPE_IMMEDIATE:
249                 return "immediate";
250
251         case PTLLND_MSG_TYPE_NOOP:
252                 return "noop";
253
254         case PTLLND_MSG_TYPE_HELLO:
255                 return "hello";
256
257         default:
258                 return "<unknown>";
259         }
260 }
261
262 void
263 ptllnd_debug_tx(ptllnd_tx_t *tx) 
264 {
265         CDEBUG(D_WARNING, "%s %s b %ld.%06ld/%ld.%06ld"
266                " r %ld.%06ld/%ld.%06ld status %d\n",
267                ptllnd_tx_typestr(tx->tx_type),
268                libcfs_id2str(tx->tx_peer->plp_id),
269                tx->tx_bulk_posted.tv_sec, tx->tx_bulk_posted.tv_usec, 
270                tx->tx_bulk_done.tv_sec, tx->tx_bulk_done.tv_usec,
271                tx->tx_req_posted.tv_sec, tx->tx_req_posted.tv_usec,
272                tx->tx_req_done.tv_sec, tx->tx_req_done.tv_usec,
273                tx->tx_status);
274 }
275
276 void
277 ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
278 {
279         ptllnd_peer_t    *plp = ptllnd_find_peer(ni, id, 0);
280         struct list_head *tmp;
281         ptllnd_ni_t      *plni = ni->ni_data;
282         ptllnd_tx_t      *tx;
283         
284         if (plp == NULL) {
285                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_id2str(id));
286                 return;
287         }
288         
289         CDEBUG(D_WARNING, "%s %s%s [%d] "LPU64".%06d m "LPU64" q %d/%d c %d/%d+%d(%d)\n",
290                libcfs_id2str(id), 
291                plp->plp_recvd_hello ? "H" : "_",
292                plp->plp_closing     ? "C" : "_",
293                plp->plp_refcount,
294                plp->plp_stamp / 1000000, (int)(plp->plp_stamp % 1000000),
295                plp->plp_match,
296                ptllnd_count_q(&plp->plp_txq),
297                ptllnd_count_q(&plp->plp_activeq),
298                plp->plp_credits, plp->plp_outstanding_credits, plp->plp_sent_credits,
299                plni->plni_peer_credits + plp->plp_lazy_credits);
300
301         CDEBUG(D_WARNING, "txq:\n");
302         list_for_each (tmp, &plp->plp_txq) {
303                 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
304                 
305                 ptllnd_debug_tx(tx);
306         }
307
308         CDEBUG(D_WARNING, "activeq:\n");
309         list_for_each (tmp, &plp->plp_activeq) {
310                 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
311                 
312                 ptllnd_debug_tx(tx);
313         }
314
315         CDEBUG(D_WARNING, "zombies:\n");
316         list_for_each (tmp, &plni->plni_zombie_txs) {
317                 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
318                 
319                 if (tx->tx_peer->plp_id.nid == id.nid &&
320                     tx->tx_peer->plp_id.pid == id.pid)
321                         ptllnd_debug_tx(tx);
322         }
323         
324         CDEBUG(D_WARNING, "history:\n");
325         list_for_each (tmp, &plni->plni_tx_history) {
326                 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
327                 
328                 if (tx->tx_peer->plp_id.nid == id.nid &&
329                     tx->tx_peer->plp_id.pid == id.pid)
330                         ptllnd_debug_tx(tx);
331         }
332         
333         ptllnd_peer_decref(plp);
334 }
335
336 void
337 ptllnd_dump_debug(lnet_ni_t *ni, lnet_process_id_t id)
338 {
339         ptllnd_debug_peer(ni, id);
340         ptllnd_dump_history();
341 }
342
343 void
344 ptllnd_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive)
345 {
346         lnet_process_id_t  id;
347         ptllnd_peer_t     *peer;
348         time_t             start = cfs_time_current_sec();
349         ptllnd_ni_t       *plni = ni->ni_data;
350         int                w = plni->plni_long_wait;
351
352         /* This is only actually used to connect to routers at startup! */
353         LASSERT(alive);
354
355         id.nid = nid;
356         id.pid = LUSTRE_SRV_LNET_PID;
357         
358         peer = ptllnd_find_peer(ni, id, 1);
359         if (peer == NULL)
360                 return;
361
362         /* wait for the peer to reply */
363         while (!peer->plp_recvd_hello) {
364                 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
365                         CWARN("Waited %ds to connect to %s\n",
366                               (int)(cfs_time_current_sec() - start),
367                               libcfs_id2str(id));
368                         w *= 2;
369                 }
370                 
371                 ptllnd_wait(ni, w);
372         }
373         
374         ptllnd_peer_decref(peer);
375 }
376
377 int
378 ptllnd_setasync(lnet_ni_t *ni, lnet_process_id_t id, int nasync)
379 {
380         ptllnd_peer_t *peer = ptllnd_find_peer(ni, id, nasync > 0);
381         int            rc;
382         
383         if (peer == NULL)
384                 return -ENOMEM;
385
386         LASSERT (peer->plp_lazy_credits >= 0);
387         LASSERT (peer->plp_extra_lazy_credits >= 0);
388
389         /* If nasync < 0, we're being told we can reduce the total message
390          * headroom.  We can't do this right now because our peer might already
391          * have credits for the extra buffers, so we just account the extra
392          * headroom in case we need it later and only destroy buffers when the
393          * peer closes.
394          *
395          * Note that the following condition handles this case, where it
396          * actually increases the extra lazy credit counter. */
397
398         if (nasync <= peer->plp_extra_lazy_credits) {
399                 peer->plp_extra_lazy_credits -= nasync;
400                 return 0;
401         }
402
403         LASSERT (nasync > 0);
404
405         nasync -= peer->plp_extra_lazy_credits;
406         peer->plp_extra_lazy_credits = 0;
407         
408         rc = ptllnd_size_buffers(ni, nasync);
409         if (rc == 0) {
410                 peer->plp_lazy_credits += nasync;
411                 peer->plp_outstanding_credits += nasync;
412         }
413
414         return rc;
415 }
416
417 __u32
418 ptllnd_cksum (void *ptr, int nob)
419 {
420         char  *c  = ptr;
421         __u32  sum = 0;
422
423         while (nob-- > 0)
424                 sum = ((sum << 1) | (sum >> 31)) + *c++;
425
426         /* ensure I don't return 0 (== no checksum) */
427         return (sum == 0) ? 1 : sum;
428 }
429
430 ptllnd_tx_t *
431 ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
432 {
433         lnet_ni_t   *ni = peer->plp_ni;
434         ptllnd_ni_t *plni = ni->ni_data;
435         ptllnd_tx_t *tx;
436         int          msgsize;
437
438         CDEBUG(D_NET, "peer=%p type=%d payload=%d\n", peer, type, payload_nob);
439
440         switch (type) {
441         default:
442                 LBUG();
443
444         case PTLLND_RDMA_WRITE:
445         case PTLLND_RDMA_READ:
446                 LASSERT (payload_nob == 0);
447                 msgsize = 0;
448                 break;
449
450         case PTLLND_MSG_TYPE_PUT:
451         case PTLLND_MSG_TYPE_GET:
452                 LASSERT (payload_nob == 0);
453                 msgsize = offsetof(kptl_msg_t, ptlm_u) + 
454                           sizeof(kptl_rdma_msg_t);
455                 break;
456
457         case PTLLND_MSG_TYPE_IMMEDIATE:
458                 msgsize = offsetof(kptl_msg_t,
459                                    ptlm_u.immediate.kptlim_payload[payload_nob]);
460                 break;
461
462         case PTLLND_MSG_TYPE_NOOP:
463                 LASSERT (payload_nob == 0);
464                 msgsize = offsetof(kptl_msg_t, ptlm_u);
465                 break;
466
467         case PTLLND_MSG_TYPE_HELLO:
468                 LASSERT (payload_nob == 0);
469                 msgsize = offsetof(kptl_msg_t, ptlm_u) +
470                           sizeof(kptl_hello_msg_t);
471                 break;
472         }
473
474         msgsize = (msgsize + 7) & ~7;
475         LASSERT (msgsize <= peer->plp_max_msg_size);
476
477         LIBCFS_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize);
478
479         if (tx == NULL) {
480                 CERROR("Can't allocate msg type %d for %s\n",
481                        type, libcfs_id2str(peer->plp_id));
482                 return NULL;
483         }
484
485         CFS_INIT_LIST_HEAD(&tx->tx_list);
486         tx->tx_peer = peer;
487         tx->tx_type = type;
488         tx->tx_lnetmsg = tx->tx_lnetreplymsg = NULL;
489         tx->tx_niov = 0;
490         tx->tx_iov = NULL;
491         tx->tx_reqmdh = PTL_INVALID_HANDLE;
492         tx->tx_bulkmdh = PTL_INVALID_HANDLE;
493         tx->tx_msgsize = msgsize;
494         tx->tx_completing = 0;
495         tx->tx_status = 0;
496
497         memset(&tx->tx_bulk_posted, 0, sizeof(tx->tx_bulk_posted));
498         memset(&tx->tx_bulk_done, 0, sizeof(tx->tx_bulk_done));
499         memset(&tx->tx_req_posted, 0, sizeof(tx->tx_req_posted));
500         memset(&tx->tx_req_done, 0, sizeof(tx->tx_req_done));
501
502         if (msgsize != 0) {
503                 tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC;
504                 tx->tx_msg.ptlm_version = PTLLND_MSG_VERSION;
505                 tx->tx_msg.ptlm_type = type;
506                 tx->tx_msg.ptlm_credits = 0;
507                 tx->tx_msg.ptlm_nob = msgsize;
508                 tx->tx_msg.ptlm_cksum = 0;
509                 tx->tx_msg.ptlm_srcnid = ni->ni_nid;
510                 tx->tx_msg.ptlm_srcstamp = plni->plni_stamp;
511                 tx->tx_msg.ptlm_dstnid = peer->plp_id.nid;
512                 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
513                 tx->tx_msg.ptlm_srcpid = the_lnet.ln_pid;
514                 tx->tx_msg.ptlm_dstpid = peer->plp_id.pid;
515         }
516
517         ptllnd_peer_addref(peer);
518         plni->plni_ntxs++;
519
520         CDEBUG(D_NET, "tx=%p\n",tx);
521
522         return tx;
523 }
524
525 void
526 ptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh)
527 {
528         ptllnd_peer_t   *peer = tx->tx_peer;
529         lnet_ni_t       *ni = peer->plp_ni;
530         int              rc;
531         time_t           start = cfs_time_current_sec();
532         ptllnd_ni_t     *plni = ni->ni_data;
533         int              w = plni->plni_long_wait;
534
535         while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) {
536                 rc = PtlMDUnlink(*mdh);
537 #ifndef LUSTRE_PORTALS_UNLINK_SEMANTICS
538                 if (rc == PTL_OK) /* unlink successful => no unlinked event */
539                         return;
540                 LASSERT (rc == PTL_MD_IN_USE);
541 #endif
542                 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
543                         CWARN("Waited %ds to abort tx to %s\n",
544                               (int)(cfs_time_current_sec() - start),
545                               libcfs_id2str(peer->plp_id));
546                         w *= 2;
547                 }
548                 /* Wait for ptllnd_tx_event() to invalidate */
549                 ptllnd_wait(ni, w);
550         }
551 }
552
553 void
554 ptllnd_cull_tx_history(ptllnd_ni_t *plni)
555 {
556         int max = plni->plni_max_tx_history;
557
558         while (plni->plni_ntx_history > max) {
559                 ptllnd_tx_t *tx = list_entry(plni->plni_tx_history.next, 
560                                              ptllnd_tx_t, tx_list);
561                 list_del(&tx->tx_list);
562
563                 ptllnd_peer_decref(tx->tx_peer);
564
565                 LIBCFS_FREE(tx, offsetof(ptllnd_tx_t, tx_msg) + tx->tx_msgsize);
566
567                 LASSERT (plni->plni_ntxs > 0);
568                 plni->plni_ntxs--;
569                 plni->plni_ntx_history--;
570         }
571 }
572
573 void
574 ptllnd_tx_done(ptllnd_tx_t *tx)
575 {
576         ptllnd_peer_t   *peer = tx->tx_peer;
577         lnet_ni_t       *ni = peer->plp_ni;
578         ptllnd_ni_t     *plni = ni->ni_data;
579
580         /* CAVEAT EMPTOR: If this tx is being aborted, I'll continue to get
581          * events for this tx until it's unlinked.  So I set tx_completing to
582          * flag the tx is getting handled */
583
584         if (tx->tx_completing)
585                 return;
586
587         tx->tx_completing = 1;
588
589         if (!list_empty(&tx->tx_list))
590                 list_del_init(&tx->tx_list);
591
592         if (tx->tx_status != 0) {
593                 if (plni->plni_debug) {
594                         CERROR("Completing tx for %s with error %d\n",
595                                libcfs_id2str(peer->plp_id), tx->tx_status);
596                         ptllnd_debug_tx(tx);
597                 }
598                 ptllnd_close_peer(peer, tx->tx_status);
599         }
600         
601         ptllnd_abort_tx(tx, &tx->tx_reqmdh);
602         ptllnd_abort_tx(tx, &tx->tx_bulkmdh);
603
604         if (tx->tx_niov > 0) {
605                 LIBCFS_FREE(tx->tx_iov, tx->tx_niov * sizeof(*tx->tx_iov));
606                 tx->tx_niov = 0;
607         }
608
609         if (tx->tx_lnetreplymsg != NULL) {
610                 LASSERT (tx->tx_type == PTLLND_MSG_TYPE_GET);
611                 LASSERT (tx->tx_lnetmsg != NULL);
612                 /* Simulate GET success always  */
613                 lnet_finalize(ni, tx->tx_lnetmsg, 0);
614                 CDEBUG(D_NET, "lnet_finalize(tx_lnetreplymsg=%p)\n",tx->tx_lnetreplymsg);
615                 lnet_finalize(ni, tx->tx_lnetreplymsg, tx->tx_status);
616         } else if (tx->tx_lnetmsg != NULL) {
617                 lnet_finalize(ni, tx->tx_lnetmsg, tx->tx_status);
618         }
619
620         plni->plni_ntx_history++;
621         list_add_tail(&tx->tx_list, &plni->plni_tx_history);
622         
623         ptllnd_cull_tx_history(plni);
624 }
625
626 int
627 ptllnd_set_txiov(ptllnd_tx_t *tx,
628                  unsigned int niov, struct iovec *iov,
629                  unsigned int offset, unsigned int len)
630 {
631         ptl_md_iovec_t *piov;
632         int             npiov;
633
634         if (len == 0) {
635                 tx->tx_niov = 0;
636                 return 0;
637         }
638
639         /*
640          * Remove iovec's at the beginning that
641          * are skipped because of the offset.
642          * Adjust the offset accordingly
643          */
644         for (;;) {
645                 LASSERT (niov > 0);
646                 if (offset < iov->iov_len)
647                         break;
648                 offset -= iov->iov_len;
649                 niov--;
650                 iov++;
651         }
652
653         for (;;) {
654                 int temp_offset = offset;
655                 int resid = len;
656                 LIBCFS_ALLOC(piov, niov * sizeof(*piov));
657                 if (piov == NULL)
658                         return -ENOMEM;
659
660                 for (npiov = 0;; npiov++) {
661                         LASSERT (npiov < niov);
662                         LASSERT (iov->iov_len >= temp_offset);
663
664                         piov[npiov].iov_base = iov[npiov].iov_base + temp_offset;
665                         piov[npiov].iov_len = iov[npiov].iov_len - temp_offset;
666                         
667                         if (piov[npiov].iov_len >= resid) {
668                                 piov[npiov].iov_len = resid;
669                                 npiov++;
670                                 break;
671                         }
672                         resid -= piov[npiov].iov_len;
673                         temp_offset = 0;
674                 }
675
676                 if (npiov == niov) {
677                         tx->tx_niov = niov;
678                         tx->tx_iov = piov;
679                         return 0;
680                 }
681
682                 /* Dang! The piov I allocated was too big and it's a drag to
683                  * have to maintain separate 'allocated' and 'used' sizes, so
684                  * I'll just do it again; NB this doesn't happen normally... */
685                 LIBCFS_FREE(piov, niov * sizeof(*piov));
686                 niov = npiov;
687         }
688 }
689
690 void
691 ptllnd_set_md_buffer(ptl_md_t *md, ptllnd_tx_t *tx)
692 {
693         unsigned int    niov = tx->tx_niov;
694         ptl_md_iovec_t *iov = tx->tx_iov;
695
696         LASSERT ((md->options & PTL_MD_IOVEC) == 0);
697
698         if (niov == 0) {
699                 md->start = NULL;
700                 md->length = 0;
701         } else if (niov == 1) {
702                 md->start = iov[0].iov_base;
703                 md->length = iov[0].iov_len;
704         } else {
705                 md->start = iov;
706                 md->length = niov;
707                 md->options |= PTL_MD_IOVEC;
708         }
709 }
710
711 int
712 ptllnd_post_buffer(ptllnd_buffer_t *buf)
713 {
714         lnet_ni_t        *ni = buf->plb_ni;
715         ptllnd_ni_t      *plni = ni->ni_data;
716         ptl_process_id_t  anyid = {
717                 .nid       = PTL_NID_ANY,
718                 .pid       = PTL_PID_ANY};
719         ptl_md_t          md = {
720                 .start     = buf->plb_buffer,
721                 .length    = plni->plni_buffer_size,
722                 .threshold = PTL_MD_THRESH_INF,
723                 .max_size  = plni->plni_max_msg_size,
724                 .options   = (PTLLND_MD_OPTIONS |
725                               PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | 
726                               PTL_MD_LOCAL_ALIGN8),
727                 .user_ptr  = ptllnd_obj2eventarg(buf, PTLLND_EVENTARG_TYPE_BUF),
728                 .eq_handle = plni->plni_eqh};
729         ptl_handle_me_t meh;
730         int             rc;
731
732         LASSERT (!buf->plb_posted);
733
734         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal,
735                          anyid, LNET_MSG_MATCHBITS, 0,
736                          PTL_UNLINK, PTL_INS_AFTER, &meh);
737         if (rc != PTL_OK) {
738                 CERROR("PtlMEAttach failed: %s(%d)\n",
739                        ptllnd_errtype2str(rc), rc);
740                 return -ENOMEM;
741         }
742
743         buf->plb_posted = 1;
744         plni->plni_nposted_buffers++;
745
746         rc = PtlMDAttach(meh, md, LNET_UNLINK, &buf->plb_md);
747         if (rc == PTL_OK)
748                 return 0;
749
750         CERROR("PtlMDAttach failed: %s(%d)\n",
751                ptllnd_errtype2str(rc), rc);
752
753         buf->plb_posted = 0;
754         plni->plni_nposted_buffers--;
755
756         rc = PtlMEUnlink(meh);
757         LASSERT (rc == PTL_OK);
758
759         return -ENOMEM;
760 }
761
762 void
763 ptllnd_check_sends(ptllnd_peer_t *peer)
764 {
765         lnet_ni_t      *ni = peer->plp_ni;
766         ptllnd_ni_t    *plni = ni->ni_data;
767         ptllnd_tx_t    *tx;
768         ptl_md_t        md;
769         ptl_handle_md_t mdh;
770         int             rc;
771
772         CDEBUG(D_NET, "%s: [%d/%d+%d(%d)\n",
773                libcfs_id2str(peer->plp_id), peer->plp_credits,
774                peer->plp_outstanding_credits, peer->plp_sent_credits,
775                plni->plni_peer_credits + peer->plp_lazy_credits);
776
777         if (list_empty(&peer->plp_txq) &&
778             peer->plp_outstanding_credits >= PTLLND_CREDIT_HIGHWATER(plni) &&
779             peer->plp_credits != 0) {
780
781                 tx = ptllnd_new_tx(peer, PTLLND_MSG_TYPE_NOOP, 0);
782                 CDEBUG(D_NET, "NOOP tx=%p\n",tx);
783                 if (tx == NULL) {
784                         CERROR("Can't return credits to %s\n",
785                                libcfs_id2str(peer->plp_id));
786                 } else {
787                         ptllnd_set_tx_deadline(tx);
788                         list_add_tail(&tx->tx_list, &peer->plp_txq);
789                 }
790         }
791
792         while (!list_empty(&peer->plp_txq)) {
793                 tx = list_entry(peer->plp_txq.next, ptllnd_tx_t, tx_list);
794
795                 LASSERT (tx->tx_msgsize > 0);
796
797                 LASSERT (peer->plp_outstanding_credits >= 0);
798                 LASSERT (peer->plp_sent_credits >= 0);
799                 LASSERT (peer->plp_outstanding_credits + peer->plp_sent_credits
800                          <= plni->plni_peer_credits + peer->plp_lazy_credits);
801                 LASSERT (peer->plp_credits >= 0);
802
803                 if (peer->plp_credits == 0) {   /* no credits */
804                         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: no creds for %p",
805                                        libcfs_id2str(peer->plp_id),
806                                        peer->plp_credits,
807                                        peer->plp_outstanding_credits,
808                                        peer->plp_sent_credits,
809                                        plni->plni_peer_credits +
810                                        peer->plp_lazy_credits, tx);
811                         break;
812                 }
813                 
814                 if (peer->plp_credits == 1 &&   /* last credit reserved for */
815                     peer->plp_outstanding_credits == 0) { /* returning credits */
816                         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: too few creds for %p",
817                                        libcfs_id2str(peer->plp_id),
818                                        peer->plp_credits,
819                                        peer->plp_outstanding_credits,
820                                        peer->plp_sent_credits,
821                                        plni->plni_peer_credits +
822                                        peer->plp_lazy_credits, tx);
823                         break;
824                 }
825                 
826                 list_del(&tx->tx_list);
827                 list_add_tail(&tx->tx_list, &peer->plp_activeq);
828
829                 CDEBUG(D_NET, "Sending at TX=%p type=%s (%d)\n",tx,
830                         ptllnd_msgtype2str(tx->tx_type),tx->tx_type);
831
832                 if (tx->tx_type == PTLLND_MSG_TYPE_NOOP &&
833                     (!list_empty(&peer->plp_txq) ||
834                      peer->plp_outstanding_credits <
835                      PTLLND_CREDIT_HIGHWATER(plni))) {
836                         /* redundant NOOP */
837                         ptllnd_tx_done(tx);
838                         continue;
839                 }
840
841                 /* Set stamp at the last minute; on a new peer, I don't know it
842                  * until I receive the HELLO back */
843                 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
844
845                 /*
846                  * Return all the credits we have
847                  */
848                 tx->tx_msg.ptlm_credits = peer->plp_outstanding_credits;
849                 peer->plp_sent_credits += peer->plp_outstanding_credits;
850                 peer->plp_outstanding_credits = 0;
851
852                 /*
853                  * One less credit
854                  */
855                 peer->plp_credits--;
856
857                 if (plni->plni_checksum)
858                         tx->tx_msg.ptlm_cksum = 
859                                 ptllnd_cksum(&tx->tx_msg,
860                                              offsetof(kptl_msg_t, ptlm_u));
861
862                 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
863                 md.eq_handle = plni->plni_eqh;
864                 md.threshold = 1;
865                 md.options = PTLLND_MD_OPTIONS;
866                 md.start = &tx->tx_msg;
867                 md.length = tx->tx_msgsize;
868
869                 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
870                 if (rc != PTL_OK) {
871                         CERROR("PtlMDBind for %s failed: %s(%d)\n",
872                                libcfs_id2str(peer->plp_id),
873                                ptllnd_errtype2str(rc), rc);
874                         tx->tx_status = -EIO;
875                         ptllnd_tx_done(tx);
876                         break;
877                 }
878
879                 LASSERT (tx->tx_type != PTLLND_RDMA_WRITE &&
880                          tx->tx_type != PTLLND_RDMA_READ);
881                 
882                 tx->tx_reqmdh = mdh;
883                 gettimeofday(&tx->tx_req_posted, NULL);
884
885                 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: %s %p c %d",
886                                libcfs_id2str(peer->plp_id),
887                                peer->plp_credits,
888                                peer->plp_outstanding_credits,
889                                peer->plp_sent_credits,
890                                plni->plni_peer_credits +
891                                peer->plp_lazy_credits,
892                                ptllnd_msgtype2str(tx->tx_type), tx,
893                                tx->tx_msg.ptlm_credits);
894
895                 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
896                             plni->plni_portal, 0, LNET_MSG_MATCHBITS, 0, 0);
897                 if (rc != PTL_OK) {
898                         CERROR("PtlPut for %s failed: %s(%d)\n",
899                                libcfs_id2str(peer->plp_id),
900                                ptllnd_errtype2str(rc), rc);
901                         tx->tx_status = -EIO;
902                         ptllnd_tx_done(tx);
903                         break;
904                 }
905         }
906 }
907
908 int
909 ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
910                     unsigned int niov, struct iovec *iov,
911                     unsigned int offset, unsigned int len)
912 {
913         lnet_ni_t      *ni = peer->plp_ni;
914         ptllnd_ni_t    *plni = ni->ni_data;
915         ptllnd_tx_t    *tx = ptllnd_new_tx(peer, type, 0);
916         __u64           matchbits;
917         ptl_md_t        md;
918         ptl_handle_md_t mdh;
919         ptl_handle_me_t meh;
920         int             rc;
921         int             rc2;
922         time_t          start;
923         int             w;
924
925         CDEBUG(D_NET, "niov=%d offset=%d len=%d\n",niov,offset,len);
926
927         LASSERT (type == PTLLND_MSG_TYPE_GET ||
928                  type == PTLLND_MSG_TYPE_PUT);
929
930         if (tx == NULL) {
931                 CERROR("Can't allocate %s tx for %s\n",
932                        type == PTLLND_MSG_TYPE_GET ? "GET" : "PUT/REPLY",
933                        libcfs_id2str(peer->plp_id));
934                 return -ENOMEM;
935         }
936
937         rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
938         if (rc != 0) {
939                 CERROR ("Can't allocate iov %d for %s\n",
940                         niov, libcfs_id2str(peer->plp_id));
941                 rc = -ENOMEM;
942                 goto failed;
943         }
944
945         md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
946         md.eq_handle = plni->plni_eqh;
947         md.threshold = 1;
948         md.max_size = 0;
949         md.options = PTLLND_MD_OPTIONS;
950         if(type == PTLLND_MSG_TYPE_GET)
951                 md.options |= PTL_MD_OP_PUT | PTL_MD_ACK_DISABLE;
952         else
953                 md.options |= PTL_MD_OP_GET;
954         ptllnd_set_md_buffer(&md, tx);
955
956         start = cfs_time_current_sec();
957         w = plni->plni_long_wait;
958
959         while (!peer->plp_recvd_hello) {        /* wait to validate plp_match */
960                 if (peer->plp_closing) {
961                         rc = -EIO;
962                         goto failed;
963                 }
964                 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
965                         CWARN("Waited %ds to connect to %s\n",
966                               (int)(cfs_time_current_sec() - start),
967                               libcfs_id2str(peer->plp_id));
968                         w *= 2;
969                 }
970                 ptllnd_wait(ni, w);
971         }
972
973         if (peer->plp_match < PTL_RESERVED_MATCHBITS)
974                 peer->plp_match = PTL_RESERVED_MATCHBITS;
975         matchbits = peer->plp_match++;
976
977         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid,
978                          matchbits, 0, PTL_UNLINK, PTL_INS_BEFORE, &meh);
979         if (rc != PTL_OK) {
980                 CERROR("PtlMEAttach for %s failed: %s(%d)\n",
981                        libcfs_id2str(peer->plp_id),
982                        ptllnd_errtype2str(rc), rc);
983                 rc = -EIO;
984                 goto failed;
985         }
986
987         gettimeofday(&tx->tx_bulk_posted, NULL);
988
989         rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);
990         if (rc != PTL_OK) {
991                 CERROR("PtlMDAttach for %s failed: %s(%d)\n",
992                        libcfs_id2str(peer->plp_id),
993                        ptllnd_errtype2str(rc), rc);
994                 rc2 = PtlMEUnlink(meh);
995                 LASSERT (rc2 == PTL_OK);
996                 rc = -EIO;
997                 goto failed;
998         }
999         tx->tx_bulkmdh = mdh;
1000
1001         /*
1002          * We need to set the stamp here because it
1003          * we could have received a HELLO above that set
1004          * peer->plp_stamp
1005          */
1006         tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
1007
1008         tx->tx_msg.ptlm_u.rdma.kptlrm_hdr = msg->msg_hdr;
1009         tx->tx_msg.ptlm_u.rdma.kptlrm_matchbits = matchbits;
1010
1011         if (type == PTLLND_MSG_TYPE_GET) {
1012                 tx->tx_lnetreplymsg = lnet_create_reply_msg(ni, msg);
1013                 if (tx->tx_lnetreplymsg == NULL) {
1014                         CERROR("Can't create reply for GET to %s\n",
1015                                libcfs_id2str(msg->msg_target));
1016                         rc = -ENOMEM;
1017                         goto failed;
1018                 }
1019         }
1020
1021         tx->tx_lnetmsg = msg;
1022         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post passive %s p %d %p",
1023                        libcfs_id2str(msg->msg_target),
1024                        peer->plp_credits, peer->plp_outstanding_credits,
1025                        peer->plp_sent_credits,
1026                        plni->plni_peer_credits + peer->plp_lazy_credits,
1027                        lnet_msgtyp2str(msg->msg_type),
1028                        (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ? 
1029                        le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1030                        (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ? 
1031                        le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1032                        tx);
1033         ptllnd_post_tx(tx);
1034         return 0;
1035
1036  failed:
1037         ptllnd_tx_done(tx);
1038         return rc;
1039 }
1040
1041 int
1042 ptllnd_active_rdma(ptllnd_peer_t *peer, int type,
1043                    lnet_msg_t *msg, __u64 matchbits,
1044                    unsigned int niov, struct iovec *iov,
1045                    unsigned int offset, unsigned int len)
1046 {
1047         lnet_ni_t       *ni = peer->plp_ni;
1048         ptllnd_ni_t     *plni = ni->ni_data;
1049         ptllnd_tx_t     *tx = ptllnd_new_tx(peer, type, 0);
1050         ptl_md_t         md;
1051         ptl_handle_md_t  mdh;
1052         int              rc;
1053
1054         LASSERT (type == PTLLND_RDMA_READ ||
1055                  type == PTLLND_RDMA_WRITE);
1056
1057         if (tx == NULL) {
1058                 CERROR("Can't allocate tx for RDMA %s with %s\n",
1059                        (type == PTLLND_RDMA_WRITE) ? "write" : "read",
1060                        libcfs_id2str(peer->plp_id));
1061                 ptllnd_close_peer(peer, -ENOMEM);
1062                 return -ENOMEM;
1063         }
1064
1065         rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
1066         if (rc != 0) {
1067                 CERROR ("Can't allocate iov %d for %s\n",
1068                         niov, libcfs_id2str(peer->plp_id));
1069                 rc = -ENOMEM;
1070                 goto failed;
1071         }
1072
1073         md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
1074         md.eq_handle = plni->plni_eqh;
1075         md.max_size = 0;
1076         md.options = PTLLND_MD_OPTIONS;
1077         md.threshold = (type == PTLLND_RDMA_READ) ? 2 : 1;
1078
1079         ptllnd_set_md_buffer(&md, tx);
1080
1081         rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
1082         if (rc != PTL_OK) {
1083                 CERROR("PtlMDBind for %s failed: %s(%d)\n",
1084                        libcfs_id2str(peer->plp_id),
1085                        ptllnd_errtype2str(rc), rc);
1086                 rc = -EIO;
1087                 goto failed;
1088         }
1089
1090         tx->tx_bulkmdh = mdh;
1091         tx->tx_lnetmsg = msg;
1092
1093         ptllnd_set_tx_deadline(tx);
1094         list_add_tail(&tx->tx_list, &peer->plp_activeq);
1095         gettimeofday(&tx->tx_bulk_posted, NULL);
1096
1097         if (type == PTLLND_RDMA_READ)
1098                 rc = PtlGet(mdh, peer->plp_ptlid,
1099                             plni->plni_portal, 0, matchbits, 0);
1100         else
1101                 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
1102                             plni->plni_portal, 0, matchbits, 0, 
1103                             (msg == NULL) ? PTLLND_RDMA_FAIL : PTLLND_RDMA_OK);
1104
1105         if (rc == PTL_OK)
1106                 return 0;
1107
1108         CERROR("Can't initiate RDMA with %s: %s(%d)\n",
1109                libcfs_id2str(peer->plp_id),
1110                ptllnd_errtype2str(rc), rc);
1111
1112         tx->tx_lnetmsg = NULL;
1113  failed:
1114         tx->tx_status = rc;
1115         ptllnd_tx_done(tx);    /* this will close peer */
1116         return rc;
1117 }
1118
1119 int
1120 ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
1121 {
1122         ptllnd_ni_t    *plni = ni->ni_data;
1123         ptllnd_peer_t  *plp;
1124         ptllnd_tx_t    *tx;
1125         int             nob;
1126         int             rc;
1127
1128         LASSERT (!msg->msg_routing);
1129         LASSERT (msg->msg_kiov == NULL);
1130
1131         LASSERT (msg->msg_niov <= PTL_MD_MAX_IOV); /* !!! */
1132
1133         CDEBUG(D_NET, "%s [%d]+%d,%d -> %s%s\n", 
1134                lnet_msgtyp2str(msg->msg_type),
1135                msg->msg_niov, msg->msg_offset, msg->msg_len,
1136                libcfs_nid2str(msg->msg_target.nid),
1137                msg->msg_target_is_router ? "(rtr)" : "");
1138
1139         if ((msg->msg_target.pid & LNET_PID_USERFLAG) != 0) {
1140                 CERROR("Can't send to non-kernel peer %s\n",
1141                        libcfs_id2str(msg->msg_target));
1142                 return -EHOSTUNREACH;
1143         }
1144         
1145         plp = ptllnd_find_peer(ni, msg->msg_target, 1);
1146         if (plp == NULL)
1147                 return -ENOMEM;
1148
1149         switch (msg->msg_type) {
1150         default:
1151                 LBUG();
1152
1153         case LNET_MSG_ACK:
1154                 LASSERT (msg->msg_len == 0);
1155                 break;                          /* send IMMEDIATE */
1156
1157         case LNET_MSG_GET:
1158                 if (msg->msg_target_is_router)
1159                         break;                  /* send IMMEDIATE */
1160
1161                 nob = msg->msg_md->md_length;
1162                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1163                 if (nob <= plni->plni_max_msg_size)
1164                         break;
1165
1166                 LASSERT ((msg->msg_md->md_options & LNET_MD_KIOV) == 0);
1167                 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_GET, msg,
1168                                          msg->msg_md->md_niov,
1169                                          msg->msg_md->md_iov.iov,
1170                                          0, msg->msg_md->md_length);
1171                 ptllnd_peer_decref(plp);
1172                 return rc;
1173
1174         case LNET_MSG_REPLY:
1175         case LNET_MSG_PUT:
1176                 nob = msg->msg_len;
1177                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1178                 if (nob <= plp->plp_max_msg_size)
1179                         break;                  /* send IMMEDIATE */
1180
1181                 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_PUT, msg,
1182                                          msg->msg_niov, msg->msg_iov,
1183                                          msg->msg_offset, msg->msg_len);
1184                 ptllnd_peer_decref(plp);
1185                 return rc;
1186         }
1187
1188         /* send IMMEDIATE
1189          * NB copy the payload so we don't have to do a fragmented send */
1190
1191         tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_IMMEDIATE, msg->msg_len);
1192         if (tx == NULL) {
1193                 CERROR("Can't allocate tx for lnet type %d to %s\n",
1194                        msg->msg_type, libcfs_id2str(msg->msg_target));
1195                 ptllnd_peer_decref(plp);
1196                 return -ENOMEM;
1197         }
1198
1199         lnet_copy_iov2flat(tx->tx_msgsize, &tx->tx_msg,
1200                            offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1201                            msg->msg_niov, msg->msg_iov, msg->msg_offset,
1202                            msg->msg_len);
1203         tx->tx_msg.ptlm_u.immediate.kptlim_hdr = msg->msg_hdr;
1204
1205         tx->tx_lnetmsg = msg;
1206         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post immediate %s p %d %p",
1207                        libcfs_id2str(msg->msg_target),
1208                        plp->plp_credits, plp->plp_outstanding_credits,
1209                        plp->plp_sent_credits,
1210                        plni->plni_peer_credits + plp->plp_lazy_credits,
1211                        lnet_msgtyp2str(msg->msg_type),
1212                        (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ? 
1213                        le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1214                        (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ? 
1215                        le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1216                        tx);
1217         ptllnd_post_tx(tx);
1218         ptllnd_peer_decref(plp);
1219         return 0;
1220 }
1221
1222 void
1223 ptllnd_rx_done(ptllnd_rx_t *rx)
1224 {
1225         ptllnd_peer_t *plp = rx->rx_peer;
1226         lnet_ni_t     *ni = plp->plp_ni;
1227         ptllnd_ni_t   *plni = ni->ni_data;
1228
1229         plp->plp_outstanding_credits++;
1230
1231         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: rx=%p done\n",
1232                        libcfs_id2str(plp->plp_id),
1233                        plp->plp_credits, plp->plp_outstanding_credits, 
1234                        plp->plp_sent_credits,
1235                        plni->plni_peer_credits + plp->plp_lazy_credits, rx);
1236
1237         ptllnd_check_sends(rx->rx_peer);
1238
1239         LASSERT (plni->plni_nrxs > 0);
1240         plni->plni_nrxs--;
1241 }
1242
1243 int
1244 ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1245                   void **new_privatep)
1246 {
1247         /* Shouldn't get here; recvs only block for router buffers */
1248         LBUG();
1249         return 0;
1250 }
1251
1252 int
1253 ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1254             int delayed, unsigned int niov,
1255             struct iovec *iov, lnet_kiov_t *kiov,
1256             unsigned int offset, unsigned int mlen, unsigned int rlen)
1257 {
1258         ptllnd_rx_t    *rx = private;
1259         int             rc = 0;
1260         int             nob;
1261
1262         LASSERT (kiov == NULL);
1263         LASSERT (niov <= PTL_MD_MAX_IOV);       /* !!! */
1264
1265         switch (rx->rx_msg->ptlm_type) {
1266         default:
1267                 LBUG();
1268
1269         case PTLLND_MSG_TYPE_IMMEDIATE:
1270                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[mlen]);
1271                 if (nob > rx->rx_nob) {
1272                         CERROR("Immediate message from %s too big: %d(%d)\n",
1273                                libcfs_id2str(rx->rx_peer->plp_id),
1274                                nob, rx->rx_nob);
1275                         rc = -EPROTO;
1276                         break;
1277                 }
1278                 lnet_copy_flat2iov(niov, iov, offset,
1279                                    rx->rx_nob, rx->rx_msg,
1280                                    offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1281                                    mlen);
1282                 lnet_finalize(ni, msg, 0);
1283                 break;
1284
1285         case PTLLND_MSG_TYPE_PUT:
1286                 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg,
1287                                         rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1288                                         niov, iov, offset, mlen);
1289                 break;
1290
1291         case PTLLND_MSG_TYPE_GET:
1292                 if (msg != NULL)
1293                         rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, msg,
1294                                                 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1295                                                 msg->msg_niov, msg->msg_iov,
1296                                                 msg->msg_offset, msg->msg_len);
1297                 else
1298                         rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, NULL,
1299                                                 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1300                                                 0, NULL, 0, 0);
1301                 break;
1302         }
1303
1304         ptllnd_rx_done(rx);
1305         return rc;
1306 }
1307
1308 void
1309 ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
1310                      kptl_msg_t *msg, unsigned int nob)
1311 {
1312         ptllnd_ni_t      *plni = ni->ni_data;
1313         const int         basenob = offsetof(kptl_msg_t, ptlm_u);
1314         lnet_process_id_t srcid;
1315         ptllnd_rx_t       rx;
1316         int               flip;
1317         __u16             msg_version;
1318         __u32             msg_cksum;
1319         ptllnd_peer_t    *plp;
1320         int               rc;
1321
1322         if (nob < 6) {
1323                 CERROR("Very short receive from %s\n",
1324                        ptllnd_ptlid2str(initiator));
1325                 return;
1326         }
1327
1328         /* I can at least read MAGIC/VERSION */
1329
1330         flip = msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC);
1331         if (!flip && msg->ptlm_magic != PTLLND_MSG_MAGIC) {
1332                 CERROR("Bad protocol magic %08x from %s\n", 
1333                        msg->ptlm_magic, ptllnd_ptlid2str(initiator));
1334                 return;
1335         }
1336
1337         msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
1338
1339         if (msg_version != PTLLND_MSG_VERSION) {
1340                 CERROR("Bad protocol version %04x from %s: %04x expected\n", 
1341                        (__u32)msg_version, ptllnd_ptlid2str(initiator), PTLLND_MSG_VERSION);
1342
1343                 if (plni->plni_abort_on_protocol_mismatch)
1344                         abort();
1345
1346                 return;
1347         }
1348
1349         if (nob < basenob) {
1350                 CERROR("Short receive from %s: got %d, wanted at least %d\n",
1351                        ptllnd_ptlid2str(initiator), nob, basenob);
1352                 return;
1353         }
1354
1355         /* checksum must be computed with
1356          * 1) ptlm_cksum zero and
1357          * 2) BEFORE anything gets modified/flipped
1358          */
1359         msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
1360         msg->ptlm_cksum = 0;
1361         if (msg_cksum != 0 &&
1362             msg_cksum != ptllnd_cksum(msg, offsetof(kptl_msg_t, ptlm_u))) {
1363                 CERROR("Bad checksum from %s\n", ptllnd_ptlid2str(initiator));
1364                 return;
1365         }
1366
1367         msg->ptlm_version = msg_version;
1368         msg->ptlm_cksum = msg_cksum;
1369         
1370         if (flip) {
1371                 /* NB stamps are opaque cookies */
1372                 __swab32s(&msg->ptlm_nob);
1373                 __swab64s(&msg->ptlm_srcnid);
1374                 __swab64s(&msg->ptlm_dstnid);
1375                 __swab32s(&msg->ptlm_srcpid);
1376                 __swab32s(&msg->ptlm_dstpid);
1377         }
1378         
1379         srcid.nid = msg->ptlm_srcnid;
1380         srcid.pid = msg->ptlm_srcpid;
1381
1382         if (LNET_NIDNET(msg->ptlm_srcnid) != LNET_NIDNET(ni->ni_nid)) {
1383                 CERROR("Bad source id %s from %s\n",
1384                        libcfs_id2str(srcid),
1385                        ptllnd_ptlid2str(initiator));
1386                 return;
1387         }
1388
1389         if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
1390                 CERROR("NAK from %s (%s)\n", 
1391                        libcfs_id2str(srcid),
1392                        ptllnd_ptlid2str(initiator));
1393
1394                 if (plni->plni_dump_on_nak)
1395                         ptllnd_dump_debug(ni, srcid);
1396                 
1397                 if (plni->plni_abort_on_nak)
1398                         abort();
1399                 
1400                 return;
1401         }
1402         
1403         if (msg->ptlm_dstnid != ni->ni_nid ||
1404             msg->ptlm_dstpid != the_lnet.ln_pid) {
1405                 CERROR("Bad dstid %s (%s expected) from %s\n",
1406                        libcfs_id2str((lnet_process_id_t) {
1407                                .nid = msg->ptlm_dstnid,
1408                                .pid = msg->ptlm_dstpid}),
1409                        libcfs_id2str((lnet_process_id_t) {
1410                                .nid = ni->ni_nid,
1411                                .pid = the_lnet.ln_pid}),
1412                        libcfs_id2str(srcid));
1413                 return;
1414         }
1415
1416         if (msg->ptlm_dststamp != plni->plni_stamp) {
1417                 CERROR("Bad dststamp "LPX64"("LPX64" expected) from %s\n",
1418                        msg->ptlm_dststamp, plni->plni_stamp,
1419                        libcfs_id2str(srcid));
1420                 return;
1421         }
1422
1423         PTLLND_HISTORY("RX %s: %s %d %p", libcfs_id2str(srcid), 
1424                        ptllnd_msgtype2str(msg->ptlm_type),
1425                        msg->ptlm_credits, &rx);
1426
1427         switch (msg->ptlm_type) {
1428         case PTLLND_MSG_TYPE_PUT:
1429         case PTLLND_MSG_TYPE_GET:
1430                 if (nob < basenob + sizeof(kptl_rdma_msg_t)) {
1431                         CERROR("Short rdma request from %s(%s)\n",
1432                                libcfs_id2str(srcid),
1433                                ptllnd_ptlid2str(initiator));
1434                         return;
1435                 }
1436                 if (flip)
1437                         __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
1438                 break;
1439
1440         case PTLLND_MSG_TYPE_IMMEDIATE:
1441                 if (nob < offsetof(kptl_msg_t,
1442                                    ptlm_u.immediate.kptlim_payload)) {
1443                         CERROR("Short immediate from %s(%s)\n",
1444                                libcfs_id2str(srcid),
1445                                ptllnd_ptlid2str(initiator));
1446                         return;
1447                 }
1448                 break;
1449
1450         case PTLLND_MSG_TYPE_HELLO:
1451                 if (nob < basenob + sizeof(kptl_hello_msg_t)) {
1452                         CERROR("Short hello from %s(%s)\n",
1453                                libcfs_id2str(srcid),
1454                                ptllnd_ptlid2str(initiator));
1455                         return;
1456                 }
1457                 if(flip){
1458                         __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
1459                         __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
1460                 }
1461                 break;
1462                 
1463         case PTLLND_MSG_TYPE_NOOP:
1464                 break;
1465
1466         default:
1467                 CERROR("Bad message type %d from %s(%s)\n", msg->ptlm_type,
1468                        libcfs_id2str(srcid),
1469                        ptllnd_ptlid2str(initiator));
1470                 return;
1471         }
1472
1473         plp = ptllnd_find_peer(ni, srcid, 0);
1474         if (plp == NULL) {
1475                 CERROR("Can't find peer %s\n", libcfs_id2str(srcid));
1476                 return;
1477         }
1478
1479         if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
1480                 if (plp->plp_recvd_hello) {
1481                         CERROR("Unexpected HELLO from %s\n",
1482                                libcfs_id2str(srcid));
1483                         ptllnd_peer_decref(plp);
1484                         return;
1485                 }
1486
1487                 plp->plp_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1488                 plp->plp_match = msg->ptlm_u.hello.kptlhm_matchbits;
1489                 plp->plp_stamp = msg->ptlm_srcstamp;
1490                 plp->plp_recvd_hello = 1;
1491
1492         } else if (!plp->plp_recvd_hello) {
1493
1494                 CERROR("Bad message type %d (HELLO expected) from %s\n",
1495                        msg->ptlm_type, libcfs_id2str(srcid));
1496                 ptllnd_peer_decref(plp);
1497                 return;
1498
1499         } else if (msg->ptlm_srcstamp != plp->plp_stamp) {
1500
1501                 CERROR("Bad srcstamp "LPX64"("LPX64" expected) from %s\n",
1502                        msg->ptlm_srcstamp, plp->plp_stamp,
1503                        libcfs_id2str(srcid));
1504                 ptllnd_peer_decref(plp);
1505                 return;
1506         }
1507
1508         /* Check peer only sends when I've sent her credits */
1509         if (plp->plp_sent_credits == 0) {
1510                 CERROR("%s[%d/%d+%d(%d)]: unexpected message\n",
1511                        libcfs_id2str(plp->plp_id),
1512                        plp->plp_credits, plp->plp_outstanding_credits, 
1513                        plp->plp_sent_credits,
1514                        plni->plni_peer_credits + plp->plp_lazy_credits);
1515                 return;
1516         }
1517         plp->plp_sent_credits--;
1518         
1519         /* No check for credit overflow - the peer may post new buffers after
1520          * the startup handshake. */
1521         if (msg->ptlm_credits > 0) {
1522                 plp->plp_credits += msg->ptlm_credits;
1523                 ptllnd_check_sends(plp);
1524         }
1525
1526         /* All OK so far; assume the message is good... */
1527
1528         rx.rx_peer      = plp;
1529         rx.rx_msg       = msg;
1530         rx.rx_nob       = nob;
1531         plni->plni_nrxs++;
1532
1533         switch (msg->ptlm_type) {
1534         default: /* message types have been checked already */
1535                 ptllnd_rx_done(&rx);
1536                 break;
1537
1538         case PTLLND_MSG_TYPE_PUT:
1539         case PTLLND_MSG_TYPE_GET:
1540                 rc = lnet_parse(ni, &msg->ptlm_u.rdma.kptlrm_hdr,
1541                                 msg->ptlm_srcnid, &rx, 1);
1542                 if (rc < 0)
1543                         ptllnd_rx_done(&rx);
1544                 break;
1545
1546         case PTLLND_MSG_TYPE_IMMEDIATE:
1547                 rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,
1548                                 msg->ptlm_srcnid, &rx, 0);
1549                 if (rc < 0)
1550                         ptllnd_rx_done(&rx);
1551                 break;
1552         }
1553
1554         ptllnd_peer_decref(plp);
1555 }
1556
1557 void
1558 ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
1559 {
1560         ptllnd_buffer_t *buf = ptllnd_eventarg2obj(event->md.user_ptr);
1561         ptllnd_ni_t     *plni = ni->ni_data;
1562         char            *msg = &buf->plb_buffer[event->offset];
1563         int              repost;
1564         int              unlinked = event->type == PTL_EVENT_UNLINK;
1565
1566         LASSERT (buf->plb_ni == ni);
1567         LASSERT (event->type == PTL_EVENT_PUT_END ||
1568                  event->type == PTL_EVENT_UNLINK);
1569
1570         if (event->ni_fail_type != PTL_NI_OK) {
1571
1572                 CERROR("event type %s(%d), status %s(%d) from %s\n",
1573                        ptllnd_evtype2str(event->type), event->type,
1574                        ptllnd_errtype2str(event->ni_fail_type), 
1575                        event->ni_fail_type,
1576                        ptllnd_ptlid2str(event->initiator));
1577
1578         } else if (event->type == PTL_EVENT_PUT_END) {
1579 #if (PTL_MD_LOCAL_ALIGN8 == 0)
1580                 /* Portals can't force message alignment - someone sending an
1581                  * odd-length message could misalign subsequent messages */
1582                 if ((event->mlength & 7) != 0) {
1583                         CERROR("Message from %s has odd length %llu: "
1584                                "probable version incompatibility\n",
1585                                ptllnd_ptlid2str(event->initiator),
1586                                event->mlength);
1587                         LBUG();
1588                 }
1589 #endif
1590                 LASSERT ((event->offset & 7) == 0);
1591
1592                 ptllnd_parse_request(ni, event->initiator,
1593                                      (kptl_msg_t *)msg, event->mlength);
1594         }
1595
1596 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1597         /* UNLINK event only on explicit unlink */
1598         repost = (event->unlinked && event->type != PTL_EVENT_UNLINK);
1599         if (event->unlinked)
1600                 unlinked = 1;
1601 #else
1602         /* UNLINK event only on implicit unlink */
1603         repost = (event->type == PTL_EVENT_UNLINK);
1604 #endif
1605
1606         if (unlinked) {
1607                 LASSERT(buf->plb_posted);
1608                 buf->plb_posted = 0;
1609                 plni->plni_nposted_buffers--;
1610         }
1611
1612         if (repost)
1613                 (void) ptllnd_post_buffer(buf);
1614 }
1615
1616 void
1617 ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
1618 {
1619         ptllnd_ni_t *plni = ni->ni_data;
1620         ptllnd_tx_t *tx = ptllnd_eventarg2obj(event->md.user_ptr);
1621         int          error = (event->ni_fail_type != PTL_NI_OK);
1622         int          isreq;
1623         int          isbulk;
1624 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1625         int          unlinked = event->unlinked;
1626 #else
1627         int          unlinked = (event->type == PTL_EVENT_UNLINK);
1628 #endif
1629
1630         if (error)
1631                 CERROR("Error %s(%d) event %s(%d) unlinked %d, %s(%d) for %s\n",
1632                        ptllnd_errtype2str(event->ni_fail_type),
1633                        event->ni_fail_type,
1634                        ptllnd_evtype2str(event->type), event->type,
1635                        unlinked, ptllnd_msgtype2str(tx->tx_type), tx->tx_type,
1636                        libcfs_id2str(tx->tx_peer->plp_id));
1637
1638         LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE));
1639
1640         isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh);
1641         if (isreq) {
1642                 LASSERT (event->md.start == (void *)&tx->tx_msg);
1643                 if (unlinked) {
1644                         tx->tx_reqmdh = PTL_INVALID_HANDLE;
1645                         gettimeofday(&tx->tx_req_done, NULL);
1646                 }
1647         }
1648
1649         isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);
1650         if ( isbulk && unlinked ) {
1651                 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
1652                 gettimeofday(&tx->tx_bulk_done, NULL);
1653         }
1654
1655         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
1656
1657         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: TX done %p %s%s",
1658                        libcfs_id2str(tx->tx_peer->plp_id), 
1659                        tx->tx_peer->plp_credits,
1660                        tx->tx_peer->plp_outstanding_credits,
1661                        tx->tx_peer->plp_sent_credits,
1662                        plni->plni_peer_credits + tx->tx_peer->plp_lazy_credits,
1663                        tx, isreq ? "REQ" : "BULK", unlinked ? "(unlinked)" : "");
1664
1665         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
1666         switch (tx->tx_type) {
1667         default:
1668                 LBUG();
1669
1670         case PTLLND_MSG_TYPE_NOOP:
1671         case PTLLND_MSG_TYPE_HELLO:
1672         case PTLLND_MSG_TYPE_IMMEDIATE:
1673                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1674                          event->type == PTL_EVENT_SEND_END);
1675                 LASSERT (isreq);
1676                 break;
1677
1678         case PTLLND_MSG_TYPE_GET:
1679                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1680                          (isreq && event->type == PTL_EVENT_SEND_END) ||
1681                          (isbulk && event->type == PTL_EVENT_PUT_END));
1682
1683                 if (isbulk && !error && event->type == PTL_EVENT_PUT_END) {
1684                         /* Check GET matched */
1685                         if (event->hdr_data == PTLLND_RDMA_OK) {
1686                                 lnet_set_reply_msg_len(ni, 
1687                                                        tx->tx_lnetreplymsg,
1688                                                        event->mlength);
1689                         } else {
1690                                 CERROR ("Unmatched GET with %s\n",
1691                                         libcfs_id2str(tx->tx_peer->plp_id));
1692                                 tx->tx_status = -EIO;
1693                         }
1694                 }
1695                 break;
1696
1697         case PTLLND_MSG_TYPE_PUT:
1698                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1699                          (isreq && event->type == PTL_EVENT_SEND_END) ||
1700                          (isbulk && event->type == PTL_EVENT_GET_END));
1701                 break;
1702
1703         case PTLLND_RDMA_READ:
1704                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1705                          event->type == PTL_EVENT_SEND_END ||
1706                          event->type == PTL_EVENT_REPLY_END);
1707                 LASSERT (isbulk);
1708                 break;
1709
1710         case PTLLND_RDMA_WRITE:
1711                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1712                          event->type == PTL_EVENT_SEND_END);
1713                 LASSERT (isbulk);
1714         }
1715
1716         /* Schedule ptllnd_tx_done() on error or last completion event */
1717         if (error ||
1718             (PtlHandleIsEqual(tx->tx_bulkmdh, PTL_INVALID_HANDLE) &&
1719              PtlHandleIsEqual(tx->tx_reqmdh, PTL_INVALID_HANDLE))) {
1720                 if (error)
1721                         tx->tx_status = -EIO;
1722                 list_del(&tx->tx_list);
1723                 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
1724         }
1725 }
1726
1727 ptllnd_tx_t *
1728 ptllnd_find_timed_out_tx(ptllnd_peer_t *peer)
1729 {
1730         time_t            now = cfs_time_current_sec();
1731         struct list_head *tmp;
1732
1733         list_for_each(tmp, &peer->plp_txq) {
1734                 ptllnd_tx_t *tx = list_entry(tmp, ptllnd_tx_t, tx_list);
1735                 
1736                 if (tx->tx_deadline < now)
1737                         return tx;
1738         }
1739         
1740         list_for_each(tmp, &peer->plp_activeq) {
1741                 ptllnd_tx_t *tx = list_entry(tmp, ptllnd_tx_t, tx_list);
1742                 
1743                 if (tx->tx_deadline < now)
1744                         return tx;
1745         }
1746
1747         return NULL;
1748 }
1749
1750 void
1751 ptllnd_check_peer(ptllnd_peer_t *peer)
1752 {
1753         ptllnd_tx_t *tx = ptllnd_find_timed_out_tx(peer);
1754         
1755         if (tx == NULL)
1756                 return;
1757         
1758         CERROR("%s: timed out\n", libcfs_id2str(peer->plp_id));
1759         ptllnd_close_peer(peer, -ETIMEDOUT);
1760 }
1761
1762 void
1763 ptllnd_watchdog (lnet_ni_t *ni, time_t now)
1764 {
1765         ptllnd_ni_t      *plni = ni->ni_data;
1766         const int         n = 4;
1767         int               p = plni->plni_watchdog_interval;
1768         int               chunk = plni->plni_peer_hash_size;
1769         int               interval = now - (plni->plni_watchdog_nextt - p);
1770         int               i;
1771         struct list_head *hashlist;
1772         struct list_head *tmp;
1773         struct list_head *nxt;
1774
1775         /* Time to check for RDMA timeouts on a few more peers: 
1776          * I try to do checks every 'p' seconds on a proportion of the peer
1777          * table and I need to check every connection 'n' times within a
1778          * timeout interval, to ensure I detect a timeout on any connection
1779          * within (n+1)/n times the timeout interval. */
1780
1781         LASSERT (now >= plni->plni_watchdog_nextt);
1782
1783         if (plni->plni_timeout > n * interval) { /* Scan less than the whole table? */
1784                 chunk = (chunk * n * interval) / plni->plni_timeout;
1785                 if (chunk == 0)
1786                         chunk = 1;
1787         }
1788
1789         for (i = 0; i < chunk; i++) {
1790                 hashlist = &plni->plni_peer_hash[plni->plni_watchdog_peeridx];
1791                 
1792                 list_for_each_safe(tmp, nxt, hashlist) {
1793                         ptllnd_check_peer(list_entry(tmp, ptllnd_peer_t, plp_list));
1794                 }
1795                 
1796                 plni->plni_watchdog_peeridx = (plni->plni_watchdog_peeridx + 1) %
1797                                               plni->plni_peer_hash_size;
1798         }
1799
1800         plni->plni_watchdog_nextt = now + p;
1801 }
1802
1803 void
1804 ptllnd_wait (lnet_ni_t *ni, int milliseconds)
1805 {
1806         static struct timeval  prevt;
1807         static int             prevt_count;
1808         static int             call_count;
1809
1810         struct timeval         start;
1811         struct timeval         then;
1812         struct timeval         now;
1813         struct timeval         deadline;
1814         
1815         ptllnd_ni_t   *plni = ni->ni_data;
1816         ptllnd_tx_t   *tx;
1817         ptl_event_t    event;
1818         int            which;
1819         int            rc;
1820         int            found = 0;
1821         int            timeout = 0;
1822
1823         /* Handle any currently queued events, returning immediately if any.
1824          * Otherwise block for the timeout and handle all events queued
1825          * then. */
1826
1827         gettimeofday(&start, NULL);
1828         call_count++;
1829
1830         if (milliseconds <= 0) {
1831                 deadline = start;
1832         } else {
1833                 deadline.tv_sec  = start.tv_sec  +  milliseconds/1000;
1834                 deadline.tv_usec = start.tv_usec + (milliseconds % 1000)*1000;
1835
1836                 if (deadline.tv_usec >= 1000000) {
1837                         start.tv_usec -= 1000000;
1838                         start.tv_sec++;
1839                 }
1840         }
1841
1842         for (;;) {
1843                 gettimeofday(&then, NULL);
1844                 
1845                 rc = PtlEQPoll(&plni->plni_eqh, 1, timeout, &event, &which);
1846
1847                 gettimeofday(&now, NULL);
1848
1849                 if ((now.tv_sec*1000 + now.tv_usec/1000) - 
1850                     (then.tv_sec*1000 + then.tv_usec/1000) > timeout + 1000) {
1851                         /* 1000 mS grace...........................^ */
1852                         CERROR("SLOW PtlEQPoll(%d): %dmS elapsed\n", timeout,
1853                                (int)(now.tv_sec*1000 + now.tv_usec/1000) - 
1854                                (int)(then.tv_sec*1000 + then.tv_usec/1000));
1855                 }
1856
1857                 if (rc == PTL_EQ_EMPTY) {
1858                         if (found)              /* handled some events */
1859                                 break;
1860
1861                         if (now.tv_sec >= plni->plni_watchdog_nextt) { /* check timeouts? */
1862                                 ptllnd_watchdog(ni, now.tv_sec);
1863                                 LASSERT (now.tv_sec < plni->plni_watchdog_nextt);
1864                         }
1865                         
1866                         if (now.tv_sec > deadline.tv_sec || /* timeout expired */
1867                             (now.tv_sec == deadline.tv_sec &&
1868                              now.tv_usec >= deadline.tv_usec))
1869                                 break;
1870
1871                         if (milliseconds < 0 ||
1872                             plni->plni_watchdog_nextt <= deadline.tv_sec)  {
1873                                 timeout = (plni->plni_watchdog_nextt - now.tv_sec)*1000;
1874                         } else {
1875                                 timeout = (deadline.tv_sec - now.tv_sec)*1000 +
1876                                           (deadline.tv_usec - now.tv_usec)/1000;
1877                         }
1878
1879                         continue;
1880                 }
1881                 
1882                 LASSERT (rc == PTL_OK || rc == PTL_EQ_DROPPED);
1883
1884                 if (rc == PTL_EQ_DROPPED)
1885                         CERROR("Event queue: size %d is too small\n",
1886                                plni->plni_eq_size);
1887
1888                 timeout = 0;
1889                 found = 1;
1890
1891                 switch (ptllnd_eventarg2type(event.md.user_ptr)) {
1892                 default:
1893                         LBUG();
1894
1895                 case PTLLND_EVENTARG_TYPE_TX:
1896                         ptllnd_tx_event(ni, &event);
1897                         break;
1898
1899                 case PTLLND_EVENTARG_TYPE_BUF:
1900                         ptllnd_buf_event(ni, &event);
1901                         break;
1902                 }
1903         }
1904
1905         while (!list_empty(&plni->plni_zombie_txs)) {
1906                 tx = list_entry(plni->plni_zombie_txs.next,
1907                                 ptllnd_tx_t, tx_list);
1908                 list_del_init(&tx->tx_list);
1909                 ptllnd_tx_done(tx);
1910         }
1911
1912         if (prevt.tv_sec == 0 ||
1913             prevt.tv_sec != now.tv_sec) {
1914                 PTLLND_HISTORY("%d wait entered at %d.%06d - prev %d %d.%06d", 
1915                                call_count, (int)start.tv_sec, (int)start.tv_usec,
1916                                prevt_count, (int)prevt.tv_sec, (int)prevt.tv_usec);
1917                 prevt = now;
1918         }
1919 }