Whamcloud - gitweb
i=liang,b=13065:
[fs/lustre-release.git] / lnet / ulnds / ptllnd / ptllnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/ulnds/ptllnd/ptllnd_cb.c
37  *
38  * Author: Eric Barton <eeb@bartonsoftware.com>
39  */
40
41 #include "ptllnd.h"
42
43 void
44 ptllnd_set_tx_deadline(ptllnd_tx_t *tx)
45 {
46         ptllnd_peer_t  *peer = tx->tx_peer;
47         lnet_ni_t      *ni = peer->plp_ni;
48         ptllnd_ni_t    *plni = ni->ni_data;
49
50         tx->tx_deadline = cfs_time_current_sec() + plni->plni_timeout;
51 }
52
53 void
54 ptllnd_post_tx(ptllnd_tx_t *tx)
55 {
56         ptllnd_peer_t  *peer = tx->tx_peer;
57
58         LASSERT (tx->tx_type != PTLLND_MSG_TYPE_NOOP);
59
60         ptllnd_set_tx_deadline(tx);
61         list_add_tail(&tx->tx_list, &peer->plp_txq);
62         ptllnd_check_sends(peer);
63 }
64
65 char *
66 ptllnd_ptlid2str(ptl_process_id_t id)
67 {
68         static char strs[8][32];
69         static int  idx = 0;
70
71         char   *str = strs[idx++];
72
73         if (idx >= sizeof(strs)/sizeof(strs[0]))
74                 idx = 0;
75
76         snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
77         return str;
78 }
79
80 void
81 ptllnd_destroy_peer(ptllnd_peer_t *peer)
82 {
83         lnet_ni_t         *ni = peer->plp_ni;
84         ptllnd_ni_t       *plni = ni->ni_data;
85         int                nmsg = peer->plp_lazy_credits +
86                                   plni->plni_peer_credits;
87
88         ptllnd_size_buffers(ni, -nmsg);
89
90         LASSERT (peer->plp_closing);
91         LASSERT (plni->plni_npeers > 0);
92         LASSERT (list_empty(&peer->plp_txq));
93         LASSERT (list_empty(&peer->plp_noopq));
94         LASSERT (list_empty(&peer->plp_activeq));
95         plni->plni_npeers--;
96         LIBCFS_FREE(peer, sizeof(*peer));
97 }
98
99 void
100 ptllnd_abort_txs(ptllnd_ni_t *plni, struct list_head *q)
101 {
102         while (!list_empty(q)) {
103                 ptllnd_tx_t *tx = list_entry(q->next, ptllnd_tx_t, tx_list);
104
105                 tx->tx_status = -ESHUTDOWN;
106                 list_del(&tx->tx_list);
107                 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
108         }
109 }
110
111 void
112 ptllnd_close_peer(ptllnd_peer_t *peer, int error)
113 {
114         lnet_ni_t   *ni = peer->plp_ni;
115         ptllnd_ni_t *plni = ni->ni_data;
116
117         if (peer->plp_closing)
118                 return;
119
120         peer->plp_closing = 1;
121
122         if (!list_empty(&peer->plp_txq) ||
123             !list_empty(&peer->plp_noopq) ||
124             !list_empty(&peer->plp_activeq) ||
125             error != 0) {
126                 CWARN("Closing %s: %d\n", libcfs_id2str(peer->plp_id), error);
127                 if (plni->plni_debug)
128                         ptllnd_dump_debug(ni, peer->plp_id);
129         }
130
131         ptllnd_abort_txs(plni, &peer->plp_txq);
132         ptllnd_abort_txs(plni, &peer->plp_noopq);
133         ptllnd_abort_txs(plni, &peer->plp_activeq);
134
135         list_del(&peer->plp_list);
136         ptllnd_peer_decref(peer);
137 }
138
139 ptllnd_peer_t *
140 ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
141 {
142         ptllnd_ni_t       *plni = ni->ni_data;
143         unsigned int       hash = LNET_NIDADDR(id.nid) % plni->plni_peer_hash_size;
144         ptllnd_peer_t     *plp;
145         ptllnd_tx_t       *tx;
146         int                rc;
147
148         LASSERT (LNET_NIDNET(id.nid) == LNET_NIDNET(ni->ni_nid));
149
150         list_for_each_entry (plp, &plni->plni_peer_hash[hash], plp_list) {
151                 if (plp->plp_id.nid == id.nid &&
152                     plp->plp_id.pid == id.pid) {
153                         ptllnd_peer_addref(plp);
154                         return plp;
155                 }
156         }
157
158         if (!create)
159                 return NULL;
160
161         /* New peer: check first for enough posted buffers */
162         plni->plni_npeers++;
163         rc = ptllnd_size_buffers(ni, plni->plni_peer_credits);
164         if (rc != 0) {
165                 plni->plni_npeers--;
166                 return NULL;
167         }
168
169         LIBCFS_ALLOC(plp, sizeof(*plp));
170         if (plp == NULL) {
171                 CERROR("Can't allocate new peer %s\n", libcfs_id2str(id));
172                 plni->plni_npeers--;
173                 ptllnd_size_buffers(ni, -plni->plni_peer_credits);
174                 return NULL;
175         }
176
177         plp->plp_ni = ni;
178         plp->plp_id = id;
179         plp->plp_ptlid.nid = LNET_NIDADDR(id.nid);
180         plp->plp_ptlid.pid = plni->plni_ptllnd_pid;
181         plp->plp_credits = 1; /* add more later when she gives me credits */
182         plp->plp_max_msg_size = plni->plni_max_msg_size; /* until I hear from her */
183         plp->plp_sent_credits = 1;              /* Implicit credit for HELLO */
184         plp->plp_outstanding_credits = plni->plni_peer_credits - 1;
185         plp->plp_lazy_credits = 0;
186         plp->plp_extra_lazy_credits = 0;
187         plp->plp_match = 0;
188         plp->plp_stamp = 0;
189         plp->plp_sent_hello = 0;
190         plp->plp_recvd_hello = 0;
191         plp->plp_closing = 0;
192         plp->plp_refcount = 1;
193         CFS_INIT_LIST_HEAD(&plp->plp_list);
194         CFS_INIT_LIST_HEAD(&plp->plp_txq);
195         CFS_INIT_LIST_HEAD(&plp->plp_noopq);
196         CFS_INIT_LIST_HEAD(&plp->plp_activeq);
197
198         ptllnd_peer_addref(plp);
199         list_add_tail(&plp->plp_list, &plni->plni_peer_hash[hash]);
200
201         tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_HELLO, 0);
202         if (tx == NULL) {
203                 CERROR("Can't send HELLO to %s\n", libcfs_id2str(id));
204                 ptllnd_close_peer(plp, -ENOMEM);
205                 ptllnd_peer_decref(plp);
206                 return NULL;
207         }
208
209         tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS;
210         tx->tx_msg.ptlm_u.hello.kptlhm_max_msg_size = plni->plni_max_msg_size;
211
212         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post hello %p", libcfs_id2str(id),
213                        tx->tx_peer->plp_credits,
214                        tx->tx_peer->plp_outstanding_credits,
215                        tx->tx_peer->plp_sent_credits,
216                        plni->plni_peer_credits + 
217                        tx->tx_peer->plp_lazy_credits, tx);
218         ptllnd_post_tx(tx);
219
220         return plp;
221 }
222
223 int
224 ptllnd_count_q(struct list_head *q)
225 {
226         struct list_head *e;
227         int               n = 0;
228
229         list_for_each(e, q) {
230                 n++;
231         }
232
233         return n;
234 }
235
236 const char *
237 ptllnd_tx_typestr(int type)
238 {
239         switch (type) {
240         case PTLLND_RDMA_WRITE:
241                 return "rdma_write";
242
243         case PTLLND_RDMA_READ:
244                 return "rdma_read";
245
246         case PTLLND_MSG_TYPE_PUT:
247                 return "put_req";
248
249         case PTLLND_MSG_TYPE_GET:
250                 return "get_req";
251
252         case PTLLND_MSG_TYPE_IMMEDIATE:
253                 return "immediate";
254
255         case PTLLND_MSG_TYPE_NOOP:
256                 return "noop";
257
258         case PTLLND_MSG_TYPE_HELLO:
259                 return "hello";
260
261         default:
262                 return "<unknown>";
263         }
264 }
265
266 void
267 ptllnd_debug_tx(ptllnd_tx_t *tx)
268 {
269         CDEBUG(D_WARNING, "%s %s b %ld.%06ld/%ld.%06ld"
270                " r %ld.%06ld/%ld.%06ld status %d\n",
271                ptllnd_tx_typestr(tx->tx_type),
272                libcfs_id2str(tx->tx_peer->plp_id),
273                tx->tx_bulk_posted.tv_sec, tx->tx_bulk_posted.tv_usec,
274                tx->tx_bulk_done.tv_sec, tx->tx_bulk_done.tv_usec,
275                tx->tx_req_posted.tv_sec, tx->tx_req_posted.tv_usec,
276                tx->tx_req_done.tv_sec, tx->tx_req_done.tv_usec,
277                tx->tx_status);
278 }
279
280 void
281 ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
282 {
283         ptllnd_peer_t    *plp = ptllnd_find_peer(ni, id, 0);
284         ptllnd_ni_t      *plni = ni->ni_data;
285         ptllnd_tx_t      *tx;
286
287         if (plp == NULL) {
288                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_id2str(id));
289                 return;
290         }
291
292         CWARN("%s %s%s [%d] "LPU64".%06d m "LPU64" q %d/%d/%d c %d/%d+%d(%d)\n",
293               libcfs_id2str(id),
294               plp->plp_recvd_hello ? "H" : "_",
295               plp->plp_closing     ? "C" : "_",
296               plp->plp_refcount,
297               plp->plp_stamp / 1000000, (int)(plp->plp_stamp % 1000000),
298               plp->plp_match,
299               ptllnd_count_q(&plp->plp_txq),
300               ptllnd_count_q(&plp->plp_noopq),
301               ptllnd_count_q(&plp->plp_activeq),
302               plp->plp_credits, plp->plp_outstanding_credits, plp->plp_sent_credits,
303               plni->plni_peer_credits + plp->plp_lazy_credits);
304
305         CDEBUG(D_WARNING, "txq:\n");
306         list_for_each_entry (tx, &plp->plp_txq, tx_list) {
307                 ptllnd_debug_tx(tx);
308         }
309
310         CDEBUG(D_WARNING, "noopq:\n");
311         list_for_each_entry (tx, &plp->plp_noopq, tx_list) {
312                 ptllnd_debug_tx(tx);
313         }
314
315         CDEBUG(D_WARNING, "activeq:\n");
316         list_for_each_entry (tx, &plp->plp_activeq, tx_list) {
317                 ptllnd_debug_tx(tx);
318         }
319
320         CDEBUG(D_WARNING, "zombies:\n");
321         list_for_each_entry (tx, &plni->plni_zombie_txs, tx_list) {
322                 if (tx->tx_peer->plp_id.nid == id.nid &&
323                     tx->tx_peer->plp_id.pid == id.pid)
324                         ptllnd_debug_tx(tx);
325         }
326
327         CDEBUG(D_WARNING, "history:\n");
328         list_for_each_entry (tx, &plni->plni_tx_history, tx_list) {
329                 if (tx->tx_peer->plp_id.nid == id.nid &&
330                     tx->tx_peer->plp_id.pid == id.pid)
331                         ptllnd_debug_tx(tx);
332         }
333
334         ptllnd_peer_decref(plp);
335 }
336
337 void
338 ptllnd_dump_debug(lnet_ni_t *ni, lnet_process_id_t id)
339 {
340         ptllnd_debug_peer(ni, id);
341         ptllnd_dump_history();
342 }
343
344 int
345 ptllnd_setasync(lnet_ni_t *ni, lnet_process_id_t id, int nasync)
346 {
347         ptllnd_peer_t *peer = ptllnd_find_peer(ni, id, nasync > 0);
348         int            rc;
349
350         if (peer == NULL)
351                 return -ENOMEM;
352
353         LASSERT (peer->plp_lazy_credits >= 0);
354         LASSERT (peer->plp_extra_lazy_credits >= 0);
355
356         /* If nasync < 0, we're being told we can reduce the total message
357          * headroom.  We can't do this right now because our peer might already
358          * have credits for the extra buffers, so we just account the extra
359          * headroom in case we need it later and only destroy buffers when the
360          * peer closes.
361          *
362          * Note that the following condition handles this case, where it
363          * actually increases the extra lazy credit counter. */
364
365         if (nasync <= peer->plp_extra_lazy_credits) {
366                 peer->plp_extra_lazy_credits -= nasync;
367                 return 0;
368         }
369
370         LASSERT (nasync > 0);
371
372         nasync -= peer->plp_extra_lazy_credits;
373         peer->plp_extra_lazy_credits = 0;
374
375         rc = ptllnd_size_buffers(ni, nasync);
376         if (rc == 0) {
377                 peer->plp_lazy_credits += nasync;
378                 peer->plp_outstanding_credits += nasync;
379         }
380
381         return rc;
382 }
383
384 __u32
385 ptllnd_cksum (void *ptr, int nob)
386 {
387         char  *c  = ptr;
388         __u32  sum = 0;
389
390         while (nob-- > 0)
391                 sum = ((sum << 1) | (sum >> 31)) + *c++;
392
393         /* ensure I don't return 0 (== no checksum) */
394         return (sum == 0) ? 1 : sum;
395 }
396
397 ptllnd_tx_t *
398 ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
399 {
400         lnet_ni_t   *ni = peer->plp_ni;
401         ptllnd_ni_t *plni = ni->ni_data;
402         ptllnd_tx_t *tx;
403         int          msgsize;
404
405         CDEBUG(D_NET, "peer=%p type=%d payload=%d\n", peer, type, payload_nob);
406
407         switch (type) {
408         default:
409                 LBUG();
410
411         case PTLLND_RDMA_WRITE:
412         case PTLLND_RDMA_READ:
413                 LASSERT (payload_nob == 0);
414                 msgsize = 0;
415                 break;
416
417         case PTLLND_MSG_TYPE_PUT:
418         case PTLLND_MSG_TYPE_GET:
419                 LASSERT (payload_nob == 0);
420                 msgsize = offsetof(kptl_msg_t, ptlm_u) + 
421                           sizeof(kptl_rdma_msg_t);
422                 break;
423
424         case PTLLND_MSG_TYPE_IMMEDIATE:
425                 msgsize = offsetof(kptl_msg_t,
426                                    ptlm_u.immediate.kptlim_payload[payload_nob]);
427                 break;
428
429         case PTLLND_MSG_TYPE_NOOP:
430                 LASSERT (payload_nob == 0);
431                 msgsize = offsetof(kptl_msg_t, ptlm_u);
432                 break;
433
434         case PTLLND_MSG_TYPE_HELLO:
435                 LASSERT (payload_nob == 0);
436                 msgsize = offsetof(kptl_msg_t, ptlm_u) +
437                           sizeof(kptl_hello_msg_t);
438                 break;
439         }
440
441         msgsize = (msgsize + 7) & ~7;
442         LASSERT (msgsize <= peer->plp_max_msg_size);
443
444         LIBCFS_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize);
445
446         if (tx == NULL) {
447                 CERROR("Can't allocate msg type %d for %s\n",
448                        type, libcfs_id2str(peer->plp_id));
449                 return NULL;
450         }
451
452         CFS_INIT_LIST_HEAD(&tx->tx_list);
453         tx->tx_peer = peer;
454         tx->tx_type = type;
455         tx->tx_lnetmsg = tx->tx_lnetreplymsg = NULL;
456         tx->tx_niov = 0;
457         tx->tx_iov = NULL;
458         tx->tx_reqmdh = PTL_INVALID_HANDLE;
459         tx->tx_bulkmdh = PTL_INVALID_HANDLE;
460         tx->tx_msgsize = msgsize;
461         tx->tx_completing = 0;
462         tx->tx_status = 0;
463
464         memset(&tx->tx_bulk_posted, 0, sizeof(tx->tx_bulk_posted));
465         memset(&tx->tx_bulk_done, 0, sizeof(tx->tx_bulk_done));
466         memset(&tx->tx_req_posted, 0, sizeof(tx->tx_req_posted));
467         memset(&tx->tx_req_done, 0, sizeof(tx->tx_req_done));
468
469         if (msgsize != 0) {
470                 tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC;
471                 tx->tx_msg.ptlm_version = PTLLND_MSG_VERSION;
472                 tx->tx_msg.ptlm_type = type;
473                 tx->tx_msg.ptlm_credits = 0;
474                 tx->tx_msg.ptlm_nob = msgsize;
475                 tx->tx_msg.ptlm_cksum = 0;
476                 tx->tx_msg.ptlm_srcnid = ni->ni_nid;
477                 tx->tx_msg.ptlm_srcstamp = plni->plni_stamp;
478                 tx->tx_msg.ptlm_dstnid = peer->plp_id.nid;
479                 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
480                 tx->tx_msg.ptlm_srcpid = the_lnet.ln_pid;
481                 tx->tx_msg.ptlm_dstpid = peer->plp_id.pid;
482         }
483
484         ptllnd_peer_addref(peer);
485         plni->plni_ntxs++;
486
487         CDEBUG(D_NET, "tx=%p\n", tx);
488
489         return tx;
490 }
491
492 void
493 ptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh)
494 {
495         ptllnd_peer_t   *peer = tx->tx_peer;
496         lnet_ni_t       *ni = peer->plp_ni;
497         int              rc;
498         time_t           start = cfs_time_current_sec();
499         ptllnd_ni_t     *plni = ni->ni_data;
500         int              w = plni->plni_long_wait;
501
502         while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) {
503                 rc = PtlMDUnlink(*mdh);
504 #ifndef LUSTRE_PORTALS_UNLINK_SEMANTICS
505                 if (rc == PTL_OK) /* unlink successful => no unlinked event */
506                         return;
507                 LASSERT (rc == PTL_MD_IN_USE);
508 #endif
509                 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
510                         CWARN("Waited %ds to abort tx to %s\n",
511                               (int)(cfs_time_current_sec() - start),
512                               libcfs_id2str(peer->plp_id));
513                         w *= 2;
514                 }
515                 /* Wait for ptllnd_tx_event() to invalidate */
516                 ptllnd_wait(ni, w);
517         }
518 }
519
520 void
521 ptllnd_cull_tx_history(ptllnd_ni_t *plni)
522 {
523         int max = plni->plni_max_tx_history;
524
525         while (plni->plni_ntx_history > max) {
526                 ptllnd_tx_t *tx = list_entry(plni->plni_tx_history.next, 
527                                              ptllnd_tx_t, tx_list);
528                 list_del(&tx->tx_list);
529
530                 ptllnd_peer_decref(tx->tx_peer);
531
532                 LIBCFS_FREE(tx, offsetof(ptllnd_tx_t, tx_msg) + tx->tx_msgsize);
533
534                 LASSERT (plni->plni_ntxs > 0);
535                 plni->plni_ntxs--;
536                 plni->plni_ntx_history--;
537         }
538 }
539
540 void
541 ptllnd_tx_done(ptllnd_tx_t *tx)
542 {
543         ptllnd_peer_t   *peer = tx->tx_peer;
544         lnet_ni_t       *ni = peer->plp_ni;
545         ptllnd_ni_t     *plni = ni->ni_data;
546
547         /* CAVEAT EMPTOR: If this tx is being aborted, I'll continue to get
548          * events for this tx until it's unlinked.  So I set tx_completing to
549          * flag the tx is getting handled */
550
551         if (tx->tx_completing)
552                 return;
553
554         tx->tx_completing = 1;
555
556         if (!list_empty(&tx->tx_list))
557                 list_del_init(&tx->tx_list);
558
559         if (tx->tx_status != 0) {
560                 if (plni->plni_debug) {
561                         CERROR("Completing tx for %s with error %d\n",
562                                libcfs_id2str(peer->plp_id), tx->tx_status);
563                         ptllnd_debug_tx(tx);
564                 }
565                 ptllnd_close_peer(peer, tx->tx_status);
566         }
567
568         ptllnd_abort_tx(tx, &tx->tx_reqmdh);
569         ptllnd_abort_tx(tx, &tx->tx_bulkmdh);
570
571         if (tx->tx_niov > 0) {
572                 LIBCFS_FREE(tx->tx_iov, tx->tx_niov * sizeof(*tx->tx_iov));
573                 tx->tx_niov = 0;
574         }
575
576         if (tx->tx_lnetreplymsg != NULL) {
577                 LASSERT (tx->tx_type == PTLLND_MSG_TYPE_GET);
578                 LASSERT (tx->tx_lnetmsg != NULL);
579                 /* Simulate GET success always  */
580                 lnet_finalize(ni, tx->tx_lnetmsg, 0);
581                 CDEBUG(D_NET, "lnet_finalize(tx_lnetreplymsg=%p)\n",tx->tx_lnetreplymsg);
582                 lnet_finalize(ni, tx->tx_lnetreplymsg, tx->tx_status);
583         } else if (tx->tx_lnetmsg != NULL) {
584                 lnet_finalize(ni, tx->tx_lnetmsg, tx->tx_status);
585         }
586
587         plni->plni_ntx_history++;
588         list_add_tail(&tx->tx_list, &plni->plni_tx_history);
589
590         ptllnd_cull_tx_history(plni);
591 }
592
593 int
594 ptllnd_set_txiov(ptllnd_tx_t *tx,
595                  unsigned int niov, struct iovec *iov,
596                  unsigned int offset, unsigned int len)
597 {
598         ptl_md_iovec_t *piov;
599         int             npiov;
600
601         if (len == 0) {
602                 tx->tx_niov = 0;
603                 return 0;
604         }
605
606         /*
607          * Remove iovec's at the beginning that
608          * are skipped because of the offset.
609          * Adjust the offset accordingly
610          */
611         for (;;) {
612                 LASSERT (niov > 0);
613                 if (offset < iov->iov_len)
614                         break;
615                 offset -= iov->iov_len;
616                 niov--;
617                 iov++;
618         }
619
620         for (;;) {
621                 int temp_offset = offset;
622                 int resid = len;
623                 LIBCFS_ALLOC(piov, niov * sizeof(*piov));
624                 if (piov == NULL)
625                         return -ENOMEM;
626
627                 for (npiov = 0;; npiov++) {
628                         LASSERT (npiov < niov);
629                         LASSERT (iov->iov_len >= temp_offset);
630
631                         piov[npiov].iov_base = iov[npiov].iov_base + temp_offset;
632                         piov[npiov].iov_len = iov[npiov].iov_len - temp_offset;
633
634                         if (piov[npiov].iov_len >= resid) {
635                                 piov[npiov].iov_len = resid;
636                                 npiov++;
637                                 break;
638                         }
639                         resid -= piov[npiov].iov_len;
640                         temp_offset = 0;
641                 }
642
643                 if (npiov == niov) {
644                         tx->tx_niov = niov;
645                         tx->tx_iov = piov;
646                         return 0;
647                 }
648
649                 /* Dang! The piov I allocated was too big and it's a drag to
650                  * have to maintain separate 'allocated' and 'used' sizes, so
651                  * I'll just do it again; NB this doesn't happen normally... */
652                 LIBCFS_FREE(piov, niov * sizeof(*piov));
653                 niov = npiov;
654         }
655 }
656
657 void
658 ptllnd_set_md_buffer(ptl_md_t *md, ptllnd_tx_t *tx)
659 {
660         unsigned int    niov = tx->tx_niov;
661         ptl_md_iovec_t *iov = tx->tx_iov;
662
663         LASSERT ((md->options & PTL_MD_IOVEC) == 0);
664
665         if (niov == 0) {
666                 md->start = NULL;
667                 md->length = 0;
668         } else if (niov == 1) {
669                 md->start = iov[0].iov_base;
670                 md->length = iov[0].iov_len;
671         } else {
672                 md->start = iov;
673                 md->length = niov;
674                 md->options |= PTL_MD_IOVEC;
675         }
676 }
677
678 int
679 ptllnd_post_buffer(ptllnd_buffer_t *buf)
680 {
681         lnet_ni_t        *ni = buf->plb_ni;
682         ptllnd_ni_t      *plni = ni->ni_data;
683         ptl_process_id_t  anyid = {
684                 .nid       = PTL_NID_ANY,
685                 .pid       = PTL_PID_ANY};
686         ptl_md_t          md = {
687                 .start     = buf->plb_buffer,
688                 .length    = plni->plni_buffer_size,
689                 .threshold = PTL_MD_THRESH_INF,
690                 .max_size  = plni->plni_max_msg_size,
691                 .options   = (PTLLND_MD_OPTIONS |
692                               PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | 
693                               PTL_MD_LOCAL_ALIGN8),
694                 .user_ptr  = ptllnd_obj2eventarg(buf, PTLLND_EVENTARG_TYPE_BUF),
695                 .eq_handle = plni->plni_eqh};
696         ptl_handle_me_t meh;
697         int             rc;
698
699         LASSERT (!buf->plb_posted);
700
701         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal,
702                          anyid, LNET_MSG_MATCHBITS, 0,
703                          PTL_UNLINK, PTL_INS_AFTER, &meh);
704         if (rc != PTL_OK) {
705                 CERROR("PtlMEAttach failed: %s(%d)\n",
706                        ptllnd_errtype2str(rc), rc);
707                 return -ENOMEM;
708         }
709
710         buf->plb_posted = 1;
711         plni->plni_nposted_buffers++;
712
713         rc = PtlMDAttach(meh, md, LNET_UNLINK, &buf->plb_md);
714         if (rc == PTL_OK)
715                 return 0;
716
717         CERROR("PtlMDAttach failed: %s(%d)\n",
718                ptllnd_errtype2str(rc), rc);
719
720         buf->plb_posted = 0;
721         plni->plni_nposted_buffers--;
722
723         rc = PtlMEUnlink(meh);
724         LASSERT (rc == PTL_OK);
725
726         return -ENOMEM;
727 }
728
729 static inline int
730 ptllnd_peer_send_noop (ptllnd_peer_t *peer)
731 {
732         ptllnd_ni_t *plni = peer->plp_ni->ni_data;
733
734         if (!peer->plp_sent_hello ||
735             peer->plp_credits == 0 ||
736             !list_empty(&peer->plp_noopq) ||
737             peer->plp_outstanding_credits < PTLLND_CREDIT_HIGHWATER(plni))
738                 return 0;
739
740         /* No tx to piggyback NOOP onto or no credit to send a tx */
741         return (list_empty(&peer->plp_txq) || peer->plp_credits == 1);
742 }
743
744 void
745 ptllnd_check_sends(ptllnd_peer_t *peer)
746 {
747         ptllnd_ni_t    *plni = peer->plp_ni->ni_data;
748         ptllnd_tx_t    *tx;
749         ptl_md_t        md;
750         ptl_handle_md_t mdh;
751         int             rc;
752
753         CDEBUG(D_NET, "%s: [%d/%d+%d(%d)\n",
754                libcfs_id2str(peer->plp_id), peer->plp_credits,
755                peer->plp_outstanding_credits, peer->plp_sent_credits,
756                plni->plni_peer_credits + peer->plp_lazy_credits);
757
758         if (ptllnd_peer_send_noop(peer)) {
759                 tx = ptllnd_new_tx(peer, PTLLND_MSG_TYPE_NOOP, 0);
760                 CDEBUG(D_NET, "NOOP tx=%p\n",tx);
761                 if (tx == NULL) {
762                         CERROR("Can't return credits to %s\n",
763                                libcfs_id2str(peer->plp_id));
764                 } else {
765                         ptllnd_set_tx_deadline(tx);
766                         list_add_tail(&tx->tx_list, &peer->plp_noopq);
767                 }
768         }
769
770         for (;;) {
771                 if (!list_empty(&peer->plp_noopq)) {
772                         LASSERT (peer->plp_sent_hello);
773                         tx = list_entry(peer->plp_noopq.next,
774                                         ptllnd_tx_t, tx_list);
775                 } else if (!list_empty(&peer->plp_txq)) {
776                         tx = list_entry(peer->plp_txq.next,
777                                         ptllnd_tx_t, tx_list);
778                 } else {
779                         /* nothing to send right now */
780                         break;
781                 }
782
783                 LASSERT (tx->tx_msgsize > 0);
784
785                 LASSERT (peer->plp_outstanding_credits >= 0);
786                 LASSERT (peer->plp_sent_credits >= 0);
787                 LASSERT (peer->plp_outstanding_credits + peer->plp_sent_credits
788                          <= plni->plni_peer_credits + peer->plp_lazy_credits);
789                 LASSERT (peer->plp_credits >= 0);
790
791                 /* say HELLO first */
792                 if (!peer->plp_sent_hello) {
793                         LASSERT (list_empty(&peer->plp_noopq));
794                         LASSERT (tx->tx_type == PTLLND_MSG_TYPE_HELLO);
795
796                         peer->plp_sent_hello = 1;
797                 }
798
799                 if (peer->plp_credits == 0) {   /* no credits */
800                         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: no creds for %p",
801                                        libcfs_id2str(peer->plp_id),
802                                        peer->plp_credits,
803                                        peer->plp_outstanding_credits,
804                                        peer->plp_sent_credits,
805                                        plni->plni_peer_credits +
806                                        peer->plp_lazy_credits, tx);
807                         break;
808                 }
809
810                 /* Last/Initial credit reserved for NOOP/HELLO */
811                 if (peer->plp_credits == 1 &&
812                     tx->tx_type != PTLLND_MSG_TYPE_NOOP &&
813                     tx->tx_type != PTLLND_MSG_TYPE_HELLO) {
814                         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: too few creds for %p",
815                                        libcfs_id2str(peer->plp_id),
816                                        peer->plp_credits,
817                                        peer->plp_outstanding_credits,
818                                        peer->plp_sent_credits,
819                                        plni->plni_peer_credits +
820                                        peer->plp_lazy_credits, tx);
821                         break;
822                 }
823
824                 list_del(&tx->tx_list);
825                 list_add_tail(&tx->tx_list, &peer->plp_activeq);
826
827                 CDEBUG(D_NET, "Sending at TX=%p type=%s (%d)\n",tx,
828                        ptllnd_msgtype2str(tx->tx_type),tx->tx_type);
829
830                 if (tx->tx_type == PTLLND_MSG_TYPE_NOOP &&
831                     !ptllnd_peer_send_noop(peer)) {
832                         /* redundant NOOP */
833                         ptllnd_tx_done(tx);
834                         continue;
835                 }
836
837                 /* Set stamp at the last minute; on a new peer, I don't know it
838                  * until I receive the HELLO back */
839                 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
840
841                 /*
842                  * Return all the credits we have
843                  */
844                 tx->tx_msg.ptlm_credits = MIN(PTLLND_MSG_MAX_CREDITS,
845                                               peer->plp_outstanding_credits);
846                 peer->plp_sent_credits += tx->tx_msg.ptlm_credits;
847                 peer->plp_outstanding_credits -= tx->tx_msg.ptlm_credits;
848
849                 /*
850                  * One less credit
851                  */
852                 peer->plp_credits--;
853
854                 if (plni->plni_checksum)
855                         tx->tx_msg.ptlm_cksum = 
856                                 ptllnd_cksum(&tx->tx_msg,
857                                              offsetof(kptl_msg_t, ptlm_u));
858
859                 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
860                 md.eq_handle = plni->plni_eqh;
861                 md.threshold = 1;
862                 md.options = PTLLND_MD_OPTIONS;
863                 md.start = &tx->tx_msg;
864                 md.length = tx->tx_msgsize;
865
866                 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
867                 if (rc != PTL_OK) {
868                         CERROR("PtlMDBind for %s failed: %s(%d)\n",
869                                libcfs_id2str(peer->plp_id),
870                                ptllnd_errtype2str(rc), rc);
871                         tx->tx_status = -EIO;
872                         ptllnd_tx_done(tx);
873                         break;
874                 }
875
876                 LASSERT (tx->tx_type != PTLLND_RDMA_WRITE &&
877                          tx->tx_type != PTLLND_RDMA_READ);
878
879                 tx->tx_reqmdh = mdh;
880                 gettimeofday(&tx->tx_req_posted, NULL);
881
882                 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: %s %p c %d",
883                                libcfs_id2str(peer->plp_id),
884                                peer->plp_credits,
885                                peer->plp_outstanding_credits,
886                                peer->plp_sent_credits,
887                                plni->plni_peer_credits +
888                                peer->plp_lazy_credits,
889                                ptllnd_msgtype2str(tx->tx_type), tx,
890                                tx->tx_msg.ptlm_credits);
891
892                 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
893                             plni->plni_portal, 0, LNET_MSG_MATCHBITS, 0, 0);
894                 if (rc != PTL_OK) {
895                         CERROR("PtlPut for %s failed: %s(%d)\n",
896                                libcfs_id2str(peer->plp_id),
897                                ptllnd_errtype2str(rc), rc);
898                         tx->tx_status = -EIO;
899                         ptllnd_tx_done(tx);
900                         break;
901                 }
902         }
903 }
904
905 int
906 ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
907                     unsigned int niov, struct iovec *iov,
908                     unsigned int offset, unsigned int len)
909 {
910         lnet_ni_t      *ni = peer->plp_ni;
911         ptllnd_ni_t    *plni = ni->ni_data;
912         ptllnd_tx_t    *tx = ptllnd_new_tx(peer, type, 0);
913         __u64           matchbits;
914         ptl_md_t        md;
915         ptl_handle_md_t mdh;
916         ptl_handle_me_t meh;
917         int             rc;
918         int             rc2;
919         time_t          start;
920         int             w;
921
922         CDEBUG(D_NET, "niov=%d offset=%d len=%d\n",niov,offset,len);
923
924         LASSERT (type == PTLLND_MSG_TYPE_GET ||
925                  type == PTLLND_MSG_TYPE_PUT);
926
927         if (tx == NULL) {
928                 CERROR("Can't allocate %s tx for %s\n",
929                        ptllnd_msgtype2str(type), libcfs_id2str(peer->plp_id));
930                 return -ENOMEM;
931         }
932
933         rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
934         if (rc != 0) {
935                 CERROR("Can't allocate iov %d for %s\n",
936                        niov, libcfs_id2str(peer->plp_id));
937                 rc = -ENOMEM;
938                 goto failed;
939         }
940
941         md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
942         md.eq_handle = plni->plni_eqh;
943         md.threshold = 1;
944         md.max_size = 0;
945         md.options = PTLLND_MD_OPTIONS;
946         if(type == PTLLND_MSG_TYPE_GET)
947                 md.options |= PTL_MD_OP_PUT | PTL_MD_ACK_DISABLE;
948         else
949                 md.options |= PTL_MD_OP_GET;
950         ptllnd_set_md_buffer(&md, tx);
951
952         start = cfs_time_current_sec();
953         w = plni->plni_long_wait;
954         ptllnd_set_tx_deadline(tx);
955
956         while (!peer->plp_recvd_hello) {    /* wait to validate plp_match */
957                 if (peer->plp_closing) {
958                         rc = -EIO;
959                         goto failed;
960                 }
961
962                 /* NB must check here to avoid unbounded wait - tx not yet
963                  * on peer->plp_txq, so ptllnd_watchdog can't expire it */
964                 if (tx->tx_deadline < cfs_time_current_sec()) {
965                         CERROR("%s tx for %s timed out\n",
966                                ptllnd_msgtype2str(type),
967                                libcfs_id2str(peer->plp_id));
968                         rc = -ETIMEDOUT;
969                         goto failed;
970                 }
971
972                 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
973                         CWARN("Waited %ds to connect to %s\n",
974                               (int)(cfs_time_current_sec() - start),
975                               libcfs_id2str(peer->plp_id));
976                         w *= 2;
977                 }
978                 ptllnd_wait(ni, w);
979         }
980
981         if (peer->plp_match < PTL_RESERVED_MATCHBITS)
982                 peer->plp_match = PTL_RESERVED_MATCHBITS;
983         matchbits = peer->plp_match++;
984
985         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid,
986                          matchbits, 0, PTL_UNLINK, PTL_INS_BEFORE, &meh);
987         if (rc != PTL_OK) {
988                 CERROR("PtlMEAttach for %s failed: %s(%d)\n",
989                        libcfs_id2str(peer->plp_id),
990                        ptllnd_errtype2str(rc), rc);
991                 rc = -EIO;
992                 goto failed;
993         }
994
995         gettimeofday(&tx->tx_bulk_posted, NULL);
996
997         rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);
998         if (rc != PTL_OK) {
999                 CERROR("PtlMDAttach for %s failed: %s(%d)\n",
1000                        libcfs_id2str(peer->plp_id),
1001                        ptllnd_errtype2str(rc), rc);
1002                 rc2 = PtlMEUnlink(meh);
1003                 LASSERT (rc2 == PTL_OK);
1004                 rc = -EIO;
1005                 goto failed;
1006         }
1007         tx->tx_bulkmdh = mdh;
1008
1009         /*
1010          * We need to set the stamp here because it
1011          * we could have received a HELLO above that set
1012          * peer->plp_stamp
1013          */
1014         tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
1015
1016         tx->tx_msg.ptlm_u.rdma.kptlrm_hdr = msg->msg_hdr;
1017         tx->tx_msg.ptlm_u.rdma.kptlrm_matchbits = matchbits;
1018
1019         if (type == PTLLND_MSG_TYPE_GET) {
1020                 tx->tx_lnetreplymsg = lnet_create_reply_msg(ni, msg);
1021                 if (tx->tx_lnetreplymsg == NULL) {
1022                         CERROR("Can't create reply for GET to %s\n",
1023                                libcfs_id2str(msg->msg_target));
1024                         rc = -ENOMEM;
1025                         goto failed;
1026                 }
1027         }
1028
1029         tx->tx_lnetmsg = msg;
1030         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post passive %s p %d %p",
1031                        libcfs_id2str(msg->msg_target),
1032                        peer->plp_credits, peer->plp_outstanding_credits,
1033                        peer->plp_sent_credits,
1034                        plni->plni_peer_credits + peer->plp_lazy_credits,
1035                        lnet_msgtyp2str(msg->msg_type),
1036                        (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ? 
1037                        le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1038                        (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ? 
1039                        le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1040                        tx);
1041         ptllnd_post_tx(tx);
1042         return 0;
1043
1044  failed:
1045         tx->tx_status = rc;
1046         ptllnd_tx_done(tx);
1047         return rc;
1048 }
1049
1050 int
1051 ptllnd_active_rdma(ptllnd_peer_t *peer, int type,
1052                    lnet_msg_t *msg, __u64 matchbits,
1053                    unsigned int niov, struct iovec *iov,
1054                    unsigned int offset, unsigned int len)
1055 {
1056         lnet_ni_t       *ni = peer->plp_ni;
1057         ptllnd_ni_t     *plni = ni->ni_data;
1058         ptllnd_tx_t     *tx = ptllnd_new_tx(peer, type, 0);
1059         ptl_md_t         md;
1060         ptl_handle_md_t  mdh;
1061         int              rc;
1062
1063         LASSERT (type == PTLLND_RDMA_READ ||
1064                  type == PTLLND_RDMA_WRITE);
1065
1066         if (tx == NULL) {
1067                 CERROR("Can't allocate tx for RDMA %s with %s\n",
1068                        (type == PTLLND_RDMA_WRITE) ? "write" : "read",
1069                        libcfs_id2str(peer->plp_id));
1070                 ptllnd_close_peer(peer, -ENOMEM);
1071                 return -ENOMEM;
1072         }
1073
1074         rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
1075         if (rc != 0) {
1076                 CERROR("Can't allocate iov %d for %s\n",
1077                        niov, libcfs_id2str(peer->plp_id));
1078                 rc = -ENOMEM;
1079                 goto failed;
1080         }
1081
1082         md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
1083         md.eq_handle = plni->plni_eqh;
1084         md.max_size = 0;
1085         md.options = PTLLND_MD_OPTIONS;
1086         md.threshold = (type == PTLLND_RDMA_READ) ? 2 : 1;
1087
1088         ptllnd_set_md_buffer(&md, tx);
1089
1090         rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
1091         if (rc != PTL_OK) {
1092                 CERROR("PtlMDBind for %s failed: %s(%d)\n",
1093                        libcfs_id2str(peer->plp_id),
1094                        ptllnd_errtype2str(rc), rc);
1095                 rc = -EIO;
1096                 goto failed;
1097         }
1098
1099         tx->tx_bulkmdh = mdh;
1100         tx->tx_lnetmsg = msg;
1101
1102         ptllnd_set_tx_deadline(tx);
1103         list_add_tail(&tx->tx_list, &peer->plp_activeq);
1104         gettimeofday(&tx->tx_bulk_posted, NULL);
1105
1106         if (type == PTLLND_RDMA_READ)
1107                 rc = PtlGet(mdh, peer->plp_ptlid,
1108                             plni->plni_portal, 0, matchbits, 0);
1109         else
1110                 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
1111                             plni->plni_portal, 0, matchbits, 0, 
1112                             (msg == NULL) ? PTLLND_RDMA_FAIL : PTLLND_RDMA_OK);
1113
1114         if (rc == PTL_OK)
1115                 return 0;
1116
1117         CERROR("Can't initiate RDMA with %s: %s(%d)\n",
1118                libcfs_id2str(peer->plp_id),
1119                ptllnd_errtype2str(rc), rc);
1120
1121         tx->tx_lnetmsg = NULL;
1122  failed:
1123         tx->tx_status = rc;
1124         ptllnd_tx_done(tx);    /* this will close peer */
1125         return rc;
1126 }
1127
1128 int
1129 ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
1130 {
1131         ptllnd_ni_t    *plni = ni->ni_data;
1132         ptllnd_peer_t  *plp;
1133         ptllnd_tx_t    *tx;
1134         int             nob;
1135         int             rc;
1136
1137         LASSERT (!msg->msg_routing);
1138         LASSERT (msg->msg_kiov == NULL);
1139
1140         LASSERT (msg->msg_niov <= PTL_MD_MAX_IOV); /* !!! */
1141
1142         CDEBUG(D_NET, "%s [%d]+%d,%d -> %s%s\n",
1143                lnet_msgtyp2str(msg->msg_type),
1144                msg->msg_niov, msg->msg_offset, msg->msg_len,
1145                libcfs_nid2str(msg->msg_target.nid),
1146                msg->msg_target_is_router ? "(rtr)" : "");
1147
1148         if ((msg->msg_target.pid & LNET_PID_USERFLAG) != 0) {
1149                 CERROR("Can't send to non-kernel peer %s\n",
1150                        libcfs_id2str(msg->msg_target));
1151                 return -EHOSTUNREACH;
1152         }
1153
1154         plp = ptllnd_find_peer(ni, msg->msg_target, 1);
1155         if (plp == NULL)
1156                 return -ENOMEM;
1157
1158         switch (msg->msg_type) {
1159         default:
1160                 LBUG();
1161
1162         case LNET_MSG_ACK:
1163                 LASSERT (msg->msg_len == 0);
1164                 break;                          /* send IMMEDIATE */
1165
1166         case LNET_MSG_GET:
1167                 if (msg->msg_target_is_router)
1168                         break;                  /* send IMMEDIATE */
1169
1170                 nob = msg->msg_md->md_length;
1171                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1172                 if (nob <= plni->plni_max_msg_size)
1173                         break;
1174
1175                 LASSERT ((msg->msg_md->md_options & LNET_MD_KIOV) == 0);
1176                 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_GET, msg,
1177                                          msg->msg_md->md_niov,
1178                                          msg->msg_md->md_iov.iov,
1179                                          0, msg->msg_md->md_length);
1180                 ptllnd_peer_decref(plp);
1181                 return rc;
1182
1183         case LNET_MSG_REPLY:
1184         case LNET_MSG_PUT:
1185                 nob = msg->msg_len;
1186                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1187                 if (nob <= plp->plp_max_msg_size)
1188                         break;                  /* send IMMEDIATE */
1189
1190                 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_PUT, msg,
1191                                          msg->msg_niov, msg->msg_iov,
1192                                          msg->msg_offset, msg->msg_len);
1193                 ptllnd_peer_decref(plp);
1194                 return rc;
1195         }
1196
1197         /* send IMMEDIATE
1198          * NB copy the payload so we don't have to do a fragmented send */
1199
1200         tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_IMMEDIATE, msg->msg_len);
1201         if (tx == NULL) {
1202                 CERROR("Can't allocate tx for lnet type %d to %s\n",
1203                        msg->msg_type, libcfs_id2str(msg->msg_target));
1204                 ptllnd_peer_decref(plp);
1205                 return -ENOMEM;
1206         }
1207
1208         lnet_copy_iov2flat(tx->tx_msgsize, &tx->tx_msg,
1209                            offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1210                            msg->msg_niov, msg->msg_iov, msg->msg_offset,
1211                            msg->msg_len);
1212         tx->tx_msg.ptlm_u.immediate.kptlim_hdr = msg->msg_hdr;
1213
1214         tx->tx_lnetmsg = msg;
1215         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post immediate %s p %d %p",
1216                        libcfs_id2str(msg->msg_target),
1217                        plp->plp_credits, plp->plp_outstanding_credits,
1218                        plp->plp_sent_credits,
1219                        plni->plni_peer_credits + plp->plp_lazy_credits,
1220                        lnet_msgtyp2str(msg->msg_type),
1221                        (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ? 
1222                        le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1223                        (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ? 
1224                        le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1225                        tx);
1226         ptllnd_post_tx(tx);
1227         ptllnd_peer_decref(plp);
1228         return 0;
1229 }
1230
1231 void
1232 ptllnd_rx_done(ptllnd_rx_t *rx)
1233 {
1234         ptllnd_peer_t *plp = rx->rx_peer;
1235         ptllnd_ni_t   *plni = plp->plp_ni->ni_data;
1236
1237         plp->plp_outstanding_credits++;
1238
1239         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: rx=%p done\n",
1240                        libcfs_id2str(plp->plp_id),
1241                        plp->plp_credits, plp->plp_outstanding_credits, 
1242                        plp->plp_sent_credits,
1243                        plni->plni_peer_credits + plp->plp_lazy_credits, rx);
1244
1245         ptllnd_check_sends(plp);
1246
1247         LASSERT (plni->plni_nrxs > 0);
1248         plni->plni_nrxs--;
1249 }
1250
1251 int
1252 ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1253                   void **new_privatep)
1254 {
1255         /* Shouldn't get here; recvs only block for router buffers */
1256         LBUG();
1257         return 0;
1258 }
1259
1260 int
1261 ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1262             int delayed, unsigned int niov,
1263             struct iovec *iov, lnet_kiov_t *kiov,
1264             unsigned int offset, unsigned int mlen, unsigned int rlen)
1265 {
1266         ptllnd_rx_t    *rx = private;
1267         int             rc = 0;
1268         int             nob;
1269
1270         LASSERT (kiov == NULL);
1271         LASSERT (niov <= PTL_MD_MAX_IOV);       /* !!! */
1272
1273         switch (rx->rx_msg->ptlm_type) {
1274         default:
1275                 LBUG();
1276
1277         case PTLLND_MSG_TYPE_IMMEDIATE:
1278                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[mlen]);
1279                 if (nob > rx->rx_nob) {
1280                         CERROR("Immediate message from %s too big: %d(%d)\n",
1281                                libcfs_id2str(rx->rx_peer->plp_id),
1282                                nob, rx->rx_nob);
1283                         rc = -EPROTO;
1284                         break;
1285                 }
1286                 lnet_copy_flat2iov(niov, iov, offset,
1287                                    rx->rx_nob, rx->rx_msg,
1288                                    offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1289                                    mlen);
1290                 lnet_finalize(ni, msg, 0);
1291                 break;
1292
1293         case PTLLND_MSG_TYPE_PUT:
1294                 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg,
1295                                         rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1296                                         niov, iov, offset, mlen);
1297                 break;
1298
1299         case PTLLND_MSG_TYPE_GET:
1300                 if (msg != NULL)
1301                         rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, msg,
1302                                                 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1303                                                 msg->msg_niov, msg->msg_iov,
1304                                                 msg->msg_offset, msg->msg_len);
1305                 else
1306                         rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, NULL,
1307                                                 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1308                                                 0, NULL, 0, 0);
1309                 break;
1310         }
1311
1312         ptllnd_rx_done(rx);
1313         return rc;
1314 }
1315
1316 void
1317 ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
1318                      kptl_msg_t *msg, unsigned int nob)
1319 {
1320         ptllnd_ni_t      *plni = ni->ni_data;
1321         const int         basenob = offsetof(kptl_msg_t, ptlm_u);
1322         lnet_process_id_t srcid;
1323         ptllnd_rx_t       rx;
1324         int               flip;
1325         __u16             msg_version;
1326         __u32             msg_cksum;
1327         ptllnd_peer_t    *plp;
1328         int               rc;
1329
1330         if (nob < 6) {
1331                 CERROR("Very short receive from %s\n",
1332                        ptllnd_ptlid2str(initiator));
1333                 return;
1334         }
1335
1336         /* I can at least read MAGIC/VERSION */
1337
1338         flip = msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC);
1339         if (!flip && msg->ptlm_magic != PTLLND_MSG_MAGIC) {
1340                 CERROR("Bad protocol magic %08x from %s\n", 
1341                        msg->ptlm_magic, ptllnd_ptlid2str(initiator));
1342                 return;
1343         }
1344
1345         msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
1346
1347         if (msg_version != PTLLND_MSG_VERSION) {
1348                 CERROR("Bad protocol version %04x from %s: %04x expected\n",
1349                        (__u32)msg_version, ptllnd_ptlid2str(initiator), PTLLND_MSG_VERSION);
1350
1351                 if (plni->plni_abort_on_protocol_mismatch)
1352                         abort();
1353
1354                 return;
1355         }
1356
1357         if (nob < basenob) {
1358                 CERROR("Short receive from %s: got %d, wanted at least %d\n",
1359                        ptllnd_ptlid2str(initiator), nob, basenob);
1360                 return;
1361         }
1362
1363         /* checksum must be computed with
1364          * 1) ptlm_cksum zero and
1365          * 2) BEFORE anything gets modified/flipped
1366          */
1367         msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
1368         msg->ptlm_cksum = 0;
1369         if (msg_cksum != 0 &&
1370             msg_cksum != ptllnd_cksum(msg, offsetof(kptl_msg_t, ptlm_u))) {
1371                 CERROR("Bad checksum from %s\n", ptllnd_ptlid2str(initiator));
1372                 return;
1373         }
1374
1375         msg->ptlm_version = msg_version;
1376         msg->ptlm_cksum = msg_cksum;
1377
1378         if (flip) {
1379                 /* NB stamps are opaque cookies */
1380                 __swab32s(&msg->ptlm_nob);
1381                 __swab64s(&msg->ptlm_srcnid);
1382                 __swab64s(&msg->ptlm_dstnid);
1383                 __swab32s(&msg->ptlm_srcpid);
1384                 __swab32s(&msg->ptlm_dstpid);
1385         }
1386
1387         srcid.nid = msg->ptlm_srcnid;
1388         srcid.pid = msg->ptlm_srcpid;
1389
1390         if (LNET_NIDNET(msg->ptlm_srcnid) != LNET_NIDNET(ni->ni_nid)) {
1391                 CERROR("Bad source id %s from %s\n",
1392                        libcfs_id2str(srcid),
1393                        ptllnd_ptlid2str(initiator));
1394                 return;
1395         }
1396
1397         if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
1398                 CERROR("NAK from %s (%s)\n",
1399                        libcfs_id2str(srcid),
1400                        ptllnd_ptlid2str(initiator));
1401
1402                 if (plni->plni_dump_on_nak)
1403                         ptllnd_dump_debug(ni, srcid);
1404
1405                 if (plni->plni_abort_on_nak)
1406                         abort();
1407
1408                 plp = ptllnd_find_peer(ni, srcid, 0);
1409                 if (plp == NULL) {
1410                         CERROR("Ignore NAK from %s: no peer\n", libcfs_id2str(srcid));
1411                         return;
1412                 }
1413                 ptllnd_close_peer(plp, -EPROTO);
1414                 ptllnd_peer_decref(plp);
1415                 return;
1416         }
1417
1418         if (msg->ptlm_dstnid != ni->ni_nid ||
1419             msg->ptlm_dstpid != the_lnet.ln_pid) {
1420                 CERROR("Bad dstid %s (%s expected) from %s\n",
1421                        libcfs_id2str((lnet_process_id_t) {
1422                                .nid = msg->ptlm_dstnid,
1423                                .pid = msg->ptlm_dstpid}),
1424                        libcfs_id2str((lnet_process_id_t) {
1425                                .nid = ni->ni_nid,
1426                                .pid = the_lnet.ln_pid}),
1427                        libcfs_id2str(srcid));
1428                 return;
1429         }
1430
1431         if (msg->ptlm_dststamp != plni->plni_stamp) {
1432                 CERROR("Bad dststamp "LPX64"("LPX64" expected) from %s\n",
1433                        msg->ptlm_dststamp, plni->plni_stamp,
1434                        libcfs_id2str(srcid));
1435                 return;
1436         }
1437
1438         PTLLND_HISTORY("RX %s: %s %d %p", libcfs_id2str(srcid), 
1439                        ptllnd_msgtype2str(msg->ptlm_type),
1440                        msg->ptlm_credits, &rx);
1441
1442         switch (msg->ptlm_type) {
1443         case PTLLND_MSG_TYPE_PUT:
1444         case PTLLND_MSG_TYPE_GET:
1445                 if (nob < basenob + sizeof(kptl_rdma_msg_t)) {
1446                         CERROR("Short rdma request from %s(%s)\n",
1447                                libcfs_id2str(srcid),
1448                                ptllnd_ptlid2str(initiator));
1449                         return;
1450                 }
1451                 if (flip)
1452                         __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
1453                 break;
1454
1455         case PTLLND_MSG_TYPE_IMMEDIATE:
1456                 if (nob < offsetof(kptl_msg_t,
1457                                    ptlm_u.immediate.kptlim_payload)) {
1458                         CERROR("Short immediate from %s(%s)\n",
1459                                libcfs_id2str(srcid),
1460                                ptllnd_ptlid2str(initiator));
1461                         return;
1462                 }
1463                 break;
1464
1465         case PTLLND_MSG_TYPE_HELLO:
1466                 if (nob < basenob + sizeof(kptl_hello_msg_t)) {
1467                         CERROR("Short hello from %s(%s)\n",
1468                                libcfs_id2str(srcid),
1469                                ptllnd_ptlid2str(initiator));
1470                         return;
1471                 }
1472                 if(flip){
1473                         __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
1474                         __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
1475                 }
1476                 break;
1477
1478         case PTLLND_MSG_TYPE_NOOP:
1479                 break;
1480
1481         default:
1482                 CERROR("Bad message type %d from %s(%s)\n", msg->ptlm_type,
1483                        libcfs_id2str(srcid),
1484                        ptllnd_ptlid2str(initiator));
1485                 return;
1486         }
1487
1488         plp = ptllnd_find_peer(ni, srcid, 0);
1489         if (plp == NULL) {
1490                 CERROR("Can't find peer %s\n", libcfs_id2str(srcid));
1491                 return;
1492         }
1493
1494         if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
1495                 if (plp->plp_recvd_hello) {
1496                         CERROR("Unexpected HELLO from %s\n",
1497                                libcfs_id2str(srcid));
1498                         ptllnd_peer_decref(plp);
1499                         return;
1500                 }
1501
1502                 plp->plp_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1503                 plp->plp_match = msg->ptlm_u.hello.kptlhm_matchbits;
1504                 plp->plp_stamp = msg->ptlm_srcstamp;
1505                 plp->plp_recvd_hello = 1;
1506
1507         } else if (!plp->plp_recvd_hello) {
1508
1509                 CERROR("Bad message type %d (HELLO expected) from %s\n",
1510                        msg->ptlm_type, libcfs_id2str(srcid));
1511                 ptllnd_peer_decref(plp);
1512                 return;
1513
1514         } else if (msg->ptlm_srcstamp != plp->plp_stamp) {
1515
1516                 CERROR("Bad srcstamp "LPX64"("LPX64" expected) from %s\n",
1517                        msg->ptlm_srcstamp, plp->plp_stamp,
1518                        libcfs_id2str(srcid));
1519                 ptllnd_peer_decref(plp);
1520                 return;
1521         }
1522
1523         /* Check peer only sends when I've sent her credits */
1524         if (plp->plp_sent_credits == 0) {
1525                 CERROR("%s[%d/%d+%d(%d)]: unexpected message\n",
1526                        libcfs_id2str(plp->plp_id),
1527                        plp->plp_credits, plp->plp_outstanding_credits,
1528                        plp->plp_sent_credits,
1529                        plni->plni_peer_credits + plp->plp_lazy_credits);
1530                 return;
1531         }
1532         plp->plp_sent_credits--;
1533
1534         /* No check for credit overflow - the peer may post new buffers after
1535          * the startup handshake. */
1536         plp->plp_credits += msg->ptlm_credits;
1537
1538         /* All OK so far; assume the message is good... */
1539
1540         rx.rx_peer      = plp;
1541         rx.rx_msg       = msg;
1542         rx.rx_nob       = nob;
1543         plni->plni_nrxs++;
1544
1545         switch (msg->ptlm_type) {
1546         default: /* message types have been checked already */
1547                 ptllnd_rx_done(&rx);
1548                 break;
1549
1550         case PTLLND_MSG_TYPE_PUT:
1551         case PTLLND_MSG_TYPE_GET:
1552                 rc = lnet_parse(ni, &msg->ptlm_u.rdma.kptlrm_hdr,
1553                                 msg->ptlm_srcnid, &rx, 1);
1554                 if (rc < 0)
1555                         ptllnd_rx_done(&rx);
1556                 break;
1557
1558         case PTLLND_MSG_TYPE_IMMEDIATE:
1559                 rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,
1560                                 msg->ptlm_srcnid, &rx, 0);
1561                 if (rc < 0)
1562                         ptllnd_rx_done(&rx);
1563                 break;
1564         }
1565
1566         if (msg->ptlm_credits > 0)
1567                 ptllnd_check_sends(plp);
1568
1569         ptllnd_peer_decref(plp);
1570 }
1571
1572 void
1573 ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
1574 {
1575         ptllnd_buffer_t *buf = ptllnd_eventarg2obj(event->md.user_ptr);
1576         ptllnd_ni_t     *plni = ni->ni_data;
1577         char            *msg = &buf->plb_buffer[event->offset];
1578         int              repost;
1579         int              unlinked = event->type == PTL_EVENT_UNLINK;
1580
1581         LASSERT (buf->plb_ni == ni);
1582         LASSERT (event->type == PTL_EVENT_PUT_END ||
1583                  event->type == PTL_EVENT_UNLINK);
1584
1585         if (event->ni_fail_type != PTL_NI_OK) {
1586
1587                 CERROR("event type %s(%d), status %s(%d) from %s\n",
1588                        ptllnd_evtype2str(event->type), event->type,
1589                        ptllnd_errtype2str(event->ni_fail_type), 
1590                        event->ni_fail_type,
1591                        ptllnd_ptlid2str(event->initiator));
1592
1593         } else if (event->type == PTL_EVENT_PUT_END) {
1594 #if (PTL_MD_LOCAL_ALIGN8 == 0)
1595                 /* Portals can't force message alignment - someone sending an
1596                  * odd-length message could misalign subsequent messages */
1597                 if ((event->mlength & 7) != 0) {
1598                         CERROR("Message from %s has odd length "LPU64
1599                                " probable version incompatibility\n",
1600                                ptllnd_ptlid2str(event->initiator),
1601                                event->mlength);
1602                         LBUG();
1603                 }
1604 #endif
1605                 LASSERT ((event->offset & 7) == 0);
1606
1607                 ptllnd_parse_request(ni, event->initiator,
1608                                      (kptl_msg_t *)msg, event->mlength);
1609         }
1610
1611 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1612         /* UNLINK event only on explicit unlink */
1613         repost = (event->unlinked && event->type != PTL_EVENT_UNLINK);
1614         if (event->unlinked)
1615                 unlinked = 1;
1616 #else
1617         /* UNLINK event only on implicit unlink */
1618         repost = (event->type == PTL_EVENT_UNLINK);
1619 #endif
1620
1621         if (unlinked) {
1622                 LASSERT(buf->plb_posted);
1623                 buf->plb_posted = 0;
1624                 plni->plni_nposted_buffers--;
1625         }
1626
1627         if (repost)
1628                 (void) ptllnd_post_buffer(buf);
1629 }
1630
1631 void
1632 ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
1633 {
1634         ptllnd_ni_t *plni = ni->ni_data;
1635         ptllnd_tx_t *tx = ptllnd_eventarg2obj(event->md.user_ptr);
1636         int          error = (event->ni_fail_type != PTL_NI_OK);
1637         int          isreq;
1638         int          isbulk;
1639 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1640         int          unlinked = event->unlinked;
1641 #else
1642         int          unlinked = (event->type == PTL_EVENT_UNLINK);
1643 #endif
1644
1645         if (error)
1646                 CERROR("Error %s(%d) event %s(%d) unlinked %d, %s(%d) for %s\n",
1647                        ptllnd_errtype2str(event->ni_fail_type),
1648                        event->ni_fail_type,
1649                        ptllnd_evtype2str(event->type), event->type,
1650                        unlinked, ptllnd_msgtype2str(tx->tx_type), tx->tx_type,
1651                        libcfs_id2str(tx->tx_peer->plp_id));
1652
1653         LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE));
1654
1655         isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh);
1656         if (isreq) {
1657                 LASSERT (event->md.start == (void *)&tx->tx_msg);
1658                 if (unlinked) {
1659                         tx->tx_reqmdh = PTL_INVALID_HANDLE;
1660                         gettimeofday(&tx->tx_req_done, NULL);
1661                 }
1662         }
1663
1664         isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);
1665         if ( isbulk && unlinked ) {
1666                 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
1667                 gettimeofday(&tx->tx_bulk_done, NULL);
1668         }
1669
1670         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
1671
1672         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: TX done %p %s%s",
1673                        libcfs_id2str(tx->tx_peer->plp_id),
1674                        tx->tx_peer->plp_credits,
1675                        tx->tx_peer->plp_outstanding_credits,
1676                        tx->tx_peer->plp_sent_credits,
1677                        plni->plni_peer_credits + tx->tx_peer->plp_lazy_credits,
1678                        tx, isreq ? "REQ" : "BULK", unlinked ? "(unlinked)" : "");
1679
1680         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
1681         switch (tx->tx_type) {
1682         default:
1683                 LBUG();
1684
1685         case PTLLND_MSG_TYPE_NOOP:
1686         case PTLLND_MSG_TYPE_HELLO:
1687         case PTLLND_MSG_TYPE_IMMEDIATE:
1688                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1689                          event->type == PTL_EVENT_SEND_END);
1690                 LASSERT (isreq);
1691                 break;
1692
1693         case PTLLND_MSG_TYPE_GET:
1694                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1695                          (isreq && event->type == PTL_EVENT_SEND_END) ||
1696                          (isbulk && event->type == PTL_EVENT_PUT_END));
1697
1698                 if (isbulk && !error && event->type == PTL_EVENT_PUT_END) {
1699                         /* Check GET matched */
1700                         if (event->hdr_data == PTLLND_RDMA_OK) {
1701                                 lnet_set_reply_msg_len(ni, 
1702                                                        tx->tx_lnetreplymsg,
1703                                                        event->mlength);
1704                         } else {
1705                                 CERROR ("Unmatched GET with %s\n",
1706                                         libcfs_id2str(tx->tx_peer->plp_id));
1707                                 tx->tx_status = -EIO;
1708                         }
1709                 }
1710                 break;
1711
1712         case PTLLND_MSG_TYPE_PUT:
1713                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1714                          (isreq && event->type == PTL_EVENT_SEND_END) ||
1715                          (isbulk && event->type == PTL_EVENT_GET_END));
1716                 break;
1717
1718         case PTLLND_RDMA_READ:
1719                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1720                          event->type == PTL_EVENT_SEND_END ||
1721                          event->type == PTL_EVENT_REPLY_END);
1722                 LASSERT (isbulk);
1723                 break;
1724
1725         case PTLLND_RDMA_WRITE:
1726                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1727                          event->type == PTL_EVENT_SEND_END);
1728                 LASSERT (isbulk);
1729         }
1730
1731         /* Schedule ptllnd_tx_done() on error or last completion event */
1732         if (error ||
1733             (PtlHandleIsEqual(tx->tx_bulkmdh, PTL_INVALID_HANDLE) &&
1734              PtlHandleIsEqual(tx->tx_reqmdh, PTL_INVALID_HANDLE))) {
1735                 if (error)
1736                         tx->tx_status = -EIO;
1737                 list_del(&tx->tx_list);
1738                 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
1739         }
1740 }
1741
1742 ptllnd_tx_t *
1743 ptllnd_find_timed_out_tx(ptllnd_peer_t *peer)
1744 {
1745         time_t            now = cfs_time_current_sec();
1746         ptllnd_tx_t *tx;
1747
1748         list_for_each_entry (tx, &peer->plp_txq, tx_list) {
1749                 if (tx->tx_deadline < now)
1750                         return tx;
1751         }
1752
1753         list_for_each_entry (tx, &peer->plp_noopq, tx_list) {
1754                 if (tx->tx_deadline < now)
1755                         return tx;
1756         }
1757
1758         list_for_each_entry (tx, &peer->plp_activeq, tx_list) {
1759                 if (tx->tx_deadline < now)
1760                         return tx;
1761         }
1762
1763         return NULL;
1764 }
1765
1766 void
1767 ptllnd_check_peer(ptllnd_peer_t *peer)
1768 {
1769         ptllnd_tx_t *tx = ptllnd_find_timed_out_tx(peer);
1770
1771         if (tx == NULL)
1772                 return;
1773
1774         CERROR("%s (sent %d recvd %d, credits %d/%d/%d/%d/%d): timed out %p %p\n",
1775                libcfs_id2str(peer->plp_id), peer->plp_sent_hello, peer->plp_recvd_hello,
1776                peer->plp_credits, peer->plp_outstanding_credits,
1777                peer->plp_sent_credits, peer->plp_lazy_credits,
1778                peer->plp_extra_lazy_credits, tx, tx->tx_lnetmsg);
1779         ptllnd_debug_tx(tx);
1780         ptllnd_close_peer(peer, -ETIMEDOUT);
1781 }
1782
1783 void
1784 ptllnd_watchdog (lnet_ni_t *ni, time_t now)
1785 {
1786         ptllnd_ni_t      *plni = ni->ni_data;
1787         const int         n = 4;
1788         int               p = plni->plni_watchdog_interval;
1789         int               chunk = plni->plni_peer_hash_size;
1790         int               interval = now - (plni->plni_watchdog_nextt - p);
1791         int               i;
1792         struct list_head *hashlist;
1793         struct list_head *tmp;
1794         struct list_head *nxt;
1795
1796         /* Time to check for RDMA timeouts on a few more peers: 
1797          * I try to do checks every 'p' seconds on a proportion of the peer
1798          * table and I need to check every connection 'n' times within a
1799          * timeout interval, to ensure I detect a timeout on any connection
1800          * within (n+1)/n times the timeout interval. */
1801
1802         LASSERT (now >= plni->plni_watchdog_nextt);
1803
1804         if (plni->plni_timeout > n * interval) { /* Scan less than the whole table? */
1805                 chunk = (chunk * n * interval) / plni->plni_timeout;
1806                 if (chunk == 0)
1807                         chunk = 1;
1808         }
1809
1810         for (i = 0; i < chunk; i++) {
1811                 hashlist = &plni->plni_peer_hash[plni->plni_watchdog_peeridx];
1812
1813                 list_for_each_safe(tmp, nxt, hashlist) {
1814                         ptllnd_check_peer(list_entry(tmp, ptllnd_peer_t, plp_list));
1815                 }
1816
1817                 plni->plni_watchdog_peeridx = (plni->plni_watchdog_peeridx + 1) %
1818                                               plni->plni_peer_hash_size;
1819         }
1820
1821         plni->plni_watchdog_nextt = now + p;
1822 }
1823
1824 void
1825 ptllnd_wait (lnet_ni_t *ni, int milliseconds)
1826 {
1827         static struct timeval  prevt;
1828         static int             prevt_count;
1829         static int             call_count;
1830
1831         struct timeval         start;
1832         struct timeval         then;
1833         struct timeval         now;
1834         struct timeval         deadline;
1835
1836         ptllnd_ni_t   *plni = ni->ni_data;
1837         ptllnd_tx_t   *tx;
1838         ptl_event_t    event;
1839         int            which;
1840         int            rc;
1841         int            found = 0;
1842         int            timeout = 0;
1843
1844         /* Handle any currently queued events, returning immediately if any.
1845          * Otherwise block for the timeout and handle all events queued
1846          * then. */
1847
1848         gettimeofday(&start, NULL);
1849         call_count++;
1850
1851         if (milliseconds <= 0) {
1852                 deadline = start;
1853         } else {
1854                 deadline.tv_sec  = start.tv_sec  +  milliseconds/1000;
1855                 deadline.tv_usec = start.tv_usec + (milliseconds % 1000)*1000;
1856
1857                 if (deadline.tv_usec >= 1000000) {
1858                         start.tv_usec -= 1000000;
1859                         start.tv_sec++;
1860                 }
1861         }
1862
1863         for (;;) {
1864                 gettimeofday(&then, NULL);
1865
1866                 rc = PtlEQPoll(&plni->plni_eqh, 1, timeout, &event, &which);
1867
1868                 gettimeofday(&now, NULL);
1869
1870                 if ((now.tv_sec*1000 + now.tv_usec/1000) - 
1871                     (then.tv_sec*1000 + then.tv_usec/1000) > timeout + 1000) {
1872                         /* 1000 mS grace...........................^ */
1873                         CERROR("SLOW PtlEQPoll(%d): %dmS elapsed\n", timeout,
1874                                (int)(now.tv_sec*1000 + now.tv_usec/1000) - 
1875                                (int)(then.tv_sec*1000 + then.tv_usec/1000));
1876                 }
1877
1878                 if (rc == PTL_EQ_EMPTY) {
1879                         if (found)              /* handled some events */
1880                                 break;
1881
1882                         if (now.tv_sec >= plni->plni_watchdog_nextt) { /* check timeouts? */
1883                                 ptllnd_watchdog(ni, now.tv_sec);
1884                                 LASSERT (now.tv_sec < plni->plni_watchdog_nextt);
1885                         }
1886
1887                         if (now.tv_sec > deadline.tv_sec || /* timeout expired */
1888                             (now.tv_sec == deadline.tv_sec &&
1889                              now.tv_usec >= deadline.tv_usec))
1890                                 break;
1891
1892                         if (milliseconds < 0 ||
1893                             plni->plni_watchdog_nextt <= deadline.tv_sec)  {
1894                                 timeout = (plni->plni_watchdog_nextt - now.tv_sec)*1000;
1895                         } else {
1896                                 timeout = (deadline.tv_sec - now.tv_sec)*1000 +
1897                                           (deadline.tv_usec - now.tv_usec)/1000;
1898                         }
1899
1900                         continue;
1901                 }
1902
1903                 LASSERT (rc == PTL_OK || rc == PTL_EQ_DROPPED);
1904
1905                 if (rc == PTL_EQ_DROPPED)
1906                         CERROR("Event queue: size %d is too small\n",
1907                                plni->plni_eq_size);
1908
1909                 timeout = 0;
1910                 found = 1;
1911
1912                 switch (ptllnd_eventarg2type(event.md.user_ptr)) {
1913                 default:
1914                         LBUG();
1915
1916                 case PTLLND_EVENTARG_TYPE_TX:
1917                         ptllnd_tx_event(ni, &event);
1918                         break;
1919
1920                 case PTLLND_EVENTARG_TYPE_BUF:
1921                         ptllnd_buf_event(ni, &event);
1922                         break;
1923                 }
1924         }
1925
1926         while (!list_empty(&plni->plni_zombie_txs)) {
1927                 tx = list_entry(plni->plni_zombie_txs.next,
1928                                 ptllnd_tx_t, tx_list);
1929                 list_del_init(&tx->tx_list);
1930                 ptllnd_tx_done(tx);
1931         }
1932
1933         if (prevt.tv_sec == 0 ||
1934             prevt.tv_sec != now.tv_sec) {
1935                 PTLLND_HISTORY("%d wait entered at %d.%06d - prev %d %d.%06d", 
1936                                call_count, (int)start.tv_sec, (int)start.tv_usec,
1937                                prevt_count, (int)prevt.tv_sec, (int)prevt.tv_usec);
1938                 prevt = now;
1939         }
1940 }