Whamcloud - gitweb
c92fe6c67a98cf3db622ffe66fbfa2d226c2eab8
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_peer.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/ptllnd/ptllnd_peer.c
37  *
38  * Author: PJ Kirner <pjkirner@clusterfs.com>
39  * Author: E Barton <eeb@bartonsoftware.com>
40  */
41
42 #include "ptllnd.h"
43 #include <libcfs/list.h>
44
45 static int
46 kptllnd_count_queue(struct list_head *q)
47 {
48         struct list_head *e;
49         int               n = 0;
50         
51         list_for_each(e, q) {
52                 n++;
53         }
54
55         return n;
56 }
57
/* Return a snapshot of the 'index'th peer in the global peer table via
 * the out parameters.  Returns 0 on success, or -ENOENT when 'index' is
 * beyond the last peer. */
int
kptllnd_get_peer_info(int index, 
                      lnet_process_id_t *id,
                      int *state, int *sent_hello,
                      int *refcount, __u64 *incarnation,
                      __u64 *next_matchbits, __u64 *last_matchbits_seen,
                      int *nsendq, int *nactiveq,
                      int *credits, int *outstanding_credits) 
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        unsigned long     flags;
        struct list_head *ptmp;
        kptl_peer_t      *peer;
        int               i;
        int               rc = -ENOENT;

        read_lock_irqsave(g_lock, flags);

        for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
                
                list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry(ptmp, kptl_peer_t, peer_list);

                        /* skip peers until we reach the requested index */
                        if (index-- > 0)
                                continue;
                        
                        *id          = peer->peer_id;
                        *state       = peer->peer_state;
                        *sent_hello  = peer->peer_sent_hello;
                        *refcount    = atomic_read(&peer->peer_refcount);
                        *incarnation = peer->peer_incarnation;

                        /* the remaining fields are guarded by the
                         * per-peer spinlock */
                        spin_lock(&peer->peer_lock);

                        *next_matchbits      = peer->peer_next_matchbits;
                        *last_matchbits_seen = peer->peer_last_matchbits_seen;
                        *credits             = peer->peer_credits;
                        *outstanding_credits = peer->peer_outstanding_credits;

                        *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
                        *nactiveq = kptllnd_count_queue(&peer->peer_activeq);

                        spin_unlock(&peer->peer_lock);

                        rc = 0;
                        goto out;
                }
        }
        
 out:
        read_unlock_irqrestore(g_lock, flags);
        return rc;
}
111
112 void
113 kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
114 {
115         LASSERT (!kptllnd_data.kptl_shutdown);
116         LASSERT (kptllnd_data.kptl_n_active_peers <
117                  kptllnd_data.kptl_expected_peers);
118
119         LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
120                  peer->peer_state == PEER_STATE_ACTIVE);
121         
122         kptllnd_data.kptl_n_active_peers++;
123         atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */
124
125         /* NB add to HEAD of peer list for MRU order!
126          * (see kptllnd_cull_peertable) */
127         list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
128 }
129
130 void
131 kptllnd_cull_peertable_locked (lnet_process_id_t pid)
132 {
133         /* I'm about to add a new peer with this portals ID to the peer table,
134          * so (a) this peer should not exist already and (b) I want to leave at
135          * most (max_procs_per_nid - 1) peers with this NID in the table. */
136         struct list_head  *peers = kptllnd_nid2peerlist(pid.nid);
137         int                cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
138         int                count;
139         struct list_head  *tmp;
140         struct list_head  *nxt;
141         kptl_peer_t       *peer;
142         
143         count = 0;
144         list_for_each_safe (tmp, nxt, peers) {
145                 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
146                  * in MRU order */
147                 peer = list_entry(tmp, kptl_peer_t, peer_list);
148                         
149                 if (peer->peer_id.nid != pid.nid)
150                         continue;
151
152                 LASSERT (peer->peer_id.pid != pid.pid);
153                         
154                 count++;
155
156                 if (count < cull_count) /* recent (don't cull) */
157                         continue;
158
159                 CDEBUG(D_NET, "Cull %s(%s)\n",
160                        libcfs_id2str(peer->peer_id),
161                        kptllnd_ptlid2str(peer->peer_ptlid));
162                 
163                 kptllnd_peer_close_locked(peer, 0);
164         }
165 }
166
/* Allocate and initialise a new peer for LNET id 'lpid' / portals id
 * 'ppid'.  Returns the peer holding one reference for the caller, or
 * NULL on allocation failure or if shutdown has already started. */
