Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_peer.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/ptllnd/ptllnd_peer.c
37  *
38  * Author: PJ Kirner <pjkirner@clusterfs.com>
39  * Author: E Barton <eeb@bartonsoftware.com>
40  */
41
42 #include "ptllnd.h"
43 #include <libcfs/list.h>
44
45 static int
46 kptllnd_count_queue(struct list_head *q)
47 {
48         struct list_head *e;
49         int               n = 0;
50         
51         list_for_each(e, q) {
52                 n++;
53         }
54
55         return n;
56 }
57
/* Snapshot the state of the index'th peer in the global peer table for
 * procfs/debug reporting.  Returns 0 and fills all out-parameters on
 * success, -ENOENT if 'index' is beyond the number of peers.
 *
 * Locking: takes the peer table read lock for the scan, then the
 * per-peer spinlock (plain spin_lock is safe here: interrupts are
 * already disabled by read_lock_irqsave) for the fields that are
 * mutated under peer_lock. */
int
kptllnd_get_peer_info(int index, 
                      lnet_process_id_t *id,
                      int *state, int *sent_hello,
                      int *refcount, __u64 *incarnation,
                      __u64 *next_matchbits, __u64 *last_matchbits_seen,
                      int *nsendq, int *nactiveq,
                      int *credits, int *outstanding_credits) 
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        unsigned long     flags;
        struct list_head *ptmp;
        kptl_peer_t      *peer;
        int               i;
        int               rc = -ENOENT;

        read_lock_irqsave(g_lock, flags);

        for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
                
                list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry(ptmp, kptl_peer_t, peer_list);

                        /* count down to the requested peer */
                        if (index-- > 0)
                                continue;
                        
                        /* stable while the table read lock is held */
                        *id          = peer->peer_id;
                        *state       = peer->peer_state;
                        *sent_hello  = peer->peer_sent_hello;
                        *refcount    = atomic_read(&peer->peer_refcount);
                        *incarnation = peer->peer_incarnation;

                        spin_lock(&peer->peer_lock);

                        /* these change under peer_lock; read consistently */
                        *next_matchbits      = peer->peer_next_matchbits;
                        *last_matchbits_seen = peer->peer_last_matchbits_seen;
                        *credits             = peer->peer_credits;
                        *outstanding_credits = peer->peer_outstanding_credits;

                        *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
                        *nactiveq = kptllnd_count_queue(&peer->peer_activeq);

                        spin_unlock(&peer->peer_lock);

                        rc = 0;
                        goto out;
                }
        }
        
 out:
        read_unlock_irqrestore(g_lock, flags);
        return rc;
}
111
/* Add 'peer' to its hash bucket in the global peer table, taking a ref
 * for the table.  Caller must hold kptl_peer_rw_lock for writing and
 * must have reserved room (see the active/expected peers assertion). */
void
kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
{
        LASSERT (!kptllnd_data.kptl_shutdown);
        LASSERT (kptllnd_data.kptl_n_active_peers <
                 kptllnd_data.kptl_expected_peers);

        LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
                 peer->peer_state == PEER_STATE_ACTIVE);
        
        kptllnd_data.kptl_n_active_peers++;
        atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */

        /* NB add to HEAD of peer list for MRU order!
         * (see kptllnd_cull_peertable) */
        list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
}
129
/* Evict the least-recently-used peers sharing pid.nid so that after a
 * new peer with this NID is added, at most kptl_max_procs_per_node
 * peers carry the NID.  Caller holds kptl_peer_rw_lock for writing. */
void
kptllnd_cull_peertable_locked (lnet_process_id_t pid)
{
        /* I'm about to add a new peer with this portals ID to the peer table,
         * so (a) this peer should not exist already and (b) I want to leave at
         * most (max_procs_per_nid - 1) peers with this NID in the table. */
        struct list_head  *peers = kptllnd_nid2peerlist(pid.nid);
        int                cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
        int                count;
        struct list_head  *tmp;
        struct list_head  *nxt;
        kptl_peer_t       *peer;
        
        count = 0;
        /* safe iteration: kptllnd_peer_close_locked unlinks culled peers */
        list_for_each_safe (tmp, nxt, peers) {
                /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
                 * in MRU order */
                peer = list_entry(tmp, kptl_peer_t, peer_list);
                        
                if (peer->peer_id.nid != pid.nid)
                        continue;

                /* (a) above: the exact pid must not be present already */
                LASSERT (peer->peer_id.pid != pid.pid);
                        
                count++;

                if (count < cull_count) /* recent (don't cull) */
                        continue;

                CDEBUG(D_NET, "Cull %s(%s)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_ptlid2str(peer->peer_ptlid));
                
                kptllnd_peer_close_locked(peer, 0);
        }
}
166
/* Allocate and initialise a new peer for LNET id 'lpid' / portals id
 * 'ppid'.  Returns the peer with one ref for the caller, or NULL on
 * allocation failure or if shutdown has started.  The peer is NOT yet
 * in the peer table (see kptllnd_peer_add_peertable_locked). */
kptl_peer_t *
kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
{
        unsigned long    flags;
        kptl_peer_t     *peer;

        LIBCFS_ALLOC(peer, sizeof (*peer));
        if (peer == NULL) {
                CERROR("Can't create peer %s (%s)\n",
                       libcfs_id2str(lpid), 
                       kptllnd_ptlid2str(ppid));
                return NULL;
        }

        memset(peer, 0, sizeof(*peer));         /* zero flags etc */

        INIT_LIST_HEAD (&peer->peer_noops);
        INIT_LIST_HEAD (&peer->peer_sendq);
        INIT_LIST_HEAD (&peer->peer_activeq);
        spin_lock_init (&peer->peer_lock);

        peer->peer_state = PEER_STATE_ALLOCATED;
        peer->peer_error = 0;
        peer->peer_last_alive = cfs_time_current();
        peer->peer_id = lpid;
        peer->peer_ptlid = ppid;
        peer->peer_credits = 1;                 /* enough for HELLO */
        peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
        peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;
        peer->peer_sent_credits = 1;           /* HELLO credit is implicit */
        peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */

        atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        peer->peer_myincarnation = kptllnd_data.kptl_incarnation;

        /* Only increase # peers under lock, to guarantee we dont grow it
         * during shutdown */
        if (kptllnd_data.kptl_shutdown) {
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
                                        flags);
                LIBCFS_FREE(peer, sizeof(*peer));
                return NULL;
        }

        kptllnd_data.kptl_npeers++;
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
        
        return peer;
}
219
/* Free a peer whose last ref has been dropped.  Unlinks it from the
 * zombie list (closing/zombie peers stay listed until now) and drops
 * the global peer count taken in kptllnd_peer_allocate. */
