Whamcloud - gitweb
Severity : major
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_peer.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: PJ Kirner <pjkirner@clusterfs.com>
6  *           E Barton <eeb@bartonsoftware.com>
7  *
8  *   This file is part of the Lustre file system, http://www.lustre.org
9  *   Lustre is a trademark of Cluster File Systems, Inc.
10  *
11  *   This file is confidential source code owned by Cluster File Systems.
12  *   No viewing, modification, compilation, redistribution, or any other
13  *   form of use is permitted except through a signed license agreement.
14  *
15  *   If you have not signed such an agreement, then you have no rights to
16  *   this file.  Please destroy it immediately and contact CFS.
17  *
18  */
19
20 #include "ptllnd.h"
21 #include <libcfs/list.h>
22
23 static int
24 kptllnd_count_queue(struct list_head *q)
25 {
26         struct list_head *e;
27         int               n = 0;
28         
29         list_for_each(e, q) {
30                 n++;
31         }
32
33         return n;
34 }
35
int
kptllnd_get_peer_info(int index, 
                      lnet_process_id_t *id,
                      int *state, int *sent_hello,
                      int *refcount, __u64 *incarnation,
                      __u64 *next_matchbits, __u64 *last_matchbits_seen,
                      int *nsendq, int *nactiveq,
                      int *credits, int *outstanding_credits) 
{
        /* Copy a snapshot of the index'th peer (counting across all hash
         * buckets in table order) into the caller's out parameters.
         * Returns 0 on success, or -ENOENT if the table holds fewer than
         * index+1 peers.  All out pointers must be valid. */
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        unsigned long     flags;
        struct list_head *ptmp;
        kptl_peer_t      *peer;
        int               i;
        int               rc = -ENOENT;

        /* Shared (read) lock: we only inspect the peer table */
        read_lock_irqsave(g_lock, flags);

        for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
                
                list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry(ptmp, kptl_peer_t, peer_list);

                        /* skip peers until we reach the requested index */
                        if (index-- > 0)
                                continue;
                        
                        *id          = peer->peer_id;
                        *state       = peer->peer_state;
                        *sent_hello  = peer->peer_sent_hello;
                        *refcount    = atomic_read(&peer->peer_refcount);
                        *incarnation = peer->peer_incarnation;

                        /* peer_lock guards the matchbits, credit counters
                         * and tx queues sampled below */
                        spin_lock(&peer->peer_lock);

                        *next_matchbits      = peer->peer_next_matchbits;
                        *last_matchbits_seen = peer->peer_last_matchbits_seen;
                        *credits             = peer->peer_credits;
                        *outstanding_credits = peer->peer_outstanding_credits;

                        *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
                        *nactiveq = kptllnd_count_queue(&peer->peer_activeq);

                        spin_unlock(&peer->peer_lock);

                        rc = 0;
                        goto out;
                }
        }
        
 out:
        read_unlock_irqrestore(g_lock, flags);
        return rc;
}
89
void
kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
{
        /* Insert 'peer' into the hash bucket for its NID.  Caller must hold
         * kptl_peer_rw_lock for writing.  The table takes its own reference
         * on the peer. */
        LASSERT (kptllnd_data.kptl_n_active_peers <
                 kptllnd_data.kptl_expected_peers);

        /* only pre-active / active peers may enter the table */
        LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
                 peer->peer_state == PEER_STATE_ACTIVE);
        
        kptllnd_data.kptl_n_active_peers++;
        atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */

        /* NB add to HEAD of peer list for MRU order!
         * (see kptllnd_cull_peertable) */
        list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
}
106
void
kptllnd_cull_peertable_locked (lnet_process_id_t pid)
{
        /* I'm about to add a new peer with this portals ID to the peer table,
         * so (a) this peer should not exist already and (b) I want to leave at
         * most (max_procs_per_nid - 1) peers with this NID in the table. */
        struct list_head  *peers = kptllnd_nid2peerlist(pid.nid);
        int                cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
        int                count;
        struct list_head  *tmp;
        struct list_head  *nxt;
        kptl_peer_t       *peer;
        
        /* NOTE(review): caller presumably holds kptl_peer_rw_lock for
         * writing (kptllnd_peer_close_locked below requires it) — confirm
         * at call sites. */
        count = 0;
        list_for_each_safe (tmp, nxt, peers) {
                /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
                 * in MRU order */
                peer = list_entry(tmp, kptl_peer_t, peer_list);
                        
                if (peer->peer_id.nid != pid.nid)
                        continue;

                /* (a): no existing peer may carry exactly this id */
                LASSERT (peer->peer_id.pid != pid.pid);
                        
                count++;

                if (count < cull_count) /* recent (don't cull) */
                        continue;

                /* this entry (and anything older) takes us to/over the
                 * limit: close it */
                CDEBUG(D_NET, "Cull %s(%s)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_ptlid2str(peer->peer_ptlid));
                
                kptllnd_peer_close_locked(peer, 0);
        }
}
143
kptl_peer_t *
kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
{
        /* Allocate and initialise a new peer in PEER_STATE_ALLOCATED with a
         * single reference owned by the caller.  Returns NULL if allocation
         * fails or the LND is shutting down.  'lpid' is the LNET identity,
         * 'ppid' the Portals identity of the peer. */
        unsigned long    flags;
        kptl_peer_t     *peer;

        LIBCFS_ALLOC(peer, sizeof (*peer));
        if (peer == NULL) {
                CERROR("Can't create peer %s (%s)\n",
                       libcfs_id2str(lpid), 
                       kptllnd_ptlid2str(ppid));
                return NULL;
        }

        memset(peer, 0, sizeof(*peer));         /* zero flags etc */

        INIT_LIST_HEAD (&peer->peer_sendq);
        INIT_LIST_HEAD (&peer->peer_activeq);
        spin_lock_init (&peer->peer_lock);

        peer->peer_state = PEER_STATE_ALLOCATED;
        peer->peer_error = 0;
        peer->peer_last_alive = cfs_time_current();
        peer->peer_id = lpid;
        peer->peer_ptlid = ppid;
        peer->peer_credits = 1;                 /* enough for HELLO */
        peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
        peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;
        peer->peer_sent_credits = 1;           /* HELLO credit is implicit */
        peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */

        atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        peer->peer_myincarnation = kptllnd_data.kptl_incarnation;

        /* Only increase # peers under lock, to guarantee we dont grow it
         * during shutdown */
        if (kptllnd_data.kptl_shutdown) {
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
                                        flags);
                LIBCFS_FREE(peer, sizeof(*peer));
                return NULL;
        }

        kptllnd_data.kptl_npeers++;
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
        
        return peer;
}
195
void
kptllnd_peer_destroy (kptl_peer_t *peer)
{
        /* Final teardown, called when the last reference has gone: unlink
         * the peer from the zombie list if it got that far, drop the global
         * peer count and free the structure.  Must not be called from
         * interrupt context. */
        unsigned long flags;
        
        CDEBUG(D_NET, "Peer=%p\n", peer);

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&peer->peer_refcount) == 0);
        /* only never-activated or fully closed peers may be destroyed */
        LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
                 peer->peer_state == PEER_STATE_ZOMBIE);
        LASSERT (list_empty(&peer->peer_sendq));
        LASSERT (list_empty(&peer->peer_activeq));

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* ZOMBIEs are still linked on kptl_zombie_peers
         * (see kptllnd_handle_closing_peers) */
        if (peer->peer_state == PEER_STATE_ZOMBIE)
                list_del(&peer->peer_list);

        kptllnd_data.kptl_npeers--;

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        LIBCFS_FREE (peer, sizeof (*peer));
}
221
222 void
223 kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs)
224 {
225         struct list_head  *tmp;
226         struct list_head  *nxt;
227         kptl_tx_t         *tx;
228
229         list_for_each_safe (tmp, nxt, peerq) {
230                 tx = list_entry(tmp, kptl_tx_t, tx_list);
231
232                 list_del(&tx->tx_list);
233                 list_add_tail(&tx->tx_list, txs);
234
235                 tx->tx_status = -EIO;
236                 tx->tx_active = 0;
237         }
238 }
239
240 void
241 kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs)
242 {
243         unsigned long   flags;
244
245         spin_lock_irqsave(&peer->peer_lock, flags);
246
247         kptllnd_cancel_txlist(&peer->peer_sendq, txs);
248         kptllnd_cancel_txlist(&peer->peer_activeq, txs);
249                 
250         spin_unlock_irqrestore(&peer->peer_lock, flags);
251 }
252
void
kptllnd_peer_alive (kptl_peer_t *peer)
{
        /* Record that we just heard from this peer.
         * This is racy, but everyone's only writing cfs_time_current() */
        peer->peer_last_alive = cfs_time_current();
        mb();          /* make the new timestamp visible before proceeding */
}
260
void
kptllnd_peer_notify (kptl_peer_t *peer)
{
        /* If the peer has a stashed error, consume it and tell LNET the
         * peer is down, along with a wall-clock estimate of when it was
         * last heard from. */
        unsigned long flags;
        time_t        last_alive = 0;
        int           error = 0;
        
        spin_lock_irqsave(&peer->peer_lock, flags);

        if (peer->peer_error != 0) {
                error = peer->peer_error;
                peer->peer_error = 0;   /* consume: notify only once */
                
                /* convert the jiffies-based last-alive stamp into
                 * seconds-since-epoch */
                last_alive = cfs_time_current_sec() - 
                             cfs_duration_sec(cfs_time_current() - 
                                              peer->peer_last_alive);
        }
        
        spin_unlock_irqrestore(&peer->peer_lock, flags);

        if (error != 0)
                /* third argument 0: report the peer as not alive */
                lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
                             last_alive);
}
285
void
kptllnd_handle_closing_peers ()
{
        /* Reap peers scheduled for closing: cancel all their txs, notify
         * LNET, move CLOSING peers to the zombie list and drop the peer
         * table's reference, then release the collected txs.  Called with
         * no locks held. */
        unsigned long           flags;
        struct list_head        txs;
        kptl_peer_t            *peer;
        struct list_head       *tmp;
        struct list_head       *nxt;
        kptl_tx_t              *tx;
        int                     idle;

        /* Check with a read lock first to avoid blocking anyone */

        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        idle = list_empty(&kptllnd_data.kptl_closing_peers) &&
               list_empty(&kptllnd_data.kptl_zombie_peers);
        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (idle)
                return;

        /* local staging list for all cancelled txs */
        INIT_LIST_HEAD(&txs);

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
         * ref removes it from this list, so I musn't drop the lock while
         * scanning it. */
        list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);

                kptllnd_peer_cancel_txs(peer, &txs);
        }

        /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
         * I'm the only one removing from this list, but peers can be added on
         * the end any time I drop the lock. */

        list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_CLOSING);

                /* promote CLOSING -> ZOMBIE before dropping the lock so no
                 * one processes this peer twice */
                list_del(&peer->peer_list);
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_zombie_peers);
                peer->peer_state = PEER_STATE_ZOMBIE;

                /* notify/cancel/decref may block or take other locks, so
                 * cycle the write lock around them */
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                kptllnd_peer_notify(peer);
                kptllnd_peer_cancel_txs(peer, &txs);
                kptllnd_peer_decref(peer);      /* drop peer table's ref */

                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        }

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Drop peer's ref on all cancelled txs.  This will get
         * kptllnd_tx_fini() to abort outstanding comms if necessary. */

        list_for_each_safe (tmp, nxt, &txs) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                kptllnd_tx_decref(tx);
        }
}
356
void
kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
{
        /* Take 'peer' out of the active table and queue it on
         * kptl_closing_peers for the watchdog to finish closing
         * (see kptllnd_handle_closing_peers).  Idempotent: peers already
         * CLOSING or ZOMBIE are left alone.  Caller must hold
         * kptl_peer_rw_lock for writing. */
        switch (peer->peer_state) {
        default:
                LBUG();

        case PEER_STATE_WAITING_HELLO:
        case PEER_STATE_ACTIVE:
                /* Ensure new peers see a new incarnation of me */
                LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
                if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
                        kptllnd_data.kptl_incarnation++;

                /* Removing from peer table */
                kptllnd_data.kptl_n_active_peers--;
                LASSERT (kptllnd_data.kptl_n_active_peers >= 0);

                list_del(&peer->peer_list);
                kptllnd_peer_unreserve_buffers();

                peer->peer_error = why; /* stash 'why' only on first close */
                peer->peer_state = PEER_STATE_CLOSING;

                /* Schedule for immediate attention, taking peer table's ref */
                list_add_tail(&peer->peer_list, 
                              &kptllnd_data.kptl_closing_peers);
                wake_up(&kptllnd_data.kptl_watchdog_waitq);
                break;

        case PEER_STATE_ZOMBIE:
        case PEER_STATE_CLOSING:
                break;                  /* close already in progress */
        }
}
392
393 void
394 kptllnd_peer_close(kptl_peer_t *peer, int why)
395 {
396         unsigned long      flags;
397
398         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
399         kptllnd_peer_close_locked(peer, why);
400         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
401 }
402
int
kptllnd_peer_del(lnet_process_id_t id)
{
        /* Close every peer matching 'id'.  id.nid may be LNET_NID_ANY to
         * match all NIDs (in which case id.pid must be LNET_PID_ANY), and
         * id.pid may be LNET_PID_ANY to match all PIDs on a NID.  Returns
         * 0 if at least one peer matched, -ENOENT if none did, -EINVAL for
         * an inconsistent wildcard. */
        struct list_head  *ptmp;
        struct list_head  *pnxt;
        kptl_peer_t       *peer;
        int                lo;
        int                hi;
        int                i;
        unsigned long      flags;
        int                rc = -ENOENT;

        /*
         * Find the single bucket we are supposed to look at or if nid is a
         * wildcard (LNET_NID_ANY) then look at all of the buckets
         */
        if (id.nid != LNET_NID_ANY) {
                struct list_head *l = kptllnd_nid2peerlist(id.nid);
                
                /* bucket index from pointer arithmetic into the table */
                lo = hi =  l - kptllnd_data.kptl_peers;
        } else {
                if (id.pid != LNET_PID_ANY)
                        return -EINVAL;
                
                lo = 0;
                hi = kptllnd_data.kptl_peer_hash_size - 1;
        }

again:
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry (ptmp, kptl_peer_t, peer_list);

                        /* skip peers that don't match the (possibly
                         * wildcarded) id */
                        if (!(id.nid == LNET_NID_ANY || 
                              (peer->peer_id.nid == id.nid &&
                               (id.pid == LNET_PID_ANY || 
                                peer->peer_id.pid == id.pid))))
                                continue;

                        kptllnd_peer_addref(peer); /* 1 ref for me... */

                        /* kptllnd_peer_close() needs the write lock, so
                         * drop the read lock first */
                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                               flags);

                        kptllnd_peer_close(peer, 0);
                        kptllnd_peer_decref(peer); /* ...until here */

                        rc = 0;         /* matched something */

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        return (rc);
}
463
void
kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
        /* Queue 'tx' for sending to 'peer', setting up the Portals MDs it
         * needs: for PUT/GET requests, an ME+MD is attached for the bulk
         * RDMA under freshly-assigned matchbits; for every tx, the message
         * itself is bound to an MD ('nfrag' == 0 means a single contiguous
         * buffer, otherwise an iovec of nfrag entries).  On any Portals
         * failure the tx is marked failed (-EIO) and the caller's ref is
         * dropped, which triggers cleanup.
         * CAVEAT EMPTOR: I take over caller's ref on 'tx' */
        ptl_handle_md_t  rdma_mdh = PTL_INVALID_HANDLE;
        ptl_handle_md_t  msg_mdh = PTL_INVALID_HANDLE;
        ptl_handle_me_t  meh;
        ptl_md_t         md;
        ptl_err_t        prc;
        unsigned long    flags;

        LASSERT (!tx->tx_idle);
        LASSERT (!tx->tx_active);
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
                 tx->tx_type == TX_TYPE_PUT_REQUEST ||
                 tx->tx_type == TX_TYPE_GET_REQUEST);

        kptllnd_set_tx_peer(tx, peer);

        if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
            tx->tx_type == TX_TYPE_GET_REQUEST) {

                spin_lock_irqsave(&peer->peer_lock, flags);

                /* Assume 64-bit matchbits can't wrap */
                LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
                tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
                        peer->peer_next_matchbits++;
                        
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                /* post the match entry the peer's RDMA will target */
                prc = PtlMEAttach(kptllnd_data.kptl_nih,
                                  *kptllnd_tunables.kptl_portal,
                                  peer->peer_ptlid,
                                  tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
                                  0,             /* ignore bits */
                                  PTL_UNLINK,
                                  PTL_INS_BEFORE,
                                  &meh);
                if (prc != PTL_OK) {
                        CERROR("PtlMEAttach(%s) failed: %d\n",
                               libcfs_id2str(peer->peer_id), prc);
                        goto failed;
                }

                prc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK, &rdma_mdh);
                if (prc != PTL_OK) {
                        CERROR("PtlMDAttach(%s) failed: %d\n",
                               libcfs_id2str(tx->tx_peer->peer_id), prc);
                        /* tear down the ME we just attached */
                        prc = PtlMEUnlink(meh);
                        LASSERT(prc == PTL_OK);
                        rdma_mdh = PTL_INVALID_HANDLE;
                        goto failed;
                }

                /* I'm not racing with the event callback here.  It's a bug if
                 * there's an event on the MD I just attached before I actually
                 * send the RDMA request message which the event callback
                 * catches by asserting 'rdma_mdh' is valid. */
        }

        /* now bind the request message itself */
        memset(&md, 0, sizeof(md));

        md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE;
        md.user_ptr = &tx->tx_msg_eventarg;
        md.eq_handle = kptllnd_data.kptl_eqh;

        if (nfrag == 0) {
                /* single contiguous message buffer */
                md.start = tx->tx_msg;
                md.length = tx->tx_msg->ptlm_nob;
        } else {
                /* iovec: first fragment must be the message header */
                LASSERT (nfrag > 1);
                LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);

                md.start = tx->tx_frags;
                md.length = nfrag;
                md.options |= PTL_MD_IOVEC;
        }

        prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
        if (prc != PTL_OK) {
                msg_mdh = PTL_INVALID_HANDLE;
                goto failed;
        }
        
        spin_lock_irqsave(&peer->peer_lock, flags);

        tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
        tx->tx_active = 1;
        tx->tx_rdma_mdh = rdma_mdh;
        tx->tx_msg_mdh = msg_mdh;

        /* Ensure HELLO is sent first */
        if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
                list_add(&tx->tx_list, &peer->peer_sendq);
        else
                list_add_tail(&tx->tx_list, &peer->peer_sendq);

        spin_unlock_irqrestore(&peer->peer_lock, flags);
        return;
        
 failed:
        /* record failure and release the ref we took over; any MDs already
         * attached are stashed on the tx for cleanup */
        spin_lock_irqsave(&peer->peer_lock, flags);

        tx->tx_status = -EIO;
        tx->tx_rdma_mdh = rdma_mdh;
        tx->tx_msg_mdh = msg_mdh;

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        kptllnd_tx_decref(tx);
}
581
void
kptllnd_peer_check_sends (kptl_peer_t *peer)
{
        /* Drain the peer's send queue as far as the credit protocol allows:
         * post a NOOP if we owe the peer enough credits, then transmit
         * queued messages with PtlPut while send credits last.  Each message
         * piggy-backs all outstanding (return) credits.  On a PtlPut failure
         * the whole peer is closed.  Called with no locks held; must not be
         * called from interrupt context. */

        kptl_tx_t       *tx;
        int              rc;
        unsigned long    flags;

        LASSERT(!in_interrupt());

        spin_lock_irqsave(&peer->peer_lock, flags);

        peer->peer_retry_noop = 0;

        if (list_empty(&peer->peer_sendq) &&
            peer->peer_outstanding_credits >= PTLLND_CREDIT_HIGHWATER &&
            peer->peer_credits != 0) {

                /* post a NOOP to return credits */
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
                if (tx == NULL) {
                        CERROR("Can't return credits to %s: can't allocate descriptor\n",
                               libcfs_id2str(peer->peer_id));
                } else {
                        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
                        kptllnd_post_tx(peer, tx, 0);
                }

                spin_lock_irqsave(&peer->peer_lock, flags);
                /* if the NOOP couldn't be allocated, have the watchdog
                 * retry (see kptllnd_peer_check_bucket) */
                peer->peer_retry_noop = (tx == NULL);
        }

        while (!list_empty(&peer->peer_sendq)) {
                tx = list_entry (peer->peer_sendq.next, kptl_tx_t, tx_list);

                LASSERT (tx->tx_active);
                LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
                LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
                         !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));

                /* credit accounting sanity checks */
                LASSERT (peer->peer_outstanding_credits >= 0);
                LASSERT (peer->peer_sent_credits >= 0);
                LASSERT (peer->peer_sent_credits +
                         peer->peer_outstanding_credits <=
                         *kptllnd_tunables.kptl_peercredits);
                LASSERT (peer->peer_credits >= 0);

                /* Ensure HELLO is sent first */
                if (!peer->peer_sent_hello) {
                        if (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_HELLO)
                                break;
                        peer->peer_sent_hello = 1;
                }

                if (peer->peer_credits == 0) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %p\n",
                               libcfs_id2str(peer->peer_id), 
                               peer->peer_credits,
                               peer->peer_outstanding_credits, 
                               peer->peer_sent_credits, tx);
                        break;
                }

                /* Don't use the last credit unless I've got credits to
                 * return */
                if (peer->peer_credits == 1 &&
                    peer->peer_outstanding_credits == 0) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
                               "not using last credit for %p\n",
                               libcfs_id2str(peer->peer_id), 
                               peer->peer_credits,
                               peer->peer_outstanding_credits,
                               peer->peer_sent_credits, tx);
                        break;
                }

                list_del(&tx->tx_list);

                /* Discard any NOOP I queued if I'm not at the high-water mark
                 * any more or more messages have been queued */
                if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
                    (!list_empty(&peer->peer_sendq) ||
                     peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {

                        tx->tx_active = 0;

                        spin_unlock_irqrestore(&peer->peer_lock, flags);

                        CDEBUG(D_NET, "%s: redundant noop\n", 
                               libcfs_id2str(peer->peer_id));
                        kptllnd_tx_decref(tx);

                        spin_lock_irqsave(&peer->peer_lock, flags);
                        continue;
                }

                /* fill last-minute msg header fields */
                kptllnd_msg_pack(tx->tx_msg, peer);

                /* return all outstanding credits with this message and
                 * consume one send credit */
                peer->peer_sent_credits += peer->peer_outstanding_credits;
                peer->peer_outstanding_credits = 0;
                peer->peer_credits--;

                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
                       tx, tx->tx_msg->ptlm_nob,
                       tx->tx_msg->ptlm_credits);

                list_add_tail(&tx->tx_list, &peer->peer_activeq);

                kptllnd_tx_addref(tx);          /* 1 ref for me... */

                /* PtlPut may block; drop the lock while on the wire */
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                tx->tx_tposted = jiffies;       /* going on the wire */

                rc = PtlPut (tx->tx_msg_mdh,
                             tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
                             peer->peer_ptlid,
                             *kptllnd_tunables.kptl_portal,
                             0,                 /* acl cookie */
                             LNET_MSG_MATCHBITS,
                             0,                 /* offset */
                             0);                /* header data */
                if (rc != PTL_OK) {
                        CERROR("PtlPut %s error %d\n",
                               libcfs_id2str(peer->peer_id), rc);

                        /* Nuke everything (including this tx) */
                        kptllnd_peer_close(peer, -EIO);
                        return;
                }

                kptllnd_tx_decref(tx);          /* drop my ref */

                spin_lock_irqsave(&peer->peer_lock, flags);
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);
}
726
727 kptl_tx_t *
728 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
729 {
730         kptl_tx_t         *tx;
731         struct list_head  *tmp;
732
733         list_for_each(tmp, &peer->peer_sendq) {
734                 tx = list_entry(peer->peer_sendq.next, kptl_tx_t, tx_list);
735
736                 if (time_after_eq(jiffies, tx->tx_deadline)) {
737                         kptllnd_tx_addref(tx);
738                         return tx;
739                 }
740         }
741
742         list_for_each(tmp, &peer->peer_activeq) {
743                 tx = list_entry(peer->peer_activeq.next, kptl_tx_t, tx_list);
744
745                 if (time_after_eq(jiffies, tx->tx_deadline)) {
746                         kptllnd_tx_addref(tx);
747                         return tx;
748                 }
749         }
750
751         return NULL;
752 }
753
754
void
kptllnd_peer_check_bucket (int idx, int stamp)
{
        /* Watchdog pass over hash bucket 'idx': for each peer not yet
         * examined this pass (tracked via peer_check_stamp == stamp),
         * retry a failed NOOP credit-return if one is pending, and close
         * the peer with -ETIMEDOUT if any of its txs has passed its
         * deadline.  Called with no locks held. */
        struct list_head  *peers = &kptllnd_data.kptl_peers[idx];
        struct list_head  *ptmp;
        kptl_peer_t       *peer;
        kptl_tx_t         *tx;
        unsigned long      flags;
        int                nsend;
        int                nactive;
        int                check_sends;

        CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);

 again:
        /* NB. Shared lock while I just look */
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kptl_peer_t, peer_list);

                CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits, 
                       peer->peer_outstanding_credits, peer->peer_sent_credits);

                spin_lock(&peer->peer_lock);

                if (peer->peer_check_stamp == stamp) {
                        /* checked already this pass */
                        spin_unlock(&peer->peer_lock);
                        continue;
                }

                peer->peer_check_stamp = stamp;
                tx = kptllnd_find_timed_out_tx(peer);
                check_sends = peer->peer_retry_noop;
                
                spin_unlock(&peer->peer_lock);
                
                if (tx == NULL && !check_sends)
                        continue;

                /* hold the peer across the unlocked region below */
                kptllnd_peer_addref(peer); /* 1 ref for me... */

                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                if (tx == NULL) { /* nothing timed out */
                        kptllnd_peer_check_sends(peer);
                        kptllnd_peer_decref(peer); /* ...until here or... */

                        /* rescan after dropping the lock */
                        goto again;
                }

                /* a tx timed out: gather queue depths for the report */
                spin_lock_irqsave(&peer->peer_lock, flags);
                nsend = kptllnd_count_queue(&peer->peer_sendq);
                nactive = kptllnd_count_queue(&peer->peer_activeq);
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                /* tx_tposted == 0 means the tx never made it onto the wire */
                LCONSOLE_ERROR("Timing out %s: %s\n",
                               libcfs_id2str(peer->peer_id),
                               (tx->tx_tposted == 0) ? 
                               "no free peer buffers" : "please check Portals");

                CERROR("%s timed out: cred %d outstanding %d, sent %d, "
                       "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d "
                       "%sposted %lu T/O %ds\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type),
                       tx->tx_active ? "A" : "",
                       PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
                       "" : "M",
                       PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
                       "" : "D",
                       tx->tx_status,
                       (tx->tx_tposted == 0) ? "not " : "",
                       (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
                       *kptllnd_tunables.kptl_timeout);

                kptllnd_dump_ptltrace();

                kptllnd_tx_decref(tx);  /* drop find_timed_out_tx's ref */

                kptllnd_peer_close(peer, -ETIMEDOUT);
                kptllnd_peer_decref(peer); /* ...until here */

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}
848
849 kptl_peer_t *
850 kptllnd_id2peer_locked (lnet_process_id_t id)
851 {
852         struct list_head *peers = kptllnd_nid2peerlist(id.nid);
853         struct list_head *tmp;
854         kptl_peer_t      *peer;
855
856         list_for_each (tmp, peers) {
857
858                 peer = list_entry (tmp, kptl_peer_t, peer_list);
859
860                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
861                         peer->peer_state == PEER_STATE_ACTIVE);
862                 
863                 if (peer->peer_id.nid != id.nid ||
864                     peer->peer_id.pid != id.pid)
865                         continue;
866
867                 kptllnd_peer_addref(peer);
868
869                 CDEBUG(D_NET, "%s -> %s (%d)\n",
870                        libcfs_id2str(id), 
871                        kptllnd_ptlid2str(peer->peer_ptlid),
872                        atomic_read (&peer->peer_refcount));
873                 return peer;
874         }
875
876         return NULL;
877 }
878
void
kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
{
        /* Console complaint emitted when adding a peer would exceed the
         * configured peer-table capacity; 'str' is a caller-supplied prefix
         * (e.g. "Connection from ") and 'id' the peer that was refused. */
        LCONSOLE_ERROR("%s %s overflows the peer table[%d]: "
                       "messages may be dropped\n",
                       str, libcfs_id2str(id),
                       kptllnd_data.kptl_n_active_peers);
        LCONSOLE_ERROR("Please correct by increasing "
                       "'max_nodes' or 'max_procs_per_node'\n");
}
889
890 __u64
891 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
892 {
893         kptl_peer_t            *peer;
894         struct list_head       *tmp;
895
896         /* Find the last matchbits I saw this new peer using.  Note..
897            A. This peer cannot be in the peer table - she's new!
898            B. If I can't find the peer in the closing/zombie peers, all
899               matchbits are safe because all refs to the (old) peer have gone
900               so all txs have completed so there's no risk of matchbit
901               collision!
902          */
903
904         LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
905
906         /* peer's last matchbits can't change after it comes out of the peer
907          * table, so first match is fine */
908
909         list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
910                 peer = list_entry (tmp, kptl_peer_t, peer_list);
911
912                 if (peer->peer_id.nid == lpid.nid &&
913                     peer->peer_id.pid == lpid.pid)
914                         return peer->peer_last_matchbits_seen;
915         }
916         
917         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
918                 peer = list_entry (tmp, kptl_peer_t, peer_list);
919
920                 if (peer->peer_id.nid == lpid.nid &&
921                     peer->peer_id.pid == lpid.pid)
922                         return peer->peer_last_matchbits_seen;
923         }
924         
925         return PTL_RESERVED_MATCHBITS;
926 }
927
928 kptl_peer_t *
929 kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
930                            kptl_msg_t       *msg)
931 {
932         rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;
933         kptl_peer_t        *peer;
934         kptl_peer_t        *new_peer;
935         lnet_process_id_t   lpid;
936         unsigned long       flags;
937         kptl_tx_t          *hello_tx;
938         int                 rc;
939         __u64               safe_matchbits;
940         __u64               last_matchbits_seen;
941
942         lpid.nid = msg->ptlm_srcnid;
943         lpid.pid = msg->ptlm_srcpid;
944
945         CDEBUG(D_NET, "hello from %s(%s)\n",
946                libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
947
948         if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
949             (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
950                 /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
951                  * userspace.  Refuse the connection if she hasn't set the
952                  * correct flag in her PID... */
953                 CERROR("Userflag not set in hello from %s (%s)\n",
954                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
955                 return NULL;
956         }
957         
958         /* kptlhm_matchbits are the highest matchbits my peer may have used to
959          * RDMA to me.  I ensure I never register buffers for RDMA that could
960          * match any she used */
961         safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;
962
963         if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
964                 CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
965                        safe_matchbits, libcfs_id2str(lpid));
966                 return NULL;
967         }
968         
969         if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
970                 CERROR("%s: max message size %d < MIN %d",
971                        libcfs_id2str(lpid),
972                        msg->ptlm_u.hello.kptlhm_max_msg_size,
973                        *kptllnd_tunables.kptl_max_msg_size);
974                 return NULL;
975         }
976
977         if (msg->ptlm_credits <= 1) {
978                 CERROR("Need more than 1+%d credits from %s\n",
979                        msg->ptlm_credits, libcfs_id2str(lpid));
980                 return NULL;
981         }
982         
983         write_lock_irqsave(g_lock, flags);
984
985         peer = kptllnd_id2peer_locked(lpid);
986         if (peer != NULL) {
987                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
988                         /* Completing HELLO handshake */
989                         LASSERT(peer->peer_incarnation == 0);
990
991                         if (msg->ptlm_dststamp != 0 &&
992                             msg->ptlm_dststamp != peer->peer_myincarnation) {
993                                 write_unlock_irqrestore(g_lock, flags);
994
995                                 CERROR("Ignoring HELLO from %s: unexpected "
996                                        "dststamp "LPX64" ("LPX64" wanted)\n",
997                                        libcfs_id2str(lpid),
998                                        msg->ptlm_dststamp,
999                                        peer->peer_myincarnation);
1000                                 kptllnd_peer_decref(peer);
1001                                 return NULL;
1002                         }
1003                         
1004                         /* Concurrent initiation or response to my HELLO */
1005                         peer->peer_state = PEER_STATE_ACTIVE;
1006                         peer->peer_incarnation = msg->ptlm_srcstamp;
1007                         peer->peer_next_matchbits = safe_matchbits;
1008                         peer->peer_max_msg_size =
1009                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1010                         
1011                         write_unlock_irqrestore(g_lock, flags);
1012                         return peer;
1013                 }
1014
1015                 if (msg->ptlm_dststamp != 0 &&
1016                     msg->ptlm_dststamp <= peer->peer_myincarnation) {
1017                         write_unlock_irqrestore(g_lock, flags);
1018
1019                         CERROR("Ignoring stale HELLO from %s: "
1020                                "dststamp "LPX64" (current "LPX64")\n",
1021                                libcfs_id2str(lpid),
1022                                msg->ptlm_dststamp,
1023                                peer->peer_myincarnation);
1024                         kptllnd_peer_decref(peer);
1025                         return NULL;
1026                 }
1027
1028                 /* Brand new connection attempt: remove old incarnation */
1029                 kptllnd_peer_close_locked(peer, 0);
1030         }
1031
1032         kptllnd_cull_peertable_locked(lpid);
1033
1034         write_unlock_irqrestore(g_lock, flags);
1035
1036         if (peer != NULL) {
1037                 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
1038                        " stamp "LPX64"("LPX64")\n",
1039                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
1040                        msg->ptlm_srcstamp, peer->peer_incarnation);
1041
1042                 kptllnd_peer_decref(peer);
1043         }
1044
1045         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1046         if (hello_tx == NULL) {
1047                 CERROR("Unable to allocate HELLO message for %s\n",
1048                        libcfs_id2str(lpid));
1049                 return NULL;
1050         }
1051
1052         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1053                          sizeof(kptl_hello_msg_t));
1054
1055         new_peer = kptllnd_peer_allocate(lpid, initiator);
1056         if (new_peer == NULL) {
1057                 kptllnd_tx_decref(hello_tx);
1058                 return NULL;
1059         }
1060
1061         rc = kptllnd_peer_reserve_buffers();
1062         if (rc != 0) {
1063                 kptllnd_peer_decref(new_peer);
1064                 kptllnd_tx_decref(hello_tx);
1065
1066                 CERROR("Failed to reserve buffers for %s\n",
1067                        libcfs_id2str(lpid));
1068                 return NULL;
1069         }
1070
1071         write_lock_irqsave(g_lock, flags);
1072  again:
1073         peer = kptllnd_id2peer_locked(lpid);
1074         if (peer != NULL) {
1075                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1076                         /* An outgoing message instantiated 'peer' for me */
1077                         LASSERT(peer->peer_incarnation == 0);
1078
1079                         peer->peer_state = PEER_STATE_ACTIVE;
1080                         peer->peer_incarnation = msg->ptlm_srcstamp;
1081                         peer->peer_next_matchbits = safe_matchbits;
1082                         peer->peer_max_msg_size =
1083                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1084
1085                         write_unlock_irqrestore(g_lock, flags);
1086
1087                         CWARN("Outgoing instantiated peer %s\n",
1088                               libcfs_id2str(lpid));
1089                 } else {
1090                         LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1091
1092                         write_unlock_irqrestore(g_lock, flags);
1093
1094                         /* WOW!  Somehow this peer completed the HELLO
1095                          * handshake while I slept.  I guess I could have slept
1096                          * while it rebooted and sent a new HELLO, so I'll fail
1097                          * this one... */
1098                         CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
1099                         kptllnd_peer_decref(peer);
1100                         peer = NULL;
1101                 }
1102
1103                 kptllnd_peer_unreserve_buffers();
1104                 kptllnd_peer_decref(new_peer);
1105                 kptllnd_tx_decref(hello_tx);
1106                 return peer;
1107         }
1108
1109         if (kptllnd_data.kptl_n_active_peers ==
1110             kptllnd_data.kptl_expected_peers) {
1111                 /* peer table full */
1112                 write_unlock_irqrestore(g_lock, flags);
1113
1114                 kptllnd_peertable_overflow_msg("Connection from ", lpid);
1115
1116                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1117                 if (rc != 0) {
1118                         CERROR("Refusing connection from %s\n",
1119                                libcfs_id2str(lpid));
1120                         kptllnd_peer_unreserve_buffers();
1121                         kptllnd_peer_decref(new_peer);
1122                         kptllnd_tx_decref(hello_tx);
1123                         return NULL;
1124                 }
1125                 
1126                 write_lock_irqsave(g_lock, flags);
1127                 kptllnd_data.kptl_expected_peers++;
1128                 goto again;
1129         }
1130
1131         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
1132
1133         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1134         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1135                 *kptllnd_tunables.kptl_max_msg_size;
1136
1137         new_peer->peer_state = PEER_STATE_ACTIVE;
1138         new_peer->peer_incarnation = msg->ptlm_srcstamp;
1139         new_peer->peer_next_matchbits = safe_matchbits;
1140         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1141         new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1142
1143         kptllnd_peer_add_peertable_locked(new_peer);
1144
1145         write_unlock_irqrestore(g_lock, flags);
1146
1147         /* NB someone else could get in now and post a message before I post
1148          * the HELLO, but post_tx/check_sends take care of that! */
1149
1150         CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
1151                libcfs_id2str(new_peer->peer_id), hello_tx);
1152
1153         kptllnd_post_tx(new_peer, hello_tx, 0);
1154         kptllnd_peer_check_sends(new_peer);
1155
1156         return new_peer;
1157 }
1158
void
kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
        /* Queue 'tx' (with 'nfrag' payload fragments) on 'peer' and poke
         * the send path so it goes out as soon as credits allow. */
        kptllnd_post_tx(peer, tx, nfrag);
        kptllnd_peer_check_sends(peer);
}
1165
/* Find the peer for 'target', creating it (and initiating the HELLO
 * handshake) if it doesn't exist yet.  On success returns 0 with a
 * referenced peer in *peerp (caller must decref); on failure returns a
 * negative errno and *peerp is not valid. */
