Whamcloud - gitweb
cefcb7d304b55fd88d253990195ac720d2cbbd1b
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_peer.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: PJ Kirner <pjkirner@clusterfs.com>
6  *           E Barton <eeb@bartonsoftware.com>
7  *
8  *   This file is part of the Lustre file system, http://www.lustre.org
9  *   Lustre is a trademark of Cluster File Systems, Inc.
10  *
11  *   This file is confidential source code owned by Cluster File Systems.
12  *   No viewing, modification, compilation, redistribution, or any other
13  *   form of use is permitted except through a signed license agreement.
14  *
15  *   If you have not signed such an agreement, then you have no rights to
16  *   this file.  Please destroy it immediately and contact CFS.
17  *
18  */
19
20 #include "ptllnd.h"
21 #include <libcfs/list.h>
22
23 static int
24 kptllnd_count_queue(struct list_head *q)
25 {
26         struct list_head *e;
27         int               n = 0;
28         
29         list_for_each(e, q) {
30                 n++;
31         }
32
33         return n;
34 }
35
/* Copy a snapshot of the 'index'th peer's state (walking the global
 * peer hash table in bucket order) into the caller's out-parameters.
 *
 * Returns 0 with all out-parameters filled in, or -ENOENT when 'index'
 * is past the last peer.  The matchbits/credit counters and queue
 * lengths are sampled under peer_lock so they are mutually consistent. */
int
kptllnd_get_peer_info(int index, 
                      lnet_process_id_t *id,
                      int *state, int *sent_hello,
                      int *refcount, __u64 *incarnation,
                      __u64 *next_matchbits, __u64 *last_matchbits_seen,
                      int *nsendq, int *nactiveq,
                      int *credits, int *outstanding_credits) 
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        unsigned long     flags;
        struct list_head *ptmp;
        kptl_peer_t      *peer;
        int               i;
        int               rc = -ENOENT;

        read_lock_irqsave(g_lock, flags);

        for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
                
                list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry(ptmp, kptl_peer_t, peer_list);

                        /* skip peers until we reach the requested index */
                        if (index-- > 0)
                                continue;
                        
                        *id          = peer->peer_id;
                        *state       = peer->peer_state;
                        *sent_hello  = peer->peer_sent_hello;
                        *refcount    = atomic_read(&peer->peer_refcount);
                        *incarnation = peer->peer_incarnation;

                        /* plain spin_lock: interrupts are already disabled
                         * by read_lock_irqsave() above */
                        spin_lock(&peer->peer_lock);

                        *next_matchbits      = peer->peer_next_matchbits;
                        *last_matchbits_seen = peer->peer_last_matchbits_seen;
                        *credits             = peer->peer_credits;
                        *outstanding_credits = peer->peer_outstanding_credits;

                        *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
                        *nactiveq = kptllnd_count_queue(&peer->peer_activeq);

                        spin_unlock(&peer->peer_lock);

                        rc = 0;
                        goto out;
                }
        }
        
 out:
        read_unlock_irqrestore(g_lock, flags);
        return rc;
}
89
/* Insert 'peer' into the peer hash table, taking a ref on behalf of the
 * table.  The caller must hold kptl_peer_rw_lock for writing and must
 * already have ensured there is room (see the first LASSERT). */
void
kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
{
        LASSERT (kptllnd_data.kptl_n_active_peers <
                 kptllnd_data.kptl_expected_peers);

        LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
                 peer->peer_state == PEER_STATE_ACTIVE);
        
        kptllnd_data.kptl_n_active_peers++;
        atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */

        /* NB add to HEAD of peer list for MRU order!
         * (see kptllnd_cull_peertable) */
        list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
}
106
/* Make room for a new peer with process-id 'pid': close the least
 * recently added peers sharing pid.nid so that at most
 * (max_procs_per_node - 1) of them remain.  The caller must hold
 * kptl_peer_rw_lock for writing (kptllnd_peer_close_locked below
 * modifies the table). */
