Whamcloud - gitweb
ff7814428f947ce2b36e397a0d192cbec0b77f1d
[fs/lustre-release.git] / lnet / ulnds / ptllnd / ptllnd_cb.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lnet/ulnds/ptllnd/ptllnd_cb.c
35  *
36  * Author: Eric Barton <eeb@bartonsoftware.com>
37  */
38
39 #include "ptllnd.h"
40
41 void
42 ptllnd_set_tx_deadline(ptllnd_tx_t *tx)
43 {
44         ptllnd_peer_t  *peer = tx->tx_peer;
45         lnet_ni_t      *ni = peer->plp_ni;
46         ptllnd_ni_t    *plni = ni->ni_data;
47
48         tx->tx_deadline = cfs_time_current_sec() + plni->plni_timeout;
49 }
50
51 void
52 ptllnd_post_tx(ptllnd_tx_t *tx)
53 {
54         ptllnd_peer_t  *peer = tx->tx_peer;
55
56         LASSERT (tx->tx_type != PTLLND_MSG_TYPE_NOOP);
57
58         ptllnd_set_tx_deadline(tx);
59         cfs_list_add_tail(&tx->tx_list, &peer->plp_txq);
60         ptllnd_check_sends(peer);
61 }
62
63 char *
64 ptllnd_ptlid2str(ptl_process_id_t id)
65 {
66         static char strs[8][32];
67         static int  idx = 0;
68
69         char   *str = strs[idx++];
70
71         if (idx >= sizeof(strs)/sizeof(strs[0]))
72                 idx = 0;
73
74         snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
75         return str;
76 }
77
78 void
79 ptllnd_destroy_peer(ptllnd_peer_t *peer)
80 {
81         lnet_ni_t         *ni = peer->plp_ni;
82         ptllnd_ni_t       *plni = ni->ni_data;
83         int                nmsg = peer->plp_lazy_credits +
84                                   plni->plni_peer_credits;
85
86         ptllnd_size_buffers(ni, -nmsg);
87
88         LASSERT (peer->plp_closing);
89         LASSERT (plni->plni_npeers > 0);
90         LASSERT (cfs_list_empty(&peer->plp_txq));
91         LASSERT (cfs_list_empty(&peer->plp_noopq));
92         LASSERT (cfs_list_empty(&peer->plp_activeq));
93         plni->plni_npeers--;
94         LIBCFS_FREE(peer, sizeof(*peer));
95 }
96
97 void
98 ptllnd_abort_txs(ptllnd_ni_t *plni, cfs_list_t *q)
99 {
100         while (!cfs_list_empty(q)) {
101                 ptllnd_tx_t *tx = cfs_list_entry(q->next, ptllnd_tx_t, tx_list);
102
103                 tx->tx_status = -ESHUTDOWN;
104                 cfs_list_del(&tx->tx_list);
105                 cfs_list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
106         }
107 }
108
109 void
110 ptllnd_close_peer(ptllnd_peer_t *peer, int error)
111 {
112         lnet_ni_t   *ni = peer->plp_ni;
113         ptllnd_ni_t *plni = ni->ni_data;
114
115         if (peer->plp_closing)
116                 return;
117
118         peer->plp_closing = 1;
119
120         if (!cfs_list_empty(&peer->plp_txq) ||
121             !cfs_list_empty(&peer->plp_noopq) ||
122             !cfs_list_empty(&peer->plp_activeq) ||
123             error != 0) {
124                 CWARN("Closing %s: %d\n", libcfs_id2str(peer->plp_id), error);
125                 if (plni->plni_debug)
126                         ptllnd_dump_debug(ni, peer->plp_id);
127         }
128
129         ptllnd_abort_txs(plni, &peer->plp_txq);
130         ptllnd_abort_txs(plni, &peer->plp_noopq);
131         ptllnd_abort_txs(plni, &peer->plp_activeq);
132
133         cfs_list_del(&peer->plp_list);
134         ptllnd_peer_decref(peer);
135 }
136
137 ptllnd_peer_t *
138 ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
139 {
140         ptllnd_ni_t       *plni = ni->ni_data;
141         unsigned int       hash = LNET_NIDADDR(id.nid) % plni->plni_peer_hash_size;
142         ptllnd_peer_t     *plp;
143         ptllnd_tx_t       *tx;
144         int                rc;
145
146         LASSERT (LNET_NIDNET(id.nid) == LNET_NIDNET(ni->ni_nid));
147
148         cfs_list_for_each_entry (plp, &plni->plni_peer_hash[hash], plp_list) {
149                 if (plp->plp_id.nid == id.nid &&
150                     plp->plp_id.pid == id.pid) {
151                         ptllnd_peer_addref(plp);
152                         return plp;
153                 }
154         }
155
156         if (!create)
157                 return NULL;
158
159         /* New peer: check first for enough posted buffers */
160         plni->plni_npeers++;
161         rc = ptllnd_size_buffers(ni, plni->plni_peer_credits);
162         if (rc != 0) {
163                 plni->plni_npeers--;
164                 return NULL;
165         }
166
167         LIBCFS_ALLOC(plp, sizeof(*plp));
168         if (plp == NULL) {
169                 CERROR("Can't allocate new peer %s\n", libcfs_id2str(id));
170                 plni->plni_npeers--;
171                 ptllnd_size_buffers(ni, -plni->plni_peer_credits);
172                 return NULL;
173         }
174
175         plp->plp_ni = ni;
176         plp->plp_id = id;
177         plp->plp_ptlid.nid = LNET_NIDADDR(id.nid);
178         plp->plp_ptlid.pid = plni->plni_ptllnd_pid;
179         plp->plp_credits = 1; /* add more later when she gives me credits */
180         plp->plp_max_msg_size = plni->plni_max_msg_size; /* until I hear from her */
181         plp->plp_sent_credits = 1;              /* Implicit credit for HELLO */
182         plp->plp_outstanding_credits = plni->plni_peer_credits - 1;
183         plp->plp_lazy_credits = 0;
184         plp->plp_extra_lazy_credits = 0;
185         plp->plp_match = 0;
186         plp->plp_stamp = 0;
187         plp->plp_sent_hello = 0;
188         plp->plp_recvd_hello = 0;
189         plp->plp_closing = 0;
190         plp->plp_refcount = 1;
191         CFS_INIT_LIST_HEAD(&plp->plp_list);
192         CFS_INIT_LIST_HEAD(&plp->plp_txq);
193         CFS_INIT_LIST_HEAD(&plp->plp_noopq);
194         CFS_INIT_LIST_HEAD(&plp->plp_activeq);
195
196         ptllnd_peer_addref(plp);
197         cfs_list_add_tail(&plp->plp_list, &plni->plni_peer_hash[hash]);
198
199         tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_HELLO, 0);
200         if (tx == NULL) {
201                 CERROR("Can't send HELLO to %s\n", libcfs_id2str(id));
202                 ptllnd_close_peer(plp, -ENOMEM);
203                 ptllnd_peer_decref(plp);
204                 return NULL;
205         }
206
207         tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS;
208         tx->tx_msg.ptlm_u.hello.kptlhm_max_msg_size = plni->plni_max_msg_size;
209
210         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post hello %p", libcfs_id2str(id),
211                        tx->tx_peer->plp_credits,
212                        tx->tx_peer->plp_outstanding_credits,
213                        tx->tx_peer->plp_sent_credits,
214                        plni->plni_peer_credits + 
215                        tx->tx_peer->plp_lazy_credits, tx);
216         ptllnd_post_tx(tx);
217
218         return plp;
219 }
220
221 int
222 ptllnd_count_q(cfs_list_t *q)
223 {
224         cfs_list_t *e;
225         int         n = 0;
226
227         cfs_list_for_each(e, q) {
228                 n++;
229         }
230
231         return n;
232 }
233
234 const char *
235 ptllnd_tx_typestr(int type)
236 {
237         switch (type) {
238         case PTLLND_RDMA_WRITE:
239                 return "rdma_write";
240
241         case PTLLND_RDMA_READ:
242                 return "rdma_read";
243
244         case PTLLND_MSG_TYPE_PUT:
245                 return "put_req";
246
247         case PTLLND_MSG_TYPE_GET:
248                 return "get_req";
249
250         case PTLLND_MSG_TYPE_IMMEDIATE:
251                 return "immediate";
252
253         case PTLLND_MSG_TYPE_NOOP:
254                 return "noop";
255
256         case PTLLND_MSG_TYPE_HELLO:
257                 return "hello";
258
259         default:
260                 return "<unknown>";
261         }
262 }
263
264 void
265 ptllnd_debug_tx(ptllnd_tx_t *tx)
266 {
267         CDEBUG(D_WARNING, "%s %s b %ld.%06ld/%ld.%06ld"
268                " r %ld.%06ld/%ld.%06ld status %d\n",
269                ptllnd_tx_typestr(tx->tx_type),
270                libcfs_id2str(tx->tx_peer->plp_id),
271                tx->tx_bulk_posted.tv_sec, tx->tx_bulk_posted.tv_usec,
272                tx->tx_bulk_done.tv_sec, tx->tx_bulk_done.tv_usec,
273                tx->tx_req_posted.tv_sec, tx->tx_req_posted.tv_usec,
274                tx->tx_req_done.tv_sec, tx->tx_req_done.tv_usec,
275                tx->tx_status);
276 }
277
278 void
279 ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
280 {
281         ptllnd_peer_t    *plp = ptllnd_find_peer(ni, id, 0);
282         ptllnd_ni_t      *plni = ni->ni_data;
283         ptllnd_tx_t      *tx;
284
285         if (plp == NULL) {
286                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_id2str(id));
287                 return;
288         }
289
290         CWARN("%s %s%s [%d] "LPU64".%06d m "LPU64" q %d/%d/%d c %d/%d+%d(%d)\n",
291               libcfs_id2str(id),
292               plp->plp_recvd_hello ? "H" : "_",
293               plp->plp_closing     ? "C" : "_",
294               plp->plp_refcount,
295               plp->plp_stamp / 1000000, (int)(plp->plp_stamp % 1000000),
296               plp->plp_match,
297               ptllnd_count_q(&plp->plp_txq),
298               ptllnd_count_q(&plp->plp_noopq),
299               ptllnd_count_q(&plp->plp_activeq),
300               plp->plp_credits, plp->plp_outstanding_credits, plp->plp_sent_credits,
301               plni->plni_peer_credits + plp->plp_lazy_credits);
302
303         CDEBUG(D_WARNING, "txq:\n");
304         cfs_list_for_each_entry (tx, &plp->plp_txq, tx_list) {
305                 ptllnd_debug_tx(tx);
306         }
307
308         CDEBUG(D_WARNING, "noopq:\n");
309         cfs_list_for_each_entry (tx, &plp->plp_noopq, tx_list) {
310                 ptllnd_debug_tx(tx);
311         }
312
313         CDEBUG(D_WARNING, "activeq:\n");
314         cfs_list_for_each_entry (tx, &plp->plp_activeq, tx_list) {
315                 ptllnd_debug_tx(tx);
316         }
317
318         CDEBUG(D_WARNING, "zombies:\n");
319         cfs_list_for_each_entry (tx, &plni->plni_zombie_txs, tx_list) {
320                 if (tx->tx_peer->plp_id.nid == id.nid &&
321                     tx->tx_peer->plp_id.pid == id.pid)
322                         ptllnd_debug_tx(tx);
323         }
324
325         CDEBUG(D_WARNING, "history:\n");
326         cfs_list_for_each_entry (tx, &plni->plni_tx_history, tx_list) {
327                 if (tx->tx_peer->plp_id.nid == id.nid &&
328                     tx->tx_peer->plp_id.pid == id.pid)
329                         ptllnd_debug_tx(tx);
330         }
331
332         ptllnd_peer_decref(plp);
333 }
334
335 void
336 ptllnd_dump_debug(lnet_ni_t *ni, lnet_process_id_t id)
337 {
338         ptllnd_debug_peer(ni, id);
339         ptllnd_dump_history();
340 }
341
342 int
343 ptllnd_setasync(lnet_ni_t *ni, lnet_process_id_t id, int nasync)
344 {
345         ptllnd_peer_t *peer = ptllnd_find_peer(ni, id, nasync > 0);
346         int            rc;
347
348         if (peer == NULL)
349                 return -ENOMEM;
350
351         LASSERT (peer->plp_lazy_credits >= 0);
352         LASSERT (peer->plp_extra_lazy_credits >= 0);
353
354         /* If nasync < 0, we're being told we can reduce the total message
355          * headroom.  We can't do this right now because our peer might already
356          * have credits for the extra buffers, so we just account the extra
357          * headroom in case we need it later and only destroy buffers when the
358          * peer closes.
359          *
360          * Note that the following condition handles this case, where it
361          * actually increases the extra lazy credit counter. */
362
363         if (nasync <= peer->plp_extra_lazy_credits) {
364                 peer->plp_extra_lazy_credits -= nasync;
365                 return 0;
366         }
367
368         LASSERT (nasync > 0);
369
370         nasync -= peer->plp_extra_lazy_credits;
371         peer->plp_extra_lazy_credits = 0;
372
373         rc = ptllnd_size_buffers(ni, nasync);
374         if (rc == 0) {
375                 peer->plp_lazy_credits += nasync;
376                 peer->plp_outstanding_credits += nasync;
377         }
378
379         return rc;
380 }
381
382 __u32
383 ptllnd_cksum (void *ptr, int nob)
384 {
385         char  *c  = ptr;
386         __u32  sum = 0;
387
388         while (nob-- > 0)
389                 sum = ((sum << 1) | (sum >> 31)) + *c++;
390
391         /* ensure I don't return 0 (== no checksum) */
392         return (sum == 0) ? 1 : sum;
393 }
394
395 ptllnd_tx_t *
396 ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
397 {
398         lnet_ni_t   *ni = peer->plp_ni;
399         ptllnd_ni_t *plni = ni->ni_data;
400         ptllnd_tx_t *tx;
401         int          msgsize;
402
403         CDEBUG(D_NET, "peer=%p type=%d payload=%d\n", peer, type, payload_nob);
404
405         switch (type) {
406         default:
407                 LBUG();
408
409         case PTLLND_RDMA_WRITE:
410         case PTLLND_RDMA_READ:
411                 LASSERT (payload_nob == 0);
412                 msgsize = 0;
413                 break;
414
415         case PTLLND_MSG_TYPE_PUT:
416         case PTLLND_MSG_TYPE_GET:
417                 LASSERT (payload_nob == 0);
418                 msgsize = offsetof(kptl_msg_t, ptlm_u) + 
419                           sizeof(kptl_rdma_msg_t);
420                 break;
421
422         case PTLLND_MSG_TYPE_IMMEDIATE:
423                 msgsize = offsetof(kptl_msg_t,
424                                    ptlm_u.immediate.kptlim_payload[payload_nob]);
425                 break;
426
427         case PTLLND_MSG_TYPE_NOOP:
428                 LASSERT (payload_nob == 0);
429                 msgsize = offsetof(kptl_msg_t, ptlm_u);
430                 break;
431
432         case PTLLND_MSG_TYPE_HELLO:
433                 LASSERT (payload_nob == 0);
434                 msgsize = offsetof(kptl_msg_t, ptlm_u) +
435                           sizeof(kptl_hello_msg_t);
436                 break;
437         }
438
439         msgsize = (msgsize + 7) & ~7;
440         LASSERT (msgsize <= peer->plp_max_msg_size);
441
442         LIBCFS_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize);
443
444         if (tx == NULL) {
445                 CERROR("Can't allocate msg type %d for %s\n",
446                        type, libcfs_id2str(peer->plp_id));
447                 return NULL;
448         }
449
450         CFS_INIT_LIST_HEAD(&tx->tx_list);
451         tx->tx_peer = peer;
452         tx->tx_type = type;
453         tx->tx_lnetmsg = tx->tx_lnetreplymsg = NULL;
454         tx->tx_niov = 0;
455         tx->tx_iov = NULL;
456         tx->tx_reqmdh = PTL_INVALID_HANDLE;
457         tx->tx_bulkmdh = PTL_INVALID_HANDLE;
458         tx->tx_msgsize = msgsize;
459         tx->tx_completing = 0;
460         tx->tx_status = 0;
461
462         memset(&tx->tx_bulk_posted, 0, sizeof(tx->tx_bulk_posted));
463         memset(&tx->tx_bulk_done, 0, sizeof(tx->tx_bulk_done));
464         memset(&tx->tx_req_posted, 0, sizeof(tx->tx_req_posted));
465         memset(&tx->tx_req_done, 0, sizeof(tx->tx_req_done));
466
467         if (msgsize != 0) {
468                 tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC;
469                 tx->tx_msg.ptlm_version = PTLLND_MSG_VERSION;
470                 tx->tx_msg.ptlm_type = type;
471                 tx->tx_msg.ptlm_credits = 0;
472                 tx->tx_msg.ptlm_nob = msgsize;
473                 tx->tx_msg.ptlm_cksum = 0;
474                 tx->tx_msg.ptlm_srcnid = ni->ni_nid;
475                 tx->tx_msg.ptlm_srcstamp = plni->plni_stamp;
476                 tx->tx_msg.ptlm_dstnid = peer->plp_id.nid;
477                 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
478                 tx->tx_msg.ptlm_srcpid = the_lnet.ln_pid;
479                 tx->tx_msg.ptlm_dstpid = peer->plp_id.pid;
480         }
481
482         ptllnd_peer_addref(peer);
483         plni->plni_ntxs++;
484
485         CDEBUG(D_NET, "tx=%p\n", tx);
486
487         return tx;
488 }
489
490 void
491 ptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh)
492 {
493         ptllnd_peer_t   *peer = tx->tx_peer;
494         lnet_ni_t       *ni = peer->plp_ni;
495         int              rc;
496         time_t           start = cfs_time_current_sec();
497         ptllnd_ni_t     *plni = ni->ni_data;
498         int              w = plni->plni_long_wait;
499
500         while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) {
501                 rc = PtlMDUnlink(*mdh);
502 #ifndef LUSTRE_PORTALS_UNLINK_SEMANTICS
503                 if (rc == PTL_OK) /* unlink successful => no unlinked event */
504                         return;
505                 LASSERT (rc == PTL_MD_IN_USE);
506 #endif
507                 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
508                         CWARN("Waited %ds to abort tx to %s\n",
509                               (int)(cfs_time_current_sec() - start),
510                               libcfs_id2str(peer->plp_id));
511                         w *= 2;
512                 }
513                 /* Wait for ptllnd_tx_event() to invalidate */
514                 ptllnd_wait(ni, w);
515         }
516 }
517
518 void
519 ptllnd_cull_tx_history(ptllnd_ni_t *plni)
520 {
521         int max = plni->plni_max_tx_history;
522
523         while (plni->plni_ntx_history > max) {
524                 ptllnd_tx_t *tx = cfs_list_entry(plni->plni_tx_history.next,
525                                                  ptllnd_tx_t, tx_list);
526                 cfs_list_del(&tx->tx_list);
527
528                 ptllnd_peer_decref(tx->tx_peer);
529
530                 LIBCFS_FREE(tx, offsetof(ptllnd_tx_t, tx_msg) + tx->tx_msgsize);
531
532                 LASSERT (plni->plni_ntxs > 0);
533                 plni->plni_ntxs--;
534                 plni->plni_ntx_history--;
535         }
536 }
537
538 void
539 ptllnd_tx_done(ptllnd_tx_t *tx)
540 {
541         ptllnd_peer_t   *peer = tx->tx_peer;
542         lnet_ni_t       *ni = peer->plp_ni;
543         ptllnd_ni_t     *plni = ni->ni_data;
544
545         /* CAVEAT EMPTOR: If this tx is being aborted, I'll continue to get
546          * events for this tx until it's unlinked.  So I set tx_completing to
547          * flag the tx is getting handled */
548
549         if (tx->tx_completing)
550                 return;
551
552         tx->tx_completing = 1;
553
554         if (!cfs_list_empty(&tx->tx_list))
555                 cfs_list_del_init(&tx->tx_list);
556
557         if (tx->tx_status != 0) {
558                 if (plni->plni_debug) {
559                         CERROR("Completing tx for %s with error %d\n",
560                                libcfs_id2str(peer->plp_id), tx->tx_status);
561                         ptllnd_debug_tx(tx);
562                 }
563                 ptllnd_close_peer(peer, tx->tx_status);
564         }
565
566         ptllnd_abort_tx(tx, &tx->tx_reqmdh);
567         ptllnd_abort_tx(tx, &tx->tx_bulkmdh);
568
569         if (tx->tx_niov > 0) {
570                 LIBCFS_FREE(tx->tx_iov, tx->tx_niov * sizeof(*tx->tx_iov));
571                 tx->tx_niov = 0;
572         }
573
574         if (tx->tx_lnetreplymsg != NULL) {
575                 LASSERT (tx->tx_type == PTLLND_MSG_TYPE_GET);
576                 LASSERT (tx->tx_lnetmsg != NULL);
577                 /* Simulate GET success always  */
578                 lnet_finalize(ni, tx->tx_lnetmsg, 0);
579                 CDEBUG(D_NET, "lnet_finalize(tx_lnetreplymsg=%p)\n",tx->tx_lnetreplymsg);
580                 lnet_finalize(ni, tx->tx_lnetreplymsg, tx->tx_status);
581         } else if (tx->tx_lnetmsg != NULL) {
582                 lnet_finalize(ni, tx->tx_lnetmsg, tx->tx_status);
583         }
584
585         plni->plni_ntx_history++;
586         cfs_list_add_tail(&tx->tx_list, &plni->plni_tx_history);
587
588         ptllnd_cull_tx_history(plni);
589 }
590
591 int
592 ptllnd_set_txiov(ptllnd_tx_t *tx,
593                  unsigned int niov, struct iovec *iov,
594                  unsigned int offset, unsigned int len)
595 {
596         ptl_md_iovec_t *piov;
597         int             npiov;
598
599         if (len == 0) {
600                 tx->tx_niov = 0;
601                 return 0;
602         }
603
604         /*
605          * Remove iovec's at the beginning that
606          * are skipped because of the offset.
607          * Adjust the offset accordingly
608          */
609         for (;;) {
610                 LASSERT (niov > 0);
611                 if (offset < iov->iov_len)
612                         break;
613                 offset -= iov->iov_len;
614                 niov--;
615                 iov++;
616         }
617
618         for (;;) {
619                 int temp_offset = offset;
620                 int resid = len;
621                 LIBCFS_ALLOC(piov, niov * sizeof(*piov));
622                 if (piov == NULL)
623                         return -ENOMEM;
624
625                 for (npiov = 0;; npiov++) {
626                         LASSERT (npiov < niov);
627                         LASSERT (iov->iov_len >= temp_offset);
628
629                         piov[npiov].iov_base = iov[npiov].iov_base + temp_offset;
630                         piov[npiov].iov_len = iov[npiov].iov_len - temp_offset;
631
632                         if (piov[npiov].iov_len >= resid) {
633                                 piov[npiov].iov_len = resid;
634                                 npiov++;
635                                 break;
636                         }
637                         resid -= piov[npiov].iov_len;
638                         temp_offset = 0;
639                 }
640
641                 if (npiov == niov) {
642                         tx->tx_niov = niov;
643                         tx->tx_iov = piov;
644                         return 0;
645                 }
646
647                 /* Dang! The piov I allocated was too big and it's a drag to
648                  * have to maintain separate 'allocated' and 'used' sizes, so
649                  * I'll just do it again; NB this doesn't happen normally... */
650                 LIBCFS_FREE(piov, niov * sizeof(*piov));
651                 niov = npiov;
652         }
653 }
654
655 void
656 ptllnd_set_md_buffer(ptl_md_t *md, ptllnd_tx_t *tx)
657 {
658         unsigned int    niov = tx->tx_niov;
659         ptl_md_iovec_t *iov = tx->tx_iov;
660
661         LASSERT ((md->options & PTL_MD_IOVEC) == 0);
662
663         if (niov == 0) {
664                 md->start = NULL;
665                 md->length = 0;
666         } else if (niov == 1) {
667                 md->start = iov[0].iov_base;
668                 md->length = iov[0].iov_len;
669         } else {
670                 md->start = iov;
671                 md->length = niov;
672                 md->options |= PTL_MD_IOVEC;
673         }
674 }
675
676 int
677 ptllnd_post_buffer(ptllnd_buffer_t *buf)
678 {
679         lnet_ni_t        *ni = buf->plb_ni;
680         ptllnd_ni_t      *plni = ni->ni_data;
681         ptl_process_id_t  anyid = {
682                 .nid       = PTL_NID_ANY,
683                 .pid       = PTL_PID_ANY};
684         ptl_md_t          md = {
685                 .start     = buf->plb_buffer,
686                 .length    = plni->plni_buffer_size,
687                 .threshold = PTL_MD_THRESH_INF,
688                 .max_size  = plni->plni_max_msg_size,
689                 .options   = (PTLLND_MD_OPTIONS |
690                               PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | 
691                               PTL_MD_LOCAL_ALIGN8),
692                 .user_ptr  = ptllnd_obj2eventarg(buf, PTLLND_EVENTARG_TYPE_BUF),
693                 .eq_handle = plni->plni_eqh};
694         ptl_handle_me_t meh;
695         int             rc;
696
697         LASSERT (!buf->plb_posted);
698
699         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal,
700                          anyid, LNET_MSG_MATCHBITS, 0,
701                          PTL_UNLINK, PTL_INS_AFTER, &meh);
702         if (rc != PTL_OK) {
703                 CERROR("PtlMEAttach failed: %s(%d)\n",
704                        ptllnd_errtype2str(rc), rc);
705                 return -ENOMEM;
706         }
707
708         buf->plb_posted = 1;
709         plni->plni_nposted_buffers++;
710
711         rc = PtlMDAttach(meh, md, LNET_UNLINK, &buf->plb_md);
712         if (rc == PTL_OK)
713                 return 0;
714
715         CERROR("PtlMDAttach failed: %s(%d)\n",
716                ptllnd_errtype2str(rc), rc);
717
718         buf->plb_posted = 0;
719         plni->plni_nposted_buffers--;
720
721         rc = PtlMEUnlink(meh);
722         LASSERT (rc == PTL_OK);
723
724         return -ENOMEM;
725 }
726
727 static inline int
728 ptllnd_peer_send_noop (ptllnd_peer_t *peer)
729 {
730         ptllnd_ni_t *plni = peer->plp_ni->ni_data;
731
732         if (!peer->plp_sent_hello ||
733             peer->plp_credits == 0 ||
734             !cfs_list_empty(&peer->plp_noopq) ||
735             peer->plp_outstanding_credits < PTLLND_CREDIT_HIGHWATER(plni))
736                 return 0;
737
738         /* No tx to piggyback NOOP onto or no credit to send a tx */
739         return (cfs_list_empty(&peer->plp_txq) || peer->plp_credits == 1);
740 }
741
742 void
743 ptllnd_check_sends(ptllnd_peer_t *peer)
744 {
745         ptllnd_ni_t    *plni = peer->plp_ni->ni_data;
746         ptllnd_tx_t    *tx;
747         ptl_md_t        md;
748         ptl_handle_md_t mdh;
749         int             rc;
750
751         CDEBUG(D_NET, "%s: [%d/%d+%d(%d)\n",
752                libcfs_id2str(peer->plp_id), peer->plp_credits,
753                peer->plp_outstanding_credits, peer->plp_sent_credits,
754                plni->plni_peer_credits + peer->plp_lazy_credits);
755
756         if (ptllnd_peer_send_noop(peer)) {
757                 tx = ptllnd_new_tx(peer, PTLLND_MSG_TYPE_NOOP, 0);
758                 CDEBUG(D_NET, "NOOP tx=%p\n",tx);
759                 if (tx == NULL) {
760                         CERROR("Can't return credits to %s\n",
761                                libcfs_id2str(peer->plp_id));
762                 } else {
763                         ptllnd_set_tx_deadline(tx);
764                         cfs_list_add_tail(&tx->tx_list, &peer->plp_noopq);
765                 }
766         }
767
768         for (;;) {
769                 if (!cfs_list_empty(&peer->plp_noopq)) {
770                         LASSERT (peer->plp_sent_hello);
771                         tx = cfs_list_entry(peer->plp_noopq.next,
772                                             ptllnd_tx_t, tx_list);
773                 } else if (!cfs_list_empty(&peer->plp_txq)) {
774                         tx = cfs_list_entry(peer->plp_txq.next,
775                                             ptllnd_tx_t, tx_list);
776                 } else {
777                         /* nothing to send right now */
778                         break;
779                 }
780
781                 LASSERT (tx->tx_msgsize > 0);
782
783                 LASSERT (peer->plp_outstanding_credits >= 0);
784                 LASSERT (peer->plp_sent_credits >= 0);
785                 LASSERT (peer->plp_outstanding_credits + peer->plp_sent_credits
786                          <= plni->plni_peer_credits + peer->plp_lazy_credits);
787                 LASSERT (peer->plp_credits >= 0);
788
789                 /* say HELLO first */
790                 if (!peer->plp_sent_hello) {
791                         LASSERT (cfs_list_empty(&peer->plp_noopq));
792                         LASSERT (tx->tx_type == PTLLND_MSG_TYPE_HELLO);
793
794                         peer->plp_sent_hello = 1;
795                 }
796
797                 if (peer->plp_credits == 0) {   /* no credits */
798                         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: no creds for %p",
799                                        libcfs_id2str(peer->plp_id),
800                                        peer->plp_credits,
801                                        peer->plp_outstanding_credits,
802                                        peer->plp_sent_credits,
803                                        plni->plni_peer_credits +
804                                        peer->plp_lazy_credits, tx);
805                         break;
806                 }
807
808                 /* Last/Initial credit reserved for NOOP/HELLO */
809                 if (peer->plp_credits == 1 &&
810                     tx->tx_type != PTLLND_MSG_TYPE_NOOP &&
811                     tx->tx_type != PTLLND_MSG_TYPE_HELLO) {
812                         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: too few creds for %p",
813                                        libcfs_id2str(peer->plp_id),
814                                        peer->plp_credits,
815                                        peer->plp_outstanding_credits,
816                                        peer->plp_sent_credits,
817                                        plni->plni_peer_credits +
818                                        peer->plp_lazy_credits, tx);
819                         break;
820                 }
821
822                 cfs_list_del(&tx->tx_list);
823                 cfs_list_add_tail(&tx->tx_list, &peer->plp_activeq);
824
825                 CDEBUG(D_NET, "Sending at TX=%p type=%s (%d)\n",tx,
826                        ptllnd_msgtype2str(tx->tx_type),tx->tx_type);
827
828                 if (tx->tx_type == PTLLND_MSG_TYPE_NOOP &&
829                     !ptllnd_peer_send_noop(peer)) {
830                         /* redundant NOOP */
831                         ptllnd_tx_done(tx);
832                         continue;
833                 }
834
835                 /* Set stamp at the last minute; on a new peer, I don't know it
836                  * until I receive the HELLO back */
837                 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
838
839                 /*
840                  * Return all the credits we have
841                  */
842                 tx->tx_msg.ptlm_credits = MIN(PTLLND_MSG_MAX_CREDITS,
843                                               peer->plp_outstanding_credits);
844                 peer->plp_sent_credits += tx->tx_msg.ptlm_credits;
845                 peer->plp_outstanding_credits -= tx->tx_msg.ptlm_credits;
846
847                 /*
848                  * One less credit
849                  */
850                 peer->plp_credits--;
851
852                 if (plni->plni_checksum)
853                         tx->tx_msg.ptlm_cksum = 
854                                 ptllnd_cksum(&tx->tx_msg,
855                                              offsetof(kptl_msg_t, ptlm_u));
856
857                 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
858                 md.eq_handle = plni->plni_eqh;
859                 md.threshold = 1;
860                 md.options = PTLLND_MD_OPTIONS;
861                 md.start = &tx->tx_msg;
862                 md.length = tx->tx_msgsize;
863
864                 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
865                 if (rc != PTL_OK) {
866                         CERROR("PtlMDBind for %s failed: %s(%d)\n",
867                                libcfs_id2str(peer->plp_id),
868                                ptllnd_errtype2str(rc), rc);
869                         tx->tx_status = -EIO;
870                         ptllnd_tx_done(tx);
871                         break;
872                 }
873
874                 LASSERT (tx->tx_type != PTLLND_RDMA_WRITE &&
875                          tx->tx_type != PTLLND_RDMA_READ);
876
877                 tx->tx_reqmdh = mdh;
878                 gettimeofday(&tx->tx_req_posted, NULL);
879
880                 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: %s %p c %d",
881                                libcfs_id2str(peer->plp_id),
882                                peer->plp_credits,
883                                peer->plp_outstanding_credits,
884                                peer->plp_sent_credits,
885                                plni->plni_peer_credits +
886                                peer->plp_lazy_credits,
887                                ptllnd_msgtype2str(tx->tx_type), tx,
888                                tx->tx_msg.ptlm_credits);
889
890                 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
891                             plni->plni_portal, 0, LNET_MSG_MATCHBITS, 0, 0);
892                 if (rc != PTL_OK) {
893                         CERROR("PtlPut for %s failed: %s(%d)\n",
894                                libcfs_id2str(peer->plp_id),
895                                ptllnd_errtype2str(rc), rc);
896                         tx->tx_status = -EIO;
897                         ptllnd_tx_done(tx);
898                         break;
899                 }
900         }
901 }
902
903 int
904 ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
905                     unsigned int niov, struct iovec *iov,
906                     unsigned int offset, unsigned int len)
907 {
908         lnet_ni_t      *ni = peer->plp_ni;
909         ptllnd_ni_t    *plni = ni->ni_data;
910         ptllnd_tx_t    *tx = ptllnd_new_tx(peer, type, 0);
911         __u64           matchbits;
912         ptl_md_t        md;
913         ptl_handle_md_t mdh;
914         ptl_handle_me_t meh;
915         int             rc;
916         int             rc2;
917         time_t          start;
918         int             w;
919
920         CDEBUG(D_NET, "niov=%d offset=%d len=%d\n",niov,offset,len);
921
922         LASSERT (type == PTLLND_MSG_TYPE_GET ||
923                  type == PTLLND_MSG_TYPE_PUT);
924
925         if (tx == NULL) {
926                 CERROR("Can't allocate %s tx for %s\n",
927                        ptllnd_msgtype2str(type), libcfs_id2str(peer->plp_id));
928                 return -ENOMEM;
929         }
930
931         rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
932         if (rc != 0) {
933                 CERROR("Can't allocate iov %d for %s\n",
934                        niov, libcfs_id2str(peer->plp_id));
935                 rc = -ENOMEM;
936                 goto failed;
937         }
938
939         md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
940         md.eq_handle = plni->plni_eqh;
941         md.threshold = 1;
942         md.max_size = 0;
943         md.options = PTLLND_MD_OPTIONS;
944         if(type == PTLLND_MSG_TYPE_GET)
945                 md.options |= PTL_MD_OP_PUT | PTL_MD_ACK_DISABLE;
946         else
947                 md.options |= PTL_MD_OP_GET;
948         ptllnd_set_md_buffer(&md, tx);
949
950         start = cfs_time_current_sec();
951         w = plni->plni_long_wait;
952         ptllnd_set_tx_deadline(tx);
953
954         while (!peer->plp_recvd_hello) {    /* wait to validate plp_match */
955                 if (peer->plp_closing) {
956                         rc = -EIO;
957                         goto failed;
958                 }
959
960                 /* NB must check here to avoid unbounded wait - tx not yet
961                  * on peer->plp_txq, so ptllnd_watchdog can't expire it */
962                 if (tx->tx_deadline < cfs_time_current_sec()) {
963                         CERROR("%s tx for %s timed out\n",
964                                ptllnd_msgtype2str(type),
965                                libcfs_id2str(peer->plp_id));
966                         rc = -ETIMEDOUT;
967                         goto failed;
968                 }
969
970                 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
971                         CWARN("Waited %ds to connect to %s\n",
972                               (int)(cfs_time_current_sec() - start),
973                               libcfs_id2str(peer->plp_id));
974                         w *= 2;
975                 }
976                 ptllnd_wait(ni, w);
977         }
978
979         if (peer->plp_match < PTL_RESERVED_MATCHBITS)
980                 peer->plp_match = PTL_RESERVED_MATCHBITS;
981         matchbits = peer->plp_match++;
982
983         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid,
984                          matchbits, 0, PTL_UNLINK, PTL_INS_BEFORE, &meh);
985         if (rc != PTL_OK) {
986                 CERROR("PtlMEAttach for %s failed: %s(%d)\n",
987                        libcfs_id2str(peer->plp_id),
988                        ptllnd_errtype2str(rc), rc);
989                 rc = -EIO;
990                 goto failed;
991         }
992
993         gettimeofday(&tx->tx_bulk_posted, NULL);
994
995         rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);
996         if (rc != PTL_OK) {
997                 CERROR("PtlMDAttach for %s failed: %s(%d)\n",
998                        libcfs_id2str(peer->plp_id),
999                        ptllnd_errtype2str(rc), rc);
1000                 rc2 = PtlMEUnlink(meh);
1001                 LASSERT (rc2 == PTL_OK);
1002                 rc = -EIO;
1003                 goto failed;
1004         }
1005         tx->tx_bulkmdh = mdh;
1006
1007         /*
1008          * We need to set the stamp here because it
1009          * we could have received a HELLO above that set
1010          * peer->plp_stamp
1011          */
1012         tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
1013
1014         tx->tx_msg.ptlm_u.rdma.kptlrm_hdr = msg->msg_hdr;
1015         tx->tx_msg.ptlm_u.rdma.kptlrm_matchbits = matchbits;
1016
1017         if (type == PTLLND_MSG_TYPE_GET) {
1018                 tx->tx_lnetreplymsg = lnet_create_reply_msg(ni, msg);
1019                 if (tx->tx_lnetreplymsg == NULL) {
1020                         CERROR("Can't create reply for GET to %s\n",
1021                                libcfs_id2str(msg->msg_target));
1022                         rc = -ENOMEM;
1023                         goto failed;
1024                 }
1025         }
1026
1027         tx->tx_lnetmsg = msg;
1028         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post passive %s p %d %p",
1029                        libcfs_id2str(msg->msg_target),
1030                        peer->plp_credits, peer->plp_outstanding_credits,
1031                        peer->plp_sent_credits,
1032                        plni->plni_peer_credits + peer->plp_lazy_credits,
1033                        lnet_msgtyp2str(msg->msg_type),
1034                        (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ? 
1035                        le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1036                        (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ? 
1037                        le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1038                        tx);
1039         ptllnd_post_tx(tx);
1040         return 0;
1041
1042  failed:
1043         tx->tx_status = rc;
1044         ptllnd_tx_done(tx);
1045         return rc;
1046 }
1047
1048 int
1049 ptllnd_active_rdma(ptllnd_peer_t *peer, int type,
1050                    lnet_msg_t *msg, __u64 matchbits,
1051                    unsigned int niov, struct iovec *iov,
1052                    unsigned int offset, unsigned int len)
1053 {
1054         lnet_ni_t       *ni = peer->plp_ni;
1055         ptllnd_ni_t     *plni = ni->ni_data;
1056         ptllnd_tx_t     *tx = ptllnd_new_tx(peer, type, 0);
1057         ptl_md_t         md;
1058         ptl_handle_md_t  mdh;
1059         int              rc;
1060
1061         LASSERT (type == PTLLND_RDMA_READ ||
1062                  type == PTLLND_RDMA_WRITE);
1063
1064         if (tx == NULL) {
1065                 CERROR("Can't allocate tx for RDMA %s with %s\n",
1066                        (type == PTLLND_RDMA_WRITE) ? "write" : "read",
1067                        libcfs_id2str(peer->plp_id));
1068                 ptllnd_close_peer(peer, -ENOMEM);
1069                 return -ENOMEM;
1070         }
1071
1072         rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
1073         if (rc != 0) {
1074                 CERROR("Can't allocate iov %d for %s\n",
1075                        niov, libcfs_id2str(peer->plp_id));
1076                 rc = -ENOMEM;
1077                 goto failed;
1078         }
1079
1080         md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
1081         md.eq_handle = plni->plni_eqh;
1082         md.max_size = 0;
1083         md.options = PTLLND_MD_OPTIONS;
1084         md.threshold = (type == PTLLND_RDMA_READ) ? 2 : 1;
1085
1086         ptllnd_set_md_buffer(&md, tx);
1087
1088         rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
1089         if (rc != PTL_OK) {
1090                 CERROR("PtlMDBind for %s failed: %s(%d)\n",
1091                        libcfs_id2str(peer->plp_id),
1092                        ptllnd_errtype2str(rc), rc);
1093                 rc = -EIO;
1094                 goto failed;
1095         }
1096
1097         tx->tx_bulkmdh = mdh;
1098         tx->tx_lnetmsg = msg;
1099
1100         ptllnd_set_tx_deadline(tx);
1101         cfs_list_add_tail(&tx->tx_list, &peer->plp_activeq);
1102         gettimeofday(&tx->tx_bulk_posted, NULL);
1103
1104         if (type == PTLLND_RDMA_READ)
1105                 rc = PtlGet(mdh, peer->plp_ptlid,
1106                             plni->plni_portal, 0, matchbits, 0);
1107         else
1108                 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
1109                             plni->plni_portal, 0, matchbits, 0, 
1110                             (msg == NULL) ? PTLLND_RDMA_FAIL : PTLLND_RDMA_OK);
1111
1112         if (rc == PTL_OK)
1113                 return 0;
1114
1115         CERROR("Can't initiate RDMA with %s: %s(%d)\n",
1116                libcfs_id2str(peer->plp_id),
1117                ptllnd_errtype2str(rc), rc);
1118
1119         tx->tx_lnetmsg = NULL;
1120  failed:
1121         tx->tx_status = rc;
1122         ptllnd_tx_done(tx);    /* this will close peer */
1123         return rc;
1124 }
1125
1126 int
1127 ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
1128 {
1129         ptllnd_ni_t    *plni = ni->ni_data;
1130         ptllnd_peer_t  *plp;
1131         ptllnd_tx_t    *tx;
1132         int             nob;
1133         int             rc;
1134
1135         LASSERT (!msg->msg_routing);
1136         LASSERT (msg->msg_kiov == NULL);
1137
1138         LASSERT (msg->msg_niov <= PTL_MD_MAX_IOV); /* !!! */
1139
1140         CDEBUG(D_NET, "%s [%d]+%d,%d -> %s%s\n",
1141                lnet_msgtyp2str(msg->msg_type),
1142                msg->msg_niov, msg->msg_offset, msg->msg_len,
1143                libcfs_nid2str(msg->msg_target.nid),
1144                msg->msg_target_is_router ? "(rtr)" : "");
1145
1146         if ((msg->msg_target.pid & LNET_PID_USERFLAG) != 0) {
1147                 CERROR("Can't send to non-kernel peer %s\n",
1148                        libcfs_id2str(msg->msg_target));
1149                 return -EHOSTUNREACH;
1150         }
1151
1152         plp = ptllnd_find_peer(ni, msg->msg_target, 1);
1153         if (plp == NULL)
1154                 return -ENOMEM;
1155
1156         switch (msg->msg_type) {
1157         default:
1158                 LBUG();
1159
1160         case LNET_MSG_ACK:
1161                 LASSERT (msg->msg_len == 0);
1162                 break;                          /* send IMMEDIATE */
1163
1164         case LNET_MSG_GET:
1165                 if (msg->msg_target_is_router)
1166                         break;                  /* send IMMEDIATE */
1167
1168                 nob = msg->msg_md->md_length;
1169                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1170                 if (nob <= plni->plni_max_msg_size)
1171                         break;
1172
1173                 LASSERT ((msg->msg_md->md_options & LNET_MD_KIOV) == 0);
1174                 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_GET, msg,
1175                                          msg->msg_md->md_niov,
1176                                          msg->msg_md->md_iov.iov,
1177                                          0, msg->msg_md->md_length);
1178                 ptllnd_peer_decref(plp);
1179                 return rc;
1180
1181         case LNET_MSG_REPLY:
1182         case LNET_MSG_PUT:
1183                 nob = msg->msg_len;
1184                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1185                 if (nob <= plp->plp_max_msg_size)
1186                         break;                  /* send IMMEDIATE */
1187
1188                 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_PUT, msg,
1189                                          msg->msg_niov, msg->msg_iov,
1190                                          msg->msg_offset, msg->msg_len);
1191                 ptllnd_peer_decref(plp);
1192                 return rc;
1193         }
1194
1195         /* send IMMEDIATE
1196          * NB copy the payload so we don't have to do a fragmented send */
1197
1198         tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_IMMEDIATE, msg->msg_len);
1199         if (tx == NULL) {
1200                 CERROR("Can't allocate tx for lnet type %d to %s\n",
1201                        msg->msg_type, libcfs_id2str(msg->msg_target));
1202                 ptllnd_peer_decref(plp);
1203                 return -ENOMEM;
1204         }
1205
1206         lnet_copy_iov2flat(tx->tx_msgsize, &tx->tx_msg,
1207                            offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1208                            msg->msg_niov, msg->msg_iov, msg->msg_offset,
1209                            msg->msg_len);
1210         tx->tx_msg.ptlm_u.immediate.kptlim_hdr = msg->msg_hdr;
1211
1212         tx->tx_lnetmsg = msg;
1213         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post immediate %s p %d %p",
1214                        libcfs_id2str(msg->msg_target),
1215                        plp->plp_credits, plp->plp_outstanding_credits,
1216                        plp->plp_sent_credits,
1217                        plni->plni_peer_credits + plp->plp_lazy_credits,
1218                        lnet_msgtyp2str(msg->msg_type),
1219                        (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ? 
1220                        le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1221                        (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ? 
1222                        le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1223                        tx);
1224         ptllnd_post_tx(tx);
1225         ptllnd_peer_decref(plp);
1226         return 0;
1227 }
1228
1229 void
1230 ptllnd_rx_done(ptllnd_rx_t *rx)
1231 {
1232         ptllnd_peer_t *plp = rx->rx_peer;
1233         ptllnd_ni_t   *plni = plp->plp_ni->ni_data;
1234
1235         plp->plp_outstanding_credits++;
1236
1237         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: rx=%p done\n",
1238                        libcfs_id2str(plp->plp_id),
1239                        plp->plp_credits, plp->plp_outstanding_credits, 
1240                        plp->plp_sent_credits,
1241                        plni->plni_peer_credits + plp->plp_lazy_credits, rx);
1242
1243         ptllnd_check_sends(plp);
1244
1245         LASSERT (plni->plni_nrxs > 0);
1246         plni->plni_nrxs--;
1247 }
1248
1249 int
1250 ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1251                   void **new_privatep)
1252 {
1253         /* Shouldn't get here; recvs only block for router buffers */
1254         LBUG();
1255         return 0;
1256 }
1257
1258 int
1259 ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1260             int delayed, unsigned int niov,
1261             struct iovec *iov, lnet_kiov_t *kiov,
1262             unsigned int offset, unsigned int mlen, unsigned int rlen)
1263 {
1264         ptllnd_rx_t    *rx = private;
1265         int             rc = 0;
1266         int             nob;
1267
1268         LASSERT (kiov == NULL);
1269         LASSERT (niov <= PTL_MD_MAX_IOV);       /* !!! */
1270
1271         switch (rx->rx_msg->ptlm_type) {
1272         default:
1273                 LBUG();
1274
1275         case PTLLND_MSG_TYPE_IMMEDIATE:
1276                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[mlen]);
1277                 if (nob > rx->rx_nob) {
1278                         CERROR("Immediate message from %s too big: %d(%d)\n",
1279                                libcfs_id2str(rx->rx_peer->plp_id),
1280                                nob, rx->rx_nob);
1281                         rc = -EPROTO;
1282                         break;
1283                 }
1284                 lnet_copy_flat2iov(niov, iov, offset,
1285                                    rx->rx_nob, rx->rx_msg,
1286                                    offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1287                                    mlen);
1288                 lnet_finalize(ni, msg, 0);
1289                 break;
1290
1291         case PTLLND_MSG_TYPE_PUT:
1292                 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg,
1293                                         rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1294                                         niov, iov, offset, mlen);
1295                 break;
1296
1297         case PTLLND_MSG_TYPE_GET:
1298                 if (msg != NULL)
1299                         rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, msg,
1300                                                 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1301                                                 msg->msg_niov, msg->msg_iov,
1302                                                 msg->msg_offset, msg->msg_len);
1303                 else
1304                         rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, NULL,
1305                                                 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1306                                                 0, NULL, 0, 0);
1307                 break;
1308         }
1309
1310         ptllnd_rx_done(rx);
1311         return rc;
1312 }
1313
1314 void
1315 ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
1316                      kptl_msg_t *msg, unsigned int nob)
1317 {
1318         ptllnd_ni_t      *plni = ni->ni_data;
1319         const int         basenob = offsetof(kptl_msg_t, ptlm_u);
1320         lnet_process_id_t srcid;
1321         ptllnd_rx_t       rx;
1322         int               flip;
1323         __u16             msg_version;
1324         __u32             msg_cksum;
1325         ptllnd_peer_t    *plp;
1326         int               rc;
1327
1328         if (nob < 6) {
1329                 CERROR("Very short receive from %s\n",
1330                        ptllnd_ptlid2str(initiator));
1331                 return;
1332         }
1333
1334         /* I can at least read MAGIC/VERSION */
1335
1336         flip = msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC);
1337         if (!flip && msg->ptlm_magic != PTLLND_MSG_MAGIC) {
1338                 CERROR("Bad protocol magic %08x from %s\n", 
1339                        msg->ptlm_magic, ptllnd_ptlid2str(initiator));
1340                 return;
1341         }
1342
1343         msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
1344
1345         if (msg_version != PTLLND_MSG_VERSION) {
1346                 CERROR("Bad protocol version %04x from %s: %04x expected\n",
1347                        (__u32)msg_version, ptllnd_ptlid2str(initiator), PTLLND_MSG_VERSION);
1348
1349                 if (plni->plni_abort_on_protocol_mismatch)
1350                         abort();
1351
1352                 return;
1353         }
1354
1355         if (nob < basenob) {
1356                 CERROR("Short receive from %s: got %d, wanted at least %d\n",
1357                        ptllnd_ptlid2str(initiator), nob, basenob);
1358                 return;
1359         }
1360
1361         /* checksum must be computed with
1362          * 1) ptlm_cksum zero and
1363          * 2) BEFORE anything gets modified/flipped
1364          */
1365         msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
1366         msg->ptlm_cksum = 0;
1367         if (msg_cksum != 0 &&
1368             msg_cksum != ptllnd_cksum(msg, offsetof(kptl_msg_t, ptlm_u))) {
1369                 CERROR("Bad checksum from %s\n", ptllnd_ptlid2str(initiator));
1370                 return;
1371         }
1372
1373         msg->ptlm_version = msg_version;
1374         msg->ptlm_cksum = msg_cksum;
1375
1376         if (flip) {
1377                 /* NB stamps are opaque cookies */
1378                 __swab32s(&msg->ptlm_nob);
1379                 __swab64s(&msg->ptlm_srcnid);
1380                 __swab64s(&msg->ptlm_dstnid);
1381                 __swab32s(&msg->ptlm_srcpid);
1382                 __swab32s(&msg->ptlm_dstpid);
1383         }
1384
1385         srcid.nid = msg->ptlm_srcnid;
1386         srcid.pid = msg->ptlm_srcpid;
1387
1388         if (LNET_NIDNET(msg->ptlm_srcnid) != LNET_NIDNET(ni->ni_nid)) {
1389                 CERROR("Bad source id %s from %s\n",
1390                        libcfs_id2str(srcid),
1391                        ptllnd_ptlid2str(initiator));
1392                 return;
1393         }
1394
1395         if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
1396                 CERROR("NAK from %s (%s)\n",
1397                        libcfs_id2str(srcid),
1398                        ptllnd_ptlid2str(initiator));
1399
1400                 if (plni->plni_dump_on_nak)
1401                         ptllnd_dump_debug(ni, srcid);
1402
1403                 if (plni->plni_abort_on_nak)
1404                         abort();
1405
1406                 plp = ptllnd_find_peer(ni, srcid, 0);
1407                 if (plp == NULL) {
1408                         CERROR("Ignore NAK from %s: no peer\n", libcfs_id2str(srcid));
1409                         return;
1410                 }
1411                 ptllnd_close_peer(plp, -EPROTO);
1412                 ptllnd_peer_decref(plp);
1413                 return;
1414         }
1415
1416         if (msg->ptlm_dstnid != ni->ni_nid ||
1417             msg->ptlm_dstpid != the_lnet.ln_pid) {
1418                 CERROR("Bad dstid %s (%s expected) from %s\n",
1419                        libcfs_id2str((lnet_process_id_t) {
1420                                .nid = msg->ptlm_dstnid,
1421                                .pid = msg->ptlm_dstpid}),
1422                        libcfs_id2str((lnet_process_id_t) {
1423                                .nid = ni->ni_nid,
1424                                .pid = the_lnet.ln_pid}),
1425                        libcfs_id2str(srcid));
1426                 return;
1427         }
1428
1429         if (msg->ptlm_dststamp != plni->plni_stamp) {
1430                 CERROR("Bad dststamp "LPX64"("LPX64" expected) from %s\n",
1431                        msg->ptlm_dststamp, plni->plni_stamp,
1432                        libcfs_id2str(srcid));
1433                 return;
1434         }
1435
1436         PTLLND_HISTORY("RX %s: %s %d %p", libcfs_id2str(srcid), 
1437                        ptllnd_msgtype2str(msg->ptlm_type),
1438                        msg->ptlm_credits, &rx);
1439
1440         switch (msg->ptlm_type) {
1441         case PTLLND_MSG_TYPE_PUT:
1442         case PTLLND_MSG_TYPE_GET:
1443                 if (nob < basenob + sizeof(kptl_rdma_msg_t)) {
1444                         CERROR("Short rdma request from %s(%s)\n",
1445                                libcfs_id2str(srcid),
1446                                ptllnd_ptlid2str(initiator));
1447                         return;
1448                 }
1449                 if (flip)
1450                         __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
1451                 break;
1452
1453         case PTLLND_MSG_TYPE_IMMEDIATE:
1454                 if (nob < offsetof(kptl_msg_t,
1455                                    ptlm_u.immediate.kptlim_payload)) {
1456                         CERROR("Short immediate from %s(%s)\n",
1457                                libcfs_id2str(srcid),
1458                                ptllnd_ptlid2str(initiator));
1459                         return;
1460                 }
1461                 break;
1462
1463         case PTLLND_MSG_TYPE_HELLO:
1464                 if (nob < basenob + sizeof(kptl_hello_msg_t)) {
1465                         CERROR("Short hello from %s(%s)\n",
1466                                libcfs_id2str(srcid),
1467                                ptllnd_ptlid2str(initiator));
1468                         return;
1469                 }
1470                 if(flip){
1471                         __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
1472                         __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
1473                 }
1474                 break;
1475
1476         case PTLLND_MSG_TYPE_NOOP:
1477                 break;
1478
1479         default:
1480                 CERROR("Bad message type %d from %s(%s)\n", msg->ptlm_type,
1481                        libcfs_id2str(srcid),
1482                        ptllnd_ptlid2str(initiator));
1483                 return;
1484         }
1485
1486         plp = ptllnd_find_peer(ni, srcid, 0);
1487         if (plp == NULL) {
1488                 CERROR("Can't find peer %s\n", libcfs_id2str(srcid));
1489                 return;
1490         }
1491
1492         if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
1493                 if (plp->plp_recvd_hello) {
1494                         CERROR("Unexpected HELLO from %s\n",
1495                                libcfs_id2str(srcid));
1496                         ptllnd_peer_decref(plp);
1497                         return;
1498                 }
1499
1500                 plp->plp_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1501                 plp->plp_match = msg->ptlm_u.hello.kptlhm_matchbits;
1502                 plp->plp_stamp = msg->ptlm_srcstamp;
1503                 plp->plp_recvd_hello = 1;
1504
1505         } else if (!plp->plp_recvd_hello) {
1506
1507                 CERROR("Bad message type %d (HELLO expected) from %s\n",
1508                        msg->ptlm_type, libcfs_id2str(srcid));
1509                 ptllnd_peer_decref(plp);
1510                 return;
1511
1512         } else if (msg->ptlm_srcstamp != plp->plp_stamp) {
1513
1514                 CERROR("Bad srcstamp "LPX64"("LPX64" expected) from %s\n",
1515                        msg->ptlm_srcstamp, plp->plp_stamp,
1516                        libcfs_id2str(srcid));
1517                 ptllnd_peer_decref(plp);
1518                 return;
1519         }
1520
1521         /* Check peer only sends when I've sent her credits */
1522         if (plp->plp_sent_credits == 0) {
1523                 CERROR("%s[%d/%d+%d(%d)]: unexpected message\n",
1524                        libcfs_id2str(plp->plp_id),
1525                        plp->plp_credits, plp->plp_outstanding_credits,
1526                        plp->plp_sent_credits,
1527                        plni->plni_peer_credits + plp->plp_lazy_credits);
1528                 return;
1529         }
1530         plp->plp_sent_credits--;
1531
1532         /* No check for credit overflow - the peer may post new buffers after
1533          * the startup handshake. */
1534         plp->plp_credits += msg->ptlm_credits;
1535
1536         /* All OK so far; assume the message is good... */
1537
1538         rx.rx_peer      = plp;
1539         rx.rx_msg       = msg;
1540         rx.rx_nob       = nob;
1541         plni->plni_nrxs++;
1542
1543         switch (msg->ptlm_type) {
1544         default: /* message types have been checked already */
1545                 ptllnd_rx_done(&rx);
1546                 break;
1547
1548         case PTLLND_MSG_TYPE_PUT:
1549         case PTLLND_MSG_TYPE_GET:
1550                 rc = lnet_parse(ni, &msg->ptlm_u.rdma.kptlrm_hdr,
1551                                 msg->ptlm_srcnid, &rx, 1);
1552                 if (rc < 0)
1553                         ptllnd_rx_done(&rx);
1554                 break;
1555
1556         case PTLLND_MSG_TYPE_IMMEDIATE:
1557                 rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,
1558                                 msg->ptlm_srcnid, &rx, 0);
1559                 if (rc < 0)
1560                         ptllnd_rx_done(&rx);
1561                 break;
1562         }
1563
1564         if (msg->ptlm_credits > 0)
1565                 ptllnd_check_sends(plp);
1566
1567         ptllnd_peer_decref(plp);
1568 }
1569
1570 void
1571 ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
1572 {
1573         ptllnd_buffer_t *buf = ptllnd_eventarg2obj(event->md.user_ptr);
1574         ptllnd_ni_t     *plni = ni->ni_data;
1575         char            *msg = &buf->plb_buffer[event->offset];
1576         int              repost;
1577         int              unlinked = event->type == PTL_EVENT_UNLINK;
1578
1579         LASSERT (buf->plb_ni == ni);
1580         LASSERT (event->type == PTL_EVENT_PUT_END ||
1581                  event->type == PTL_EVENT_UNLINK);
1582
1583         if (event->ni_fail_type != PTL_NI_OK) {
1584
1585                 CERROR("event type %s(%d), status %s(%d) from %s\n",
1586                        ptllnd_evtype2str(event->type), event->type,
1587                        ptllnd_errtype2str(event->ni_fail_type), 
1588                        event->ni_fail_type,
1589                        ptllnd_ptlid2str(event->initiator));
1590
1591         } else if (event->type == PTL_EVENT_PUT_END) {
1592 #if (PTL_MD_LOCAL_ALIGN8 == 0)
1593                 /* Portals can't force message alignment - someone sending an
1594                  * odd-length message could misalign subsequent messages */
1595                 if ((event->mlength & 7) != 0) {
1596                         CERROR("Message from %s has odd length "LPU64
1597                                " probable version incompatibility\n",
1598                                ptllnd_ptlid2str(event->initiator),
1599                                event->mlength);
1600                         LBUG();
1601                 }
1602 #endif
1603                 LASSERT ((event->offset & 7) == 0);
1604
1605                 ptllnd_parse_request(ni, event->initiator,
1606                                      (kptl_msg_t *)msg, event->mlength);
1607         }
1608
1609 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1610         /* UNLINK event only on explicit unlink */
1611         repost = (event->unlinked && event->type != PTL_EVENT_UNLINK);
1612         if (event->unlinked)
1613                 unlinked = 1;
1614 #else
1615         /* UNLINK event only on implicit unlink */
1616         repost = (event->type == PTL_EVENT_UNLINK);
1617 #endif
1618
1619         if (unlinked) {
1620                 LASSERT(buf->plb_posted);
1621                 buf->plb_posted = 0;
1622                 plni->plni_nposted_buffers--;
1623         }
1624
1625         if (repost)
1626                 (void) ptllnd_post_buffer(buf);
1627 }
1628
1629 void
1630 ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
1631 {
1632         ptllnd_ni_t *plni = ni->ni_data;
1633         ptllnd_tx_t *tx = ptllnd_eventarg2obj(event->md.user_ptr);
1634         int          error = (event->ni_fail_type != PTL_NI_OK);
1635         int          isreq;
1636         int          isbulk;
1637 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1638         int          unlinked = event->unlinked;
1639 #else
1640         int          unlinked = (event->type == PTL_EVENT_UNLINK);
1641 #endif
1642
1643         if (error)
1644                 CERROR("Error %s(%d) event %s(%d) unlinked %d, %s(%d) for %s\n",
1645                        ptllnd_errtype2str(event->ni_fail_type),
1646                        event->ni_fail_type,
1647                        ptllnd_evtype2str(event->type), event->type,
1648                        unlinked, ptllnd_msgtype2str(tx->tx_type), tx->tx_type,
1649                        libcfs_id2str(tx->tx_peer->plp_id));
1650
1651         LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE));
1652
1653         isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh);
1654         if (isreq) {
1655                 LASSERT (event->md.start == (void *)&tx->tx_msg);
1656                 if (unlinked) {
1657                         tx->tx_reqmdh = PTL_INVALID_HANDLE;
1658                         gettimeofday(&tx->tx_req_done, NULL);
1659                 }
1660         }
1661
1662         isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);
1663         if ( isbulk && unlinked ) {
1664                 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
1665                 gettimeofday(&tx->tx_bulk_done, NULL);
1666         }
1667
1668         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
1669
1670         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: TX done %p %s%s",
1671                        libcfs_id2str(tx->tx_peer->plp_id),
1672                        tx->tx_peer->plp_credits,
1673                        tx->tx_peer->plp_outstanding_credits,
1674                        tx->tx_peer->plp_sent_credits,
1675                        plni->plni_peer_credits + tx->tx_peer->plp_lazy_credits,
1676                        tx, isreq ? "REQ" : "BULK", unlinked ? "(unlinked)" : "");
1677
1678         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
1679         switch (tx->tx_type) {
1680         default:
1681                 LBUG();
1682
1683         case PTLLND_MSG_TYPE_NOOP:
1684         case PTLLND_MSG_TYPE_HELLO:
1685         case PTLLND_MSG_TYPE_IMMEDIATE:
1686                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1687                          event->type == PTL_EVENT_SEND_END);
1688                 LASSERT (isreq);
1689                 break;
1690
1691         case PTLLND_MSG_TYPE_GET:
1692                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1693                          (isreq && event->type == PTL_EVENT_SEND_END) ||
1694                          (isbulk && event->type == PTL_EVENT_PUT_END));
1695
1696                 if (isbulk && !error && event->type == PTL_EVENT_PUT_END) {
1697                         /* Check GET matched */
1698                         if (event->hdr_data == PTLLND_RDMA_OK) {
1699                                 lnet_set_reply_msg_len(ni, 
1700                                                        tx->tx_lnetreplymsg,
1701                                                        event->mlength);
1702                         } else {
1703                                 CERROR ("Unmatched GET with %s\n",
1704                                         libcfs_id2str(tx->tx_peer->plp_id));
1705                                 tx->tx_status = -EIO;
1706                         }
1707                 }
1708                 break;
1709
1710         case PTLLND_MSG_TYPE_PUT:
1711                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1712                          (isreq && event->type == PTL_EVENT_SEND_END) ||
1713                          (isbulk && event->type == PTL_EVENT_GET_END));
1714                 break;
1715
1716         case PTLLND_RDMA_READ:
1717                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1718                          event->type == PTL_EVENT_SEND_END ||
1719                          event->type == PTL_EVENT_REPLY_END);
1720                 LASSERT (isbulk);
1721                 break;
1722
1723         case PTLLND_RDMA_WRITE:
1724                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1725                          event->type == PTL_EVENT_SEND_END);
1726                 LASSERT (isbulk);
1727         }
1728
1729         /* Schedule ptllnd_tx_done() on error or last completion event */
1730         if (error ||
1731             (PtlHandleIsEqual(tx->tx_bulkmdh, PTL_INVALID_HANDLE) &&
1732              PtlHandleIsEqual(tx->tx_reqmdh, PTL_INVALID_HANDLE))) {
1733                 if (error)
1734                         tx->tx_status = -EIO;
1735                 cfs_list_del(&tx->tx_list);
1736                 cfs_list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
1737         }
1738 }
1739
1740 ptllnd_tx_t *
1741 ptllnd_find_timed_out_tx(ptllnd_peer_t *peer)
1742 {
1743         time_t            now = cfs_time_current_sec();
1744         ptllnd_tx_t *tx;
1745
1746         cfs_list_for_each_entry (tx, &peer->plp_txq, tx_list) {
1747                 if (tx->tx_deadline < now)
1748                         return tx;
1749         }
1750
1751         cfs_list_for_each_entry (tx, &peer->plp_noopq, tx_list) {
1752                 if (tx->tx_deadline < now)
1753                         return tx;
1754         }
1755
1756         cfs_list_for_each_entry (tx, &peer->plp_activeq, tx_list) {
1757                 if (tx->tx_deadline < now)
1758                         return tx;
1759         }
1760
1761         return NULL;
1762 }
1763
1764 void
1765 ptllnd_check_peer(ptllnd_peer_t *peer)
1766 {
1767         ptllnd_tx_t *tx = ptllnd_find_timed_out_tx(peer);
1768
1769         if (tx == NULL)
1770                 return;
1771
1772         CERROR("%s (sent %d recvd %d, credits %d/%d/%d/%d/%d): timed out %p %p\n",
1773                libcfs_id2str(peer->plp_id), peer->plp_sent_hello, peer->plp_recvd_hello,
1774                peer->plp_credits, peer->plp_outstanding_credits,
1775                peer->plp_sent_credits, peer->plp_lazy_credits,
1776                peer->plp_extra_lazy_credits, tx, tx->tx_lnetmsg);
1777         ptllnd_debug_tx(tx);
1778         ptllnd_close_peer(peer, -ETIMEDOUT);
1779 }
1780
1781 void
1782 ptllnd_watchdog (lnet_ni_t *ni, time_t now)
1783 {
1784         ptllnd_ni_t      *plni = ni->ni_data;
1785         const int         n = 4;
1786         int               p = plni->plni_watchdog_interval;
1787         int               chunk = plni->plni_peer_hash_size;
1788         int               interval = now - (plni->plni_watchdog_nextt - p);
1789         int               i;
1790         cfs_list_t       *hashlist;
1791         cfs_list_t       *tmp;
1792         cfs_list_t       *nxt;
1793
1794         /* Time to check for RDMA timeouts on a few more peers:
1795          * I try to do checks every 'p' seconds on a proportion of the peer
1796          * table and I need to check every connection 'n' times within a
1797          * timeout interval, to ensure I detect a timeout on any connection
1798          * within (n+1)/n times the timeout interval. */
1799
1800         LASSERT (now >= plni->plni_watchdog_nextt);
1801
1802         if (plni->plni_timeout > n * interval) { /* Scan less than the whole table? */
1803                 chunk = (chunk * n * interval) / plni->plni_timeout;
1804                 if (chunk == 0)
1805                         chunk = 1;
1806         }
1807
1808         for (i = 0; i < chunk; i++) {
1809                 hashlist = &plni->plni_peer_hash[plni->plni_watchdog_peeridx];
1810
1811                 cfs_list_for_each_safe(tmp, nxt, hashlist) {
1812                         ptllnd_check_peer(cfs_list_entry(tmp, ptllnd_peer_t,
1813                                           plp_list));
1814                 }
1815
1816                 plni->plni_watchdog_peeridx = (plni->plni_watchdog_peeridx + 1) %
1817                                               plni->plni_peer_hash_size;
1818         }
1819
1820         plni->plni_watchdog_nextt = now + p;
1821 }
1822
1823 void
1824 ptllnd_wait (lnet_ni_t *ni, int milliseconds)
1825 {
1826         static struct timeval  prevt;
1827         static int             prevt_count;
1828         static int             call_count;
1829
1830         struct timeval         start;
1831         struct timeval         then;
1832         struct timeval         now;
1833         struct timeval         deadline;
1834
1835         ptllnd_ni_t   *plni = ni->ni_data;
1836         ptllnd_tx_t   *tx;
1837         ptl_event_t    event;
1838         int            which;
1839         int            rc;
1840         int            found = 0;
1841         int            timeout = 0;
1842
1843         /* Handle any currently queued events, returning immediately if any.
1844          * Otherwise block for the timeout and handle all events queued
1845          * then. */
1846
1847         gettimeofday(&start, NULL);
1848         call_count++;
1849
1850         if (milliseconds <= 0) {
1851                 deadline = start;
1852         } else {
1853                 deadline.tv_sec  = start.tv_sec  +  milliseconds/1000;
1854                 deadline.tv_usec = start.tv_usec + (milliseconds % 1000)*1000;
1855
1856                 if (deadline.tv_usec >= 1000000) {
1857                         start.tv_usec -= 1000000;
1858                         start.tv_sec++;
1859                 }
1860         }
1861
1862         for (;;) {
1863                 gettimeofday(&then, NULL);
1864
1865                 rc = PtlEQPoll(&plni->plni_eqh, 1, timeout, &event, &which);
1866
1867                 gettimeofday(&now, NULL);
1868
1869                 if ((now.tv_sec*1000 + now.tv_usec/1000) - 
1870                     (then.tv_sec*1000 + then.tv_usec/1000) > timeout + 1000) {
1871                         /* 1000 mS grace...........................^ */
1872                         CERROR("SLOW PtlEQPoll(%d): %dmS elapsed\n", timeout,
1873                                (int)(now.tv_sec*1000 + now.tv_usec/1000) - 
1874                                (int)(then.tv_sec*1000 + then.tv_usec/1000));
1875                 }
1876
1877                 if (rc == PTL_EQ_EMPTY) {
1878                         if (found)              /* handled some events */
1879                                 break;
1880
1881                         if (now.tv_sec >= plni->plni_watchdog_nextt) { /* check timeouts? */
1882                                 ptllnd_watchdog(ni, now.tv_sec);
1883                                 LASSERT (now.tv_sec < plni->plni_watchdog_nextt);
1884                         }
1885
1886                         if (now.tv_sec > deadline.tv_sec || /* timeout expired */
1887                             (now.tv_sec == deadline.tv_sec &&
1888                              now.tv_usec >= deadline.tv_usec))
1889                                 break;
1890
1891                         if (milliseconds < 0 ||
1892                             plni->plni_watchdog_nextt <= deadline.tv_sec)  {
1893                                 timeout = (plni->plni_watchdog_nextt - now.tv_sec)*1000;
1894                         } else {
1895                                 timeout = (deadline.tv_sec - now.tv_sec)*1000 +
1896                                           (deadline.tv_usec - now.tv_usec)/1000;
1897                         }
1898
1899                         continue;
1900                 }
1901
1902                 LASSERT (rc == PTL_OK || rc == PTL_EQ_DROPPED);
1903
1904                 if (rc == PTL_EQ_DROPPED)
1905                         CERROR("Event queue: size %d is too small\n",
1906                                plni->plni_eq_size);
1907
1908                 timeout = 0;
1909                 found = 1;
1910
1911                 switch (ptllnd_eventarg2type(event.md.user_ptr)) {
1912                 default:
1913                         LBUG();
1914
1915                 case PTLLND_EVENTARG_TYPE_TX:
1916                         ptllnd_tx_event(ni, &event);
1917                         break;
1918
1919                 case PTLLND_EVENTARG_TYPE_BUF:
1920                         ptllnd_buf_event(ni, &event);
1921                         break;
1922                 }
1923         }
1924
1925         while (!cfs_list_empty(&plni->plni_zombie_txs)) {
1926                 tx = cfs_list_entry(plni->plni_zombie_txs.next,
1927                                 ptllnd_tx_t, tx_list);
1928                 cfs_list_del_init(&tx->tx_list);
1929                 ptllnd_tx_done(tx);
1930         }
1931
1932         if (prevt.tv_sec == 0 ||
1933             prevt.tv_sec != now.tv_sec) {
1934                 PTLLND_HISTORY("%d wait entered at %d.%06d - prev %d %d.%06d", 
1935                                call_count, (int)start.tv_sec, (int)start.tv_usec,
1936                                prevt_count, (int)prevt.tv_sec, (int)prevt.tv_usec);
1937                 prevt = now;
1938         }
1939 }