/*
 * Source: lnet/klnds/ptllnd/ptllnd_peer.c
 * (Whamcloud lustre-release gitweb, commit fc52df75e6db22a0f0ae893f54a279e2be153394)
 */
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: PJ Kirner <pjkirner@clusterfs.com>
6  *           E Barton <eeb@bartonsoftware.com>
7  *
8  *   This file is part of the Lustre file system, http://www.lustre.org
9  *   Lustre is a trademark of Cluster File Systems, Inc.
10  *
11  *   This file is confidential source code owned by Cluster File Systems.
12  *   No viewing, modification, compilation, redistribution, or any other
13  *   form of use is permitted except through a signed license agreement.
14  *
15  *   If you have not signed such an agreement, then you have no rights to
16  *   this file.  Please destroy it immediately and contact CFS.
17  *
18  */
19
20 #include "ptllnd.h"
21 #include <libcfs/list.h>
22
23 static int
24 kptllnd_count_queue(struct list_head *q)
25 {
26         struct list_head *e;
27         int               n = 0;
28         
29         list_for_each(e, q) {
30                 n++;
31         }
32
33         return n;
34 }
35
/* Snapshot diagnostic state of the index'th peer in the global peer hash
 * (procfs-style iteration: 'index' counts across all hash buckets).
 *
 * All out-parameters are filled only on success.  Returns 0 if a peer was
 * found at that index, -ENOENT otherwise.
 *
 * Locking: takes the peer table read lock for the scan, and the peer's own
 * spinlock for the fields that lock protects (matchbits, credits, queues). */
int
kptllnd_get_peer_info(int index, 
                      lnet_process_id_t *id,
                      int *state, int *sent_hello,
                      int *refcount, __u64 *incarnation,
                      __u64 *next_matchbits, __u64 *last_matchbits_seen,
                      int *nsendq, int *nactiveq,
                      int *credits, int *outstanding_credits) 
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        unsigned long     flags;
        struct list_head *ptmp;
        kptl_peer_t      *peer;
        int               i;
        int               rc = -ENOENT;

        read_lock_irqsave(g_lock, flags);

        for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
                
                list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry(ptmp, kptl_peer_t, peer_list);

                        /* skip peers until we reach the requested index */
                        if (index-- > 0)
                                continue;
                        
                        /* these fields are stable under the table read lock */
                        *id          = peer->peer_id;
                        *state       = peer->peer_state;
                        *sent_hello  = peer->peer_sent_hello;
                        *refcount    = atomic_read(&peer->peer_refcount);
                        *incarnation = peer->peer_incarnation;

                        /* per-peer lock guards credits/matchbits/queues */
                        spin_lock(&peer->peer_lock);

                        *next_matchbits      = peer->peer_next_matchbits;
                        *last_matchbits_seen = peer->peer_last_matchbits_seen;
                        *credits             = peer->peer_credits;
                        *outstanding_credits = peer->peer_outstanding_credits;

                        *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
                        *nactiveq = kptllnd_count_queue(&peer->peer_activeq);

                        spin_unlock(&peer->peer_lock);

                        rc = 0;
                        goto out;
                }
        }
        
 out:
        read_unlock_irqrestore(g_lock, flags);
        return rc;
}
89
/* Link 'peer' into the peer hash table.  Caller holds the peer table write
 * lock and guarantees the peer is in a state that belongs in the table
 * (waiting for HELLO, or fully active). */
void
kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
{
        LASSERT (kptllnd_data.kptl_n_active_peers <
                 kptllnd_data.kptl_expected_peers);

        LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
                 peer->peer_state == PEER_STATE_ACTIVE);
        
        kptllnd_data.kptl_n_active_peers++;
        atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */

        /* NB add to HEAD of peer list for MRU order!
         * (see kptllnd_cull_peertable) */
        list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
}
106
/* Cull old peers sharing 'pid's NID before a new peer with that ID is
 * added.  Caller holds the peer table write lock. */
