Whamcloud - gitweb
5d659d868df7015fa2500de970870553398e1cfd
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_peer.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/ptllnd/ptllnd_peer.c
37  *
38  * Author: PJ Kirner <pjkirner@clusterfs.com>
39  * Author: E Barton <eeb@bartonsoftware.com>
40  */
41
42 #include "ptllnd.h"
43 #include <libcfs/list.h>
44
45 static int
46 kptllnd_count_queue(struct list_head *q)
47 {
48         struct list_head *e;
49         int               n = 0;
50         
51         list_for_each(e, q) {
52                 n++;
53         }
54
55         return n;
56 }
57
/* Copy a snapshot of the index'th peer in the global peer hash table
 * into the out-parameters.  Returns 0 on success or -ENOENT if there
 * are fewer than index+1 peers.  Runs under the peer-table read lock;
 * the per-peer counters are sampled under peer_lock so they are
 * mutually consistent. */
int
kptllnd_get_peer_info(int index, 
                      lnet_process_id_t *id,
                      int *state, int *sent_hello,
                      int *refcount, __u64 *incarnation,
                      __u64 *next_matchbits, __u64 *last_matchbits_seen,
                      int *nsendq, int *nactiveq,
                      int *credits, int *outstanding_credits) 
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        unsigned long     flags;
        struct list_head *ptmp;
        kptl_peer_t      *peer;
        int               i;
        int               rc = -ENOENT;

        read_lock_irqsave(g_lock, flags);

        for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
                list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry(ptmp, kptl_peer_t, peer_list);

                        /* skip the first 'index' peers */
                        if (index-- > 0)
                                continue;
                        
                        *id          = peer->peer_id;
                        *state       = peer->peer_state;
                        *sent_hello  = peer->peer_sent_hello;
                        *refcount    = atomic_read(&peer->peer_refcount);
                        *incarnation = peer->peer_incarnation;

                        /* matchbits/credits/queue lengths change under
                         * peer_lock; sample them all in one critical
                         * section */
                        spin_lock(&peer->peer_lock);

                        *next_matchbits      = peer->peer_next_matchbits;
                        *last_matchbits_seen = peer->peer_last_matchbits_seen;
                        *credits             = peer->peer_credits;
                        *outstanding_credits = peer->peer_outstanding_credits;

                        *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
                        *nactiveq = kptllnd_count_queue(&peer->peer_activeq);

                        spin_unlock(&peer->peer_lock);

                        rc = 0;
                        goto out;
                }
        }
        
 out:
        read_unlock_irqrestore(g_lock, flags);
        return rc;
}
110
/* Add 'peer' to the peer hash table, taking a ref on behalf of the
 * table.  Caller holds kptl_peer_rw_lock in write mode and has already
 * reserved room (see the active/expected peer assertion below). */
void
kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
{
        LASSERT (kptllnd_data.kptl_n_active_peers <
                 kptllnd_data.kptl_expected_peers);

        LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
                 peer->peer_state == PEER_STATE_ACTIVE);
        
        kptllnd_data.kptl_n_active_peers++;
        atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */

        /* NB add to HEAD of peer list for MRU order!
         * (see kptllnd_cull_peertable) */
        list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
}
127
void
kptllnd_cull_peertable_locked (lnet_process_id_t pid)
{
        /* I'm about to add a new peer with this portals ID to the peer table,
         * so (a) this peer should not exist already and (b) I want to leave at
         * most (max_procs_per_nid - 1) peers with this NID in the table. */
        struct list_head  *peers = kptllnd_nid2peerlist(pid.nid);
        int                cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
        int                count;
        struct list_head  *tmp;
        struct list_head  *nxt;
        kptl_peer_t       *peer;
        
        count = 0;
        list_for_each_safe (tmp, nxt, peers) {
                /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
                 * in MRU order */
                peer = list_entry(tmp, kptl_peer_t, peer_list);
                        
                /* hash buckets are shared between NIDs: only consider
                 * peers with the NID being added */
                if (LNET_NIDADDR(peer->peer_id.nid) != LNET_NIDADDR(pid.nid))
                        continue;

                LASSERT (peer->peer_id.pid != pid.pid);
                        
                count++;

                if (count < cull_count) /* recent (don't cull) */
                        continue;

                /* this peer, and every (older) match after it, gets culled */
                CDEBUG(D_NET, "Cull %s(%s)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_ptlid2str(peer->peer_ptlid));
                
                kptllnd_peer_close_locked(peer, 0);
        }
}
164
/* Allocate and initialise a peer for LNET process 'lpid' (portals id
 * 'ppid') on network 'net'.  Returns the new peer holding one ref for
 * the caller, or NULL on allocation failure or if 'net' is already
 * shutting down.  The caller is responsible for adding it to the peer
 * table. */
kptl_peer_t *
kptllnd_peer_allocate (kptl_net_t *net, lnet_process_id_t lpid, ptl_process_id_t ppid)
{
        unsigned long    flags;
        kptl_peer_t     *peer;

        LIBCFS_ALLOC(peer, sizeof (*peer));
        if (peer == NULL) {
                CERROR("Can't create peer %s (%s)\n",
                       libcfs_id2str(lpid), 
                       kptllnd_ptlid2str(ppid));
                return NULL;
        }

        memset(peer, 0, sizeof(*peer));         /* zero flags etc */

        INIT_LIST_HEAD (&peer->peer_noops);
        INIT_LIST_HEAD (&peer->peer_sendq);
        INIT_LIST_HEAD (&peer->peer_activeq);
        spin_lock_init (&peer->peer_lock);

        peer->peer_state = PEER_STATE_ALLOCATED;
        peer->peer_error = 0;
        peer->peer_last_alive = 0;
        peer->peer_id = lpid;
        peer->peer_ptlid = ppid;
        peer->peer_credits = 1;                 /* enough for HELLO */
        peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
        peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peertxcredits - 1;
        peer->peer_sent_credits = 1;           /* HELLO credit is implicit */
        peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */

        atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        peer->peer_myincarnation = kptllnd_data.kptl_incarnation;

        /* Only increase # peers under lock, to guarantee we dont grow it
         * during shutdown */
        if (net->net_shutdown) {
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
                LIBCFS_FREE(peer, sizeof(*peer));
                return NULL;
        }

        kptllnd_data.kptl_npeers++;
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
        return peer;
}
215
/* Free 'peer' after its last ref has gone.  A ZOMBIE peer is still on
 * the zombie list and must be unlinked here; all tx queues must already
 * be empty.  Also drops the global peer count under the table lock. */
void
kptllnd_peer_destroy (kptl_peer_t *peer)
{
        unsigned long flags;
        
        CDEBUG(D_NET, "Peer=%p\n", peer);

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&peer->peer_refcount) == 0);
        LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
                 peer->peer_state == PEER_STATE_ZOMBIE);
        LASSERT (list_empty(&peer->peer_noops));
        LASSERT (list_empty(&peer->peer_sendq));
        LASSERT (list_empty(&peer->peer_activeq));

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (peer->peer_state == PEER_STATE_ZOMBIE)
                list_del(&peer->peer_list);

        kptllnd_data.kptl_npeers--;

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        LIBCFS_FREE (peer, sizeof (*peer));
}
242
243 void
244 kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs)
245 {
246         struct list_head  *tmp;
247         struct list_head  *nxt;
248         kptl_tx_t         *tx;
249
250         list_for_each_safe (tmp, nxt, peerq) {
251                 tx = list_entry(tmp, kptl_tx_t, tx_list);
252
253                 list_del(&tx->tx_list);
254                 list_add_tail(&tx->tx_list, txs);
255
256                 tx->tx_status = -EIO;
257                 tx->tx_active = 0;
258         }
259 }
260
/* Fail (-EIO) and collect onto 'txs' everything queued on 'peer':
 * pending NOOPs, unsent messages and in-flight (active) messages.
 * The caller drops the txs' refs afterwards, outside the peer lock
 * (see kptllnd_handle_closing_peers). */