kptl_peer_t *
kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
{
        unsigned long    flags;
        kptl_peer_t     *peer;

        LIBCFS_ALLOC(peer, sizeof (*peer));
        if (peer == NULL) {
                CERROR("Can't create peer %s (%s)\n",
                       libcfs_id2str(lpid), 
                       kptllnd_ptlid2str(ppid));
                return NULL;
        }

        memset(peer, 0, sizeof(*peer));         /* zero flags etc */

        INIT_LIST_HEAD (&peer->peer_noops);
        INIT_LIST_HEAD (&peer->peer_sendq);
        INIT_LIST_HEAD (&peer->peer_activeq);
        spin_lock_init (&peer->peer_lock);

        peer->peer_state = PEER_STATE_ALLOCATED;
        peer->peer_error = 0;
        peer->peer_last_alive = 0;
        peer->peer_id = lpid;
        peer->peer_ptlid = ppid;
        peer->peer_credits = 1;                 /* enough for HELLO */
        peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
        peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;
        peer->peer_sent_credits = 1;           /* HELLO credit is implicit */
        peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */

        atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        peer->peer_myincarnation = kptllnd_data.kptl_incarnation;

        /* Only increase # peers under lock, to guarantee we dont grow it
         * during shutdown */
        if (kptllnd_data.kptl_shutdown) {
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
                                        flags);
                LIBCFS_FREE(peer, sizeof(*peer));
                return NULL;
        }

        kptllnd_data.kptl_npeers++;
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
        
        return peer;
}
219
/* Free 'peer' once its last reference has been dropped.  All of its tx
 * queues must already be empty; zombies are unlinked from the zombie
 * list and the global peer count is decremented under the write lock. */
void
kptllnd_peer_destroy (kptl_peer_t *peer)
{
        unsigned long flags;
        
        CDEBUG(D_NET, "Peer=%p\n", peer);

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&peer->peer_refcount) == 0);
        LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
                 peer->peer_state == PEER_STATE_ZOMBIE);
        LASSERT (list_empty(&peer->peer_noops));
        LASSERT (list_empty(&peer->peer_sendq));
        LASSERT (list_empty(&peer->peer_activeq));

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* only zombies are still on a (the zombie) list */
        if (peer->peer_state == PEER_STATE_ZOMBIE)
                list_del(&peer->peer_list);

        kptllnd_data.kptl_npeers--;

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        LIBCFS_FREE (peer, sizeof (*peer));
}
246
247 void
248 kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs)
249 {
250         struct list_head  *tmp;
251         struct list_head  *nxt;
252         kptl_tx_t         *tx;
253
254         list_for_each_safe (tmp, nxt, peerq) {
255                 tx = list_entry(tmp, kptl_tx_t, tx_list);
256
257                 list_del(&tx->tx_list);
258                 list_add_tail(&tx->tx_list, txs);
259
260                 tx->tx_status = -EIO;
261                 tx->tx_active = 0;
262         }
263 }
264
265 void
266 kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs)
267 {
268         unsigned long   flags;
269
270         spin_lock_irqsave(&peer->peer_lock, flags);
271
272         kptllnd_cancel_txlist(&peer->peer_noops, txs);
273         kptllnd_cancel_txlist(&peer->peer_sendq, txs);
274         kptllnd_cancel_txlist(&peer->peer_activeq, txs);
275                 
276         spin_unlock_irqrestore(&peer->peer_lock, flags);
277 }
278
/* Stamp 'peer' as alive right now. */
void
kptllnd_peer_alive (kptl_peer_t *peer)
{
        /* This is racy, but everyone's only writing cfs_time_current() */
        peer->peer_last_alive = cfs_time_current();
        mb();           /* make the new timestamp visible promptly */
}
286
/* If an error has been stashed on 'peer', consume it and tell LNET the
 * peer went down, reporting an estimate of when it was last alive. */
void
kptllnd_peer_notify (kptl_peer_t *peer)
{
        unsigned long flags;
        time_t        last_alive = 0;
        int           error = 0;
        
        spin_lock_irqsave(&peer->peer_lock, flags);

        if (peer->peer_error != 0) {
                error = peer->peer_error;
                peer->peer_error = 0;   /* only notify once per error */
                
                /* convert the jiffies-based last-alive stamp into
                 * wall-clock seconds for lnet_notify() */
                last_alive = cfs_time_current_sec() - 
                             cfs_duration_sec(cfs_time_current() - 
                                              peer->peer_last_alive);
        }
        
        spin_unlock_irqrestore(&peer->peer_lock, flags);

        if (error != 0)
                lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
                             last_alive);
}
311
/* Reap peers scheduled for closing: cancel txs on zombies, move each
 * newly-closed peer onto the zombie list (notifying LNET and dropping
 * the peer table's ref), then drop the refs of all cancelled txs
 * outside the locks. */
void
kptllnd_handle_closing_peers ()
{
        unsigned long           flags;
        struct list_head        txs;
        kptl_peer_t            *peer;
        struct list_head       *tmp;
        struct list_head       *nxt;
        kptl_tx_t              *tx;
        int                     idle;

        /* Check with a read lock first to avoid blocking anyone */

        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        idle = list_empty(&kptllnd_data.kptl_closing_peers) &&
               list_empty(&kptllnd_data.kptl_zombie_peers);
        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (idle)
                return;

        INIT_LIST_HEAD(&txs);

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
         * ref removes it from this list, so I musn't drop the lock while
         * scanning it. */
        list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);

                kptllnd_peer_cancel_txs(peer, &txs);
        }

        /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
         * I'm the only one removing from this list, but peers can be added on
         * the end any time I drop the lock. */

        list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_CLOSING);

                list_del(&peer->peer_list);
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_zombie_peers);
                peer->peer_state = PEER_STATE_ZOMBIE;

                /* drop the lock: notify/cancel/decref below may block */
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                kptllnd_peer_notify(peer);
                kptllnd_peer_cancel_txs(peer, &txs);
                kptllnd_peer_decref(peer);      /* drop the peer table's ref */

                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        }

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Drop peer's ref on all cancelled txs.  This will get
         * kptllnd_tx_fini() to abort outstanding comms if necessary. */

        list_for_each_safe (tmp, nxt, &txs) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                kptllnd_tx_decref(tx);
        }
}
382
/* Start closing 'peer'.  Caller holds kptl_peer_rw_lock in write mode.
 * The first close stashes 'why' in peer_error and moves the peer from
 * the hash table onto the closing list for the watchdog to reap;
 * closing a peer that is already closing or a zombie is a no-op. */