void
kptllnd_peer_destroy (kptl_peer_t *peer)
{
        unsigned long flags;
        
        CDEBUG(D_NET, "Peer=%p\n", peer);

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&peer->peer_refcount) == 0);
        /* only never-activated or fully-closed peers may be freed */
        LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
                 peer->peer_state == PEER_STATE_ZOMBIE);
        LASSERT (list_empty(&peer->peer_noops));
        LASSERT (list_empty(&peer->peer_sendq));
        LASSERT (list_empty(&peer->peer_activeq));

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* ALLOCATED peers were never added to any list */
        if (peer->peer_state == PEER_STATE_ZOMBIE)
                list_del(&peer->peer_list);

        kptllnd_data.kptl_npeers--;

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        LIBCFS_FREE (peer, sizeof (*peer));
}
246
247 void
248 kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs)
249 {
250         struct list_head  *tmp;
251         struct list_head  *nxt;
252         kptl_tx_t         *tx;
253
254         list_for_each_safe (tmp, nxt, peerq) {
255                 tx = list_entry(tmp, kptl_tx_t, tx_list);
256
257                 list_del(&tx->tx_list);
258                 list_add_tail(&tx->tx_list, txs);
259
260                 tx->tx_status = -EIO;
261                 tx->tx_active = 0;
262         }
263 }
264
/* Cancel everything queued on the peer (noops, unsent and active txs),
 * collecting the failed txs on 'txs' for the caller to decref outside
 * the peer lock. */
void
kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs)
{
        unsigned long   flags;

        spin_lock_irqsave(&peer->peer_lock, flags);

        kptllnd_cancel_txlist(&peer->peer_noops, txs);
        kptllnd_cancel_txlist(&peer->peer_sendq, txs);
        kptllnd_cancel_txlist(&peer->peer_activeq, txs);
                
        spin_unlock_irqrestore(&peer->peer_lock, flags);
}
278
/* Note that the peer has just been heard from (updates the liveness
 * timestamp used by kptllnd_peer_notify). */
void
kptllnd_peer_alive (kptl_peer_t *peer)
{
        /* This is racy, but everyone's only writing cfs_time_current() */
        peer->peer_last_alive = cfs_time_current();
        mb();           /* make the new timestamp globally visible */
}
286
/* If an error has been stashed on the peer (by the first close), tell
 * LNET the peer is dead, passing the wall-clock time it was last heard
 * from.  The error is consumed (reset to 0) so LNET is notified once. */
void
kptllnd_peer_notify (kptl_peer_t *peer)
{
        unsigned long flags;
        time_t        last_alive = 0;
        int           error = 0;
        
        spin_lock_irqsave(&peer->peer_lock, flags);

        if (peer->peer_error != 0) {
                error = peer->peer_error;
                peer->peer_error = 0;
                
                /* convert the jiffies-based peer_last_alive into
                 * seconds-since-epoch for lnet_notify() */
                last_alive = cfs_time_current_sec() - 
                             cfs_duration_sec(cfs_time_current() - 
                                              peer->peer_last_alive);
        }
        
        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* notify outside the lock; 0 == peer is down */
        if (error != 0)
                lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
                             last_alive);
}
311
/* Watchdog-thread helper: move newly-closed peers to the zombie list,
 * notifying LNET and cancelling their queued txs, and cancel txs on
 * existing zombies.  All cancelled txs are finalised (decref'd) at the
 * end, outside any lock, which lets kptllnd_tx_fini abort outstanding
 * Portals comms if necessary. */
