Whamcloud - gitweb
b=17710
[fs/lustre-release.git] / lustre / obdclass / obd_config.c
index f4c88ca..eb8d415 100644 (file)
@@ -53,8 +53,9 @@
 #include <lustre_param.h>
 #include <class_hash.h>
 
-extern struct lustre_hash_operations uuid_hash_operations;
-extern struct lustre_hash_operations nid_hash_operations;
+static lustre_hash_ops_t uuid_hash_ops;
+static lustre_hash_ops_t nid_hash_ops;
+static lustre_hash_ops_t nid_stat_hash_ops;
 
 /*********** string parsing utils *********/
 
@@ -75,6 +76,72 @@ int class_find_param(char *buf, char *key, char **valp)
         return 0;
 }
 
+/**
+ * Finds a parameter in \a params and copies it to \a copy.
+ *
+ * Leading spaces are skipped. Next space or end of string is the
+ * parameter terminator with the exception that spaces inside single or double
+ * quotes get included into a parameter. The parameter is copied into \a copy
+ * which has to be allocated big enough by a caller, quotes are stripped in
+ * the copy and the copy is terminated by 0.
+ *
+ * On return \a params is set to next parameter or to NULL if last
+ * parameter is returned.
+ *
+ * \retval 0 if parameter is returned in \a copy
+ * \retval 1 otherwise
+ * \retval -EINVAL if unbalanced quota is found
+ */
+int class_get_next_param(char **params, char *copy)
+{
+        char *q1, *q2, *str;
+        int len;
+
+        str = *params;
+        while (*str == ' ')
+                str++;
+
+        if (*str == '\0') {
+                *params = NULL;
+                return 1;
+        }
+
+        while (1) {
+                q1 = strpbrk(str, " '\"");
+                if (q1 == NULL) {
+                        len = strlen(str);
+                        memcpy(copy, str, len);
+                        copy[len] = '\0';
+                        *params = NULL;
+                        return 0;
+                }
+                len = q1 - str;
+                if (*q1 == ' ') {
+                        memcpy(copy, str, len);
+                        copy[len] = '\0';
+                        *params = str + len;
+                        return 0;
+                }
+
+                memcpy(copy, str, len);
+                copy += len;
+
+                /* search for the matching closing quote */
+                str = q1 + 1;
+                q2 = strchr(str, *q1);
+                if (q2 == NULL) {
+                        CERROR("Unbalanced quota in parameters: \"%s\"\n",
+                               *params);
+                        return -EINVAL;
+                }
+                len = q2 - str;
+                memcpy(copy, str, len);
+                copy += len;
+                str = q2 + 1;
+        }
+        return 1;
+}
+
 /* returns 0 if this is the first key in the buffer, else 1.
    valp points to first char after key. */
 int class_match_param(char *buf, char *key, char **valp)
@@ -128,6 +195,7 @@ int class_parse_nid(char *buf, lnet_nid_t *nid, char **endh)
 }
 
 EXPORT_SYMBOL(class_find_param);
+EXPORT_SYMBOL(class_get_next_param);
 EXPORT_SYMBOL(class_match_param);
 EXPORT_SYMBOL(class_parse_nid);
 
@@ -207,7 +275,7 @@ int class_attach(struct lustre_cfg *lcfg)
         CFS_INIT_LIST_HEAD(&obd->obd_lock_replay_queue);
         CFS_INIT_LIST_HEAD(&obd->obd_final_req_queue);
 
-        llog_group_init(&obd->obd_olg, OBD_LLOG_GROUP);
+        llog_group_init(&obd->obd_olg, FILTER_GROUP_LLOG);
 
         spin_lock_init(&obd->obd_uncommitted_replies_lock);
         CFS_INIT_LIST_HEAD(&obd->obd_uncommitted_replies);
@@ -231,6 +299,8 @@ int class_attach(struct lustre_cfg *lcfg)
         spin_lock(&obd->obd_dev_lock);
         atomic_set(&obd->obd_refcount, 1);
         spin_unlock(&obd->obd_dev_lock);
