/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
 *   Author: PJ Kirner <pjkirner@clusterfs.com>
 *           E Barton <eeb@bartonsoftware.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   This file is confidential source code owned by Cluster File Systems.
 *   No viewing, modification, compilation, redistribution, or any other
 *   form of use is permitted except through a signed license agreement.
 *
 *   If you have not signed such an agreement, then you have no rights to
 *   this file.  Please destroy it immediately and contact CFS.
 *
 */

#include "ptllnd.h"
#include <libcfs/list.h>

static int
kptllnd_count_queue(struct list_head *q)
{
        struct list_head *e;
        int               n = 0;

        list_for_each(e, q) {
                n++;
        }

        return n;
}

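/* Snapshot the state of the index'th peer in the global peer table (ID,
 * state, refcount, matchbits, credits and queue depths) for reporting.
 * Returns 0 on success or -ENOENT if 'index' is beyond the last peer. */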
int
kptllnd_get_peer_info(int index,
                      lnet_process_id_t *id,
                      int *state, int *sent_hello,
                      int *refcount, __u64 *incarnation,
                      __u64 *next_matchbits, __u64 *last_matchbits_seen,
                      int *nsendq, int *nactiveq,
                      int *credits, int *outstanding_credits)
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        unsigned long     flags;
        struct list_head *ptmp;
        kptl_peer_t      *peer;
        int               i;
        int               rc = -ENOENT;

        read_lock_irqsave(g_lock, flags);

        for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {

                list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry(ptmp, kptl_peer_t, peer_list);

                        if (index-- > 0)
                                continue;

                        *id          = peer->peer_id;
                        *state       = peer->peer_state;
                        *sent_hello  = peer->peer_sent_hello;
                        *refcount    = atomic_read(&peer->peer_refcount);
                        *incarnation = peer->peer_incarnation;

                        spin_lock(&peer->peer_lock);

                        *next_matchbits      = peer->peer_next_matchbits;
                        *last_matchbits_seen = peer->peer_last_matchbits_seen;
                        *credits             = peer->peer_credits;
                        *outstanding_credits = peer->peer_outstanding_credits;

                        *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
                        *nactiveq = kptllnd_count_queue(&peer->peer_activeq);

                        spin_unlock(&peer->peer_lock);

                        rc = 0;
                        goto out;
                }
        }

 out:
        read_unlock_irqrestore(g_lock, flags);
        return rc;
}

void
kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
{
        LASSERT (kptllnd_data.kptl_n_active_peers <
                 kptllnd_data.kptl_expected_peers);

        LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
                 peer->peer_state == PEER_STATE_ACTIVE);

        kptllnd_data.kptl_n_active_peers++;
        atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */

        /* NB add to HEAD of peer list for MRU order!
         * (see kptllnd_cull_peertable) */
        list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
}

void
kptllnd_cull_peertable_locked (lnet_process_id_t pid)
{
        /* I'm about to add a new peer with this portals ID to the peer table,
         * so (a) this peer should not exist already and (b) I want to leave at
         * most (max_procs_per_node - 1) peers with this NID in the table. */
        struct list_head  *peers = kptllnd_nid2peerlist(pid.nid);
        int                cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
        int                count;
        struct list_head  *tmp;
        struct list_head  *nxt;
        kptl_peer_t       *peer;

        count = 0;
        list_for_each_safe (tmp, nxt, peers) {
                /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
                 * in MRU order */
                peer = list_entry(tmp, kptl_peer_t, peer_list);

                if (peer->peer_id.nid != pid.nid)
                        continue;

                LASSERT (peer->peer_id.pid != pid.pid);

                count++;

                if (count < cull_count) /* recent (don't cull) */
                        continue;

                CDEBUG(D_NET, "Cull %s(%s)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_ptlid2str(peer->peer_ptlid));

                kptllnd_peer_close_locked(peer, 0);
        }
}

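/* Allocate and initialise a new peer in PEER_STATE_ALLOCATED with one
 * reference for the caller and a single credit reserved for the HELLO
 * exchange.  Returns NULL on allocation failure or if the LND is already
 * shutting down. */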
kptl_peer_t *
kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
{
        unsigned long    flags;
        kptl_peer_t     *peer;

        LIBCFS_ALLOC(peer, sizeof (*peer));
        if (peer == NULL) {
                CERROR("Can't create peer %s (%s)\n",
                       libcfs_id2str(lpid),
                       kptllnd_ptlid2str(ppid));
                return NULL;
        }

        memset(peer, 0, sizeof(*peer));         /* zero flags etc */

        INIT_LIST_HEAD (&peer->peer_sendq);
        INIT_LIST_HEAD (&peer->peer_activeq);
        spin_lock_init (&peer->peer_lock);

        peer->peer_state = PEER_STATE_ALLOCATED;
        peer->peer_error = 0;
        peer->peer_last_alive = cfs_time_current();
        peer->peer_id = lpid;
        peer->peer_ptlid = ppid;
        peer->peer_credits = 1;                 /* enough for HELLO */
        peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
        peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;
        peer->peer_sent_credits = 1;            /* HELLO credit is implicit */
        peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */

        atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        peer->peer_myincarnation = kptllnd_data.kptl_incarnation;

        /* Only increase # peers under lock, to guarantee we don't grow it
         * during shutdown */
        if (kptllnd_data.kptl_shutdown) {
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                        flags);
                LIBCFS_FREE(peer, sizeof(*peer));
                return NULL;
        }

        kptllnd_data.kptl_npeers++;
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        return peer;
}

void
kptllnd_peer_destroy (kptl_peer_t *peer)
{
        unsigned long flags;

        CDEBUG(D_NET, "Peer=%p\n", peer);

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&peer->peer_refcount) == 0);
        LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
                 peer->peer_state == PEER_STATE_ZOMBIE);
        LASSERT (list_empty(&peer->peer_sendq));
        LASSERT (list_empty(&peer->peer_activeq));

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (peer->peer_state == PEER_STATE_ZOMBIE)
                list_del(&peer->peer_list);

        kptllnd_data.kptl_npeers--;

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        LIBCFS_FREE (peer, sizeof (*peer));
}

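/* Move every tx on 'peerq' onto 'txs', marking each one failed (-EIO) and
 * inactive.  The caller completes them once all locks are dropped. */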
void
kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs)
{
        struct list_head  *tmp;
        struct list_head  *nxt;
        kptl_tx_t         *tx;

        list_for_each_safe (tmp, nxt, peerq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);

                list_del(&tx->tx_list);
                list_add_tail(&tx->tx_list, txs);

                tx->tx_status = -EIO;
                tx->tx_active = 0;
        }
}

void
kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs)
{
        unsigned long   flags;

        spin_lock_irqsave(&peer->peer_lock, flags);

        kptllnd_cancel_txlist(&peer->peer_sendq, txs);
        kptllnd_cancel_txlist(&peer->peer_activeq, txs);

        spin_unlock_irqrestore(&peer->peer_lock, flags);
}

void
kptllnd_peer_alive (kptl_peer_t *peer)
{
        /* This is racy, but everyone's only writing cfs_time_current() */
        peer->peer_last_alive = cfs_time_current();
        mb();
}

void
kptllnd_peer_notify (kptl_peer_t *peer)
{
        unsigned long flags;
        time_t        last_alive = 0;
        int           error = 0;

        spin_lock_irqsave(&peer->peer_lock, flags);

        if (peer->peer_error != 0) {
                error = peer->peer_error;
                peer->peer_error = 0;

                last_alive = cfs_time_current_sec() -
                             cfs_duration_sec(cfs_time_current() -
                                              peer->peer_last_alive);
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        if (error != 0)
                lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
                             last_alive);
}

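/* Reap peers that have been closed: cancel txs on all zombie peers, then
 * move each newly closing peer onto the zombie list, notify LNET of its
 * death and drop the peer table's reference.  Finally complete all the
 * cancelled txs outside the peer table lock. */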
void
kptllnd_handle_closing_peers ()
{
        unsigned long           flags;
        struct list_head        txs;
        kptl_peer_t            *peer;
        struct list_head       *tmp;
        struct list_head       *nxt;
        kptl_tx_t              *tx;
        int                     idle;

        /* Check with a read lock first to avoid blocking anyone */

        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        idle = list_empty(&kptllnd_data.kptl_closing_peers) &&
               list_empty(&kptllnd_data.kptl_zombie_peers);
        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (idle)
                return;

        INIT_LIST_HEAD(&txs);

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
         * ref removes it from this list, so I mustn't drop the lock while
         * scanning it. */
        list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);

                kptllnd_peer_cancel_txs(peer, &txs);
        }

        /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
         * I'm the only one removing from this list, but peers can be added on
         * the end any time I drop the lock. */

        list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_CLOSING);

                list_del(&peer->peer_list);
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_zombie_peers);
                peer->peer_state = PEER_STATE_ZOMBIE;

                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                kptllnd_peer_notify(peer);
                kptllnd_peer_cancel_txs(peer, &txs);
                kptllnd_peer_decref(peer);

                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        }

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Drop peer's ref on all cancelled txs.  This will get
         * kptllnd_tx_fini() to abort outstanding comms if necessary. */

        list_for_each_safe (tmp, nxt, &txs) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                kptllnd_tx_decref(tx);
        }
}

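/* Transition a peer out of the peer table.  The first close stashes 'why'
 * in peer_error, bumps my incarnation so reconnecting peers see a new
 * instance of me, and queues the peer for the watchdog; later closes are
 * no-ops.  Caller must hold kptl_peer_rw_lock for writing. */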
void
kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
{
        switch (peer->peer_state) {
        default:
                LBUG();

        case PEER_STATE_WAITING_HELLO:
        case PEER_STATE_ACTIVE:
                /* Ensure new peers see a new incarnation of me */
                LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
                if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
                        kptllnd_data.kptl_incarnation++;

                /* Removing from peer table */
                kptllnd_data.kptl_n_active_peers--;
                LASSERT (kptllnd_data.kptl_n_active_peers >= 0);

                list_del(&peer->peer_list);
                kptllnd_peer_unreserve_buffers();

                peer->peer_error = why; /* stash 'why' only on first close */
                peer->peer_state = PEER_STATE_CLOSING;

                /* Schedule for immediate attention, taking peer table's ref */
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_closing_peers);
                wake_up(&kptllnd_data.kptl_watchdog_waitq);
                break;

        case PEER_STATE_ZOMBIE:
        case PEER_STATE_CLOSING:
                break;
        }
}

void
kptllnd_peer_close(kptl_peer_t *peer, int why)
{
        unsigned long      flags;

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        kptllnd_peer_close_locked(peer, why);
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}

int
kptllnd_peer_del(lnet_process_id_t id)
{
        struct list_head  *ptmp;
        struct list_head  *pnxt;
        kptl_peer_t       *peer;
        int                lo;
        int                hi;
        int                i;
        unsigned long      flags;
        int                rc = -ENOENT;

        /*
         * Find the single bucket we are supposed to look at or if nid is a
         * wildcard (LNET_NID_ANY) then look at all of the buckets
         */
        if (id.nid != LNET_NID_ANY) {
                struct list_head *l = kptllnd_nid2peerlist(id.nid);

                lo = hi = l - kptllnd_data.kptl_peers;
        } else {
                if (id.pid != LNET_PID_ANY)
                        return -EINVAL;

                lo = 0;
                hi = kptllnd_data.kptl_peer_hash_size - 1;
        }

again:
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry (ptmp, kptl_peer_t, peer_list);

                        if (!(id.nid == LNET_NID_ANY ||
                              (peer->peer_id.nid == id.nid &&
                               (id.pid == LNET_PID_ANY ||
                                peer->peer_id.pid == id.pid))))
                                continue;

                        kptllnd_peer_addref(peer); /* 1 ref for me... */

                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                               flags);

                        kptllnd_peer_close(peer, 0);
                        kptllnd_peer_decref(peer); /* ...until here */

                        rc = 0;         /* matched something */

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        return (rc);
}

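/* Bind an MD for the message payload and queue 'tx' on the peer's send
 * queue (HELLOs jump to the front so they always go out first).  The tx is
 * not put on the wire here; kptllnd_peer_check_sends() does that when the
 * peer has credits. */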
void
kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
        /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
        ptl_handle_md_t  msg_mdh;
        ptl_md_t         md;
        ptl_err_t        prc;
        unsigned long    flags;

        LASSERT (!tx->tx_idle);
        LASSERT (!tx->tx_active);
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
                 tx->tx_type == TX_TYPE_PUT_REQUEST ||
                 tx->tx_type == TX_TYPE_GET_REQUEST);

        kptllnd_set_tx_peer(tx, peer);

        memset(&md, 0, sizeof(md));

        md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE;
        md.user_ptr = &tx->tx_msg_eventarg;
        md.eq_handle = kptllnd_data.kptl_eqh;

        if (nfrag == 0) {
                md.start = tx->tx_msg;
                md.length = tx->tx_msg->ptlm_nob;
        } else {
                LASSERT (nfrag > 1);
                LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);

                md.start = tx->tx_frags;
                md.length = nfrag;
                md.options |= PTL_MD_IOVEC;
        }

        prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
        if (prc != PTL_OK) {
                CERROR("PtlMDBind(%s) failed: %s(%d)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_errtype2str(prc), prc);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);
                return;
        }

        spin_lock_irqsave(&peer->peer_lock, flags);

        tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
        tx->tx_active = 1;
        tx->tx_msg_mdh = msg_mdh;

        /* Ensure HELLO is sent first */
        if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
                list_add(&tx->tx_list, &peer->peer_sendq);
        else
                list_add_tail(&tx->tx_list, &peer->peer_sendq);

        spin_unlock_irqrestore(&peer->peer_lock, flags);
}

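/* Drain the peer's send queue, observing the credit protocol: a NOOP is
 * queued when enough credits are owed (PTLLND_CREDIT_HIGHWATER) and the
 * peer can accept it, HELLO must be the first message sent, one credit is
 * consumed per send, and the last credit is never used unless credits are
 * being returned with it.  PUT/GET requests also get their bulk ME/MD
 * attached here, once the matchbits are known good. */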
void
kptllnd_peer_check_sends (kptl_peer_t *peer)
{
        ptl_handle_me_t  meh;
        kptl_tx_t       *tx;
        int              rc;
        unsigned long    flags;

        LASSERT(!in_interrupt());

        spin_lock_irqsave(&peer->peer_lock, flags);

        peer->peer_retry_noop = 0;

        if (list_empty(&peer->peer_sendq) &&
            peer->peer_outstanding_credits >= PTLLND_CREDIT_HIGHWATER &&
            peer->peer_credits != 0) {

                /* post a NOOP to return credits */
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
                if (tx == NULL) {
                        CERROR("Can't return credits to %s: can't allocate descriptor\n",
                               libcfs_id2str(peer->peer_id));
                } else {
                        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
                        kptllnd_post_tx(peer, tx, 0);
                }

                spin_lock_irqsave(&peer->peer_lock, flags);
                peer->peer_retry_noop = (tx == NULL);
        }

        while (!list_empty(&peer->peer_sendq)) {
                tx = list_entry (peer->peer_sendq.next, kptl_tx_t, tx_list);

                LASSERT (tx->tx_active);
                LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
                LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));

                LASSERT (peer->peer_outstanding_credits >= 0);
                LASSERT (peer->peer_sent_credits >= 0);
                LASSERT (peer->peer_sent_credits +
                         peer->peer_outstanding_credits <=
                         *kptllnd_tunables.kptl_peercredits);
                LASSERT (peer->peer_credits >= 0);

                /* Ensure HELLO is sent first */
                if (!peer->peer_sent_hello) {
                        if (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_HELLO)
                                break;
                        peer->peer_sent_hello = 1;
                }

                if (peer->peer_credits == 0) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %p\n",
                               libcfs_id2str(peer->peer_id),
                               peer->peer_credits,
                               peer->peer_outstanding_credits,
                               peer->peer_sent_credits, tx);
                        break;
                }

                /* Don't use the last credit unless I've got credits to
                 * return */
                if (peer->peer_credits == 1 &&
                    peer->peer_outstanding_credits == 0) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
                               "not using last credit for %p\n",
                               libcfs_id2str(peer->peer_id),
                               peer->peer_credits,
                               peer->peer_outstanding_credits,
                               peer->peer_sent_credits, tx);
                        break;
                }

                list_del(&tx->tx_list);

                /* Discard any NOOP I queued if I'm not at the high-water mark
                 * any more or more messages have been queued */
                if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
                    (!list_empty(&peer->peer_sendq) ||
                     peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {

                        tx->tx_active = 0;

                        spin_unlock_irqrestore(&peer->peer_lock, flags);

                        CDEBUG(D_NET, "%s: redundant noop\n",
                               libcfs_id2str(peer->peer_id));
                        kptllnd_tx_decref(tx);

                        spin_lock_irqsave(&peer->peer_lock, flags);
                        continue;
                }

                /* fill last-minute msg fields */
                kptllnd_msg_pack(tx->tx_msg, peer);

                if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
                    tx->tx_type == TX_TYPE_GET_REQUEST) {
                        /* peer_next_matchbits must be known good */
                        LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
                        /* Assume 64-bit matchbits can't wrap */
                        LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
                        tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
                                peer->peer_next_matchbits++;
                }

                peer->peer_sent_credits += peer->peer_outstanding_credits;
                peer->peer_outstanding_credits = 0;
                peer->peer_credits--;

                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
                       tx, tx->tx_msg->ptlm_nob,
                       tx->tx_msg->ptlm_credits);

                list_add_tail(&tx->tx_list, &peer->peer_activeq);

                kptllnd_tx_addref(tx);          /* 1 ref for me... */

                spin_unlock_irqrestore(&peer->peer_lock, flags);

                if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
                    tx->tx_type == TX_TYPE_GET_REQUEST) {
                        /* Post bulk now we have safe matchbits */
                        rc = PtlMEAttach(kptllnd_data.kptl_nih,
                                         *kptllnd_tunables.kptl_portal,
                                         peer->peer_ptlid,
                                         tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
                                         0,             /* ignore bits */
                                         PTL_UNLINK,
                                         PTL_INS_BEFORE,
                                         &meh);
                        if (rc != PTL_OK) {
                                CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
                                       libcfs_id2str(peer->peer_id),
                                       kptllnd_errtype2str(rc), rc);
                                goto failed;
                        }

                        rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
                                         &tx->tx_rdma_mdh);
                        if (rc != PTL_OK) {
                                CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
                                       libcfs_id2str(tx->tx_peer->peer_id),
                                       kptllnd_errtype2str(rc), rc);
                                rc = PtlMEUnlink(meh);
                                LASSERT(rc == PTL_OK);
                                tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
                                goto failed;
                        }
                        /* I'm not racing with the event callback here.  It's a
                         * bug if there's an event on the MD I just attached
                         * before I actually send the RDMA request message -
                         * probably matchbits re-used in error. */
                }

                tx->tx_tposted = jiffies;       /* going on the wire */

                rc = PtlPut (tx->tx_msg_mdh,
                             tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
                             peer->peer_ptlid,
                             *kptllnd_tunables.kptl_portal,
                             0,                 /* acl cookie */
                             LNET_MSG_MATCHBITS,
                             0,                 /* offset */
                             0);                /* header data */
                if (rc != PTL_OK) {
                        CERROR("PtlPut %s error %s(%d)\n",
                               libcfs_id2str(peer->peer_id),
                               kptllnd_errtype2str(rc), rc);
                        goto failed;
                }

                kptllnd_tx_decref(tx);          /* drop my ref */

                spin_lock_irqsave(&peer->peer_lock, flags);
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);
        return;

 failed:
        /* Nuke everything (including tx we were trying) */
        kptllnd_peer_close(peer, -EIO);
        kptllnd_tx_decref(tx);
}

kptl_tx_t *
kptllnd_find_timed_out_tx(kptl_peer_t *peer)
{
        kptl_tx_t         *tx;
        struct list_head  *tmp;

        list_for_each(tmp, &peer->peer_sendq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);

                if (time_after_eq(jiffies, tx->tx_deadline)) {
                        kptllnd_tx_addref(tx);
                        return tx;
                }
        }

        list_for_each(tmp, &peer->peer_activeq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);

                if (time_after_eq(jiffies, tx->tx_deadline)) {
                        kptllnd_tx_addref(tx);
                        return tx;
                }
        }

        return NULL;
}

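/* Watchdog scan of one peer hash bucket: skip peers already checked in this
 * pass (stamp), retry any NOOP that previously failed to allocate, and close
 * (-ETIMEDOUT) any peer with a tx that has passed its deadline, dumping
 * enough state to tell whether the peer ran out of buffers or Portals has
 * stalled. */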
void
kptllnd_peer_check_bucket (int idx, int stamp)
{
        struct list_head  *peers = &kptllnd_data.kptl_peers[idx];
        struct list_head  *ptmp;
        kptl_peer_t       *peer;
        kptl_tx_t         *tx;
        unsigned long      flags;
        int                nsend;
        int                nactive;
        int                check_sends;

        CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);

 again:
        /* NB. Shared lock while I just look */
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kptl_peer_t, peer_list);

                CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits);

                spin_lock(&peer->peer_lock);

                if (peer->peer_check_stamp == stamp) {
                        /* checked already this pass */
                        spin_unlock(&peer->peer_lock);
                        continue;
                }

                peer->peer_check_stamp = stamp;
                tx = kptllnd_find_timed_out_tx(peer);
                check_sends = peer->peer_retry_noop;

                spin_unlock(&peer->peer_lock);

                if (tx == NULL && !check_sends)
                        continue;

                kptllnd_peer_addref(peer); /* 1 ref for me... */

                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                if (tx == NULL) { /* nothing timed out */
                        kptllnd_peer_check_sends(peer);
                        kptllnd_peer_decref(peer); /* ...until here or... */

                        /* rescan after dropping the lock */
                        goto again;
                }

                spin_lock_irqsave(&peer->peer_lock, flags);
                nsend = kptllnd_count_queue(&peer->peer_sendq);
                nactive = kptllnd_count_queue(&peer->peer_activeq);
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
                                   libcfs_id2str(peer->peer_id),
                                   (tx->tx_tposted == 0) ?
                                   "no free peer buffers" :
                                   "please check Portals");

                CERROR("%s timed out: cred %d outstanding %d, sent %d, "
                       "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d "
                       "%sposted %lu T/O %ds\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type),
                       tx->tx_active ? "A" : "",
                       PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
                       "" : "M",
                       PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
                       "" : "D",
                       tx->tx_status,
                       (tx->tx_tposted == 0) ? "not " : "",
                       (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
                       *kptllnd_tunables.kptl_timeout);

                kptllnd_dump_ptltrace();

                kptllnd_tx_decref(tx);

                kptllnd_peer_close(peer, -ETIMEDOUT);
                kptllnd_peer_decref(peer); /* ...until here */

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}

kptl_peer_t *
kptllnd_id2peer_locked (lnet_process_id_t id)
{
        struct list_head *peers = kptllnd_nid2peerlist(id.nid);
        struct list_head *tmp;
        kptl_peer_t      *peer;

        list_for_each (tmp, peers) {

                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
                        peer->peer_state == PEER_STATE_ACTIVE);

                if (peer->peer_id.nid != id.nid ||
                    peer->peer_id.pid != id.pid)
                        continue;

                kptllnd_peer_addref(peer);

                CDEBUG(D_NET, "%s -> %s (%d)\n",
                       libcfs_id2str(id),
                       kptllnd_ptlid2str(peer->peer_ptlid),
                       atomic_read (&peer->peer_refcount));
                return peer;
        }

        return NULL;
}

void
kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
{
        LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
                           "messages may be dropped\n",
                           str, libcfs_id2str(id),
                           kptllnd_data.kptl_n_active_peers);
        LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
                           "'max_nodes' or 'max_procs_per_node'\n");
}

__u64
kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
{
        kptl_peer_t            *peer;
        struct list_head       *tmp;

        /* Find the last matchbits I saw this new peer using.  Note..
           A. This peer cannot be in the peer table - she's new!
           B. If I can't find the peer in the closing/zombie peers, all
              matchbits are safe because all refs to the (old) peer have gone
              so all txs have completed so there's no risk of matchbit
              collision!
         */

        LASSERT(kptllnd_id2peer_locked(lpid) == NULL);

        /* peer's last matchbits can't change after it comes out of the peer
         * table, so first match is fine */

        list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                if (peer->peer_id.nid == lpid.nid &&
                    peer->peer_id.pid == lpid.pid)
                        return peer->peer_last_matchbits_seen;
        }

        list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                if (peer->peer_id.nid == lpid.nid &&
                    peer->peer_id.pid == lpid.pid)
                        return peer->peer_last_matchbits_seen;
        }

        return PTL_RESERVED_MATCHBITS;
}

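/* Handle an incoming HELLO.  Validate the peer's advertised matchbits,
 * buffer size and credits, complete the handshake on an existing peer that
 * is waiting for it, close any stale incarnation, and otherwise create a
 * new ACTIVE peer and post the HELLO response.  Returns a referenced peer
 * on success or NULL if the connection is refused. */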
kptl_peer_t *
kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
                           kptl_msg_t       *msg)
{
        rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        kptl_peer_t        *peer;
        kptl_peer_t        *new_peer;
        lnet_process_id_t   lpid;
        unsigned long       flags;
        kptl_tx_t          *hello_tx;
        int                 rc;
        __u64               safe_matchbits;
        __u64               last_matchbits_seen;

        lpid.nid = msg->ptlm_srcnid;
        lpid.pid = msg->ptlm_srcpid;

        CDEBUG(D_NET, "hello from %s(%s)\n",
               libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));

        if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
            (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
                /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
                 * userspace.  Refuse the connection if she hasn't set the
                 * correct flag in her PID... */
                CERROR("Userflag not set in hello from %s (%s)\n",
                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
                return NULL;
        }

        /* kptlhm_matchbits are the highest matchbits my peer may have used to
         * RDMA to me.  I ensure I never register buffers for RDMA that could
         * match any she used */
        safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;

        if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
                CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
                       safe_matchbits, libcfs_id2str(lpid));
                return NULL;
        }

        if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
                CERROR("%s: max message size %d < MIN %d\n",
                       libcfs_id2str(lpid),
                       msg->ptlm_u.hello.kptlhm_max_msg_size,
                       PTLLND_MIN_BUFFER_SIZE);
                return NULL;
        }

        if (msg->ptlm_credits <= 1) {
                CERROR("Need more than 1+%d credits from %s\n",
                       msg->ptlm_credits, libcfs_id2str(lpid));
                return NULL;
        }

        write_lock_irqsave(g_lock, flags);

        peer = kptllnd_id2peer_locked(lpid);
        if (peer != NULL) {
                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        /* Completing HELLO handshake */
                        LASSERT(peer->peer_incarnation == 0);

                        if (msg->ptlm_dststamp != 0 &&
                            msg->ptlm_dststamp != peer->peer_myincarnation) {
                                write_unlock_irqrestore(g_lock, flags);

                                CERROR("Ignoring HELLO from %s: unexpected "
                                       "dststamp "LPX64" ("LPX64" wanted)\n",
                                       libcfs_id2str(lpid),
                                       msg->ptlm_dststamp,
                                       peer->peer_myincarnation);
                                kptllnd_peer_decref(peer);
                                return NULL;
                        }

                        /* Concurrent initiation or response to my HELLO */
                        peer->peer_state = PEER_STATE_ACTIVE;
                        peer->peer_incarnation = msg->ptlm_srcstamp;
                        peer->peer_next_matchbits = safe_matchbits;
                        peer->peer_max_msg_size =
                                msg->ptlm_u.hello.kptlhm_max_msg_size;

                        write_unlock_irqrestore(g_lock, flags);
                        return peer;
                }

                if (msg->ptlm_dststamp != 0 &&
                    msg->ptlm_dststamp <= peer->peer_myincarnation) {
                        write_unlock_irqrestore(g_lock, flags);

                        CERROR("Ignoring stale HELLO from %s: "
                               "dststamp "LPX64" (current "LPX64")\n",
                               libcfs_id2str(lpid),
                               msg->ptlm_dststamp,
                               peer->peer_myincarnation);
                        kptllnd_peer_decref(peer);
                        return NULL;
                }

                /* Brand new connection attempt: remove old incarnation */
                kptllnd_peer_close_locked(peer, 0);
        }

        kptllnd_cull_peertable_locked(lpid);

        write_unlock_irqrestore(g_lock, flags);

        if (peer != NULL) {
                CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
                       " stamp "LPX64"("LPX64")\n",
                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
                       msg->ptlm_srcstamp, peer->peer_incarnation);

                kptllnd_peer_decref(peer);
        }

        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate HELLO message for %s\n",
                       libcfs_id2str(lpid));
                return NULL;
        }

        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(lpid, initiator);
        if (new_peer == NULL) {
                kptllnd_tx_decref(hello_tx);
                return NULL;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0) {
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);

                CERROR("Failed to reserve buffers for %s\n",
                       libcfs_id2str(lpid));
                return NULL;
        }

        write_lock_irqsave(g_lock, flags);
 again:
        peer = kptllnd_id2peer_locked(lpid);
        if (peer != NULL) {
                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        /* An outgoing message instantiated 'peer' for me */
                        LASSERT(peer->peer_incarnation == 0);

                        peer->peer_state = PEER_STATE_ACTIVE;
                        peer->peer_incarnation = msg->ptlm_srcstamp;
                        peer->peer_next_matchbits = safe_matchbits;
                        peer->peer_max_msg_size =
                                msg->ptlm_u.hello.kptlhm_max_msg_size;

                        write_unlock_irqrestore(g_lock, flags);

                        CWARN("Outgoing instantiated peer %s\n",
                              libcfs_id2str(lpid));
                } else {
                        LASSERT (peer->peer_state == PEER_STATE_ACTIVE);

                        write_unlock_irqrestore(g_lock, flags);

                        /* WOW!  Somehow this peer completed the HELLO
                         * handshake while I slept.  I guess I could have slept
                         * while it rebooted and sent a new HELLO, so I'll fail
                         * this one... */
                        CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
                        kptllnd_peer_decref(peer);
                        peer = NULL;
                }

                kptllnd_peer_unreserve_buffers();
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);
                return peer;
        }

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection from ", lpid);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Refusing connection from %s\n",
                               libcfs_id2str(lpid));
                        kptllnd_peer_unreserve_buffers();
                        kptllnd_peer_decref(new_peer);
                        kptllnd_tx_decref(hello_tx);
                        return NULL;
                }

                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
                goto again;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);

        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;

        new_peer->peer_state = PEER_STATE_ACTIVE;
        new_peer->peer_incarnation = msg->ptlm_srcstamp;
        new_peer->peer_next_matchbits = safe_matchbits;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;
        new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;

        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        kptllnd_post_tx(new_peer, hello_tx, 0);
        kptllnd_peer_check_sends(new_peer);

        return new_peer;
}

void
kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
        kptllnd_post_tx(peer, tx, nfrag);
        kptllnd_peer_check_sends(peer);
}

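/* Look up the peer for 'target', creating it (and initiating the HELLO
 * handshake) if it doesn't exist yet.  Only kernel peers may be created
 * this way; userspace peers must connect to us first.  On success *peerp
 * holds a reference the caller must drop. */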
int
kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        ptl_process_id_t  ptl_id;
        kptl_peer_t      *new_peer;
        kptl_tx_t        *hello_tx;
        unsigned long     flags;
        int               rc;
        __u64             last_matchbits_seen;

        /* I expect to find the peer, so I only take a read lock... */
        read_lock_irqsave(g_lock, flags);
        *peerp = kptllnd_id2peer_locked(target);
        read_unlock_irqrestore(g_lock, flags);

        if (*peerp != NULL)
                return 0;

        if ((target.pid & LNET_PID_USERFLAG) != 0) {
                CWARN("Refusing to create a new connection to %s "
                      "(non-kernel peer)\n", libcfs_id2str(target));
                return -EHOSTUNREACH;
        }

        /* The new peer is a kernel ptllnd, and kernel ptllnds all have
         * the same portals PID */
        ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
        ptl_id.pid = kptllnd_data.kptl_portals_id.pid;

        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate connect message for %s\n",
                       libcfs_id2str(target));
                return -ENOMEM;
        }

        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(target, ptl_id);
        if (new_peer == NULL) {
                rc = -ENOMEM;
                goto unwind_0;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0)
                goto unwind_1;

        write_lock_irqsave(g_lock, flags);
 again:
        *peerp = kptllnd_id2peer_locked(target);
        if (*peerp != NULL) {
                write_unlock_irqrestore(g_lock, flags);
                goto unwind_2;
        }

        kptllnd_cull_peertable_locked(target);

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection to ", target);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Can't create connection to %s\n",
                               libcfs_id2str(target));
                        rc = -ENOMEM;
                        goto unwind_2;
                }
                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
                goto again;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);

        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;

        new_peer->peer_state = PEER_STATE_WAITING_HELLO;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;

        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        kptllnd_post_tx(new_peer, hello_tx, 0);
        kptllnd_peer_check_sends(new_peer);

        *peerp = new_peer;
        return 0;

 unwind_2:
        kptllnd_peer_unreserve_buffers();
 unwind_1:
        kptllnd_peer_decref(new_peer);
 unwind_0:
        kptllnd_tx_decref(hello_tx);

        return rc;
}