void
kptllnd_cull_peertable_locked (lnet_process_id_t pid)
{
        /* I'm about to add a new peer with this portals ID to the peer table,
         * so (a) this peer should not exist already and (b) I want to leave at
         * most (max_procs_per_nid - 1) peers with this NID in the table. */
        struct list_head  *peers = kptllnd_nid2peerlist(pid.nid);
        int                cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
        int                count;
        struct list_head  *tmp;
        struct list_head  *nxt;
        kptl_peer_t       *peer;
        
        count = 0;
        list_for_each_safe (tmp, nxt, peers) {
                /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
                 * in MRU order */
                peer = list_entry(tmp, kptl_peer_t, peer_list);
                        
                if (peer->peer_id.nid != pid.nid)
                        continue;

                /* (a) above: an identical peer must not already exist */
                LASSERT (peer->peer_id.pid != pid.pid);
                        
                count++;

                if (count < cull_count) /* recent (don't cull) */
                        continue;

                CDEBUG(D_NET, "Cull %s(%s)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_ptlid2str(peer->peer_ptlid));
                
                kptllnd_peer_close_locked(peer, 0);
        }
}
143
/* Allocate and initialise a new peer for LNet id 'lpid' / Portals id
 * 'ppid'.  Returns the peer with one ref held for the caller, or NULL
 * on allocation failure or if the LND is shutting down.  The peer is
 * NOT added to the peer table here. */
kptl_peer_t *
kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
{
        unsigned long    flags;
        kptl_peer_t     *peer;

        LIBCFS_ALLOC(peer, sizeof (*peer));
        if (peer == NULL) {
                CERROR("Can't create peer %s (%s)\n",
                       libcfs_id2str(lpid), 
                       kptllnd_ptlid2str(ppid));
                return NULL;
        }

        memset(peer, 0, sizeof(*peer));         /* zero flags etc */

        INIT_LIST_HEAD (&peer->peer_sendq);
        INIT_LIST_HEAD (&peer->peer_activeq);
        spin_lock_init (&peer->peer_lock);

        peer->peer_state = PEER_STATE_ALLOCATED;
        peer->peer_error = 0;
        peer->peer_last_alive = cfs_time_current();
        peer->peer_id = lpid;
        peer->peer_ptlid = ppid;
        peer->peer_credits = 1;                 /* enough for HELLO */
        peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
        peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;

        atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Only increase # peers under lock, to guarantee we dont grow it
         * during shutdown */
        if (kptllnd_data.kptl_shutdown) {
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, 
                                        flags);
                LIBCFS_FREE(peer, sizeof(*peer));
                return NULL;
        }

        kptllnd_data.kptl_npeers++;
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
        
        return peer;
}
191
/* Free 'peer' once its last ref has been dropped.  Zombie peers are
 * still linked on the zombie list, so unlink them under the table
 * write lock before decrementing the global peer count and freeing. */
void
kptllnd_peer_destroy (kptl_peer_t *peer)
{
        unsigned long flags;
        
        CDEBUG(D_NET, "Peer=%p\n", peer);

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&peer->peer_refcount) == 0);
        LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
                 peer->peer_state == PEER_STATE_ZOMBIE);
        LASSERT (list_empty(&peer->peer_sendq));
        LASSERT (list_empty(&peer->peer_activeq));

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (peer->peer_state == PEER_STATE_ZOMBIE)
                list_del(&peer->peer_list);

        kptllnd_data.kptl_npeers--;

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        LIBCFS_FREE (peer, sizeof (*peer));
}
217
/* Cancel every tx queued on 'peer': atomically move both queues onto
 * local list heads and mark each tx inactive under peer_lock, then
 * fail the txs with -EIO and drop the peer's refs outside the lock. */
void
kptllnd_peer_cancel_txs(kptl_peer_t *peer)
{
        struct list_head   sendq;
        struct list_head   activeq;
        struct list_head  *tmp;
        struct list_head  *nxt;
        kptl_tx_t         *tx;
        unsigned long      flags;

        /* atomically grab all the peer's tx-es... */

        spin_lock_irqsave(&peer->peer_lock, flags);

        /* splice: link the local head into the peer's list, then unhook
         * the peer's head, leaving 'sendq' as the new head of the chain */
        list_add(&sendq, &peer->peer_sendq);
        list_del_init(&peer->peer_sendq);
        list_for_each (tmp, &sendq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                tx->tx_active = 0;
        }

        /* same splice trick for the active queue */
        list_add(&activeq, &peer->peer_activeq);
        list_del_init(&peer->peer_activeq);
        list_for_each (tmp, &activeq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                tx->tx_active = 0;
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        /* ...then drop the peer's ref on them at leasure.  This will get
         * kptllnd_tx_fini() to abort outstanding comms if necessary. */

        list_for_each_safe (tmp, nxt, &sendq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);
        }

        list_for_each_safe (tmp, nxt, &activeq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);
                list_del(&tx->tx_list);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);
        }
}
265
/* Record that 'peer' has just been heard from.  No lock is needed: all
 * writers store cfs_time_current(), so any racing winner is acceptable.
 * The mb() pushes the new timestamp out so other CPUs see it promptly. */
void
kptllnd_peer_alive (kptl_peer_t *peer)
{
        /* This is racy, but everyone's only writing cfs_time_current() */
        peer->peer_last_alive = cfs_time_current();
        mb();
}
273
/* If 'peer' has a stashed error, consume it (reset to 0 under
 * peer_lock so it is reported only once) and tell LNet the peer is
 * down, passing the wall-clock time it was last heard from. */
void
kptllnd_peer_notify (kptl_peer_t *peer)
{
        unsigned long flags;
        time_t        last_alive = 0;
        int           error = 0;
        
        spin_lock_irqsave(&peer->peer_lock, flags);

        if (peer->peer_error != 0) {
                error = peer->peer_error;
                peer->peer_error = 0;
                
                /* convert the jiffies-based peer_last_alive into seconds
                 * before the current wall-clock time */
                last_alive = cfs_time_current_sec() - 
                             cfs_duration_sec(cfs_time_current() - 
                                              peer->peer_last_alive);
        }
        
        spin_unlock_irqrestore(&peer->peer_lock, flags);

        if (error != 0)
                lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
                             last_alive);
}
298
/* Process all peers on the closing list: move each onto the zombie
 * list, notify LNet, cancel its queued txs and drop the closing list's
 * ref.  The table write lock is dropped around the per-peer work (which
 * may block) and retaken for the next iteration. */