void
kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
{
        switch (peer->peer_state) {
        default:
                LBUG();

        case PEER_STATE_WAITING_HELLO:
        case PEER_STATE_ACTIVE:
                /* Ensure new peers see a new incarnation of me */
                LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
                if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
                        kptllnd_data.kptl_incarnation++;

                /* Removing from peer table */
                kptllnd_data.kptl_n_active_peers--;
                LASSERT (kptllnd_data.kptl_n_active_peers >= 0);

                list_del(&peer->peer_list);
                kptllnd_peer_unreserve_buffers();

                peer->peer_error = why; /* stash 'why' only on first close */
                peer->peer_state = PEER_STATE_CLOSING;

                /* Schedule for immediate attention, taking peer table's ref */
                list_add_tail(&peer->peer_list, 
                              &kptllnd_data.kptl_closing_peers);
                wake_up(&kptllnd_data.kptl_watchdog_waitq);
                break;

        case PEER_STATE_ZOMBIE:
        case PEER_STATE_CLOSING:
                break;
        }
}
418
419 void
420 kptllnd_peer_close(kptl_peer_t *peer, int why)
421 {
422         unsigned long      flags;
423
424         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
425         kptllnd_peer_close_locked(peer, why);
426         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
427 }
428
/* Close every peer matching 'id'.  id.nid may be LNET_NID_ANY to match
 * all peers (then id.pid must be LNET_PID_ANY too); otherwise id.pid
 * may be LNET_PID_ANY to match all PIDs on that NID.  Returns 0 if
 * anything matched, -ENOENT otherwise. */
int
kptllnd_peer_del(lnet_process_id_t id)
{
        struct list_head  *ptmp;
        struct list_head  *pnxt;
        kptl_peer_t       *peer;
        int                lo;
        int                hi;
        int                i;
        unsigned long      flags;
        int                rc = -ENOENT;

        /*
         * Find the single bucket we are supposed to look at or if nid is a
         * wildcard (LNET_NID_ANY) then look at all of the buckets
         */
        if (id.nid != LNET_NID_ANY) {
                struct list_head *l = kptllnd_nid2peerlist(id.nid);
                
                lo = hi =  l - kptllnd_data.kptl_peers;
        } else {
                if (id.pid != LNET_PID_ANY)
                        return -EINVAL;
                
                lo = 0;
                hi = kptllnd_data.kptl_peer_hash_size - 1;
        }

again:
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry (ptmp, kptl_peer_t, peer_list);

                        if (!(id.nid == LNET_NID_ANY || 
                              (peer->peer_id.nid == id.nid &&
                               (id.pid == LNET_PID_ANY || 
                                peer->peer_id.pid == id.pid))))
                                continue;

                        kptllnd_peer_addref(peer); /* 1 ref for me... */

                        /* must drop the read lock before closing, which
                         * takes the write lock */
                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                               flags);

                        kptllnd_peer_close(peer, 0);
                        kptllnd_peer_decref(peer); /* ...until here */

                        rc = 0;         /* matched something */

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        return (rc);
}
489
/* Bind an MD for tx's message and queue it on 'peer' for sending.
 * NOOPs and HELLOs are queued at the head of their lists so they go
 * out first; everything else is FIFO.  On MD-bind failure the tx is
 * failed (-EIO) and its ref dropped. */