int
kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        ptl_process_id_t  ptl_id;
        kptl_peer_t      *new_peer;
        kptl_tx_t        *hello_tx;
        unsigned long     flags;
        int               rc;
        __u64             last_matchbits_seen;

        /* I expect to find the peer, so I only take a read lock... */
        read_lock_irqsave(g_lock, flags);
        *peerp = kptllnd_id2peer_locked(target);
        read_unlock_irqrestore(g_lock, flags);

        if (*peerp != NULL)
                return 0;
        
        if ((target.pid & LNET_PID_USERFLAG) != 0) {
                /* Userspace ptllnds connect to us; we never initiate. */
                CWARN("Refusing to create a new connection to %s "
                      "(non-kernel peer)\n", libcfs_id2str(target));
                return -EHOSTUNREACH;
        }

        /* The new peer is a kernel ptllnd, and kernel ptllnds all have
         * the same portals PID */
        ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
        ptl_id.pid = kptllnd_data.kptl_portals_id.pid;

        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate connect message for %s\n",
                       libcfs_id2str(target));
                return -ENOMEM;
        }

        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(target, ptl_id);
        if (new_peer == NULL) {
                rc = -ENOMEM;
                goto unwind_0;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0)
                goto unwind_1;

        /* Now take the write lock: the peer may have appeared while the
         * allocations above ran unlocked. */
        write_lock_irqsave(g_lock, flags);
 again:
        *peerp = kptllnd_id2peer_locked(target);
        if (*peerp != NULL) {
                /* Lost the race: someone else created the peer.  Unwind my
                 * resources and return success with their (referenced) peer;
                 * rc is 0 here since reserve_buffers succeeded. */
                write_unlock_irqrestore(g_lock, flags);
                goto unwind_2;
        }

        kptllnd_cull_peertable_locked(target);

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection to ", target);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Can't create connection to %s\n",
                               libcfs_id2str(target));
                        /* NOTE(review): this overwrites the code from
                         * kptllnd_reserve_buffers() with -ENOMEM — confirm
                         * callers don't need the original error */
                        rc = -ENOMEM;
                        goto unwind_2;
                }
                /* Reacquire the lock and re-check from the top */
                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
                goto again;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);

        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;
                
        new_peer->peer_state = PEER_STATE_WAITING_HELLO;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;
        
        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        kptllnd_post_tx(new_peer, hello_tx, 0);
        kptllnd_peer_check_sends(new_peer);
       
        *peerp = new_peer;
        return 0;
        
        /* Unwind in reverse order of acquisition */
 unwind_2:
        kptllnd_peer_unreserve_buffers();
 unwind_1:
        kptllnd_peer_decref(new_peer);
 unwind_0:
        kptllnd_tx_decref(hello_tx);

        return rc;
}