Whamcloud - gitweb
f8999e0b14f2e816de929f71b02223ec3b0b160f
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_peer.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: PJ Kirner <pjkirner@clusterfs.com>
6  *           E Barton <eeb@bartonsoftware.com>
7  *
8  *   This file is part of the Lustre file system, http://www.lustre.org
9  *   Lustre is a trademark of Cluster File Systems, Inc.
10  *
11  *   This file is confidential source code owned by Cluster File Systems.
12  *   No viewing, modification, compilation, redistribution, or any other
13  *   form of use is permitted except through a signed license agreement.
14  *
15  *   If you have not signed such an agreement, then you have no rights to
16  *   this file.  Please destroy it immediately and contact CFS.
17  *
18  */
19
20 #include "ptllnd.h"
21 #include <libcfs/list.h>
22
23 static int
24 kptllnd_count_queue(struct list_head *q)
25 {
26         struct list_head *e;
27         int               n = 0;
28         
29         list_for_each(e, q) {
30                 n++;
31         }
32
33         return n;
34 }
35
/* Snapshot the state of the index'th peer in the peer hash table (used by
 * the /proc interface).  All out-parameters are filled in on success.
 *
 * Returns 0 on success, or -ENOENT if 'index' exceeds the number of peers
 * currently in the table. */
int
kptllnd_get_peer_info(int index, 
                      lnet_process_id_t *id,
                      int *state, int *sent_hello,
                      int *refcount, __u64 *incarnation,
                      __u64 *next_matchbits, __u64 *last_matchbits_seen,
                      int *nsendq, int *nactiveq,
                      int *credits, int *outstanding_credits) 
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        unsigned long     flags;
        struct list_head *ptmp;
        kptl_peer_t      *peer;
        int               i;
        int               rc = -ENOENT;

        /* Hold the table lock (shared) so no peer can be added/removed
         * while we walk the buckets */
        read_lock_irqsave(g_lock, flags);

        for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
                
                list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry(ptmp, kptl_peer_t, peer_list);

                        /* Skip peers until we reach the index'th one */
                        if (index-- > 0)
                                continue;
                        
                        *id          = peer->peer_id;
                        *state       = peer->peer_state;
                        *sent_hello  = peer->peer_sent_hello;
                        *refcount    = atomic_read(&peer->peer_refcount);
                        *incarnation = peer->peer_incarnation;

                        /* peer_lock guards the matchbits, credits and the
                         * send/active queues */
                        spin_lock(&peer->peer_lock);

                        *next_matchbits      = peer->peer_next_matchbits;
                        *last_matchbits_seen = peer->peer_last_matchbits_seen;
                        *credits             = peer->peer_credits;
                        *outstanding_credits = peer->peer_outstanding_credits;

                        *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
                        *nactiveq = kptllnd_count_queue(&peer->peer_activeq);

                        spin_unlock(&peer->peer_lock);

                        rc = 0;
                        goto out;
                }
        }
        
 out:
        read_unlock_irqrestore(g_lock, flags);
        return rc;
}
89
/* Add 'peer' to its hash bucket in the peer table, taking a ref on behalf
 * of the list.  The caller must hold kptl_peer_rw_lock for writing and must
 * have checked the table has room. */
void
kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
{
        LASSERT (kptllnd_data.kptl_n_active_peers <
                 kptllnd_data.kptl_expected_peers);

        /* Only peers still establishing or fully established live in the
         * table; closing/zombie peers are on separate lists */
        LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
                 peer->peer_state == PEER_STATE_ACTIVE);
        
        kptllnd_data.kptl_n_active_peers++;
        atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */

        /* NB add to HEAD of peer list for MRU order!
         * (see kptllnd_cull_peertable) */
        list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
}
106
107 void
108 kptllnd_cull_peertable_locked (lnet_process_id_t pid)
109 {
110         /* I'm about to add a new peer with this portals ID to the peer table,
111          * so (a) this peer should not exist already and (b) I want to leave at
112          * most (max_procs_per_nid - 1) peers with this NID in the table. */
113         struct list_head  *peers = kptllnd_nid2peerlist(pid.nid);
114         int                cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
115         int                count;
116         struct list_head  *tmp;
117         struct list_head  *nxt;
118         kptl_peer_t       *peer;
119         
120         count = 0;
121         list_for_each_safe (tmp, nxt, peers) {
122                 /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
123                  * in MRU order */
124                 peer = list_entry(tmp, kptl_peer_t, peer_list);
125                         
126                 if (peer->peer_id.nid != pid.nid)
127                         continue;
128
129                 LASSERT (peer->peer_id.pid != pid.pid);
130                         
131                 count++;
132
133                 if (count < cull_count) /* recent (don't cull) */
134                         continue;
135
136                 CDEBUG(D_NET, "Cull %s(%s)\n",
137                        libcfs_id2str(peer->peer_id),
138                        kptllnd_ptlid2str(peer->peer_ptlid));
139                 
140                 kptllnd_peer_close_locked(peer, 0);
141         }
142 }
143
/* Allocate and initialise a new peer for LNET id 'lpid' / portals id 'ppid'.
 * Returns the peer with one reference held for the caller, or NULL on
 * allocation failure or if the LND is shutting down.  The peer is NOT yet
 * added to the peer table (see kptllnd_peer_add_peertable_locked). */
