Whamcloud - gitweb
Severity : major
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_peer.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: PJ Kirner <pjkirner@clusterfs.com>
6  *           E Barton <eeb@bartonsoftware.com>
7  *
8  *   This file is part of the Lustre file system, http://www.lustre.org
9  *   Lustre is a trademark of Cluster File Systems, Inc.
10  *
11  *   This file is confidential source code owned by Cluster File Systems.
12  *   No viewing, modification, compilation, redistribution, or any other
13  *   form of use is permitted except through a signed license agreement.
14  *
15  *   If you have not signed such an agreement, then you have no rights to
16  *   this file.  Please destroy it immediately and contact CFS.
17  *
18  */
19
20 #include "ptllnd.h"
21 #include <libcfs/list.h>
22
23 static int
24 kptllnd_count_queue(struct list_head *q)
25 {
26         struct list_head *e;
27         int               n = 0;
28         
29         list_for_each(e, q) {
30                 n++;
31         }
32
33         return n;
34 }
35
/* Walk the peer hash table to the index'th peer and snapshot its state,
 * credit and queue counters into the caller's out-parameters.  Intended
 * for proc/ioctl style inspection.
 * Returns 0 on success, -ENOENT when 'index' is beyond the last peer;
 * the out-parameters are only meaningful on success. */
int
kptllnd_get_peer_info(int index, 
                      lnet_process_id_t *id,
                      int *state, int *sent_hello,
                      int *refcount, __u64 *incarnation,
                      __u64 *next_matchbits, __u64 *last_matchbits_seen,
                      int *nsendq, int *nactiveq,
                      int *credits, int *outstanding_credits) 
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        unsigned long     flags;
        struct list_head *ptmp;
        kptl_peer_t      *peer;
        int               i;
        int               rc = -ENOENT;

        /* Read lock suffices: the table is only inspected, never changed */
        read_lock_irqsave(g_lock, flags);

        for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
                
                list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry(ptmp, kptl_peer_t, peer_list);

                        /* Skip peers until the requested index is reached */
                        if (index-- > 0)
                                continue;
                        
                        *id          = peer->peer_id;
                        *state       = peer->peer_state;
                        *sent_hello  = peer->peer_sent_hello;
                        *refcount    = atomic_read(&peer->peer_refcount);
                        *incarnation = peer->peer_incarnation;

                        /* peer_lock guards matchbits, credits and the tx
                         * queues; nested inside the table read lock */
                        spin_lock(&peer->peer_lock);

                        *next_matchbits      = peer->peer_next_matchbits;
                        *last_matchbits_seen = peer->peer_last_matchbits_seen;
                        *credits             = peer->peer_credits;
                        *outstanding_credits = peer->peer_outstanding_credits;

                        *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
                        *nactiveq = kptllnd_count_queue(&peer->peer_activeq);

                        spin_unlock(&peer->peer_lock);

                        rc = 0;
                        goto out;
                }
        }
        
 out:
        read_unlock_irqrestore(g_lock, flags);
        return rc;
}
89
/* Add 'peer' to the peer hash table, taking a new ref for the table's
 * membership.  Caller holds kptl_peer_rw_lock in write mode and must
 * already have reserved capacity (n_active_peers < expected_peers). */
void
kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
{
        LASSERT (kptllnd_data.kptl_n_active_peers <
                 kptllnd_data.kptl_expected_peers);

        LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
                 peer->peer_state == PEER_STATE_ACTIVE);
        
        kptllnd_data.kptl_n_active_peers++;
        atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */

        /* NB add to HEAD of peer list for MRU order!
         * (see kptllnd_cull_peertable) */
        list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
}
106
/* Evict least-recently-added peers sharing pid.nid from the hash table so
 * a new peer with this ID can be added without exceeding the per-NID
 * limit.  Caller holds kptl_peer_rw_lock in write mode. */
