Whamcloud - gitweb
77b7191dfb27d6d8da2883833e0635c23b7db1ef
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_peer.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: PJ Kirner <pjkirner@clusterfs.com>
6  *           E Barton <eeb@bartonsoftware.com>
7  *
8  *   This file is part of the Lustre file system, http://www.lustre.org
9  *   Lustre is a trademark of Cluster File Systems, Inc.
10  *
11  *   This file is confidential source code owned by Cluster File Systems.
12  *   No viewing, modification, compilation, redistribution, or any other
13  *   form of use is permitted except through a signed license agreement.
14  *
15  *   If you have not signed such an agreement, then you have no rights to
16  *   this file.  Please destroy it immediately and contact CFS.
17  *
18  */
19
20 #include "ptllnd.h"
21 #include <libcfs/list.h>
22
23 static int
24 kptllnd_count_queue(struct list_head *q)
25 {
26         struct list_head *e;
27         int               n = 0;
28         
29         list_for_each(e, q) {
30                 n++;
31         }
32
33         return n;
34 }
35
/* Snapshot the state of the index'th peer (in hash-table iteration order)
 * into the caller-supplied out parameters.
 *
 * Holds the global peer table read lock for the whole scan, and takes the
 * peer's own spinlock around the fields it protects (matchbits, credits
 * and the two tx queues).
 *
 * Returns 0 on success, or -ENOENT if fewer than index+1 peers exist. */
int
kptllnd_get_peer_info(int index, 
                      lnet_process_id_t *id,
                      int *state, int *sent_hello,
                      int *refcount, __u64 *incarnation,
                      __u64 *next_matchbits, __u64 *last_matchbits_seen,
                      int *nsendq, int *nactiveq,
                      int *credits, int *outstanding_credits) 
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        unsigned long     flags;
        struct list_head *ptmp;
        kptl_peer_t      *peer;
        int               i;
        int               rc = -ENOENT;

        read_lock_irqsave(g_lock, flags);

        for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
                
                list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry(ptmp, kptl_peer_t, peer_list);

                        /* skip peers until we reach the index'th one */
                        if (index-- > 0)
                                continue;
                        
                        *id          = peer->peer_id;
                        *state       = peer->peer_state;
                        *sent_hello  = peer->peer_sent_hello;
                        *refcount    = atomic_read(&peer->peer_refcount);
                        *incarnation = peer->peer_incarnation;

                        /* peer_lock guards the credit/matchbits state and
                         * the tx queues */
                        spin_lock(&peer->peer_lock);

                        *next_matchbits      = peer->peer_next_matchbits;
                        *last_matchbits_seen = peer->peer_last_matchbits_seen;
                        *credits             = peer->peer_credits;
                        *outstanding_credits = peer->peer_outstanding_credits;

                        *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
                        *nactiveq = kptllnd_count_queue(&peer->peer_activeq);

                        spin_unlock(&peer->peer_lock);

                        rc = 0;
                        goto out;
                }
        }
        
 out:
        read_unlock_irqrestore(g_lock, flags);
        return rc;
}
89
/* Add 'peer' to the global peer hash table.  Caller must hold the peer
 * table write lock and must already have reserved capacity (first LASSERT).
 * Takes a new ref on the peer on behalf of the table's list entry. */
void
kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
{
        LASSERT (kptllnd_data.kptl_n_active_peers <
                 kptllnd_data.kptl_expected_peers);

        LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
                 peer->peer_state == PEER_STATE_ACTIVE);
        
        kptllnd_data.kptl_n_active_peers++;
        atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */

        /* NB add to HEAD of peer list for MRU order!
         * (see kptllnd_cull_peertable) */
        list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
}
106
107 void
108 kptllnd_cull_peertable_locked (lnet_process_id_t pid)
109 {
110         /* I'm about to add a new peer with this portals ID to the peer table,
111          * so (a) this peer should not exist already and (b) I want to leave at
112          * most (max_procs_per_nid - 1) peers with this NID in the table. */
113         struct list_head  *peers = kptllnd_nid2peerlist(pid.nid);
114         int                cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
115         int                count;
116         struct list_head  *tmp;
117         struct list_head  *nxt;
118         kptl_peer_t       *peer;
119         
120         count = 0;
121         list_for_each_safe (tmp, nxt, peers) {
122                 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
123                  * in MRU order */
124                 peer = list_entry(tmp, kptl_peer_t, peer_list);
125                         
126                 if (peer->peer_id.nid != pid.nid)
127                         continue;
128
129                 LASSERT (peer->peer_id.pid != pid.pid);
130                         
131                 count++;
132
133                 if (count < cull_count) /* recent (don't cull) */
134                         continue;
135
136                 CDEBUG(D_NET, "Cull %s(%s)\n",
137                        libcfs_id2str(peer->peer_id),
138                        kptllnd_ptlid2str(peer->peer_ptlid));
139                 
140                 kptllnd_peer_close_locked(peer, 0);
141         }
142 }
143
/* Allocate and initialise a new peer for LNET process 'lpid' / portals
 * process 'ppid'.  The peer starts in PEER_STATE_ALLOCATED holding one ref
 * for the caller.  Returns NULL if allocation fails, or if the LND is
 * shutting down (kptl_npeers may only grow under the table write lock so
 * shutdown can wait for it to drain). */