void
kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs)
{
        unsigned long   flags;

        spin_lock_irqsave(&peer->peer_lock, flags);

        kptllnd_cancel_txlist(&peer->peer_noops, txs);
        kptllnd_cancel_txlist(&peer->peer_sendq, txs);
        kptllnd_cancel_txlist(&peer->peer_activeq, txs);
                
        spin_unlock_irqrestore(&peer->peer_lock, flags);
}
274
/* Record that 'peer' has just been heard from. */
void
kptllnd_peer_alive (kptl_peer_t *peer)
{
        /* This is racy, but everyone's only writing cfs_time_current() */
        peer->peer_last_alive = cfs_time_current();
        mb();
}
282
283 void
284 kptllnd_peer_notify (kptl_peer_t *peer)
285 {
286         unsigned long flags;
287         kptl_net_t   *net;
288         kptl_net_t  **nets;
289         int           i = 0;
290         int           nnets = 0;
291         int           error = 0;
292         time_t        last_alive = 0;
293         
294         spin_lock_irqsave(&peer->peer_lock, flags);
295
296         if (peer->peer_error != 0) {
297                 error = peer->peer_error;
298                 peer->peer_error = 0;
299                 
300                 last_alive = cfs_time_current_sec() - 
301                              cfs_duration_sec(cfs_time_current() - 
302                                               peer->peer_last_alive);
303         }
304         
305         spin_unlock_irqrestore(&peer->peer_lock, flags);
306
307         if (error == 0)
308                 return;
309
310         read_lock(&kptllnd_data.kptl_net_rw_lock);
311         list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list)
312                 nnets++;
313         read_unlock(&kptllnd_data.kptl_net_rw_lock);
314
315         if (nnets == 0) /* shutdown in progress */
316                 return;
317
318         LIBCFS_ALLOC(nets, nnets * sizeof(*nets));
319         if (nets == NULL) {
320                 CERROR("Failed to allocate nets[%d]\n", nnets);
321                 return;
322         }
323         memset(nets, 0, nnets * sizeof(*nets));
324
325         read_lock(&kptllnd_data.kptl_net_rw_lock);
326         i = 0;
327         list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
328                 LASSERT (i < nnets);
329                 nets[i] = net;
330                 kptllnd_net_addref(net);
331                 i++;
332         }
333         read_unlock(&kptllnd_data.kptl_net_rw_lock);
334
335         for (i = 0; i < nnets; i++) {
336                 lnet_nid_t peer_nid;
337
338                 net = nets[i];
339                 if (net == NULL)
340                         break;
341
342                 if (!net->net_shutdown) {
343                         peer_nid = kptllnd_ptl2lnetnid(net->net_ni->ni_nid,
344                                                        peer->peer_ptlid.nid);
345                         lnet_notify(net->net_ni, peer_nid, 0, last_alive);
346                 }
347
348                 kptllnd_net_decref(net);
349         }
350
351         LIBCFS_FREE(nets, nnets * sizeof(*nets));
352 }
353
/* Watchdog bottom half for peer teardown: cancel the queued txs of all
 * closing and zombie peers, and notify LNET about each newly-closed
 * peer exactly once as it moves from the closing list to the zombie
 * list.  Finally drop the collected txs' refs outside all locks. */