+        lu_ref_init(&obd->obd_reference);
+        lu_ref_add(&obd->obd_reference, "attach", obd);
 
         obd->obd_attached = 1;
         CDEBUG(D_IOCTL, "OBD: dev %d attached type %s with refcount %d\n",
@@ -280,29 +350,32 @@ int class_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         /* just leave this on forever.  I can't use obd_set_up here because
            other fns check that status, and we're not actually set up yet. */
         obd->obd_starting = 1;
+        obd->obd_uuid_hash = NULL;
+        obd->obd_nid_hash = NULL;
+        obd->obd_nid_stats_hash = NULL;
         spin_unlock(&obd->obd_dev_lock);
 
-        /* create an uuid-export hash body */
-        err = lustre_hash_init(&obd->obd_uuid_hash_body, "UUID_HASH",
-                               128, &uuid_hash_operations);
-        if (err)
-                GOTO(err_hash, err);
-
-        /* create a nid-export hash body */
-        err = lustre_hash_init(&obd->obd_nid_hash_body, "NID_HASH",
-                               128, &nid_hash_operations);
-        if (err)
-                GOTO(err_hash, err);
-
-        /* create a nid-stats hash body */
-        err = lustre_hash_init(&obd->obd_nid_stats_hash_body, "NID_STATS",
-                               128, &nid_stat_hash_operations);
-        if (err)
-                GOTO(err_hash, err);
+        /* create an uuid-export lustre hash */
+        obd->obd_uuid_hash = lustre_hash_init("UUID_HASH", 7, 7,
+                                              &uuid_hash_ops, 0);
+        if (!obd->obd_uuid_hash)
+                GOTO(err_hash, err = -ENOMEM);
+        /* create a nid-export lustre hash */
+        obd->obd_nid_hash = lustre_hash_init("NID_HASH", 7, 7,
+                                             &nid_hash_ops, 0);
+        if (!obd->obd_nid_hash)
+                GOTO(err_hash, err = -ENOMEM);
+        /* create a nid-stats lustre hash */
+        obd->obd_nid_stats_hash = lustre_hash_init("NID_STATS", 7, 7,
+                                                   &nid_stat_hash_ops, 0);
+        if (!obd->obd_nid_stats_hash)
+                GOTO(err_hash, err = -ENOMEM);
 
         exp = class_new_export(obd, &obd->obd_uuid);
         if (IS_ERR(exp))
-                RETURN(PTR_ERR(exp));
+                GOTO(err_hash, err = PTR_ERR(exp));
 
         obd->obd_self_export = exp;
         list_del_init(&exp->exp_obd_chain_timed);
@@ -316,24 +389,34 @@ int class_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
 
         spin_lock(&obd->obd_dev_lock);
         /* cleanup drops this */
-        class_incref(obd);
+        class_incref(obd, "setup", obd);
         spin_unlock(&obd->obd_dev_lock);
 
         CDEBUG(D_IOCTL, "finished setup of obd %s (uuid %s)\n",
                obd->obd_name, obd->obd_uuid.uuid);
 
         RETURN(0);
-
 err_exp:
-        class_unlink_export(obd->obd_self_export);
-        obd->obd_self_export = NULL;
+        if (obd->obd_self_export) {
+                class_unlink_export(obd->obd_self_export);
+                obd->obd_self_export = NULL;
+        }
 err_hash:
-        lustre_hash_exit(&obd->obd_uuid_hash_body);
-        lustre_hash_exit(&obd->obd_nid_hash_body);
-        lustre_hash_exit(&obd->obd_nid_stats_hash_body);
+        if (obd->obd_uuid_hash) {
+                lustre_hash_exit(obd->obd_uuid_hash);
+                obd->obd_uuid_hash = NULL;
+        }
+        if (obd->obd_nid_hash) {
+                lustre_hash_exit(obd->obd_nid_hash);
+                obd->obd_nid_hash = NULL;
+        }
+        if (obd->obd_nid_stats_hash) {
+                lustre_hash_exit(obd->obd_nid_stats_hash);
+                obd->obd_nid_stats_hash = NULL;
+        }
         obd->obd_starting = 0;
         CERROR("setup %s failed (%d)\n", obd->obd_name, err);
-        RETURN(err);
+        return err;
 }
 
 int class_detach(struct obd_device *obd, struct lustre_cfg *lcfg)
