Whamcloud - gitweb
i=maxim,b=18460,b=20171:
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_peer.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/ptllnd/ptllnd_peer.c
37  *
38  * Author: PJ Kirner <pjkirner@clusterfs.com>
39  * Author: E Barton <eeb@bartonsoftware.com>
40  */
41
42 #include "ptllnd.h"
43 #include <libcfs/list.h>
44
45 static int
46 kptllnd_count_queue(struct list_head *q)
47 {
48         struct list_head *e;
49         int               n = 0;
50         
51         list_for_each(e, q) {
52                 n++;
53         }
54
55         return n;
56 }
57
/* Fill in a snapshot of the state of the index'th peer in the global peer
 * hash table (procfs/ioctl support).  Returns 0 and populates all output
 * parameters on success, -ENOENT if 'index' is beyond the last peer.
 *
 * Locking: takes the peer table read lock for the whole scan, and the
 * per-peer spinlock (without re-saving irq flags - already disabled by the
 * irqsave read lock) while copying the fields it protects. */
int
kptllnd_get_peer_info(int index, 
                      lnet_process_id_t *id,
                      int *state, int *sent_hello,
                      int *refcount, __u64 *incarnation,
                      __u64 *next_matchbits, __u64 *last_matchbits_seen,
                      int *nsendq, int *nactiveq,
                      int *credits, int *outstanding_credits) 
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        unsigned long     flags;
        struct list_head *ptmp;
        kptl_peer_t      *peer;
        int               i;
        int               rc = -ENOENT;

        read_lock_irqsave(g_lock, flags);

        for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
                list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry(ptmp, kptl_peer_t, peer_list);

                        /* Skip peers until we reach the requested index */
                        if (index-- > 0)
                                continue;
                        
                        *id          = peer->peer_id;
                        *state       = peer->peer_state;
                        *sent_hello  = peer->peer_sent_hello;
                        *refcount    = atomic_read(&peer->peer_refcount);
                        *incarnation = peer->peer_incarnation;

                        /* These fields are protected by the peer lock, not
                         * the table lock */
                        spin_lock(&peer->peer_lock);

                        *next_matchbits      = peer->peer_next_matchbits;
                        *last_matchbits_seen = peer->peer_last_matchbits_seen;
                        *credits             = peer->peer_credits;
                        *outstanding_credits = peer->peer_outstanding_credits;

                        *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
                        *nactiveq = kptllnd_count_queue(&peer->peer_activeq);

                        spin_unlock(&peer->peer_lock);

                        rc = 0;
                        goto out;
                }
        }
        
 out:
        read_unlock_irqrestore(g_lock, flags);
        return rc;
}
110
/* Add 'peer' to its hash bucket in the global peer table, taking a ref for
 * the table.  Caller must hold kptl_peer_rw_lock for writing and must have
 * reserved the peer slot (see kptl_expected_peers). */
void
kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
{
        LASSERT (kptllnd_data.kptl_n_active_peers <
                 kptllnd_data.kptl_expected_peers);

        LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
                 peer->peer_state == PEER_STATE_ACTIVE);
        
        kptllnd_data.kptl_n_active_peers++;
        atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */

        /* NB add to HEAD of peer list for MRU order!
         * (see kptllnd_cull_peertable) */
        list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
}
127
/* Close the least-recently-used peers sharing pid's NID so that at most
 * (max_procs_per_node - 1) remain, making room for a new peer with this
 * portals ID.  Caller must hold kptl_peer_rw_lock for writing. */
void
kptllnd_cull_peertable_locked (lnet_process_id_t pid)
{
        /* I'm about to add a new peer with this portals ID to the peer table,
         * so (a) this peer should not exist already and (b) I want to leave at
         * most (max_procs_per_nid - 1) peers with this NID in the table. */
        struct list_head  *peers = kptllnd_nid2peerlist(pid.nid);
        int                cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
        int                count;
        struct list_head  *tmp;
        struct list_head  *nxt;
        kptl_peer_t       *peer;
        
        count = 0;
        list_for_each_safe (tmp, nxt, peers) {
                /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
                 * in MRU order */
                peer = list_entry(tmp, kptl_peer_t, peer_list);
                        
                /* Different NID hashed to the same bucket: not a candidate */
                if (LNET_NIDADDR(peer->peer_id.nid) != LNET_NIDADDR(pid.nid))
                        continue;

                /* (a) above: the exact pid must not already be present */
                LASSERT (peer->peer_id.pid != pid.pid);
                        
                count++;

                if (count < cull_count) /* recent (don't cull) */
                        continue;

                CDEBUG(D_NET, "Cull %s(%s)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_ptlid2str(peer->peer_ptlid));
                
                kptllnd_peer_close_locked(peer, 0);
        }
}
164
/* Allocate and initialise a new peer for LNET id 'lpid' / portals id 'ppid'
 * on network 'net'.  Returns the peer with 1 ref for the caller, or NULL on
 * allocation failure or if 'net' is already shutting down.  The peer is NOT
 * added to the peer table here (see kptllnd_peer_add_peertable_locked). */
kptl_peer_t *
kptllnd_peer_allocate (kptl_net_t *net, lnet_process_id_t lpid, ptl_process_id_t ppid)
{
        unsigned long    flags;
        kptl_peer_t     *peer;

        LIBCFS_ALLOC(peer, sizeof (*peer));
        if (peer == NULL) {
                CERROR("Can't create peer %s (%s)\n",
                       libcfs_id2str(lpid), 
                       kptllnd_ptlid2str(ppid));
                return NULL;
        }

        memset(peer, 0, sizeof(*peer));         /* zero flags etc */

        INIT_LIST_HEAD (&peer->peer_noops);
        INIT_LIST_HEAD (&peer->peer_sendq);
        INIT_LIST_HEAD (&peer->peer_activeq);
        spin_lock_init (&peer->peer_lock);

        peer->peer_state = PEER_STATE_ALLOCATED;
        peer->peer_error = 0;
        peer->peer_last_alive = 0;
        peer->peer_id = lpid;
        peer->peer_ptlid = ppid;
        peer->peer_credits = 1;                 /* enough for HELLO */
        peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
        /* All peer tx credits bar the implicit HELLO credit start as owed
         * back to the peer */
        peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peertxcredits - 1;
        peer->peer_sent_credits = 1;           /* HELLO credit is implicit */
        peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */

        atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        peer->peer_myincarnation = kptllnd_data.kptl_incarnation;

        /* Only increase # peers under lock, to guarantee we dont grow it
         * during shutdown */
        if (net->net_shutdown) {
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
                LIBCFS_FREE(peer, sizeof(*peer));
                return NULL;
        }

        kptllnd_data.kptl_npeers++;
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
        return peer;
}
215
/* Final teardown of 'peer' once its last ref is gone.  The peer must be
 * quiescent: refcount 0, no queued txs, and either never made it into the
 * peer table (ALLOCATED) or already moved to the zombie list (ZOMBIE).
 * Called in process context (may block on the write lock). */
void
kptllnd_peer_destroy (kptl_peer_t *peer)
{
        unsigned long flags;
        
        CDEBUG(D_NET, "Peer=%p\n", peer);

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&peer->peer_refcount) == 0);
        LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
                 peer->peer_state == PEER_STATE_ZOMBIE);
        LASSERT (list_empty(&peer->peer_noops));
        LASSERT (list_empty(&peer->peer_sendq));
        LASSERT (list_empty(&peer->peer_activeq));

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Only zombies are on a list (kptl_zombie_peers); an ALLOCATED peer
         * was never linked anywhere */
        if (peer->peer_state == PEER_STATE_ZOMBIE)
                list_del(&peer->peer_list);

        kptllnd_data.kptl_npeers--;

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        LIBCFS_FREE (peer, sizeof (*peer));
}
242
243 void
244 kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs)
245 {
246         struct list_head  *tmp;
247         struct list_head  *nxt;
248         kptl_tx_t         *tx;
249
250         list_for_each_safe (tmp, nxt, peerq) {
251                 tx = list_entry(tmp, kptl_tx_t, tx_list);
252
253                 list_del(&tx->tx_list);
254                 list_add_tail(&tx->tx_list, txs);
255
256                 tx->tx_status = -EIO;
257                 tx->tx_active = 0;
258         }
259 }
260
/* Cancel every tx queued on 'peer' (noop, send and active queues), moving
 * them all onto 'txs' for the caller to drop refs on outside the peer lock. */
