Whamcloud - gitweb
b=13490,i=maxim:
[fs/lustre-release.git] / lnet / ulnds / ptllnd / ptllnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/ulnds/ptllnd/ptllnd_cb.c
37  *
38  * Author: Eric Barton <eeb@bartonsoftware.com>
39  */
40
41 #include "ptllnd.h"
42
43 void
44 ptllnd_set_tx_deadline(ptllnd_tx_t *tx)
45 {
46         ptllnd_peer_t  *peer = tx->tx_peer;
47         lnet_ni_t      *ni = peer->plp_ni;
48         ptllnd_ni_t    *plni = ni->ni_data;
49
50         tx->tx_deadline = cfs_time_current_sec() + plni->plni_timeout;
51 }
52
53 void
54 ptllnd_post_tx(ptllnd_tx_t *tx)
55 {
56         ptllnd_peer_t  *peer = tx->tx_peer;
57
58         LASSERT (tx->tx_type != PTLLND_MSG_TYPE_NOOP);
59
60         ptllnd_set_tx_deadline(tx);
61         list_add_tail(&tx->tx_list, &peer->plp_txq);
62         ptllnd_check_sends(peer);
63 }
64
65 char *
66 ptllnd_ptlid2str(ptl_process_id_t id)
67 {
68         static char strs[8][32];
69         static int  idx = 0;
70
71         char   *str = strs[idx++];
72
73         if (idx >= sizeof(strs)/sizeof(strs[0]))
74                 idx = 0;
75
76         snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
77         return str;
78 }
79
80 void
81 ptllnd_destroy_peer(ptllnd_peer_t *peer)
82 {
83         lnet_ni_t         *ni = peer->plp_ni;
84         ptllnd_ni_t       *plni = ni->ni_data;
85         int                nmsg = peer->plp_lazy_credits +
86                                   plni->plni_peer_credits;
87
88         ptllnd_size_buffers(ni, -nmsg);
89
90         LASSERT (peer->plp_closing);
91         LASSERT (plni->plni_npeers > 0);
92         LASSERT (list_empty(&peer->plp_txq));
93         LASSERT (list_empty(&peer->plp_noopq));
94         LASSERT (list_empty(&peer->plp_activeq));
95         plni->plni_npeers--;
96         LIBCFS_FREE(peer, sizeof(*peer));
97 }
98
99 void
100 ptllnd_abort_txs(ptllnd_ni_t *plni, struct list_head *q)
101 {
102         while (!list_empty(q)) {
103                 ptllnd_tx_t *tx = list_entry(q->next, ptllnd_tx_t, tx_list);
104
105                 tx->tx_status = -ESHUTDOWN;
106                 list_del(&tx->tx_list);
107                 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
108         }
109 }
110
111 void
112 ptllnd_close_peer(ptllnd_peer_t *peer, int error)
113 {
114         lnet_ni_t   *ni = peer->plp_ni;
115         ptllnd_ni_t *plni = ni->ni_data;
116
117         if (peer->plp_closing)
118                 return;
119
120         peer->plp_closing = 1;
121
122         if (!list_empty(&peer->plp_txq) ||
123             !list_empty(&peer->plp_noopq) ||
124             !list_empty(&peer->plp_activeq) ||
125             error != 0) {
126                 CWARN("Closing %s\n", libcfs_id2str(peer->plp_id));
127                 if (plni->plni_debug)
128                         ptllnd_dump_debug(ni, peer->plp_id);
129         }
130
131         ptllnd_abort_txs(plni, &peer->plp_txq);
132         ptllnd_abort_txs(plni, &peer->plp_noopq);
133         ptllnd_abort_txs(plni, &peer->plp_activeq);
134
135         list_del(&peer->plp_list);
136         ptllnd_peer_decref(peer);
137 }
138
139 ptllnd_peer_t *
140 ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
141 {
142         ptllnd_ni_t       *plni = ni->ni_data;
143         unsigned int       hash = LNET_NIDADDR(id.nid) % plni->plni_peer_hash_size;
144         ptllnd_peer_t     *plp;
145         ptllnd_tx_t       *tx;
146         int                rc;
147
148         LASSERT (LNET_NIDNET(id.nid) == LNET_NIDNET(ni->ni_nid));
149
150         list_for_each_entry (plp, &plni->plni_peer_hash[hash], plp_list) {
151                 if (plp->plp_id.nid == id.nid &&
152                     plp->plp_id.pid == id.pid) {
153                         ptllnd_peer_addref(plp);
154                         return plp;
155                 }
156         }
157
158         if (!create)
159                 return NULL;
160
161         /* New peer: check first for enough posted buffers */
162         plni->plni_npeers++;
163         rc = ptllnd_size_buffers(ni, plni->plni_peer_credits);
164         if (rc != 0) {
165                 plni->plni_npeers--;
166                 return NULL;
167         }
168
169         LIBCFS_ALLOC(plp, sizeof(*plp));
170         if (plp == NULL) {
171                 CERROR("Can't allocate new peer %s\n", libcfs_id2str(id));
172                 plni->plni_npeers--;
173                 ptllnd_size_buffers(ni, -plni->plni_peer_credits);
174                 return NULL;
175         }
176
177         plp->plp_ni = ni;
178         plp->plp_id = id;
179         plp->plp_ptlid.nid = LNET_NIDADDR(id.nid);
180         plp->plp_ptlid.pid = plni->plni_ptllnd_pid;
181         plp->plp_credits = 1; /* add more later when she gives me credits */
182         plp->plp_max_msg_size = plni->plni_max_msg_size; /* until I hear from her */
183         plp->plp_sent_credits = 1;              /* Implicit credit for HELLO */
184         plp->plp_outstanding_credits = plni->plni_peer_credits - 1;
185         plp->plp_lazy_credits = 0;
186         plp->plp_extra_lazy_credits = 0;
187         plp->plp_match = 0;
188         plp->plp_stamp = 0;
189         plp->plp_sent_hello = 0;
190         plp->plp_recvd_hello = 0;
191         plp->plp_closing = 0;
192         plp->plp_refcount = 1;
193         CFS_INIT_LIST_HEAD(&plp->plp_list);
194         CFS_INIT_LIST_HEAD(&plp->plp_txq);
195         CFS_INIT_LIST_HEAD(&plp->plp_noopq);
196         CFS_INIT_LIST_HEAD(&plp->plp_activeq);
197
198         ptllnd_peer_addref(plp);
199         list_add_tail(&plp->plp_list, &plni->plni_peer_hash[hash]);
200
201         tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_HELLO, 0);
202         if (tx == NULL) {
203                 CERROR("Can't send HELLO to %s\n", libcfs_id2str(id));
204                 ptllnd_close_peer(plp, -ENOMEM);
205                 ptllnd_peer_decref(plp);
206                 return NULL;
207         }
208
209         tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS;
210         tx->tx_msg.ptlm_u.hello.kptlhm_max_msg_size = plni->plni_max_msg_size;
211
212         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post hello %p", libcfs_id2str(id),
213                        tx->tx_peer->plp_credits,
214                        tx->tx_peer->plp_outstanding_credits,
215                        tx->tx_peer->plp_sent_credits,
216                        plni->plni_peer_credits + 
217                        tx->tx_peer->plp_lazy_credits, tx);
218         ptllnd_post_tx(tx);
219
220         return plp;
221 }
222
223 int
224 ptllnd_count_q(struct list_head *q)
225 {
226         struct list_head *e;
227         int               n = 0;
228
229         list_for_each(e, q) {
230                 n++;
231         }
232
233         return n;
234 }
235
236 const char *
237 ptllnd_tx_typestr(int type)
238 {
239         switch (type) {
240         case PTLLND_RDMA_WRITE:
241                 return "rdma_write";
242
243         case PTLLND_RDMA_READ:
244                 return "rdma_read";
245
246         case PTLLND_MSG_TYPE_PUT:
247                 return "put_req";
248
249         case PTLLND_MSG_TYPE_GET:
250                 return "get_req";
251
252         case PTLLND_MSG_TYPE_IMMEDIATE:
253                 return "immediate";
254
255         case PTLLND_MSG_TYPE_NOOP:
256                 return "noop";
257
258         case PTLLND_MSG_TYPE_HELLO:
259                 return "hello";
260
261         default:
262                 return "<unknown>";
263         }
264 }
265
266 void
267 ptllnd_debug_tx(ptllnd_tx_t *tx)
268 {
269         CDEBUG(D_WARNING, "%s %s b %ld.%06ld/%ld.%06ld"
270                " r %ld.%06ld/%ld.%06ld status %d\n",
271                ptllnd_tx_typestr(tx->tx_type),
272                libcfs_id2str(tx->tx_peer->plp_id),
273                tx->tx_bulk_posted.tv_sec, tx->tx_bulk_posted.tv_usec,
274                tx->tx_bulk_done.tv_sec, tx->tx_bulk_done.tv_usec,
275                tx->tx_req_posted.tv_sec, tx->tx_req_posted.tv_usec,
276                tx->tx_req_done.tv_sec, tx->tx_req_done.tv_usec,
277                tx->tx_status);
278 }
279
280 void
281 ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
282 {
283         ptllnd_peer_t    *plp = ptllnd_find_peer(ni, id, 0);
284         ptllnd_ni_t      *plni = ni->ni_data;
285         ptllnd_tx_t      *tx;
286
287         if (plp == NULL) {
288                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_id2str(id));
289                 return;
290         }
291
292         CWARN("%s %s%s [%d] "LPU64".%06d m "LPU64" q %d/%d/%d c %d/%d+%d(%d)\n",
293               libcfs_id2str(id),
294               plp->plp_recvd_hello ? "H" : "_",
295               plp->plp_closing     ? "C" : "_",
296               plp->plp_refcount,
297               plp->plp_stamp / 1000000, (int)(plp->plp_stamp % 1000000),
298               plp->plp_match,
299               ptllnd_count_q(&plp->plp_txq),
300               ptllnd_count_q(&plp->plp_noopq),
301               ptllnd_count_q(&plp->plp_activeq),
302               plp->plp_credits, plp->plp_outstanding_credits, plp->plp_sent_credits,
303               plni->plni_peer_credits + plp->plp_lazy_credits);
304
305         CDEBUG(D_WARNING, "txq:\n");
306         list_for_each_entry (tx, &plp->plp_txq, tx_list) {
307                 ptllnd_debug_tx(tx);
308         }
309
310         CDEBUG(D_WARNING, "noopq:\n");
311         list_for_each_entry (tx, &plp->plp_noopq, tx_list) {
312                 ptllnd_debug_tx(tx);
313         }
314
315         CDEBUG(D_WARNING, "activeq:\n");
316         list_for_each_entry (tx, &plp->plp_activeq, tx_list) {
317                 ptllnd_debug_tx(tx);
318         }
319
320         CDEBUG(D_WARNING, "zombies:\n");
321         list_for_each_entry (tx, &plni->plni_zombie_txs, tx_list) {
322                 if (tx->tx_peer->plp_id.nid == id.nid &&
323                     tx->tx_peer->plp_id.pid == id.pid)
324                         ptllnd_debug_tx(tx);
325         }
326
327         CDEBUG(D_WARNING, "history:\n");
328         list_for_each_entry (tx, &plni->plni_tx_history, tx_list) {
329                 if (tx->tx_peer->plp_id.nid == id.nid &&
330                     tx->tx_peer->plp_id.pid == id.pid)
331                         ptllnd_debug_tx(tx);
332         }
333
334         ptllnd_peer_decref(plp);
335 }
336
337 void
338 ptllnd_dump_debug(lnet_ni_t *ni, lnet_process_id_t id)
339 {
340         ptllnd_debug_peer(ni, id);
341         ptllnd_dump_history();
342 }
343
344 void
345 ptllnd_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive)
346 {
347         lnet_process_id_t  id;
348         ptllnd_peer_t     *peer;
349         time_t             start = cfs_time_current_sec();
350         ptllnd_ni_t       *plni = ni->ni_data;
351         int                w = plni->plni_long_wait;
352
353         /* This is only actually used to connect to routers at startup! */
354         LASSERT(alive);
355
356         id.nid = nid;
357         id.pid = LUSTRE_SRV_LNET_PID;
358
359         peer = ptllnd_find_peer(ni, id, 1);
360         if (peer == NULL)
361                 return;
362
363         /* wait for the peer to reply */
364         while (!peer->plp_recvd_hello) {
365                 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
366                         CWARN("Waited %ds to connect to %s\n",
367                               (int)(cfs_time_current_sec() - start),
368                               libcfs_id2str(id));
369                         w *= 2;
370                 }
371
372                 ptllnd_wait(ni, w);
373         }
374
375         ptllnd_peer_decref(peer);
376 }
377
378 int
379 ptllnd_setasync(lnet_ni_t *ni, lnet_process_id_t id, int nasync)
380 {
381         ptllnd_peer_t *peer = ptllnd_find_peer(ni, id, nasync > 0);
382         int            rc;
383
384         if (peer == NULL)
385                 return -ENOMEM;
386
387         LASSERT (peer->plp_lazy_credits >= 0);
388         LASSERT (peer->plp_extra_lazy_credits >= 0);
389
390         /* If nasync < 0, we're being told we can reduce the total message
391          * headroom.  We can't do this right now because our peer might already
392          * have credits for the extra buffers, so we just account the extra
393          * headroom in case we need it later and only destroy buffers when the
394          * peer closes.
395          *
396          * Note that the following condition handles this case, where it
397          * actually increases the extra lazy credit counter. */
398
399         if (nasync <= peer->plp_extra_lazy_credits) {
400                 peer->plp_extra_lazy_credits -= nasync;
401                 return 0;
402         }
403
404         LASSERT (nasync > 0);
405
406         nasync -= peer->plp_extra_lazy_credits;
407         peer->plp_extra_lazy_credits = 0;
408
409         rc = ptllnd_size_buffers(ni, nasync);
410         if (rc == 0) {
411                 peer->plp_lazy_credits += nasync;
412                 peer->plp_outstanding_credits += nasync;
413         }
414
415         return rc;
416 }
417
418 __u32
419 ptllnd_cksum (void *ptr, int nob)
420 {
421         char  *c  = ptr;
422         __u32  sum = 0;
423
424         while (nob-- > 0)
425                 sum = ((sum << 1) | (sum >> 31)) + *c++;
426
427         /* ensure I don't return 0 (== no checksum) */
428         return (sum == 0) ? 1 : sum;
429 }
430
431 ptllnd_tx_t *
432 ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
433 {
434         lnet_ni_t   *ni = peer->plp_ni;
435         ptllnd_ni_t *plni = ni->ni_data;
436         ptllnd_tx_t *tx;
437         int          msgsize;
438
439         CDEBUG(D_NET, "peer=%p type=%d payload=%d\n", peer, type, payload_nob);
440
441         switch (type) {
442         default:
443                 LBUG();
444
445         case PTLLND_RDMA_WRITE:
446         case PTLLND_RDMA_READ:
447                 LASSERT (payload_nob == 0);
448                 msgsize = 0;
449                 break;
450
451         case PTLLND_MSG_TYPE_PUT:
452         case PTLLND_MSG_TYPE_GET:
453                 LASSERT (payload_nob == 0);
454                 msgsize = offsetof(kptl_msg_t, ptlm_u) + 
455                           sizeof(kptl_rdma_msg_t);
456                 break;
457
458         case PTLLND_MSG_TYPE_IMMEDIATE:
459                 msgsize = offsetof(kptl_msg_t,
460                                    ptlm_u.immediate.kptlim_payload[payload_nob]);
461                 break;
462
463         case PTLLND_MSG_TYPE_NOOP:
464                 LASSERT (payload_nob == 0);
465                 msgsize = offsetof(kptl_msg_t, ptlm_u);
466                 break;
467
468         case PTLLND_MSG_TYPE_HELLO:
469                 LASSERT (payload_nob == 0);
470                 msgsize = offsetof(kptl_msg_t, ptlm_u) +
471                           sizeof(kptl_hello_msg_t);
472                 break;
473         }
474
475         msgsize = (msgsize + 7) & ~7;
476         LASSERT (msgsize <= peer->plp_max_msg_size);
477
478         LIBCFS_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize);
479
480         if (tx == NULL) {
481                 CERROR("Can't allocate msg type %d for %s\n",
482                        type, libcfs_id2str(peer->plp_id));
483                 return NULL;
484         }
485
486         CFS_INIT_LIST_HEAD(&tx->tx_list);
487         tx->tx_peer = peer;
488         tx->tx_type = type;
489         tx->tx_lnetmsg = tx->tx_lnetreplymsg = NULL;
490         tx->tx_niov = 0;
491         tx->tx_iov = NULL;
492         tx->tx_reqmdh = PTL_INVALID_HANDLE;
493         tx->tx_bulkmdh = PTL_INVALID_HANDLE;
494         tx->tx_msgsize = msgsize;
495         tx->tx_completing = 0;
496         tx->tx_status = 0;
497
498         memset(&tx->tx_bulk_posted, 0, sizeof(tx->tx_bulk_posted));
499         memset(&tx->tx_bulk_done, 0, sizeof(tx->tx_bulk_done));
500         memset(&tx->tx_req_posted, 0, sizeof(tx->tx_req_posted));
501         memset(&tx->tx_req_done, 0, sizeof(tx->tx_req_done));
502
503         if (msgsize != 0) {
504                 tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC;
505                 tx->tx_msg.ptlm_version = PTLLND_MSG_VERSION;
506                 tx->tx_msg.ptlm_type = type;
507                 tx->tx_msg.ptlm_credits = 0;
508                 tx->tx_msg.ptlm_nob = msgsize;
509                 tx->tx_msg.ptlm_cksum = 0;
510                 tx->tx_msg.ptlm_srcnid = ni->ni_nid;
511                 tx->tx_msg.ptlm_srcstamp = plni->plni_stamp;
512                 tx->tx_msg.ptlm_dstnid = peer->plp_id.nid;
513                 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
514                 tx->tx_msg.ptlm_srcpid = the_lnet.ln_pid;
515                 tx->tx_msg.ptlm_dstpid = peer->plp_id.pid;
516         }
517
518         ptllnd_peer_addref(peer);
519         plni->plni_ntxs++;
520
521         CDEBUG(D_NET, "tx=%p\n",tx);
522
523         return tx;
524 }
525
526 void
527 ptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh)
528 {
529         ptllnd_peer_t   *peer = tx->tx_peer;
530         lnet_ni_t       *ni = peer->plp_ni;
531         int              rc;
532         time_t           start = cfs_time_current_sec();
533         ptllnd_ni_t     *plni = ni->ni_data;
534         int              w = plni->plni_long_wait;
535
536         while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) {
537                 rc = PtlMDUnlink(*mdh);
538 #ifndef LUSTRE_PORTALS_UNLINK_SEMANTICS
539                 if (rc == PTL_OK) /* unlink successful => no unlinked event */
540                         return;
541                 LASSERT (rc == PTL_MD_IN_USE);
542 #endif
543                 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
544                         CWARN("Waited %ds to abort tx to %s\n",
545                               (int)(cfs_time_current_sec() - start),
546                               libcfs_id2str(peer->plp_id));
547                         w *= 2;
548                 }
549                 /* Wait for ptllnd_tx_event() to invalidate */
550                 ptllnd_wait(ni, w);
551         }
552 }
553
554 void
555 ptllnd_cull_tx_history(ptllnd_ni_t *plni)
556 {
557         int max = plni->plni_max_tx_history;
558
559         while (plni->plni_ntx_history > max) {
560                 ptllnd_tx_t *tx = list_entry(plni->plni_tx_history.next, 
561                                              ptllnd_tx_t, tx_list);
562                 list_del(&tx->tx_list);
563
564                 ptllnd_peer_decref(tx->tx_peer);
565
566                 LIBCFS_FREE(tx, offsetof(ptllnd_tx_t, tx_msg) + tx->tx_msgsize);
567
568                 LASSERT (plni->plni_ntxs > 0);
569                 plni->plni_ntxs--;
570                 plni->plni_ntx_history--;
571         }
572 }
573
574 void
575 ptllnd_tx_done(ptllnd_tx_t *tx)
576 {
577         ptllnd_peer_t   *peer = tx->tx_peer;
578         lnet_ni_t       *ni = peer->plp_ni;
579         ptllnd_ni_t     *plni = ni->ni_data;
580
581         /* CAVEAT EMPTOR: If this tx is being aborted, I'll continue to get
582          * events for this tx until it's unlinked.  So I set tx_completing to
583          * flag the tx is getting handled */
584
585         if (tx->tx_completing)
586                 return;
587
588         tx->tx_completing = 1;
589
590         if (!list_empty(&tx->tx_list))
591                 list_del_init(&tx->tx_list);
592
593         if (tx->tx_status != 0) {
594                 if (plni->plni_debug) {
595                         CERROR("Completing tx for %s with error %d\n",
596                                libcfs_id2str(peer->plp_id), tx->tx_status);
597                         ptllnd_debug_tx(tx);
598                 }
599                 ptllnd_close_peer(peer, tx->tx_status);
600         }
601
602         ptllnd_abort_tx(tx, &tx->tx_reqmdh);
603         ptllnd_abort_tx(tx, &tx->tx_bulkmdh);
604
605         if (tx->tx_niov > 0) {
606                 LIBCFS_FREE(tx->tx_iov, tx->tx_niov * sizeof(*tx->tx_iov));
607                 tx->tx_niov = 0;
608         }
609
610         if (tx->tx_lnetreplymsg != NULL) {
611                 LASSERT (tx->tx_type == PTLLND_MSG_TYPE_GET);
612                 LASSERT (tx->tx_lnetmsg != NULL);
613                 /* Simulate GET success always  */
614                 lnet_finalize(ni, tx->tx_lnetmsg, 0);
615                 CDEBUG(D_NET, "lnet_finalize(tx_lnetreplymsg=%p)\n",tx->tx_lnetreplymsg);
616                 lnet_finalize(ni, tx->tx_lnetreplymsg, tx->tx_status);
617         } else if (tx->tx_lnetmsg != NULL) {
618                 lnet_finalize(ni, tx->tx_lnetmsg, tx->tx_status);
619         }
620
621         plni->plni_ntx_history++;
622         list_add_tail(&tx->tx_list, &plni->plni_tx_history);
623
624         ptllnd_cull_tx_history(plni);
625 }
626
627 int
628 ptllnd_set_txiov(ptllnd_tx_t *tx,
629                  unsigned int niov, struct iovec *iov,
630                  unsigned int offset, unsigned int len)
631 {
632         ptl_md_iovec_t *piov;
633         int             npiov;
634
635         if (len == 0) {
636                 tx->tx_niov = 0;
637                 return 0;
638         }
639
640         /*
641          * Remove iovec's at the beginning that
642          * are skipped because of the offset.
643          * Adjust the offset accordingly
644          */
645         for (;;) {
646                 LASSERT (niov > 0);
647                 if (offset < iov->iov_len)
648                         break;
649                 offset -= iov->iov_len;
650                 niov--;
651                 iov++;
652         }
653
654         for (;;) {
655                 int temp_offset = offset;
656                 int resid = len;
657                 LIBCFS_ALLOC(piov, niov * sizeof(*piov));
658                 if (piov == NULL)
659                         return -ENOMEM;
660
661                 for (npiov = 0;; npiov++) {
662                         LASSERT (npiov < niov);
663                         LASSERT (iov->iov_len >= temp_offset);
664
665                         piov[npiov].iov_base = iov[npiov].iov_base + temp_offset;
666                         piov[npiov].iov_len = iov[npiov].iov_len - temp_offset;
667
668                         if (piov[npiov].iov_len >= resid) {
669                                 piov[npiov].iov_len = resid;
670                                 npiov++;
671                                 break;
672                         }
673                         resid -= piov[npiov].iov_len;
674                         temp_offset = 0;
675                 }
676
677                 if (npiov == niov) {
678                         tx->tx_niov = niov;
679                         tx->tx_iov = piov;
680                         return 0;
681                 }
682
683                 /* Dang! The piov I allocated was too big and it's a drag to
684                  * have to maintain separate 'allocated' and 'used' sizes, so
685                  * I'll just do it again; NB this doesn't happen normally... */
686                 LIBCFS_FREE(piov, niov * sizeof(*piov));
687                 niov = npiov;
688         }
689 }
690
691 void
692 ptllnd_set_md_buffer(ptl_md_t *md, ptllnd_tx_t *tx)
693 {
694         unsigned int    niov = tx->tx_niov;
695         ptl_md_iovec_t *iov = tx->tx_iov;
696
697         LASSERT ((md->options & PTL_MD_IOVEC) == 0);
698
699         if (niov == 0) {
700                 md->start = NULL;
701                 md->length = 0;
702         } else if (niov == 1) {
703                 md->start = iov[0].iov_base;
704                 md->length = iov[0].iov_len;
705         } else {
706                 md->start = iov;
707                 md->length = niov;
708                 md->options |= PTL_MD_IOVEC;
709         }
710 }
711
712 int
713 ptllnd_post_buffer(ptllnd_buffer_t *buf)
714 {
715         lnet_ni_t        *ni = buf->plb_ni;
716         ptllnd_ni_t      *plni = ni->ni_data;
717         ptl_process_id_t  anyid = {
718                 .nid       = PTL_NID_ANY,
719                 .pid       = PTL_PID_ANY};
720         ptl_md_t          md = {
721                 .start     = buf->plb_buffer,
722                 .length    = plni->plni_buffer_size,
723                 .threshold = PTL_MD_THRESH_INF,
724                 .max_size  = plni->plni_max_msg_size,
725                 .options   = (PTLLND_MD_OPTIONS |
726                               PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | 
727                               PTL_MD_LOCAL_ALIGN8),
728                 .user_ptr  = ptllnd_obj2eventarg(buf, PTLLND_EVENTARG_TYPE_BUF),
729                 .eq_handle = plni->plni_eqh};
730         ptl_handle_me_t meh;
731         int             rc;
732
733         LASSERT (!buf->plb_posted);
734
735         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal,
736                          anyid, LNET_MSG_MATCHBITS, 0,
737                          PTL_UNLINK, PTL_INS_AFTER, &meh);
738         if (rc != PTL_OK) {
739                 CERROR("PtlMEAttach failed: %s(%d)\n",
740                        ptllnd_errtype2str(rc), rc);
741                 return -ENOMEM;
742         }
743
744         buf->plb_posted = 1;
745         plni->plni_nposted_buffers++;
746
747         rc = PtlMDAttach(meh, md, LNET_UNLINK, &buf->plb_md);
748         if (rc == PTL_OK)
749                 return 0;
750
751         CERROR("PtlMDAttach failed: %s(%d)\n",
752                ptllnd_errtype2str(rc), rc);
753
754         buf->plb_posted = 0;
755         plni->plni_nposted_buffers--;
756
757         rc = PtlMEUnlink(meh);
758         LASSERT (rc == PTL_OK);
759
760         return -ENOMEM;
761 }
762
763 static inline int
764 ptllnd_peer_send_noop (ptllnd_peer_t *peer)
765 {
766         ptllnd_ni_t *plni = peer->plp_ni->ni_data;
767
768         if (!peer->plp_sent_hello ||
769             peer->plp_credits == 0 ||
770             !list_empty(&peer->plp_noopq) ||
771             peer->plp_outstanding_credits < PTLLND_CREDIT_HIGHWATER(plni))
772                 return 0;
773
774         /* No tx to piggyback NOOP onto or no credit to send a tx */
775         return (list_empty(&peer->plp_txq) || peer->plp_credits == 1);
776 }
777
778 void
779 ptllnd_check_sends(ptllnd_peer_t *peer)
780 {
781         ptllnd_ni_t    *plni = peer->plp_ni->ni_data;
782         ptllnd_tx_t    *tx;
783         ptl_md_t        md;
784         ptl_handle_md_t mdh;
785         int             rc;
786
787         CDEBUG(D_NET, "%s: [%d/%d+%d(%d)\n",
788                libcfs_id2str(peer->plp_id), peer->plp_credits,
789                peer->plp_outstanding_credits, peer->plp_sent_credits,
790                plni->plni_peer_credits + peer->plp_lazy_credits);
791
792         if (ptllnd_peer_send_noop(peer)) {
793                 tx = ptllnd_new_tx(peer, PTLLND_MSG_TYPE_NOOP, 0);
794                 CDEBUG(D_NET, "NOOP tx=%p\n",tx);
795                 if (tx == NULL) {
796                         CERROR("Can't return credits to %s\n",
797                                libcfs_id2str(peer->plp_id));
798                 } else {
799                         ptllnd_set_tx_deadline(tx);
800                         list_add_tail(&tx->tx_list, &peer->plp_noopq);
801                 }
802         }
803
804         for (;;) {
805                 if (!list_empty(&peer->plp_noopq)) {
806                         LASSERT (peer->plp_sent_hello);
807                         tx = list_entry(peer->plp_noopq.next,
808                                         ptllnd_tx_t, tx_list);
809                 } else if (!list_empty(&peer->plp_txq)) {
810                         tx = list_entry(peer->plp_txq.next,
811                                         ptllnd_tx_t, tx_list);
812                 } else {
813                         /* nothing to send right now */
814                         break;
815                 }
816
817                 LASSERT (tx->tx_msgsize > 0);
818
819                 LASSERT (peer->plp_outstanding_credits >= 0);
820                 LASSERT (peer->plp_sent_credits >= 0);
821                 LASSERT (peer->plp_outstanding_credits + peer->plp_sent_credits
822                          <= plni->plni_peer_credits + peer->plp_lazy_credits);
823                 LASSERT (peer->plp_credits >= 0);
824
825                 /* say HELLO first */
826                 if (!peer->plp_sent_hello) {
827                         LASSERT (list_empty(&peer->plp_noopq));
828                         LASSERT (tx->tx_type == PTLLND_MSG_TYPE_HELLO);
829
830                         peer->plp_sent_hello = 1;
831                 }
832
833                 if (peer->plp_credits == 0) {   /* no credits */
834                         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: no creds for %p",
835                                        libcfs_id2str(peer->plp_id),
836                                        peer->plp_credits,
837                                        peer->plp_outstanding_credits,
838                                        peer->plp_sent_credits,
839                                        plni->plni_peer_credits +
840                                        peer->plp_lazy_credits, tx);
841                         break;
842                 }
843
844                 /* Last/Initial credit reserved for NOOP/HELLO */
845                 if (peer->plp_credits == 1 &&
846                     tx->tx_type != PTLLND_MSG_TYPE_NOOP &&
847                     tx->tx_type != PTLLND_MSG_TYPE_HELLO) {
848                         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: too few creds for %p",
849                                        libcfs_id2str(peer->plp_id),
850                                        peer->plp_credits,
851                                        peer->plp_outstanding_credits,
852                                        peer->plp_sent_credits,
853                                        plni->plni_peer_credits +
854                                        peer->plp_lazy_credits, tx);
855                         break;
856                 }
857
858                 list_del(&tx->tx_list);
859                 list_add_tail(&tx->tx_list, &peer->plp_activeq);
860
861                 CDEBUG(D_NET, "Sending at TX=%p type=%s (%d)\n",tx,
862                         ptllnd_msgtype2str(tx->tx_type),tx->tx_type);
863
864                 if (tx->tx_type == PTLLND_MSG_TYPE_NOOP &&
865                     !ptllnd_peer_send_noop(peer)) {
866                         /* redundant NOOP */
867                         ptllnd_tx_done(tx);
868                         continue;
869                 }
870
871                 /* Set stamp at the last minute; on a new peer, I don't know it
872                  * until I receive the HELLO back */
873                 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
874
875                 /*
876                  * Return all the credits we have
877                  */
878                 tx->tx_msg.ptlm_credits = peer->plp_outstanding_credits;
879                 peer->plp_sent_credits += peer->plp_outstanding_credits;
880                 peer->plp_outstanding_credits = 0;
881
882                 /*
883                  * One less credit
884                  */
885                 peer->plp_credits--;
886
887                 if (plni->plni_checksum)
888                         tx->tx_msg.ptlm_cksum = 
889                                 ptllnd_cksum(&tx->tx_msg,
890                                              offsetof(kptl_msg_t, ptlm_u));
891
892                 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
893                 md.eq_handle = plni->plni_eqh;
894                 md.threshold = 1;
895                 md.options = PTLLND_MD_OPTIONS;
896                 md.start = &tx->tx_msg;
897                 md.length = tx->tx_msgsize;
898
899                 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
900                 if (rc != PTL_OK) {
901                         CERROR("PtlMDBind for %s failed: %s(%d)\n",
902                                libcfs_id2str(peer->plp_id),
903                                ptllnd_errtype2str(rc), rc);
904                         tx->tx_status = -EIO;
905                         ptllnd_tx_done(tx);
906                         break;
907                 }
908
909                 LASSERT (tx->tx_type != PTLLND_RDMA_WRITE &&
910                          tx->tx_type != PTLLND_RDMA_READ);
911
912                 tx->tx_reqmdh = mdh;
913                 gettimeofday(&tx->tx_req_posted, NULL);
914
915                 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: %s %p c %d",
916                                libcfs_id2str(peer->plp_id),
917                                peer->plp_credits,
918                                peer->plp_outstanding_credits,
919                                peer->plp_sent_credits,
920                                plni->plni_peer_credits +
921                                peer->plp_lazy_credits,
922                                ptllnd_msgtype2str(tx->tx_type), tx,
923                                tx->tx_msg.ptlm_credits);
924
925                 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
926                             plni->plni_portal, 0, LNET_MSG_MATCHBITS, 0, 0);
927                 if (rc != PTL_OK) {
928                         CERROR("PtlPut for %s failed: %s(%d)\n",
929                                libcfs_id2str(peer->plp_id),
930                                ptllnd_errtype2str(rc), rc);
931                         tx->tx_status = -EIO;
932                         ptllnd_tx_done(tx);
933                         break;
934                 }
935         }
936 }
937
938 int
939 ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
940                     unsigned int niov, struct iovec *iov,
941                     unsigned int offset, unsigned int len)
942 {
943         lnet_ni_t      *ni = peer->plp_ni;
944         ptllnd_ni_t    *plni = ni->ni_data;
945         ptllnd_tx_t    *tx = ptllnd_new_tx(peer, type, 0);
946         __u64           matchbits;
947         ptl_md_t        md;
948         ptl_handle_md_t mdh;
949         ptl_handle_me_t meh;
950         int             rc;
951         int             rc2;
952         time_t          start;
953         int             w;
954
955         CDEBUG(D_NET, "niov=%d offset=%d len=%d\n",niov,offset,len);
956
957         LASSERT (type == PTLLND_MSG_TYPE_GET ||
958                  type == PTLLND_MSG_TYPE_PUT);
959
960         if (tx == NULL) {
961                 CERROR("Can't allocate %s tx for %s\n",
962                        type == PTLLND_MSG_TYPE_GET ? "GET" : "PUT/REPLY",
963                        libcfs_id2str(peer->plp_id));
964                 return -ENOMEM;
965         }
966
967         rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
968         if (rc != 0) {
969                 CERROR ("Can't allocate iov %d for %s\n",
970                         niov, libcfs_id2str(peer->plp_id));
971                 rc = -ENOMEM;
972                 goto failed;
973         }
974
975         md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
976         md.eq_handle = plni->plni_eqh;
977         md.threshold = 1;
978         md.max_size = 0;
979         md.options = PTLLND_MD_OPTIONS;
980         if(type == PTLLND_MSG_TYPE_GET)
981                 md.options |= PTL_MD_OP_PUT | PTL_MD_ACK_DISABLE;
982         else
983                 md.options |= PTL_MD_OP_GET;
984         ptllnd_set_md_buffer(&md, tx);
985
986         start = cfs_time_current_sec();
987         w = plni->plni_long_wait;
988
989         while (!peer->plp_recvd_hello) {        /* wait to validate plp_match */
990                 if (peer->plp_closing) {
991                         rc = -EIO;
992                         goto failed;
993                 }
994                 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
995                         CWARN("Waited %ds to connect to %s\n",
996                               (int)(cfs_time_current_sec() - start),
997                               libcfs_id2str(peer->plp_id));
998                         w *= 2;
999                 }
1000                 ptllnd_wait(ni, w);
1001         }
1002
1003         if (peer->plp_match < PTL_RESERVED_MATCHBITS)
1004                 peer->plp_match = PTL_RESERVED_MATCHBITS;
1005         matchbits = peer->plp_match++;
1006
1007         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid,
1008                          matchbits, 0, PTL_UNLINK, PTL_INS_BEFORE, &meh);
1009         if (rc != PTL_OK) {
1010                 CERROR("PtlMEAttach for %s failed: %s(%d)\n",
1011                        libcfs_id2str(peer->plp_id),
1012                        ptllnd_errtype2str(rc), rc);
1013                 rc = -EIO;
1014                 goto failed;
1015         }
1016
1017         gettimeofday(&tx->tx_bulk_posted, NULL);
1018
1019         rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);
1020         if (rc != PTL_OK) {
1021                 CERROR("PtlMDAttach for %s failed: %s(%d)\n",
1022                        libcfs_id2str(peer->plp_id),
1023                        ptllnd_errtype2str(rc), rc);
1024                 rc2 = PtlMEUnlink(meh);
1025                 LASSERT (rc2 == PTL_OK);
1026                 rc = -EIO;
1027                 goto failed;
1028         }
1029         tx->tx_bulkmdh = mdh;
1030
1031         /*
1032          * We need to set the stamp here because it
1033          * we could have received a HELLO above that set
1034          * peer->plp_stamp
1035          */
1036         tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
1037
1038         tx->tx_msg.ptlm_u.rdma.kptlrm_hdr = msg->msg_hdr;
1039         tx->tx_msg.ptlm_u.rdma.kptlrm_matchbits = matchbits;
1040
1041         if (type == PTLLND_MSG_TYPE_GET) {
1042                 tx->tx_lnetreplymsg = lnet_create_reply_msg(ni, msg);
1043                 if (tx->tx_lnetreplymsg == NULL) {
1044                         CERROR("Can't create reply for GET to %s\n",
1045                                libcfs_id2str(msg->msg_target));
1046                         rc = -ENOMEM;
1047                         goto failed;
1048                 }
1049         }
1050
1051         tx->tx_lnetmsg = msg;
1052         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post passive %s p %d %p",
1053                        libcfs_id2str(msg->msg_target),
1054                        peer->plp_credits, peer->plp_outstanding_credits,
1055                        peer->plp_sent_credits,
1056                        plni->plni_peer_credits + peer->plp_lazy_credits,
1057                        lnet_msgtyp2str(msg->msg_type),
1058                        (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ? 
1059                        le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1060                        (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ? 
1061                        le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1062                        tx);
1063         ptllnd_post_tx(tx);
1064         return 0;
1065
1066  failed:
1067         ptllnd_tx_done(tx);
1068         return rc;
1069 }
1070
1071 int
1072 ptllnd_active_rdma(ptllnd_peer_t *peer, int type,
1073                    lnet_msg_t *msg, __u64 matchbits,
1074                    unsigned int niov, struct iovec *iov,
1075                    unsigned int offset, unsigned int len)
1076 {
1077         lnet_ni_t       *ni = peer->plp_ni;
1078         ptllnd_ni_t     *plni = ni->ni_data;
1079         ptllnd_tx_t     *tx = ptllnd_new_tx(peer, type, 0);
1080         ptl_md_t         md;
1081         ptl_handle_md_t  mdh;
1082         int              rc;
1083
1084         LASSERT (type == PTLLND_RDMA_READ ||
1085                  type == PTLLND_RDMA_WRITE);
1086
1087         if (tx == NULL) {
1088                 CERROR("Can't allocate tx for RDMA %s with %s\n",
1089                        (type == PTLLND_RDMA_WRITE) ? "write" : "read",
1090                        libcfs_id2str(peer->plp_id));
1091                 ptllnd_close_peer(peer, -ENOMEM);
1092                 return -ENOMEM;
1093         }
1094
1095         rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
1096         if (rc != 0) {
1097                 CERROR ("Can't allocate iov %d for %s\n",
1098                         niov, libcfs_id2str(peer->plp_id));
1099                 rc = -ENOMEM;
1100                 goto failed;
1101         }
1102
1103         md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
1104         md.eq_handle = plni->plni_eqh;
1105         md.max_size = 0;
1106         md.options = PTLLND_MD_OPTIONS;
1107         md.threshold = (type == PTLLND_RDMA_READ) ? 2 : 1;
1108
1109         ptllnd_set_md_buffer(&md, tx);
1110
1111         rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
1112         if (rc != PTL_OK) {
1113                 CERROR("PtlMDBind for %s failed: %s(%d)\n",
1114                        libcfs_id2str(peer->plp_id),
1115                        ptllnd_errtype2str(rc), rc);
1116                 rc = -EIO;
1117                 goto failed;
1118         }
1119
1120         tx->tx_bulkmdh = mdh;
1121         tx->tx_lnetmsg = msg;
1122
1123         ptllnd_set_tx_deadline(tx);
1124         list_add_tail(&tx->tx_list, &peer->plp_activeq);
1125         gettimeofday(&tx->tx_bulk_posted, NULL);
1126
1127         if (type == PTLLND_RDMA_READ)
1128                 rc = PtlGet(mdh, peer->plp_ptlid,
1129                             plni->plni_portal, 0, matchbits, 0);
1130         else
1131                 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
1132                             plni->plni_portal, 0, matchbits, 0, 
1133                             (msg == NULL) ? PTLLND_RDMA_FAIL : PTLLND_RDMA_OK);
1134
1135         if (rc == PTL_OK)
1136                 return 0;
1137
1138         CERROR("Can't initiate RDMA with %s: %s(%d)\n",
1139                libcfs_id2str(peer->plp_id),
1140                ptllnd_errtype2str(rc), rc);
1141
1142         tx->tx_lnetmsg = NULL;
1143  failed:
1144         tx->tx_status = rc;
1145         ptllnd_tx_done(tx);    /* this will close peer */
1146         return rc;
1147 }
1148
1149 int
1150 ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
1151 {
1152         ptllnd_ni_t    *plni = ni->ni_data;
1153         ptllnd_peer_t  *plp;
1154         ptllnd_tx_t    *tx;
1155         int             nob;
1156         int             rc;
1157
1158         LASSERT (!msg->msg_routing);
1159         LASSERT (msg->msg_kiov == NULL);
1160
1161         LASSERT (msg->msg_niov <= PTL_MD_MAX_IOV); /* !!! */
1162
1163         CDEBUG(D_NET, "%s [%d]+%d,%d -> %s%s\n",
1164                lnet_msgtyp2str(msg->msg_type),
1165                msg->msg_niov, msg->msg_offset, msg->msg_len,
1166                libcfs_nid2str(msg->msg_target.nid),
1167                msg->msg_target_is_router ? "(rtr)" : "");
1168
1169         if ((msg->msg_target.pid & LNET_PID_USERFLAG) != 0) {
1170                 CERROR("Can't send to non-kernel peer %s\n",
1171                        libcfs_id2str(msg->msg_target));
1172                 return -EHOSTUNREACH;
1173         }
1174
1175         plp = ptllnd_find_peer(ni, msg->msg_target, 1);
1176         if (plp == NULL)
1177                 return -ENOMEM;
1178
1179         switch (msg->msg_type) {
1180         default:
1181                 LBUG();
1182
1183         case LNET_MSG_ACK:
1184                 LASSERT (msg->msg_len == 0);
1185                 break;                          /* send IMMEDIATE */
1186
1187         case LNET_MSG_GET:
1188                 if (msg->msg_target_is_router)
1189                         break;                  /* send IMMEDIATE */
1190
1191                 nob = msg->msg_md->md_length;
1192                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1193                 if (nob <= plni->plni_max_msg_size)
1194                         break;
1195
1196                 LASSERT ((msg->msg_md->md_options & LNET_MD_KIOV) == 0);
1197                 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_GET, msg,
1198                                          msg->msg_md->md_niov,
1199                                          msg->msg_md->md_iov.iov,
1200                                          0, msg->msg_md->md_length);
1201                 ptllnd_peer_decref(plp);
1202                 return rc;
1203
1204         case LNET_MSG_REPLY:
1205         case LNET_MSG_PUT:
1206                 nob = msg->msg_len;
1207                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1208                 if (nob <= plp->plp_max_msg_size)
1209                         break;                  /* send IMMEDIATE */
1210
1211                 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_PUT, msg,
1212                                          msg->msg_niov, msg->msg_iov,
1213                                          msg->msg_offset, msg->msg_len);
1214                 ptllnd_peer_decref(plp);
1215                 return rc;
1216         }
1217
1218         /* send IMMEDIATE
1219          * NB copy the payload so we don't have to do a fragmented send */
1220
1221         tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_IMMEDIATE, msg->msg_len);
1222         if (tx == NULL) {
1223                 CERROR("Can't allocate tx for lnet type %d to %s\n",
1224                        msg->msg_type, libcfs_id2str(msg->msg_target));
1225                 ptllnd_peer_decref(plp);
1226                 return -ENOMEM;
1227         }
1228
1229         lnet_copy_iov2flat(tx->tx_msgsize, &tx->tx_msg,
1230                            offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1231                            msg->msg_niov, msg->msg_iov, msg->msg_offset,
1232                            msg->msg_len);
1233         tx->tx_msg.ptlm_u.immediate.kptlim_hdr = msg->msg_hdr;
1234
1235         tx->tx_lnetmsg = msg;
1236         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post immediate %s p %d %p",
1237                        libcfs_id2str(msg->msg_target),
1238                        plp->plp_credits, plp->plp_outstanding_credits,
1239                        plp->plp_sent_credits,
1240                        plni->plni_peer_credits + plp->plp_lazy_credits,
1241                        lnet_msgtyp2str(msg->msg_type),
1242                        (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ? 
1243                        le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1244                        (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ? 
1245                        le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1246                        tx);
1247         ptllnd_post_tx(tx);
1248         ptllnd_peer_decref(plp);
1249         return 0;
1250 }
1251
1252 void
1253 ptllnd_rx_done(ptllnd_rx_t *rx)
1254 {
1255         ptllnd_peer_t *plp = rx->rx_peer;
1256         ptllnd_ni_t   *plni = plp->plp_ni->ni_data;
1257
1258         plp->plp_outstanding_credits++;
1259
1260         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: rx=%p done\n",
1261                        libcfs_id2str(plp->plp_id),
1262                        plp->plp_credits, plp->plp_outstanding_credits, 
1263                        plp->plp_sent_credits,
1264                        plni->plni_peer_credits + plp->plp_lazy_credits, rx);
1265
1266         ptllnd_check_sends(plp);
1267
1268         LASSERT (plni->plni_nrxs > 0);
1269         plni->plni_nrxs--;
1270 }
1271
1272 int
1273 ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1274                   void **new_privatep)
1275 {
1276         /* Shouldn't get here; recvs only block for router buffers */
1277         LBUG();
1278         return 0;
1279 }
1280
1281 int
1282 ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1283             int delayed, unsigned int niov,
1284             struct iovec *iov, lnet_kiov_t *kiov,
1285             unsigned int offset, unsigned int mlen, unsigned int rlen)
1286 {
1287         ptllnd_rx_t    *rx = private;
1288         int             rc = 0;
1289         int             nob;
1290
1291         LASSERT (kiov == NULL);
1292         LASSERT (niov <= PTL_MD_MAX_IOV);       /* !!! */
1293
1294         switch (rx->rx_msg->ptlm_type) {
1295         default:
1296                 LBUG();
1297
1298         case PTLLND_MSG_TYPE_IMMEDIATE:
1299                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[mlen]);
1300                 if (nob > rx->rx_nob) {
1301                         CERROR("Immediate message from %s too big: %d(%d)\n",
1302                                libcfs_id2str(rx->rx_peer->plp_id),
1303                                nob, rx->rx_nob);
1304                         rc = -EPROTO;
1305                         break;
1306                 }
1307                 lnet_copy_flat2iov(niov, iov, offset,
1308                                    rx->rx_nob, rx->rx_msg,
1309                                    offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1310                                    mlen);
1311                 lnet_finalize(ni, msg, 0);
1312                 break;
1313
1314         case PTLLND_MSG_TYPE_PUT:
1315                 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg,
1316                                         rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1317                                         niov, iov, offset, mlen);
1318                 break;
1319
1320         case PTLLND_MSG_TYPE_GET:
1321                 if (msg != NULL)
1322                         rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, msg,
1323                                                 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1324                                                 msg->msg_niov, msg->msg_iov,
1325                                                 msg->msg_offset, msg->msg_len);
1326                 else
1327                         rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, NULL,
1328                                                 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1329                                                 0, NULL, 0, 0);
1330                 break;
1331         }
1332
1333         ptllnd_rx_done(rx);
1334         return rc;
1335 }
1336
1337 void
1338 ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
1339                      kptl_msg_t *msg, unsigned int nob)
1340 {
1341         ptllnd_ni_t      *plni = ni->ni_data;
1342         const int         basenob = offsetof(kptl_msg_t, ptlm_u);
1343         lnet_process_id_t srcid;
1344         ptllnd_rx_t       rx;
1345         int               flip;
1346         __u16             msg_version;
1347         __u32             msg_cksum;
1348         ptllnd_peer_t    *plp;
1349         int               rc;
1350
1351         if (nob < 6) {
1352                 CERROR("Very short receive from %s\n",
1353                        ptllnd_ptlid2str(initiator));
1354                 return;
1355         }
1356
1357         /* I can at least read MAGIC/VERSION */
1358
1359         flip = msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC);
1360         if (!flip && msg->ptlm_magic != PTLLND_MSG_MAGIC) {
1361                 CERROR("Bad protocol magic %08x from %s\n", 
1362                        msg->ptlm_magic, ptllnd_ptlid2str(initiator));
1363                 return;
1364         }
1365
1366         msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
1367
1368         if (msg_version != PTLLND_MSG_VERSION) {
1369                 CERROR("Bad protocol version %04x from %s: %04x expected\n",
1370                        (__u32)msg_version, ptllnd_ptlid2str(initiator), PTLLND_MSG_VERSION);
1371
1372                 if (plni->plni_abort_on_protocol_mismatch)
1373                         abort();
1374
1375                 return;
1376         }
1377
1378         if (nob < basenob) {
1379                 CERROR("Short receive from %s: got %d, wanted at least %d\n",
1380                        ptllnd_ptlid2str(initiator), nob, basenob);
1381                 return;
1382         }
1383
1384         /* checksum must be computed with
1385          * 1) ptlm_cksum zero and
1386          * 2) BEFORE anything gets modified/flipped
1387          */
1388         msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
1389         msg->ptlm_cksum = 0;
1390         if (msg_cksum != 0 &&
1391             msg_cksum != ptllnd_cksum(msg, offsetof(kptl_msg_t, ptlm_u))) {
1392                 CERROR("Bad checksum from %s\n", ptllnd_ptlid2str(initiator));
1393                 return;
1394         }
1395
1396         msg->ptlm_version = msg_version;
1397         msg->ptlm_cksum = msg_cksum;
1398
1399         if (flip) {
1400                 /* NB stamps are opaque cookies */
1401                 __swab32s(&msg->ptlm_nob);
1402                 __swab64s(&msg->ptlm_srcnid);
1403                 __swab64s(&msg->ptlm_dstnid);
1404                 __swab32s(&msg->ptlm_srcpid);
1405                 __swab32s(&msg->ptlm_dstpid);
1406         }
1407
1408         srcid.nid = msg->ptlm_srcnid;
1409         srcid.pid = msg->ptlm_srcpid;
1410
1411         if (LNET_NIDNET(msg->ptlm_srcnid) != LNET_NIDNET(ni->ni_nid)) {
1412                 CERROR("Bad source id %s from %s\n",
1413                        libcfs_id2str(srcid),
1414                        ptllnd_ptlid2str(initiator));
1415                 return;
1416         }
1417
1418         if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
1419                 CERROR("NAK from %s (%s)\n",
1420                        libcfs_id2str(srcid),
1421                        ptllnd_ptlid2str(initiator));
1422
1423                 if (plni->plni_dump_on_nak)
1424                         ptllnd_dump_debug(ni, srcid);
1425
1426                 if (plni->plni_abort_on_nak)
1427                         abort();
1428
1429                 return;
1430         }
1431
1432         if (msg->ptlm_dstnid != ni->ni_nid ||
1433             msg->ptlm_dstpid != the_lnet.ln_pid) {
1434                 CERROR("Bad dstid %s (%s expected) from %s\n",
1435                        libcfs_id2str((lnet_process_id_t) {
1436                                .nid = msg->ptlm_dstnid,
1437                                .pid = msg->ptlm_dstpid}),
1438                        libcfs_id2str((lnet_process_id_t) {
1439                                .nid = ni->ni_nid,
1440                                .pid = the_lnet.ln_pid}),
1441                        libcfs_id2str(srcid));
1442                 return;
1443         }
1444
1445         if (msg->ptlm_dststamp != plni->plni_stamp) {
1446                 CERROR("Bad dststamp "LPX64"("LPX64" expected) from %s\n",
1447                        msg->ptlm_dststamp, plni->plni_stamp,
1448                        libcfs_id2str(srcid));
1449                 return;
1450         }
1451
1452         PTLLND_HISTORY("RX %s: %s %d %p", libcfs_id2str(srcid), 
1453                        ptllnd_msgtype2str(msg->ptlm_type),
1454                        msg->ptlm_credits, &rx);
1455
1456         switch (msg->ptlm_type) {
1457         case PTLLND_MSG_TYPE_PUT:
1458         case PTLLND_MSG_TYPE_GET:
1459                 if (nob < basenob + sizeof(kptl_rdma_msg_t)) {
1460                         CERROR("Short rdma request from %s(%s)\n",
1461                                libcfs_id2str(srcid),
1462                                ptllnd_ptlid2str(initiator));
1463                         return;
1464                 }
1465                 if (flip)
1466                         __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
1467                 break;
1468
1469         case PTLLND_MSG_TYPE_IMMEDIATE:
1470                 if (nob < offsetof(kptl_msg_t,
1471                                    ptlm_u.immediate.kptlim_payload)) {
1472                         CERROR("Short immediate from %s(%s)\n",
1473                                libcfs_id2str(srcid),
1474                                ptllnd_ptlid2str(initiator));
1475                         return;
1476                 }
1477                 break;
1478
1479         case PTLLND_MSG_TYPE_HELLO:
1480                 if (nob < basenob + sizeof(kptl_hello_msg_t)) {
1481                         CERROR("Short hello from %s(%s)\n",
1482                                libcfs_id2str(srcid),
1483                                ptllnd_ptlid2str(initiator));
1484                         return;
1485                 }
1486                 if(flip){
1487                         __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
1488                         __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
1489                 }
1490                 break;
1491
1492         case PTLLND_MSG_TYPE_NOOP:
1493                 break;
1494
1495         default:
1496                 CERROR("Bad message type %d from %s(%s)\n", msg->ptlm_type,
1497                        libcfs_id2str(srcid),
1498                        ptllnd_ptlid2str(initiator));
1499                 return;
1500         }
1501
1502         plp = ptllnd_find_peer(ni, srcid, 0);
1503         if (plp == NULL) {
1504                 CERROR("Can't find peer %s\n", libcfs_id2str(srcid));
1505                 return;
1506         }
1507
1508         if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
1509                 if (plp->plp_recvd_hello) {
1510                         CERROR("Unexpected HELLO from %s\n",
1511                                libcfs_id2str(srcid));
1512                         ptllnd_peer_decref(plp);
1513                         return;
1514                 }
1515
1516                 plp->plp_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1517                 plp->plp_match = msg->ptlm_u.hello.kptlhm_matchbits;
1518                 plp->plp_stamp = msg->ptlm_srcstamp;
1519                 plp->plp_recvd_hello = 1;
1520
1521         } else if (!plp->plp_recvd_hello) {
1522
1523                 CERROR("Bad message type %d (HELLO expected) from %s\n",
1524                        msg->ptlm_type, libcfs_id2str(srcid));
1525                 ptllnd_peer_decref(plp);
1526                 return;
1527
1528         } else if (msg->ptlm_srcstamp != plp->plp_stamp) {
1529
1530                 CERROR("Bad srcstamp "LPX64"("LPX64" expected) from %s\n",
1531                        msg->ptlm_srcstamp, plp->plp_stamp,
1532                        libcfs_id2str(srcid));
1533                 ptllnd_peer_decref(plp);
1534                 return;
1535         }
1536
1537         /* Check peer only sends when I've sent her credits */
1538         if (plp->plp_sent_credits == 0) {
1539                 CERROR("%s[%d/%d+%d(%d)]: unexpected message\n",
1540                        libcfs_id2str(plp->plp_id),
1541                        plp->plp_credits, plp->plp_outstanding_credits,
1542                        plp->plp_sent_credits,
1543                        plni->plni_peer_credits + plp->plp_lazy_credits);
1544                 return;
1545         }
1546         plp->plp_sent_credits--;
1547
1548         /* No check for credit overflow - the peer may post new buffers after
1549          * the startup handshake. */
1550         plp->plp_credits += msg->ptlm_credits;
1551
1552         /* All OK so far; assume the message is good... */
1553
1554         rx.rx_peer      = plp;
1555         rx.rx_msg       = msg;
1556         rx.rx_nob       = nob;
1557         plni->plni_nrxs++;
1558
1559         switch (msg->ptlm_type) {
1560         default: /* message types have been checked already */
1561                 ptllnd_rx_done(&rx);
1562                 break;
1563
1564         case PTLLND_MSG_TYPE_PUT:
1565         case PTLLND_MSG_TYPE_GET:
1566                 rc = lnet_parse(ni, &msg->ptlm_u.rdma.kptlrm_hdr,
1567                                 msg->ptlm_srcnid, &rx, 1);
1568                 if (rc < 0)
1569                         ptllnd_rx_done(&rx);
1570                 break;
1571
1572         case PTLLND_MSG_TYPE_IMMEDIATE:
1573                 rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,
1574                                 msg->ptlm_srcnid, &rx, 0);
1575                 if (rc < 0)
1576                         ptllnd_rx_done(&rx);
1577                 break;
1578         }
1579
1580         if (msg->ptlm_credits > 0)
1581                 ptllnd_check_sends(plp);
1582
1583         ptllnd_peer_decref(plp);
1584 }
1585
1586 void
1587 ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
1588 {
1589         ptllnd_buffer_t *buf = ptllnd_eventarg2obj(event->md.user_ptr);
1590         ptllnd_ni_t     *plni = ni->ni_data;
1591         char            *msg = &buf->plb_buffer[event->offset];
1592         int              repost;
1593         int              unlinked = event->type == PTL_EVENT_UNLINK;
1594
1595         LASSERT (buf->plb_ni == ni);
1596         LASSERT (event->type == PTL_EVENT_PUT_END ||
1597                  event->type == PTL_EVENT_UNLINK);
1598
1599         if (event->ni_fail_type != PTL_NI_OK) {
1600
1601                 CERROR("event type %s(%d), status %s(%d) from %s\n",
1602                        ptllnd_evtype2str(event->type), event->type,
1603                        ptllnd_errtype2str(event->ni_fail_type), 
1604                        event->ni_fail_type,
1605                        ptllnd_ptlid2str(event->initiator));
1606
1607         } else if (event->type == PTL_EVENT_PUT_END) {
1608 #if (PTL_MD_LOCAL_ALIGN8 == 0)
1609                 /* Portals can't force message alignment - someone sending an
1610                  * odd-length message could misalign subsequent messages */
1611                 if ((event->mlength & 7) != 0) {
1612                         CERROR("Message from %s has odd length %u: "
1613                                "probable version incompatibility\n",
1614                                ptllnd_ptlid2str(event->initiator),
1615                                event->mlength);
1616                         LBUG();
1617                 }
1618 #endif
1619                 LASSERT ((event->offset & 7) == 0);
1620
1621                 ptllnd_parse_request(ni, event->initiator,
1622                                      (kptl_msg_t *)msg, event->mlength);
1623         }
1624
1625 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1626         /* UNLINK event only on explicit unlink */
1627         repost = (event->unlinked && event->type != PTL_EVENT_UNLINK);
1628         if (event->unlinked)
1629                 unlinked = 1;
1630 #else
1631         /* UNLINK event only on implicit unlink */
1632         repost = (event->type == PTL_EVENT_UNLINK);
1633 #endif
1634
1635         if (unlinked) {
1636                 LASSERT(buf->plb_posted);
1637                 buf->plb_posted = 0;
1638                 plni->plni_nposted_buffers--;
1639         }
1640
1641         if (repost)
1642                 (void) ptllnd_post_buffer(buf);
1643 }
1644
1645 void
1646 ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
1647 {
1648         ptllnd_ni_t *plni = ni->ni_data;
1649         ptllnd_tx_t *tx = ptllnd_eventarg2obj(event->md.user_ptr);
1650         int          error = (event->ni_fail_type != PTL_NI_OK);
1651         int          isreq;
1652         int          isbulk;
1653 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1654         int          unlinked = event->unlinked;
1655 #else
1656         int          unlinked = (event->type == PTL_EVENT_UNLINK);
1657 #endif
1658
1659         if (error)
1660                 CERROR("Error %s(%d) event %s(%d) unlinked %d, %s(%d) for %s\n",
1661                        ptllnd_errtype2str(event->ni_fail_type),
1662                        event->ni_fail_type,
1663                        ptllnd_evtype2str(event->type), event->type,
1664                        unlinked, ptllnd_msgtype2str(tx->tx_type), tx->tx_type,
1665                        libcfs_id2str(tx->tx_peer->plp_id));
1666
1667         LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE));
1668
1669         isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh);
1670         if (isreq) {
1671                 LASSERT (event->md.start == (void *)&tx->tx_msg);
1672                 if (unlinked) {
1673                         tx->tx_reqmdh = PTL_INVALID_HANDLE;
1674                         gettimeofday(&tx->tx_req_done, NULL);
1675                 }
1676         }
1677
1678         isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);
1679         if ( isbulk && unlinked ) {
1680                 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
1681                 gettimeofday(&tx->tx_bulk_done, NULL);
1682         }
1683
1684         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
1685
1686         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: TX done %p %s%s",
1687                        libcfs_id2str(tx->tx_peer->plp_id),
1688                        tx->tx_peer->plp_credits,
1689                        tx->tx_peer->plp_outstanding_credits,
1690                        tx->tx_peer->plp_sent_credits,
1691                        plni->plni_peer_credits + tx->tx_peer->plp_lazy_credits,
1692                        tx, isreq ? "REQ" : "BULK", unlinked ? "(unlinked)" : "");
1693
1694         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
1695         switch (tx->tx_type) {
1696         default:
1697                 LBUG();
1698
1699         case PTLLND_MSG_TYPE_NOOP:
1700         case PTLLND_MSG_TYPE_HELLO:
1701         case PTLLND_MSG_TYPE_IMMEDIATE:
1702                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1703                          event->type == PTL_EVENT_SEND_END);
1704                 LASSERT (isreq);
1705                 break;
1706
1707         case PTLLND_MSG_TYPE_GET:
1708                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1709                          (isreq && event->type == PTL_EVENT_SEND_END) ||
1710                          (isbulk && event->type == PTL_EVENT_PUT_END));
1711
1712                 if (isbulk && !error && event->type == PTL_EVENT_PUT_END) {
1713                         /* Check GET matched */
1714                         if (event->hdr_data == PTLLND_RDMA_OK) {
1715                                 lnet_set_reply_msg_len(ni, 
1716                                                        tx->tx_lnetreplymsg,
1717                                                        event->mlength);
1718                         } else {
1719                                 CERROR ("Unmatched GET with %s\n",
1720                                         libcfs_id2str(tx->tx_peer->plp_id));
1721                                 tx->tx_status = -EIO;
1722                         }
1723                 }
1724                 break;
1725
1726         case PTLLND_MSG_TYPE_PUT:
1727                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1728                          (isreq && event->type == PTL_EVENT_SEND_END) ||
1729                          (isbulk && event->type == PTL_EVENT_GET_END));
1730                 break;
1731
1732         case PTLLND_RDMA_READ:
1733                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1734                          event->type == PTL_EVENT_SEND_END ||
1735                          event->type == PTL_EVENT_REPLY_END);
1736                 LASSERT (isbulk);
1737                 break;
1738
1739         case PTLLND_RDMA_WRITE:
1740                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1741                          event->type == PTL_EVENT_SEND_END);
1742                 LASSERT (isbulk);
1743         }
1744
1745         /* Schedule ptllnd_tx_done() on error or last completion event */
1746         if (error ||
1747             (PtlHandleIsEqual(tx->tx_bulkmdh, PTL_INVALID_HANDLE) &&
1748              PtlHandleIsEqual(tx->tx_reqmdh, PTL_INVALID_HANDLE))) {
1749                 if (error)
1750                         tx->tx_status = -EIO;
1751                 list_del(&tx->tx_list);
1752                 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
1753         }
1754 }
1755
1756 ptllnd_tx_t *
1757 ptllnd_find_timed_out_tx(ptllnd_peer_t *peer)
1758 {
1759         time_t            now = cfs_time_current_sec();
1760         ptllnd_tx_t *tx;
1761
1762         list_for_each_entry (tx, &peer->plp_txq, tx_list) {
1763                 if (tx->tx_deadline < now)
1764                         return tx;
1765         }
1766
1767         list_for_each_entry (tx, &peer->plp_noopq, tx_list) {
1768                 if (tx->tx_deadline < now)
1769                         return tx;
1770         }
1771
1772         list_for_each_entry (tx, &peer->plp_activeq, tx_list) {
1773                 if (tx->tx_deadline < now)
1774                         return tx;
1775         }
1776
1777         return NULL;
1778 }
1779
1780 void
1781 ptllnd_check_peer(ptllnd_peer_t *peer)
1782 {
1783         ptllnd_tx_t *tx = ptllnd_find_timed_out_tx(peer);
1784
1785         if (tx == NULL)
1786                 return;
1787
1788         CERROR("%s: timed out\n", libcfs_id2str(peer->plp_id));
1789         ptllnd_close_peer(peer, -ETIMEDOUT);
1790 }
1791
1792 void
1793 ptllnd_watchdog (lnet_ni_t *ni, time_t now)
1794 {
1795         ptllnd_ni_t      *plni = ni->ni_data;
1796         const int         n = 4;
1797         int               p = plni->plni_watchdog_interval;
1798         int               chunk = plni->plni_peer_hash_size;
1799         int               interval = now - (plni->plni_watchdog_nextt - p);
1800         int               i;
1801         struct list_head *hashlist;
1802         struct list_head *tmp;
1803         struct list_head *nxt;
1804
1805         /* Time to check for RDMA timeouts on a few more peers: 
1806          * I try to do checks every 'p' seconds on a proportion of the peer
1807          * table and I need to check every connection 'n' times within a
1808          * timeout interval, to ensure I detect a timeout on any connection
1809          * within (n+1)/n times the timeout interval. */
1810
1811         LASSERT (now >= plni->plni_watchdog_nextt);
1812
1813         if (plni->plni_timeout > n * interval) { /* Scan less than the whole table? */
1814                 chunk = (chunk * n * interval) / plni->plni_timeout;
1815                 if (chunk == 0)
1816                         chunk = 1;
1817         }
1818
1819         for (i = 0; i < chunk; i++) {
1820                 hashlist = &plni->plni_peer_hash[plni->plni_watchdog_peeridx];
1821
1822                 list_for_each_safe(tmp, nxt, hashlist) {
1823                         ptllnd_check_peer(list_entry(tmp, ptllnd_peer_t, plp_list));
1824                 }
1825
1826                 plni->plni_watchdog_peeridx = (plni->plni_watchdog_peeridx + 1) %
1827                                               plni->plni_peer_hash_size;
1828         }
1829
1830         plni->plni_watchdog_nextt = now + p;
1831 }
1832
1833 void
1834 ptllnd_wait (lnet_ni_t *ni, int milliseconds)
1835 {
1836         static struct timeval  prevt;
1837         static int             prevt_count;
1838         static int             call_count;
1839
1840         struct timeval         start;
1841         struct timeval         then;
1842         struct timeval         now;
1843         struct timeval         deadline;
1844
1845         ptllnd_ni_t   *plni = ni->ni_data;
1846         ptllnd_tx_t   *tx;
1847         ptl_event_t    event;
1848         int            which;
1849         int            rc;
1850         int            found = 0;
1851         int            timeout = 0;
1852
1853         /* Handle any currently queued events, returning immediately if any.
1854          * Otherwise block for the timeout and handle all events queued
1855          * then. */
1856
1857         gettimeofday(&start, NULL);
1858         call_count++;
1859
1860         if (milliseconds <= 0) {
1861                 deadline = start;
1862         } else {
1863                 deadline.tv_sec  = start.tv_sec  +  milliseconds/1000;
1864                 deadline.tv_usec = start.tv_usec + (milliseconds % 1000)*1000;
1865
1866                 if (deadline.tv_usec >= 1000000) {
1867                         start.tv_usec -= 1000000;
1868                         start.tv_sec++;
1869                 }
1870         }
1871
1872         for (;;) {
1873                 gettimeofday(&then, NULL);
1874
1875                 rc = PtlEQPoll(&plni->plni_eqh, 1, timeout, &event, &which);
1876
1877                 gettimeofday(&now, NULL);
1878
1879                 if ((now.tv_sec*1000 + now.tv_usec/1000) - 
1880                     (then.tv_sec*1000 + then.tv_usec/1000) > timeout + 1000) {
1881                         /* 1000 mS grace...........................^ */
1882                         CERROR("SLOW PtlEQPoll(%d): %dmS elapsed\n", timeout,
1883                                (int)(now.tv_sec*1000 + now.tv_usec/1000) - 
1884                                (int)(then.tv_sec*1000 + then.tv_usec/1000));
1885                 }
1886
1887                 if (rc == PTL_EQ_EMPTY) {
1888                         if (found)              /* handled some events */
1889                                 break;
1890
1891                         if (now.tv_sec >= plni->plni_watchdog_nextt) { /* check timeouts? */
1892                                 ptllnd_watchdog(ni, now.tv_sec);
1893                                 LASSERT (now.tv_sec < plni->plni_watchdog_nextt);
1894                         }
1895
1896                         if (now.tv_sec > deadline.tv_sec || /* timeout expired */
1897                             (now.tv_sec == deadline.tv_sec &&
1898                              now.tv_usec >= deadline.tv_usec))
1899                                 break;
1900
1901                         if (milliseconds < 0 ||
1902                             plni->plni_watchdog_nextt <= deadline.tv_sec)  {
1903                                 timeout = (plni->plni_watchdog_nextt - now.tv_sec)*1000;
1904                         } else {
1905                                 timeout = (deadline.tv_sec - now.tv_sec)*1000 +
1906                                           (deadline.tv_usec - now.tv_usec)/1000;
1907                         }
1908
1909                         continue;
1910                 }
1911
1912                 LASSERT (rc == PTL_OK || rc == PTL_EQ_DROPPED);
1913
1914                 if (rc == PTL_EQ_DROPPED)
1915                         CERROR("Event queue: size %d is too small\n",
1916                                plni->plni_eq_size);
1917
1918                 timeout = 0;
1919                 found = 1;
1920
1921                 switch (ptllnd_eventarg2type(event.md.user_ptr)) {
1922                 default:
1923                         LBUG();
1924
1925                 case PTLLND_EVENTARG_TYPE_TX:
1926                         ptllnd_tx_event(ni, &event);
1927                         break;
1928
1929                 case PTLLND_EVENTARG_TYPE_BUF:
1930                         ptllnd_buf_event(ni, &event);
1931                         break;
1932                 }
1933         }
1934
1935         while (!list_empty(&plni->plni_zombie_txs)) {
1936                 tx = list_entry(plni->plni_zombie_txs.next,
1937                                 ptllnd_tx_t, tx_list);
1938                 list_del_init(&tx->tx_list);
1939                 ptllnd_tx_done(tx);
1940         }
1941
1942         if (prevt.tv_sec == 0 ||
1943             prevt.tv_sec != now.tv_sec) {
1944                 PTLLND_HISTORY("%d wait entered at %d.%06d - prev %d %d.%06d", 
1945                                call_count, (int)start.tv_sec, (int)start.tv_usec,
1946                                prevt_count, (int)prevt.tv_sec, (int)prevt.tv_usec);
1947                 prevt = now;
1948         }
1949 }