void
kptllnd_handle_closing_peers ()
{
        unsigned long           flags;
        kptl_peer_t            *peer;
        struct list_head       *tmp;
        struct list_head       *nxt;
        int                     idle;

        /* Check with a read lock first to avoid blocking anyone */

        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        idle = list_empty(&kptllnd_data.kptl_closing_peers);
        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (idle)
                return;

        /* Scan the closing peers and cancel their txs.
         * NB only safe while there is only a single watchdog */

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        list_for_each_safe (tmp, nxt, &kptllnd_data.kptl_closing_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_CLOSING);

                list_del(&peer->peer_list);
                list_add_tail(&peer->peer_list,
                              &kptllnd_data.kptl_zombie_peers);
                peer->peer_state = PEER_STATE_ZOMBIE;

                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

                kptllnd_peer_notify(peer);
                kptllnd_peer_cancel_txs(peer);
                kptllnd_peer_decref(peer);

                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        }

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
}
343
/* Start closing 'peer' with error 'why'.  Caller must hold
 * kptl_peer_rw_lock for writing.  Whatever the starting state, the
 * peer ends up in PEER_STATE_CLOSING on the closing list, where the
 * watchdog (kptllnd_handle_closing_peers) will deal with it. */
void
kptllnd_peer_close_locked(kptl_peer_t *peer, int why)
{
        switch (peer->peer_state) {
        default:
                LBUG();

        case PEER_STATE_WAITING_HELLO:
        case PEER_STATE_ACTIVE:
                /* Removing from peer table */
                kptllnd_data.kptl_n_active_peers--;
                LASSERT (kptllnd_data.kptl_n_active_peers >= 0);

                list_del(&peer->peer_list);
                kptllnd_peer_unreserve_buffers();

                peer->peer_error = why; /* stash 'why' only on first close */

                /* Schedule for immediate attention, taking peer table's ref */
                list_add_tail(&peer->peer_list, 
                              &kptllnd_data.kptl_closing_peers);
                wake_up(&kptllnd_data.kptl_watchdog_waitq);
                break;

        case PEER_STATE_ZOMBIE:
                /* Schedule for attention at next timeout */
                kptllnd_peer_addref(peer);
                list_del(&peer->peer_list);
                list_add_tail(&peer->peer_list, 
                              &kptllnd_data.kptl_closing_peers);
                break;
                
        case PEER_STATE_CLOSING:
                /* already on the closing list; nothing to do */
                break;
        }

        peer->peer_state = PEER_STATE_CLOSING;
}
382
383 void
384 kptllnd_peer_close(kptl_peer_t *peer, int why)
385 {
386         unsigned long      flags;
387
388         write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
389         kptllnd_peer_close_locked(peer, why);
390         write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
391 }
392
/* Close every peer matching 'id'.  LNET_NID_ANY matches all peers;
 * otherwise LNET_PID_ANY matches every pid on the given nid (but a
 * wildcard nid with a specific pid is rejected with -EINVAL).
 *
 * Returns 0 if at least one peer matched, -ENOENT otherwise.  The scan
 * restarts from the top every time the lock is dropped to close a
 * peer, so progress relies on matched peers leaving the table. */
int
kptllnd_peer_del(lnet_process_id_t id)
{
        struct list_head  *ptmp;
        struct list_head  *pnxt;
        kptl_peer_t       *peer;
        int                lo;
        int                hi;
        int                i;
        unsigned long      flags;
        int                rc = -ENOENT;

        /*
         * Find the single bucket we are supposed to look at or if nid is a
         * wildcard (LNET_NID_ANY) then look at all of the buckets
         */
        if (id.nid != LNET_NID_ANY) {
                struct list_head *l = kptllnd_nid2peerlist(id.nid);
                
                lo = hi =  l - kptllnd_data.kptl_peers;
        } else {
                if (id.pid != LNET_PID_ANY)
                        return -EINVAL;
                
                lo = 0;
                hi = kptllnd_data.kptl_peer_hash_size - 1;
        }

again:
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry (ptmp, kptl_peer_t, peer_list);

                        if (!(id.nid == LNET_NID_ANY || 
                              (peer->peer_id.nid == id.nid &&
                               (id.pid == LNET_PID_ANY || 
                                peer->peer_id.pid == id.pid))))
                                continue;

                        kptllnd_peer_addref(peer); /* 1 ref for me... */

                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                               flags);

                        kptllnd_peer_close(peer, 0);
                        kptllnd_peer_decref(peer); /* ...until here */

                        rc = 0;         /* matched something */

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        return (rc);
}
453
/* Queue 'tx' for transmission to 'peer'.  For PUT/GET requests this
 * first allocates fresh matchbits and attaches an ME/MD pair so the
 * peer can RDMA the bulk data; then the message MD is bound and the tx
 * is queued on peer_sendq for kptllnd_peer_check_sends() to send.  On
 * any Portals failure the tx is failed with -EIO and the caller's ref
 * is dropped. */