void
kptllnd_cull_peertable_locked (lnet_process_id_t pid)
{
        /* I'm about to add a new peer with this portals ID to the peer table,
         * so (a) this peer should not exist already and (b) I want to leave at
         * most (max_procs_per_nid - 1) peers with this NID in the table. */
        struct list_head  *peers = kptllnd_nid2peerlist(pid.nid);
        int                cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
        int                count;
        struct list_head  *tmp;
        struct list_head  *nxt;
        kptl_peer_t       *peer;
        
        count = 0;
        /* _safe iteration: kptllnd_peer_close_locked() moves culled peers
         * off this list and onto the closing list */
        list_for_each_safe (tmp, nxt, peers) {
                /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
                 * in MRU order */
                peer = list_entry(tmp, kptl_peer_t, peer_list);
                        
                if (peer->peer_id.nid != pid.nid)
                        continue;

                /* (a): the exact ID being added must not exist already */
                LASSERT (peer->peer_id.pid != pid.pid);
                        
                count++;

                if (count < cull_count) /* recent (don't cull) */
                        continue;

                CDEBUG(D_NET, "Cull %s(%s)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_ptlid2str(peer->peer_ptlid));
                
                kptllnd_peer_close_locked(peer, 0);
        }
}
143
/* Allocate and initialise a new peer for LNET id 'lpid' / Portals id 'ppid'.
 *
 * Returns the peer with 1 reference held for the caller, or NULL on
 * allocation failure or if the LND is shutting down.  The peer is NOT yet
 * in the peer table (state PEER_STATE_ALLOCATED); the global peer count is
 * bumped under the write lock so shutdown can't race with peer creation. */
kptl_peer_t *
kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
{
        unsigned long    flags;
        kptl_peer_t     *peer;

        LIBCFS_ALLOC(peer, sizeof (*peer));
        if (peer == NULL) {
                CERROR("Can't create peer %s (%s)\n",
                       libcfs_id2str(lpid), 
                       kptllnd_ptlid2str(ppid));
                return NULL;
        }

        memset(peer, 0, sizeof(*peer));         /* zero flags etc */

        INIT_LIST_HEAD (&peer->peer_sendq);
        INIT_LIST_HEAD (&peer->peer_activeq);
        spin_lock_init (&peer->peer_lock);

        peer->peer_state = PEER_STATE_ALLOCATED;
        peer->peer_error = 0;
        peer->peer_last_alive = cfs_time_current();
        peer->peer_id = lpid;
        peer->peer_ptlid = ppid;
        peer->peer_credits = 1;                 /* enough for HELLO */
        peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
        peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;
        peer->peer_sent_credits = 1;           /* HELLO credit is implicit */
        peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */

        atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        peer->peer_myincarnation = kptllnd_data.kptl_incarnation;

        /* Only increase # peers under lock, to guarantee we dont grow it
         * during shutdown */
        if (kptllnd_data.kptl_shutdown) {
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
                                        flags);
                LIBCFS_FREE(peer, sizeof(*peer));
                return NULL;
        }

        kptllnd_data.kptl_npeers++;
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
        
        return peer;
}
195
/* Final teardown of a peer whose last reference has been dropped.
 *
 * The peer must be refcount 0, with empty send/active queues, and either
 * never made it into the table (ALLOCATED) or has been through the closing
 * path (ZOMBIE) — in the latter case it is still on the zombie list and is
 * unlinked here.  Decrements the global peer count under the write lock. */
void
kptllnd_peer_destroy (kptl_peer_t *peer)
{
        unsigned long flags;
        
        CDEBUG(D_NET, "Peer=%p\n", peer);

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&peer->peer_refcount) == 0);
        LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
                 peer->peer_state == PEER_STATE_ZOMBIE);
        LASSERT (list_empty(&peer->peer_sendq));
        LASSERT (list_empty(&peer->peer_activeq));

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* zombies are still linked on kptl_zombie_peers */
        if (peer->peer_state == PEER_STATE_ZOMBIE)
                list_del(&peer->peer_list);

        kptllnd_data.kptl_npeers--;

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        LIBCFS_FREE (peer, sizeof (*peer));
}
221
222 void
223 kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs)
224 {
225         struct list_head  *tmp;
226         struct list_head  *nxt;
227         kptl_tx_t         *tx;
228
229         list_for_each_safe (tmp, nxt, peerq) {
230                 tx = list_entry(tmp, kptl_tx_t, tx_list);
231
232                 list_del(&tx->tx_list);
233                 list_add_tail(&tx->tx_list, txs);
234
235                 tx->tx_status = -EIO;
236                 tx->tx_active = 0;
237         }
238 }
239
240 void
241 kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs)
242 {
243         unsigned long   flags;
244
245         spin_lock_irqsave(&peer->peer_lock, flags);
246
247         kptllnd_cancel_txlist(&peer->peer_sendq, txs);
248         kptllnd_cancel_txlist(&peer->peer_activeq, txs);
249                 
250         spin_unlock_irqrestore(&peer->peer_lock, flags);
251 }
252
/* Record that we have just heard from 'peer'.  Written locklessly: all
 * writers store cfs_time_current(), so a torn/raced update is harmless.
 * The mb() makes the new timestamp visible before subsequent work. */
void
kptllnd_peer_alive (kptl_peer_t *peer)
{
        /* This is racy, but everyone's only writing cfs_time_current() */
        peer->peer_last_alive = cfs_time_current();
        mb();
}
260
/* Tell LNET this peer has died, if an error has been stashed on it.
 *
 * peer_error is latched and cleared under the peer lock so the
 * notification fires at most once per error.  last_alive is converted from
 * the jiffies-based peer_last_alive to wall-clock seconds for lnet_notify. */
void
kptllnd_peer_notify (kptl_peer_t *peer)
{
        unsigned long flags;
        time_t        last_alive = 0;
        int           error = 0;
        
        spin_lock_irqsave(&peer->peer_lock, flags);

        if (peer->peer_error != 0) {
                error = peer->peer_error;
                peer->peer_error = 0;   /* consume the error: notify once */
                
                /* wall-clock time we last heard from the peer */
                last_alive = cfs_time_current_sec() - 
                             cfs_duration_sec(cfs_time_current() - 
                                              peer->peer_last_alive);
        }
        
        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* 0 == peer is dead */
        if (error != 0)
                lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
                             last_alive);
}
285
/* Reap peers on the closing and zombie lists: cancel their queued txs,
 * notify LNET of newly-closed peers, move them closing -> zombie, and drop
 * the peer table's reference.  Cancelled txs are finalised at the end,
 * outside all locks. */