void
kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs)
{
        unsigned long   flags;

        spin_lock_irqsave(&peer->peer_lock, flags);

        kptllnd_cancel_txlist(&peer->peer_noops, txs);
        kptllnd_cancel_txlist(&peer->peer_sendq, txs);
        kptllnd_cancel_txlist(&peer->peer_activeq, txs);
                
        spin_unlock_irqrestore(&peer->peer_lock, flags);
}
274
/* Record that 'peer' was just heard from.  Written locklessly: concurrent
 * writers all store "now", so whichever wins is good enough; the barrier
 * makes the new timestamp visible to other CPUs promptly. */
void
kptllnd_peer_alive (kptl_peer_t *peer)
{
        /* This is racy, but everyone's only writing cfs_time_current() */
        peer->peer_last_alive = cfs_time_current();
        mb();
}
282
/* If 'peer' has a stashed close reason (peer_error), report the death to
 * LNET on every net, then clear the error so it is only reported once.
 * No-op if there is no pending error or shutdown is in progress. */
void
kptllnd_peer_notify (kptl_peer_t *peer)
{
        unsigned long flags;
        kptl_net_t   *net;
        kptl_net_t  **nets;
        int           i = 0;
        int           nnets = 0;
        int           error = 0;
        cfs_time_t    last_alive = 0;
        
        /* Take and clear the error atomically w.r.t. other notifiers */
        spin_lock_irqsave(&peer->peer_lock, flags);

        if (peer->peer_error != 0) {
                error = peer->peer_error;
                peer->peer_error = 0;
                last_alive = peer->peer_last_alive;
        }
        
        spin_unlock_irqrestore(&peer->peer_lock, flags);

        if (error == 0)
                return;

        /* Pass 1: count the nets so the snapshot array can be sized */
        read_lock(&kptllnd_data.kptl_net_rw_lock);
        list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list)
                nnets++;
        read_unlock(&kptllnd_data.kptl_net_rw_lock);

        if (nnets == 0) /* shutdown in progress */
                return;

        LIBCFS_ALLOC(nets, nnets * sizeof(*nets));
        if (nets == NULL) {
                CERROR("Failed to allocate nets[%d]\n", nnets);
                return;
        }
        memset(nets, 0, nnets * sizeof(*nets));

        /* Pass 2: snapshot the nets, taking a ref on each, so lnet_notify()
         * can be called below without holding kptl_net_rw_lock.
         * NOTE(review): the LASSERT assumes the list has not grown between
         * the two passes - presumably nets are only added at startup;
         * confirm.  A shrunk list leaves NULL tail entries, handled below. */
        read_lock(&kptllnd_data.kptl_net_rw_lock);
        i = 0;
        list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
                LASSERT (i < nnets);
                nets[i] = net;
                kptllnd_net_addref(net);
                i++;
        }
        read_unlock(&kptllnd_data.kptl_net_rw_lock);

        for (i = 0; i < nnets; i++) {
                lnet_nid_t peer_nid;

                net = nets[i];
                if (net == NULL)
                        break;

                if (!net->net_shutdown) {
                        /* Tell LNET this peer was last seen at 'last_alive' */
                        peer_nid = kptllnd_ptl2lnetnid(net->net_ni->ni_nid,
                                                       peer->peer_ptlid.nid);
                        lnet_notify(net->net_ni, peer_nid, 0, last_alive);
                }

                kptllnd_net_decref(net);
        }

        LIBCFS_FREE(nets, nnets * sizeof(*nets));
}
350
/* Reap peers on the closing list: notify LNET, cancel their queued txs,
 * move them to the zombie list and drop the peer table's ref.  Also cancels
 * txs on existing zombies.  Cancelled txs have their refs dropped outside
 * all locks at the end.  Typically called from the watchdog thread. */