void
kptllnd_handle_closing_peers ()
{
        unsigned long           flags;
        struct list_head        txs;
        kptl_peer_t            *peer;
        struct list_head       *tmp;
        struct list_head       *nxt;
        kptl_tx_t              *tx;
        int                     idle;

        /* Check with a read lock first to avoid blocking anyone */

        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        idle = list_empty(&kptllnd_data.kptl_closing_peers) &&
               list_empty(&kptllnd_data.kptl_zombie_peers);
        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (idle)
                return;

        INIT_LIST_HEAD(&txs);

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
         * ref removes it from this list, so I musn't drop the lock while
         * scanning it. */
        list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);

                kptllnd_peer_cancel_txs(peer, &txs);
        }

        /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
         * I'm the only one removing from this list, but peers can be added on
         * the end any time I drop the lock. */

        list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_CLOSING);

                /* CLOSING -> ZOMBIE before dropping the lock, so nobody
                 * sees this peer on both lists */
                list_del(&peer->peer_list);
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_zombie_peers);
                peer->peer_state = PEER_STATE_ZOMBIE;

                /* drop the table lock: lnet_notify and tx cancellation
                 * must not run with it held */
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                kptllnd_peer_notify(peer);
                kptllnd_peer_cancel_txs(peer, &txs);
                kptllnd_peer_decref(peer);      /* drop peer table's ref */

                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        }

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Drop peer's ref on all cancelled txs.  This will get
         * kptllnd_tx_fini() to abort outstanding comms if necessary. */

        list_for_each_safe (tmp, nxt, &txs) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                kptllnd_tx_decref(tx);
        }
}
382
/* Start closing 'peer' with error 'why' (0 == no error).  Idempotent:
 * peers already CLOSING or ZOMBIE are left alone.  Active peers are
 * moved from the peer table to the closing list (keeping the table's
 * ref) and the watchdog is woken to finish the job.  Caller must hold
 * kptl_peer_rw_lock for writing. */
void
kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
{
        switch (peer->peer_state) {
        default:
                LBUG();

        case PEER_STATE_WAITING_HELLO:
        case PEER_STATE_ACTIVE:
                /* Ensure new peers see a new incarnation of me */
                LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
                if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
                        kptllnd_data.kptl_incarnation++;

                /* Removing from peer table */
                kptllnd_data.kptl_n_active_peers--;
                LASSERT (kptllnd_data.kptl_n_active_peers >= 0);

                list_del(&peer->peer_list);
                kptllnd_peer_unreserve_buffers();

                peer->peer_error = why; /* stash 'why' only on first close */
                peer->peer_state = PEER_STATE_CLOSING;

                /* Schedule for immediate attention, taking peer table's ref */
                list_add_tail(&peer->peer_list, 
                              &kptllnd_data.kptl_closing_peers);
                wake_up(&kptllnd_data.kptl_watchdog_waitq);
                break;

        case PEER_STATE_ZOMBIE:
        case PEER_STATE_CLOSING:
                break;
        }
}
418
/* Locked wrapper for kptllnd_peer_close_locked: takes the peer table
 * write lock and begins closing 'peer' with error 'why'. */
void
kptllnd_peer_close(kptl_peer_t *peer, int why)
{
        unsigned long      flags;

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        kptllnd_peer_close_locked(peer, why);
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}
428
/* Close every peer matching 'id'.  id.nid may be LNET_NID_ANY to match
 * all NIDs (then id.pid must be LNET_PID_ANY, or -EINVAL); id.pid may
 * be LNET_PID_ANY to match all PIDs on a NID.  Returns 0 if anything
 * matched, -ENOENT otherwise. */
int
kptllnd_peer_del(lnet_process_id_t id)
{
        struct list_head  *ptmp;
        struct list_head  *pnxt;
        kptl_peer_t       *peer;
        int                lo;
        int                hi;
        int                i;
        unsigned long      flags;
        int                rc = -ENOENT;

        /*
         * Find the single bucket we are supposed to look at or if nid is a
         * wildcard (LNET_NID_ANY) then look at all of the buckets
         */
        if (id.nid != LNET_NID_ANY) {
                struct list_head *l = kptllnd_nid2peerlist(id.nid);
                
                /* bucket index from pointer arithmetic on the hash array */
                lo = hi =  l - kptllnd_data.kptl_peers;
        } else {
                if (id.pid != LNET_PID_ANY)
                        return -EINVAL;
                
                lo = 0;
                hi = kptllnd_data.kptl_peer_hash_size - 1;
        }

again:
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry (ptmp, kptl_peer_t, peer_list);

                        /* skip unless wildcard or exact nid (+pid) match */
                        if (!(id.nid == LNET_NID_ANY || 
                              (peer->peer_id.nid == id.nid &&
                               (id.pid == LNET_PID_ANY || 
                                peer->peer_id.pid == id.pid))))
                                continue;

                        kptllnd_peer_addref(peer); /* 1 ref for me... */

                        /* must drop the read lock before closing: close
                         * takes the write lock */
                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                               flags);

                        kptllnd_peer_close(peer, 0);
                        kptllnd_peer_decref(peer); /* ...until here */

                        rc = 0;         /* matched something */

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        return (rc);
}
489
/* Bind an MD for 'tx' and queue it on 'peer' for sending by
 * kptllnd_peer_check_sends.  nfrag == 0 sends the message buffer
 * directly; nfrag > 1 sends tx->tx_frags as an IOVEC whose first
 * fragment is the message itself.  On MD-bind failure the tx is failed
 * (-EIO) and released. */