void
kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
        /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
        ptl_handle_md_t  msg_mdh;
        ptl_md_t         md;
        ptl_err_t        prc;
        unsigned long    flags;

        LASSERT (!tx->tx_idle);
        LASSERT (!tx->tx_active);
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
                 tx->tx_type == TX_TYPE_PUT_REQUEST ||
                 tx->tx_type == TX_TYPE_GET_REQUEST);

        kptllnd_set_tx_peer(tx, peer);

        memset(&md, 0, sizeof(md));

        md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE;
        md.user_ptr = &tx->tx_msg_eventarg;
        md.eq_handle = kptllnd_data.kptl_eqh;

        if (nfrag == 0) {
                /* single contiguous buffer: the message itself */
                md.start = tx->tx_msg;
                md.length = tx->tx_msg->ptlm_nob;
        } else {
                /* iovec case; frag 0 must be the message header */
                LASSERT (nfrag > 1);
                LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);

                md.start = tx->tx_frags;
                md.length = nfrag;
                md.options |= PTL_MD_IOVEC;
        }

        prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
        if (prc != PTL_OK) {
                CERROR("PtlMDBind(%s) failed: %s(%d)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_errtype2str(prc), prc);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);
                return;
        }

        spin_lock_irqsave(&peer->peer_lock, flags);

        tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
        tx->tx_active = 1;
        tx->tx_msg_mdh = msg_mdh;

        /* Ensure HELLO is sent first */
        if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP)
                list_add(&tx->tx_list, &peer->peer_noops);
        else if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
                list_add(&tx->tx_list, &peer->peer_sendq);
        else
                list_add_tail(&tx->tx_list, &peer->peer_sendq);

        spin_unlock_irqrestore(&peer->peer_lock, flags);
}
556
557 static inline int
558 kptllnd_peer_send_noop (kptl_peer_t *peer)
559 {
560         if (!peer->peer_sent_hello ||
561             peer->peer_credits == 0 ||
562             !list_empty(&peer->peer_noops) ||
563             peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)
564                 return 0;
565
566         /* No tx to piggyback NOOP onto or no credit to send a tx */
567         return (list_empty(&peer->peer_sendq) || peer->peer_credits == 1);
568 }
569
/* Push queued txs out to the wire while credits allow: first post a
 * NOOP if credits need returning, then drain peer_noops/peer_sendq,
 * reserving the last credit for NOOP/HELLO and setting up RDMA MEs/MDs
 * for PUT/GET requests.  peer_lock is dropped and retaken around calls
 * that may block or re-enter.  On any Portals failure the peer is
 * closed with -EIO. */
