Whamcloud - gitweb
bc62e80b0b58760f4dc7994f756151dbca0af77d
[fs/lustre-release.git] / lnet / ulnds / ptllnd / ptllnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: Eric Barton <eeb@bartonsoftware.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   This file is confidential source code owned by Cluster File Systems.
11  *   No viewing, modification, compilation, redistribution, or any other
12  *   form of use is permitted except through a signed license agreement.
13  *
14  *   If you have not signed such an agreement, then you have no rights to
15  *   this file.  Please destroy it immediately and contact CFS.
16  *
17  */
18
19 #include "ptllnd.h"
20
21 char *
22 ptllnd_ptlid2str(ptl_process_id_t id)
23 {
24         static char strs[8][32];
25         static int  idx = 0;
26
27         char   *str = strs[idx++];
28         
29         if (idx >= sizeof(strs)/sizeof(strs[0]))
30                 idx = 0;
31
32         snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
33         return str;
34 }
35
36 void
37 ptllnd_destroy_peer(ptllnd_peer_t *peer)
38 {
39         lnet_ni_t         *ni = peer->plp_ni;
40         ptllnd_ni_t       *plni = ni->ni_data;
41
42         LASSERT (peer->plp_closing);
43         LASSERT (plni->plni_npeers > 0);
44         LASSERT (list_empty(&peer->plp_txq));
45         LASSERT (list_empty(&peer->plp_activeq));
46         plni->plni_npeers--;
47         LIBCFS_FREE(peer, sizeof(*peer));
48 }
49
50 void
51 ptllnd_abort_txs(ptllnd_ni_t *plni, struct list_head *q)
52 {
53         while (!list_empty(q)) {
54                 ptllnd_tx_t *tx = list_entry(q->next, ptllnd_tx_t, tx_list);
55
56                 tx->tx_status = -ESHUTDOWN;
57                 list_del(&tx->tx_list);
58                 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
59         }
60 }
61
62 void
63 ptllnd_close_peer(ptllnd_peer_t *peer, int error)
64 {
65         lnet_ni_t   *ni = peer->plp_ni;
66         ptllnd_ni_t *plni = ni->ni_data;
67
68         if (peer->plp_closing)
69                 return;
70
71         peer->plp_closing = 1;
72
73         if (!list_empty(&peer->plp_txq) ||
74             !list_empty(&peer->plp_activeq) ||
75             error != 0) {
76                 CERROR("Closing %s\n", libcfs_id2str(peer->plp_id));
77                 ptllnd_debug_peer(ni, peer->plp_id);
78         }
79         
80         ptllnd_abort_txs(plni, &peer->plp_txq);
81         ptllnd_abort_txs(plni, &peer->plp_activeq);
82
83         list_del(&peer->plp_list);
84         ptllnd_peer_decref(peer);
85 }
86
87 ptllnd_peer_t *
88 ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
89 {
90         ptllnd_ni_t       *plni = ni->ni_data;
91         unsigned int       hash = LNET_NIDADDR(id.nid) % plni->plni_peer_hash_size;
92         struct list_head  *tmp;
93         ptllnd_peer_t     *plp;
94         ptllnd_tx_t       *tx;
95         int                rc;
96
97         LASSERT (LNET_NIDNET(id.nid) == LNET_NIDNET(ni->ni_nid));
98
99         list_for_each(tmp, &plni->plni_peer_hash[hash]) {
100                 plp = list_entry(tmp, ptllnd_peer_t, plp_list);
101
102                 if (plp->plp_id.nid == id.nid &&
103                     plp->plp_id.pid == id.pid) {
104                         ptllnd_peer_addref(plp);
105                         return plp;
106                 }
107         }
108
109         if (!create)
110                 return NULL;
111
112         /* New peer: check first for enough posted buffers */
113         plni->plni_npeers++;
114         rc = ptllnd_grow_buffers(ni);
115         if (rc != 0) {
116                 plni->plni_npeers--;
117                 return NULL;
118         }
119
120         LIBCFS_ALLOC(plp, sizeof(*plp));
121         if (plp == NULL) {
122                 CERROR("Can't allocate new peer %s\n", libcfs_id2str(id));
123                 plni->plni_npeers--;
124                 return NULL;
125         }
126
127         CDEBUG(D_NET, "new peer=%p\n",plp);
128
129         plp->plp_ni = ni;
130         plp->plp_id = id;
131         plp->plp_ptlid.nid = LNET_NIDADDR(id.nid);
132         plp->plp_ptlid.pid = plni->plni_ptllnd_pid;
133         plp->plp_max_credits =
134         plp->plp_credits = 1; /* add more later when she gives me credits */
135         plp->plp_max_msg_size = plni->plni_max_msg_size; /* until I hear from her */
136         plp->plp_outstanding_credits = plni->plni_peer_credits - 1;
137         plp->plp_match = 0;
138         plp->plp_stamp = 0;
139         plp->plp_recvd_hello = 0;
140         plp->plp_closing = 0;
141         plp->plp_refcount = 1;
142         CFS_INIT_LIST_HEAD(&plp->plp_list);
143         CFS_INIT_LIST_HEAD(&plp->plp_txq);
144         CFS_INIT_LIST_HEAD(&plp->plp_activeq);
145
146         ptllnd_peer_addref(plp);
147         list_add_tail(&plp->plp_list, &plni->plni_peer_hash[hash]);
148
149         tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_HELLO, 0);
150         if (tx == NULL) {
151                 CERROR("Can't send HELLO to %s\n", libcfs_id2str(id));
152                 ptllnd_close_peer(plp, -ENOMEM);
153                 ptllnd_peer_decref(plp);
154                 return NULL;
155         }
156
157         tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS;
158         tx->tx_msg.ptlm_u.hello.kptlhm_max_msg_size = plni->plni_max_msg_size;
159
160         PTLLND_HISTORY("%s[%d/%d]: post hello %p", libcfs_id2str(id),
161                        tx->tx_peer->plp_credits,
162                        tx->tx_peer->plp_outstanding_credits, tx);
163         ptllnd_post_tx(tx);
164
165         return plp;
166 }
167
168 int
169 ptllnd_count_q(struct list_head *q)
170 {
171         struct list_head *e;
172         int               n = 0;
173         
174         list_for_each(e, q) {
175                 n++;
176         }
177         
178         return n;
179 }
180
181 const char *
182 ptllnd_tx_typestr(int type) 
183 {
184         switch (type) {
185         case PTLLND_RDMA_WRITE:
186                 return "rdma_write";
187                 
188         case PTLLND_RDMA_READ:
189                 return "rdma_read";
190
191         case PTLLND_MSG_TYPE_PUT:
192                 return "put_req";
193                 
194         case PTLLND_MSG_TYPE_GET:
195                 return "get_req";
196
197         case PTLLND_MSG_TYPE_IMMEDIATE:
198                 return "immediate";
199
200         case PTLLND_MSG_TYPE_NOOP:
201                 return "noop";
202
203         case PTLLND_MSG_TYPE_HELLO:
204                 return "hello";
205
206         default:
207                 return "<unknown>";
208         }
209 }
210
211 void
212 ptllnd_debug_tx(ptllnd_tx_t *tx) 
213 {
214         CDEBUG(D_WARNING, "%s %s b "DBGT_FMT"/"DBGT_FMT
215                " r "DBGT_FMT"/"DBGT_FMT" status %d\n",
216                ptllnd_tx_typestr(tx->tx_type),
217                libcfs_id2str(tx->tx_peer->plp_id)
218                DBGT_ARGS(tx->tx_bulk_posted) DBGT_ARGS(tx->tx_bulk_done)
219                DBGT_ARGS(tx->tx_req_posted) DBGT_ARGS(tx->tx_req_done),
220                tx->tx_status);
221 }
222
223 void
224 ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
225 {
226         ptllnd_peer_t    *plp = ptllnd_find_peer(ni, id, 0);
227         struct list_head *tmp;
228         ptllnd_ni_t      *plni = ni->ni_data;
229         ptllnd_tx_t      *tx;
230         
231         if (plp == NULL) {
232                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_id2str(id));
233                 return;
234         }
235         
236         CDEBUG(D_WARNING, "%s %s%s [%d] "LPD64".%06d m "LPD64" q %d/%d c %d/%d(%d)\n",
237                libcfs_id2str(id), 
238                plp->plp_recvd_hello ? "H" : "_",
239                plp->plp_closing     ? "C" : "_",
240                plp->plp_refcount,
241                plp->plp_stamp / 1000000, (int)(plp->plp_stamp % 1000000),
242                plp->plp_match,
243                ptllnd_count_q(&plp->plp_txq),
244                ptllnd_count_q(&plp->plp_activeq),
245                plp->plp_credits, plp->plp_outstanding_credits, plp->plp_max_credits);
246
247         CDEBUG(D_WARNING, "txq:\n");
248         list_for_each (tmp, &plp->plp_txq) {
249                 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
250                 
251                 ptllnd_debug_tx(tx);
252         }
253
254         CDEBUG(D_WARNING, "activeq:\n");
255         list_for_each (tmp, &plp->plp_activeq) {
256                 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
257                 
258                 ptllnd_debug_tx(tx);
259         }
260
261         CDEBUG(D_WARNING, "zombies:\n");
262         list_for_each (tmp, &plni->plni_zombie_txs) {
263                 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
264                 
265                 if (tx->tx_peer->plp_id.nid == id.nid &&
266                     tx->tx_peer->plp_id.pid == id.pid)
267                         ptllnd_debug_tx(tx);
268         }
269         
270         CDEBUG(D_WARNING, "history:\n");
271         list_for_each (tmp, &plni->plni_tx_history) {
272                 tx = list_entry(tmp, ptllnd_tx_t, tx_list);
273                 
274                 if (tx->tx_peer->plp_id.nid == id.nid &&
275                     tx->tx_peer->plp_id.pid == id.pid)
276                         ptllnd_debug_tx(tx);
277         }
278         
279         ptllnd_peer_decref(plp);
280         ptllnd_dump_history();
281 }
282
283 void
284 ptllnd_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive)
285 {
286         lnet_process_id_t  id;
287         ptllnd_peer_t     *peer;
288         time_t             start = cfs_time_current_sec();
289         int                w = PTLLND_WARN_LONG_WAIT;
290         
291         /* This is only actually used to connect to routers at startup! */
292         if (!alive) {
293                 LBUG();
294                 return;
295         }
296
297         id.nid = nid;
298         id.pid = LUSTRE_SRV_LNET_PID;
299         
300         peer = ptllnd_find_peer(ni, id, 1);
301         if (peer == NULL)
302                 return;
303
304         /* wait for the peer to reply */
305         while (!peer->plp_recvd_hello) {
306                 if (cfs_time_current_sec() > start + w) {
307                         CWARN("Waited %ds to connect to %s\n",
308                               w, libcfs_id2str(id));
309                         w *= 2;
310                 }
311                 
312                 ptllnd_wait(ni, w*1000);
313         }
314         
315         ptllnd_peer_decref(peer);
316 }
317
318 __u32
319 ptllnd_cksum (void *ptr, int nob)
320 {
321         char  *c  = ptr;
322         __u32  sum = 0;
323
324         while (nob-- > 0)
325                 sum = ((sum << 1) | (sum >> 31)) + *c++;
326
327         /* ensure I don't return 0 (== no checksum) */
328         return (sum == 0) ? 1 : sum;
329 }
330
331 ptllnd_tx_t *
332 ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
333 {
334         lnet_ni_t   *ni = peer->plp_ni;
335         ptllnd_ni_t *plni = ni->ni_data;
336         ptllnd_tx_t *tx;
337         int          msgsize;
338
339         CDEBUG(D_NET, "peer=%p type=%d payload=%d\n",peer,type,payload_nob);
340
341         switch (type) {
342         default:
343                 LBUG();
344
345         case PTLLND_RDMA_WRITE:
346         case PTLLND_RDMA_READ:
347                 LASSERT (payload_nob == 0);
348                 msgsize = 0;
349                 break;
350
351         case PTLLND_MSG_TYPE_PUT:
352         case PTLLND_MSG_TYPE_GET:
353                 LASSERT (payload_nob == 0);
354                 msgsize = offsetof(kptl_msg_t, ptlm_u) + 
355                           sizeof(kptl_rdma_msg_t);
356                 break;
357
358         case PTLLND_MSG_TYPE_IMMEDIATE:
359                 msgsize = offsetof(kptl_msg_t,
360                                    ptlm_u.immediate.kptlim_payload[payload_nob]);
361                 break;
362
363         case PTLLND_MSG_TYPE_NOOP:
364                 LASSERT (payload_nob == 0);
365                 msgsize = offsetof(kptl_msg_t, ptlm_u);
366                 break;
367
368         case PTLLND_MSG_TYPE_HELLO:
369                 LASSERT (payload_nob == 0);
370                 msgsize = offsetof(kptl_msg_t, ptlm_u) +
371                           sizeof(kptl_hello_msg_t);
372                 break;
373         }
374
375         msgsize = (msgsize + 7) & ~7;
376         LASSERT (msgsize <= peer->plp_max_msg_size);
377
378         CDEBUG(D_NET, "msgsize=%d\n",msgsize);
379
380         LIBCFS_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize);
381
382         if (tx == NULL) {
383                 CERROR("Can't allocate msg type %d for %s\n",
384                        type, libcfs_id2str(peer->plp_id));
385                 return NULL;
386         }
387
388         CFS_INIT_LIST_HEAD(&tx->tx_list);
389         tx->tx_peer = peer;
390         tx->tx_type = type;
391         tx->tx_lnetmsg = tx->tx_lnetreplymsg = NULL;
392         tx->tx_niov = 0;
393         tx->tx_iov = NULL;
394         tx->tx_reqmdh = PTL_INVALID_HANDLE;
395         tx->tx_bulkmdh = PTL_INVALID_HANDLE;
396         tx->tx_msgsize = msgsize;
397         tx->tx_completing = 0;
398         tx->tx_status = 0;
399
400         PTLLND_DBGT_INIT(tx->tx_bulk_posted);
401         PTLLND_DBGT_INIT(tx->tx_bulk_done);
402         PTLLND_DBGT_INIT(tx->tx_req_posted);
403         PTLLND_DBGT_INIT(tx->tx_req_done);
404
405         if (msgsize != 0) {
406                 tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC;
407                 tx->tx_msg.ptlm_version = PTLLND_MSG_VERSION;
408                 tx->tx_msg.ptlm_type = type;
409                 tx->tx_msg.ptlm_credits = 0;
410                 tx->tx_msg.ptlm_nob = msgsize;
411                 tx->tx_msg.ptlm_cksum = 0;
412                 tx->tx_msg.ptlm_srcnid = ni->ni_nid;
413                 tx->tx_msg.ptlm_srcstamp = plni->plni_stamp;
414                 tx->tx_msg.ptlm_dstnid = peer->plp_id.nid;
415                 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
416                 tx->tx_msg.ptlm_srcpid = the_lnet.ln_pid;
417                 tx->tx_msg.ptlm_dstpid = peer->plp_id.pid;
418         }
419
420         ptllnd_peer_addref(peer);
421         plni->plni_ntxs++;
422
423         CDEBUG(D_NET, "tx=%p\n",tx);
424
425         return tx;
426 }
427
428 void
429 ptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh)
430 {
431         ptllnd_peer_t   *peer = tx->tx_peer;
432         lnet_ni_t       *ni = peer->plp_ni;
433         int              rc;
434         time_t           start = cfs_time_current_sec();
435         int              w = PTLLND_WARN_LONG_WAIT;
436
437         while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) {
438                 rc = PtlMDUnlink(*mdh);
439 #ifndef LUSTRE_PORTALS_UNLINK_SEMANTICS
440                 if (rc == PTL_OK) /* unlink successful => no unlinked event */
441                         return;
442                 LASSERT (rc == PTL_MD_IN_USE);
443 #endif
444                 if (cfs_time_current_sec() > start + w) {
445                         CWARN("Waited %ds to abort tx to %s\n",
446                               w, libcfs_id2str(peer->plp_id));
447                         w *= 2;
448                 }
449                 /* Wait for ptllnd_tx_event() to invalidate */
450                 ptllnd_wait(ni, w*1000);
451         }
452 }
453
454 void
455 ptllnd_cull_tx_history(ptllnd_ni_t *plni)
456 {
457         int max = plni->plni_max_tx_history;
458
459         while (plni->plni_ntx_history > max) {
460                 ptllnd_tx_t *tx = list_entry(plni->plni_tx_history.next, 
461                                              ptllnd_tx_t, tx_list);
462                 list_del(&tx->tx_list);
463
464                 ptllnd_peer_decref(tx->tx_peer);
465
466                 LIBCFS_FREE(tx, offsetof(ptllnd_tx_t, tx_msg) + tx->tx_msgsize);
467
468                 LASSERT (plni->plni_ntxs > 0);
469                 plni->plni_ntxs--;
470                 plni->plni_ntx_history--;
471         }
472 }
473
474 void
475 ptllnd_tx_done(ptllnd_tx_t *tx)
476 {
477         ptllnd_peer_t   *peer = tx->tx_peer;
478         lnet_ni_t       *ni = peer->plp_ni;
479         ptllnd_ni_t     *plni = ni->ni_data;
480
481         /* CAVEAT EMPTOR: If this tx is being aborted, I'll continue to get
482          * events for this tx until it's unlinked.  So I set tx_completing to
483          * flag the tx is getting handled */
484
485         if (tx->tx_completing)
486                 return;
487
488         tx->tx_completing = 1;
489
490         if (!list_empty(&tx->tx_list))
491                 list_del_init(&tx->tx_list);
492
493         if (tx->tx_status != 0) {
494                 CERROR("Completing tx with error\n");
495                 ptllnd_debug_tx(tx);
496                 ptllnd_close_peer(peer, tx->tx_status);
497         }
498         
499         ptllnd_abort_tx(tx, &tx->tx_reqmdh);
500         ptllnd_abort_tx(tx, &tx->tx_bulkmdh);
501
502         if (tx->tx_niov > 0) {
503                 LIBCFS_FREE(tx->tx_iov, tx->tx_niov * sizeof(*tx->tx_iov));
504                 tx->tx_niov = 0;
505         }
506
507         if (tx->tx_lnetreplymsg != NULL) {
508                 LASSERT (tx->tx_type == PTLLND_MSG_TYPE_GET);
509                 LASSERT (tx->tx_lnetmsg != NULL);
510                 /* Simulate GET success always  */
511                 lnet_finalize(ni, tx->tx_lnetmsg, 0);
512                 CDEBUG(D_NET, "lnet_finalize(tx_lnetreplymsg=%p)\n",tx->tx_lnetreplymsg);
513                 lnet_finalize(ni, tx->tx_lnetreplymsg, tx->tx_status);
514         } else if (tx->tx_lnetmsg != NULL) {
515                 lnet_finalize(ni, tx->tx_lnetmsg, tx->tx_status);
516         }
517
518         plni->plni_ntx_history++;
519         list_add_tail(&tx->tx_list, &plni->plni_tx_history);
520         
521         ptllnd_cull_tx_history(plni);
522 }
523
524 int
525 ptllnd_set_txiov(ptllnd_tx_t *tx,
526                  unsigned int niov, struct iovec *iov,
527                  unsigned int offset, unsigned int len)
528 {
529         ptl_md_iovec_t *piov;
530         int             npiov;
531
532         if (len == 0) {
533                 tx->tx_niov = 0;
534                 return 0;
535         }
536
537         CDEBUG(D_NET, "niov  =%d\n",niov);
538         CDEBUG(D_NET, "offset=%d\n",offset);
539         CDEBUG(D_NET, "len   =%d\n",len);
540
541
542         /*
543          * Remove iovec's at the beginning that
544          * are skipped because of the offset.
545          * Adjust the offset accordingly
546          */
547         for (;;) {
548                 LASSERT (niov > 0);
549                 if (offset < iov->iov_len)
550                         break;
551                 offset -= iov->iov_len;
552                 niov--;
553                 iov++;
554         }
555
556         CDEBUG(D_NET, "niov  =%d (after)\n",niov);
557         CDEBUG(D_NET, "offset=%d (after)\n",offset);
558         CDEBUG(D_NET, "len   =%d (after)\n",len);
559
560         for (;;) {
561                 int temp_offset = offset;
562                 int resid = len;
563                 LIBCFS_ALLOC(piov, niov * sizeof(*piov));
564                 if (piov == NULL)
565                         return -ENOMEM;
566
567                 for (npiov = 0;; npiov++) {
568                         CDEBUG(D_NET, "npiov=%d\n",npiov);
569                         CDEBUG(D_NET, "offset=%d\n",temp_offset);
570                         CDEBUG(D_NET, "len=%d\n",resid);
571                         CDEBUG(D_NET, "iov[npiov].iov_len=%lu\n",iov[npiov].iov_len);
572
573                         LASSERT (npiov < niov);
574                         LASSERT (iov->iov_len >= temp_offset);
575
576                         piov[npiov].iov_base = iov[npiov].iov_base + temp_offset;
577                         piov[npiov].iov_len = iov[npiov].iov_len - temp_offset;
578                         
579                         if (piov[npiov].iov_len >= resid) {
580                                 piov[npiov].iov_len = resid;
581                                 npiov++;
582                                 break;
583                         }
584                         resid -= piov[npiov].iov_len;
585                         temp_offset = 0;
586                 }
587
588                 if (npiov == niov) {
589                         tx->tx_niov = niov;
590                         tx->tx_iov = piov;
591                         CDEBUG(D_NET, "tx->tx_iov=%p\n",tx->tx_iov);
592                         CDEBUG(D_NET, "tx->tx_niov=%d\n",tx->tx_niov);
593                         return 0;
594                 }
595
596                 /* Dang! The piov I allocated was too big and it's a drag to
597                  * have to maintain separate 'allocated' and 'used' sizes, so
598                  * I'll just do it again; NB this doesn't happen normally... */
599                 LIBCFS_FREE(piov, niov * sizeof(*piov));
600                 niov = npiov;
601         }
602 }
603
604 void
605 ptllnd_set_md_buffer(ptl_md_t *md, ptllnd_tx_t *tx)
606 {
607         unsigned int    niov = tx->tx_niov;
608         ptl_md_iovec_t *iov = tx->tx_iov;
609
610         LASSERT ((md->options & PTL_MD_IOVEC) == 0);
611
612         if (niov == 0) {
613                 md->start = NULL;
614                 md->length = 0;
615         } else if (niov == 1) {
616                 md->start = iov[0].iov_base;
617                 md->length = iov[0].iov_len;
618         } else {
619                 md->start = iov;
620                 md->length = niov;
621                 md->options |= PTL_MD_IOVEC;
622         }
623 }
624
625 int
626 ptllnd_post_buffer(ptllnd_buffer_t *buf)
627 {
628         lnet_ni_t        *ni = buf->plb_ni;
629         ptllnd_ni_t      *plni = ni->ni_data;
630         ptl_process_id_t  anyid = {
631                 .nid       = PTL_NID_ANY,
632                 .pid       = PTL_PID_ANY};
633         ptl_md_t          md = {
634                 .start     = buf->plb_buffer,
635                 .length    = plni->plni_buffer_size,
636                 .threshold = PTL_MD_THRESH_INF,
637                 .max_size  = plni->plni_max_msg_size,
638                 .options   = (PTLLND_MD_OPTIONS |
639                               PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | 
640                               PTL_MD_LOCAL_ALIGN8),
641                 .user_ptr  = ptllnd_obj2eventarg(buf, PTLLND_EVENTARG_TYPE_BUF),
642                 .eq_handle = plni->plni_eqh};
643         ptl_handle_me_t meh;
644         int             rc;
645
646         LASSERT (!buf->plb_posted);
647
648         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal,
649                          anyid, LNET_MSG_MATCHBITS, 0,
650                          PTL_UNLINK, PTL_INS_AFTER, &meh);
651         if (rc != PTL_OK) {
652                 CERROR("PtlMEAttach failed: %d\n", rc);
653                 return -ENOMEM;
654         }
655
656         buf->plb_posted = 1;
657         plni->plni_nposted_buffers++;
658
659         rc = PtlMDAttach(meh, md, LNET_UNLINK, &buf->plb_md);
660         if (rc == PTL_OK)
661                 return 0;
662
663         CERROR("PtlMDAttach failed: %d\n", rc);
664
665         buf->plb_posted = 0;
666         plni->plni_nposted_buffers--;
667
668         rc = PtlMEUnlink(meh);
669         LASSERT (rc == PTL_OK);
670
671         return -ENOMEM;
672 }
673
674 void
675 ptllnd_check_sends(ptllnd_peer_t *peer)
676 {
677         lnet_ni_t      *ni = peer->plp_ni;
678         ptllnd_ni_t    *plni = ni->ni_data;
679         ptllnd_tx_t    *tx;
680         ptl_md_t        md;
681         ptl_handle_md_t mdh;
682         int             rc;
683
684         CDEBUG(D_NET, "plp_outstanding_credits=%d\n",peer->plp_outstanding_credits);
685
686         if (list_empty(&peer->plp_txq) &&
687             peer->plp_outstanding_credits >= PTLLND_CREDIT_HIGHWATER(plni) &&
688             peer->plp_credits != 0) {
689
690                 tx = ptllnd_new_tx(peer, PTLLND_MSG_TYPE_NOOP, 0);
691                 CDEBUG(D_NET, "NOOP tx=%p\n",tx);
692                 if (tx == NULL) {
693                         CERROR("Can't return credits to %s\n",
694                                libcfs_id2str(peer->plp_id));
695                 } else {
696                         list_add_tail(&tx->tx_list, &peer->plp_txq);
697                 }
698         }
699
700         while (!list_empty(&peer->plp_txq)) {
701                 tx = list_entry(peer->plp_txq.next, ptllnd_tx_t, tx_list);
702
703                 CDEBUG(D_NET, "Looking at TX=%p\n",tx);
704                 CDEBUG(D_NET, "plp_credits=%d\n",peer->plp_credits);
705                 CDEBUG(D_NET, "plp_outstanding_credits=%d\n",peer->plp_outstanding_credits);
706
707                 LASSERT (tx->tx_msgsize > 0);
708
709                 LASSERT (peer->plp_outstanding_credits >= 0);
710                 LASSERT (peer->plp_outstanding_credits <=
711                          plni->plni_peer_credits);
712                 LASSERT (peer->plp_credits >= 0);
713                 LASSERT (peer->plp_credits <= peer->plp_max_credits);
714
715                 if (peer->plp_credits == 0) {   /* no credits */
716                         PTLLND_HISTORY("%s[%d/%d]: no creds for %p",
717                                        libcfs_id2str(peer->plp_id),
718                                        peer->plp_credits,
719                                        peer->plp_outstanding_credits, tx);
720                         break;
721                 }
722                 
723                 if (peer->plp_credits == 1 &&   /* last credit reserved for */
724                     peer->plp_outstanding_credits == 0) { /* returning credits */
725                         PTLLND_HISTORY("%s[%d/%d]: too few creds for %p",
726                                        libcfs_id2str(peer->plp_id),
727                                        peer->plp_credits,
728                                        peer->plp_outstanding_credits, tx);
729                         break;
730                 }
731                 
732                 list_del(&tx->tx_list);
733                 list_add_tail(&tx->tx_list, &peer->plp_activeq);
734
735                 CDEBUG(D_NET, "Sending at TX=%p type=%s (%d)\n",tx,
736                         ptllnd_msgtype2str(tx->tx_type),tx->tx_type);
737
738                 if (tx->tx_type == PTLLND_MSG_TYPE_NOOP &&
739                     (!list_empty(&peer->plp_txq) ||
740                      peer->plp_outstanding_credits <
741                      PTLLND_CREDIT_HIGHWATER(plni))) {
742                         /* redundant NOOP */
743                         ptllnd_tx_done(tx);
744                         continue;
745                 }
746
747                 /* Set stamp at the last minute; on a new peer, I don't know it
748                  * until I receive the HELLO back */
749                 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
750
751                 CDEBUG(D_NET, "Returning %d to peer\n",peer->plp_outstanding_credits);
752
753                 /*
754                  * Return all the credits we have
755                  */
756                 tx->tx_msg.ptlm_credits = peer->plp_outstanding_credits;
757                 peer->plp_outstanding_credits = 0;
758
759                 /*
760                  * One less credit
761                  */
762                 peer->plp_credits--;
763
764                 if (plni->plni_checksum)
765                         tx->tx_msg.ptlm_cksum = 
766                                 ptllnd_cksum(&tx->tx_msg,
767                                              offsetof(kptl_msg_t, ptlm_u));
768
769                 md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
770                 md.eq_handle = plni->plni_eqh;
771                 md.threshold = 1;
772                 md.options = PTLLND_MD_OPTIONS;
773                 md.start = &tx->tx_msg;
774                 md.length = tx->tx_msgsize;
775
776                 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
777                 if (rc != PTL_OK) {
778                         CERROR("PtlMDBind for %s failed: %d\n",
779                                libcfs_id2str(peer->plp_id), rc);
780                         tx->tx_status = -EIO;
781                         ptllnd_tx_done(tx);
782                         break;
783                 }
784
785                 tx->tx_reqmdh = mdh;
786                 PTLLND_DBGT_STAMP(tx->tx_req_posted);
787
788                 PTLLND_HISTORY("%s[%d/%d]: %s %p c %d", libcfs_id2str(peer->plp_id),
789                                peer->plp_credits, peer->plp_outstanding_credits,
790                                ptllnd_msgtype2str(tx->tx_type), tx,
791                                tx->tx_msg.ptlm_credits);
792
793                 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
794                             plni->plni_portal, 0, LNET_MSG_MATCHBITS, 0, 0);
795                 if (rc != PTL_OK) {
796                         CERROR("PtlPut for %s failed: %d\n",
797                                libcfs_id2str(peer->plp_id), rc);
798                         tx->tx_status = -EIO;
799                         ptllnd_tx_done(tx);
800                         break;
801                 }
802         }
803 }
804
805 int
806 ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
807                     unsigned int niov, struct iovec *iov,
808                     unsigned int offset, unsigned int len)
809 {
810         lnet_ni_t      *ni = peer->plp_ni;
811         ptllnd_ni_t    *plni = ni->ni_data;
812         ptllnd_tx_t    *tx = ptllnd_new_tx(peer, type, 0);
813         __u64           matchbits;
814         ptl_md_t        md;
815         ptl_handle_md_t mdh;
816         ptl_handle_me_t meh;
817         int             rc;
818         int             rc2;
819         time_t          start;
820         int             w;
821
822         CDEBUG(D_NET, "niov=%d offset=%d len=%d\n",niov,offset,len);
823
824         LASSERT (type == PTLLND_MSG_TYPE_GET ||
825                  type == PTLLND_MSG_TYPE_PUT);
826
827         if (tx == NULL) {
828                 CERROR("Can't allocate %s tx for %s\n",
829                        type == PTLLND_MSG_TYPE_GET ? "GET" : "PUT/REPLY",
830                        libcfs_id2str(peer->plp_id));
831                 return -ENOMEM;
832         }
833
834         rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
835         if (rc != 0) {
836                 CERROR ("Can't allocate iov %d for %s\n",
837                         niov, libcfs_id2str(peer->plp_id));
838                 rc = -ENOMEM;
839                 goto failed;
840         }
841
842         md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
843         md.eq_handle = plni->plni_eqh;
844         md.threshold = 1;
845         md.max_size = 0;
846         md.options = PTLLND_MD_OPTIONS;
847         if(type == PTLLND_MSG_TYPE_GET)
848                 md.options |= PTL_MD_OP_PUT | PTL_MD_ACK_DISABLE;
849         else
850                 md.options |= PTL_MD_OP_GET;
851         ptllnd_set_md_buffer(&md, tx);
852
853         start = cfs_time_current_sec();
854         w = PTLLND_WARN_LONG_WAIT;
855
856         while (!peer->plp_recvd_hello) {        /* wait to validate plp_match */
857                 if (peer->plp_closing) {
858                         rc = -EIO;
859                         goto failed;
860                 }
861                 if (cfs_time_current_sec() > start + w) {
862                         CWARN("Waited %ds to connect to %s\n",
863                               w, libcfs_id2str(peer->plp_id));
864                         w *= 2;
865                 }
866                 ptllnd_wait(ni, w*1000);
867         }
868
869         if (peer->plp_match < PTL_RESERVED_MATCHBITS)
870                 peer->plp_match = PTL_RESERVED_MATCHBITS;
871         matchbits = peer->plp_match++;
872         CDEBUG(D_NET, "matchbits " LPX64 " %s\n", matchbits,
873                ptllnd_ptlid2str(peer->plp_ptlid));
874
875         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid,
876                          matchbits, 0, PTL_UNLINK, PTL_INS_BEFORE, &meh);
877         if (rc != PTL_OK) {
878                 CERROR("PtlMEAttach for %s failed: %d\n",
879                        libcfs_id2str(peer->plp_id), rc);
880                 rc = -EIO;
881                 goto failed;
882         }
883
884         CDEBUG(D_NET, "md.start=%p\n",md.start);
885         CDEBUG(D_NET, "md.length=%llu\n",md.length);
886         CDEBUG(D_NET, "md.threshold=%d\n",md.threshold);
887         CDEBUG(D_NET, "md.max_size=%d\n",md.max_size);
888         CDEBUG(D_NET, "md.options=0x%x\n",md.options);
889         CDEBUG(D_NET, "md.user_ptr=%p\n",md.user_ptr);
890
891         PTLLND_DBGT_STAMP(tx->tx_bulk_posted);
892
893         rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);
894         if (rc != PTL_OK) {
895                 CERROR("PtlMDAttach for %s failed: %d\n",
896                        libcfs_id2str(peer->plp_id), rc);
897                 rc2 = PtlMEUnlink(meh);
898                 LASSERT (rc2 == PTL_OK);
899                 rc = -EIO;
900                 goto failed;
901         }
902         tx->tx_bulkmdh = mdh;
903
904         /*
905          * We need to set the stamp here because it
906          * we could have received a HELLO above that set
907          * peer->plp_stamp
908          */
909         tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
910
911         tx->tx_msg.ptlm_u.rdma.kptlrm_hdr = msg->msg_hdr;
912         tx->tx_msg.ptlm_u.rdma.kptlrm_matchbits = matchbits;
913
914         if (type == PTLLND_MSG_TYPE_GET) {
915                 tx->tx_lnetreplymsg = lnet_create_reply_msg(ni, msg);
916                 if (tx->tx_lnetreplymsg == NULL) {
917                         CERROR("Can't create reply for GET to %s\n",
918                                libcfs_id2str(msg->msg_target));
919                         rc = -ENOMEM;
920                         goto failed;
921                 }
922         }
923
924         tx->tx_lnetmsg = msg;
925         PTLLND_HISTORY("%s[%d/%d]: post passive %s p %d %p",
926                        libcfs_id2str(msg->msg_target),
927                        peer->plp_credits, peer->plp_outstanding_credits,
928                        lnet_msgtyp2str(msg->msg_type),
929                        (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ? 
930                        le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
931                        (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ? 
932                        le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
933                        tx);
934         ptllnd_post_tx(tx);
935         return 0;
936
937  failed:
938         ptllnd_tx_done(tx);
939         return rc;
940 }
941
942 int
943 ptllnd_active_rdma(ptllnd_peer_t *peer, int type,
944                    lnet_msg_t *msg, __u64 matchbits,
945                    unsigned int niov, struct iovec *iov,
946                    unsigned int offset, unsigned int len)
947 {
948         lnet_ni_t       *ni = peer->plp_ni;
949         ptllnd_ni_t     *plni = ni->ni_data;
950         ptllnd_tx_t     *tx = ptllnd_new_tx(peer, type, 0);
951         ptl_md_t         md;
952         ptl_handle_md_t  mdh;
953         int              rc;
954
955         LASSERT (type == PTLLND_RDMA_READ ||
956                  type == PTLLND_RDMA_WRITE);
957
958         if (tx == NULL) {
959                 CERROR("Can't allocate tx for RDMA %s with %s\n",
960                        (type == PTLLND_RDMA_WRITE) ? "write" : "read",
961                        libcfs_id2str(peer->plp_id));
962                 ptllnd_close_peer(peer, -ENOMEM);
963                 return -ENOMEM;
964         }
965
966         rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
967         if (rc != 0) {
968                 CERROR ("Can't allocate iov %d for %s\n",
969                         niov, libcfs_id2str(peer->plp_id));
970                 rc = -ENOMEM;
971                 goto failed;
972         }
973
974         md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
975         md.eq_handle = plni->plni_eqh;
976         md.max_size = 0;
977         md.options = PTLLND_MD_OPTIONS;
978         md.threshold = (type == PTLLND_RDMA_READ) ? 2 : 1;
979
980         ptllnd_set_md_buffer(&md, tx);
981
982         rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
983         if (rc != PTL_OK) {
984                 CERROR("PtlMDBind for %s failed: %d\n",
985                        libcfs_id2str(peer->plp_id), rc);
986                 rc = -EIO;
987                 goto failed;
988         }
989
990         tx->tx_bulkmdh = mdh;
991         tx->tx_lnetmsg = msg;
992
993         list_add_tail(&tx->tx_list, &peer->plp_activeq);
994         PTLLND_DBGT_STAMP(tx->tx_bulk_posted);
995
996         if (type == PTLLND_RDMA_READ)
997                 rc = PtlGet(mdh, peer->plp_ptlid,
998                             plni->plni_portal, 0, matchbits, 0);
999         else
1000                 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
1001                             plni->plni_portal, 0, matchbits, 0, 
1002                             (msg == NULL) ? PTLLND_RDMA_FAIL : PTLLND_RDMA_OK);
1003
1004         if (rc == PTL_OK)
1005                 return 0;
1006
1007         CERROR("Can't initiate RDMA with %s: %d\n",
1008                libcfs_id2str(peer->plp_id), rc);
1009
1010         tx->tx_lnetmsg = NULL;
1011  failed:
1012         tx->tx_status = rc;
1013         ptllnd_tx_done(tx);    /* this will close peer */
1014         return rc;
1015 }
1016
1017 int
1018 ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
1019 {
1020         ptllnd_ni_t    *plni = ni->ni_data;
1021         ptllnd_peer_t  *plp;
1022         ptllnd_tx_t    *tx;
1023         int             nob;
1024         int             rc;
1025
1026         LASSERT (!msg->msg_routing);
1027         LASSERT (msg->msg_kiov == NULL);
1028
1029         LASSERT (msg->msg_niov <= PTL_MD_MAX_IOV); /* !!! */
1030
1031         CDEBUG(D_NET, "%s [%d]+%d,%d -> %s%s\n", 
1032                lnet_msgtyp2str(msg->msg_type),
1033                msg->msg_niov, msg->msg_offset, msg->msg_len,
1034                libcfs_nid2str(msg->msg_target.nid),
1035                msg->msg_target_is_router ? "(rtr)" : "");
1036
1037         if ((msg->msg_target.pid & LNET_PID_USERFLAG) != 0) {
1038                 CERROR("Can't send to non-kernel peer %s\n",
1039                        libcfs_id2str(msg->msg_target));
1040                 return -EHOSTUNREACH;
1041         }
1042         
1043         plp = ptllnd_find_peer(ni, msg->msg_target, 1);
1044         if (plp == NULL)
1045                 return -ENOMEM;
1046
1047         switch (msg->msg_type) {
1048         default:
1049                 LBUG();
1050
1051         case LNET_MSG_ACK:
1052                 CDEBUG(D_NET, "LNET_MSG_ACK\n");
1053
1054                 LASSERT (msg->msg_len == 0);
1055                 break;                          /* send IMMEDIATE */
1056
1057         case LNET_MSG_GET:
1058                 CDEBUG(D_NET, "LNET_MSG_GET nob=%d\n",msg->msg_md->md_length);
1059
1060                 if (msg->msg_target_is_router)
1061                         break;                  /* send IMMEDIATE */
1062
1063                 nob = msg->msg_md->md_length;
1064                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1065                 if (nob <= plni->plni_max_msg_size)
1066                         break;
1067
1068                 LASSERT ((msg->msg_md->md_options & LNET_MD_KIOV) == 0);
1069                 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_GET, msg,
1070                                          msg->msg_md->md_niov,
1071                                          msg->msg_md->md_iov.iov,
1072                                          0, msg->msg_md->md_length);
1073                 ptllnd_peer_decref(plp);
1074                 return rc;
1075
1076         case LNET_MSG_REPLY:
1077         case LNET_MSG_PUT:
1078                 CDEBUG(D_NET, "LNET_MSG_PUT nob=%d\n",msg->msg_len);
1079                 nob = msg->msg_len;
1080                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
1081                 CDEBUG(D_NET, "msg_size=%d max=%d\n",msg->msg_len,plp->plp_max_msg_size);
1082                 if (nob <= plp->plp_max_msg_size)
1083                         break;                  /* send IMMEDIATE */
1084
1085                 rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_PUT, msg,
1086                                          msg->msg_niov, msg->msg_iov,
1087                                          msg->msg_offset, msg->msg_len);
1088                 ptllnd_peer_decref(plp);
1089                 return rc;
1090         }
1091
1092         /* send IMMEDIATE
1093          * NB copy the payload so we don't have to do a fragmented send */
1094
1095         CDEBUG(D_NET, "IMMEDIATE len=%d\n", msg->msg_len);
1096         tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_IMMEDIATE, msg->msg_len);
1097         if (tx == NULL) {
1098                 CERROR("Can't allocate tx for lnet type %d to %s\n",
1099                        msg->msg_type, libcfs_id2str(msg->msg_target));
1100                 ptllnd_peer_decref(plp);
1101                 return -ENOMEM;
1102         }
1103
1104         lnet_copy_iov2flat(tx->tx_msgsize, &tx->tx_msg,
1105                            offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1106                            msg->msg_niov, msg->msg_iov, msg->msg_offset,
1107                            msg->msg_len);
1108         tx->tx_msg.ptlm_u.immediate.kptlim_hdr = msg->msg_hdr;
1109
1110         tx->tx_lnetmsg = msg;
1111         PTLLND_HISTORY("%s[%d/%d]: post immediate %s p %d %p",
1112                        libcfs_id2str(msg->msg_target),
1113                        plp->plp_credits, plp->plp_outstanding_credits,
1114                        lnet_msgtyp2str(msg->msg_type),
1115                        (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ? 
1116                        le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
1117                        (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ? 
1118                        le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
1119                        tx);
1120         ptllnd_post_tx(tx);
1121         ptllnd_peer_decref(plp);
1122         return 0;
1123 }
1124
1125 void
1126 ptllnd_rx_done(ptllnd_rx_t *rx)
1127 {
1128         ptllnd_peer_t *plp = rx->rx_peer;
1129         lnet_ni_t     *ni = plp->plp_ni;
1130         ptllnd_ni_t   *plni = ni->ni_data;
1131
1132         plp->plp_outstanding_credits++;
1133
1134         PTLLND_HISTORY("%s[%d/%d]: rx=%p done\n", libcfs_id2str(plp->plp_id),
1135                        plp->plp_credits, plp->plp_outstanding_credits, rx);
1136
1137         ptllnd_check_sends(rx->rx_peer);
1138
1139         LASSERT (plni->plni_nrxs > 0);
1140         plni->plni_nrxs--;
1141 }
1142
1143 int
1144 ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1145                   void **new_privatep)
1146 {
1147         /* Shouldn't get here; recvs only block for router buffers */
1148         LBUG();
1149         return 0;
1150 }
1151
1152 int
1153 ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
1154             int delayed, unsigned int niov,
1155             struct iovec *iov, lnet_kiov_t *kiov,
1156             unsigned int offset, unsigned int mlen, unsigned int rlen)
1157 {
1158         ptllnd_rx_t    *rx = private;
1159         int             rc = 0;
1160         int             nob;
1161
1162         LASSERT (kiov == NULL);
1163         LASSERT (niov <= PTL_MD_MAX_IOV);       /* !!! */
1164
1165         switch (rx->rx_msg->ptlm_type) {
1166         default:
1167                 LBUG();
1168
1169         case PTLLND_MSG_TYPE_IMMEDIATE:
1170                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[mlen]);
1171                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE nob=%d\n",nob);
1172                 if (nob > rx->rx_nob) {
1173                         CERROR("Immediate message from %s too big: %d(%d)\n",
1174                                libcfs_id2str(rx->rx_peer->plp_id),
1175                                nob, rx->rx_nob);
1176                         rc = -EPROTO;
1177                         break;
1178                 }
1179                 lnet_copy_flat2iov(niov, iov, offset,
1180                                    rx->rx_nob, rx->rx_msg,
1181                                    offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
1182                                    mlen);
1183                 lnet_finalize(ni, msg, 0);
1184                 break;
1185
1186         case PTLLND_MSG_TYPE_PUT:
1187                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_PUT offset=%d mlen=%d\n",offset,mlen);
1188                 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg,
1189                                         rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1190                                         niov, iov, offset, mlen);
1191                 break;
1192
1193         case PTLLND_MSG_TYPE_GET:
1194                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_GET\n");
1195                 if (msg != NULL)
1196                         rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, msg,
1197                                                 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1198                                                 msg->msg_niov, msg->msg_iov,
1199                                                 msg->msg_offset, msg->msg_len);
1200                 else
1201                         rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, NULL,
1202                                                 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
1203                                                 0, NULL, 0, 0);
1204                 break;
1205         }
1206
1207         ptllnd_rx_done(rx);
1208         return rc;
1209 }
1210
1211 void
1212 ptllnd_abort_on_nak(lnet_ni_t *ni)
1213 {
1214         ptllnd_ni_t      *plni = ni->ni_data;
1215
1216         if (plni->plni_abort_on_nak)
1217                 abort();
1218 }
1219
1220 void
1221 ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
1222                      kptl_msg_t *msg, unsigned int nob)
1223 {
1224         ptllnd_ni_t      *plni = ni->ni_data;
1225         const int         basenob = offsetof(kptl_msg_t, ptlm_u);
1226         lnet_process_id_t srcid;
1227         ptllnd_rx_t       rx;
1228         int               flip;
1229         __u16             msg_version;
1230         __u32             msg_cksum;
1231         ptllnd_peer_t    *plp;
1232         int               rc;
1233
1234         if (nob < 6) {
1235                 CERROR("Very short receive from %s\n",
1236                        ptllnd_ptlid2str(initiator));
1237                 return;
1238         }
1239
1240         /* I can at least read MAGIC/VERSION */
1241
1242         flip = msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC);
1243         if (!flip && msg->ptlm_magic != PTLLND_MSG_MAGIC) {
1244                 CERROR("Bad protocol magic %08x from %s\n", 
1245                        msg->ptlm_magic, ptllnd_ptlid2str(initiator));
1246                 return;
1247         }
1248
1249         msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
1250
1251         if (msg_version != PTLLND_MSG_VERSION) {
1252                 CERROR("Bad protocol version %04x from %s\n", 
1253                        (__u32)msg_version, ptllnd_ptlid2str(initiator));
1254                 ptllnd_abort_on_nak(ni);
1255                 return;
1256         }
1257
1258         if (nob < basenob) {
1259                 CERROR("Short receive from %s: got %d, wanted at least %d\n",
1260                        ptllnd_ptlid2str(initiator), nob, basenob);
1261                 return;
1262         }
1263
1264         /* checksum must be computed with
1265          * 1) ptlm_cksum zero and
1266          * 2) BEFORE anything gets modified/flipped
1267          */
1268         msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;
1269         msg->ptlm_cksum = 0;
1270         if (msg_cksum != 0 &&
1271             msg_cksum != ptllnd_cksum(msg, offsetof(kptl_msg_t, ptlm_u))) {
1272                 CERROR("Bad checksum from %s\n", ptllnd_ptlid2str(initiator));
1273                 return;
1274         }
1275
1276         msg->ptlm_version = msg_version;
1277         msg->ptlm_cksum = msg_cksum;
1278         
1279         if (flip) {
1280                 /* NB stamps are opaque cookies */
1281                 __swab32s(&msg->ptlm_nob);
1282                 __swab64s(&msg->ptlm_srcnid);
1283                 __swab64s(&msg->ptlm_dstnid);
1284                 __swab32s(&msg->ptlm_srcpid);
1285                 __swab32s(&msg->ptlm_dstpid);
1286         }
1287         
1288         srcid.nid = msg->ptlm_srcnid;
1289         srcid.pid = msg->ptlm_srcpid;
1290
1291         if (LNET_NIDNET(msg->ptlm_srcnid) != LNET_NIDNET(ni->ni_nid)) {
1292                 CERROR("Bad source id %s from %s\n",
1293                        libcfs_id2str(srcid),
1294                        ptllnd_ptlid2str(initiator));
1295                 return;
1296         }
1297
1298         if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {
1299                 CERROR("NAK from %s (%s)\n", 
1300                        libcfs_id2str(srcid),
1301                        ptllnd_ptlid2str(initiator));
1302                 ptllnd_abort_on_nak(ni);
1303                 return;
1304         }
1305         
1306         if (msg->ptlm_dstnid != ni->ni_nid ||
1307             msg->ptlm_dstpid != the_lnet.ln_pid) {
1308                 CERROR("Bad dstid %s (%s expected) from %s\n",
1309                        libcfs_id2str((lnet_process_id_t) {
1310                                .nid = msg->ptlm_dstnid,
1311                                .pid = msg->ptlm_dstpid}),
1312                        libcfs_id2str((lnet_process_id_t) {
1313                                .nid = ni->ni_nid,
1314                                .pid = the_lnet.ln_pid}),
1315                        libcfs_id2str(srcid));
1316                 return;
1317         }
1318
1319         if (msg->ptlm_dststamp != plni->plni_stamp) {
1320                 CERROR("Bad dststamp "LPX64"("LPX64" expected) from %s\n",
1321                        msg->ptlm_dststamp, plni->plni_stamp,
1322                        libcfs_id2str(srcid));
1323                 return;
1324         }
1325
1326         PTLLND_HISTORY("RX %s: %s %d %p", libcfs_id2str(srcid), 
1327                        ptllnd_msgtype2str(msg->ptlm_type), msg->ptlm_credits, &rx);
1328
1329         switch (msg->ptlm_type) {
1330         case PTLLND_MSG_TYPE_PUT:
1331         case PTLLND_MSG_TYPE_GET:
1332                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
1333                         msg->ptlm_type==PTLLND_MSG_TYPE_PUT ? "PUT" : "GET");
1334                 if (nob < basenob + sizeof(kptl_rdma_msg_t)) {
1335                         CERROR("Short rdma request from %s(%s)\n",
1336                                libcfs_id2str(srcid),
1337                                ptllnd_ptlid2str(initiator));
1338                         return;
1339                 }
1340                 if (flip)
1341                         __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);
1342                 break;
1343
1344         case PTLLND_MSG_TYPE_IMMEDIATE:
1345                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
1346                 if (nob < offsetof(kptl_msg_t,
1347                                    ptlm_u.immediate.kptlim_payload)) {
1348                         CERROR("Short immediate from %s(%s)\n",
1349                                libcfs_id2str(srcid),
1350                                ptllnd_ptlid2str(initiator));
1351                         return;
1352                 }
1353                 break;
1354
1355         case PTLLND_MSG_TYPE_HELLO:
1356                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_HELLO from %s(%s)\n",
1357                                libcfs_id2str(srcid),
1358                                ptllnd_ptlid2str(initiator));
1359                 if (nob < basenob + sizeof(kptl_hello_msg_t)) {
1360                         CERROR("Short hello from %s(%s)\n",
1361                                libcfs_id2str(srcid),
1362                                ptllnd_ptlid2str(initiator));
1363                         return;
1364                 }
1365                 if(flip){
1366                         __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
1367                         __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);
1368                 }
1369                 break;
1370                 
1371         case PTLLND_MSG_TYPE_NOOP:
1372                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_NOOP from %s(%s)\n",
1373                                libcfs_id2str(srcid),
1374                                ptllnd_ptlid2str(initiator));        
1375                 break;
1376
1377         default:
1378                 CERROR("Bad message type %d from %s(%s)\n", msg->ptlm_type,
1379                        libcfs_id2str(srcid),
1380                        ptllnd_ptlid2str(initiator));
1381                 return;
1382         }
1383
1384         plp = ptllnd_find_peer(ni, srcid,
1385                                msg->ptlm_type == PTLLND_MSG_TYPE_HELLO);
1386         if (plp == NULL) {
1387                 CERROR("Can't find peer %s\n", libcfs_id2str(srcid));
1388                 return;
1389         }
1390
1391         if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
1392                 if (plp->plp_recvd_hello) {
1393                         CERROR("Unexpected HELLO from %s\n",
1394                                libcfs_id2str(srcid));
1395                         ptllnd_peer_decref(plp);
1396                         return;
1397                 }
1398
1399                 CDEBUG(D_NET, "maxsz %d match "LPX64" stamp "LPX64"\n",
1400                        msg->ptlm_u.hello.kptlhm_max_msg_size,
1401                        msg->ptlm_u.hello.kptlhm_matchbits,
1402                        msg->ptlm_srcstamp);
1403
1404                 plp->plp_max_msg_size = MAX(plni->plni_max_msg_size,
1405                         msg->ptlm_u.hello.kptlhm_max_msg_size);
1406                 plp->plp_match = msg->ptlm_u.hello.kptlhm_matchbits;
1407                 plp->plp_stamp = msg->ptlm_srcstamp;
1408                 plp->plp_max_credits += msg->ptlm_credits;
1409                 plp->plp_recvd_hello = 1;
1410
1411                 CDEBUG(D_NET, "plp_max_msg_size=%d\n",plp->plp_max_msg_size);
1412
1413         } else if (!plp->plp_recvd_hello) {
1414
1415                 CERROR("Bad message type %d (HELLO expected) from %s\n",
1416                        msg->ptlm_type, libcfs_id2str(srcid));
1417                 ptllnd_peer_decref(plp);
1418                 return;
1419
1420         } else if (msg->ptlm_srcstamp != plp->plp_stamp) {
1421
1422                 CERROR("Bad srcstamp "LPX64"("LPX64" expected) from %s\n",
1423                        msg->ptlm_srcstamp, plp->plp_stamp,
1424                        libcfs_id2str(srcid));
1425                 ptllnd_peer_decref(plp);
1426                 return;
1427         }
1428
1429         if (msg->ptlm_credits > 0) {
1430                 CDEBUG(D_NET, "Getting back %d credits from peer\n",msg->ptlm_credits);
1431                 if (plp->plp_credits + msg->ptlm_credits >
1432                     plp->plp_max_credits) {
1433                         CWARN("Too many credits from %s: %d + %d > %d\n",
1434                               libcfs_id2str(srcid),
1435                               plp->plp_credits, msg->ptlm_credits,
1436                               plp->plp_max_credits);
1437                         plp->plp_credits = plp->plp_max_credits;
1438                 } else {
1439                         plp->plp_credits += msg->ptlm_credits;
1440                 }
1441                 ptllnd_check_sends(plp);
1442         }
1443
1444         /* All OK so far; assume the message is good... */
1445
1446         rx.rx_peer      = plp;
1447         rx.rx_msg       = msg;
1448         rx.rx_nob       = nob;
1449         plni->plni_nrxs++;
1450
1451         CDEBUG(D_NET, "rx=%p type=%d\n",&rx,msg->ptlm_type);
1452
1453         switch (msg->ptlm_type) {
1454         default: /* message types have been checked already */
1455                 ptllnd_rx_done(&rx);
1456                 break;
1457
1458         case PTLLND_MSG_TYPE_PUT:
1459         case PTLLND_MSG_TYPE_GET:
1460                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
1461                         msg->ptlm_type==PTLLND_MSG_TYPE_PUT ? "PUT" : "GET");
1462                 rc = lnet_parse(ni, &msg->ptlm_u.rdma.kptlrm_hdr,
1463                                 msg->ptlm_srcnid, &rx, 1);
1464                 CDEBUG(D_NET, "lnet_parse rc=%d\n",rc);
1465                 if (rc < 0)
1466                         ptllnd_rx_done(&rx);
1467                 break;
1468
1469         case PTLLND_MSG_TYPE_IMMEDIATE:
1470                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
1471                 rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,
1472                                 msg->ptlm_srcnid, &rx, 0);
1473                 CDEBUG(D_NET, "lnet_parse rc=%d\n",rc);
1474                 if (rc < 0)
1475                         ptllnd_rx_done(&rx);
1476                 break;
1477         }
1478
1479         ptllnd_peer_decref(plp);
1480 }
1481
1482 void
1483 ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
1484 {
1485         ptllnd_buffer_t *buf = ptllnd_eventarg2obj(event->md.user_ptr);
1486         ptllnd_ni_t     *plni = ni->ni_data;
1487         char            *msg = &buf->plb_buffer[event->offset];
1488         int              repost;
1489         int              unlinked = event->type == PTL_EVENT_UNLINK;
1490
1491         LASSERT (buf->plb_ni == ni);
1492         LASSERT (event->type == PTL_EVENT_PUT_END ||
1493                  event->type == PTL_EVENT_UNLINK);
1494
1495         CDEBUG(D_NET, "buf=%p event=%d\n",buf,event->type);
1496
1497         if (event->ni_fail_type != PTL_NI_OK) {
1498
1499                 CERROR("event type %d, status %d from %s\n",
1500                        event->type, event->ni_fail_type,
1501                        ptllnd_ptlid2str(event->initiator));
1502
1503         } else if (event->type == PTL_EVENT_PUT_END) {
1504 #if (PTL_MD_LOCAL_ALIGN8 == 0)
1505                 /* Portals can't force message alignment - someone sending an
1506                  * odd-length message could misalign subsequent messages */
1507                 if ((event->mlength & 7) != 0) {
1508                         CERROR("Message from %s has odd length %llu: "
1509                                "probable version incompatibility\n",
1510                                ptllnd_ptlid2str(event->initiator),
1511                                event->mlength);
1512                         LBUG();
1513                 }
1514 #endif
1515                 LASSERT ((event->offset & 7) == 0);
1516
1517                 ptllnd_parse_request(ni, event->initiator,
1518                                      (kptl_msg_t *)msg, event->mlength);
1519         }
1520
1521 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1522         /* UNLINK event only on explicit unlink */
1523         repost = (event->unlinked && event->type != PTL_EVENT_UNLINK);
1524         if (event->unlinked)
1525                 unlinked = 1;
1526 #else
1527         /* UNLINK event only on implicit unlink */
1528         repost = (event->type == PTL_EVENT_UNLINK);
1529 #endif
1530
1531         CDEBUG(D_NET, "repost=%d unlinked=%d\n",repost,unlinked);
1532
1533         if (unlinked) {
1534                 LASSERT(buf->plb_posted);
1535                 buf->plb_posted = 0;
1536                 plni->plni_nposted_buffers--;
1537         }
1538
1539         if (repost)
1540                 (void) ptllnd_post_buffer(buf);
1541 }
1542
1543 void
1544 ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
1545 {
1546         ptllnd_ni_t *plni = ni->ni_data;
1547         ptllnd_tx_t *tx = ptllnd_eventarg2obj(event->md.user_ptr);
1548         int          error = (event->ni_fail_type != PTL_NI_OK);
1549         int          isreq;
1550         int          isbulk;
1551 #ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
1552         int          unlinked = event->unlinked;
1553 #else
1554         int          unlinked = (event->type == PTL_EVENT_UNLINK);
1555 #endif
1556
1557         if (error)
1558                 CERROR("Error event type %d for %s for %s\n",
1559                        event->type, ptllnd_msgtype2str(tx->tx_type),
1560                        libcfs_id2str(tx->tx_peer->plp_id));
1561
1562         LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE));
1563
1564         CDEBUG(D_NET, "tx=%p type=%s (%d)\n",tx,
1565                 ptllnd_msgtype2str(tx->tx_type),tx->tx_type);
1566         CDEBUG(D_NET, "unlinked=%d\n",unlinked);
1567         CDEBUG(D_NET, "error=%d\n",error);
1568
1569         isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh);
1570         CDEBUG(D_NET, "isreq=%d\n",isreq);
1571         if (isreq) {
1572                 LASSERT (event->md.start == (void *)&tx->tx_msg);
1573                 if (unlinked) {
1574                         tx->tx_reqmdh = PTL_INVALID_HANDLE;
1575                         PTLLND_DBGT_STAMP(tx->tx_req_done);
1576                 }
1577         }
1578
1579         isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);
1580         CDEBUG(D_NET, "isbulk=%d\n",isbulk);
1581         if ( isbulk && unlinked ) {
1582                 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
1583                 PTLLND_DBGT_STAMP(tx->tx_bulk_done);
1584         }
1585
1586         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
1587
1588         PTLLND_HISTORY("%s[%d/%d]: TX done %p %s%s",
1589                        libcfs_id2str(tx->tx_peer->plp_id), 
1590                        tx->tx_peer->plp_credits,
1591                        tx->tx_peer->plp_outstanding_credits,
1592                        tx, isreq ? "REQ" : "BULK", unlinked ? "(unlinked)" : "");
1593
1594         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
1595         switch (tx->tx_type) {
1596         default:
1597                 LBUG();
1598
1599         case PTLLND_MSG_TYPE_NOOP:
1600         case PTLLND_MSG_TYPE_HELLO:
1601         case PTLLND_MSG_TYPE_IMMEDIATE:
1602                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1603                          event->type == PTL_EVENT_SEND_END);
1604                 LASSERT (isreq);
1605                 break;
1606
1607         case PTLLND_MSG_TYPE_GET:
1608                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1609                          (isreq && event->type == PTL_EVENT_SEND_END) ||
1610                          (isbulk && event->type == PTL_EVENT_PUT_END));
1611
1612                 if (isbulk && !error && event->type == PTL_EVENT_PUT_END) {
1613                         /* Check GET matched */
1614                         if (event->hdr_data == PTLLND_RDMA_OK) {
1615                                 lnet_set_reply_msg_len(ni, 
1616                                                        tx->tx_lnetreplymsg,
1617                                                        event->mlength);
1618                         } else {
1619                                 CERROR ("Unmatched GET with %s\n",
1620                                         libcfs_id2str(tx->tx_peer->plp_id));
1621                                 tx->tx_status = -EIO;
1622                         }
1623                 }
1624                 break;
1625
1626         case PTLLND_MSG_TYPE_PUT:
1627                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1628                          (isreq && event->type == PTL_EVENT_SEND_END) ||
1629                          (isbulk && event->type == PTL_EVENT_GET_END));
1630                 break;
1631
1632         case PTLLND_RDMA_READ:
1633                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1634                          event->type == PTL_EVENT_SEND_END ||
1635                          event->type == PTL_EVENT_REPLY_END);
1636                 LASSERT (isbulk);
1637                 break;
1638
1639         case PTLLND_RDMA_WRITE:
1640                 LASSERT (event->type == PTL_EVENT_UNLINK ||
1641                          event->type == PTL_EVENT_SEND_END);
1642                 LASSERT (isbulk);
1643         }
1644
1645         /* Schedule ptllnd_tx_done() on error or last completion event */
1646         if (error ||
1647             (PtlHandleIsEqual(tx->tx_bulkmdh, PTL_INVALID_HANDLE) &&
1648              PtlHandleIsEqual(tx->tx_reqmdh, PTL_INVALID_HANDLE))) {
1649                 if (error)
1650                         tx->tx_status = -EIO;
1651                 list_del(&tx->tx_list);
1652                 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
1653                 CDEBUG(D_NET, "tx=%p ONTO ZOMBIE LIST\n",tx);
1654         }
1655 }
1656
1657 void
1658 ptllnd_wait (lnet_ni_t *ni, int milliseconds)
1659 {
1660         static struct timeval  prevt;
1661         static int             prevt_count;
1662         static int             call_count;
1663
1664         struct timeval         t1;
1665         struct timeval         t2;
1666         
1667         ptllnd_ni_t   *plni = ni->ni_data;
1668         ptllnd_tx_t   *tx;
1669         ptl_event_t    event;
1670         int            which;
1671         int            rc;
1672         int            blocked = 0;
1673         int            found = 0;
1674         int            timeout = 0;
1675
1676         /* Handle any currently queued events, returning immediately if any.
1677          * Otherwise block for the timeout and handle all events queued
1678          * then. */
1679
1680         gettimeofday(&t1, NULL);
1681         call_count++;
1682
1683         for (;;) {
1684                 time_t  then = cfs_time_current_sec();
1685
1686                 CDEBUG(D_NET, "Poll(%d)\n", timeout);
1687                 
1688                 rc = PtlEQPoll(&plni->plni_eqh, 1,
1689                                (timeout < 0) ? PTL_TIME_FOREVER : timeout,
1690                                &event, &which);
1691
1692                 if (timeout >= 0 &&
1693                     (cfs_time_current_sec() - then)*1000 > timeout + 1000) {
1694                         /* 1000 mS grace.............................^ */
1695                         CERROR("SLOW PtlEQPoll(%d): %d seconds\n", timeout,
1696                                (int)(cfs_time_current_sec() - then));
1697                 }
1698                 
1699                 CDEBUG(D_NET, "PtlEQPoll rc=%d\n",rc);
1700                 timeout = 0;
1701
1702                 if (rc == PTL_EQ_EMPTY) {
1703                         if (found ||            /* handled some events */
1704                             milliseconds == 0 || /* just checking */
1705                             blocked)            /* blocked already */
1706                                 break;
1707
1708                         blocked = 1;
1709                         timeout = (milliseconds < 0) ?
1710                                   PTL_TIME_FOREVER : milliseconds;
1711                         continue;
1712                 }
1713
1714                 LASSERT (rc == PTL_OK || rc == PTL_EQ_DROPPED);
1715
1716                 if (rc == PTL_EQ_DROPPED)
1717                         CERROR("Event queue: size %d is too small\n",
1718                                plni->plni_eq_size);
1719
1720                 CDEBUG(D_NET, "event.type=%s(%d)\n",
1721                        ptllnd_evtype2str(event.type),event.type);
1722
1723                 found = 1;
1724                 switch (ptllnd_eventarg2type(event.md.user_ptr)) {
1725                 default:
1726                         LBUG();
1727
1728                 case PTLLND_EVENTARG_TYPE_TX:
1729                         ptllnd_tx_event(ni, &event);
1730                         break;
1731
1732                 case PTLLND_EVENTARG_TYPE_BUF:
1733                         ptllnd_buf_event(ni, &event);
1734                         break;
1735                 }
1736         }
1737
1738         while (!list_empty(&plni->plni_zombie_txs)) {
1739                 tx = list_entry(plni->plni_zombie_txs.next,
1740                                 ptllnd_tx_t, tx_list);
1741                 CDEBUG(D_NET, "Process ZOMBIE tx=%p\n",tx);
1742                 ptllnd_tx_done(tx);
1743         }
1744
1745         gettimeofday(&t2, NULL);
1746                 
1747         if (prevt.tv_sec == 0 ||
1748             prevt.tv_sec != t2.tv_sec) {
1749                 PTLLND_HISTORY("%d wait entered at %d.%06d - prev %d %d.%06d", 
1750                                call_count, (int)t1.tv_sec, (int)t1.tv_usec,
1751                                prevt_count, (int)prevt.tv_sec, (int)prevt.tv_usec);
1752                 prevt = t2;
1753         }
1754 }