Whamcloud - gitweb
9cfa071348ce6208444e8b83079b925993672f26
[fs/lustre-release.git] / lnet / ulnds / ptllnd / ptllnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/ulnds/ptllnd/ptllnd_cb.c
37  *
38  * Author: Eric Barton <eeb@bartonsoftware.com>
39  */
40
41 #include "ptllnd.h"
42
43 void
44 ptllnd_set_tx_deadline(ptllnd_tx_t *tx)
45 {
46         ptllnd_peer_t  *peer = tx->tx_peer;
47         lnet_ni_t      *ni = peer->plp_ni;
48         ptllnd_ni_t    *plni = ni->ni_data;
49
50         tx->tx_deadline = cfs_time_current_sec() + plni->plni_timeout;
51 }
52
53 void
54 ptllnd_post_tx(ptllnd_tx_t *tx)
55 {
56         ptllnd_peer_t  *peer = tx->tx_peer;
57
58         LASSERT (tx->tx_type != PTLLND_MSG_TYPE_NOOP);
59
60         ptllnd_set_tx_deadline(tx);
61         list_add_tail(&tx->tx_list, &peer->plp_txq);
62         ptllnd_check_sends(peer);
63 }
64
65 char *
66 ptllnd_ptlid2str(ptl_process_id_t id)
67 {
68         static char strs[8][32];
69         static int  idx = 0;
70
71         char   *str = strs[idx++];
72
73         if (idx >= sizeof(strs)/sizeof(strs[0]))
74                 idx = 0;
75
76         snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
77         return str;
78 }
79
80 void
81 ptllnd_destroy_peer(ptllnd_peer_t *peer)
82 {
83         lnet_ni_t         *ni = peer->plp_ni;
84         ptllnd_ni_t       *plni = ni->ni_data;
85         int                nmsg = peer->plp_lazy_credits +
86                                   plni->plni_peer_credits;
87
88         ptllnd_size_buffers(ni, -nmsg);
89
90         LASSERT (peer->plp_closing);
91         LASSERT (plni->plni_npeers > 0);
92         LASSERT (list_empty(&peer->plp_txq));
93         LASSERT (list_empty(&peer->plp_noopq));
94         LASSERT (list_empty(&peer->plp_activeq));
95         plni->plni_npeers--;
96         LIBCFS_FREE(peer, sizeof(*peer));
97 }
98
99 void
100 ptllnd_abort_txs(ptllnd_ni_t *plni, struct list_head *q)
101 {
102         while (!list_empty(q)) {
103                 ptllnd_tx_t *tx = list_entry(q->next, ptllnd_tx_t, tx_list);
104
105                 tx->tx_status = -ESHUTDOWN;
106                 list_del(&tx->tx_list);
107                 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
108         }
109 }
110
111 void
112 ptllnd_close_peer(ptllnd_peer_t *peer, int error)
113 {
114         lnet_ni_t   *ni = peer->plp_ni;
115         ptllnd_ni_t *plni = ni->ni_data;
116
117         if (peer->plp_closing)
118                 return;
119
120         peer->plp_closing = 1;
121
122         if (!list_empty(&peer->plp_txq) ||
123             !list_empty(&peer->plp_noopq) ||
124             !list_empty(&peer->plp_activeq) ||
125             error != 0) {
126                 CWARN("Closing %s\n", libcfs_id2str(peer->plp_id));
127                 if (plni->plni_debug)
128                         ptllnd_dump_debug(ni, peer->plp_id);
129         }
130
131         ptllnd_abort_txs(plni, &peer->plp_txq);
132         ptllnd_abort_txs(plni, &peer->plp_noopq);
133         ptllnd_abort_txs(plni, &peer->plp_activeq);
134
135         list_del(&peer->plp_list);
136         ptllnd_peer_decref(peer);
137 }
138
139 ptllnd_peer_t *
140 ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
141 {
142         ptllnd_ni_t       *plni = ni->ni_data;
143         unsigned int       hash = LNET_NIDADDR(id.nid) % plni->plni_peer_hash_size;
144         ptllnd_peer_t     *plp;
145         ptllnd_tx_t       *tx;
146         int                rc;
147
148         LASSERT (LNET_NIDNET(id.nid) == LNET_NIDNET(ni->ni_nid));
149
150         list_for_each_entry (plp, &plni->plni_peer_hash[hash], plp_list) {
151                 if (plp->plp_id.nid == id.nid &&
152                     plp->plp_id.pid == id.pid) {
153                         ptllnd_peer_addref(plp);
154                         return plp;
155                 }
156         }
157
158         if (!create)
159                 return NULL;
160
161         /* New peer: check first for enough posted buffers */
162         plni->plni_npeers++;
163         rc = ptllnd_size_buffers(ni, plni->plni_peer_credits);
164         if (rc != 0) {
165                 plni->plni_npeers--;
166                 return NULL;
167         }
168
169         LIBCFS_ALLOC(plp, sizeof(*plp));
170         if (plp == NULL) {
171                 CERROR("Can't allocate new peer %s\n", libcfs_id2str(id));
172                 plni->plni_npeers--;
173                 ptllnd_size_buffers(ni, -plni->plni_peer_credits);
174                 return NULL;
175         }
176
177         plp->plp_ni = ni;
178         plp->plp_id = id;
179         plp->plp_ptlid.nid = LNET_NIDADDR(id.nid);
180         plp->plp_ptlid.pid = plni->plni_ptllnd_pid;
181         plp->plp_credits = 1; /* add more later when she gives me credits */
182         plp->plp_max_msg_size = plni->plni_max_msg_size; /* until I hear from her */
183         plp->plp_sent_credits = 1;              /* Implicit credit for HELLO */
184         plp->plp_outstanding_credits = plni->plni_peer_credits - 1;
185         plp->plp_lazy_credits = 0;
186         plp->plp_extra_lazy_credits = 0;
187         plp->plp_match = 0;
188         plp->plp_stamp = 0;
189         plp->plp_sent_hello = 0;
190         plp->plp_recvd_hello = 0;
191         plp->plp_closing = 0;
192         plp->plp_refcount = 1;
193         CFS_INIT_LIST_HEAD(&plp->plp_list);
194         CFS_INIT_LIST_HEAD(&plp->plp_txq);
195         CFS_INIT_LIST_HEAD(&plp->plp_noopq);
196         CFS_INIT_LIST_HEAD(&plp->plp_activeq);
197
198         ptllnd_peer_addref(plp);
199         list_add_tail(&plp->plp_list, &plni->plni_peer_hash[hash]);
200
201         tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_HELLO, 0);
202         if (tx == NULL) {
203                 CERROR("Can't send HELLO to %s\n", libcfs_id2str(id));
204                 ptllnd_close_peer(plp, -ENOMEM);
205                 ptllnd_peer_decref(plp);
206                 return NULL;
207         }
208
209         tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS;
210         tx->tx_msg.ptlm_u.hello.kptlhm_max_msg_size = plni->plni_max_msg_size;
211
212         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post hello %p", libcfs_id2str(id),
213                        tx->tx_peer->plp_credits,
214                        tx->tx_peer->plp_outstanding_credits,
215                        tx->tx_peer->plp_sent_credits,
216                        plni->plni_peer_credits + 
217                        tx->tx_peer->plp_lazy_credits, tx);
218         ptllnd_post_tx(tx);
219
220         return plp;
221 }
222
223 int
224 ptllnd_count_q(struct list_head *q)
225 {
226         struct list_head *e;
227         int               n = 0;
228
229         list_for_each(e, q) {
230                 n++;
231         }
232
233         return n;
234 }
235
236 const char *
237 ptllnd_tx_typestr(int type)
238 {
239         switch (type) {
240         case PTLLND_RDMA_WRITE:
241                 return "rdma_write";
242
243         case PTLLND_RDMA_READ:
244                 return "rdma_read";
245
246         case PTLLND_MSG_TYPE_PUT:
247                 return "put_req";
248
249         case PTLLND_MSG_TYPE_GET:
250                 return "get_req";
251
252         case PTLLND_MSG_TYPE_IMMEDIATE:
253                 return "immediate";
254
255         case PTLLND_MSG_TYPE_NOOP:
256                 return "noop";
257
258         case PTLLND_MSG_TYPE_HELLO:
259                 return "hello";
260
261         default:
262                 return "<unknown>";
263         }
264 }
265
266 void
267 ptllnd_debug_tx(ptllnd_tx_t *tx)
268 {
269         CDEBUG(D_WARNING, "%s %s b %ld.%06ld/%ld.%06ld"
270                " r %ld.%06ld/%ld.%06ld status %d\n",
271                ptllnd_tx_typestr(tx->tx_type),
272                libcfs_id2str(tx->tx_peer->plp_id),
273                tx->tx_bulk_posted.tv_sec, tx->tx_bulk_posted.tv_usec,
274                tx->tx_bulk_done.tv_sec, tx->tx_bulk_done.tv_usec,
275                tx->tx_req_posted.tv_sec, tx->tx_req_posted.tv_usec,
276                tx->tx_req_done.tv_sec, tx->tx_req_done.tv_usec,
277                tx->tx_status);
278 }
279
280 void
281 ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
282 {
283         ptllnd_peer_t    *plp = ptllnd_find_peer(ni, id, 0);
284         ptllnd_ni_t      *plni = ni->ni_data;
285         ptllnd_tx_t      *tx;
286
287         if (plp == NULL) {
288                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_id2str(id));
289                 return;
290         }
291
292         CWARN("%s %s%s [%d] "LPU64".%06d m "LPU64" q %d/%d/%d c %d/%d+%d(%d)\n",
293               libcfs_id2str(id),
294               plp->plp_recvd_hello ? "H" : "_",
295               plp->plp_closing     ? "C" : "_",
296               plp->plp_refcount,
297               plp->plp_stamp / 1000000, (int)(plp->plp_stamp % 1000000),
298               plp->plp_match,
299               ptllnd_count_q(&plp->plp_txq),
300               ptllnd_count_q(&plp->plp_noopq),
301               ptllnd_count_q(&plp->plp_activeq),
302               plp->plp_credits, plp->plp_outstanding_credits, plp->plp_sent_credits,
303               plni->plni_peer_credits + plp->plp_lazy_credits);
304
305         CDEBUG(D_WARNING, "txq:\n");
306         list_for_each_entry (tx, &plp->plp_txq, tx_list) {
307                 ptllnd_debug_tx(tx);
308         }
309
310         CDEBUG(D_WARNING, "noopq:\n");
311         list_for_each_entry (tx, &plp->plp_noopq, tx_list) {
312                 ptllnd_debug_tx(tx);
313         }
314
315         CDEBUG(D_WARNING, "activeq:\n");
316         list_for_each_entry (tx, &plp->plp_activeq, tx_list) {
317                 ptllnd_debug_tx(tx);
318         }
319
320         CDEBUG(D_WARNING, "zombies:\n");
321         list_for_each_entry (tx, &plni->plni_zombie_txs, tx_list) {
322                 if (tx->tx_peer->plp_id.nid == id.nid &&
323                     tx->tx_peer->plp_id.pid == id.pid)
324                         ptllnd_debug_tx(tx);
325         }
326
327         CDEBUG(D_WARNING, "history:\n");
328         list_for_each_entry (tx, &plni->plni_tx_history, tx_list) {
329                 if (tx->tx_peer->plp_id.nid == id.nid &&
330                     tx->tx_peer->plp_id.pid == id.pid)
331                         ptllnd_debug_tx(tx);
332         }
333
334         ptllnd_peer_decref(plp);
335 }
336
337 void
338 ptllnd_dump_debug(lnet_ni_t *ni, lnet_process_id_t id)
339 {
340         ptllnd_debug_peer(ni, id);
341         ptllnd_dump_history();
342 }
343
344 void
345 ptllnd_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive)
346 {
347         lnet_process_id_t  id;
348         ptllnd_peer_t     *peer;
349         time_t             start = cfs_time_current_sec();
350         ptllnd_ni_t       *plni = ni->ni_data;
351         int                w = plni->plni_long_wait;
352
353         /* This is only actually used to connect to routers at startup! */
354         LASSERT(alive);
355
356         id.nid = nid;
357         id.pid = LUSTRE_SRV_LNET_PID;
358
359         peer = ptllnd_find_peer(ni, id, 1);
360         if (peer == NULL)
361                 return;
362
363         /* wait for the peer to reply */
364         while (!peer->plp_recvd_hello) {
365                 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
366                         CWARN("Waited %ds to connect to %s\n",
367                               (int)(cfs_time_current_sec() - start),
368                               libcfs_id2str(id));
369                         w *= 2;
370                 }
371
372                 ptllnd_wait(ni, w);
373         }
374
375         ptllnd_peer_decref(peer);
376 }
377
378 int
379 ptllnd_setasync(lnet_ni_t *ni, lnet_process_id_t id, int nasync)
380 {
381         ptllnd_peer_t *peer = ptllnd_find_peer(ni, id, nasync > 0);
382         int            rc;
383
384         if (peer == NULL)
385                 return -ENOMEM;
386
387         LASSERT (peer->plp_lazy_credits >= 0);
388         LASSERT (peer->plp_extra_lazy_credits >= 0);
389
390         /* If nasync < 0, we're being told we can reduce the total message
391          * headroom.  We can't do this right now because our peer might already
392          * have credits for the extra buffers, so we just account the extra
393          * headroom in case we need it later and only destroy buffers when the
394          * peer closes.
395          *
396          * Note that the following condition handles this case, where it
397          * actually increases the extra lazy credit counter. */
398
399         if (nasync <= peer->plp_extra_lazy_credits) {
400                 peer->plp_extra_lazy_credits -= nasync;
401                 return 0;
402         }
403
404         LASSERT (nasync > 0);
405
406         nasync -= peer->plp_extra_lazy_credits;
407         peer->plp_extra_lazy_credits = 0;
408
409         rc = ptllnd_size_buffers(ni, nasync);
410         if (rc == 0) {
411                 peer->plp_lazy_credits += nasync;
412                 peer->plp_outstanding_credits += nasync;
413         }
414
415         return rc;
416 }
417
418 __u32
419 ptllnd_cksum (void *ptr, int nob)
420 {
421         char  *c  = ptr;
422         __u32  sum = 0;
423
424         while (nob-- > 0)
425                 sum = ((sum << 1) | (sum >> 31)) + *c++;
426
427         /* ensure I don't return 0 (== no checksum) */
428         return (sum == 0) ? 1 : sum;
429 }
430
431 ptllnd_tx_t *
432 ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
433 {
434         lnet_ni_t   *ni = peer->plp_ni;
435         ptllnd_ni_t *plni = ni->ni_data;
436         ptllnd_tx_t *tx;
437         int          msgsize;
438
439         CDEBUG(D_NET, "peer=%p type=%d payload=%d\n", peer, type, payload_nob);
440
441         switch (type) {
442         default:
443                 LBUG();
444
445         case PTLLND_RDMA_WRITE:
446         case PTLLND_RDMA_READ:
447                 LASSERT (payload_nob == 0);
448                 msgsize = 0;
449                 break;
450
451         case PTLLND_MSG_TYPE_PUT:
452         case PTLLND_MSG_TYPE_GET:
453                 LASSERT (payload_nob == 0);
454                 msgsize = offsetof(kptl_msg_t, ptlm_u) + 
455                           sizeof(kptl_rdma_msg_t);
456                 break;
457
458         case PTLLND_MSG_TYPE_IMMEDIATE:
459                 msgsize = offsetof(kptl_msg_t,
460                                    ptlm_u.immediate.kptlim_payload[payload_nob]);
461                 break;
462
463         case PTLLND_MSG_TYPE_NOOP:
464                 LASSERT (payload_nob == 0);
465                 msgsize = offsetof(kptl_msg_t, ptlm_u);
466                 break;
467
468         case PTLLND_MSG_TYPE_HELLO:
469                 LASSERT (payload_nob == 0);
470                 msgsize = offsetof(kptl_msg_t, ptlm_u) +
471                           sizeof(kptl_hello_msg_t);
472                 break;
473         }
474
475         msgsize = (msgsize + 7) & ~7;
476         LASSERT (msgsize <= peer->plp_max_msg_size);
477
478         LIBCFS_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize);
479
480         if (tx == NULL) {
481                 CERROR("Can't allocate msg type %d for %s\n",
482                        type, libcfs_id2str(peer->plp_id));
483                 return NULL;
484         }
485
486         CFS_INIT_LIST_HEAD(&tx->tx_list);
487         tx->tx_peer = peer;
488         tx->tx_type = type;
489         tx->tx_lnetmsg = tx->tx_lnetreplymsg = NULL;
490         tx->tx_niov = 0;
491         tx->tx_iov = NULL;
492         tx->tx_reqmdh = PTL_INVALID_HANDLE;
493         tx->tx_bulkmdh = PTL_INVALID_HANDLE;
494         tx->tx_msgsize = msgsize;
495         tx->tx_completing = 0;
496         tx->tx_status = 0;
497
498         memset(&tx->tx_bulk_posted, 0, sizeof(tx->tx_bulk_posted));
499         memset(&tx->tx_bulk_done, 0, sizeof(tx->tx_bulk_done));
500         memset(&tx->tx_req_posted, 0, sizeof(tx->tx_req_posted));
501         memset(&tx->tx_req_done, 0, sizeof(tx->tx_req_done));
502
503         if (msgsize != 0) {
504                 tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC;
505                 tx->tx_msg.ptlm_version = PTLLND_MSG_VERSION;
506                 tx->tx_msg.ptlm_type = type;
507                 tx->tx_msg.ptlm_credits = 0;
508                 tx->tx_msg.ptlm_nob = msgsize;
509                 tx->tx_msg.ptlm_cksum = 0;
510                 tx->tx_msg.ptlm_srcnid = ni->ni_nid;
511                 tx->tx_msg.ptlm_srcstamp = plni->plni_stamp;
512                 tx->tx_msg.ptlm_dstnid = peer->plp_id.nid;
513                 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
514                 tx->tx_msg.ptlm_srcpid = the_lnet.ln_pid;
515                 tx->tx_msg.ptlm_dstpid = peer->plp_id.pid;
516         }
517
518         ptllnd_peer_addref(peer);
519         plni->plni_ntxs++;
520
521         CDEBUG(D_NET, "tx=%p\n",tx);
522
523         return tx;
524 }
525
526 void
527 ptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh)
528 {
529         ptllnd_peer_t   *peer = tx->tx_peer;
530         lnet_ni_t       *ni = peer->plp_ni;
531         int              rc;
532         time_t           start = cfs_time_current_sec();
533         ptllnd_ni_t     *plni = ni->ni_data;
534         int              w = plni->plni_long_wait;
535
536         while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) {
537                 rc = PtlMDUnlink(*mdh);
538 #ifndef LUSTRE_PORTALS_UNLINK_SEMANTICS
539                 if (rc == PTL_OK) /* unlink successful => no unlinked event */
540                         return;
541                 LASSERT (rc == PTL_MD_IN_USE);
542 #endif
543                 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
544                         CWARN("Waited %ds to abort tx to %s\n",
545                               (int)(cfs_time_current_sec() - start),
546                               libcfs_id2str(peer->plp_id));
547                         w *= 2;
548                 }
549                 /* Wait for ptllnd_tx_event() to invalidate */
550                 ptllnd_wait(ni, w);
551         }
552 }
553
554 void
555 ptllnd_cull_tx_history(ptllnd_ni_t *plni)
556 {
557         int max = plni->plni_max_tx_history;
558
559         while (plni->plni_ntx_history > max) {
560                 ptllnd_tx_t *tx = list_entry(plni->plni_tx_history.next, 
561                                              ptllnd_tx_t, tx_list);
562                 list_del(&tx->tx_list);
563
564                 ptllnd_peer_decref(tx->tx_peer);
565
566                 LIBCFS_FREE(tx, offsetof(ptllnd_tx_t, tx_msg) + tx->tx_msgsize);
567
568                 LASSERT (plni->plni_ntxs > 0);
569                 plni->plni_ntxs--;
570                 plni->plni_ntx_history--;
571         }
572 }
573
574 void
575 ptllnd_tx_done(ptllnd_tx_t *tx)
576 {
577         ptllnd_peer_t   *peer = tx->tx_peer;
578         lnet_ni_t       *ni = peer->plp_ni;
579         ptllnd_ni_t     *plni = ni->ni_data;
580
581         /* CAVEAT EMPTOR: If this tx is being aborted, I'll continue to get
582          * events for this tx until it's unlinked.  So I set tx_completing to
583          * flag the tx is getting handled */
584
585         if (tx->tx_completing)
586                 return;
587
588         tx->tx_completing = 1;
589
590         if (!list_empty(&tx->tx_list))
591                 list_del_init(&tx->tx_list);
592
593         if (tx->tx_status != 0) {
594                 if (plni->plni_debug) {
595                         CERROR("Completing tx for %s with error %d\n",
596                                libcfs_id2str(peer->plp_id), tx->tx_status);
597                         ptllnd_debug_tx(tx);
598                 }
599                 ptllnd_close_peer(peer, tx->tx_status);
600         }
601
602         ptllnd_abort_tx(tx, &tx->tx_reqmdh);
603         ptllnd_abort_tx(tx, &tx->tx_bulkmdh);
604
605         if (tx->tx_niov > 0) {
606                 LIBCFS_FREE(tx->tx_iov, tx->tx_niov * sizeof(*tx->tx_iov));
607                 tx->tx_niov = 0;
608         }
609
610         if (tx->tx_lnetreplymsg != NULL) {
611                 LASSERT (tx->tx_type == PTLLND_MSG_TYPE_GET);
612                 LASSERT (tx->tx_lnetmsg != NULL);
613                 /* Simulate GET success always  */
614                 lnet_finalize(ni, tx->tx_lnetmsg, 0);
615                 CDEBUG(D_NET, "lnet_finalize(tx_lnetreplymsg=%p)\n",tx->tx_lnetreplymsg);
616                 lnet_finalize(ni, tx->tx_lnetreplymsg, tx->tx_status);
617         } else if (tx->tx_lnetmsg != NULL) {
618                 lnet_finalize(ni, tx->tx_lnetmsg, tx->tx_status);
619         }
620
621         plni->plni_ntx_history++;
622         list_add_tail(&tx->tx_list, &plni->plni_tx_history);
623
624         ptllnd_cull_tx_history(plni);
625 }
626
627 int
628 ptllnd_set_txiov(ptllnd_tx_t *tx,
629                  unsigned int niov, struct iovec *iov,
630                  unsigned int offset, unsigned int len)
631 {
632         ptl_md_iovec_t *piov;
633         int             npiov;
634
635         if (len == 0) {
636                 tx->tx_niov = 0;
637                 return 0;
638         }
639
640         /*
641          * Remove iovec's at the beginning that
642          * are skipped because of the offset.
643          * Adjust the offset accordingly
644          */
645         for (;;) {
646                 LASSERT (niov > 0);
647                 if (offset < iov->iov_len)
648                         break;
649                 offset -= iov->iov_len;
650                 niov--;
651                 iov++;
652         }
653
654         for (;;) {
655                 int temp_offset = offset;
656                 int resid = len;
657                 LIBCFS_ALLOC(piov, niov * sizeof(*piov));
658                 if (piov == NULL)
659                         return -ENOMEM;
660
661                 for (npiov = 0;; npiov++) {
662                         LASSERT (npiov < niov);
663                         LASSERT (iov->iov_len >= temp_offset);
664
665                         piov[npiov].iov_base = iov[npiov].iov_base + temp_offset;
666                         piov[npiov].iov_len = iov[npiov].iov_len - temp_offset;
667
668                         if (piov[npiov].iov_len >= resid) {
669                                 piov[npiov].iov_len = resid;
670                                 npiov++;
671                                 break;
672                         }
673                         resid -= piov[npiov].iov_len;
674                         temp_offset = 0;
675                 }
676
677                 if (npiov == niov) {
678                         tx->tx_niov = niov;
679                         tx->tx_iov = piov;
680                         return 0;
681                 }
682
683                 /* Dang! The piov I allocated was too big and it's a drag to
684                  * have to maintain separate 'allocated' and 'used' sizes, so
685                  * I'll just do it again; NB this doesn't happen normally... */
686                 LIBCFS_FREE(piov, niov * sizeof(*piov));
687                 niov = npiov;
688         }
689 }
690
691 void
692 ptllnd_set_md_buffer(ptl_md_t *md, ptllnd_tx_t *tx)
693 {
694         unsigned int    niov = tx->tx_niov;
695         ptl_md_iovec_t *iov = tx->tx_iov;
696
697         LASSERT ((md->options & PTL_MD_IOVEC) == 0);
698
699         if (niov == 0) {
700                 md->start = NULL;
701                 md->length = 0;
702         } else if (niov == 1) {
703                 md->start = iov[0].iov_base;
704                 md->length = iov[0].iov_len;
705         } else {
706                 md->start = iov;
707                 md->length = niov;
708                 md->options |= PTL_MD_IOVEC;
709         }
710 }
711
712 int
713 ptllnd_post_buffer(ptllnd_buffer_t *buf)
714 {
715         lnet_ni_t        *ni = buf->plb_ni;
716         ptllnd_ni_t      *plni = ni->ni_data;
717         ptl_process_id_t  anyid = {
718                 .nid       = PTL_NID_ANY,
719                 .pid       = PTL_PID_ANY};
720         ptl_md_t          md = {
721                 .start     = buf->plb_buffer,
722                 .length    = plni->plni_buffer_size,
723                 .threshold = PTL_MD_THRESH_INF,
724                 .max_size  = plni->plni_max_msg_size,
725                 .options   = (PTLLND_MD_OPTIONS |
726                               PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | 
727                               PTL_MD_LOCAL_ALIGN8),
728                 .user_ptr  = ptllnd_obj2eventarg(buf, PTLLND_EVENTARG_TYPE_BUF),
729                 .eq_handle = plni->plni_eqh};
730         ptl_handle_me_t meh;
731         int             rc;
732
733         LASSERT (!buf->plb_posted);
734
735         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal,
736                          anyid, LNET_MSG_MATCHBITS, 0,
737                          PTL_UNLINK, PTL_INS_AFTER, &meh);
738         if (rc != PTL_OK) {
739                 CERROR("PtlMEAttach failed: %s(%d)\n",
740                        ptllnd_errtype2str(rc), rc);
741                 return -ENOMEM;
742         }
743
744         buf->plb_posted = 1;
745         plni->plni_nposted_buffers++;
746
747         rc = PtlMDAttach(meh, md, LNET_UNLINK, &buf->plb_md);
748         if (rc == PTL_OK)
749                 return 0;
750
751         CERROR("PtlMDAttach failed: %s(%d)\n",
752                ptllnd_errtype2str(rc), rc);
753
754         buf->plb_posted = 0;
755         plni->plni_nposted_buffers--;
756
757         rc = PtlMEUnlink(meh);
758         LASSERT (rc == PTL_OK);
759
760         return -ENOMEM;
761 }
762
763 static inline int
764 ptllnd_peer_send_noop (ptllnd_peer_t *peer)
765 {
766         ptllnd_ni_t *plni = peer->plp_ni->ni_data;
767
768         if (!peer->plp_sent_hello ||
769             peer->plp_credits == 0 ||
770             !list_empty(&peer->plp_noopq) ||
771             peer->plp_outstanding_credits < PTLLND_CREDIT_HIGHWATER(plni))
772                 return 0;
773
774         /* No tx to piggyback NOOP onto or no credit to send a tx */
775         return (list_empty(&peer->plp_txq) || peer->plp_credits == 1);
776 }
777
778 void
779 ptllnd_check_sends(ptllnd_peer_t *peer)
780 {
781         ptllnd_ni_t    *plni = peer->plp_ni->ni_data;
782         ptllnd_tx_t    *tx;
783         ptl_md_t        md;
784         ptl_handle_md_t mdh;
785         int             rc;
786
787         CDEBUG(D_NET, "%s: [%d/%d+%d(%d)\n",
788                libcfs_id2str(peer->plp_id), peer->plp_credits,
789                peer->plp_outstanding_credits, peer->plp_sent_credits,
790                plni->plni_peer_credits + peer->plp_lazy_credits);
791
792         if (ptllnd_peer_send_noop(peer)) {
793                 tx = ptllnd_new_tx(peer, PTLLND_MSG_TYPE_NOOP, 0);
794                 CDEBUG(D_NET, "NOOP tx=%p\n",tx);
795                 if (tx == NULL) {
796                         CERROR("Can't return credits to %s\n",
797                                libcfs_id2str(peer->plp_id));
798                 } else {
799                         ptllnd_set_tx_deadline(tx);
800                         list_add_tail(&tx->tx_list, &peer->plp_noopq);
801                 }
802         }
803
804         for (;;) {
805                 if (!list_empty(&peer->plp_noopq)) {
806                         LASSERT (peer->plp_sent_hello);
807                         tx = list_entry(peer->plp_noopq.next,
808                                         ptllnd_tx_t, tx_list);
809                 } else if (!list_empty(&peer->plp_txq)) {
810                         tx = list_entry(peer->plp_txq.next,
811                                         ptllnd_tx_t, tx_list);
812                 } else {
813                         /* nothing to send right now */
814                         break;
815                 }
816
817                 LASSERT (tx->tx_msgsize > 0);
818
819                 LASSERT (peer->plp_outstanding_credits >= 0);
820                 LASSERT (peer->plp_sent_credits >= 0);
821                 LASSERT (peer->plp_outstanding_credits + peer->plp_sent_credits
822                          <= plni->plni_peer_credits + peer->plp_lazy_credits);
823                 LASSERT (peer->plp_credits >= 0);
824
825                 /* say HELLO first */
826                 if (!peer->plp_sent_hello) {
827                         LASSERT (list_empty(&peer->plp_noopq));
828                         LASSERT (tx->tx_type == PTLLND_MSG_TYPE_HELLO);
829
830                         peer->plp_sent_hello = 1;
831                 }
832
833                 if (peer->plp_credits == 0) {   /* no credits */
834                         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: no creds for %p",
835                                        libcfs_id2str(peer->plp_id),
836                                        peer->plp_credits,
837                                        peer->plp_outstanding_credits,
838                                        peer->plp_sent_credits,
839                                        plni->plni_peer_credits +
840                                        peer->plp_lazy_credits, tx);
841                         break;
842                 }
843
844                 /* Last/Initial credit reserved for NOOP/HELLO */
845                 if (peer->plp_credits == 1 &&
846                     tx->tx_type != PTLLND_MSG_TYPE_NOOP &&
847                     tx->tx_type != PTLLND_MSG_TYPE_HELLO) {
848                         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: too few creds for %p",
849                                        libcfs_id2str(peer->plp_id),
850                                        peer->plp_credits,
851                                        peer->plp_outstanding_credits,
852                                        peer->plp_sent_credits,
853                                        plni->plni_peer_credits +
854                                        peer->plp_lazy_credits, tx);
855                         break;
856                 }
857
858                 list_del(&tx->tx_list);
859                 list_add_tail(&tx->tx_list, &peer->plp_activeq);
860
861                 CDEBUG(D_NET, "Sending at TX=%p type=%s (%d)\n",tx,
862                         ptllnd_msgtype2str(tx->tx_type),tx->tx_type);
863
864                 if (tx->tx_type == PTLLND_MSG_TYPE_NOOP &&
865                     !ptllnd_peer_send_noop(peer)) {
866                         /* redundant NOOP */
867                         ptllnd_tx_done(tx);
868                         continue;
869                 }
870
871                 /* Set stamp at the last minute; on a new peer, I don't know it
872                  * until I receive the HELLO back */
873                 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
874
875                 /*
876                  * Return all the credits we have
877                  */
878                 tx->tx_msg.ptlm_credits = MIN(PTLLND_MSG_MAX_CREDITS,
879                                               peer->plp_outstanding_credits);
880                 peer->plp_sent_credits += tx->tx_msg.ptlm_credits;
881                 peer->plp_outstanding_credits -= tx->tx_msg.ptlm_credits;
882
883                 /*
884                  * One less credit
885                  */
886                 peer->plp_credits--;
887
888                 if (plni->plni_checksum)
889                         tx->tx_msg.ptlm_cksum = 
890                                 ptllnd_cksum(&tx->tx_msg,
891                                              offsetof(kptl_msg_t, ptlm_u));
892
893                 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
894                 md.eq_handle = plni->plni_eqh;
895                 md.threshold = 1;
896                 md.options = PTLLND_MD_OPTIONS;
897                 md.start = &tx->tx_msg;
898                 md.length = tx->tx_msgsize;
899
900                 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
901                 if (rc != PTL_OK) {
902                         CERROR("PtlMDBind for %s failed: %s(%d)\n",
903                                libcfs_id2str(peer->plp_id),
904                                ptllnd_errtype2str(rc), rc);
905                         tx->tx_status = -EIO;
906                         ptllnd_tx_done(tx);
907                         break;
908                 }
909
910                 LASSERT (tx->tx_type != PTLLND_RDMA_WRITE &&
911                          tx->tx_type != PTLLND_RDMA_READ);
912
913                 tx->tx_reqmdh = mdh;
914                 gettimeofday(&tx->tx_req_posted, NULL);
915
916                 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: %s %p c %d",
917                                libcfs_id2str(peer->plp_id),
918                                peer->plp_credits,
919                                peer->plp_outstanding_credits,
920                                peer->plp_sent_credits,
921                                plni->plni_peer_credits +
922                                peer->plp_lazy_credits,
923                                ptllnd_msgtype2str(tx->tx_type), tx,
924                                tx->tx_msg.ptlm_credits);
925
926                 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
927                             plni->plni_portal, 0, LNET_MSG_MATCHBITS, 0, 0);
928                 if (rc != PTL_OK) {
929                         CERROR("PtlPut for %s failed: %s(%d)\n",
930                                libcfs_id2str(peer->plp_id),
931                                ptllnd_errtype2str(rc), rc);
932                         tx->tx_status = -EIO;
933                         ptllnd_tx_done(tx);
934                         break;
935                 }
936         }
937 }
938
939 int
940 ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
941                     unsigned int niov, struct iovec *iov,
942                     unsigned int offset, unsigned int len)
943 {
944         lnet_ni_t      *ni = peer->plp_ni;
945         ptllnd_ni_t    *plni = ni->ni_data;
946         ptllnd_tx_t    *tx = ptllnd_new_tx(peer, type, 0);
947         __u64           matchbits;
948         ptl_md_t        md;
949         ptl_handle_md_t mdh;
950         ptl_handle_me_t meh;
951         int             rc;
952         int             rc2;
953         time_t          start;
954         int             w;
955
956         CDEBUG(D_NET, "niov=%d offset=%d len=%d\n",niov,offset,len);
957
958         LASSERT (type == PTLLND_MSG_TYPE_GET ||
959                  type == PTLLND_MSG_TYPE_PUT);
960
961         if (tx == NULL) {
962                 CERROR("Can't allocate %s tx for %s\n",
963                        type == PTLLND_MSG_TYPE_GET ? "GET" : "PUT/REPLY",
964                        libcfs_id2str(peer->plp_id));
965                 return -ENOMEM;
966         }
967
968         rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
969         if (rc != 0) {
970                 CERROR ("Can't allocate iov %d for %s\n",
971                         niov, libcfs_id2str(peer->plp_id));
972                 rc = -ENOMEM;
973                 goto failed;
974         }
975
976         md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
977         md.eq_handle = plni->plni_eqh;
978         md.threshold = 1;
979         md.max_size = 0;
980         md.options = PTLLND_MD_OPTIONS;
981         if(type == PTLLND_MSG_TYPE_GET)
982                 md.options |= PTL_MD_OP_PUT | PTL_MD_ACK_DISABLE;
983         else
984                 md.options |= PTL_MD_OP_GET;
985         ptllnd_set_md_buffer(&md, tx);
986
987         start = cfs_time_current_sec();
988         w = plni->plni_long_wait;
989
990         while (!peer->plp_recvd_hello) {        /* wait to validate plp_match */
991                 if (peer->plp_closing) {
992                         rc = -EIO;
993                         goto failed;
994                 }
995                 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
996                         CWARN("Waited %ds to connect to %s\n",
997                               (int)(cfs_time_current_sec() - start),
998                               libcfs_id2str(peer->plp_id));
999                         w *= 2;
1000                 }
1001                 ptllnd_wait(ni, w);
1002         }
1003
1004         if (peer->plp_match < PTL_RESERVED_MATCHBITS)
1005                 peer->plp_match = PTL_RESERVED_MATCHBITS;
1006         matchbits = peer->plp_match++;
1007
1008         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid,
1009                          matchbits, 0, PTL_UNLINK, PTL_INS_BEFORE, &meh);
1010         if (rc != PTL_OK) {
1011                 CERROR("PtlMEAttach for %s failed: %s(%d)\n",
1012                        libcfs_id2str(peer->plp_id),
1013                        ptllnd_errtype2str(rc), rc);
1014                 rc = -EIO;
1015                 goto failed;
1016         }
1017
1018         gettimeofday(&tx->tx_bulk_posted, NULL);
1019
1020         rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);
1021         if (rc != PTL_OK) {
1022                 CERROR("PtlMDAttach for %s failed: %s(%d)\n",
1023                        libcfs_id2str(peer->plp_id),
1024                        ptllnd_errtype2str(rc), rc);
1025                 rc2 = PtlMEUnlink(meh);
1026                 LASSERT (rc2 == PTL_OK);
1027                 rc = -EIO;
1028                 goto failed;
1029         }
1030         tx->tx_bulkmdh = mdh;
1031
1032         /*
1033          * We need to set the stamp here because it
1034          * we could have received a HELLO above that set
1035          * peer->plp_stamp
1036          */
1037         tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
1038
1039         tx->tx_msg.ptlm_u.rdma.kptlrm_hdr = msg->msg_hdr;
1040         tx->tx_msg.ptlm_u.rdma.kptlrm_matchbits = matchbits;
1041
1042         if (type == PTLLND_MSG_TYPE_GET) {
1043                 tx->tx_lnetreplymsg = lnet_create_reply_msg(ni, msg);
1044                 if (tx->tx_lnetreplymsg == NULL) {
1045                         CERROR("Can't create reply for GET to %s\n",
1046                                libcfs_id2str(msg->msg_target));
1047                         rc = -ENOMEM;
1048                         goto failed;
1049                 }
1050         }
1051
1052         tx->tx_lnetmsg = msg;
1053         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post passive %s p %d %p",
1054                        libcfs_id2str(msg->msg_target),
1055                        peer->plp_credits, peer->plp_outstanding_credits,
1056                        peer->plp_sent_credits,
1057                        plni->plni_peer_credits + peer->plp_lazy_credits,
1058                        lnet_msgtyp2str(msg->msg_type),
1059                        (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ? 
1060                        le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1061                        (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ? 
1062                        le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1063                        tx);
1064         ptllnd_post_tx(tx);
1065         return 0;
1066
1067  failed:
1068         ptllnd_tx_done(tx);
1069         return rc;
1070 }
1071
1072 int
1073 ptllnd_active_rdma(ptllnd_peer_t *peer, int type,
1074                    lnet_msg_t *msg, __u64 matchbits,
1075                    unsigned int niov, struct iovec *iov,
1076                    unsigned int offset, unsigned int len)
1077 {
1078         lnet_ni_t       *ni = peer->plp_ni;
1079         ptllnd_ni_t     *plni = ni->ni_data;
1080         ptllnd_tx_t     *tx = ptllnd_new_tx(peer, type, 0);
1081         ptl_md_t         md;
1082         ptl_handle_md_t  mdh;
1083         int              rc;
1084
1085         LASSERT (type == PTLLND_RDMA_READ ||
1086                  type == PTLLND_RDMA_WRITE);
1087
1088         if (tx == NULL) {
1089                 CERROR("Can't allocate tx for RDMA %s with %s\n",
1090                        (type == PTLLND_RDMA_WRITE) ? "write" : "read",
1091                        libcfs_id2str(peer->plp_id));
1092                 ptllnd_close_peer(peer, -ENOMEM);
1093                 return -ENOMEM;
1094         }
1095
1096         rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
1097         if (rc != 0) {
1098                 CERROR ("Can't allocate iov %d for %s\n",
1099                         niov, libcfs_id2str(peer->plp_id));
1100                 rc = -ENOMEM;
1101                 goto failed;
1102         }
1103
1104         md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
1105         md.eq_handle = plni->plni_eqh;
1106         md.max_size = 0;
1107         md.options = PTLLND_MD_OPTIONS;
1108         md.threshold = (type == PTLLND_RDMA_READ) ? 2 : 1;
1109
1110         ptllnd_set_md_buffer(&md, tx);
1111
1112         rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
1113         if (rc != PTL_OK) {
1114                 CERROR("PtlMDBind for %s failed: %s(%d)\n",
1115                        libcfs_id2str(peer->plp_id),
1116                        ptllnd_errtype2str(rc), rc);
1117                 rc = -EIO;
1118                 goto failed;
1119         }
1120
1121         tx->tx_bulkmdh = mdh;
1122         tx->tx_lnetmsg = msg;
1123
1124         ptllnd_set_tx_deadline(tx);
1125         list_add_tail(&tx->tx_list, &peer->plp_activeq);
1126         gettimeofday(&tx->tx_bulk_posted, NULL);
1127
1128         if (type == PTLLND_RDMA_READ)
1129                 rc = PtlGet(mdh, peer->plp_ptlid,
1130                             plni->plni_portal, 0, matchbits, 0);
1131         else
1132                 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
1133                             plni->plni_portal, 0, matchbits, 0, 
1134                             (msg == NULL) ? PTLLND_RDMA_FAIL : PTLLND_RDMA_OK);
1135
1136         if (rc == PTL_OK)
1137                 return 0;
1138
1139         CERROR("Can't initiate RDMA with %s: %s(%d)\n",
1140                libcfs_id2str(peer->plp_id),
1141                ptllnd_errtype2str(rc), rc);
1142
1143         tx->tx_lnetmsg = NULL;
1144  failed:
1145         tx->tx_status = rc;
1146         ptllnd_tx_done(tx);    /* this will close peer */
1147         return rc;
1148 }
1149
1150 int
1151 ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
1152 {
1153         ptllnd_ni_t    *plni = ni->ni_data;
1154         ptllnd_peer_t  *plp;
1155         ptllnd_tx_t    *tx;
1156         int             nob;
1157         int             rc;
1158
1159         LASSERT (!msg->msg_routing);
1160         LASSERT (msg->msg_kiov == NULL);
1161
1162         LASSERT (msg->msg_niov <= PTL_MD_MAX_IOV); /* !!! */
1163
1164         CDEBUG(D_NET, "%s [%d]+%d,%d -> %s%s\n",
1165                lnet_msgtyp2str(msg->msg_type),
1166                msg->msg_niov, msg->msg_offset, msg->msg_len,
1167                libcfs_nid2str(msg->msg_target.nid),
1168                msg->msg_target_is_router ? "(rtr)" : "");
1169
1170         if ((msg->msg_target.pid & LNET_PID_USERFLAG) != 0) {
1171                 CERROR("Can't send to non-kernel peer %s\n",
1172                        libcfs_id2str(msg->msg_target));
1173                 return -EHOSTUNREACH;
1174         }
1175
1176         plp = ptllnd_find_peer(ni, msg->msg_target, 1);
1177         if (plp == NULL)
1178                 return -ENOMEM;
1179
1180         switch (msg->msg_type) {
1181         default:
1182                 LBUG();
1183
1184         case LNET_MSG_ACK:
1185                 LASSERT (msg->msg_len == 0);
1186                 break;                          /* send IMMEDIATE */
1187
1188         case LNET_MSG_GET:
1189                 if (msg->msg_target_is_router)
1190                         break;                  /* send IMMEDIATE */
1191
1192                 nob = msg->msg_md->md_length;
1193                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1194                 if (nob <= plni->plni_max_msg_size)
1195                         break;
1196
1197                 LASSERT ((msg->msg_md->md_options & LNET_MD_KIOV) == 0);
1198                 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_GET, msg,
1199                                          msg->msg_md->md_niov,
1200                                          msg->msg_md->md_iov.iov,
1201                                          0, msg->msg_md->md_length);
1202                 ptllnd_peer_decref(plp);
1203                 return rc;
1204
1205         case LNET_MSG_REPLY:
1206         case LNET_MSG_PUT:
1207                 nob = msg->msg_len;
1208                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1209                 if (nob <= plp->plp_max_msg_size)
1210                         break;                  /* send IMMEDIATE */
1211
1212                 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_PUT, msg,
1213                                          msg->msg_niov, msg->msg_iov,
1214                                          msg->msg_offset, msg->msg_len);
1215                 ptllnd_peer_decref(plp);
1216                 return rc;
1217         }
1218
1219         /* send IMMEDIATE
1220          * NB copy the payload so we don't have to do a fragmented send */
1221
1222         tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_IMMEDIATE, msg->msg_len);
1223         if (tx == NULL) {
1224                 CERROR("Can't allocate tx for lnet type %d to %s\n",
1225                        msg->msg_type, libcfs_id2str(msg->msg_target));
1226                 ptllnd_peer_decref(plp);
1227                 return -ENOMEM;
1228         }
1229
1230         lnet_copy_iov2flat(tx->tx_msgsize, &tx->tx_msg,
1231                            offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1232                            msg->msg_niov, msg->msg_iov, msg->msg_offset,
1233                            msg->msg_len);
1234         tx->tx_msg.ptlm_u.immediate.kptlim_hdr = msg->msg_hdr;
1235
1236         tx->tx_lnetmsg = msg;
1237         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post immediate %s p %d %p",
1238                        libcfs_id2str(msg->msg_target),
1239                        plp->plp_credits, plp->plp_outstanding_credits,
1240                        plp->plp_sent_credits,
1241                        plni->plni_peer_credits + plp->plp_lazy_credits,
1242                        lnet_msgtyp2str(msg->msg_type),
1243                        (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ? 
1244                        le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1245                        (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ? 
1246                        le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1247                        tx);
1248         ptllnd_post_tx(tx);
1249         ptllnd_peer_decref(plp);
1250         return 0;
1251 }
1252
1253 void
1254 ptllnd_rx_done(ptllnd_rx_t *rx)
1255 {
1256         ptllnd_peer_t *plp = rx->rx_peer;
1257         ptllnd_ni_t   *plni = plp->plp_ni->ni_data;
1258
1259         plp->plp_outstanding_credits++;
1260
1261         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: rx=%p done\n",
1262                        libcfs_id2str(plp->plp_id),
1263                        plp->plp_credits, plp->plp_outstanding_credits, 
1264                        plp->plp_sent_credits,
1265                        plni->plni_peer_credits + plp->plp_lazy_credits, rx);
1266
1267         ptllnd_check_sends(plp);
1268
1269         LASSERT (plni->plni_nrxs > 0);
1270         plni->plni_nrxs--;
1271 }
1272
1273 int
1274 ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1275                   void **new_privatep)
1276 {
1277         /* Shouldn't get here; recvs only block for router buffers */
1278         LBUG();
1279         return 0;
1280 }
1281
1282 int
1283 ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1284             int delayed, unsigned int niov,
1285             struct iovec *iov, lnet_kiov_t *kiov,
1286             unsigned int offset, unsigned int mlen, unsigned int rlen)
1287 {
1288         ptllnd_rx_t    *rx = private;
1289         int             rc = 0;
1290         int             nob;
1291
1292         LASSERT (kiov == NULL);
1293         LASSERT (niov <= PTL_MD_MAX_IOV);       /* !!! */
1294
1295         switch (rx->rx_msg->ptlm_type) {
1296         default:
1297                 LBUG();
1298
1299         case PTLLND_MSG_TYPE_IMMEDIATE:
1300                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[mlen]);
1301                 if (nob > rx->rx_nob) {
1302                         CERROR("Immediate message from %s too big: %d(%d)\n",
1303                                libcfs_id2str(rx->rx_peer->plp_id),
1304                                nob, rx->rx_nob);
1305                         rc = -EPROTO;
1306                         break;
1307                 }
1308                 lnet_copy_flat2iov(niov, iov, offset,
1309                                    rx->rx_nob, rx->rx_msg,
1310                                    offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1311                                    mlen);
1312                 lnet_finalize(ni, msg, 0);
1313                 break;
1314
1315         case PTLLND_MSG_TYPE_PUT:
1316                 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg,
1317                                         rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1318                                         niov, iov, offset, mlen);
1319                 break;
1320
1321         case PTLLND_MSG_TYPE_GET:
1322                 if (msg != NULL)
1323                         rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, msg,
1324                                                 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1325                                                 msg->msg_niov, msg->msg_iov,
1326                                                 msg->msg_offset, msg->msg_len);
1327                 else
1328                         rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, NULL,
1329                                                 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1330                                                 0, NULL, 0, 0);
1331                 break;
1332         }
1333
1334         ptllnd_rx_done(rx);
1335         return rc;
1336 }
1337
1338 void
1339 ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
1340                      kptl_msg_t *msg, unsigned int nob)
1341 {
1342         ptllnd_ni_t      *plni = ni->ni_data;
1343         const int         basenob = offsetof(kptl_msg_t, ptlm_u);
1344         lnet_process_id_t srcid;
1345         ptllnd_rx_t       rx;
1346         int               flip;
1347         __u16             msg_version;
1348         __u32             msg_cksum;
1349         ptllnd_peer_t    *plp;
1350         int               rc;
1351
1352         if (nob < 6) {
1353                 CERROR("Very short receive from %s\n",
1354                        ptllnd_ptlid2str(initiator));
1355                 return;
1356         }
1357
1358         /* I can at least read MAGIC/VERSION */
1359
1360         flip = msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC);
1361         if (!flip && msg->ptlm_magic != PTLLND_MSG_MAGIC) {
1362                 CERROR("Bad protocol magic %08x from %s\n", 
1363                        msg->ptlm_magic, ptllnd_ptlid2str(initiator));
1364                 return;
1365         }
1366
1367         msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
1368
1369         if (msg_version != PTLLND_MSG_VERSION) {
1370                 CERROR("Bad protocol version %04x from %s: %04x expected\n",
1371                        (__u32)msg_version, ptllnd_ptlid2str(initiator), PTLLND_MSG_VERSION);
1372
1373                 if (plni->plni_abort_on_protocol_mismatch)
1374                         abort();
1375
1376                 return;
1377         }
1378
1379         if (nob < basenob) {
1380                 CERROR("Short receive from %s: got %d, wanted at least %d\n",
1381                        ptllnd_ptlid2str(initiator), nob, basenob);
1382                 return;
1383         }
1384
1385         /* checksum must be computed with
1386          * 1) ptlm_cksum zero and
1387          * 2) BEFORE anything gets modified/flipped
1388          */
1389         msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
1390         msg->ptlm_cksum = 0;
1391         if (msg_cksum != 0 &&
1392             msg_cksum != ptllnd_cksum(msg, offsetof(kptl_msg_t, ptlm_u))) {
1393                 CERROR("Bad checksum from %s\n", ptllnd_ptlid2str(initiator));
1394                 return;
1395         }
1396
1397         msg->ptlm_version = msg_version;
1398         msg->ptlm_cksum = msg_cksum;
1399
1400         if (flip) {
1401                 /* NB stamps are opaque cookies */
1402                 __swab32s(&msg->ptlm_nob);
1403                 __swab64s(&msg->ptlm_srcnid);
1404                 __swab64s(&msg->ptlm_dstnid);
1405                 __swab32s(&msg->ptlm_srcpid);
1406                 __swab32s(&msg->ptlm_dstpid);
1407         }
1408
1409         srcid.nid = msg->ptlm_srcnid;
1410         srcid.pid = msg->ptlm_srcpid;
1411
1412         if (LNET_NIDNET(msg->ptlm_srcnid) != LNET_NIDNET(ni->ni_nid)) {
1413                 CERROR("Bad source id %s from %s\n",
1414                        libcfs_id2str(srcid),
1415                        ptllnd_ptlid2str(initiator));
1416                 return;
1417         }
1418
1419         if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
1420                 CERROR("NAK from %s (%s)\n",
1421                        libcfs_id2str(srcid),
1422                        ptllnd_ptlid2str(initiator));
1423
1424                 if (plni->plni_dump_on_nak)
1425                         ptllnd_dump_debug(ni, srcid);
1426
1427                 if (plni->plni_abort_on_nak)
1428                         abort();
1429
1430                 return;
1431         }
1432
1433         if (msg->ptlm_dstnid != ni->ni_nid ||
1434             msg->ptlm_dstpid != the_lnet.ln_pid) {
1435                 CERROR("Bad dstid %s (%s expected) from %s\n",
1436                        libcfs_id2str((lnet_process_id_t) {
1437                                .nid = msg->ptlm_dstnid,
1438                                .pid = msg->ptlm_dstpid}),
1439                        libcfs_id2str((lnet_process_id_t) {
1440                                .nid = ni->ni_nid,
1441                                .pid = the_lnet.ln_pid}),
1442                        libcfs_id2str(srcid));
1443                 return;
1444         }
1445
1446         if (msg->ptlm_dststamp != plni->plni_stamp) {
1447                 CERROR("Bad dststamp "LPX64"("LPX64" expected) from %s\n",
1448                        msg->ptlm_dststamp, plni->plni_stamp,
1449                        libcfs_id2str(srcid));
1450                 return;
1451         }
1452
1453         PTLLND_HISTORY("RX %s: %s %d %p", libcfs_id2str(srcid), 
1454                        ptllnd_msgtype2str(msg->ptlm_type),
1455                        msg->ptlm_credits, &rx);
1456
1457         switch (msg->ptlm_type) {
1458         case PTLLND_MSG_TYPE_PUT:
1459         case PTLLND_MSG_TYPE_GET:
1460                 if (nob < basenob + sizeof(kptl_rdma_msg_t)) {
1461                         CERROR("Short rdma request from %s(%s)\n",
1462                                libcfs_id2str(srcid),
1463                                ptllnd_ptlid2str(initiator));
1464                         return;
1465                 }
1466                 if (flip)
1467                         __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
1468                 break;
1469
1470         case PTLLND_MSG_TYPE_IMMEDIATE:
1471                 if (nob < offsetof(kptl_msg_t,
1472                                    ptlm_u.immediate.kptlim_payload)) {
1473                         CERROR("Short immediate from %s(%s)\n",
1474                                libcfs_id2str(srcid),
1475                                ptllnd_ptlid2str(initiator));
1476                         return;
1477                 }
1478                 break;
1479
1480         case PTLLND_MSG_TYPE_HELLO:
1481                 if (nob < basenob + sizeof(kptl_hello_msg_t)) {
1482                         CERROR("Short hello from %s(%s)\n",
1483                                libcfs_id2str(srcid),
1484                                ptllnd_ptlid2str(initiator));
1485                         return;
1486                 }
1487                 if(flip){
1488                         __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
1489                         __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
1490                 }
1491                 break;
1492
1493         case PTLLND_MSG_TYPE_NOOP:
1494                 break;
1495
1496         default:
1497                 CERROR("Bad message type %d from %s(%s)\n", msg->ptlm_type,
1498                        libcfs_id2str(srcid),
1499                        ptllnd_ptlid2str(initiator));
1500                 return;
1501         }
1502
1503         plp = ptllnd_find_peer(ni, srcid, 0);
1504         if (plp == NULL) {
1505                 CERROR("Can't find peer %s\n", libcfs_id2str(srcid));
1506                 return;
1507         }
1508
1509         if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
1510                 if (plp->plp_recvd_hello) {
1511                         CERROR("Unexpected HELLO from %s\n",
1512                                libcfs_id2str(srcid));
1513                         ptllnd_peer_decref(plp);
1514                         return;
1515                 }
1516
1517                 plp->plp_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1518                 plp->plp_match = msg->ptlm_u.hello.kptlhm_matchbits;
1519                 plp->plp_stamp = msg->ptlm_srcstamp;
1520                 plp->plp_recvd_hello = 1;
1521
1522         } else if (!plp->plp_recvd_hello) {
1523
1524                 CERROR("Bad message type %d (HELLO expected) from %s\n",
1525                        msg->ptlm_type, libcfs_id2str(srcid));
1526                 ptllnd_peer_decref(plp);
1527                 return;
1528
1529         } else if (msg->ptlm_srcstamp != plp->plp_stamp) {
1530
1531                 CERROR("Bad srcstamp "LPX64"("LPX64" expected) from %s\n",
1532                        msg->ptlm_srcstamp, plp->plp_stamp,
1533                        libcfs_id2str(srcid));
1534                 ptllnd_peer_decref(plp);
1535                 return;
1536         }
1537
1538         /* Check peer only sends when I've sent her credits */
1539         if (plp->plp_sent_credits == 0) {
1540                 CERROR("%s[%d/%d+%d(%d)]: unexpected message\n",
1541                        libcfs_id2str(plp->plp_id),
1542                        plp->plp_credits, plp->plp_outstanding_credits,
1543                        plp->plp_sent_credits,
1544                        plni->plni_peer_credits + plp->plp_lazy_credits);
1545                 return;
1546         }
1547         plp->plp_sent_credits--;
1548
1549         /* No check for credit overflow - the peer may post new buffers after
1550          * the startup handshake. */
1551         plp->plp_credits += msg->ptlm_credits;
1552
1553         /* All OK so far; assume the message is good... */
1554
1555         rx.rx_peer      = plp;
1556         rx.rx_msg       = msg;
1557         rx.rx_nob       = nob;
1558         plni->plni_nrxs++;
1559
1560         switch (msg->ptlm_type) {
1561         default: /* message types have been checked already */
1562                 ptllnd_rx_done(&rx);
1563                 break;
1564
1565         case PTLLND_MSG_TYPE_PUT:
1566         case PTLLND_MSG_TYPE_GET:
1567                 rc = lnet_parse(ni, &msg->ptlm_u.rdma.kptlrm_hdr,
1568                                 msg->ptlm_srcnid, &rx, 1);
1569                 if (rc < 0)
1570                         ptllnd_rx_done(&rx);
1571                 break;
1572
1573         case PTLLND_MSG_TYPE_IMMEDIATE:
1574                 rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,
1575                                 msg->ptlm_srcnid, &rx, 0);
1576                 if (rc < 0)
1577                         ptllnd_rx_done(&rx);
1578                 break;
1579         }
1580
1581         if (msg->ptlm_credits > 0)
1582                 ptllnd_check_sends(plp);
1583
1584         ptllnd_peer_decref(plp);
1585 }
1586
1587 void
1588 ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
1589 {
1590         ptllnd_buffer_t *buf = ptllnd_eventarg2obj(event->md.user_ptr);
1591         ptllnd_ni_t     *plni = ni->ni_data;
1592         char            *msg = &buf->plb_buffer[event->offset];
1593         int              repost;
1594         int              unlinked = event->type == PTL_EVENT_UNLINK;
1595
1596         LASSERT (buf->plb_ni == ni);
1597         LASSERT (event->type == PTL_EVENT_PUT_END ||
1598                  event->type == PTL_EVENT_UNLINK);
1599
1600         if (event->ni_fail_type != PTL_NI_OK) {
1601
1602                 CERROR("event type %s(%d), status %s(%d) from %s\n",
1603                        ptllnd_evtype2str(event->type), event->type,
1604                        ptllnd_errtype2str(event->ni_fail_type), 
1605                        event->ni_fail_type,
1606                        ptllnd_ptlid2str(event->initiator));
1607
1608         } else if (event->type == PTL_EVENT_PUT_END) {
1609 #if (PTL_MD_LOCAL_ALIGN8 == 0)
1610                 /* Portals can't force message alignment - someone sending an
1611                  * odd-length message could misalign subsequent messages */
1612                 if ((event->mlength & 7) != 0) {
1613                         CERROR("Message from %s has odd length %u: "
1614                                "probable version incompatibility\n",
1615                                ptllnd_ptlid2str(event->initiator),
1616                                event->mlength);
1617                         LBUG();
1618                 }
1619 #endif
1620                 LASSERT ((event->offset & 7) == 0);
1621
1622                 ptllnd_parse_request(ni, event->initiator,
1623                                      (kptl_msg_t *)msg, event->mlength);
1624         }
1625
1626 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1627         /* UNLINK event only on explicit unlink */
1628         repost = (event->unlinked && event->type != PTL_EVENT_UNLINK);
1629         if (event->unlinked)
1630                 unlinked = 1;
1631 #else
1632         /* UNLINK event only on implicit unlink */
1633         repost = (event->type == PTL_EVENT_UNLINK);
1634 #endif
1635
1636         if (unlinked) {
1637                 LASSERT(buf->plb_posted);
1638                 buf->plb_posted = 0;
1639                 plni->plni_nposted_buffers--;
1640         }
1641
1642         if (repost)
1643                 (void) ptllnd_post_buffer(buf);
1644 }
1645
1646 void
1647 ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
1648 {
1649         ptllnd_ni_t *plni = ni->ni_data;
1650         ptllnd_tx_t *tx = ptllnd_eventarg2obj(event->md.user_ptr);
1651         int          error = (event->ni_fail_type != PTL_NI_OK);
1652         int          isreq;
1653         int          isbulk;
1654 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1655         int          unlinked = event->unlinked;
1656 #else
1657         int          unlinked = (event->type == PTL_EVENT_UNLINK);
1658 #endif
1659
1660         if (error)
1661                 CERROR("Error %s(%d) event %s(%d) unlinked %d, %s(%d) for %s\n",
1662                        ptllnd_errtype2str(event->ni_fail_type),
1663                        event->ni_fail_type,
1664                        ptllnd_evtype2str(event->type), event->type,
1665                        unlinked, ptllnd_msgtype2str(tx->tx_type), tx->tx_type,
1666                        libcfs_id2str(tx->tx_peer->plp_id));
1667
1668         LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE));
1669
1670         isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh);
1671         if (isreq) {
1672                 LASSERT (event->md.start == (void *)&tx->tx_msg);
1673                 if (unlinked) {
1674                         tx->tx_reqmdh = PTL_INVALID_HANDLE;
1675                         gettimeofday(&tx->tx_req_done, NULL);
1676                 }
1677         }
1678
1679         isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);
1680         if ( isbulk && unlinked ) {
1681                 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
1682                 gettimeofday(&tx->tx_bulk_done, NULL);
1683         }
1684
1685         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
1686
1687         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: TX done %p %s%s",
1688                        libcfs_id2str(tx->tx_peer->plp_id),
1689                        tx->tx_peer->plp_credits,
1690                        tx->tx_peer->plp_outstanding_credits,
1691                        tx->tx_peer->plp_sent_credits,
1692                        plni->plni_peer_credits + tx->tx_peer->plp_lazy_credits,
1693                        tx, isreq ? "REQ" : "BULK", unlinked ? "(unlinked)" : "");
1694
1695         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
1696         switch (tx->tx_type) {
1697         default:
1698                 LBUG();
1699
1700         case PTLLND_MSG_TYPE_NOOP:
1701         case PTLLND_MSG_TYPE_HELLO:
1702         case PTLLND_MSG_TYPE_IMMEDIATE:
1703                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1704                          event->type == PTL_EVENT_SEND_END);
1705                 LASSERT (isreq);
1706                 break;
1707
1708         case PTLLND_MSG_TYPE_GET:
1709                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1710                          (isreq && event->type == PTL_EVENT_SEND_END) ||
1711                          (isbulk && event->type == PTL_EVENT_PUT_END));
1712
1713                 if (isbulk && !error && event->type == PTL_EVENT_PUT_END) {
1714                         /* Check GET matched */
1715                         if (event->hdr_data == PTLLND_RDMA_OK) {
1716                                 lnet_set_reply_msg_len(ni, 
1717                                                        tx->tx_lnetreplymsg,
1718                                                        event->mlength);
1719                         } else {
1720                                 CERROR ("Unmatched GET with %s\n",
1721                                         libcfs_id2str(tx->tx_peer->plp_id));
1722                                 tx->tx_status = -EIO;
1723                         }
1724                 }
1725                 break;
1726
1727         case PTLLND_MSG_TYPE_PUT:
1728                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1729                          (isreq && event->type == PTL_EVENT_SEND_END) ||
1730                          (isbulk && event->type == PTL_EVENT_GET_END));
1731                 break;
1732
1733         case PTLLND_RDMA_READ:
1734                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1735                          event->type == PTL_EVENT_SEND_END ||
1736                          event->type == PTL_EVENT_REPLY_END);
1737                 LASSERT (isbulk);
1738                 break;
1739
1740         case PTLLND_RDMA_WRITE:
1741                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1742                          event->type == PTL_EVENT_SEND_END);
1743                 LASSERT (isbulk);
1744         }
1745
1746         /* Schedule ptllnd_tx_done() on error or last completion event */
1747         if (error ||
1748             (PtlHandleIsEqual(tx->tx_bulkmdh, PTL_INVALID_HANDLE) &&
1749              PtlHandleIsEqual(tx->tx_reqmdh, PTL_INVALID_HANDLE))) {
1750                 if (error)
1751                         tx->tx_status = -EIO;
1752                 list_del(&tx->tx_list);
1753                 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
1754         }
1755 }
1756
1757 ptllnd_tx_t *
1758 ptllnd_find_timed_out_tx(ptllnd_peer_t *peer)
1759 {
1760         time_t            now = cfs_time_current_sec();
1761         ptllnd_tx_t *tx;
1762
1763         list_for_each_entry (tx, &peer->plp_txq, tx_list) {
1764                 if (tx->tx_deadline < now)
1765                         return tx;
1766         }
1767
1768         list_for_each_entry (tx, &peer->plp_noopq, tx_list) {
1769                 if (tx->tx_deadline < now)
1770                         return tx;
1771         }
1772
1773         list_for_each_entry (tx, &peer->plp_activeq, tx_list) {
1774                 if (tx->tx_deadline < now)
1775                         return tx;
1776         }
1777
1778         return NULL;
1779 }
1780
1781 void
1782 ptllnd_check_peer(ptllnd_peer_t *peer)
1783 {
1784         ptllnd_tx_t *tx = ptllnd_find_timed_out_tx(peer);
1785
1786         if (tx == NULL)
1787                 return;
1788
1789         CERROR("%s: timed out\n", libcfs_id2str(peer->plp_id));
1790         ptllnd_close_peer(peer, -ETIMEDOUT);
1791 }
1792
1793 void
1794 ptllnd_watchdog (lnet_ni_t *ni, time_t now)
1795 {
1796         ptllnd_ni_t      *plni = ni->ni_data;
1797         const int         n = 4;
1798         int               p = plni->plni_watchdog_interval;
1799         int               chunk = plni->plni_peer_hash_size;
1800         int               interval = now - (plni->plni_watchdog_nextt - p);
1801         int               i;
1802         struct list_head *hashlist;
1803         struct list_head *tmp;
1804         struct list_head *nxt;
1805
1806         /* Time to check for RDMA timeouts on a few more peers: 
1807          * I try to do checks every 'p' seconds on a proportion of the peer
1808          * table and I need to check every connection 'n' times within a
1809          * timeout interval, to ensure I detect a timeout on any connection
1810          * within (n+1)/n times the timeout interval. */
1811
1812         LASSERT (now >= plni->plni_watchdog_nextt);
1813
1814         if (plni->plni_timeout > n * interval) { /* Scan less than the whole table? */
1815                 chunk = (chunk * n * interval) / plni->plni_timeout;
1816                 if (chunk == 0)
1817                         chunk = 1;
1818         }
1819
1820         for (i = 0; i < chunk; i++) {
1821                 hashlist = &plni->plni_peer_hash[plni->plni_watchdog_peeridx];
1822
1823                 list_for_each_safe(tmp, nxt, hashlist) {
1824                         ptllnd_check_peer(list_entry(tmp, ptllnd_peer_t, plp_list));
1825                 }
1826
1827                 plni->plni_watchdog_peeridx = (plni->plni_watchdog_peeridx + 1) %
1828                                               plni->plni_peer_hash_size;
1829         }
1830
1831         plni->plni_watchdog_nextt = now + p;
1832 }
1833
1834 void
1835 ptllnd_wait (lnet_ni_t *ni, int milliseconds)
1836 {
1837         static struct timeval  prevt;
1838         static int             prevt_count;
1839         static int             call_count;
1840
1841         struct timeval         start;
1842         struct timeval         then;
1843         struct timeval         now;
1844         struct timeval         deadline;
1845
1846         ptllnd_ni_t   *plni = ni->ni_data;
1847         ptllnd_tx_t   *tx;
1848         ptl_event_t    event;
1849         int            which;
1850         int            rc;
1851         int            found = 0;
1852         int            timeout = 0;
1853
1854         /* Handle any currently queued events, returning immediately if any.
1855          * Otherwise block for the timeout and handle all events queued
1856          * then. */
1857
1858         gettimeofday(&start, NULL);
1859         call_count++;
1860
1861         if (milliseconds <= 0) {
1862                 deadline = start;
1863         } else {
1864                 deadline.tv_sec  = start.tv_sec  +  milliseconds/1000;
1865                 deadline.tv_usec = start.tv_usec + (milliseconds % 1000)*1000;
1866
1867                 if (deadline.tv_usec >= 1000000) {
1868                         start.tv_usec -= 1000000;
1869                         start.tv_sec++;
1870                 }
1871         }
1872
1873         for (;;) {
1874                 gettimeofday(&then, NULL);
1875
1876                 rc = PtlEQPoll(&plni->plni_eqh, 1, timeout, &event, &which);
1877
1878                 gettimeofday(&now, NULL);
1879
1880                 if ((now.tv_sec*1000 + now.tv_usec/1000) - 
1881                     (then.tv_sec*1000 + then.tv_usec/1000) > timeout + 1000) {
1882                         /* 1000 mS grace...........................^ */
1883                         CERROR("SLOW PtlEQPoll(%d): %dmS elapsed\n", timeout,
1884                                (int)(now.tv_sec*1000 + now.tv_usec/1000) - 
1885                                (int)(then.tv_sec*1000 + then.tv_usec/1000));
1886                 }
1887
1888                 if (rc == PTL_EQ_EMPTY) {
1889                         if (found)              /* handled some events */
1890                                 break;
1891
1892                         if (now.tv_sec >= plni->plni_watchdog_nextt) { /* check timeouts? */
1893                                 ptllnd_watchdog(ni, now.tv_sec);
1894                                 LASSERT (now.tv_sec < plni->plni_watchdog_nextt);
1895                         }
1896
1897                         if (now.tv_sec > deadline.tv_sec || /* timeout expired */
1898                             (now.tv_sec == deadline.tv_sec &&
1899                              now.tv_usec >= deadline.tv_usec))
1900                                 break;
1901
1902                         if (milliseconds < 0 ||
1903                             plni->plni_watchdog_nextt <= deadline.tv_sec)  {
1904                                 timeout = (plni->plni_watchdog_nextt - now.tv_sec)*1000;
1905                         } else {
1906                                 timeout = (deadline.tv_sec - now.tv_sec)*1000 +
1907                                           (deadline.tv_usec - now.tv_usec)/1000;
1908                         }
1909
1910                         continue;
1911                 }
1912
1913                 LASSERT (rc == PTL_OK || rc == PTL_EQ_DROPPED);
1914
1915                 if (rc == PTL_EQ_DROPPED)
1916                         CERROR("Event queue: size %d is too small\n",
1917                                plni->plni_eq_size);
1918
1919                 timeout = 0;
1920                 found = 1;
1921
1922                 switch (ptllnd_eventarg2type(event.md.user_ptr)) {
1923                 default:
1924                         LBUG();
1925
1926                 case PTLLND_EVENTARG_TYPE_TX:
1927                         ptllnd_tx_event(ni, &event);
1928                         break;
1929
1930                 case PTLLND_EVENTARG_TYPE_BUF:
1931                         ptllnd_buf_event(ni, &event);
1932                         break;
1933                 }
1934         }
1935
1936         while (!list_empty(&plni->plni_zombie_txs)) {
1937                 tx = list_entry(plni->plni_zombie_txs.next,
1938                                 ptllnd_tx_t, tx_list);
1939                 list_del_init(&tx->tx_list);
1940                 ptllnd_tx_done(tx);
1941         }
1942
1943         if (prevt.tv_sec == 0 ||
1944             prevt.tv_sec != now.tv_sec) {
1945                 PTLLND_HISTORY("%d wait entered at %d.%06d - prev %d %d.%06d", 
1946                                call_count, (int)start.tv_sec, (int)start.tv_usec,
1947                                prevt_count, (int)prevt.tv_sec, (int)prevt.tv_usec);
1948                 prevt = now;
1949         }
1950 }