void
kptllnd_handle_closing_peers ()
{
        unsigned long           flags;
        struct list_head        txs;
        kptl_peer_t            *peer;
        struct list_head       *tmp;
        struct list_head       *nxt;
        kptl_tx_t              *tx;
        int                     idle;

        /* Check with a read lock first to avoid blocking anyone */

        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        idle = list_empty(&kptllnd_data.kptl_closing_peers) &&
               list_empty(&kptllnd_data.kptl_zombie_peers);
        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (idle)
                return;

        INIT_LIST_HEAD(&txs);

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
         * ref removes it from this list, so I musn't drop the lock while
         * scanning it. */
        list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);

                kptllnd_peer_cancel_txs(peer, &txs);
        }

        /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
         * I'm the only one removing from this list, but peers can be added on
         * the end any time I drop the lock. */

        list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_CLOSING);

                list_del(&peer->peer_list);
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_zombie_peers);
                peer->peer_state = PEER_STATE_ZOMBIE;

                /* drop the lock to notify/cancel; safe because only this
                 * thread removes entries from the closing list */
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                kptllnd_peer_notify(peer);
                kptllnd_peer_cancel_txs(peer, &txs);
                kptllnd_peer_decref(peer);

                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        }

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Drop peer's ref on all cancelled txs.  This will get
         * kptllnd_tx_fini() to abort outstanding comms if necessary. */

        list_for_each_safe (tmp, nxt, &txs) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                kptllnd_tx_decref(tx);
        }
}
424
/* Start closing 'peer': remove it from the peer table, stash 'why' in
 * peer_error for kptllnd_peer_notify() and pass the table's ref to the
 * closing list for the watchdog.  Caller holds kptl_peer_rw_lock in
 * write mode.  Idempotent: a CLOSING/ZOMBIE peer is left alone. */
void
kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
{
        switch (peer->peer_state) {
        default:                /* unexpected state is fatal */
                LBUG();

        case PEER_STATE_WAITING_HELLO:
        case PEER_STATE_ACTIVE:
                /* Ensure new peers see a new incarnation of me */
                LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
                if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
                        kptllnd_data.kptl_incarnation++;

                /* Removing from peer table */
                kptllnd_data.kptl_n_active_peers--;
                LASSERT (kptllnd_data.kptl_n_active_peers >= 0);

                list_del(&peer->peer_list);
                kptllnd_peer_unreserve_buffers();

                peer->peer_error = why; /* stash 'why' only on first close */
                peer->peer_state = PEER_STATE_CLOSING;

                /* Schedule for immediate attention, taking peer table's ref */
                list_add_tail(&peer->peer_list, 
                              &kptllnd_data.kptl_closing_peers);
                wake_up(&kptllnd_data.kptl_watchdog_waitq);
                break;

        case PEER_STATE_ZOMBIE:
        case PEER_STATE_CLOSING:        /* already closing: nothing to do */
                break;
        }
}
460
461 void
462 kptllnd_peer_close(kptl_peer_t *peer, int why)
463 {
464         unsigned long      flags;
465
466         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
467         kptllnd_peer_close_locked(peer, why);
468         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
469 }
470
/* Close every peer matching 'id'.  LNET_NID_ANY matches all NIDs;
 * LNET_PID_ANY matches all PIDs on a NID.  Returns 0 if at least one
 * peer matched, -ENOENT if none did, -EINVAL for a wildcard NID with a
 * specific PID.  The table lock must be dropped to close a peer, so
 * the scan restarts from scratch after each match. */
int
kptllnd_peer_del(lnet_process_id_t id)
{
        struct list_head  *ptmp;
        struct list_head  *pnxt;
        kptl_peer_t       *peer;
        int                lo;
        int                hi;
        int                i;
        unsigned long      flags;
        int                rc = -ENOENT;

        /*
         * Find the single bucket we are supposed to look at or if nid is a
         * wildcard (LNET_NID_ANY) then look at all of the buckets
         */
        if (id.nid != LNET_NID_ANY) {
                struct list_head *l = kptllnd_nid2peerlist(id.nid);
                
                lo = hi =  l - kptllnd_data.kptl_peers;
        } else {
                if (id.pid != LNET_PID_ANY)
                        return -EINVAL;
                
                lo = 0;
                hi = kptllnd_data.kptl_peer_hash_size - 1;
        }

again:
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry (ptmp, kptl_peer_t, peer_list);

                        /* match if nid is wildcard, or nid matches and pid
                         * is wildcard or matches too */
                        if (!(id.nid == LNET_NID_ANY || 
                              (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(id.nid) &&
                               (id.pid == LNET_PID_ANY || 
                                peer->peer_id.pid == id.pid))))
                                continue;

                        kptllnd_peer_addref(peer); /* 1 ref for me... */

                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                               flags);

                        kptllnd_peer_close(peer, 0);
                        kptllnd_peer_decref(peer); /* ...until here */

                        rc = 0;         /* matched something */

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        return (rc);
}
531
/* Queue 'tx' for transmission to 'peer'. */
void
kptllnd_queue_tx(kptl_peer_t *peer, kptl_tx_t *tx)
{
        /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
        unsigned long flags;

        spin_lock_irqsave(&peer->peer_lock, flags);

        /* NOOPs go on their own queue (so redundant ones can be found
         * and discarded); HELLO jumps to the head of the send queue so
         * it goes out before anything else; everything else is FIFO */
        if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP)
                list_add(&tx->tx_list, &peer->peer_noops);
        else if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
                list_add(&tx->tx_list, &peer->peer_sendq);
        else
                list_add_tail(&tx->tx_list, &peer->peer_sendq);

        spin_unlock_irqrestore(&peer->peer_lock, flags);
}
550

/* Bind an MD over tx's message and queue the tx for sending to 'peer'.
 * nfrag == 0 means the message is contiguous in tx_msg; otherwise
 * tx_frags is an iovec of 'nfrag' entries whose first fragment is the
 * message header itself.  On MD-bind failure the tx is failed with
 * -EIO and the caller's ref is dropped. */