void
kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
        /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
        ptl_handle_md_t  msg_mdh;
        ptl_md_t         md;
        ptl_err_t        prc;
        unsigned long    flags;

        LASSERT (!tx->tx_idle);
        LASSERT (!tx->tx_active);
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
                 tx->tx_type == TX_TYPE_PUT_REQUEST ||
                 tx->tx_type == TX_TYPE_GET_REQUEST);

        kptllnd_set_tx_peer(tx, peer);

        memset(&md, 0, sizeof(md));

        /* expect SEND_END, plus ACK if the tx wants one */
        md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE;
        md.user_ptr = &tx->tx_msg_eventarg;
        md.eq_handle = kptllnd_data.kptl_eqh;

        if (nfrag == 0) {
                md.start = tx->tx_msg;
                md.length = tx->tx_msg->ptlm_nob;
        } else {
                LASSERT (nfrag > 1);
                LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);

                md.start = tx->tx_frags;
                md.length = nfrag;
                md.options |= PTL_MD_IOVEC;
        }

        prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
        if (prc != PTL_OK) {
                CERROR("PtlMDBind(%s) failed: %s(%d)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_errtype2str(prc), prc);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);          /* drop the caller's ref */
                return;
        }

        spin_lock_irqsave(&peer->peer_lock, flags);

        tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
        tx->tx_active = 1;
        tx->tx_msg_mdh = msg_mdh;

        /* Ensure HELLO is sent first */
        if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP)
                list_add(&tx->tx_list, &peer->peer_noops);
        else if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
                list_add(&tx->tx_list, &peer->peer_sendq);
        else
                list_add_tail(&tx->tx_list, &peer->peer_sendq);

        spin_unlock_irqrestore(&peer->peer_lock, flags);
}
556
557 static inline int
558 kptllnd_peer_send_noop (kptl_peer_t *peer)
559 {
560         if (!peer->peer_sent_hello ||
561             peer->peer_credits == 0 ||
562             !list_empty(&peer->peer_noops) ||
563             peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)
564                 return 0;
565
566         /* No tx to piggyback NOOP onto or no credit to send a tx */
567         return (list_empty(&peer->peer_sendq) || peer->peer_credits == 1);
568 }
569
/* Push queued messages out to 'peer' while credits allow.  Posts a NOOP
 * first if credits need returning, then drains peer_noops/peer_sendq in
 * order, attaching RDMA MDs for PUT/GET requests before the PtlPut of
 * the request message.  Any Portals failure closes the peer.  Must not
 * be called in interrupt context (it can allocate and call into
 * Portals). */