kptl_peer_t *
kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
{
        unsigned long    flags;
        kptl_peer_t     *peer;

        LIBCFS_ALLOC(peer, sizeof (*peer));
        if (peer == NULL) {
                CERROR("Can't create peer %s (%s)\n",
                       libcfs_id2str(lpid), 
                       kptllnd_ptlid2str(ppid));
                return NULL;
        }

        memset(peer, 0, sizeof(*peer));         /* zero flags etc */

        INIT_LIST_HEAD (&peer->peer_sendq);
        INIT_LIST_HEAD (&peer->peer_activeq);
        spin_lock_init (&peer->peer_lock);

        peer->peer_state = PEER_STATE_ALLOCATED;
        peer->peer_error = 0;
        peer->peer_last_alive = cfs_time_current();
        peer->peer_id = lpid;
        peer->peer_ptlid = ppid;
        peer->peer_credits = 1;                 /* enough for HELLO */
        peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
        peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;

        atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Only increase # peers under lock, to guarantee we dont grow it
         * during shutdown */
        if (kptllnd_data.kptl_shutdown) {
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
                                        flags);
                LIBCFS_FREE(peer, sizeof(*peer));
                return NULL;
        }

        kptllnd_data.kptl_npeers++;
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
        
        return peer;
}
191
/* Free 'peer' once its last reference has gone.  Both tx queues must be
 * empty; a zombie peer is unlinked from the zombie list here.  Must not be
 * called in interrupt context (takes a blocking-capable write lock path
 * and frees memory). */
void
kptllnd_peer_destroy (kptl_peer_t *peer)
{
        unsigned long flags;
        
        CDEBUG(D_NET, "Peer=%p\n", peer);

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&peer->peer_refcount) == 0);
        /* Only never-installed or fully retired peers may be destroyed */
        LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
                 peer->peer_state == PEER_STATE_ZOMBIE);
        LASSERT (list_empty(&peer->peer_sendq));
        LASSERT (list_empty(&peer->peer_activeq));

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* ALLOCATED peers were never linked anywhere; zombies are still on
         * kptl_zombie_peers */
        if (peer->peer_state == PEER_STATE_ZOMBIE)
                list_del(&peer->peer_list);

        kptllnd_data.kptl_npeers--;

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        LIBCFS_FREE (peer, sizeof (*peer));
}
217
/* Abort every tx queued on or active with 'peer'.  The queues are detached
 * atomically under peer_lock, then each tx is failed with -EIO and its
 * queue reference dropped outside the lock (dropping the ref may trigger
 * kptllnd_tx_fini(), which can abort outstanding Portals comms). */
void
kptllnd_peer_cancel_txs(kptl_peer_t *peer)
{
        struct list_head   sendq;
        struct list_head   activeq;
        struct list_head  *tmp;
        struct list_head  *nxt;
        kptl_tx_t         *tx;
        unsigned long      flags;

        /* atomically grab all the peer's tx-es... */

        spin_lock_irqsave(&peer->peer_lock, flags);

        /* Splice idiom: insert the local head into the chain, then unlink
         * the peer's head, leaving the whole queue on 'sendq' */
        list_add(&sendq, &peer->peer_sendq);
        list_del_init(&peer->peer_sendq);
        list_for_each (tmp, &sendq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                tx->tx_active = 0;
        }

        /* Same trick for the active queue */
        list_add(&activeq, &peer->peer_activeq);
        list_del_init(&peer->peer_activeq);
        list_for_each (tmp, &activeq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                tx->tx_active = 0;
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* ...then drop the peer's ref on them at leasure.  This will get
         * kptllnd_tx_fini() to abort outstanding comms if necessary. */

        list_for_each_safe (tmp, nxt, &sendq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);
        }

        list_for_each_safe (tmp, nxt, &activeq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);
        }
}
265
/* Record that 'peer' has just shown signs of life.  Deliberately lockless:
 * concurrent callers can only replace one current timestamp with another. */
void
kptllnd_peer_alive (kptl_peer_t *peer)
{
        /* This is racy, but everyone's only writing cfs_time_current() */
        peer->peer_last_alive = cfs_time_current();
        mb();           /* make the new timestamp visible to other CPUs */
}
273
/* If 'peer' has a pending error, consume it (peer_error is cleared so the
 * notification fires only once) and tell LNET the peer went down, passing
 * the wall-clock time it was last known alive. */
