/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
 *   Author: PJ Kirner <pjkirner@clusterfs.com>
 *           E Barton <eeb@bartonsoftware.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   This file is confidential source code owned by Cluster File Systems.
 *   No viewing, modification, compilation, redistribution, or any other
 *   form of use is permitted except through a signed license agreement.
 *
 *   If you have not signed such an agreement, then you have no rights to
 *   this file.  Please destroy it immediately and contact CFS.
 *
 */

#include "ptllnd.h"
#include <libcfs/list.h>

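/* Return the number of entries on queue 'q'.  The caller must already
 * hold the lock guarding 'q' (peer_lock for the per-peer queues below). */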
static int
kptllnd_count_queue(struct list_head *q)
{
        struct list_head *e;
        int               n = 0;

        list_for_each(e, q) {
                n++;
        }

        return n;
}

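/* Snapshot the state of the index'th peer in the global peer table.
 * Returns -ENOENT if there are fewer than index + 1 peers. */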
int
kptllnd_get_peer_info(int index,
                      lnet_process_id_t *id,
                      int *state, int *sent_hello,
                      int *refcount, __u64 *incarnation,
                      __u64 *next_matchbits, __u64 *last_matchbits_seen,
                      int *nsendq, int *nactiveq,
                      int *credits, int *outstanding_credits)
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        unsigned long     flags;
        struct list_head *ptmp;
        kptl_peer_t      *peer;
        int               i;
        int               rc = -ENOENT;

        read_lock_irqsave(g_lock, flags);

        for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {

                list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry(ptmp, kptl_peer_t, peer_list);

                        if (index-- > 0)
                                continue;

                        *id          = peer->peer_id;
                        *state       = peer->peer_state;
                        *sent_hello  = peer->peer_sent_hello;
                        *refcount    = atomic_read(&peer->peer_refcount);
                        *incarnation = peer->peer_incarnation;

                        spin_lock(&peer->peer_lock);

                        *next_matchbits      = peer->peer_next_matchbits;
                        *last_matchbits_seen = peer->peer_last_matchbits_seen;
                        *credits             = peer->peer_credits;
                        *outstanding_credits = peer->peer_outstanding_credits;

                        *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
                        *nactiveq = kptllnd_count_queue(&peer->peer_activeq);

                        spin_unlock(&peer->peer_lock);

                        rc = 0;
                        goto out;
                }
        }

 out:
        read_unlock_irqrestore(g_lock, flags);
        return rc;
}

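/* Add 'peer' to the peer table, taking a ref for the table.  The caller
 * must hold kptl_peer_rw_lock in write mode and have already checked
 * there is room (see the LASSERTs). */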
void
kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
{
        LASSERT (kptllnd_data.kptl_n_active_peers <
                 kptllnd_data.kptl_expected_peers);

        LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
                 peer->peer_state == PEER_STATE_ACTIVE);

        kptllnd_data.kptl_n_active_peers++;
        atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */

        /* NB add to HEAD of peer list for MRU order!
         * (see kptllnd_cull_peertable_locked) */
        list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
}

void
kptllnd_cull_peertable_locked (lnet_process_id_t pid)
{
        /* I'm about to add a new peer with this portals ID to the peer table,
         * so (a) this peer should not exist already and (b) I want to leave at
         * most (max_procs_per_node - 1) peers with this NID in the table. */
        struct list_head  *peers = kptllnd_nid2peerlist(pid.nid);
        int                cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
        int                count;
        struct list_head  *tmp;
        struct list_head  *nxt;
        kptl_peer_t       *peer;

        count = 0;
        list_for_each_safe (tmp, nxt, peers) {
                /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
                 * in MRU order */
                peer = list_entry(tmp, kptl_peer_t, peer_list);

                if (peer->peer_id.nid != pid.nid)
                        continue;

                LASSERT (peer->peer_id.pid != pid.pid);

                count++;

                if (count < cull_count) /* recent (don't cull) */
                        continue;

                CDEBUG(D_NET, "Cull %s(%s)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_ptlid2str(peer->peer_ptlid));

                kptllnd_peer_close_locked(peer, 0);
        }
}

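/* Allocate and initialise a peer for LNet id 'lpid' / portals id 'ppid'.
 * Returns the peer in state PEER_STATE_ALLOCATED with 1 ref for the
 * caller, or NULL on allocation failure or if shutdown has started. */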
kptl_peer_t *
kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
{
        unsigned long    flags;
        kptl_peer_t     *peer;

        LIBCFS_ALLOC(peer, sizeof (*peer));
        if (peer == NULL) {
                CERROR("Can't create peer %s (%s)\n",
                       libcfs_id2str(lpid),
                       kptllnd_ptlid2str(ppid));
                return NULL;
        }

        memset(peer, 0, sizeof(*peer));         /* zero flags etc */

        INIT_LIST_HEAD (&peer->peer_sendq);
        INIT_LIST_HEAD (&peer->peer_activeq);
        spin_lock_init (&peer->peer_lock);

        peer->peer_state = PEER_STATE_ALLOCATED;
        peer->peer_error = 0;
        peer->peer_last_alive = cfs_time_current();
        peer->peer_id = lpid;
        peer->peer_ptlid = ppid;
        peer->peer_credits = 1;                 /* enough for HELLO */
        peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
        peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;
        peer->peer_active_rxs = 0;

        atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        peer->peer_myincarnation = kptllnd_data.kptl_incarnation;

        /* Only increase # peers under lock, to guarantee we don't grow it
         * during shutdown */
        if (kptllnd_data.kptl_shutdown) {
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                        flags);
                LIBCFS_FREE(peer, sizeof(*peer));
                return NULL;
        }

        kptllnd_data.kptl_npeers++;
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        return peer;
}

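/* Final teardown, called when the last ref on 'peer' is dropped.  The
 * peer must be idle (empty queues, no active rxs) and either never made
 * it into the peer table (ALLOCATED) or already left it (ZOMBIE). */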
void
kptllnd_peer_destroy (kptl_peer_t *peer)
{
        unsigned long flags;

        CDEBUG(D_NET, "Peer=%p\n", peer);

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&peer->peer_refcount) == 0);
        LASSERT (peer->peer_active_rxs == 0);
        LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
                 peer->peer_state == PEER_STATE_ZOMBIE);
        LASSERT (list_empty(&peer->peer_sendq));
        LASSERT (list_empty(&peer->peer_activeq));

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (peer->peer_state == PEER_STATE_ZOMBIE)
                list_del(&peer->peer_list);

        kptllnd_data.kptl_npeers--;

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        LIBCFS_FREE (peer, sizeof (*peer));
}

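/* Fail all of 'peer's queued and active txs with -EIO.  Both queues are
 * emptied atomically under peer_lock; the refs are dropped afterwards,
 * outside the lock, so kptllnd_tx_fini() can safely abort any
 * outstanding Portals operations. */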
void
kptllnd_peer_cancel_txs(kptl_peer_t *peer)
{
        struct list_head   sendq;
        struct list_head   activeq;
        struct list_head  *tmp;
        struct list_head  *nxt;
        kptl_tx_t         *tx;
        unsigned long      flags;

        /* atomically grab all the peer's tx-es... */

        spin_lock_irqsave(&peer->peer_lock, flags);

        list_add(&sendq, &peer->peer_sendq);
        list_del_init(&peer->peer_sendq);
        list_for_each (tmp, &sendq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                tx->tx_active = 0;
        }

        list_add(&activeq, &peer->peer_activeq);
        list_del_init(&peer->peer_activeq);
        list_for_each (tmp, &activeq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                tx->tx_active = 0;
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* ...then drop the peer's ref on them at leisure.  This will get
         * kptllnd_tx_fini() to abort outstanding comms if necessary. */

        list_for_each_safe (tmp, nxt, &sendq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);
        }

        list_for_each_safe (tmp, nxt, &activeq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);
        }
}

void
kptllnd_peer_alive (kptl_peer_t *peer)
{
        /* This is racy, but everyone's only writing cfs_time_current() */
        peer->peer_last_alive = cfs_time_current();
        mb();
}

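/* Consume any error stashed on 'peer' by kptllnd_peer_close_locked() and
 * pass it to LNet as a peer-down notification (alive == 0), converting
 * peer_last_alive from jiffies to wall-clock seconds.  The error is
 * cleared so LNet is only told once. */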
void
kptllnd_peer_notify (kptl_peer_t *peer)
{
        unsigned long flags;
        time_t        last_alive = 0;
        int           error = 0;

        spin_lock_irqsave(&peer->peer_lock, flags);

        if (peer->peer_error != 0) {
                error = peer->peer_error;
                peer->peer_error = 0;

                last_alive = cfs_time_current_sec() -
                             cfs_duration_sec(cfs_time_current() -
                                              peer->peer_last_alive);
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        if (error != 0)
                lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
                             last_alive);
}

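/* Watchdog bottom half: move peers that kptllnd_peer_close_locked() put
 * on the closing list over to the zombie list, notifying LNet, failing
 * their txs and dropping the peer table's ref on the way.  NB only safe
 * while there is a single watchdog thread. */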
void
kptllnd_handle_closing_peers ()
{
        unsigned long           flags;
        kptl_peer_t            *peer;
        struct list_head       *tmp;
        struct list_head       *nxt;
        int                     idle;

        /* Check with a read lock first to avoid blocking anyone */

        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        idle = list_empty(&kptllnd_data.kptl_closing_peers);
        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (idle)
                return;

        /* Scan the closing peers and cancel their txs.
         * NB only safe while there is only a single watchdog */

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_CLOSING);

                list_del(&peer->peer_list);
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_zombie_peers);
                peer->peer_state = PEER_STATE_ZOMBIE;

                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                kptllnd_peer_notify(peer);
                kptllnd_peer_cancel_txs(peer);
                kptllnd_peer_decref(peer);

                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        }

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}

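/* Start closing 'peer': take it out of the peer table (if it is still
 * there), stash 'why' for kptllnd_peer_notify() and queue the peer on
 * the closing list for the watchdog.  Idempotent; the caller must hold
 * kptl_peer_rw_lock in write mode. */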
void
kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
{
        switch (peer->peer_state) {
        default:
                LBUG();

        case PEER_STATE_WAITING_HELLO:
        case PEER_STATE_ACTIVE:
                /* Ensure new peers see a new incarnation of me */
                LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
                if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
                        kptllnd_data.kptl_incarnation++;

                /* Removing from peer table */
                kptllnd_data.kptl_n_active_peers--;
                LASSERT (kptllnd_data.kptl_n_active_peers >= 0);

                list_del(&peer->peer_list);
                kptllnd_peer_unreserve_buffers();

                peer->peer_error = why; /* stash 'why' only on first close */

                /* Schedule for immediate attention, taking peer table's ref */
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_closing_peers);
                wake_up(&kptllnd_data.kptl_watchdog_waitq);
                break;

        case PEER_STATE_ZOMBIE:
                /* Schedule for attention at next timeout */
                kptllnd_peer_addref(peer);
                list_del(&peer->peer_list);
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_closing_peers);
                break;

        case PEER_STATE_CLOSING:
                break;
        }

        peer->peer_state = PEER_STATE_CLOSING;
}

void
kptllnd_peer_close(kptl_peer_t *peer, int why)
{
        unsigned long      flags;

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        kptllnd_peer_close_locked(peer, why);
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}

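/* Close every peer matching 'id'; LNET_NID_ANY/LNET_PID_ANY act as
 * wildcards (a wildcard NID requires a wildcard PID too).  Returns 0 if
 * anything matched, -ENOENT otherwise. */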
int
kptllnd_peer_del(lnet_process_id_t id)
{
        struct list_head  *ptmp;
        struct list_head  *pnxt;
        kptl_peer_t       *peer;
        int                lo;
        int                hi;
        int                i;
        unsigned long      flags;
        int                rc = -ENOENT;

        /*
         * Find the single bucket we are supposed to look at or if nid is a
         * wildcard (LNET_NID_ANY) then look at all of the buckets
         */
        if (id.nid != LNET_NID_ANY) {
                struct list_head *l = kptllnd_nid2peerlist(id.nid);

                lo = hi = l - kptllnd_data.kptl_peers;
        } else {
                if (id.pid != LNET_PID_ANY)
                        return -EINVAL;

                lo = 0;
                hi = kptllnd_data.kptl_peer_hash_size - 1;
        }

again:
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry (ptmp, kptl_peer_t, peer_list);

                        if (!(id.nid == LNET_NID_ANY ||
                              (peer->peer_id.nid == id.nid &&
                               (id.pid == LNET_PID_ANY ||
                                peer->peer_id.pid == id.pid))))
                                continue;

                        kptllnd_peer_addref(peer); /* 1 ref for me... */

                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                               flags);

                        kptllnd_peer_close(peer, 0);
                        kptllnd_peer_decref(peer); /* ...until here */

                        rc = 0;         /* matched something */

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        return (rc);
}

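/* Queue 'tx' on 'peer's send queue, consuming the caller's ref.  For
 * PUT/GET requests this assigns fresh matchbits and attaches the ME/MD
 * the peer will RDMA on; the message MD is bound here, but the actual
 * PtlPut() is credit-gated and happens in kptllnd_peer_check_sends(). */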
void
kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx)
{
        /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
        ptl_handle_md_t  rdma_mdh = PTL_INVALID_HANDLE;
        ptl_handle_md_t  msg_mdh = PTL_INVALID_HANDLE;
        ptl_handle_me_t  meh;
        ptl_md_t         md;
        ptl_err_t        prc;
        unsigned long    flags;

        LASSERT (!tx->tx_idle);
        LASSERT (!tx->tx_active);
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
                 tx->tx_type == TX_TYPE_PUT_REQUEST ||
                 tx->tx_type == TX_TYPE_GET_REQUEST);

        kptllnd_set_tx_peer(tx, peer);

        if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
            tx->tx_type == TX_TYPE_GET_REQUEST) {

                spin_lock_irqsave(&peer->peer_lock, flags);

                /* Assume 64-bit matchbits can't wrap */
                LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
                tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
                        peer->peer_next_matchbits++;

                spin_unlock_irqrestore(&peer->peer_lock, flags);

                prc = PtlMEAttach(kptllnd_data.kptl_nih,
                                  *kptllnd_tunables.kptl_portal,
                                  peer->peer_ptlid,
                                  tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
                                  0,             /* ignore bits */
                                  PTL_UNLINK,
                                  PTL_INS_BEFORE,
                                  &meh);
                if (prc != PTL_OK) {
                        CERROR("PtlMEAttach(%s) failed: %d\n",
                               libcfs_id2str(peer->peer_id), prc);
                        goto failed;
                }

                prc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK, &rdma_mdh);
                if (prc != PTL_OK) {
                        CERROR("PtlMDAttach(%s) failed: %d\n",
                               libcfs_id2str(tx->tx_peer->peer_id), prc);
                        prc = PtlMEUnlink(meh);
                        LASSERT(prc == PTL_OK);
                        rdma_mdh = PTL_INVALID_HANDLE;
                        goto failed;
                }

                /* I'm not racing with the event callback here.  It's a bug if
                 * there's an event on the MD I just attached before I actually
                 * send the RDMA request message which the event callback
                 * catches by asserting 'rdma_mdh' is valid. */
        }

        memset(&md, 0, sizeof(md));

        md.start = tx->tx_msg;
        md.length = tx->tx_msg->ptlm_nob;
        md.threshold = 1;
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE;
        md.user_ptr = &tx->tx_msg_eventarg;
        md.eq_handle = kptllnd_data.kptl_eqh;

        prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
        if (prc != PTL_OK) {
                msg_mdh = PTL_INVALID_HANDLE;
                goto failed;
        }

        spin_lock_irqsave(&peer->peer_lock, flags);

        tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
        tx->tx_active = 1;
        tx->tx_rdma_mdh = rdma_mdh;
        tx->tx_msg_mdh = msg_mdh;

        /* Ensure HELLO is sent first */
        if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
                list_add(&tx->tx_list, &peer->peer_sendq);
        else
                list_add_tail(&tx->tx_list, &peer->peer_sendq);

        spin_unlock_irqrestore(&peer->peer_lock, flags);
        return;

 failed:
        spin_lock_irqsave(&peer->peer_lock, flags);

        tx->tx_status = -EIO;
        tx->tx_rdma_mdh = rdma_mdh;
        tx->tx_msg_mdh = msg_mdh;

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        kptllnd_tx_decref(tx);
}

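/* Drain 'peer's send queue as far as credit flow-control allows: post a
 * NOOP if enough credits are owed and nothing else is queued, always
 * send HELLO first, never spend the last credit unless credits are being
 * returned, and discard NOOPs that have become redundant. */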
void
kptllnd_peer_check_sends (kptl_peer_t *peer)
{
        kptl_tx_t       *tx;
        int              rc;
        unsigned long    flags;

        LASSERT(!in_interrupt());

        spin_lock_irqsave(&peer->peer_lock, flags);

        if (list_empty(&peer->peer_sendq) &&
            peer->peer_outstanding_credits >= PTLLND_CREDIT_HIGHWATER &&
            peer->peer_credits != 0) {

                /* post a NOOP to return credits */
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
                if (tx == NULL) {
                        CERROR("Can't return credits to %s: can't allocate descriptor\n",
                               libcfs_id2str(peer->peer_id));
                } else {
                        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
                        kptllnd_post_tx(peer, tx);
                }

                spin_lock_irqsave(&peer->peer_lock, flags);
        }

        while (!list_empty(&peer->peer_sendq)) {
                tx = list_entry (peer->peer_sendq.next, kptl_tx_t, tx_list);

                LASSERT (tx->tx_active);
                LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
                LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
                         !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));

                LASSERT (peer->peer_outstanding_credits >= 0);
                LASSERT (peer->peer_outstanding_credits <=
                         *kptllnd_tunables.kptl_peercredits);
                LASSERT (peer->peer_credits >= 0);
                LASSERT (peer->peer_credits <=
                         *kptllnd_tunables.kptl_peercredits);

                /* Ensure HELLO is sent first */
                if (!peer->peer_sent_hello) {
                        if (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_HELLO)
                                break;
                        peer->peer_sent_hello = 1;
                }

                if (peer->peer_credits == 0) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d]: no credits for %p\n",
                               libcfs_id2str(peer->peer_id),
                               peer->peer_credits, peer->peer_outstanding_credits, tx);
                        break;
                }

                /* Don't use the last credit unless I've got credits to
                 * return */
                if (peer->peer_credits == 1 &&
                    peer->peer_outstanding_credits == 0) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d]: not using last credit for %p\n",
                               libcfs_id2str(peer->peer_id),
                               peer->peer_credits, peer->peer_outstanding_credits, tx);
                        break;
                }

                list_del(&tx->tx_list);

                /* Discard any NOOP I queued if I'm not at the high-water mark
                 * any more or more messages have been queued */
                if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
                    (!list_empty(&peer->peer_sendq) ||
                     peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {

                        tx->tx_active = 0;

                        spin_unlock_irqrestore(&peer->peer_lock, flags);

                        CDEBUG(D_NET, "%s: redundant noop\n",
                               libcfs_id2str(peer->peer_id));
                        kptllnd_tx_decref(tx);

                        spin_lock_irqsave(&peer->peer_lock, flags);
                        continue;
                }

                /* fill last-minute msg header fields */
                kptllnd_msg_pack(tx->tx_msg, peer);

                peer->peer_outstanding_credits = 0;
                peer->peer_credits--;

                CDEBUG(D_NETTRACE, "%s[%d/%d]: %s tx=%p nob=%d cred=%d\n",
                       libcfs_id2str(peer->peer_id),
                       peer->peer_credits, peer->peer_outstanding_credits,
                       kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
                       tx, tx->tx_msg->ptlm_nob,
                       tx->tx_msg->ptlm_credits);

                list_add_tail(&tx->tx_list, &peer->peer_activeq);

                kptllnd_tx_addref(tx);          /* 1 ref for me... */

                spin_unlock_irqrestore(&peer->peer_lock, flags);

                rc = PtlPut (tx->tx_msg_mdh,
                             PTL_NOACK_REQ,
                             peer->peer_ptlid,
                             *kptllnd_tunables.kptl_portal,
                             0,                 /* acl cookie */
                             LNET_MSG_MATCHBITS,
                             0,                 /* offset */
                             0);                /* header data */
                if (rc != PTL_OK) {
                        CERROR("PtlPut %s error %d\n",
                               libcfs_id2str(peer->peer_id), rc);

                        /* Nuke everything (including this tx) */
                        kptllnd_peer_close(peer, -EIO);
                        return;
                }

                kptllnd_tx_decref(tx);          /* drop my ref */

                spin_lock_irqsave(&peer->peer_lock, flags);
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);
}

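/* Return the first tx on 'peer's send or active queue that has passed
 * its deadline, with a ref for the caller, or NULL if none has. */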
kptl_tx_t *
kptllnd_find_timed_out_tx(kptl_peer_t *peer)
{
        kptl_tx_t         *tx;
        struct list_head  *tmp;
        unsigned long      flags;

        spin_lock_irqsave(&peer->peer_lock, flags);

        list_for_each(tmp, &peer->peer_sendq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);

                if (time_after_eq(jiffies, tx->tx_deadline)) {
                        kptllnd_tx_addref(tx);
                        spin_unlock_irqrestore(&peer->peer_lock, flags);
                        return tx;
                }
        }

        list_for_each(tmp, &peer->peer_activeq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);

                if (time_after_eq(jiffies, tx->tx_deadline)) {
                        kptllnd_tx_addref(tx);
                        spin_unlock_irqrestore(&peer->peer_lock, flags);
                        return tx;
                }
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);
        return NULL;
}

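/* Watchdog scan of hash bucket 'idx': give each peer a chance to send
 * (e.g. a credit-returning NOOP that couldn't be allocated last time)
 * and close any peer with a timed-out tx, restarting the scan each time
 * the lock is dropped. */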
void
kptllnd_peer_check_bucket (int idx)
{
        struct list_head  *peers = &kptllnd_data.kptl_peers[idx];
        struct list_head  *ptmp;
        kptl_peer_t       *peer;
        kptl_tx_t         *tx;
        unsigned long      flags;
        int                nsend;
        int                nactive;

        CDEBUG(D_NET, "Bucket=%d\n", idx);

 again:
        /* NB. Shared lock while I just look */
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kptl_peer_t, peer_list);

                CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d\n",
                       libcfs_id2str(peer->peer_id),
                       peer->peer_credits, peer->peer_outstanding_credits);

                /* In case we have enough credits to return via a
                 * NOOP, but there were no non-blocking tx descs
                 * free to do it last time... */
                kptllnd_peer_check_sends(peer);

                tx = kptllnd_find_timed_out_tx(peer);
                if (tx == NULL)
                        continue;

                kptllnd_peer_addref(peer); /* 1 ref for me... */

                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                       flags);

                spin_lock_irqsave(&peer->peer_lock, flags);
                nsend = kptllnd_count_queue(&peer->peer_sendq);
                nactive = kptllnd_count_queue(&peer->peer_activeq);
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                LCONSOLE_ERROR("Timing out %s: please check Portals\n",
                               libcfs_id2str(peer->peer_id));

                CERROR("%s timed out: cred %d outstanding %d sendq %d "
                       "activeq %d Tx %s (%s%s%s) status %d T/O %ds\n",
                       libcfs_id2str(peer->peer_id),
                       peer->peer_credits, peer->peer_outstanding_credits,
                       nsend, nactive, kptllnd_tx_typestr(tx->tx_type),
                       tx->tx_active ? "A" : "",
                       PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
                       "" : "M",
                       PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
                       "" : "D",
                       tx->tx_status, *kptllnd_tunables.kptl_timeout);

                kptllnd_dump_ptltrace();

                kptllnd_tx_decref(tx);

                kptllnd_peer_close(peer, -ETIMEDOUT);
                kptllnd_peer_decref(peer); /* ...until here */

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}

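/* Look up the peer with LNet id 'id', taking a ref for the caller.
 * The caller must hold kptl_peer_rw_lock (read or write). */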
kptl_peer_t *
kptllnd_id2peer_locked (lnet_process_id_t id)
{
        struct list_head *peers = kptllnd_nid2peerlist(id.nid);
        struct list_head *tmp;
        kptl_peer_t      *peer;

        list_for_each (tmp, peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
                        peer->peer_state == PEER_STATE_ACTIVE);

                if (peer->peer_id.nid != id.nid ||
                    peer->peer_id.pid != id.pid)
                        continue;

                kptllnd_peer_addref(peer);

                CDEBUG(D_NET, "%s -> %s (%d)\n",
                       libcfs_id2str(id),
                       kptllnd_ptlid2str(peer->peer_ptlid),
                       atomic_read (&peer->peer_refcount));
                return peer;
        }

        return NULL;
}

void
kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
{
        LCONSOLE_ERROR("%s %s overflows the peer table[%d]: "
                       "messages may be dropped\n",
                       str, libcfs_id2str(id),
                       kptllnd_data.kptl_n_active_peers);
        LCONSOLE_ERROR("Please correct by increasing "
                       "'max_nodes' or 'max_procs_per_node'\n");
}

__u64
kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
{
        kptl_peer_t            *peer;
        struct list_head       *tmp;

        /* Find the last matchbits I saw this new peer using.  Note:
           A. This peer cannot be in the peer table - she's new!
           B. If I can't find the peer in the closing/zombie peers, all
              matchbits are safe because all refs to the (old) peer have gone
              so all txs have completed so there's no risk of matchbit
              collision!
         */

        LASSERT(kptllnd_id2peer_locked(lpid) == NULL);

        /* peer's last matchbits can't change after it comes out of the peer
         * table, so first match is fine */

        list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                if (peer->peer_id.nid == lpid.nid &&
                    peer->peer_id.pid == lpid.pid)
                        return peer->peer_last_matchbits_seen;
        }

        list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                if (peer->peer_id.nid == lpid.nid &&
                    peer->peer_id.pid == lpid.pid)
                        return peer->peer_last_matchbits_seen;
        }

        return PTL_RESERVED_MATCHBITS;
}

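/* Handle a HELLO received from 'initiator': validate the sender's
 * matchbits, buffer size and credit parameters, resolve races with an
 * existing or concurrently-created peer (including stale reconnects),
 * and otherwise enter a new ACTIVE peer into the table and post a HELLO
 * response.  Returns the peer with a ref for the caller, or NULL. */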
kptl_peer_t *
kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
                           kptl_msg_t       *msg)
{
        rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        kptl_peer_t        *peer;
        kptl_peer_t        *new_peer;
        lnet_process_id_t   lpid;
        unsigned long       flags;
        kptl_tx_t          *hello_tx;
        int                 rc;
        __u64               safe_matchbits;
        __u64               last_matchbits_seen;

        lpid.nid = msg->ptlm_srcnid;
        lpid.pid = msg->ptlm_srcpid;

        CDEBUG(D_NET, "hello from %s(%s)\n",
               libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));

        if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
            (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
                /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
                 * userspace.  Refuse the connection if she hasn't set the
                 * correct flag in her PID... */
                CERROR("Userflag not set in hello from %s (%s)\n",
                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
                return NULL;
        }

        /* kptlhm_matchbits are the highest matchbits my peer may have used to
         * RDMA to me.  I ensure I never register buffers for RDMA that could
         * match any she used */
        safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;

        if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
                CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
                       safe_matchbits, libcfs_id2str(lpid));
                return NULL;
        }

        if (msg->ptlm_u.hello.kptlhm_max_msg_size !=
            *kptllnd_tunables.kptl_max_msg_size) {
                CERROR("max message size MUST be equal for all peers: "
                       "got %d expected %d from %s\n",
                       msg->ptlm_u.hello.kptlhm_max_msg_size,
                       *kptllnd_tunables.kptl_max_msg_size,
                       libcfs_id2str(lpid));
                return NULL;
        }

        if (msg->ptlm_credits + 1 != *kptllnd_tunables.kptl_peercredits) {
                CERROR("peercredits MUST be equal on all peers: "
                       "got %d expected %d from %s\n",
                       msg->ptlm_credits + 1,
                       *kptllnd_tunables.kptl_peercredits,
                       libcfs_id2str(lpid));
                return NULL;
        }

        write_lock_irqsave(g_lock, flags);

        peer = kptllnd_id2peer_locked(lpid);
        if (peer != NULL) {
                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        /* Completing HELLO handshake */
                        LASSERT(peer->peer_incarnation == 0);

                        if (msg->ptlm_dststamp != 0 &&
                            msg->ptlm_dststamp != peer->peer_myincarnation) {
                                write_unlock_irqrestore(g_lock, flags);

                                CERROR("Ignoring HELLO from %s: unexpected "
                                       "dststamp "LPX64" ("LPX64" wanted)\n",
                                       libcfs_id2str(lpid),
                                       msg->ptlm_dststamp,
                                       peer->peer_myincarnation);
                                kptllnd_peer_decref(peer);
                                return NULL;
                        }

                        /* Concurrent initiation or response to my HELLO */
                        peer->peer_state = PEER_STATE_ACTIVE;
                        peer->peer_incarnation = msg->ptlm_srcstamp;
                        peer->peer_next_matchbits = safe_matchbits;

                        write_unlock_irqrestore(g_lock, flags);
                        return peer;
                }

                if (msg->ptlm_dststamp != 0 &&
                    msg->ptlm_dststamp <= peer->peer_myincarnation) {
                        write_unlock_irqrestore(g_lock, flags);

                        CERROR("Ignoring stale HELLO from %s: "
                               "dststamp "LPX64" (current "LPX64")\n",
                               libcfs_id2str(lpid),
                               msg->ptlm_dststamp,
                               peer->peer_myincarnation);
                        kptllnd_peer_decref(peer);
                        return NULL;
                }

                /* Brand new connection attempt: remove old incarnation */
                kptllnd_peer_close_locked(peer, 0);
        }

        kptllnd_cull_peertable_locked(lpid);

        write_unlock_irqrestore(g_lock, flags);

        if (peer != NULL) {
                CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
                       " stamp "LPX64"("LPX64")\n",
                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
                       msg->ptlm_srcstamp, peer->peer_incarnation);

                kptllnd_peer_decref(peer);
        }

        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate HELLO message for %s\n",
                       libcfs_id2str(lpid));
                return NULL;
        }

        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(lpid, initiator);
        if (new_peer == NULL) {
                kptllnd_tx_decref(hello_tx);
                return NULL;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0) {
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);

                CERROR("Failed to reserve buffers for %s\n",
                       libcfs_id2str(lpid));
                return NULL;
        }

        write_lock_irqsave(g_lock, flags);

        peer = kptllnd_id2peer_locked(lpid);
        if (peer != NULL) {
                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        /* An outgoing message instantiated 'peer' for me and
                         * presumably provoked this reply */
                        CWARN("Outgoing instantiated peer %s\n", libcfs_id2str(lpid));
                        LASSERT(peer->peer_incarnation == 0);

                        peer->peer_state = PEER_STATE_ACTIVE;
                        peer->peer_incarnation = msg->ptlm_srcstamp;
                        peer->peer_next_matchbits = safe_matchbits;
                } else {
                        LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
                        /* WOW!  Somehow this peer completed the HELLO
                         * handshake while I slept.  I guess I could have slept
                         * while it rebooted and sent a new HELLO, so I'll fail
                         * this one... */
                        CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
                        kptllnd_peer_decref(peer);
                        peer = NULL;
                }

                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peer_unreserve_buffers();
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);
                return peer;
        }

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection from ", lpid);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Refusing connection from %s\n",
                               libcfs_id2str(lpid));
                        kptllnd_peer_unreserve_buffers();
                        kptllnd_peer_decref(new_peer);
                        kptllnd_tx_decref(hello_tx);
                        return NULL;
                }

                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);

        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;

        new_peer->peer_state = PEER_STATE_ACTIVE;
        new_peer->peer_incarnation = msg->ptlm_srcstamp;
        new_peer->peer_next_matchbits = safe_matchbits;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;

        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        kptllnd_post_tx(new_peer, hello_tx);
        kptllnd_peer_check_sends(new_peer);

        return new_peer;
}

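/* Send 'tx' to 'target', consuming the caller's ref.  If 'target' is not
 * in the peer table yet, create the peer and post the initial HELLO
 * ahead of 'tx'; new connections are only initiated to kernel peers. */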
void
kptllnd_tx_launch(kptl_tx_t *tx, lnet_process_id_t target)
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        ptl_process_id_t  ptl_id;
        kptl_peer_t      *peer;
        kptl_peer_t      *new_peer = NULL;
        kptl_tx_t        *hello_tx = NULL;
        unsigned long     flags;
        int               rc;
        __u64             last_matchbits_seen;

        LASSERT (tx->tx_lnet_msg != NULL);
        LASSERT (tx->tx_peer == NULL);

        /* I expect to find the peer, so I only take a read lock... */
        read_lock_irqsave(g_lock, flags);
        peer = kptllnd_id2peer_locked(target);
        read_unlock_irqrestore(g_lock, flags);

        if (peer != NULL)
                goto post;

        if ((target.pid & LNET_PID_USERFLAG) != 0) {
                CWARN("Refusing to create a new connection to %s "
                      "(non-kernel peer)\n", libcfs_id2str(target));
                tx->tx_status = -EHOSTUNREACH;
                goto failed;
        }

        /* The new peer is a kernel ptllnd, and kernel ptllnds all have
         * the same portals PID */
        ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
        ptl_id.pid = kptllnd_data.kptl_portals_id.pid;

        write_lock_irqsave(g_lock, flags);

        peer = kptllnd_id2peer_locked(target);
        if (peer != NULL) {
                write_unlock_irqrestore(g_lock, flags);
                goto post;
        }

        kptllnd_cull_peertable_locked(target);

        write_unlock_irqrestore(g_lock, flags);

        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate connect message for %s\n",
                       libcfs_id2str(target));
                tx->tx_status = -ENOMEM;
                goto failed;
        }

        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(target, ptl_id);
        if (new_peer == NULL) {
                tx->tx_status = -ENOMEM;
                goto failed;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0) {
                tx->tx_status = rc;
                goto failed;
        }

        write_lock_irqsave(g_lock, flags);

        peer = kptllnd_id2peer_locked(target);
        if (peer != NULL) {                     /* someone else beat me to it */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peer_unreserve_buffers();
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);
                goto post;
        }

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection to ", target);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Can't create connection to %s\n",
                               libcfs_id2str(target));
                        kptllnd_peer_unreserve_buffers();
                        tx->tx_status = -ENOMEM;
                        goto failed;
                }
                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);

        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;

        new_peer->peer_state = PEER_STATE_WAITING_HELLO;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;

        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        peer = new_peer;
        kptllnd_post_tx(peer, hello_tx);

 post:
        kptllnd_post_tx(peer, tx);
        kptllnd_peer_check_sends(peer);
        kptllnd_peer_decref(peer);
        return;

 failed:
        if (hello_tx != NULL)
                kptllnd_tx_decref(hello_tx);

        if (new_peer != NULL)
                kptllnd_peer_decref(new_peer);

        LASSERT (tx->tx_status != 0);
        kptllnd_tx_decref(tx);
}