kptl_peer_t *
kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
{
        unsigned long    flags;
        kptl_peer_t     *peer;

        LIBCFS_ALLOC(peer, sizeof (*peer));
        if (peer == NULL) {
                CERROR("Can't create peer %s (%s)\n",
                       libcfs_id2str(lpid), 
                       kptllnd_ptlid2str(ppid));
                return NULL;
        }

        memset(peer, 0, sizeof(*peer));         /* zero flags etc */

        INIT_LIST_HEAD (&peer->peer_sendq);
        INIT_LIST_HEAD (&peer->peer_activeq);
        spin_lock_init (&peer->peer_lock);

        peer->peer_state = PEER_STATE_ALLOCATED;
        peer->peer_error = 0;
        peer->peer_last_alive = cfs_time_current();
        peer->peer_id = lpid;
        peer->peer_ptlid = ppid;
        peer->peer_credits = 1;                 /* enough for HELLO */
        peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
        peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;
        peer->peer_sent_credits = 1;           /* HELLO credit is implicit */
        peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */

        atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        peer->peer_myincarnation = kptllnd_data.kptl_incarnation;

        /* Only increase # peers under lock, to guarantee we dont grow it
         * during shutdown */
        if (kptllnd_data.kptl_shutdown) {
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
                                        flags);
                LIBCFS_FREE(peer, sizeof(*peer));
                return NULL;
        }

        kptllnd_data.kptl_npeers++;
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
        
        return peer;
}
195
/* Free a peer whose last reference has been dropped.  A ZOMBIE peer is
 * still linked on the zombie list and must be unlinked first; an ALLOCATED
 * peer was never added to any list.  Decrements the global peer count
 * under the table write lock (pairs with kptllnd_peer_allocate). */
void
kptllnd_peer_destroy (kptl_peer_t *peer)
{
        unsigned long flags;
        
        CDEBUG(D_NET, "Peer=%p\n", peer);

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&peer->peer_refcount) == 0);
        LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
                 peer->peer_state == PEER_STATE_ZOMBIE);
        /* all queued txs must have been cancelled already */
        LASSERT (list_empty(&peer->peer_sendq));
        LASSERT (list_empty(&peer->peer_activeq));

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (peer->peer_state == PEER_STATE_ZOMBIE)
                list_del(&peer->peer_list);

        kptllnd_data.kptl_npeers--;

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        LIBCFS_FREE (peer, sizeof (*peer));
}
221
222 void
223 kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs)
224 {
225         struct list_head  *tmp;
226         struct list_head  *nxt;
227         kptl_tx_t         *tx;
228
229         list_for_each_safe (tmp, nxt, peerq) {
230                 tx = list_entry(tmp, kptl_tx_t, tx_list);
231
232                 list_del(&tx->tx_list);
233                 list_add_tail(&tx->tx_list, txs);
234
235                 tx->tx_status = -EIO;
236                 tx->tx_active = 0;
237         }
238 }
239
/* Cancel all txs queued on 'peer' (both the send and active queues),
 * moving them onto 'txs' with status -EIO; see kptllnd_cancel_txlist. */
void
kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs)
{
        unsigned long   flags;

        spin_lock_irqsave(&peer->peer_lock, flags);

        kptllnd_cancel_txlist(&peer->peer_sendq, txs);
        kptllnd_cancel_txlist(&peer->peer_activeq, txs);
                
        spin_unlock_irqrestore(&peer->peer_lock, flags);
}
252
/* Record that 'peer' has just been heard from (updates peer_last_alive,
 * read by kptllnd_peer_notify). */
void
kptllnd_peer_alive (kptl_peer_t *peer)
{
        /* This is racy, but everyone's only writing cfs_time_current() */
        peer->peer_last_alive = cfs_time_current();
        mb();
}
260
261 void
262 kptllnd_peer_notify (kptl_peer_t *peer)
263 {
264         unsigned long flags;
265         time_t        last_alive = 0;
266         int           error = 0;
267         
268         spin_lock_irqsave(&peer->peer_lock, flags);
269
270         if (peer->peer_error != 0) {
271                 error = peer->peer_error;
272                 peer->peer_error = 0;
273                 
274                 last_alive = cfs_time_current_sec() - 
275                              cfs_duration_sec(cfs_time_current() - 
276                                               peer->peer_last_alive);
277         }
278         
279         spin_unlock_irqrestore(&peer->peer_lock, flags);
280
281         if (error != 0)
282                 lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
283                              last_alive);
284 }
285
/* Reap closed peers.  Cancels txs on existing zombies, then moves each
 * newly closed peer from the closing list to the zombie list, notifying
 * LNET and dropping the peer table's ref as it goes, and finally releases
 * this function's ref on every cancelled tx.  Called with no locks held. */