void
kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx)
{
        /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
        ptl_handle_md_t  rdma_mdh = PTL_INVALID_HANDLE;
        ptl_handle_md_t  msg_mdh = PTL_INVALID_HANDLE;
        ptl_handle_me_t  meh;
        ptl_md_t         md;
        ptl_err_t        prc;
        unsigned long    flags;

        LASSERT (!tx->tx_idle);
        LASSERT (!tx->tx_active);
        LASSERT (PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
        LASSERT (PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
        LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
                 tx->tx_type == TX_TYPE_PUT_REQUEST ||
                 tx->tx_type == TX_TYPE_GET_REQUEST);

        kptllnd_set_tx_peer(tx, peer);

        if (tx->tx_type == TX_TYPE_PUT_REQUEST ||
            tx->tx_type == TX_TYPE_GET_REQUEST) {

                spin_lock_irqsave(&peer->peer_lock, flags);

                /* Assume 64-bit matchbits can't wrap */
                LASSERT (peer->peer_next_matchbits >= PTL_RESERVED_MATCHBITS);
                tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits =
                        peer->peer_next_matchbits++;
                        
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                /* post an ME so the peer can match its RDMA to us */
                prc = PtlMEAttach(kptllnd_data.kptl_nih,
                                  *kptllnd_tunables.kptl_portal,
                                  peer->peer_ptlid,
                                  tx->tx_msg->ptlm_u.rdma.kptlrm_matchbits,
                                  0,             /* ignore bits */
                                  PTL_UNLINK,
                                  PTL_INS_BEFORE,
                                  &meh);
                if (prc != PTL_OK) {
                        CERROR("PtlMEAttach(%s) failed: %d\n",
                               libcfs_id2str(peer->peer_id), prc);
                        goto failed;
                }

                prc = PtlMDAttach(meh, tx->tx_rdma_md, PTL_UNLINK, &rdma_mdh);
                if (prc != PTL_OK) {
                        CERROR("PtlMDAttach(%s) failed: %d\n",
                               libcfs_id2str(tx->tx_peer->peer_id), prc);
                        /* tear down the orphaned ME before bailing */
                        prc = PtlMEUnlink(meh);
                        LASSERT(prc == PTL_OK);
                        rdma_mdh = PTL_INVALID_HANDLE;
                        goto failed;
                }

                /* I'm not racing with the event callback here.  It's a bug if
                 * there's an event on the MD I just attached before I actually
                 * send the RDMA request message which the event callback
                 * catches by asserting 'rdma_mdh' is valid. */
        }

        /* bind an MD covering the message buffer itself */
        memset(&md, 0, sizeof(md));
        
        md.start = tx->tx_msg;
        md.length = tx->tx_msg->ptlm_nob;
        md.threshold = 1;
        md.options = PTL_MD_OP_PUT |
                     PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                     PTL_MD_EVENT_START_DISABLE;
        md.user_ptr = &tx->tx_msg_eventarg;
        md.eq_handle = kptllnd_data.kptl_eqh;

        prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
        if (prc != PTL_OK) {
                msg_mdh = PTL_INVALID_HANDLE;
                goto failed;
        }
        
        spin_lock_irqsave(&peer->peer_lock, flags);

        tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
        tx->tx_active = 1;
        tx->tx_rdma_mdh = rdma_mdh;
        tx->tx_msg_mdh = msg_mdh;

        /* Ensure HELLO is sent first */
        if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_HELLO)
                list_add(&tx->tx_list, &peer->peer_sendq);
        else
                list_add_tail(&tx->tx_list, &peer->peer_sendq);

        spin_unlock_irqrestore(&peer->peer_lock, flags);
        return;
        
 failed:
        /* record the handles so tx_fini can unlink whatever got attached */
        spin_lock_irqsave(&peer->peer_lock, flags);

        tx->tx_status = -EIO;
        tx->tx_rdma_mdh = rdma_mdh;
        tx->tx_msg_mdh = msg_mdh;

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        kptllnd_tx_decref(tx);
}
561
/* Drain 'peer's send queue as far as the credit protocol allows:
 * queue a NOOP if we owe enough credits back, then PtlPut queued
 * messages one at a time, dropping peer_lock around each blocking or
 * re-entrant call.  Sending stops when HELLO hasn't gone out yet, when
 * the peer has no send credits, or when only the last credit remains
 * and we have none to return with it. */
