1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/ptllnd/ptllnd_peer.c
37  *
38  * Author: PJ Kirner <pjkirner@clusterfs.com>
39  * Author: E Barton <eeb@bartonsoftware.com>
40  */
41
42 #include "ptllnd.h"
43 #include <libcfs/list.h>
44
45 static int
46 kptllnd_count_queue(cfs_list_t *q)
47 {
48         cfs_list_t *e;
49         int         n = 0;
50
51         cfs_list_for_each(e, q) {
52                 n++;
53         }
54
55         return n;
56 }
57
58 int
59 kptllnd_get_peer_info(int index,
60                       lnet_process_id_t *id,
61                       int *state, int *sent_hello,
62                       int *refcount, __u64 *incarnation,
63                       __u64 *next_matchbits, __u64 *last_matchbits_seen,
64                       int *nsendq, int *nactiveq,
65                       int *credits, int *outstanding_credits)
66 {
67         rwlock_t     *g_lock = &kptllnd_data.kptl_peer_rw_lock;
68         unsigned long     flags;
69         cfs_list_t       *ptmp;
70         kptl_peer_t      *peer;
71         int               i;
72         int               rc = -ENOENT;
73
74         read_lock_irqsave(g_lock, flags);
75
76         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
77                 cfs_list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
78                         peer = cfs_list_entry(ptmp, kptl_peer_t, peer_list);
79
80                         if (index-- > 0)
81                                 continue;
82
83                         *id          = peer->peer_id;
84                         *state       = peer->peer_state;
85                         *sent_hello  = peer->peer_sent_hello;
86                         *refcount    = cfs_atomic_read(&peer->peer_refcount);
87                         *incarnation = peer->peer_incarnation;
88
89                         spin_lock(&peer->peer_lock);
90
91                         *next_matchbits      = peer->peer_next_matchbits;
92                         *last_matchbits_seen = peer->peer_last_matchbits_seen;
93                         *credits             = peer->peer_credits;
94                         *outstanding_credits = peer->peer_outstanding_credits;
95
96                         *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
97                         *nactiveq = kptllnd_count_queue(&peer->peer_activeq);
98
99                         spin_unlock(&peer->peer_lock);
100
101                         rc = 0;
102                         goto out;
103                 }
104         }
105
106  out:
107         read_unlock_irqrestore(g_lock, flags);
108         return rc;
109 }
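/*
 * Usage sketch (illustrative only, not part of the original file): callers
 * walk the peer table by calling kptllnd_get_peer_info() with increasing
 * indices until it returns -ENOENT.  Local variable names below are chosen
 * purely for illustration:
 *
 *      lnet_process_id_t id;
 *      int               state, sent_hello, refcount;
 *      __u64             incarnation, next_mb, last_mb;
 *      int               nsendq, nactiveq, credits, outstanding;
 *      int               idx;
 *
 *      for (idx = 0; ; idx++) {
 *              if (kptllnd_get_peer_info(idx, &id, &state, &sent_hello,
 *                                        &refcount, &incarnation,
 *                                        &next_mb, &last_mb,
 *                                        &nsendq, &nactiveq,
 *                                        &credits, &outstanding) != 0)
 *                      break;          /* -ENOENT: no more peers */
 *              CDEBUG(D_NET, "peer %s state %d\n", libcfs_id2str(id), state);
 *      }
 */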
110
111 void
112 kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
113 {
114         LASSERT (kptllnd_data.kptl_n_active_peers <
115                  kptllnd_data.kptl_expected_peers);
116
117         LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
118                  peer->peer_state == PEER_STATE_ACTIVE);
119
120         kptllnd_data.kptl_n_active_peers++;
121         cfs_atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */
122
123         /* NB add to HEAD of peer list for MRU order!
124          * (see kptllnd_cull_peertable) */
125         cfs_list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
126 }
127
128 void
129 kptllnd_cull_peertable_locked (lnet_process_id_t pid)
130 {
131         /* I'm about to add a new peer with this portals ID to the peer table,
132          * so (a) this peer should not exist already and (b) I want to leave at
133          * most (max_procs_per_nid - 1) peers with this NID in the table. */
134         cfs_list_t   *peers = kptllnd_nid2peerlist(pid.nid);
135         int           cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
136         int           count;
137         cfs_list_t   *tmp;
138         cfs_list_t   *nxt;
139         kptl_peer_t  *peer;
140
141         count = 0;
142         cfs_list_for_each_safe (tmp, nxt, peers) {
143                 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
144                  * in MRU order */
145                 peer = cfs_list_entry(tmp, kptl_peer_t, peer_list);
146                         
147                 if (LNET_NIDADDR(peer->peer_id.nid) != LNET_NIDADDR(pid.nid))
148                         continue;
149
150                 LASSERT (peer->peer_id.pid != pid.pid);
151                         
152                 count++;
153
154                 if (count < cull_count) /* recent (don't cull) */
155                         continue;
156
157                 CDEBUG(D_NET, "Cull %s(%s)\n",
158                        libcfs_id2str(peer->peer_id),
159                        kptllnd_ptlid2str(peer->peer_ptlid));
160                 
161                 kptllnd_peer_close_locked(peer, 0);
162         }
163 }
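/*
 * Worked example for kptllnd_cull_peertable_locked() (illustrative only):
 * with max_procs_per_node == 4 and five existing peers sharing the new
 * peer's NID, the loop counts them in MRU order (newest first, as added by
 * kptllnd_peer_add_peertable_locked).  The first three satisfy
 * "count < cull_count" and are kept; the fourth and fifth are closed, so
 * adding the new peer leaves at most max_procs_per_node entries for that NID.
 */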
164
165 kptl_peer_t *
166 kptllnd_peer_allocate (kptl_net_t *net, lnet_process_id_t lpid, ptl_process_id_t ppid)
167 {
168         unsigned long    flags;
169         kptl_peer_t     *peer;
170
171         LIBCFS_ALLOC(peer, sizeof (*peer));
172         if (peer == NULL) {
173                 CERROR("Can't create peer %s (%s)\n",
174                        libcfs_id2str(lpid), 
175                        kptllnd_ptlid2str(ppid));
176                 return NULL;
177         }
178
179         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
180
181         CFS_INIT_LIST_HEAD (&peer->peer_noops);
182         CFS_INIT_LIST_HEAD (&peer->peer_sendq);
183         CFS_INIT_LIST_HEAD (&peer->peer_activeq);
184         spin_lock_init(&peer->peer_lock);
185
186         peer->peer_state = PEER_STATE_ALLOCATED;
187         peer->peer_error = 0;
188         peer->peer_last_alive = 0;
189         peer->peer_id = lpid;
190         peer->peer_ptlid = ppid;
191         peer->peer_credits = 1;                 /* enough for HELLO */
192         peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
193         peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peertxcredits - 1;
194         peer->peer_sent_credits = 1;           /* HELLO credit is implicit */
195         peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */
196
197         cfs_atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */
198
199         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
200
201         peer->peer_myincarnation = kptllnd_data.kptl_incarnation;
202
203         /* Only increase # peers under lock, to guarantee we don't grow it
204          * during shutdown */
205         if (net->net_shutdown) {
206                 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
207                                             flags);
208                 LIBCFS_FREE(peer, sizeof(*peer));
209                 return NULL;
210         }
211
212         kptllnd_data.kptl_npeers++;
213         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
214         return peer;
215 }
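/*
 * Note on the initial credit ledger set up in kptllnd_peer_allocate()
 * (descriptive only):
 *   peer_credits             = 1                 - just enough to send HELLO
 *   peer_sent_credits        = 1                 - the implicit HELLO credit
 *   peer_outstanding_credits = peertxcredits - 1 - returned to the peer once
 *                                                  the handshake completes
 * so sent + outstanding starts exactly at *kptllnd_tunables.kptl_peertxcredits,
 * the upper bound asserted in kptllnd_peer_check_sends().
 */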
216
217 void
218 kptllnd_peer_destroy (kptl_peer_t *peer)
219 {
220         unsigned long flags;
221
222         CDEBUG(D_NET, "Peer=%p\n", peer);
223
224         LASSERT (!cfs_in_interrupt());
225         LASSERT (cfs_atomic_read(&peer->peer_refcount) == 0);
226         LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
227                  peer->peer_state == PEER_STATE_ZOMBIE);
228         LASSERT (cfs_list_empty(&peer->peer_noops));
229         LASSERT (cfs_list_empty(&peer->peer_sendq));
230         LASSERT (cfs_list_empty(&peer->peer_activeq));
231
232         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
233
234         if (peer->peer_state == PEER_STATE_ZOMBIE)
235                 cfs_list_del(&peer->peer_list);
236
237         kptllnd_data.kptl_npeers--;
238
239         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
240
241         LIBCFS_FREE (peer, sizeof (*peer));
242 }
243
244 void
245 kptllnd_cancel_txlist (cfs_list_t *peerq, cfs_list_t *txs)
246 {
247         cfs_list_t  *tmp;
248         cfs_list_t  *nxt;
249         kptl_tx_t   *tx;
250
251         cfs_list_for_each_safe (tmp, nxt, peerq) {
252                 tx = cfs_list_entry(tmp, kptl_tx_t, tx_list);
253
254                 cfs_list_del(&tx->tx_list);
255                 cfs_list_add_tail(&tx->tx_list, txs);
256
257                 tx->tx_status = -EIO;
258                 tx->tx_active = 0;
259         }
260 }
261
262 void
263 kptllnd_peer_cancel_txs(kptl_peer_t *peer, cfs_list_t *txs)
264 {
265         unsigned long   flags;
266
267         spin_lock_irqsave(&peer->peer_lock, flags);
268
269         kptllnd_cancel_txlist(&peer->peer_noops, txs);
270         kptllnd_cancel_txlist(&peer->peer_sendq, txs);
271         kptllnd_cancel_txlist(&peer->peer_activeq, txs);
272                 
273         spin_unlock_irqrestore(&peer->peer_lock, flags);
274 }
275
276 void
277 kptllnd_peer_alive (kptl_peer_t *peer)
278 {
279         /* This is racy, but everyone's only writing cfs_time_current() */
280         peer->peer_last_alive = cfs_time_current();
281         cfs_mb();
282 }
283
284 void
285 kptllnd_peer_notify (kptl_peer_t *peer)
286 {
287         unsigned long flags;
288         kptl_net_t   *net;
289         kptl_net_t  **nets;
290         int           i = 0;
291         int           nnets = 0;
292         int           error = 0;
293         cfs_time_t    last_alive = 0;
294
295         spin_lock_irqsave(&peer->peer_lock, flags);
296
297         if (peer->peer_error != 0) {
298                 error = peer->peer_error;
299                 peer->peer_error = 0;
300                 last_alive = peer->peer_last_alive;
301         }
302
303         spin_unlock_irqrestore(&peer->peer_lock, flags);
304
305         if (error == 0)
306                 return;
307
308         read_lock(&kptllnd_data.kptl_net_rw_lock);
309         cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list)
310                 nnets++;
311         read_unlock(&kptllnd_data.kptl_net_rw_lock);
312
313         if (nnets == 0) /* shutdown in progress */
314                 return;
315
316         LIBCFS_ALLOC(nets, nnets * sizeof(*nets));
317         if (nets == NULL) {
318                 CERROR("Failed to allocate nets[%d]\n", nnets);
319                 return;
320         }
321         memset(nets, 0, nnets * sizeof(*nets));
322
323         read_lock(&kptllnd_data.kptl_net_rw_lock);
324         i = 0;
325         cfs_list_for_each_entry (net, &kptllnd_data.kptl_nets, net_list) {
326                 LASSERT (i < nnets);
327                 nets[i] = net;
328                 kptllnd_net_addref(net);
329                 i++;
330         }
331         read_unlock(&kptllnd_data.kptl_net_rw_lock);
332
333         for (i = 0; i < nnets; i++) {
334                 lnet_nid_t peer_nid;
335
336                 net = nets[i];
337                 if (net == NULL)
338                         break;
339
340                 if (!net->net_shutdown) {
341                         peer_nid = kptllnd_ptl2lnetnid(net->net_ni->ni_nid,
342                                                        peer->peer_ptlid.nid);
343                         lnet_notify(net->net_ni, peer_nid, 0, last_alive);
344                 }
345
346                 kptllnd_net_decref(net);
347         }
348
349         LIBCFS_FREE(nets, nnets * sizeof(*nets));
350 }
351
352 void
353 kptllnd_handle_closing_peers ()
354 {
355         unsigned long           flags;
356         cfs_list_t              txs;
357         kptl_peer_t            *peer;
358         cfs_list_t             *tmp;
359         cfs_list_t             *nxt;
360         kptl_tx_t              *tx;
361         int                     idle;
362
363         /* Check with a read lock first to avoid blocking anyone */
364
365         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
366         idle = cfs_list_empty(&kptllnd_data.kptl_closing_peers) &&
367                cfs_list_empty(&kptllnd_data.kptl_zombie_peers);
368         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
369
370         if (idle)
371                 return;
372
373         CFS_INIT_LIST_HEAD(&txs);
374
375         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
376
377         /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
378          * ref removes it from this list, so I mustn't drop the lock while
379          * scanning it. */
380         cfs_list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
381                 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
382
383                 LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);
384
385                 kptllnd_peer_cancel_txs(peer, &txs);
386         }
387
388         /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
389          * I'm the only one removing from this list, but peers can be added on
390          * the end any time I drop the lock. */
391
392         cfs_list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
393                 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
394
395                 LASSERT (peer->peer_state == PEER_STATE_CLOSING);
396
397                 cfs_list_del(&peer->peer_list);
398                 cfs_list_add_tail(&peer->peer_list,
399                                   &kptllnd_data.kptl_zombie_peers);
400                 peer->peer_state = PEER_STATE_ZOMBIE;
401
402                 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
403                                             flags);
404
405                 kptllnd_peer_notify(peer);
406                 kptllnd_peer_cancel_txs(peer, &txs);
407                 kptllnd_peer_decref(peer);
408
409                 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
410         }
411
412         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
413
414         /* Drop peer's ref on all cancelled txs.  This will get
415          * kptllnd_tx_fini() to abort outstanding comms if necessary. */
416
417         cfs_list_for_each_safe (tmp, nxt, &txs) {
418                 tx = cfs_list_entry(tmp, kptl_tx_t, tx_list);
419                 cfs_list_del(&tx->tx_list);
420                 kptllnd_tx_decref(tx);
421         }
422 }
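/*
 * Summary of the teardown path handled above (descriptive only):
 * kptllnd_peer_close_locked() puts a peer on kptl_closing_peers in state
 * PEER_STATE_CLOSING and wakes the watchdog; this routine then notifies LNet,
 * cancels the peer's txs, moves it to kptl_zombie_peers in state
 * PEER_STATE_ZOMBIE and drops the peer table's reference.  The final
 * kptllnd_peer_decref() elsewhere triggers kptllnd_peer_destroy().
 */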
423
424 void
425 kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
426 {
427         switch (peer->peer_state) {
428         default:
429                 LBUG();
430
431         case PEER_STATE_WAITING_HELLO:
432         case PEER_STATE_ACTIVE:
433                 /* Ensure new peers see a new incarnation of me */
434                 LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
435                 if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
436                         kptllnd_data.kptl_incarnation++;
437
438                 /* Removing from peer table */
439                 kptllnd_data.kptl_n_active_peers--;
440                 LASSERT (kptllnd_data.kptl_n_active_peers >= 0);
441
442                 cfs_list_del(&peer->peer_list);
443                 kptllnd_peer_unreserve_buffers();
444
445                 peer->peer_error = why; /* stash 'why' only on first close */
446                 peer->peer_state = PEER_STATE_CLOSING;
447
448                 /* Schedule for immediate attention, taking peer table's ref */
449                 cfs_list_add_tail(&peer->peer_list,
450                                  &kptllnd_data.kptl_closing_peers);
451                 wake_up(&kptllnd_data.kptl_watchdog_waitq);
452                 break;
453
454         case PEER_STATE_ZOMBIE:
455         case PEER_STATE_CLOSING:
456                 break;
457         }
458 }
459
460 void
461 kptllnd_peer_close(kptl_peer_t *peer, int why)
462 {
463         unsigned long      flags;
464
465         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
466         kptllnd_peer_close_locked(peer, why);
467         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
468 }
469
470 int
471 kptllnd_peer_del(lnet_process_id_t id)
472 {
473         cfs_list_t        *ptmp;
474         cfs_list_t        *pnxt;
475         kptl_peer_t       *peer;
476         int                lo;
477         int                hi;
478         int                i;
479         unsigned long      flags;
480         int                rc = -ENOENT;
481
482         /*
483          * Find the single bucket we are supposed to look at or if nid is a
484          * wildcard (LNET_NID_ANY) then look at all of the buckets
485          */
486         if (id.nid != LNET_NID_ANY) {
487                 cfs_list_t *l = kptllnd_nid2peerlist(id.nid);
488
489                 lo = hi =  l - kptllnd_data.kptl_peers;
490         } else {
491                 if (id.pid != LNET_PID_ANY)
492                         return -EINVAL;
493
494                 lo = 0;
495                 hi = kptllnd_data.kptl_peer_hash_size - 1;
496         }
497
498 again:
499         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
500
501         for (i = lo; i <= hi; i++) {
502                 cfs_list_for_each_safe (ptmp, pnxt,
503                                         &kptllnd_data.kptl_peers[i]) {
504                         peer = cfs_list_entry (ptmp, kptl_peer_t, peer_list);
505
506                         if (!(id.nid == LNET_NID_ANY || 
507                               (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(id.nid) &&
508                                (id.pid == LNET_PID_ANY || 
509                                 peer->peer_id.pid == id.pid))))
510                                 continue;
511
512                         kptllnd_peer_addref(peer); /* 1 ref for me... */
513
514                         read_unlock_irqrestore(
515                                 &kptllnd_data.kptl_peer_rw_lock,
516                                 flags);
517
518                         kptllnd_peer_close(peer, 0);
519                         kptllnd_peer_decref(peer); /* ...until here */
520
521                         rc = 0;         /* matched something */
522
523                         /* start again now I've dropped the lock */
524                         goto again;
525                 }
526         }
527
528         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
529
530         return (rc);
531 }
532
533 void
534 kptllnd_queue_tx(kptl_peer_t *peer, kptl_tx_t *tx)
535 {
536         /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
537         unsigned long flags;
538
539         spin_lock_irqsave(&peer->peer_lock, flags);
540
541         /* Ensure HELLO is sent first */
542         if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP)
543                 cfs_list_add(&tx->tx_list, &peer->peer_noops);
544         else if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
545                 cfs_list_add(&tx->tx_list, &peer->peer_sendq);
546         else
547                 cfs_list_add_tail(&tx->tx_list, &peer->peer_sendq);
548
549         spin_unlock_irqrestore(&peer->peer_lock, flags);
550 }
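/*
 * Resulting queue order (illustrative): NOOPs go on peer_noops, a HELLO is
 * pushed to the head of peer_sendq so it precedes anything already queued,
 * and every other message is appended to the tail.  For example, queueing a
 * PUT, then a GET, then a HELLO on an empty peer leaves
 *
 *      peer_sendq : HELLO, PUT, GET
 *
 * which is what kptllnd_peer_check_sends() relies on to get HELLO onto the
 * wire first.
 */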
551
552
553 void
554 kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
555 {
556         /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
557         ptl_handle_md_t  msg_mdh;
558         ptl_md_t         md;
559         ptl_err_t        prc;
560
561         LASSERT (!tx->tx_idle);
562         LASSERT (!tx->tx_active);
563         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
564         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
565         LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
566                  tx->tx_type == TX_TYPE_PUT_REQUEST ||
567                  tx->tx_type == TX_TYPE_GET_REQUEST);
568
569         kptllnd_set_tx_peer(tx, peer);
570
571         memset(&md, 0, sizeof(md));
572
573         md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
574         md.options = PTL_MD_OP_PUT |
575                      PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
576                      PTL_MD_EVENT_START_DISABLE;
577         md.user_ptr = &tx->tx_msg_eventarg;
578         md.eq_handle = kptllnd_data.kptl_eqh;
579
580         if (nfrag == 0) {
581                 md.start = tx->tx_msg;
582                 md.length = tx->tx_msg->ptlm_nob;
583         } else {
584                 LASSERT (nfrag > 1);
585                 LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);
586
587                 md.start = tx->tx_frags;
588                 md.length = nfrag;
589                 md.options |= PTL_MD_IOVEC;
590         }
591
592         prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
593         if (prc != PTL_OK) {
594                 CERROR("PtlMDBind(%s) failed: %s(%d)\n",
595                        libcfs_id2str(peer->peer_id),
596                        kptllnd_errtype2str(prc), prc);
597                 tx->tx_status = -EIO;
598                 kptllnd_tx_decref(tx);
599                 return;
600         }
601
602
603         tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
604         tx->tx_active = 1;
605         tx->tx_msg_mdh = msg_mdh;
606         kptllnd_queue_tx(peer, tx);
607 }
608
609 /* NB "restarts" comes from peer_sendq of a single peer */
610 void
611 kptllnd_restart_txs (kptl_net_t *net, lnet_process_id_t target,
612                      cfs_list_t *restarts)
613 {
614         kptl_tx_t   *tx;
615         kptl_tx_t   *tmp;
616         kptl_peer_t *peer;
617
618         LASSERT (!cfs_list_empty(restarts));
619
620         if (kptllnd_find_target(net, target, &peer) != 0)
621                 peer = NULL;
622
623         cfs_list_for_each_entry_safe (tx, tmp, restarts, tx_list) {
624                 LASSERT (tx->tx_peer != NULL);
625                 LASSERT (tx->tx_type == TX_TYPE_GET_REQUEST ||
626                          tx->tx_type == TX_TYPE_PUT_REQUEST ||
627                          tx->tx_type == TX_TYPE_SMALL_MESSAGE);
628
629                 cfs_list_del_init(&tx->tx_list);
630
631                 if (peer == NULL ||
632                     tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
633                         kptllnd_tx_decref(tx);
634                         continue;
635                 }
636
637                 LASSERT (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_NOOP);
638                 tx->tx_status = 0;
639                 tx->tx_active = 1;
640                 kptllnd_peer_decref(tx->tx_peer);
641                 tx->tx_peer = NULL;
642                 kptllnd_set_tx_peer(tx, peer);
643                 kptllnd_queue_tx(peer, tx); /* takes over my ref on tx */
644         }
645
646         if (peer == NULL)
647                 return;
648
649         kptllnd_peer_check_sends(peer);
650         kptllnd_peer_decref(peer);
651 }
652
653 static inline int
654 kptllnd_peer_send_noop (kptl_peer_t *peer)
655 {
656         if (!peer->peer_sent_hello ||
657             peer->peer_credits == 0 ||
658             !cfs_list_empty(&peer->peer_noops) ||
659             peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)
660                 return 0;
661
662         /* No tx to piggyback NOOP onto or no credit to send a tx */
663         return (cfs_list_empty(&peer->peer_sendq) || peer->peer_credits == 1);
664 }
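/*
 * Worked reading of the predicate above (descriptive only): a NOOP is only
 * considered once HELLO has gone out, at least one send credit remains, no
 * NOOP is already queued, and outstanding_credits has reached
 * PTLLND_CREDIT_HIGHWATER.  It is then actually posted either when the send
 * queue is empty (nothing to piggyback the credits on) or when only the
 * reserved last credit is left, which ordinary messages may not consume
 * (see kptllnd_peer_check_sends()).
 */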
665
666 void
667 kptllnd_peer_check_sends (kptl_peer_t *peer)
668 {
669         ptl_handle_me_t  meh;
670         kptl_tx_t       *tx;
671         int              rc;
672         int              msg_type;
673         unsigned long    flags;
674
675         LASSERT(!cfs_in_interrupt());
676
677         spin_lock_irqsave(&peer->peer_lock, flags);
678
679         peer->peer_retry_noop = 0;
680
681         if (kptllnd_peer_send_noop(peer)) {
682                 /* post a NOOP to return credits */
683                 spin_unlock_irqrestore(&peer->peer_lock, flags);
684
685                 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
686                 if (tx == NULL) {
687                         CERROR("Can't return credits to %s: can't allocate descriptor\n",
688                                libcfs_id2str(peer->peer_id));
689                 } else {
690                         kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP,
691                                          peer->peer_id, 0);
692                         kptllnd_post_tx(peer, tx, 0);
693                 }
694
695                 spin_lock_irqsave(&peer->peer_lock, flags);
696                 peer->peer_retry_noop = (tx == NULL);
697         }
698
699         for (;;) {
700                 if (!cfs_list_empty(&peer->peer_noops)) {
701                         LASSERT (peer->peer_sent_hello);
702                         tx = cfs_list_entry(peer->peer_noops.next,
703                                             kptl_tx_t, tx_list);
704                 } else if (!cfs_list_empty(&peer->peer_sendq)) {
705                         tx = cfs_list_entry(peer->peer_sendq.next,
706                                             kptl_tx_t, tx_list);
707                 } else {
708                         /* nothing to send right now */
709                         break;
710                 }
711
712                 LASSERT (tx->tx_active);
713                 LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
714                 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
715
716                 LASSERT (peer->peer_outstanding_credits >= 0);
717                 LASSERT (peer->peer_sent_credits >= 0);
718                 LASSERT (peer->peer_sent_credits +
719                          peer->peer_outstanding_credits <=
720                          *kptllnd_tunables.kptl_peertxcredits);
721                 LASSERT (peer->peer_credits >= 0);
722
723                 msg_type = tx->tx_msg->ptlm_type;
724
725                 /* Ensure HELLO is sent first */
726                 if (!peer->peer_sent_hello) {
727                         LASSERT (cfs_list_empty(&peer->peer_noops));
728                         if (msg_type != PTLLND_MSG_TYPE_HELLO)
729                                 break;
730                         peer->peer_sent_hello = 1;
731                 }
732
733                 if (peer->peer_credits == 0) {
734                         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %s[%p]\n",
735                                libcfs_id2str(peer->peer_id), 
736                                peer->peer_credits,
737                                peer->peer_outstanding_credits, 
738                                peer->peer_sent_credits, 
739                                kptllnd_msgtype2str(msg_type), tx);
740                         break;
741                 }
742
743                 /* Last/Initial credit reserved for NOOP/HELLO */
744                 if (peer->peer_credits == 1 &&
745                     msg_type != PTLLND_MSG_TYPE_HELLO &&
746                     msg_type != PTLLND_MSG_TYPE_NOOP) {
747                         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
748                                "not using last credit for %s[%p]\n",
749                                libcfs_id2str(peer->peer_id), 
750                                peer->peer_credits,
751                                peer->peer_outstanding_credits,
752                                peer->peer_sent_credits,
753                                kptllnd_msgtype2str(msg_type), tx);
754                         break;
755                 }
756
757                 cfs_list_del(&tx->tx_list);
758
759                 /* Discard any NOOP I queued if I'm no longer at the high-water
760                  * mark, or if more messages have been queued since */
761                 if (msg_type == PTLLND_MSG_TYPE_NOOP &&
762                     !kptllnd_peer_send_noop(peer)) {
763                         tx->tx_active = 0;
764
765                         spin_unlock_irqrestore(&peer->peer_lock, flags);
766
767                         CDEBUG(D_NET, "%s: redundant noop\n", 
768                                libcfs_id2str(peer->peer_id));
769                         kptllnd_tx_decref(tx);
770
771                         spin_lock_irqsave(&peer->peer_lock, flags);
772                         continue;
773                 }
774
775                 /* fill last-minute msg fields */
776                 kptllnd_msg_pack(tx->tx_msg, peer);
777
778                 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
779                     tx->tx_type == TX_TYPE_GET_REQUEST) {
780                         /* peer_next_matchbits must be known good */
781                         LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
782                         /* Assume 64-bit matchbits can't wrap */
783                         LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
784                         tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
785                                 peer->peer_next_matchbits++;
786                 }
787
788                 peer->peer_sent_credits += peer->peer_outstanding_credits;
789                 peer->peer_outstanding_credits = 0;
790                 peer->peer_credits--;
791
792                 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
793                        libcfs_id2str(peer->peer_id), peer->peer_credits,
794                        peer->peer_outstanding_credits, peer->peer_sent_credits,
795                        kptllnd_msgtype2str(msg_type), tx, tx->tx_msg->ptlm_nob,
796                        tx->tx_msg->ptlm_credits);
797
798                 cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);
799
800                 kptllnd_tx_addref(tx);          /* 1 ref for me... */
801
802                 spin_unlock_irqrestore(&peer->peer_lock, flags);
803
804                 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
805                     tx->tx_type == TX_TYPE_GET_REQUEST) {
806                         /* Post bulk now we have safe matchbits */
807                         rc = PtlMEAttach(kptllnd_data.kptl_nih,
808                                          *kptllnd_tunables.kptl_portal,
809                                          peer->peer_ptlid,
810                                          tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
811                                          0,             /* ignore bits */
812                                          PTL_UNLINK,
813                                          PTL_INS_BEFORE,
814                                          &meh);
815                         if (rc != PTL_OK) {
816                                 CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
817                                        libcfs_id2str(peer->peer_id),
818                                        kptllnd_errtype2str(rc), rc);
819                                 goto failed;
820                         }
821
822                         rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
823                                          &tx->tx_rdma_mdh);
824                         if (rc != PTL_OK) {
825                                 CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
826                                        libcfs_id2str(tx->tx_peer->peer_id),
827                                        kptllnd_errtype2str(rc), rc);
828                                 rc = PtlMEUnlink(meh);
829                                 LASSERT(rc == PTL_OK);
830                                 tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
831                                 goto failed;
832                         }
833                         /* I'm not racing with the event callback here.  It's a
834                          * bug if there's an event on the MD I just attached
835                          * before I actually send the RDMA request message -
836                          * probably matchbits re-used in error. */
837                 }
838
839                 tx->tx_tposted = jiffies;       /* going on the wire */
840
841                 rc = PtlPut (tx->tx_msg_mdh,
842                              tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
843                              peer->peer_ptlid,
844                              *kptllnd_tunables.kptl_portal,
845                              0,                 /* acl cookie */
846                              LNET_MSG_MATCHBITS,
847                              0,                 /* offset */
848                              0);                /* header data */
849                 if (rc != PTL_OK) {
850                         CERROR("PtlPut %s error %s(%d)\n",
851                                libcfs_id2str(peer->peer_id),
852                                kptllnd_errtype2str(rc), rc);
853                         goto failed;
854                 }
855
856                 kptllnd_tx_decref(tx);          /* drop my ref */
857
858                 spin_lock_irqsave(&peer->peer_lock, flags);
859         }
860
861         spin_unlock_irqrestore(&peer->peer_lock, flags);
862         return;
863
864  failed:
865         /* Nuke everything (including tx we were trying) */
866         kptllnd_peer_close(peer, -EIO);
867         kptllnd_tx_decref(tx);
868 }
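/*
 * Credit flow in the send loop above (descriptive only): each message sent
 * consumes one peer_credit and hands back all credits accumulated in
 * peer_outstanding_credits (moved into peer_sent_credits here).  The tx is
 * parked on peer_activeq until the Portals event callback completes it; PUT
 * and GET requests first attach an ME/MD with fresh matchbits for the bulk
 * transfer before the request itself is PtlPut() to the peer.
 */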
869
870 kptl_tx_t *
871 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
872 {
873         kptl_tx_t         *tx;
874         cfs_list_t        *ele;
875
876         cfs_list_for_each(ele, &peer->peer_sendq) {
877                 tx = cfs_list_entry(ele, kptl_tx_t, tx_list);
878
879                 if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
880                         kptllnd_tx_addref(tx);
881                         return tx;
882                 }
883         }
884
885         cfs_list_for_each(ele, &peer->peer_activeq) {
886                 tx = cfs_list_entry(ele, kptl_tx_t, tx_list);
887
888                 if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
889                         kptllnd_tx_addref(tx);
890                         return tx;
891                 }
892         }
893
894         return NULL;
895 }
896
897
898 void
899 kptllnd_peer_check_bucket (int idx, int stamp)
900 {
901         cfs_list_t        *peers = &kptllnd_data.kptl_peers[idx];
902         kptl_peer_t       *peer;
903         unsigned long      flags;
904
905         CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);
906
907  again:
908         /* NB. Shared lock while I just look */
909         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
910
911         cfs_list_for_each_entry (peer, peers, peer_list) {
912                 kptl_tx_t *tx;
913                 int        check_sends;
914                 int        c = -1, oc = -1, sc = -1;
915                 int        nsend = -1, nactive = -1;
916                 int        sent_hello = -1, state = -1;
917
918                 CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
919                        libcfs_id2str(peer->peer_id), peer->peer_credits, 
920                        peer->peer_outstanding_credits, peer->peer_sent_credits);
921
922                 spin_lock(&peer->peer_lock);
923
924                 if (peer->peer_check_stamp == stamp) {
925                         /* checked already this pass */
926                         spin_unlock(&peer->peer_lock);
927                         continue;
928                 }
929
930                 peer->peer_check_stamp = stamp;
931                 tx = kptllnd_find_timed_out_tx(peer);
932                 check_sends = peer->peer_retry_noop;
933
934                 if (tx != NULL) {
935                         c  = peer->peer_credits;
936                         sc = peer->peer_sent_credits;
937                         oc = peer->peer_outstanding_credits;
938                         state      = peer->peer_state;
939                         sent_hello = peer->peer_sent_hello;
940                         nsend   = kptllnd_count_queue(&peer->peer_sendq);
941                         nactive = kptllnd_count_queue(&peer->peer_activeq);
942                 }
943
944                 spin_unlock(&peer->peer_lock);
945
946                 if (tx == NULL && !check_sends)
947                         continue;
948
949                 kptllnd_peer_addref(peer); /* 1 ref for me... */
950
951                 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
952                                            flags);
953
954                 if (tx == NULL) { /* nothing timed out */
955                         kptllnd_peer_check_sends(peer);
956                         kptllnd_peer_decref(peer); /* ...until here or... */
957
958                         /* rescan after dropping the lock */
959                         goto again;
960                 }
961
962                 LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
963                                    libcfs_id2str(peer->peer_id),
964                                    (tx->tx_tposted == 0) ?
965                                    "no free peer buffers" :
966                                    "please check Portals");
967
968                 if (tx->tx_tposted) {
969                         CERROR("Could not send to %s after %ds (sent %lds ago); "
970                                 "check Portals for possible issues\n",
971                                 libcfs_id2str(peer->peer_id),
972                                 *kptllnd_tunables.kptl_timeout,
973                                 cfs_duration_sec(jiffies - tx->tx_tposted));
974                 } else if (state < PEER_STATE_ACTIVE) {
975                         CERROR("Could not connect %s (%d) after %ds; "
976                                "peer might be down\n",
977                                libcfs_id2str(peer->peer_id), state,
978                                *kptllnd_tunables.kptl_timeout);
979                 } else {
980                         CERROR("Could not get credits for %s after %ds; "
981                                 "possible Lustre networking issues\n",
982                         libcfs_id2str(peer->peer_id),
983                         *kptllnd_tunables.kptl_timeout);
984                 }
985
986                 CERROR("%s timed out: cred %d outstanding %d, sent %d, "
987                        "state %d, sent_hello %d, sendq %d, activeq %d "
988                        "Tx %p %s %s (%s%s%s) status %d %sposted %lu T/O %ds\n",
989                        libcfs_id2str(peer->peer_id), c, oc, sc,
990                        state, sent_hello, nsend, nactive,
991                        tx, kptllnd_tx_typestr(tx->tx_type),
992                        kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
993                        tx->tx_active ? "A" : "",
994                        PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
995                        "" : "M",
996                        PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
997                        "" : "D",
998                        tx->tx_status,
999                        (tx->tx_tposted == 0) ? "not " : "",
1000                        (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
1001                        *kptllnd_tunables.kptl_timeout);
1002
1003                 kptllnd_tx_decref(tx);
1004
1005                 kptllnd_peer_close(peer, -ETIMEDOUT);
1006                 kptllnd_peer_decref(peer); /* ...until here */
1007
1008                 /* start again now I've dropped the lock */
1009                 goto again;
1010         }
1011
1012         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
1013 }
1014
1015 kptl_peer_t *
1016 kptllnd_id2peer_locked (lnet_process_id_t id)
1017 {
1018         cfs_list_t       *peers = kptllnd_nid2peerlist(id.nid);
1019         cfs_list_t       *tmp;
1020         kptl_peer_t      *peer;
1021
1022         cfs_list_for_each (tmp, peers) {
1023                 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1024
1025                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
1026                         peer->peer_state == PEER_STATE_ACTIVE);
1027
1028                 /* NB logical LNet peers share one kptl_peer_t */
1029                 if (peer->peer_id.pid != id.pid ||
1030                     LNET_NIDADDR(id.nid) != LNET_NIDADDR(peer->peer_id.nid))
1031                         continue;
1032
1033                 kptllnd_peer_addref(peer);
1034
1035                 CDEBUG(D_NET, "%s -> %s (%d)\n",
1036                        libcfs_id2str(id),
1037                        kptllnd_ptlid2str(peer->peer_ptlid),
1038                        cfs_atomic_read (&peer->peer_refcount));
1039                 return peer;
1040         }
1041
1042         return NULL;
1043 }
1044
1045 void
1046 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
1047 {
1048         LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
1049                            "messages may be dropped\n",
1050                            str, libcfs_id2str(id),
1051                            kptllnd_data.kptl_n_active_peers);
1052         LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
1053                            "'max_nodes' or 'max_procs_per_node'\n");
1054 }
1055
1056 __u64
1057 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
1058 {
1059         kptl_peer_t  *peer;
1060         cfs_list_t   *tmp;
1061
1062         /* Find the last matchbits I saw this new peer using.  Note..
1063            A. This peer cannot be in the peer table - she's new!
1064            B. If I can't find the peer in the closing/zombie peers, all
1065               matchbits are safe because all refs to the (old) peer have gone
1066               so all txs have completed so there's no risk of matchbit
1067               collision!
1068          */
1069
1070         LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
1071
1072         /* peer's last matchbits can't change after it comes out of the peer
1073          * table, so first match is fine */
1074
1075         cfs_list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
1076                 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1077
1078                 if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
1079                     peer->peer_id.pid == lpid.pid)
1080                         return peer->peer_last_matchbits_seen;
1081         }
1082
1083         cfs_list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
1084                 peer = cfs_list_entry (tmp, kptl_peer_t, peer_list);
1085
1086                 if (LNET_NIDADDR(peer->peer_id.nid) == LNET_NIDADDR(lpid.nid) &&
1087                     peer->peer_id.pid == lpid.pid)
1088                         return peer->peer_last_matchbits_seen;
1089         }
1090
1091         return PTL_RESERVED_MATCHBITS;
1092 }
1093
1094 kptl_peer_t *
1095 kptllnd_peer_handle_hello (kptl_net_t *net,
1096                            ptl_process_id_t initiator, kptl_msg_t *msg)
1097 {
1098         rwlock_t                *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1099         kptl_peer_t        *peer;
1100         kptl_peer_t        *new_peer;
1101         lnet_process_id_t   lpid;
1102         unsigned long       flags;
1103         kptl_tx_t          *hello_tx;
1104         int                 rc;
1105         __u64               safe_matchbits;
1106         __u64               last_matchbits_seen;
1107
1108         lpid.nid = msg->ptlm_srcnid;
1109         lpid.pid = msg->ptlm_srcpid;
1110
1111         CDEBUG(D_NET, "hello from %s(%s)\n",
1112                libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
1113
1114         if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
1115             (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
1116                 /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
1117                  * userspace.  Refuse the connection if she hasn't set the
1118                  * correct flag in her PID... */
1119                 CERROR("Userflag not set in hello from %s (%s)\n",
1120                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
1121                 return NULL;
1122         }
1123         
1124         /* kptlhm_matchbits are the highest matchbits my peer may have used to
1125          * RDMA to me.  I ensure I never register buffers for RDMA that could
1126          * match any she used */
1127         safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;
1128
1129         if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
1130                 CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
1131                        safe_matchbits, libcfs_id2str(lpid));
1132                 return NULL;
1133         }
1134         
1135         if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
1136                 CERROR("%s: max message size %d < MIN %d",
1137                        libcfs_id2str(lpid),
1138                        msg->ptlm_u.hello.kptlhm_max_msg_size,
1139                        PTLLND_MIN_BUFFER_SIZE);
1140                 return NULL;
1141         }
1142
1143         if (msg->ptlm_credits <= 1) {
1144                 CERROR("Need more than 1+%d credits from %s\n",
1145                        msg->ptlm_credits, libcfs_id2str(lpid));
1146                 return NULL;
1147         }
1148         
1149         write_lock_irqsave(g_lock, flags);
1150
1151         peer = kptllnd_id2peer_locked(lpid);
1152         if (peer != NULL) {
1153                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1154                         /* Completing HELLO handshake */
1155                         LASSERT(peer->peer_incarnation == 0);
1156
1157                         if (msg->ptlm_dststamp != 0 &&
1158                             msg->ptlm_dststamp != peer->peer_myincarnation) {
1159                                 write_unlock_irqrestore(g_lock, flags);
1160
1161                                 CERROR("Ignoring HELLO from %s: unexpected "
1162                                        "dststamp "LPX64" ("LPX64" wanted)\n",
1163                                        libcfs_id2str(lpid),
1164                                        msg->ptlm_dststamp,
1165                                        peer->peer_myincarnation);
1166                                 kptllnd_peer_decref(peer);
1167                                 return NULL;
1168                         }
1169                         
1170                         /* Concurrent initiation or response to my HELLO */
1171                         peer->peer_state = PEER_STATE_ACTIVE;
1172                         peer->peer_incarnation = msg->ptlm_srcstamp;
1173                         peer->peer_next_matchbits = safe_matchbits;
1174                         peer->peer_max_msg_size =
1175                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1176                         
1177                         write_unlock_irqrestore(g_lock, flags);
1178                         return peer;
1179                 }
1180
1181                 if (msg->ptlm_dststamp != 0 &&
1182                     msg->ptlm_dststamp <= peer->peer_myincarnation) {
1183                         write_unlock_irqrestore(g_lock, flags);
1184
1185                         CERROR("Ignoring stale HELLO from %s: "
1186                                "dststamp "LPX64" (current "LPX64")\n",
1187                                libcfs_id2str(lpid),
1188                                msg->ptlm_dststamp,
1189                                peer->peer_myincarnation);
1190                         kptllnd_peer_decref(peer);
1191                         return NULL;
1192                 }
1193
1194                 /* Brand new connection attempt: remove old incarnation */
1195                 kptllnd_peer_close_locked(peer, 0);
1196         }
1197
1198         kptllnd_cull_peertable_locked(lpid);
1199
1200         write_unlock_irqrestore(g_lock, flags);
1201
1202         if (peer != NULL) {
1203                 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
1204                        " stamp "LPX64"("LPX64")\n",
1205                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
1206                        msg->ptlm_srcstamp, peer->peer_incarnation);
1207
1208                 kptllnd_peer_decref(peer);
1209                 peer = NULL;
1210         }
1211
1212         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1213         if (hello_tx == NULL) {
1214                 CERROR("Unable to allocate HELLO message for %s\n",
1215                        libcfs_id2str(lpid));
1216                 return NULL;
1217         }
1218
1219         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1220                          lpid, sizeof(kptl_hello_msg_t));
1221
1222         new_peer = kptllnd_peer_allocate(net, lpid, initiator);
1223         if (new_peer == NULL) {
1224                 kptllnd_tx_decref(hello_tx);
1225                 return NULL;
1226         }
1227
1228         rc = kptllnd_peer_reserve_buffers();
1229         if (rc != 0) {
1230                 kptllnd_peer_decref(new_peer);
1231                 kptllnd_tx_decref(hello_tx);
1232
1233                 CERROR("Failed to reserve buffers for %s\n",
1234                        libcfs_id2str(lpid));
1235                 return NULL;
1236         }
1237
1238         write_lock_irqsave(g_lock, flags);
1239
1240  again:
1241         if (net->net_shutdown) {
1242                 write_unlock_irqrestore(g_lock, flags);
1243
1244                 CERROR ("Shutdown started, refusing connection from %s\n",
1245                         libcfs_id2str(lpid));
1246                 kptllnd_peer_unreserve_buffers();
1247                 kptllnd_peer_decref(new_peer);
1248                 kptllnd_tx_decref(hello_tx);
1249                 return NULL;
1250         }
1251
1252         peer = kptllnd_id2peer_locked(lpid);
1253         if (peer != NULL) {
1254                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1255                         /* An outgoing message instantiated 'peer' for me */
1256                         LASSERT(peer->peer_incarnation == 0);
1257
1258                         peer->peer_state = PEER_STATE_ACTIVE;
1259                         peer->peer_incarnation = msg->ptlm_srcstamp;
1260                         peer->peer_next_matchbits = safe_matchbits;
1261                         peer->peer_max_msg_size =
1262                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1263
1264                         write_unlock_irqrestore(g_lock, flags);
1265
1266                         CWARN("Outgoing instantiated peer %s\n",
1267                               libcfs_id2str(lpid));
1268                 } else {
1269                         LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1270
1271                         write_unlock_irqrestore(g_lock, flags);
1272
1273                         /* WOW!  Somehow this peer completed the HELLO
1274                          * handshake while I slept.  I guess I could have slept
1275                          * while it rebooted and sent a new HELLO, so I'll fail
1276                          * this one... */
1277                         CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
1278                         kptllnd_peer_decref(peer);
1279                         peer = NULL;
1280                 }
1281
1282                 kptllnd_peer_unreserve_buffers();
1283                 kptllnd_peer_decref(new_peer);
1284                 kptllnd_tx_decref(hello_tx);
1285                 return peer;
1286         }
1287
1288         if (kptllnd_data.kptl_n_active_peers ==
1289             kptllnd_data.kptl_expected_peers) {
1290                 /* peer table full */
1291                 write_unlock_irqrestore(g_lock, flags);
1292
1293                 kptllnd_peertable_overflow_msg("Connection from ", lpid);
1294
1295                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1296                 if (rc != 0) {
1297                         CERROR("Refusing connection from %s\n",
1298                                libcfs_id2str(lpid));
1299                         kptllnd_peer_unreserve_buffers();
1300                         kptllnd_peer_decref(new_peer);
1301                         kptllnd_tx_decref(hello_tx);
1302                         return NULL;
1303                 }
1304                 
1305                 write_lock_irqsave(g_lock, flags);
1306                 kptllnd_data.kptl_expected_peers++;
1307                 goto again;
1308         }
1309
1310         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
1311
1312         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1313         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1314                 *kptllnd_tunables.kptl_max_msg_size;
1315
1316         new_peer->peer_state = PEER_STATE_ACTIVE;
1317         new_peer->peer_incarnation = msg->ptlm_srcstamp;
1318         new_peer->peer_next_matchbits = safe_matchbits;
1319         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1320         new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1321
1322         LASSERT (!net->net_shutdown);
1323         kptllnd_peer_add_peertable_locked(new_peer);
1324
1325         write_unlock_irqrestore(g_lock, flags);
1326
1327         /* NB someone else could get in now and post a message before I post
1328          * the HELLO, but post_tx/check_sends take care of that! */
1329
1330         CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
1331                libcfs_id2str(new_peer->peer_id), hello_tx);
1332
1333         kptllnd_post_tx(new_peer, hello_tx, 0);
1334         kptllnd_peer_check_sends(new_peer);
1335
1336         return new_peer;
1337 }
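/*
 * Outcomes of the HELLO handling above (descriptive only): an existing peer
 * in PEER_STATE_WAITING_HELLO is promoted to ACTIVE and returned; a stale or
 * malformed HELLO returns NULL; a HELLO from a rebooted peer closes the old
 * incarnation and falls through to create a fresh peer, which is added to
 * the table in ACTIVE state with a response HELLO posted immediately.
 */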
1338
1339 void
1340 kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
1341 {
1342         kptllnd_post_tx(peer, tx, nfrag);
1343         kptllnd_peer_check_sends(peer);
1344 }
1345
1346 int
1347 kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target,
1348                     kptl_peer_t **peerp)
1349 {
1350         rwlock_t     *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1351         ptl_process_id_t  ptl_id;
1352         kptl_peer_t      *new_peer;
1353         kptl_tx_t        *hello_tx;
1354         unsigned long     flags;
1355         int               rc;
1356         __u64             last_matchbits_seen;
1357
1358         /* I expect to find the peer, so I only take a read lock... */
1359         read_lock_irqsave(g_lock, flags);
1360         *peerp = kptllnd_id2peer_locked(target);
1361         read_unlock_irqrestore(g_lock, flags);
1362
1363         if (*peerp != NULL)
1364                 return 0;
1365
1366         if ((target.pid & LNET_PID_USERFLAG) != 0) {
1367                 CWARN("Refusing to create a new connection to %s "
1368                       "(non-kernel peer)\n", libcfs_id2str(target));
1369                 return -EHOSTUNREACH;
1370         }
1371
1372         /* The new peer is a kernel ptllnd, and kernel ptllnds all have the
1373          * same portals PID, which has nothing to do with LUSTRE_SRV_LNET_PID */
1374         ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
1375         ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
1376
1377         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1378         if (hello_tx == NULL) {
1379                 CERROR("Unable to allocate connect message for %s\n",
1380                        libcfs_id2str(target));
1381                 return -ENOMEM;
1382         }
1383
1384         hello_tx->tx_acked = 1;
1385         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1386                          target, sizeof(kptl_hello_msg_t));
1387
1388         new_peer = kptllnd_peer_allocate(net, target, ptl_id);
1389         if (new_peer == NULL) {
1390                 rc = -ENOMEM;
1391                 goto unwind_0;
1392         }
1393
1394         rc = kptllnd_peer_reserve_buffers();
1395         if (rc != 0)
1396                 goto unwind_1;
1397
1398         write_lock_irqsave(g_lock, flags);
1399  again:
1400         /* Called only in lnd_send which can't happen after lnd_shutdown */
1401         LASSERT (!net->net_shutdown);
1402
1403         *peerp = kptllnd_id2peer_locked(target);
1404         if (*peerp != NULL) {
1405                 write_unlock_irqrestore(g_lock, flags);
1406                 goto unwind_2;
1407         }
1408
1409         kptllnd_cull_peertable_locked(target);
1410
1411         if (kptllnd_data.kptl_n_active_peers ==
1412             kptllnd_data.kptl_expected_peers) {
1413                 /* peer table full */
1414                 write_unlock_irqrestore(g_lock, flags);
1415
1416                 kptllnd_peertable_overflow_msg("Connection to ", target);
1417
1418                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1419                 if (rc != 0) {
1420                         CERROR("Can't create connection to %s\n",
1421                                libcfs_id2str(target));
1422                         rc = -ENOMEM;
1423                         goto unwind_2;
1424                 }
1425                 write_lock_irqsave(g_lock, flags);
1426                 kptllnd_data.kptl_expected_peers++;
1427                 goto again;
1428         }
1429
1430         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);
1431
1432         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1433         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1434                 *kptllnd_tunables.kptl_max_msg_size;
1435
1436         new_peer->peer_state = PEER_STATE_WAITING_HELLO;
1437         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1438
1439         kptllnd_peer_add_peertable_locked(new_peer);
1440
1441         write_unlock_irqrestore(g_lock, flags);
1442
1443         /* NB someone else could get in now and post a message before I post
1444          * the HELLO, but post_tx/check_sends take care of that! */
1445
1446         CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
1447                libcfs_id2str(new_peer->peer_id), hello_tx);
1448
1449         kptllnd_post_tx(new_peer, hello_tx, 0);
1450         kptllnd_peer_check_sends(new_peer);
1451
1452         *peerp = new_peer;
1453         return 0;
1454
1455  unwind_2:
1456         kptllnd_peer_unreserve_buffers();
1457  unwind_1:
1458         kptllnd_peer_decref(new_peer);
1459  unwind_0:
1460         kptllnd_tx_decref(hello_tx);
1461
1462         return rc;
1463 }
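/*
 * Usage sketch (illustrative only, not part of the original file): a caller
 * holding a prepared tx typically resolves the peer and then launches it,
 * dropping its peer reference afterwards:
 *
 *      kptl_peer_t *peer;
 *      int          rc;
 *
 *      rc = kptllnd_find_target(net, target, &peer);
 *      if (rc != 0)
 *              return rc;                       /* e.g. -EHOSTUNREACH */
 *
 *      kptllnd_tx_launch(peer, tx, nfrag);      /* hands over the tx ref */
 *      kptllnd_peer_decref(peer);               /* drop find_target's ref */
 *
 * This follows the same pattern kptllnd_restart_txs() uses above for
 * requeued txs.
 */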