fs/lustre-release.git: lnet/klnds/ptllnd/ptllnd_peer.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: PJ Kirner <pjkirner@clusterfs.com>
6  *           E Barton <eeb@bartonsoftware.com>
7  *
8  *   This file is part of the Lustre file system, http://www.lustre.org
9  *   Lustre is a trademark of Cluster File Systems, Inc.
10  *
11  *   This file is confidential source code owned by Cluster File Systems.
12  *   No viewing, modification, compilation, redistribution, or any other
13  *   form of use is permitted except through a signed license agreement.
14  *
15  *   If you have not signed such an agreement, then you have no rights to
16  *   this file.  Please destroy it immediately and contact CFS.
17  *
18  */
19
20 #include "ptllnd.h"
21 #include <libcfs/list.h>
22
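/* Count the entries on a message queue.  No locking here: both callers hold
 * the owning peer's peer_lock across the walk. */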
23 static int
24 kptllnd_count_queue(struct list_head *q)
25 {
26         struct list_head *e;
27         int               n = 0;
28         
29         list_for_each(e, q) {
30                 n++;
31         }
32
33         return n;
34 }
35
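/* Copy out a snapshot of the index'th peer in the global hash table.  The
 * table is walked under the read lock; the matchbit, credit and queue-depth
 * fields are sampled under peer_lock so they are mutually consistent.
 * Returns -ENOENT if 'index' is past the last peer. */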
36 int
37 kptllnd_get_peer_info(int index, 
38                       lnet_process_id_t *id,
39                       int *state, int *sent_hello,
40                       int *refcount, __u64 *incarnation,
41                       __u64 *next_matchbits, __u64 *last_matchbits_seen,
42                       int *nsendq, int *nactiveq,
43                       int *credits, int *outstanding_credits) 
44 {
45         rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
46         unsigned long     flags;
47         struct list_head *ptmp;
48         kptl_peer_t      *peer;
49         int               i;
50         int               rc = -ENOENT;
51
52         read_lock_irqsave(g_lock, flags);
53
54         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
55                 
56                 list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
57                         peer = list_entry(ptmp, kptl_peer_t, peer_list);
58
59                         if (index-- > 0)
60                                 continue;
61                         
62                         *id          = peer->peer_id;
63                         *state       = peer->peer_state;
64                         *sent_hello  = peer->peer_sent_hello;
65                         *refcount    = atomic_read(&peer->peer_refcount);
66                         *incarnation = peer->peer_incarnation;
67
68                         spin_lock(&peer->peer_lock);
69
70                         *next_matchbits      = peer->peer_next_matchbits;
71                         *last_matchbits_seen = peer->peer_last_matchbits_seen;
72                         *credits             = peer->peer_credits;
73                         *outstanding_credits = peer->peer_outstanding_credits;
74
75                         *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
76                         *nactiveq = kptllnd_count_queue(&peer->peer_activeq);
77
78                         spin_unlock(&peer->peer_lock);
79
80                         rc = 0;
81                         goto out;
82                 }
83         }
84         
85  out:
86         read_unlock_irqrestore(g_lock, flags);
87         return rc;
88 }
89
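/* Add 'peer' to its NID's hash chain.  Caller holds kptl_peer_rw_lock
 * write-locked and has already made room (kptl_n_active_peers stays below
 * kptl_expected_peers).  A ref is taken for the table and the peer goes at
 * the head of the chain, keeping it in MRU order for
 * kptllnd_cull_peertable_locked(). */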
90 void
91 kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
92 {
93         LASSERT (!kptllnd_data.kptl_shutdown);
94         LASSERT (kptllnd_data.kptl_n_active_peers <
95                  kptllnd_data.kptl_expected_peers);
96
97         LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
98                  peer->peer_state == PEER_STATE_ACTIVE);
99         
100         kptllnd_data.kptl_n_active_peers++;
101         atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */
102
103         /* NB add to HEAD of peer list for MRU order!
104          * (see kptllnd_cull_peertable) */
105         list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
106 }
107
108 void
109 kptllnd_cull_peertable_locked (lnet_process_id_t pid)
110 {
111         /* I'm about to add a new peer with this portals ID to the peer table,
112          * so (a) this peer should not exist already and (b) I want to leave at
113          * most (max_procs_per_node - 1) peers with this NID in the table. */
114         struct list_head  *peers = kptllnd_nid2peerlist(pid.nid);
115         int                cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
116         int                count;
117         struct list_head  *tmp;
118         struct list_head  *nxt;
119         kptl_peer_t       *peer;
120         
121         count = 0;
122         list_for_each_safe (tmp, nxt, peers) {
123                 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
124                  * in MRU order */
125                 peer = list_entry(tmp, kptl_peer_t, peer_list);
126                         
127                 if (peer->peer_id.nid != pid.nid)
128                         continue;
129
130                 LASSERT (peer->peer_id.pid != pid.pid);
131                         
132                 count++;
133
134                 if (count < cull_count) /* recent (don't cull) */
135                         continue;
136
137                 CDEBUG(D_NET, "Cull %s(%s)\n",
138                        libcfs_id2str(peer->peer_id),
139                        kptllnd_ptlid2str(peer->peer_ptlid));
140                 
141                 kptllnd_peer_close_locked(peer, 0);
142         }
143 }
144
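/* Allocate a peer in PEER_STATE_ALLOCATED with a single ref for the caller.
 * It starts with one send credit (enough for the HELLO) and is not yet in
 * the peer table.  kptl_npeers is only bumped under the write lock so peer
 * creation can't race with shutdown; returns NULL on allocation failure or
 * if shutdown has already started. */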
145 kptl_peer_t *
146 kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
147 {
148         unsigned long    flags;
149         kptl_peer_t     *peer;
150
151         LIBCFS_ALLOC(peer, sizeof (*peer));
152         if (peer == NULL) {
153                 CERROR("Can't create peer %s (%s)\n",
154                        libcfs_id2str(lpid), 
155                        kptllnd_ptlid2str(ppid));
156                 return NULL;
157         }
158
159         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
160
161         INIT_LIST_HEAD (&peer->peer_sendq);
162         INIT_LIST_HEAD (&peer->peer_activeq);
163         spin_lock_init (&peer->peer_lock);
164
165         peer->peer_state = PEER_STATE_ALLOCATED;
166         peer->peer_error = 0;
167         peer->peer_last_alive = cfs_time_current();
168         peer->peer_id = lpid;
169         peer->peer_ptlid = ppid;
170         peer->peer_credits = 1;                 /* enough for HELLO */
171         peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
172         peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;
173         peer->peer_sent_credits = 1;           /* HELLO credit is implicit */
174         peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */
175
176         atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */
177
178         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
179
180         peer->peer_myincarnation = kptllnd_data.kptl_incarnation;
181
182         /* Only increase # peers under lock, to guarantee we don't grow it
183          * during shutdown */
184         if (kptllnd_data.kptl_shutdown) {
185                 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
186                                         flags);
187                 LIBCFS_FREE(peer, sizeof(*peer));
188                 return NULL;
189         }
190
191         kptllnd_data.kptl_npeers++;
192         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
193         
194         return peer;
195 }
196
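/* Final teardown when the last ref is dropped: both queues must already be
 * empty.  A ZOMBIE peer is still on kptl_zombie_peers and is unlinked here
 * before kptl_npeers is decremented and the memory freed. */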
197 void
198 kptllnd_peer_destroy (kptl_peer_t *peer)
199 {
200         unsigned long flags;
201         
202         CDEBUG(D_NET, "Peer=%p\n", peer);
203
204         LASSERT (!in_interrupt());
205         LASSERT (atomic_read(&peer->peer_refcount) == 0);
206         LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
207                  peer->peer_state == PEER_STATE_ZOMBIE);
208         LASSERT (list_empty(&peer->peer_sendq));
209         LASSERT (list_empty(&peer->peer_activeq));
210
211         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
212
213         if (peer->peer_state == PEER_STATE_ZOMBIE)
214                 list_del(&peer->peer_list);
215
216         kptllnd_data.kptl_npeers--;
217
218         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
219
220         LIBCFS_FREE (peer, sizeof (*peer));
221 }
222
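/* Move every tx on 'peerq' onto 'txs', marking each one inactive and failed
 * with -EIO.  Caller holds the owning peer's peer_lock. */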
223 void
224 kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs)
225 {
226         struct list_head  *tmp;
227         struct list_head  *nxt;
228         kptl_tx_t         *tx;
229
230         list_for_each_safe (tmp, nxt, peerq) {
231                 tx = list_entry(tmp, kptl_tx_t, tx_list);
232
233                 list_del(&tx->tx_list);
234                 list_add_tail(&tx->tx_list, txs);
235
236                 tx->tx_status = -EIO;
237                 tx->tx_active = 0;
238         }
239 }
240
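/* Fail everything queued on the peer: both the not-yet-sent peer_sendq and
 * the in-flight peer_activeq are drained onto 'txs' for the caller to
 * finalise once locks are dropped. */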
241 void
242 kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs)
243 {
244         unsigned long   flags;
245
246         spin_lock_irqsave(&peer->peer_lock, flags);
247
248         kptllnd_cancel_txlist(&peer->peer_sendq, txs);
249         kptllnd_cancel_txlist(&peer->peer_activeq, txs);
250                 
251         spin_unlock_irqrestore(&peer->peer_lock, flags);
252 }
253
254 void
255 kptllnd_peer_alive (kptl_peer_t *peer)
256 {
257         /* This is racy, but everyone's only writing cfs_time_current() */
258         peer->peer_last_alive = cfs_time_current();
259         mb();
260 }
261
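/* If an error has been stashed on the peer, clear it and tell LNET the peer
 * is down, passing an estimate of when it was last heard from
 * (peer_last_alive converted from jiffies to wall-clock seconds). */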
262 void
263 kptllnd_peer_notify (kptl_peer_t *peer)
264 {
265         unsigned long flags;
266         time_t        last_alive = 0;
267         int           error = 0;
268         
269         spin_lock_irqsave(&peer->peer_lock, flags);
270
271         if (peer->peer_error != 0) {
272                 error = peer->peer_error;
273                 peer->peer_error = 0;
274                 
275                 last_alive = cfs_time_current_sec() - 
276                              cfs_duration_sec(cfs_time_current() - 
277                                               peer->peer_last_alive);
278         }
279         
280         spin_unlock_irqrestore(&peer->peer_lock, flags);
281
282         if (error != 0)
283                 lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
284                              last_alive);
285 }
286
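/* Reap closed peers: cancel txs on existing zombies, then move each peer on
 * kptl_closing_peers to kptl_zombie_peers, notify LNET, cancel its txs and
 * drop the peer table's ref.  The cancelled txs are finalised last, outside
 * the peer table lock, so kptllnd_tx_fini() can abort outstanding comms. */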
287 void
288 kptllnd_handle_closing_peers ()
289 {
290         unsigned long           flags;
291         struct list_head        txs;
292         kptl_peer_t            *peer;
293         struct list_head       *tmp;
294         struct list_head       *nxt;
295         kptl_tx_t              *tx;
296         int                     idle;
297
298         /* Check with a read lock first to avoid blocking anyone */
299
300         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
301         idle = list_empty(&kptllnd_data.kptl_closing_peers) &&
302                list_empty(&kptllnd_data.kptl_zombie_peers);
303         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
304
305         if (idle)
306                 return;
307
308         INIT_LIST_HEAD(&txs);
309
310         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
311
312         /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
313          * ref removes it from this list, so I mustn't drop the lock while
314          * scanning it. */
315         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
316                 peer = list_entry (tmp, kptl_peer_t, peer_list);
317
318                 LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);
319
320                 kptllnd_peer_cancel_txs(peer, &txs);
321         }
322
323         /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
324          * I'm the only one removing from this list, but peers can be added on
325          * the end any time I drop the lock. */
326
327         list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
328                 peer = list_entry (tmp, kptl_peer_t, peer_list);
329
330                 LASSERT (peer->peer_state == PEER_STATE_CLOSING);
331
332                 list_del(&peer->peer_list);
333                 list_add_tail(&peer->peer_list,
334                               &kptllnd_data.kptl_zombie_peers);
335                 peer->peer_state = PEER_STATE_ZOMBIE;
336
337                 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
338
339                 kptllnd_peer_notify(peer);
340                 kptllnd_peer_cancel_txs(peer, &txs);
341                 kptllnd_peer_decref(peer);
342
343                 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
344         }
345
346         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
347
348         /* Drop peer's ref on all cancelled txs.  This will get
349          * kptllnd_tx_fini() to abort outstanding comms if necessary. */
350
351         list_for_each_safe (tmp, nxt, &txs) {
352                 tx = list_entry(tmp, kptl_tx_t, tx_list);
353                 list_del(&tx->tx_list);
354                 kptllnd_tx_decref(tx);
355         }
356 }
357
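/* Begin closing a peer; CLOSING and ZOMBIE peers are left alone, so this is
 * safe to call repeatedly.  An active peer is removed from the hash table,
 * 'why' is stashed in peer_error for kptllnd_peer_notify(), my incarnation
 * is bumped so a reconnecting peer sees a new one, and the peer is queued
 * on kptl_closing_peers (keeping the table's ref) before the watchdog
 * waitqueue is woken.  Caller holds kptl_peer_rw_lock write-locked. */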
358 void
359 kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
360 {
361         switch (peer->peer_state) {
362         default:
363                 LBUG();
364
365         case PEER_STATE_WAITING_HELLO:
366         case PEER_STATE_ACTIVE:
367                 /* Ensure new peers see a new incarnation of me */
368                 LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
369                 if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
370                         kptllnd_data.kptl_incarnation++;
371
372                 /* Removing from peer table */
373                 kptllnd_data.kptl_n_active_peers--;
374                 LASSERT (kptllnd_data.kptl_n_active_peers >= 0);
375
376                 list_del(&peer->peer_list);
377                 kptllnd_peer_unreserve_buffers();
378
379                 peer->peer_error = why; /* stash 'why' only on first close */
380                 peer->peer_state = PEER_STATE_CLOSING;
381
382                 /* Schedule for immediate attention, taking peer table's ref */
383                 list_add_tail(&peer->peer_list, 
384                               &kptllnd_data.kptl_closing_peers);
385                 wake_up(&kptllnd_data.kptl_watchdog_waitq);
386                 break;
387
388         case PEER_STATE_ZOMBIE:
389         case PEER_STATE_CLOSING:
390                 break;
391         }
392 }
393
394 void
395 kptllnd_peer_close(kptl_peer_t *peer, int why)
396 {
397         unsigned long      flags;
398
399         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
400         kptllnd_peer_close_locked(peer, why);
401         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
402 }
403
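/* Close every peer matching 'id'.  id.nid == LNET_NID_ANY matches all hash
 * buckets (the pid must then be LNET_PID_ANY too); otherwise only the
 * bucket for id.nid is scanned.  Closing a peer means dropping the read
 * lock, so the scan restarts from the top after every match.  Returns 0 if
 * anything matched, -ENOENT otherwise. */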
404 int
405 kptllnd_peer_del(lnet_process_id_t id)
406 {
407         struct list_head  *ptmp;
408         struct list_head  *pnxt;
409         kptl_peer_t       *peer;
410         int                lo;
411         int                hi;
412         int                i;
413         unsigned long      flags;
414         int                rc = -ENOENT;
415
416         /*
417          * Find the single bucket we are supposed to look at or if nid is a
418          * wildcard (LNET_NID_ANY) then look at all of the buckets
419          */
420         if (id.nid != LNET_NID_ANY) {
421                 struct list_head *l = kptllnd_nid2peerlist(id.nid);
422                 
423                 lo = hi =  l - kptllnd_data.kptl_peers;
424         } else {
425                 if (id.pid != LNET_PID_ANY)
426                         return -EINVAL;
427                 
428                 lo = 0;
429                 hi = kptllnd_data.kptl_peer_hash_size - 1;
430         }
431
432 again:
433         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
434
435         for (i = lo; i <= hi; i++) {
436                 list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
437                         peer = list_entry (ptmp, kptl_peer_t, peer_list);
438
439                         if (!(id.nid == LNET_NID_ANY || 
440                               (peer->peer_id.nid == id.nid &&
441                                (id.pid == LNET_PID_ANY || 
442                                 peer->peer_id.pid == id.pid))))
443                                 continue;
444
445                         kptllnd_peer_addref(peer); /* 1 ref for me... */
446
447                         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
448                                                flags);
449
450                         kptllnd_peer_close(peer, 0);
451                         kptllnd_peer_decref(peer); /* ...until here */
452
453                         rc = 0;         /* matched something */
454
455                         /* start again now I've dropped the lock */
456                         goto again;
457                 }
458         }
459
460         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
461
462         return (rc);
463 }
464
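/* Bind an MD for the message and queue the tx on peer_sendq (a HELLO goes
 * to the head so it is always sent first).  Nothing goes on the wire here;
 * kptllnd_peer_check_sends() issues the PtlPut when credits allow. */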
465 void
466 kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
467 {
468         /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
469         ptl_handle_md_t  msg_mdh;
470         ptl_md_t         md;
471         ptl_err_t        prc;
472         unsigned long    flags;
473
474         LASSERT (!tx->tx_idle);
475         LASSERT (!tx->tx_active);
476         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
477         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
478         LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
479                  tx->tx_type == TX_TYPE_PUT_REQUEST ||
480                  tx->tx_type == TX_TYPE_GET_REQUEST);
481
482         kptllnd_set_tx_peer(tx, peer);
483
484         memset(&md, 0, sizeof(md));
485
486         md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
487         md.options = PTL_MD_OP_PUT |
488                      PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
489                      PTL_MD_EVENT_START_DISABLE;
490         md.user_ptr = &tx->tx_msg_eventarg;
491         md.eq_handle = kptllnd_data.kptl_eqh;
492
493         if (nfrag == 0) {
494                 md.start = tx->tx_msg;
495                 md.length = tx->tx_msg->ptlm_nob;
496         } else {
497                 LASSERT (nfrag > 1);
498                 LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);
499
500                 md.start = tx->tx_frags;
501                 md.length = nfrag;
502                 md.options |= PTL_MD_IOVEC;
503         }
504
505         prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
506         if (prc != PTL_OK) {
507                 CERROR("PtlMDBind(%s) failed: %s(%d)\n",
508                        libcfs_id2str(peer->peer_id),
509                        kptllnd_errtype2str(prc), prc);
510                 tx->tx_status = -EIO;
511                 kptllnd_tx_decref(tx);
512                 return;
513         }
514         
515         spin_lock_irqsave(&peer->peer_lock, flags);
516
517         tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
518         tx->tx_active = 1;
519         tx->tx_msg_mdh = msg_mdh;
520
521         /* Ensure HELLO is sent first */
522         if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
523                 list_add(&tx->tx_list, &peer->peer_sendq);
524         else
525                 list_add_tail(&tx->tx_list, &peer->peer_sendq);
526
527         spin_unlock_irqrestore(&peer->peer_lock, flags);
528 }
529
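/* Push peer_sendq to the wire as the credit state allows.  If nothing is
 * queued but PTLLND_CREDIT_HIGHWATER or more credits are owed, a NOOP is
 * posted just to return them (and discarded again below if it becomes
 * redundant).  A HELLO must be the first message sent, the last send credit
 * is held back unless credits are being returned with the message (so the
 * peer is never starved of the means to reply), and matchbits for PUT/GET
 * requests are assigned under peer_lock just before posting. */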
530 void
531 kptllnd_peer_check_sends (kptl_peer_t *peer)
532 {
533         ptl_handle_me_t  meh;
534         kptl_tx_t       *tx;
535         int              rc;
536         unsigned long    flags;
537
538         LASSERT(!in_interrupt());
539
540         spin_lock_irqsave(&peer->peer_lock, flags);
541
542         peer->peer_retry_noop = 0;
543
544         if (list_empty(&peer->peer_sendq) &&
545             peer->peer_outstanding_credits >= PTLLND_CREDIT_HIGHWATER &&
546             peer->peer_credits != 0) {
547
548                 /* post a NOOP to return credits */
549                 spin_unlock_irqrestore(&peer->peer_lock, flags);
550
551                 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
552                 if (tx == NULL) {
553                         CERROR("Can't return credits to %s: can't allocate descriptor\n",
554                                libcfs_id2str(peer->peer_id));
555                 } else {
556                         kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
557                         kptllnd_post_tx(peer, tx, 0);
558                 }
559
560                 spin_lock_irqsave(&peer->peer_lock, flags);
561                 peer->peer_retry_noop = (tx == NULL);
562         }
563
564         while (!list_empty(&peer->peer_sendq)) {
565                 tx = list_entry (peer->peer_sendq.next, kptl_tx_t, tx_list);
566
567                 LASSERT (tx->tx_active);
568                 LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
569                 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
570
571                 LASSERT (peer->peer_outstanding_credits >= 0);
572                 LASSERT (peer->peer_sent_credits >= 0);
573                 LASSERT (peer->peer_sent_credits +
574                          peer->peer_outstanding_credits <=
575                          *kptllnd_tunables.kptl_peercredits);
576                 LASSERT (peer->peer_credits >= 0);
577
578                 /* Ensure HELLO is sent first */
579                 if (!peer->peer_sent_hello) {
580                         if (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_HELLO)
581                                 break;
582                         peer->peer_sent_hello = 1;
583                 }
584
585                 if (peer->peer_credits == 0) {
586                         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %p\n",
587                                libcfs_id2str(peer->peer_id), 
588                                peer->peer_credits,
589                                peer->peer_outstanding_credits, 
590                                peer->peer_sent_credits, tx);
591                         break;
592                 }
593
594                 /* Don't use the last credit unless I've got credits to
595                  * return */
596                 if (peer->peer_credits == 1 &&
597                     peer->peer_outstanding_credits == 0) {
598                         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
599                                "not using last credit for %p\n",
600                                libcfs_id2str(peer->peer_id), 
601                                peer->peer_credits,
602                                peer->peer_outstanding_credits,
603                                peer->peer_sent_credits, tx);
604                         break;
605                 }
606
607                 list_del(&tx->tx_list);
608
609                 /* Discard any NOOP I queued if I'm not at the high-water mark
610                  * any more or more messages have been queued */
611                 if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
612                     (!list_empty(&peer->peer_sendq) ||
613                      peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {
614
615                         tx->tx_active = 0;
616
617                         spin_unlock_irqrestore(&peer->peer_lock, flags);
618
619                         CDEBUG(D_NET, "%s: redundant noop\n", 
620                                libcfs_id2str(peer->peer_id));
621                         kptllnd_tx_decref(tx);
622
623                         spin_lock_irqsave(&peer->peer_lock, flags);
624                         continue;
625                 }
626
627                 /* fill last-minute msg fields */
628                 kptllnd_msg_pack(tx->tx_msg, peer);
629
630                 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
631                     tx->tx_type == TX_TYPE_GET_REQUEST) {
632                         /* peer_next_matchbits must be known good */
633                         LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
634                         /* Assume 64-bit matchbits can't wrap */
635                         LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
636                         tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
637                                 peer->peer_next_matchbits++;
638                 }
639                 
640                 peer->peer_sent_credits += peer->peer_outstanding_credits;
641                 peer->peer_outstanding_credits = 0;
642                 peer->peer_credits--;
643
644                 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
645                        libcfs_id2str(peer->peer_id), peer->peer_credits,
646                        peer->peer_outstanding_credits, peer->peer_sent_credits,
647                        kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
648                        tx, tx->tx_msg->ptlm_nob,
649                        tx->tx_msg->ptlm_credits);
650
651                 list_add_tail(&tx->tx_list, &peer->peer_activeq);
652
653                 kptllnd_tx_addref(tx);          /* 1 ref for me... */
654
655                 spin_unlock_irqrestore(&peer->peer_lock, flags);
656
657                 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
658                     tx->tx_type == TX_TYPE_GET_REQUEST) {
659                         /* Post bulk now we have safe matchbits */
660                         rc = PtlMEAttach(kptllnd_data.kptl_nih,
661                                          *kptllnd_tunables.kptl_portal,
662                                          peer->peer_ptlid,
663                                          tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
664                                          0,             /* ignore bits */
665                                          PTL_UNLINK,
666                                          PTL_INS_BEFORE,
667                                          &meh);
668                         if (rc != PTL_OK) {
669                                 CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
670                                        libcfs_id2str(peer->peer_id),
671                                        kptllnd_errtype2str(rc), rc);
672                                 goto failed;
673                         }
674
675                         rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
676                                          &tx->tx_rdma_mdh);
677                         if (rc != PTL_OK) {
678                                 CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
679                                        libcfs_id2str(tx->tx_peer->peer_id),
680                                        kptllnd_errtype2str(rc), rc);
681                                 rc = PtlMEUnlink(meh);
682                                 LASSERT(rc == PTL_OK);
683                                 tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
684                                 goto failed;
685                         }
686                         /* I'm not racing with the event callback here.  It's a
687                          * bug if there's an event on the MD I just attached
688                          * before I actually send the RDMA request message -
689                          * probably matchbits re-used in error. */
690                 }
691
692                 tx->tx_tposted = jiffies;       /* going on the wire */
693
694                 rc = PtlPut (tx->tx_msg_mdh,
695                              tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
696                              peer->peer_ptlid,
697                              *kptllnd_tunables.kptl_portal,
698                              0,                 /* acl cookie */
699                              LNET_MSG_MATCHBITS,
700                              0,                 /* offset */
701                              0);                /* header data */
702                 if (rc != PTL_OK) {
703                         CERROR("PtlPut %s error %s(%d)\n",
704                                libcfs_id2str(peer->peer_id),
705                                kptllnd_errtype2str(rc), rc);
706                         goto failed;
707                 }
708
709                 kptllnd_tx_decref(tx);          /* drop my ref */
710
711                 spin_lock_irqsave(&peer->peer_lock, flags);
712         }
713
714         spin_unlock_irqrestore(&peer->peer_lock, flags);
715         return;
716
717  failed:
718         /* Nuke everything (including tx we were trying) */
719         kptllnd_peer_close(peer, -EIO);
720         kptllnd_tx_decref(tx);
721 }
722
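/* Return the first tx on either queue whose deadline has expired, with a
 * ref held for the caller, or NULL if nothing has timed out.  Caller holds
 * peer_lock. */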
723 kptl_tx_t *
724 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
725 {
726         kptl_tx_t         *tx;
727         struct list_head  *ele;
728
729         list_for_each(ele, &peer->peer_sendq) {
730                 tx = list_entry(ele, kptl_tx_t, tx_list);
731
732                 if (time_after_eq(jiffies, tx->tx_deadline)) {
733                         kptllnd_tx_addref(tx);
734                         return tx;
735                 }
736         }
737
738         list_for_each(ele, &peer->peer_activeq) {
739                 tx = list_entry(ele, kptl_tx_t, tx_list);
740
741                 if (time_after_eq(jiffies, tx->tx_deadline)) {
742                         kptllnd_tx_addref(tx);
743                         return tx;
744                 }
745         }
746
747         return NULL;
748 }
749
750
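/* Scan one peer hash bucket for trouble.  Each peer is examined at most once
 * per pass (peer_check_stamp == stamp): if a tx has timed out, the peer's
 * state is dumped and it is closed with -ETIMEDOUT; if only a NOOP retry is
 * pending (peer_retry_noop), the send queue is simply re-run.  Either case
 * means dropping the read lock, so the bucket is rescanned from the top. */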
751 void
752 kptllnd_peer_check_bucket (int idx, int stamp)
753 {
754         struct list_head  *peers = &kptllnd_data.kptl_peers[idx];
755         struct list_head  *ptmp;
756         kptl_peer_t       *peer;
757         kptl_tx_t         *tx;
758         unsigned long      flags;
759         int                nsend;
760         int                nactive;
761         int                check_sends;
762
763         CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);
764
765  again:
766         /* NB. Shared lock while I just look */
767         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
768
769         list_for_each (ptmp, peers) {
770                 peer = list_entry (ptmp, kptl_peer_t, peer_list);
771
772                 CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
773                        libcfs_id2str(peer->peer_id), peer->peer_credits, 
774                        peer->peer_outstanding_credits, peer->peer_sent_credits);
775
776                 spin_lock(&peer->peer_lock);
777
778                 if (peer->peer_check_stamp == stamp) {
779                         /* checked already this pass */
780                         spin_unlock(&peer->peer_lock);
781                         continue;
782                 }
783
784                 peer->peer_check_stamp = stamp;
785                 tx = kptllnd_find_timed_out_tx(peer);
786                 check_sends = peer->peer_retry_noop;
787                 
788                 spin_unlock(&peer->peer_lock);
789                 
790                 if (tx == NULL && !check_sends)
791                         continue;
792
793                 kptllnd_peer_addref(peer); /* 1 ref for me... */
794
795                 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
796
797                 if (tx == NULL) { /* nothing timed out */
798                         kptllnd_peer_check_sends(peer);
799                         kptllnd_peer_decref(peer); /* ...until here or... */
800
801                         /* rescan after dropping the lock */
802                         goto again;
803                 }
804
805                 spin_lock_irqsave(&peer->peer_lock, flags);
806                 nsend = kptllnd_count_queue(&peer->peer_sendq);
807                 nactive = kptllnd_count_queue(&peer->peer_activeq);
808                 spin_unlock_irqrestore(&peer->peer_lock, flags);
809
810                 LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
811                                    libcfs_id2str(peer->peer_id),
812                                    (tx->tx_tposted == 0) ? 
813                                    "no free peer buffers" : 
814                                    "please check Portals");
815
816                 CERROR("%s timed out: cred %d outstanding %d, sent %d, "
817                        "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d "
818                        "%sposted %lu T/O %ds\n",
819                        libcfs_id2str(peer->peer_id), peer->peer_credits,
820                        peer->peer_outstanding_credits, peer->peer_sent_credits,
821                        nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type),
822                        tx->tx_active ? "A" : "",
823                        PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
824                        "" : "M",
825                        PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
826                        "" : "D",
827                        tx->tx_status,
828                        (tx->tx_tposted == 0) ? "not " : "",
829                        (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
830                        *kptllnd_tunables.kptl_timeout);
831
832                 kptllnd_dump_ptltrace();
833
834                 kptllnd_tx_decref(tx);
835
836                 kptllnd_peer_close(peer, -ETIMEDOUT);
837                 kptllnd_peer_decref(peer); /* ...until here */
838
839                 /* start again now I've dropped the lock */
840                 goto again;
841         }
842
843         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
844 }
845
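/* Look up a peer by LNET process id and return it with a ref held, or NULL.
 * Only peers in the table (WAITING_HELLO or ACTIVE) can be found.  Caller
 * holds kptl_peer_rw_lock. */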
846 kptl_peer_t *
847 kptllnd_id2peer_locked (lnet_process_id_t id)
848 {
849         struct list_head *peers = kptllnd_nid2peerlist(id.nid);
850         struct list_head *tmp;
851         kptl_peer_t      *peer;
852
853         list_for_each (tmp, peers) {
854
855                 peer = list_entry (tmp, kptl_peer_t, peer_list);
856
857                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
858                         peer->peer_state == PEER_STATE_ACTIVE);
859                 
860                 if (peer->peer_id.nid != id.nid ||
861                     peer->peer_id.pid != id.pid)
862                         continue;
863
864                 kptllnd_peer_addref(peer);
865
866                 CDEBUG(D_NET, "%s -> %s (%d)\n",
867                        libcfs_id2str(id), 
868                        kptllnd_ptlid2str(peer->peer_ptlid),
869                        atomic_read (&peer->peer_refcount));
870                 return peer;
871         }
872
873         return NULL;
874 }
875
876 void
877 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
878 {
879         LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
880                            "messages may be dropped\n",
881                            str, libcfs_id2str(id),
882                            kptllnd_data.kptl_n_active_peers);
883         LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
884                            "'max_nodes' or 'max_procs_per_node'\n");
885 }
886
887 __u64
888 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
889 {
890         kptl_peer_t            *peer;
891         struct list_head       *tmp;
892
893         /* Find the last matchbits I saw this new peer using.  Note..
894            A. This peer cannot be in the peer table - she's new!
895            B. If I can't find the peer in the closing/zombie peers, all
896               matchbits are safe because all refs to the (old) peer have gone
897               so all txs have completed so there's no risk of matchbit
898               collision!
899          */
900
901         LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
902
903         /* peer's last matchbits can't change after it comes out of the peer
904          * table, so first match is fine */
905
906         list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
907                 peer = list_entry (tmp, kptl_peer_t, peer_list);
908
909                 if (peer->peer_id.nid == lpid.nid &&
910                     peer->peer_id.pid == lpid.pid)
911                         return peer->peer_last_matchbits_seen;
912         }
913         
914         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
915                 peer = list_entry (tmp, kptl_peer_t, peer_list);
916
917                 if (peer->peer_id.nid == lpid.nid &&
918                     peer->peer_id.pid == lpid.pid)
919                         return peer->peer_last_matchbits_seen;
920         }
921         
922         return PTL_RESERVED_MATCHBITS;
923 }
924
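/* Handle a HELLO received from 'initiator'.  After validating the message
 * (userspace flag, matchbits, max message size, credits), either complete
 * the handshake for an existing peer in PEER_STATE_WAITING_HELLO, or close
 * any stale incarnation and create a brand-new ACTIVE peer, reserving
 * buffers and posting a HELLO in response.  Returns the peer with a ref for
 * the caller, or NULL on any failure. */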
925 kptl_peer_t *
926 kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
927                            kptl_msg_t       *msg)
928 {
929         rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;
930         kptl_peer_t        *peer;
931         kptl_peer_t        *new_peer;
932         lnet_process_id_t   lpid;
933         unsigned long       flags;
934         kptl_tx_t          *hello_tx;
935         int                 rc;
936         __u64               safe_matchbits;
937         __u64               last_matchbits_seen;
938
939         lpid.nid = msg->ptlm_srcnid;
940         lpid.pid = msg->ptlm_srcpid;
941
942         CDEBUG(D_NET, "hello from %s(%s)\n",
943                libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
944
945         if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
946             (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
947                 /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
948                  * userspace.  Refuse the connection if she hasn't set the
949                  * correct flag in her PID... */
950                 CERROR("Userflag not set in hello from %s (%s)\n",
951                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
952                 return NULL;
953         }
954         
955         /* kptlhm_matchbits are the highest matchbits my peer may have used to
956          * RDMA to me.  I ensure I never register buffers for RDMA that could
957          * match any she used */
958         safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;
959
960         if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
961                 CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
962                        safe_matchbits, libcfs_id2str(lpid));
963                 return NULL;
964         }
965         
966         if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
967                 CERROR("%s: max message size %d < MIN %d",
968                        libcfs_id2str(lpid),
969                        msg->ptlm_u.hello.kptlhm_max_msg_size,
970                        PTLLND_MIN_BUFFER_SIZE);
971                 return NULL;
972         }
973
974         if (msg->ptlm_credits <= 1) {
975                 CERROR("Need more than 1+%d credits from %s\n",
976                        msg->ptlm_credits, libcfs_id2str(lpid));
977                 return NULL;
978         }
979         
980         write_lock_irqsave(g_lock, flags);
981
982         peer = kptllnd_id2peer_locked(lpid);
983         if (peer != NULL) {
984                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
985                         /* Completing HELLO handshake */
986                         LASSERT(peer->peer_incarnation == 0);
987
988                         if (msg->ptlm_dststamp != 0 &&
989                             msg->ptlm_dststamp != peer->peer_myincarnation) {
990                                 write_unlock_irqrestore(g_lock, flags);
991
992                                 CERROR("Ignoring HELLO from %s: unexpected "
993                                        "dststamp "LPX64" ("LPX64" wanted)\n",
994                                        libcfs_id2str(lpid),
995                                        msg->ptlm_dststamp,
996                                        peer->peer_myincarnation);
997                                 kptllnd_peer_decref(peer);
998                                 return NULL;
999                         }
1000                         
1001                         /* Concurrent initiation or response to my HELLO */
1002                         peer->peer_state = PEER_STATE_ACTIVE;
1003                         peer->peer_incarnation = msg->ptlm_srcstamp;
1004                         peer->peer_next_matchbits = safe_matchbits;
1005                         peer->peer_max_msg_size =
1006                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1007                         
1008                         write_unlock_irqrestore(g_lock, flags);
1009                         return peer;
1010                 }
1011
1012                 if (msg->ptlm_dststamp != 0 &&
1013                     msg->ptlm_dststamp <= peer->peer_myincarnation) {
1014                         write_unlock_irqrestore(g_lock, flags);
1015
1016                         CERROR("Ignoring stale HELLO from %s: "
1017                                "dststamp "LPX64" (current "LPX64")\n",
1018                                libcfs_id2str(lpid),
1019                                msg->ptlm_dststamp,
1020                                peer->peer_myincarnation);
1021                         kptllnd_peer_decref(peer);
1022                         return NULL;
1023                 }
1024
1025                 /* Brand new connection attempt: remove old incarnation */
1026                 kptllnd_peer_close_locked(peer, 0);
1027         }
1028
1029         kptllnd_cull_peertable_locked(lpid);
1030
1031         write_unlock_irqrestore(g_lock, flags);
1032
1033         if (peer != NULL) {
1034                 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
1035                        " stamp "LPX64"("LPX64")\n",
1036                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
1037                        msg->ptlm_srcstamp, peer->peer_incarnation);
1038
1039                 kptllnd_peer_decref(peer);
1040         }
1041
1042         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1043         if (hello_tx == NULL) {
1044                 CERROR("Unable to allocate HELLO message for %s\n",
1045                        libcfs_id2str(lpid));
1046                 return NULL;
1047         }
1048
1049         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1050                          sizeof(kptl_hello_msg_t));
1051
1052         new_peer = kptllnd_peer_allocate(lpid, initiator);
1053         if (new_peer == NULL) {
1054                 kptllnd_tx_decref(hello_tx);
1055                 return NULL;
1056         }
1057
1058         rc = kptllnd_peer_reserve_buffers();
1059         if (rc != 0) {
1060                 kptllnd_peer_decref(new_peer);
1061                 kptllnd_tx_decref(hello_tx);
1062
1063                 CERROR("Failed to reserve buffers for %s\n",
1064                        libcfs_id2str(lpid));
1065                 return NULL;
1066         }
1067
1068         write_lock_irqsave(g_lock, flags);
1069
1070  again:
1071         if (kptllnd_data.kptl_shutdown) {
1072                 write_unlock_irqrestore(g_lock, flags);
1073
1074                 CERROR ("Shutdown started, refusing connection from %s\n",
1075                         libcfs_id2str(lpid));
1076                 kptllnd_peer_unreserve_buffers();
1077                 kptllnd_peer_decref(new_peer);
1078                 kptllnd_tx_decref(hello_tx);
1079                 return NULL;
1080         }
1081
1082         peer = kptllnd_id2peer_locked(lpid);
1083         if (peer != NULL) {
1084                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1085                         /* An outgoing message instantiated 'peer' for me */
1086                         LASSERT(peer->peer_incarnation == 0);
1087
1088                         peer->peer_state = PEER_STATE_ACTIVE;
1089                         peer->peer_incarnation = msg->ptlm_srcstamp;
1090                         peer->peer_next_matchbits = safe_matchbits;
1091                         peer->peer_max_msg_size =
1092                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1093
1094                         write_unlock_irqrestore(g_lock, flags);
1095
1096                         CWARN("Outgoing instantiated peer %s\n",
1097                               libcfs_id2str(lpid));
1098                 } else {
1099                         LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1100
1101                         write_unlock_irqrestore(g_lock, flags);
1102
1103                         /* WOW!  Somehow this peer completed the HELLO
1104                          * handshake while I slept.  I guess I could have slept
1105                          * while it rebooted and sent a new HELLO, so I'll fail
1106                          * this one... */
1107                         CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
1108                         kptllnd_peer_decref(peer);
1109                         peer = NULL;
1110                 }
1111
1112                 kptllnd_peer_unreserve_buffers();
1113                 kptllnd_peer_decref(new_peer);
1114                 kptllnd_tx_decref(hello_tx);
1115                 return peer;
1116         }
1117
1118         if (kptllnd_data.kptl_n_active_peers ==
1119             kptllnd_data.kptl_expected_peers) {
1120                 /* peer table full */
1121                 write_unlock_irqrestore(g_lock, flags);
1122
1123                 kptllnd_peertable_overflow_msg("Connection from ", lpid);
1124
1125                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1126                 if (rc != 0) {
1127                         CERROR("Refusing connection from %s\n",
1128                                libcfs_id2str(lpid));
1129                         kptllnd_peer_unreserve_buffers();
1130                         kptllnd_peer_decref(new_peer);
1131                         kptllnd_tx_decref(hello_tx);
1132                         return NULL;
1133                 }
1134                 
1135                 write_lock_irqsave(g_lock, flags);
1136                 kptllnd_data.kptl_expected_peers++;
1137                 goto again;
1138         }
1139
1140         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
1141
1142         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1143         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1144                 *kptllnd_tunables.kptl_max_msg_size;
1145
1146         new_peer->peer_state = PEER_STATE_ACTIVE;
1147         new_peer->peer_incarnation = msg->ptlm_srcstamp;
1148         new_peer->peer_next_matchbits = safe_matchbits;
1149         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1150         new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1151
1152         kptllnd_peer_add_peertable_locked(new_peer);
1153
1154         write_unlock_irqrestore(g_lock, flags);
1155
1156         /* NB someone else could get in now and post a message before I post
1157          * the HELLO, but post_tx/check_sends take care of that! */
1158
1159         CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
1160                libcfs_id2str(new_peer->peer_id), hello_tx);
1161
1162         kptllnd_post_tx(new_peer, hello_tx, 0);
1163         kptllnd_peer_check_sends(new_peer);
1164
1165         return new_peer;
1166 }
1167
1168 void
1169 kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
1170 {
1171         kptllnd_post_tx(peer, tx, nfrag);
1172         kptllnd_peer_check_sends(peer);
1173 }
1174
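/* Find the peer for 'target', creating it if necessary.  The common case is
 * a lookup under the read lock.  Otherwise a new peer is allocated in
 * PEER_STATE_WAITING_HELLO, buffers are reserved and the initial HELLO is
 * posted.  Userspace targets (LNET_PID_USERFLAG set) are never connected to
 * actively.  On success *peerp holds a ref for the caller. */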
1175 int
1176 kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
1177 {
1178         rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1179         ptl_process_id_t  ptl_id;
1180         kptl_peer_t      *new_peer;
1181         kptl_tx_t        *hello_tx;
1182         unsigned long     flags;
1183         int               rc;
1184         __u64             last_matchbits_seen;
1185
1186         /* I expect to find the peer, so I only take a read lock... */
1187         read_lock_irqsave(g_lock, flags);
1188         *peerp = kptllnd_id2peer_locked(target);
1189         read_unlock_irqrestore(g_lock, flags);
1190
1191         if (*peerp != NULL)
1192                 return 0;
1193         
1194         if ((target.pid & LNET_PID_USERFLAG) != 0) {
1195                 CWARN("Refusing to create a new connection to %s "
1196                       "(non-kernel peer)\n", libcfs_id2str(target));
1197                 return -EHOSTUNREACH;
1198         }
1199
1200         /* The new peer is a kernel ptllnd, and kernel ptllnds all have
1201          * the same portals PID */
1202         ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
1203         ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
1204
1205         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1206         if (hello_tx == NULL) {
1207                 CERROR("Unable to allocate connect message for %s\n",
1208                        libcfs_id2str(target));
1209                 return -ENOMEM;
1210         }
1211
1212         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1213                          sizeof(kptl_hello_msg_t));
1214
1215         new_peer = kptllnd_peer_allocate(target, ptl_id);
1216         if (new_peer == NULL) {
1217                 rc = -ENOMEM;
1218                 goto unwind_0;
1219         }
1220
1221         rc = kptllnd_peer_reserve_buffers();
1222         if (rc != 0)
1223                 goto unwind_1;
1224
1225         write_lock_irqsave(g_lock, flags);
1226  again:
1227         if (kptllnd_data.kptl_shutdown) {
1228                 write_unlock_irqrestore(g_lock, flags);
1229                 rc = -ESHUTDOWN;
1230                 goto unwind_2;
1231         }
1232
1233         *peerp = kptllnd_id2peer_locked(target);
1234         if (*peerp != NULL) {
1235                 write_unlock_irqrestore(g_lock, flags);
1236                 goto unwind_2;
1237         }
1238
1239         kptllnd_cull_peertable_locked(target);
1240
1241         if (kptllnd_data.kptl_n_active_peers ==
1242             kptllnd_data.kptl_expected_peers) {
1243                 /* peer table full */
1244                 write_unlock_irqrestore(g_lock, flags);
1245
1246                 kptllnd_peertable_overflow_msg("Connection to ", target);
1247
1248                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1249                 if (rc != 0) {
1250                         CERROR("Can't create connection to %s\n",
1251                                libcfs_id2str(target));
1252                         rc = -ENOMEM;
1253                         goto unwind_2;
1254                 }
1255                 write_lock_irqsave(g_lock, flags);
1256                 kptllnd_data.kptl_expected_peers++;
1257                 goto again;
1258         }
1259
1260         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);
1261
1262         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1263         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1264                 *kptllnd_tunables.kptl_max_msg_size;
1265                 
1266         new_peer->peer_state = PEER_STATE_WAITING_HELLO;
1267         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1268         
1269         kptllnd_peer_add_peertable_locked(new_peer);
1270
1271         write_unlock_irqrestore(g_lock, flags);
1272
1273         /* NB someone else could get in now and post a message before I post
1274          * the HELLO, but post_tx/check_sends take care of that! */
1275
1276         CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
1277                libcfs_id2str(new_peer->peer_id), hello_tx);
1278
1279         kptllnd_post_tx(new_peer, hello_tx, 0);
1280         kptllnd_peer_check_sends(new_peer);
1281        
1282         *peerp = new_peer;
1283         return 0;
1284         
1285  unwind_2:
1286         kptllnd_peer_unreserve_buffers();
1287  unwind_1:
1288         kptllnd_peer_decref(new_peer);
1289  unwind_0:
1290         kptllnd_tx_decref(hello_tx);
1291
1292         return rc;
1293 }