void
kptllnd_handle_closing_peers ()
{
        unsigned long           flags;
        struct list_head        txs;
        kptl_peer_t            *peer;
        struct list_head       *tmp;
        struct list_head       *nxt;
        kptl_tx_t              *tx;
        int                     idle;

        /* Check with a read lock first to avoid blocking anyone */

        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        idle = list_empty(&kptllnd_data.kptl_closing_peers) &&
               list_empty(&kptllnd_data.kptl_zombie_peers);
        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (idle)
                return;

        /* local staging list for all cancelled txs */
        INIT_LIST_HEAD(&txs);

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
         * ref removes it from this list, so I musn't drop the lock while
         * scanning it. */
        list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);

                kptllnd_peer_cancel_txs(peer, &txs);
        }

        /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
         * I'm the only one removing from this list, but peers can be added on
         * the end any time I drop the lock. */

        list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_CLOSING);

                list_del(&peer->peer_list);
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_zombie_peers);
                peer->peer_state = PEER_STATE_ZOMBIE;

                /* drop the lock for the blocking-capable notify/decref;
                 * safe because only this function removes from the list */
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                kptllnd_peer_notify(peer);
                kptllnd_peer_cancel_txs(peer, &txs);
                kptllnd_peer_decref(peer);

                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        }

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Drop peer's ref on all cancelled txs.  This will get
         * kptllnd_tx_fini() to abort outstanding comms if necessary. */

        list_for_each_safe (tmp, nxt, &txs) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                kptllnd_tx_decref(tx);
        }
}
356
/* Begin closing 'peer': remove it from the active peer table and queue it
 * on the closing list for the watchdog to reap via
 * kptllnd_handle_closing_peers().  'why' is stashed in peer_error only on
 * the first close.  A no-op if the peer is already CLOSING or ZOMBIE.
 * Caller holds the peer table write lock. */
void
kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
{
        switch (peer->peer_state) {
        default:
                LBUG();

        case PEER_STATE_WAITING_HELLO:
        case PEER_STATE_ACTIVE:
                /* Ensure new peers see a new incarnation of me */
                LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
                if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
                        kptllnd_data.kptl_incarnation++;

                /* Removing from peer table */
                kptllnd_data.kptl_n_active_peers--;
                LASSERT (kptllnd_data.kptl_n_active_peers >= 0);

                list_del(&peer->peer_list);
                kptllnd_peer_unreserve_buffers();

                peer->peer_error = why; /* stash 'why' only on first close */
                peer->peer_state = PEER_STATE_CLOSING;

                /* Schedule for immediate attention, taking peer table's ref */
                list_add_tail(&peer->peer_list, 
                              &kptllnd_data.kptl_closing_peers);
                wake_up(&kptllnd_data.kptl_watchdog_waitq);
                break;

        case PEER_STATE_ZOMBIE:
        case PEER_STATE_CLOSING:
                break;
        }
}
392
/* Close 'peer' with reason 'why'; takes the peer table write lock around
 * kptllnd_peer_close_locked(). */
void
kptllnd_peer_close(kptl_peer_t *peer, int why)
{
        unsigned long      flags;

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        kptllnd_peer_close_locked(peer, why);
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}
402
/* Close every peer matching 'id'.  id.nid may be LNET_NID_ANY to match all
 * peers (then id.pid must be LNET_PID_ANY too); otherwise id.pid may be
 * LNET_PID_ANY to match every PID at that NID.  The table lock is dropped
 * around each close, so the scan restarts from the top after every match.
 * Returns 0 if anything matched, -ENOENT if nothing did, -EINVAL for a
 * PID-wildcard-only combination. */
int
kptllnd_peer_del(lnet_process_id_t id)
{
        struct list_head  *ptmp;
        struct list_head  *pnxt;
        kptl_peer_t       *peer;
        int                lo;
        int                hi;
        int                i;
        unsigned long      flags;
        int                rc = -ENOENT;

        /*
         * Find the single bucket we are supposed to look at or if nid is a
         * wildcard (LNET_NID_ANY) then look at all of the buckets
         */
        if (id.nid != LNET_NID_ANY) {
                struct list_head *l = kptllnd_nid2peerlist(id.nid);
                
                lo = hi =  l - kptllnd_data.kptl_peers;
        } else {
                if (id.pid != LNET_PID_ANY)
                        return -EINVAL;
                
                lo = 0;
                hi = kptllnd_data.kptl_peer_hash_size - 1;
        }

again:
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry (ptmp, kptl_peer_t, peer_list);

                        /* skip unless the peer matches the (possibly
                         * wildcarded) id */
                        if (!(id.nid == LNET_NID_ANY || 
                              (peer->peer_id.nid == id.nid &&
                               (id.pid == LNET_PID_ANY || 
                                peer->peer_id.pid == id.pid))))
                                continue;

                        kptllnd_peer_addref(peer); /* 1 ref for me... */

                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                               flags);

                        kptllnd_peer_close(peer, 0);
                        kptllnd_peer_decref(peer); /* ...until here */

                        rc = 0;         /* matched something */

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        return (rc);
}
463
/* Bind a memory descriptor for tx's message and queue the tx on peer's
 * send queue (HELLOs jump to the front so they always go out first).
 * Takes over the caller's ref on 'tx'; on MD bind failure the tx is
 * completed with -EIO.  nfrag == 0 sends tx_msg as one contiguous buffer;
 * otherwise tx_frags holds nfrag iovec entries whose first fragment covers
 * tx_msg.  The actual PtlPut happens later in kptllnd_peer_check_sends(). */