void
kptllnd_cull_peertable_locked (lnet_process_id_t pid)
{
        /* I'm about to add a new peer with this portals ID to the peer table,
         * so (a) this peer should not exist already and (b) I want to leave at
         * most (max_procs_per_nid - 1) peers with this NID in the table. */
        struct list_head  *peers = kptllnd_nid2peerlist(pid.nid);
        int                cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
        int                count;
        struct list_head  *tmp;
        struct list_head  *nxt;
        kptl_peer_t       *peer;
        
        count = 0;
        list_for_each_safe (tmp, nxt, peers) {
                /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
                 * in MRU order */
                peer = list_entry(tmp, kptl_peer_t, peer_list);
                        
                if (peer->peer_id.nid != pid.nid)
                        continue;

                /* (a) above: the exact ID must not be present already */
                LASSERT (peer->peer_id.pid != pid.pid);
                        
                count++;

                if (count < cull_count) /* recent (don't cull) */
                        continue;

                CDEBUG(D_NET, "Cull %s(%s)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_ptlid2str(peer->peer_ptlid));
                
                /* close with no error: this is policy, not failure */
                kptllnd_peer_close_locked(peer, 0);
        }
}
143
/* Allocate and initialise a new peer for LNET id 'lpid' / portals id
 * 'ppid'.  Returns the peer with one ref for the caller, or NULL on
 * allocation failure or if the LND is shutting down.  The peer is NOT
 * yet in the hash table (state PEER_STATE_ALLOCATED). */
kptl_peer_t *
kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
{
        unsigned long    flags;
        kptl_peer_t     *peer;

        LIBCFS_ALLOC(peer, sizeof (*peer));
        if (peer == NULL) {
                CERROR("Can't create peer %s (%s)\n",
                       libcfs_id2str(lpid), 
                       kptllnd_ptlid2str(ppid));
                return NULL;
        }

        memset(peer, 0, sizeof(*peer));         /* zero flags etc */

        INIT_LIST_HEAD (&peer->peer_sendq);
        INIT_LIST_HEAD (&peer->peer_activeq);
        spin_lock_init (&peer->peer_lock);

        peer->peer_state = PEER_STATE_ALLOCATED;
        peer->peer_error = 0;
        peer->peer_last_alive = cfs_time_current();
        peer->peer_id = lpid;
        peer->peer_ptlid = ppid;
        peer->peer_credits = 1;                 /* enough for HELLO */
        peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
        peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;
        peer->peer_sent_credits = 1;           /* HELLO credit is implicit */
        peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */

        atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        peer->peer_myincarnation = kptllnd_data.kptl_incarnation;

        /* Only increase # peers under lock, to guarantee we dont grow it
         * during shutdown */
        if (kptllnd_data.kptl_shutdown) {
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
                                        flags);
                LIBCFS_FREE(peer, sizeof(*peer));
                return NULL;
        }

        kptllnd_data.kptl_npeers++;
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
        
        return peer;
}
195
/* Final teardown of 'peer' once its refcount has dropped to zero.  The
 * peer must have no queued txs; zombies are unlinked from the zombie
 * list here, under the table write lock. */
void
kptllnd_peer_destroy (kptl_peer_t *peer)
{
        unsigned long flags;
        
        CDEBUG(D_NET, "Peer=%p\n", peer);

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&peer->peer_refcount) == 0);
        /* only never-added or already-retired peers may be freed */
        LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
                 peer->peer_state == PEER_STATE_ZOMBIE);
        LASSERT (list_empty(&peer->peer_sendq));
        LASSERT (list_empty(&peer->peer_activeq));

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* ALLOCATED peers were never linked onto any list */
        if (peer->peer_state == PEER_STATE_ZOMBIE)
                list_del(&peer->peer_list);

        kptllnd_data.kptl_npeers--;

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        LIBCFS_FREE (peer, sizeof (*peer));
}
221
/* Abort every tx queued on 'peer': mark them inactive, move them onto
 * local lists under peer_lock, then complete them with -EIO outside the
 * lock (kptllnd_tx_decref may sleep/do teardown work). */
void
kptllnd_peer_cancel_txs(kptl_peer_t *peer)
{
        struct list_head   sendq;
        struct list_head   activeq;
        struct list_head  *tmp;
        struct list_head  *nxt;
        kptl_tx_t         *tx;
        unsigned long      flags;

        /* atomically grab all the peer's tx-es... */

        spin_lock_irqsave(&peer->peer_lock, flags);

        /* splice idiom: insert the local head into the queue, then unlink
         * the old head -- the entries now hang off 'sendq' */
        list_add(&sendq, &peer->peer_sendq);
        list_del_init(&peer->peer_sendq);
        list_for_each (tmp, &sendq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                tx->tx_active = 0;
        }

        /* same splice for the active queue */
        list_add(&activeq, &peer->peer_activeq);
        list_del_init(&peer->peer_activeq);
        list_for_each (tmp, &activeq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                tx->tx_active = 0;
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* ...then drop the peer's ref on them at leasure.  This will get
         * kptllnd_tx_fini() to abort outstanding comms if necessary. */

        list_for_each_safe (tmp, nxt, &sendq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);
        }

        list_for_each_safe (tmp, nxt, &activeq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);
        }
}
269
/* Record that 'peer' was just heard from.  Written locklessly: see the
 * comment below on why the race is benign. */
void
kptllnd_peer_alive (kptl_peer_t *peer)
{
        /* This is racy, but everyone's only writing cfs_time_current() */
        peer->peer_last_alive = cfs_time_current();
        mb();   /* make the new timestamp visible to other CPUs */
}
277
/* If 'peer' has a stashed error, consume it (reset to 0 so the
 * notification fires only once) and tell LNET the peer is down, passing
 * the wall-clock time it was last heard from. */
void
kptllnd_peer_notify (kptl_peer_t *peer)
{
        unsigned long flags;
        time_t        last_alive = 0;
        int           error = 0;
        
        spin_lock_irqsave(&peer->peer_lock, flags);

        if (peer->peer_error != 0) {
                error = peer->peer_error;
                peer->peer_error = 0;
                
                /* convert the jiffies-based last-alive stamp into
                 * seconds-since-epoch for lnet_notify() */
                last_alive = cfs_time_current_sec() - 
                             cfs_duration_sec(cfs_time_current() - 
                                              peer->peer_last_alive);
        }
        
        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* notify outside peer_lock; 0 == peer is down */
        if (error != 0)
                lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
                             last_alive);
}
302
/* Watchdog helper: move every peer on the closing list to the zombie
 * list, then (with the table lock dropped) notify LNET, cancel its txs
 * and drop the list's ref on it. */