void
kptllnd_peer_check_sends (kptl_peer_t *peer)
{

        kptl_tx_t       *tx;
        int              rc;
        unsigned long    flags;

        LASSERT(!in_interrupt());

        spin_lock_irqsave(&peer->peer_lock, flags);

        if (list_empty(&peer->peer_sendq) &&
            peer->peer_outstanding_credits >= PTLLND_CREDIT_HIGHWATER) {

                /* post a NOOP to return credits */
                spin_unlock_irqrestore(&peer->peer_lock, flags);

                tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
                if (tx == NULL) {
                        CERROR("Can't return credits to %s: can't allocate descriptor\n",
                               libcfs_id2str(peer->peer_id));
                } else {
                        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
                        kptllnd_post_tx(peer, tx);
                }

                spin_lock_irqsave(&peer->peer_lock, flags);
        }

        while (!list_empty(&peer->peer_sendq)) {
                tx = list_entry (peer->peer_sendq.next, kptl_tx_t, tx_list);

                LASSERT (tx->tx_active);
                LASSERT (!PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE));
                LASSERT (tx->tx_type == TX_TYPE_SMALL_MESSAGE ||
                         !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));

                LASSERT (peer->peer_outstanding_credits >= 0);
                LASSERT (peer->peer_outstanding_credits <= 
                         *kptllnd_tunables.kptl_peercredits);
                LASSERT (peer->peer_credits >= 0);
                LASSERT (peer->peer_credits <= 
                         *kptllnd_tunables.kptl_peercredits);

                /* Ensure HELLO is sent first */
                if (!peer->peer_sent_hello) {
                        if (tx->tx_msg->ptlm_type != PTLLND_MSG_TYPE_HELLO)
                                break;
                        peer->peer_sent_hello = 1;
                }

                if (peer->peer_credits == 0) {
                        CDEBUG(D_NET, "%s: no credits\n",
                               libcfs_id2str(peer->peer_id));
                        break;
                }

                /* Don't use the last credit unless I've got credits to
                 * return */
                if (peer->peer_credits == 1 &&
                    peer->peer_outstanding_credits == 0) {
                        CDEBUG(D_NET, "%s: not using last credit\n",
                               libcfs_id2str(peer->peer_id));
                        break;
                }

                list_del(&tx->tx_list);

                /* Discard any NOOP I queued if I'm not at the high-water mark
                 * any more or more messages have been queued */
                if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
                    (!list_empty(&peer->peer_sendq) ||
                     peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {

                        tx->tx_active = 0;

                        spin_unlock_irqrestore(&peer->peer_lock, flags);

                        CDEBUG(D_NET, "%s: redundant noop\n", 
                               libcfs_id2str(peer->peer_id));
                        kptllnd_tx_decref(tx);

                        spin_lock_irqsave(&peer->peer_lock, flags);
                        continue;
                }

                CDEBUG(D_NET, "tx=%p nob=%d to %s(%s)\n",
                       tx, tx->tx_msg->ptlm_nob,
                       libcfs_id2str(peer->peer_id), 
                       kptllnd_ptlid2str(peer->peer_ptlid));

                /* fill last-minute msg header fields */
                kptllnd_msg_pack(tx->tx_msg, peer);

                /* piggyback all owed credits on this message */
                peer->peer_outstanding_credits = 0;
                peer->peer_credits--;

                list_add_tail(&tx->tx_list, &peer->peer_activeq);

                kptllnd_tx_addref(tx);          /* 1 ref for me... */

                spin_unlock_irqrestore(&peer->peer_lock, flags);

                rc = PtlPut (tx->tx_msg_mdh,
                             PTL_NOACK_REQ,
                             peer->peer_ptlid,
                             *kptllnd_tunables.kptl_portal,
                             0,                 /* acl cookie */
                             LNET_MSG_MATCHBITS,
                             0,                 /* offset */
                             0);                /* header data */
                if (rc != PTL_OK) {
                        CERROR("PtlPut %s error %d\n",
                               libcfs_id2str(peer->peer_id), rc);

                        /* Nuke everything (including this tx) */
                        kptllnd_peer_close(peer, -EIO);
                        return;
                }

                kptllnd_tx_decref(tx);          /* drop my ref */

                spin_lock_irqsave(&peer->peer_lock, flags);
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);
}
690
691 kptl_tx_t *
692 kptllnd_find_timed_out_tx(kptl_peer_t *peer)
693 {
694         kptl_tx_t         *tx;
695         struct list_head  *tmp;
696         unsigned long      flags;
697
698         spin_lock_irqsave(&peer->peer_lock, flags);
699
700         list_for_each(tmp, &peer->peer_sendq) {
701                 tx = list_entry(peer->peer_sendq.next, kptl_tx_t, tx_list);
702
703                 if (time_after_eq(jiffies, tx->tx_deadline)) {
704                         kptllnd_tx_addref(tx);
705                         spin_unlock_irqrestore(&peer->peer_lock, flags);
706                         return tx;
707                 }
708         }
709
710         list_for_each(tmp, &peer->peer_activeq) {
711                 tx = list_entry(peer->peer_activeq.next, kptl_tx_t, tx_list);
712
713                 if (time_after_eq(jiffies, tx->tx_deadline)) {
714                         kptllnd_tx_addref(tx);
715                         spin_unlock_irqrestore(&peer->peer_lock, flags);
716                         return tx;
717                 }
718         }
719
720         spin_unlock_irqrestore(&peer->peer_lock, flags);
721         return NULL;
722 }
723
724
725 void
726 kptllnd_peer_check_bucket (int idx)
727 {
728         struct list_head  *peers = &kptllnd_data.kptl_peers[idx];
729         struct list_head  *ptmp;
730         kptl_peer_t       *peer;
731         kptl_tx_t         *tx;
732         unsigned long      flags;
733         int                nsend;
734         int                nactive;
735
736         CDEBUG(D_NET, "Bucket=%d\n", idx);
737
738  again:
739         /* NB. Shared lock while I just look */
740         read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
741
742         list_for_each (ptmp, peers) {
743                 peer = list_entry (ptmp, kptl_peer_t, peer_list);
744
745                 CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d\n",
746                        libcfs_id2str(peer->peer_id),
747                        peer->peer_credits, peer->peer_outstanding_credits);
748
749                 /* In case we have enough credits to return via a
750                  * NOOP, but there were no non-blocking tx descs
751                  * free to do it last time... */
752                 kptllnd_peer_check_sends(peer);
753
754                 tx = kptllnd_find_timed_out_tx(peer);
755                 if (tx == NULL)
756                         continue;
757
758                 kptllnd_peer_addref(peer); /* 1 ref for me... */
759
760                 read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
761                                        flags);
762
763                 spin_lock_irqsave(&peer->peer_lock, flags);
764                 nsend = kptllnd_count_queue(&peer->peer_sendq);
765                 nactive = kptllnd_count_queue(&peer->peer_activeq);
766                 spin_unlock_irqrestore(&peer->peer_lock, flags);
767
768                 LCONSOLE_ERROR("Timing out %s: please check Portals\n",
769                                libcfs_id2str(peer->peer_id));
770
771                 CERROR("%s timed out: cred %d outstanding %d sendq %d "
772                        "activeq %d Tx %s (%s%s%s) status %d T/O %ds\n",
773                        libcfs_id2str(peer->peer_id),
774                        peer->peer_credits, peer->peer_outstanding_credits,
775                        nsend, nactive, kptllnd_tx_typestr(tx->tx_type),
776                        tx->tx_active ? "A" : "",
777                        PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
778                        "" : "M",
779                        PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
780                        "" : "D",
781                        tx->tx_status, *kptllnd_tunables.kptl_timeout);
782
783                 kptllnd_dump_ptltrace();
784
785                 kptllnd_tx_decref(tx);
786
787                 kptllnd_peer_close(peer, -ETIMEDOUT);
788                 kptllnd_peer_decref(peer); /* ...until here */
789
790                 /* start again now I've dropped the lock */
791                 goto again;
792         }
793
794         read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
795 }
796
797 kptl_peer_t *
798 kptllnd_id2peer_locked (lnet_process_id_t id)
799 {
800         struct list_head *peers = kptllnd_nid2peerlist(id.nid);
801         struct list_head *tmp;
802         kptl_peer_t      *peer;
803
804         list_for_each (tmp, peers) {
805
806                 peer = list_entry (tmp, kptl_peer_t, peer_list);
807
808                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO ||
809                         peer->peer_state == PEER_STATE_ACTIVE);
810                 
811                 if (peer->peer_id.nid != id.nid ||
812                     peer->peer_id.pid != id.pid)
813                         continue;
814
815                 kptllnd_peer_addref(peer);
816
817                 CDEBUG(D_NET, "%s -> %s (%d)\n",
818                        libcfs_id2str(id), 
819                        kptllnd_ptlid2str(peer->peer_ptlid),
820                        atomic_read (&peer->peer_refcount));
821                 return peer;
822         }
823
824         return NULL;
825 }
826
/* Log a console error reporting that adding the peer 'id' would overflow
 * the peer table ('str' prefixes the message, e.g. "Connection from "),
 * and tell the administrator which tunables to raise.  Reads the current
 * active-peer count without locking; the value is advisory only. */