void
kptllnd_handle_closing_peers ()
{
        unsigned long           flags;
        struct list_head        txs;
        kptl_peer_t            *peer;
        struct list_head       *tmp;
        struct list_head       *nxt;
        kptl_tx_t              *tx;
        int                     idle;

        /* Check with a read lock first to avoid blocking anyone */

        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        idle = list_empty(&kptllnd_data.kptl_closing_peers) &&
               list_empty(&kptllnd_data.kptl_zombie_peers);
        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (idle)
                return;

        /* Collect cancelled txs here; their refs are dropped at the bottom,
         * outside all locks */
        INIT_LIST_HEAD(&txs);

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
         * ref removes it from this list, so I musn't drop the lock while
         * scanning it. */
        list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);

                kptllnd_peer_cancel_txs(peer, &txs);
        }

        /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
         * I'm the only one removing from this list, but peers can be added on
         * the end any time I drop the lock. */

        list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_CLOSING);

                /* Move to the zombie list BEFORE dropping the lock, so the
                 * peer can't be processed twice */
                list_del(&peer->peer_list);
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_zombie_peers);
                peer->peer_state = PEER_STATE_ZOMBIE;

                /* notify/cancel/decref may block or take other locks, so the
                 * table lock is dropped around them and retaken to continue
                 * the (safe) scan */
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                kptllnd_peer_notify(peer);
                kptllnd_peer_cancel_txs(peer, &txs);
                kptllnd_peer_decref(peer);

                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        }

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Drop peer's ref on all cancelled txs.  This will get
         * kptllnd_tx_fini() to abort outstanding comms if necessary. */

        list_for_each_safe (tmp, nxt, &txs) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                kptllnd_tx_decref(tx);
        }
}
421
/* Begin closing 'peer' with reason 'why': remove it from the peer table and
 * queue it for the watchdog on the closing list.  Idempotent - closing or
 * zombie peers are left alone, so only the first close's 'why' is kept.
 * Caller must hold kptl_peer_rw_lock for writing. */
void
kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
{
        switch (peer->peer_state) {
        default:
                LBUG();

        case PEER_STATE_WAITING_HELLO:
        case PEER_STATE_ACTIVE:
                /* Ensure new peers see a new incarnation of me */
                LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
                if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
                        kptllnd_data.kptl_incarnation++;

                /* Removing from peer table */
                kptllnd_data.kptl_n_active_peers--;
                LASSERT (kptllnd_data.kptl_n_active_peers >= 0);

                list_del(&peer->peer_list);
                kptllnd_peer_unreserve_buffers();

                peer->peer_error = why; /* stash 'why' only on first close */
                peer->peer_state = PEER_STATE_CLOSING;

                /* Schedule for immediate attention, taking peer table's ref */
                list_add_tail(&peer->peer_list, 
                              &kptllnd_data.kptl_closing_peers);
                wake_up(&kptllnd_data.kptl_watchdog_waitq);
                break;

        case PEER_STATE_ZOMBIE:
        case PEER_STATE_CLOSING:
                break;
        }
}
457
458 void
459 kptllnd_peer_close(kptl_peer_t *peer, int why)
460 {
461         unsigned long      flags;
462
463         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
464         kptllnd_peer_close_locked(peer, why);
465         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
466 }
467
/* Close every peer matching 'id'.  id.nid may be LNET_NID_ANY to match all
 * NIDs (in which case id.pid must be LNET_PID_ANY too), and id.pid may be
 * LNET_PID_ANY to match all pids on a NID.  Returns 0 if anything matched,
 * -ENOENT if nothing did, -EINVAL for a bad wildcard combination. */
int
kptllnd_peer_del(lnet_process_id_t id)
{
        struct list_head  *ptmp;
        struct list_head  *pnxt;
        kptl_peer_t       *peer;
        int                lo;
        int                hi;
        int                i;
        unsigned long      flags;
        int                rc = -ENOENT;

        /*
         * Find the single bucket we are supposed to look at or if nid is a
         * wildcard (LNET_NID_ANY) then look at all of the buckets
         */
        if (id.nid != LNET_NID_ANY) {
                struct list_head *l = kptllnd_nid2peerlist(id.nid);
                
                /* bucket index by pointer arithmetic on the hash array */
                lo = hi =  l - kptllnd_data.kptl_peers;
        } else {
                if (id.pid != LNET_PID_ANY)
                        return -EINVAL;
                
                lo = 0;
                hi = kptllnd_data.kptl_peer_hash_size - 1;
        }

again:
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry (ptmp, kptl_peer_t, peer_list);

                        if (!(id.nid == LNET_NID_ANY || 
                              (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(id.nid) &&
                               (id.pid == LNET_PID_ANY || 
                                peer->peer_id.pid == id.pid))))
                                continue;

                        kptllnd_peer_addref(peer); /* 1 ref for me... */

                        /* kptllnd_peer_close() needs the write lock, so drop
                         * the read lock first; my ref keeps the peer alive */
                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                               flags);

                        kptllnd_peer_close(peer, 0);
                        kptllnd_peer_decref(peer); /* ...until here */

                        rc = 0;         /* matched something */

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        return (rc);
}
528
/* Queue 'tx' for sending to 'peer'.  NOOPs go on their own queue; HELLOs
 * jump to the head of the send queue so they go out before anything else;
 * all other messages are sent in FIFO order. */
void
kptllnd_queue_tx(kptl_peer_t *peer, kptl_tx_t *tx)
{
        /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
        unsigned long flags;

        spin_lock_irqsave(&peer->peer_lock, flags);

        if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP)
                list_add(&tx->tx_list, &peer->peer_noops);
        else if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
                /* Ensure HELLO is sent first: add at the head */
                list_add(&tx->tx_list, &peer->peer_sendq);
        else
                list_add_tail(&tx->tx_list, &peer->peer_sendq);

        spin_unlock_irqrestore(&peer->peer_lock, flags);
}
547
548
/* Bind a Portals MD over tx's message buffer and queue the tx on 'peer'.
 * 'nfrag' == 0 sends the contiguous message buffer; otherwise tx_frags is
 * sent as an iovec of 'nfrag' entries (first entry must be the message).
 * On MD bind failure the tx is failed (-EIO) and the caller's ref dropped. */