void
kptllnd_peer_notify (kptl_peer_t *peer)
{
        unsigned long flags;
        time_t        last_alive = 0;
        int           error = 0;
        
        spin_lock_irqsave(&peer->peer_lock, flags);

        if (peer->peer_error != 0) {
                error = peer->peer_error;
                peer->peer_error = 0;
                
                /* Convert the jiffies-based peer_last_alive into seconds
                 * of wall-clock time for lnet_notify() */
                last_alive = cfs_time_current_sec() - 
                             cfs_duration_sec(cfs_time_current() - 
                                              peer->peer_last_alive);
        }
        
        spin_unlock_irqrestore(&peer->peer_lock, flags);

        if (error != 0)
                lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
                             last_alive);
}
298
/* Watchdog hook: move every CLOSING peer onto the zombie list, notify LNET,
 * cancel its txs and drop the peer table's reference.  NB the write lock is
 * dropped while calling out for each peer and retaken afterwards; this is
 * only safe while a single watchdog thread runs this scan. */
void
kptllnd_handle_closing_peers ()
{
        unsigned long           flags;
        kptl_peer_t            *peer;
        struct list_head       *tmp;
        struct list_head       *nxt;
        int                     idle;

        /* Check with a read lock first to avoid blocking anyone */

        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        idle = list_empty(&kptllnd_data.kptl_closing_peers);
        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (idle)
                return;

        /* Scan the closing peers and cancel their txs.
         * NB only safe while there is only a single watchdog */

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_CLOSING);

                /* Retire to the zombie list before dropping the lock so a
                 * concurrent close sees a consistent state */
                list_del(&peer->peer_list);
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_zombie_peers);
                peer->peer_state = PEER_STATE_ZOMBIE;

                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                kptllnd_peer_notify(peer);
                kptllnd_peer_cancel_txs(peer);
                kptllnd_peer_decref(peer);      /* drop the list's ref */

                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        }

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}
343
/* Transition 'peer' to CLOSING so the watchdog will tear it down.  'why'
 * (an errno, or 0 for a clean close) is stashed in peer_error only on the
 * first close.  Caller holds kptl_peer_rw_lock for writing.
 *
 * State handling:
 *  - WAITING_HELLO/ACTIVE: remove from the peer table (the table's ref is
 *    handed to the closing list) and wake the watchdog immediately.
 *  - ZOMBIE: re-queue on the closing list, taking a new ref for it.
 *  - CLOSING: already queued; nothing to do. */
void
kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
{
        switch (peer->peer_state) {
        default:
                LBUG();

        case PEER_STATE_WAITING_HELLO:
        case PEER_STATE_ACTIVE:
                /* Removing from peer table */
                kptllnd_data.kptl_n_active_peers--;
                LASSERT (kptllnd_data.kptl_n_active_peers >= 0);

                list_del(&peer->peer_list);
                kptllnd_peer_unreserve_buffers();

                peer->peer_error = why; /* stash 'why' only on first close */

                /* Schedule for immediate attention, taking peer table's ref */
                list_add_tail(&peer->peer_list, 
                              &kptllnd_data.kptl_closing_peers);
                wake_up(&kptllnd_data.kptl_watchdog_waitq);
                break;

        case PEER_STATE_ZOMBIE:
                /* Schedule for attention at next timeout */
                kptllnd_peer_addref(peer);
                list_del(&peer->peer_list);
                list_add_tail(&peer->peer_list, 
                              &kptllnd_data.kptl_closing_peers);
                break;
                
        case PEER_STATE_CLOSING:
                break;
        }

        peer->peer_state = PEER_STATE_CLOSING;
}
382
383 void
384 kptllnd_peer_close(kptl_peer_t *peer, int why)
385 {
386         unsigned long      flags;
387
388         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
389         kptllnd_peer_close_locked(peer, why);
390         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
391 }
392
/* Close every peer matching 'id'.  id.nid may be LNET_NID_ANY to match all
 * peers (in which case id.pid must be LNET_PID_ANY too); id.pid may be
 * LNET_PID_ANY to match every process on a NID.
 *
 * Returns 0 if at least one peer matched, -ENOENT if none did, -EINVAL for
 * a wildcard NID with a specific PID. */
int
kptllnd_peer_del(lnet_process_id_t id)
{
        struct list_head  *ptmp;
        struct list_head  *pnxt;
        kptl_peer_t       *peer;
        int                lo;
        int                hi;
        int                i;
        unsigned long      flags;
        int                rc = -ENOENT;

        /*
         * Find the single bucket we are supposed to look at or if nid is a
         * wildcard (LNET_NID_ANY) then look at all of the buckets
         */
        if (id.nid != LNET_NID_ANY) {
                struct list_head *l = kptllnd_nid2peerlist(id.nid);
                
                /* bucket index by pointer arithmetic into the hash array */
                lo = hi =  l - kptllnd_data.kptl_peers;
        } else {
                if (id.pid != LNET_PID_ANY)
                        return -EINVAL;
                
                lo = 0;
                hi = kptllnd_data.kptl_peer_hash_size - 1;
        }

again:
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry (ptmp, kptl_peer_t, peer_list);

                        if (!(id.nid == LNET_NID_ANY || 
                              (peer->peer_id.nid == id.nid &&
                               (id.pid == LNET_PID_ANY || 
                                peer->peer_id.pid == id.pid))))
                                continue;

                        kptllnd_peer_addref(peer); /* 1 ref for me... */

                        /* kptllnd_peer_close() needs the write lock, so
                         * drop the read lock first */
                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                               flags);

                        kptllnd_peer_close(peer, 0);
                        kptllnd_peer_decref(peer); /* ...until here */

                        rc = 0;         /* matched something */

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        return (rc);
}
453
/* Queue 'tx' for transmission to 'peer', consuming the caller's ref on it.
 * For PUT/GET requests a fresh matchbits value is allocated and an ME/MD
 * pair is attached so the peer can RDMA to/from the tx buffer.  The message
 * itself is MD-bound and queued on peer_sendq (HELLOs jump the queue);
 * actual sending happens in kptllnd_peer_check_sends().  On any failure the
 * tx is failed with -EIO and its ref dropped. */