void
kptllnd_peer_check_sends (kptl_peer_t *peer)
{
        ptl_handle_me_t  meh;
        kptl_tx_t       *tx;
        int              rc;
        int              msg_type;
        unsigned long    flags;

        LASSERT(!in_interrupt());

        spin_lock_irqsave(&peer->peer_lock, flags);

        peer->peer_retry_noop = 0;

        if (kptllnd_peer_send_noop(peer)) {
                /* post a NOOP to return credits */
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
                if (tx == NULL) {
                        CERROR("Can't return credits to %s: can't allocate descriptor\n",
                               libcfs_id2str(peer->peer_id));
                } else {
                        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
                        kptllnd_post_tx(peer, tx, 0);
                }

                spin_lock_irqsave(&peer->peer_lock, flags);
                /* remember the failure so the watchdog retries the NOOP */
                peer->peer_retry_noop = (tx == NULL);
        }

        for (;;) {
                /* NOOPs (credit returns) take priority over normal sends */
                if (!list_empty(&peer->peer_noops)) {
                        LASSERT (peer->peer_sent_hello);
                        tx = list_entry(peer->peer_noops.next,
                                        kptl_tx_t, tx_list);
                } else if (!list_empty(&peer->peer_sendq)) {
                        tx = list_entry(peer->peer_sendq.next,
                                        kptl_tx_t, tx_list);
                } else {
                        /* nothing to send right now */
                        break;
                }

                LASSERT (tx->tx_active);
                LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
                LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));

                /* credit accounting sanity */
                LASSERT (peer->peer_outstanding_credits >= 0);
                LASSERT (peer->peer_sent_credits >= 0);
                LASSERT (peer->peer_sent_credits +
                         peer->peer_outstanding_credits <=
                         *kptllnd_tunables.kptl_peercredits);
                LASSERT (peer->peer_credits >= 0);

                msg_type = tx->tx_msg->ptlm_type;

                /* Ensure HELLO is sent first */
                if (!peer->peer_sent_hello) {
                        LASSERT (list_empty(&peer->peer_noops));
                        if (msg_type != PTLLND_MSG_TYPE_HELLO)
                                break;
                        peer->peer_sent_hello = 1;
                }

                if (peer->peer_credits == 0) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %s[%p]\n",
                               libcfs_id2str(peer->peer_id), 
                               peer->peer_credits,
                               peer->peer_outstanding_credits, 
                               peer->peer_sent_credits, 
                               kptllnd_msgtype2str(msg_type), tx);
                        break;
                }

                /* Last/Initial credit reserved for NOOP/HELLO */
                if (peer->peer_credits == 1 &&
                    msg_type != PTLLND_MSG_TYPE_HELLO &&
                    msg_type != PTLLND_MSG_TYPE_NOOP) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
                               "not using last credit for %s[%p]\n",
                               libcfs_id2str(peer->peer_id), 
                               peer->peer_credits,
                               peer->peer_outstanding_credits,
                               peer->peer_sent_credits,
                               kptllnd_msgtype2str(msg_type), tx);
                        break;
                }

                list_del(&tx->tx_list);

                /* Discard any NOOP I queued if I'm not at the high-water mark
                 * any more or more messages have been queued */
                if (msg_type == PTLLND_MSG_TYPE_NOOP &&
                    !kptllnd_peer_send_noop(peer)) {
                        tx->tx_active = 0;

                        spin_unlock_irqrestore(&peer->peer_lock, flags);

                        CDEBUG(D_NET, "%s: redundant noop\n", 
                               libcfs_id2str(peer->peer_id));
                        kptllnd_tx_decref(tx);

                        spin_lock_irqsave(&peer->peer_lock, flags);
                        continue;
                }

                /* fill last-minute msg fields */
                kptllnd_msg_pack(tx->tx_msg, peer);

                if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
                    tx->tx_type == TX_TYPE_GET_REQUEST) {
                        /* peer_next_matchbits must be known good */
                        LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
                        /* Assume 64-bit matchbits can't wrap */
                        LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
                        tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
                                peer->peer_next_matchbits++;
                }

                /* consume a send credit; return accumulated credits */
                peer->peer_sent_credits += peer->peer_outstanding_credits;
                peer->peer_outstanding_credits = 0;
                peer->peer_credits--;

                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       kptllnd_msgtype2str(msg_type), tx, tx->tx_msg->ptlm_nob,
                       tx->tx_msg->ptlm_credits);

                list_add_tail(&tx->tx_list, &peer->peer_activeq);

                kptllnd_tx_addref(tx);          /* 1 ref for me... */

                spin_unlock_irqrestore(&peer->peer_lock, flags);

                if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
                    tx->tx_type == TX_TYPE_GET_REQUEST) {
                        /* Post bulk now we have safe matchbits */
                        rc = PtlMEAttach(kptllnd_data.kptl_nih,
                                         *kptllnd_tunables.kptl_portal,
                                         peer->peer_ptlid,
                                         tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
                                         0,             /* ignore bits */
                                         PTL_UNLINK,
                                         PTL_INS_BEFORE,
                                         &meh);
                        if (rc != PTL_OK) {
                                CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
                                       libcfs_id2str(peer->peer_id),
                                       kptllnd_errtype2str(rc), rc);
                                goto failed;
                        }

                        rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
                                         &tx->tx_rdma_mdh);
                        if (rc != PTL_OK) {
                                CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
                                       libcfs_id2str(tx->tx_peer->peer_id),
                                       kptllnd_errtype2str(rc), rc);
                                rc = PtlMEUnlink(meh);
                                LASSERT(rc == PTL_OK);
                                tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
                                goto failed;
                        }
                        /* I'm not racing with the event callback here.  It's a
                         * bug if there's an event on the MD I just attached
                         * before I actually send the RDMA request message -
                         * probably matchbits re-used in error. */
                }

                tx->tx_tposted = jiffies;       /* going on the wire */

                rc = PtlPut (tx->tx_msg_mdh,
                             tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
                             peer->peer_ptlid,
                             *kptllnd_tunables.kptl_portal,
                             0,                 /* acl cookie */
                             LNET_MSG_MATCHBITS,
                             0,                 /* offset */
                             0);                /* header data */
                if (rc != PTL_OK) {
                        CERROR("PtlPut %s error %s(%d)\n",
                               libcfs_id2str(peer->peer_id),
                               kptllnd_errtype2str(rc), rc);
                        goto failed;
                }

                kptllnd_tx_decref(tx);          /* drop my ref */

                spin_lock_irqsave(&peer->peer_lock, flags);
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);
        return;

 failed:
        /* Nuke everything (including tx we were trying) */
        kptllnd_peer_close(peer, -EIO);
        kptllnd_tx_decref(tx);
}
772
773 kptl_tx_t *
774 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
775 {
776         kptl_tx_t         *tx;
777         struct list_head  *ele;
778
779         list_for_each(ele, &peer->peer_sendq) {
780                 tx = list_entry(ele, kptl_tx_t, tx_list);
781
782                 if (time_after_eq(jiffies, tx->tx_deadline)) {
783                         kptllnd_tx_addref(tx);
784                         return tx;
785                 }
786         }
787
788         list_for_each(ele, &peer->peer_activeq) {
789                 tx = list_entry(ele, kptl_tx_t, tx_list);
790
791                 if (time_after_eq(jiffies, tx->tx_deadline)) {
792                         kptllnd_tx_addref(tx);
793                         return tx;
794                 }
795         }
796
797         return NULL;
798 }
799
800
/* Scan one hash bucket of the peer table for peers with timed-out txs or
 * pending no-op retries.  'stamp' identifies this sweep: a peer already
 * checked this pass (peer_check_stamp == stamp) is skipped, which makes the
 * repeated 'goto again' rescans terminate.  Peers with a timed-out tx are
 * closed with -ETIMEDOUT. */