void
kptllnd_peertable_overflow_msg(char *str, lnet_process_id_t id)
{
        LCONSOLE_ERROR("%s %s overflows the peer table[%d]: "
                       "messages may be dropped\n",
                       str, libcfs_id2str(id),
                       kptllnd_data.kptl_n_active_peers);
        LCONSOLE_ERROR("Please correct by increasing "
                       "'max_nodes' or 'max_procs_per_node'\n");
}
837
838 __u64
839 kptllnd_get_last_seen_matchbits_locked(lnet_process_id_t lpid)
840 {
841         kptl_peer_t            *peer;
842         struct list_head       *tmp;
843
844         /* Find the last matchbits I saw this new peer using.  Note..
845            A. This peer cannot be in the peer table - she's new!
846            B. If I can't find the peer in the closing/zombie peers, all
847               matchbits are safe because all refs to the (old) peer have gone
848               so all txs have completed so there's no risk of matchbit
849               collision!
850          */
851
852         LASSERT(kptllnd_id2peer_locked(lpid) == NULL);
853
854         /* peer's last matchbits can't change after it comes out of the peer
855          * table, so first match is fine */
856
857         list_for_each (tmp, &kptllnd_data.kptl_closing_peers) {
858                 peer = list_entry (tmp, kptl_peer_t, peer_list);
859
860                 if (peer->peer_id.nid == lpid.nid &&
861                     peer->peer_id.pid == lpid.pid)
862                         return peer->peer_last_matchbits_seen;
863         }
864         
865         list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
866                 peer = list_entry (tmp, kptl_peer_t, peer_list);
867
868                 if (peer->peer_id.nid == lpid.nid &&
869                     peer->peer_id.pid == lpid.pid)
870                         return peer->peer_last_matchbits_seen;
871         }
872         
873         return PTL_RESERVED_MATCHBITS;
874 }
875
/* Handle an incoming HELLO message from 'initiator' (the portals-level
 * sender identity) carrying 'msg'.
 *
 * Validates the sender (userspace flag, matchbits, max message size and
 * peer credits must agree with our tunables), then either completes an
 * in-progress handshake on an existing peer, or tears down a stale
 * incarnation and installs a brand-new ACTIVE peer, replying with our
 * own HELLO.
 *
 * Returns a peer with a reference held for the caller, or NULL on
 * rejection/allocation failure.
 *
 * Locking: kptl_peer_rw_lock is taken and dropped several times; every
 * window where the lock is dropped re-checks the table afterwards, since
 * another thread may have instantiated the same peer meanwhile. */
