Whamcloud - gitweb
LU-56 o2iblnd: CPT affinity o2iblnd
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_peer.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lnet/klnds/ptllnd/ptllnd_peer.c
35  *
36  * Author: PJ Kirner <pjkirner@clusterfs.com>
37  * Author: E Barton <eeb@bartonsoftware.com>
38  */
39
40 #include "ptllnd.h"
41 #include <libcfs/list.h>
42
43 static int
44 kptllnd_count_queue(cfs_list_t *q)
45 {
46         cfs_list_t *e;
47         int         n = 0;
48
49         cfs_list_for_each(e, q) {
50                 n++;
51         }
52
53         return n;
54 }
55
56 int
57 kptllnd_get_peer_info(int index,
58                       lnet_process_id_t *id,
59                       int *state, int *sent_hello,
60                       int *refcount, __u64 *incarnation,
61                       __u64 *next_matchbits, __u64 *last_matchbits_seen,
62                       int *nsendq, int *nactiveq,
63                       int *credits, int *outstanding_credits)
64 {
65         cfs_rwlock_t     *g_lock = &kptllnd_data.kptl_peer_rw_lock;
66         unsigned long     flags;
67         cfs_list_t       *ptmp;
68         kptl_peer_t      *peer;
69         int               i;
70         int               rc = -ENOENT;
71
72         cfs_read_lock_irqsave(g_lock, flags);
73
74         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
75                 cfs_list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
76                         peer = cfs_list_entry(ptmp, kptl_peer_t, peer_list);
77
78                         if (index-- > 0)
79                                 continue;
80
81                         *id          = peer->peer_id;
82                         *state       = peer->peer_state;
83                         *sent_hello  = peer->peer_sent_hello;
84                         *refcount    = cfs_atomic_read(&peer->peer_refcount);
85                         *incarnation = peer->peer_incarnation;
86
87                         cfs_spin_lock(&peer->peer_lock);
88
89                         *next_matchbits      = peer->peer_next_matchbits;
90                         *last_matchbits_seen = peer->peer_last_matchbits_seen;
91                         *credits             = peer->peer_credits;
92                         *outstanding_credits = peer->peer_outstanding_credits;
93
94                         *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
95                         *nactiveq = kptllnd_count_queue(&peer->peer_activeq);
96
97                         cfs_spin_unlock(&peer->peer_lock);
98
99                         rc = 0;
100                         goto out;
101                 }
102         }
103
104  out:
105         cfs_read_unlock_irqrestore(g_lock, flags);
106         return rc;
107 }
108
109 void
110 kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
111 {
112         LASSERT (kptllnd_data.kptl_n_active_peers <
113                  kptllnd_data.kptl_expected_peers);
114
115         LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
116                  peer->peer_state == PEER_STATE_ACTIVE);
117
118         kptllnd_data.kptl_n_active_peers++;
119         cfs_atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */
120
121         /* NB add to HEAD of peer list for MRU order!
122          * (see kptllnd_cull_peertable) */
123         cfs_list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
124 }
125
126 void
127 kptllnd_cull_peertable_locked (lnet_process_id_t pid)
128 {
129         /* I'm about to add a new peer with this portals ID to the peer table,
130          * so (a) this peer should not exist already and (b) I want to leave at
131          * most (max_procs_per_nid - 1) peers with this NID in the table. */
132         cfs_list_t   *peers = kptllnd_nid2peerlist(pid.nid);
133         int           cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
134         int           count;
135         cfs_list_t   *tmp;
136         cfs_list_t   *nxt;
137         kptl_peer_t  *peer;
138
139         count = 0;
140         cfs_list_for_each_safe (tmp, nxt, peers) {
141                 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
142                  * in MRU order */
143                 peer = cfs_list_entry(tmp, kptl_peer_t, peer_list);
144                         
145                 if (LNET_NIDADDR(peer->peer_id.nid) != LNET_NIDADDR(pid.nid))
146                         continue;
147
148                 LASSERT (peer->peer_id.pid != pid.pid);
149                         
150                 count++;
151
152                 if (count < cull_count) /* recent (don't cull) */
153                         continue;
154
155                 CDEBUG(D_NET, "Cull %s(%s)\n",
156                        libcfs_id2str(peer->peer_id),
157                        kptllnd_ptlid2str(peer->peer_ptlid));
158                 
159                 kptllnd_peer_close_locked(peer, 0);
160         }
161 }
162
163 kptl_peer_t *
164 kptllnd_peer_allocate (kptl_net_t *net, lnet_process_id_t lpid, ptl_process_id_t ppid)
165 {
166         unsigned long    flags;
167         kptl_peer_t     *peer;
168
169         LIBCFS_ALLOC(peer, sizeof (*peer));
170         if (peer == NULL) {
171                 CERROR("Can't create peer %s (%s)\n",
172                        libcfs_id2str(lpid), 
173                        kptllnd_ptlid2str(ppid));
174                 return NULL;
175         }
176
177         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
178
179         CFS_INIT_LIST_HEAD (&peer->peer_noops);
180         CFS_INIT_LIST_HEAD (&peer->peer_sendq);
181         CFS_INIT_LIST_HEAD (&peer->peer_activeq);
182         cfs_spin_lock_init (&peer->peer_lock);
183
184         peer->peer_state = PEER_STATE_ALLOCATED;
185         peer->peer_error = 0;
186         peer->peer_last_alive = 0;
187         peer->peer_id = lpid;
188         peer->peer_ptlid = ppid;
189         peer->peer_credits = 1;                 /* enough for HELLO */
190         peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
191         peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peertxcredits - 1;
192         peer->peer_sent_credits = 1;           /* HELLO credit is implicit */
193         peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */
194
195         cfs_atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */
196
197         cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
198
199         peer->peer_myincarnation = kptllnd_data.kptl_incarnation;
200
201         /* Only increase # peers under lock, to guarantee we dont grow it
202          * during shutdown */
203         if (net->net_shutdown) {
204                 cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
205                                             flags);
206                 LIBCFS_FREE(peer, sizeof(*peer));
207                 return NULL;
208         }
209
210         kptllnd_data.kptl_npeers++;
211         cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
212         return peer;
213 }
214
215 void
216 kptllnd_peer_destroy (kptl_peer_t *peer)
217 {
218         unsigned long flags;
219
220         CDEBUG(D_NET, "Peer=%p\n", peer);
221
222         LASSERT (!cfs_in_interrupt());
223         LASSERT (cfs_atomic_read(&peer->peer_refcount) == 0);
224         LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
225                  peer->peer_state == PEER_STATE_ZOMBIE);
226         LASSERT (cfs_list_empty(&peer->peer_noops));
227         LASSERT (cfs_list_empty(&peer->peer_sendq));
228         LASSERT (cfs_list_empty(&peer->peer_activeq));
229
230         cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
231
232         if (peer->peer_state == PEER_STATE_ZOMBIE)
233                 cfs_list_del(&peer->peer_list);
234
235         kptllnd_data.kptl_npeers--;
236
237         cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
238
239         LIBCFS_FREE (peer, sizeof (*peer));
240 }
241
242 void
243 kptllnd_cancel_txlist (cfs_list_t *peerq, cfs_list_t *txs)
244 {
245         cfs_list_t  *tmp;
246         cfs_list_t  *nxt;
247         kptl_tx_t   *tx;
248
249         cfs_list_for_each_safe (tmp, nxt, peerq) {
250                 tx = cfs_list_entry(tmp, kptl_tx_t, tx_list);
251
252                 cfs_list_del(&tx->tx_list);
253                 cfs_list_add_tail(&tx->tx_list, txs);
254
255                 tx->tx_status = -EIO;
256                 tx->tx_active = 0;
257         }
258 }
259
260 void
261 kptllnd_peer_cancel_txs(kptl_peer_t *peer, cfs_list_t *txs)
262 {
263         unsigned long   flags;
264
265         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
266
267         kptllnd_cancel_txlist(&peer->peer_noops, txs);
268         kptllnd_cancel_txlist(&peer->peer_sendq, txs);
269         kptllnd_cancel_txlist(&peer->peer_activeq, txs);
270                 
271         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
272 }
273
274 void
275 kptllnd_peer_alive (kptl_peer_t *peer)
276 {
277         /* This is racy, but everyone's only writing cfs_time_current() */
278         peer->peer_last_alive = cfs_time_current();
279         cfs_mb();
280 }
281
282 void
283 kptllnd_peer_notify (kptl_peer_t *peer)
284 {
285         unsigned long flags;
286         kptl_net_t   *net;
287         kptl_net_t  **nets;
288         int           i = 0;
289         int           nnets = 0;
290         int           error = 0;
291         cfs_time_t    last_alive = 0;
292
293         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
294
295         if (peer->peer_error != 0) {
296                 error = peer->peer_error;
297                 peer->peer_error = 0;
298                 last_alive = peer->peer_last_alive;
299         }
300
301         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
302
303         if (error == 0)
304                 return;
305
306         cfs_read_lock(&kptllnd_data.kptl_net_rw_lock);
307         cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list)
308                 nnets++;
309         cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock);
310
311         if (nnets == 0) /* shutdown in progress */
312                 return;
313
314         LIBCFS_ALLOC(nets, nnets * sizeof(*nets));
315         if (nets == NULL) {
316                 CERROR("Failed to allocate nets[%d]\n", nnets);
317                 return;
318         }
319         memset(nets, 0, nnets * sizeof(*nets));
320
321         cfs_read_lock(&kptllnd_data.kptl_net_rw_lock);
322         i = 0;
323         cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
324                 LASSERT (i < nnets);
325                 nets[i] = net;
326                 kptllnd_net_addref(net);
327                 i++;
328         }
329         cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock);
330
331         for (i = 0; i < nnets; i++) {
332                 lnet_nid_t peer_nid;
333
334                 net = nets[i];
335                 if (net == NULL)
336                         break;
337
338                 if (!net->net_shutdown) {
339                         peer_nid = kptllnd_ptl2lnetnid(net->net_ni->ni_nid,
340                                                        peer->peer_ptlid.nid);
341                         lnet_notify(net->net_ni, peer_nid, 0, last_alive);
342                 }
343
344                 kptllnd_net_decref(net);
345         }
346
347         LIBCFS_FREE(nets, nnets * sizeof(*nets));
348 }
349
350 void
351 kptllnd_handle_closing_peers ()
352 {
353         unsigned long           flags;
354         cfs_list_t              txs;
355         kptl_peer_t            *peer;
356         cfs_list_t             *tmp;
357         cfs_list_t             *nxt;
358         kptl_tx_t              *tx;
359         int                     idle;
360
361         /* Check with a read lock first to avoid blocking anyone */
362
363         cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
364         idle = cfs_list_empty(&kptllnd_data.kptl_closing_peers) &&
365                cfs_list_empty(&kptllnd_data.kptl_zombie_peers);
366         cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
367
368         if (idle)
369                 return;
370
371         CFS_INIT_LIST_HEAD(&txs);
372
373         cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
374
375         /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
376          * ref removes it from this list, so I musn't drop the lock while
377          * scanning it. */
378         cfs_list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
379                 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
380
381                 LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);
382
383                 kptllnd_peer_cancel_txs(peer, &txs);
384         }
385
386         /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
387          * I'm the only one removing from this list, but peers can be added on
388          * the end any time I drop the lock. */
389
390         cfs_list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
391                 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
392
393                 LASSERT (peer->peer_state == PEER_STATE_CLOSING);
394
395                 cfs_list_del(&peer->peer_list);
396                 cfs_list_add_tail(&peer->peer_list,
397                                   &kptllnd_data.kptl_zombie_peers);
398                 peer->peer_state = PEER_STATE_ZOMBIE;
399
400                 cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
401                                             flags);
402
403                 kptllnd_peer_notify(peer);
404                 kptllnd_peer_cancel_txs(peer, &txs);
405                 kptllnd_peer_decref(peer);
406
407                 cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
408         }
409
410         cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
411
412         /* Drop peer's ref on all cancelled txs.  This will get
413          * kptllnd_tx_fini() to abort outstanding comms if necessary. */
414
415         cfs_list_for_each_safe (tmp, nxt, &txs) {
416                 tx = cfs_list_entry(tmp, kptl_tx_t, tx_list);
417                 cfs_list_del(&tx->tx_list);
418                 kptllnd_tx_decref(tx);
419         }
420 }
421
422 void
423 kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
424 {
425         switch (peer->peer_state) {
426         default:
427                 LBUG();
428
429         case PEER_STATE_WAITING_HELLO:
430         case PEER_STATE_ACTIVE:
431                 /* Ensure new peers see a new incarnation of me */
432                 LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
433                 if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
434                         kptllnd_data.kptl_incarnation++;
435
436                 /* Removing from peer table */
437                 kptllnd_data.kptl_n_active_peers--;
438                 LASSERT (kptllnd_data.kptl_n_active_peers >= 0);
439
440                 cfs_list_del(&peer->peer_list);
441                 kptllnd_peer_unreserve_buffers();
442
443                 peer->peer_error = why; /* stash 'why' only on first close */
444                 peer->peer_state = PEER_STATE_CLOSING;
445
446                 /* Schedule for immediate attention, taking peer table's ref */
447                 cfs_list_add_tail(&peer->peer_list,
448                                  &kptllnd_data.kptl_closing_peers);
449                 cfs_waitq_signal(&kptllnd_data.kptl_watchdog_waitq);
450                 break;
451
452         case PEER_STATE_ZOMBIE:
453         case PEER_STATE_CLOSING:
454                 break;
455         }
456 }
457
458 void
459 kptllnd_peer_close(kptl_peer_t *peer, int why)
460 {
461         unsigned long      flags;
462
463         cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
464         kptllnd_peer_close_locked(peer, why);
465         cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
466 }
467
468 int
469 kptllnd_peer_del(lnet_process_id_t id)
470 {
471         cfs_list_t        *ptmp;
472         cfs_list_t        *pnxt;
473         kptl_peer_t       *peer;
474         int                lo;
475         int                hi;
476         int                i;
477         unsigned long      flags;
478         int                rc = -ENOENT;
479
480         /*
481          * Find the single bucket we are supposed to look at or if nid is a
482          * wildcard (LNET_NID_ANY) then look at all of the buckets
483          */
484         if (id.nid != LNET_NID_ANY) {
485                 cfs_list_t *l = kptllnd_nid2peerlist(id.nid);
486
487                 lo = hi =  l - kptllnd_data.kptl_peers;
488         } else {
489                 if (id.pid != LNET_PID_ANY)
490                         return -EINVAL;
491
492                 lo = 0;
493                 hi = kptllnd_data.kptl_peer_hash_size - 1;
494         }
495
496 again:
497         cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
498
499         for (i = lo; i <= hi; i++) {
500                 cfs_list_for_each_safe (ptmp, pnxt,
501                                         &kptllnd_data.kptl_peers[i]) {
502                         peer = cfs_list_entry (ptmp, kptl_peer_t, peer_list);
503
504                         if (!(id.nid == LNET_NID_ANY || 
505                               (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(id.nid) &&
506                                (id.pid == LNET_PID_ANY || 
507                                 peer->peer_id.pid == id.pid))))
508                                 continue;
509
510                         kptllnd_peer_addref(peer); /* 1 ref for me... */
511
512                         cfs_read_unlock_irqrestore(&kptllnd_data. \
513                                                    kptl_peer_rw_lock,
514                                                    flags);
515
516                         kptllnd_peer_close(peer, 0);
517                         kptllnd_peer_decref(peer); /* ...until here */
518
519                         rc = 0;         /* matched something */
520
521                         /* start again now I've dropped the lock */
522                         goto again;
523                 }
524         }
525
526         cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
527
528         return (rc);
529 }
530
531 void
532 kptllnd_queue_tx(kptl_peer_t *peer, kptl_tx_t *tx)
533 {
534         /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
535         unsigned long flags;
536
537         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
538
539         /* Ensure HELLO is sent first */
540         if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP)
541                 cfs_list_add(&tx->tx_list, &peer->peer_noops);
542         else if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
543                 cfs_list_add(&tx->tx_list, &peer->peer_sendq);
544         else
545                 cfs_list_add_tail(&tx->tx_list, &peer->peer_sendq);
546
547         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
548 }
549
550
551 void
552 kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
553 {
554         /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
555         ptl_handle_md_t  msg_mdh;
556         ptl_md_t         md;
557         ptl_err_t        prc;
558
559         LASSERT (!tx->tx_idle);
560         LASSERT (!tx->tx_active);
561         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
562         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
563         LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
564                  tx->tx_type == TX_TYPE_PUT_REQUEST ||
565                  tx->tx_type == TX_TYPE_GET_REQUEST);
566
567         kptllnd_set_tx_peer(tx, peer);
568
569         memset(&md, 0, sizeof(md));
570
571         md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
572         md.options = PTL_MD_OP_PUT |
573                      PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
574                      PTL_MD_EVENT_START_DISABLE;
575         md.user_ptr = &tx->tx_msg_eventarg;
576         md.eq_handle = kptllnd_data.kptl_eqh;
577
578         if (nfrag == 0) {
579                 md.start = tx->tx_msg;
580                 md.length = tx->tx_msg->ptlm_nob;
581         } else {
582                 LASSERT (nfrag > 1);
583                 LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);
584
585                 md.start = tx->tx_frags;
586                 md.length = nfrag;
587                 md.options |= PTL_MD_IOVEC;
588         }
589
590         prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
591         if (prc != PTL_OK) {
592                 CERROR("PtlMDBind(%s) failed: %s(%d)\n",
593                        libcfs_id2str(peer->peer_id),
594                        kptllnd_errtype2str(prc), prc);
595                 tx->tx_status = -EIO;
596                 kptllnd_tx_decref(tx);
597                 return;
598         }
599
600
601         tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * CFS_HZ);
602         tx->tx_active = 1;
603         tx->tx_msg_mdh = msg_mdh;
604         kptllnd_queue_tx(peer, tx);
605 }
606
607 /* NB "restarts" comes from peer_sendq of a single peer */
608 void
609 kptllnd_restart_txs (kptl_net_t *net, lnet_process_id_t target,
610                      cfs_list_t *restarts)
611 {
612         kptl_tx_t   *tx;
613         kptl_tx_t   *tmp;
614         kptl_peer_t *peer;
615
616         LASSERT (!cfs_list_empty(restarts));
617
618         if (kptllnd_find_target(net, target, &peer) != 0)
619                 peer = NULL;
620
621         cfs_list_for_each_entry_safe (tx, tmp, restarts, tx_list) {
622                 LASSERT (tx->tx_peer != NULL);
623                 LASSERT (tx->tx_type == TX_TYPE_GET_REQUEST ||
624                          tx->tx_type == TX_TYPE_PUT_REQUEST ||
625                          tx->tx_type == TX_TYPE_SMALL_MESSAGE);
626
627                 cfs_list_del_init(&tx->tx_list);
628
629                 if (peer == NULL ||
630                     tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
631                         kptllnd_tx_decref(tx);
632                         continue;
633                 }
634
635                 LASSERT (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_NOOP);
636                 tx->tx_status = 0;
637                 tx->tx_active = 1;
638                 kptllnd_peer_decref(tx->tx_peer);
639                 tx->tx_peer = NULL;
640                 kptllnd_set_tx_peer(tx, peer);
641                 kptllnd_queue_tx(peer, tx); /* takes over my ref on tx */
642         }
643
644         if (peer == NULL)
645                 return;
646
647         kptllnd_peer_check_sends(peer);
648         kptllnd_peer_decref(peer);
649 }
650
651 static inline int
652 kptllnd_peer_send_noop (kptl_peer_t *peer)
653 {
654         if (!peer->peer_sent_hello ||
655             peer->peer_credits == 0 ||
656             !cfs_list_empty(&peer->peer_noops) ||
657             peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)
658                 return 0;
659
660         /* No tx to piggyback NOOP onto or no credit to send a tx */
661         return (cfs_list_empty(&peer->peer_sendq) || peer->peer_credits == 1);
662 }
663
664 void
665 kptllnd_peer_check_sends (kptl_peer_t *peer)
666 {
667         ptl_handle_me_t  meh;
668         kptl_tx_t       *tx;
669         int              rc;
670         int              msg_type;
671         unsigned long    flags;
672
673         LASSERT(!cfs_in_interrupt());
674
675         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
676
677         peer->peer_retry_noop = 0;
678
679         if (kptllnd_peer_send_noop(peer)) {
680                 /* post a NOOP to return credits */
681                 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
682
683                 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
684                 if (tx == NULL) {
685                         CERROR("Can't return credits to %s: can't allocate descriptor\n",
686                                libcfs_id2str(peer->peer_id));
687                 } else {
688                         kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP,
689                                          peer->peer_id, 0);
690                         kptllnd_post_tx(peer, tx, 0);
691                 }
692
693                 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
694                 peer->peer_retry_noop = (tx == NULL);
695         }
696
697         for (;;) {
698                 if (!cfs_list_empty(&peer->peer_noops)) {
699                         LASSERT (peer->peer_sent_hello);
700                         tx = cfs_list_entry(peer->peer_noops.next,
701                                             kptl_tx_t, tx_list);
702                 } else if (!cfs_list_empty(&peer->peer_sendq)) {
703                         tx = cfs_list_entry(peer->peer_sendq.next,
704                                             kptl_tx_t, tx_list);
705                 } else {
706                         /* nothing to send right now */
707                         break;
708                 }
709
710                 LASSERT (tx->tx_active);
711                 LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
712                 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
713
714                 LASSERT (peer->peer_outstanding_credits >= 0);
715                 LASSERT (peer->peer_sent_credits >= 0);
716                 LASSERT (peer->peer_sent_credits +
717                          peer->peer_outstanding_credits <=
718                          *kptllnd_tunables.kptl_peertxcredits);
719                 LASSERT (peer->peer_credits >= 0);
720
721                 msg_type = tx->tx_msg->ptlm_type;
722
723                 /* Ensure HELLO is sent first */
724                 if (!peer->peer_sent_hello) {
725                         LASSERT (cfs_list_empty(&peer->peer_noops));
726                         if (msg_type != PTLLND_MSG_TYPE_HELLO)
727                                 break;
728                         peer->peer_sent_hello = 1;
729                 }
730
731                 if (peer->peer_credits == 0) {
732                         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %s[%p]\n",
733                                libcfs_id2str(peer->peer_id), 
734                                peer->peer_credits,
735                                peer->peer_outstanding_credits, 
736                                peer->peer_sent_credits, 
737                                kptllnd_msgtype2str(msg_type), tx);
738                         break;
739                 }
740
741                 /* Last/Initial credit reserved for NOOP/HELLO */
742                 if (peer->peer_credits == 1 &&
743                     msg_type != PTLLND_MSG_TYPE_HELLO &&
744                     msg_type != PTLLND_MSG_TYPE_NOOP) {
745                         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
746                                "not using last credit for %s[%p]\n",
747                                libcfs_id2str(peer->peer_id), 
748                                peer->peer_credits,
749                                peer->peer_outstanding_credits,
750                                peer->peer_sent_credits,
751                                kptllnd_msgtype2str(msg_type), tx);
752                         break;
753                 }
754
755                 cfs_list_del(&tx->tx_list);
756
757                 /* Discard any NOOP I queued if I'm not at the high-water mark
758                  * any more or more messages have been queued */
759                 if (msg_type == PTLLND_MSG_TYPE_NOOP &&
760                     !kptllnd_peer_send_noop(peer)) {
761                         tx->tx_active = 0;
762
763                         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
764
765                         CDEBUG(D_NET, "%s: redundant noop\n", 
766                                libcfs_id2str(peer->peer_id));
767                         kptllnd_tx_decref(tx);
768
769                         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
770                         continue;
771                 }
772
773                 /* fill last-minute msg fields */
774                 kptllnd_msg_pack(tx->tx_msg, peer);
775
776                 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
777                     tx->tx_type == TX_TYPE_GET_REQUEST) {
778                         /* peer_next_matchbits must be known good */
779                         LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
780                         /* Assume 64-bit matchbits can't wrap */
781                         LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
782                         tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
783                                 peer->peer_next_matchbits++;
784                 }
785
786                 peer->peer_sent_credits += peer->peer_outstanding_credits;
787                 peer->peer_outstanding_credits = 0;
788                 peer->peer_credits--;
789
790                 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
791                        libcfs_id2str(peer->peer_id), peer->peer_credits,
792                        peer->peer_outstanding_credits, peer->peer_sent_credits,
793                        kptllnd_msgtype2str(msg_type), tx, tx->tx_msg->ptlm_nob,
794                        tx->tx_msg->ptlm_credits);
795
796                 cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);
797
798                 kptllnd_tx_addref(tx);          /* 1 ref for me... */
799
800                 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
801
802                 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
803                     tx->tx_type == TX_TYPE_GET_REQUEST) {
804                         /* Post bulk now we have safe matchbits */
805                         rc = PtlMEAttach(kptllnd_data.kptl_nih,
806                                          *kptllnd_tunables.kptl_portal,
807                                          peer->peer_ptlid,
808                                          tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
809                                          0,             /* ignore bits */
810                                          PTL_UNLINK,
811                                          PTL_INS_BEFORE,
812                                          &meh);
813                         if (rc != PTL_OK) {
814                                 CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
815                                        libcfs_id2str(peer->peer_id),
816                                        kptllnd_errtype2str(rc), rc);
817                                 goto failed;
818                         }
819
820                         rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
821                                          &tx->tx_rdma_mdh);
822                         if (rc != PTL_OK) {
823                                 CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
824                                        libcfs_id2str(tx->tx_peer->peer_id),
825                                        kptllnd_errtype2str(rc), rc);
826                                 rc = PtlMEUnlink(meh);
827                                 LASSERT(rc == PTL_OK);
828                                 tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
829                                 goto failed;
830                         }
831                         /* I'm not racing with the event callback here.  It's a
832                          * bug if there's an event on the MD I just attached
833                          * before I actually send the RDMA request message -
834                          * probably matchbits re-used in error. */
835                 }
836
837                 tx->tx_tposted = jiffies;       /* going on the wire */
838
839                 rc = PtlPut (tx->tx_msg_mdh,
840                              tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
841                              peer->peer_ptlid,
842                              *kptllnd_tunables.kptl_portal,
843                              0,                 /* acl cookie */
844                              LNET_MSG_MATCHBITS,
845                              0,                 /* offset */
846                              0);                /* header data */
847                 if (rc != PTL_OK) {
848                         CERROR("PtlPut %s error %s(%d)\n",
849                                libcfs_id2str(peer->peer_id),
850                                kptllnd_errtype2str(rc), rc);
851                         goto failed;
852                 }
853
854                 kptllnd_tx_decref(tx);          /* drop my ref */
855
856                 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
857         }
858
859         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
860         return;
861
862  failed:
863         /* Nuke everything (including tx we were trying) */
864         kptllnd_peer_close(peer, -EIO);
865         kptllnd_tx_decref(tx);
866         kptllnd_schedule_ptltrace_dump();
867 }
868
869 kptl_tx_t *
870 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
871 {
872         kptl_tx_t         *tx;
873         cfs_list_t        *ele;
874
875         cfs_list_for_each(ele, &peer->peer_sendq) {
876                 tx = cfs_list_entry(ele, kptl_tx_t, tx_list);
877
878                 if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
879                         kptllnd_tx_addref(tx);
880                         return tx;
881                 }
882         }
883
884         cfs_list_for_each(ele, &peer->peer_activeq) {
885                 tx = cfs_list_entry(ele, kptl_tx_t, tx_list);
886
887                 if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
888                         kptllnd_tx_addref(tx);
889                         return tx;
890                 }
891         }
892
893         return NULL;
894 }
895
896
897 void
898 kptllnd_peer_check_bucket (int idx, int stamp)
899 {
900         cfs_list_t        *peers = &kptllnd_data.kptl_peers[idx];
901         kptl_peer_t       *peer;
902         unsigned long      flags;
903
904         CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);
905
906  again:
907         /* NB. Shared lock while I just look */
908         cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
909
910         cfs_list_for_each_entry (peer, peers, peer_list) {
911                 kptl_tx_t *tx;
912                 int        check_sends;
913                 int        c = -1, oc = -1, sc = -1;
914                 int        nsend = -1, nactive = -1;
915                 int        sent_hello = -1, state = -1;
916
917                 CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
918                        libcfs_id2str(peer->peer_id), peer->peer_credits, 
919                        peer->peer_outstanding_credits, peer->peer_sent_credits);
920
921                 cfs_spin_lock(&peer->peer_lock);
922
923                 if (peer->peer_check_stamp == stamp) {
924                         /* checked already this pass */
925                         cfs_spin_unlock(&peer->peer_lock);
926                         continue;
927                 }
928
929                 peer->peer_check_stamp = stamp;
930                 tx = kptllnd_find_timed_out_tx(peer);
931                 check_sends = peer->peer_retry_noop;
932
933                 if (tx != NULL) {
934                         c  = peer->peer_credits;
935                         sc = peer->peer_sent_credits;
936                         oc = peer->peer_outstanding_credits;
937                         state      = peer->peer_state;
938                         sent_hello = peer->peer_sent_hello;
939                         nsend   = kptllnd_count_queue(&peer->peer_sendq);
940                         nactive = kptllnd_count_queue(&peer->peer_activeq);
941                 }
942
943                 cfs_spin_unlock(&peer->peer_lock);
944
945                 if (tx == NULL && !check_sends)
946                         continue;
947
948                 kptllnd_peer_addref(peer); /* 1 ref for me... */
949
950                 cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
951                                            flags);
952
953                 if (tx == NULL) { /* nothing timed out */
954                         kptllnd_peer_check_sends(peer);
955                         kptllnd_peer_decref(peer); /* ...until here or... */
956
957                         /* rescan after dropping the lock */
958                         goto again;
959                 }
960
961                 LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
962                                    libcfs_id2str(peer->peer_id),
963                                    (tx->tx_tposted == 0) ?
964                                    "no free peer buffers" :
965                                    "please check Portals");
966
967                 if (tx->tx_tposted) {
968                         CERROR("Could not send to %s after %ds (sent %lds ago); "
969                                 "check Portals for possible issues\n",
970                                 libcfs_id2str(peer->peer_id),
971                                 *kptllnd_tunables.kptl_timeout,
972                                 cfs_duration_sec(jiffies - tx->tx_tposted));
973                 } else if (state < PEER_STATE_ACTIVE) {
974                         CERROR("Could not connect %s (%d) after %ds; "
975                                "peer might be down\n",
976                                libcfs_id2str(peer->peer_id), state,
977                                *kptllnd_tunables.kptl_timeout);
978                 } else {
979                         CERROR("Could not get credits for %s after %ds; "
980                                 "possible Lustre networking issues\n",
981                         libcfs_id2str(peer->peer_id),
982                         *kptllnd_tunables.kptl_timeout);
983                 }
984
985                 CERROR("%s timed out: cred %d outstanding %d, sent %d, "
986                        "state %d, sent_hello %d, sendq %d, activeq %d "
987                        "Tx %p %s %s (%s%s%s) status %d %sposted %lu T/O %ds\n",
988                        libcfs_id2str(peer->peer_id), c, oc, sc,
989                        state, sent_hello, nsend, nactive,
990                        tx, kptllnd_tx_typestr(tx->tx_type),
991                        kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
992                        tx->tx_active ? "A" : "",
993                        PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
994                        "" : "M",
995                        PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
996                        "" : "D",
997                        tx->tx_status,
998                        (tx->tx_tposted == 0) ? "not " : "",
999                        (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
1000                        *kptllnd_tunables.kptl_timeout);
1001
1002 #ifdef CRAY_XT3
1003                 if (*kptllnd_tunables.kptl_ptltrace_on_timeout)
1004                         kptllnd_dump_ptltrace();
1005 #endif
1006
1007                 kptllnd_tx_decref(tx);
1008
1009                 kptllnd_peer_close(peer, -ETIMEDOUT);
1010                 kptllnd_peer_decref(peer); /* ...until here */
1011
1012                 /* start again now I've dropped the lock */
1013                 goto again;
1014         }
1015
1016         cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
1017 }
1018
1019 kptl_peer_t *
1020 kptllnd_id2peer_locked (lnet_process_id_t id)
1021 {
1022         cfs_list_t       *peers = kptllnd_nid2peerlist(id.nid);
1023         cfs_list_t       *tmp;
1024         kptl_peer_t      *peer;
1025
1026         cfs_list_for_each (tmp, peers) {
1027                 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1028
1029                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
1030                         peer->peer_state == PEER_STATE_ACTIVE);
1031
1032                 /* NB logical LNet peers share one kptl_peer_t */
1033                 if (peer->peer_id.pid != id.pid ||
1034                     LNET_NIDADDR(id.nid) != LNET_NIDADDR(peer->peer_id.nid))
1035                         continue;
1036
1037                 kptllnd_peer_addref(peer);
1038
1039                 CDEBUG(D_NET, "%s -> %s (%d)\n",
1040                        libcfs_id2str(id),
1041                        kptllnd_ptlid2str(peer->peer_ptlid),
1042                        cfs_atomic_read (&peer->peer_refcount));
1043                 return peer;
1044         }
1045
1046         return NULL;
1047 }
1048
1049 void
1050 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
1051 {
1052         LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
1053                            "messages may be dropped\n",
1054                            str, libcfs_id2str(id),
1055                            kptllnd_data.kptl_n_active_peers);
1056         LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
1057                            "'max_nodes' or 'max_procs_per_node'\n");
1058 }
1059
1060 __u64
1061 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
1062 {
1063         kptl_peer_t  *peer;
1064         cfs_list_t   *tmp;
1065
1066         /* Find the last matchbits I saw this new peer using.  Note..
1067            A. This peer cannot be in the peer table - she's new!
1068            B. If I can't find the peer in the closing/zombie peers, all
1069               matchbits are safe because all refs to the (old) peer have gone
1070               so all txs have completed so there's no risk of matchbit
1071               collision!
1072          */
1073
1074         LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
1075
1076         /* peer's last matchbits can't change after it comes out of the peer
1077          * table, so first match is fine */
1078
1079         cfs_list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
1080                 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1081
1082                 if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
1083                     peer->peer_id.pid == lpid.pid)
1084                         return peer->peer_last_matchbits_seen;
1085         }
1086
1087         cfs_list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
1088                 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1089
1090                 if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
1091                     peer->peer_id.pid == lpid.pid)
1092                         return peer->peer_last_matchbits_seen;
1093         }
1094
1095         return PTL_RESERVED_MATCHBITS;
1096 }
1097
1098 kptl_peer_t *
1099 kptllnd_peer_handle_hello (kptl_net_t *net,
1100                            ptl_process_id_t initiator, kptl_msg_t *msg)
1101 {
1102         cfs_rwlock_t       *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1103         kptl_peer_t        *peer;
1104         kptl_peer_t        *new_peer;
1105         lnet_process_id_t   lpid;
1106         unsigned long       flags;
1107         kptl_tx_t          *hello_tx;
1108         int                 rc;
1109         __u64               safe_matchbits;
1110         __u64               last_matchbits_seen;
1111
1112         lpid.nid = msg->ptlm_srcnid;
1113         lpid.pid = msg->ptlm_srcpid;
1114
1115         CDEBUG(D_NET, "hello from %s(%s)\n",
1116                libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
1117
1118         if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
1119             (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
1120                 /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
1121                  * userspace.  Refuse the connection if she hasn't set the
1122                  * correct flag in her PID... */
1123                 CERROR("Userflag not set in hello from %s (%s)\n",
1124                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
1125                 return NULL;
1126         }
1127         
1128         /* kptlhm_matchbits are the highest matchbits my peer may have used to
1129          * RDMA to me.  I ensure I never register buffers for RDMA that could
1130          * match any she used */
1131         safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;
1132
1133         if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
1134                 CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
1135                        safe_matchbits, libcfs_id2str(lpid));
1136                 return NULL;
1137         }
1138         
1139         if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
1140                 CERROR("%s: max message size %d < MIN %d",
1141                        libcfs_id2str(lpid),
1142                        msg->ptlm_u.hello.kptlhm_max_msg_size,
1143                        PTLLND_MIN_BUFFER_SIZE);
1144                 return NULL;
1145         }
1146
1147         if (msg->ptlm_credits <= 1) {
1148                 CERROR("Need more than 1+%d credits from %s\n",
1149                        msg->ptlm_credits, libcfs_id2str(lpid));
1150                 return NULL;
1151         }
1152         
1153         cfs_write_lock_irqsave(g_lock, flags);
1154
1155         peer = kptllnd_id2peer_locked(lpid);
1156         if (peer != NULL) {
1157                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1158                         /* Completing HELLO handshake */
1159                         LASSERT(peer->peer_incarnation == 0);
1160
1161                         if (msg->ptlm_dststamp != 0 &&
1162                             msg->ptlm_dststamp != peer->peer_myincarnation) {
1163                                 cfs_write_unlock_irqrestore(g_lock, flags);
1164
1165                                 CERROR("Ignoring HELLO from %s: unexpected "
1166                                        "dststamp "LPX64" ("LPX64" wanted)\n",
1167                                        libcfs_id2str(lpid),
1168                                        msg->ptlm_dststamp,
1169                                        peer->peer_myincarnation);
1170                                 kptllnd_peer_decref(peer);
1171                                 return NULL;
1172                         }
1173                         
1174                         /* Concurrent initiation or response to my HELLO */
1175                         peer->peer_state = PEER_STATE_ACTIVE;
1176                         peer->peer_incarnation = msg->ptlm_srcstamp;
1177                         peer->peer_next_matchbits = safe_matchbits;
1178                         peer->peer_max_msg_size =
1179                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1180                         
1181                         cfs_write_unlock_irqrestore(g_lock, flags);
1182                         return peer;
1183                 }
1184
1185                 if (msg->ptlm_dststamp != 0 &&
1186                     msg->ptlm_dststamp <= peer->peer_myincarnation) {
1187                         cfs_write_unlock_irqrestore(g_lock, flags);
1188
1189                         CERROR("Ignoring stale HELLO from %s: "
1190                                "dststamp "LPX64" (current "LPX64")\n",
1191                                libcfs_id2str(lpid),
1192                                msg->ptlm_dststamp,
1193                                peer->peer_myincarnation);
1194                         kptllnd_peer_decref(peer);
1195                         return NULL;
1196                 }
1197
1198                 /* Brand new connection attempt: remove old incarnation */
1199                 kptllnd_peer_close_locked(peer, 0);
1200         }
1201
1202         kptllnd_cull_peertable_locked(lpid);
1203
1204         cfs_write_unlock_irqrestore(g_lock, flags);
1205
1206         if (peer != NULL) {
1207                 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
1208                        " stamp "LPX64"("LPX64")\n",
1209                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
1210                        msg->ptlm_srcstamp, peer->peer_incarnation);
1211
1212                 kptllnd_peer_decref(peer);
1213                 peer = NULL;
1214         }
1215
1216         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1217         if (hello_tx == NULL) {
1218                 CERROR("Unable to allocate HELLO message for %s\n",
1219                        libcfs_id2str(lpid));
1220                 return NULL;
1221         }
1222
1223         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1224                          lpid, sizeof(kptl_hello_msg_t));
1225
1226         new_peer = kptllnd_peer_allocate(net, lpid, initiator);
1227         if (new_peer == NULL) {
1228                 kptllnd_tx_decref(hello_tx);
1229                 return NULL;
1230         }
1231
1232         rc = kptllnd_peer_reserve_buffers();
1233         if (rc != 0) {
1234                 kptllnd_peer_decref(new_peer);
1235                 kptllnd_tx_decref(hello_tx);
1236
1237                 CERROR("Failed to reserve buffers for %s\n",
1238                        libcfs_id2str(lpid));
1239                 return NULL;
1240         }
1241
1242         cfs_write_lock_irqsave(g_lock, flags);
1243
1244  again:
1245         if (net->net_shutdown) {
1246                 cfs_write_unlock_irqrestore(g_lock, flags);
1247
1248                 CERROR ("Shutdown started, refusing connection from %s\n",
1249                         libcfs_id2str(lpid));
1250                 kptllnd_peer_unreserve_buffers();
1251                 kptllnd_peer_decref(new_peer);
1252                 kptllnd_tx_decref(hello_tx);
1253                 return NULL;
1254         }
1255
1256         peer = kptllnd_id2peer_locked(lpid);
1257         if (peer != NULL) {
1258                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1259                         /* An outgoing message instantiated 'peer' for me */
1260                         LASSERT(peer->peer_incarnation == 0);
1261
1262                         peer->peer_state = PEER_STATE_ACTIVE;
1263                         peer->peer_incarnation = msg->ptlm_srcstamp;
1264                         peer->peer_next_matchbits = safe_matchbits;
1265                         peer->peer_max_msg_size =
1266                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1267
1268                         cfs_write_unlock_irqrestore(g_lock, flags);
1269
1270                         CWARN("Outgoing instantiated peer %s\n",
1271                               libcfs_id2str(lpid));
1272                 } else {
1273                         LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1274
1275                         cfs_write_unlock_irqrestore(g_lock, flags);
1276
1277                         /* WOW!  Somehow this peer completed the HELLO
1278                          * handshake while I slept.  I guess I could have slept
1279                          * while it rebooted and sent a new HELLO, so I'll fail
1280                          * this one... */
1281                         CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
1282                         kptllnd_peer_decref(peer);
1283                         peer = NULL;
1284                 }
1285
1286                 kptllnd_peer_unreserve_buffers();
1287                 kptllnd_peer_decref(new_peer);
1288                 kptllnd_tx_decref(hello_tx);
1289                 return peer;
1290         }
1291
1292         if (kptllnd_data.kptl_n_active_peers ==
1293             kptllnd_data.kptl_expected_peers) {
1294                 /* peer table full */
1295                 cfs_write_unlock_irqrestore(g_lock, flags);
1296
1297                 kptllnd_peertable_overflow_msg("Connection from ", lpid);
1298
1299                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1300                 if (rc != 0) {
1301                         CERROR("Refusing connection from %s\n",
1302                                libcfs_id2str(lpid));
1303                         kptllnd_peer_unreserve_buffers();
1304                         kptllnd_peer_decref(new_peer);
1305                         kptllnd_tx_decref(hello_tx);
1306                         return NULL;
1307                 }
1308                 
1309                 cfs_write_lock_irqsave(g_lock, flags);
1310                 kptllnd_data.kptl_expected_peers++;
1311                 goto again;
1312         }
1313
1314         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
1315
1316         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1317         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1318                 *kptllnd_tunables.kptl_max_msg_size;
1319
1320         new_peer->peer_state = PEER_STATE_ACTIVE;
1321         new_peer->peer_incarnation = msg->ptlm_srcstamp;
1322         new_peer->peer_next_matchbits = safe_matchbits;
1323         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1324         new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1325
1326         LASSERT (!net->net_shutdown);
1327         kptllnd_peer_add_peertable_locked(new_peer);
1328
1329         cfs_write_unlock_irqrestore(g_lock, flags);
1330
1331         /* NB someone else could get in now and post a message before I post
1332          * the HELLO, but post_tx/check_sends take care of that! */
1333
1334         CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
1335                libcfs_id2str(new_peer->peer_id), hello_tx);
1336
1337         kptllnd_post_tx(new_peer, hello_tx, 0);
1338         kptllnd_peer_check_sends(new_peer);
1339
1340         return new_peer;
1341 }
1342
1343 void
1344 kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
1345 {
1346         kptllnd_post_tx(peer, tx, nfrag);
1347         kptllnd_peer_check_sends(peer);
1348 }
1349
1350 int
1351 kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target,
1352                     kptl_peer_t **peerp)
1353 {
1354         cfs_rwlock_t     *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1355         ptl_process_id_t  ptl_id;
1356         kptl_peer_t      *new_peer;
1357         kptl_tx_t        *hello_tx;
1358         unsigned long     flags;
1359         int               rc;
1360         __u64             last_matchbits_seen;
1361
1362         /* I expect to find the peer, so I only take a read lock... */
1363         cfs_read_lock_irqsave(g_lock, flags);
1364         *peerp = kptllnd_id2peer_locked(target);
1365         cfs_read_unlock_irqrestore(g_lock, flags);
1366
1367         if (*peerp != NULL)
1368                 return 0;
1369
1370         if ((target.pid & LNET_PID_USERFLAG) != 0) {
1371                 CWARN("Refusing to create a new connection to %s "
1372                       "(non-kernel peer)\n", libcfs_id2str(target));
1373                 return -EHOSTUNREACH;
1374         }
1375
1376         /* The new peer is a kernel ptllnd, and kernel ptllnds all have the
1377          * same portals PID, which has nothing to do with LUSTRE_SRV_LNET_PID */
1378         ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
1379         ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
1380
1381         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1382         if (hello_tx == NULL) {
1383                 CERROR("Unable to allocate connect message for %s\n",
1384                        libcfs_id2str(target));
1385                 return -ENOMEM;
1386         }
1387
1388         hello_tx->tx_acked = 1;
1389         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1390                          target, sizeof(kptl_hello_msg_t));
1391
1392         new_peer = kptllnd_peer_allocate(net, target, ptl_id);
1393         if (new_peer == NULL) {
1394                 rc = -ENOMEM;
1395                 goto unwind_0;
1396         }
1397
1398         rc = kptllnd_peer_reserve_buffers();
1399         if (rc != 0)
1400                 goto unwind_1;
1401
1402         cfs_write_lock_irqsave(g_lock, flags);
1403  again:
1404         /* Called only in lnd_send which can't happen after lnd_shutdown */
1405         LASSERT (!net->net_shutdown);
1406
1407         *peerp = kptllnd_id2peer_locked(target);
1408         if (*peerp != NULL) {
1409                 cfs_write_unlock_irqrestore(g_lock, flags);
1410                 goto unwind_2;
1411         }
1412
1413         kptllnd_cull_peertable_locked(target);
1414
1415         if (kptllnd_data.kptl_n_active_peers ==
1416             kptllnd_data.kptl_expected_peers) {
1417                 /* peer table full */
1418                 cfs_write_unlock_irqrestore(g_lock, flags);
1419
1420                 kptllnd_peertable_overflow_msg("Connection to ", target);
1421
1422                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1423                 if (rc != 0) {
1424                         CERROR("Can't create connection to %s\n",
1425                                libcfs_id2str(target));
1426                         rc = -ENOMEM;
1427                         goto unwind_2;
1428                 }
1429                 cfs_write_lock_irqsave(g_lock, flags);
1430                 kptllnd_data.kptl_expected_peers++;
1431                 goto again;
1432         }
1433
1434         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);
1435
1436         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1437         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1438                 *kptllnd_tunables.kptl_max_msg_size;
1439
1440         new_peer->peer_state = PEER_STATE_WAITING_HELLO;
1441         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1442
1443         kptllnd_peer_add_peertable_locked(new_peer);
1444
1445         cfs_write_unlock_irqrestore(g_lock, flags);
1446
1447         /* NB someone else could get in now and post a message before I post
1448          * the HELLO, but post_tx/check_sends take care of that! */
1449
1450         CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
1451                libcfs_id2str(new_peer->peer_id), hello_tx);
1452
1453         kptllnd_post_tx(new_peer, hello_tx, 0);
1454         kptllnd_peer_check_sends(new_peer);
1455
1456         *peerp = new_peer;
1457         return 0;
1458
1459  unwind_2:
1460         kptllnd_peer_unreserve_buffers();
1461  unwind_1:
1462         kptllnd_peer_decref(new_peer);
1463  unwind_0:
1464         kptllnd_tx_decref(hello_tx);
1465
1466         return rc;
1467 }