void
kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
        /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
        ptl_handle_md_t  msg_mdh;
        ptl_md_t         md;
        ptl_err_t        prc;

        LASSERT (!tx->tx_idle);
        LASSERT (!tx->tx_active);
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
                 tx->tx_type == TX_TYPE_PUT_REQUEST ||
                 tx->tx_type == TX_TYPE_GET_REQUEST);

        kptllnd_set_tx_peer(tx, peer);

        memset(&md, 0, sizeof(md));

        md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE;
        md.user_ptr = &tx->tx_msg_eventarg;
        md.eq_handle = kptllnd_data.kptl_eqh;

        if (nfrag == 0) {
                md.start = tx->tx_msg;
                md.length = tx->tx_msg->ptlm_nob;
        } else {
                LASSERT (nfrag > 1);
                LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);

                md.start = tx->tx_frags;
                md.length = nfrag;
                md.options |= PTL_MD_IOVEC;
        }

        prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
        if (prc != PTL_OK) {
                CERROR("PtlMDBind(%s) failed: %s(%d)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_errtype2str(prc), prc);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);
                return;
        }

        /* tx is committed now: stamp its deadline and queue it */
        tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
        tx->tx_active = 1;
        tx->tx_msg_mdh = msg_mdh;
        kptllnd_queue_tx(peer, tx);
}
607
/* NB "restarts" comes from peer_sendq of a single peer */
/* Re-queue the txs on 'restarts' to whatever peer currently serves
 * 'target' on 'net'.  If no such peer exists every tx is dropped;
 * HELLO txs are always dropped rather than restarted.  Each restarted
 * tx swaps its peer ref from the old peer to the new one. */