void
kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
        /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
        ptl_handle_md_t  msg_mdh;
        ptl_md_t         md;
        ptl_err_t        prc;

        LASSERT (!tx->tx_idle);
        LASSERT (!tx->tx_active);
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
                 tx->tx_type == TX_TYPE_PUT_REQUEST ||
                 tx->tx_type == TX_TYPE_GET_REQUEST);

        kptllnd_set_tx_peer(tx, peer);

        memset(&md, 0, sizeof(md));

        md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE;
        md.user_ptr = &tx->tx_msg_eventarg;
        md.eq_handle = kptllnd_data.kptl_eqh;

        if (nfrag == 0) {
                /* contiguous message buffer */
                md.start = tx->tx_msg;
                md.length = tx->tx_msg->ptlm_nob;
        } else {
                LASSERT (nfrag > 1);
                LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);

                /* NB with PTL_MD_IOVEC, md.length counts iovec entries */
                md.start = tx->tx_frags;
                md.length = nfrag;
                md.options |= PTL_MD_IOVEC;
        }

        prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
        if (prc != PTL_OK) {
                CERROR("PtlMDBind(%s) failed: %s(%d)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_errtype2str(prc), prc);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);
                return;
        }


        /* deadline and MD handle must be set before the tx becomes visible
         * on the peer's send queue */
        tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
        tx->tx_active = 1;
        tx->tx_msg_mdh = msg_mdh;
        kptllnd_queue_tx(peer, tx);
}
604
/* NB "restarts" comes from peer_sendq of a single peer */
/* Requeue the txs on 'restarts' to whichever peer now serves 'target' on
 * 'net'.  Txs are dropped (completing with their stashed -EIO status from
 * cancellation) if no such peer exists or the tx is a HELLO (which must not
 * be replayed).  Each tx's old peer ref is swapped for a ref on the new
 * peer, then sends are kicked. */