void
kptllnd_peer_check_sends (kptl_peer_t *peer)
{
        ptl_handle_me_t  meh;
        kptl_tx_t       *tx;
        int              rc;
        int              msg_type;
        unsigned long    flags;

        LASSERT(!in_interrupt());

        spin_lock_irqsave(&peer->peer_lock, flags);

        peer->peer_retry_noop = 0;

        if (kptllnd_peer_send_noop(peer)) {
                /* post a NOOP to return credits */
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
                if (tx == NULL) {
                        CERROR("Can't return credits to %s: can't allocate descriptor\n",
                               libcfs_id2str(peer->peer_id));
                } else {
                        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
                        kptllnd_post_tx(peer, tx, 0);
                }

                spin_lock_irqsave(&peer->peer_lock, flags);
                /* remember to retry if we couldn't get a descriptor */
                peer->peer_retry_noop = (tx == NULL);
        }

        for (;;) {
                /* NOOPs jump the queue ahead of normal sends */
                if (!list_empty(&peer->peer_noops)) {
                        LASSERT (peer->peer_sent_hello);
                        tx = list_entry(peer->peer_noops.next,
                                        kptl_tx_t, tx_list);
                } else if (!list_empty(&peer->peer_sendq)) {
                        tx = list_entry(peer->peer_sendq.next,
                                        kptl_tx_t, tx_list);
                } else {
                        /* nothing to send right now */
                        break;
                }

                LASSERT (tx->tx_active);
                LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
                LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));

                LASSERT (peer->peer_outstanding_credits >= 0);
                LASSERT (peer->peer_sent_credits >= 0);
                LASSERT (peer->peer_sent_credits +
                         peer->peer_outstanding_credits <=
                         *kptllnd_tunables.kptl_peercredits);
                LASSERT (peer->peer_credits >= 0);

                msg_type = tx->tx_msg->ptlm_type;

                /* Ensure HELLO is sent first */
                if (!peer->peer_sent_hello) {
                        LASSERT (list_empty(&peer->peer_noops));
                        if (msg_type != PTLLND_MSG_TYPE_HELLO)
                                break;
                        peer->peer_sent_hello = 1;
                }

                if (peer->peer_credits == 0) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %s[%p]\n",
                               libcfs_id2str(peer->peer_id), 
                               peer->peer_credits,
                               peer->peer_outstanding_credits, 
                               peer->peer_sent_credits, 
                               kptllnd_msgtype2str(msg_type), tx);
                        break;
                }

                /* Last/Initial credit reserved for NOOP/HELLO */
                if (peer->peer_credits == 1 &&
                    msg_type != PTLLND_MSG_TYPE_HELLO &&
                    msg_type != PTLLND_MSG_TYPE_NOOP) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
                               "not using last credit for %s[%p]\n",
                               libcfs_id2str(peer->peer_id), 
                               peer->peer_credits,
                               peer->peer_outstanding_credits,
                               peer->peer_sent_credits,
                               kptllnd_msgtype2str(msg_type), tx);
                        break;
                }

                list_del(&tx->tx_list);

                /* Discard any NOOP I queued if I'm not at the high-water mark
                 * any more or more messages have been queued */
                if (msg_type == PTLLND_MSG_TYPE_NOOP &&
                    !kptllnd_peer_send_noop(peer)) {
                        tx->tx_active = 0;

                        spin_unlock_irqrestore(&peer->peer_lock, flags);

                        CDEBUG(D_NET, "%s: redundant noop\n", 
                               libcfs_id2str(peer->peer_id));
                        kptllnd_tx_decref(tx);

                        spin_lock_irqsave(&peer->peer_lock, flags);
                        continue;
                }

                /* fill last-minute msg fields */
                kptllnd_msg_pack(tx->tx_msg, peer);

                if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
                    tx->tx_type == TX_TYPE_GET_REQUEST) {
                        /* peer_next_matchbits must be known good */
                        LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
                        /* Assume 64-bit matchbits can't wrap */
                        LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
                        tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
                                peer->peer_next_matchbits++;
                }

                /* credit accounting: return outstanding, spend one */
                peer->peer_sent_credits += peer->peer_outstanding_credits;
                peer->peer_outstanding_credits = 0;
                peer->peer_credits--;

                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       kptllnd_msgtype2str(msg_type), tx, tx->tx_msg->ptlm_nob,
                       tx->tx_msg->ptlm_credits);

                list_add_tail(&tx->tx_list, &peer->peer_activeq);

                kptllnd_tx_addref(tx);          /* 1 ref for me... */

                spin_unlock_irqrestore(&peer->peer_lock, flags);

                if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
                    tx->tx_type == TX_TYPE_GET_REQUEST) {
                        /* Post bulk now we have safe matchbits */
                        rc = PtlMEAttach(kptllnd_data.kptl_nih,
                                         *kptllnd_tunables.kptl_portal,
                                         peer->peer_ptlid,
                                         tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
                                         0,             /* ignore bits */
                                         PTL_UNLINK,
                                         PTL_INS_BEFORE,
                                         &meh);
                        if (rc != PTL_OK) {
                                CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
                                       libcfs_id2str(peer->peer_id),
                                       kptllnd_errtype2str(rc), rc);
                                goto failed;
                        }

                        rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
                                         &tx->tx_rdma_mdh);
                        if (rc != PTL_OK) {
                                CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
                                       libcfs_id2str(tx->tx_peer->peer_id),
                                       kptllnd_errtype2str(rc), rc);
                                rc = PtlMEUnlink(meh);
                                LASSERT(rc == PTL_OK);
                                tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
                                goto failed;
                        }
                        /* I'm not racing with the event callback here.  It's a
                         * bug if there's an event on the MD I just attached
                         * before I actually send the RDMA request message -
                         * probably matchbits re-used in error. */
                }

                tx->tx_tposted = jiffies;       /* going on the wire */

                rc = PtlPut (tx->tx_msg_mdh,
                             tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
                             peer->peer_ptlid,
                             *kptllnd_tunables.kptl_portal,
                             0,                 /* acl cookie */
                             LNET_MSG_MATCHBITS,
                             0,                 /* offset */
                             0);                /* header data */
                if (rc != PTL_OK) {
                        CERROR("PtlPut %s error %s(%d)\n",
                               libcfs_id2str(peer->peer_id),
                               kptllnd_errtype2str(rc), rc);
                        goto failed;
                }

                kptllnd_tx_decref(tx);          /* drop my ref */

                spin_lock_irqsave(&peer->peer_lock, flags);
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);
        return;

 failed:
        /* Nuke everything (including tx we were trying) */
        kptllnd_peer_close(peer, -EIO);
        kptllnd_tx_decref(tx);
        kptllnd_schedule_ptltrace_dump();
}
773
774 kptl_tx_t *
775 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
776 {
777         kptl_tx_t         *tx;
778         struct list_head  *ele;
779
780         list_for_each(ele, &peer->peer_sendq) {
781                 tx = list_entry(ele, kptl_tx_t, tx_list);
782
783                 if (time_after_eq(jiffies, tx->tx_deadline)) {
784                         kptllnd_tx_addref(tx);
785                         return tx;
786                 }
787         }
788
789         list_for_each(ele, &peer->peer_activeq) {
790                 tx = list_entry(ele, kptl_tx_t, tx_list);
791
792                 if (time_after_eq(jiffies, tx->tx_deadline)) {
793                         kptllnd_tx_addref(tx);
794                         return tx;
795                 }
796         }
797
798         return NULL;
799 }
800
801
/* Check one hash bucket of the peer table for peers with timed-out
 * communications or a pending NOOP retry.  'stamp' identifies this pass:
 * the scan restarts from the bucket head every time the bucket lock is
 * dropped, and peer_check_stamp ensures each peer is examined at most
 * once per pass. */