@@ -357,7 +440,7 @@ int class_detach(struct obd_device *obd, struct lustre_cfg *lcfg)
         CDEBUG(D_IOCTL, "detach on obd %s (uuid %s)\n",
                obd->obd_name, obd->obd_uuid.uuid);
 
-        class_decref(obd);
+        class_decref(obd, "attach", obd);
 
         /* not strictly necessary, but cleans up eagerly */
         obd_zombie_impexp_cull();
@@ -367,19 +450,22 @@ int class_detach(struct obd_device *obd, struct lustre_cfg *lcfg)
 
 static void dump_exports(struct obd_device *obd)
 {
-        struct obd_export *exp, *n;
+        struct obd_export *exp;
 
-        list_for_each_entry_safe(exp, n, &obd->obd_exports, exp_obd_chain) {
+        spin_lock(&obd->obd_dev_lock);
+        list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
                 struct ptlrpc_reply_state *rs;
                 struct ptlrpc_reply_state *first_reply = NULL;
                 int                        nreplies = 0;
 
+                spin_lock(&exp->exp_lock);
                 list_for_each_entry (rs, &exp->exp_outstanding_replies,
                                      rs_exp_list) {
                         if (nreplies == 0)
                                 first_reply = rs;
                         nreplies++;
                 }
+                spin_unlock(&exp->exp_lock);
 
                 CDEBUG(D_IOCTL, "%s: %p %s %s %d %d %d: %p %s\n",
                        obd->obd_name, exp, exp->exp_client_uuid.uuid,
@@ -388,6 +474,7 @@ static void dump_exports(struct obd_device *obd)
                        exp->exp_failed, nreplies, first_reply,
                        nreplies > 3 ? "..." : "");
         }
+        spin_unlock(&obd->obd_dev_lock);
 }
 
 int class_cleanup(struct obd_device *obd, struct lustre_cfg *lcfg)
@@ -412,7 +499,7 @@ int class_cleanup(struct obd_device *obd, struct lustre_cfg *lcfg)
         /* Leave this on forever */
         obd->obd_stopping = 1;
         spin_unlock(&obd->obd_dev_lock);
-
+        
         if (lcfg->lcfg_bufcount >= 2 && LUSTRE_CFG_BUFLEN(lcfg, 1) > 0) {
                 for (flag = lustre_cfg_string(lcfg, 1); *flag != 0; flag++)
                         switch (*flag) {
@@ -425,32 +512,26 @@ int class_cleanup(struct obd_device *obd, struct lustre_cfg *lcfg)
                                 obd->obd_fail = 1;
                                 obd->obd_no_transno = 1;
                                 obd->obd_no_recov = 1;
-                                /* Set the obd readonly if we can */
-                                if (OBP(obd, iocontrol))
+                                if (OBP(obd, iocontrol)) {
+                                        obd_iocontrol(OBD_IOC_SYNC,
+                                                      obd->obd_self_export,
+                                                      0, NULL, NULL);
+                                        /* Set the obd readonly if we can */
                                         obd_iocontrol(OBD_IOC_SET_READONLY,
                                                       obd->obd_self_export,
                                                       0, NULL, NULL);
+                                }
                                 break;
                         default:
-                                CERROR("unrecognised flag '%c'\n",
-                                       *flag);
+                                CERROR("Unrecognised flag '%c'\n", *flag);
                         }
         }
 
+        LASSERT(obd->obd_self_export);
+
         /* The three references that should be remaining are the
          * obd_self_export and the attach and setup references. */
         if (atomic_read(&obd->obd_refcount) > 3) {
-#if 0           /* We should never fail to cleanup with mountconf */
-                if (!(obd->obd_fail || obd->obd_force)) {
-                        CERROR("OBD %s is still busy with %d references\n"
-                               "You should stop active file system users,"
-                               " or use the --force option to cleanup.\n",
-                               obd->obd_name, atomic_read(&obd->obd_refcount));
-                        dump_exports(obd);
-                        /* Allow a failed cleanup to try again. */
-                        obd->obd_stopping = 0;
-                }
-#endif
                 /* refcounf - 3 might be the number of real exports
                    (excluding self export). But class_incref is called
                    by other things as well, so don't count on it. */
@@ -459,29 +540,39 @@ int class_cleanup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 dump_exports(obd);
                 class_disconnect_exports(obd);
         }