void
kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx)
{
        /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
        ptl_handle_md_t  rdma_mdh = PTL_INVALID_HANDLE;
        ptl_handle_md_t  msg_mdh = PTL_INVALID_HANDLE;
        ptl_handle_me_t  meh;
        ptl_md_t         md;
        ptl_err_t        prc;
        unsigned long    flags;

        LASSERT (!tx->tx_idle);
        LASSERT (!tx->tx_active);
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
                 tx->tx_type == TX_TYPE_PUT_REQUEST ||
                 tx->tx_type == TX_TYPE_GET_REQUEST);

        kptllnd_set_tx_peer(tx, peer);

        if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
            tx->tx_type == TX_TYPE_GET_REQUEST) {

                spin_lock_irqsave(&peer->peer_lock, flags);

                /* Assume 64-bit matchbits can't wrap */
                LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
                tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
                        peer->peer_next_matchbits++;
                        
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                /* Post the ME the peer's RDMA will match on */
                prc = PtlMEAttach(kptllnd_data.kptl_nih,
                                  *kptllnd_tunables.kptl_portal,
                                  peer->peer_ptlid,
                                  tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
                                  0,             /* ignore bits */
                                  PTL_UNLINK,
                                  PTL_INS_BEFORE,
                                  &meh);
                if (prc != PTL_OK) {
                        CERROR("PtlMEAttach(%s) failed: %d\n",
                               libcfs_id2str(peer->peer_id), prc);
                        goto failed;
                }

                prc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK, &rdma_mdh);
                if (prc != PTL_OK) {
                        CERROR("PtlMDAttach(%s) failed: %d\n",
                               libcfs_id2str(tx->tx_peer->peer_id), prc);
                        /* Unlink the orphaned ME before bailing out */
                        prc = PtlMEUnlink(meh);
                        LASSERT(prc == PTL_OK);
                        rdma_mdh = PTL_INVALID_HANDLE;
                        goto failed;
                }

                /* I'm not racing with the event callback here.  It's a bug if
                 * there's an event on the MD I just attached before I actually
                 * send the RDMA request message which the event callback
                 * catches by asserting 'rdma_mdh' is valid. */
        }

        /* Bind an MD covering the message buffer itself */
        memset(&md, 0, sizeof(md));
        
        md.start = tx->tx_msg;
        md.length = tx->tx_msg->ptlm_nob;
        md.threshold = 1;
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE;
        md.user_ptr = &tx->tx_msg_eventarg;
        md.eq_handle = kptllnd_data.kptl_eqh;

        prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
        if (prc != PTL_OK) {
                msg_mdh = PTL_INVALID_HANDLE;
                goto failed;
        }
        
        spin_lock_irqsave(&peer->peer_lock, flags);

        tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
        tx->tx_active = 1;
        tx->tx_rdma_mdh = rdma_mdh;
        tx->tx_msg_mdh = msg_mdh;

        /* Ensure HELLO is sent first */
        if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
                list_add(&tx->tx_list, &peer->peer_sendq);
        else
                list_add_tail(&tx->tx_list, &peer->peer_sendq);

        spin_unlock_irqrestore(&peer->peer_lock, flags);
        return;
        
 failed:
        /* Record the handles so kptllnd_tx_fini() can clean them up */
        spin_lock_irqsave(&peer->peer_lock, flags);

        tx->tx_status = -EIO;
        tx->tx_rdma_mdh = rdma_mdh;
        tx->tx_msg_mdh = msg_mdh;

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        kptllnd_tx_decref(tx);
}
561
/* Push queued txs out to 'peer' as the credit protocol allows.  Also posts
 * a NOOP to return credits when we've accumulated enough outstanding ones,
 * and discards a queued NOOP that has become redundant.  On a PtlPut
 * failure the whole peer is closed.  Must not be called from interrupt
 * context (may allocate a tx and issues Portals calls). */