void
kptllnd_handle_closing_peers ()
{
        unsigned long           flags;
        kptl_peer_t            *peer;
        struct list_head       *tmp;
        struct list_head       *nxt;
        int                     idle;

        /* Check with a read lock first to avoid blocking anyone */

        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        idle = list_empty(&kptllnd_data.kptl_closing_peers);
        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (idle)
                return;

        /* Scan the closing peers and cancel their txs.
         * NB only safe while there is only a single watchdog */

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_CLOSING);

                list_del(&peer->peer_list);
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_zombie_peers);
                peer->peer_state = PEER_STATE_ZOMBIE;

                /* drop the write lock before doing potentially-slow
                 * notification/cancellation work on this peer */
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                kptllnd_peer_notify(peer);
                kptllnd_peer_cancel_txs(peer);
                kptllnd_peer_decref(peer);      /* drop closing list's ref */

                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        }

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}
347
/* Transition 'peer' to PEER_STATE_CLOSING, queueing it for the watchdog.
 * 'why' is stashed as peer_error on the first close only.  Caller holds
 * kptl_peer_rw_lock in write mode. */
void
kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
{
        switch (peer->peer_state) {
        default:
                LBUG();

        case PEER_STATE_WAITING_HELLO:
        case PEER_STATE_ACTIVE:
                /* Ensure new peers see a new incarnation of me */
                LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
                if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
                        kptllnd_data.kptl_incarnation++;

                /* Removing from peer table */
                kptllnd_data.kptl_n_active_peers--;
                LASSERT (kptllnd_data.kptl_n_active_peers >= 0);

                list_del(&peer->peer_list);
                kptllnd_peer_unreserve_buffers();

                peer->peer_error = why; /* stash 'why' only on first close */

                /* Schedule for immediate attention, taking peer table's ref */
                list_add_tail(&peer->peer_list, 
                              &kptllnd_data.kptl_closing_peers);
                wake_up(&kptllnd_data.kptl_watchdog_waitq);
                break;

        case PEER_STATE_ZOMBIE:
                /* Schedule for attention at next timeout */
                kptllnd_peer_addref(peer);
                list_del(&peer->peer_list);
                list_add_tail(&peer->peer_list, 
                              &kptllnd_data.kptl_closing_peers);
                break;
                
        case PEER_STATE_CLOSING:
                /* already queued for the watchdog; nothing to do */
                break;
        }

        peer->peer_state = PEER_STATE_CLOSING;
}
391
392 void
393 kptllnd_peer_close(kptl_peer_t *peer, int why)
394 {
395         unsigned long      flags;
396
397         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
398         kptllnd_peer_close_locked(peer, why);
399         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
400 }
401
/* Close every peer matching 'id' (nid and/or pid may be LNET_*_ANY
 * wildcards).  Returns 0 if at least one peer matched, -ENOENT if none,
 * -EINVAL for a pid-only wildcard with a specific nid missing. */
int
kptllnd_peer_del(lnet_process_id_t id)
{
        struct list_head  *ptmp;
        struct list_head  *pnxt;
        kptl_peer_t       *peer;
        int                lo;
        int                hi;
        int                i;
        unsigned long      flags;
        int                rc = -ENOENT;

        /*
         * Find the single bucket we are supposed to look at or if nid is a
         * wildcard (LNET_NID_ANY) then look at all of the buckets
         */
        if (id.nid != LNET_NID_ANY) {
                struct list_head *l = kptllnd_nid2peerlist(id.nid);
                
                /* pointer arithmetic yields the bucket index */
                lo = hi =  l - kptllnd_data.kptl_peers;
        } else {
                /* a specific pid with a wildcard nid makes no sense */
                if (id.pid != LNET_PID_ANY)
                        return -EINVAL;
                
                lo = 0;
                hi = kptllnd_data.kptl_peer_hash_size - 1;
        }

again:
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry (ptmp, kptl_peer_t, peer_list);

                        /* match: any nid, or this nid with any/this pid */
                        if (!(id.nid == LNET_NID_ANY || 
                              (peer->peer_id.nid == id.nid &&
                               (id.pid == LNET_PID_ANY || 
                                peer->peer_id.pid == id.pid))))
                                continue;

                        kptllnd_peer_addref(peer); /* 1 ref for me... */

                        /* must drop the read lock before taking the write
                         * lock inside kptllnd_peer_close() */
                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                               flags);

                        kptllnd_peer_close(peer, 0);
                        kptllnd_peer_decref(peer); /* ...until here */

                        rc = 0;         /* matched something */

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        return (rc);
}
462
/* Queue 'tx' for transmission to 'peer'.  For RDMA requests (PUT/GET)
 * this first attaches a match entry + MD so the peer can move the bulk
 * data, then binds an MD for the message itself and puts the tx on the
 * peer's send queue (HELLO jumps the queue).  On failure the tx is
 * completed with -EIO via kptllnd_tx_decref(). */
