+static void
+lnet_ping_target_fini(void)
+{
+ int rc;
+
+ lnet_ping_md_unlink(the_lnet.ln_ping_info,
+ &the_lnet.ln_ping_target_md);
+
+ rc = LNetEQFree(the_lnet.ln_ping_target_eq);
+ LASSERT(rc == 0);
+
+ lnet_ping_info_destroy();
+}
+
+static int
+lnet_ni_tq_credits(lnet_ni_t *ni)
+{
+ int credits;
+
+ LASSERT(ni->ni_ncpts >= 1);
+
+ if (ni->ni_ncpts == 1)
+ return ni->ni_maxtxcredits;
+
+ credits = ni->ni_maxtxcredits / ni->ni_ncpts;
+ credits = max(credits, 8 * ni->ni_peertxcredits);
+ credits = min(credits, ni->ni_maxtxcredits);
+
+ return credits;
+}
+
+static void
+lnet_ni_unlink_locked(lnet_ni_t *ni)
+{
+ if (!list_empty(&ni->ni_cptlist)) {
+ list_del_init(&ni->ni_cptlist);
+ lnet_ni_decref_locked(ni, 0);
+ }
+
+ /* move it to zombie list and nobody can find it anymore */
+ LASSERT(!list_empty(&ni->ni_list));
+ list_move(&ni->ni_list, &the_lnet.ln_nis_zombie);
+ lnet_ni_decref_locked(ni, 0); /* drop ln_nis' ref */
+}
+
+static void
+lnet_clear_zombies_nis_locked(void)
+{
+ int i;
+ int islo;
+ lnet_ni_t *ni;
+
+ /* Now wait for the NI's I just nuked to show up on ln_zombie_nis
+ * and shut them down in guaranteed thread context */
+ i = 2;
+ while (!list_empty(&the_lnet.ln_nis_zombie)) {
+ int *ref;
+ int j;
+
+ ni = list_entry(the_lnet.ln_nis_zombie.next,
+ lnet_ni_t, ni_list);
+ list_del_init(&ni->ni_list);
+ cfs_percpt_for_each(ref, j, ni->ni_refs) {
+ if (*ref == 0)
+ continue;
+ /* still busy, add it back to zombie list */
+ list_add(&ni->ni_list, &the_lnet.ln_nis_zombie);
+ break;
+ }
+
+ if (!list_empty(&ni->ni_list)) {
+ lnet_net_unlock(LNET_LOCK_EX);
+ ++i;
+ if ((i & (-i)) == i) {
+ CDEBUG(D_WARNING,
+ "Waiting for zombie LNI %s\n",
+ libcfs_nid2str(ni->ni_nid));
+ }
+ cfs_pause(cfs_time_seconds(1));
+ lnet_net_lock(LNET_LOCK_EX);
+ continue;
+ }
+
+ ni->ni_lnd->lnd_refcount--;
+ lnet_net_unlock(LNET_LOCK_EX);
+
+ islo = ni->ni_lnd->lnd_type == LOLND;
+
+ LASSERT(!in_interrupt());
+ (ni->ni_lnd->lnd_shutdown)(ni);
+
+ /* can't deref lnd anymore now; it might have unregistered
+ * itself... */
+
+ if (!islo)
+ CDEBUG(D_LNI, "Removed LNI %s\n",
+ libcfs_nid2str(ni->ni_nid));
+
+ lnet_ni_free(ni);
+ i = 2;
+ lnet_net_lock(LNET_LOCK_EX);
+ }
+}
+
+static void
+lnet_shutdown_lndnis(void)
+{
+ int i;
+ lnet_ni_t *ni;
+
+ /* NB called holding the global mutex */
+
+ /* All quiet on the API front */
+ LASSERT(!the_lnet.ln_shutdown);
+ LASSERT(the_lnet.ln_refcount == 0);
+ LASSERT(list_empty(&the_lnet.ln_nis_zombie));
+
+ lnet_net_lock(LNET_LOCK_EX);
+ the_lnet.ln_shutdown = 1; /* flag shutdown */
+
+ /* Unlink NIs from the global table */
+ while (!list_empty(&the_lnet.ln_nis)) {
+ ni = list_entry(the_lnet.ln_nis.next,
+ lnet_ni_t, ni_list);
+ lnet_ni_unlink_locked(ni);
+ }
+
+ /* Drop the cached eqwait NI. */
+ if (the_lnet.ln_eq_waitni != NULL) {
+ lnet_ni_decref_locked(the_lnet.ln_eq_waitni, 0);
+ the_lnet.ln_eq_waitni = NULL;
+ }
+
+ /* Drop the cached loopback NI. */
+ if (the_lnet.ln_loni != NULL) {
+ lnet_ni_decref_locked(the_lnet.ln_loni, 0);
+ the_lnet.ln_loni = NULL;
+ }
+
+ lnet_net_unlock(LNET_LOCK_EX);
+
+ /* Clear lazy portals and drop delayed messages which hold refs
+ * on their lnet_msg_t::msg_rxpeer */
+ for (i = 0; i < the_lnet.ln_nportals; i++)
+ LNetClearLazyPortal(i);
+
+ /* Clear the peer table and wait for all peers to go (they hold refs on
+ * their NIs) */
+ lnet_peer_tables_cleanup(NULL);
+
+ lnet_net_lock(LNET_LOCK_EX);
+
+ lnet_clear_zombies_nis_locked();
+ the_lnet.ln_shutdown = 0;
+ lnet_net_unlock(LNET_LOCK_EX);
+}
+
+/* shutdown down the NI and release refcount */
+static void
+lnet_shutdown_lndni(struct lnet_ni *ni)
+{
+ lnet_net_lock(LNET_LOCK_EX);
+ lnet_ni_unlink_locked(ni);
+ lnet_net_unlock(LNET_LOCK_EX);
+
+ /* Do peer table cleanup for this ni */
+ lnet_peer_tables_cleanup(ni);
+
+ lnet_net_lock(LNET_LOCK_EX);
+ lnet_clear_zombies_nis_locked();
+ lnet_net_unlock(LNET_LOCK_EX);
+}
+
+static int
+lnet_startup_lndni(struct lnet_ni *ni, __s32 peer_timeout,
+ __s32 peer_cr, __s32 peer_buf_cr, __s32 credits)
+{
+ int rc = 0;
+ int lnd_type;
+ lnd_t *lnd;
+ struct lnet_tx_queue *tq;
+ int i;
+
+ lnd_type = LNET_NETTYP(LNET_NIDNET(ni->ni_nid));
+
+ LASSERT(libcfs_isknown_lnd(lnd_type));
+
+ if (lnd_type == CIBLND || lnd_type == OPENIBLND ||
+ lnd_type == IIBLND || lnd_type == VIBLND) {
+ CERROR("LND %s obsoleted\n", libcfs_lnd2str(lnd_type));
+ goto failed0;
+ }
+
+ /* Make sure this new NI is unique. */
+ lnet_net_lock(LNET_LOCK_EX);
+ if (!lnet_net_unique(LNET_NIDNET(ni->ni_nid), &the_lnet.ln_nis)) {
+ if (lnd_type == LOLND) {
+ lnet_net_unlock(LNET_LOCK_EX);
+ lnet_ni_free(ni);
+ return 0;
+ }
+ lnet_net_unlock(LNET_LOCK_EX);
+
+ CERROR("Net %s is not unique\n",
+ libcfs_net2str(LNET_NIDNET(ni->ni_nid)));
+ goto failed0;
+ }
+ lnet_net_unlock(LNET_LOCK_EX);
+
+ LNET_MUTEX_LOCK(&the_lnet.ln_lnd_mutex);
+ lnd = lnet_find_lnd_by_type(lnd_type);
+
+#ifdef __KERNEL__
+ if (lnd == NULL) {
+ LNET_MUTEX_UNLOCK(&the_lnet.ln_lnd_mutex);
+ rc = request_module("%s",
+ libcfs_lnd2modname(lnd_type));
+ LNET_MUTEX_LOCK(&the_lnet.ln_lnd_mutex);
+
+ lnd = lnet_find_lnd_by_type(lnd_type);
+ if (lnd == NULL) {
+ LNET_MUTEX_UNLOCK(&the_lnet.ln_lnd_mutex);
+ CERROR("Can't load LND %s, module %s, rc=%d\n",
+ libcfs_lnd2str(lnd_type),
+ libcfs_lnd2modname(lnd_type), rc);
+#ifndef HAVE_MODULE_LOADING_SUPPORT
+ LCONSOLE_ERROR_MSG(0x104, "Your kernel must be "
+ "compiled with kernel module "
+ "loading support.");
+#endif
+ goto failed0;
+ }
+ }
+#else
+ if (lnd == NULL) {
+ LNET_MUTEX_UNLOCK(&the_lnet.ln_lnd_mutex);
+ CERROR("LND %s not supported\n",
+ libcfs_lnd2str(lnd_type));
+ goto failed0;
+ }
+#endif
+
+ lnet_net_lock(LNET_LOCK_EX);
+ lnd->lnd_refcount++;
+ lnet_net_unlock(LNET_LOCK_EX);
+
+ ni->ni_lnd = lnd;
+
+ rc = (lnd->lnd_startup)(ni);
+
+ LNET_MUTEX_UNLOCK(&the_lnet.ln_lnd_mutex);
+
+ if (rc != 0) {
+ LCONSOLE_ERROR_MSG(0x105, "Error %d starting up LNI %s\n",
+ rc, libcfs_lnd2str(lnd->lnd_type));
+ lnet_net_lock(LNET_LOCK_EX);
+ lnd->lnd_refcount--;
+ lnet_net_unlock(LNET_LOCK_EX);
+ goto failed0;
+ }
+
+ /* If given some LND tunable parameters, parse those now to
+ * override the values in the NI structure. */
+ if (peer_buf_cr >= 0)
+ ni->ni_peerrtrcredits = peer_buf_cr;
+ if (peer_timeout >= 0)
+ ni->ni_peertimeout = peer_timeout;
+ /*
+ * TODO
+ * Note: For now, don't allow the user to change
+ * peertxcredits as this number is used in the
+ * IB LND to control queue depth.
+ * if (peer_cr != -1)
+ * ni->ni_peertxcredits = peer_cr;
+ */
+ if (credits >= 0)
+ ni->ni_maxtxcredits = credits;
+
+ LASSERT(ni->ni_peertimeout <= 0 || lnd->lnd_query != NULL);
+
+ lnet_net_lock(LNET_LOCK_EX);
+ /* refcount for ln_nis */
+ lnet_ni_addref_locked(ni, 0);
+ list_add_tail(&ni->ni_list, &the_lnet.ln_nis);
+ if (ni->ni_cpts != NULL) {
+ lnet_ni_addref_locked(ni, 0);
+ list_add_tail(&ni->ni_cptlist, &the_lnet.ln_nis_cpt);
+ }
+
+ lnet_net_unlock(LNET_LOCK_EX);
+
+ if (lnd->lnd_type == LOLND) {
+ lnet_ni_addref(ni);
+ LASSERT(the_lnet.ln_loni == NULL);
+ the_lnet.ln_loni = ni;
+ return 0;
+ }
+
+#ifndef __KERNEL__
+ if (lnd->lnd_wait != NULL) {
+ if (the_lnet.ln_eq_waitni == NULL) {
+ lnet_ni_addref(ni);
+ the_lnet.ln_eq_waitni = ni;
+ }
+ } else {
+# ifndef HAVE_LIBPTHREAD
+ LCONSOLE_ERROR_MSG(0x106, "LND %s not supported in a "
+ "single-threaded runtime\n",
+ libcfs_lnd2str(lnd_type));
+ /* shutdown the NI since if we get here then it must've already
+ * been started
+ */
+ lnet_shutdown_lndni(ni);
+ return -EINVAL;
+# endif
+ }
+#endif
+ if (ni->ni_peertxcredits == 0 || ni->ni_maxtxcredits == 0) {
+ LCONSOLE_ERROR_MSG(0x107, "LNI %s has no %scredits\n",
+ libcfs_lnd2str(lnd->lnd_type),
+ ni->ni_peertxcredits == 0 ?
+ "" : "per-peer ");
+ /* shutdown the NI since if we get here then it must've already
+ * been started
+ */
+ lnet_shutdown_lndni(ni);
+ return -EINVAL;
+ }
+
+ cfs_percpt_for_each(tq, i, ni->ni_tx_queues) {
+ tq->tq_credits_min =
+ tq->tq_credits_max =
+ tq->tq_credits = lnet_ni_tq_credits(ni);
+ }
+
+ CDEBUG(D_LNI, "Added LNI %s [%d/%d/%d/%d]\n",
+ libcfs_nid2str(ni->ni_nid), ni->ni_peertxcredits,
+ lnet_ni_tq_credits(ni) * LNET_CPT_NUMBER,
+ ni->ni_peerrtrcredits, ni->ni_peertimeout);
+
+ return 0;
+failed0:
+ lnet_ni_free(ni);
+ return -EINVAL;
+}
+
+static int
+lnet_startup_lndnis(struct list_head *nilist)
+{
+ struct lnet_ni *ni;
+ int rc;
+ int lnd_type;
+ int ni_count = 0;
+
+ while (!list_empty(nilist)) {
+ ni = list_entry(nilist->next, lnet_ni_t, ni_list);
+ list_del(&ni->ni_list);
+ rc = lnet_startup_lndni(ni, -1, -1, -1, -1);
+
+ if (rc < 0)
+ goto failed;
+
+ ni_count++;
+ }
+
+ if (the_lnet.ln_eq_waitni != NULL && ni_count > 1) {
+ lnd_type = the_lnet.ln_eq_waitni->ni_lnd->lnd_type;
+ LCONSOLE_ERROR_MSG(0x109, "LND %s can only run single-network"
+ "\n",
+ libcfs_lnd2str(lnd_type));
+ rc = -EINVAL;
+ goto failed;
+ }
+
+ return ni_count;
+failed:
+ lnet_shutdown_lndnis();
+
+ return rc;
+}
+
+/**
+ * Initialize LNet library.
+ *
+ * Only userspace program needs to call this function - it's automatically
+ * called in the kernel at module loading time. Caller has to call LNetFini()
+ * after a call to LNetInit(), if and only if the latter returned 0. It must
+ * be called exactly once.
+ *
+ * \return 0 on success, and -ve on failures.
+ */
+int
+LNetInit(void)
+{
+ int rc;
+
+ lnet_assert_wire_constants();
+ LASSERT(!the_lnet.ln_init);
+
+ memset(&the_lnet, 0, sizeof(the_lnet));
+
+ /* refer to global cfs_cpt_table for now */
+ the_lnet.ln_cpt_table = cfs_cpt_table;
+ the_lnet.ln_cpt_number = cfs_cpt_number(cfs_cpt_table);
+
+ LASSERT(the_lnet.ln_cpt_number > 0);
+ if (the_lnet.ln_cpt_number > LNET_CPT_MAX) {
+ /* we are under risk of consuming all lh_cookie */
+ CERROR("Can't have %d CPTs for LNet (max allowed is %d), "
+ "please change setting of CPT-table and retry\n",
+ the_lnet.ln_cpt_number, LNET_CPT_MAX);
+ return -1;
+ }
+
+ while ((1 << the_lnet.ln_cpt_bits) < the_lnet.ln_cpt_number)
+ the_lnet.ln_cpt_bits++;
+
+ rc = lnet_create_locks();
+ if (rc != 0) {
+ CERROR("Can't create LNet global locks: %d\n", rc);
+ return -1;
+ }
+
+ the_lnet.ln_refcount = 0;
+ the_lnet.ln_init = 1;
+ LNetInvalidateHandle(&the_lnet.ln_rc_eqh);
+ INIT_LIST_HEAD(&the_lnet.ln_lnds);
+ INIT_LIST_HEAD(&the_lnet.ln_rcd_zombie);
+ INIT_LIST_HEAD(&the_lnet.ln_rcd_deathrow);
+
+#ifdef __KERNEL__
+ /* The hash table size is the number of bits it takes to express the set
+ * ln_num_routes, minus 1 (better to under estimate than over so we
+ * don't waste memory). */
+ if (rnet_htable_size <= 0)
+ rnet_htable_size = LNET_REMOTE_NETS_HASH_DEFAULT;
+ else if (rnet_htable_size > LNET_REMOTE_NETS_HASH_MAX)
+ rnet_htable_size = LNET_REMOTE_NETS_HASH_MAX;
+ the_lnet.ln_remote_nets_hbits = max_t(int, 1,
+ order_base_2(rnet_htable_size) - 1);
+
+ /* All LNDs apart from the LOLND are in separate modules. They
+ * register themselves when their module loads, and unregister
+ * themselves when their module is unloaded. */
+#else
+ the_lnet.ln_remote_nets_hbits = 8;
+
+ /* Register LNDs
+ * NB the order here determines default 'networks=' order */
+# ifdef HAVE_LIBPTHREAD
+ LNET_REGISTER_ULND(the_tcplnd);
+# endif
+#endif
+ lnet_register_lnd(&the_lolnd);
+ return 0;
+}
+EXPORT_SYMBOL(LNetInit);
+
+/**