void
kptllnd_peer_check_sends (kptl_peer_t *peer)
{

        kptl_tx_t       *tx;
        int              rc;
        unsigned long    flags;

        LASSERT(!in_interrupt());

        spin_lock_irqsave(&peer->peer_lock, flags);

        if (list_empty(&peer->peer_sendq) &&
            peer->peer_outstanding_credits >= PTLLND_CREDIT_HIGHWATER &&
            peer->peer_credits != 0) {

                /* post a NOOP to return credits */
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
                if (tx == NULL) {
                        CERROR("Can't return credits to %s: can't allocate descriptor\n",
                               libcfs_id2str(peer->peer_id));
                } else {
                        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
                        kptllnd_post_tx(peer, tx);
                }

                spin_lock_irqsave(&peer->peer_lock, flags);
        }

        while (!list_empty(&peer->peer_sendq)) {
                tx = list_entry (peer->peer_sendq.next, kptl_tx_t, tx_list);

                LASSERT (tx->tx_active);
                LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
                /* RDMA requests must have their bulk MD attached already */
                LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
                         !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));

                LASSERT (peer->peer_outstanding_credits >= 0);
                LASSERT (peer->peer_outstanding_credits <= 
                         *kptllnd_tunables.kptl_peercredits);
                LASSERT (peer->peer_credits >= 0);
                LASSERT (peer->peer_credits <= 
                         *kptllnd_tunables.kptl_peercredits);

                /* Ensure HELLO is sent first */
                if (!peer->peer_sent_hello) {
                        if (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_HELLO)
                                break;
                        peer->peer_sent_hello = 1;
                }

                if (peer->peer_credits == 0) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d]: no credits for %p\n",
                               libcfs_id2str(peer->peer_id),
                               peer->peer_credits, peer->peer_outstanding_credits, tx);
                        break;
                }

                /* Don't use the last credit unless I've got credits to
                 * return */
                if (peer->peer_credits == 1 &&
                    peer->peer_outstanding_credits == 0) {
                        CDEBUG(D_NETTRACE, "%s[%d/%d]: not using last credit for %p\n",
                               libcfs_id2str(peer->peer_id),
                               peer->peer_credits, peer->peer_outstanding_credits, tx);
                        break;
                }

                list_del(&tx->tx_list);

                /* Discard any NOOP I queued if I'm not at the high-water mark
                 * any more or more messages have been queued */
                if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
                    (!list_empty(&peer->peer_sendq) ||
                     peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {

                        tx->tx_active = 0;

                        spin_unlock_irqrestore(&peer->peer_lock, flags);

                        CDEBUG(D_NET, "%s: redundant noop\n", 
                               libcfs_id2str(peer->peer_id));
                        kptllnd_tx_decref(tx);

                        spin_lock_irqsave(&peer->peer_lock, flags);
                        continue;
                }

                /* fill last-minute msg header fields */
                kptllnd_msg_pack(tx->tx_msg, peer);

                /* this message piggybacks all credits owed to the peer */
                peer->peer_outstanding_credits = 0;
                peer->peer_credits--;

                CDEBUG(D_NETTRACE, "%s[%d/%d]: %s tx=%p nob=%d cred=%d\n",
                       libcfs_id2str(peer->peer_id),
                       peer->peer_credits, peer->peer_outstanding_credits,
                       kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
                       tx, tx->tx_msg->ptlm_nob,
                       tx->tx_msg->ptlm_credits);

                list_add_tail(&tx->tx_list, &peer->peer_activeq);

                kptllnd_tx_addref(tx);          /* 1 ref for me... */

                /* drop the lock across the Portals call */
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                rc = PtlPut (tx->tx_msg_mdh,
                             PTL_NOACK_REQ,
                             peer->peer_ptlid,
                             *kptllnd_tunables.kptl_portal,
                             0,                 /* acl cookie */
                             LNET_MSG_MATCHBITS,
                             0,                 /* offset */
                             0);                /* header data */
                if (rc != PTL_OK) {
                        CERROR("PtlPut %s error %d\n",
                               libcfs_id2str(peer->peer_id), rc);

                        /* Nuke everything (including this tx) */
                        kptllnd_peer_close(peer, -EIO);
                        return;
                }

                kptllnd_tx_decref(tx);          /* drop my ref */

                spin_lock_irqsave(&peer->peer_lock, flags);
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);
}
695
696 kptl_tx_t *
697 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
698 {
699         kptl_tx_t         *tx;
700         struct list_head  *tmp;
701         unsigned long      flags;
702
703         spin_lock_irqsave(&peer->peer_lock, flags);
704
705         list_for_each(tmp, &peer->peer_sendq) {
706                 tx = list_entry(peer->peer_sendq.next, kptl_tx_t, tx_list);
707
708                 if (time_after_eq(jiffies, tx->tx_deadline)) {
709                         kptllnd_tx_addref(tx);
710                         spin_unlock_irqrestore(&peer->peer_lock, flags);
711                         return tx;
712                 }
713         }
714
715         list_for_each(tmp, &peer->peer_activeq) {
716                 tx = list_entry(peer->peer_activeq.next, kptl_tx_t, tx_list);
717
718                 if (time_after_eq(jiffies, tx->tx_deadline)) {
719                         kptllnd_tx_addref(tx);
720                         spin_unlock_irqrestore(&peer->peer_lock, flags);
721                         return tx;
722                 }
723         }
724
725         spin_unlock_irqrestore(&peer->peer_lock, flags);
726         return NULL;
727 }
728
729
/* Watchdog scan of peer hash bucket 'idx': give each peer a chance to send
 * (e.g. return credits it couldn't return earlier for lack of a tx desc)
 * and close any peer with a timed-out tx.  The bucket scan restarts from
 * the top after each close because the lock is dropped to do it. */
