Whamcloud - gitweb
b=17167 libcfs: ensure all libcfs exported symbols to have cfs_ prefix
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_peer.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/ptllnd/ptllnd_peer.c
37  *
38  * Author: PJ Kirner <pjkirner@clusterfs.com>
39  * Author: E Barton <eeb@bartonsoftware.com>
40  */
41
42 #include "ptllnd.h"
43 #include <libcfs/list.h>
44
45 static int
46 kptllnd_count_queue(cfs_list_t *q)
47 {
48         cfs_list_t *e;
49         int         n = 0;
50
51         cfs_list_for_each(e, q) {
52                 n++;
53         }
54
55         return n;
56 }
57
58 int
59 kptllnd_get_peer_info(int index,
60                       lnet_process_id_t *id,
61                       int *state, int *sent_hello,
62                       int *refcount, __u64 *incarnation,
63                       __u64 *next_matchbits, __u64 *last_matchbits_seen,
64                       int *nsendq, int *nactiveq,
65                       int *credits, int *outstanding_credits)
66 {
67         cfs_rwlock_t     *g_lock = &kptllnd_data.kptl_peer_rw_lock;
68         unsigned long     flags;
69         cfs_list_t       *ptmp;
70         kptl_peer_t      *peer;
71         int               i;
72         int               rc = -ENOENT;
73
74         cfs_read_lock_irqsave(g_lock, flags);
75
76         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
77                 cfs_list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
78                         peer = cfs_list_entry(ptmp, kptl_peer_t, peer_list);
79
80                         if (index-- > 0)
81                                 continue;
82
83                         *id          = peer->peer_id;
84                         *state       = peer->peer_state;
85                         *sent_hello  = peer->peer_sent_hello;
86                         *refcount    = cfs_atomic_read(&peer->peer_refcount);
87                         *incarnation = peer->peer_incarnation;
88
89                         cfs_spin_lock(&peer->peer_lock);
90
91                         *next_matchbits      = peer->peer_next_matchbits;
92                         *last_matchbits_seen = peer->peer_last_matchbits_seen;
93                         *credits             = peer->peer_credits;
94                         *outstanding_credits = peer->peer_outstanding_credits;
95
96                         *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
97                         *nactiveq = kptllnd_count_queue(&peer->peer_activeq);
98
99                         cfs_spin_unlock(&peer->peer_lock);
100
101                         rc = 0;
102                         goto out;
103                 }
104         }
105
106  out:
107         cfs_read_unlock_irqrestore(g_lock, flags);
108         return rc;
109 }
110
111 void
112 kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
113 {
114         LASSERT (kptllnd_data.kptl_n_active_peers <
115                  kptllnd_data.kptl_expected_peers);
116
117         LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
118                  peer->peer_state == PEER_STATE_ACTIVE);
119
120         kptllnd_data.kptl_n_active_peers++;
121         cfs_atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */
122
123         /* NB add to HEAD of peer list for MRU order!
124          * (see kptllnd_cull_peertable) */
125         cfs_list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
126 }
127
128 void
129 kptllnd_cull_peertable_locked (lnet_process_id_t pid)
130 {
131         /* I'm about to add a new peer with this portals ID to the peer table,
132          * so (a) this peer should not exist already and (b) I want to leave at
133          * most (max_procs_per_nid - 1) peers with this NID in the table. */
134         cfs_list_t   *peers = kptllnd_nid2peerlist(pid.nid);
135         int           cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
136         int           count;
137         cfs_list_t   *tmp;
138         cfs_list_t   *nxt;
139         kptl_peer_t  *peer;
140
141         count = 0;
142         cfs_list_for_each_safe (tmp, nxt, peers) {
143                 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
144                  * in MRU order */
145                 peer = cfs_list_entry(tmp, kptl_peer_t, peer_list);
146                         
147                 if (LNET_NIDADDR(peer->peer_id.nid) != LNET_NIDADDR(pid.nid))
148                         continue;
149
150                 LASSERT (peer->peer_id.pid != pid.pid);
151                         
152                 count++;
153
154                 if (count < cull_count) /* recent (don't cull) */
155                         continue;
156
157                 CDEBUG(D_NET, "Cull %s(%s)\n",
158                        libcfs_id2str(peer->peer_id),
159                        kptllnd_ptlid2str(peer->peer_ptlid));
160                 
161                 kptllnd_peer_close_locked(peer, 0);
162         }
163 }
164
165 kptl_peer_t *
166 kptllnd_peer_allocate (kptl_net_t *net, lnet_process_id_t lpid, ptl_process_id_t ppid)
167 {
168         unsigned long    flags;
169         kptl_peer_t     *peer;
170
171         LIBCFS_ALLOC(peer, sizeof (*peer));
172         if (peer == NULL) {
173                 CERROR("Can't create peer %s (%s)\n",
174                        libcfs_id2str(lpid), 
175                        kptllnd_ptlid2str(ppid));
176                 return NULL;
177         }
178
179         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
180
181         CFS_INIT_LIST_HEAD (&peer->peer_noops);
182         CFS_INIT_LIST_HEAD (&peer->peer_sendq);
183         CFS_INIT_LIST_HEAD (&peer->peer_activeq);
184         cfs_spin_lock_init (&peer->peer_lock);
185
186         peer->peer_state = PEER_STATE_ALLOCATED;
187         peer->peer_error = 0;
188         peer->peer_last_alive = 0;
189         peer->peer_id = lpid;
190         peer->peer_ptlid = ppid;
191         peer->peer_credits = 1;                 /* enough for HELLO */
192         peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
193         peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peertxcredits - 1;
194         peer->peer_sent_credits = 1;           /* HELLO credit is implicit */
195         peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */
196
197         cfs_atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */
198
199         cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
200
201         peer->peer_myincarnation = kptllnd_data.kptl_incarnation;
202
203         /* Only increase # peers under lock, to guarantee we dont grow it
204          * during shutdown */
205         if (net->net_shutdown) {
206                 cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
207                                             flags);
208                 LIBCFS_FREE(peer, sizeof(*peer));
209                 return NULL;
210         }
211
212         kptllnd_data.kptl_npeers++;
213         cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
214         return peer;
215 }
216
217 void
218 kptllnd_peer_destroy (kptl_peer_t *peer)
219 {
220         unsigned long flags;
221
222         CDEBUG(D_NET, "Peer=%p\n", peer);
223
224         LASSERT (!cfs_in_interrupt());
225         LASSERT (cfs_atomic_read(&peer->peer_refcount) == 0);
226         LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
227                  peer->peer_state == PEER_STATE_ZOMBIE);
228         LASSERT (cfs_list_empty(&peer->peer_noops));
229         LASSERT (cfs_list_empty(&peer->peer_sendq));
230         LASSERT (cfs_list_empty(&peer->peer_activeq));
231
232         cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
233
234         if (peer->peer_state == PEER_STATE_ZOMBIE)
235                 cfs_list_del(&peer->peer_list);
236
237         kptllnd_data.kptl_npeers--;
238
239         cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
240
241         LIBCFS_FREE (peer, sizeof (*peer));
242 }
243
244 void
245 kptllnd_cancel_txlist (cfs_list_t *peerq, cfs_list_t *txs)
246 {
247         cfs_list_t  *tmp;
248         cfs_list_t  *nxt;
249         kptl_tx_t   *tx;
250
251         cfs_list_for_each_safe (tmp, nxt, peerq) {
252                 tx = cfs_list_entry(tmp, kptl_tx_t, tx_list);
253
254                 cfs_list_del(&tx->tx_list);
255                 cfs_list_add_tail(&tx->tx_list, txs);
256
257                 tx->tx_status = -EIO;
258                 tx->tx_active = 0;
259         }
260 }
261
262 void
263 kptllnd_peer_cancel_txs(kptl_peer_t *peer, cfs_list_t *txs)
264 {
265         unsigned long   flags;
266
267         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
268
269         kptllnd_cancel_txlist(&peer->peer_noops, txs);
270         kptllnd_cancel_txlist(&peer->peer_sendq, txs);
271         kptllnd_cancel_txlist(&peer->peer_activeq, txs);
272                 
273         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
274 }
275
276 void
277 kptllnd_peer_alive (kptl_peer_t *peer)
278 {
279         /* This is racy, but everyone's only writing cfs_time_current() */
280         peer->peer_last_alive = cfs_time_current();
281         cfs_mb();
282 }
283
284 void
285 kptllnd_peer_notify (kptl_peer_t *peer)
286 {
287         unsigned long flags;
288         kptl_net_t   *net;
289         kptl_net_t  **nets;
290         int           i = 0;
291         int           nnets = 0;
292         int           error = 0;
293         cfs_time_t    last_alive = 0;
294
295         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
296
297         if (peer->peer_error != 0) {
298                 error = peer->peer_error;
299                 peer->peer_error = 0;
300                 last_alive = peer->peer_last_alive;
301         }
302
303         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
304
305         if (error == 0)
306                 return;
307
308         cfs_read_lock(&kptllnd_data.kptl_net_rw_lock);
309         cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list)
310                 nnets++;
311         cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock);
312
313         if (nnets == 0) /* shutdown in progress */
314                 return;
315
316         LIBCFS_ALLOC(nets, nnets * sizeof(*nets));
317         if (nets == NULL) {
318                 CERROR("Failed to allocate nets[%d]\n", nnets);
319                 return;
320         }
321         memset(nets, 0, nnets * sizeof(*nets));
322
323         cfs_read_lock(&kptllnd_data.kptl_net_rw_lock);
324         i = 0;
325         cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
326                 LASSERT (i < nnets);
327                 nets[i] = net;
328                 kptllnd_net_addref(net);
329                 i++;
330         }
331         cfs_read_unlock(&kptllnd_data.kptl_net_rw_lock);
332
333         for (i = 0; i < nnets; i++) {
334                 lnet_nid_t peer_nid;
335
336                 net = nets[i];
337                 if (net == NULL)
338                         break;
339
340                 if (!net->net_shutdown) {
341                         peer_nid = kptllnd_ptl2lnetnid(net->net_ni->ni_nid,
342                                                        peer->peer_ptlid.nid);
343                         lnet_notify(net->net_ni, peer_nid, 0, last_alive);
344                 }
345
346                 kptllnd_net_decref(net);
347         }
348
349         LIBCFS_FREE(nets, nnets * sizeof(*nets));
350 }
351
352 void
353 kptllnd_handle_closing_peers ()
354 {
355         unsigned long           flags;
356         cfs_list_t              txs;
357         kptl_peer_t            *peer;
358         cfs_list_t             *tmp;
359         cfs_list_t             *nxt;
360         kptl_tx_t              *tx;
361         int                     idle;
362
363         /* Check with a read lock first to avoid blocking anyone */
364
365         cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
366         idle = cfs_list_empty(&kptllnd_data.kptl_closing_peers) &&
367                cfs_list_empty(&kptllnd_data.kptl_zombie_peers);
368         cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
369
370         if (idle)
371                 return;
372
373         CFS_INIT_LIST_HEAD(&txs);
374
375         cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
376
377         /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
378          * ref removes it from this list, so I musn't drop the lock while
379          * scanning it. */
380         cfs_list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
381                 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
382
383                 LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);
384
385                 kptllnd_peer_cancel_txs(peer, &txs);
386         }
387
388         /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
389          * I'm the only one removing from this list, but peers can be added on
390          * the end any time I drop the lock. */
391
392         cfs_list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
393                 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
394
395                 LASSERT (peer->peer_state == PEER_STATE_CLOSING);
396
397                 cfs_list_del(&peer->peer_list);
398                 cfs_list_add_tail(&peer->peer_list,
399                                   &kptllnd_data.kptl_zombie_peers);
400                 peer->peer_state = PEER_STATE_ZOMBIE;
401
402                 cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
403                                             flags);
404
405                 kptllnd_peer_notify(peer);
406                 kptllnd_peer_cancel_txs(peer, &txs);
407                 kptllnd_peer_decref(peer);
408
409                 cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
410         }
411
412         cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
413
414         /* Drop peer's ref on all cancelled txs.  This will get
415          * kptllnd_tx_fini() to abort outstanding comms if necessary. */
416
417         cfs_list_for_each_safe (tmp, nxt, &txs) {
418                 tx = cfs_list_entry(tmp, kptl_tx_t, tx_list);
419                 cfs_list_del(&tx->tx_list);
420                 kptllnd_tx_decref(tx);
421         }
422 }
423
424 void
425 kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
426 {
427         switch (peer->peer_state) {
428         default:
429                 LBUG();
430
431         case PEER_STATE_WAITING_HELLO:
432         case PEER_STATE_ACTIVE:
433                 /* Ensure new peers see a new incarnation of me */
434                 LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
435                 if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
436                         kptllnd_data.kptl_incarnation++;
437
438                 /* Removing from peer table */
439                 kptllnd_data.kptl_n_active_peers--;
440                 LASSERT (kptllnd_data.kptl_n_active_peers >= 0);
441
442                 cfs_list_del(&peer->peer_list);
443                 kptllnd_peer_unreserve_buffers();
444
445                 peer->peer_error = why; /* stash 'why' only on first close */
446                 peer->peer_state = PEER_STATE_CLOSING;
447
448                 /* Schedule for immediate attention, taking peer table's ref */
449                 cfs_list_add_tail(&peer->peer_list,
450                                  &kptllnd_data.kptl_closing_peers);
451                 cfs_waitq_signal(&kptllnd_data.kptl_watchdog_waitq);
452                 break;
453
454         case PEER_STATE_ZOMBIE:
455         case PEER_STATE_CLOSING:
456                 break;
457         }
458 }
459
460 void
461 kptllnd_peer_close(kptl_peer_t *peer, int why)
462 {
463         unsigned long      flags;
464
465         cfs_write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
466         kptllnd_peer_close_locked(peer, why);
467         cfs_write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
468 }
469
470 int
471 kptllnd_peer_del(lnet_process_id_t id)
472 {
473         cfs_list_t        *ptmp;
474         cfs_list_t        *pnxt;
475         kptl_peer_t       *peer;
476         int                lo;
477         int                hi;
478         int                i;
479         unsigned long      flags;
480         int                rc = -ENOENT;
481
482         /*
483          * Find the single bucket we are supposed to look at or if nid is a
484          * wildcard (LNET_NID_ANY) then look at all of the buckets
485          */
486         if (id.nid != LNET_NID_ANY) {
487                 cfs_list_t *l = kptllnd_nid2peerlist(id.nid);
488
489                 lo = hi =  l - kptllnd_data.kptl_peers;
490         } else {
491                 if (id.pid != LNET_PID_ANY)
492                         return -EINVAL;
493
494                 lo = 0;
495                 hi = kptllnd_data.kptl_peer_hash_size - 1;
496         }
497
498 again:
499         cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
500
501         for (i = lo; i <= hi; i++) {
502                 cfs_list_for_each_safe (ptmp, pnxt,
503                                         &kptllnd_data.kptl_peers[i]) {
504                         peer = cfs_list_entry (ptmp, kptl_peer_t, peer_list);
505
506                         if (!(id.nid == LNET_NID_ANY || 
507                               (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(id.nid) &&
508                                (id.pid == LNET_PID_ANY || 
509                                 peer->peer_id.pid == id.pid))))
510                                 continue;
511
512                         kptllnd_peer_addref(peer); /* 1 ref for me... */
513
514                         cfs_read_unlock_irqrestore(&kptllnd_data. \
515                                                    kptl_peer_rw_lock,
516                                                    flags);
517
518                         kptllnd_peer_close(peer, 0);
519                         kptllnd_peer_decref(peer); /* ...until here */
520
521                         rc = 0;         /* matched something */
522
523                         /* start again now I've dropped the lock */
524                         goto again;
525                 }
526         }
527
528         cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
529
530         return (rc);
531 }
532
533 void
534 kptllnd_queue_tx(kptl_peer_t *peer, kptl_tx_t *tx)
535 {
536         /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
537         unsigned long flags;
538
539         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
540
541         /* Ensure HELLO is sent first */
542         if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP)
543                 cfs_list_add(&tx->tx_list, &peer->peer_noops);
544         else if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
545                 cfs_list_add(&tx->tx_list, &peer->peer_sendq);
546         else
547                 cfs_list_add_tail(&tx->tx_list, &peer->peer_sendq);
548
549         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
550 }
551
552
553 void
554 kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
555 {
556         /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
557         ptl_handle_md_t  msg_mdh;
558         ptl_md_t         md;
559         ptl_err_t        prc;
560
561         LASSERT (!tx->tx_idle);
562         LASSERT (!tx->tx_active);
563         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
564         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
565         LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
566                  tx->tx_type == TX_TYPE_PUT_REQUEST ||
567                  tx->tx_type == TX_TYPE_GET_REQUEST);
568
569         kptllnd_set_tx_peer(tx, peer);
570
571         memset(&md, 0, sizeof(md));
572
573         md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
574         md.options = PTL_MD_OP_PUT |
575                      PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
576                      PTL_MD_EVENT_START_DISABLE;
577         md.user_ptr = &tx->tx_msg_eventarg;
578         md.eq_handle = kptllnd_data.kptl_eqh;
579
580         if (nfrag == 0) {
581                 md.start = tx->tx_msg;
582                 md.length = tx->tx_msg->ptlm_nob;
583         } else {
584                 LASSERT (nfrag > 1);
585                 LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);
586
587                 md.start = tx->tx_frags;
588                 md.length = nfrag;
589                 md.options |= PTL_MD_IOVEC;
590         }
591
592         prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
593         if (prc != PTL_OK) {
594                 CERROR("PtlMDBind(%s) failed: %s(%d)\n",
595                        libcfs_id2str(peer->peer_id),
596                        kptllnd_errtype2str(prc), prc);
597                 tx->tx_status = -EIO;
598                 kptllnd_tx_decref(tx);
599                 return;
600         }
601
602
603         tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * CFS_HZ);
604         tx->tx_active = 1;
605         tx->tx_msg_mdh = msg_mdh;
606         kptllnd_queue_tx(peer, tx);
607 }
608
609 /* NB "restarts" comes from peer_sendq of a single peer */
610 void
611 kptllnd_restart_txs (kptl_net_t *net, lnet_process_id_t target,
612                      cfs_list_t *restarts)
613 {
614         kptl_tx_t   *tx;
615         kptl_tx_t   *tmp;
616         kptl_peer_t *peer;
617
618         LASSERT (!cfs_list_empty(restarts));
619
620         if (kptllnd_find_target(net, target, &peer) != 0)
621                 peer = NULL;
622
623         cfs_list_for_each_entry_safe (tx, tmp, restarts, tx_list) {
624                 LASSERT (tx->tx_peer != NULL);
625                 LASSERT (tx->tx_type == TX_TYPE_GET_REQUEST ||
626                          tx->tx_type == TX_TYPE_PUT_REQUEST ||
627                          tx->tx_type == TX_TYPE_SMALL_MESSAGE);
628
629                 cfs_list_del_init(&tx->tx_list);
630
631                 if (peer == NULL ||
632                     tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
633                         kptllnd_tx_decref(tx);
634                         continue;
635                 }
636
637                 LASSERT (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_NOOP);
638                 tx->tx_status = 0;
639                 tx->tx_active = 1;
640                 kptllnd_peer_decref(tx->tx_peer);
641                 tx->tx_peer = NULL;
642                 kptllnd_set_tx_peer(tx, peer);
643                 kptllnd_queue_tx(peer, tx); /* takes over my ref on tx */
644         }
645
646         if (peer == NULL)
647                 return;
648
649         kptllnd_peer_check_sends(peer);
650         kptllnd_peer_decref(peer);
651 }
652
653 static inline int
654 kptllnd_peer_send_noop (kptl_peer_t *peer)
655 {
656         if (!peer->peer_sent_hello ||
657             peer->peer_credits == 0 ||
658             !cfs_list_empty(&peer->peer_noops) ||
659             peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)
660                 return 0;
661
662         /* No tx to piggyback NOOP onto or no credit to send a tx */
663         return (cfs_list_empty(&peer->peer_sendq) || peer->peer_credits == 1);
664 }
665
666 void
667 kptllnd_peer_check_sends (kptl_peer_t *peer)
668 {
669         ptl_handle_me_t  meh;
670         kptl_tx_t       *tx;
671         int              rc;
672         int              msg_type;
673         unsigned long    flags;
674
675         LASSERT(!cfs_in_interrupt());
676
677         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
678
679         peer->peer_retry_noop = 0;
680
681         if (kptllnd_peer_send_noop(peer)) {
682                 /* post a NOOP to return credits */
683                 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
684
685                 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
686                 if (tx == NULL) {
687                         CERROR("Can't return credits to %s: can't allocate descriptor\n",
688                                libcfs_id2str(peer->peer_id));
689                 } else {
690                         kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP,
691                                          peer->peer_id, 0);
692                         kptllnd_post_tx(peer, tx, 0);
693                 }
694
695                 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
696                 peer->peer_retry_noop = (tx == NULL);
697         }
698
699         for (;;) {
700                 if (!cfs_list_empty(&peer->peer_noops)) {
701                         LASSERT (peer->peer_sent_hello);
702                         tx = cfs_list_entry(peer->peer_noops.next,
703                                             kptl_tx_t, tx_list);
704                 } else if (!cfs_list_empty(&peer->peer_sendq)) {
705                         tx = cfs_list_entry(peer->peer_sendq.next,
706                                             kptl_tx_t, tx_list);
707                 } else {
708                         /* nothing to send right now */
709                         break;
710                 }
711
712                 LASSERT (tx->tx_active);
713                 LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
714                 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
715
716                 LASSERT (peer->peer_outstanding_credits >= 0);
717                 LASSERT (peer->peer_sent_credits >= 0);
718                 LASSERT (peer->peer_sent_credits +
719                          peer->peer_outstanding_credits <=
720                          *kptllnd_tunables.kptl_peertxcredits);
721                 LASSERT (peer->peer_credits >= 0);
722
723                 msg_type = tx->tx_msg->ptlm_type;
724
725                 /* Ensure HELLO is sent first */
726                 if (!peer->peer_sent_hello) {
727                         LASSERT (cfs_list_empty(&peer->peer_noops));
728                         if (msg_type != PTLLND_MSG_TYPE_HELLO)
729                                 break;
730                         peer->peer_sent_hello = 1;
731                 }
732
733                 if (peer->peer_credits == 0) {
734                         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %s[%p]\n",
735                                libcfs_id2str(peer->peer_id), 
736                                peer->peer_credits,
737                                peer->peer_outstanding_credits, 
738                                peer->peer_sent_credits, 
739                                kptllnd_msgtype2str(msg_type), tx);
740                         break;
741                 }
742
743                 /* Last/Initial credit reserved for NOOP/HELLO */
744                 if (peer->peer_credits == 1 &&
745                     msg_type != PTLLND_MSG_TYPE_HELLO &&
746                     msg_type != PTLLND_MSG_TYPE_NOOP) {
747                         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
748                                "not using last credit for %s[%p]\n",
749                                libcfs_id2str(peer->peer_id), 
750                                peer->peer_credits,
751                                peer->peer_outstanding_credits,
752                                peer->peer_sent_credits,
753                                kptllnd_msgtype2str(msg_type), tx);
754                         break;
755                 }
756
757                 cfs_list_del(&tx->tx_list);
758
759                 /* Discard any NOOP I queued if I'm not at the high-water mark
760                  * any more or more messages have been queued */
761                 if (msg_type == PTLLND_MSG_TYPE_NOOP &&
762                     !kptllnd_peer_send_noop(peer)) {
763                         tx->tx_active = 0;
764
765                         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
766
767                         CDEBUG(D_NET, "%s: redundant noop\n", 
768                                libcfs_id2str(peer->peer_id));
769                         kptllnd_tx_decref(tx);
770
771                         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
772                         continue;
773                 }
774
775                 /* fill last-minute msg fields */
776                 kptllnd_msg_pack(tx->tx_msg, peer);
777
778                 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
779                     tx->tx_type == TX_TYPE_GET_REQUEST) {
780                         /* peer_next_matchbits must be known good */
781                         LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
782                         /* Assume 64-bit matchbits can't wrap */
783                         LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
784                         tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
785                                 peer->peer_next_matchbits++;
786                 }
787
788                 peer->peer_sent_credits += peer->peer_outstanding_credits;
789                 peer->peer_outstanding_credits = 0;
790                 peer->peer_credits--;
791
792                 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
793                        libcfs_id2str(peer->peer_id), peer->peer_credits,
794                        peer->peer_outstanding_credits, peer->peer_sent_credits,
795                        kptllnd_msgtype2str(msg_type), tx, tx->tx_msg->ptlm_nob,
796                        tx->tx_msg->ptlm_credits);
797
798                 cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);
799
800                 kptllnd_tx_addref(tx);          /* 1 ref for me... */
801
802                 cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
803
804                 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
805                     tx->tx_type == TX_TYPE_GET_REQUEST) {
806                         /* Post bulk now we have safe matchbits */
807                         rc = PtlMEAttach(kptllnd_data.kptl_nih,
808                                          *kptllnd_tunables.kptl_portal,
809                                          peer->peer_ptlid,
810                                          tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
811                                          0,             /* ignore bits */
812                                          PTL_UNLINK,
813                                          PTL_INS_BEFORE,
814                                          &meh);
815                         if (rc != PTL_OK) {
816                                 CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
817                                        libcfs_id2str(peer->peer_id),
818                                        kptllnd_errtype2str(rc), rc);
819                                 goto failed;
820                         }
821
822                         rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
823                                          &tx->tx_rdma_mdh);
824                         if (rc != PTL_OK) {
825                                 CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
826                                        libcfs_id2str(tx->tx_peer->peer_id),
827                                        kptllnd_errtype2str(rc), rc);
828                                 rc = PtlMEUnlink(meh);
829                                 LASSERT(rc == PTL_OK);
830                                 tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
831                                 goto failed;
832                         }
833                         /* I'm not racing with the event callback here.  It's a
834                          * bug if there's an event on the MD I just attached
835                          * before I actually send the RDMA request message -
836                          * probably matchbits re-used in error. */
837                 }
838
839                 tx->tx_tposted = jiffies;       /* going on the wire */
840
841                 rc = PtlPut (tx->tx_msg_mdh,
842                              tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
843                              peer->peer_ptlid,
844                              *kptllnd_tunables.kptl_portal,
845                              0,                 /* acl cookie */
846                              LNET_MSG_MATCHBITS,
847                              0,                 /* offset */
848                              0);                /* header data */
849                 if (rc != PTL_OK) {
850                         CERROR("PtlPut %s error %s(%d)\n",
851                                libcfs_id2str(peer->peer_id),
852                                kptllnd_errtype2str(rc), rc);
853                         goto failed;
854                 }
855
856                 kptllnd_tx_decref(tx);          /* drop my ref */
857
858                 cfs_spin_lock_irqsave(&peer->peer_lock, flags);
859         }
860
861         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
862         return;
863
864  failed:
865         /* Nuke everything (including tx we were trying) */
866         kptllnd_peer_close(peer, -EIO);
867         kptllnd_tx_decref(tx);
868         kptllnd_schedule_ptltrace_dump();
869 }
870
871 kptl_tx_t *
872 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
873 {
874         kptl_tx_t         *tx;
875         cfs_list_t        *ele;
876
877         cfs_list_for_each(ele, &peer->peer_sendq) {
878                 tx = cfs_list_entry(ele, kptl_tx_t, tx_list);
879
880                 if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
881                         kptllnd_tx_addref(tx);
882                         return tx;
883                 }
884         }
885
886         cfs_list_for_each(ele, &peer->peer_activeq) {
887                 tx = cfs_list_entry(ele, kptl_tx_t, tx_list);
888
889                 if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
890                         kptllnd_tx_addref(tx);
891                         return tx;
892                 }
893         }
894
895         return NULL;
896 }
897
898
899 void
900 kptllnd_peer_check_bucket (int idx, int stamp)
901 {
902         cfs_list_t        *peers = &kptllnd_data.kptl_peers[idx];
903         kptl_peer_t       *peer;
904         unsigned long      flags;
905
906         CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);
907
908  again:
909         /* NB. Shared lock while I just look */
910         cfs_read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
911
912         cfs_list_for_each_entry (peer, peers, peer_list) {
913                 kptl_tx_t *tx;
914                 int        check_sends;
915                 int        c = -1, oc = -1, sc = -1;
916                 int        nsend = -1, nactive = -1;
917                 int        sent_hello = -1, state = -1;
918
919                 CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
920                        libcfs_id2str(peer->peer_id), peer->peer_credits, 
921                        peer->peer_outstanding_credits, peer->peer_sent_credits);
922
923                 cfs_spin_lock(&peer->peer_lock);
924
925                 if (peer->peer_check_stamp == stamp) {
926                         /* checked already this pass */
927                         cfs_spin_unlock(&peer->peer_lock);
928                         continue;
929                 }
930
931                 peer->peer_check_stamp = stamp;
932                 tx = kptllnd_find_timed_out_tx(peer);
933                 check_sends = peer->peer_retry_noop;
934
935                 if (tx != NULL) {
936                         c  = peer->peer_credits;
937                         sc = peer->peer_sent_credits;
938                         oc = peer->peer_outstanding_credits;
939                         state      = peer->peer_state;
940                         sent_hello = peer->peer_sent_hello;
941                         nsend   = kptllnd_count_queue(&peer->peer_sendq);
942                         nactive = kptllnd_count_queue(&peer->peer_activeq);
943                 }
944
945                 cfs_spin_unlock(&peer->peer_lock);
946
947                 if (tx == NULL && !check_sends)
948                         continue;
949
950                 kptllnd_peer_addref(peer); /* 1 ref for me... */
951
952                 cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
953                                            flags);
954
955                 if (tx == NULL) { /* nothing timed out */
956                         kptllnd_peer_check_sends(peer);
957                         kptllnd_peer_decref(peer); /* ...until here or... */
958
959                         /* rescan after dropping the lock */
960                         goto again;
961                 }
962
963                 LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
964                                    libcfs_id2str(peer->peer_id),
965                                    (tx->tx_tposted == 0) ?
966                                    "no free peer buffers" :
967                                    "please check Portals");
968
969                 if (tx->tx_tposted) {
970                         CERROR("Could not send to %s after %ds (sent %lds ago); "
971                                 "check Portals for possible issues\n",
972                                 libcfs_id2str(peer->peer_id),
973                                 *kptllnd_tunables.kptl_timeout,
974                                 cfs_duration_sec(jiffies - tx->tx_tposted));
975                 } else if (state < PEER_STATE_ACTIVE) {
976                         CERROR("Could not connect %s (%d) after %ds; "
977                                "peer might be down\n",
978                                libcfs_id2str(peer->peer_id), state,
979                                *kptllnd_tunables.kptl_timeout);
980                 } else {
981                         CERROR("Could not get credits for %s after %ds; "
982                                 "possible Lustre networking issues\n",
983                         libcfs_id2str(peer->peer_id),
984                         *kptllnd_tunables.kptl_timeout);
985                 }
986
987                 CERROR("%s timed out: cred %d outstanding %d, sent %d, "
988                        "state %d, sent_hello %d, sendq %d, activeq %d "
989                        "Tx %p %s %s (%s%s%s) status %d %sposted %lu T/O %ds\n",
990                        libcfs_id2str(peer->peer_id), c, oc, sc,
991                        state, sent_hello, nsend, nactive,
992                        tx, kptllnd_tx_typestr(tx->tx_type),
993                        kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
994                        tx->tx_active ? "A" : "",
995                        PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
996                        "" : "M",
997                        PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
998                        "" : "D",
999                        tx->tx_status,
1000                        (tx->tx_tposted == 0) ? "not " : "",
1001                        (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
1002                        *kptllnd_tunables.kptl_timeout);
1003
1004 #ifdef CRAY_XT3
1005                 if (*kptllnd_tunables.kptl_ptltrace_on_timeout)
1006                         kptllnd_dump_ptltrace();
1007 #endif
1008
1009                 kptllnd_tx_decref(tx);
1010
1011                 kptllnd_peer_close(peer, -ETIMEDOUT);
1012                 kptllnd_peer_decref(peer); /* ...until here */
1013
1014                 /* start again now I've dropped the lock */
1015                 goto again;
1016         }
1017
1018         cfs_read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
1019 }
1020
1021 kptl_peer_t *
1022 kptllnd_id2peer_locked (lnet_process_id_t id)
1023 {
1024         cfs_list_t       *peers = kptllnd_nid2peerlist(id.nid);
1025         cfs_list_t       *tmp;
1026         kptl_peer_t      *peer;
1027
1028         cfs_list_for_each (tmp, peers) {
1029                 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1030
1031                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
1032                         peer->peer_state == PEER_STATE_ACTIVE);
1033
1034                 /* NB logical LNet peers share one kptl_peer_t */
1035                 if (peer->peer_id.pid != id.pid ||
1036                     LNET_NIDADDR(id.nid) != LNET_NIDADDR(peer->peer_id.nid))
1037                         continue;
1038
1039                 kptllnd_peer_addref(peer);
1040
1041                 CDEBUG(D_NET, "%s -> %s (%d)\n",
1042                        libcfs_id2str(id),
1043                        kptllnd_ptlid2str(peer->peer_ptlid),
1044                        cfs_atomic_read (&peer->peer_refcount));
1045                 return peer;
1046         }
1047
1048         return NULL;
1049 }
1050
1051 void
1052 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
1053 {
1054         LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
1055                            "messages may be dropped\n",
1056                            str, libcfs_id2str(id),
1057                            kptllnd_data.kptl_n_active_peers);
1058         LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
1059                            "'max_nodes' or 'max_procs_per_node'\n");
1060 }
1061
1062 __u64
1063 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
1064 {
1065         kptl_peer_t  *peer;
1066         cfs_list_t   *tmp;
1067
1068         /* Find the last matchbits I saw this new peer using.  Note..
1069            A. This peer cannot be in the peer table - she's new!
1070            B. If I can't find the peer in the closing/zombie peers, all
1071               matchbits are safe because all refs to the (old) peer have gone
1072               so all txs have completed so there's no risk of matchbit
1073               collision!
1074          */
1075
1076         LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
1077
1078         /* peer's last matchbits can't change after it comes out of the peer
1079          * table, so first match is fine */
1080
1081         cfs_list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
1082                 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1083
1084                 if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
1085                     peer->peer_id.pid == lpid.pid)
1086                         return peer->peer_last_matchbits_seen;
1087         }
1088
1089         cfs_list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
1090                 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1091
1092                 if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
1093                     peer->peer_id.pid == lpid.pid)
1094                         return peer->peer_last_matchbits_seen;
1095         }
1096
1097         return PTL_RESERVED_MATCHBITS;
1098 }
1099
1100 kptl_peer_t *
1101 kptllnd_peer_handle_hello (kptl_net_t *net,
1102                            ptl_process_id_t initiator, kptl_msg_t *msg)
1103 {
1104         cfs_rwlock_t       *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1105         kptl_peer_t        *peer;
1106         kptl_peer_t        *new_peer;
1107         lnet_process_id_t   lpid;
1108         unsigned long       flags;
1109         kptl_tx_t          *hello_tx;
1110         int                 rc;
1111         __u64               safe_matchbits;
1112         __u64               last_matchbits_seen;
1113
1114         lpid.nid = msg->ptlm_srcnid;
1115         lpid.pid = msg->ptlm_srcpid;
1116
1117         CDEBUG(D_NET, "hello from %s(%s)\n",
1118                libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
1119
1120         if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
1121             (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
1122                 /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
1123                  * userspace.  Refuse the connection if she hasn't set the
1124                  * correct flag in her PID... */
1125                 CERROR("Userflag not set in hello from %s (%s)\n",
1126                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
1127                 return NULL;
1128         }
1129         
1130         /* kptlhm_matchbits are the highest matchbits my peer may have used to
1131          * RDMA to me.  I ensure I never register buffers for RDMA that could
1132          * match any she used */
1133         safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;
1134
1135         if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
1136                 CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
1137                        safe_matchbits, libcfs_id2str(lpid));
1138                 return NULL;
1139         }
1140         
1141         if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
1142                 CERROR("%s: max message size %d < MIN %d",
1143                        libcfs_id2str(lpid),
1144                        msg->ptlm_u.hello.kptlhm_max_msg_size,
1145                        PTLLND_MIN_BUFFER_SIZE);
1146                 return NULL;
1147         }
1148
1149         if (msg->ptlm_credits <= 1) {
1150                 CERROR("Need more than 1+%d credits from %s\n",
1151                        msg->ptlm_credits, libcfs_id2str(lpid));
1152                 return NULL;
1153         }
1154         
1155         cfs_write_lock_irqsave(g_lock, flags);
1156
1157         peer = kptllnd_id2peer_locked(lpid);
1158         if (peer != NULL) {
1159                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1160                         /* Completing HELLO handshake */
1161                         LASSERT(peer->peer_incarnation == 0);
1162
1163                         if (msg->ptlm_dststamp != 0 &&
1164                             msg->ptlm_dststamp != peer->peer_myincarnation) {
1165                                 cfs_write_unlock_irqrestore(g_lock, flags);
1166
1167                                 CERROR("Ignoring HELLO from %s: unexpected "
1168                                        "dststamp "LPX64" ("LPX64" wanted)\n",
1169                                        libcfs_id2str(lpid),
1170                                        msg->ptlm_dststamp,
1171                                        peer->peer_myincarnation);
1172                                 kptllnd_peer_decref(peer);
1173                                 return NULL;
1174                         }
1175                         
1176                         /* Concurrent initiation or response to my HELLO */
1177                         peer->peer_state = PEER_STATE_ACTIVE;
1178                         peer->peer_incarnation = msg->ptlm_srcstamp;
1179                         peer->peer_next_matchbits = safe_matchbits;
1180                         peer->peer_max_msg_size =
1181                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1182                         
1183                         cfs_write_unlock_irqrestore(g_lock, flags);
1184                         return peer;
1185                 }
1186
1187                 if (msg->ptlm_dststamp != 0 &&
1188                     msg->ptlm_dststamp <= peer->peer_myincarnation) {
1189                         cfs_write_unlock_irqrestore(g_lock, flags);
1190
1191                         CERROR("Ignoring stale HELLO from %s: "
1192                                "dststamp "LPX64" (current "LPX64")\n",
1193                                libcfs_id2str(lpid),
1194                                msg->ptlm_dststamp,
1195                                peer->peer_myincarnation);
1196                         kptllnd_peer_decref(peer);
1197                         return NULL;
1198                 }
1199
1200                 /* Brand new connection attempt: remove old incarnation */
1201                 kptllnd_peer_close_locked(peer, 0);
1202         }
1203
1204         kptllnd_cull_peertable_locked(lpid);
1205
1206         cfs_write_unlock_irqrestore(g_lock, flags);
1207
1208         if (peer != NULL) {
1209                 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
1210                        " stamp "LPX64"("LPX64")\n",
1211                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
1212                        msg->ptlm_srcstamp, peer->peer_incarnation);
1213
1214                 kptllnd_peer_decref(peer);
1215                 peer = NULL;
1216         }
1217
1218         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1219         if (hello_tx == NULL) {
1220                 CERROR("Unable to allocate HELLO message for %s\n",
1221                        libcfs_id2str(lpid));
1222                 return NULL;
1223         }
1224
1225         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1226                          lpid, sizeof(kptl_hello_msg_t));
1227
1228         new_peer = kptllnd_peer_allocate(net, lpid, initiator);
1229         if (new_peer == NULL) {
1230                 kptllnd_tx_decref(hello_tx);
1231                 return NULL;
1232         }
1233
1234         rc = kptllnd_peer_reserve_buffers();
1235         if (rc != 0) {
1236                 kptllnd_peer_decref(new_peer);
1237                 kptllnd_tx_decref(hello_tx);
1238
1239                 CERROR("Failed to reserve buffers for %s\n",
1240                        libcfs_id2str(lpid));
1241                 return NULL;
1242         }
1243
1244         cfs_write_lock_irqsave(g_lock, flags);
1245
1246  again:
1247         if (net->net_shutdown) {
1248                 cfs_write_unlock_irqrestore(g_lock, flags);
1249
1250                 CERROR ("Shutdown started, refusing connection from %s\n",
1251                         libcfs_id2str(lpid));
1252                 kptllnd_peer_unreserve_buffers();
1253                 kptllnd_peer_decref(new_peer);
1254                 kptllnd_tx_decref(hello_tx);
1255                 return NULL;
1256         }
1257
1258         peer = kptllnd_id2peer_locked(lpid);
1259         if (peer != NULL) {
1260                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1261                         /* An outgoing message instantiated 'peer' for me */
1262                         LASSERT(peer->peer_incarnation == 0);
1263
1264                         peer->peer_state = PEER_STATE_ACTIVE;
1265                         peer->peer_incarnation = msg->ptlm_srcstamp;
1266                         peer->peer_next_matchbits = safe_matchbits;
1267                         peer->peer_max_msg_size =
1268                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1269
1270                         cfs_write_unlock_irqrestore(g_lock, flags);
1271
1272                         CWARN("Outgoing instantiated peer %s\n",
1273                               libcfs_id2str(lpid));
1274                 } else {
1275                         LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1276
1277                         cfs_write_unlock_irqrestore(g_lock, flags);
1278
1279                         /* WOW!  Somehow this peer completed the HELLO
1280                          * handshake while I slept.  I guess I could have slept
1281                          * while it rebooted and sent a new HELLO, so I'll fail
1282                          * this one... */
1283                         CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
1284                         kptllnd_peer_decref(peer);
1285                         peer = NULL;
1286                 }
1287
1288                 kptllnd_peer_unreserve_buffers();
1289                 kptllnd_peer_decref(new_peer);
1290                 kptllnd_tx_decref(hello_tx);
1291                 return peer;
1292         }
1293
1294         if (kptllnd_data.kptl_n_active_peers ==
1295             kptllnd_data.kptl_expected_peers) {
1296                 /* peer table full */
1297                 cfs_write_unlock_irqrestore(g_lock, flags);
1298
1299                 kptllnd_peertable_overflow_msg("Connection from ", lpid);
1300
1301                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1302                 if (rc != 0) {
1303                         CERROR("Refusing connection from %s\n",
1304                                libcfs_id2str(lpid));
1305                         kptllnd_peer_unreserve_buffers();
1306                         kptllnd_peer_decref(new_peer);
1307                         kptllnd_tx_decref(hello_tx);
1308                         return NULL;
1309                 }
1310                 
1311                 cfs_write_lock_irqsave(g_lock, flags);
1312                 kptllnd_data.kptl_expected_peers++;
1313                 goto again;
1314         }
1315
1316         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
1317
1318         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1319         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1320                 *kptllnd_tunables.kptl_max_msg_size;
1321
1322         new_peer->peer_state = PEER_STATE_ACTIVE;
1323         new_peer->peer_incarnation = msg->ptlm_srcstamp;
1324         new_peer->peer_next_matchbits = safe_matchbits;
1325         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1326         new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1327
1328         LASSERT (!net->net_shutdown);
1329         kptllnd_peer_add_peertable_locked(new_peer);
1330
1331         cfs_write_unlock_irqrestore(g_lock, flags);
1332
1333         /* NB someone else could get in now and post a message before I post
1334          * the HELLO, but post_tx/check_sends take care of that! */
1335
1336         CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
1337                libcfs_id2str(new_peer->peer_id), hello_tx);
1338
1339         kptllnd_post_tx(new_peer, hello_tx, 0);
1340         kptllnd_peer_check_sends(new_peer);
1341
1342         return new_peer;
1343 }
1344
1345 void
1346 kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
1347 {
1348         kptllnd_post_tx(peer, tx, nfrag);
1349         kptllnd_peer_check_sends(peer);
1350 }
1351
1352 int
1353 kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target,
1354                     kptl_peer_t **peerp)
1355 {
1356         cfs_rwlock_t     *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1357         ptl_process_id_t  ptl_id;
1358         kptl_peer_t      *new_peer;
1359         kptl_tx_t        *hello_tx;
1360         unsigned long     flags;
1361         int               rc;
1362         __u64             last_matchbits_seen;
1363
1364         /* I expect to find the peer, so I only take a read lock... */
1365         cfs_read_lock_irqsave(g_lock, flags);
1366         *peerp = kptllnd_id2peer_locked(target);
1367         cfs_read_unlock_irqrestore(g_lock, flags);
1368
1369         if (*peerp != NULL)
1370                 return 0;
1371
1372         if ((target.pid & LNET_PID_USERFLAG) != 0) {
1373                 CWARN("Refusing to create a new connection to %s "
1374                       "(non-kernel peer)\n", libcfs_id2str(target));
1375                 return -EHOSTUNREACH;
1376         }
1377
1378         /* The new peer is a kernel ptllnd, and kernel ptllnds all have the
1379          * same portals PID, which has nothing to do with LUSTRE_SRV_LNET_PID */
1380         ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
1381         ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
1382
1383         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1384         if (hello_tx == NULL) {
1385                 CERROR("Unable to allocate connect message for %s\n",
1386                        libcfs_id2str(target));
1387                 return -ENOMEM;
1388         }
1389
1390         hello_tx->tx_acked = 1;
1391         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1392                          target, sizeof(kptl_hello_msg_t));
1393
1394         new_peer = kptllnd_peer_allocate(net, target, ptl_id);
1395         if (new_peer == NULL) {
1396                 rc = -ENOMEM;
1397                 goto unwind_0;
1398         }
1399
1400         rc = kptllnd_peer_reserve_buffers();
1401         if (rc != 0)
1402                 goto unwind_1;
1403
1404         cfs_write_lock_irqsave(g_lock, flags);
1405  again:
1406         /* Called only in lnd_send which can't happen after lnd_shutdown */
1407         LASSERT (!net->net_shutdown);
1408
1409         *peerp = kptllnd_id2peer_locked(target);
1410         if (*peerp != NULL) {
1411                 cfs_write_unlock_irqrestore(g_lock, flags);
1412                 goto unwind_2;
1413         }
1414
1415         kptllnd_cull_peertable_locked(target);
1416
1417         if (kptllnd_data.kptl_n_active_peers ==
1418             kptllnd_data.kptl_expected_peers) {
1419                 /* peer table full */
1420                 cfs_write_unlock_irqrestore(g_lock, flags);
1421
1422                 kptllnd_peertable_overflow_msg("Connection to ", target);
1423
1424                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1425                 if (rc != 0) {
1426                         CERROR("Can't create connection to %s\n",
1427                                libcfs_id2str(target));
1428                         rc = -ENOMEM;
1429                         goto unwind_2;
1430                 }
1431                 cfs_write_lock_irqsave(g_lock, flags);
1432                 kptllnd_data.kptl_expected_peers++;
1433                 goto again;
1434         }
1435
1436         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);
1437
1438         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1439         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1440                 *kptllnd_tunables.kptl_max_msg_size;
1441                 
1442         new_peer->peer_state = PEER_STATE_WAITING_HELLO;
1443         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1444         
1445         kptllnd_peer_add_peertable_locked(new_peer);
1446
1447         cfs_write_unlock_irqrestore(g_lock, flags);
1448
1449         /* NB someone else could get in now and post a message before I post
1450          * the HELLO, but post_tx/check_sends take care of that! */
1451
1452         CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
1453                libcfs_id2str(new_peer->peer_id), hello_tx);
1454
1455         kptllnd_post_tx(new_peer, hello_tx, 0);
1456         kptllnd_peer_check_sends(new_peer);
1457        
1458         *peerp = new_peer;
1459         return 0;
1460         
1461  unwind_2:
1462         kptllnd_peer_unreserve_buffers();
1463  unwind_1:
1464         kptllnd_peer_decref(new_peer);
1465  unwind_0:
1466         kptllnd_tx_decref(hello_tx);
1467
1468         return rc;
1469 }