void
kptllnd_handle_closing_peers ()
{
        unsigned long           flags;
        struct list_head        txs;
        kptl_peer_t            *peer;
        struct list_head       *tmp;
        struct list_head       *nxt;
        kptl_tx_t              *tx;
        int                     idle;

        /* Check with a read lock first to avoid blocking anyone */

        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        idle = list_empty(&kptllnd_data.kptl_closing_peers) &&
               list_empty(&kptllnd_data.kptl_zombie_peers);
        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (idle)
                return;

        INIT_LIST_HEAD(&txs);

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
         * ref removes it from this list, so I musn't drop the lock while
         * scanning it. */
        list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);

                kptllnd_peer_cancel_txs(peer, &txs);
        }

        /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
         * I'm the only one removing from this list, but peers can be added on
         * the end any time I drop the lock. */

        list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_CLOSING);

                /* closing -> zombie while still holding the write lock */
                list_del(&peer->peer_list);
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_zombie_peers);
                peer->peer_state = PEER_STATE_ZOMBIE;

                /* notify/cancel/decref may block or take other locks, so
                 * drop the table lock around them and re-take it after */
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                kptllnd_peer_notify(peer);
                kptllnd_peer_cancel_txs(peer, &txs);
                kptllnd_peer_decref(peer);      /* drop peer table's ref */

                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        }

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Drop peer's ref on all cancelled txs.  This will get
         * kptllnd_tx_fini() to abort outstanding comms if necessary. */

        list_for_each_safe (tmp, nxt, &txs) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                kptllnd_tx_decref(tx);
        }
}
356
/* Begin closing 'peer' with error 'why' (0 == no error).
 *
 * Caller holds the peer table write lock.  An active/hello peer is removed
 * from its hash chain and queued (with the table's ref) on the closing
 * list for the watchdog to reap; a peer already closing or zombie is left
 * alone.  Any other state is a bug. */
void
kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
{
        switch (peer->peer_state) {
        default:
                LBUG();

        case PEER_STATE_WAITING_HELLO:
        case PEER_STATE_ACTIVE:
                /* Ensure new peers see a new incarnation of me */
                LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
                if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
                        kptllnd_data.kptl_incarnation++;

                /* Removing from peer table */
                kptllnd_data.kptl_n_active_peers--;
                LASSERT (kptllnd_data.kptl_n_active_peers >= 0);

                list_del(&peer->peer_list);
                kptllnd_peer_unreserve_buffers();

                peer->peer_error = why; /* stash 'why' only on first close */
                peer->peer_state = PEER_STATE_CLOSING;

                /* Schedule for immediate attention, taking peer table's ref */
                list_add_tail(&peer->peer_list, 
                              &kptllnd_data.kptl_closing_peers);
                wake_up(&kptllnd_data.kptl_watchdog_waitq);
                break;

        case PEER_STATE_ZOMBIE:
        case PEER_STATE_CLOSING:
                break;
        }
}
392
393 void
394 kptllnd_peer_close(kptl_peer_t *peer, int why)
395 {
396         unsigned long      flags;
397
398         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
399         kptllnd_peer_close_locked(peer, why);
400         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
401 }
402
/* Close every peer matching 'id'.  LNET_NID_ANY matches all peers;
 * otherwise LNET_PID_ANY matches any pid on that NID.
 *
 * Returns 0 if at least one peer matched, -ENOENT if none did, -EINVAL for
 * the unsupported (NID wildcard, specific pid) combination.
 *
 * Since closing requires dropping the read lock, the scan restarts from
 * scratch after each match; peer_state transitions in peer_close ensure a
 * peer is only closed once. */
int
kptllnd_peer_del(lnet_process_id_t id)
{
        struct list_head  *ptmp;
        struct list_head  *pnxt;
        kptl_peer_t       *peer;
        int                lo;
        int                hi;
        int                i;
        unsigned long      flags;
        int                rc = -ENOENT;

        /*
         * Find the single bucket we are supposed to look at or if nid is a
         * wildcard (LNET_NID_ANY) then look at all of the buckets
         */
        if (id.nid != LNET_NID_ANY) {
                struct list_head *l = kptllnd_nid2peerlist(id.nid);
                
                /* bucket index = offset into the hash table array */
                lo = hi =  l - kptllnd_data.kptl_peers;
        } else {
                if (id.pid != LNET_PID_ANY)
                        return -EINVAL;
                
                lo = 0;
                hi = kptllnd_data.kptl_peer_hash_size - 1;
        }

again:
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry (ptmp, kptl_peer_t, peer_list);

                        /* match: any peer, or exact nid + (any pid | exact pid) */
                        if (!(id.nid == LNET_NID_ANY || 
                              (peer->peer_id.nid == id.nid &&
                               (id.pid == LNET_PID_ANY || 
                                peer->peer_id.pid == id.pid))))
                                continue;

                        kptllnd_peer_addref(peer); /* 1 ref for me... */

                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                               flags);

                        kptllnd_peer_close(peer, 0);
                        kptllnd_peer_decref(peer); /* ...until here */

                        rc = 0;         /* matched something */

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        return (rc);
}
463
/* Bind an MD for tx's message and queue the tx on peer's send queue for
 * kptllnd_peer_check_sends() to transmit.
 *
 * nfrag == 0: the message is a single contiguous buffer; otherwise tx_frags
 * holds an iovec of nfrag (> 1) fragments whose first entry is the message.
 *
 * CAVEAT EMPTOR: takes over the caller's ref on 'tx'; on MD-bind failure
 * the tx is failed (-EIO) and that ref dropped here. */