void
kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
        /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
        ptl_handle_md_t  msg_mdh;
        ptl_md_t         md;
        ptl_err_t        prc;
        unsigned long    flags;

        LASSERT (!tx->tx_idle);
        LASSERT (!tx->tx_active);
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
                 tx->tx_type == TX_TYPE_PUT_REQUEST ||
                 tx->tx_type == TX_TYPE_GET_REQUEST);

        kptllnd_set_tx_peer(tx, peer);

        memset(&md, 0, sizeof(md));

        md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE;
        md.user_ptr = &tx->tx_msg_eventarg;
        md.eq_handle = kptllnd_data.kptl_eqh;

        if (nfrag == 0) {
                md.start = tx->tx_msg;
                md.length = tx->tx_msg->ptlm_nob;
        } else {
                LASSERT (nfrag > 1);
                LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);

                md.start = tx->tx_frags;
                md.length = nfrag;
                md.options |= PTL_MD_IOVEC;
        }

        prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
        if (prc != PTL_OK) {
                CERROR("PtlMDBind(%s) failed: %d\n",
                       libcfs_id2str(peer->peer_id), prc);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);
                return;
        }
        
        spin_lock_irqsave(&peer->peer_lock, flags);

        tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
        tx->tx_active = 1;
        tx->tx_msg_mdh = msg_mdh;

        /* Ensure HELLO is sent first */
        if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
                list_add(&tx->tx_list, &peer->peer_sendq);
        else
                list_add_tail(&tx->tx_list, &peer->peer_sendq);

        spin_unlock_irqrestore(&peer->peer_lock, flags);
}
527
/* Push queued messages to 'peer' as flow-control credits allow.  Posts a
 * NOOP to return credits once peer_outstanding_credits reaches the
 * high-water mark, ensures HELLO goes on the wire before anything else,
 * never spends the last send credit unless credits are being returned,
 * discards NOOPs that have become redundant, attaches the RDMA ME/MD for
 * PUT/GET requests once their matchbits are assigned, and finally
 * PtlPut()s each message.  Any Portals failure closes the peer with -EIO.
 * Must not be called from interrupt context. */