void
kptllnd_peer_check_bucket (int idx)
{
        struct list_head  *peers = &kptllnd_data.kptl_peers[idx];
        struct list_head  *ptmp;
        kptl_peer_t       *peer;
        kptl_tx_t         *tx;
        unsigned long      flags;
        int                nsend;
        int                nactive;

        CDEBUG(D_NET, "Bucket=%d\n", idx);

 again:
        /* NB. Shared lock while I just look */
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kptl_peer_t, peer_list);

                CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d\n",
                       libcfs_id2str(peer->peer_id),
                       peer->peer_credits, peer->peer_outstanding_credits);

                /* In case we have enough credits to return via a
                 * NOOP, but there were no non-blocking tx descs
                 * free to do it last time... */
                kptllnd_peer_check_sends(peer);

                /* NB returns the tx with a ref held for us */
                tx = kptllnd_find_timed_out_tx(peer);
                if (tx == NULL)
                        continue;

                kptllnd_peer_addref(peer); /* 1 ref for me... */

                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                       flags);

                /* snapshot queue depths for the diagnostics below */
                spin_lock_irqsave(&peer->peer_lock, flags);
                nsend = kptllnd_count_queue(&peer->peer_sendq);
                nactive = kptllnd_count_queue(&peer->peer_activeq);
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                LCONSOLE_ERROR("Timing out %s: please check Portals\n",
                               libcfs_id2str(peer->peer_id));

                CERROR("%s timed out: cred %d outstanding %d sendq %d "
                       "activeq %d Tx %s (%s%s%s) status %d T/O %ds\n",
                       libcfs_id2str(peer->peer_id),
                       peer->peer_credits, peer->peer_outstanding_credits,
                       nsend, nactive, kptllnd_tx_typestr(tx->tx_type),
                       tx->tx_active ? "A" : "",
                       PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
                       "" : "M",
                       PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
                       "" : "D",
                       tx->tx_status, *kptllnd_tunables.kptl_timeout);

                kptllnd_dump_ptltrace();

                kptllnd_tx_decref(tx);

                kptllnd_peer_close(peer, -ETIMEDOUT);
                kptllnd_peer_decref(peer); /* ...until here */

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}
801
802 kptl_peer_t *
803 kptllnd_id2peer_locked (lnet_process_id_t id)
804 {
805         struct list_head *peers = kptllnd_nid2peerlist(id.nid);
806         struct list_head *tmp;
807         kptl_peer_t      *peer;
808
809         list_for_each (tmp, peers) {
810
811                 peer = list_entry (tmp, kptl_peer_t, peer_list);
812
813                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
814                         peer->peer_state == PEER_STATE_ACTIVE);
815                 
816                 if (peer->peer_id.nid != id.nid ||
817                     peer->peer_id.pid != id.pid)
818                         continue;
819
820                 kptllnd_peer_addref(peer);
821
822                 CDEBUG(D_NET, "%s -> %s (%d)\n",
823                        libcfs_id2str(id), 
824                        kptllnd_ptlid2str(peer->peer_ptlid),
825                        atomic_read (&peer->peer_refcount));
826                 return peer;
827         }
828
829         return NULL;
830 }
831
/* Emit a console error reporting that peer 'id' cannot be added because the
 * peer table is full, along with the tunables ('max_nodes',
 * 'max_procs_per_node') an administrator can raise to correct it.  'str' is
 * a caller-supplied prefix describing the direction of the connection. */
void
kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
{
        LCONSOLE_ERROR("%s %s overflows the peer table[%d]: "
                       "messages may be dropped\n",
                       str, libcfs_id2str(id),
                       kptllnd_data.kptl_n_active_peers);
        LCONSOLE_ERROR("Please correct by increasing "
                       "'max_nodes' or 'max_procs_per_node'\n");
}
842
843 __u64
844 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
845 {
846         kptl_peer_t            *peer;
847         struct list_head       *tmp;
848
849         /* Find the last matchbits I saw this new peer using.  Note..
850            A. This peer cannot be in the peer table - she's new!
851            B. If I can't find the peer in the closing/zombie peers, all
852               matchbits are safe because all refs to the (old) peer have gone
853               so all txs have completed so there's no risk of matchbit
854               collision!
855          */
856
857         LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
858
859         /* peer's last matchbits can't change after it comes out of the peer
860          * table, so first match is fine */
861
862         list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
863                 peer = list_entry (tmp, kptl_peer_t, peer_list);
864
865                 if (peer->peer_id.nid == lpid.nid &&
866                     peer->peer_id.pid == lpid.pid)
867                         return peer->peer_last_matchbits_seen;
868         }
869         
870         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
871                 peer = list_entry (tmp, kptl_peer_t, peer_list);
872
873                 if (peer->peer_id.nid == lpid.nid &&
874                     peer->peer_id.pid == lpid.pid)
875                         return peer->peer_last_matchbits_seen;
876         }
877         
878         return PTL_RESERVED_MATCHBITS;
879 }
880
/* Handle a HELLO message received from 'initiator'.  Validates the peer's
 * connection parameters, completes a pending handshake if one exists, or
 * allocates a new active peer, inserts it in the peer table and posts a
 * HELLO reply.  Returns the peer with a reference held for the caller, or
 * NULL on refusal/failure.  Must be called without kptl_peer_rw_lock held;
 * the lock is dropped and retaken internally, so all peer-table state is
 * re-checked after each blocking allocation/reservation step. */