void
kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
        /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
        ptl_handle_md_t  msg_mdh;
        ptl_md_t         md;
        ptl_err_t        prc;
        unsigned long    flags;

        LASSERT (!tx->tx_idle);
        LASSERT (!tx->tx_active);
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
                 tx->tx_type == TX_TYPE_PUT_REQUEST ||
                 tx->tx_type == TX_TYPE_GET_REQUEST);

        kptllnd_set_tx_peer(tx, peer);

        memset(&md, 0, sizeof(md));

        md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE;
        md.user_ptr = &tx->tx_msg_eventarg;
        md.eq_handle = kptllnd_data.kptl_eqh;

        if (nfrag == 0) {
                md.start = tx->tx_msg;
                md.length = tx->tx_msg->ptlm_nob;
        } else {
                LASSERT (nfrag > 1);
                LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);

                md.start = tx->tx_frags;
                md.length = nfrag;
                md.options |= PTL_MD_IOVEC;
        }

        prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
        if (prc != PTL_OK) {
                CERROR("PtlMDBind(%s) failed: %s(%d)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_errtype2str(prc), prc);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);  /* drop the caller's ref we took over */
                return;
        }
        
        spin_lock_irqsave(&peer->peer_lock, flags);

        tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
        tx->tx_active = 1;
        tx->tx_msg_mdh = msg_mdh;

        /* Ensure HELLO is sent first */
        if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
                list_add(&tx->tx_list, &peer->peer_sendq);
        else
                list_add_tail(&tx->tx_list, &peer->peer_sendq);

        spin_unlock_irqrestore(&peer->peer_lock, flags);
}
528
/* Drain peer's send queue, transmitting each tx that the credit rules
 * allow, and post a NOOP to return credits when the outstanding count has
 * reached the high-water mark.
 *
 * Credit model (as used below): peer_credits = sends we may still issue to
 * the peer; peer_outstanding_credits = credits we owe back; peer_sent_credits
 * = credits already returned but unacknowledged.  Must not be called from
 * interrupt context; takes/drops peer->peer_lock around the Portals calls. */
