lnet/klnds/ptllnd/ptllnd_peer.c (fs/lustre-release.git)
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: PJ Kirner <pjkirner@clusterfs.com>
6  *           E Barton <eeb@bartonsoftware.com>
7  *
8  *   This file is part of the Lustre file system, http://www.lustre.org
9  *   Lustre is a trademark of Cluster File Systems, Inc.
10  *
11  *   This file is confidential source code owned by Cluster File Systems.
12  *   No viewing, modification, compilation, redistribution, or any other
13  *   form of use is permitted except through a signed license agreement.
14  *
15  *   If you have not signed such an agreement, then you have no rights to
16  *   this file.  Please destroy it immediately and contact CFS.
17  *
18  */
19
20 #include "ptllnd.h"
21 #include <libcfs/list.h>
22
23 static int
24 kptllnd_count_queue(struct list_head *q)
25 {
26         struct list_head *e;
27         int               n = 0;
28         
29         list_for_each(e, q) {
30                 n++;
31         }
32
33         return n;
34 }
35
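/* Snapshot one peer's state by linear index across the peer hash table.
 * The global kptl_peer_rw_lock is held for reading to walk the table and
 * the per-peer peer_lock is taken to read the fields it protects
 * (matchbits, credits, queue depths).  Presumably this backs a stats or
 * ioctl query path; the caller is not visible in this file. */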
36 int
37 kptllnd_get_peer_info(int index, 
38                       lnet_process_id_t *id,
39                       int *state, int *sent_hello,
40                       int *refcount, __u64 *incarnation,
41                       __u64 *next_matchbits, __u64 *last_matchbits_seen,
42                       int *nsendq, int *nactiveq,
43                       int *credits, int *outstanding_credits) 
44 {
45         rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
46         unsigned long     flags;
47         struct list_head *ptmp;
48         kptl_peer_t      *peer;
49         int               i;
50         int               rc = -ENOENT;
51
52         read_lock_irqsave(g_lock, flags);
53
54         for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
55                 
56                 list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
57                         peer = list_entry(ptmp, kptl_peer_t, peer_list);
58
59                         if (index-- > 0)
60                                 continue;
61                         
62                         *id          = peer->peer_id;
63                         *state       = peer->peer_state;
64                         *sent_hello  = peer->peer_sent_hello;
65                         *refcount    = atomic_read(&peer->peer_refcount);
66                         *incarnation = peer->peer_incarnation;
67
68                         spin_lock(&peer->peer_lock);
69
70                         *next_matchbits      = peer->peer_next_matchbits;
71                         *last_matchbits_seen = peer->peer_last_matchbits_seen;
72                         *credits             = peer->peer_credits;
73                         *outstanding_credits = peer->peer_outstanding_credits;
74
75                         *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
76                         *nactiveq = kptllnd_count_queue(&peer->peer_activeq);
77
78                         spin_unlock(&peer->peer_lock);
79
80                         rc = 0;
81                         goto out;
82                 }
83         }
84         
85  out:
86         read_unlock_irqrestore(g_lock, flags);
87         return rc;
88 }
89
90 void
91 kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
92 {
93         LASSERT (!kptllnd_data.kptl_shutdown);
94         LASSERT (kptllnd_data.kptl_n_active_peers <
95                  kptllnd_data.kptl_expected_peers);
96
97         LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
98                  peer->peer_state == PEER_STATE_ACTIVE);
99         
100         kptllnd_data.kptl_n_active_peers++;
101         atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */
102
103         /* NB add to HEAD of peer list for MRU order!
104          * (see kptllnd_cull_peertable_locked) */
105         list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
106 }
107
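/* Keep at most (max_procs_per_node - 1) existing peers with this NID
 * before a new one is added.  kptllnd_peer_add_peertable_locked() adds
 * peers at the head of the hash chain, so the chain is in MRU order and
 * the entries closed here are the least recently added for that NID. */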
108 void
109 kptllnd_cull_peertable_locked (lnet_process_id_t pid)
110 {
111         /* I'm about to add a new peer with this portals ID to the peer table,
112          * so (a) this peer should not exist already and (b) I want to leave at
113          * most (max_procs_per_node - 1) peers with this NID in the table. */
114         struct list_head  *peers = kptllnd_nid2peerlist(pid.nid);
115         int                cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
116         int                count;
117         struct list_head  *tmp;
118         struct list_head  *nxt;
119         kptl_peer_t       *peer;
120         
121         count = 0;
122         list_for_each_safe (tmp, nxt, peers) {
123                 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
124                  * in MRU order */
125                 peer = list_entry(tmp, kptl_peer_t, peer_list);
126                         
127                 if (peer->peer_id.nid != pid.nid)
128                         continue;
129
130                 LASSERT (peer->peer_id.pid != pid.pid);
131                         
132                 count++;
133
134                 if (count < cull_count) /* recent (don't cull) */
135                         continue;
136
137                 CDEBUG(D_NET, "Cull %s(%s)\n",
138                        libcfs_id2str(peer->peer_id),
139                        kptllnd_ptlid2str(peer->peer_ptlid));
140                 
141                 kptllnd_peer_close_locked(peer, 0);
142         }
143 }
144
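/* Allocate a peer in PEER_STATE_ALLOCATED with credit accounting primed
 * for the HELLO exchange: peer_credits = 1 allows exactly one message
 * (the HELLO) to be sent, peer_sent_credits = 1 covers the credit the
 * HELLO carries implicitly, and peer_outstanding_credits starts at
 * peercredits - 1, i.e. the credits still to be returned to the peer in
 * subsequent messages. */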
145 kptl_peer_t *
146 kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
147 {
148         unsigned long    flags;
149         kptl_peer_t     *peer;
150
151         LIBCFS_ALLOC(peer, sizeof (*peer));
152         if (peer == NULL) {
153                 CERROR("Can't create peer %s (%s)\n",
154                        libcfs_id2str(lpid), 
155                        kptllnd_ptlid2str(ppid));
156                 return NULL;
157         }
158
159         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
160
161         INIT_LIST_HEAD (&peer->peer_noops);
162         INIT_LIST_HEAD (&peer->peer_sendq);
163         INIT_LIST_HEAD (&peer->peer_activeq);
164         spin_lock_init (&peer->peer_lock);
165
166         peer->peer_state = PEER_STATE_ALLOCATED;
167         peer->peer_error = 0;
168         peer->peer_last_alive = cfs_time_current();
169         peer->peer_id = lpid;
170         peer->peer_ptlid = ppid;
171         peer->peer_credits = 1;                 /* enough for HELLO */
172         peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
173         peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;
174         peer->peer_sent_credits = 1;           /* HELLO credit is implicit */
175         peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */
176
177         atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */
178
179         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
180
181         peer->peer_myincarnation = kptllnd_data.kptl_incarnation;
182
183         /* Only increase # peers under lock, to guarantee we don't grow it
184          * during shutdown */
185         if (kptllnd_data.kptl_shutdown) {
186                 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
187                                         flags);
188                 LIBCFS_FREE(peer, sizeof(*peer));
189                 return NULL;
190         }
191
192         kptllnd_data.kptl_npeers++;
193         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
194         
195         return peer;
196 }
197
198 void
199 kptllnd_peer_destroy (kptl_peer_t *peer)
200 {
201         unsigned long flags;
202         
203         CDEBUG(D_NET, "Peer=%p\n", peer);
204
205         LASSERT (!in_interrupt());
206         LASSERT (atomic_read(&peer->peer_refcount) == 0);
207         LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
208                  peer->peer_state == PEER_STATE_ZOMBIE);
209         LASSERT (list_empty(&peer->peer_noops));
210         LASSERT (list_empty(&peer->peer_sendq));
211         LASSERT (list_empty(&peer->peer_activeq));
212
213         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
214
215         if (peer->peer_state == PEER_STATE_ZOMBIE)
216                 list_del(&peer->peer_list);
217
218         kptllnd_data.kptl_npeers--;
219
220         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
221
222         LIBCFS_FREE (peer, sizeof (*peer));
223 }
224
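/* Tx cancellation: move every tx on a peer's queues onto the caller's
 * list, marked failed (-EIO) and inactive.  The caller drops the tx refs
 * afterwards, outside the peer lock (see kptllnd_handle_closing_peers),
 * so kptllnd_tx_fini() can abort any outstanding comms without running
 * under that lock. */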
225 void
226 kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs)
227 {
228         struct list_head  *tmp;
229         struct list_head  *nxt;
230         kptl_tx_t         *tx;
231
232         list_for_each_safe (tmp, nxt, peerq) {
233                 tx = list_entry(tmp, kptl_tx_t, tx_list);
234
235                 list_del(&tx->tx_list);
236                 list_add_tail(&tx->tx_list, txs);
237
238                 tx->tx_status = -EIO;
239                 tx->tx_active = 0;
240         }
241 }
242
243 void
244 kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs)
245 {
246         unsigned long   flags;
247
248         spin_lock_irqsave(&peer->peer_lock, flags);
249
250         kptllnd_cancel_txlist(&peer->peer_noops, txs);
251         kptllnd_cancel_txlist(&peer->peer_sendq, txs);
252         kptllnd_cancel_txlist(&peer->peer_activeq, txs);
253                 
254         spin_unlock_irqrestore(&peer->peer_lock, flags);
255 }
256
257 void
258 kptllnd_peer_alive (kptl_peer_t *peer)
259 {
260         /* This is racy, but everyone's only writing cfs_time_current() */
261         peer->peer_last_alive = cfs_time_current();
262         mb();
263 }
264
265 void
266 kptllnd_peer_notify (kptl_peer_t *peer)
267 {
268         unsigned long flags;
269         time_t        last_alive = 0;
270         int           error = 0;
271         
272         spin_lock_irqsave(&peer->peer_lock, flags);
273
274         if (peer->peer_error != 0) {
275                 error = peer->peer_error;
276                 peer->peer_error = 0;
277                 
278                 last_alive = cfs_time_current_sec() - 
279                              cfs_duration_sec(cfs_time_current() - 
280                                               peer->peer_last_alive);
281         }
282         
283         spin_unlock_irqrestore(&peer->peer_lock, flags);
284
285         if (error != 0)
286                 lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
287                              last_alive);
288 }
289
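/* Peer teardown is two-stage: kptllnd_peer_close_locked() parks a peer
 * in PEER_STATE_CLOSING on kptl_closing_peers; this routine (apparently
 * run by the watchdog thread, given the wakeup in peer_close_locked)
 * then notifies LNET, cancels queued txs, moves the peer to
 * PEER_STATE_ZOMBIE on kptl_zombie_peers and drops the peer table's
 * ref.  The read-locked emptiness check up front keeps the common
 * nothing-to-do case off the write lock. */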
290 void
291 kptllnd_handle_closing_peers ()
292 {
293         unsigned long           flags;
294         struct list_head        txs;
295         kptl_peer_t            *peer;
296         struct list_head       *tmp;
297         struct list_head       *nxt;
298         kptl_tx_t              *tx;
299         int                     idle;
300
301         /* Check with a read lock first to avoid blocking anyone */
302
303         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
304         idle = list_empty(&kptllnd_data.kptl_closing_peers) &&
305                list_empty(&kptllnd_data.kptl_zombie_peers);
306         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
307
308         if (idle)
309                 return;
310
311         INIT_LIST_HEAD(&txs);
312
313         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
314
315         /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
316          * ref removes it from this list, so I mustn't drop the lock while
317          * scanning it. */
318         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
319                 peer = list_entry (tmp, kptl_peer_t, peer_list);
320
321                 LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);
322
323                 kptllnd_peer_cancel_txs(peer, &txs);
324         }
325
326         /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
327          * I'm the only one removing from this list, but peers can be added on
328          * the end any time I drop the lock. */
329
330         list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
331                 peer = list_entry (tmp, kptl_peer_t, peer_list);
332
333                 LASSERT (peer->peer_state == PEER_STATE_CLOSING);
334
335                 list_del(&peer->peer_list);
336                 list_add_tail(&peer->peer_list,
337                               &kptllnd_data.kptl_zombie_peers);
338                 peer->peer_state = PEER_STATE_ZOMBIE;
339
340                 write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
341
342                 kptllnd_peer_notify(peer);
343                 kptllnd_peer_cancel_txs(peer, &txs);
344                 kptllnd_peer_decref(peer);
345
346                 write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
347         }
348
349         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
350
351         /* Drop peer's ref on all cancelled txs.  This will get
352          * kptllnd_tx_fini() to abort outstanding comms if necessary. */
353
354         list_for_each_safe (tmp, nxt, &txs) {
355                 tx = list_entry(tmp, kptl_tx_t, tx_list);
356                 list_del(&tx->tx_list);
357                 kptllnd_tx_decref(tx);
358         }
359 }
360
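/* Closing state machine: WAITING_HELLO/ACTIVE -> CLOSING (here) ->
 * ZOMBIE (in kptllnd_handle_closing_peers) -> freed on last decref.
 * Bumping kptl_incarnation when an established peer is closed ensures a
 * reconnecting peer sees a new incarnation of this node, so stale
 * HELLOs for the old instance fail the dststamp checks in
 * kptllnd_peer_handle_hello(). */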
361 void
362 kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
363 {
364         switch (peer->peer_state) {
365         default:
366                 LBUG();
367
368         case PEER_STATE_WAITING_HELLO:
369         case PEER_STATE_ACTIVE:
370                 /* Ensure new peers see a new incarnation of me */
371                 LASSERT(peer->peer_myincarnation <= kptllnd_data.kptl_incarnation);
372                 if (peer->peer_myincarnation == kptllnd_data.kptl_incarnation)
373                         kptllnd_data.kptl_incarnation++;
374
375                 /* Removing from peer table */
376                 kptllnd_data.kptl_n_active_peers--;
377                 LASSERT (kptllnd_data.kptl_n_active_peers >= 0);
378
379                 list_del(&peer->peer_list);
380                 kptllnd_peer_unreserve_buffers();
381
382                 peer->peer_error = why; /* stash 'why' only on first close */
383                 peer->peer_state = PEER_STATE_CLOSING;
384
385                 /* Schedule for immediate attention, taking peer table's ref */
386                 list_add_tail(&peer->peer_list, 
387                               &kptllnd_data.kptl_closing_peers);
388                 wake_up(&kptllnd_data.kptl_watchdog_waitq);
389                 break;
390
391         case PEER_STATE_ZOMBIE:
392         case PEER_STATE_CLOSING:
393                 break;
394         }
395 }
396
397 void
398 kptllnd_peer_close(kptl_peer_t *peer, int why)
399 {
400         unsigned long      flags;
401
402         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
403         kptllnd_peer_close_locked(peer, why);
404         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
405 }
406
407 int
408 kptllnd_peer_del(lnet_process_id_t id)
409 {
410         struct list_head  *ptmp;
411         struct list_head  *pnxt;
412         kptl_peer_t       *peer;
413         int                lo;
414         int                hi;
415         int                i;
416         unsigned long      flags;
417         int                rc = -ENOENT;
418
419         /*
420          * Find the single bucket we are supposed to look at or if nid is a
421          * wildcard (LNET_NID_ANY) then look at all of the buckets
422          */
423         if (id.nid != LNET_NID_ANY) {
424                 struct list_head *l = kptllnd_nid2peerlist(id.nid);
425                 
426                 lo = hi =  l - kptllnd_data.kptl_peers;
427         } else {
428                 if (id.pid != LNET_PID_ANY)
429                         return -EINVAL;
430                 
431                 lo = 0;
432                 hi = kptllnd_data.kptl_peer_hash_size - 1;
433         }
434
435 again:
436         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
437
438         for (i = lo; i <= hi; i++) {
439                 list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
440                         peer = list_entry (ptmp, kptl_peer_t, peer_list);
441
442                         if (!(id.nid == LNET_NID_ANY || 
443                               (peer->peer_id.nid == id.nid &&
444                                (id.pid == LNET_PID_ANY || 
445                                 peer->peer_id.pid == id.pid))))
446                                 continue;
447
448                         kptllnd_peer_addref(peer); /* 1 ref for me... */
449
450                         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
451                                                flags);
452
453                         kptllnd_peer_close(peer, 0);
454                         kptllnd_peer_decref(peer); /* ...until here */
455
456                         rc = 0;         /* matched something */
457
458                         /* start again now I've dropped the lock */
459                         goto again;
460                 }
461         }
462
463         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
464
465         return (rc);
466 }
467
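/* Queue a tx on the peer and bind an MD for its message buffer; the
 * PtlPut itself happens later in kptllnd_peer_check_sends().  Queue
 * placement encodes priority: NOOPs go on peer_noops, a HELLO goes to
 * the head of peer_sendq so it is transmitted before anything queued
 * earlier, and everything else is appended to peer_sendq. */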
468 void
469 kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
470 {
471         /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
472         ptl_handle_md_t  msg_mdh;
473         ptl_md_t         md;
474         ptl_err_t        prc;
475         unsigned long    flags;
476
477         LASSERT (!tx->tx_idle);
478         LASSERT (!tx->tx_active);
479         LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
480         LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
481         LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
482                  tx->tx_type == TX_TYPE_PUT_REQUEST ||
483                  tx->tx_type == TX_TYPE_GET_REQUEST);
484
485         kptllnd_set_tx_peer(tx, peer);
486
487         memset(&md, 0, sizeof(md));
488
489         md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
490         md.options = PTL_MD_OP_PUT |
491                      PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
492                      PTL_MD_EVENT_START_DISABLE;
493         md.user_ptr = &tx->tx_msg_eventarg;
494         md.eq_handle = kptllnd_data.kptl_eqh;
495
496         if (nfrag == 0) {
497                 md.start = tx->tx_msg;
498                 md.length = tx->tx_msg->ptlm_nob;
499         } else {
500                 LASSERT (nfrag > 1);
501                 LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);
502
503                 md.start = tx->tx_frags;
504                 md.length = nfrag;
505                 md.options |= PTL_MD_IOVEC;
506         }
507
508         prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
509         if (prc != PTL_OK) {
510                 CERROR("PtlMDBind(%s) failed: %s(%d)\n",
511                        libcfs_id2str(peer->peer_id),
512                        kptllnd_errtype2str(prc), prc);
513                 tx->tx_status = -EIO;
514                 kptllnd_tx_decref(tx);
515                 return;
516         }
517
518         spin_lock_irqsave(&peer->peer_lock, flags);
519
520         tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
521         tx->tx_active = 1;
522         tx->tx_msg_mdh = msg_mdh;
523
524         /* NOOPs go on their own queue; HELLO jumps the send queue so it goes first */
525         if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP)
526                 list_add(&tx->tx_list, &peer->peer_noops);
527         else if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
528                 list_add(&tx->tx_list, &peer->peer_sendq);
529         else
530                 list_add_tail(&tx->tx_list, &peer->peer_sendq);
531
532         spin_unlock_irqrestore(&peer->peer_lock, flags);
533 }
534
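/* Decide whether a NOOP should be posted purely to return credits.
 * Roughly:
 *
 *   send_noop = sent_hello && credits > 0 && no NOOP queued already &&
 *               outstanding_credits >= PTLLND_CREDIT_HIGHWATER &&
 *               (sendq empty || credits == 1)
 *
 * i.e. only when enough credits have accumulated to be worth returning
 * and there is no ordinary tx (or only the reserved last credit) to
 * piggyback them on. */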
535 static inline int
536 kptllnd_peer_send_noop (kptl_peer_t *peer)
537 {
538         if (!peer->peer_sent_hello ||
539             peer->peer_credits == 0 ||
540             !list_empty(&peer->peer_noops) ||
541             peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)
542                 return 0;
543
544         /* No tx to piggyback NOOP onto or no credit to send a tx */
545         return (list_empty(&peer->peer_sendq) || peer->peer_credits == 1);
546 }
547
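/* Drain the peer's queues while credits last.  peer_credits is what the
 * peer will currently accept from this node; peer_outstanding_credits is
 * what this node owes the peer and is flushed into each outgoing
 * message; the asserts below enforce
 *
 *   peer_sent_credits + peer_outstanding_credits <= peercredits
 *
 * The last credit is reserved for a NOOP or HELLO so credits can always
 * be returned.  For PUT/GET requests the matchbits are assigned here and
 * the bulk ME/MD is attached before the request message is sent, so the
 * peer's RDMA can never arrive unmatched. */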
548 void
549 kptllnd_peer_check_sends (kptl_peer_t *peer)
550 {
551         ptl_handle_me_t  meh;
552         kptl_tx_t       *tx;
553         int              rc;
554         int              msg_type;
555         unsigned long    flags;
556
557         LASSERT(!in_interrupt());
558
559         spin_lock_irqsave(&peer->peer_lock, flags);
560
561         peer->peer_retry_noop = 0;
562
563         if (kptllnd_peer_send_noop(peer)) {
564                 /* post a NOOP to return credits */
565                 spin_unlock_irqrestore(&peer->peer_lock, flags);
566
567                 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
568                 if (tx == NULL) {
569                         CERROR("Can't return credits to %s: can't allocate descriptor\n",
570                                libcfs_id2str(peer->peer_id));
571                 } else {
572                         kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
573                         kptllnd_post_tx(peer, tx, 0);
574                 }
575
576                 spin_lock_irqsave(&peer->peer_lock, flags);
577                 peer->peer_retry_noop = (tx == NULL);
578         }
579
580         for (;;) {
581                 if (!list_empty(&peer->peer_noops)) {
582                         LASSERT (peer->peer_sent_hello);
583                         tx = list_entry(peer->peer_noops.next,
584                                         kptl_tx_t, tx_list);
585                 } else if (!list_empty(&peer->peer_sendq)) {
586                         tx = list_entry(peer->peer_sendq.next,
587                                         kptl_tx_t, tx_list);
588                 } else {
589                         /* nothing to send right now */
590                         break;
591                 }
592
593                 LASSERT (tx->tx_active);
594                 LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
595                 LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
596
597                 LASSERT (peer->peer_outstanding_credits >= 0);
598                 LASSERT (peer->peer_sent_credits >= 0);
599                 LASSERT (peer->peer_sent_credits +
600                          peer->peer_outstanding_credits <=
601                          *kptllnd_tunables.kptl_peercredits);
602                 LASSERT (peer->peer_credits >= 0);
603
604                 msg_type = tx->tx_msg->ptlm_type;
605
606                 /* Ensure HELLO is sent first */
607                 if (!peer->peer_sent_hello) {
608                         LASSERT (list_empty(&peer->peer_noops));
609                         if (msg_type != PTLLND_MSG_TYPE_HELLO)
610                                 break;
611                         peer->peer_sent_hello = 1;
612                 }
613
614                 if (peer->peer_credits == 0) {
615                         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %s[%p]\n",
616                                libcfs_id2str(peer->peer_id), 
617                                peer->peer_credits,
618                                peer->peer_outstanding_credits, 
619                                peer->peer_sent_credits, 
620                                kptllnd_msgtype2str(msg_type), tx);
621                         break;
622                 }
623
624                 /* Last/Initial credit reserved for NOOP/HELLO */
625                 if (peer->peer_credits == 1 &&
626                     msg_type != PTLLND_MSG_TYPE_HELLO &&
627                     msg_type != PTLLND_MSG_TYPE_NOOP) {
628                         CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
629                                "not using last credit for %s[%p]\n",
630                                libcfs_id2str(peer->peer_id), 
631                                peer->peer_credits,
632                                peer->peer_outstanding_credits,
633                                peer->peer_sent_credits,
634                                kptllnd_msgtype2str(msg_type), tx);
635                         break;
636                 }
637
638                 list_del(&tx->tx_list);
639
640                 /* Discard any NOOP I queued if I'm no longer at the high-water
641                  * mark or other messages have been queued since */
642                 if (msg_type == PTLLND_MSG_TYPE_NOOP &&
643                     !kptllnd_peer_send_noop(peer)) {
644                         tx->tx_active = 0;
645
646                         spin_unlock_irqrestore(&peer->peer_lock, flags);
647
648                         CDEBUG(D_NET, "%s: redundant noop\n", 
649                                libcfs_id2str(peer->peer_id));
650                         kptllnd_tx_decref(tx);
651
652                         spin_lock_irqsave(&peer->peer_lock, flags);
653                         continue;
654                 }
655
656                 /* fill last-minute msg fields */
657                 kptllnd_msg_pack(tx->tx_msg, peer);
658
659                 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
660                     tx->tx_type == TX_TYPE_GET_REQUEST) {
661                         /* peer_next_matchbits must be known good */
662                         LASSERT (peer->peer_state >= PEER_STATE_ACTIVE);
663                         /* Assume 64-bit matchbits can't wrap */
664                         LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
665                         tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
666                                 peer->peer_next_matchbits++;
667                 }
668
669                 peer->peer_sent_credits += peer->peer_outstanding_credits;
670                 peer->peer_outstanding_credits = 0;
671                 peer->peer_credits--;
672
673                 CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
674                        libcfs_id2str(peer->peer_id), peer->peer_credits,
675                        peer->peer_outstanding_credits, peer->peer_sent_credits,
676                        kptllnd_msgtype2str(msg_type), tx, tx->tx_msg->ptlm_nob,
677                        tx->tx_msg->ptlm_credits);
678
679                 list_add_tail(&tx->tx_list, &peer->peer_activeq);
680
681                 kptllnd_tx_addref(tx);          /* 1 ref for me... */
682
683                 spin_unlock_irqrestore(&peer->peer_lock, flags);
684
685                 if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
686                     tx->tx_type == TX_TYPE_GET_REQUEST) {
687                         /* Post bulk now we have safe matchbits */
688                         rc = PtlMEAttach(kptllnd_data.kptl_nih,
689                                          *kptllnd_tunables.kptl_portal,
690                                          peer->peer_ptlid,
691                                          tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
692                                          0,             /* ignore bits */
693                                          PTL_UNLINK,
694                                          PTL_INS_BEFORE,
695                                          &meh);
696                         if (rc != PTL_OK) {
697                                 CERROR("PtlMEAttach(%s) failed: %s(%d)\n",
698                                        libcfs_id2str(peer->peer_id),
699                                        kptllnd_errtype2str(rc), rc);
700                                 goto failed;
701                         }
702
703                         rc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK,
704                                          &tx->tx_rdma_mdh);
705                         if (rc != PTL_OK) {
706                                 CERROR("PtlMDAttach(%s) failed: %s(%d)\n",
707                                        libcfs_id2str(tx->tx_peer->peer_id),
708                                        kptllnd_errtype2str(rc), rc);
709                                 rc = PtlMEUnlink(meh);
710                                 LASSERT(rc == PTL_OK);
711                                 tx->tx_rdma_mdh = PTL_INVALID_HANDLE;
712                                 goto failed;
713                         }
714                         /* I'm not racing with the event callback here.  It's a
715                          * bug if there's an event on the MD I just attached
716                          * before I actually send the RDMA request message -
717                          * probably matchbits re-used in error. */
718                 }
719
720                 tx->tx_tposted = jiffies;       /* going on the wire */
721
722                 rc = PtlPut (tx->tx_msg_mdh,
723                              tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
724                              peer->peer_ptlid,
725                              *kptllnd_tunables.kptl_portal,
726                              0,                 /* acl cookie */
727                              LNET_MSG_MATCHBITS,
728                              0,                 /* offset */
729                              0);                /* header data */
730                 if (rc != PTL_OK) {
731                         CERROR("PtlPut %s error %s(%d)\n",
732                                libcfs_id2str(peer->peer_id),
733                                kptllnd_errtype2str(rc), rc);
734                         goto failed;
735                 }
736
737                 kptllnd_tx_decref(tx);          /* drop my ref */
738
739                 spin_lock_irqsave(&peer->peer_lock, flags);
740         }
741
742         spin_unlock_irqrestore(&peer->peer_lock, flags);
743         return;
744
745  failed:
746         /* Nuke everything (including tx we were trying) */
747         kptllnd_peer_close(peer, -EIO);
748         kptllnd_tx_decref(tx);
749 }
750
751 kptl_tx_t *
752 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
753 {
754         kptl_tx_t         *tx;
755         struct list_head  *ele;
756
757         list_for_each(ele, &peer->peer_sendq) {
758                 tx = list_entry(ele, kptl_tx_t, tx_list);
759
760                 if (time_after_eq(jiffies, tx->tx_deadline)) {
761                         kptllnd_tx_addref(tx);
762                         return tx;
763                 }
764         }
765
766         list_for_each(ele, &peer->peer_activeq) {
767                 tx = list_entry(ele, kptl_tx_t, tx_list);
768
769                 if (time_after_eq(jiffies, tx->tx_deadline)) {
770                         kptllnd_tx_addref(tx);
771                         return tx;
772                 }
773         }
774
775         return NULL;
776 }
777
778
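/* Watchdog scan of one peer hash bucket.  'stamp' identifies the pass,
 * so a peer is examined at most once per pass even though the scan
 * restarts from the top of the bucket whenever the table lock is
 * dropped.  A peer with a timed-out tx is closed with -ETIMEDOUT; a
 * peer that merely needs its NOOP retried (peer_retry_noop) just gets
 * another check_sends. */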
779 void
780 kptllnd_peer_check_bucket (int idx, int stamp)
781 {
782         struct list_head  *peers = &kptllnd_data.kptl_peers[idx];
783         struct list_head  *ptmp;
784         kptl_peer_t       *peer;
785         kptl_tx_t         *tx;
786         unsigned long      flags;
787         int                nsend;
788         int                nactive;
789         int                check_sends;
790
791         CDEBUG(D_NET, "Bucket=%d, stamp=%d\n", idx, stamp);
792
793  again:
794         /* NB. Shared lock while I just look */
795         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
796
797         list_for_each (ptmp, peers) {
798                 peer = list_entry (ptmp, kptl_peer_t, peer_list);
799
800                 CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
801                        libcfs_id2str(peer->peer_id), peer->peer_credits, 
802                        peer->peer_outstanding_credits, peer->peer_sent_credits);
803
804                 spin_lock(&peer->peer_lock);
805
806                 if (peer->peer_check_stamp == stamp) {
807                         /* checked already this pass */
808                         spin_unlock(&peer->peer_lock);
809                         continue;
810                 }
811
812                 peer->peer_check_stamp = stamp;
813                 tx = kptllnd_find_timed_out_tx(peer);
814                 check_sends = peer->peer_retry_noop;
815                 
816                 spin_unlock(&peer->peer_lock);
817                 
818                 if (tx == NULL && !check_sends)
819                         continue;
820
821                 kptllnd_peer_addref(peer); /* 1 ref for me... */
822
823                 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
824
825                 if (tx == NULL) { /* nothing timed out */
826                         kptllnd_peer_check_sends(peer);
827                         kptllnd_peer_decref(peer); /* ...until here or... */
828
829                         /* rescan after dropping the lock */
830                         goto again;
831                 }
832
833                 spin_lock_irqsave(&peer->peer_lock, flags);
834                 nsend = kptllnd_count_queue(&peer->peer_sendq);
835                 nactive = kptllnd_count_queue(&peer->peer_activeq);
836                 spin_unlock_irqrestore(&peer->peer_lock, flags);
837
838                 LCONSOLE_ERROR_MSG(0x126, "Timing out %s: %s\n",
839                                    libcfs_id2str(peer->peer_id),
840                                    (tx->tx_tposted == 0) ? 
841                                    "no free peer buffers" : 
842                                    "please check Portals");
843
844                 if (tx->tx_tposted) {
845                         CERROR("Could not send to %s after %ds (sent %lds ago); "
846                                 "check Portals for possible issues\n",
847                                 libcfs_id2str(peer->peer_id),
848                                 *kptllnd_tunables.kptl_timeout,
849                                 cfs_duration_sec(jiffies - tx->tx_tposted));
850                 } else {
851                         CERROR("Could not get credits for %s after %ds; "
852                                 "possible Lustre networking issues\n",
853                                 libcfs_id2str(peer->peer_id),
854                                 *kptllnd_tunables.kptl_timeout);
855                 }
856
857                 CERROR("%s timed out: cred %d outstanding %d, sent %d, "
858                        "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d "
859                        "%sposted %lu T/O %ds\n",
860                        libcfs_id2str(peer->peer_id), peer->peer_credits,
861                        peer->peer_outstanding_credits, peer->peer_sent_credits,
862                        nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type),
863                        tx->tx_active ? "A" : "",
864                        PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
865                        "" : "M",
866                        PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
867                        "" : "D",
868                        tx->tx_status,
869                        (tx->tx_tposted == 0) ? "not " : "",
870                        (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
871                        *kptllnd_tunables.kptl_timeout);
872
873                 kptllnd_dump_ptltrace();
874
875                 kptllnd_tx_decref(tx);
876
877                 kptllnd_peer_close(peer, -ETIMEDOUT);
878                 kptllnd_peer_decref(peer); /* ...until here */
879
880                 /* start again now I've dropped the lock */
881                 goto again;
882         }
883
884         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
885 }
886
887 kptl_peer_t *
888 kptllnd_id2peer_locked (lnet_process_id_t id)
889 {
890         struct list_head *peers = kptllnd_nid2peerlist(id.nid);
891         struct list_head *tmp;
892         kptl_peer_t      *peer;
893
894         list_for_each (tmp, peers) {
895
896                 peer = list_entry (tmp, kptl_peer_t, peer_list);
897
898                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
899                         peer->peer_state == PEER_STATE_ACTIVE);
900                 
901                 if (peer->peer_id.nid != id.nid ||
902                     peer->peer_id.pid != id.pid)
903                         continue;
904
905                 kptllnd_peer_addref(peer);
906
907                 CDEBUG(D_NET, "%s -> %s (%d)\n",
908                        libcfs_id2str(id), 
909                        kptllnd_ptlid2str(peer->peer_ptlid),
910                        atomic_read (&peer->peer_refcount));
911                 return peer;
912         }
913
914         return NULL;
915 }
916
917 void
918 kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
919 {
920         LCONSOLE_ERROR_MSG(0x127, "%s %s overflows the peer table[%d]: "
921                            "messages may be dropped\n",
922                            str, libcfs_id2str(id),
923                            kptllnd_data.kptl_n_active_peers);
924         LCONSOLE_ERROR_MSG(0x128, "Please correct by increasing "
925                            "'max_nodes' or 'max_procs_per_node'\n");
926 }
927
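/* A reconnecting peer's old instance may still be draining on the
 * closing/zombie lists.  The largest matchbits seen from that old
 * instance is reported back in the HELLO so the new instance starts
 * above it and cannot collide with matchbits that may still have
 * buffers posted. */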
928 __u64
929 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
930 {
931         kptl_peer_t            *peer;
932         struct list_head       *tmp;
933
934         /* Find the last matchbits I saw this new peer using.  Note:
935            A. This peer cannot be in the peer table - she's new!
936            B. If I can't find the peer in the closing/zombie peers, all
937               matchbits are safe because all refs to the (old) peer have gone
938               so all txs have completed so there's no risk of matchbit
939               collision!
940          */
941
942         LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
943
944         /* peer's last matchbits can't change after it comes out of the peer
945          * table, so first match is fine */
946
947         list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
948                 peer = list_entry (tmp, kptl_peer_t, peer_list);
949
950                 if (peer->peer_id.nid == lpid.nid &&
951                     peer->peer_id.pid == lpid.pid)
952                         return peer->peer_last_matchbits_seen;
953         }
954         
955         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
956                 peer = list_entry (tmp, kptl_peer_t, peer_list);
957
958                 if (peer->peer_id.nid == lpid.nid &&
959                     peer->peer_id.pid == lpid.pid)
960                         return peer->peer_last_matchbits_seen;
961         }
962         
963         return PTL_RESERVED_MATCHBITS;
964 }
965
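/* Handle an incoming HELLO.  The peer reports the highest matchbits it
 * may already have used (kptlhm_matchbits); new RDMA buffers are only
 * ever posted above that (safe_matchbits = kptlhm_matchbits + 1).
 * Incarnation stamps (srcstamp/dststamp) distinguish a genuine
 * reconnect from a stale or crossed HELLO.  The peer table lock is
 * dropped and retaken around blocking work (tx allocation, buffer
 * reservation), so the table checks are repeated at 'again'. */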
966 kptl_peer_t *
967 kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
968                            kptl_msg_t       *msg)
969 {
970         rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;
971         kptl_peer_t        *peer;
972         kptl_peer_t        *new_peer;
973         lnet_process_id_t   lpid;
974         unsigned long       flags;
975         kptl_tx_t          *hello_tx;
976         int                 rc;
977         __u64               safe_matchbits;
978         __u64               last_matchbits_seen;
979
980         lpid.nid = msg->ptlm_srcnid;
981         lpid.pid = msg->ptlm_srcpid;
982
983         CDEBUG(D_NET, "hello from %s(%s)\n",
984                libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
985
986         if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
987             (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
988                 /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
989                  * userspace.  Refuse the connection if she hasn't set the
990                  * correct flag in her PID... */
991                 CERROR("Userflag not set in hello from %s (%s)\n",
992                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
993                 return NULL;
994         }
995         
996         /* kptlhm_matchbits are the highest matchbits my peer may have used to
997          * RDMA to me.  I ensure I never register buffers for RDMA that could
998          * match any she used */
999         safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;
1000
1001         if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
1002                 CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
1003                        safe_matchbits, libcfs_id2str(lpid));
1004                 return NULL;
1005         }
1006         
1007         if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
1008                 CERROR("%s: max message size %d < MIN %d\n",
1009                        libcfs_id2str(lpid),
1010                        msg->ptlm_u.hello.kptlhm_max_msg_size,
1011                        PTLLND_MIN_BUFFER_SIZE);
1012                 return NULL;
1013         }
1014
1015         if (msg->ptlm_credits <= 1) {
1016                 CERROR("Need more than 1+%d credits from %s\n",
1017                        msg->ptlm_credits, libcfs_id2str(lpid));
1018                 return NULL;
1019         }
1020         
1021         write_lock_irqsave(g_lock, flags);
1022
1023         peer = kptllnd_id2peer_locked(lpid);
1024         if (peer != NULL) {
1025                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1026                         /* Completing HELLO handshake */
1027                         LASSERT(peer->peer_incarnation == 0);
1028
1029                         if (msg->ptlm_dststamp != 0 &&
1030                             msg->ptlm_dststamp != peer->peer_myincarnation) {
1031                                 write_unlock_irqrestore(g_lock, flags);
1032
1033                                 CERROR("Ignoring HELLO from %s: unexpected "
1034                                        "dststamp "LPX64" ("LPX64" wanted)\n",
1035                                        libcfs_id2str(lpid),
1036                                        msg->ptlm_dststamp,
1037                                        peer->peer_myincarnation);
1038                                 kptllnd_peer_decref(peer);
1039                                 return NULL;
1040                         }
1041                         
1042                         /* Concurrent initiation or response to my HELLO */
1043                         peer->peer_state = PEER_STATE_ACTIVE;
1044                         peer->peer_incarnation = msg->ptlm_srcstamp;
1045                         peer->peer_next_matchbits = safe_matchbits;
1046                         peer->peer_max_msg_size =
1047                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1048                         
1049                         write_unlock_irqrestore(g_lock, flags);
1050                         return peer;
1051                 }
1052
1053                 if (msg->ptlm_dststamp != 0 &&
1054                     msg->ptlm_dststamp <= peer->peer_myincarnation) {
1055                         write_unlock_irqrestore(g_lock, flags);
1056
1057                         CERROR("Ignoring stale HELLO from %s: "
1058                                "dststamp "LPX64" (current "LPX64")\n",
1059                                libcfs_id2str(lpid),
1060                                msg->ptlm_dststamp,
1061                                peer->peer_myincarnation);
1062                         kptllnd_peer_decref(peer);
1063                         return NULL;
1064                 }
1065
1066                 /* Brand new connection attempt: remove old incarnation */
1067                 kptllnd_peer_close_locked(peer, 0);
1068         }
1069
1070         kptllnd_cull_peertable_locked(lpid);
1071
1072         write_unlock_irqrestore(g_lock, flags);
1073
1074         if (peer != NULL) {
1075                 CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
1076                        " stamp "LPX64"("LPX64")\n",
1077                        libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
1078                        msg->ptlm_srcstamp, peer->peer_incarnation);
1079
1080                 kptllnd_peer_decref(peer);
1081         }
1082
1083         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1084         if (hello_tx == NULL) {
1085                 CERROR("Unable to allocate HELLO message for %s\n",
1086                        libcfs_id2str(lpid));
1087                 return NULL;
1088         }
1089
1090         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1091                          sizeof(kptl_hello_msg_t));
1092
1093         new_peer = kptllnd_peer_allocate(lpid, initiator);
1094         if (new_peer == NULL) {
1095                 kptllnd_tx_decref(hello_tx);
1096                 return NULL;
1097         }
1098
1099         rc = kptllnd_peer_reserve_buffers();
1100         if (rc != 0) {
1101                 kptllnd_peer_decref(new_peer);
1102                 kptllnd_tx_decref(hello_tx);
1103
1104                 CERROR("Failed to reserve buffers for %s\n",
1105                        libcfs_id2str(lpid));
1106                 return NULL;
1107         }
1108
1109         write_lock_irqsave(g_lock, flags);
1110
1111  again:
1112         if (kptllnd_data.kptl_shutdown) {
1113                 write_unlock_irqrestore(g_lock, flags);
1114
1115                 CERROR ("Shutdown started, refusing connection from %s\n",
1116                         libcfs_id2str(lpid));
1117                 kptllnd_peer_unreserve_buffers();
1118                 kptllnd_peer_decref(new_peer);
1119                 kptllnd_tx_decref(hello_tx);
1120                 return NULL;
1121         }
1122
1123         peer = kptllnd_id2peer_locked(lpid);
1124         if (peer != NULL) {
1125                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
1126                         /* An outgoing message instantiated 'peer' for me */
1127                         LASSERT(peer->peer_incarnation == 0);
1128
1129                         peer->peer_state = PEER_STATE_ACTIVE;
1130                         peer->peer_incarnation = msg->ptlm_srcstamp;
1131                         peer->peer_next_matchbits = safe_matchbits;
1132                         peer->peer_max_msg_size =
1133                                 msg->ptlm_u.hello.kptlhm_max_msg_size;
1134
1135                         write_unlock_irqrestore(g_lock, flags);
1136
1137                         CWARN("Outgoing instantiated peer %s\n",
1138                               libcfs_id2str(lpid));
1139                 } else {
1140                         LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
1141
1142                         write_unlock_irqrestore(g_lock, flags);
1143
1144                         /* WOW!  Somehow this peer completed the HELLO
1145                          * handshake while I slept.  I guess I could have slept
1146                          * while it rebooted and sent a new HELLO, so I'll fail
1147                          * this one... */
1148                         CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
1149                         kptllnd_peer_decref(peer);
1150                         peer = NULL;
1151                 }
1152
1153                 kptllnd_peer_unreserve_buffers();
1154                 kptllnd_peer_decref(new_peer);
1155                 kptllnd_tx_decref(hello_tx);
1156                 return peer;
1157         }
1158
1159         if (kptllnd_data.kptl_n_active_peers ==
1160             kptllnd_data.kptl_expected_peers) {
1161                 /* peer table full */
1162                 write_unlock_irqrestore(g_lock, flags);
1163
1164                 kptllnd_peertable_overflow_msg("Connection from", lpid);
1165
1166                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1167                 if (rc != 0) {
1168                         CERROR("Refusing connection from %s\n",
1169                                libcfs_id2str(lpid));
1170                         kptllnd_peer_unreserve_buffers();
1171                         kptllnd_peer_decref(new_peer);
1172                         kptllnd_tx_decref(hello_tx);
1173                         return NULL;
1174                 }
1175                 
1176                 write_lock_irqsave(g_lock, flags);
1177                 kptllnd_data.kptl_expected_peers++;
1178                 goto again;
1179         }
1180
1181         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
1182
1183         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1184         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1185                 *kptllnd_tunables.kptl_max_msg_size;
1186
1187         new_peer->peer_state = PEER_STATE_ACTIVE;
1188         new_peer->peer_incarnation = msg->ptlm_srcstamp;
1189         new_peer->peer_next_matchbits = safe_matchbits;
1190         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1191         new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
1192
1193         kptllnd_peer_add_peertable_locked(new_peer);
1194
1195         write_unlock_irqrestore(g_lock, flags);
1196
1197         /* NB someone else could get in now and post a message before I post
1198          * the HELLO, but post_tx/check_sends take care of that! */
1199
1200         CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
1201                libcfs_id2str(new_peer->peer_id), hello_tx);
1202
1203         kptllnd_post_tx(new_peer, hello_tx, 0);
1204         kptllnd_peer_check_sends(new_peer);
1205
1206         return new_peer;
1207 }
1208
1209 void
1210 kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
1211 {
1212         kptllnd_post_tx(peer, tx, nfrag);
1213         kptllnd_peer_check_sends(peer);
1214 }
1215
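/* Find the peer for an outgoing send, creating it (and queueing the
 * initial HELLO) if it does not exist yet.  The common case is a hit
 * under the read lock; the slow path mirrors the passive side in
 * kptllnd_peer_handle_hello(): allocate a peer, reserve buffers,
 * recheck under the write lock, then hand the HELLO to
 * post_tx/check_sends. */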
1216 int
1217 kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
1218 {
1219         rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
1220         ptl_process_id_t  ptl_id;
1221         kptl_peer_t      *new_peer;
1222         kptl_tx_t        *hello_tx;
1223         unsigned long     flags;
1224         int               rc;
1225         __u64             last_matchbits_seen;
1226
1227         /* I expect to find the peer, so I only take a read lock... */
1228         read_lock_irqsave(g_lock, flags);
1229         *peerp = kptllnd_id2peer_locked(target);
1230         read_unlock_irqrestore(g_lock, flags);
1231
1232         if (*peerp != NULL)
1233                 return 0;
1234         
1235         if ((target.pid & LNET_PID_USERFLAG) != 0) {
1236                 CWARN("Refusing to create a new connection to %s "
1237                       "(non-kernel peer)\n", libcfs_id2str(target));
1238                 return -EHOSTUNREACH;
1239         }
1240
1241         /* The new peer is a kernel ptllnd, and kernel ptllnds all have
1242          * the same portals PID */
1243         ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
1244         ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
1245
1246         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
1247         if (hello_tx == NULL) {
1248                 CERROR("Unable to allocate connect message for %s\n",
1249                        libcfs_id2str(target));
1250                 return -ENOMEM;
1251         }
1252
1253         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
1254                          sizeof(kptl_hello_msg_t));
1255
1256         new_peer = kptllnd_peer_allocate(target, ptl_id);
1257         if (new_peer == NULL) {
1258                 rc = -ENOMEM;
1259                 goto unwind_0;
1260         }
1261
1262         rc = kptllnd_peer_reserve_buffers();
1263         if (rc != 0)
1264                 goto unwind_1;
1265
1266         write_lock_irqsave(g_lock, flags);
1267  again:
1268         if (kptllnd_data.kptl_shutdown) {
1269                 write_unlock_irqrestore(g_lock, flags);
1270                 rc = -ESHUTDOWN;
1271                 goto unwind_2;
1272         }
1273
1274         *peerp = kptllnd_id2peer_locked(target);
1275         if (*peerp != NULL) {
1276                 write_unlock_irqrestore(g_lock, flags);
1277                 goto unwind_2;
1278         }
1279
1280         kptllnd_cull_peertable_locked(target);
1281
1282         if (kptllnd_data.kptl_n_active_peers ==
1283             kptllnd_data.kptl_expected_peers) {
1284                 /* peer table full */
1285                 write_unlock_irqrestore(g_lock, flags);
1286
1287                 kptllnd_peertable_overflow_msg("Connection to", target);
1288
1289                 rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
1290                 if (rc != 0) {
1291                         CERROR("Can't create connection to %s\n",
1292                                libcfs_id2str(target));
1293                         rc = -ENOMEM;
1294                         goto unwind_2;
1295                 }
1296                 write_lock_irqsave(g_lock, flags);
1297                 kptllnd_data.kptl_expected_peers++;
1298                 goto again;
1299         }
1300
1301         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);
1302
1303         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
1304         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
1305                 *kptllnd_tunables.kptl_max_msg_size;
1306                 
1307         new_peer->peer_state = PEER_STATE_WAITING_HELLO;
1308         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
1309         
1310         kptllnd_peer_add_peertable_locked(new_peer);
1311
1312         write_unlock_irqrestore(g_lock, flags);
1313
1314         /* NB someone else could get in now and post a message before I post
1315          * the HELLO, but post_tx/check_sends take care of that! */
1316
1317         CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
1318                libcfs_id2str(new_peer->peer_id), hello_tx);
1319
1320         kptllnd_post_tx(new_peer, hello_tx, 0);
1321         kptllnd_peer_check_sends(new_peer);
1322        
1323         *peerp = new_peer;
1324         return 0;
1325         
1326  unwind_2:
1327         kptllnd_peer_unreserve_buffers();
1328  unwind_1:
1329         kptllnd_peer_decref(new_peer);
1330  unwind_0:
1331         kptllnd_tx_decref(hello_tx);
1332
1333         return rc;
1334 }