void
kptllnd_peer_check_bucket (int idx, int stamp)
{
        struct list_head  *peers = &kptllnd_data.kptl_peers[idx];
        struct list_head  *ptmp;
        kptl_peer_t       *peer;
        kptl_tx_t         *tx;
        unsigned long      flags;
        int                nsend;
        int                nactive;
        int                check_sends;

        CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);

 again:
        /* NB. Shared lock while I just look */
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kptl_peer_t, peer_list);

                CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits, 
                       peer->peer_outstanding_credits, peer->peer_sent_credits);

                /* plain spin_lock is safe here: irqs already disabled by the
                 * read_lock_irqsave above */
                spin_lock(&peer->peer_lock);

                if (peer->peer_check_stamp == stamp) {
                        /* checked already this pass */
                        spin_unlock(&peer->peer_lock);
                        continue;
                }

                peer->peer_check_stamp = stamp;
                tx = kptllnd_find_timed_out_tx(peer);   /* takes a tx ref on hit */
                check_sends = peer->peer_retry_noop;
                
                spin_unlock(&peer->peer_lock);
                
                if (tx == NULL && !check_sends)
                        continue;

                kptllnd_peer_addref(peer); /* 1 ref for me... */

                /* drop the table lock before doing anything heavyweight;
                 * rescan from the top afterwards since the list may change */
                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                if (tx == NULL) { /* nothing timed out */
                        kptllnd_peer_check_sends(peer);
                        kptllnd_peer_decref(peer); /* ...until here or... */

                        /* rescan after dropping the lock */
                        goto again;
                }

                spin_lock_irqsave(&peer->peer_lock, flags);
                nsend = kptllnd_count_queue(&peer->peer_sendq);
                nactive = kptllnd_count_queue(&peer->peer_activeq);
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                /* tx_tposted == 0 means the tx never made it onto the wire:
                 * we were starved of peer buffer credits */
                LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
                                   libcfs_id2str(peer->peer_id),
                                   (tx->tx_tposted == 0) ? 
                                   "no free peer buffers" : 
                                   "please check Portals");

                if (tx->tx_tposted) {
                        CERROR("Could not send to %s after %ds (sent %lds ago); "
                                "check Portals for possible issues\n",
                                libcfs_id2str(peer->peer_id),
                                *kptllnd_tunables.kptl_timeout,
                                cfs_duration_sec(jiffies - tx->tx_tposted));
                } else {
                        CERROR("Could not get credits for %s after %ds; "
                                "possible Lustre networking issues\n",
                        libcfs_id2str(peer->peer_id),
                        *kptllnd_tunables.kptl_timeout);
                }

                CERROR("%s timed out: cred %d outstanding %d, sent %d, "
                       "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d "
                       "%sposted %lu T/O %ds\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type),
                       tx->tx_active ? "A" : "",
                       PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
                       "" : "M",
                       PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
                       "" : "D",
                       tx->tx_status,
                       (tx->tx_tposted == 0) ? "not " : "",
                       (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
                       *kptllnd_tunables.kptl_timeout);

                kptllnd_dump_ptltrace();

                kptllnd_tx_decref(tx);  /* drop ref from find_timed_out_tx */

                kptllnd_peer_close(peer, -ETIMEDOUT);
                kptllnd_peer_decref(peer); /* ...until here */

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}
908
909 kptl_peer_t *
910 kptllnd_id2peer_locked (lnet_process_id_t id)
911 {
912         struct list_head *peers = kptllnd_nid2peerlist(id.nid);
913         struct list_head *tmp;
914         kptl_peer_t      *peer;
915
916         list_for_each (tmp, peers) {
917
918                 peer = list_entry (tmp, kptl_peer_t, peer_list);
919
920                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
921                         peer->peer_state == PEER_STATE_ACTIVE);
922                 
923                 if (peer->peer_id.nid != id.nid ||
924                     peer->peer_id.pid != id.pid)
925                         continue;
926
927                 kptllnd_peer_addref(peer);
928
929                 CDEBUG(D_NET, "%s -> %s (%d)\n",
930                        libcfs_id2str(id), 
931                        kptllnd_ptlid2str(peer->peer_ptlid),
932                        atomic_read (&peer->peer_refcount));
933                 return peer;
934         }
935
936         return NULL;
937 }
938
/* Log a console error (codes 0x127/0x128) reporting that connection 'str'
 * with peer 'id' would overflow the peer table, along with the tunables an
 * admin should raise to fix it. */
void
kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
{
        LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
                           "messages may be dropped\n",
                           str, libcfs_id2str(id),
                           kptllnd_data.kptl_n_active_peers);
        LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
                           "'max_nodes' or 'max_procs_per_node'\n");
}
949
950 __u64
951 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
952 {
953         kptl_peer_t            *peer;
954         struct list_head       *tmp;
955
956         /* Find the last matchbits I saw this new peer using.  Note..
957            A. This peer cannot be in the peer table - she's new!
958            B. If I can't find the peer in the closing/zombie peers, all
959               matchbits are safe because all refs to the (old) peer have gone
960               so all txs have completed so there's no risk of matchbit
961               collision!
962          */
963
964         LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
965
966         /* peer's last matchbits can't change after it comes out of the peer
967          * table, so first match is fine */
968
969         list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
970                 peer = list_entry (tmp, kptl_peer_t, peer_list);
971
972                 if (peer->peer_id.nid == lpid.nid &&
973                     peer->peer_id.pid == lpid.pid)
974                         return peer->peer_last_matchbits_seen;
975         }
976         
977         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
978                 peer = list_entry (tmp, kptl_peer_t, peer_list);
979
980                 if (peer->peer_id.nid == lpid.nid &&
981                     peer->peer_id.pid == lpid.pid)
982                         return peer->peer_last_matchbits_seen;
983         }
984         
985         return PTL_RESERVED_MATCHBITS;
986 }
987
/* Handle an incoming HELLO from 'initiator'.  Validates the message,
 * completes a handshake in progress, tears down any stale incarnation, or
 * creates a brand new peer and posts a HELLO response.  Returns the (ref'd)
 * active peer, or NULL if the connection is refused or resources run out. */