void
kptllnd_peer_check_sends (kptl_peer_t *peer)
{
        ptl_handle_me_t  meh;
        kptl_tx_t       *tx;
        int              rc;
        unsigned long    flags;

        LASSERT(!in_interrupt());

        spin_lock_irqsave(&peer->peer_lock, flags);

        peer->peer_retry_noop = 0;

        if (list_empty(&peer->peer_sendq) &&
            peer->peer_outstanding_credits >= PTLLND_CREDIT_HIGHWATER &&
            peer->peer_credits != 0) {

                /* post a NOOP to return credits */
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
                if (tx == NULL) {
                        CERROR("Can't return credits to %s: can't allocate descriptor\n",
                               libcfs_id2str(peer->peer_id));
                } else {
                        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
                        kptllnd_post_tx(peer, tx, 0);
                }

                spin_lock_irqsave(&peer->peer_lock, flags);
                /* remember to retry if no tx was available */
                peer->peer_retry_noop = (tx == NULL);
        }

        while (!list_empty(&peer->peer_sendq)) {
                tx = list_entry (peer->peer_sendq.next, kptl_tx_t, tx_list);

                LASSERT (tx->tx_active);
                LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
                LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));

                LASSERT (peer->peer_outstanding_credits >= 0);
                LASSERT (peer->peer_sent_credits >= 0);
                LASSERT (peer->peer_sent_credits +
                         peer->peer_outstanding_credits <=
                         *kptllnd_tunables.kptl_peercredits);
                LASSERT (peer->peer_credits >= 0);

                /* Ensure HELLO is sent first */
                if (!peer->peer_sent_hello) {
                        if (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_HELLO)
                                break;
                        peer->peer_sent_hello = 1;
                }

                /* out of send credits: stall until the peer returns some */
                if (peer->peer_credits == 0) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %p\n",
                               libcfs_id2str(peer->peer_id), 
                               peer->peer_credits,
                               peer->peer_outstanding_credits, 
                               peer->peer_sent_credits, tx);
                        break;
                }

                /* Don't use the last credit unless I've got credits to
                 * return */
                if (peer->peer_credits == 1 &&
                    peer->peer_outstanding_credits == 0) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
                               "not using last credit for %p\n",
                               libcfs_id2str(peer->peer_id), 
                               peer->peer_credits,
                               peer->peer_outstanding_credits,
                               peer->peer_sent_credits, tx);
                        break;
                }

                list_del(&tx->tx_list);

                /* Discard any NOOP I queued if I'm not at the high-water mark
                 * any more or more messages have been queued */
                if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
                    (!list_empty(&peer->peer_sendq) ||
                     peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {

                        tx->tx_active = 0;

                        spin_unlock_irqrestore(&peer->peer_lock, flags);

                        CDEBUG(D_NET, "%s: redundant noop\n", 
                               libcfs_id2str(peer->peer_id));
                        kptllnd_tx_decref(tx);

                        spin_lock_irqsave(&peer->peer_lock, flags);
                        continue;
                }

                /* fill last-minute msg fields */
                kptllnd_msg_pack(tx->tx_msg, peer);

                if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
                    tx->tx_type == TX_TYPE_GET_REQUEST) {
                        /* peer_next_matchbits must be known good */
                        LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
                        /* Assume 64-bit matchbits can't wrap */
                        LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
                        tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
                                peer->peer_next_matchbits++;
                }
                
                /* this send piggybacks all owed credits and consumes one */
                peer->peer_sent_credits += peer->peer_outstanding_credits;
                peer->peer_outstanding_credits = 0;
                peer->peer_credits--;

                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
                       tx, tx->tx_msg->ptlm_nob,
                       tx->tx_msg->ptlm_credits);

                list_add_tail(&tx->tx_list, &peer->peer_activeq);

                kptllnd_tx_addref(tx);          /* 1 ref for me... */

                spin_unlock_irqrestore(&peer->peer_lock, flags);

                if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
                    tx->tx_type == TX_TYPE_GET_REQUEST) {
                        /* Post bulk now we have safe matchbits */
                        rc = PtlMEAttach(kptllnd_data.kptl_nih,
                                         *kptllnd_tunables.kptl_portal,
                                         peer->peer_ptlid,
                                         tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
                                         0,             /* ignore bits */
                                         PTL_UNLINK,
                                         PTL_INS_BEFORE,
                                         &meh);
                        if (rc != PTL_OK) {
                                CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
                                       libcfs_id2str(peer->peer_id),
                                       kptllnd_errtype2str(rc), rc);
                                goto failed;
                        }

                        rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
                                         &tx->tx_rdma_mdh);
                        if (rc != PTL_OK) {
                                CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
                                       libcfs_id2str(tx->tx_peer->peer_id),
                                       kptllnd_errtype2str(rc), rc);
                                rc = PtlMEUnlink(meh);
                                LASSERT(rc == PTL_OK);
                                tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
                                goto failed;
                        }
                        /* I'm not racing with the event callback here.  It's a
                         * bug if there's an event on the MD I just attached
                         * before I actually send the RDMA request message -
                         * probably matchbits re-used in error. */
                }

                tx->tx_tposted = jiffies;       /* going on the wire */

                rc = PtlPut (tx->tx_msg_mdh,
                             tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
                             peer->peer_ptlid,
                             *kptllnd_tunables.kptl_portal,
                             0,                 /* acl cookie */
                             LNET_MSG_MATCHBITS,
                             0,                 /* offset */
                             0);                /* header data */
                if (rc != PTL_OK) {
                        CERROR("PtlPut %s error %s(%d)\n",
                               libcfs_id2str(peer->peer_id),
                               kptllnd_errtype2str(rc), rc);
                        goto failed;
                }

                kptllnd_tx_decref(tx);          /* drop my ref */

                spin_lock_irqsave(&peer->peer_lock, flags);
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);
        return;

 failed:
        /* Nuke everything (including tx we were trying) */
        kptllnd_peer_close(peer, -EIO);
        kptllnd_tx_decref(tx);
}
721
722 kptl_tx_t *
723 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
724 {
725         kptl_tx_t         *tx;
726         struct list_head  *tmp;
727
728         list_for_each(tmp, &peer->peer_sendq) {
729                 tx = list_entry(peer->peer_sendq.next, kptl_tx_t, tx_list);
730
731                 if (time_after_eq(jiffies, tx->tx_deadline)) {
732                         kptllnd_tx_addref(tx);
733                         return tx;
734                 }
735         }
736
737         list_for_each(tmp, &peer->peer_activeq) {
738                 tx = list_entry(peer->peer_activeq.next, kptl_tx_t, tx_list);
739
740                 if (time_after_eq(jiffies, tx->tx_deadline)) {
741                         kptllnd_tx_addref(tx);
742                         return tx;
743                 }
744         }
745
746         return NULL;
747 }
748
749
/* Scan one hash bucket of the peer table looking for peers that have a
 * timed-out TX or a pending NOOP retry.  'stamp' identifies this sweep:
 * peers already examined in the current pass carry peer_check_stamp ==
 * stamp, so the rescan after every lock-drop (goto again) skips them.
 * Any peer with a timed-out TX is reported and closed with -ETIMEDOUT. */