void
kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
        /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
        ptl_handle_md_t  rdma_mdh = PTL_INVALID_HANDLE;
        ptl_handle_md_t  msg_mdh = PTL_INVALID_HANDLE;
        ptl_handle_me_t  meh;
        ptl_md_t         md;
        ptl_err_t        prc;
        unsigned long    flags;

        LASSERT (!tx->tx_idle);
        LASSERT (!tx->tx_active);
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
                 tx->tx_type == TX_TYPE_PUT_REQUEST ||
                 tx->tx_type == TX_TYPE_GET_REQUEST);

        kptllnd_set_tx_peer(tx, peer);

        if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
            tx->tx_type == TX_TYPE_GET_REQUEST) {

                spin_lock_irqsave(&peer->peer_lock, flags);

                /* Assume 64-bit matchbits can't wrap */
                LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
                tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
                        peer->peer_next_matchbits++;
                        
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                /* match entry for the peer's bulk GET/PUT of the payload */
                prc = PtlMEAttach(kptllnd_data.kptl_nih,
                                  *kptllnd_tunables.kptl_portal,
                                  peer->peer_ptlid,
                                  tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
                                  0,             /* ignore bits */
                                  PTL_UNLINK,
                                  PTL_INS_BEFORE,
                                  &meh);
                if (prc != PTL_OK) {
                        CERROR("PtlMEAttach(%s) failed: %d\n",
                               libcfs_id2str(peer->peer_id), prc);
                        goto failed;
                }

                prc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK, &rdma_mdh);
                if (prc != PTL_OK) {
                        CERROR("PtlMDAttach(%s) failed: %d\n",
                               libcfs_id2str(tx->tx_peer->peer_id), prc);
                        /* unwind: the ME must not be left dangling */
                        prc = PtlMEUnlink(meh);
                        LASSERT(prc == PTL_OK);
                        rdma_mdh = PTL_INVALID_HANDLE;
                        goto failed;
                }

                /* I'm not racing with the event callback here.  It's a bug if
                 * there's an event on the MD I just attached before I actually
                 * send the RDMA request message which the event callback
                 * catches by asserting 'rdma_mdh' is valid. */
        }

        memset(&md, 0, sizeof(md));

        md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE;
        md.user_ptr = &tx->tx_msg_eventarg;
        md.eq_handle = kptllnd_data.kptl_eqh;

        if (nfrag == 0) {
                /* contiguous message buffer */
                md.start = tx->tx_msg;
                md.length = tx->tx_msg->ptlm_nob;
        } else {
                /* iovec: first fragment must be the message header itself */
                LASSERT (nfrag > 1);
                LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);

                md.start = tx->tx_frags;
                md.length = nfrag;
                md.options |= PTL_MD_IOVEC;
        }

        prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
        if (prc != PTL_OK) {
                msg_mdh = PTL_INVALID_HANDLE;
                goto failed;
        }
        
        spin_lock_irqsave(&peer->peer_lock, flags);

        tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
        tx->tx_active = 1;
        tx->tx_rdma_mdh = rdma_mdh;
        tx->tx_msg_mdh = msg_mdh;

        /* Ensure HELLO is sent first */
        if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
                list_add(&tx->tx_list, &peer->peer_sendq);
        else
                list_add_tail(&tx->tx_list, &peer->peer_sendq);

        spin_unlock_irqrestore(&peer->peer_lock, flags);
        return;
        
 failed:
        /* record the handles so kptllnd_tx_fini() can clean them up */
        spin_lock_irqsave(&peer->peer_lock, flags);

        tx->tx_status = -EIO;
        tx->tx_rdma_mdh = rdma_mdh;
        tx->tx_msg_mdh = msg_mdh;

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        kptllnd_tx_decref(tx);
}
580
/* Drain 'peer's send queue as far as the credit protocol allows: post a
 * NOOP if we owe enough credits, ensure HELLO goes out first, never
 * spend the last credit unless returning some, and PtlPut each eligible
 * tx.  peer_lock is dropped around allocation and the actual put. */