kptl_peer_t *
kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
                           kptl_msg_t       *msg)
{
        rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        kptl_peer_t        *peer;
        kptl_peer_t        *new_peer;
        lnet_process_id_t   lpid;
        unsigned long       flags;
        kptl_tx_t          *hello_tx;
        int                 rc;
        __u64               safe_matchbits;
        __u64               last_matchbits_seen;

        /* the peer's LNET identity comes from the message header */
        lpid.nid = msg->ptlm_srcnid;
        lpid.pid = msg->ptlm_srcpid;

        CDEBUG(D_NET, "hello from %s(%s)\n",
               libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));

        if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
            (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
                /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
                 * userspace.  Refuse the connection if she hasn't set the
                 * correct flag in her PID... */
                CERROR("Userflag not set in hello from %s (%s)\n",
                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
                return NULL;
        }
        
        /* kptlhm_matchbits are the highest matchbits my peer may have used to
         * RDMA to me.  I ensure I never register buffers for RDMA that could
         * match any she used */
        safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;

        /* anything below the reserved range indicates a corrupt/bogus HELLO */
        if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
                CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
                       safe_matchbits, libcfs_id2str(lpid));
                return NULL;
        }
        
        /* both sides must agree on buffer sizing or messages would overflow */
        if (msg->ptlm_u.hello.kptlhm_max_msg_size !=
            *kptllnd_tunables.kptl_max_msg_size) {
                CERROR("max message size MUST be equal for all peers: "
                       "got %d expected %d from %s\n",
                       msg->ptlm_u.hello.kptlhm_max_msg_size,
                       *kptllnd_tunables.kptl_max_msg_size,
                       libcfs_id2str(lpid));
                return NULL;
        }

        /* credit accounting must match too (the +1 accounts for the HELLO
         * message's own credit) */
        if (msg->ptlm_credits + 1 != *kptllnd_tunables.kptl_peercredits) {
                CERROR("peercredits MUST be equal on all peers: "
                       "got %d expected %d from %s\n",
                       msg->ptlm_credits + 1,
                       *kptllnd_tunables.kptl_peercredits,
                       libcfs_id2str(lpid));
                return NULL;
        }
        
        write_lock_irqsave(g_lock, flags);

        peer = kptllnd_id2peer_locked(lpid);
        if (peer != NULL) {
                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        /* Completing HELLO handshake */
                        LASSERT(peer->peer_incarnation == 0);

                        peer->peer_state = PEER_STATE_ACTIVE;
                        peer->peer_incarnation = msg->ptlm_srcstamp;
                        peer->peer_next_matchbits = safe_matchbits;

                        /* return the ref taken by id2peer_locked */
                        write_unlock_irqrestore(g_lock, flags);
                        return peer;
                }

                /* remove old incarnation of this peer */
                kptllnd_peer_close_locked(peer, 0);
        }

        kptllnd_cull_peertable_locked(lpid);

        write_unlock_irqrestore(g_lock, flags);

        if (peer != NULL) {
                /* log the reconnect, then drop the ref id2peer_locked took */
                CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
                       " stamp "LPX64"("LPX64")\n",
                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
                       msg->ptlm_srcstamp, peer->peer_incarnation);

                kptllnd_peer_decref(peer);
        }

        /* prepare the HELLO reply before retaking the lock: allocation may
         * block, so the peer table is re-examined afterwards */
        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate HELLO message for %s\n",
                       libcfs_id2str(lpid));
                return NULL;
        }

        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(lpid, initiator);
        if (new_peer == NULL) {
                kptllnd_tx_decref(hello_tx);
                return NULL;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0) {
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);

                CERROR("Failed to reserve buffers for %s\n",
                       libcfs_id2str(lpid));
                return NULL;
        }

        write_lock_irqsave(g_lock, flags);

        /* the lock was dropped above: check whether somebody else
         * instantiated this peer while I was allocating */
        peer = kptllnd_id2peer_locked(lpid);
        if (peer != NULL) {
                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        /* An outgoing message instantiated 'peer' for me and
                         * presumably provoked this reply */
                        CWARN("Outgoing instantiated peer %s\n", libcfs_id2str(lpid));
                        LASSERT(peer->peer_incarnation == 0);

                        peer->peer_state = PEER_STATE_ACTIVE;
                        peer->peer_incarnation = msg->ptlm_srcstamp;
                        peer->peer_next_matchbits = safe_matchbits;
                } else {
                        LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
                        /* WOW!  Somehow this peer completed the HELLO
                         * handshake while I slept.  I guess I could have slept
                         * while it rebooted and sent a new HELLO, so I'll fail
                         * this one... */
                        CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
                        kptllnd_peer_decref(peer);
                        peer = NULL;
                }
                
                write_unlock_irqrestore(g_lock, flags);

                /* my speculative peer/tx/reservation are no longer needed */
                kptllnd_peer_unreserve_buffers();
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);
                return peer;
        }

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection from ", lpid);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Refusing connection from %s\n",
                               libcfs_id2str(lpid));
                        kptllnd_peer_unreserve_buffers();
                        kptllnd_peer_decref(new_peer);
                        kptllnd_tx_decref(hello_tx);
                        return NULL;
                }
                
                /* headroom secured: grow the table and carry on */
                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);

        /* tell the peer which matchbits are safe and my message size */
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;

        new_peer->peer_state = PEER_STATE_ACTIVE;
        new_peer->peer_incarnation = msg->ptlm_srcstamp;
        new_peer->peer_next_matchbits = safe_matchbits;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;

        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        kptllnd_post_tx(new_peer, hello_tx);
        kptllnd_peer_check_sends(new_peer);

        return new_peer;
}
1079
/* Queue 'tx' for transmission to 'target', creating the peer and initiating
 * the HELLO handshake if no connection exists yet.  Consumes the caller's
 * ref on 'tx' in every path: on failure tx->tx_status is set and the tx is
 * dropped (completing tx_lnet_msg with that error).  Takes and releases
 * kptl_peer_rw_lock internally, so peer-table state is re-checked after
 * every step that may block. */