void
kptllnd_peer_check_bucket (int idx, int stamp)
{
        struct list_head  *peers = &kptllnd_data.kptl_peers[idx];
        struct list_head  *ptmp;
        kptl_peer_t       *peer;
        kptl_tx_t         *tx;
        unsigned long      flags;
        int                nsend;
        int                nactive;
        int                check_sends;

        CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);

 again:
        /* NB. Shared lock while I just look */
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kptl_peer_t, peer_list);

                CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits);

                /* plain spin_lock: interrupts are already disabled by
                 * read_lock_irqsave() above */
                spin_lock(&peer->peer_lock);

                if (peer->peer_check_stamp == stamp) {
                        /* checked already this pass */
                        spin_unlock(&peer->peer_lock);
                        continue;
                }

                peer->peer_check_stamp = stamp;
                /* NOTE(review): tx presumably comes back with a reference
                 * held for us (it is decref'd below) — confirm against
                 * kptllnd_find_timed_out_tx() */
                tx = kptllnd_find_timed_out_tx(peer);
                check_sends = peer->peer_retry_noop;

                spin_unlock(&peer->peer_lock);

                if (tx == NULL && !check_sends)
                        continue;

                /* Something to do: pin the peer so it survives dropping
                 * the table lock */
                kptllnd_peer_addref(peer); /* 1 ref for me... */

                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                if (tx == NULL) { /* nothing timed out */
                        kptllnd_peer_check_sends(peer);
                        kptllnd_peer_decref(peer); /* ...until here or... */

                        /* rescan after dropping the lock */
                        goto again;
                }

                /* Timed out: snapshot queue depths for the error report */
                spin_lock_irqsave(&peer->peer_lock, flags);
                nsend = kptllnd_count_queue(&peer->peer_sendq);
                nactive = kptllnd_count_queue(&peer->peer_activeq);
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                /* tx_tposted == 0 means the TX never made it onto the wire,
                 * i.e. it starved waiting for peer buffers/credits */
                LCONSOLE_ERROR("Timing out %s: %s\n",
                               libcfs_id2str(peer->peer_id),
                               (tx->tx_tposted == 0) ?
                               "no free peer buffers" : "please check Portals");

                CERROR("%s timed out: cred %d outstanding %d, sent %d, "
                       "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d "
                       "%sposted %lu T/O %ds\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type),
                       tx->tx_active ? "A" : "",
                       PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
                       "" : "M",
                       PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
                       "" : "D",
                       tx->tx_status,
                       (tx->tx_tposted == 0) ? "not " : "",
                       (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
                       *kptllnd_tunables.kptl_timeout);

                kptllnd_dump_ptltrace();

                kptllnd_tx_decref(tx);

                kptllnd_peer_close(peer, -ETIMEDOUT);
                kptllnd_peer_decref(peer); /* ...until here */

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}
843
844 kptl_peer_t *
845 kptllnd_id2peer_locked (lnet_process_id_t id)
846 {
847         struct list_head *peers = kptllnd_nid2peerlist(id.nid);
848         struct list_head *tmp;
849         kptl_peer_t      *peer;
850
851         list_for_each (tmp, peers) {
852
853                 peer = list_entry (tmp, kptl_peer_t, peer_list);
854
855                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
856                         peer->peer_state == PEER_STATE_ACTIVE);
857                 
858                 if (peer->peer_id.nid != id.nid ||
859                     peer->peer_id.pid != id.pid)
860                         continue;
861
862                 kptllnd_peer_addref(peer);
863
864                 CDEBUG(D_NET, "%s -> %s (%d)\n",
865                        libcfs_id2str(id), 
866                        kptllnd_ptlid2str(peer->peer_ptlid),
867                        atomic_read (&peer->peer_refcount));
868                 return peer;
869         }
870
871         return NULL;
872 }
873
874 void
875 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
876 {
877         LCONSOLE_ERROR("%s %s overflows the peer table[%d]: "
878                        "messages may be dropped\n",
879                        str, libcfs_id2str(id),
880                        kptllnd_data.kptl_n_active_peers);
881         LCONSOLE_ERROR("Please correct by increasing "
882                        "'max_nodes' or 'max_procs_per_node'\n");
883 }
884
885 __u64
886 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
887 {
888         kptl_peer_t            *peer;
889         struct list_head       *tmp;
890
891         /* Find the last matchbits I saw this new peer using.  Note..
892            A. This peer cannot be in the peer table - she's new!
893            B. If I can't find the peer in the closing/zombie peers, all
894               matchbits are safe because all refs to the (old) peer have gone
895               so all txs have completed so there's no risk of matchbit
896               collision!
897          */
898
899         LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
900
901         /* peer's last matchbits can't change after it comes out of the peer
902          * table, so first match is fine */
903
904         list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
905                 peer = list_entry (tmp, kptl_peer_t, peer_list);
906
907                 if (peer->peer_id.nid == lpid.nid &&
908                     peer->peer_id.pid == lpid.pid)
909                         return peer->peer_last_matchbits_seen;
910         }
911         
912         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
913                 peer = list_entry (tmp, kptl_peer_t, peer_list);
914
915                 if (peer->peer_id.nid == lpid.nid &&
916                     peer->peer_id.pid == lpid.pid)
917                         return peer->peer_last_matchbits_seen;
918         }
919         
920         return PTL_RESERVED_MATCHBITS;
921 }
922
/* Handle an incoming HELLO from portals process 'initiator'.  After
 * validating the message this either (a) completes the handshake on an
 * existing peer in WAITING_HELLO state, (b) detects a stale/duplicate
 * HELLO and drops it, or (c) closes any old incarnation, creates a new
 * ACTIVE peer and posts a HELLO response to it.
 *
 * Returns a peer carrying a reference the caller must drop, or NULL on
 * any validation/allocation failure.  All lock drop/retake windows are
 * handled by re-looking-up the peer ('again' label) before committing. */