void
kptllnd_peer_check_sends (kptl_peer_t *peer)
{
        ptl_handle_me_t  meh;
        kptl_tx_t       *tx;
        int              rc;
        unsigned long    flags;

        LASSERT(!in_interrupt());

        spin_lock_irqsave(&peer->peer_lock, flags);

        peer->peer_retry_noop = 0;

        if (list_empty(&peer->peer_sendq) &&
            peer->peer_outstanding_credits >= PTLLND_CREDIT_HIGHWATER &&
            peer->peer_credits != 0) {

                /* post a NOOP to return credits */
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
                if (tx == NULL) {
                        CERROR("Can't return credits to %s: can't allocate descriptor\n",
                               libcfs_id2str(peer->peer_id));
                } else {
                        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
                        kptllnd_post_tx(peer, tx, 0);
                }

                spin_lock_irqsave(&peer->peer_lock, flags);
                /* retry the NOOP on the next pass if we couldn't get a tx */
                peer->peer_retry_noop = (tx == NULL);
        }

        while (!list_empty(&peer->peer_sendq)) {
                tx = list_entry (peer->peer_sendq.next, kptl_tx_t, tx_list);

                LASSERT (tx->tx_active);
                LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
                LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));

                LASSERT (peer->peer_outstanding_credits >= 0);
                LASSERT (peer->peer_sent_credits >= 0);
                LASSERT (peer->peer_sent_credits +
                         peer->peer_outstanding_credits <=
                         *kptllnd_tunables.kptl_peercredits);
                LASSERT (peer->peer_credits >= 0);

                /* Ensure HELLO is sent first */
                if (!peer->peer_sent_hello) {
                        if (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_HELLO)
                                break;
                        peer->peer_sent_hello = 1;
                }

                if (peer->peer_credits == 0) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %p\n",
                               libcfs_id2str(peer->peer_id), 
                               peer->peer_credits,
                               peer->peer_outstanding_credits, 
                               peer->peer_sent_credits, tx);
                        break;
                }

                /* Don't use the last credit unless I've got credits to
                 * return */
                if (peer->peer_credits == 1 &&
                    peer->peer_outstanding_credits == 0) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
                               "not using last credit for %p\n",
                               libcfs_id2str(peer->peer_id), 
                               peer->peer_credits,
                               peer->peer_outstanding_credits,
                               peer->peer_sent_credits, tx);
                        break;
                }

                list_del(&tx->tx_list);

                /* Discard any NOOP I queued if I'm not at the high-water mark
                 * any more or more messages have been queued */
                if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
                    (!list_empty(&peer->peer_sendq) ||
                     peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {

                        tx->tx_active = 0;

                        spin_unlock_irqrestore(&peer->peer_lock, flags);

                        CDEBUG(D_NET, "%s: redundant noop\n", 
                               libcfs_id2str(peer->peer_id));
                        kptllnd_tx_decref(tx);

                        spin_lock_irqsave(&peer->peer_lock, flags);
                        continue;
                }

                /* fill last-minute msg fields */
                kptllnd_msg_pack(tx->tx_msg, peer);

                if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
                    tx->tx_type == TX_TYPE_GET_REQUEST) {
                        /* peer_next_matchbits must be known good */
                        LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
                        /* Assume 64-bit matchbits can't wrap */
                        LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
                        tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
                                peer->peer_next_matchbits++;
                }
                
                /* returning all outstanding credits with this message */
                peer->peer_sent_credits += peer->peer_outstanding_credits;
                peer->peer_outstanding_credits = 0;
                peer->peer_credits--;

                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
                       tx, tx->tx_msg->ptlm_nob,
                       tx->tx_msg->ptlm_credits);

                list_add_tail(&tx->tx_list, &peer->peer_activeq);

                kptllnd_tx_addref(tx);          /* 1 ref for me... */

                spin_unlock_irqrestore(&peer->peer_lock, flags);

                if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
                    tx->tx_type == TX_TYPE_GET_REQUEST) {
                        /* Post bulk now we have safe matchbits */
                        rc = PtlMEAttach(kptllnd_data.kptl_nih,
                                         *kptllnd_tunables.kptl_portal,
                                         peer->peer_ptlid,
                                         tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
                                         0,             /* ignore bits */
                                         PTL_UNLINK,
                                         PTL_INS_BEFORE,
                                         &meh);
                        if (rc != PTL_OK) {
                                CERROR("PtlMEAttach(%s) failed: %d\n",
                                       libcfs_id2str(peer->peer_id), rc);
                                goto failed;
                        }

                        rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
                                         &tx->tx_rdma_mdh);
                        if (rc != PTL_OK) {
                                CERROR("PtlMDAttach(%s) failed: %d\n",
                                       libcfs_id2str(tx->tx_peer->peer_id), rc);
                                rc = PtlMEUnlink(meh);
                                LASSERT(rc == PTL_OK);
                                tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
                                goto failed;
                        }
                        /* I'm not racing with the event callback here.  It's a
                         * bug if there's an event on the MD I just attached
                         * before I actually send the RDMA request message -
                         * probably matchbits re-used in error. */
                }

                tx->tx_tposted = jiffies;       /* going on the wire */

                rc = PtlPut (tx->tx_msg_mdh,
                             tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
                             peer->peer_ptlid,
                             *kptllnd_tunables.kptl_portal,
                             0,                 /* acl cookie */
                             LNET_MSG_MATCHBITS,
                             0,                 /* offset */
                             0);                /* header data */
                if (rc != PTL_OK) {
                        CERROR("PtlPut %s error %d\n",
                               libcfs_id2str(peer->peer_id), rc);
                        goto failed;
                }

                kptllnd_tx_decref(tx);          /* drop my ref */

                spin_lock_irqsave(&peer->peer_lock, flags);
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);
        return;

 failed:
        /* Nuke everything (including tx we were trying) */
        kptllnd_peer_close(peer, -EIO);
        kptllnd_tx_decref(tx);
}
717
718 kptl_tx_t *
719 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
720 {
721         kptl_tx_t         *tx;
722         struct list_head  *tmp;
723
724         list_for_each(tmp, &peer->peer_sendq) {
725                 tx = list_entry(peer->peer_sendq.next, kptl_tx_t, tx_list);
726
727                 if (time_after_eq(jiffies, tx->tx_deadline)) {
728                         kptllnd_tx_addref(tx);
729                         return tx;
730                 }
731         }
732
733         list_for_each(tmp, &peer->peer_activeq) {
734                 tx = list_entry(peer->peer_activeq.next, kptl_tx_t, tx_list);
735
736                 if (time_after_eq(jiffies, tx->tx_deadline)) {
737                         kptllnd_tx_addref(tx);
738                         return tx;
739                 }
740         }
741
742         return NULL;
743 }
744
745
/* Watchdog pass over peer hash bucket 'idx'.  Each peer is stamped with
 * 'stamp' so it is examined at most once per pass even though the scan
 * restarts whenever the table lock is dropped.  Retries NOOP posting for
 * peers whose previous credit-return attempt failed to get a tx
 * (peer_retry_noop), and closes any peer with a timed-out tx with
 * -ETIMEDOUT, dumping diagnostics first. */