void
kptllnd_peer_check_bucket (int idx, int stamp)
{
        struct list_head  *peers = &kptllnd_data.kptl_peers[idx];
        struct list_head  *ptmp;
        kptl_peer_t       *peer;
        kptl_tx_t         *tx;
        unsigned long      flags;
        int                nsend;
        int                nactive;
        int                check_sends;

        CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);

 again:
        /* NB. Shared lock while I just look */
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kptl_peer_t, peer_list);

                CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits, 
                       peer->peer_outstanding_credits, peer->peer_sent_credits);

                /* NB plain spin_lock: IRQs already disabled by the
                 * read_lock_irqsave above */
                spin_lock(&peer->peer_lock);

                if (peer->peer_check_stamp == stamp) {
                        /* checked already this pass */
                        spin_unlock(&peer->peer_lock);
                        continue;
                }

                peer->peer_check_stamp = stamp;
                /* takes a ref on any timed-out tx it returns */
                tx = kptllnd_find_timed_out_tx(peer);
                check_sends = peer->peer_retry_noop;
                
                spin_unlock(&peer->peer_lock);
                
                if (tx == NULL && !check_sends)
                        continue;

                kptllnd_peer_addref(peer); /* 1 ref for me... */

                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                if (tx == NULL) { /* nothing timed out */
                        kptllnd_peer_check_sends(peer);
                        kptllnd_peer_decref(peer); /* ...until here or... */

                        /* rescan after dropping the lock */
                        goto again;
                }

                /* snapshot queue depths for the diagnostics below */
                spin_lock_irqsave(&peer->peer_lock, flags);
                nsend = kptllnd_count_queue(&peer->peer_sendq);
                nactive = kptllnd_count_queue(&peer->peer_activeq);
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                /* tx_tposted == 0 means the tx never made it onto the wire
                 * (starved of peer buffers); != 0 means it was posted but
                 * never completed */
                LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
                                   libcfs_id2str(peer->peer_id),
                                   (tx->tx_tposted == 0) ? 
                                   "no free peer buffers" : 
                                   "please check Portals");

                if (tx->tx_tposted) {
                        CERROR("Could not send to %s after %ds (sent %lds ago); "
                                "check Portals for possible issues\n",
                                libcfs_id2str(peer->peer_id),
                                *kptllnd_tunables.kptl_timeout,
                                cfs_duration_sec(jiffies - tx->tx_tposted));
                } else {
                        CERROR("Could not get credits for %s after %ds; "
                                "possible Lustre networking issues\n",
                        libcfs_id2str(peer->peer_id),
                        *kptllnd_tunables.kptl_timeout);
                }

                CERROR("%s timed out: cred %d outstanding %d, sent %d, "
                       "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d "
                       "%sposted %lu T/O %ds\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits,
                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                       nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type),
                       tx->tx_active ? "A" : "",
                       PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
                       "" : "M",
                       PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
                       "" : "D",
                       tx->tx_status,
                       (tx->tx_tposted == 0) ? "not " : "",
                       (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
                       *kptllnd_tunables.kptl_timeout);

#ifdef CRAY_XT3
                if (*kptllnd_tunables.kptl_ptltrace_on_timeout)
                        kptllnd_dump_ptltrace();
#endif

                /* drop the ref kptllnd_find_timed_out_tx took */
                kptllnd_tx_decref(tx);

                kptllnd_peer_close(peer, -ETIMEDOUT);
                kptllnd_peer_decref(peer); /* ...until here */

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}
912
913 kptl_peer_t *
914 kptllnd_id2peer_locked (lnet_process_id_t id)
915 {
916         struct list_head *peers = kptllnd_nid2peerlist(id.nid);
917         struct list_head *tmp;
918         kptl_peer_t      *peer;
919
920         list_for_each (tmp, peers) {
921
922                 peer = list_entry (tmp, kptl_peer_t, peer_list);
923
924                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
925                         peer->peer_state == PEER_STATE_ACTIVE);
926                 
927                 if (peer->peer_id.nid != id.nid ||
928                     peer->peer_id.pid != id.pid)
929                         continue;
930
931                 kptllnd_peer_addref(peer);
932
933                 CDEBUG(D_NET, "%s -> %s (%d)\n",
934                        libcfs_id2str(id), 
935                        kptllnd_ptlid2str(peer->peer_ptlid),
936                        atomic_read (&peer->peer_refcount));
937                 return peer;
938         }
939
940         return NULL;
941 }
942
943 void
944 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
945 {
946         LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
947                            "messages may be dropped\n",
948                            str, libcfs_id2str(id),
949                            kptllnd_data.kptl_n_active_peers);
950         LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
951                            "'max_nodes' or 'max_procs_per_node'\n");
952 }
953
954 __u64
955 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
956 {
957         kptl_peer_t            *peer;
958         struct list_head       *tmp;
959
960         /* Find the last matchbits I saw this new peer using.  Note..
961            A. This peer cannot be in the peer table - she's new!
962            B. If I can't find the peer in the closing/zombie peers, all
963               matchbits are safe because all refs to the (old) peer have gone
964               so all txs have completed so there's no risk of matchbit
965               collision!
966          */
967
968         LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
969
970         /* peer's last matchbits can't change after it comes out of the peer
971          * table, so first match is fine */
972
973         list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
974                 peer = list_entry (tmp, kptl_peer_t, peer_list);
975
976                 if (peer->peer_id.nid == lpid.nid &&
977                     peer->peer_id.pid == lpid.pid)
978                         return peer->peer_last_matchbits_seen;
979         }
980         
981         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
982                 peer = list_entry (tmp, kptl_peer_t, peer_list);
983
984                 if (peer->peer_id.nid == lpid.nid &&
985                     peer->peer_id.pid == lpid.pid)
986                         return peer->peer_last_matchbits_seen;
987         }
988         
989         return PTL_RESERVED_MATCHBITS;
990 }
991
/* Handle an incoming HELLO message from 'initiator'.  Validates the
 * message, completes a pending handshake on an existing peer where
 * possible, or else allocates a new active peer and posts a HELLO
 * response.  Returns the peer with a reference added for the caller,
 * or NULL on any failure (message rejected, allocation failure,
 * shutdown). */