void
kptllnd_restart_txs (kptl_net_t *net, lnet_process_id_t target, struct list_head *restarts)
{
        kptl_tx_t   *tx;
        kptl_tx_t   *tmp;
        kptl_peer_t *peer;

        LASSERT (!list_empty(restarts));

        /* peer == NULL below means "drop everything" */
        if (kptllnd_find_target(net, target, &peer) != 0)
                peer = NULL;

        list_for_each_entry_safe (tx, tmp, restarts, tx_list) {
                LASSERT (tx->tx_peer != NULL);
                LASSERT (tx->tx_type == TX_TYPE_GET_REQUEST ||
                         tx->tx_type == TX_TYPE_PUT_REQUEST ||
                         tx->tx_type == TX_TYPE_SMALL_MESSAGE);

                list_del_init(&tx->tx_list);

                if (peer == NULL ||
                    tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
                        kptllnd_tx_decref(tx);
                        continue;
                }

                LASSERT (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_NOOP);
                tx->tx_status = 0;
                tx->tx_active = 1;
                /* move the tx's peer ref to the new peer */
                kptllnd_peer_decref(tx->tx_peer);
                tx->tx_peer = NULL;
                kptllnd_set_tx_peer(tx, peer);
                kptllnd_queue_tx(peer, tx); /* takes over my ref on tx */
        }

        if (peer == NULL)
                return;

        kptllnd_peer_check_sends(peer);
        kptllnd_peer_decref(peer);
}
650
651 static inline int
652 kptllnd_peer_send_noop (kptl_peer_t *peer)
653 {
654         if (!peer->peer_sent_hello ||
655             peer->peer_credits == 0 ||
656             !list_empty(&peer->peer_noops) ||
657             peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)
658                 return 0;
659
660         /* No tx to piggyback NOOP onto or no credit to send a tx */
661         return (list_empty(&peer->peer_sendq) || peer->peer_credits == 1);
662 }
663
664 void
665 kptllnd_peer_check_sends (kptl_peer_t *peer)
666 {
667         ptl_handle_me_t  meh;
668         kptl_tx_t       *tx;
669         int              rc;
670         int              msg_type;
671         unsigned long    flags;
672
673         LASSERT(!in_interrupt());
674
675         spin_lock_irqsave(&peer->peer_lock, flags);
676
677         peer->peer_retry_noop = 0;
678
679         if (kptllnd_peer_send_noop(peer)) {
680                 /* post a NOOP to return credits */
681                 spin_unlock_irqrestore(&peer->peer_lock, flags);
682
683                 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
684                 if (tx == NULL) {
685                         CERROR("Can't return credits to %s: can't allocate descriptor\n",
686                                libcfs_id2str(peer->peer_id));
687                 } else {
688                         kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP,
689                                          peer->peer_id, 0);
690                         kptllnd_post_tx(peer, tx, 0);
691                 }
692
693                 spin_lock_irqsave(&peer->peer_lock, flags);
694                 peer->peer_retry_noop = (tx == NULL);
695         }
696
697         for (;;) {
698                 if (!list_empty(&peer->peer_noops)) {
699                         LASSERT (peer->peer_sent_hello);
700                         tx = list_entry(peer->peer_noops.next,
701                                         kptl_tx_t, tx_list);
702                 } else if (!list_empty(&peer->peer_sendq)) {
703                         tx = list_entry(peer->peer_sendq.next,
704                                         kptl_tx_t, tx_list);
705                 } else {
706                         /* nothing to send right now */
707                         break;
708                 }
709
710                 LASSERT (tx->tx_active);
711                 LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
712                 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
713
714                 LASSERT (peer->peer_outstanding_credits >= 0);
715                 LASSERT (peer->peer_sent_credits >= 0);
716                 LASSERT (peer->peer_sent_credits +
717                          peer->peer_outstanding_credits <=
718                          *kptllnd_tunables.kptl_peertxcredits);
719                 LASSERT (peer->peer_credits >= 0);
720
721                 msg_type = tx->tx_msg->ptlm_type;
722
723                 /* Ensure HELLO is sent first */
724                 if (!peer->peer_sent_hello) {
725                         LASSERT (list_empty(&peer->peer_noops));
726                         if (msg_type != PTLLND_MSG_TYPE_HELLO)
727                                 break;
728                         peer->peer_sent_hello = 1;
729                 }
730
731                 if (peer->peer_credits == 0) {
732                         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %s[%p]\n",
733                                libcfs_id2str(peer->peer_id), 
734                                peer->peer_credits,
735                                peer->peer_outstanding_credits, 
736                                peer->peer_sent_credits, 
737                                kptllnd_msgtype2str(msg_type), tx);
738                         break;
739                 }
740
741                 /* Last/Initial credit reserved for NOOP/HELLO */
742                 if (peer->peer_credits == 1 &&
743                     msg_type != PTLLND_MSG_TYPE_HELLO &&
744                     msg_type != PTLLND_MSG_TYPE_NOOP) {
745                         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
746                                "not using last credit for %s[%p]\n",
747                                libcfs_id2str(peer->peer_id), 
748                                peer->peer_credits,
749                                peer->peer_outstanding_credits,
750                                peer->peer_sent_credits,
751                                kptllnd_msgtype2str(msg_type), tx);
752                         break;
753                 }
754
755                 list_del(&tx->tx_list);
756
757                 /* Discard any NOOP I queued if I'm not at the high-water mark
758                  * any more or more messages have been queued */
759                 if (msg_type == PTLLND_MSG_TYPE_NOOP &&
760                     !kptllnd_peer_send_noop(peer)) {
761                         tx->tx_active = 0;
762
763                         spin_unlock_irqrestore(&peer->peer_lock, flags);
764
765                         CDEBUG(D_NET, "%s: redundant noop\n", 
766                                libcfs_id2str(peer->peer_id));
767                         kptllnd_tx_decref(tx);
768
769                         spin_lock_irqsave(&peer->peer_lock, flags);
770                         continue;
771                 }
772
773                 /* fill last-minute msg fields */
774                 kptllnd_msg_pack(tx->tx_msg, peer);
775
776                 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
777                     tx->tx_type == TX_TYPE_GET_REQUEST) {
778                         /* peer_next_matchbits must be known good */
779                         LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
780                         /* Assume 64-bit matchbits can't wrap */
781                         LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
782                         tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
783                                 peer->peer_next_matchbits++;
784                 }
785
786                 peer->peer_sent_credits += peer->peer_outstanding_credits;
787                 peer->peer_outstanding_credits = 0;
788                 peer->peer_credits--;
789
790                 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
791                        libcfs_id2str(peer->peer_id), peer->peer_credits,
792                        peer->peer_outstanding_credits, peer->peer_sent_credits,
793                        kptllnd_msgtype2str(msg_type), tx, tx->tx_msg->ptlm_nob,
794                        tx->tx_msg->ptlm_credits);
795
796                 list_add_tail(&tx->tx_list, &peer->peer_activeq);
797
798                 kptllnd_tx_addref(tx);          /* 1 ref for me... */
799
800                 spin_unlock_irqrestore(&peer->peer_lock, flags);
801
802                 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
803                     tx->tx_type == TX_TYPE_GET_REQUEST) {
804                         /* Post bulk now we have safe matchbits */
805                         rc = PtlMEAttach(kptllnd_data.kptl_nih,
806                                          *kptllnd_tunables.kptl_portal,
807                                          peer->peer_ptlid,
808                                          tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
809                                          0,             /* ignore bits */
810                                          PTL_UNLINK,
811                                          PTL_INS_BEFORE,
812                                          &meh);
813                         if (rc != PTL_OK) {
814                                 CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
815                                        libcfs_id2str(peer->peer_id),
816                                        kptllnd_errtype2str(rc), rc);
817                                 goto failed;
818                         }
819
820                         rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
821                                          &tx->tx_rdma_mdh);
822                         if (rc != PTL_OK) {
823                                 CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
824                                        libcfs_id2str(tx->tx_peer->peer_id),
825                                        kptllnd_errtype2str(rc), rc);
826                                 rc = PtlMEUnlink(meh);
827                                 LASSERT(rc == PTL_OK);
828                                 tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
829                                 goto failed;
830                         }
831                         /* I'm not racing with the event callback here.  It's a
832                          * bug if there's an event on the MD I just attached
833                          * before I actually send the RDMA request message -
834                          * probably matchbits re-used in error. */
835                 }
836
837                 tx->tx_tposted = jiffies;       /* going on the wire */
838
839                 rc = PtlPut (tx->tx_msg_mdh,
840                              tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
841                              peer->peer_ptlid,
842                              *kptllnd_tunables.kptl_portal,
843                              0,                 /* acl cookie */
844                              LNET_MSG_MATCHBITS,
845                              0,                 /* offset */
846                              0);                /* header data */
847                 if (rc != PTL_OK) {
848                         CERROR("PtlPut %s error %s(%d)\n",
849                                libcfs_id2str(peer->peer_id),
850                                kptllnd_errtype2str(rc), rc);
851                         goto failed;
852                 }
853
854                 kptllnd_tx_decref(tx);          /* drop my ref */
855
856                 spin_lock_irqsave(&peer->peer_lock, flags);
857         }
858
859         spin_unlock_irqrestore(&peer->peer_lock, flags);
860         return;
861
862  failed:
863         /* Nuke everything (including tx we were trying) */
864         kptllnd_peer_close(peer, -EIO);
865         kptllnd_tx_decref(tx);
866         kptllnd_schedule_ptltrace_dump();
867 }
868
869 kptl_tx_t *
870 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
871 {
872         kptl_tx_t         *tx;
873         struct list_head  *ele;
874
875         list_for_each(ele, &peer->peer_sendq) {
876                 tx = list_entry(ele, kptl_tx_t, tx_list);
877
878                 if (time_after_eq(jiffies, tx->tx_deadline)) {
879                         kptllnd_tx_addref(tx);
880                         return tx;
881                 }
882         }
883
884         list_for_each(ele, &peer->peer_activeq) {
885                 tx = list_entry(ele, kptl_tx_t, tx_list);
886
887                 if (time_after_eq(jiffies, tx->tx_deadline)) {
888                         kptllnd_tx_addref(tx);
889                         return tx;
890                 }
891         }
892
893         return NULL;
894 }
895
896
void
kptllnd_peer_check_bucket (int idx, int stamp)
{
        /* Scan one hash bucket of the peer table for peers that have a
         * timed-out tx or a noop send to retry.  'stamp' marks this pass so
         * a peer revisited after the lock is dropped isn't checked twice.
         * A peer with a timed-out tx is closed with -ETIMEDOUT. */
        struct list_head  *peers = &kptllnd_data.kptl_peers[idx];
        kptl_peer_t       *peer;
        unsigned long      flags;

        CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);

 again:
        /* NB. Shared lock while I just look */
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        list_for_each_entry (peer, peers, peer_list) {
                kptl_tx_t *tx;
                int        check_sends;
                /* snapshots for the error report below; -1 == not sampled */
                int        c = -1, oc = -1, sc = -1;
                int        nsend = -1, nactive = -1;
                int        sent_hello = -1, state = -1;

                CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits, 
                       peer->peer_outstanding_credits, peer->peer_sent_credits);

                spin_lock(&peer->peer_lock);

                if (peer->peer_check_stamp == stamp) {
                        /* checked already this pass */
                        spin_unlock(&peer->peer_lock);
                        continue;
                }

                peer->peer_check_stamp = stamp;
                tx = kptllnd_find_timed_out_tx(peer);
                check_sends = peer->peer_retry_noop;
                
                if (tx != NULL) {
                        /* sample peer state under the lock so the report
                         * below is self-consistent */
                        c  = peer->peer_credits;
                        sc = peer->peer_sent_credits;
                        oc = peer->peer_outstanding_credits;
                        state      = peer->peer_state;
                        sent_hello = peer->peer_sent_hello;
                        nsend   = kptllnd_count_queue(&peer->peer_sendq);
                        nactive = kptllnd_count_queue(&peer->peer_activeq);
                }

                spin_unlock(&peer->peer_lock);
                
                if (tx == NULL && !check_sends)
                        continue;

                kptllnd_peer_addref(peer); /* 1 ref for me... */

                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                if (tx == NULL) { /* nothing timed out */
                        kptllnd_peer_check_sends(peer);
                        kptllnd_peer_decref(peer); /* ...until here or... */

                        /* rescan after dropping the lock */
                        goto again;
                }

                LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
                                   libcfs_id2str(peer->peer_id),
                                   (tx->tx_tposted == 0) ? 
                                   "no free peer buffers" : 
                                   "please check Portals");

                /* diagnose the stall: tx posted but never completed, peer
                 * never finished connecting, or peer starved us of credits */
                if (tx->tx_tposted) {
                        CERROR("Could not send to %s after %ds (sent %lds ago); "
                                "check Portals for possible issues\n",
                                libcfs_id2str(peer->peer_id),
                                *kptllnd_tunables.kptl_timeout,
                                cfs_duration_sec(jiffies - tx->tx_tposted));
                } else if (state < PEER_STATE_ACTIVE) {
                        CERROR("Could not connect %s (%d) after %ds; "
                               "peer might be down\n",
                               libcfs_id2str(peer->peer_id), state,
                               *kptllnd_tunables.kptl_timeout);
                } else {
                        CERROR("Could not get credits for %s after %ds; "
                                "possible Lustre networking issues\n",
                        libcfs_id2str(peer->peer_id),
                        *kptllnd_tunables.kptl_timeout);
                }

                CERROR("%s timed out: cred %d outstanding %d, sent %d, "
                       "state %d, sent_hello %d, sendq %d, activeq %d "
                       "Tx %p %s %s (%s%s%s) status %d %sposted %lu T/O %ds\n",
                       libcfs_id2str(peer->peer_id), c, oc, sc,
                       state, sent_hello, nsend, nactive,
                       tx, kptllnd_tx_typestr(tx->tx_type),
                       kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
                       tx->tx_active ? "A" : "",
                       PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
                       "" : "M",
                       PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
                       "" : "D",
                       tx->tx_status,
                       (tx->tx_tposted == 0) ? "not " : "",
                       (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
                       *kptllnd_tunables.kptl_timeout);

#ifdef CRAY_XT3
                if (*kptllnd_tunables.kptl_ptltrace_on_timeout)
                        kptllnd_dump_ptltrace();
#endif

                /* drop the ref kptllnd_find_timed_out_tx() took */
                kptllnd_tx_decref(tx);

                kptllnd_peer_close(peer, -ETIMEDOUT);
                kptllnd_peer_decref(peer); /* ...until here */

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}
1017
1018 kptl_peer_t *
1019 kptllnd_id2peer_locked (lnet_process_id_t id)
1020 {
1021         struct list_head *peers = kptllnd_nid2peerlist(id.nid);
1022         struct list_head *tmp;
1023         kptl_peer_t      *peer;
1024
1025         list_for_each (tmp, peers) {
1026                 peer = list_entry (tmp, kptl_peer_t, peer_list);
1027
1028                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
1029                         peer->peer_state == PEER_STATE_ACTIVE);
1030
1031                 /* NB logical LNet peers share one kptl_peer_t */
1032                 if (peer->peer_id.pid != id.pid ||
1033                     LNET_NIDADDR(id.nid) != LNET_NIDADDR(peer->peer_id.nid))
1034                         continue;
1035
1036                 kptllnd_peer_addref(peer);
1037
1038                 CDEBUG(D_NET, "%s -> %s (%d)\n",
1039                        libcfs_id2str(id),
1040                        kptllnd_ptlid2str(peer->peer_ptlid),
1041                        atomic_read (&peer->peer_refcount));
1042                 return peer;
1043         }
1044
1045         return NULL;
1046 }
1047
void
kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
{
        /* Console warning emitted when admitting 'id' would exceed the
         * configured peer table capacity.  'str' identifies the direction
         * of the attempt ("Connection from "/"Connection to "). */
        LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
                           "messages may be dropped\n",
                           str, libcfs_id2str(id),
                           kptllnd_data.kptl_n_active_peers);
        LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
                           "'max_nodes' or 'max_procs_per_node'\n");
}
1058
1059 __u64
1060 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
1061 {
1062         kptl_peer_t            *peer;
1063         struct list_head       *tmp;
1064
1065         /* Find the last matchbits I saw this new peer using.  Note..
1066            A. This peer cannot be in the peer table - she's new!
1067            B. If I can't find the peer in the closing/zombie peers, all
1068               matchbits are safe because all refs to the (old) peer have gone
1069               so all txs have completed so there's no risk of matchbit
1070               collision!
1071          */
1072
1073         LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
1074
1075         /* peer's last matchbits can't change after it comes out of the peer
1076          * table, so first match is fine */
1077
1078         list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
1079                 peer = list_entry (tmp, kptl_peer_t, peer_list);
1080
1081                 if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
1082                     peer->peer_id.pid == lpid.pid)
1083                         return peer->peer_last_matchbits_seen;
1084         }
1085
1086         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
1087                 peer = list_entry (tmp, kptl_peer_t, peer_list);
1088
1089                 if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
1090                     peer->peer_id.pid == lpid.pid)
1091                         return peer->peer_last_matchbits_seen;
1092         }
1093
1094         return PTL_RESERVED_MATCHBITS;
1095 }
1096
/* Handle an incoming HELLO from 'initiator'.  Returns the new or existing
 * peer with a reference held, or NULL on failure.  The HELLO carries the
 * highest matchbits the sender may already have used, so I can pick safe
 * matchbits for future RDMA with her. */