kptl_peer_t *
kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
                           kptl_msg_t       *msg)
{
        rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        kptl_peer_t        *peer;
        kptl_peer_t        *new_peer;
        lnet_process_id_t   lpid;
        unsigned long       flags;
        kptl_tx_t          *hello_tx;
        int                 rc;
        __u64               safe_matchbits;
        __u64               last_matchbits_seen;

        lpid.nid = msg->ptlm_srcnid;
        lpid.pid = msg->ptlm_srcpid;

        CDEBUG(D_NET, "hello from %s(%s)\n",
               libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));

        if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
            (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
                /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
                 * userspace.  Refuse the connection if she hasn't set the
                 * correct flag in her PID... */
                CERROR("Userflag not set in hello from %s (%s)\n",
                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
                return NULL;
        }

        /* kptlhm_matchbits are the highest matchbits my peer may have used to
         * RDMA to me.  I ensure I never register buffers for RDMA that could
         * match any she used */
        safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;

        if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
                CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
                       safe_matchbits, libcfs_id2str(lpid));
                return NULL;
        }

        if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
                CERROR("%s: max message size %d < MIN %d",
                       libcfs_id2str(lpid),
                       msg->ptlm_u.hello.kptlhm_max_msg_size,
                       PTLLND_MIN_BUFFER_SIZE);
                return NULL;
        }

        /* need at least 2 credits: 1 to return and 1 to keep sending */
        if (msg->ptlm_credits <= 1) {
                CERROR("Need more than 1+%d credits from %s\n",
                       msg->ptlm_credits, libcfs_id2str(lpid));
                return NULL;
        }

        write_lock_irqsave(g_lock, flags);

        peer = kptllnd_id2peer_locked(lpid);
        if (peer != NULL) {
                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        /* Completing HELLO handshake */
                        LASSERT(peer->peer_incarnation == 0);

                        /* dststamp, when set, must echo my incarnation */
                        if (msg->ptlm_dststamp != 0 &&
                            msg->ptlm_dststamp != peer->peer_myincarnation) {
                                write_unlock_irqrestore(g_lock, flags);

                                CERROR("Ignoring HELLO from %s: unexpected "
                                       "dststamp "LPX64" ("LPX64" wanted)\n",
                                       libcfs_id2str(lpid),
                                       msg->ptlm_dststamp,
                                       peer->peer_myincarnation);
                                kptllnd_peer_decref(peer);
                                return NULL;
                        }

                        /* Concurrent initiation or response to my HELLO */
                        peer->peer_state = PEER_STATE_ACTIVE;
                        peer->peer_incarnation = msg->ptlm_srcstamp;
                        peer->peer_next_matchbits = safe_matchbits;
                        peer->peer_max_msg_size =
                                msg->ptlm_u.hello.kptlhm_max_msg_size;

                        write_unlock_irqrestore(g_lock, flags);
                        /* return the ref taken by kptllnd_id2peer_locked() */
                        return peer;
                }

                if (msg->ptlm_dststamp != 0 &&
                    msg->ptlm_dststamp <= peer->peer_myincarnation) {
                        write_unlock_irqrestore(g_lock, flags);

                        CERROR("Ignoring stale HELLO from %s: "
                               "dststamp "LPX64" (current "LPX64")\n",
                               libcfs_id2str(lpid),
                               msg->ptlm_dststamp,
                               peer->peer_myincarnation);
                        kptllnd_peer_decref(peer);
                        return NULL;
                }

                /* Brand new connection attempt: remove old incarnation */
                kptllnd_peer_close_locked(peer, 0);
        }

        kptllnd_cull_peertable_locked(lpid);

        write_unlock_irqrestore(g_lock, flags);

        if (peer != NULL) {
                /* old incarnation closed above; drop lookup's ref */
                CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
                       " stamp "LPX64"("LPX64")\n",
                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
                       msg->ptlm_srcstamp, peer->peer_incarnation);

                kptllnd_peer_decref(peer);
        }

        /* Prepare a HELLO response and a fresh peer before re-taking the
         * lock; unused pieces are cleaned up if someone beat us to it */
        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate HELLO message for %s\n",
                       libcfs_id2str(lpid));
                return NULL;
        }

        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(lpid, initiator);
        if (new_peer == NULL) {
                kptllnd_tx_decref(hello_tx);
                return NULL;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0) {
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);

                CERROR("Failed to reserve buffers for %s\n",
                       libcfs_id2str(lpid));
                return NULL;
        }

        write_lock_irqsave(g_lock, flags);
 again:
        /* re-check: the table may have changed while the lock was dropped */
        peer = kptllnd_id2peer_locked(lpid);
        if (peer != NULL) {
                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        /* An outgoing message instantiated 'peer' for me */
                        LASSERT(peer->peer_incarnation == 0);

                        peer->peer_state = PEER_STATE_ACTIVE;
                        peer->peer_incarnation = msg->ptlm_srcstamp;
                        peer->peer_next_matchbits = safe_matchbits;
                        peer->peer_max_msg_size =
                                msg->ptlm_u.hello.kptlhm_max_msg_size;

                        write_unlock_irqrestore(g_lock, flags);

                        CWARN("Outgoing instantiated peer %s\n",
                              libcfs_id2str(lpid));
                } else {
                        LASSERT (peer->peer_state == PEER_STATE_ACTIVE);

                        write_unlock_irqrestore(g_lock, flags);

                        /* WOW!  Somehow this peer completed the HELLO
                         * handshake while I slept.  I guess I could have slept
                         * while it rebooted and sent a new HELLO, so I'll fail
                         * this one... */
                        CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
                        kptllnd_peer_decref(peer);
                        peer = NULL;
                }

                /* new_peer/hello_tx/reservation are unused in this path */
                kptllnd_peer_unreserve_buffers();
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);
                return peer;
        }

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection from ", lpid);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Refusing connection from %s\n",
                               libcfs_id2str(lpid));
                        kptllnd_peer_unreserve_buffers();
                        kptllnd_peer_decref(new_peer);
                        kptllnd_tx_decref(hello_tx);
                        return NULL;
                }

                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
                goto again;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);

        /* tell the peer the matchbits she must not reuse, and my limits */
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;

        new_peer->peer_state = PEER_STATE_ACTIVE;
        new_peer->peer_incarnation = msg->ptlm_srcstamp;
        new_peer->peer_next_matchbits = safe_matchbits;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;
        new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;

        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        kptllnd_post_tx(new_peer, hello_tx, 0);
        kptllnd_peer_check_sends(new_peer);

        return new_peer;
}
1153
1154 void
1155 kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
1156 {
1157         kptllnd_post_tx(peer, tx, nfrag);
1158         kptllnd_peer_check_sends(peer);
1159 }
1160
/* Find (or create) the peer for LNet id 'target' and return it in
 * *peerp with a reference held.  If the peer doesn't exist yet, a new
 * one is allocated in WAITING_HELLO state, added to the peer table and
 * an initial HELLO is posted to it.  Returns 0 on success or a
 * negative errno; resources are released via the unwind_* labels on
 * failure. */