void
kptllnd_peer_check_bucket (int idx, int stamp)
{
        struct list_head  *peers = &kptllnd_data.kptl_peers[idx];
        struct list_head  *ptmp;
        kptl_peer_t       *peer;
        kptl_tx_t         *tx;
        unsigned long      flags;
        int                nsend;
        int                nactive;
        int                check_sends;

        CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);

 again:
        /* NB. Shared lock while I just look */
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kptl_peer_t, peer_list);

                CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits, 
                       peer->peer_outstanding_credits, peer->peer_sent_credits);

                spin_lock(&peer->peer_lock);

                if (peer->peer_check_stamp == stamp) {
                        /* checked already this pass */
                        spin_unlock(&peer->peer_lock);
                        continue;
                }

                peer->peer_check_stamp = stamp;
                tx = kptllnd_find_timed_out_tx(peer);
                check_sends = peer->peer_retry_noop;
                
                spin_unlock(&peer->peer_lock);
                
                if (tx == NULL && !check_sends)
                        continue;

                kptllnd_peer_addref(peer); /* 1 ref for me... */

                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                if (tx == NULL) { /* nothing timed out */
                        kptllnd_peer_check_sends(peer);
                        kptllnd_peer_decref(peer); /* ...until here or... */

                        /* rescan after dropping the lock */
                        goto again;
                }

                /* snapshot queue depths for the diagnostic below */
                spin_lock_irqsave(&peer->peer_lock, flags);
                nsend = kptllnd_count_queue(&peer->peer_sendq);
                nactive = kptllnd_count_queue(&peer->peer_activeq);
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                LCONSOLE_ERROR("Timing out %s: %s\n",
                               libcfs_id2str(peer->peer_id),
                               (tx->tx_tposted == 0) ? 
                               "no free peer buffers" : "please check Portals");

                CERROR("%s timed out: cred %d outstanding %d, sent %d, "
                       "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d "
                       "%sposted %lu T/O %ds\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type),
                       tx->tx_active ? "A" : "",
                       PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
                       "" : "M",
                       PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
                       "" : "D",
                       tx->tx_status,
                       (tx->tx_tposted == 0) ? "not " : "",
                       (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
                       *kptllnd_tunables.kptl_timeout);

                kptllnd_dump_ptltrace();

                kptllnd_tx_decref(tx);

                kptllnd_peer_close(peer, -ETIMEDOUT);
                kptllnd_peer_decref(peer); /* ...until here */

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}
839
840 kptl_peer_t *
841 kptllnd_id2peer_locked (lnet_process_id_t id)
842 {
843         struct list_head *peers = kptllnd_nid2peerlist(id.nid);
844         struct list_head *tmp;
845         kptl_peer_t      *peer;
846
847         list_for_each (tmp, peers) {
848
849                 peer = list_entry (tmp, kptl_peer_t, peer_list);
850
851                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
852                         peer->peer_state == PEER_STATE_ACTIVE);
853                 
854                 if (peer->peer_id.nid != id.nid ||
855                     peer->peer_id.pid != id.pid)
856                         continue;
857
858                 kptllnd_peer_addref(peer);
859
860                 CDEBUG(D_NET, "%s -> %s (%d)\n",
861                        libcfs_id2str(id), 
862                        kptllnd_ptlid2str(peer->peer_ptlid),
863                        atomic_read (&peer->peer_refcount));
864                 return peer;
865         }
866
867         return NULL;
868 }
869
870 void
871 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
872 {
873         LCONSOLE_ERROR("%s %s overflows the peer table[%d]: "
874                        "messages may be dropped\n",
875                        str, libcfs_id2str(id),
876                        kptllnd_data.kptl_n_active_peers);
877         LCONSOLE_ERROR("Please correct by increasing "
878                        "'max_nodes' or 'max_procs_per_node'\n");
879 }
880
881 __u64
882 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
883 {
884         kptl_peer_t            *peer;
885         struct list_head       *tmp;
886
887         /* Find the last matchbits I saw this new peer using.  Note..
888            A. This peer cannot be in the peer table - she's new!
889            B. If I can't find the peer in the closing/zombie peers, all
890               matchbits are safe because all refs to the (old) peer have gone
891               so all txs have completed so there's no risk of matchbit
892               collision!
893          */
894
895         LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
896
897         /* peer's last matchbits can't change after it comes out of the peer
898          * table, so first match is fine */
899
900         list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
901                 peer = list_entry (tmp, kptl_peer_t, peer_list);
902
903                 if (peer->peer_id.nid == lpid.nid &&
904                     peer->peer_id.pid == lpid.pid)
905                         return peer->peer_last_matchbits_seen;
906         }
907         
908         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
909                 peer = list_entry (tmp, kptl_peer_t, peer_list);
910
911                 if (peer->peer_id.nid == lpid.nid &&
912                     peer->peer_id.pid == lpid.pid)
913                         return peer->peer_last_matchbits_seen;
914         }
915         
916         return PTL_RESERVED_MATCHBITS;
917 }
918
919 kptl_peer_t *
920 kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
921                            kptl_msg_t       *msg)
922 {
923         rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;
924         kptl_peer_t        *peer;
925         kptl_peer_t        *new_peer;
926         lnet_process_id_t   lpid;
927         unsigned long       flags;
928         kptl_tx_t          *hello_tx;
929         int                 rc;
930         __u64               safe_matchbits;
931         __u64               last_matchbits_seen;
932
933         lpid.nid = msg->ptlm_srcnid;
934         lpid.pid = msg->ptlm_srcpid;
935
936         CDEBUG(D_NET, "hello from %s(%s)\n",
937                libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
938
939         if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
940             (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
941                 /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
942                  * userspace.  Refuse the connection if she hasn't set the
943                  * correct flag in her PID... */
944                 CERROR("Userflag not set in hello from %s (%s)\n",
945                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
946                 return NULL;
947         }
948         
949         /* kptlhm_matchbits are the highest matchbits my peer may have used to
950          * RDMA to me.  I ensure I never register buffers for RDMA that could
951          * match any she used */
952         safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;
953
954         if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
955                 CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
956                        safe_matchbits, libcfs_id2str(lpid));
957                 return NULL;
958         }
959         
960         if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
961                 CERROR("%s: max message size %d < MIN %d",
962                        libcfs_id2str(lpid),
963                        msg->ptlm_u.hello.kptlhm_max_msg_size,
964                        *kptllnd_tunables.kptl_max_msg_size);
965                 return NULL;
966         }
967
968         if (msg->ptlm_credits <= 1) {
969                 CERROR("Need more than 1+%d credits from %s\n",
970                        msg->ptlm_credits, libcfs_id2str(lpid));
971                 return NULL;
972         }
973         
974         write_lock_irqsave(g_lock, flags);
975
976         peer = kptllnd_id2peer_locked(lpid);
977         if (peer != NULL) {
978                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
979                         /* Completing HELLO handshake */
980                         LASSERT(peer->peer_incarnation == 0);
981
982                         if (msg->ptlm_dststamp != 0 &&
983                             msg->ptlm_dststamp != peer->peer_myincarnation) {
984                                 write_unlock_irqrestore(g_lock, flags);
985
986                                 CERROR("Ignoring HELLO from %s: unexpected "
987                                        "dststamp "LPX64" ("LPX64" wanted)\n",
988                                        libcfs_id2str(lpid),
989                                        msg->ptlm_dststamp,
990                                        peer->peer_myincarnation);
991                                 kptllnd_peer_decref(peer);
992                                 return NULL;
993                         }
994                         
995                         /* Concurrent initiation or response to my HELLO */
996                         peer->peer_state = PEER_STATE_ACTIVE;
997                         peer->peer_incarnation = msg->ptlm_srcstamp;
998                         peer->peer_next_matchbits = safe_matchbits;
999                         peer->peer_max_msg_size =
1000                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1001                         
1002                         write_unlock_irqrestore(g_lock, flags);
1003                         return peer;
1004                 }
1005
1006                 if (msg->ptlm_dststamp != 0 &&
1007                     msg->ptlm_dststamp <= peer->peer_myincarnation) {
1008                         write_unlock_irqrestore(g_lock, flags);
1009
1010                         CERROR("Ignoring stale HELLO from %s: "
1011                                "dststamp "LPX64" (current "LPX64")\n",
1012                                libcfs_id2str(lpid),
1013                                msg->ptlm_dststamp,
1014                                peer->peer_myincarnation);
1015                         kptllnd_peer_decref(peer);
1016                         return NULL;
1017                 }
1018
1019                 /* Brand new connection attempt: remove old incarnation */
1020                 kptllnd_peer_close_locked(peer, 0);
1021         }
1022
1023         kptllnd_cull_peertable_locked(lpid);
1024
1025         write_unlock_irqrestore(g_lock, flags);
1026
1027         if (peer != NULL) {
1028                 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
1029                        " stamp "LPX64"("LPX64")\n",
1030                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
1031                        msg->ptlm_srcstamp, peer->peer_incarnation);
1032
1033                 kptllnd_peer_decref(peer);
1034         }
1035
1036         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1037         if (hello_tx == NULL) {
1038                 CERROR("Unable to allocate HELLO message for %s\n",
1039                        libcfs_id2str(lpid));
1040                 return NULL;
1041         }
1042
1043         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1044                          sizeof(kptl_hello_msg_t));
1045
1046         new_peer = kptllnd_peer_allocate(lpid, initiator);
1047         if (new_peer == NULL) {
1048                 kptllnd_tx_decref(hello_tx);
1049                 return NULL;
1050         }
1051
1052         rc = kptllnd_peer_reserve_buffers();
1053         if (rc != 0) {
1054                 kptllnd_peer_decref(new_peer);
1055                 kptllnd_tx_decref(hello_tx);
1056
1057                 CERROR("Failed to reserve buffers for %s\n",
1058                        libcfs_id2str(lpid));
1059                 return NULL;
1060         }
1061
1062         write_lock_irqsave(g_lock, flags);
1063  again:
1064         peer = kptllnd_id2peer_locked(lpid);
1065         if (peer != NULL) {
1066                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1067                         /* An outgoing message instantiated 'peer' for me */
1068                         LASSERT(peer->peer_incarnation == 0);
1069
1070                         peer->peer_state = PEER_STATE_ACTIVE;
1071                         peer->peer_incarnation = msg->ptlm_srcstamp;
1072                         peer->peer_next_matchbits = safe_matchbits;
1073                         peer->peer_max_msg_size =
1074                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1075
1076                         write_unlock_irqrestore(g_lock, flags);
1077
1078                         CWARN("Outgoing instantiated peer %s\n",
1079                               libcfs_id2str(lpid));
1080                 } else {
1081                         LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1082
1083                         write_unlock_irqrestore(g_lock, flags);
1084
1085                         /* WOW!  Somehow this peer completed the HELLO
1086                          * handshake while I slept.  I guess I could have slept
1087                          * while it rebooted and sent a new HELLO, so I'll fail
1088                          * this one... */
1089                         CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
1090                         kptllnd_peer_decref(peer);
1091                         peer = NULL;
1092                 }
1093
1094                 kptllnd_peer_unreserve_buffers();
1095                 kptllnd_peer_decref(new_peer);
1096                 kptllnd_tx_decref(hello_tx);
1097                 return peer;
1098         }
1099
1100         if (kptllnd_data.kptl_n_active_peers ==
1101             kptllnd_data.kptl_expected_peers) {
1102                 /* peer table full */
1103                 write_unlock_irqrestore(g_lock, flags);
1104
1105                 kptllnd_peertable_overflow_msg("Connection from ", lpid);
1106
1107                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1108                 if (rc != 0) {
1109                         CERROR("Refusing connection from %s\n",
1110                                libcfs_id2str(lpid));
1111                         kptllnd_peer_unreserve_buffers();
1112                         kptllnd_peer_decref(new_peer);
1113                         kptllnd_tx_decref(hello_tx);
1114                         return NULL;
1115                 }
1116                 
1117                 write_lock_irqsave(g_lock, flags);
1118                 kptllnd_data.kptl_expected_peers++;
1119                 goto again;
1120         }
1121
1122         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
1123
1124         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1125         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1126                 *kptllnd_tunables.kptl_max_msg_size;
1127
1128         new_peer->peer_state = PEER_STATE_ACTIVE;
1129         new_peer->peer_incarnation = msg->ptlm_srcstamp;
1130         new_peer->peer_next_matchbits = safe_matchbits;
1131         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1132         new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1133
1134         kptllnd_peer_add_peertable_locked(new_peer);
1135
1136         write_unlock_irqrestore(g_lock, flags);
1137
1138         /* NB someone else could get in now and post a message before I post
1139          * the HELLO, but post_tx/check_sends take care of that! */
1140
1141         CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
1142                libcfs_id2str(new_peer->peer_id), hello_tx);
1143
1144         kptllnd_post_tx(new_peer, hello_tx, 0);
1145         kptllnd_peer_check_sends(new_peer);
1146
1147         return new_peer;
1148 }
1149
/* Queue 'tx' (carrying 'nfrag' payload fragments) on 'peer' and kick the
 * send pipeline immediately.  Thin convenience wrapper: post_tx enqueues,
 * check_sends pushes whatever credits allow. */