-        LASSERT(obd->obd_self_export);
 
         /* destroy an uuid-export hash body */
-        lustre_hash_exit(&obd->obd_uuid_hash_body);
+        if (obd->obd_uuid_hash) {
+                lustre_hash_exit(obd->obd_uuid_hash);
+                obd->obd_uuid_hash = NULL;
+        }
 
         /* destroy a nid-export hash body */
-        lustre_hash_exit(&obd->obd_nid_hash_body);
+        if (obd->obd_nid_hash) {
+                lustre_hash_exit(obd->obd_nid_hash);
+                obd->obd_nid_hash = NULL;
+        }
 
         /* destroy a nid-stats hash body */
-        lustre_hash_exit(&obd->obd_nid_stats_hash_body);
+        if (obd->obd_nid_stats_hash) {
+                lustre_hash_exit(obd->obd_nid_stats_hash);
+                obd->obd_nid_stats_hash = NULL;
+        }
 
         /* Precleanup, we must make sure all exports get destroyed. */
         err = obd_precleanup(obd, OBD_CLEANUP_EXPORTS);
         if (err)
                 CERROR("Precleanup %s returned %d\n",
                        obd->obd_name, err);
-        class_decref(obd);
+        class_decref(obd, "setup", obd);
         obd->obd_set_up = 0;
         RETURN(0);
 }
 
-struct obd_device *class_incref(struct obd_device *obd)
+struct obd_device *class_incref(struct obd_device *obd,
+                                const char *scope, const void *source)
 {
+        lu_ref_add_atomic(&obd->obd_reference, scope, source);
         atomic_inc(&obd->obd_refcount);
         CDEBUG(D_INFO, "incref %s (%p) now %d\n", obd->obd_name, obd,
                atomic_read(&obd->obd_refcount));
@@ -489,7 +580,7 @@ struct obd_device *class_incref(struct obd_device *obd)
         return obd;
 }
 
-void class_decref(struct obd_device *obd)
+void class_decref(struct obd_device *obd, const char *scope, const void *source)
 {
         int err;
         int refs;
@@ -498,6 +589,7 @@ void class_decref(struct obd_device *obd)
         atomic_dec(&obd->obd_refcount);
         refs = atomic_read(&obd->obd_refcount);
         spin_unlock(&obd->obd_dev_lock);
+        lu_ref_del(&obd->obd_reference, scope, source);
 
         CDEBUG(D_INFO, "Decref %s (%p) now %d\n", obd->obd_name, obd, refs);
 
@@ -528,7 +620,7 @@ void class_decref(struct obd_device *obd)
                                        obd->obd_name, err);
                 }
                 if (OBP(obd, detach)) {
-                        err = OBP(obd,detach)(obd);
+                        err = OBP(obd, detach)(obd);
                         if (err)
                                 CERROR("Detach returned %d\n", err);
                 }
@@ -821,6 +913,28 @@ int class_process_config(struct lustre_cfg *lcfg)
                 err = class_del_conn(obd, lcfg);
                 GOTO(out, err = 0);
         }