kptl_peer_t *
kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
                           kptl_msg_t       *msg)
{
        rwlock_t           *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        kptl_peer_t        *peer;
        kptl_peer_t        *new_peer;
        lnet_process_id_t   lpid;
        unsigned long       flags;
        kptl_tx_t          *hello_tx;
        int                 rc;
        __u64               safe_matchbits;
        __u64               last_matchbits_seen;

        /* LNET-level identity of the sender, taken from the message body */
        lpid.nid = msg->ptlm_srcnid;
        lpid.pid = msg->ptlm_srcpid;

        CDEBUG(D_NET, "hello from %s(%s)\n",
               libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));

        if (initiator.pid != kptllnd_data.kptl_portals_id.pid &&
            (msg->ptlm_srcpid & LNET_PID_USERFLAG) == 0) {
                /* If the peer's PID isn't _the_ ptllnd kernel pid, she must be
                 * userspace.  Refuse the connection if she hasn't set the
                 * correct flag in her PID... */
                CERROR("Userflag not set in hello from %s (%s)\n",
                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator));
                return NULL;
        }

        /* kptlhm_matchbits are the highest matchbits my peer may have used to
         * RDMA to me.  I ensure I never register buffers for RDMA that could
         * match any she used */
        safe_matchbits = msg->ptlm_u.hello.kptlhm_matchbits + 1;

        if (safe_matchbits < PTL_RESERVED_MATCHBITS) {
                CERROR("Illegal matchbits "LPX64" in HELLO from %s\n",
                       safe_matchbits, libcfs_id2str(lpid));
                return NULL;
        }

        /* protocol parameters are global; mismatches are fatal to the
         * connection attempt */
        if (msg->ptlm_u.hello.kptlhm_max_msg_size !=
            *kptllnd_tunables.kptl_max_msg_size) {
                CERROR("max message size MUST be equal for all peers: "
                       "got %d expected %d from %s\n",
                       msg->ptlm_u.hello.kptlhm_max_msg_size,
                       *kptllnd_tunables.kptl_max_msg_size,
                       libcfs_id2str(lpid));
                return NULL;
        }

        if (msg->ptlm_credits + 1 != *kptllnd_tunables.kptl_peercredits) {
                CERROR("peercredits MUST be equal on all peers: "
                       "got %d expected %d from %s\n",
                       msg->ptlm_credits + 1,
                       *kptllnd_tunables.kptl_peercredits,
                       libcfs_id2str(lpid));
                return NULL;
        }

        write_lock_irqsave(g_lock, flags);

        peer = kptllnd_id2peer_locked(lpid);
        if (peer != NULL) {
                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        /* Completing HELLO handshake */
                        LASSERT(peer->peer_incarnation == 0);

                        peer->peer_state = PEER_STATE_ACTIVE;
                        peer->peer_incarnation = msg->ptlm_srcstamp;
                        peer->peer_next_matchbits = safe_matchbits;

                        /* return with the ref id2peer took for us */
                        write_unlock_irqrestore(g_lock, flags);
                        return peer;
                }

                /* remove old incarnation of this peer */
                kptllnd_peer_close_locked(peer, 0);
        }

        kptllnd_cull_peertable_locked(lpid);

        write_unlock_irqrestore(g_lock, flags);

        if (peer != NULL) {
                /* 'peer' was the old incarnation, closed above; our lookup
                 * ref keeps it valid for this debug print, then drop it */
                CDEBUG(D_NET, "Peer %s (%s) reconnecting:"
                       " stamp "LPX64"("LPX64")\n",
                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),
                       msg->ptlm_srcstamp, peer->peer_incarnation);

                kptllnd_peer_decref(peer);
        }

        /* Allocate everything needed for the reply HELLO before re-taking
         * the lock, so the locked section below can't fail on allocation */
        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate HELLO message for %s\n",
                       libcfs_id2str(lpid));
                return NULL;
        }

        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(lpid, initiator);
        if (new_peer == NULL) {
                kptllnd_tx_decref(hello_tx);
                return NULL;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0) {
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);

                CERROR("Failed to reserve buffers for %s\n",
                       libcfs_id2str(lpid));
                return NULL;
        }

        write_lock_irqsave(g_lock, flags);

        /* Re-check: the lock was dropped, so someone may have created this
         * peer while we were allocating */
        peer = kptllnd_id2peer_locked(lpid);
        if (peer != NULL) {
                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
                        /* An outgoing message instantiated 'peer' for me and
                         * presumably provoked this reply */
                        CWARN("Outgoing instantiated peer %s\n", libcfs_id2str(lpid));
                        LASSERT(peer->peer_incarnation == 0);

                        peer->peer_state = PEER_STATE_ACTIVE;
                        peer->peer_incarnation = msg->ptlm_srcstamp;
                        peer->peer_next_matchbits = safe_matchbits;
                } else {
                        LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
                        /* WOW!  Somehow this peer completed the HELLO
                         * handshake while I slept.  I guess I could have slept
                         * while it rebooted and sent a new HELLO, so I'll fail
                         * this one... */
                        CWARN("Wow! peer %s\n", libcfs_id2str(lpid));
                        kptllnd_peer_decref(peer);
                        peer = NULL;
                }

                write_unlock_irqrestore(g_lock, flags);

                /* discard everything allocated speculatively above */
                kptllnd_peer_unreserve_buffers();
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);
                return peer;
        }

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection from ", lpid);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Refusing connection from %s\n",
                               libcfs_id2str(lpid));
                        kptllnd_peer_unreserve_buffers();
                        kptllnd_peer_decref(new_peer);
                        kptllnd_tx_decref(hello_tx);
                        return NULL;
                }

                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);

        /* tell the peer the highest matchbits any old incarnation of her
         * may have seen, so she starts above them */
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;

        /* incoming HELLO completes the handshake immediately: go ACTIVE */
        new_peer->peer_state = PEER_STATE_ACTIVE;
        new_peer->peer_incarnation = msg->ptlm_srcstamp;
        new_peer->peer_next_matchbits = safe_matchbits;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;

        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        kptllnd_post_tx(new_peer, hello_tx);
        kptllnd_peer_check_sends(new_peer);

        return new_peer;
}
1071
/* Queue 'tx' for transmission to 'target', creating the peer (and sending
 * a HELLO to start the handshake) if it doesn't exist yet.
 *
 * Consumes the caller's reference on 'tx' in all cases: on success it is
 * handed to post_tx; on failure it is decref'd with tx->tx_status set to
 * the error.
 *
 * Locking: first tries an optimistic read-locked lookup (the common case
 * is an existing peer); only falls back to write-locked peer creation,
 * re-checking the table after every window where the lock was dropped. */