void
kptllnd_peer_check_sends (kptl_peer_t *peer)
{

        kptl_tx_t       *tx;
        int              rc;
        unsigned long    flags;

        LASSERT(!in_interrupt());

        spin_lock_irqsave(&peer->peer_lock, flags);

        if (list_empty(&peer->peer_sendq) &&
            peer->peer_outstanding_credits >= PTLLND_CREDIT_HIGHWATER &&
            peer->peer_credits != 0) {

                /* post a NOOP to return credits */
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
                if (tx == NULL) {
                        CERROR("Can't return credits to %s: can't allocate descriptor\n",
                               libcfs_id2str(peer->peer_id));
                } else {
                        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
                        kptllnd_post_tx(peer, tx, 0);
                }

                spin_lock_irqsave(&peer->peer_lock, flags);
        }

        while (!list_empty(&peer->peer_sendq)) {
                tx = list_entry (peer->peer_sendq.next, kptl_tx_t, tx_list);

                LASSERT (tx->tx_active);
                LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
                LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
                         !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));

                /* sanity-check the credit accounting invariants */
                LASSERT (peer->peer_outstanding_credits >= 0);
                LASSERT (peer->peer_sent_credits >= 0);
                LASSERT (peer->peer_sent_credits +
                         peer->peer_outstanding_credits <=
                         *kptllnd_tunables.kptl_peercredits);
                LASSERT (peer->peer_credits >= 0);

                /* Ensure HELLO is sent first */
                if (!peer->peer_sent_hello) {
                        if (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_HELLO)
                                break;
                        peer->peer_sent_hello = 1;
                }

                if (peer->peer_credits == 0) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %p\n",
                               libcfs_id2str(peer->peer_id), 
                               peer->peer_credits,
                               peer->peer_outstanding_credits, 
                               peer->peer_sent_credits, tx);
                        break;
                }

                /* Don't use the last credit unless I've got credits to
                 * return */
                if (peer->peer_credits == 1 &&
                    peer->peer_outstanding_credits == 0) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
                               "not using last credit for %p\n",
                               libcfs_id2str(peer->peer_id), 
                               peer->peer_credits,
                               peer->peer_outstanding_credits,
                               peer->peer_sent_credits, tx);
                        break;
                }

                list_del(&tx->tx_list);

                /* Discard any NOOP I queued if I'm not at the high-water mark
                 * any more or more messages have been queued */
                if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
                    (!list_empty(&peer->peer_sendq) ||
                     peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {

                        tx->tx_active = 0;

                        spin_unlock_irqrestore(&peer->peer_lock, flags);

                        CDEBUG(D_NET, "%s: redundant noop\n", 
                               libcfs_id2str(peer->peer_id));
                        kptllnd_tx_decref(tx);

                        spin_lock_irqsave(&peer->peer_lock, flags);
                        continue;
                }

                /* fill last-minute msg header fields */
                kptllnd_msg_pack(tx->tx_msg, peer);

                /* piggyback all owed credits on this message */
                peer->peer_sent_credits += peer->peer_outstanding_credits;
                peer->peer_outstanding_credits = 0;
                peer->peer_credits--;

                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
                       tx, tx->tx_msg->ptlm_nob,
                       tx->tx_msg->ptlm_credits);

                list_add_tail(&tx->tx_list, &peer->peer_activeq);

                kptllnd_tx_addref(tx);          /* 1 ref for me... */

                spin_unlock_irqrestore(&peer->peer_lock, flags);

                tx->tx_tposted = jiffies;       /* going on the wire */

                rc = PtlPut (tx->tx_msg_mdh,
                             tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
                             peer->peer_ptlid,
                             *kptllnd_tunables.kptl_portal,
                             0,                 /* acl cookie */
                             LNET_MSG_MATCHBITS,
                             0,                 /* offset */
                             0);                /* header data */
                if (rc != PTL_OK) {
                        CERROR("PtlPut %s error %d\n",
                               libcfs_id2str(peer->peer_id), rc);

                        /* Nuke everything (including this tx) */
                        kptllnd_peer_close(peer, -EIO);
                        return;
                }

                kptllnd_tx_decref(tx);          /* drop my ref */

                spin_lock_irqsave(&peer->peer_lock, flags);
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);
}
722
723 kptl_tx_t *
724 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
725 {
726         kptl_tx_t         *tx;
727         struct list_head  *tmp;
728         unsigned long      flags;
729
730         spin_lock_irqsave(&peer->peer_lock, flags);
731
732         list_for_each(tmp, &peer->peer_sendq) {
733                 tx = list_entry(peer->peer_sendq.next, kptl_tx_t, tx_list);
734
735                 if (time_after_eq(jiffies, tx->tx_deadline)) {
736                         kptllnd_tx_addref(tx);
737                         spin_unlock_irqrestore(&peer->peer_lock, flags);
738                         return tx;
739                 }
740         }
741
742         list_for_each(tmp, &peer->peer_activeq) {
743                 tx = list_entry(peer->peer_activeq.next, kptl_tx_t, tx_list);
744
745                 if (time_after_eq(jiffies, tx->tx_deadline)) {
746                         kptllnd_tx_addref(tx);
747                         spin_unlock_irqrestore(&peer->peer_lock, flags);
748                         return tx;
749                 }
750         }
751
752         spin_unlock_irqrestore(&peer->peer_lock, flags);
753         return NULL;
754 }
755
756
/* Watchdog scan of one peer hash bucket: give each peer a chance to send
 * (e.g. owed-credit NOOPs that couldn't be allocated earlier) and close
 * any peer with a timed-out tx, restarting the scan after each close
 * since the lock must be dropped. */