+        case LCFG_POOL_NEW: {
+                err = obd_pool_new(obd, lustre_cfg_string(lcfg, 2));
+                GOTO(out, err = 0);
+                break;
+        }
+        case LCFG_POOL_ADD: {
+                err = obd_pool_add(obd, lustre_cfg_string(lcfg, 2),
+                                   lustre_cfg_string(lcfg, 3));
+                GOTO(out, err = 0);
+                break;
+        }
+        case LCFG_POOL_REM: {
+                err = obd_pool_rem(obd, lustre_cfg_string(lcfg, 2),
+                                   lustre_cfg_string(lcfg, 3));
+                GOTO(out, err = 0);
+                break;
+        }
+        case LCFG_POOL_DEL: {
+                err = obd_pool_del(obd, lustre_cfg_string(lcfg, 2));
+                GOTO(out, err = 0);
+                break;
+        }
         default: {
                 err = obd_process_config(obd, sizeof(*lcfg), lcfg);
                 GOTO(out, err);
@@ -845,6 +959,7 @@ int class_process_proc_param(char *prefix, struct lprocfs_vars *lvars,
         int i, keylen, vallen;
         int matched = 0, j = 0;
         int rc = 0;
+        int skip = 0;
         ENTRY;
 
         if (lcfg->lcfg_command != LCFG_PARAM) {
@@ -861,7 +976,7 @@ int class_process_proc_param(char *prefix, struct lprocfs_vars *lvars,
                 class_match_param(key, prefix, &key);
                 sval = strchr(key, '=');
                 if (!sval || (*(sval + 1) == 0)) {
-                        CERROR("Can't parse param %s\n", key);
+                        CERROR("Can't parse param %s (missing '=')\n", key);
                         /* rc = -EINVAL; continue parsing other params */
                         continue;
                 }
@@ -893,9 +1008,14 @@ int class_process_proc_param(char *prefix, struct lprocfs_vars *lvars,
                         j++;
                 }
                 if (!matched) {
+                        /* If the prefix doesn't match, return error so we
+                           can pass it down the stack */
+                        if (strnchr(key, keylen, '.'))
+                            RETURN(-ENOSYS);
                         CERROR("%s: unknown param %s\n",
                                (char *)lustre_cfg_string(lcfg, 0), key);
                         /* rc = -EINVAL;       continue parsing other params */
+                        skip++;
                 } else {
                         LCONSOLE_INFO("%s.%.*s: set parameter %.*s=%s\n",
                                       lustre_cfg_string(lcfg, 0),
@@ -906,6 +1026,8 @@ int class_process_proc_param(char *prefix, struct lprocfs_vars *lvars,
 
         if (rc > 0)
                 rc = 0;
+        if (!rc && skip)
+                rc = skip;
         RETURN(rc);
 #else
         CDEBUG(D_CONFIG, "liblustre can't process params.\n");
@@ -955,8 +1077,8 @@ static int class_config_llog_handler(struct llog_handle * handle,
                 /* Figure out config state info */
                 if (lcfg->lcfg_command == LCFG_MARKER) {
                         struct cfg_marker *marker = lustre_cfg_buf(lcfg, 1);
-                        if (swab)
-                                lustre_swab_cfg_marker(marker);
+                        lustre_swab_cfg_marker(marker, swab,
+                                               LUSTRE_CFG_BUFLEN(lcfg, 1));
                         CDEBUG(D_CONFIG, "Marker, inst_flg=%#x mark_flg=%#x\n",
                                clli->cfg_flags, marker->cm_flags);
                         if (marker->cm_flags & CM_START) {
@@ -967,8 +1089,9 @@ static int class_config_llog_handler(struct llog_handle * handle,
                                         CDEBUG(D_CONFIG, "SKIP #%d\n",
                                                marker->cm_step);
                                 } else if ((marker->cm_flags & CM_EXCLUDE) ||
-                                           lustre_check_exclusion(clli->cfg_sb,
-                                                          marker->cm_tgtname)) {
+                                           (clli->cfg_sb &&
+                                            lustre_check_exclusion(clli->cfg_sb,
+                                                         marker->cm_tgtname))) {
                                         clli->cfg_flags |= CFG_F_EXCLUDE;
                                         CDEBUG(D_CONFIG, "EXCLUDE %d\n",
                                                marker->cm_step);
@@ -996,6 +1119,29 @@ static int class_config_llog_handler(struct llog_handle * handle,
                         break;
                 }
 
+                /*
+                 * For interoperability between 1.8 and 2.0,
+                 * rename "mds" obd device type to "mdt".
+                 */
+                {
+                        char *typename = lustre_cfg_string(lcfg, 1);
+                        char *index = lustre_cfg_string(lcfg, 2);
+                        
+                        if ((lcfg->lcfg_command == LCFG_ATTACH && typename &&
+                             strcmp(typename, "mds") == 0)) {
+                                CWARN("For 1.8 interoperability, rename obd "
+                                       "type from mds to mdt\n");
+                                typename[2] = 't';
+                        }
+                        if ((lcfg->lcfg_command == LCFG_SETUP && index &&
+                             strcmp(index, "type") == 0)) {
+                                CDEBUG(D_INFO, "For 1.8 interoperability, "
+                                       "set this index to '0'\n");
+                                index[0] = '0';
+                                index[1] = 0;
+                        }
+                }
+
                 if ((clli->cfg_flags & CFG_F_EXCLUDE) &&
                     (lcfg->lcfg_command == LCFG_LOV_ADD_OBD))
                         /* Add inactive instead */
@@ -1026,6 +1172,22 @@ static int class_config_llog_handler(struct llog_handle * handle,
                         lustre_cfg_bufs_set_string(&bufs, 2,
                                                    clli->cfg_uuid.uuid);
                 }
+                /*
+                 * sptlrpc config record, we expect 2 data segments:
+                 *  [0]: fs_name/target_name,
+                 *  [1]: rule string
+                 * moving them to index [1] and [2], and insert MGC's
+                 * obdname at index [0].
+                 */
+                if (clli && clli->cfg_instance == NULL &&
+                    lcfg->lcfg_command == LCFG_SPTLRPC_CONF) {
+                        lustre_cfg_bufs_set(&bufs, 2, bufs.lcfg_buf[1],
+                                            bufs.lcfg_buflen[1]);
+                        lustre_cfg_bufs_set(&bufs, 1, bufs.lcfg_buf[0],
+                                            bufs.lcfg_buflen[0]);
+                        lustre_cfg_bufs_set_string(&bufs, 0,
+                                                   clli->cfg_obdname);
+                }
 
                 lcfg_new = lustre_cfg_new(lcfg->lcfg_command, &bufs);
 
@@ -1198,10 +1360,10 @@ parse_out:
 /* Cleanup and detach */
 int class_manual_cleanup(struct obd_device *obd)
 {
-        struct lustre_cfg *lcfg;
-        struct lustre_cfg_bufs bufs;
-        int rc;
-        char flags[3]="";
+        char                    flags[3] = "";
+        struct lustre_cfg      *lcfg;
+        struct lustre_cfg_bufs  bufs;
+        int                     rc;
         ENTRY;
 
         if (!obd) {
@@ -1220,6 +1382,8 @@ int class_manual_cleanup(struct obd_device *obd)
         lustre_cfg_bufs_reset(&bufs, obd->obd_name);
         lustre_cfg_bufs_set_string(&bufs, 1, flags);
         lcfg = lustre_cfg_new(LCFG_CLEANUP, &bufs);
+        if (!lcfg)
+                RETURN(-ENOMEM);
 
         rc = class_process_config(lcfg);
         if (rc) {
@@ -1236,3 +1400,188 @@ out:
         lustre_cfg_free(lcfg);
         RETURN(rc);
 }
+
+/*
+ * uuid<->export lustre hash operations
+ */
+
+static unsigned
+uuid_hash(lustre_hash_t *lh,  void *key, unsigned mask)
+{
+        return lh_djb2_hash(((struct obd_uuid *)key)->uuid,
+                            sizeof(((struct obd_uuid *)key)->uuid), mask);
+}
+
+static void *
+uuid_key(struct hlist_node *hnode)
+{
+        struct obd_export *exp;
+
+        exp = hlist_entry(hnode, struct obd_export, exp_uuid_hash);
+
+        RETURN(&exp->exp_client_uuid);
+}
+
+/*
+ * NOTE: It is impossible to find an export that is in failed
+ *       state with this function
+ */
+static int
+uuid_compare(void *key, struct hlist_node *hnode)
+{
+        struct obd_export *exp;
+
+        LASSERT(key);
+        exp = hlist_entry(hnode, struct obd_export, exp_uuid_hash);
+
+        RETURN(obd_uuid_equals((struct obd_uuid *)key,&exp->exp_client_uuid) &&
+               !exp->exp_failed);
+}
+
+static void *
+uuid_export_get(struct hlist_node *hnode)
+{
+        struct obd_export *exp;
+
+        exp = hlist_entry(hnode, struct obd_export, exp_uuid_hash);
+        class_export_get(exp);
+
+        RETURN(exp);
+}
+
+static void *
+uuid_export_put(struct hlist_node *hnode)
+{
+        struct obd_export *exp;
+
+        exp = hlist_entry(hnode, struct obd_export, exp_uuid_hash);
+        class_export_put(exp);
+
+        RETURN(exp);
+}
+
+static lustre_hash_ops_t uuid_hash_ops = {
+        .lh_hash    = uuid_hash,
+        .lh_key     = uuid_key,
+        .lh_compare = uuid_compare,
+        .lh_get     = uuid_export_get,
+        .lh_put     = uuid_export_put,
+};
+
+
+/*
+ * nid<->export hash operations
+ */
+
+static unsigned
+nid_hash(lustre_hash_t *lh,  void *key, unsigned mask)
+{
+        return lh_djb2_hash(key, sizeof(lnet_nid_t), mask);
+}
+
+static void *
+nid_key(struct hlist_node *hnode)
+{
+        struct obd_export *exp;
+
+        exp = hlist_entry(hnode, struct obd_export, exp_nid_hash);
+
+        RETURN(&exp->exp_connection->c_peer.nid);
+}
+
+/*
+ * NOTE: It is impossible to find an export that is in failed
+ *       state with this function
+ */
+static int
+nid_compare(void *key, struct hlist_node *hnode)
+{
+        struct obd_export *exp;
+
+        LASSERT(key);
+        exp = hlist_entry(hnode, struct obd_export, exp_nid_hash);
+
+        RETURN(exp->exp_connection->c_peer.nid == *(lnet_nid_t *)key &&
+               !exp->exp_failed);
+}
+
+static void *
+nid_export_get(struct hlist_node *hnode)
+{
+        struct obd_export *exp;
+
+        exp = hlist_entry(hnode, struct obd_export, exp_nid_hash);
+        class_export_get(exp);
+
+        RETURN(exp);
+}
+
+static void *
+nid_export_put(struct hlist_node *hnode)
+{
+        struct obd_export *exp;
+
+        exp = hlist_entry(hnode, struct obd_export, exp_nid_hash);
+        class_export_put(exp);
+
+        RETURN(exp);
+}
+
+static lustre_hash_ops_t nid_hash_ops = {
+        .lh_hash    = nid_hash,
+        .lh_key     = nid_key,
+        .lh_compare = nid_compare,
+        .lh_get     = nid_export_get,
+        .lh_put     = nid_export_put,
+};
+
+
+/*
+ * nid<->nidstats hash operations
+ */
+
+static void *
+nidstats_key(struct hlist_node *hnode)
+{
+        struct nid_stat *ns;
+
+        ns = hlist_entry(hnode, struct nid_stat, nid_hash);
+
+        RETURN(&ns->nid);
+}
+
+static int
+nidstats_compare(void *key, struct hlist_node *hnode)
+{
+        RETURN(*(lnet_nid_t *)nidstats_key(hnode) == *(lnet_nid_t *)key);
+}
+
+static void *
+nidstats_get(struct hlist_node *hnode)
+{
+        struct nid_stat *ns;
+
+        ns = hlist_entry(hnode, struct nid_stat, nid_hash);
+        ns->nid_exp_ref_count++;
+
+        RETURN(ns);
+}
+
+static void *
+nidstats_put(struct hlist_node *hnode)
+{
+        struct nid_stat *ns;
+
+        ns = hlist_entry(hnode, struct nid_stat, nid_hash);
+        ns->nid_exp_ref_count--;
+
+        RETURN(ns);
+}
+
+static lustre_hash_ops_t nid_stat_hash_ops = {
+        .lh_hash    = nid_hash,
+        .lh_key     = nidstats_key,
+        .lh_compare = nidstats_compare,
+        .lh_get     = nidstats_get,
+        .lh_put     = nidstats_put,
+};