int
kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        ptl_process_id_t  ptl_id;
        kptl_peer_t      *new_peer;
        kptl_tx_t        *hello_tx;
        unsigned long     flags;
        int               rc;
        __u64             last_matchbits_seen;

        /* I expect to find the peer, so I only take a read lock... */
        read_lock_irqsave(g_lock, flags);
        *peerp = kptllnd_id2peer_locked(target);
        read_unlock_irqrestore(g_lock, flags);

        if (*peerp != NULL)
                return 0;

        /* only userspace peers set LNET_PID_USERFLAG; we never initiate
         * connections to them */
        if ((target.pid & LNET_PID_USERFLAG) != 0) {
                CWARN("Refusing to create a new connection to %s "
                      "(non-kernel peer)\n", libcfs_id2str(target));
                return -EHOSTUNREACH;
        }

        /* The new peer is a kernel ptllnd, and kernel ptllnds all have
         * the same portals PID */
        ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
        ptl_id.pid = kptllnd_data.kptl_portals_id.pid;

        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate connect message for %s\n",
                       libcfs_id2str(target));
                return -ENOMEM;
        }

        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(target, ptl_id);
        if (new_peer == NULL) {
                rc = -ENOMEM;
                goto unwind_0;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0)
                goto unwind_1;

        write_lock_irqsave(g_lock, flags);
 again:
        /* re-check under the write lock: someone may have created the
         * peer while the lock was dropped.  NB rc == 0 here, so the
         * unwind path correctly reports success with *peerp set. */
        *peerp = kptllnd_id2peer_locked(target);
        if (*peerp != NULL) {
                write_unlock_irqrestore(g_lock, flags);
                goto unwind_2;
        }

        kptllnd_cull_peertable_locked(target);

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection to ", target);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Can't create connection to %s\n",
                               libcfs_id2str(target));
                        rc = -ENOMEM;
                        goto unwind_2;
                }
                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
                goto again;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);

        /* tell the peer which matchbits an old incarnation may have used */
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;

        new_peer->peer_state = PEER_STATE_WAITING_HELLO;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;

        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        kptllnd_post_tx(new_peer, hello_tx, 0);
        kptllnd_peer_check_sends(new_peer);

        *peerp = new_peer;
        return 0;

        /* error unwind: release in reverse order of acquisition */
 unwind_2:
        kptllnd_peer_unreserve_buffers();
 unwind_1:
        kptllnd_peer_decref(new_peer);
 unwind_0:
        kptllnd_tx_decref(hello_tx);

        return rc;
}