kptl_peer_t *
kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
                           kptl_msg_t       *msg)
{
        rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        kptl_peer_t        *peer;
        kptl_peer_t        *new_peer;
        lnet_process_id_t   lpid;
        unsigned long       flags;
        kptl_tx_t          *hello_tx;
        int                 rc;
        __u64               safe_matchbits;
        __u64               last_matchbits_seen;

        lpid.nid = msg->ptlm_srcnid;
        lpid.pid = msg->ptlm_srcpid;

        CDEBUG(D_NET, "hello from %s(%s)\n",
               libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));

        if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
            (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
                /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
                 * userspace.  Refuse the connection if she hasn't set the
                 * correct flag in her PID... */
                CERROR("Userflag not set in hello from %s (%s)\n",
                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
                return NULL;
        }
        
        /* kptlhm_matchbits are the highest matchbits my peer may have used to
         * RDMA to me.  I ensure I never register buffers for RDMA that could
         * match any she used */
        safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;

        if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
                CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
                       safe_matchbits, libcfs_id2str(lpid));
                return NULL;
        }
        
        if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
                CERROR("%s: max message size %d < MIN %d",
                       libcfs_id2str(lpid),
                       msg->ptlm_u.hello.kptlhm_max_msg_size,
                       PTLLND_MIN_BUFFER_SIZE);
                return NULL;
        }

        if (msg->ptlm_credits <= 1) {
                CERROR("Need more than 1+%d credits from %s\n",
                       msg->ptlm_credits, libcfs_id2str(lpid));
                return NULL;
        }
        
        write_lock_irqsave(g_lock, flags);

        peer = kptllnd_id2peer_locked(lpid);
        if (peer != NULL) {
                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        /* Completing HELLO handshake */
                        LASSERT(peer->peer_incarnation == 0);

                        /* dststamp 0 means the peer doesn't know my
                         * incarnation yet; otherwise it must match */
                        if (msg->ptlm_dststamp != 0 &&
                            msg->ptlm_dststamp != peer->peer_myincarnation) {
                                write_unlock_irqrestore(g_lock, flags);

                                CERROR("Ignoring HELLO from %s: unexpected "
                                       "dststamp "LPX64" ("LPX64" wanted)\n",
                                       libcfs_id2str(lpid),
                                       msg->ptlm_dststamp,
                                       peer->peer_myincarnation);
                                kptllnd_peer_decref(peer);
                                return NULL;
                        }
                        
                        /* Concurrent initiation or response to my HELLO */
                        peer->peer_state = PEER_STATE_ACTIVE;
                        peer->peer_incarnation = msg->ptlm_srcstamp;
                        peer->peer_next_matchbits = safe_matchbits;
                        peer->peer_max_msg_size =
                                msg->ptlm_u.hello.kptlhm_max_msg_size;
                        
                        /* NB return with the id2peer_locked ref still held
                         * for the caller */
                        write_unlock_irqrestore(g_lock, flags);
                        return peer;
                }

                if (msg->ptlm_dststamp != 0 &&
                    msg->ptlm_dststamp <= peer->peer_myincarnation) {
                        write_unlock_irqrestore(g_lock, flags);

                        CERROR("Ignoring stale HELLO from %s: "
                               "dststamp "LPX64" (current "LPX64")\n",
                               libcfs_id2str(lpid),
                               msg->ptlm_dststamp,
                               peer->peer_myincarnation);
                        kptllnd_peer_decref(peer);
                        return NULL;
                }

                /* Brand new connection attempt: remove old incarnation */
                kptllnd_peer_close_locked(peer, 0);
        }

        kptllnd_cull_peertable_locked(lpid);

        write_unlock_irqrestore(g_lock, flags);

        if (peer != NULL) {
                CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
                       " stamp "LPX64"("LPX64")\n",
                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
                       msg->ptlm_srcstamp, peer->peer_incarnation);

                /* drop the ref on the old incarnation just closed */
                kptllnd_peer_decref(peer);
        }

        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate HELLO message for %s\n",
                       libcfs_id2str(lpid));
                return NULL;
        }

        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(lpid, initiator);
        if (new_peer == NULL) {
                kptllnd_tx_decref(hello_tx);
                return NULL;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0) {
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);

                CERROR("Failed to reserve buffers for %s\n",
                       libcfs_id2str(lpid));
                return NULL;
        }

        write_lock_irqsave(g_lock, flags);

 again:
        if (kptllnd_data.kptl_shutdown) {
                write_unlock_irqrestore(g_lock, flags);

                CERROR ("Shutdown started, refusing connection from %s\n",
                        libcfs_id2str(lpid));
                kptllnd_peer_unreserve_buffers();
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);
                return NULL;
        }

        /* Re-check: the table may have changed while the lock was dropped
         * above */
        peer = kptllnd_id2peer_locked(lpid);
        if (peer != NULL) {
                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        /* An outgoing message instantiated 'peer' for me */
                        LASSERT(peer->peer_incarnation == 0);

                        peer->peer_state = PEER_STATE_ACTIVE;
                        peer->peer_incarnation = msg->ptlm_srcstamp;
                        peer->peer_next_matchbits = safe_matchbits;
                        peer->peer_max_msg_size =
                                msg->ptlm_u.hello.kptlhm_max_msg_size;

                        write_unlock_irqrestore(g_lock, flags);

                        CWARN("Outgoing instantiated peer %s\n",
                              libcfs_id2str(lpid));
                } else {
                        LASSERT (peer->peer_state == PEER_STATE_ACTIVE);

                        write_unlock_irqrestore(g_lock, flags);

                        /* WOW!  Somehow this peer completed the HELLO
                         * handshake while I slept.  I guess I could have slept
                         * while it rebooted and sent a new HELLO, so I'll fail
                         * this one... */
                        CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
                        kptllnd_peer_decref(peer);
                        peer = NULL;
                }

                /* new_peer and hello_tx are no longer needed either way */
                kptllnd_peer_unreserve_buffers();
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);
                return peer;
        }

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection from ", lpid);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Refusing connection from %s\n",
                               libcfs_id2str(lpid));
                        kptllnd_peer_unreserve_buffers();
                        kptllnd_peer_decref(new_peer);
                        kptllnd_tx_decref(hello_tx);
                        return NULL;
                }
                
                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
                goto again;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);

        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;

        new_peer->peer_state = PEER_STATE_ACTIVE;
        new_peer->peer_incarnation = msg->ptlm_srcstamp;
        new_peer->peer_next_matchbits = safe_matchbits;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;
        new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;

        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        kptllnd_post_tx(new_peer, hello_tx, 0);
        kptllnd_peer_check_sends(new_peer);

        return new_peer;
}
1234
/* Queue 'tx' (with 'nfrag' payload fragments) on 'peer' and kick the
 * sender so it goes on the wire as soon as credits allow. */