void
kptllnd_restart_txs (kptl_net_t *net, lnet_process_id_t target, struct list_head *restarts)
{
        kptl_tx_t   *tx;
        kptl_tx_t   *tmp;
        kptl_peer_t *peer;

        LASSERT (!list_empty(restarts));

        if (kptllnd_find_target(net, target, &peer) != 0)
                peer = NULL;

        list_for_each_entry_safe (tx, tmp, restarts, tx_list) {
                LASSERT (tx->tx_peer != NULL);
                LASSERT (tx->tx_type == TX_TYPE_GET_REQUEST ||
                         tx->tx_type == TX_TYPE_PUT_REQUEST ||
                         tx->tx_type == TX_TYPE_SMALL_MESSAGE);

                list_del_init(&tx->tx_list);

                /* No current peer, or a HELLO that mustn't be resent: drop */
                if (peer == NULL ||
                    tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
                        kptllnd_tx_decref(tx);
                        continue;
                }

                LASSERT (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_NOOP);
                tx->tx_status = 0;
                tx->tx_active = 1;
                /* swap the ref on the old peer for one on the new */
                kptllnd_peer_decref(tx->tx_peer);
                tx->tx_peer = NULL;
                kptllnd_set_tx_peer(tx, peer);
                kptllnd_queue_tx(peer, tx); /* takes over my ref on tx */
        }

        if (peer == NULL)
                return;

        kptllnd_peer_check_sends(peer);
        kptllnd_peer_decref(peer);      /* drop kptllnd_find_target's ref */
}
647
648 static inline int
649 kptllnd_peer_send_noop (kptl_peer_t *peer)
650 {
651         if (!peer->peer_sent_hello ||
652             peer->peer_credits == 0 ||
653             !list_empty(&peer->peer_noops) ||
654             peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)
655                 return 0;
656
657         /* No tx to piggyback NOOP onto or no credit to send a tx */
658         return (list_empty(&peer->peer_sendq) || peer->peer_credits == 1);
659 }
660
661 void
662 kptllnd_peer_check_sends (kptl_peer_t *peer)
663 {
664         ptl_handle_me_t  meh;
665         kptl_tx_t       *tx;
666         int              rc;
667         int              msg_type;
668         unsigned long    flags;
669
670         LASSERT(!in_interrupt());
671
672         spin_lock_irqsave(&peer->peer_lock, flags);
673
674         peer->peer_retry_noop = 0;
675
676         if (kptllnd_peer_send_noop(peer)) {
677                 /* post a NOOP to return credits */
678                 spin_unlock_irqrestore(&peer->peer_lock, flags);
679
680                 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
681                 if (tx == NULL) {
682                         CERROR("Can't return credits to %s: can't allocate descriptor\n",
683                                libcfs_id2str(peer->peer_id));
684                 } else {
685                         kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP,
686                                          peer->peer_id, 0);
687                         kptllnd_post_tx(peer, tx, 0);
688                 }
689
690                 spin_lock_irqsave(&peer->peer_lock, flags);
691                 peer->peer_retry_noop = (tx == NULL);
692         }
693
694         for (;;) {
695                 if (!list_empty(&peer->peer_noops)) {
696                         LASSERT (peer->peer_sent_hello);
697                         tx = list_entry(peer->peer_noops.next,
698                                         kptl_tx_t, tx_list);
699                 } else if (!list_empty(&peer->peer_sendq)) {
700                         tx = list_entry(peer->peer_sendq.next,
701                                         kptl_tx_t, tx_list);
702                 } else {
703                         /* nothing to send right now */
704                         break;
705                 }
706
707                 LASSERT (tx->tx_active);
708                 LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
709                 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
710
711                 LASSERT (peer->peer_outstanding_credits >= 0);
712                 LASSERT (peer->peer_sent_credits >= 0);
713                 LASSERT (peer->peer_sent_credits +
714                          peer->peer_outstanding_credits <=
715                          *kptllnd_tunables.kptl_peertxcredits);
716                 LASSERT (peer->peer_credits >= 0);
717
718                 msg_type = tx->tx_msg->ptlm_type;
719
720                 /* Ensure HELLO is sent first */
721                 if (!peer->peer_sent_hello) {
722                         LASSERT (list_empty(&peer->peer_noops));
723                         if (msg_type != PTLLND_MSG_TYPE_HELLO)
724                                 break;
725                         peer->peer_sent_hello = 1;
726                 }
727
728                 if (peer->peer_credits == 0) {
729                         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %s[%p]\n",
730                                libcfs_id2str(peer->peer_id), 
731                                peer->peer_credits,
732                                peer->peer_outstanding_credits, 
733                                peer->peer_sent_credits, 
734                                kptllnd_msgtype2str(msg_type), tx);
735                         break;
736                 }
737
738                 /* Last/Initial credit reserved for NOOP/HELLO */
739                 if (peer->peer_credits == 1 &&
740                     msg_type != PTLLND_MSG_TYPE_HELLO &&
741                     msg_type != PTLLND_MSG_TYPE_NOOP) {
742                         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
743                                "not using last credit for %s[%p]\n",
744                                libcfs_id2str(peer->peer_id), 
745                                peer->peer_credits,
746                                peer->peer_outstanding_credits,
747                                peer->peer_sent_credits,
748                                kptllnd_msgtype2str(msg_type), tx);
749                         break;
750                 }
751
752                 list_del(&tx->tx_list);
753
754                 /* Discard any NOOP I queued if I'm not at the high-water mark
755                  * any more or more messages have been queued */
756                 if (msg_type == PTLLND_MSG_TYPE_NOOP &&
757                     !kptllnd_peer_send_noop(peer)) {
758                         tx->tx_active = 0;
759
760                         spin_unlock_irqrestore(&peer->peer_lock, flags);
761
762                         CDEBUG(D_NET, "%s: redundant noop\n", 
763                                libcfs_id2str(peer->peer_id));
764                         kptllnd_tx_decref(tx);
765
766                         spin_lock_irqsave(&peer->peer_lock, flags);
767                         continue;
768                 }
769
770                 /* fill last-minute msg fields */
771                 kptllnd_msg_pack(tx->tx_msg, peer);
772
773                 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
774                     tx->tx_type == TX_TYPE_GET_REQUEST) {
775                         /* peer_next_matchbits must be known good */
776                         LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
777                         /* Assume 64-bit matchbits can't wrap */
778                         LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
779                         tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
780                                 peer->peer_next_matchbits++;
781                 }
782
783                 peer->peer_sent_credits += peer->peer_outstanding_credits;
784                 peer->peer_outstanding_credits = 0;
785                 peer->peer_credits--;
786
787                 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
788                        libcfs_id2str(peer->peer_id), peer->peer_credits,
789                        peer->peer_outstanding_credits, peer->peer_sent_credits,
790                        kptllnd_msgtype2str(msg_type), tx, tx->tx_msg->ptlm_nob,
791                        tx->tx_msg->ptlm_credits);
792
793                 list_add_tail(&tx->tx_list, &peer->peer_activeq);
794
795                 kptllnd_tx_addref(tx);          /* 1 ref for me... */
796
797                 spin_unlock_irqrestore(&peer->peer_lock, flags);
798
799                 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
800                     tx->tx_type == TX_TYPE_GET_REQUEST) {
801                         /* Post bulk now we have safe matchbits */
802                         rc = PtlMEAttach(kptllnd_data.kptl_nih,
803                                          *kptllnd_tunables.kptl_portal,
804                                          peer->peer_ptlid,
805                                          tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
806                                          0,             /* ignore bits */
807                                          PTL_UNLINK,
808                                          PTL_INS_BEFORE,
809                                          &meh);
810                         if (rc != PTL_OK) {
811                                 CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
812                                        libcfs_id2str(peer->peer_id),
813                                        kptllnd_errtype2str(rc), rc);
814                                 goto failed;
815                         }
816
817                         rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
818                                          &tx->tx_rdma_mdh);
819                         if (rc != PTL_OK) {
820                                 CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
821                                        libcfs_id2str(tx->tx_peer->peer_id),
822                                        kptllnd_errtype2str(rc), rc);
823                                 rc = PtlMEUnlink(meh);
824                                 LASSERT(rc == PTL_OK);
825                                 tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
826                                 goto failed;
827                         }
828                         /* I'm not racing with the event callback here.  It's a
829                          * bug if there's an event on the MD I just attached
830                          * before I actually send the RDMA request message -
831                          * probably matchbits re-used in error. */
832                 }
833
834                 tx->tx_tposted = jiffies;       /* going on the wire */
835
836                 rc = PtlPut (tx->tx_msg_mdh,
837                              tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
838                              peer->peer_ptlid,
839                              *kptllnd_tunables.kptl_portal,
840                              0,                 /* acl cookie */
841                              LNET_MSG_MATCHBITS,
842                              0,                 /* offset */
843                              0);                /* header data */
844                 if (rc != PTL_OK) {
845                         CERROR("PtlPut %s error %s(%d)\n",
846                                libcfs_id2str(peer->peer_id),
847                                kptllnd_errtype2str(rc), rc);
848                         goto failed;
849                 }
850
851                 kptllnd_tx_decref(tx);          /* drop my ref */
852
853                 spin_lock_irqsave(&peer->peer_lock, flags);
854         }
855
856         spin_unlock_irqrestore(&peer->peer_lock, flags);
857         return;
858
859  failed:
860         /* Nuke everything (including tx we were trying) */
861         kptllnd_peer_close(peer, -EIO);
862         kptllnd_tx_decref(tx);
863         kptllnd_schedule_ptltrace_dump();
864 }
865
866 kptl_tx_t *
867 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
868 {
869         kptl_tx_t         *tx;
870         struct list_head  *ele;
871
872         list_for_each(ele, &peer->peer_sendq) {
873                 tx = list_entry(ele, kptl_tx_t, tx_list);
874
875                 if (time_after_eq(jiffies, tx->tx_deadline)) {
876                         kptllnd_tx_addref(tx);
877                         return tx;
878                 }
879         }
880
881         list_for_each(ele, &peer->peer_activeq) {
882                 tx = list_entry(ele, kptl_tx_t, tx_list);
883
884                 if (time_after_eq(jiffies, tx->tx_deadline)) {
885                         kptllnd_tx_addref(tx);
886                         return tx;
887                 }
888         }
889
890         return NULL;
891 }
892
893
void
kptllnd_peer_check_bucket (int idx, int stamp)
{
        /* Scan one hash bucket of the peer table for peers with timed-out
         * txs (closing them) or pending resends (poking check_sends).
         * 'stamp' identifies this pass: peers already stamped are skipped,
         * which makes the "goto again" rescans after dropping the lock
         * terminate. */
        struct list_head  *peers = &kptllnd_data.kptl_peers[idx];
        kptl_peer_t       *peer;
        unsigned long      flags;

        CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);

 again:
        /* NB. Shared lock while I just look */
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        list_for_each_entry (peer, peers, peer_list) {
                kptl_tx_t *tx;
                int        check_sends;
                /* Diagnostic snapshot of the peer, taken under peer_lock so
                 * the timeout CERROR below reports a consistent state; -1
                 * means "not sampled" (no timed-out tx found). */
                int        c = -1, oc = -1, sc = -1;
                int        nsend = -1, nactive = -1;
                int        sent_hello = -1, state = -1;

                CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
                       libcfs_id2str(peer->peer_id), peer->peer_credits, 
                       peer->peer_outstanding_credits, peer->peer_sent_credits);

                /* plain spin_lock: interrupts already disabled by the
                 * read_lock_irqsave above */
                spin_lock(&peer->peer_lock);

                if (peer->peer_check_stamp == stamp) {
                        /* checked already this pass */
                        spin_unlock(&peer->peer_lock);
                        continue;
                }

                peer->peer_check_stamp = stamp;
                tx = kptllnd_find_timed_out_tx(peer); /* returns tx with a ref */
                check_sends = peer->peer_retry_noop;
                
                if (tx != NULL) {
                        c  = peer->peer_credits;
                        sc = peer->peer_sent_credits;
                        oc = peer->peer_outstanding_credits;
                        state      = peer->peer_state;
                        sent_hello = peer->peer_sent_hello;
                        nsend   = kptllnd_count_queue(&peer->peer_sendq);
                        nactive = kptllnd_count_queue(&peer->peer_activeq);
                }

                spin_unlock(&peer->peer_lock);
                
                if (tx == NULL && !check_sends)
                        continue;

                kptllnd_peer_addref(peer); /* 1 ref for me... */

                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                if (tx == NULL) { /* nothing timed out */
                        kptllnd_peer_check_sends(peer);
                        kptllnd_peer_decref(peer); /* ...until here or... */

                        /* rescan after dropping the lock */
                        goto again;
                }

                LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
                                   libcfs_id2str(peer->peer_id),
                                   (tx->tx_tposted == 0) ? 
                                   "no free peer buffers" : 
                                   "please check Portals");

                /* tx_tposted != 0 means the tx made it onto the wire, so
                 * the network/Portals is suspect; otherwise distinguish a
                 * stalled connection attempt from credit starvation. */
                if (tx->tx_tposted) {
                        CERROR("Could not send to %s after %ds (sent %lds ago); "
                                "check Portals for possible issues\n",
                                libcfs_id2str(peer->peer_id),
                                *kptllnd_tunables.kptl_timeout,
                                cfs_duration_sec(jiffies - tx->tx_tposted));
                } else if (state < PEER_STATE_ACTIVE) {
                        CERROR("Could not connect %s (%d) after %ds; "
                               "peer might be down\n",
                               libcfs_id2str(peer->peer_id), state,
                               *kptllnd_tunables.kptl_timeout);
                } else {
                        CERROR("Could not get credits for %s after %ds; "
                                "possible Lustre networking issues\n",
                        libcfs_id2str(peer->peer_id),
                        *kptllnd_tunables.kptl_timeout);
                }

                CERROR("%s timed out: cred %d outstanding %d, sent %d, "
                       "state %d, sent_hello %d, sendq %d, activeq %d "
                       "Tx %p %s %s (%s%s%s) status %d %sposted %lu T/O %ds\n",
                       libcfs_id2str(peer->peer_id), c, oc, sc,
                       state, sent_hello, nsend, nactive,
                       tx, kptllnd_tx_typestr(tx->tx_type),
                       kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
                       tx->tx_active ? "A" : "",
                       PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
                       "" : "M",
                       PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
                       "" : "D",
                       tx->tx_status,
                       (tx->tx_tposted == 0) ? "not " : "",
                       (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
                       *kptllnd_tunables.kptl_timeout);

#ifdef CRAY_XT3
                if (*kptllnd_tunables.kptl_ptltrace_on_timeout)
                        kptllnd_dump_ptltrace();
#endif

                kptllnd_tx_decref(tx);  /* drop find_timed_out_tx's ref */

                kptllnd_peer_close(peer, -ETIMEDOUT);
                kptllnd_peer_decref(peer); /* ...until here */

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}
1014
1015 kptl_peer_t *
1016 kptllnd_id2peer_locked (lnet_process_id_t id)
1017 {
1018         struct list_head *peers = kptllnd_nid2peerlist(id.nid);
1019         struct list_head *tmp;
1020         kptl_peer_t      *peer;
1021
1022         list_for_each (tmp, peers) {
1023                 peer = list_entry (tmp, kptl_peer_t, peer_list);
1024
1025                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
1026                         peer->peer_state == PEER_STATE_ACTIVE);
1027
1028                 /* NB logical LNet peers share one kptl_peer_t */
1029                 if (peer->peer_id.pid != id.pid ||
1030                     LNET_NIDADDR(id.nid) != LNET_NIDADDR(peer->peer_id.nid))
1031                         continue;
1032
1033                 kptllnd_peer_addref(peer);
1034
1035                 CDEBUG(D_NET, "%s -> %s (%d)\n",
1036                        libcfs_id2str(id),
1037                        kptllnd_ptlid2str(peer->peer_ptlid),
1038                        atomic_read (&peer->peer_refcount));
1039                 return peer;
1040         }
1041
1042         return NULL;
1043 }
1044
void
kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
{
        /* Console complaint used when the peer table is full ('str' is a
         * "Connection from "/"Connection to " prefix), pointing the admin
         * at the tunables that size the table. */
        LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
                           "messages may be dropped\n",
                           str, libcfs_id2str(id),
                           kptllnd_data.kptl_n_active_peers);
        LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
                           "'max_nodes' or 'max_procs_per_node'\n");
}
1055
1056 __u64
1057 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
1058 {
1059         kptl_peer_t            *peer;
1060         struct list_head       *tmp;
1061
1062         /* Find the last matchbits I saw this new peer using.  Note..
1063            A. This peer cannot be in the peer table - she's new!
1064            B. If I can't find the peer in the closing/zombie peers, all
1065               matchbits are safe because all refs to the (old) peer have gone
1066               so all txs have completed so there's no risk of matchbit
1067               collision!
1068          */
1069
1070         LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
1071
1072         /* peer's last matchbits can't change after it comes out of the peer
1073          * table, so first match is fine */
1074
1075         list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
1076                 peer = list_entry (tmp, kptl_peer_t, peer_list);
1077
1078                 if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
1079                     peer->peer_id.pid == lpid.pid)
1080                         return peer->peer_last_matchbits_seen;
1081         }
1082
1083         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
1084                 peer = list_entry (tmp, kptl_peer_t, peer_list);
1085
1086                 if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
1087                     peer->peer_id.pid == lpid.pid)
1088                         return peer->peer_last_matchbits_seen;
1089         }
1090
1091         return PTL_RESERVED_MATCHBITS;
1092 }
1093
1094 kptl_peer_t *
1095 kptllnd_peer_handle_hello (kptl_net_t *net,
1096                            ptl_process_id_t initiator, kptl_msg_t *msg)
1097 {
1098         rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1099         kptl_peer_t        *peer;
1100         kptl_peer_t        *new_peer;
1101         lnet_process_id_t   lpid;
1102         unsigned long       flags;
1103         kptl_tx_t          *hello_tx;
1104         int                 rc;
1105         __u64               safe_matchbits;
1106         __u64               last_matchbits_seen;
1107
1108         lpid.nid = msg->ptlm_srcnid;
1109         lpid.pid = msg->ptlm_srcpid;
1110
1111         CDEBUG(D_NET, "hello from %s(%s)\n",
1112                libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
1113
1114         if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
1115             (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
1116                 /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
1117                  * userspace.  Refuse the connection if she hasn't set the
1118                  * correct flag in her PID... */
1119                 CERROR("Userflag not set in hello from %s (%s)\n",
1120                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
1121                 return NULL;
1122         }
1123         
1124         /* kptlhm_matchbits are the highest matchbits my peer may have used to
1125          * RDMA to me.  I ensure I never register buffers for RDMA that could
1126          * match any she used */
1127         safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;
1128
1129         if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
1130                 CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
1131                        safe_matchbits, libcfs_id2str(lpid));
1132                 return NULL;
1133         }
1134         
1135         if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
1136                 CERROR("%s: max message size %d < MIN %d",
1137                        libcfs_id2str(lpid),
1138                        msg->ptlm_u.hello.kptlhm_max_msg_size,
1139                        PTLLND_MIN_BUFFER_SIZE);
1140                 return NULL;
1141         }
1142
1143         if (msg->ptlm_credits <= 1) {
1144                 CERROR("Need more than 1+%d credits from %s\n",
1145                        msg->ptlm_credits, libcfs_id2str(lpid));
1146                 return NULL;
1147         }
1148         
1149         write_lock_irqsave(g_lock, flags);
1150
1151         peer = kptllnd_id2peer_locked(lpid);
1152         if (peer != NULL) {
1153                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1154                         /* Completing HELLO handshake */
1155                         LASSERT(peer->peer_incarnation == 0);
1156
1157                         if (msg->ptlm_dststamp != 0 &&
1158                             msg->ptlm_dststamp != peer->peer_myincarnation) {
1159                                 write_unlock_irqrestore(g_lock, flags);
1160
1161                                 CERROR("Ignoring HELLO from %s: unexpected "
1162                                        "dststamp "LPX64" ("LPX64" wanted)\n",
1163                                        libcfs_id2str(lpid),
1164                                        msg->ptlm_dststamp,
1165                                        peer->peer_myincarnation);
1166                                 kptllnd_peer_decref(peer);
1167                                 return NULL;
1168                         }
1169                         
1170                         /* Concurrent initiation or response to my HELLO */
1171                         peer->peer_state = PEER_STATE_ACTIVE;
1172                         peer->peer_incarnation = msg->ptlm_srcstamp;
1173                         peer->peer_next_matchbits = safe_matchbits;
1174                         peer->peer_max_msg_size =
1175                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1176                         
1177                         write_unlock_irqrestore(g_lock, flags);
1178                         return peer;
1179                 }
1180
1181                 if (msg->ptlm_dststamp != 0 &&
1182                     msg->ptlm_dststamp <= peer->peer_myincarnation) {
1183                         write_unlock_irqrestore(g_lock, flags);
1184
1185                         CERROR("Ignoring stale HELLO from %s: "
1186                                "dststamp "LPX64" (current "LPX64")\n",
1187                                libcfs_id2str(lpid),
1188                                msg->ptlm_dststamp,
1189                                peer->peer_myincarnation);
1190                         kptllnd_peer_decref(peer);
1191                         return NULL;
1192                 }
1193
1194                 /* Brand new connection attempt: remove old incarnation */
1195                 kptllnd_peer_close_locked(peer, 0);
1196         }
1197
1198         kptllnd_cull_peertable_locked(lpid);
1199
1200         write_unlock_irqrestore(g_lock, flags);
1201
1202         if (peer != NULL) {
1203                 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
1204                        " stamp "LPX64"("LPX64")\n",
1205                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
1206                        msg->ptlm_srcstamp, peer->peer_incarnation);
1207
1208                 kptllnd_peer_decref(peer);
1209                 peer = NULL;
1210         }
1211
1212         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1213         if (hello_tx == NULL) {
1214                 CERROR("Unable to allocate HELLO message for %s\n",
1215                        libcfs_id2str(lpid));
1216                 return NULL;
1217         }
1218
1219         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1220                          lpid, sizeof(kptl_hello_msg_t));
1221
1222         new_peer = kptllnd_peer_allocate(net, lpid, initiator);
1223         if (new_peer == NULL) {
1224                 kptllnd_tx_decref(hello_tx);
1225                 return NULL;
1226         }
1227
1228         rc = kptllnd_peer_reserve_buffers();
1229         if (rc != 0) {
1230                 kptllnd_peer_decref(new_peer);
1231                 kptllnd_tx_decref(hello_tx);
1232
1233                 CERROR("Failed to reserve buffers for %s\n",
1234                        libcfs_id2str(lpid));
1235                 return NULL;
1236         }
1237
1238         write_lock_irqsave(g_lock, flags);
1239
1240  again:
1241         if (net->net_shutdown) {
1242                 write_unlock_irqrestore(g_lock, flags);
1243
1244                 CERROR ("Shutdown started, refusing connection from %s\n",
1245                         libcfs_id2str(lpid));
1246                 kptllnd_peer_unreserve_buffers();
1247                 kptllnd_peer_decref(new_peer);
1248                 kptllnd_tx_decref(hello_tx);
1249                 return NULL;
1250         }
1251
1252         peer = kptllnd_id2peer_locked(lpid);
1253         if (peer != NULL) {
1254                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1255                         /* An outgoing message instantiated 'peer' for me */
1256                         LASSERT(peer->peer_incarnation == 0);
1257
1258                         peer->peer_state = PEER_STATE_ACTIVE;
1259                         peer->peer_incarnation = msg->ptlm_srcstamp;
1260                         peer->peer_next_matchbits = safe_matchbits;
1261                         peer->peer_max_msg_size =
1262                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1263
1264                         write_unlock_irqrestore(g_lock, flags);
1265
1266                         CWARN("Outgoing instantiated peer %s\n",
1267                               libcfs_id2str(lpid));
1268                 } else {
1269                         LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1270
1271                         write_unlock_irqrestore(g_lock, flags);
1272
1273                         /* WOW!  Somehow this peer completed the HELLO
1274                          * handshake while I slept.  I guess I could have slept
1275                          * while it rebooted and sent a new HELLO, so I'll fail
1276                          * this one... */
1277                         CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
1278                         kptllnd_peer_decref(peer);
1279                         peer = NULL;
1280                 }
1281
1282                 kptllnd_peer_unreserve_buffers();
1283                 kptllnd_peer_decref(new_peer);
1284                 kptllnd_tx_decref(hello_tx);
1285                 return peer;
1286         }
1287
1288         if (kptllnd_data.kptl_n_active_peers ==
1289             kptllnd_data.kptl_expected_peers) {
1290                 /* peer table full */
1291                 write_unlock_irqrestore(g_lock, flags);
1292
1293                 kptllnd_peertable_overflow_msg("Connection from ", lpid);
1294
1295                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1296                 if (rc != 0) {
1297                         CERROR("Refusing connection from %s\n",
1298                                libcfs_id2str(lpid));
1299                         kptllnd_peer_unreserve_buffers();
1300                         kptllnd_peer_decref(new_peer);
1301                         kptllnd_tx_decref(hello_tx);
1302                         return NULL;
1303                 }
1304                 
1305                 write_lock_irqsave(g_lock, flags);
1306                 kptllnd_data.kptl_expected_peers++;
1307                 goto again;
1308         }
1309
1310         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
1311
1312         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1313         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1314                 *kptllnd_tunables.kptl_max_msg_size;
1315
1316         new_peer->peer_state = PEER_STATE_ACTIVE;
1317         new_peer->peer_incarnation = msg->ptlm_srcstamp;
1318         new_peer->peer_next_matchbits = safe_matchbits;
1319         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1320         new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1321
1322         LASSERT (!net->net_shutdown);
1323         kptllnd_peer_add_peertable_locked(new_peer);
1324
1325         write_unlock_irqrestore(g_lock, flags);
1326
1327         /* NB someone else could get in now and post a message before I post
1328          * the HELLO, but post_tx/check_sends take care of that! */
1329
1330         CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
1331                libcfs_id2str(new_peer->peer_id), hello_tx);
1332
1333         kptllnd_post_tx(new_peer, hello_tx, 0);
1334         kptllnd_peer_check_sends(new_peer);
1335
1336         return new_peer;
1337 }
1338
void
kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
{
        /* Queue 'tx' (with 'nfrag' fragments) on 'peer' and poke the send
         * path so it goes out as soon as credits allow. */
        kptllnd_post_tx(peer, tx, nfrag);
        kptllnd_peer_check_sends(peer);
}
1345
int
kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target,
                    kptl_peer_t **peerp)
{
        /* Look up (or create and start connecting to) the peer for 'target'.
         * On success *peerp holds a peer reference owned by the caller and 0
         * is returned; otherwise a negative errno.  New peers are put in
         * PEER_STATE_WAITING_HELLO and a HELLO is posted to them. */
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        ptl_process_id_t  ptl_id;
        kptl_peer_t      *new_peer;
        kptl_tx_t        *hello_tx;
        unsigned long     flags;
        int               rc;
        __u64             last_matchbits_seen;

        /* I expect to find the peer, so I only take a read lock... */
        read_lock_irqsave(g_lock, flags);
        *peerp = kptllnd_id2peer_locked(target);
        read_unlock_irqrestore(g_lock, flags);

        if (*peerp != NULL)
                return 0;
        
        if ((target.pid & LNET_PID_USERFLAG) != 0) {
                /* only userspace peers initiate connections to us */
                CWARN("Refusing to create a new connection to %s "
                      "(non-kernel peer)\n", libcfs_id2str(target));
                return -EHOSTUNREACH;
        }

        /* The new peer is a kernel ptllnd, and kernel ptllnds all have the
         * same portals PID, which has nothing to do with LUSTRE_SRV_LNET_PID */
        ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
        ptl_id.pid = kptllnd_data.kptl_portals_id.pid;

        /* Allocate resources before taking the write lock */
        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate connect message for %s\n",
                       libcfs_id2str(target));
                return -ENOMEM;
        }

        hello_tx->tx_acked = 1;
        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         target, sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(net, target, ptl_id);
        if (new_peer == NULL) {
                rc = -ENOMEM;
                goto unwind_0;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0)
                goto unwind_1;

        write_lock_irqsave(g_lock, flags);
 again:
        /* Called only in lnd_send which can't happen after lnd_shutdown */
        LASSERT (!net->net_shutdown);

        /* Re-check: the lock was dropped above, so someone else may have
         * created the peer meanwhile (id2peer adds the caller's ref) */
        *peerp = kptllnd_id2peer_locked(target);
        if (*peerp != NULL) {
                write_unlock_irqrestore(g_lock, flags);
                goto unwind_2;  /* discard new_peer/hello_tx; rc == 0 here
                                 * would be wrong - but *peerp is returned,
                                 * see below: unwind path returns rc from
                                 * reserve (0) meaning success */
        }

        kptllnd_cull_peertable_locked(target);

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection to ", target);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Can't create connection to %s\n",
                               libcfs_id2str(target));
                        rc = -ENOMEM;
                        goto unwind_2;
                }
                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
                goto again;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);

        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;
                
        new_peer->peer_state = PEER_STATE_WAITING_HELLO;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;
        
        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        kptllnd_post_tx(new_peer, hello_tx, 0);
        kptllnd_peer_check_sends(new_peer);
       
        *peerp = new_peer;
        return 0;
        
        /* goto-unwind: release in reverse order of acquisition */
 unwind_2:
        kptllnd_peer_unreserve_buffers();
 unwind_1:
        kptllnd_peer_decref(new_peer);
 unwind_0:
        kptllnd_tx_decref(hello_tx);

        return rc;
}