+
+ /* add the lpni to a net */
+ list_for_each_entry(pn, &peer->lp_peer_nets, lpn_on_peer_list) {
+ if (pn->lpn_net_id == net_id) {
+ list_add_tail(&lpni->lpni_on_peer_net_list,
+ &pn->lpn_peer_nis);
+ lpni->lpni_peer_net = pn;
+ lnet_net_unlock(LNET_LOCK_EX);
+ LIBCFS_FREE(peer_net, sizeof(*peer_net));
+ return 0;
+ }
+ }
+
+ INIT_LIST_HEAD(&peer_net->lpn_on_peer_list);
+ INIT_LIST_HEAD(&peer_net->lpn_peer_nis);
+
+ /* build the hierarchy */
+ peer_net->lpn_net_id = net_id;
+ peer_net->lpn_peer = peer;
+ lpni->lpni_peer_net = peer_net;
+ list_add_tail(&lpni->lpni_on_peer_net_list, &peer_net->lpn_peer_nis);
+ list_add_tail(&peer_net->lpn_on_peer_list, &peer->lp_peer_nets);
+
+ lnet_net_unlock(LNET_LOCK_EX);
+ return 0;
+}
+
+/*
+ * Delete the peer NI with NID @nid belonging to the peer identified by
+ * @key_nid.  If @nid is LNET_NID_ANY the NI matching @key_nid itself is
+ * targeted.  Deleting the peer's primary NID deletes the entire peer:
+ * every NI is unhashed and moved to its table's zombie count, to be
+ * freed when its refcount drops to zero.
+ *
+ * Returns 0 on success, -EINVAL if @key_nid is LNET_NID_ANY or no
+ * matching peer NI exists.
+ */
+int
+lnet_del_peer_ni_from_peer(lnet_nid_t key_nid, lnet_nid_t nid)
+{
+	int cpt;
+	lnet_nid_t local_nid;
+	struct lnet_peer *peer;
+	struct lnet_peer_ni *lpni, *lpni2;
+	struct lnet_peer_table *ptable = NULL;
+
+	if (key_nid == LNET_NID_ANY)
+		return -EINVAL;
+
+	/* if a specific NID was given delete that NI, otherwise operate
+	 * on the NI matching the key itself */
+	local_nid = (nid != LNET_NID_ANY) ? nid : key_nid;
+	cpt = lnet_nid_cpt_hash(local_nid, LNET_CPT_NUMBER);
+	lnet_net_lock(LNET_LOCK_EX);
+
+	lpni = lnet_find_peer_ni_locked(local_nid);
+	if (lpni == NULL) {
+		/* the lock was taken with LNET_LOCK_EX above, so it must
+		 * be dropped with LNET_LOCK_EX as well, not with @cpt */
+		lnet_net_unlock(LNET_LOCK_EX);
+		return -EINVAL;
+	}
+	/* drop the lookup reference right away; lpni remains usable here
+	 * NOTE(review): assumes peer NIs are only destroyed under
+	 * LNET_LOCK_EX, which we still hold — confirm */
+	lnet_peer_ni_decref_locked(lpni);
+
+	peer = lpni->lpni_peer_net->lpn_peer;
+	LASSERT(peer != NULL);
+
+	if (peer->lp_primary_nid == lpni->lpni_nid) {
+		/*
+		 * deleting the primary ni is equivalent to deleting the
+		 * entire peer
+		 */
+		lpni = lnet_get_next_peer_ni_locked(peer, NULL, NULL);
+		while (lpni != NULL) {
+			/* fetch the successor before unhashing this NI */
+			lpni2 = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
+			cpt = lnet_nid_cpt_hash(lpni->lpni_nid,
+						LNET_CPT_NUMBER);
+			lnet_peer_remove_from_remote_list(lpni);
+			ptable = the_lnet.ln_peer_tables[cpt];
+			ptable->pt_zombies++;
+			list_del_init(&lpni->lpni_hashlist);
+			lnet_peer_ni_decref_locked(lpni);
+			lpni = lpni2;
+		}
+		lnet_net_unlock(LNET_LOCK_EX);
+
+		return 0;
+	}
+
+	/* delete just this single peer NI */
+	lnet_peer_remove_from_remote_list(lpni);
+	cpt = lnet_nid_cpt_hash(lpni->lpni_nid, LNET_CPT_NUMBER);
+	ptable = the_lnet.ln_peer_tables[cpt];
+	ptable->pt_zombies++;
+	list_del_init(&lpni->lpni_hashlist);
+	lnet_peer_ni_decref_locked(lpni);
+	lnet_net_unlock(LNET_LOCK_EX);
+
+	return 0;
+}
+
+/*
+ * Final teardown of a peer NI whose refcount has reached zero: account
+ * it out of its peer table, detach it from the peer hierarchy and free
+ * it.  NOTE(review): the "_locked" suffix implies the caller holds the
+ * net lock (presumably LNET_LOCK_EX, since table counters are
+ * modified) — confirm against callers.
+ */
+void
+lnet_destroy_peer_ni_locked(struct lnet_peer_ni *lpni)
+{
+ struct lnet_peer_table *ptable;
+
+ /* must be unreferenced, not a router NI, with no queued tx and
+ * already removed from the hash, and still linked to its net/peer */
+ LASSERT(atomic_read(&lpni->lpni_refcount) == 0);
+ LASSERT(lpni->lpni_rtr_refcount == 0);
+ LASSERT(list_empty(&lpni->lpni_txq));
+ LASSERT(list_empty(&lpni->lpni_hashlist));
+ LASSERT(lpni->lpni_txqnob == 0);
+ LASSERT(lpni->lpni_peer_net != NULL);
+ LASSERT(lpni->lpni_peer_net->lpn_peer != NULL);
+
+ /* cache ptable now: it is still needed for the zombie accounting
+ * after lpni itself has been freed below */
+ ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
+ LASSERT(ptable->pt_number > 0);
+ ptable->pt_number--;
+
+ lpni->lpni_net = NULL;
+
+ /* unlink from (and possibly tear down) the peer-net/peer above us
+ * before the NI memory goes away */
+ lnet_try_destroy_peer_hierarchy_locked(lpni);
+
+ LIBCFS_FREE(lpni, sizeof(*lpni));
+
+ /* this NI was counted as a zombie when it was unhashed */
+ LASSERT(ptable->pt_zombies > 0);
+ ptable->pt_zombies--;
+}
+
+int
+lnet_nid2peerni_locked(struct lnet_peer_ni **lpnip, lnet_nid_t nid, int cpt)
+{
+ struct lnet_peer_table *ptable;
+ struct lnet_peer_ni *lpni = NULL;
+ struct lnet_peer_ni *lpni2;
+ int cpt2;
+ int rc = 0;
+
+ *lpnip = NULL;
+ if (the_lnet.ln_shutdown) /* it's shutting down */
+ return -ESHUTDOWN;
+
+ /*
+ * calculate cpt2 with the standard hash function
+ * This cpt2 becomes the slot where we'll find or create the peer.
+ */
+ cpt2 = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
+
+ /*
+ * Any changes to the peer tables happen under exclusive write
+ * lock. Any reads to the peer tables can be done via a standard
+ * CPT read lock.
+ */
+ if (cpt != LNET_LOCK_EX) {
+ lnet_net_unlock(cpt);
+ lnet_net_lock(LNET_LOCK_EX);
+ }
+
+ ptable = the_lnet.ln_peer_tables[cpt2];
+ lpni = lnet_get_peer_ni_locked(ptable, nid);
+ if (lpni != NULL) {
+ *lpnip = lpni;
+ if (cpt != LNET_LOCK_EX) {
+ lnet_net_unlock(LNET_LOCK_EX);
+ lnet_net_lock(cpt);
+ }
+ return 0;
+ }
+
+ /*
+ * take extra refcount in case another thread has shutdown LNet
+ * and destroyed locks and peer-table before I finish the allocation
+ */