void
kptllnd_tx_launch(kptl_tx_t *tx, lnet_process_id_t target)
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        ptl_process_id_t  ptl_id;
        kptl_peer_t      *peer;
        kptl_peer_t      *new_peer = NULL;
        kptl_tx_t        *hello_tx = NULL;
        unsigned long     flags;
        int               rc;
        __u64             last_matchbits_seen;

        LASSERT (tx->tx_lnet_msg != NULL);
        LASSERT (tx->tx_peer == NULL);

        /* I expect to find the peer, so I only take a read lock... */
        read_lock_irqsave(g_lock, flags);
        peer = kptllnd_id2peer_locked(target);
        read_unlock_irqrestore(g_lock, flags);

        if (peer != NULL) {
                goto post;
        }
        
        /* no peer: I may only initiate connections to kernel ptllnds */
        if ((target.pid & LNET_PID_USERFLAG) != 0) {
                CWARN("Refusing to create a new connection to %s "
                      "(non-kernel peer)\n", libcfs_id2str(target));
                tx->tx_status = -EHOSTUNREACH;
                goto failed;
        }

        /* The new peer is a kernel ptllnd, and kernel ptllnds all have
         * the same portals PID */
        ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
        ptl_id.pid = kptllnd_data.kptl_portals_id.pid;

        write_lock_irqsave(g_lock, flags);

        /* re-check under the write lock: the peer may have appeared since
         * the read-locked lookup above */
        peer = kptllnd_id2peer_locked(target);
        if (peer != NULL) {
                write_unlock_irqrestore(g_lock, flags);
                goto post;
        }
        
        kptllnd_cull_peertable_locked(target);

        write_unlock_irqrestore(g_lock, flags);
                
        /* allocate the HELLO tx and the peer outside the lock (may block) */
        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate connect message for %s\n",
                       libcfs_id2str(target));
                tx->tx_status = -ENOMEM;
                goto failed;
        }

        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(target, ptl_id);
        if (new_peer == NULL) {
                tx->tx_status = -ENOMEM;
                goto failed;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0) {
                tx->tx_status = rc;
                goto failed;
        }

        write_lock_irqsave(g_lock, flags);

        peer = kptllnd_id2peer_locked(target);
        if (peer != NULL) {                     /* someone else beat me to it */
                write_unlock_irqrestore(g_lock, flags);

                /* roll back my speculative reservation/peer/tx */
                kptllnd_peer_unreserve_buffers();
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);
                goto post;
        }
                
        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection to ", target);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Can't create connection to %s\n",
                               libcfs_id2str(target));
                        kptllnd_peer_unreserve_buffers();
                        tx->tx_status = -ENOMEM;
                        goto failed;
                }
                /* headroom secured: grow the table and carry on */
                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);

        /* tell the peer which matchbits are safe and my message size */
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;
                
        new_peer->peer_state = PEER_STATE_WAITING_HELLO;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;
        
        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
               libcfs_id2str(new_peer->peer_id), hello_tx);

        peer = new_peer;
        kptllnd_post_tx(peer, hello_tx);

 post:
        /* queue the caller's tx on the (new or existing) peer and drop the
         * peer ref taken by id2peer_locked/allocate */
        kptllnd_post_tx(peer, tx);
        kptllnd_peer_check_sends(peer);
        kptllnd_peer_decref(peer);
        return;
        
 failed:
        if (hello_tx != NULL)
                kptllnd_tx_decref(hello_tx);

        if (new_peer != NULL)
                kptllnd_peer_decref(new_peer);

        /* completing tx here reports tx_status back to LNET */
        LASSERT (tx->tx_status != 0);
        kptllnd_tx_decref(tx);
        
}