void
kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
        kptllnd_post_tx(peer, tx, nfrag);
        kptllnd_peer_check_sends(peer);
}
1241
/* Find the peer for 'target', creating it and initiating the HELLO
 * handshake if it doesn't exist yet.  On success returns 0 with *peerp
 * set to the peer (reference added for the caller); on failure returns a
 * negative errno.  Resources acquired along the way are released via the
 * goto-unwind labels at the bottom. */
int
kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        ptl_process_id_t  ptl_id;
        kptl_peer_t      *new_peer;
        kptl_tx_t        *hello_tx;
        unsigned long     flags;
        int               rc;
        __u64             last_matchbits_seen;

        /* I expect to find the peer, so I only take a read lock... */
        read_lock_irqsave(g_lock, flags);
        *peerp = kptllnd_id2peer_locked(target);
        read_unlock_irqrestore(g_lock, flags);

        if (*peerp != NULL)
                return 0;
        
        if ((target.pid & LNET_PID_USERFLAG) != 0) {
                /* connections are only ever initiated to kernel peers */
                CWARN("Refusing to create a new connection to %s "
                      "(non-kernel peer)\n", libcfs_id2str(target));
                return -EHOSTUNREACH;
        }

        /* The new peer is a kernel ptllnd, and kernel ptllnds all have
         * the same portals PID */
        ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
        ptl_id.pid = kptllnd_data.kptl_portals_id.pid;

        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate connect message for %s\n",
                       libcfs_id2str(target));
                return -ENOMEM;
        }

        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(target, ptl_id);
        if (new_peer == NULL) {
                rc = -ENOMEM;
                goto unwind_0;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0)
                goto unwind_1;

        write_lock_irqsave(g_lock, flags);
 again:
        if (kptllnd_data.kptl_shutdown) {
                write_unlock_irqrestore(g_lock, flags);
                rc = -ESHUTDOWN;
                goto unwind_2;
        }

        /* Re-check under the write lock: someone may have created the peer
         * while no lock was held */
        *peerp = kptllnd_id2peer_locked(target);
        if (*peerp != NULL) {
                /* NB rc is 0 here; return the existing peer */
                write_unlock_irqrestore(g_lock, flags);
                goto unwind_2;
        }

        kptllnd_cull_peertable_locked(target);

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection to ", target);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Can't create connection to %s\n",
                               libcfs_id2str(target));
                        rc = -ENOMEM;
                        goto unwind_2;
                }
                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
                goto again;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);

        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;
                
        /* stays WAITING_HELLO until the peer's HELLO arrives */
        new_peer->peer_state = PEER_STATE_WAITING_HELLO;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;
        
        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        kptllnd_post_tx(new_peer, hello_tx, 0);
        kptllnd_peer_check_sends(new_peer);
       
        *peerp = new_peer;
        return 0;
        
 unwind_2:
        kptllnd_peer_unreserve_buffers();
 unwind_1:
        kptllnd_peer_decref(new_peer);
 unwind_0:
        kptllnd_tx_decref(hello_tx);

        return rc;
}