void
kptllnd_tx_launch(kptl_tx_t *tx, lnet_process_id_t target)
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        ptl_process_id_t  ptl_id;
        kptl_peer_t      *peer;
        kptl_peer_t      *new_peer = NULL;
        kptl_tx_t        *hello_tx = NULL;
        unsigned long     flags;
        int               rc;
        __u64             last_matchbits_seen;

        LASSERT (tx->tx_lnet_msg != NULL);
        LASSERT (tx->tx_peer == NULL);

        /* I expect to find the peer, so I only take a read lock... */
        read_lock_irqsave(g_lock, flags);
        peer = kptllnd_id2peer_locked(target);
        read_unlock_irqrestore(g_lock, flags);

        if (peer != NULL) {
                goto post;
        }

        /* we never initiate connections to userspace peers */
        if ((target.pid & LNET_PID_USERFLAG) != 0) {
                CWARN("Refusing to create a new connection to %s "
                      "(non-kernel peer)\n", libcfs_id2str(target));
                tx->tx_status = -EHOSTUNREACH;
                goto failed;
        }

        /* The new peer is a kernel ptllnd, and kernel ptllnds all have
         * the same portals PID */
        ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
        ptl_id.pid = kptllnd_data.kptl_portals_id.pid;

        write_lock_irqsave(g_lock, flags);

        /* re-check under the write lock before creating */
        peer = kptllnd_id2peer_locked(target);
        if (peer != NULL) {
                write_unlock_irqrestore(g_lock, flags);
                goto post;
        }

        kptllnd_cull_peertable_locked(target);

        write_unlock_irqrestore(g_lock, flags);

        /* allocate the HELLO tx and the new peer outside the lock */
        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (hello_tx == NULL) {
                CERROR("Unable to allocate connect message for %s\n",
                       libcfs_id2str(target));
                tx->tx_status = -ENOMEM;
                goto failed;
        }

        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
                         sizeof(kptl_hello_msg_t));

        new_peer = kptllnd_peer_allocate(target, ptl_id);
        if (new_peer == NULL) {
                tx->tx_status = -ENOMEM;
                goto failed;
        }

        rc = kptllnd_peer_reserve_buffers();
        if (rc != 0) {
                tx->tx_status = rc;
                goto failed;
        }

        write_lock_irqsave(g_lock, flags);

        peer = kptllnd_id2peer_locked(target);
        if (peer != NULL) {                     /* someone else beat me to it */
                write_unlock_irqrestore(g_lock, flags);

                /* discard everything allocated speculatively above and use
                 * the winner's peer instead */
                kptllnd_peer_unreserve_buffers();
                kptllnd_peer_decref(new_peer);
                kptllnd_tx_decref(hello_tx);
                goto post;
        }

        if (kptllnd_data.kptl_n_active_peers ==
            kptllnd_data.kptl_expected_peers) {
                /* peer table full */
                write_unlock_irqrestore(g_lock, flags);

                kptllnd_peertable_overflow_msg("Connection to ", target);

                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */
                if (rc != 0) {
                        CERROR("Can't create connection to %s\n",
                               libcfs_id2str(target));
                        kptllnd_peer_unreserve_buffers();
                        tx->tx_status = -ENOMEM;
                        goto failed;
                }
                write_lock_irqsave(g_lock, flags);
                kptllnd_data.kptl_expected_peers++;
        }

        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);

        /* tell the peer the highest matchbits any old incarnation of her
         * may have seen, so she starts above them */
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                *kptllnd_tunables.kptl_max_msg_size;

        /* outgoing HELLO: handshake completes when the reply arrives */
        new_peer->peer_state = PEER_STATE_WAITING_HELLO;
        new_peer->peer_last_matchbits_seen = last_matchbits_seen;

        kptllnd_peer_add_peertable_locked(new_peer);

        write_unlock_irqrestore(g_lock, flags);

        /* NB someone else could get in now and post a message before I post
         * the HELLO, but post_tx/check_sends take care of that! */

        peer = new_peer;
        kptllnd_post_tx(peer, hello_tx);

 post:
        /* 'peer' holds a ref from the lookup (or is new_peer's initial ref);
         * drop it once the tx is queued and sends have been kicked */
        kptllnd_post_tx(peer, tx);
        kptllnd_peer_check_sends(peer);
        kptllnd_peer_decref(peer);
        return;

 failed:
        if (hello_tx != NULL)
                kptllnd_tx_decref(hello_tx);

        if (new_peer != NULL)
                kptllnd_peer_decref(new_peer);

        /* completes 'tx' with the error recorded in tx_status */
        LASSERT (tx->tx_status != 0);
        kptllnd_tx_decref(tx);

}