void
kptllnd_peer_check_bucket (int idx)
{
        struct list_head  *peers = &kptllnd_data.kptl_peers[idx];
        struct list_head  *ptmp;
        kptl_peer_t       *peer;
        kptl_tx_t         *tx;
        unsigned long      flags;
        int                nsend;
        int                nactive;

        CDEBUG(D_NET, "Bucket=%d\n", idx);

 again:
        /* NB. Shared lock while I just look */
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kptl_peer_t, peer_list);

                CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits, 
                       peer->peer_outstanding_credits, peer->peer_sent_credits);

                /* In case we have enough credits to return via a
                 * NOOP, but there were no non-blocking tx descs
                 * free to do it last time... */
                kptllnd_peer_check_sends(peer);

                tx = kptllnd_find_timed_out_tx(peer);
                if (tx == NULL)
                        continue;

                kptllnd_peer_addref(peer); /* 1 ref for me... */

                /* drop the table lock before closing/logging */
                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                       flags);

                /* snapshot queue depths for the diagnostic below */
                spin_lock_irqsave(&peer->peer_lock, flags);
                nsend = kptllnd_count_queue(&peer->peer_sendq);
                nactive = kptllnd_count_queue(&peer->peer_activeq);
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                LCONSOLE_ERROR("Timing out %s: %s\n",
                               libcfs_id2str(peer->peer_id),
                               (tx->tx_tposted == 0) ? 
                               "no free peer buffers" : "please check Portals");

                CERROR("%s timed out: cred %d outstanding %d, sent %d, "
                       "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d "
                       "%sposted %lu T/O %ds\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type),
                       tx->tx_active ? "A" : "",
                       PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
                       "" : "M",
                       PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
                       "" : "D",
                       tx->tx_status,
                       (tx->tx_tposted == 0) ? "not " : "",
                       (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
                       *kptllnd_tunables.kptl_timeout);

                kptllnd_dump_ptltrace();

                kptllnd_tx_decref(tx);  /* drop find_timed_out_tx's ref */

                kptllnd_peer_close(peer, -ETIMEDOUT);
                kptllnd_peer_decref(peer); /* ...until here */

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}
834
835 kptl_peer_t *
836 kptllnd_id2peer_locked (lnet_process_id_t id)
837 {
838         struct list_head *peers = kptllnd_nid2peerlist(id.nid);
839         struct list_head *tmp;
840         kptl_peer_t      *peer;
841
842         list_for_each (tmp, peers) {
843
844                 peer = list_entry (tmp, kptl_peer_t, peer_list);
845
846                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
847                         peer->peer_state == PEER_STATE_ACTIVE);
848                 
849                 if (peer->peer_id.nid != id.nid ||
850                     peer->peer_id.pid != id.pid)
851                         continue;
852
853                 kptllnd_peer_addref(peer);
854
855                 CDEBUG(D_NET, "%s -> %s (%d)\n",
856                        libcfs_id2str(id), 
857                        kptllnd_ptlid2str(peer->peer_ptlid),
858                        atomic_read (&peer->peer_refcount));
859                 return peer;
860         }
861
862         return NULL;
863 }
864
865 void
866 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
867 {
868         LCONSOLE_ERROR("%s %s overflows the peer table[%d]: "
869                        "messages may be dropped\n",
870                        str, libcfs_id2str(id),
871                        kptllnd_data.kptl_n_active_peers);
872         LCONSOLE_ERROR("Please correct by increasing "
873                        "'max_nodes' or 'max_procs_per_node'\n");
874 }
875
876 __u64
877 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
878 {
879         kptl_peer_t            *peer;
880         struct list_head       *tmp;
881
882         /* Find the last matchbits I saw this new peer using.  Note..
883            A. This peer cannot be in the peer table - she's new!
884            B. If I can't find the peer in the closing/zombie peers, all
885               matchbits are safe because all refs to the (old) peer have gone
886               so all txs have completed so there's no risk of matchbit
887               collision!
888          */
889
890         LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
891
892         /* peer's last matchbits can't change after it comes out of the peer
893          * table, so first match is fine */
894
895         list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
896                 peer = list_entry (tmp, kptl_peer_t, peer_list);
897
898                 if (peer->peer_id.nid == lpid.nid &&
899                     peer->peer_id.pid == lpid.pid)
900                         return peer->peer_last_matchbits_seen;
901         }
902         
903         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
904                 peer = list_entry (tmp, kptl_peer_t, peer_list);
905
906                 if (peer->peer_id.nid == lpid.nid &&
907                     peer->peer_id.pid == lpid.pid)
908                         return peer->peer_last_matchbits_seen;
909         }
910         
911         return PTL_RESERVED_MATCHBITS;
912 }
913
914 kptl_peer_t *
915 kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
916                            kptl_msg_t       *msg)
917 {
918         rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;
919         kptl_peer_t        *peer;
920         kptl_peer_t        *new_peer;
921         lnet_process_id_t   lpid;
922         unsigned long       flags;
923         kptl_tx_t          *hello_tx;
924         int                 rc;
925         __u64               safe_matchbits;
926         __u64               last_matchbits_seen;
927
928         lpid.nid = msg->ptlm_srcnid;
929         lpid.pid = msg->ptlm_srcpid;
930
931         CDEBUG(D_NET, "hello from %s(%s)\n",
932                libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
933
934         if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
935             (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
936                 /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
937                  * userspace.  Refuse the connection if she hasn't set the
938                  * correct flag in her PID... */
939                 CERROR("Userflag not set in hello from %s (%s)\n",
940                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
941                 return NULL;
942         }
943         
944         /* kptlhm_matchbits are the highest matchbits my peer may have used to
945          * RDMA to me.  I ensure I never register buffers for RDMA that could
946          * match any she used */
947         safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;
948
949         if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
950                 CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
951                        safe_matchbits, libcfs_id2str(lpid));
952                 return NULL;
953         }
954         
955         if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
956                 CERROR("%s: max message size %d < MIN %d",
957                        libcfs_id2str(lpid),
958                        msg->ptlm_u.hello.kptlhm_max_msg_size,
959                        *kptllnd_tunables.kptl_max_msg_size);
960                 return NULL;
961         }
962
963         if (msg->ptlm_credits <= 1) {
964                 CERROR("Need more than 1+%d credits from %s\n",
965                        msg->ptlm_credits, libcfs_id2str(lpid));
966                 return NULL;
967         }
968         
969         write_lock_irqsave(g_lock, flags);
970
971         peer = kptllnd_id2peer_locked(lpid);
972         if (peer != NULL) {
973                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
974                         /* Completing HELLO handshake */
975                         LASSERT(peer->peer_incarnation == 0);
976
977                         if (msg->ptlm_dststamp != 0 &&
978                             msg->ptlm_dststamp != peer->peer_myincarnation) {
979                                 write_unlock_irqrestore(g_lock, flags);
980
981                                 CERROR("Ignoring HELLO from %s: unexpected "
982                                        "dststamp "LPX64" ("LPX64" wanted)\n",
983                                        libcfs_id2str(lpid),
984                                        msg->ptlm_dststamp,
985                                        peer->peer_myincarnation);
986                                 kptllnd_peer_decref(peer);
987                                 return NULL;
988                         }
989                         
990                         /* Concurrent initiation or response to my HELLO */
991                         peer->peer_state = PEER_STATE_ACTIVE;
992                         peer->peer_incarnation = msg->ptlm_srcstamp;
993                         peer->peer_next_matchbits = safe_matchbits;
994                         peer->peer_max_msg_size =
995                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
996                         
997                         write_unlock_irqrestore(g_lock, flags);
998                         return peer;
999                 }
1000
1001                 if (msg->ptlm_dststamp != 0 &&
1002                     msg->ptlm_dststamp <= peer->peer_myincarnation) {
1003                         write_unlock_irqrestore(g_lock, flags);
1004
1005                         CERROR("Ignoring stale HELLO from %s: "
1006                                "dststamp "LPX64" (current "LPX64")\n",
1007                                libcfs_id2str(lpid),
1008                                msg->ptlm_dststamp,
1009                                peer->peer_myincarnation);
1010                         kptllnd_peer_decref(peer);
1011                         return NULL;
1012                 }
1013
1014                 /* Brand new connection attempt: remove old incarnation */
1015                 kptllnd_peer_close_locked(peer, 0);
1016         }
1017
1018         kptllnd_cull_peertable_locked(lpid);
1019
1020         write_unlock_irqrestore(g_lock, flags);
1021
1022         if (peer != NULL) {
1023                 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
1024                        " stamp "LPX64"("LPX64")\n",
1025                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
1026                        msg->ptlm_srcstamp, peer->peer_incarnation);
1027
1028                 kptllnd_peer_decref(peer);
1029         }
1030
1031         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1032         if (hello_tx == NULL) {
1033                 CERROR("Unable to allocate HELLO message for %s\n",
1034                        libcfs_id2str(lpid));
1035                 return NULL;
1036         }
1037
1038         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1039                          sizeof(kptl_hello_msg_t));
1040
1041         new_peer = kptllnd_peer_allocate(lpid, initiator);
1042         if (new_peer == NULL) {
1043                 kptllnd_tx_decref(hello_tx);
1044                 return NULL;
1045         }
1046
1047         rc = kptllnd_peer_reserve_buffers();
1048         if (rc != 0) {
1049                 kptllnd_peer_decref(new_peer);
1050                 kptllnd_tx_decref(hello_tx);
1051
1052                 CERROR("Failed to reserve buffers for %s\n",
1053                        libcfs_id2str(lpid));
1054                 return NULL;
1055         }
1056
1057         write_lock_irqsave(g_lock, flags);
1058  again:
1059         peer = kptllnd_id2peer_locked(lpid);
1060         if (peer != NULL) {
1061                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1062                         /* An outgoing message instantiated 'peer' for me */
1063                         CWARN("Outgoing instantiated peer %s\n", libcfs_id2str(lpid));
1064                         LASSERT(peer->peer_incarnation == 0);
1065
1066                         peer->peer_state = PEER_STATE_ACTIVE;
1067                         peer->peer_incarnation = msg->ptlm_srcstamp;
1068                         peer->peer_next_matchbits = safe_matchbits;
1069                         peer->peer_max_msg_size =
1070                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1071                 } else {
1072                         LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1073                         /* WOW!  Somehow this peer completed the HELLO
1074                          * handshake while I slept.  I guess I could have slept
1075                          * while it rebooted and sent a new HELLO, so I'll fail
1076                          * this one... */
1077                         CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
1078                         kptllnd_peer_decref(peer);
1079                         peer = NULL;
1080                 }
1081                 
1082                 write_unlock_irqrestore(g_lock, flags);
1083
1084                 kptllnd_peer_unreserve_buffers();
1085                 kptllnd_peer_decref(new_peer);
1086                 kptllnd_tx_decref(hello_tx);
1087                 return peer;
1088         }
1089
1090         if (kptllnd_data.kptl_n_active_peers ==
1091             kptllnd_data.kptl_expected_peers) {
1092                 /* peer table full */
1093                 write_unlock_irqrestore(g_lock, flags);
1094
1095                 kptllnd_peertable_overflow_msg("Connection from ", lpid);
1096
1097                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1098                 if (rc != 0) {
1099                         CERROR("Refusing connection from %s\n",
1100                                libcfs_id2str(lpid));
1101                         kptllnd_peer_unreserve_buffers();
1102                         kptllnd_peer_decref(new_peer);
1103                         kptllnd_tx_decref(hello_tx);
1104                         return NULL;
1105                 }
1106                 
1107                 write_lock_irqsave(g_lock, flags);
1108                 kptllnd_data.kptl_expected_peers++;
1109                 goto again;
1110         }
1111
1112         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
1113
1114         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1115         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1116                 *kptllnd_tunables.kptl_max_msg_size;
1117
1118         new_peer->peer_state = PEER_STATE_ACTIVE;
1119         new_peer->peer_incarnation = msg->ptlm_srcstamp;
1120         new_peer->peer_next_matchbits = safe_matchbits;
1121         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1122         new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1123
1124         kptllnd_peer_add_peertable_locked(new_peer);
1125
1126         write_unlock_irqrestore(g_lock, flags);
1127
1128         /* NB someone else could get in now and post a message before I post
1129          * the HELLO, but post_tx/check_sends take care of that! */
1130
1131         CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
1132                libcfs_id2str(new_peer->peer_id), hello_tx);
1133
1134         kptllnd_post_tx(new_peer, hello_tx, 0);
1135         kptllnd_peer_check_sends(new_peer);
1136
1137         return new_peer;
1138 }
1139
void
kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
        /* Queue 'tx' (with 'nfrag' payload fragments) on 'peer' and kick
         * the sender so it goes out as soon as credits allow. */
        kptllnd_post_tx(peer, tx, nfrag);
        kptllnd_peer_check_sends(peer);
}
1146
int
kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
{
        /* Find the peer for 'target', creating it (and posting the
         * initial HELLO) if it doesn't exist yet.  On success returns 0
         * with a new peer reference in *peerp; otherwise returns a
         * negative errno. */
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        ptl_process_id_t  ptl_id;
        kptl_peer_t      *new_peer;
        kptl_tx_t        *hello_tx;
        unsigned long     flags;
        int               rc;
        __u64             last_matchbits_seen;

        /* I expect to find the peer, so I only take a read lock... */
        read_lock_irqsave(g_lock, flags);
        *peerp = kptllnd_id2peer_locked(target);
        read_unlock_irqrestore(g_lock, flags);

        if (*peerp != NULL)
                return 0;
        
        if ((target.pid & LNET_PID_USERFLAG) != 0) {
                /* connections to userspace peers are only ever accepted,
                 * never initiated from here */
                CWARN("Refusing to create a new connection to %s "
                      "(non-kernel peer)\n", libcfs_id2str(target));
                return -EHOSTUNREACH;
        }

        /* The new peer is a kernel ptllnd, and kernel ptllnds all have
         * the same portals PID */
        ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
        ptl_id.pid = kptllnd_data.kptl_portals_id.pid;

        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate connect message for %s\n",
                       libcfs_id2str(target));
                return -ENOMEM;
        }

        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(target, ptl_id);
        if (new_peer == NULL) {
                rc = -ENOMEM;
                goto unwind_0;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0)
                goto unwind_1;

        write_lock_irqsave(g_lock, flags);
 again:
        /* re-check under the write lock: the peer may have been created
         * while no lock was held above */
        *peerp = kptllnd_id2peer_locked(target);
        if (*peerp != NULL) {
                write_unlock_irqrestore(g_lock, flags);
                goto unwind_2;
        }

        kptllnd_cull_peertable_locked(target);

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection to ", target);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Can't create connection to %s\n",
                               libcfs_id2str(target));
                        /* NOTE(review): discards the rc returned by
                         * kptllnd_reserve_buffers() in favour of -ENOMEM —
                         * presumably intentional; confirm */
                        rc = -ENOMEM;
                        goto unwind_2;
                }
                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
                goto again;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);

        /* tell the peer the highest matchbits any previous incarnation
         * used, so it won't reuse them */
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;
                
        new_peer->peer_state = PEER_STATE_WAITING_HELLO;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;
        
        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        kptllnd_post_tx(new_peer, hello_tx, 0);
        kptllnd_peer_check_sends(new_peer);
       
        *peerp = new_peer;
        return 0;
        
 unwind_2:
        kptllnd_peer_unreserve_buffers();
 unwind_1:
        kptllnd_peer_decref(new_peer);
 unwind_0:
        kptllnd_tx_decref(hello_tx);

        return rc;
}