Whamcloud - gitweb
LU-7734 lnet: configure peers from DLC
[fs/lustre-release.git] / lnet / lnet / peer.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2014, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/lnet/peer.c
33  */
34
35 #define DEBUG_SUBSYSTEM S_LNET
36
37 #include <lnet/lib-lnet.h>
38 #include <lnet/lib-dlc.h>
39
40 static void
41 lnet_peer_remove_from_remote_list(struct lnet_peer_ni *lpni)
42 {
43         if (!list_empty(&lpni->lpni_on_remote_peer_ni_list)) {
44                 list_del_init(&lpni->lpni_on_remote_peer_ni_list);
45                 lnet_peer_ni_decref_locked(lpni);
46         }
47 }
48
49 static void
50 lnet_peer_tables_destroy(void)
51 {
52         struct lnet_peer_table  *ptable;
53         struct list_head        *hash;
54         int                     i;
55         int                     j;
56
57         if (!the_lnet.ln_peer_tables)
58                 return;
59
60         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
61                 hash = ptable->pt_hash;
62                 if (!hash) /* not intialized */
63                         break;
64
65                 ptable->pt_hash = NULL;
66                 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
67                         LASSERT(list_empty(&hash[j]));
68
69                 LIBCFS_FREE(hash, LNET_PEER_HASH_SIZE * sizeof(*hash));
70         }
71
72         cfs_percpt_free(the_lnet.ln_peer_tables);
73         the_lnet.ln_peer_tables = NULL;
74 }
75
76 int
77 lnet_peer_tables_create(void)
78 {
79         struct lnet_peer_table  *ptable;
80         struct list_head        *hash;
81         int                     i;
82         int                     j;
83
84         the_lnet.ln_peer_tables = cfs_percpt_alloc(lnet_cpt_table(),
85                                                    sizeof(*ptable));
86         if (the_lnet.ln_peer_tables == NULL) {
87                 CERROR("Failed to allocate cpu-partition peer tables\n");
88                 return -ENOMEM;
89         }
90
91         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
92                 LIBCFS_CPT_ALLOC(hash, lnet_cpt_table(), i,
93                                  LNET_PEER_HASH_SIZE * sizeof(*hash));
94                 if (hash == NULL) {
95                         CERROR("Failed to create peer hash table\n");
96                         lnet_peer_tables_destroy();
97                         return -ENOMEM;
98                 }
99
100                 for (j = 0; j < LNET_PEER_HASH_SIZE; j++)
101                         INIT_LIST_HEAD(&hash[j]);
102                 ptable->pt_hash = hash; /* sign of initialization */
103         }
104
105         return 0;
106 }
107
108 void lnet_peer_uninit()
109 {
110         int cpt;
111         struct lnet_peer_ni *lpni, *tmp;
112         struct lnet_peer_table *ptable = NULL;
113
114         /* remove all peer_nis from the remote peer and he hash list */
115         list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
116                                  lpni_on_remote_peer_ni_list) {
117                 list_del_init(&lpni->lpni_on_remote_peer_ni_list);
118                 lnet_peer_ni_decref_locked(lpni);
119
120                 cpt = lnet_cpt_of_nid_locked(lpni->lpni_nid, NULL);
121                 ptable = the_lnet.ln_peer_tables[cpt];
122                 ptable->pt_zombies++;
123
124                 list_del_init(&lpni->lpni_hashlist);
125                 lnet_peer_ni_decref_locked(lpni);
126         }
127
128         lnet_peer_tables_destroy();
129 }
130
131 static void
132 lnet_peer_table_cleanup_locked(lnet_ni_t *ni, struct lnet_peer_table *ptable)
133 {
134         int                      i;
135         struct lnet_peer_ni     *lp;
136         struct lnet_peer_ni     *tmp;
137
138         for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
139                 list_for_each_entry_safe(lp, tmp, &ptable->pt_hash[i],
140                                          lpni_hashlist) {
141                         if (ni != NULL && ni->ni_net != lp->lpni_net)
142                                 continue;
143                         list_del_init(&lp->lpni_hashlist);
144                         /* Lose hash table's ref */
145                         ptable->pt_zombies++;
146                         lnet_peer_ni_decref_locked(lp);
147                 }
148         }
149 }
150
151 static void
152 lnet_peer_table_finalize_wait_locked(struct lnet_peer_table *ptable,
153                                      int cpt_locked)
154 {
155         int     i;
156
157         for (i = 3; ptable->pt_zombies != 0; i++) {
158                 lnet_net_unlock(cpt_locked);
159
160                 if (IS_PO2(i)) {
161                         CDEBUG(D_WARNING,
162                                "Waiting for %d zombies on peer table\n",
163                                ptable->pt_zombies);
164                 }
165                 set_current_state(TASK_UNINTERRUPTIBLE);
166                 schedule_timeout(cfs_time_seconds(1) >> 1);
167                 lnet_net_lock(cpt_locked);
168         }
169 }
170
171 static void
172 lnet_peer_table_del_rtrs_locked(lnet_ni_t *ni, struct lnet_peer_table *ptable,
173                                 int cpt_locked)
174 {
175         struct lnet_peer_ni     *lp;
176         struct lnet_peer_ni     *tmp;
177         lnet_nid_t              lpni_nid;
178         int                     i;
179
180         for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
181                 list_for_each_entry_safe(lp, tmp, &ptable->pt_hash[i],
182                                          lpni_hashlist) {
183                         if (ni->ni_net != lp->lpni_net)
184                                 continue;
185
186                         if (lp->lpni_rtr_refcount == 0)
187                                 continue;
188
189                         lpni_nid = lp->lpni_nid;
190
191                         lnet_net_unlock(cpt_locked);
192                         lnet_del_route(LNET_NIDNET(LNET_NID_ANY), lpni_nid);
193                         lnet_net_lock(cpt_locked);
194                 }
195         }
196 }
197
198 void
199 lnet_peer_tables_cleanup(lnet_ni_t *ni)
200 {
201         int                             i;
202         struct lnet_peer_table          *ptable;
203
204         LASSERT(the_lnet.ln_shutdown || ni != NULL);
205         /* If just deleting the peers for a NI, get rid of any routes these
206          * peers are gateways for. */
207         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
208                 lnet_net_lock(LNET_LOCK_EX);
209                 lnet_peer_table_del_rtrs_locked(ni, ptable, i);
210                 lnet_net_unlock(LNET_LOCK_EX);
211         }
212
213         /* Start the cleanup process */
214         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
215                 lnet_net_lock(LNET_LOCK_EX);
216                 lnet_peer_table_cleanup_locked(ni, ptable);
217                 lnet_net_unlock(LNET_LOCK_EX);
218         }
219
220         /* Wait until all peers have been destroyed. */
221         cfs_percpt_for_each(ptable, i, the_lnet.ln_peer_tables) {
222                 lnet_net_lock(LNET_LOCK_EX);
223                 lnet_peer_table_finalize_wait_locked(ptable, i);
224                 lnet_net_unlock(LNET_LOCK_EX);
225         }
226 }
227
228 static struct lnet_peer_ni *
229 lnet_get_peer_ni_locked(struct lnet_peer_table *ptable, lnet_nid_t nid)
230 {
231         struct list_head        *peers;
232         struct lnet_peer_ni     *lp;
233
234         LASSERT(!the_lnet.ln_shutdown);
235
236         peers = &ptable->pt_hash[lnet_nid2peerhash(nid)];
237         list_for_each_entry(lp, peers, lpni_hashlist) {
238                 if (lp->lpni_nid == nid) {
239                         lnet_peer_ni_addref_locked(lp);
240                         return lp;
241                 }
242         }
243
244         return NULL;
245 }
246
247 struct lnet_peer_ni *
248 lnet_find_peer_ni_locked(lnet_nid_t nid)
249 {
250         struct lnet_peer_ni *lpni;
251         struct lnet_peer_table *ptable;
252         int cpt;
253
254         cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
255
256         ptable = the_lnet.ln_peer_tables[cpt];
257         lpni = lnet_get_peer_ni_locked(ptable, nid);
258
259         return lpni;
260 }
261
262 int
263 lnet_find_or_create_peer_locked(lnet_nid_t dst_nid, int cpt, struct lnet_peer **peer)
264 {
265         struct lnet_peer_ni *lpni;
266
267         lpni = lnet_find_peer_ni_locked(dst_nid);
268         if (!lpni) {
269                 int rc;
270                 rc = lnet_nid2peerni_locked(&lpni, dst_nid, cpt);
271                 if (rc != 0)
272                         return rc;
273         }
274
275         *peer = lpni->lpni_peer_net->lpn_peer;
276         lnet_peer_ni_decref_locked(lpni);
277
278         return 0;
279 }
280
281 struct lnet_peer_ni *
282 lnet_get_peer_ni_idx_locked(int idx, struct lnet_peer_net **lpn,
283                             struct lnet_peer **lp)
284 {
285         struct lnet_peer_ni     *lpni;
286
287         list_for_each_entry((*lp), &the_lnet.ln_peers, lp_on_lnet_peer_list) {
288                 list_for_each_entry((*lpn), &((*lp)->lp_peer_nets), lpn_on_peer_list) {
289                         list_for_each_entry(lpni, &((*lpn)->lpn_peer_nis),
290                                             lpni_on_peer_net_list)
291                                 if (idx-- == 0)
292                                         return lpni;
293                 }
294         }
295
296         return NULL;
297 }
298
299 struct lnet_peer_ni *
300 lnet_get_next_peer_ni_locked(struct lnet_peer *peer,
301                              struct lnet_peer_net *peer_net,
302                              struct lnet_peer_ni *prev)
303 {
304         struct lnet_peer_ni *lpni;
305         struct lnet_peer_net *net = peer_net;
306
307         if (!prev) {
308                 if (!net)
309                         net = list_entry(peer->lp_peer_nets.next,
310                                          struct lnet_peer_net,
311                                          lpn_on_peer_list);
312                 lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
313                                   lpni_on_peer_net_list);
314
315                 return lpni;
316         }
317
318         if (prev->lpni_on_peer_net_list.next ==
319             &prev->lpni_peer_net->lpn_peer_nis) {
320                 /*
321                  * if you reached the end of the peer ni list and the peer
322                  * net is specified then there are no more peer nis in that
323                  * net.
324                  */
325                 if (net)
326                         return NULL;
327
328                 /*
329                  * we reached the end of this net ni list. move to the
330                  * next net
331                  */
332                 if (prev->lpni_peer_net->lpn_on_peer_list.next ==
333                     &peer->lp_peer_nets)
334                         /* no more nets and no more NIs. */
335                         return NULL;
336
337                 /* get the next net */
338                 net = list_entry(prev->lpni_peer_net->lpn_on_peer_list.next,
339                                  struct lnet_peer_net,
340                                  lpn_on_peer_list);
341                 /* get the ni on it */
342                 lpni = list_entry(net->lpn_peer_nis.next, struct lnet_peer_ni,
343                                   lpni_on_peer_net_list);
344
345                 return lpni;
346         }
347
348         /* there are more nis left */
349         lpni = list_entry(prev->lpni_on_peer_net_list.next,
350                           struct lnet_peer_ni, lpni_on_peer_net_list);
351
352         return lpni;
353 }
354
355 bool
356 lnet_peer_is_ni_pref_locked(struct lnet_peer_ni *lpni, struct lnet_ni *ni)
357 {
358         int i;
359
360         for (i = 0; i < lpni->lpni_pref_nnids; i++) {
361                 if (lpni->lpni_pref_nids[i] == ni->ni_nid)
362                         return true;
363         }
364         return false;
365 }
366
367 static void
368 lnet_try_destroy_peer_hierarchy_locked(struct lnet_peer_ni *lpni)
369 {
370         struct lnet_peer_net *peer_net;
371         struct lnet_peer *peer;
372
373         /* TODO: could the below situation happen? accessing an already
374          * destroyed peer? */
375         if (lpni->lpni_peer_net == NULL ||
376             lpni->lpni_peer_net->lpn_peer == NULL)
377                 return;
378
379         peer_net = lpni->lpni_peer_net;
380         peer = lpni->lpni_peer_net->lpn_peer;
381
382         list_del_init(&lpni->lpni_on_peer_net_list);
383         lpni->lpni_peer_net = NULL;
384
385         /* if peer_net is empty, then remove it from the peer */
386         if (list_empty(&peer_net->lpn_peer_nis)) {
387                 list_del_init(&peer_net->lpn_on_peer_list);
388                 peer_net->lpn_peer = NULL;
389                 LIBCFS_FREE(peer_net, sizeof(*peer_net));
390
391                 /* if the peer is empty then remove it from the
392                  * the_lnet.ln_peers */
393                 if (list_empty(&peer->lp_peer_nets)) {
394                         list_del_init(&peer->lp_on_lnet_peer_list);
395                         LIBCFS_FREE(peer, sizeof(*peer));
396                 }
397         }
398 }
399
400 static int
401 lnet_build_peer_hierarchy(struct lnet_peer_ni *lpni)
402 {
403         struct lnet_peer *peer;
404         struct lnet_peer_net *peer_net;
405         __u32 lpni_net = LNET_NIDNET(lpni->lpni_nid);
406
407         peer = NULL;
408         peer_net = NULL;
409
410         LIBCFS_ALLOC(peer, sizeof(*peer));
411         if (peer == NULL)
412                 return -ENOMEM;
413
414         LIBCFS_ALLOC(peer_net, sizeof(*peer_net));
415         if (peer_net == NULL) {
416                 LIBCFS_FREE(peer, sizeof(*peer));
417                 return -ENOMEM;
418         }
419
420         INIT_LIST_HEAD(&peer->lp_on_lnet_peer_list);
421         INIT_LIST_HEAD(&peer->lp_peer_nets);
422         INIT_LIST_HEAD(&peer_net->lpn_on_peer_list);
423         INIT_LIST_HEAD(&peer_net->lpn_peer_nis);
424
425         /* build the hierarchy */
426         peer_net->lpn_net_id = lpni_net;
427         peer_net->lpn_peer = peer;
428         lpni->lpni_peer_net = peer_net;
429         peer->lp_primary_nid = lpni->lpni_nid;
430         list_add_tail(&peer_net->lpn_on_peer_list, &peer->lp_peer_nets);
431         list_add_tail(&lpni->lpni_on_peer_net_list, &peer_net->lpn_peer_nis);
432         list_add_tail(&peer->lp_on_lnet_peer_list, &the_lnet.ln_peers);
433
434         return 0;
435 }
436
437 struct lnet_peer_net *
438 lnet_peer_get_net_locked(struct lnet_peer *peer, __u32 net_id)
439 {
440         struct lnet_peer_net *peer_net;
441         list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_on_peer_list) {
442                 if (peer_net->lpn_net_id == net_id)
443                         return peer_net;
444         }
445         return NULL;
446 }
447
448 /*
449  * given the key nid find the peer to add the new peer NID to. If the key
450  * nid is NULL, then create a new peer, but first make sure that the NID
451  * is unique
452  */
453 int
454 lnet_add_peer_ni_to_peer(lnet_nid_t key_nid, lnet_nid_t nid)
455 {
456         struct lnet_peer_ni *lpni, *lpni2;
457         struct lnet_peer *peer;
458         struct lnet_peer_net *peer_net, *pn;
459         int cpt, cpt2, rc;
460         struct lnet_peer_table *ptable = NULL;
461         __u32 net_id = LNET_NIDNET(nid);
462
463         if (nid == LNET_NID_ANY)
464                 return -EINVAL;
465
466         /* check that nid is unique */
467         cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
468         lnet_net_lock(cpt);
469         lpni = lnet_find_peer_ni_locked(nid);
470         if (lpni != NULL) {
471                 lnet_peer_ni_decref_locked(lpni);
472                 lnet_net_unlock(cpt);
473                 return -EEXIST;
474         }
475         lnet_net_unlock(cpt);
476
477         if (key_nid != LNET_NID_ANY) {
478                 cpt2 = lnet_nid_cpt_hash(key_nid, LNET_CPT_NUMBER);
479                 lnet_net_lock(cpt2);
480                 lpni = lnet_find_peer_ni_locked(key_nid);
481                 if (lpni == NULL) {
482                         lnet_net_unlock(cpt2);
483                         /* key_nid refers to a non-existant peer_ni.*/
484                         return -EINVAL;
485                 }
486                 peer = lpni->lpni_peer_net->lpn_peer;
487                 peer->lp_multi_rail = true;
488                 lnet_peer_ni_decref_locked(lpni);
489                 lnet_net_unlock(cpt2);
490         } else {
491                 lnet_net_lock(LNET_LOCK_EX);
492                 rc = lnet_nid2peerni_locked(&lpni, nid, LNET_LOCK_EX);
493                 if (rc == 0) {
494                         lpni->lpni_peer_net->lpn_peer->lp_multi_rail = true;
495                         lnet_peer_ni_decref_locked(lpni);
496                 }
497                 lnet_net_unlock(LNET_LOCK_EX);
498                 return rc;
499         }
500
501         lpni = NULL;
502
503         LIBCFS_CPT_ALLOC(lpni, lnet_cpt_table(), cpt, sizeof(*lpni));
504         if (lpni == NULL)
505                 return -ENOMEM;
506
507         INIT_LIST_HEAD(&lpni->lpni_txq);
508         INIT_LIST_HEAD(&lpni->lpni_rtrq);
509         INIT_LIST_HEAD(&lpni->lpni_routes);
510         INIT_LIST_HEAD(&lpni->lpni_hashlist);
511         INIT_LIST_HEAD(&lpni->lpni_on_peer_net_list);
512         INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);
513
514         lpni->lpni_alive = !lnet_peers_start_down(); /* 1 bit!! */
515         lpni->lpni_last_alive = cfs_time_current(); /* assumes alive */
516         lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
517         lpni->lpni_nid = nid;
518         lpni->lpni_cpt = cpt;
519         lnet_set_peer_ni_health_locked(lpni, true);
520
521         /* allocate here in case we need to add a new peer_net */
522         peer_net = NULL;
523         LIBCFS_ALLOC(peer_net, sizeof(*peer_net));
524         if (peer_net == NULL) {
525                 rc = -ENOMEM;
526                 if (lpni != NULL)
527                         LIBCFS_FREE(lpni, sizeof(*lpni));
528                 return rc;
529         }
530
531         lnet_net_lock(LNET_LOCK_EX);
532
533         ptable = the_lnet.ln_peer_tables[cpt];
534         ptable->pt_number++;
535
536         lpni2 = lnet_find_peer_ni_locked(nid);
537         if (lpni2 != NULL) {
538                 lnet_peer_ni_decref_locked(lpni2);
539                 /* sanity check that lpni2's peer is what we expect */
540                 if (lpni2->lpni_peer_net->lpn_peer != peer)
541                         rc = -EEXIST;
542                 else
543                         rc = -EINVAL;
544
545                 ptable->pt_number--;
546                 /* another thread has already added it */
547                 lnet_net_unlock(LNET_LOCK_EX);
548                 LIBCFS_FREE(peer_net, sizeof(*peer_net));
549                 return rc;
550         }
551
552         lpni->lpni_net = lnet_get_net_locked(LNET_NIDNET(lpni->lpni_nid));
553         if (lpni->lpni_net != NULL) {
554                 lpni->lpni_txcredits    =
555                 lpni->lpni_mintxcredits = lpni->lpni_net->net_peertxcredits;
556                 lpni->lpni_rtrcredits =
557                 lpni->lpni_minrtrcredits = lnet_peer_buffer_credits(lpni->lpni_net);
558         } else {
559                 /*
560                  * if you're adding a peer which is not on a local network
561                  * then we can't assign any of the credits. It won't be
562                  * picked for sending anyway. Eventually a network can be
563                  * added, in this case we need to revisit this peer and
564                  * update its credits.
565                  */
566
567                 /* increment refcount for remote peer list */
568                 atomic_inc(&lpni->lpni_refcount);
569                 list_add_tail(&lpni->lpni_on_remote_peer_ni_list,
570                               &the_lnet.ln_remote_peer_ni_list);
571         }
572
573         /* increment refcount for peer on hash list */
574         atomic_inc(&lpni->lpni_refcount);
575
576         list_add_tail(&lpni->lpni_hashlist,
577                       &ptable->pt_hash[lnet_nid2peerhash(nid)]);
578         ptable->pt_version++;
579
580         /* add the lpni to a net */
581         list_for_each_entry(pn, &peer->lp_peer_nets, lpn_on_peer_list) {
582                 if (pn->lpn_net_id == net_id) {
583                         list_add_tail(&lpni->lpni_on_peer_net_list,
584                                       &pn->lpn_peer_nis);
585                         lpni->lpni_peer_net = pn;
586                         lnet_net_unlock(LNET_LOCK_EX);
587                         LIBCFS_FREE(peer_net, sizeof(*peer_net));
588                         return 0;
589                 }
590         }
591
592         INIT_LIST_HEAD(&peer_net->lpn_on_peer_list);
593         INIT_LIST_HEAD(&peer_net->lpn_peer_nis);
594
595         /* build the hierarchy */
596         peer_net->lpn_net_id = net_id;
597         peer_net->lpn_peer = peer;
598         lpni->lpni_peer_net = peer_net;
599         list_add_tail(&lpni->lpni_on_peer_net_list, &peer_net->lpn_peer_nis);
600         list_add_tail(&peer_net->lpn_on_peer_list, &peer->lp_peer_nets);
601
602         lnet_net_unlock(LNET_LOCK_EX);
603         return 0;
604 }
605
606 int
607 lnet_del_peer_ni_from_peer(lnet_nid_t key_nid, lnet_nid_t nid)
608 {
609         int cpt;
610         lnet_nid_t local_nid;
611         struct lnet_peer *peer;
612         struct lnet_peer_ni *lpni, *lpni2;
613         struct lnet_peer_table *ptable = NULL;
614
615         if (key_nid == LNET_NID_ANY)
616                 return -EINVAL;
617
618         local_nid = (nid != LNET_NID_ANY) ? nid : key_nid;
619         cpt = lnet_nid_cpt_hash(local_nid, LNET_CPT_NUMBER);
620         lnet_net_lock(LNET_LOCK_EX);
621
622         lpni = lnet_find_peer_ni_locked(local_nid);
623         if (lpni == NULL) {
624                 lnet_net_unlock(cpt);
625                 return -EINVAL;
626         }
627         lnet_peer_ni_decref_locked(lpni);
628
629         peer = lpni->lpni_peer_net->lpn_peer;
630         LASSERT(peer != NULL);
631
632         if (peer->lp_primary_nid == lpni->lpni_nid) {
633                 /*
634                  * deleting the primary ni is equivalent to deleting the
635                  * entire peer
636                  */
637                 lpni = NULL;
638                 lpni = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
639                 while (lpni != NULL) {
640                         lpni2 = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
641                         cpt = lnet_nid_cpt_hash(lpni->lpni_nid,
642                                                 LNET_CPT_NUMBER);
643                         lnet_peer_remove_from_remote_list(lpni);
644                         ptable = the_lnet.ln_peer_tables[cpt];
645                         ptable->pt_zombies++;
646                         list_del_init(&lpni->lpni_hashlist);
647                         lnet_peer_ni_decref_locked(lpni);
648                         lpni = lpni2;
649                 }
650                 lnet_net_unlock(LNET_LOCK_EX);
651
652                 return 0;
653         }
654
655         lnet_peer_remove_from_remote_list(lpni);
656         cpt = lnet_nid_cpt_hash(lpni->lpni_nid, LNET_CPT_NUMBER);
657         ptable = the_lnet.ln_peer_tables[cpt];
658         ptable->pt_zombies++;
659         list_del_init(&lpni->lpni_hashlist);
660         lnet_peer_ni_decref_locked(lpni);
661         lnet_net_unlock(LNET_LOCK_EX);
662
663         return 0;
664 }
665
666 void
667 lnet_destroy_peer_ni_locked(struct lnet_peer_ni *lpni)
668 {
669         struct lnet_peer_table *ptable;
670
671         LASSERT(atomic_read(&lpni->lpni_refcount) == 0);
672         LASSERT(lpni->lpni_rtr_refcount == 0);
673         LASSERT(list_empty(&lpni->lpni_txq));
674         LASSERT(list_empty(&lpni->lpni_hashlist));
675         LASSERT(lpni->lpni_txqnob == 0);
676         LASSERT(lpni->lpni_peer_net != NULL);
677         LASSERT(lpni->lpni_peer_net->lpn_peer != NULL);
678
679         ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
680         LASSERT(ptable->pt_number > 0);
681         ptable->pt_number--;
682
683         lpni->lpni_net = NULL;
684
685         lnet_try_destroy_peer_hierarchy_locked(lpni);
686
687         LIBCFS_FREE(lpni, sizeof(*lpni));
688
689         LASSERT(ptable->pt_zombies > 0);
690         ptable->pt_zombies--;
691 }
692
693 int
694 lnet_nid2peerni_locked(struct lnet_peer_ni **lpnip, lnet_nid_t nid, int cpt)
695 {
696         struct lnet_peer_table  *ptable;
697         struct lnet_peer_ni     *lpni = NULL;
698         struct lnet_peer_ni     *lpni2;
699         int                     cpt2;
700         int                     rc = 0;
701
702         *lpnip = NULL;
703         if (the_lnet.ln_shutdown) /* it's shutting down */
704                 return -ESHUTDOWN;
705
706         /*
707          * calculate cpt2 with the standard hash function
708          * This cpt2 becomes the slot where we'll find or create the peer.
709          */
710         cpt2 = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
711
712         /*
713          * Any changes to the peer tables happen under exclusive write
714          * lock. Any reads to the peer tables can be done via a standard
715          * CPT read lock.
716          */
717         if (cpt != LNET_LOCK_EX) {
718                 lnet_net_unlock(cpt);
719                 lnet_net_lock(LNET_LOCK_EX);
720         }
721
722         ptable = the_lnet.ln_peer_tables[cpt2];
723         lpni = lnet_get_peer_ni_locked(ptable, nid);
724         if (lpni != NULL) {
725                 *lpnip = lpni;
726                 if (cpt != LNET_LOCK_EX) {
727                         lnet_net_unlock(LNET_LOCK_EX);
728                         lnet_net_lock(cpt);
729                 }
730                 return 0;
731         }
732
733         /*
734          * take extra refcount in case another thread has shutdown LNet
735          * and destroyed locks and peer-table before I finish the allocation
736          */
737         ptable->pt_number++;
738         lnet_net_unlock(LNET_LOCK_EX);
739
740         LIBCFS_CPT_ALLOC(lpni, lnet_cpt_table(), cpt2, sizeof(*lpni));
741
742         if (lpni == NULL) {
743                 rc = -ENOMEM;
744                 lnet_net_lock(cpt);
745                 goto out;
746         }
747
748         INIT_LIST_HEAD(&lpni->lpni_txq);
749         INIT_LIST_HEAD(&lpni->lpni_rtrq);
750         INIT_LIST_HEAD(&lpni->lpni_routes);
751         INIT_LIST_HEAD(&lpni->lpni_hashlist);
752         INIT_LIST_HEAD(&lpni->lpni_on_peer_net_list);
753         INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);
754
755         lpni->lpni_alive = !lnet_peers_start_down(); /* 1 bit!! */
756         lpni->lpni_last_alive = cfs_time_current(); /* assumes alive */
757         lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
758         lpni->lpni_nid = nid;
759         lpni->lpni_cpt = cpt2;
760         atomic_set(&lpni->lpni_refcount, 2);    /* 1 for caller; 1 for hash */
761
762         rc = lnet_build_peer_hierarchy(lpni);
763         if (rc != 0)
764                 goto out;
765
766         lnet_net_lock(LNET_LOCK_EX);
767
768         if (the_lnet.ln_shutdown) {
769                 rc = -ESHUTDOWN;
770                 goto out;
771         }
772
773         lpni2 = lnet_get_peer_ni_locked(ptable, nid);
774         if (lpni2 != NULL) {
775                 *lpnip = lpni2;
776                 goto out;
777         }
778
779         lpni->lpni_net = lnet_get_net_locked(LNET_NIDNET(lpni->lpni_nid));
780         if (lpni->lpni_net) {
781                 lpni->lpni_txcredits    =
782                 lpni->lpni_mintxcredits =
783                         lpni->lpni_net->net_tunables.lct_peer_tx_credits;
784                 lpni->lpni_rtrcredits    =
785                 lpni->lpni_minrtrcredits =
786                         lnet_peer_buffer_credits(lpni->lpni_net);
787         } else {
788                 /*
789                  * if you're adding a peer which is not on a local network
790                  * then we can't assign any of the credits. It won't be
791                  * picked for sending anyway. Eventually a network can be
792                  * added, in this case we need to revisit this peer and
793                  * update its credits.
794                  */
795
796                 CDEBUG(D_NET, "peer_ni %s is not directly connected\n",
797                        libcfs_nid2str(nid));
798                 /* increment refcount for remote peer list */
799                 atomic_inc(&lpni->lpni_refcount);
800                 list_add_tail(&lpni->lpni_on_remote_peer_ni_list,
801                               &the_lnet.ln_remote_peer_ni_list);
802         }
803
804         lnet_set_peer_ni_health_locked(lpni, true);
805
806         list_add_tail(&lpni->lpni_hashlist,
807                         &ptable->pt_hash[lnet_nid2peerhash(nid)]);
808         ptable->pt_version++;
809         *lpnip = lpni;
810
811         if (cpt != LNET_LOCK_EX) {
812                 lnet_net_unlock(LNET_LOCK_EX);
813                 lnet_net_lock(cpt);
814         }
815
816         return 0;
817 out:
818         if (lpni != NULL) {
819                 lnet_try_destroy_peer_hierarchy_locked(lpni);
820                 LIBCFS_FREE(lpni, sizeof(*lpni));
821         }
822         ptable->pt_number--;
823         if (cpt != LNET_LOCK_EX) {
824                 lnet_net_unlock(LNET_LOCK_EX);
825                 lnet_net_lock(cpt);
826         }
827         return rc;
828 }
829
830 void
831 lnet_debug_peer(lnet_nid_t nid)
832 {
833         char                    *aliveness = "NA";
834         struct lnet_peer_ni     *lp;
835         int                     rc;
836         int                     cpt;
837
838         cpt = lnet_cpt_of_nid(nid, NULL);
839         lnet_net_lock(cpt);
840
841         rc = lnet_nid2peerni_locked(&lp, nid, cpt);
842         if (rc != 0) {
843                 lnet_net_unlock(cpt);
844                 CDEBUG(D_WARNING, "No peer %s\n", libcfs_nid2str(nid));
845                 return;
846         }
847
848         if (lnet_isrouter(lp) || lnet_peer_aliveness_enabled(lp))
849                 aliveness = lp->lpni_alive ? "up" : "down";
850
851         CDEBUG(D_WARNING, "%-24s %4d %5s %5d %5d %5d %5d %5d %ld\n",
852                libcfs_nid2str(lp->lpni_nid), atomic_read(&lp->lpni_refcount),
853                aliveness, lp->lpni_net->net_tunables.lct_peer_tx_credits,
854                lp->lpni_rtrcredits, lp->lpni_minrtrcredits,
855                lp->lpni_txcredits, lp->lpni_mintxcredits, lp->lpni_txqnob);
856
857         lnet_peer_ni_decref_locked(lp);
858
859         lnet_net_unlock(cpt);
860 }
861
862 int lnet_get_peer_ni_info(__u32 peer_index, __u64 *nid,
863                           char aliveness[LNET_MAX_STR_LEN],
864                           __u32 *cpt_iter, __u32 *refcount,
865                           __u32 *ni_peer_tx_credits, __u32 *peer_tx_credits,
866                           __u32 *peer_rtr_credits, __u32 *peer_min_rtr_credits,
867                           __u32 *peer_tx_qnob)
868 {
869         struct lnet_peer_table          *peer_table;
870         struct lnet_peer_ni             *lp;
871         int                             j;
872         int                             lncpt;
873         bool                            found = false;
874
875         /* get the number of CPTs */
876         lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
877
878         /* if the cpt number to be examined is >= the number of cpts in
879          * the system then indicate that there are no more cpts to examin
880          */
881         if (*cpt_iter >= lncpt)
882                 return -ENOENT;
883
884         /* get the current table */
885         peer_table = the_lnet.ln_peer_tables[*cpt_iter];
886         /* if the ptable is NULL then there are no more cpts to examine */
887         if (peer_table == NULL)
888                 return -ENOENT;
889
890         lnet_net_lock(*cpt_iter);
891
892         for (j = 0; j < LNET_PEER_HASH_SIZE && !found; j++) {
893                 struct list_head *peers = &peer_table->pt_hash[j];
894
895                 list_for_each_entry(lp, peers, lpni_hashlist) {
896                         if (peer_index-- > 0)
897                                 continue;
898
899                         snprintf(aliveness, LNET_MAX_STR_LEN, "NA");
900                         if (lnet_isrouter(lp) ||
901                                 lnet_peer_aliveness_enabled(lp))
902                                 snprintf(aliveness, LNET_MAX_STR_LEN,
903                                          lp->lpni_alive ? "up" : "down");
904
905                         *nid = lp->lpni_nid;
906                         *refcount = atomic_read(&lp->lpni_refcount);
907                         *ni_peer_tx_credits =
908                                 lp->lpni_net->net_tunables.lct_peer_tx_credits;
909                         *peer_tx_credits = lp->lpni_txcredits;
910                         *peer_rtr_credits = lp->lpni_rtrcredits;
911                         *peer_min_rtr_credits = lp->lpni_mintxcredits;
912                         *peer_tx_qnob = lp->lpni_txqnob;
913
914                         found = true;
915                 }
916
917         }
918         lnet_net_unlock(*cpt_iter);
919
920         *cpt_iter = lncpt;
921
922         return found ? 0 : -ENOENT;
923 }
924
925 int lnet_get_peer_info(__u32 idx, lnet_nid_t *primary_nid, lnet_nid_t *nid,
926                        struct lnet_peer_ni_credit_info *peer_ni_info)
927 {
928         struct lnet_peer_ni *lpni = NULL;
929         struct lnet_peer_net *lpn = NULL;
930         struct lnet_peer *lp = NULL;
931
932         lpni = lnet_get_peer_ni_idx_locked(idx, &lpn, &lp);
933
934         if (!lpni)
935                 return -ENOENT;
936
937         *primary_nid = lp->lp_primary_nid;
938         *nid = lpni->lpni_nid;
939         snprintf(peer_ni_info->cr_aliveness, LNET_MAX_STR_LEN, "NA");
940         if (lnet_isrouter(lpni) ||
941                 lnet_peer_aliveness_enabled(lpni))
942                 snprintf(peer_ni_info->cr_aliveness, LNET_MAX_STR_LEN,
943                          lpni->lpni_alive ? "up" : "down");
944
945         peer_ni_info->cr_refcount = atomic_read(&lpni->lpni_refcount);
946         peer_ni_info->cr_ni_peer_tx_credits = (lpni->lpni_net != NULL) ?
947                 lpni->lpni_net->net_peertxcredits : 0;
948         peer_ni_info->cr_peer_tx_credits = lpni->lpni_txcredits;
949         peer_ni_info->cr_peer_rtr_credits = lpni->lpni_rtrcredits;
950         peer_ni_info->cr_peer_min_rtr_credits = lpni->lpni_mintxcredits;
951         peer_ni_info->cr_peer_tx_qnob = lpni->lpni_txqnob;
952
953         return 0;
954 }