kptl_peer_t *
kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
                           kptl_msg_t       *msg)
{
        rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        kptl_peer_t        *peer;
        kptl_peer_t        *new_peer;
        lnet_process_id_t   lpid;
        unsigned long       flags;
        kptl_tx_t          *hello_tx;
        int                 rc;
        __u64               safe_matchbits;
        __u64               last_matchbits_seen;

        lpid.nid = msg->ptlm_srcnid;
        lpid.pid = msg->ptlm_srcpid;

        CDEBUG(D_NET, "hello from %s(%s)\n",
               libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));

        if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
            (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
                /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
                 * userspace.  Refuse the connection if she hasn't set the
                 * correct flag in her PID... */
                CERROR("Userflag not set in hello from %s (%s)\n",
                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
                return NULL;
        }
        
        /* kptlhm_matchbits are the highest matchbits my peer may have used to
         * RDMA to me.  I ensure I never register buffers for RDMA that could
         * match any she used */
        safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;

        if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
                CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
                       safe_matchbits, libcfs_id2str(lpid));
                return NULL;
        }
        
        if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
                CERROR("%s: max message size %d < MIN %d",
                       libcfs_id2str(lpid),
                       msg->ptlm_u.hello.kptlhm_max_msg_size,
                       PTLLND_MIN_BUFFER_SIZE);
                return NULL;
        }

        if (msg->ptlm_credits <= 1) {
                CERROR("Need more than 1+%d credits from %s\n",
                       msg->ptlm_credits, libcfs_id2str(lpid));
                return NULL;
        }
        
        write_lock_irqsave(g_lock, flags);

        peer = kptllnd_id2peer_locked(lpid);    /* NB takes a ref on success */
        if (peer != NULL) {
                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        /* Completing HELLO handshake */
                        LASSERT(peer->peer_incarnation == 0);

                        /* dststamp 0 means the peer didn't know my
                         * incarnation yet; otherwise it must match */
                        if (msg->ptlm_dststamp != 0 &&
                            msg->ptlm_dststamp != peer->peer_myincarnation) {
                                write_unlock_irqrestore(g_lock, flags);

                                CERROR("Ignoring HELLO from %s: unexpected "
                                       "dststamp "LPX64" ("LPX64" wanted)\n",
                                       libcfs_id2str(lpid),
                                       msg->ptlm_dststamp,
                                       peer->peer_myincarnation);
                                kptllnd_peer_decref(peer);
                                return NULL;
                        }
                        
                        /* Concurrent initiation or response to my HELLO */
                        peer->peer_state = PEER_STATE_ACTIVE;
                        peer->peer_incarnation = msg->ptlm_srcstamp;
                        peer->peer_next_matchbits = safe_matchbits;
                        peer->peer_max_msg_size =
                                msg->ptlm_u.hello.kptlhm_max_msg_size;
                        
                        write_unlock_irqrestore(g_lock, flags);
                        return peer;    /* caller inherits lookup ref */
                }

                if (msg->ptlm_dststamp != 0 &&
                    msg->ptlm_dststamp <= peer->peer_myincarnation) {
                        write_unlock_irqrestore(g_lock, flags);

                        CERROR("Ignoring stale HELLO from %s: "
                               "dststamp "LPX64" (current "LPX64")\n",
                               libcfs_id2str(lpid),
                               msg->ptlm_dststamp,
                               peer->peer_myincarnation);
                        kptllnd_peer_decref(peer);
                        return NULL;
                }

                /* Brand new connection attempt: remove old incarnation */
                kptllnd_peer_close_locked(peer, 0);
        }

        kptllnd_cull_peertable_locked(lpid);

        write_unlock_irqrestore(g_lock, flags);

        if (peer != NULL) {
                CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
                       " stamp "LPX64"("LPX64")\n",
                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
                       msg->ptlm_srcstamp, peer->peer_incarnation);

                kptllnd_peer_decref(peer);      /* drop lookup ref */
        }

        /* Allocate the HELLO response and a fresh peer before retaking the
         * lock, since both may block */
        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate HELLO message for %s\n",
                       libcfs_id2str(lpid));
                return NULL;
        }

        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(lpid, initiator);
        if (new_peer == NULL) {
                kptllnd_tx_decref(hello_tx);
                return NULL;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0) {
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);

                CERROR("Failed to reserve buffers for %s\n",
                       libcfs_id2str(lpid));
                return NULL;
        }

        write_lock_irqsave(g_lock, flags);

 again:
        /* the lock was dropped (or will be, via the table-full path below),
         * so everything must be rechecked */
        if (kptllnd_data.kptl_shutdown) {
                write_unlock_irqrestore(g_lock, flags);

                CERROR ("Shutdown started, refusing connection from %s\n",
                        libcfs_id2str(lpid));
                kptllnd_peer_unreserve_buffers();
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);
                return NULL;
        }

        peer = kptllnd_id2peer_locked(lpid);
        if (peer != NULL) {
                /* a peer appeared while the lock was dropped */
                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        /* An outgoing message instantiated 'peer' for me */
                        LASSERT(peer->peer_incarnation == 0);

                        peer->peer_state = PEER_STATE_ACTIVE;
                        peer->peer_incarnation = msg->ptlm_srcstamp;
                        peer->peer_next_matchbits = safe_matchbits;
                        peer->peer_max_msg_size =
                                msg->ptlm_u.hello.kptlhm_max_msg_size;

                        write_unlock_irqrestore(g_lock, flags);

                        CWARN("Outgoing instantiated peer %s\n",
                              libcfs_id2str(lpid));
                } else {
                        LASSERT (peer->peer_state == PEER_STATE_ACTIVE);

                        write_unlock_irqrestore(g_lock, flags);

                        /* WOW!  Somehow this peer completed the HELLO
                         * handshake while I slept.  I guess I could have slept
                         * while it rebooted and sent a new HELLO, so I'll fail
                         * this one... */
                        CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
                        kptllnd_peer_decref(peer);
                        peer = NULL;
                }

                /* new_peer and hello_tx are no longer needed */
                kptllnd_peer_unreserve_buffers();
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);
                return peer;
        }

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection from ", lpid);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Refusing connection from %s\n",
                               libcfs_id2str(lpid));
                        kptllnd_peer_unreserve_buffers();
                        kptllnd_peer_decref(new_peer);
                        kptllnd_tx_decref(hello_tx);
                        return NULL;
                }
                
                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
                goto again;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);

        /* tell the peer the highest matchbits I may already have seen from
         * its previous incarnation, and my buffer size limit */
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;

        new_peer->peer_state = PEER_STATE_ACTIVE;
        new_peer->peer_incarnation = msg->ptlm_srcstamp;
        new_peer->peer_next_matchbits = safe_matchbits;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;
        new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;

        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        kptllnd_post_tx(new_peer, hello_tx, 0);
        kptllnd_peer_check_sends(new_peer);

        return new_peer;
}
1230
/* Queue 'tx' (with 'nfrag' payload fragments) on 'peer' and kick the send
 * pipeline immediately. */
