1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/ptllnd/ptllnd_peer.c
37  *
38  * Author: PJ Kirner <pjkirner@clusterfs.com>
39  * Author: E Barton <eeb@bartonsoftware.com>
40  */
41
42 #include "ptllnd.h"
43 #include <libcfs/list.h>
44
45 static int
46 kptllnd_count_queue(struct list_head *q)
47 {
48         struct list_head *e;
49         int               n = 0;
50         
51         list_for_each(e, q) {
52                 n++;
53         }
54
55         return n;
56 }
57
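/* Snapshot the state of the index'th peer in the hash table (identity,
 * handshake state, credits and queue depths) under the peer table read
 * lock.  Returns -ENOENT when 'index' is beyond the last peer. */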
58 int
59 kptllnd_get_peer_info(int index, 
60                       lnet_process_id_t *id,
61                       int *state, int *sent_hello,
62                       int *refcount, __u64 *incarnation,
63                       __u64 *next_matchbits, __u64 *last_matchbits_seen,
64                       int *nsendq, int *nactiveq,
65                       int *credits, int *outstanding_credits) 
66 {
67         rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
68         unsigned long     flags;
69         struct list_head *ptmp;
70         kptl_peer_t      *peer;
71         int               i;
72         int               rc = -ENOENT;
73
74         read_lock_irqsave(g_lock, flags);
75
76         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
77                 
78                 list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
79                         peer = list_entry(ptmp, kptl_peer_t, peer_list);
80
81                         if (index-- > 0)
82                                 continue;
83                         
84                         *id          = peer->peer_id;
85                         *state       = peer->peer_state;
86                         *sent_hello  = peer->peer_sent_hello;
87                         *refcount    = atomic_read(&peer->peer_refcount);
88                         *incarnation = peer->peer_incarnation;
89
90                         spin_lock(&peer->peer_lock);
91
92                         *next_matchbits      = peer->peer_next_matchbits;
93                         *last_matchbits_seen = peer->peer_last_matchbits_seen;
94                         *credits             = peer->peer_credits;
95                         *outstanding_credits = peer->peer_outstanding_credits;
96
97                         *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
98                         *nactiveq = kptllnd_count_queue(&peer->peer_activeq);
99
100                         spin_unlock(&peer->peer_lock);
101
102                         rc = 0;
103                         goto out;
104                 }
105         }
106         
107  out:
108         read_unlock_irqrestore(g_lock, flags);
109         return rc;
110 }
111
112 void
113 kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
114 {
115         LASSERT (!kptllnd_data.kptl_shutdown);
116         LASSERT (kptllnd_data.kptl_n_active_peers <
117                  kptllnd_data.kptl_expected_peers);
118
119         LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
120                  peer->peer_state == PEER_STATE_ACTIVE);
121         
122         kptllnd_data.kptl_n_active_peers++;
123         atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */
124
125         /* NB add to HEAD of peer list for MRU order!
126          * (see kptllnd_cull_peertable) */
127         list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
128 }
129
130 void
131 kptllnd_cull_peertable_locked (lnet_process_id_t pid)
132 {
133         /* I'm about to add a new peer with this portals ID to the peer table,
134          * so (a) this peer should not exist already and (b) I want to leave at
135          * most (max_procs_per_nid - 1) peers with this NID in the table. */
136         struct list_head  *peers = kptllnd_nid2peerlist(pid.nid);
137         int                cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
138         int                count;
139         struct list_head  *tmp;
140         struct list_head  *nxt;
141         kptl_peer_t       *peer;
142         
143         count = 0;
144         list_for_each_safe (tmp, nxt, peers) {
145                 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
146                  * in MRU order */
147                 peer = list_entry(tmp, kptl_peer_t, peer_list);
148                         
149                 if (peer->peer_id.nid != pid.nid)
150                         continue;
151
152                 LASSERT (peer->peer_id.pid != pid.pid);
153                         
154                 count++;
155
156                 if (count < cull_count) /* recent (don't cull) */
157                         continue;
158
159                 CDEBUG(D_NET, "Cull %s(%s)\n",
160                        libcfs_id2str(peer->peer_id),
161                        kptllnd_ptlid2str(peer->peer_ptlid));
162                 
163                 kptllnd_peer_close_locked(peer, 0);
164         }
165 }
166
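/* Allocate a peer in the ALLOCATED state with one credit reserved for the
 * HELLO exchange.  The global peer count is only bumped while holding the
 * write lock, so it cannot grow once shutdown has started; returns NULL on
 * allocation failure or if shutdown is in progress. */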
167 kptl_peer_t *
168 kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
169 {
170         unsigned long    flags;
171         kptl_peer_t     *peer;
172
173         LIBCFS_ALLOC(peer, sizeof (*peer));
174         if (peer == NULL) {
175                 CERROR("Can't create peer %s (%s)\n",
176                        libcfs_id2str(lpid), 
177                        kptllnd_ptlid2str(ppid));
178                 return NULL;
179         }
180
181         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
182
183         INIT_LIST_HEAD (&peer->peer_noops);
184         INIT_LIST_HEAD (&peer->peer_sendq);
185         INIT_LIST_HEAD (&peer->peer_activeq);
186         spin_lock_init (&peer->peer_lock);
187
188         peer->peer_state = PEER_STATE_ALLOCATED;
189         peer->peer_error = 0;
190         peer->peer_last_alive = 0;
191         peer->peer_id = lpid;
192         peer->peer_ptlid = ppid;
193         peer->peer_credits = 1;                 /* enough for HELLO */
194         peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
195         peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peertxcredits - 1;
196         peer->peer_sent_credits = 1;           /* HELLO credit is implicit */
197         peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */
198
199         atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */
200
201         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
202
203         peer->peer_myincarnation = kptllnd_data.kptl_incarnation;
204
205         /* Only increase # peers under lock, to guarantee we don't grow it
206          * during shutdown */
207         if (kptllnd_data.kptl_shutdown) {
208                 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
209                                         flags);
210                 LIBCFS_FREE(peer, sizeof(*peer));
211                 return NULL;
212         }
213
214         kptllnd_data.kptl_npeers++;
215         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
216         
217         return peer;
218 }
219
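/* Final teardown, called when the last reference is dropped: all queues must
 * already be empty.  A zombie peer is unlinked from the zombie list and the
 * global peer count is decremented before the structure is freed. */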
220 void
221 kptllnd_peer_destroy (kptl_peer_t *peer)
222 {
223         unsigned long flags;
224         
225         CDEBUG(D_NET, "Peer=%p\n", peer);
226
227         LASSERT (!in_interrupt());
228         LASSERT (atomic_read(&peer->peer_refcount) == 0);
229         LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
230                  peer->peer_state == PEER_STATE_ZOMBIE);
231         LASSERT (list_empty(&peer->peer_noops));
232         LASSERT (list_empty(&peer->peer_sendq));
233         LASSERT (list_empty(&peer->peer_activeq));
234
235         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
236
237         if (peer->peer_state == PEER_STATE_ZOMBIE)
238                 list_del(&peer->peer_list);
239
240         kptllnd_data.kptl_npeers--;
241
242         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
243
244         LIBCFS_FREE (peer, sizeof (*peer));
245 }
246
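/* Move every tx on 'peerq' onto 'txs', marking each one failed (-EIO) and
 * inactive so the caller can complete them outside the peer lock. */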
247 void
248 kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs)
249 {
250         struct list_head  *tmp;
251         struct list_head  *nxt;
252         kptl_tx_t         *tx;
253
254         list_for_each_safe (tmp, nxt, peerq) {
255                 tx = list_entry(tmp, kptl_tx_t, tx_list);
256
257                 list_del(&tx->tx_list);
258                 list_add_tail(&tx->tx_list, txs);
259
260                 tx->tx_status = -EIO;
261                 tx->tx_active = 0;
262         }
263 }
264
265 void
266 kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs)
267 {
268         unsigned long   flags;
269
270         spin_lock_irqsave(&peer->peer_lock, flags);
271
272         kptllnd_cancel_txlist(&peer->peer_noops, txs);
273         kptllnd_cancel_txlist(&peer->peer_sendq, txs);
274         kptllnd_cancel_txlist(&peer->peer_activeq, txs);
275                 
276         spin_unlock_irqrestore(&peer->peer_lock, flags);
277 }
278
279 void
280 kptllnd_peer_alive (kptl_peer_t *peer)
281 {
282         /* This is racy, but everyone's only writing cfs_time_current() */
283         peer->peer_last_alive = cfs_time_current();
284         mb();
285 }
286
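/* If an error has been stashed on the peer, clear it and tell LNET the peer
 * is down, along with an estimate (in seconds) of when it was last alive. */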
287 void
288 kptllnd_peer_notify (kptl_peer_t *peer)
289 {
290         unsigned long flags;
291         time_t        last_alive = 0;
292         int           error = 0;
293         
294         spin_lock_irqsave(&peer->peer_lock, flags);
295
296         if (peer->peer_error != 0) {
297                 error = peer->peer_error;
298                 peer->peer_error = 0;
299                 
300                 last_alive = cfs_time_current_sec() - 
301                              cfs_duration_sec(cfs_time_current() - 
302                                               peer->peer_last_alive);
303         }
304         
305         spin_unlock_irqrestore(&peer->peer_lock, flags);
306
307         if (error != 0)
308                 lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
309                              last_alive);
310 }
311
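/* Reap peers that are being closed: cancel txs on zombie peers, move newly
 * closed peers onto the zombie list (notifying LNET and dropping the peer
 * table's reference), then release every cancelled tx. */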
312 void
313 kptllnd_handle_closing_peers ()
314 {
315         unsigned long           flags;
316         struct list_head        txs;
317         kptl_peer_t            *peer;
318         struct list_head       *tmp;
319         struct list_head       *nxt;
320         kptl_tx_t              *tx;
321         int                     idle;
322
323         /* Check with a read lock first to avoid blocking anyone */
324
325         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
326         idle = list_empty(&kptllnd_data.kptl_closing_peers) &&
327                list_empty(&kptllnd_data.kptl_zombie_peers);
328         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
329
330         if (idle)
331                 return;
332
333         INIT_LIST_HEAD(&txs);
334
335         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
336
337         /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
338          * ref removes it from this list, so I mustn't drop the lock while
339          * scanning it. */
340         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
341                 peer = list_entry (tmp, kptl_peer_t, peer_list);
342
343                 LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);
344
345                 kptllnd_peer_cancel_txs(peer, &txs);
346         }
347
348         /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
349          * I'm the only one removing from this list, but peers can be added on
350          * the end any time I drop the lock. */
351
352         list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
353                 peer = list_entry (tmp, kptl_peer_t, peer_list);
354
355                 LASSERT (peer->peer_state == PEER_STATE_CLOSING);
356
357                 list_del(&peer->peer_list);
358                 list_add_tail(&peer->peer_list,
359                               &kptllnd_data.kptl_zombie_peers);
360                 peer->peer_state = PEER_STATE_ZOMBIE;
361
362                 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
363
364                 kptllnd_peer_notify(peer);
365                 kptllnd_peer_cancel_txs(peer, &txs);
366                 kptllnd_peer_decref(peer);
367
368                 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
369         }
370
371         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
372
373         /* Drop peer's ref on all cancelled txs.  This will get
374          * kptllnd_tx_fini() to abort outstanding comms if necessary. */
375
376         list_for_each_safe (tmp, nxt, &txs) {
377                 tx = list_entry(tmp, kptl_tx_t, tx_list);
378                 list_del(&tx->tx_list);
379                 kptllnd_tx_decref(tx);
380         }
381 }
382
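/* Start closing a peer: remove it from the peer table, stash 'why' for the
 * LNET notification, bump the local incarnation so a reconnecting peer gets
 * a fresh handshake, and queue the peer for the watchdog thread.  Does
 * nothing if the peer is already closing or a zombie. */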
383 void
384 kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
385 {
386         switch (peer->peer_state) {
387         default:
388                 LBUG();
389
390         case PEER_STATE_WAITING_HELLO:
391         case PEER_STATE_ACTIVE:
392                 /* Ensure new peers see a new incarnation of me */
393                 LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
394                 if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
395                         kptllnd_data.kptl_incarnation++;
396
397                 /* Removing from peer table */
398                 kptllnd_data.kptl_n_active_peers--;
399                 LASSERT (kptllnd_data.kptl_n_active_peers >= 0);
400
401                 list_del(&peer->peer_list);
402                 kptllnd_peer_unreserve_buffers();
403
404                 peer->peer_error = why; /* stash 'why' only on first close */
405                 peer->peer_state = PEER_STATE_CLOSING;
406
407                 /* Schedule for immediate attention, taking peer table's ref */
408                 list_add_tail(&peer->peer_list, 
409                               &kptllnd_data.kptl_closing_peers);
410                 wake_up(&kptllnd_data.kptl_watchdog_waitq);
411                 break;
412
413         case PEER_STATE_ZOMBIE:
414         case PEER_STATE_CLOSING:
415                 break;
416         }
417 }
418
419 void
420 kptllnd_peer_close(kptl_peer_t *peer, int why)
421 {
422         unsigned long      flags;
423
424         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
425         kptllnd_peer_close_locked(peer, why);
426         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
427 }
428
429 int
430 kptllnd_peer_del(lnet_process_id_t id)
431 {
432         struct list_head  *ptmp;
433         struct list_head  *pnxt;
434         kptl_peer_t       *peer;
435         int                lo;
436         int                hi;
437         int                i;
438         unsigned long      flags;
439         int                rc = -ENOENT;
440
441         /*
442          * Find the single bucket we are supposed to look at or if nid is a
443          * wildcard (LNET_NID_ANY) then look at all of the buckets
444          */
445         if (id.nid != LNET_NID_ANY) {
446                 struct list_head *l = kptllnd_nid2peerlist(id.nid);
447                 
448                 lo = hi =  l - kptllnd_data.kptl_peers;
449         } else {
450                 if (id.pid != LNET_PID_ANY)
451                         return -EINVAL;
452                 
453                 lo = 0;
454                 hi = kptllnd_data.kptl_peer_hash_size - 1;
455         }
456
457 again:
458         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
459
460         for (i = lo; i <= hi; i++) {
461                 list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
462                         peer = list_entry (ptmp, kptl_peer_t, peer_list);
463
464                         if (!(id.nid == LNET_NID_ANY || 
465                               (peer->peer_id.nid == id.nid &&
466                                (id.pid == LNET_PID_ANY || 
467                                 peer->peer_id.pid == id.pid))))
468                                 continue;
469
470                         kptllnd_peer_addref(peer); /* 1 ref for me... */
471
472                         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
473                                                flags);
474
475                         kptllnd_peer_close(peer, 0);
476                         kptllnd_peer_decref(peer); /* ...until here */
477
478                         rc = 0;         /* matched something */
479
480                         /* start again now I've dropped the lock */
481                         goto again;
482                 }
483         }
484
485         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
486
487         return (rc);
488 }
489
490 void
491 kptllnd_queue_tx(kptl_peer_t *peer, kptl_tx_t *tx)
492 {
493         /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
494         unsigned long flags;
495
496         spin_lock_irqsave(&peer->peer_lock, flags);
497
498         /* Ensure HELLO is sent first */
499         if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP)
500                 list_add(&tx->tx_list, &peer->peer_noops);
501         else if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
502                 list_add(&tx->tx_list, &peer->peer_sendq);
503         else
504                 list_add_tail(&tx->tx_list, &peer->peer_sendq);
505
506         spin_unlock_irqrestore(&peer->peer_lock, flags);
507 }
508
509
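/* Bind a memory descriptor over the message (a single buffer, or an IOV when
 * nfrag != 0), set the tx deadline and queue it on the peer.  The actual
 * PtlPut happens later in kptllnd_peer_check_sends(); an MD bind failure
 * completes the tx with -EIO. */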
510 void
511 kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
512 {
513         /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
514         ptl_handle_md_t  msg_mdh;
515         ptl_md_t         md;
516         ptl_err_t        prc;
517
518         LASSERT (!tx->tx_idle);
519         LASSERT (!tx->tx_active);
520         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
521         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
522         LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
523                  tx->tx_type == TX_TYPE_PUT_REQUEST ||
524                  tx->tx_type == TX_TYPE_GET_REQUEST);
525
526         kptllnd_set_tx_peer(tx, peer);
527
528         memset(&md, 0, sizeof(md));
529
530         md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
531         md.options = PTL_MD_OP_PUT |
532                      PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
533                      PTL_MD_EVENT_START_DISABLE;
534         md.user_ptr = &tx->tx_msg_eventarg;
535         md.eq_handle = kptllnd_data.kptl_eqh;
536
537         if (nfrag == 0) {
538                 md.start = tx->tx_msg;
539                 md.length = tx->tx_msg->ptlm_nob;
540         } else {
541                 LASSERT (nfrag > 1);
542                 LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);
543
544                 md.start = tx->tx_frags;
545                 md.length = nfrag;
546                 md.options |= PTL_MD_IOVEC;
547         }
548
549         prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
550         if (prc != PTL_OK) {
551                 CERROR("PtlMDBind(%s) failed: %s(%d)\n",
552                        libcfs_id2str(peer->peer_id),
553                        kptllnd_errtype2str(prc), prc);
554                 tx->tx_status = -EIO;
555                 kptllnd_tx_decref(tx);
556                 return;
557         }
558
559
560         tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
561         tx->tx_active = 1;
562         tx->tx_msg_mdh = msg_mdh;
563         kptllnd_queue_tx(peer, tx);
564 }
565
566 /* NB "restarts" comes from peer_sendq of a single peer */
567 void
568 kptllnd_restart_txs (lnet_process_id_t target, struct list_head *restarts)
569 {
570         kptl_tx_t   *tx;
571         kptl_tx_t   *tmp;
572         kptl_peer_t *peer;
573
574         LASSERT (!list_empty(restarts));
575
576         if (kptllnd_find_target(&peer, target) != 0)
577                 peer = NULL;
578
579         list_for_each_entry_safe (tx, tmp, restarts, tx_list) {
580                 LASSERT (tx->tx_peer != NULL);
581                 LASSERT (tx->tx_type == TX_TYPE_GET_REQUEST ||
582                          tx->tx_type == TX_TYPE_PUT_REQUEST ||
583                          tx->tx_type == TX_TYPE_SMALL_MESSAGE);
584
585                 list_del_init(&tx->tx_list);
586
587                 if (peer == NULL ||
588                     tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
589                         kptllnd_tx_decref(tx);
590                         continue;
591                 }
592
593                 LASSERT (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_NOOP);
594                 tx->tx_status = 0;
595                 tx->tx_active = 1;
596                 kptllnd_peer_decref(tx->tx_peer);
597                 tx->tx_peer = NULL;
598                 kptllnd_set_tx_peer(tx, peer);
599                 kptllnd_queue_tx(peer, tx); /* takes over my ref on tx */
600         }
601
602         if (peer == NULL)
603                 return;
604
605         kptllnd_peer_check_sends(peer);
606         kptllnd_peer_decref(peer);
607 }
608
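/* Decide whether to post a NOOP just to return credits: only once HELLO has
 * been sent, when no NOOP is queued already, when the outstanding credits
 * have reached the high-water mark, and when there is either nothing queued
 * to piggyback them on or only the reserved last credit remains. */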
609 static inline int
610 kptllnd_peer_send_noop (kptl_peer_t *peer)
611 {
612         if (!peer->peer_sent_hello ||
613             peer->peer_credits == 0 ||
614             !list_empty(&peer->peer_noops) ||
615             peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)
616                 return 0;
617
618         /* No tx to piggyback NOOP onto or no credit to send a tx */
619         return (list_empty(&peer->peer_sendq) || peer->peer_credits == 1);
620 }
621
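/* The send path: drain the peer's NOOP and send queues while credits allow.
 * A NOOP is posted first if credits need returning, HELLO always goes out
 * before anything else, the last credit is reserved for NOOP/HELLO, and
 * PUT/GET requests get an ME/MD attached with fresh matchbits before the
 * message is PtlPut.  Any Portals failure closes the peer. */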
622 void
623 kptllnd_peer_check_sends (kptl_peer_t *peer)
624 {
625         ptl_handle_me_t  meh;
626         kptl_tx_t       *tx;
627         int              rc;
628         int              msg_type;
629         unsigned long    flags;
630
631         LASSERT(!in_interrupt());
632
633         spin_lock_irqsave(&peer->peer_lock, flags);
634
635         peer->peer_retry_noop = 0;
636
637         if (kptllnd_peer_send_noop(peer)) {
638                 /* post a NOOP to return credits */
639                 spin_unlock_irqrestore(&peer->peer_lock, flags);
640
641                 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
642                 if (tx == NULL) {
643                         CERROR("Can't return credits to %s: can't allocate descriptor\n",
644                                libcfs_id2str(peer->peer_id));
645                 } else {
646                         kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
647                         kptllnd_post_tx(peer, tx, 0);
648                 }
649
650                 spin_lock_irqsave(&peer->peer_lock, flags);
651                 peer->peer_retry_noop = (tx == NULL);
652         }
653
654         for (;;) {
655                 if (!list_empty(&peer->peer_noops)) {
656                         LASSERT (peer->peer_sent_hello);
657                         tx = list_entry(peer->peer_noops.next,
658                                         kptl_tx_t, tx_list);
659                 } else if (!list_empty(&peer->peer_sendq)) {
660                         tx = list_entry(peer->peer_sendq.next,
661                                         kptl_tx_t, tx_list);
662                 } else {
663                         /* nothing to send right now */
664                         break;
665                 }
666
667                 LASSERT (tx->tx_active);
668                 LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
669                 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
670
671                 LASSERT (peer->peer_outstanding_credits >= 0);
672                 LASSERT (peer->peer_sent_credits >= 0);
673                 LASSERT (peer->peer_sent_credits +
674                          peer->peer_outstanding_credits <=
675                          *kptllnd_tunables.kptl_peertxcredits);
676                 LASSERT (peer->peer_credits >= 0);
677
678                 msg_type = tx->tx_msg->ptlm_type;
679
680                 /* Ensure HELLO is sent first */
681                 if (!peer->peer_sent_hello) {
682                         LASSERT (list_empty(&peer->peer_noops));
683                         if (msg_type != PTLLND_MSG_TYPE_HELLO)
684                                 break;
685                         peer->peer_sent_hello = 1;
686                 }
687
688                 if (peer->peer_credits == 0) {
689                         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %s[%p]\n",
690                                libcfs_id2str(peer->peer_id), 
691                                peer->peer_credits,
692                                peer->peer_outstanding_credits, 
693                                peer->peer_sent_credits, 
694                                kptllnd_msgtype2str(msg_type), tx);
695                         break;
696                 }
697
698                 /* Last/Initial credit reserved for NOOP/HELLO */
699                 if (peer->peer_credits == 1 &&
700                     msg_type != PTLLND_MSG_TYPE_HELLO &&
701                     msg_type != PTLLND_MSG_TYPE_NOOP) {
702                         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
703                                "not using last credit for %s[%p]\n",
704                                libcfs_id2str(peer->peer_id), 
705                                peer->peer_credits,
706                                peer->peer_outstanding_credits,
707                                peer->peer_sent_credits,
708                                kptllnd_msgtype2str(msg_type), tx);
709                         break;
710                 }
711
712                 list_del(&tx->tx_list);
713
714                 /* Discard any NOOP I queued if I'm not at the high-water mark
715                  * any more, or more messages have been queued */
716                 if (msg_type == PTLLND_MSG_TYPE_NOOP &&
717                     !kptllnd_peer_send_noop(peer)) {
718                         tx->tx_active = 0;
719
720                         spin_unlock_irqrestore(&peer->peer_lock, flags);
721
722                         CDEBUG(D_NET, "%s: redundant noop\n", 
723                                libcfs_id2str(peer->peer_id));
724                         kptllnd_tx_decref(tx);
725
726                         spin_lock_irqsave(&peer->peer_lock, flags);
727                         continue;
728                 }
729
730                 /* fill last-minute msg fields */
731                 kptllnd_msg_pack(tx->tx_msg, peer);
732
733                 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
734                     tx->tx_type == TX_TYPE_GET_REQUEST) {
735                         /* peer_next_matchbits must be known good */
736                         LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
737                         /* Assume 64-bit matchbits can't wrap */
738                         LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
739                         tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
740                                 peer->peer_next_matchbits++;
741                 }
742
743                 peer->peer_sent_credits += peer->peer_outstanding_credits;
744                 peer->peer_outstanding_credits = 0;
745                 peer->peer_credits--;
746
747                 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
748                        libcfs_id2str(peer->peer_id), peer->peer_credits,
749                        peer->peer_outstanding_credits, peer->peer_sent_credits,
750                        kptllnd_msgtype2str(msg_type), tx, tx->tx_msg->ptlm_nob,
751                        tx->tx_msg->ptlm_credits);
752
753                 list_add_tail(&tx->tx_list, &peer->peer_activeq);
754
755                 kptllnd_tx_addref(tx);          /* 1 ref for me... */
756
757                 spin_unlock_irqrestore(&peer->peer_lock, flags);
758
759                 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
760                     tx->tx_type == TX_TYPE_GET_REQUEST) {
761                         /* Post bulk now we have safe matchbits */
762                         rc = PtlMEAttach(kptllnd_data.kptl_nih,
763                                          *kptllnd_tunables.kptl_portal,
764                                          peer->peer_ptlid,
765                                          tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
766                                          0,             /* ignore bits */
767                                          PTL_UNLINK,
768                                          PTL_INS_BEFORE,
769                                          &meh);
770                         if (rc != PTL_OK) {
771                                 CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
772                                        libcfs_id2str(peer->peer_id),
773                                        kptllnd_errtype2str(rc), rc);
774                                 goto failed;
775                         }
776
777                         rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
778                                          &tx->tx_rdma_mdh);
779                         if (rc != PTL_OK) {
780                                 CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
781                                        libcfs_id2str(tx->tx_peer->peer_id),
782                                        kptllnd_errtype2str(rc), rc);
783                                 rc = PtlMEUnlink(meh);
784                                 LASSERT(rc == PTL_OK);
785                                 tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
786                                 goto failed;
787                         }
788                         /* I'm not racing with the event callback here.  It's a
789                          * bug if there's an event on the MD I just attached
790                          * before I actually send the RDMA request message -
791                          * probably matchbits re-used in error. */
792                 }
793
794                 tx->tx_tposted = jiffies;       /* going on the wire */
795
796                 rc = PtlPut (tx->tx_msg_mdh,
797                              tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
798                              peer->peer_ptlid,
799                              *kptllnd_tunables.kptl_portal,
800                              0,                 /* acl cookie */
801                              LNET_MSG_MATCHBITS,
802                              0,                 /* offset */
803                              0);                /* header data */
804                 if (rc != PTL_OK) {
805                         CERROR("PtlPut %s error %s(%d)\n",
806                                libcfs_id2str(peer->peer_id),
807                                kptllnd_errtype2str(rc), rc);
808                         goto failed;
809                 }
810
811                 kptllnd_tx_decref(tx);          /* drop my ref */
812
813                 spin_lock_irqsave(&peer->peer_lock, flags);
814         }
815
816         spin_unlock_irqrestore(&peer->peer_lock, flags);
817         return;
818
819  failed:
820         /* Nuke everything (including tx we were trying) */
821         kptllnd_peer_close(peer, -EIO);
822         kptllnd_tx_decref(tx);
823         kptllnd_schedule_ptltrace_dump();
824 }
825
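/* Scan the peer's send and active queues for the first tx past its deadline;
 * return it with an extra reference, or NULL if nothing has timed out. */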
826 kptl_tx_t *
827 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
828 {
829         kptl_tx_t         *tx;
830         struct list_head  *ele;
831
832         list_for_each(ele, &peer->peer_sendq) {
833                 tx = list_entry(ele, kptl_tx_t, tx_list);
834
835                 if (time_after_eq(jiffies, tx->tx_deadline)) {
836                         kptllnd_tx_addref(tx);
837                         return tx;
838                 }
839         }
840
841         list_for_each(ele, &peer->peer_activeq) {
842                 tx = list_entry(ele, kptl_tx_t, tx_list);
843
844                 if (time_after_eq(jiffies, tx->tx_deadline)) {
845                         kptllnd_tx_addref(tx);
846                         return tx;
847                 }
848         }
849
850         return NULL;
851 }
852
853
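/* One watchdog pass over hash bucket 'idx'.  'stamp' marks peers already
 * checked this pass.  Peers whose NOOP allocation failed earlier get their
 * sends retried; peers with a timed-out tx are logged and closed with
 * -ETIMEDOUT. */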
854 void
855 kptllnd_peer_check_bucket (int idx, int stamp)
856 {
857         struct list_head  *peers = &kptllnd_data.kptl_peers[idx];
858         struct list_head  *ptmp;
859         kptl_peer_t       *peer;
860         kptl_tx_t         *tx;
861         unsigned long      flags;
862         int                nsend;
863         int                nactive;
864         int                check_sends;
865
866         CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);
867
868  again:
869         /* NB. Shared lock while I just look */
870         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
871
872         list_for_each (ptmp, peers) {
873                 peer = list_entry (ptmp, kptl_peer_t, peer_list);
874
875                 CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
876                        libcfs_id2str(peer->peer_id), peer->peer_credits, 
877                        peer->peer_outstanding_credits, peer->peer_sent_credits);
878
879                 spin_lock(&peer->peer_lock);
880
881                 if (peer->peer_check_stamp == stamp) {
882                         /* checked already this pass */
883                         spin_unlock(&peer->peer_lock);
884                         continue;
885                 }
886
887                 peer->peer_check_stamp = stamp;
888                 tx = kptllnd_find_timed_out_tx(peer);
889                 check_sends = peer->peer_retry_noop;
890                 
891                 spin_unlock(&peer->peer_lock);
892                 
893                 if (tx == NULL && !check_sends)
894                         continue;
895
896                 kptllnd_peer_addref(peer); /* 1 ref for me... */
897
898                 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
899
900                 if (tx == NULL) { /* nothing timed out */
901                         kptllnd_peer_check_sends(peer);
902                         kptllnd_peer_decref(peer); /* ...until here or... */
903
904                         /* rescan after dropping the lock */
905                         goto again;
906                 }
907
908                 spin_lock_irqsave(&peer->peer_lock, flags);
909                 nsend = kptllnd_count_queue(&peer->peer_sendq);
910                 nactive = kptllnd_count_queue(&peer->peer_activeq);
911                 spin_unlock_irqrestore(&peer->peer_lock, flags);
912
913                 LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
914                                    libcfs_id2str(peer->peer_id),
915                                    (tx->tx_tposted == 0) ? 
916                                    "no free peer buffers" : 
917                                    "please check Portals");
918
919                 if (tx->tx_tposted) {
920                         CERROR("Could not send to %s after %ds (sent %lds ago); "
921                                 "check Portals for possible issues\n",
922                                 libcfs_id2str(peer->peer_id),
923                                 *kptllnd_tunables.kptl_timeout,
924                                 cfs_duration_sec(jiffies - tx->tx_tposted));
925                 } else {
926                         CERROR("Could not get credits for %s after %ds; "
927                                 "possible Lustre networking issues\n",
928                         libcfs_id2str(peer->peer_id),
929                         *kptllnd_tunables.kptl_timeout);
930                 }
931
932                 CERROR("%s timed out: cred %d outstanding %d, sent %d, "
933                        "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d "
934                        "%sposted %lu T/O %ds\n",
935                        libcfs_id2str(peer->peer_id), peer->peer_credits,
936                        peer->peer_outstanding_credits, peer->peer_sent_credits,
937                        nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type),
938                        tx->tx_active ? "A" : "",
939                        PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
940                        "" : "M",
941                        PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
942                        "" : "D",
943                        tx->tx_status,
944                        (tx->tx_tposted == 0) ? "not " : "",
945                        (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
946                        *kptllnd_tunables.kptl_timeout);
947
948 #ifdef CRAY_XT3
949                 if (*kptllnd_tunables.kptl_ptltrace_on_timeout)
950                         kptllnd_dump_ptltrace();
951 #endif
952
953                 kptllnd_tx_decref(tx);
954
955                 kptllnd_peer_close(peer, -ETIMEDOUT);
956                 kptllnd_peer_decref(peer); /* ...until here */
957
958                 /* start again now I've dropped the lock */
959                 goto again;
960         }
961
962         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
963 }
964
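/* Look up a peer by LNET process id; the caller must hold the peer table
 * lock.  Returns the peer with a reference added, or NULL. */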
965 kptl_peer_t *
966 kptllnd_id2peer_locked (lnet_process_id_t id)
967 {
968         struct list_head *peers = kptllnd_nid2peerlist(id.nid);
969         struct list_head *tmp;
970         kptl_peer_t      *peer;
971
972         list_for_each (tmp, peers) {
973
974                 peer = list_entry (tmp, kptl_peer_t, peer_list);
975
976                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
977                         peer->peer_state == PEER_STATE_ACTIVE);
978                 
979                 if (peer->peer_id.nid != id.nid ||
980                     peer->peer_id.pid != id.pid)
981                         continue;
982
983                 kptllnd_peer_addref(peer);
984
985                 CDEBUG(D_NET, "%s -> %s (%d)\n",
986                        libcfs_id2str(id), 
987                        kptllnd_ptlid2str(peer->peer_ptlid),
988                        atomic_read (&peer->peer_refcount));
989                 return peer;
990         }
991
992         return NULL;
993 }
994
995 void
996 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
997 {
998         LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
999                            "messages may be dropped\n",
1000                            str, libcfs_id2str(id),
1001                            kptllnd_data.kptl_n_active_peers);
1002         LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
1003                            "'max_nodes' or 'max_procs_per_node'\n");
1004 }
1005
1006 __u64
1007 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
1008 {
1009         kptl_peer_t            *peer;
1010         struct list_head       *tmp;
1011
1012         /* Find the last matchbits I saw this new peer using.  Note..
1013            A. This peer cannot be in the peer table - she's new!
1014            B. If I can't find the peer in the closing/zombie peers, all
1015               matchbits are safe because all refs to the (old) peer have gone
1016               so all txs have completed so there's no risk of matchbit
1017               collision!
1018          */
1019
1020         LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
1021
1022         /* peer's last matchbits can't change after it comes out of the peer
1023          * table, so first match is fine */
1024
1025         list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
1026                 peer = list_entry (tmp, kptl_peer_t, peer_list);
1027
1028                 if (peer->peer_id.nid == lpid.nid &&
1029                     peer->peer_id.pid == lpid.pid)
1030                         return peer->peer_last_matchbits_seen;
1031         }
1032         
1033         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
1034                 peer = list_entry (tmp, kptl_peer_t, peer_list);
1035
1036                 if (peer->peer_id.nid == lpid.nid &&
1037                     peer->peer_id.pid == lpid.pid)
1038                         return peer->peer_last_matchbits_seen;
1039         }
1040         
1041         return PTL_RESERVED_MATCHBITS;
1042 }
1043
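/* Handle an incoming HELLO: validate the sender's matchbits, buffer size and
 * credits, complete a handshake already in progress or close a stale
 * incarnation, otherwise create a new peer, add it to the peer table and post
 * a HELLO in response.  Returns the peer (with a reference) or NULL. */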
1044 kptl_peer_t *
1045 kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
1046                            kptl_msg_t       *msg)
1047 {
1048         rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1049         kptl_peer_t        *peer;
1050         kptl_peer_t        *new_peer;
1051         lnet_process_id_t   lpid;
1052         unsigned long       flags;
1053         kptl_tx_t          *hello_tx;
1054         int                 rc;
1055         __u64               safe_matchbits;
1056         __u64               last_matchbits_seen;
1057
1058         lpid.nid = msg->ptlm_srcnid;
1059         lpid.pid = msg->ptlm_srcpid;
1060
1061         CDEBUG(D_NET, "hello from %s(%s)\n",
1062                libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
1063
1064         if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
1065             (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
1066                 /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
1067                  * userspace.  Refuse the connection if she hasn't set the
1068                  * correct flag in her PID... */
1069                 CERROR("Userflag not set in hello from %s (%s)\n",
1070                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
1071                 return NULL;
1072         }
1073         
1074         /* kptlhm_matchbits are the highest matchbits my peer may have used to
1075          * RDMA to me.  I ensure I never register buffers for RDMA that could
1076          * match any she used */
1077         safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;
1078
1079         if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
1080                 CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
1081                        safe_matchbits, libcfs_id2str(lpid));
1082                 return NULL;
1083         }
1084         
1085         if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
1086                 CERROR("%s: max message size %d < MIN %d\n",
1087                        libcfs_id2str(lpid),
1088                        msg->ptlm_u.hello.kptlhm_max_msg_size,
1089                        PTLLND_MIN_BUFFER_SIZE);
1090                 return NULL;
1091         }
1092
1093         if (msg->ptlm_credits <= 1) {
1094                 CERROR("Need more than 1+%d credits from %s\n",
1095                        msg->ptlm_credits, libcfs_id2str(lpid));
1096                 return NULL;
1097         }
1098         
1099         write_lock_irqsave(g_lock, flags);
1100
1101         peer = kptllnd_id2peer_locked(lpid);
1102         if (peer != NULL) {
1103                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1104                         /* Completing HELLO handshake */
1105                         LASSERT(peer->peer_incarnation == 0);
1106
1107                         if (msg->ptlm_dststamp != 0 &&
1108                             msg->ptlm_dststamp != peer->peer_myincarnation) {
1109                                 write_unlock_irqrestore(g_lock, flags);
1110
1111                                 CERROR("Ignoring HELLO from %s: unexpected "
1112                                        "dststamp "LPX64" ("LPX64" wanted)\n",
1113                                        libcfs_id2str(lpid),
1114                                        msg->ptlm_dststamp,
1115                                        peer->peer_myincarnation);
1116                                 kptllnd_peer_decref(peer);
1117                                 return NULL;
1118                         }
1119                         
1120                         /* Concurrent initiation or response to my HELLO */
1121                         peer->peer_state = PEER_STATE_ACTIVE;
1122                         peer->peer_incarnation = msg->ptlm_srcstamp;
1123                         peer->peer_next_matchbits = safe_matchbits;
1124                         peer->peer_max_msg_size =
1125                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1126                         
1127                         write_unlock_irqrestore(g_lock, flags);
1128                         return peer;
1129                 }
1130
1131                 if (msg->ptlm_dststamp != 0 &&
1132                     msg->ptlm_dststamp <= peer->peer_myincarnation) {
1133                         write_unlock_irqrestore(g_lock, flags);
1134
1135                         CERROR("Ignoring stale HELLO from %s: "
1136                                "dststamp "LPX64" (current "LPX64")\n",
1137                                libcfs_id2str(lpid),
1138                                msg->ptlm_dststamp,
1139                                peer->peer_myincarnation);
1140                         kptllnd_peer_decref(peer);
1141                         return NULL;
1142                 }
1143
1144                 /* Brand new connection attempt: remove old incarnation */
1145                 kptllnd_peer_close_locked(peer, 0);
1146         }
1147
1148         kptllnd_cull_peertable_locked(lpid);
1149
1150         write_unlock_irqrestore(g_lock, flags);
1151
1152         if (peer != NULL) {
1153                 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
1154                        " stamp "LPX64"("LPX64")\n",
1155                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
1156                        msg->ptlm_srcstamp, peer->peer_incarnation);
1157
1158                 kptllnd_peer_decref(peer);
1159         }
1160
1161         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1162         if (hello_tx == NULL) {
1163                 CERROR("Unable to allocate HELLO message for %s\n",
1164                        libcfs_id2str(lpid));
1165                 return NULL;
1166         }
1167
1168         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1169                          sizeof(kptl_hello_msg_t));
1170
1171         new_peer = kptllnd_peer_allocate(lpid, initiator);
1172         if (new_peer == NULL) {
1173                 kptllnd_tx_decref(hello_tx);
1174                 return NULL;
1175         }
1176
1177         rc = kptllnd_peer_reserve_buffers();
1178         if (rc != 0) {
1179                 kptllnd_peer_decref(new_peer);
1180                 kptllnd_tx_decref(hello_tx);
1181
1182                 CERROR("Failed to reserve buffers for %s\n",
1183                        libcfs_id2str(lpid));
1184                 return NULL;
1185         }
1186
1187         write_lock_irqsave(g_lock, flags);
1188
1189  again:
1190         if (kptllnd_data.kptl_shutdown) {
1191                 write_unlock_irqrestore(g_lock, flags);
1192
1193                 CERROR ("Shutdown started, refusing connection from %s\n",
1194                         libcfs_id2str(lpid));
1195                 kptllnd_peer_unreserve_buffers();
1196                 kptllnd_peer_decref(new_peer);
1197                 kptllnd_tx_decref(hello_tx);
1198                 return NULL;
1199         }
1200
1201         peer = kptllnd_id2peer_locked(lpid);
1202         if (peer != NULL) {
1203                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1204                         /* An outgoing message instantiated 'peer' for me */
1205                         LASSERT(peer->peer_incarnation == 0);
1206
1207                         peer->peer_state = PEER_STATE_ACTIVE;
1208                         peer->peer_incarnation = msg->ptlm_srcstamp;
1209                         peer->peer_next_matchbits = safe_matchbits;
1210                         peer->peer_max_msg_size =
1211                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1212
1213                         write_unlock_irqrestore(g_lock, flags);
1214
1215                         CWARN("Outgoing instantiated peer %s\n",
1216                               libcfs_id2str(lpid));
1217                 } else {
1218                         LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1219
1220                         write_unlock_irqrestore(g_lock, flags);
1221
1222                         /* WOW!  Somehow this peer completed the HELLO
1223                          * handshake while I slept.  I guess I could have slept
1224                          * while it rebooted and sent a new HELLO, so I'll fail
1225                          * this one... */
1226                         CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
1227                         kptllnd_peer_decref(peer);
1228                         peer = NULL;
1229                 }
1230
1231                 kptllnd_peer_unreserve_buffers();
1232                 kptllnd_peer_decref(new_peer);
1233                 kptllnd_tx_decref(hello_tx);
1234                 return peer;
1235         }
1236
1237         if (kptllnd_data.kptl_n_active_peers ==
1238             kptllnd_data.kptl_expected_peers) {
1239                 /* peer table full */
1240                 write_unlock_irqrestore(g_lock, flags);
1241
1242                 kptllnd_peertable_overflow_msg("Connection from ", lpid);
1243
1244                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1245                 if (rc != 0) {
1246                         CERROR("Refusing connection from %s\n",
1247                                libcfs_id2str(lpid));
1248                         kptllnd_peer_unreserve_buffers();
1249                         kptllnd_peer_decref(new_peer);
1250                         kptllnd_tx_decref(hello_tx);
1251                         return NULL;
1252                 }
1253                 
1254                 write_lock_irqsave(g_lock, flags);
1255                 kptllnd_data.kptl_expected_peers++;
1256                 goto again;
1257         }
1258
1259         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
1260
1261         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1262         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1263                 *kptllnd_tunables.kptl_max_msg_size;
1264
1265         new_peer->peer_state = PEER_STATE_ACTIVE;
1266         new_peer->peer_incarnation = msg->ptlm_srcstamp;
1267         new_peer->peer_next_matchbits = safe_matchbits;
1268         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1269         new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1270
1271         kptllnd_peer_add_peertable_locked(new_peer);
1272
1273         write_unlock_irqrestore(g_lock, flags);
1274
1275         /* NB someone else could get in now and post a message before I post
1276          * the HELLO, but post_tx/check_sends take care of that! */
1277
1278         CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
1279                libcfs_id2str(new_peer->peer_id), hello_tx);
1280
1281         kptllnd_post_tx(new_peer, hello_tx, 0);
1282         kptllnd_peer_check_sends(new_peer);
1283
1284         return new_peer;
1285 }
1286
1287 void
1288 kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
1289 {
1290         kptllnd_post_tx(peer, tx, nfrag);
1291         kptllnd_peer_check_sends(peer);
1292 }
1293
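/* Find the peer for 'target', or create one in the WAITING_HELLO state and
 * post the initial HELLO to it.  Connections are only initiated to kernel
 * peers; userspace PIDs are refused with -EHOSTUNREACH. */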
1294 int
1295 kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
1296 {
1297         rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1298         ptl_process_id_t  ptl_id;
1299         kptl_peer_t      *new_peer;
1300         kptl_tx_t        *hello_tx;
1301         unsigned long     flags;
1302         int               rc;
1303         __u64             last_matchbits_seen;
1304
1305         /* I expect to find the peer, so I only take a read lock... */
1306         read_lock_irqsave(g_lock, flags);
1307         *peerp = kptllnd_id2peer_locked(target);
1308         read_unlock_irqrestore(g_lock, flags);
1309
1310         if (*peerp != NULL)
1311                 return 0;
1312         
1313         if ((target.pid & LNET_PID_USERFLAG) != 0) {
1314                 CWARN("Refusing to create a new connection to %s "
1315                       "(non-kernel peer)\n", libcfs_id2str(target));
1316                 return -EHOSTUNREACH;
1317         }
1318
1319         /* The new peer is a kernel ptllnd, and kernel ptllnds all have
1320          * the same portals PID */
1321         ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
1322         ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
1323
1324         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1325         if (hello_tx == NULL) {
1326                 CERROR("Unable to allocate connect message for %s\n",
1327                        libcfs_id2str(target));
1328                 return -ENOMEM;
1329         }
1330
1331         hello_tx->tx_acked = 1;
1332         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1333                          sizeof(kptl_hello_msg_t));
1334
1335         new_peer = kptllnd_peer_allocate(target, ptl_id);
1336         if (new_peer == NULL) {
1337                 rc = -ENOMEM;
1338                 goto unwind_0;
1339         }
1340
1341         rc = kptllnd_peer_reserve_buffers();
1342         if (rc != 0)
1343                 goto unwind_1;
1344
1345         write_lock_irqsave(g_lock, flags);
1346  again:
1347         if (kptllnd_data.kptl_shutdown) {
1348                 write_unlock_irqrestore(g_lock, flags);
1349                 rc = -ESHUTDOWN;
1350                 goto unwind_2;
1351         }
1352
1353         *peerp = kptllnd_id2peer_locked(target);
1354         if (*peerp != NULL) {
1355                 write_unlock_irqrestore(g_lock, flags);
1356                 goto unwind_2;
1357         }
1358
1359         kptllnd_cull_peertable_locked(target);
1360
1361         if (kptllnd_data.kptl_n_active_peers ==
1362             kptllnd_data.kptl_expected_peers) {
1363                 /* peer table full */
1364                 write_unlock_irqrestore(g_lock, flags);
1365
1366                 kptllnd_peertable_overflow_msg("Connection to ", target);
1367
1368                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1369                 if (rc != 0) {
1370                         CERROR("Can't create connection to %s\n",
1371                                libcfs_id2str(target));
1372                         rc = -ENOMEM;
1373                         goto unwind_2;
1374                 }
1375                 write_lock_irqsave(g_lock, flags);
1376                 kptllnd_data.kptl_expected_peers++;
1377                 goto again;
1378         }
1379
1380         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);
1381
1382         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1383         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1384                 *kptllnd_tunables.kptl_max_msg_size;
1385                 
1386         new_peer->peer_state = PEER_STATE_WAITING_HELLO;
1387         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1388         
1389         kptllnd_peer_add_peertable_locked(new_peer);
1390
1391         write_unlock_irqrestore(g_lock, flags);
1392
1393         /* NB someone else could get in now and post a message before I post
1394          * the HELLO, but post_tx/check_sends take care of that! */
1395
1396         CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
1397                libcfs_id2str(new_peer->peer_id), hello_tx);
1398
1399         kptllnd_post_tx(new_peer, hello_tx, 0);
1400         kptllnd_peer_check_sends(new_peer);
1401        
1402         *peerp = new_peer;
1403         return 0;
1404         
1405  unwind_2:
1406         kptllnd_peer_unreserve_buffers();
1407  unwind_1:
1408         kptllnd_peer_decref(new_peer);
1409  unwind_0:
1410         kptllnd_tx_decref(hello_tx);
1411
1412         return rc;
1413 }