kptl_peer_t *
kptllnd_peer_handle_hello (kptl_net_t *net,
                           ptl_process_id_t initiator, kptl_msg_t *msg)
{
        rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        kptl_peer_t        *peer;
        kptl_peer_t        *new_peer;
        lnet_process_id_t   lpid;
        unsigned long       flags;
        kptl_tx_t          *hello_tx;
        int                 rc;
        __u64               safe_matchbits;
        __u64               last_matchbits_seen;

        lpid.nid = msg->ptlm_srcnid;
        lpid.pid = msg->ptlm_srcpid;

        CDEBUG(D_NET, "hello from %s(%s)\n",
               libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));

        if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
            (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
                /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
                 * userspace.  Refuse the connection if she hasn't set the
                 * correct flag in her PID... */
                CERROR("Userflag not set in hello from %s (%s)\n",
                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
                return NULL;
        }
        
        /* kptlhm_matchbits are the highest matchbits my peer may have used to
         * RDMA to me.  I ensure I never register buffers for RDMA that could
         * match any she used */
        safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;

        if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
                CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
                       safe_matchbits, libcfs_id2str(lpid));
                return NULL;
        }
        
        if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
                CERROR("%s: max message size %d < MIN %d",
                       libcfs_id2str(lpid),
                       msg->ptlm_u.hello.kptlhm_max_msg_size,
                       PTLLND_MIN_BUFFER_SIZE);
                return NULL;
        }

        if (msg->ptlm_credits <= 1) {
                CERROR("Need more than 1+%d credits from %s\n",
                       msg->ptlm_credits, libcfs_id2str(lpid));
                return NULL;
        }
        
        write_lock_irqsave(g_lock, flags);

        peer = kptllnd_id2peer_locked(lpid);
        if (peer != NULL) {
                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        /* Completing HELLO handshake */
                        LASSERT(peer->peer_incarnation == 0);

                        if (msg->ptlm_dststamp != 0 &&
                            msg->ptlm_dststamp != peer->peer_myincarnation) {
                                write_unlock_irqrestore(g_lock, flags);

                                CERROR("Ignoring HELLO from %s: unexpected "
                                       "dststamp "LPX64" ("LPX64" wanted)\n",
                                       libcfs_id2str(lpid),
                                       msg->ptlm_dststamp,
                                       peer->peer_myincarnation);
                                /* drop ref from kptllnd_id2peer_locked() */
                                kptllnd_peer_decref(peer);
                                return NULL;
                        }
                        
                        /* Concurrent initiation or response to my HELLO */
                        peer->peer_state = PEER_STATE_ACTIVE;
                        peer->peer_incarnation = msg->ptlm_srcstamp;
                        peer->peer_next_matchbits = safe_matchbits;
                        peer->peer_max_msg_size =
                                msg->ptlm_u.hello.kptlhm_max_msg_size;
                        
                        /* return with the lookup ref still held */
                        write_unlock_irqrestore(g_lock, flags);
                        return peer;
                }

                if (msg->ptlm_dststamp != 0 &&
                    msg->ptlm_dststamp <= peer->peer_myincarnation) {
                        write_unlock_irqrestore(g_lock, flags);

                        CERROR("Ignoring stale HELLO from %s: "
                               "dststamp "LPX64" (current "LPX64")\n",
                               libcfs_id2str(lpid),
                               msg->ptlm_dststamp,
                               peer->peer_myincarnation);
                        kptllnd_peer_decref(peer);
                        return NULL;
                }

                /* Brand new connection attempt: remove old incarnation */
                kptllnd_peer_close_locked(peer, 0);
        }

        kptllnd_cull_peertable_locked(lpid);

        write_unlock_irqrestore(g_lock, flags);

        if (peer != NULL) {
                CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
                       " stamp "LPX64"("LPX64")\n",
                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
                       msg->ptlm_srcstamp, peer->peer_incarnation);

                kptllnd_peer_decref(peer);
                peer = NULL;
        }

        /* Allocate the response HELLO and the new peer before retaking the
         * lock, since allocation may block */
        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate HELLO message for %s\n",
                       libcfs_id2str(lpid));
                return NULL;
        }

        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         lpid, sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(net, lpid, initiator);
        if (new_peer == NULL) {
                kptllnd_tx_decref(hello_tx);
                return NULL;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0) {
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);

                CERROR("Failed to reserve buffers for %s\n",
                       libcfs_id2str(lpid));
                return NULL;
        }

        write_lock_irqsave(g_lock, flags);

 again:
        /* re-check everything that may have changed while the lock was
         * dropped (shutdown, a racing peer creation, table space) */
        if (net->net_shutdown) {
                write_unlock_irqrestore(g_lock, flags);

                CERROR ("Shutdown started, refusing connection from %s\n",
                        libcfs_id2str(lpid));
                kptllnd_peer_unreserve_buffers();
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);
                return NULL;
        }

        peer = kptllnd_id2peer_locked(lpid);
        if (peer != NULL) {
                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        /* An outgoing message instantiated 'peer' for me */
                        LASSERT(peer->peer_incarnation == 0);

                        peer->peer_state = PEER_STATE_ACTIVE;
                        peer->peer_incarnation = msg->ptlm_srcstamp;
                        peer->peer_next_matchbits = safe_matchbits;
                        peer->peer_max_msg_size =
                                msg->ptlm_u.hello.kptlhm_max_msg_size;

                        write_unlock_irqrestore(g_lock, flags);

                        CWARN("Outgoing instantiated peer %s\n",
                              libcfs_id2str(lpid));
                } else {
                        LASSERT (peer->peer_state == PEER_STATE_ACTIVE);

                        write_unlock_irqrestore(g_lock, flags);

                        /* WOW!  Somehow this peer completed the HELLO
                         * handshake while I slept.  I guess I could have slept
                         * while it rebooted and sent a new HELLO, so I'll fail
                         * this one... */
                        CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
                        kptllnd_peer_decref(peer);
                        peer = NULL;
                }

                /* new_peer and hello_tx are no longer needed */
                kptllnd_peer_unreserve_buffers();
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);
                return peer;
        }

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection from ", lpid);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Refusing connection from %s\n",
                               libcfs_id2str(lpid));
                        kptllnd_peer_unreserve_buffers();
                        kptllnd_peer_decref(new_peer);
                        kptllnd_tx_decref(hello_tx);
                        return NULL;
                }
                
                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
                goto again;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);

        /* tell the peer the highest matchbits I may have seen her use */
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;

        new_peer->peer_state = PEER_STATE_ACTIVE;
        new_peer->peer_incarnation = msg->ptlm_srcstamp;
        new_peer->peer_next_matchbits = safe_matchbits;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;
        new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;

        LASSERT (!net->net_shutdown);
        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        kptllnd_post_tx(new_peer, hello_tx, 0);
        kptllnd_peer_check_sends(new_peer);

        return new_peer;
}
1341
void
kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
        /* Queue 'tx' on 'peer' and kick the send path immediately. */
        kptllnd_post_tx(peer, tx, nfrag);
        kptllnd_peer_check_sends(peer);
}
1348
/* Find the peer for 'target', creating it (and initiating the HELLO
 * handshake) if it doesn't exist yet.  On success sets *peerp to the peer
 * with a reference held and returns 0; otherwise returns -ve errno. */
