Whamcloud - gitweb
b=5498
[fs/lustre-release.git] / lnet / ulnds / ptllnd / ptllnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: Eric Barton <eeb@bartonsoftware.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   This file is confidential source code owned by Cluster File Systems.
11  *   No viewing, modification, compilation, redistribution, or any other
12  *   form of use is permitted except through a signed license agreement.
13  *
14  *   If you have not signed such an agreement, then you have no rights to
15  *   this file.  Please destroy it immediately and contact CFS.
16  *
17  */
18
19 #include "ptllnd.h"
20
21 void
22 ptllnd_set_tx_deadline(ptllnd_tx_t *tx)
23 {
24         ptllnd_peer_t  *peer = tx->tx_peer;
25         lnet_ni_t      *ni = peer->plp_ni;
26         ptllnd_ni_t    *plni = ni->ni_data;
27
28         tx->tx_deadline = cfs_time_current_sec() + plni->plni_timeout;
29 }
30
31 void
32 ptllnd_post_tx(ptllnd_tx_t *tx)
33 {
34         ptllnd_peer_t  *peer = tx->tx_peer;
35
36         ptllnd_set_tx_deadline(tx);
37         list_add_tail(&tx->tx_list, &peer->plp_txq);
38         ptllnd_check_sends(peer);
39 }
40
41 char *
42 ptllnd_ptlid2str(ptl_process_id_t id)
43 {
44         static char strs[8][32];
45         static int  idx = 0;
46
47         char   *str = strs[idx++];
48         
49         if (idx >= sizeof(strs)/sizeof(strs[0]))
50                 idx = 0;
51
52         snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
53         return str;
54 }
55
56 void
57 ptllnd_destroy_peer(ptllnd_peer_t *peer)
58 {
59         lnet_ni_t         *ni = peer->plp_ni;
60         ptllnd_ni_t       *plni = ni->ni_data;
61         int                nmsg = peer->plp_lazy_credits +
62                                   plni->plni_peer_credits;
63
64         ptllnd_size_buffers(ni, -nmsg);
65
66         LASSERT (peer->plp_closing);
67         LASSERT (plni->plni_npeers > 0);
68         LASSERT (list_empty(&peer->plp_txq));
69         LASSERT (list_empty(&peer->plp_activeq));
70         plni->plni_npeers--;
71         LIBCFS_FREE(peer, sizeof(*peer));
72 }
73
74 void
75 ptllnd_abort_txs(ptllnd_ni_t *plni, struct list_head *q)
76 {
77         while (!list_empty(q)) {
78                 ptllnd_tx_t *tx = list_entry(q->next, ptllnd_tx_t, tx_list);
79
80                 tx->tx_status = -ESHUTDOWN;
81                 list_del(&tx->tx_list);
82                 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
83         }
84 }
85
86 void
87 ptllnd_close_peer(ptllnd_peer_t *peer, int error)
88 {
89         lnet_ni_t   *ni = peer->plp_ni;
90         ptllnd_ni_t *plni = ni->ni_data;
91
92         if (peer->plp_closing)
93                 return;
94
95         peer->plp_closing = 1;
96
97         if (!list_empty(&peer->plp_txq) ||
98             !list_empty(&peer->plp_activeq) ||
99             error != 0) {
100                 CWARN("Closing %s\n", libcfs_id2str(peer->plp_id));
101                 if (plni->plni_debug)
102                         ptllnd_dump_debug(ni, peer->plp_id);
103         }
104         
105         ptllnd_abort_txs(plni, &peer->plp_txq);
106         ptllnd_abort_txs(plni, &peer->plp_activeq);
107
108         list_del(&peer->plp_list);
109         ptllnd_peer_decref(peer);
110 }
111
112 ptllnd_peer_t *
113 ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
114 {
115         ptllnd_ni_t       *plni = ni->ni_data;
116         unsigned int       hash = LNET_NIDADDR(id.nid) % plni->plni_peer_hash_size;
117         struct list_head  *tmp;
118         ptllnd_peer_t     *plp;
119         ptllnd_tx_t       *tx;
120         int                rc;
121
122         LASSERT (LNET_NIDNET(id.nid) == LNET_NIDNET(ni->ni_nid));
123
124         list_for_each(tmp, &plni->plni_peer_hash[hash]) {
125                 plp = list_entry(tmp, ptllnd_peer_t, plp_list);
126
127                 if (plp->plp_id.nid == id.nid &&
128                     plp->plp_id.pid == id.pid) {
129                         ptllnd_peer_addref(plp);
130                         return plp;
131                 }
132         }
133
134         if (!create)
135                 return NULL;
136
137         /* New peer: check first for enough posted buffers */
138         plni->plni_npeers++;
139         rc = ptllnd_size_buffers(ni, plni->plni_peer_credits);
140         if (rc != 0) {
141                 plni->plni_npeers--;
142                 return NULL;
143         }
144
145         LIBCFS_ALLOC(plp, sizeof(*plp));
146         if (plp == NULL) {
147                 CERROR("Can't allocate new peer %s\n", libcfs_id2str(id));
148                 plni->plni_npeers--;
149                 ptllnd_size_buffers(ni, -plni->plni_peer_credits);
150                 return NULL;
151         }
152
153         plp->plp_ni = ni;
154         plp->plp_id = id;
155         plp->plp_ptlid.nid = LNET_NIDADDR(id.nid);
156         plp->plp_ptlid.pid = plni->plni_ptllnd_pid;
157         plp->plp_credits = 1; /* add more later when she gives me credits */
158         plp->plp_max_msg_size = plni->plni_max_msg_size; /* until I hear from her */
159         plp->plp_sent_credits = 1;              /* Implicit credit for HELLO */
160         plp->plp_outstanding_credits = plni->plni_peer_credits - 1;
161         plp->plp_lazy_credits = 0;
162         plp->plp_extra_lazy_credits = 0;
163         plp->plp_match = 0;
164         plp->plp_stamp = 0;
165         plp->plp_recvd_hello = 0;
166         plp->plp_closing = 0;
167         plp->plp_refcount = 1;
168         CFS_INIT_LIST_HEAD(&plp->plp_list);
169         CFS_INIT_LIST_HEAD(&plp->plp_txq);
170         CFS_INIT_LIST_HEAD(&plp->plp_activeq);
171
172         ptllnd_peer_addref(plp);
173         list_add_tail(&plp->plp_list, &plni->plni_peer_hash[hash]);
174
175         tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_HELLO, 0);
176         if (tx == NULL) {
177                 CERROR("Can't send HELLO to %s\n", libcfs_id2str(id));
178                 ptllnd_close_peer(plp, -ENOMEM);
179                 ptllnd_peer_decref(plp);
180                 return NULL;
181         }
182
183         tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS;
184         tx->tx_msg.ptlm_u.hello.kptlhm_max_msg_size = plni->plni_max_msg_size;
185
186         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post hello %p", libcfs_id2str(id),
187                        tx->tx_peer->plp_credits,
188                        tx->tx_peer->plp_outstanding_credits,
189                        tx->tx_peer->plp_sent_credits,
190                        plni->plni_peer_credits + 
191                        tx->tx_peer->plp_lazy_credits, tx);
192         ptllnd_post_tx(tx);
193
194         return plp;
195 }
196
197 int
198 ptllnd_count_q(struct list_head *q)
199 {
200         struct list_head *e;
201         int               n = 0;
202         
203         list_for_each(e, q) {
204                 n++;
205         }
206         
207         return n;
208 }
209
210 const char *
211 ptllnd_tx_typestr(int type) 
212 {
213         switch (type) {
214         case PTLLND_RDMA_WRITE:
215                 return "rdma_write";
216                 
217         case PTLLND_RDMA_READ:
218                 return "rdma_read";
219
220         case PTLLND_MSG_TYPE_PUT:
221                 return "put_req";
222                 
223         case PTLLND_MSG_TYPE_GET:
224                 return "get_req";
225
226         case PTLLND_MSG_TYPE_IMMEDIATE:
227                 return "immediate";
228
229         case PTLLND_MSG_TYPE_NOOP:
230                 return "noop";
231
232         case PTLLND_MSG_TYPE_HELLO:
233                 return "hello";
234
235         default:
236                 return "<unknown>";
237         }
238 }
239
240 void
241 ptllnd_debug_tx(ptllnd_tx_t *tx) 
242 {
243         CDEBUG(D_WARNING, "%s %s b %ld.%06ld/%ld.%06ld"
244                " r %ld.%06ld/%ld.%06ld status %d\n",
245                ptllnd_tx_typestr(tx->tx_type),
246                libcfs_id2str(tx->tx_peer->plp_id),
247                tx->tx_bulk_posted.tv_sec, tx->tx_bulk_posted.tv_usec, 
248                tx->tx_bulk_done.tv_sec, tx->tx_bulk_done.tv_usec,
249                tx->tx_req_posted.tv_sec, tx->tx_req_posted.tv_usec,
250                tx->tx_req_done.tv_sec, tx->tx_req_done.tv_usec,
251                tx->tx_status);
252 }
253
254 void
255 ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
256 {
257         ptllnd_peer_t    *plp = ptllnd_find_peer(ni, id, 0);
258         struct list_head *tmp;
259         ptllnd_ni_t      *plni = ni->ni_data;
260         ptllnd_tx_t      *tx;
261         
262         if (plp == NULL) {
263                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_id2str(id));
264                 return;
265         }
266         
267         CDEBUG(D_WARNING, "%s %s%s [%d] "LPU64".%06d m "LPU64" q %d/%d c %d/%d+%d(%d)\n",
268                libcfs_id2str(id), 
269                plp->plp_recvd_hello ? "H" : "_",
270                plp->plp_closing     ? "C" : "_",
271                plp->plp_refcount,
272                plp->plp_stamp / 1000000, (int)(plp->plp_stamp % 1000000),
273                plp->plp_match,
274                ptllnd_count_q(&plp->plp_txq),
275                ptllnd_count_q(&plp->plp_activeq),
276                plp->plp_credits, plp->plp_outstanding_credits, plp->plp_sent_credits,
277                plni->plni_peer_credits + plp->plp_lazy_credits);
278
279         CDEBUG(D_WARNING, "txq:\n");
280         list_for_each (tmp, &plp->plp_txq) {
281                 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
282                 
283                 ptllnd_debug_tx(tx);
284         }
285
286         CDEBUG(D_WARNING, "activeq:\n");
287         list_for_each (tmp, &plp->plp_activeq) {
288                 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
289                 
290                 ptllnd_debug_tx(tx);
291         }
292
293         CDEBUG(D_WARNING, "zombies:\n");
294         list_for_each (tmp, &plni->plni_zombie_txs) {
295                 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
296                 
297                 if (tx->tx_peer->plp_id.nid == id.nid &&
298                     tx->tx_peer->plp_id.pid == id.pid)
299                         ptllnd_debug_tx(tx);
300         }
301         
302         CDEBUG(D_WARNING, "history:\n");
303         list_for_each (tmp, &plni->plni_tx_history) {
304                 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
305                 
306                 if (tx->tx_peer->plp_id.nid == id.nid &&
307                     tx->tx_peer->plp_id.pid == id.pid)
308                         ptllnd_debug_tx(tx);
309         }
310         
311         ptllnd_peer_decref(plp);
312 }
313
314 void
315 ptllnd_dump_debug(lnet_ni_t *ni, lnet_process_id_t id)
316 {
317         ptllnd_debug_peer(ni, id);
318         ptllnd_dump_history();
319 }
320
321 void
322 ptllnd_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive)
323 {
324         lnet_process_id_t  id;
325         ptllnd_peer_t     *peer;
326         time_t             start = cfs_time_current_sec();
327         ptllnd_ni_t       *plni = ni->ni_data;
328         int                w = plni->plni_long_wait;
329
330         /* This is only actually used to connect to routers at startup! */
331         LASSERT(alive);
332
333         id.nid = nid;
334         id.pid = LUSTRE_SRV_LNET_PID;
335         
336         peer = ptllnd_find_peer(ni, id, 1);
337         if (peer == NULL)
338                 return;
339
340         /* wait for the peer to reply */
341         while (!peer->plp_recvd_hello) {
342                 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
343                         CWARN("Waited %ds to connect to %s\n",
344                               (int)(cfs_time_current_sec() - start),
345                               libcfs_id2str(id));
346                         w *= 2;
347                 }
348                 
349                 ptllnd_wait(ni, w);
350         }
351         
352         ptllnd_peer_decref(peer);
353 }
354
355 int
356 ptllnd_setasync(lnet_ni_t *ni, lnet_process_id_t id, int nasync)
357 {
358         ptllnd_peer_t *peer = ptllnd_find_peer(ni, id, nasync > 0);
359         int            rc;
360         
361         if (peer == NULL)
362                 return -ENOMEM;
363
364         LASSERT (peer->plp_lazy_credits >= 0);
365         LASSERT (peer->plp_extra_lazy_credits >= 0);
366
367         /* If nasync < 0, we're being told we can reduce the total message
368          * headroom.  We can't do this right now because our peer might already
369          * have credits for the extra buffers, so we just account the extra
370          * headroom in case we need it later and only destroy buffers when the
371          * peer closes.
372          *
373          * Note that the following condition handles this case, where it
374          * actually increases the extra lazy credit counter. */
375
376         if (nasync <= peer->plp_extra_lazy_credits) {
377                 peer->plp_extra_lazy_credits -= nasync;
378                 return 0;
379         }
380
381         LASSERT (nasync > 0);
382
383         nasync -= peer->plp_extra_lazy_credits;
384         peer->plp_extra_lazy_credits = 0;
385         
386         rc = ptllnd_size_buffers(ni, nasync);
387         if (rc == 0) {
388                 peer->plp_lazy_credits += nasync;
389                 peer->plp_outstanding_credits += nasync;
390         }
391
392         return rc;
393 }
394
395 __u32
396 ptllnd_cksum (void *ptr, int nob)
397 {
398         char  *c  = ptr;
399         __u32  sum = 0;
400
401         while (nob-- > 0)
402                 sum = ((sum << 1) | (sum >> 31)) + *c++;
403
404         /* ensure I don't return 0 (== no checksum) */
405         return (sum == 0) ? 1 : sum;
406 }
407
408 ptllnd_tx_t *
409 ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
410 {
411         lnet_ni_t   *ni = peer->plp_ni;
412         ptllnd_ni_t *plni = ni->ni_data;
413         ptllnd_tx_t *tx;
414         int          msgsize;
415
416         CDEBUG(D_NET, "peer=%p type=%d payload=%d\n", peer, type, payload_nob);
417
418         switch (type) {
419         default:
420                 LBUG();
421
422         case PTLLND_RDMA_WRITE:
423         case PTLLND_RDMA_READ:
424                 LASSERT (payload_nob == 0);
425                 msgsize = 0;
426                 break;
427
428         case PTLLND_MSG_TYPE_PUT:
429         case PTLLND_MSG_TYPE_GET:
430                 LASSERT (payload_nob == 0);
431                 msgsize = offsetof(kptl_msg_t, ptlm_u) + 
432                           sizeof(kptl_rdma_msg_t);
433                 break;
434
435         case PTLLND_MSG_TYPE_IMMEDIATE:
436                 msgsize = offsetof(kptl_msg_t,
437                                    ptlm_u.immediate.kptlim_payload[payload_nob]);
438                 break;
439
440         case PTLLND_MSG_TYPE_NOOP:
441                 LASSERT (payload_nob == 0);
442                 msgsize = offsetof(kptl_msg_t, ptlm_u);
443                 break;
444
445         case PTLLND_MSG_TYPE_HELLO:
446                 LASSERT (payload_nob == 0);
447                 msgsize = offsetof(kptl_msg_t, ptlm_u) +
448                           sizeof(kptl_hello_msg_t);
449                 break;
450         }
451
452         msgsize = (msgsize + 7) & ~7;
453         LASSERT (msgsize <= peer->plp_max_msg_size);
454
455         LIBCFS_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize);
456
457         if (tx == NULL) {
458                 CERROR("Can't allocate msg type %d for %s\n",
459                        type, libcfs_id2str(peer->plp_id));
460                 return NULL;
461         }
462
463         CFS_INIT_LIST_HEAD(&tx->tx_list);
464         tx->tx_peer = peer;
465         tx->tx_type = type;
466         tx->tx_lnetmsg = tx->tx_lnetreplymsg = NULL;
467         tx->tx_niov = 0;
468         tx->tx_iov = NULL;
469         tx->tx_reqmdh = PTL_INVALID_HANDLE;
470         tx->tx_bulkmdh = PTL_INVALID_HANDLE;
471         tx->tx_msgsize = msgsize;
472         tx->tx_completing = 0;
473         tx->tx_status = 0;
474
475         memset(&tx->tx_bulk_posted, 0, sizeof(tx->tx_bulk_posted));
476         memset(&tx->tx_bulk_done, 0, sizeof(tx->tx_bulk_done));
477         memset(&tx->tx_req_posted, 0, sizeof(tx->tx_req_posted));
478         memset(&tx->tx_req_done, 0, sizeof(tx->tx_req_done));
479
480         if (msgsize != 0) {
481                 tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC;
482                 tx->tx_msg.ptlm_version = PTLLND_MSG_VERSION;
483                 tx->tx_msg.ptlm_type = type;
484                 tx->tx_msg.ptlm_credits = 0;
485                 tx->tx_msg.ptlm_nob = msgsize;
486                 tx->tx_msg.ptlm_cksum = 0;
487                 tx->tx_msg.ptlm_srcnid = ni->ni_nid;
488                 tx->tx_msg.ptlm_srcstamp = plni->plni_stamp;
489                 tx->tx_msg.ptlm_dstnid = peer->plp_id.nid;
490                 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
491                 tx->tx_msg.ptlm_srcpid = the_lnet.ln_pid;
492                 tx->tx_msg.ptlm_dstpid = peer->plp_id.pid;
493         }
494
495         ptllnd_peer_addref(peer);
496         plni->plni_ntxs++;
497
498         CDEBUG(D_NET, "tx=%p\n",tx);
499
500         return tx;
501 }
502
503 void
504 ptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh)
505 {
506         ptllnd_peer_t   *peer = tx->tx_peer;
507         lnet_ni_t       *ni = peer->plp_ni;
508         int              rc;
509         time_t           start = cfs_time_current_sec();
510         ptllnd_ni_t     *plni = ni->ni_data;
511         int              w = plni->plni_long_wait;
512
513         while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) {
514                 rc = PtlMDUnlink(*mdh);
515 #ifndef LUSTRE_PORTALS_UNLINK_SEMANTICS
516                 if (rc == PTL_OK) /* unlink successful => no unlinked event */
517                         return;
518                 LASSERT (rc == PTL_MD_IN_USE);
519 #endif
520                 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
521                         CWARN("Waited %ds to abort tx to %s\n",
522                               (int)(cfs_time_current_sec() - start),
523                               libcfs_id2str(peer->plp_id));
524                         w *= 2;
525                 }
526                 /* Wait for ptllnd_tx_event() to invalidate */
527                 ptllnd_wait(ni, w);
528         }
529 }
530
531 void
532 ptllnd_cull_tx_history(ptllnd_ni_t *plni)
533 {
534         int max = plni->plni_max_tx_history;
535
536         while (plni->plni_ntx_history > max) {
537                 ptllnd_tx_t *tx = list_entry(plni->plni_tx_history.next, 
538                                              ptllnd_tx_t, tx_list);
539                 list_del(&tx->tx_list);
540
541                 ptllnd_peer_decref(tx->tx_peer);
542
543                 LIBCFS_FREE(tx, offsetof(ptllnd_tx_t, tx_msg) + tx->tx_msgsize);
544
545                 LASSERT (plni->plni_ntxs > 0);
546                 plni->plni_ntxs--;
547                 plni->plni_ntx_history--;
548         }
549 }
550
551 void
552 ptllnd_tx_done(ptllnd_tx_t *tx)
553 {
554         ptllnd_peer_t   *peer = tx->tx_peer;
555         lnet_ni_t       *ni = peer->plp_ni;
556         ptllnd_ni_t     *plni = ni->ni_data;
557
558         /* CAVEAT EMPTOR: If this tx is being aborted, I'll continue to get
559          * events for this tx until it's unlinked.  So I set tx_completing to
560          * flag the tx is getting handled */
561
562         if (tx->tx_completing)
563                 return;
564
565         tx->tx_completing = 1;
566
567         if (!list_empty(&tx->tx_list))
568                 list_del_init(&tx->tx_list);
569
570         if (tx->tx_status != 0) {
571                 if (plni->plni_debug) {
572                         CERROR("Completing tx for %s with error %d\n",
573                                libcfs_id2str(peer->plp_id), tx->tx_status);
574                         ptllnd_debug_tx(tx);
575                 }
576                 ptllnd_close_peer(peer, tx->tx_status);
577         }
578         
579         ptllnd_abort_tx(tx, &tx->tx_reqmdh);
580         ptllnd_abort_tx(tx, &tx->tx_bulkmdh);
581
582         if (tx->tx_niov > 0) {
583                 LIBCFS_FREE(tx->tx_iov, tx->tx_niov * sizeof(*tx->tx_iov));
584                 tx->tx_niov = 0;
585         }
586
587         if (tx->tx_lnetreplymsg != NULL) {
588                 LASSERT (tx->tx_type == PTLLND_MSG_TYPE_GET);
589                 LASSERT (tx->tx_lnetmsg != NULL);
590                 /* Simulate GET success always  */
591                 lnet_finalize(ni, tx->tx_lnetmsg, 0);
592                 CDEBUG(D_NET, "lnet_finalize(tx_lnetreplymsg=%p)\n",tx->tx_lnetreplymsg);
593                 lnet_finalize(ni, tx->tx_lnetreplymsg, tx->tx_status);
594         } else if (tx->tx_lnetmsg != NULL) {
595                 lnet_finalize(ni, tx->tx_lnetmsg, tx->tx_status);
596         }
597
598         plni->plni_ntx_history++;
599         list_add_tail(&tx->tx_list, &plni->plni_tx_history);
600         
601         ptllnd_cull_tx_history(plni);
602 }
603
604 int
605 ptllnd_set_txiov(ptllnd_tx_t *tx,
606                  unsigned int niov, struct iovec *iov,
607                  unsigned int offset, unsigned int len)
608 {
609         ptl_md_iovec_t *piov;
610         int             npiov;
611
612         if (len == 0) {
613                 tx->tx_niov = 0;
614                 return 0;
615         }
616
617         /*
618          * Remove iovec's at the beginning that
619          * are skipped because of the offset.
620          * Adjust the offset accordingly
621          */
622         for (;;) {
623                 LASSERT (niov > 0);
624                 if (offset < iov->iov_len)
625                         break;
626                 offset -= iov->iov_len;
627                 niov--;
628                 iov++;
629         }
630
631         for (;;) {
632                 int temp_offset = offset;
633                 int resid = len;
634                 LIBCFS_ALLOC(piov, niov * sizeof(*piov));
635                 if (piov == NULL)
636                         return -ENOMEM;
637
638                 for (npiov = 0;; npiov++) {
639                         LASSERT (npiov < niov);
640                         LASSERT (iov->iov_len >= temp_offset);
641
642                         piov[npiov].iov_base = iov[npiov].iov_base + temp_offset;
643                         piov[npiov].iov_len = iov[npiov].iov_len - temp_offset;
644                         
645                         if (piov[npiov].iov_len >= resid) {
646                                 piov[npiov].iov_len = resid;
647                                 npiov++;
648                                 break;
649                         }
650                         resid -= piov[npiov].iov_len;
651                         temp_offset = 0;
652                 }
653
654                 if (npiov == niov) {
655                         tx->tx_niov = niov;
656                         tx->tx_iov = piov;
657                         return 0;
658                 }
659
660                 /* Dang! The piov I allocated was too big and it's a drag to
661                  * have to maintain separate 'allocated' and 'used' sizes, so
662                  * I'll just do it again; NB this doesn't happen normally... */
663                 LIBCFS_FREE(piov, niov * sizeof(*piov));
664                 niov = npiov;
665         }
666 }
667
668 void
669 ptllnd_set_md_buffer(ptl_md_t *md, ptllnd_tx_t *tx)
670 {
671         unsigned int    niov = tx->tx_niov;
672         ptl_md_iovec_t *iov = tx->tx_iov;
673
674         LASSERT ((md->options & PTL_MD_IOVEC) == 0);
675
676         if (niov == 0) {
677                 md->start = NULL;
678                 md->length = 0;
679         } else if (niov == 1) {
680                 md->start = iov[0].iov_base;
681                 md->length = iov[0].iov_len;
682         } else {
683                 md->start = iov;
684                 md->length = niov;
685                 md->options |= PTL_MD_IOVEC;
686         }
687 }
688
689 int
690 ptllnd_post_buffer(ptllnd_buffer_t *buf)
691 {
692         lnet_ni_t        *ni = buf->plb_ni;
693         ptllnd_ni_t      *plni = ni->ni_data;
694         ptl_process_id_t  anyid = {
695                 .nid       = PTL_NID_ANY,
696                 .pid       = PTL_PID_ANY};
697         ptl_md_t          md = {
698                 .start     = buf->plb_buffer,
699                 .length    = plni->plni_buffer_size,
700                 .threshold = PTL_MD_THRESH_INF,
701                 .max_size  = plni->plni_max_msg_size,
702                 .options   = (PTLLND_MD_OPTIONS |
703                               PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | 
704                               PTL_MD_LOCAL_ALIGN8),
705                 .user_ptr  = ptllnd_obj2eventarg(buf, PTLLND_EVENTARG_TYPE_BUF),
706                 .eq_handle = plni->plni_eqh};
707         ptl_handle_me_t meh;
708         int             rc;
709
710         LASSERT (!buf->plb_posted);
711
712         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal,
713                          anyid, LNET_MSG_MATCHBITS, 0,
714                          PTL_UNLINK, PTL_INS_AFTER, &meh);
715         if (rc != PTL_OK) {
716                 CERROR("PtlMEAttach failed: %s(%d)\n",
717                        ptllnd_errtype2str(rc), rc);
718                 return -ENOMEM;
719         }
720
721         buf->plb_posted = 1;
722         plni->plni_nposted_buffers++;
723
724         rc = PtlMDAttach(meh, md, LNET_UNLINK, &buf->plb_md);
725         if (rc == PTL_OK)
726                 return 0;
727
728         CERROR("PtlMDAttach failed: %s(%d)\n",
729                ptllnd_errtype2str(rc), rc);
730
731         buf->plb_posted = 0;
732         plni->plni_nposted_buffers--;
733
734         rc = PtlMEUnlink(meh);
735         LASSERT (rc == PTL_OK);
736
737         return -ENOMEM;
738 }
739
740 void
741 ptllnd_check_sends(ptllnd_peer_t *peer)
742 {
743         lnet_ni_t      *ni = peer->plp_ni;
744         ptllnd_ni_t    *plni = ni->ni_data;
745         ptllnd_tx_t    *tx;
746         ptl_md_t        md;
747         ptl_handle_md_t mdh;
748         int             rc;
749
750         CDEBUG(D_NET, "%s: [%d/%d+%d(%d)\n",
751                libcfs_id2str(peer->plp_id), peer->plp_credits,
752                peer->plp_outstanding_credits, peer->plp_sent_credits,
753                plni->plni_peer_credits + peer->plp_lazy_credits);
754
755         if (list_empty(&peer->plp_txq) &&
756             peer->plp_outstanding_credits >= PTLLND_CREDIT_HIGHWATER(plni) &&
757             peer->plp_credits != 0) {
758
759                 tx = ptllnd_new_tx(peer, PTLLND_MSG_TYPE_NOOP, 0);
760                 CDEBUG(D_NET, "NOOP tx=%p\n",tx);
761                 if (tx == NULL) {
762                         CERROR("Can't return credits to %s\n",
763                                libcfs_id2str(peer->plp_id));
764                 } else {
765                         ptllnd_set_tx_deadline(tx);
766                         list_add_tail(&tx->tx_list, &peer->plp_txq);
767                 }
768         }
769
770         while (!list_empty(&peer->plp_txq)) {
771                 tx = list_entry(peer->plp_txq.next, ptllnd_tx_t, tx_list);
772
773                 LASSERT (tx->tx_msgsize > 0);
774
775                 LASSERT (peer->plp_outstanding_credits >= 0);
776                 LASSERT (peer->plp_sent_credits >= 0);
777                 LASSERT (peer->plp_outstanding_credits + peer->plp_sent_credits
778                          <= plni->plni_peer_credits + peer->plp_lazy_credits);
779                 LASSERT (peer->plp_credits >= 0);
780
781                 if (peer->plp_credits == 0) {   /* no credits */
782                         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: no creds for %p",
783                                        libcfs_id2str(peer->plp_id),
784                                        peer->plp_credits,
785                                        peer->plp_outstanding_credits,
786                                        peer->plp_sent_credits,
787                                        plni->plni_peer_credits +
788                                        peer->plp_lazy_credits, tx);
789                         break;
790                 }
791                 
792                 if (peer->plp_credits == 1 &&   /* last credit reserved for */
793                     peer->plp_outstanding_credits == 0) { /* returning credits */
794                         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: too few creds for %p",
795                                        libcfs_id2str(peer->plp_id),
796                                        peer->plp_credits,
797                                        peer->plp_outstanding_credits,
798                                        peer->plp_sent_credits,
799                                        plni->plni_peer_credits +
800                                        peer->plp_lazy_credits, tx);
801                         break;
802                 }
803                 
804                 list_del(&tx->tx_list);
805                 list_add_tail(&tx->tx_list, &peer->plp_activeq);
806
807                 CDEBUG(D_NET, "Sending at TX=%p type=%s (%d)\n",tx,
808                         ptllnd_msgtype2str(tx->tx_type),tx->tx_type);
809
810                 if (tx->tx_type == PTLLND_MSG_TYPE_NOOP &&
811                     (!list_empty(&peer->plp_txq) ||
812                      peer->plp_outstanding_credits <
813                      PTLLND_CREDIT_HIGHWATER(plni))) {
814                         /* redundant NOOP */
815                         ptllnd_tx_done(tx);
816                         continue;
817                 }
818
819                 /* Set stamp at the last minute; on a new peer, I don't know it
820                  * until I receive the HELLO back */
821                 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
822
823                 /*
824                  * Return all the credits we have
825                  */
826                 tx->tx_msg.ptlm_credits = peer->plp_outstanding_credits;
827                 peer->plp_sent_credits += peer->plp_outstanding_credits;
828                 peer->plp_outstanding_credits = 0;
829
830                 /*
831                  * One less credit
832                  */
833                 peer->plp_credits--;
834
835                 if (plni->plni_checksum)
836                         tx->tx_msg.ptlm_cksum = 
837                                 ptllnd_cksum(&tx->tx_msg,
838                                              offsetof(kptl_msg_t, ptlm_u));
839
840                 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
841                 md.eq_handle = plni->plni_eqh;
842                 md.threshold = 1;
843                 md.options = PTLLND_MD_OPTIONS;
844                 md.start = &tx->tx_msg;
845                 md.length = tx->tx_msgsize;
846
847                 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
848                 if (rc != PTL_OK) {
849                         CERROR("PtlMDBind for %s failed: %s(%d)\n",
850                                libcfs_id2str(peer->plp_id),
851                                ptllnd_errtype2str(rc), rc);
852                         tx->tx_status = -EIO;
853                         ptllnd_tx_done(tx);
854                         break;
855                 }
856
857                 LASSERT (tx->tx_type != PTLLND_RDMA_WRITE &&
858                          tx->tx_type != PTLLND_RDMA_READ);
859                 
860                 tx->tx_reqmdh = mdh;
861                 gettimeofday(&tx->tx_req_posted, NULL);
862
863                 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: %s %p c %d",
864                                libcfs_id2str(peer->plp_id),
865                                peer->plp_credits,
866                                peer->plp_outstanding_credits,
867                                peer->plp_sent_credits,
868                                plni->plni_peer_credits +
869                                peer->plp_lazy_credits,
870                                ptllnd_msgtype2str(tx->tx_type), tx,
871                                tx->tx_msg.ptlm_credits);
872
873                 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
874                             plni->plni_portal, 0, LNET_MSG_MATCHBITS, 0, 0);
875                 if (rc != PTL_OK) {
876                         CERROR("PtlPut for %s failed: %s(%d)\n",
877                                libcfs_id2str(peer->plp_id),
878                                ptllnd_errtype2str(rc), rc);
879                         tx->tx_status = -EIO;
880                         ptllnd_tx_done(tx);
881                         break;
882                 }
883         }
884 }
885
886 int
887 ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
888                     unsigned int niov, struct iovec *iov,
889                     unsigned int offset, unsigned int len)
890 {
891         lnet_ni_t      *ni = peer->plp_ni;
892         ptllnd_ni_t    *plni = ni->ni_data;
893         ptllnd_tx_t    *tx = ptllnd_new_tx(peer, type, 0);
894         __u64           matchbits;
895         ptl_md_t        md;
896         ptl_handle_md_t mdh;
897         ptl_handle_me_t meh;
898         int             rc;
899         int             rc2;
900         time_t          start;
901         int             w;
902
903         CDEBUG(D_NET, "niov=%d offset=%d len=%d\n",niov,offset,len);
904
905         LASSERT (type == PTLLND_MSG_TYPE_GET ||
906                  type == PTLLND_MSG_TYPE_PUT);
907
908         if (tx == NULL) {
909                 CERROR("Can't allocate %s tx for %s\n",
910                        type == PTLLND_MSG_TYPE_GET ? "GET" : "PUT/REPLY",
911                        libcfs_id2str(peer->plp_id));
912                 return -ENOMEM;
913         }
914
915         rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
916         if (rc != 0) {
917                 CERROR ("Can't allocate iov %d for %s\n",
918                         niov, libcfs_id2str(peer->plp_id));
919                 rc = -ENOMEM;
920                 goto failed;
921         }
922
923         md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
924         md.eq_handle = plni->plni_eqh;
925         md.threshold = 1;
926         md.max_size = 0;
927         md.options = PTLLND_MD_OPTIONS;
928         if(type == PTLLND_MSG_TYPE_GET)
929                 md.options |= PTL_MD_OP_PUT | PTL_MD_ACK_DISABLE;
930         else
931                 md.options |= PTL_MD_OP_GET;
932         ptllnd_set_md_buffer(&md, tx);
933
934         start = cfs_time_current_sec();
935         w = plni->plni_long_wait;
936
937         while (!peer->plp_recvd_hello) {        /* wait to validate plp_match */
938                 if (peer->plp_closing) {
939                         rc = -EIO;
940                         goto failed;
941                 }
942                 if (w > 0 && cfs_time_current_sec() > start + w/1000) {
943                         CWARN("Waited %ds to connect to %s\n",
944                               (int)(cfs_time_current_sec() - start),
945                               libcfs_id2str(peer->plp_id));
946                         w *= 2;
947                 }
948                 ptllnd_wait(ni, w);
949         }
950
951         if (peer->plp_match < PTL_RESERVED_MATCHBITS)
952                 peer->plp_match = PTL_RESERVED_MATCHBITS;
953         matchbits = peer->plp_match++;
954
955         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid,
956                          matchbits, 0, PTL_UNLINK, PTL_INS_BEFORE, &meh);
957         if (rc != PTL_OK) {
958                 CERROR("PtlMEAttach for %s failed: %s(%d)\n",
959                        libcfs_id2str(peer->plp_id),
960                        ptllnd_errtype2str(rc), rc);
961                 rc = -EIO;
962                 goto failed;
963         }
964
965         gettimeofday(&tx->tx_bulk_posted, NULL);
966
967         rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);
968         if (rc != PTL_OK) {
969                 CERROR("PtlMDAttach for %s failed: %s(%d)\n",
970                        libcfs_id2str(peer->plp_id),
971                        ptllnd_errtype2str(rc), rc);
972                 rc2 = PtlMEUnlink(meh);
973                 LASSERT (rc2 == PTL_OK);
974                 rc = -EIO;
975                 goto failed;
976         }
977         tx->tx_bulkmdh = mdh;
978
979         /*
980          * We need to set the stamp here because it
981          * we could have received a HELLO above that set
982          * peer->plp_stamp
983          */
984         tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
985
986         tx->tx_msg.ptlm_u.rdma.kptlrm_hdr = msg->msg_hdr;
987         tx->tx_msg.ptlm_u.rdma.kptlrm_matchbits = matchbits;
988
989         if (type == PTLLND_MSG_TYPE_GET) {
990                 tx->tx_lnetreplymsg = lnet_create_reply_msg(ni, msg);
991                 if (tx->tx_lnetreplymsg == NULL) {
992                         CERROR("Can't create reply for GET to %s\n",
993                                libcfs_id2str(msg->msg_target));
994                         rc = -ENOMEM;
995                         goto failed;
996                 }
997         }
998
999         tx->tx_lnetmsg = msg;
1000         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post passive %s p %d %p",
1001                        libcfs_id2str(msg->msg_target),
1002                        peer->plp_credits, peer->plp_outstanding_credits,
1003                        peer->plp_sent_credits,
1004                        plni->plni_peer_credits + peer->plp_lazy_credits,
1005                        lnet_msgtyp2str(msg->msg_type),
1006                        (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ? 
1007                        le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1008                        (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ? 
1009                        le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1010                        tx);
1011         ptllnd_post_tx(tx);
1012         return 0;
1013
1014  failed:
1015         ptllnd_tx_done(tx);
1016         return rc;
1017 }
1018
1019 int
1020 ptllnd_active_rdma(ptllnd_peer_t *peer, int type,
1021                    lnet_msg_t *msg, __u64 matchbits,
1022                    unsigned int niov, struct iovec *iov,
1023                    unsigned int offset, unsigned int len)
1024 {
1025         lnet_ni_t       *ni = peer->plp_ni;
1026         ptllnd_ni_t     *plni = ni->ni_data;
1027         ptllnd_tx_t     *tx = ptllnd_new_tx(peer, type, 0);
1028         ptl_md_t         md;
1029         ptl_handle_md_t  mdh;
1030         int              rc;
1031
1032         LASSERT (type == PTLLND_RDMA_READ ||
1033                  type == PTLLND_RDMA_WRITE);
1034
1035         if (tx == NULL) {
1036                 CERROR("Can't allocate tx for RDMA %s with %s\n",
1037                        (type == PTLLND_RDMA_WRITE) ? "write" : "read",
1038                        libcfs_id2str(peer->plp_id));
1039                 ptllnd_close_peer(peer, -ENOMEM);
1040                 return -ENOMEM;
1041         }
1042
1043         rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
1044         if (rc != 0) {
1045                 CERROR ("Can't allocate iov %d for %s\n",
1046                         niov, libcfs_id2str(peer->plp_id));
1047                 rc = -ENOMEM;
1048                 goto failed;
1049         }
1050
1051         md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
1052         md.eq_handle = plni->plni_eqh;
1053         md.max_size = 0;
1054         md.options = PTLLND_MD_OPTIONS;
1055         md.threshold = (type == PTLLND_RDMA_READ) ? 2 : 1;
1056
1057         ptllnd_set_md_buffer(&md, tx);
1058
1059         rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
1060         if (rc != PTL_OK) {
1061                 CERROR("PtlMDBind for %s failed: %s(%d)\n",
1062                        libcfs_id2str(peer->plp_id),
1063                        ptllnd_errtype2str(rc), rc);
1064                 rc = -EIO;
1065                 goto failed;
1066         }
1067
1068         tx->tx_bulkmdh = mdh;
1069         tx->tx_lnetmsg = msg;
1070
1071         ptllnd_set_tx_deadline(tx);
1072         list_add_tail(&tx->tx_list, &peer->plp_activeq);
1073         gettimeofday(&tx->tx_bulk_posted, NULL);
1074
1075         if (type == PTLLND_RDMA_READ)
1076                 rc = PtlGet(mdh, peer->plp_ptlid,
1077                             plni->plni_portal, 0, matchbits, 0);
1078         else
1079                 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
1080                             plni->plni_portal, 0, matchbits, 0, 
1081                             (msg == NULL) ? PTLLND_RDMA_FAIL : PTLLND_RDMA_OK);
1082
1083         if (rc == PTL_OK)
1084                 return 0;
1085
1086         CERROR("Can't initiate RDMA with %s: %s(%d)\n",
1087                libcfs_id2str(peer->plp_id),
1088                ptllnd_errtype2str(rc), rc);
1089
1090         tx->tx_lnetmsg = NULL;
1091  failed:
1092         tx->tx_status = rc;
1093         ptllnd_tx_done(tx);    /* this will close peer */
1094         return rc;
1095 }
1096
1097 int
1098 ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
1099 {
1100         ptllnd_ni_t    *plni = ni->ni_data;
1101         ptllnd_peer_t  *plp;
1102         ptllnd_tx_t    *tx;
1103         int             nob;
1104         int             rc;
1105
1106         LASSERT (!msg->msg_routing);
1107         LASSERT (msg->msg_kiov == NULL);
1108
1109         LASSERT (msg->msg_niov <= PTL_MD_MAX_IOV); /* !!! */
1110
1111         CDEBUG(D_NET, "%s [%d]+%d,%d -> %s%s\n", 
1112                lnet_msgtyp2str(msg->msg_type),
1113                msg->msg_niov, msg->msg_offset, msg->msg_len,
1114                libcfs_nid2str(msg->msg_target.nid),
1115                msg->msg_target_is_router ? "(rtr)" : "");
1116
1117         if ((msg->msg_target.pid & LNET_PID_USERFLAG) != 0) {
1118                 CERROR("Can't send to non-kernel peer %s\n",
1119                        libcfs_id2str(msg->msg_target));
1120                 return -EHOSTUNREACH;
1121         }
1122         
1123         plp = ptllnd_find_peer(ni, msg->msg_target, 1);
1124         if (plp == NULL)
1125                 return -ENOMEM;
1126
1127         switch (msg->msg_type) {
1128         default:
1129                 LBUG();
1130
1131         case LNET_MSG_ACK:
1132                 LASSERT (msg->msg_len == 0);
1133                 break;                          /* send IMMEDIATE */
1134
1135         case LNET_MSG_GET:
1136                 if (msg->msg_target_is_router)
1137                         break;                  /* send IMMEDIATE */
1138
1139                 nob = msg->msg_md->md_length;
1140                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1141                 if (nob <= plni->plni_max_msg_size)
1142                         break;
1143
1144                 LASSERT ((msg->msg_md->md_options & LNET_MD_KIOV) == 0);
1145                 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_GET, msg,
1146                                          msg->msg_md->md_niov,
1147                                          msg->msg_md->md_iov.iov,
1148                                          0, msg->msg_md->md_length);
1149                 ptllnd_peer_decref(plp);
1150                 return rc;
1151
1152         case LNET_MSG_REPLY:
1153         case LNET_MSG_PUT:
1154                 nob = msg->msg_len;
1155                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1156                 if (nob <= plp->plp_max_msg_size)
1157                         break;                  /* send IMMEDIATE */
1158
1159                 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_PUT, msg,
1160                                          msg->msg_niov, msg->msg_iov,
1161                                          msg->msg_offset, msg->msg_len);
1162                 ptllnd_peer_decref(plp);
1163                 return rc;
1164         }
1165
1166         /* send IMMEDIATE
1167          * NB copy the payload so we don't have to do a fragmented send */
1168
1169         tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_IMMEDIATE, msg->msg_len);
1170         if (tx == NULL) {
1171                 CERROR("Can't allocate tx for lnet type %d to %s\n",
1172                        msg->msg_type, libcfs_id2str(msg->msg_target));
1173                 ptllnd_peer_decref(plp);
1174                 return -ENOMEM;
1175         }
1176
1177         lnet_copy_iov2flat(tx->tx_msgsize, &tx->tx_msg,
1178                            offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1179                            msg->msg_niov, msg->msg_iov, msg->msg_offset,
1180                            msg->msg_len);
1181         tx->tx_msg.ptlm_u.immediate.kptlim_hdr = msg->msg_hdr;
1182
1183         tx->tx_lnetmsg = msg;
1184         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post immediate %s p %d %p",
1185                        libcfs_id2str(msg->msg_target),
1186                        plp->plp_credits, plp->plp_outstanding_credits,
1187                        plp->plp_sent_credits,
1188                        plni->plni_peer_credits + plp->plp_lazy_credits,
1189                        lnet_msgtyp2str(msg->msg_type),
1190                        (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ? 
1191                        le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1192                        (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ? 
1193                        le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1194                        tx);
1195         ptllnd_post_tx(tx);
1196         ptllnd_peer_decref(plp);
1197         return 0;
1198 }
1199
1200 void
1201 ptllnd_rx_done(ptllnd_rx_t *rx)
1202 {
1203         ptllnd_peer_t *plp = rx->rx_peer;
1204         lnet_ni_t     *ni = plp->plp_ni;
1205         ptllnd_ni_t   *plni = ni->ni_data;
1206
1207         plp->plp_outstanding_credits++;
1208
1209         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: rx=%p done\n",
1210                        libcfs_id2str(plp->plp_id),
1211                        plp->plp_credits, plp->plp_outstanding_credits, 
1212                        plp->plp_sent_credits,
1213                        plni->plni_peer_credits + plp->plp_lazy_credits, rx);
1214
1215         ptllnd_check_sends(rx->rx_peer);
1216
1217         LASSERT (plni->plni_nrxs > 0);
1218         plni->plni_nrxs--;
1219 }
1220
1221 int
1222 ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1223                   void **new_privatep)
1224 {
1225         /* Shouldn't get here; recvs only block for router buffers */
1226         LBUG();
1227         return 0;
1228 }
1229
1230 int
1231 ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1232             int delayed, unsigned int niov,
1233             struct iovec *iov, lnet_kiov_t *kiov,
1234             unsigned int offset, unsigned int mlen, unsigned int rlen)
1235 {
1236         ptllnd_rx_t    *rx = private;
1237         int             rc = 0;
1238         int             nob;
1239
1240         LASSERT (kiov == NULL);
1241         LASSERT (niov <= PTL_MD_MAX_IOV);       /* !!! */
1242
1243         switch (rx->rx_msg->ptlm_type) {
1244         default:
1245                 LBUG();
1246
1247         case PTLLND_MSG_TYPE_IMMEDIATE:
1248                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[mlen]);
1249                 if (nob > rx->rx_nob) {
1250                         CERROR("Immediate message from %s too big: %d(%d)\n",
1251                                libcfs_id2str(rx->rx_peer->plp_id),
1252                                nob, rx->rx_nob);
1253                         rc = -EPROTO;
1254                         break;
1255                 }
1256                 lnet_copy_flat2iov(niov, iov, offset,
1257                                    rx->rx_nob, rx->rx_msg,
1258                                    offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1259                                    mlen);
1260                 lnet_finalize(ni, msg, 0);
1261                 break;
1262
1263         case PTLLND_MSG_TYPE_PUT:
1264                 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg,
1265                                         rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1266                                         niov, iov, offset, mlen);
1267                 break;
1268
1269         case PTLLND_MSG_TYPE_GET:
1270                 if (msg != NULL)
1271                         rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, msg,
1272                                                 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1273                                                 msg->msg_niov, msg->msg_iov,
1274                                                 msg->msg_offset, msg->msg_len);
1275                 else
1276                         rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, NULL,
1277                                                 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1278                                                 0, NULL, 0, 0);
1279                 break;
1280         }
1281
1282         ptllnd_rx_done(rx);
1283         return rc;
1284 }
1285
1286 void
1287 ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
1288                      kptl_msg_t *msg, unsigned int nob)
1289 {
1290         ptllnd_ni_t      *plni = ni->ni_data;
1291         const int         basenob = offsetof(kptl_msg_t, ptlm_u);
1292         lnet_process_id_t srcid;
1293         ptllnd_rx_t       rx;
1294         int               flip;
1295         __u16             msg_version;
1296         __u32             msg_cksum;
1297         ptllnd_peer_t    *plp;
1298         int               rc;
1299
1300         if (nob < 6) {
1301                 CERROR("Very short receive from %s\n",
1302                        ptllnd_ptlid2str(initiator));
1303                 return;
1304         }
1305
1306         /* I can at least read MAGIC/VERSION */
1307
1308         flip = msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC);
1309         if (!flip && msg->ptlm_magic != PTLLND_MSG_MAGIC) {
1310                 CERROR("Bad protocol magic %08x from %s\n", 
1311                        msg->ptlm_magic, ptllnd_ptlid2str(initiator));
1312                 return;
1313         }
1314
1315         msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
1316
1317         if (msg_version != PTLLND_MSG_VERSION) {
1318                 CERROR("Bad protocol version %04x from %s: %04x expected\n", 
1319                        (__u32)msg_version, ptllnd_ptlid2str(initiator), PTLLND_MSG_VERSION);
1320
1321                 if (plni->plni_abort_on_protocol_mismatch)
1322                         abort();
1323
1324                 return;
1325         }
1326
1327         if (nob < basenob) {
1328                 CERROR("Short receive from %s: got %d, wanted at least %d\n",
1329                        ptllnd_ptlid2str(initiator), nob, basenob);
1330                 return;
1331         }
1332
1333         /* checksum must be computed with
1334          * 1) ptlm_cksum zero and
1335          * 2) BEFORE anything gets modified/flipped
1336          */
1337         msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
1338         msg->ptlm_cksum = 0;
1339         if (msg_cksum != 0 &&
1340             msg_cksum != ptllnd_cksum(msg, offsetof(kptl_msg_t, ptlm_u))) {
1341                 CERROR("Bad checksum from %s\n", ptllnd_ptlid2str(initiator));
1342                 return;
1343         }
1344
1345         msg->ptlm_version = msg_version;
1346         msg->ptlm_cksum = msg_cksum;
1347         
1348         if (flip) {
1349                 /* NB stamps are opaque cookies */
1350                 __swab32s(&msg->ptlm_nob);
1351                 __swab64s(&msg->ptlm_srcnid);
1352                 __swab64s(&msg->ptlm_dstnid);
1353                 __swab32s(&msg->ptlm_srcpid);
1354                 __swab32s(&msg->ptlm_dstpid);
1355         }
1356         
1357         srcid.nid = msg->ptlm_srcnid;
1358         srcid.pid = msg->ptlm_srcpid;
1359
1360         if (LNET_NIDNET(msg->ptlm_srcnid) != LNET_NIDNET(ni->ni_nid)) {
1361                 CERROR("Bad source id %s from %s\n",
1362                        libcfs_id2str(srcid),
1363                        ptllnd_ptlid2str(initiator));
1364                 return;
1365         }
1366
1367         if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
1368                 CERROR("NAK from %s (%s)\n", 
1369                        libcfs_id2str(srcid),
1370                        ptllnd_ptlid2str(initiator));
1371
1372                 if (plni->plni_dump_on_nak)
1373                         ptllnd_dump_debug(ni, srcid);
1374                 
1375                 if (plni->plni_abort_on_nak)
1376                         abort();
1377                 
1378                 return;
1379         }
1380         
1381         if (msg->ptlm_dstnid != ni->ni_nid ||
1382             msg->ptlm_dstpid != the_lnet.ln_pid) {
1383                 CERROR("Bad dstid %s (%s expected) from %s\n",
1384                        libcfs_id2str((lnet_process_id_t) {
1385                                .nid = msg->ptlm_dstnid,
1386                                .pid = msg->ptlm_dstpid}),
1387                        libcfs_id2str((lnet_process_id_t) {
1388                                .nid = ni->ni_nid,
1389                                .pid = the_lnet.ln_pid}),
1390                        libcfs_id2str(srcid));
1391                 return;
1392         }
1393
1394         if (msg->ptlm_dststamp != plni->plni_stamp) {
1395                 CERROR("Bad dststamp "LPX64"("LPX64" expected) from %s\n",
1396                        msg->ptlm_dststamp, plni->plni_stamp,
1397                        libcfs_id2str(srcid));
1398                 return;
1399         }
1400
1401         PTLLND_HISTORY("RX %s: %s %d %p", libcfs_id2str(srcid), 
1402                        ptllnd_msgtype2str(msg->ptlm_type),
1403                        msg->ptlm_credits, &rx);
1404
1405         switch (msg->ptlm_type) {
1406         case PTLLND_MSG_TYPE_PUT:
1407         case PTLLND_MSG_TYPE_GET:
1408                 if (nob < basenob + sizeof(kptl_rdma_msg_t)) {
1409                         CERROR("Short rdma request from %s(%s)\n",
1410                                libcfs_id2str(srcid),
1411                                ptllnd_ptlid2str(initiator));
1412                         return;
1413                 }
1414                 if (flip)
1415                         __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
1416                 break;
1417
1418         case PTLLND_MSG_TYPE_IMMEDIATE:
1419                 if (nob < offsetof(kptl_msg_t,
1420                                    ptlm_u.immediate.kptlim_payload)) {
1421                         CERROR("Short immediate from %s(%s)\n",
1422                                libcfs_id2str(srcid),
1423                                ptllnd_ptlid2str(initiator));
1424                         return;
1425                 }
1426                 break;
1427
1428         case PTLLND_MSG_TYPE_HELLO:
1429                 if (nob < basenob + sizeof(kptl_hello_msg_t)) {
1430                         CERROR("Short hello from %s(%s)\n",
1431                                libcfs_id2str(srcid),
1432                                ptllnd_ptlid2str(initiator));
1433                         return;
1434                 }
1435                 if(flip){
1436                         __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
1437                         __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
1438                 }
1439                 break;
1440                 
1441         case PTLLND_MSG_TYPE_NOOP:
1442                 break;
1443
1444         default:
1445                 CERROR("Bad message type %d from %s(%s)\n", msg->ptlm_type,
1446                        libcfs_id2str(srcid),
1447                        ptllnd_ptlid2str(initiator));
1448                 return;
1449         }
1450
1451         plp = ptllnd_find_peer(ni, srcid, 0);
1452         if (plp == NULL) {
1453                 CERROR("Can't find peer %s\n", libcfs_id2str(srcid));
1454                 return;
1455         }
1456
1457         if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
1458                 if (plp->plp_recvd_hello) {
1459                         CERROR("Unexpected HELLO from %s\n",
1460                                libcfs_id2str(srcid));
1461                         ptllnd_peer_decref(plp);
1462                         return;
1463                 }
1464
1465                 plp->plp_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1466                 plp->plp_match = msg->ptlm_u.hello.kptlhm_matchbits;
1467                 plp->plp_stamp = msg->ptlm_srcstamp;
1468                 plp->plp_recvd_hello = 1;
1469
1470         } else if (!plp->plp_recvd_hello) {
1471
1472                 CERROR("Bad message type %d (HELLO expected) from %s\n",
1473                        msg->ptlm_type, libcfs_id2str(srcid));
1474                 ptllnd_peer_decref(plp);
1475                 return;
1476
1477         } else if (msg->ptlm_srcstamp != plp->plp_stamp) {
1478
1479                 CERROR("Bad srcstamp "LPX64"("LPX64" expected) from %s\n",
1480                        msg->ptlm_srcstamp, plp->plp_stamp,
1481                        libcfs_id2str(srcid));
1482                 ptllnd_peer_decref(plp);
1483                 return;
1484         }
1485
1486         /* Check peer only sends when I've sent her credits */
1487         if (plp->plp_sent_credits == 0) {
1488                 CERROR("%s[%d/%d+%d(%d)]: unexpected message\n",
1489                        libcfs_id2str(plp->plp_id),
1490                        plp->plp_credits, plp->plp_outstanding_credits, 
1491                        plp->plp_sent_credits,
1492                        plni->plni_peer_credits + plp->plp_lazy_credits);
1493                 return;
1494         }
1495         plp->plp_sent_credits--;
1496         
1497         /* No check for credit overflow - the peer may post new buffers after
1498          * the startup handshake. */
1499         if (msg->ptlm_credits > 0) {
1500                 plp->plp_credits += msg->ptlm_credits;
1501                 ptllnd_check_sends(plp);
1502         }
1503
1504         /* All OK so far; assume the message is good... */
1505
1506         rx.rx_peer      = plp;
1507         rx.rx_msg       = msg;
1508         rx.rx_nob       = nob;
1509         plni->plni_nrxs++;
1510
1511         switch (msg->ptlm_type) {
1512         default: /* message types have been checked already */
1513                 ptllnd_rx_done(&rx);
1514                 break;
1515
1516         case PTLLND_MSG_TYPE_PUT:
1517         case PTLLND_MSG_TYPE_GET:
1518                 rc = lnet_parse(ni, &msg->ptlm_u.rdma.kptlrm_hdr,
1519                                 msg->ptlm_srcnid, &rx, 1);
1520                 if (rc < 0)
1521                         ptllnd_rx_done(&rx);
1522                 break;
1523
1524         case PTLLND_MSG_TYPE_IMMEDIATE:
1525                 rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,
1526                                 msg->ptlm_srcnid, &rx, 0);
1527                 if (rc < 0)
1528                         ptllnd_rx_done(&rx);
1529                 break;
1530         }
1531
1532         ptllnd_peer_decref(plp);
1533 }
1534
1535 void
1536 ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
1537 {
1538         ptllnd_buffer_t *buf = ptllnd_eventarg2obj(event->md.user_ptr);
1539         ptllnd_ni_t     *plni = ni->ni_data;
1540         char            *msg = &buf->plb_buffer[event->offset];
1541         int              repost;
1542         int              unlinked = event->type == PTL_EVENT_UNLINK;
1543
1544         LASSERT (buf->plb_ni == ni);
1545         LASSERT (event->type == PTL_EVENT_PUT_END ||
1546                  event->type == PTL_EVENT_UNLINK);
1547
1548         if (event->ni_fail_type != PTL_NI_OK) {
1549
1550                 CERROR("event type %s(%d), status %s(%d) from %s\n",
1551                        ptllnd_evtype2str(event->type), event->type,
1552                        ptllnd_errtype2str(event->ni_fail_type), 
1553                        event->ni_fail_type,
1554                        ptllnd_ptlid2str(event->initiator));
1555
1556         } else if (event->type == PTL_EVENT_PUT_END) {
1557 #if (PTL_MD_LOCAL_ALIGN8 == 0)
1558                 /* Portals can't force message alignment - someone sending an
1559                  * odd-length message could misalign subsequent messages */
1560                 if ((event->mlength & 7) != 0) {
1561                         CERROR("Message from %s has odd length %llu: "
1562                                "probable version incompatibility\n",
1563                                ptllnd_ptlid2str(event->initiator),
1564                                event->mlength);
1565                         LBUG();
1566                 }
1567 #endif
1568                 LASSERT ((event->offset & 7) == 0);
1569
1570                 ptllnd_parse_request(ni, event->initiator,
1571                                      (kptl_msg_t *)msg, event->mlength);
1572         }
1573
1574 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1575         /* UNLINK event only on explicit unlink */
1576         repost = (event->unlinked && event->type != PTL_EVENT_UNLINK);
1577         if (event->unlinked)
1578                 unlinked = 1;
1579 #else
1580         /* UNLINK event only on implicit unlink */
1581         repost = (event->type == PTL_EVENT_UNLINK);
1582 #endif
1583
1584         if (unlinked) {
1585                 LASSERT(buf->plb_posted);
1586                 buf->plb_posted = 0;
1587                 plni->plni_nposted_buffers--;
1588         }
1589
1590         if (repost)
1591                 (void) ptllnd_post_buffer(buf);
1592 }
1593
1594 void
1595 ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
1596 {
1597         ptllnd_ni_t *plni = ni->ni_data;
1598         ptllnd_tx_t *tx = ptllnd_eventarg2obj(event->md.user_ptr);
1599         int          error = (event->ni_fail_type != PTL_NI_OK);
1600         int          isreq;
1601         int          isbulk;
1602 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1603         int          unlinked = event->unlinked;
1604 #else
1605         int          unlinked = (event->type == PTL_EVENT_UNLINK);
1606 #endif
1607
1608         if (error)
1609                 CERROR("Error %s(%d) event %s(%d) unlinked %d, %s(%d) for %s\n",
1610                        ptllnd_errtype2str(event->ni_fail_type),
1611                        event->ni_fail_type,
1612                        ptllnd_evtype2str(event->type), event->type,
1613                        unlinked, ptllnd_msgtype2str(tx->tx_type), tx->tx_type,
1614                        libcfs_id2str(tx->tx_peer->plp_id));
1615
1616         LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE));
1617
1618         isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh);
1619         if (isreq) {
1620                 LASSERT (event->md.start == (void *)&tx->tx_msg);
1621                 if (unlinked) {
1622                         tx->tx_reqmdh = PTL_INVALID_HANDLE;
1623                         gettimeofday(&tx->tx_req_done, NULL);
1624                 }
1625         }
1626
1627         isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);
1628         if ( isbulk && unlinked ) {
1629                 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
1630                 gettimeofday(&tx->tx_bulk_done, NULL);
1631         }
1632
1633         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
1634
1635         PTLLND_HISTORY("%s[%d/%d+%d(%d)]: TX done %p %s%s",
1636                        libcfs_id2str(tx->tx_peer->plp_id), 
1637                        tx->tx_peer->plp_credits,
1638                        tx->tx_peer->plp_outstanding_credits,
1639                        tx->tx_peer->plp_sent_credits,
1640                        plni->plni_peer_credits + tx->tx_peer->plp_lazy_credits,
1641                        tx, isreq ? "REQ" : "BULK", unlinked ? "(unlinked)" : "");
1642
1643         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
1644         switch (tx->tx_type) {
1645         default:
1646                 LBUG();
1647
1648         case PTLLND_MSG_TYPE_NOOP:
1649         case PTLLND_MSG_TYPE_HELLO:
1650         case PTLLND_MSG_TYPE_IMMEDIATE:
1651                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1652                          event->type == PTL_EVENT_SEND_END);
1653                 LASSERT (isreq);
1654                 break;
1655
1656         case PTLLND_MSG_TYPE_GET:
1657                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1658                          (isreq && event->type == PTL_EVENT_SEND_END) ||
1659                          (isbulk && event->type == PTL_EVENT_PUT_END));
1660
1661                 if (isbulk && !error && event->type == PTL_EVENT_PUT_END) {
1662                         /* Check GET matched */
1663                         if (event->hdr_data == PTLLND_RDMA_OK) {
1664                                 lnet_set_reply_msg_len(ni, 
1665                                                        tx->tx_lnetreplymsg,
1666                                                        event->mlength);
1667                         } else {
1668                                 CERROR ("Unmatched GET with %s\n",
1669                                         libcfs_id2str(tx->tx_peer->plp_id));
1670                                 tx->tx_status = -EIO;
1671                         }
1672                 }
1673                 break;
1674
1675         case PTLLND_MSG_TYPE_PUT:
1676                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1677                          (isreq && event->type == PTL_EVENT_SEND_END) ||
1678                          (isbulk && event->type == PTL_EVENT_GET_END));
1679                 break;
1680
1681         case PTLLND_RDMA_READ:
1682                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1683                          event->type == PTL_EVENT_SEND_END ||
1684                          event->type == PTL_EVENT_REPLY_END);
1685                 LASSERT (isbulk);
1686                 break;
1687
1688         case PTLLND_RDMA_WRITE:
1689                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1690                          event->type == PTL_EVENT_SEND_END);
1691                 LASSERT (isbulk);
1692         }
1693
1694         /* Schedule ptllnd_tx_done() on error or last completion event */
1695         if (error ||
1696             (PtlHandleIsEqual(tx->tx_bulkmdh, PTL_INVALID_HANDLE) &&
1697              PtlHandleIsEqual(tx->tx_reqmdh, PTL_INVALID_HANDLE))) {
1698                 if (error)
1699                         tx->tx_status = -EIO;
1700                 list_del(&tx->tx_list);
1701                 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
1702         }
1703 }
1704
1705 ptllnd_tx_t *
1706 ptllnd_find_timed_out_tx(ptllnd_peer_t *peer)
1707 {
1708         time_t            now = cfs_time_current_sec();
1709         struct list_head *tmp;
1710
1711         list_for_each(tmp, &peer->plp_txq) {
1712                 ptllnd_tx_t *tx = list_entry(tmp, ptllnd_tx_t, tx_list);
1713                 
1714                 if (tx->tx_deadline < now)
1715                         return tx;
1716         }
1717         
1718         list_for_each(tmp, &peer->plp_activeq) {
1719                 ptllnd_tx_t *tx = list_entry(tmp, ptllnd_tx_t, tx_list);
1720                 
1721                 if (tx->tx_deadline < now)
1722                         return tx;
1723         }
1724
1725         return NULL;
1726 }
1727
1728 void
1729 ptllnd_check_peer(ptllnd_peer_t *peer)
1730 {
1731         ptllnd_tx_t *tx = ptllnd_find_timed_out_tx(peer);
1732         
1733         if (tx == NULL)
1734                 return;
1735         
1736         CERROR("%s: timed out\n", libcfs_id2str(peer->plp_id));
1737         ptllnd_close_peer(peer, -ETIMEDOUT);
1738 }
1739
1740 void
1741 ptllnd_watchdog (lnet_ni_t *ni, time_t now)
1742 {
1743         ptllnd_ni_t      *plni = ni->ni_data;
1744         const int         n = 4;
1745         int               p = plni->plni_watchdog_interval;
1746         int               chunk = plni->plni_peer_hash_size;
1747         int               interval = now - (plni->plni_watchdog_nextt - p);
1748         int               i;
1749         struct list_head *hashlist;
1750         struct list_head *tmp;
1751         struct list_head *nxt;
1752
1753         /* Time to check for RDMA timeouts on a few more peers: 
1754          * I try to do checks every 'p' seconds on a proportion of the peer
1755          * table and I need to check every connection 'n' times within a
1756          * timeout interval, to ensure I detect a timeout on any connection
1757          * within (n+1)/n times the timeout interval. */
1758
1759         LASSERT (now >= plni->plni_watchdog_nextt);
1760
1761         if (plni->plni_timeout > n * interval) { /* Scan less than the whole table? */
1762                 chunk = (chunk * n * interval) / plni->plni_timeout;
1763                 if (chunk == 0)
1764                         chunk = 1;
1765         }
1766
1767         for (i = 0; i < chunk; i++) {
1768                 hashlist = &plni->plni_peer_hash[plni->plni_watchdog_peeridx];
1769                 
1770                 list_for_each_safe(tmp, nxt, hashlist) {
1771                         ptllnd_check_peer(list_entry(tmp, ptllnd_peer_t, plp_list));
1772                 }
1773                 
1774                 plni->plni_watchdog_peeridx = (plni->plni_watchdog_peeridx + 1) %
1775                                               plni->plni_peer_hash_size;
1776         }
1777
1778         plni->plni_watchdog_nextt = now + p;
1779 }
1780
1781 void
1782 ptllnd_wait (lnet_ni_t *ni, int milliseconds)
1783 {
1784         static struct timeval  prevt;
1785         static int             prevt_count;
1786         static int             call_count;
1787
1788         struct timeval         start;
1789         struct timeval         then;
1790         struct timeval         now;
1791         struct timeval         deadline;
1792         
1793         ptllnd_ni_t   *plni = ni->ni_data;
1794         ptllnd_tx_t   *tx;
1795         ptl_event_t    event;
1796         int            which;
1797         int            rc;
1798         int            found = 0;
1799         int            timeout = 0;
1800
1801         /* Handle any currently queued events, returning immediately if any.
1802          * Otherwise block for the timeout and handle all events queued
1803          * then. */
1804
1805         gettimeofday(&start, NULL);
1806         call_count++;
1807
1808         if (milliseconds <= 0) {
1809                 deadline = start;
1810         } else {
1811                 deadline.tv_sec  = start.tv_sec  +  milliseconds/1000;
1812                 deadline.tv_usec = start.tv_usec + (milliseconds % 1000)*1000;
1813
1814                 if (deadline.tv_usec >= 1000000) {
1815                         start.tv_usec -= 1000000;
1816                         start.tv_sec++;
1817                 }
1818         }
1819
1820         for (;;) {
1821                 gettimeofday(&then, NULL);
1822                 
1823                 rc = PtlEQPoll(&plni->plni_eqh, 1, timeout, &event, &which);
1824
1825                 gettimeofday(&now, NULL);
1826
1827                 if ((now.tv_sec*1000 + now.tv_usec/1000) - 
1828                     (then.tv_sec*1000 + then.tv_usec/1000) > timeout + 1000) {
1829                         /* 1000 mS grace...........................^ */
1830                         CERROR("SLOW PtlEQPoll(%d): %dmS elapsed\n", timeout,
1831                                (int)(now.tv_sec*1000 + now.tv_usec/1000) - 
1832                                (int)(then.tv_sec*1000 + then.tv_usec/1000));
1833                 }
1834
1835                 if (rc == PTL_EQ_EMPTY) {
1836                         if (found)              /* handled some events */
1837                                 break;
1838
1839                         if (now.tv_sec >= plni->plni_watchdog_nextt) { /* check timeouts? */
1840                                 ptllnd_watchdog(ni, now.tv_sec);
1841                                 LASSERT (now.tv_sec < plni->plni_watchdog_nextt);
1842                         }
1843                         
1844                         if (now.tv_sec > deadline.tv_sec || /* timeout expired */
1845                             (now.tv_sec == deadline.tv_sec &&
1846                              now.tv_usec >= deadline.tv_usec))
1847                                 break;
1848
1849                         if (milliseconds < 0 ||
1850                             plni->plni_watchdog_nextt <= deadline.tv_sec)  {
1851                                 timeout = (plni->plni_watchdog_nextt - now.tv_sec)*1000;
1852                         } else {
1853                                 timeout = (deadline.tv_sec - now.tv_sec)*1000 +
1854                                           (deadline.tv_usec - now.tv_usec)/1000;
1855                         }
1856
1857                         continue;
1858                 }
1859                 
1860                 LASSERT (rc == PTL_OK || rc == PTL_EQ_DROPPED);
1861
1862                 if (rc == PTL_EQ_DROPPED)
1863                         CERROR("Event queue: size %d is too small\n",
1864                                plni->plni_eq_size);
1865
1866                 timeout = 0;
1867                 found = 1;
1868
1869                 switch (ptllnd_eventarg2type(event.md.user_ptr)) {
1870                 default:
1871                         LBUG();
1872
1873                 case PTLLND_EVENTARG_TYPE_TX:
1874                         ptllnd_tx_event(ni, &event);
1875                         break;
1876
1877                 case PTLLND_EVENTARG_TYPE_BUF:
1878                         ptllnd_buf_event(ni, &event);
1879                         break;
1880                 }
1881         }
1882
1883         while (!list_empty(&plni->plni_zombie_txs)) {
1884                 tx = list_entry(plni->plni_zombie_txs.next,
1885                                 ptllnd_tx_t, tx_list);
1886                 list_del_init(&tx->tx_list);
1887                 ptllnd_tx_done(tx);
1888         }
1889
1890         if (prevt.tv_sec == 0 ||
1891             prevt.tv_sec != now.tv_sec) {
1892                 PTLLND_HISTORY("%d wait entered at %d.%06d - prev %d %d.%06d", 
1893                                call_count, (int)start.tv_sec, (int)start.tv_usec,
1894                                prevt_count, (int)prevt.tv_sec, (int)prevt.tv_usec);
1895                 prevt = now;
1896         }
1897 }