void
kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
        kptllnd_post_tx(peer, tx, nfrag);
        kptllnd_peer_check_sends(peer);
}
1156
/* Look up the peer for 'target', creating it (and initiating the HELLO
 * handshake) if it doesn't exist yet.  On success *peerp holds a referenced
 * peer and 0 is returned; on failure a negative errno is returned.
 * May sleep (tx allocation, buffer reservation). */
int
kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        ptl_process_id_t  ptl_id;
        kptl_peer_t      *new_peer;
        kptl_tx_t        *hello_tx;
        unsigned long     flags;
        int               rc;
        __u64             last_matchbits_seen;

        /* I expect to find the peer, so I only take a read lock... */
        read_lock_irqsave(g_lock, flags);
        *peerp = kptllnd_id2peer_locked(target);
        read_unlock_irqrestore(g_lock, flags);

        if (*peerp != NULL)
                return 0;
        
        /* Only kernel peers may be connected to actively; userspace peers
         * (USERFLAG set) must connect to us */
        if ((target.pid & LNET_PID_USERFLAG) != 0) {
                CWARN("Refusing to create a new connection to %s "
                      "(non-kernel peer)\n", libcfs_id2str(target));
                return -EHOSTUNREACH;
        }

        /* The new peer is a kernel ptllnd, and kernel ptllnds all have
         * the same portals PID */
        ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
        ptl_id.pid = kptllnd_data.kptl_portals_id.pid;

        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate connect message for %s\n",
                       libcfs_id2str(target));
                return -ENOMEM;
        }

        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(target, ptl_id);
        if (new_peer == NULL) {
                rc = -ENOMEM;
                goto unwind_0;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0)
                goto unwind_1;

        write_lock_irqsave(g_lock, flags);
 again:
        /* a racing thread (e.g. an incoming HELLO) may have created the peer
         * while the lock was dropped: use it and discard our speculatively
         * allocated one */
        *peerp = kptllnd_id2peer_locked(target);
        if (*peerp != NULL) {
                write_unlock_irqrestore(g_lock, flags);
                goto unwind_2;
        }

        kptllnd_cull_peertable_locked(target);

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection to ", target);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Can't create connection to %s\n",
                               libcfs_id2str(target));
                        rc = -ENOMEM;
                        goto unwind_2;
                }
                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
                goto again;
        }

        /* seed the HELLO with the old incarnation's matchbits so the peer
         * won't reuse matchbits that could still collide */
        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);

        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;
                
        new_peer->peer_state = PEER_STATE_WAITING_HELLO;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;
        
        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        kptllnd_post_tx(new_peer, hello_tx, 0);
        kptllnd_peer_check_sends(new_peer);
       
        *peerp = new_peer;
        return 0;
        
        /* error unwind: release resources in reverse order of acquisition */
 unwind_2:
        kptllnd_peer_unreserve_buffers();
 unwind_1:
        kptllnd_peer_decref(new_peer);
 unwind_0:
        kptllnd_tx_decref(hello_tx);

        return rc;
}