void
kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
        kptllnd_post_tx(peer, tx, nfrag);
        kptllnd_peer_check_sends(peer);
}
1237
/* Find (or create) the peer for LNET process 'target', returning it in
 * *peerp with a reference held for the caller.  If no peer exists, a new
 * one is allocated in WAITING_HELLO state, added to the table, and an
 * initial HELLO is posted.  Returns 0 on success or a -ve errno. */
int
kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        ptl_process_id_t  ptl_id;
        kptl_peer_t      *new_peer;
        kptl_tx_t        *hello_tx;
        unsigned long     flags;
        int               rc;
        __u64             last_matchbits_seen;

        /* I expect to find the peer, so I only take a read lock... */
        read_lock_irqsave(g_lock, flags);
        *peerp = kptllnd_id2peer_locked(target);        /* NB takes a ref */
        read_unlock_irqrestore(g_lock, flags);

        if (*peerp != NULL)
                return 0;
        
        if ((target.pid & LNET_PID_USERFLAG) != 0) {
                CWARN("Refusing to create a new connection to %s "
                      "(non-kernel peer)\n", libcfs_id2str(target));
                return -EHOSTUNREACH;
        }

        /* The new peer is a kernel ptllnd, and kernel ptllnds all have
         * the same portals PID */
        ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
        ptl_id.pid = kptllnd_data.kptl_portals_id.pid;

        /* allocate everything that may block before taking the write lock */
        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate connect message for %s\n",
                       libcfs_id2str(target));
                return -ENOMEM;
        }

        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(target, ptl_id);
        if (new_peer == NULL) {
                rc = -ENOMEM;
                goto unwind_0;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0)
                goto unwind_1;

        write_lock_irqsave(g_lock, flags);
 again:
        /* recheck everything: the lock may have been dropped on the
         * table-full path below */
        if (kptllnd_data.kptl_shutdown) {
                write_unlock_irqrestore(g_lock, flags);
                rc = -ESHUTDOWN;
                goto unwind_2;
        }

        *peerp = kptllnd_id2peer_locked(target);
        if (*peerp != NULL) {
                /* someone else created the peer while I was allocating;
                 * use theirs (rc is 0 from reserve_buffers above) */
                write_unlock_irqrestore(g_lock, flags);
                goto unwind_2;
        }

        kptllnd_cull_peertable_locked(target);

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection to ", target);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Can't create connection to %s\n",
                               libcfs_id2str(target));
                        rc = -ENOMEM;
                        goto unwind_2;
                }
                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
                goto again;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);

        /* tell the peer the highest matchbits I may have seen from its
         * previous incarnation, and my buffer size limit */
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;
                
        new_peer->peer_state = PEER_STATE_WAITING_HELLO;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;
        
        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        kptllnd_post_tx(new_peer, hello_tx, 0);
        kptllnd_peer_check_sends(new_peer);
       
        *peerp = new_peer;
        return 0;
        
        /* unwind ladder: release resources in reverse acquisition order */
 unwind_2:
        kptllnd_peer_unreserve_buffers();
 unwind_1:
        kptllnd_peer_decref(new_peer);
 unwind_0:
        kptllnd_tx_decref(hello_tx);

        return rc;
}