int
kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target,
                    kptl_peer_t **peerp)
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        ptl_process_id_t  ptl_id;
        kptl_peer_t      *new_peer;
        kptl_tx_t        *hello_tx;
        unsigned long     flags;
        int               rc;
        __u64             last_matchbits_seen;

        /* I expect to find the peer, so I only take a read lock... */
        read_lock_irqsave(g_lock, flags);
        *peerp = kptllnd_id2peer_locked(target);
        read_unlock_irqrestore(g_lock, flags);

        if (*peerp != NULL)
                return 0;
        
        if ((target.pid & LNET_PID_USERFLAG) != 0) {
                CWARN("Refusing to create a new connection to %s "
                      "(non-kernel peer)\n", libcfs_id2str(target));
                return -EHOSTUNREACH;
        }

        /* The new peer is a kernel ptllnd, and kernel ptllnds all have the
         * same portals PID, which has nothing to do with LUSTRE_SRV_LNET_PID */
        ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
        ptl_id.pid = kptllnd_data.kptl_portals_id.pid;

        /* allocate HELLO tx and peer before taking the write lock */
        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate connect message for %s\n",
                       libcfs_id2str(target));
                return -ENOMEM;
        }

        hello_tx->tx_acked = 1;
        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         target, sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(net, target, ptl_id);
        if (new_peer == NULL) {
                rc = -ENOMEM;
                goto unwind_0;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0)
                goto unwind_1;

        write_lock_irqsave(g_lock, flags);
 again:
        /* Called only in lnd_send which can't happen after lnd_shutdown */
        LASSERT (!net->net_shutdown);

        /* recheck: someone may have created the peer while the lock was
         * dropped */
        *peerp = kptllnd_id2peer_locked(target);
        if (*peerp != NULL) {
                write_unlock_irqrestore(g_lock, flags);
                goto unwind_2;
        }

        kptllnd_cull_peertable_locked(target);

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection to ", target);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Can't create connection to %s\n",
                               libcfs_id2str(target));
                        rc = -ENOMEM;
                        goto unwind_2;
                }
                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
                goto again;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);

        /* tell the peer the highest matchbits I may have seen her use */
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;
                
        new_peer->peer_state = PEER_STATE_WAITING_HELLO;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;
        
        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        kptllnd_post_tx(new_peer, hello_tx, 0);
        kptllnd_peer_check_sends(new_peer);
       
        *peerp = new_peer;
        return 0;
        
        /* goto-chain cleanup: undo only what was acquired */
 unwind_2:
        kptllnd_peer_unreserve_buffers();
 unwind_1:
        kptllnd_peer_decref(new_peer);
 unwind_0:
        kptllnd_tx_decref(hello_tx);

        return rc;
}