lnet/klnds/ptllnd/ptllnd_peer.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: PJ Kirner <pjkirner@clusterfs.com>
6  *           E Barton <eeb@bartonsoftware.com>
7  *
8  *   This file is part of the Lustre file system, http://www.lustre.org
9  *   Lustre is a trademark of Cluster File Systems, Inc.
10  *
11  *   This file is confidential source code owned by Cluster File Systems.
12  *   No viewing, modification, compilation, redistribution, or any other
13  *   form of use is permitted except through a signed license agreement.
14  *
15  *   If you have not signed such an agreement, then you have no rights to
16  *   this file.  Please destroy it immediately and contact CFS.
17  *
18  */
19
20 #include "ptllnd.h"
21 #include <libcfs/list.h>
22
23 static int
24 kptllnd_count_queue(struct list_head *q)
25 {
26         struct list_head *e;
27         int               n = 0;
28         
29         list_for_each(e, q) {
30                 n++;
31         }
32
33         return n;
34 }
35
36 int
37 kptllnd_get_peer_info(int index, 
38                       lnet_process_id_t *id,
39                       int *state, int *sent_hello,
40                       int *refcount, __u64 *incarnation,
41                       __u64 *next_matchbits, __u64 *last_matchbits_seen,
42                       int *nsendq, int *nactiveq,
43                       int *credits, int *outstanding_credits) 
44 {
45         rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
46         unsigned long     flags;
47         struct list_head *ptmp;
48         kptl_peer_t      *peer;
49         int               i;
50         int               rc = -ENOENT;
51
52         read_lock_irqsave(g_lock, flags);
53
54         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
55                 
56                 list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
57                         peer = list_entry(ptmp, kptl_peer_t, peer_list);
58
59                         if (index-- > 0)
60                                 continue;
61                         
62                         *id          = peer->peer_id;
63                         *state       = peer->peer_state;
64                         *sent_hello  = peer->peer_sent_hello;
65                         *refcount    = atomic_read(&peer->peer_refcount);
66                         *incarnation = peer->peer_incarnation;
67
68                         spin_lock(&peer->peer_lock);
69
70                         *next_matchbits      = peer->peer_next_matchbits;
71                         *last_matchbits_seen = peer->peer_last_matchbits_seen;
72                         *credits             = peer->peer_credits;
73                         *outstanding_credits = peer->peer_outstanding_credits;
74
75                         *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
76                         *nactiveq = kptllnd_count_queue(&peer->peer_activeq);
77
78                         spin_unlock(&peer->peer_lock);
79
80                         rc = 0;
81                         goto out;
82                 }
83         }
84         
85  out:
86         read_unlock_irqrestore(g_lock, flags);
87         return rc;
88 }
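/*
 * Illustrative usage sketch (not part of the driver): a debug dump loop that
 * walks every peer through the index interface above until it returns
 * -ENOENT.  The local variables here are hypothetical; only
 * kptllnd_get_peer_info() itself is real.
 *
 *      lnet_process_id_t id;
 *      int    state, sent_hello, refcount, nsendq, nactiveq, credits, outcr;
 *      __u64  incarnation, next_mb, last_mb;
 *      int    idx;
 *
 *      for (idx = 0; ; idx++) {
 *              if (kptllnd_get_peer_info(idx, &id, &state, &sent_hello,
 *                                        &refcount, &incarnation,
 *                                        &next_mb, &last_mb,
 *                                        &nsendq, &nactiveq,
 *                                        &credits, &outcr) != 0)
 *                      break;          /* -ENOENT: no more peers */
 *              CDEBUG(D_NET, "%s: state %d sendq %d activeq %d\n",
 *                     libcfs_id2str(id), state, nsendq, nactiveq);
 *      }
 */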
89
90 void
91 kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
92 {
93         LASSERT (!kptllnd_data.kptl_shutdown);
94         LASSERT (kptllnd_data.kptl_n_active_peers <
95                  kptllnd_data.kptl_expected_peers);
96
97         LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
98                  peer->peer_state == PEER_STATE_ACTIVE);
99         
100         kptllnd_data.kptl_n_active_peers++;
101         atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */
102
103         /* NB add to HEAD of peer list for MRU order!
104          * (see kptllnd_cull_peertable) */
105         list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
106 }
107
108 void
109 kptllnd_cull_peertable_locked (lnet_process_id_t pid)
110 {
111         /* I'm about to add a new peer with this portals ID to the peer table,
112          * so (a) this peer should not exist already and (b) I want to leave at
113          * most (max_procs_per_node - 1) peers with this NID in the table. */
114         struct list_head  *peers = kptllnd_nid2peerlist(pid.nid);
115         int                cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
116         int                count;
117         struct list_head  *tmp;
118         struct list_head  *nxt;
119         kptl_peer_t       *peer;
120         
121         count = 0;
122         list_for_each_safe (tmp, nxt, peers) {
123                 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
124                  * in MRU order */
125                 peer = list_entry(tmp, kptl_peer_t, peer_list);
126                         
127                 if (peer->peer_id.nid != pid.nid)
128                         continue;
129
130                 LASSERT (peer->peer_id.pid != pid.pid);
131                         
132                 count++;
133
134                 if (count < cull_count) /* recent (don't cull) */
135                         continue;
136
137                 CDEBUG(D_NET, "Cull %s(%s)\n",
138                        libcfs_id2str(peer->peer_id),
139                        kptllnd_ptlid2str(peer->peer_ptlid));
140                 
141                 kptllnd_peer_close_locked(peer, 0);
142         }
143 }
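/*
 * Calling-pattern sketch (illustrative; see kptllnd_find_target() below for
 * the real sequence): the cull runs under kptl_peer_rw_lock held for write,
 * just before the replacement peer is inserted, so the "at most
 * max_procs_per_node peers per NID" bound is enforced at the moment of
 * insertion.
 *
 *      write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
 *      kptllnd_cull_peertable_locked(target);
 *      ...                                     (shutdown/table-full checks elided)
 *      kptllnd_peer_add_peertable_locked(new_peer);
 *      write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
 */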
144
145 kptl_peer_t *
146 kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
147 {
148         unsigned long    flags;
149         kptl_peer_t     *peer;
150
151         LIBCFS_ALLOC(peer, sizeof (*peer));
152         if (peer == NULL) {
153                 CERROR("Can't create peer %s (%s)\n",
154                        libcfs_id2str(lpid), 
155                        kptllnd_ptlid2str(ppid));
156                 return NULL;
157         }
158
159         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
160
161         INIT_LIST_HEAD (&peer->peer_sendq);
162         INIT_LIST_HEAD (&peer->peer_activeq);
163         spin_lock_init (&peer->peer_lock);
164
165         peer->peer_state = PEER_STATE_ALLOCATED;
166         peer->peer_error = 0;
167         peer->peer_last_alive = cfs_time_current();
168         peer->peer_id = lpid;
169         peer->peer_ptlid = ppid;
170         peer->peer_credits = 1;                 /* enough for HELLO */
171         peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
172         peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;
173         peer->peer_sent_credits = 1;           /* HELLO credit is implicit */
174         peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */
175
176         atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */
177
178         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
179
180         peer->peer_myincarnation = kptllnd_data.kptl_incarnation;
181
182         /* Only increase # peers under lock, to guarantee we don't grow it
183          * during shutdown */
184         if (kptllnd_data.kptl_shutdown) {
185                 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
186                                         flags);
187                 LIBCFS_FREE(peer, sizeof(*peer));
188                 return NULL;
189         }
190
191         kptllnd_data.kptl_npeers++;
192         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
193         
194         return peer;
195 }
196
197 void
198 kptllnd_peer_destroy (kptl_peer_t *peer)
199 {
200         unsigned long flags;
201         
202         CDEBUG(D_NET, "Peer=%p\n", peer);
203
204         LASSERT (!in_interrupt());
205         LASSERT (atomic_read(&peer->peer_refcount) == 0);
206         LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
207                  peer->peer_state == PEER_STATE_ZOMBIE);
208         LASSERT (list_empty(&peer->peer_sendq));
209         LASSERT (list_empty(&peer->peer_activeq));
210
211         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
212
213         if (peer->peer_state == PEER_STATE_ZOMBIE)
214                 list_del(&peer->peer_list);
215
216         kptllnd_data.kptl_npeers--;
217
218         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
219
220         LIBCFS_FREE (peer, sizeof (*peer));
221 }
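/*
 * For reference, the refcount helpers used throughout this file live in
 * ptllnd.h; a minimal sketch of the expected semantics (assumed here, not
 * copied from the header) is:
 *
 *      static inline void
 *      kptllnd_peer_addref(kptl_peer_t *peer)
 *      {
 *              atomic_inc(&peer->peer_refcount);
 *      }
 *
 *      static inline void
 *      kptllnd_peer_decref(kptl_peer_t *peer)
 *      {
 *              if (atomic_dec_and_test(&peer->peer_refcount))
 *                      kptllnd_peer_destroy(peer);
 *      }
 *
 * i.e. kptllnd_peer_destroy() above runs only once the last reference (peer
 * table, queued txs, or a transient caller ref) has been dropped.
 */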
222
223 void
224 kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs)
225 {
226         struct list_head  *tmp;
227         struct list_head  *nxt;
228         kptl_tx_t         *tx;
229
230         list_for_each_safe (tmp, nxt, peerq) {
231                 tx = list_entry(tmp, kptl_tx_t, tx_list);
232
233                 list_del(&tx->tx_list);
234                 list_add_tail(&tx->tx_list, txs);
235
236                 tx->tx_status = -EIO;
237                 tx->tx_active = 0;
238         }
239 }
240
241 void
242 kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs)
243 {
244         unsigned long   flags;
245
246         spin_lock_irqsave(&peer->peer_lock, flags);
247
248         kptllnd_cancel_txlist(&peer->peer_sendq, txs);
249         kptllnd_cancel_txlist(&peer->peer_activeq, txs);
250                 
251         spin_unlock_irqrestore(&peer->peer_lock, flags);
252 }
253
254 void
255 kptllnd_peer_alive (kptl_peer_t *peer)
256 {
257         /* This is racy, but everyone's only writing cfs_time_current() */
258         peer->peer_last_alive = cfs_time_current();
259         mb();
260 }
261
262 void
263 kptllnd_peer_notify (kptl_peer_t *peer)
264 {
265         unsigned long flags;
266         time_t        last_alive = 0;
267         int           error = 0;
268         
269         spin_lock_irqsave(&peer->peer_lock, flags);
270
271         if (peer->peer_error != 0) {
272                 error = peer->peer_error;
273                 peer->peer_error = 0;
274                 
275                 last_alive = cfs_time_current_sec() - 
276                              cfs_duration_sec(cfs_time_current() - 
277                                               peer->peer_last_alive);
278         }
279         
280         spin_unlock_irqrestore(&peer->peer_lock, flags);
281
282         if (error != 0)
283                 lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
284                              last_alive);
285 }
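/*
 * The last_alive arithmetic above converts a jiffies timestamp into the
 * wall-clock seconds lnet_notify() expects: take the current time in seconds
 * and subtract how long ago (in seconds) the peer was last heard from.
 * A worked example, assuming HZ == 1000:
 *
 *      peer_last_alive = jiffies - 5000          (last heard from 5s ago)
 *      cfs_duration_sec(cfs_time_current() - peer_last_alive) == 5
 *      last_alive = cfs_time_current_sec() - 5
 */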
286
287 void
288 kptllnd_handle_closing_peers ()
289 {
290         unsigned long           flags;
291         struct list_head        txs;
292         kptl_peer_t            *peer;
293         struct list_head       *tmp;
294         struct list_head       *nxt;
295         kptl_tx_t              *tx;
296         int                     idle;
297
298         /* Check with a read lock first to avoid blocking anyone */
299
300         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
301         idle = list_empty(&kptllnd_data.kptl_closing_peers) &&
302                list_empty(&kptllnd_data.kptl_zombie_peers);
303         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
304
305         if (idle)
306                 return;
307
308         INIT_LIST_HEAD(&txs);
309
310         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
311
312         /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
313          * ref removes it from this list, so I mustn't drop the lock while
314          * scanning it. */
315         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
316                 peer = list_entry (tmp, kptl_peer_t, peer_list);
317
318                 LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);
319
320                 kptllnd_peer_cancel_txs(peer, &txs);
321         }
322
323         /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
324          * I'm the only one removing from this list, but peers can be added on
325          * the end any time I drop the lock. */
326
327         list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
328                 peer = list_entry (tmp, kptl_peer_t, peer_list);
329
330                 LASSERT (peer->peer_state == PEER_STATE_CLOSING);
331
332                 list_del(&peer->peer_list);
333                 list_add_tail(&peer->peer_list,
334                               &kptllnd_data.kptl_zombie_peers);
335                 peer->peer_state = PEER_STATE_ZOMBIE;
336
337                 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
338
339                 kptllnd_peer_notify(peer);
340                 kptllnd_peer_cancel_txs(peer, &txs);
341                 kptllnd_peer_decref(peer);
342
343                 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
344         }
345
346         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
347
348         /* Drop peer's ref on all cancelled txs.  This will get
349          * kptllnd_tx_fini() to abort outstanding comms if necessary. */
350
351         list_for_each_safe (tmp, nxt, &txs) {
352                 tx = list_entry(tmp, kptl_tx_t, tx_list);
353                 list_del(&tx->tx_list);
354                 kptllnd_tx_decref(tx);
355         }
356 }
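/*
 * Peer state transitions driven here and by kptllnd_peer_close_locked()
 * below, summarised as a sketch (PEER_STATE_* values as used in this file):
 *
 *      ALLOCATED            -> WAITING_HELLO or ACTIVE  (HELLO handshake)
 *      WAITING_HELLO        -> ACTIVE                   (handshake completes)
 *      WAITING_HELLO/ACTIVE -> CLOSING   (kptllnd_peer_close_locked(): off the
 *                                         hash table, onto kptl_closing_peers,
 *                                         watchdog woken)
 *      CLOSING              -> ZOMBIE    (this function: LNET notified, queued
 *                                         txs cancelled, table ref dropped)
 *      ALLOCATED/ZOMBIE     -> freed     (kptllnd_peer_destroy() on last decref)
 */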
357
358 void
359 kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
360 {
361         switch (peer->peer_state) {
362         default:
363                 LBUG();
364
365         case PEER_STATE_WAITING_HELLO:
366         case PEER_STATE_ACTIVE:
367                 /* Ensure new peers see a new incarnation of me */
368                 LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
369                 if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
370                         kptllnd_data.kptl_incarnation++;
371
372                 /* Removing from peer table */
373                 kptllnd_data.kptl_n_active_peers--;
374                 LASSERT (kptllnd_data.kptl_n_active_peers >= 0);
375
376                 list_del(&peer->peer_list);
377                 kptllnd_peer_unreserve_buffers();
378
379                 peer->peer_error = why; /* stash 'why' only on first close */
380                 peer->peer_state = PEER_STATE_CLOSING;
381
382                 /* Schedule for immediate attention, taking peer table's ref */
383                 list_add_tail(&peer->peer_list, 
384                               &kptllnd_data.kptl_closing_peers);
385                 wake_up(&kptllnd_data.kptl_watchdog_waitq);
386                 break;
387
388         case PEER_STATE_ZOMBIE:
389         case PEER_STATE_CLOSING:
390                 break;
391         }
392 }
393
394 void
395 kptllnd_peer_close(kptl_peer_t *peer, int why)
396 {
397         unsigned long      flags;
398
399         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
400         kptllnd_peer_close_locked(peer, why);
401         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
402 }
403
404 int
405 kptllnd_peer_del(lnet_process_id_t id)
406 {
407         struct list_head  *ptmp;
408         struct list_head  *pnxt;
409         kptl_peer_t       *peer;
410         int                lo;
411         int                hi;
412         int                i;
413         unsigned long      flags;
414         int                rc = -ENOENT;
415
416         /*
417          * Find the single bucket we are supposed to look at or if nid is a
418          * wildcard (LNET_NID_ANY) then look at all of the buckets
419          */
420         if (id.nid != LNET_NID_ANY) {
421                 struct list_head *l = kptllnd_nid2peerlist(id.nid);
422                 
423                 lo = hi =  l - kptllnd_data.kptl_peers;
424         } else {
425                 if (id.pid != LNET_PID_ANY)
426                         return -EINVAL;
427                 
428                 lo = 0;
429                 hi = kptllnd_data.kptl_peer_hash_size - 1;
430         }
431
432 again:
433         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
434
435         for (i = lo; i <= hi; i++) {
436                 list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
437                         peer = list_entry (ptmp, kptl_peer_t, peer_list);
438
439                         if (!(id.nid == LNET_NID_ANY || 
440                               (peer->peer_id.nid == id.nid &&
441                                (id.pid == LNET_PID_ANY || 
442                                 peer->peer_id.pid == id.pid))))
443                                 continue;
444
445                         kptllnd_peer_addref(peer); /* 1 ref for me... */
446
447                         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
448                                                flags);
449
450                         kptllnd_peer_close(peer, 0);
451                         kptllnd_peer_decref(peer); /* ...until here */
452
453                         rc = 0;         /* matched something */
454
455                         /* start again now I've dropped the lock */
456                         goto again;
457                 }
458         }
459
460         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
461
462         return (rc);
463 }
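/*
 * Illustrative only: kptllnd_peer_del() is the "delete peer" entry point a
 * management/ioctl handler would call.  The wildcard rules mirror the match
 * test above:
 *
 *      id.nid = LNET_NID_ANY; id.pid = LNET_PID_ANY;   close every peer
 *      id.nid = <nid>;        id.pid = LNET_PID_ANY;   close all peers on <nid>
 *      id.nid = <nid>;        id.pid = <pid>;          close that one peer
 *      id.nid = LNET_NID_ANY; id.pid = <pid>;          rejected with -EINVAL
 */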
464
465 void
466 kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
467 {
468         /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
469         ptl_handle_md_t  msg_mdh;
470         ptl_md_t         md;
471         ptl_err_t        prc;
472         unsigned long    flags;
473
474         LASSERT (!tx->tx_idle);
475         LASSERT (!tx->tx_active);
476         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
477         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
478         LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
479                  tx->tx_type == TX_TYPE_PUT_REQUEST ||
480                  tx->tx_type == TX_TYPE_GET_REQUEST);
481
482         kptllnd_set_tx_peer(tx, peer);
483
484         memset(&md, 0, sizeof(md));
485
486         md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
487         md.options = PTL_MD_OP_PUT |
488                      PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
489                      PTL_MD_EVENT_START_DISABLE;
490         md.user_ptr = &tx->tx_msg_eventarg;
491         md.eq_handle = kptllnd_data.kptl_eqh;
492
493         if (nfrag == 0) {
494                 md.start = tx->tx_msg;
495                 md.length = tx->tx_msg->ptlm_nob;
496         } else {
497                 LASSERT (nfrag > 1);
498                 LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);
499
500                 md.start = tx->tx_frags;
501                 md.length = nfrag;
502                 md.options |= PTL_MD_IOVEC;
503         }
504
505         prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
506         if (prc != PTL_OK) {
507                 CERROR("PtlMDBind(%s) failed: %s(%d)\n",
508                        libcfs_id2str(peer->peer_id),
509                        kptllnd_errtype2str(prc), prc);
510                 tx->tx_status = -EIO;
511                 kptllnd_tx_decref(tx);
512                 return;
513         }
514
515         spin_lock_irqsave(&peer->peer_lock, flags);
516
517         tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
518         tx->tx_active = 1;
519         tx->tx_msg_mdh = msg_mdh;
520
521         /* Ensure HELLO is sent first */
522         if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
523                 list_add(&tx->tx_list, &peer->peer_sendq);
524         else
525                 list_add_tail(&tx->tx_list, &peer->peer_sendq);
526
527         spin_unlock_irqrestore(&peer->peer_lock, flags);
528 }
529
530 void
531 kptllnd_peer_check_sends (kptl_peer_t *peer)
532 {
533         ptl_handle_me_t  meh;
534         kptl_tx_t       *tx;
535         int              rc;
536         unsigned long    flags;
537
538         LASSERT(!in_interrupt());
539
540         spin_lock_irqsave(&peer->peer_lock, flags);
541
542         peer->peer_retry_noop = 0;
543
544         if (list_empty(&peer->peer_sendq) &&
545             peer->peer_outstanding_credits >= PTLLND_CREDIT_HIGHWATER &&
546             peer->peer_credits != 0) {
547
548                 /* post a NOOP to return credits */
549                 spin_unlock_irqrestore(&peer->peer_lock, flags);
550
551                 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
552                 if (tx == NULL) {
553                         CERROR("Can't return credits to %s: can't allocate descriptor\n",
554                                libcfs_id2str(peer->peer_id));
555                 } else {
556                         kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
557                         kptllnd_post_tx(peer, tx, 0);
558                 }
559
560                 spin_lock_irqsave(&peer->peer_lock, flags);
561                 peer->peer_retry_noop = (tx == NULL);
562         }
563
564         while (!list_empty(&peer->peer_sendq)) {
565                 tx = list_entry (peer->peer_sendq.next, kptl_tx_t, tx_list);
566
567                 LASSERT (tx->tx_active);
568                 LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
569                 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
570
571                 LASSERT (peer->peer_outstanding_credits >= 0);
572                 LASSERT (peer->peer_sent_credits >= 0);
573                 LASSERT (peer->peer_sent_credits +
574                          peer->peer_outstanding_credits <=
575                          *kptllnd_tunables.kptl_peercredits);
576                 LASSERT (peer->peer_credits >= 0);
577
578                 /* Ensure HELLO is sent first */
579                 if (!peer->peer_sent_hello) {
580                         if (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_HELLO)
581                                 break;
582                         peer->peer_sent_hello = 1;
583                 }
584
585                 if (peer->peer_credits == 0) {
586                         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %p\n",
587                                libcfs_id2str(peer->peer_id), 
588                                peer->peer_credits,
589                                peer->peer_outstanding_credits, 
590                                peer->peer_sent_credits, tx);
591                         break;
592                 }
593
594                 /* Don't use the last credit unless I've got credits to
595                  * return */
596                 if (peer->peer_credits == 1 &&
597                     peer->peer_outstanding_credits == 0) {
598                         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
599                                "not using last credit for %p\n",
600                                libcfs_id2str(peer->peer_id), 
601                                peer->peer_credits,
602                                peer->peer_outstanding_credits,
603                                peer->peer_sent_credits, tx);
604                         break;
605                 }
606
607                 list_del(&tx->tx_list);
608
609                 /* Discard any NOOP I queued if I'm not at the high-water mark
610                  * any more or more messages have been queued */
611                 if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
612                     (!list_empty(&peer->peer_sendq) ||
613                      peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {
614
615                         tx->tx_active = 0;
616
617                         spin_unlock_irqrestore(&peer->peer_lock, flags);
618
619                         CDEBUG(D_NET, "%s: redundant noop\n", 
620                                libcfs_id2str(peer->peer_id));
621                         kptllnd_tx_decref(tx);
622
623                         spin_lock_irqsave(&peer->peer_lock, flags);
624                         continue;
625                 }
626
627                 /* fill last-minute msg fields */
628                 kptllnd_msg_pack(tx->tx_msg, peer);
629
630                 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
631                     tx->tx_type == TX_TYPE_GET_REQUEST) {
632                         /* peer_next_matchbits must be known good */
633                         LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
634                         /* Assume 64-bit matchbits can't wrap */
635                         LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
636                         tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
637                                 peer->peer_next_matchbits++;
638                 }
639                 
640                 peer->peer_sent_credits += peer->peer_outstanding_credits;
641                 peer->peer_outstanding_credits = 0;
642                 peer->peer_credits--;
643
644                 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
645                        libcfs_id2str(peer->peer_id), peer->peer_credits,
646                        peer->peer_outstanding_credits, peer->peer_sent_credits,
647                        kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
648                        tx, tx->tx_msg->ptlm_nob,
649                        tx->tx_msg->ptlm_credits);
650
651                 list_add_tail(&tx->tx_list, &peer->peer_activeq);
652
653                 kptllnd_tx_addref(tx);          /* 1 ref for me... */
654
655                 spin_unlock_irqrestore(&peer->peer_lock, flags);
656
657                 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
658                     tx->tx_type == TX_TYPE_GET_REQUEST) {
659                         /* Post bulk now we have safe matchbits */
660                         rc = PtlMEAttach(kptllnd_data.kptl_nih,
661                                          *kptllnd_tunables.kptl_portal,
662                                          peer->peer_ptlid,
663                                          tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
664                                          0,             /* ignore bits */
665                                          PTL_UNLINK,
666                                          PTL_INS_BEFORE,
667                                          &meh);
668                         if (rc != PTL_OK) {
669                                 CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
670                                        libcfs_id2str(peer->peer_id),
671                                        kptllnd_errtype2str(rc), rc);
672                                 goto failed;
673                         }
674
675                         rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
676                                          &tx->tx_rdma_mdh);
677                         if (rc != PTL_OK) {
678                                 CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
679                                        libcfs_id2str(tx->tx_peer->peer_id),
680                                        kptllnd_errtype2str(rc), rc);
681                                 rc = PtlMEUnlink(meh);
682                                 LASSERT(rc == PTL_OK);
683                                 tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
684                                 goto failed;
685                         }
686                         /* I'm not racing with the event callback here.  It's a
687                          * bug if there's an event on the MD I just attached
688                          * before I actually send the RDMA request message -
689                          * probably matchbits re-used in error. */
690                 }
691
692                 tx->tx_tposted = jiffies;       /* going on the wire */
693
694                 rc = PtlPut (tx->tx_msg_mdh,
695                              tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
696                              peer->peer_ptlid,
697                              *kptllnd_tunables.kptl_portal,
698                              0,                 /* acl cookie */
699                              LNET_MSG_MATCHBITS,
700                              0,                 /* offset */
701                              0);                /* header data */
702                 if (rc != PTL_OK) {
703                         CERROR("PtlPut %s error %s(%d)\n",
704                                libcfs_id2str(peer->peer_id),
705                                kptllnd_errtype2str(rc), rc);
706                         goto failed;
707                 }
708
709                 kptllnd_tx_decref(tx);          /* drop my ref */
710
711                 spin_lock_irqsave(&peer->peer_lock, flags);
712         }
713
714         spin_unlock_irqrestore(&peer->peer_lock, flags);
715         return;
716
717  failed:
718         /* Nuke everything (including tx we were trying) */
719         kptllnd_peer_close(peer, -EIO);
720         kptllnd_tx_decref(tx);
721 }
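/*
 * Rough guide to the credit fields manipulated above (a summary inferred from
 * this file, not a normative definition):
 *
 *      peer_credits             - sends I may still post to this peer, i.e.
 *                                 receive buffers the peer has advertised;
 *                                 each PtlPut() consumes one.
 *      peer_outstanding_credits - credits I owe the peer for buffers I have
 *                                 re-posted locally; they ride back in the
 *                                 header of the next message (filled in by
 *                                 kptllnd_msg_pack(), presumably via
 *                                 ptlm_credits).
 *      peer_sent_credits        - credits already handed back to the peer and
 *                                 not yet spent by it.
 *
 * Two rules above keep this from deadlocking: the last send credit is never
 * used unless the message also returns credits, and once the sendq is empty
 * with peer_outstanding_credits at or above PTLLND_CREDIT_HIGHWATER a NOOP is
 * posted purely to hand credits back.
 */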
722
723 kptl_tx_t *
724 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
725 {
726         kptl_tx_t         *tx;
727         struct list_head  *ele;
728
729         list_for_each(ele, &peer->peer_sendq) {
730                 tx = list_entry(ele, kptl_tx_t, tx_list);
731
732                 if (time_after_eq(jiffies, tx->tx_deadline)) {
733                         kptllnd_tx_addref(tx);
734                         return tx;
735                 }
736         }
737
738         list_for_each(ele, &peer->peer_activeq) {
739                 tx = list_entry(ele, kptl_tx_t, tx_list);
740
741                 if (time_after_eq(jiffies, tx->tx_deadline)) {
742                         kptllnd_tx_addref(tx);
743                         return tx;
744                 }
745         }
746
747         return NULL;
748 }
749
750
751 void
752 kptllnd_peer_check_bucket (int idx, int stamp)
753 {
754         struct list_head  *peers = &kptllnd_data.kptl_peers[idx];
755         struct list_head  *ptmp;
756         kptl_peer_t       *peer;
757         kptl_tx_t         *tx;
758         unsigned long      flags;
759         int                nsend;
760         int                nactive;
761         int                check_sends;
762
763         CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);
764
765  again:
766         /* NB. Shared lock while I just look */
767         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
768
769         list_for_each (ptmp, peers) {
770                 peer = list_entry (ptmp, kptl_peer_t, peer_list);
771
772                 CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
773                        libcfs_id2str(peer->peer_id), peer->peer_credits, 
774                        peer->peer_outstanding_credits, peer->peer_sent_credits);
775
776                 spin_lock(&peer->peer_lock);
777
778                 if (peer->peer_check_stamp == stamp) {
779                         /* checked already this pass */
780                         spin_unlock(&peer->peer_lock);
781                         continue;
782                 }
783
784                 peer->peer_check_stamp = stamp;
785                 tx = kptllnd_find_timed_out_tx(peer);
786                 check_sends = peer->peer_retry_noop;
787                 
788                 spin_unlock(&peer->peer_lock);
789                 
790                 if (tx == NULL && !check_sends)
791                         continue;
792
793                 kptllnd_peer_addref(peer); /* 1 ref for me... */
794
795                 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
796
797                 if (tx == NULL) { /* nothing timed out */
798                         kptllnd_peer_check_sends(peer);
799                         kptllnd_peer_decref(peer); /* ...until here or... */
800
801                         /* rescan after dropping the lock */
802                         goto again;
803                 }
804
805                 spin_lock_irqsave(&peer->peer_lock, flags);
806                 nsend = kptllnd_count_queue(&peer->peer_sendq);
807                 nactive = kptllnd_count_queue(&peer->peer_activeq);
808                 spin_unlock_irqrestore(&peer->peer_lock, flags);
809
810                 LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
811                                    libcfs_id2str(peer->peer_id),
812                                    (tx->tx_tposted == 0) ? 
813                                    "no free peer buffers" : 
814                                    "please check Portals");
815
816                 if (tx->tx_tposted) {
817                         CERROR("Could not send to %s after %ds (sent %lds ago); "
818                                 "check Portals for possible issues\n",
819                                 libcfs_id2str(peer->peer_id),
820                                 *kptllnd_tunables.kptl_timeout,
821                                 cfs_duration_sec(jiffies - tx->tx_tposted));
822                 } else {
823                         CERROR("Could not get credits for %s after %ds; "
824                                 "possible Lustre networking issues\n",
825                                libcfs_id2str(peer->peer_id),
826                                *kptllnd_tunables.kptl_timeout);
827                 }
828
829                 CERROR("%s timed out: cred %d outstanding %d, sent %d, "
830                        "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d "
831                        "%sposted %lu T/O %ds\n",
832                        libcfs_id2str(peer->peer_id), peer->peer_credits,
833                        peer->peer_outstanding_credits, peer->peer_sent_credits,
834                        nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type),
835                        tx->tx_active ? "A" : "",
836                        PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
837                        "" : "M",
838                        PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
839                        "" : "D",
840                        tx->tx_status,
841                        (tx->tx_tposted == 0) ? "not " : "",
842                        (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
843                        *kptllnd_tunables.kptl_timeout);
844
845                 kptllnd_dump_ptltrace();
846
847                 kptllnd_tx_decref(tx);
848
849                 kptllnd_peer_close(peer, -ETIMEDOUT);
850                 kptllnd_peer_decref(peer); /* ...until here */
851
852                 /* start again now I've dropped the lock */
853                 goto again;
854         }
855
856         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
857 }
858
859 kptl_peer_t *
860 kptllnd_id2peer_locked (lnet_process_id_t id)
861 {
862         struct list_head *peers = kptllnd_nid2peerlist(id.nid);
863         struct list_head *tmp;
864         kptl_peer_t      *peer;
865
866         list_for_each (tmp, peers) {
867
868                 peer = list_entry (tmp, kptl_peer_t, peer_list);
869
870                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
871                         peer->peer_state == PEER_STATE_ACTIVE);
872                 
873                 if (peer->peer_id.nid != id.nid ||
874                     peer->peer_id.pid != id.pid)
875                         continue;
876
877                 kptllnd_peer_addref(peer);
878
879                 CDEBUG(D_NET, "%s -> %s (%d)\n",
880                        libcfs_id2str(id), 
881                        kptllnd_ptlid2str(peer->peer_ptlid),
882                        atomic_read (&peer->peer_refcount));
883                 return peer;
884         }
885
886         return NULL;
887 }
888
889 void
890 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
891 {
892         LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
893                            "messages may be dropped\n",
894                            str, libcfs_id2str(id),
895                            kptllnd_data.kptl_n_active_peers);
896         LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
897                            "'max_nodes' or 'max_procs_per_node'\n");
898 }
899
900 __u64
901 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
902 {
903         kptl_peer_t            *peer;
904         struct list_head       *tmp;
905
906         /* Find the last matchbits I saw this new peer using.  Note:
907            A. This peer cannot be in the peer table - she's new!
908            B. If I can't find the peer in the closing/zombie peers, all
909               matchbits are safe because all refs to the (old) peer have gone
910               so all txs have completed so there's no risk of matchbit
911               collision!
912          */
913
914         LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
915
916         /* peer's last matchbits can't change after it comes out of the peer
917          * table, so first match is fine */
918
919         list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
920                 peer = list_entry (tmp, kptl_peer_t, peer_list);
921
922                 if (peer->peer_id.nid == lpid.nid &&
923                     peer->peer_id.pid == lpid.pid)
924                         return peer->peer_last_matchbits_seen;
925         }
926         
927         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
928                 peer = list_entry (tmp, kptl_peer_t, peer_list);
929
930                 if (peer->peer_id.nid == lpid.nid &&
931                     peer->peer_id.pid == lpid.pid)
932                         return peer->peer_last_matchbits_seen;
933         }
934         
935         return PTL_RESERVED_MATCHBITS;
936 }
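/*
 * Worked example (values invented): suppose the last bulk RDMA I saw a
 * now-dead peer use carried matchbits 0x1234.  While that peer lingers on the
 * closing/zombie lists, peer_last_matchbits_seen remembers 0x1234 and this
 * function returns it.  The HELLO sent to the reconnecting peer carries
 * kptlhm_matchbits = 0x1234, and the peer resumes issuing matchbits from
 * 0x1235 (see safe_matchbits in kptllnd_peer_handle_hello() below), so none
 * of my stale bulk buffers can ever be matched by the new incarnation.
 */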
937
938 kptl_peer_t *
939 kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
940                            kptl_msg_t       *msg)
941 {
942         rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;
943         kptl_peer_t        *peer;
944         kptl_peer_t        *new_peer;
945         lnet_process_id_t   lpid;
946         unsigned long       flags;
947         kptl_tx_t          *hello_tx;
948         int                 rc;
949         __u64               safe_matchbits;
950         __u64               last_matchbits_seen;
951
952         lpid.nid = msg->ptlm_srcnid;
953         lpid.pid = msg->ptlm_srcpid;
954
955         CDEBUG(D_NET, "hello from %s(%s)\n",
956                libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
957
958         if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
959             (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
960                 /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
961                  * userspace.  Refuse the connection if she hasn't set the
962                  * correct flag in her PID... */
963                 CERROR("Userflag not set in hello from %s (%s)\n",
964                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
965                 return NULL;
966         }
967         
968         /* kptlhm_matchbits are the highest matchbits my peer may have used to
969          * RDMA to me.  I ensure I never register buffers for RDMA that could
970          * match any she used */
971         safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;
972
973         if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
974                 CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
975                        safe_matchbits, libcfs_id2str(lpid));
976                 return NULL;
977         }
978         
979         if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
980                 CERROR("%s: max message size %d < MIN %d\n",
981                        libcfs_id2str(lpid),
982                        msg->ptlm_u.hello.kptlhm_max_msg_size,
983                        PTLLND_MIN_BUFFER_SIZE);
984                 return NULL;
985         }
986
987         if (msg->ptlm_credits <= 1) {
988                 CERROR("Need more than 1 credit from %s (only got %d)\n",
989                        libcfs_id2str(lpid), msg->ptlm_credits);
990                 return NULL;
991         }
992         
993         write_lock_irqsave(g_lock, flags);
994
995         peer = kptllnd_id2peer_locked(lpid);
996         if (peer != NULL) {
997                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
998                         /* Completing HELLO handshake */
999                         LASSERT(peer->peer_incarnation == 0);
1000
1001                         if (msg->ptlm_dststamp != 0 &&
1002                             msg->ptlm_dststamp != peer->peer_myincarnation) {
1003                                 write_unlock_irqrestore(g_lock, flags);
1004
1005                                 CERROR("Ignoring HELLO from %s: unexpected "
1006                                        "dststamp "LPX64" ("LPX64" wanted)\n",
1007                                        libcfs_id2str(lpid),
1008                                        msg->ptlm_dststamp,
1009                                        peer->peer_myincarnation);
1010                                 kptllnd_peer_decref(peer);
1011                                 return NULL;
1012                         }
1013                         
1014                         /* Concurrent initiation or response to my HELLO */
1015                         peer->peer_state = PEER_STATE_ACTIVE;
1016                         peer->peer_incarnation = msg->ptlm_srcstamp;
1017                         peer->peer_next_matchbits = safe_matchbits;
1018                         peer->peer_max_msg_size =
1019                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1020                         
1021                         write_unlock_irqrestore(g_lock, flags);
1022                         return peer;
1023                 }
1024
1025                 if (msg->ptlm_dststamp != 0 &&
1026                     msg->ptlm_dststamp <= peer->peer_myincarnation) {
1027                         write_unlock_irqrestore(g_lock, flags);
1028
1029                         CERROR("Ignoring stale HELLO from %s: "
1030                                "dststamp "LPX64" (current "LPX64")\n",
1031                                libcfs_id2str(lpid),
1032                                msg->ptlm_dststamp,
1033                                peer->peer_myincarnation);
1034                         kptllnd_peer_decref(peer);
1035                         return NULL;
1036                 }
1037
1038                 /* Brand new connection attempt: remove old incarnation */
1039                 kptllnd_peer_close_locked(peer, 0);
1040         }
1041
1042         kptllnd_cull_peertable_locked(lpid);
1043
1044         write_unlock_irqrestore(g_lock, flags);
1045
1046         if (peer != NULL) {
1047                 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
1048                        " stamp "LPX64"("LPX64")\n",
1049                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
1050                        msg->ptlm_srcstamp, peer->peer_incarnation);
1051
1052                 kptllnd_peer_decref(peer);
1053         }
1054
1055         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1056         if (hello_tx == NULL) {
1057                 CERROR("Unable to allocate HELLO message for %s\n",
1058                        libcfs_id2str(lpid));
1059                 return NULL;
1060         }
1061
1062         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1063                          sizeof(kptl_hello_msg_t));
1064
1065         new_peer = kptllnd_peer_allocate(lpid, initiator);
1066         if (new_peer == NULL) {
1067                 kptllnd_tx_decref(hello_tx);
1068                 return NULL;
1069         }
1070
1071         rc = kptllnd_peer_reserve_buffers();
1072         if (rc != 0) {
1073                 kptllnd_peer_decref(new_peer);
1074                 kptllnd_tx_decref(hello_tx);
1075
1076                 CERROR("Failed to reserve buffers for %s\n",
1077                        libcfs_id2str(lpid));
1078                 return NULL;
1079         }
1080
1081         write_lock_irqsave(g_lock, flags);
1082
1083  again:
1084         if (kptllnd_data.kptl_shutdown) {
1085                 write_unlock_irqrestore(g_lock, flags);
1086
1087                 CERROR ("Shutdown started, refusing connection from %s\n",
1088                         libcfs_id2str(lpid));
1089                 kptllnd_peer_unreserve_buffers();
1090                 kptllnd_peer_decref(new_peer);
1091                 kptllnd_tx_decref(hello_tx);
1092                 return NULL;
1093         }
1094
1095         peer = kptllnd_id2peer_locked(lpid);
1096         if (peer != NULL) {
1097                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1098                         /* An outgoing message instantiated 'peer' for me */
1099                         LASSERT(peer->peer_incarnation == 0);
1100
1101                         peer->peer_state = PEER_STATE_ACTIVE;
1102                         peer->peer_incarnation = msg->ptlm_srcstamp;
1103                         peer->peer_next_matchbits = safe_matchbits;
1104                         peer->peer_max_msg_size =
1105                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1106
1107                         write_unlock_irqrestore(g_lock, flags);
1108
1109                         CWARN("Outgoing instantiated peer %s\n",
1110                               libcfs_id2str(lpid));
1111                 } else {
1112                         LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1113
1114                         write_unlock_irqrestore(g_lock, flags);
1115
1116                         /* WOW!  Somehow this peer completed the HELLO
1117                          * handshake while I slept.  I guess I could have slept
1118                          * while it rebooted and sent a new HELLO, so I'll fail
1119                          * this one... */
1120                         CWARN("Raced HELLO completion from %s\n", libcfs_id2str(lpid));
1121                         kptllnd_peer_decref(peer);
1122                         peer = NULL;
1123                 }
1124
1125                 kptllnd_peer_unreserve_buffers();
1126                 kptllnd_peer_decref(new_peer);
1127                 kptllnd_tx_decref(hello_tx);
1128                 return peer;
1129         }
1130
1131         if (kptllnd_data.kptl_n_active_peers ==
1132             kptllnd_data.kptl_expected_peers) {
1133                 /* peer table full */
1134                 write_unlock_irqrestore(g_lock, flags);
1135
1136                 kptllnd_peertable_overflow_msg("Connection from ", lpid);
1137
1138                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1139                 if (rc != 0) {
1140                         CERROR("Refusing connection from %s\n",
1141                                libcfs_id2str(lpid));
1142                         kptllnd_peer_unreserve_buffers();
1143                         kptllnd_peer_decref(new_peer);
1144                         kptllnd_tx_decref(hello_tx);
1145                         return NULL;
1146                 }
1147                 
1148                 write_lock_irqsave(g_lock, flags);
1149                 kptllnd_data.kptl_expected_peers++;
1150                 goto again;
1151         }
1152
1153         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
1154
1155         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1156         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1157                 *kptllnd_tunables.kptl_max_msg_size;
1158
1159         new_peer->peer_state = PEER_STATE_ACTIVE;
1160         new_peer->peer_incarnation = msg->ptlm_srcstamp;
1161         new_peer->peer_next_matchbits = safe_matchbits;
1162         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1163         new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1164
1165         kptllnd_peer_add_peertable_locked(new_peer);
1166
1167         write_unlock_irqrestore(g_lock, flags);
1168
1169         /* NB someone else could get in now and post a message before I post
1170          * the HELLO, but post_tx/check_sends take care of that! */
1171
1172         CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
1173                libcfs_id2str(new_peer->peer_id), hello_tx);
1174
1175         kptllnd_post_tx(new_peer, hello_tx, 0);
1176         kptllnd_peer_check_sends(new_peer);
1177
1178         return new_peer;
1179 }
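/*
 * HELLO handshake sketch (A = this node handling the message above, B = the
 * peer; field names as used in this file):
 *
 *      B -> A : HELLO { ptlm_srcstamp, kptlhm_matchbits, kptlhm_max_msg_size }
 *      A      : validates matchbits/credits, culls stale peers with B's NID,
 *               records peer_incarnation = ptlm_srcstamp and
 *               peer_next_matchbits = kptlhm_matchbits + 1
 *      A -> B : HELLO { kptlhm_matchbits = last matchbits A saw B use,
 *                       kptlhm_max_msg_size = *kptl_max_msg_size }
 *
 * The passive side (this function) goes straight to PEER_STATE_ACTIVE; the
 * active side (kptllnd_find_target() below) waits in PEER_STATE_WAITING_HELLO
 * until the reply arrives back here.
 */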
1180
1181 void
1182 kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
1183 {
1184         kptllnd_post_tx(peer, tx, nfrag);
1185         kptllnd_peer_check_sends(peer);
1186 }
1187
1188 int
1189 kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
1190 {
1191         rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1192         ptl_process_id_t  ptl_id;
1193         kptl_peer_t      *new_peer;
1194         kptl_tx_t        *hello_tx;
1195         unsigned long     flags;
1196         int               rc;
1197         __u64             last_matchbits_seen;
1198
1199         /* I expect to find the peer, so I only take a read lock... */
1200         read_lock_irqsave(g_lock, flags);
1201         *peerp = kptllnd_id2peer_locked(target);
1202         read_unlock_irqrestore(g_lock, flags);
1203
1204         if (*peerp != NULL)
1205                 return 0;
1206         
1207         if ((target.pid & LNET_PID_USERFLAG) != 0) {
1208                 CWARN("Refusing to create a new connection to %s "
1209                       "(non-kernel peer)\n", libcfs_id2str(target));
1210                 return -EHOSTUNREACH;
1211         }
1212
1213         /* The new peer is a kernel ptllnd, and kernel ptllnds all have
1214          * the same portals PID */
1215         ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
1216         ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
1217
1218         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1219         if (hello_tx == NULL) {
1220                 CERROR("Unable to allocate connect message for %s\n",
1221                        libcfs_id2str(target));
1222                 return -ENOMEM;
1223         }
1224
1225         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1226                          sizeof(kptl_hello_msg_t));
1227
1228         new_peer = kptllnd_peer_allocate(target, ptl_id);
1229         if (new_peer == NULL) {
1230                 rc = -ENOMEM;
1231                 goto unwind_0;
1232         }
1233
1234         rc = kptllnd_peer_reserve_buffers();
1235         if (rc != 0)
1236                 goto unwind_1;
1237
1238         write_lock_irqsave(g_lock, flags);
1239  again:
1240         if (kptllnd_data.kptl_shutdown) {
1241                 write_unlock_irqrestore(g_lock, flags);
1242                 rc = -ESHUTDOWN;
1243                 goto unwind_2;
1244         }
1245
1246         *peerp = kptllnd_id2peer_locked(target);
1247         if (*peerp != NULL) {
1248                 write_unlock_irqrestore(g_lock, flags);
1249                 goto unwind_2;
1250         }
1251
1252         kptllnd_cull_peertable_locked(target);
1253
1254         if (kptllnd_data.kptl_n_active_peers ==
1255             kptllnd_data.kptl_expected_peers) {
1256                 /* peer table full */
1257                 write_unlock_irqrestore(g_lock, flags);
1258
1259                 kptllnd_peertable_overflow_msg("Connection to ", target);
1260
1261                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1262                 if (rc != 0) {
1263                         CERROR("Can't create connection to %s\n",
1264                                libcfs_id2str(target));
1265                         rc = -ENOMEM;
1266                         goto unwind_2;
1267                 }
1268                 write_lock_irqsave(g_lock, flags);
1269                 kptllnd_data.kptl_expected_peers++;
1270                 goto again;
1271         }
1272
1273         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);
1274
1275         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1276         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1277                 *kptllnd_tunables.kptl_max_msg_size;
1278                 
1279         new_peer->peer_state = PEER_STATE_WAITING_HELLO;
1280         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1281         
1282         kptllnd_peer_add_peertable_locked(new_peer);
1283
1284         write_unlock_irqrestore(g_lock, flags);
1285
1286         /* NB someone else could get in now and post a message before I post
1287          * the HELLO, but post_tx/check_sends take care of that! */
1288
1289         CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
1290                libcfs_id2str(new_peer->peer_id), hello_tx);
1291
1292         kptllnd_post_tx(new_peer, hello_tx, 0);
1293         kptllnd_peer_check_sends(new_peer);
1294        
1295         *peerp = new_peer;
1296         return 0;
1297         
1298  unwind_2:
1299         kptllnd_peer_unreserve_buffers();
1300  unwind_1:
1301         kptllnd_peer_decref(new_peer);
1302  unwind_0:
1303         kptllnd_tx_decref(hello_tx);
1304
1305         return rc;
1306 }
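/*
 * Putting the pieces together: a minimal sketch of how a caller (e.g. the LND
 * send path in ptllnd_cb.c, not shown here) is expected to drive the
 * functions above.  'target' and 'tx' are assumed to have been prepared by
 * that caller.
 *
 *      kptl_peer_t *peer;
 *      int          rc;
 *
 *      rc = kptllnd_find_target(&peer, target);   // existing or freshly
 *      if (rc != 0)                               // connected peer, +1 ref
 *              return rc;
 *
 *      kptllnd_tx_launch(peer, tx, 0);            // takes over the tx ref,
 *                                                 // queues it and kicks
 *                                                 // kptllnd_peer_check_sends()
 *      kptllnd_peer_decref(peer);                 // drop the lookup ref
 *      return 0;
 */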