Whamcloud - gitweb
LU-812 kernel: remove smp_lock.h
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_peer.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lnet/klnds/ptllnd/ptllnd_peer.c
35  *
36  * Author: PJ Kirner <pjkirner@clusterfs.com>
37  * Author: E Barton <eeb@bartonsoftware.com>
38  */
39
40 #include "ptllnd.h"
41 #include <libcfs/list.h>
42
43 static int
44 kptllnd_count_queue(cfs_list_t *q)
45 {
46         cfs_list_t *e;
47         int         n = 0;
48
49         cfs_list_for_each(e, q) {
50                 n++;
51         }
52
53         return n;
54 }
55
56 int
57 kptllnd_get_peer_info(int index,
58                       lnet_process_id_t *id,
59                       int *state, int *sent_hello,
60                       int *refcount, __u64 *incarnation,
61                       __u64 *next_matchbits, __u64 *last_matchbits_seen,
62                       int *nsendq, int *nactiveq,
63                       int *credits, int *outstanding_credits)
64 {
65         cfs_rwlock_t     *g_lock = &kptllnd_data.kptl_peer_rw_lock;
66         unsigned long     flags;
67         cfs_list_t       *ptmp;
68         kptl_peer_t      *peer;
69         int               i;
70         int               rc = -ENOENT;
71
72         cfs_read_lock_irqsave(g_lock, flags);
73
74         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
75                 cfs_list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
76                         peer = cfs_list_entry(ptmp, kptl_peer_t, peer_list);
77
78                         if (index-- > 0)
79                                 continue;
80
81                         *id          = peer->peer_id;
82                         *state       = peer->peer_state;
83                         *sent_hello  = peer->peer_sent_hello;
84                         *refcount    = cfs_atomic_read(&peer->peer_refcount);
85                         *incarnation = peer->peer_incarnation;
86
87                         cfs_spin_lock(&peer->peer_lock);
88
89                         *next_matchbits      = peer->peer_next_matchbits;
90                         *last_matchbits_seen = peer->peer_last_matchbits_seen;
91                         *credits             = peer->peer_credits;
92                         *outstanding_credits = peer->peer_outstanding_credits;
93
94                         *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
95                         *nactiveq = kptllnd_count_queue(&peer->peer_activeq);
96
97                         cfs_spin_unlock(&peer->peer_lock);
98
99                         rc = 0;
100                         goto out;
101                 }
102         }
103
104  out:
105         cfs_read_unlock_irqrestore(g_lock, flags);
106         return rc;
107 }
108
109 void
110 kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
111 {
112         LASSERT (kptllnd_data.kptl_n_active_peers <
113                  kptllnd_data.kptl_expected_peers);
114
115         LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
116                  peer->peer_state == PEER_STATE_ACTIVE);
117
118         kptllnd_data.kptl_n_active_peers++;
119         cfs_atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */
120
121         /* NB add to HEAD of peer list for MRU order!
122          * (see kptllnd_cull_peertable) */
123         cfs_list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
124 }
125
126 void
127 kptllnd_cull_peertable_locked (lnet_process_id_t pid)
128 {
129         /* I'm about to add a new peer with this portals ID to the peer table,
130          * so (a) this peer should not exist already and (b) I want to leave at
131          * most (max_procs_per_nid - 1) peers with this NID in the table. */
132         cfs_list_t   *peers = kptllnd_nid2peerlist(pid.nid);
133         int           cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
134         int           count;
135         cfs_list_t   *tmp;
136         cfs_list_t   *nxt;
137         kptl_peer_t  *peer;
138
139         count = 0;
140         cfs_list_for_each_safe (tmp, nxt, peers) {
141                 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
142                  * in MRU order */
143                 peer = cfs_list_entry(tmp, kptl_peer_t, peer_list);
144                         
145                 if (LNET_NIDADDR(peer->peer_id.nid) != LNET_NIDADDR(pid.nid))
146                         continue;
147
148                 LASSERT (peer->peer_id.pid != pid.pid);
149                         
150                 count++;
151
152                 if (count < cull_count) /* recent (don't cull) */
153                         continue;
154
155                 CDEBUG(D_NET, "Cull %s(%s)\n",
156                        libcfs_id2str(peer->peer_id),
157                        kptllnd_ptlid2str(peer->peer_ptlid));
158                 
159                 kptllnd_peer_close_locked(peer, 0);
160         }
161 }
162
163 kptl_peer_t *
164 kptllnd_peer_allocate (kptl_net_t *net, lnet_process_id_t lpid, ptl_process_id_t ppid)
165 {
166         unsigned long    flags;
167         kptl_peer_t     *peer;
168
169         LIBCFS_ALLOC(peer, sizeof (*peer));
170         if (peer == NULL) {
171                 CERROR("Can't create peer %s (%s)\n",
172                        libcfs_id2str(lpid), 
173                        kptllnd_ptlid2str(ppid));
174                 return NULL;
175         }
176
177         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
178
179         CFS_INIT_LIST_HEAD (&peer->peer_noops);
180         CFS_INIT_LIST_HEAD (&peer->peer_sendq);
181         CFS_INIT_LIST_HEAD (&peer->peer_activeq);
182         cfs_spin_lock_init (&peer->peer_lock);
183
184         peer->peer_state = PEER_STATE_ALLOCATED;
185         peer->peer_error = 0;
186         peer->peer_last_alive = 0;
187         peer->peer_id = lpid;
188         peer->peer_ptlid = ppid;
189         peer->peer_credits = 1;                 /* enough for HELLO */
190         peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
191         peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peertxcredits - 1;
192         peer->peer_sent_credits = 1;           /* HELLO credit is implicit */
193         peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */
194
195         cfs_atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */
196
197         cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
198
199         peer->peer_myincarnation = kptllnd_data.kptl_incarnation;
200
201         /* Only increase # peers under lock, to guarantee we dont grow it
202          * during shutdown */
203         if (net->net_shutdown) {
204                 cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
205                                             flags);
206                 LIBCFS_FREE(peer, sizeof(*peer));
207                 return NULL;
208         }
209
210         kptllnd_data.kptl_npeers++;
211         cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
212         return peer;
213 }
214
215 void
216 kptllnd_peer_destroy (kptl_peer_t *peer)
217 {
218         unsigned long flags;
219
220         CDEBUG(D_NET, "Peer=%p\n", peer);
221
222         LASSERT (!cfs_in_interrupt());
223         LASSERT (cfs_atomic_read(&peer->peer_refcount) == 0);
224         LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
225                  peer->peer_state == PEER_STATE_ZOMBIE);
226         LASSERT (cfs_list_empty(&peer->peer_noops));
227         LASSERT (cfs_list_empty(&peer->peer_sendq));
228         LASSERT (cfs_list_empty(&peer->peer_activeq));
229
230         cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
231
232         if (peer->peer_state == PEER_STATE_ZOMBIE)
233                 cfs_list_del(&peer->peer_list);
234
235         kptllnd_data.kptl_npeers--;
236
237         cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
238
239         LIBCFS_FREE (peer, sizeof (*peer));
240 }
241
242 void
243 kptllnd_cancel_txlist (cfs_list_t *peerq, cfs_list_t *txs)
244 {
245         cfs_list_t  *tmp;
246         cfs_list_t  *nxt;
247         kptl_tx_t   *tx;
248
249         cfs_list_for_each_safe (tmp, nxt, peerq) {
250                 tx = cfs_list_entry(tmp, kptl_tx_t, tx_list);
251
252                 cfs_list_del(&tx->tx_list);
253                 cfs_list_add_tail(&tx->tx_list, txs);
254
255                 tx->tx_status = -EIO;
256                 tx->tx_active = 0;
257         }
258 }
259
260 void
261 kptllnd_peer_cancel_txs(kptl_peer_t *peer, cfs_list_t *txs)
262 {
263         unsigned long   flags;
264
265         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
266
267         kptllnd_cancel_txlist(&peer->peer_noops, txs);
268         kptllnd_cancel_txlist(&peer->peer_sendq, txs);
269         kptllnd_cancel_txlist(&peer->peer_activeq, txs);
270                 
271         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
272 }
273
274 void
275 kptllnd_peer_alive (kptl_peer_t *peer)
276 {
277         /* This is racy, but everyone's only writing cfs_time_current() */
278         peer->peer_last_alive = cfs_time_current();
279         cfs_mb();
280 }
281
282 void
283 kptllnd_peer_notify (kptl_peer_t *peer)
284 {
285         unsigned long flags;
286         kptl_net_t   *net;
287         kptl_net_t  **nets;
288         int           i = 0;
289         int           nnets = 0;
290         int           error = 0;
291         cfs_time_t    last_alive = 0;
292
293         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
294
295         if (peer->peer_error != 0) {
296                 error = peer->peer_error;
297                 peer->peer_error = 0;
298                 last_alive = peer->peer_last_alive;
299         }
300
301         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
302
303         if (error == 0)
304                 return;
305
306         cfs_read_lock(&kptllnd_data.kptl_net_rw_lock);
307         cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list)
308                 nnets++;
309         cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock);
310
311         if (nnets == 0) /* shutdown in progress */
312                 return;
313
314         LIBCFS_ALLOC(nets, nnets * sizeof(*nets));
315         if (nets == NULL) {
316                 CERROR("Failed to allocate nets[%d]\n", nnets);
317                 return;
318         }
319         memset(nets, 0, nnets * sizeof(*nets));
320
321         cfs_read_lock(&kptllnd_data.kptl_net_rw_lock);
322         i = 0;
323         cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
324                 LASSERT (i < nnets);
325                 nets[i] = net;
326                 kptllnd_net_addref(net);
327                 i++;
328         }
329         cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock);
330
331         for (i = 0; i < nnets; i++) {
332                 lnet_nid_t peer_nid;
333
334                 net = nets[i];
335                 if (net == NULL)
336                         break;
337
338                 if (!net->net_shutdown) {
339                         peer_nid = kptllnd_ptl2lnetnid(net->net_ni->ni_nid,
340                                                        peer->peer_ptlid.nid);
341                         lnet_notify(net->net_ni, peer_nid, 0, last_alive);
342                 }
343
344                 kptllnd_net_decref(net);
345         }
346
347         LIBCFS_FREE(nets, nnets * sizeof(*nets));
348 }
349
350 void
351 kptllnd_handle_closing_peers ()
352 {
353         unsigned long           flags;
354         cfs_list_t              txs;
355         kptl_peer_t            *peer;
356         cfs_list_t             *tmp;
357         cfs_list_t             *nxt;
358         kptl_tx_t              *tx;
359         int                     idle;
360
361         /* Check with a read lock first to avoid blocking anyone */
362
363         cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
364         idle = cfs_list_empty(&kptllnd_data.kptl_closing_peers) &&
365                cfs_list_empty(&kptllnd_data.kptl_zombie_peers);
366         cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
367
368         if (idle)
369                 return;
370
371         CFS_INIT_LIST_HEAD(&txs);
372
373         cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
374
375         /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
376          * ref removes it from this list, so I musn't drop the lock while
377          * scanning it. */
378         cfs_list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
379                 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
380
381                 LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);
382
383                 kptllnd_peer_cancel_txs(peer, &txs);
384         }
385
386         /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
387          * I'm the only one removing from this list, but peers can be added on
388          * the end any time I drop the lock. */
389
390         cfs_list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
391                 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
392
393                 LASSERT (peer->peer_state == PEER_STATE_CLOSING);
394
395                 cfs_list_del(&peer->peer_list);
396                 cfs_list_add_tail(&peer->peer_list,
397                                   &kptllnd_data.kptl_zombie_peers);
398                 peer->peer_state = PEER_STATE_ZOMBIE;
399
400                 cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
401                                             flags);
402
403                 kptllnd_peer_notify(peer);
404                 kptllnd_peer_cancel_txs(peer, &txs);
405                 kptllnd_peer_decref(peer);
406
407                 cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
408         }
409
410         cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
411
412         /* Drop peer's ref on all cancelled txs.  This will get
413          * kptllnd_tx_fini() to abort outstanding comms if necessary. */
414
415         cfs_list_for_each_safe (tmp, nxt, &txs) {
416                 tx = cfs_list_entry(tmp, kptl_tx_t, tx_list);
417                 cfs_list_del(&tx->tx_list);
418                 kptllnd_tx_decref(tx);
419         }
420 }
421
422 void
423 kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
424 {
425         switch (peer->peer_state) {
426         default:
427                 LBUG();
428
429         case PEER_STATE_WAITING_HELLO:
430         case PEER_STATE_ACTIVE:
431                 /* Ensure new peers see a new incarnation of me */
432                 LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
433                 if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
434                         kptllnd_data.kptl_incarnation++;
435
436                 /* Removing from peer table */
437                 kptllnd_data.kptl_n_active_peers--;
438                 LASSERT (kptllnd_data.kptl_n_active_peers >= 0);
439
440                 cfs_list_del(&peer->peer_list);
441                 kptllnd_peer_unreserve_buffers();
442
443                 peer->peer_error = why; /* stash 'why' only on first close */
444                 peer->peer_state = PEER_STATE_CLOSING;
445
446                 /* Schedule for immediate attention, taking peer table's ref */
447                 cfs_list_add_tail(&peer->peer_list,
448                                  &kptllnd_data.kptl_closing_peers);
449                 cfs_waitq_signal(&kptllnd_data.kptl_watchdog_waitq);
450                 break;
451
452         case PEER_STATE_ZOMBIE:
453         case PEER_STATE_CLOSING:
454                 break;
455         }
456 }
457
458 void
459 kptllnd_peer_close(kptl_peer_t *peer, int why)
460 {
461         unsigned long      flags;
462
463         cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
464         kptllnd_peer_close_locked(peer, why);
465         cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
466 }
467
468 int
469 kptllnd_peer_del(lnet_process_id_t id)
470 {
471         cfs_list_t        *ptmp;
472         cfs_list_t        *pnxt;
473         kptl_peer_t       *peer;
474         int                lo;
475         int                hi;
476         int                i;
477         unsigned long      flags;
478         int                rc = -ENOENT;
479
480         /*
481          * Find the single bucket we are supposed to look at or if nid is a
482          * wildcard (LNET_NID_ANY) then look at all of the buckets
483          */
484         if (id.nid != LNET_NID_ANY) {
485                 cfs_list_t *l = kptllnd_nid2peerlist(id.nid);
486
487                 lo = hi =  l - kptllnd_data.kptl_peers;
488         } else {
489                 if (id.pid != LNET_PID_ANY)
490                         return -EINVAL;
491
492                 lo = 0;
493                 hi = kptllnd_data.kptl_peer_hash_size - 1;
494         }
495
496 again:
497         cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
498
499         for (i = lo; i <= hi; i++) {
500                 cfs_list_for_each_safe (ptmp, pnxt,
501                                         &kptllnd_data.kptl_peers[i]) {
502                         peer = cfs_list_entry (ptmp, kptl_peer_t, peer_list);
503
504                         if (!(id.nid == LNET_NID_ANY || 
505                               (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(id.nid) &&
506                                (id.pid == LNET_PID_ANY || 
507                                 peer->peer_id.pid == id.pid))))
508                                 continue;
509
510                         kptllnd_peer_addref(peer); /* 1 ref for me... */
511
512                         cfs_read_unlock_irqrestore(&kptllnd_data. \
513                                                    kptl_peer_rw_lock,
514                                                    flags);
515
516                         kptllnd_peer_close(peer, 0);
517                         kptllnd_peer_decref(peer); /* ...until here */
518
519                         rc = 0;         /* matched something */
520
521                         /* start again now I've dropped the lock */
522                         goto again;
523                 }
524         }
525
526         cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
527
528         return (rc);
529 }
530
531 void
532 kptllnd_queue_tx(kptl_peer_t *peer, kptl_tx_t *tx)
533 {
534         /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
535         unsigned long flags;
536
537         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
538
539         /* Ensure HELLO is sent first */
540         if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP)
541                 cfs_list_add(&tx->tx_list, &peer->peer_noops);
542         else if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
543                 cfs_list_add(&tx->tx_list, &peer->peer_sendq);
544         else
545                 cfs_list_add_tail(&tx->tx_list, &peer->peer_sendq);
546
547         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
548 }
549
550
551 void
552 kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
553 {
554         /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
555         ptl_handle_md_t  msg_mdh;
556         ptl_md_t         md;
557         ptl_err_t        prc;
558
559         LASSERT (!tx->tx_idle);
560         LASSERT (!tx->tx_active);
561         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
562         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
563         LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
564                  tx->tx_type == TX_TYPE_PUT_REQUEST ||
565                  tx->tx_type == TX_TYPE_GET_REQUEST);
566
567         kptllnd_set_tx_peer(tx, peer);
568
569         memset(&md, 0, sizeof(md));
570
571         md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
572         md.options = PTL_MD_OP_PUT |
573                      PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
574                      PTL_MD_EVENT_START_DISABLE;
575         md.user_ptr = &tx->tx_msg_eventarg;
576         md.eq_handle = kptllnd_data.kptl_eqh;
577
578         if (nfrag == 0) {
579                 md.start = tx->tx_msg;
580                 md.length = tx->tx_msg->ptlm_nob;
581         } else {
582                 LASSERT (nfrag > 1);
583                 LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);
584
585                 md.start = tx->tx_frags;
586                 md.length = nfrag;
587                 md.options |= PTL_MD_IOVEC;
588         }
589
590         prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
591         if (prc != PTL_OK) {
592                 CERROR("PtlMDBind(%s) failed: %s(%d)\n",
593                        libcfs_id2str(peer->peer_id),
594                        kptllnd_errtype2str(prc), prc);
595                 tx->tx_status = -EIO;
596                 kptllnd_tx_decref(tx);
597                 return;
598         }
599
600
601         tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * CFS_HZ);
602         tx->tx_active = 1;
603         tx->tx_msg_mdh = msg_mdh;
604         kptllnd_queue_tx(peer, tx);
605 }
606
607 /* NB "restarts" comes from peer_sendq of a single peer */
608 void
609 kptllnd_restart_txs (kptl_net_t *net, lnet_process_id_t target,
610                      cfs_list_t *restarts)
611 {
612         kptl_tx_t   *tx;
613         kptl_tx_t   *tmp;
614         kptl_peer_t *peer;
615
616         LASSERT (!cfs_list_empty(restarts));
617
618         if (kptllnd_find_target(net, target, &peer) != 0)
619                 peer = NULL;
620
621         cfs_list_for_each_entry_safe (tx, tmp, restarts, tx_list) {
622                 LASSERT (tx->tx_peer != NULL);
623                 LASSERT (tx->tx_type == TX_TYPE_GET_REQUEST ||
624                          tx->tx_type == TX_TYPE_PUT_REQUEST ||
625                          tx->tx_type == TX_TYPE_SMALL_MESSAGE);
626
627                 cfs_list_del_init(&tx->tx_list);
628
629                 if (peer == NULL ||
630                     tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
631                         kptllnd_tx_decref(tx);
632                         continue;
633                 }
634
635                 LASSERT (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_NOOP);
636                 tx->tx_status = 0;
637                 tx->tx_active = 1;
638                 kptllnd_peer_decref(tx->tx_peer);
639                 tx->tx_peer = NULL;
640                 kptllnd_set_tx_peer(tx, peer);
641                 kptllnd_queue_tx(peer, tx); /* takes over my ref on tx */
642         }
643
644         if (peer == NULL)
645                 return;
646
647         kptllnd_peer_check_sends(peer);
648         kptllnd_peer_decref(peer);
649 }
650
651 static inline int
652 kptllnd_peer_send_noop (kptl_peer_t *peer)
653 {
654         if (!peer->peer_sent_hello ||
655             peer->peer_credits == 0 ||
656             !cfs_list_empty(&peer->peer_noops) ||
657             peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)
658                 return 0;
659
660         /* No tx to piggyback NOOP onto or no credit to send a tx */
661         return (cfs_list_empty(&peer->peer_sendq) || peer->peer_credits == 1);
662 }
663
664 void
665 kptllnd_peer_check_sends (kptl_peer_t *peer)
666 {
667         ptl_handle_me_t  meh;
668         kptl_tx_t       *tx;
669         int              rc;
670         int              msg_type;
671         unsigned long    flags;
672
673         LASSERT(!cfs_in_interrupt());
674
675         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
676
677         peer->peer_retry_noop = 0;
678
679         if (kptllnd_peer_send_noop(peer)) {
680                 /* post a NOOP to return credits */
681                 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
682
683                 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
684                 if (tx == NULL) {
685                         CERROR("Can't return credits to %s: can't allocate descriptor\n",
686                                libcfs_id2str(peer->peer_id));
687                 } else {
688                         kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP,
689                                          peer->peer_id, 0);
690                         kptllnd_post_tx(peer, tx, 0);
691                 }
692
693                 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
694                 peer->peer_retry_noop = (tx == NULL);
695         }
696
697         for (;;) {
698                 if (!cfs_list_empty(&peer->peer_noops)) {
699                         LASSERT (peer->peer_sent_hello);
700                         tx = cfs_list_entry(peer->peer_noops.next,
701                                             kptl_tx_t, tx_list);
702                 } else if (!cfs_list_empty(&peer->peer_sendq)) {
703                         tx = cfs_list_entry(peer->peer_sendq.next,
704                                             kptl_tx_t, tx_list);
705                 } else {
706                         /* nothing to send right now */
707                         break;
708                 }
709
710                 LASSERT (tx->tx_active);
711                 LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
712                 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
713
714                 LASSERT (peer->peer_outstanding_credits >= 0);
715                 LASSERT (peer->peer_sent_credits >= 0);
716                 LASSERT (peer->peer_sent_credits +
717                          peer->peer_outstanding_credits <=
718                          *kptllnd_tunables.kptl_peertxcredits);
719                 LASSERT (peer->peer_credits >= 0);
720
721                 msg_type = tx->tx_msg->ptlm_type;
722
723                 /* Ensure HELLO is sent first */
724                 if (!peer->peer_sent_hello) {
725                         LASSERT (cfs_list_empty(&peer->peer_noops));
726                         if (msg_type != PTLLND_MSG_TYPE_HELLO)
727                                 break;
728                         peer->peer_sent_hello = 1;
729                 }
730
731                 if (peer->peer_credits == 0) {
732                         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %s[%p]\n",
733                                libcfs_id2str(peer->peer_id), 
734                                peer->peer_credits,
735                                peer->peer_outstanding_credits, 
736                                peer->peer_sent_credits, 
737                                kptllnd_msgtype2str(msg_type), tx);
738                         break;
739                 }
740
741                 /* Last/Initial credit reserved for NOOP/HELLO */
742                 if (peer->peer_credits == 1 &&
743                     msg_type != PTLLND_MSG_TYPE_HELLO &&
744                     msg_type != PTLLND_MSG_TYPE_NOOP) {
745                         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
746                                "not using last credit for %s[%p]\n",
747                                libcfs_id2str(peer->peer_id), 
748                                peer->peer_credits,
749                                peer->peer_outstanding_credits,
750                                peer->peer_sent_credits,
751                                kptllnd_msgtype2str(msg_type), tx);
752                         break;
753                 }
754
755                 cfs_list_del(&tx->tx_list);
756
757                 /* Discard any NOOP I queued if I'm not at the high-water mark
758                  * any more or more messages have been queued */
759                 if (msg_type == PTLLND_MSG_TYPE_NOOP &&
760                     !kptllnd_peer_send_noop(peer)) {
761                         tx->tx_active = 0;
762
763                         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
764
765                         CDEBUG(D_NET, "%s: redundant noop\n", 
766                                libcfs_id2str(peer->peer_id));
767                         kptllnd_tx_decref(tx);
768
769                         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
770                         continue;
771                 }
772
773                 /* fill last-minute msg fields */
774                 kptllnd_msg_pack(tx->tx_msg, peer);
775
776                 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
777                     tx->tx_type == TX_TYPE_GET_REQUEST) {
778                         /* peer_next_matchbits must be known good */
779                         LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
780                         /* Assume 64-bit matchbits can't wrap */
781                         LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
782                         tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
783                                 peer->peer_next_matchbits++;
784                 }
785
786                 peer->peer_sent_credits += peer->peer_outstanding_credits;
787                 peer->peer_outstanding_credits = 0;
788                 peer->peer_credits--;
789
790                 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
791                        libcfs_id2str(peer->peer_id), peer->peer_credits,
792                        peer->peer_outstanding_credits, peer->peer_sent_credits,
793                        kptllnd_msgtype2str(msg_type), tx, tx->tx_msg->ptlm_nob,
794                        tx->tx_msg->ptlm_credits);
795
796                 cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);
797
798                 kptllnd_tx_addref(tx);          /* 1 ref for me... */
799
800                 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
801
802                 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
803                     tx->tx_type == TX_TYPE_GET_REQUEST) {
804                         /* Post bulk now we have safe matchbits */
805                         rc = PtlMEAttach(kptllnd_data.kptl_nih,
806                                          *kptllnd_tunables.kptl_portal,
807                                          peer->peer_ptlid,
808                                          tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
809                                          0,             /* ignore bits */
810                                          PTL_UNLINK,
811                                          PTL_INS_BEFORE,
812                                          &meh);
813                         if (rc != PTL_OK) {
814                                 CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
815                                        libcfs_id2str(peer->peer_id),
816                                        kptllnd_errtype2str(rc), rc);
817                                 goto failed;
818                         }
819
820                         rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
821                                          &tx->tx_rdma_mdh);
822                         if (rc != PTL_OK) {
823                                 CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
824                                        libcfs_id2str(tx->tx_peer->peer_id),
825                                        kptllnd_errtype2str(rc), rc);
826                                 rc = PtlMEUnlink(meh);
827                                 LASSERT(rc == PTL_OK);
828                                 tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
829                                 goto failed;
830                         }
831                         /* I'm not racing with the event callback here.  It's a
832                          * bug if there's an event on the MD I just attached
833                          * before I actually send the RDMA request message -
834                          * probably matchbits re-used in error. */
835                 }
836
837                 tx->tx_tposted = jiffies;       /* going on the wire */
838
839                 rc = PtlPut (tx->tx_msg_mdh,
840                              tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
841                              peer->peer_ptlid,
842                              *kptllnd_tunables.kptl_portal,
843                              0,                 /* acl cookie */
844                              LNET_MSG_MATCHBITS,
845                              0,                 /* offset */
846                              0);                /* header data */
847                 if (rc != PTL_OK) {
848                         CERROR("PtlPut %s error %s(%d)\n",
849                                libcfs_id2str(peer->peer_id),
850                                kptllnd_errtype2str(rc), rc);
851                         goto failed;
852                 }
853
854                 kptllnd_tx_decref(tx);          /* drop my ref */
855
856                 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
857         }
858
859         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
860         return;
861
862  failed:
863         /* Nuke everything (including tx we were trying) */
864         kptllnd_peer_close(peer, -EIO);
865         kptllnd_tx_decref(tx);
866 }
867
868 kptl_tx_t *
869 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
870 {
871         kptl_tx_t         *tx;
872         cfs_list_t        *ele;
873
874         cfs_list_for_each(ele, &peer->peer_sendq) {
875                 tx = cfs_list_entry(ele, kptl_tx_t, tx_list);
876
877                 if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
878                         kptllnd_tx_addref(tx);
879                         return tx;
880                 }
881         }
882
883         cfs_list_for_each(ele, &peer->peer_activeq) {
884                 tx = cfs_list_entry(ele, kptl_tx_t, tx_list);
885
886                 if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
887                         kptllnd_tx_addref(tx);
888                         return tx;
889                 }
890         }
891
892         return NULL;
893 }
894
895
896 void
897 kptllnd_peer_check_bucket (int idx, int stamp)
898 {
899         cfs_list_t        *peers = &kptllnd_data.kptl_peers[idx];
900         kptl_peer_t       *peer;
901         unsigned long      flags;
902
903         CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);
904
905  again:
906         /* NB. Shared lock while I just look */
907         cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
908
909         cfs_list_for_each_entry (peer, peers, peer_list) {
910                 kptl_tx_t *tx;
911                 int        check_sends;
912                 int        c = -1, oc = -1, sc = -1;
913                 int        nsend = -1, nactive = -1;
914                 int        sent_hello = -1, state = -1;
915
916                 CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
917                        libcfs_id2str(peer->peer_id), peer->peer_credits, 
918                        peer->peer_outstanding_credits, peer->peer_sent_credits);
919
920                 cfs_spin_lock(&peer->peer_lock);
921
922                 if (peer->peer_check_stamp == stamp) {
923                         /* checked already this pass */
924                         cfs_spin_unlock(&peer->peer_lock);
925                         continue;
926                 }
927
928                 peer->peer_check_stamp = stamp;
929                 tx = kptllnd_find_timed_out_tx(peer);
930                 check_sends = peer->peer_retry_noop;
931
932                 if (tx != NULL) {
933                         c  = peer->peer_credits;
934                         sc = peer->peer_sent_credits;
935                         oc = peer->peer_outstanding_credits;
936                         state      = peer->peer_state;
937                         sent_hello = peer->peer_sent_hello;
938                         nsend   = kptllnd_count_queue(&peer->peer_sendq);
939                         nactive = kptllnd_count_queue(&peer->peer_activeq);
940                 }
941
942                 cfs_spin_unlock(&peer->peer_lock);
943
944                 if (tx == NULL && !check_sends)
945                         continue;
946
947                 kptllnd_peer_addref(peer); /* 1 ref for me... */
948
949                 cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
950                                            flags);
951
952                 if (tx == NULL) { /* nothing timed out */
953                         kptllnd_peer_check_sends(peer);
954                         kptllnd_peer_decref(peer); /* ...until here or... */
955
956                         /* rescan after dropping the lock */
957                         goto again;
958                 }
959
960                 LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
961                                    libcfs_id2str(peer->peer_id),
962                                    (tx->tx_tposted == 0) ?
963                                    "no free peer buffers" :
964                                    "please check Portals");
965
966                 if (tx->tx_tposted) {
967                         CERROR("Could not send to %s after %ds (sent %lds ago); "
968                                 "check Portals for possible issues\n",
969                                 libcfs_id2str(peer->peer_id),
970                                 *kptllnd_tunables.kptl_timeout,
971                                 cfs_duration_sec(jiffies - tx->tx_tposted));
972                 } else if (state < PEER_STATE_ACTIVE) {
973                         CERROR("Could not connect %s (%d) after %ds; "
974                                "peer might be down\n",
975                                libcfs_id2str(peer->peer_id), state,
976                                *kptllnd_tunables.kptl_timeout);
977                 } else {
978                         CERROR("Could not get credits for %s after %ds; "
979                                 "possible Lustre networking issues\n",
980                         libcfs_id2str(peer->peer_id),
981                         *kptllnd_tunables.kptl_timeout);
982                 }
983
984                 CERROR("%s timed out: cred %d outstanding %d, sent %d, "
985                        "state %d, sent_hello %d, sendq %d, activeq %d "
986                        "Tx %p %s %s (%s%s%s) status %d %sposted %lu T/O %ds\n",
987                        libcfs_id2str(peer->peer_id), c, oc, sc,
988                        state, sent_hello, nsend, nactive,
989                        tx, kptllnd_tx_typestr(tx->tx_type),
990                        kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
991                        tx->tx_active ? "A" : "",
992                        PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
993                        "" : "M",
994                        PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
995                        "" : "D",
996                        tx->tx_status,
997                        (tx->tx_tposted == 0) ? "not " : "",
998                        (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
999                        *kptllnd_tunables.kptl_timeout);
1000
1001                 kptllnd_tx_decref(tx);
1002
1003                 kptllnd_peer_close(peer, -ETIMEDOUT);
1004                 kptllnd_peer_decref(peer); /* ...until here */
1005
1006                 /* start again now I've dropped the lock */
1007                 goto again;
1008         }
1009
1010         cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
1011 }
1012
1013 kptl_peer_t *
1014 kptllnd_id2peer_locked (lnet_process_id_t id)
1015 {
1016         cfs_list_t       *peers = kptllnd_nid2peerlist(id.nid);
1017         cfs_list_t       *tmp;
1018         kptl_peer_t      *peer;
1019
1020         cfs_list_for_each (tmp, peers) {
1021                 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1022
1023                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
1024                         peer->peer_state == PEER_STATE_ACTIVE);
1025
1026                 /* NB logical LNet peers share one kptl_peer_t */
1027                 if (peer->peer_id.pid != id.pid ||
1028                     LNET_NIDADDR(id.nid) != LNET_NIDADDR(peer->peer_id.nid))
1029                         continue;
1030
1031                 kptllnd_peer_addref(peer);
1032
1033                 CDEBUG(D_NET, "%s -> %s (%d)\n",
1034                        libcfs_id2str(id),
1035                        kptllnd_ptlid2str(peer->peer_ptlid),
1036                        cfs_atomic_read (&peer->peer_refcount));
1037                 return peer;
1038         }
1039
1040         return NULL;
1041 }
1042
1043 void
1044 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
1045 {
1046         LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
1047                            "messages may be dropped\n",
1048                            str, libcfs_id2str(id),
1049                            kptllnd_data.kptl_n_active_peers);
1050         LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
1051                            "'max_nodes' or 'max_procs_per_node'\n");
1052 }
1053
1054 __u64
1055 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
1056 {
1057         kptl_peer_t  *peer;
1058         cfs_list_t   *tmp;
1059
1060         /* Find the last matchbits I saw this new peer using.  Note..
1061            A. This peer cannot be in the peer table - she's new!
1062            B. If I can't find the peer in the closing/zombie peers, all
1063               matchbits are safe because all refs to the (old) peer have gone
1064               so all txs have completed so there's no risk of matchbit
1065               collision!
1066          */
1067
1068         LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
1069
1070         /* peer's last matchbits can't change after it comes out of the peer
1071          * table, so first match is fine */
1072
1073         cfs_list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
1074                 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1075
1076                 if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
1077                     peer->peer_id.pid == lpid.pid)
1078                         return peer->peer_last_matchbits_seen;
1079         }
1080
1081         cfs_list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
1082                 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1083
1084                 if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
1085                     peer->peer_id.pid == lpid.pid)
1086                         return peer->peer_last_matchbits_seen;
1087         }
1088
1089         return PTL_RESERVED_MATCHBITS;
1090 }
1091
1092 kptl_peer_t *
1093 kptllnd_peer_handle_hello (kptl_net_t *net,
1094                            ptl_process_id_t initiator, kptl_msg_t *msg)
1095 {
1096         cfs_rwlock_t       *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1097         kptl_peer_t        *peer;
1098         kptl_peer_t        *new_peer;
1099         lnet_process_id_t   lpid;
1100         unsigned long       flags;
1101         kptl_tx_t          *hello_tx;
1102         int                 rc;
1103         __u64               safe_matchbits;
1104         __u64               last_matchbits_seen;
1105
1106         lpid.nid = msg->ptlm_srcnid;
1107         lpid.pid = msg->ptlm_srcpid;
1108
1109         CDEBUG(D_NET, "hello from %s(%s)\n",
1110                libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
1111
1112         if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
1113             (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
1114                 /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
1115                  * userspace.  Refuse the connection if she hasn't set the
1116                  * correct flag in her PID... */
1117                 CERROR("Userflag not set in hello from %s (%s)\n",
1118                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
1119                 return NULL;
1120         }
1121         
1122         /* kptlhm_matchbits are the highest matchbits my peer may have used to
1123          * RDMA to me.  I ensure I never register buffers for RDMA that could
1124          * match any she used */
1125         safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;
1126
1127         if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
1128                 CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
1129                        safe_matchbits, libcfs_id2str(lpid));
1130                 return NULL;
1131         }
1132         
1133         if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
1134                 CERROR("%s: max message size %d < MIN %d",
1135                        libcfs_id2str(lpid),
1136                        msg->ptlm_u.hello.kptlhm_max_msg_size,
1137                        PTLLND_MIN_BUFFER_SIZE);
1138                 return NULL;
1139         }
1140
1141         if (msg->ptlm_credits <= 1) {
1142                 CERROR("Need more than 1+%d credits from %s\n",
1143                        msg->ptlm_credits, libcfs_id2str(lpid));
1144                 return NULL;
1145         }
1146         
1147         cfs_write_lock_irqsave(g_lock, flags);
1148
1149         peer = kptllnd_id2peer_locked(lpid);
1150         if (peer != NULL) {
1151                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1152                         /* Completing HELLO handshake */
1153                         LASSERT(peer->peer_incarnation == 0);
1154
1155                         if (msg->ptlm_dststamp != 0 &&
1156                             msg->ptlm_dststamp != peer->peer_myincarnation) {
1157                                 cfs_write_unlock_irqrestore(g_lock, flags);
1158
1159                                 CERROR("Ignoring HELLO from %s: unexpected "
1160                                        "dststamp "LPX64" ("LPX64" wanted)\n",
1161                                        libcfs_id2str(lpid),
1162                                        msg->ptlm_dststamp,
1163                                        peer->peer_myincarnation);
1164                                 kptllnd_peer_decref(peer);
1165                                 return NULL;
1166                         }
1167                         
1168                         /* Concurrent initiation or response to my HELLO */
1169                         peer->peer_state = PEER_STATE_ACTIVE;
1170                         peer->peer_incarnation = msg->ptlm_srcstamp;
1171                         peer->peer_next_matchbits = safe_matchbits;
1172                         peer->peer_max_msg_size =
1173                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1174                         
1175                         cfs_write_unlock_irqrestore(g_lock, flags);
1176                         return peer;
1177                 }
1178
1179                 if (msg->ptlm_dststamp != 0 &&
1180                     msg->ptlm_dststamp <= peer->peer_myincarnation) {
1181                         cfs_write_unlock_irqrestore(g_lock, flags);
1182
1183                         CERROR("Ignoring stale HELLO from %s: "
1184                                "dststamp "LPX64" (current "LPX64")\n",
1185                                libcfs_id2str(lpid),
1186                                msg->ptlm_dststamp,
1187                                peer->peer_myincarnation);
1188                         kptllnd_peer_decref(peer);
1189                         return NULL;
1190                 }
1191
1192                 /* Brand new connection attempt: remove old incarnation */
1193                 kptllnd_peer_close_locked(peer, 0);
1194         }
1195
1196         kptllnd_cull_peertable_locked(lpid);
1197
1198         cfs_write_unlock_irqrestore(g_lock, flags);
1199
1200         if (peer != NULL) {
1201                 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
1202                        " stamp "LPX64"("LPX64")\n",
1203                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
1204                        msg->ptlm_srcstamp, peer->peer_incarnation);
1205
1206                 kptllnd_peer_decref(peer);
1207                 peer = NULL;
1208         }
1209
1210         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1211         if (hello_tx == NULL) {
1212                 CERROR("Unable to allocate HELLO message for %s\n",
1213                        libcfs_id2str(lpid));
1214                 return NULL;
1215         }
1216
1217         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1218                          lpid, sizeof(kptl_hello_msg_t));
1219
1220         new_peer = kptllnd_peer_allocate(net, lpid, initiator);
1221         if (new_peer == NULL) {
1222                 kptllnd_tx_decref(hello_tx);
1223                 return NULL;
1224         }
1225
1226         rc = kptllnd_peer_reserve_buffers();
1227         if (rc != 0) {
1228                 kptllnd_peer_decref(new_peer);
1229                 kptllnd_tx_decref(hello_tx);
1230
1231                 CERROR("Failed to reserve buffers for %s\n",
1232                        libcfs_id2str(lpid));
1233                 return NULL;
1234         }
1235
1236         cfs_write_lock_irqsave(g_lock, flags);
1237
1238  again:
1239         if (net->net_shutdown) {
1240                 cfs_write_unlock_irqrestore(g_lock, flags);
1241
1242                 CERROR ("Shutdown started, refusing connection from %s\n",
1243                         libcfs_id2str(lpid));
1244                 kptllnd_peer_unreserve_buffers();
1245                 kptllnd_peer_decref(new_peer);
1246                 kptllnd_tx_decref(hello_tx);
1247                 return NULL;
1248         }
1249
1250         peer = kptllnd_id2peer_locked(lpid);
1251         if (peer != NULL) {
1252                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1253                         /* An outgoing message instantiated 'peer' for me */
1254                         LASSERT(peer->peer_incarnation == 0);
1255
1256                         peer->peer_state = PEER_STATE_ACTIVE;
1257                         peer->peer_incarnation = msg->ptlm_srcstamp;
1258                         peer->peer_next_matchbits = safe_matchbits;
1259                         peer->peer_max_msg_size =
1260                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1261
1262                         cfs_write_unlock_irqrestore(g_lock, flags);
1263
1264                         CWARN("Outgoing instantiated peer %s\n",
1265                               libcfs_id2str(lpid));
1266                 } else {
1267                         LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1268
1269                         cfs_write_unlock_irqrestore(g_lock, flags);
1270
1271                         /* WOW!  Somehow this peer completed the HELLO
1272                          * handshake while I slept.  I guess I could have slept
1273                          * while it rebooted and sent a new HELLO, so I'll fail
1274                          * this one... */
1275                         CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
1276                         kptllnd_peer_decref(peer);
1277                         peer = NULL;
1278                 }
1279
1280                 kptllnd_peer_unreserve_buffers();
1281                 kptllnd_peer_decref(new_peer);
1282                 kptllnd_tx_decref(hello_tx);
1283                 return peer;
1284         }
1285
1286         if (kptllnd_data.kptl_n_active_peers ==
1287             kptllnd_data.kptl_expected_peers) {
1288                 /* peer table full */
1289                 cfs_write_unlock_irqrestore(g_lock, flags);
1290
1291                 kptllnd_peertable_overflow_msg("Connection from ", lpid);
1292
1293                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1294                 if (rc != 0) {
1295                         CERROR("Refusing connection from %s\n",
1296                                libcfs_id2str(lpid));
1297                         kptllnd_peer_unreserve_buffers();
1298                         kptllnd_peer_decref(new_peer);
1299                         kptllnd_tx_decref(hello_tx);
1300                         return NULL;
1301                 }
1302                 
1303                 cfs_write_lock_irqsave(g_lock, flags);
1304                 kptllnd_data.kptl_expected_peers++;
1305                 goto again;
1306         }
1307
1308         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
1309
1310         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1311         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1312                 *kptllnd_tunables.kptl_max_msg_size;
1313
1314         new_peer->peer_state = PEER_STATE_ACTIVE;
1315         new_peer->peer_incarnation = msg->ptlm_srcstamp;
1316         new_peer->peer_next_matchbits = safe_matchbits;
1317         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1318         new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1319
1320         LASSERT (!net->net_shutdown);
1321         kptllnd_peer_add_peertable_locked(new_peer);
1322
1323         cfs_write_unlock_irqrestore(g_lock, flags);
1324
1325         /* NB someone else could get in now and post a message before I post
1326          * the HELLO, but post_tx/check_sends take care of that! */
1327
1328         CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
1329                libcfs_id2str(new_peer->peer_id), hello_tx);
1330
1331         kptllnd_post_tx(new_peer, hello_tx, 0);
1332         kptllnd_peer_check_sends(new_peer);
1333
1334         return new_peer;
1335 }
1336
1337 void
1338 kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
1339 {
1340         kptllnd_post_tx(peer, tx, nfrag);
1341         kptllnd_peer_check_sends(peer);
1342 }
1343
1344 int
1345 kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target,
1346                     kptl_peer_t **peerp)
1347 {
1348         cfs_rwlock_t     *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1349         ptl_process_id_t  ptl_id;
1350         kptl_peer_t      *new_peer;
1351         kptl_tx_t        *hello_tx;
1352         unsigned long     flags;
1353         int               rc;
1354         __u64             last_matchbits_seen;
1355
1356         /* I expect to find the peer, so I only take a read lock... */
1357         cfs_read_lock_irqsave(g_lock, flags);
1358         *peerp = kptllnd_id2peer_locked(target);
1359         cfs_read_unlock_irqrestore(g_lock, flags);
1360
1361         if (*peerp != NULL)
1362                 return 0;
1363
1364         if ((target.pid & LNET_PID_USERFLAG) != 0) {
1365                 CWARN("Refusing to create a new connection to %s "
1366                       "(non-kernel peer)\n", libcfs_id2str(target));
1367                 return -EHOSTUNREACH;
1368         }
1369
1370         /* The new peer is a kernel ptllnd, and kernel ptllnds all have the
1371          * same portals PID, which has nothing to do with LUSTRE_SRV_LNET_PID */
1372         ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
1373         ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
1374
1375         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1376         if (hello_tx == NULL) {
1377                 CERROR("Unable to allocate connect message for %s\n",
1378                        libcfs_id2str(target));
1379                 return -ENOMEM;
1380         }
1381
1382         hello_tx->tx_acked = 1;
1383         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1384                          target, sizeof(kptl_hello_msg_t));
1385
1386         new_peer = kptllnd_peer_allocate(net, target, ptl_id);
1387         if (new_peer == NULL) {
1388                 rc = -ENOMEM;
1389                 goto unwind_0;
1390         }
1391
1392         rc = kptllnd_peer_reserve_buffers();
1393         if (rc != 0)
1394                 goto unwind_1;
1395
1396         cfs_write_lock_irqsave(g_lock, flags);
1397  again:
1398         /* Called only in lnd_send which can't happen after lnd_shutdown */
1399         LASSERT (!net->net_shutdown);
1400
1401         *peerp = kptllnd_id2peer_locked(target);
1402         if (*peerp != NULL) {
1403                 cfs_write_unlock_irqrestore(g_lock, flags);
1404                 goto unwind_2;
1405         }
1406
1407         kptllnd_cull_peertable_locked(target);
1408
1409         if (kptllnd_data.kptl_n_active_peers ==
1410             kptllnd_data.kptl_expected_peers) {
1411                 /* peer table full */
1412                 cfs_write_unlock_irqrestore(g_lock, flags);
1413
1414                 kptllnd_peertable_overflow_msg("Connection to ", target);
1415
1416                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1417                 if (rc != 0) {
1418                         CERROR("Can't create connection to %s\n",
1419                                libcfs_id2str(target));
1420                         rc = -ENOMEM;
1421                         goto unwind_2;
1422                 }
1423                 cfs_write_lock_irqsave(g_lock, flags);
1424                 kptllnd_data.kptl_expected_peers++;
1425                 goto again;
1426         }
1427
1428         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);
1429
1430         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1431         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1432                 *kptllnd_tunables.kptl_max_msg_size;
1433
1434         new_peer->peer_state = PEER_STATE_WAITING_HELLO;
1435         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1436
1437         kptllnd_peer_add_peertable_locked(new_peer);
1438
1439         cfs_write_unlock_irqrestore(g_lock, flags);
1440
1441         /* NB someone else could get in now and post a message before I post
1442          * the HELLO, but post_tx/check_sends take care of that! */
1443
1444         CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
1445                libcfs_id2str(new_peer->peer_id), hello_tx);
1446
1447         kptllnd_post_tx(new_peer, hello_tx, 0);
1448         kptllnd_peer_check_sends(new_peer);
1449
1450         *peerp = new_peer;
1451         return 0;
1452
1453  unwind_2:
1454         kptllnd_peer_unreserve_buffers();
1455  unwind_1:
1456         kptllnd_peer_decref(new_peer);
1457  unwind_0:
1458         kptllnd_tx_decref(hello_tx);
1459
1460         return rc;
1461 }