Whamcloud - gitweb
- hash-based export handling
authortappro <tappro>
Tue, 6 Nov 2007 13:03:56 +0000 (13:03 +0000)
committertappro <tappro>
Tue, 6 Nov 2007 13:03:56 +0000 (13:03 +0000)
  b:13815
  i:green, shadow

15 files changed:
lustre/include/Makefile.am
lustre/include/class_hash.h [new file with mode: 0644]
lustre/include/liblustre.h
lustre/include/lustre_export.h
lustre/include/lustre_net.h
lustre/include/obd.h
lustre/ldlm/ldlm_lib.c
lustre/obdclass/Makefile.in
lustre/obdclass/autoMakefile.am
lustre/obdclass/class_hash.c [new file with mode: 0644]
lustre/obdclass/genops.c
lustre/obdclass/lustre_handles.c
lustre/obdclass/obd_config.c
lustre/ptlrpc/connection.c
lustre/ptlrpc/ptlrpc_module.c

index 0a5f452..adf7c6e 100644 (file)
@@ -11,7 +11,7 @@ EXTRA_DIST = ioctl.h liblustre.h lprocfs_status.h lustre_cfg.h        \
             lustre_dlm.h lustre_export.h lustre_fsfilt.h lustre_ha.h \
             lustre_handles.h lustre_import.h lustre_lib.h lustre_sec.h \
             lustre_lite.h lustre_log.h lustre_mds.h lustre_mdc.h \
             lustre_dlm.h lustre_export.h lustre_fsfilt.h lustre_ha.h \
             lustre_handles.h lustre_import.h lustre_lib.h lustre_sec.h \
             lustre_lite.h lustre_log.h lustre_mds.h lustre_mdc.h \
-            lustre_net.h lustre_quota.h lustre_ucache.h lvfs.h \
+            lustre_net.h lustre_quota.h lustre_ucache.h lvfs.h class_hash.h \
             obd_cache.h obd_class.h obd_echo.h obd.h obd_lov.h \
             obd_ost.h obd_support.h lustre_ver.h lu_object.h lu_time.h  \
              md_object.h dt_object.h lustre_param.h lustre_mdt.h \
             obd_cache.h obd_class.h obd_echo.h obd.h obd_lov.h \
             obd_ost.h obd_support.h lustre_ver.h lu_object.h lu_time.h  \
              md_object.h dt_object.h lustre_param.h lustre_mdt.h \
diff --git a/lustre/include/class_hash.h b/lustre/include/class_hash.h
new file mode 100644 (file)
index 0000000..5e8a368
--- /dev/null
@@ -0,0 +1,117 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ */
+
+#ifndef __CLASS_HASH_H
+#define __CLASS_HASH_H
+
+#include <lustre_lib.h>
+
+/* define the hash bucket*/
+struct lustre_hash_bucket { 
+        struct hlist_head lhb_head;
+        spinlock_t lhb_lock;
+#ifdef LUSTRE_HASH_DEBUG
+        /* the number of hash item per bucket, 
+         * it will help us to analyse the hash distribute 
+         */
+        int lhb_item_count; 
+#endif      
+};
+
+struct lustre_hash_operations;
+
+struct lustre_class_hash_body {
+        char hashname[128];
+        spinlock_t lchb_lock; /* body lock */
+        struct lustre_hash_bucket *lchb_hash_tables;
+        __u32 lchb_hash_max_size; /* define the hash tables size */
+        /* define the hash operations */
+        struct lustre_hash_operations *lchb_hash_operations;
+};
+
+/* hash operations method define */
+struct lustre_hash_operations {
+        __u32 (*lustre_hashfn) (struct lustre_class_hash_body *hash_body, 
+                                void *key);
+        int   (*lustre_hash_key_compare) (void *key, 
+                                          struct hlist_node *compared_hnode);
+        /* add refcount */ 
+        void* (*lustre_hash_object_refcount_get) (struct hlist_node *hash_item);
+        /* dec refcount */
+        void  (*lustre_hash_object_refcount_put) (struct hlist_node *hash_item);
+};
+
+static inline struct hlist_node * 
+lustre_hash_getitem_in_bucket_nolock(struct lustre_class_hash_body *hash_body, 
+                                     int hashent, void *key)
+{
+        struct lustre_hash_bucket *bucket;
+        struct hlist_node  *hash_item_node;
+        struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
+        int find = 0;
+        ENTRY;
+
+        bucket = &hash_body->lchb_hash_tables[hashent];
+        hlist_for_each(hash_item_node, &(bucket->lhb_head)) {
+                find = hop->lustre_hash_key_compare(key, hash_item_node);
+                if (find == 1)
+                        break;
+        }
+        RETURN(find == 1 ? hash_item_node : NULL);
+}
+
+static inline int 
+lustre_hash_delitem_nolock(struct lustre_class_hash_body *hash_body, 
+                           int hashent, struct hlist_node * hash_item)
+{
+        struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
+
+        hlist_del_init(hash_item);
+
+        hop->lustre_hash_object_refcount_put(hash_item);
+
+#ifdef LUSTRE_HASH_DEBUG
+        hash_body->lchb_hash_tables[hashent].lhb_item_count--;
+        CDEBUG(D_INFO, "hashname[%s] bucket[%d] has [%d] hashitem\n", 
+                        hash_body->hashname, hashent, 
+                        hash_body->lchb_hash_tables[hashent].lhb_item_count);
+#endif
+
+        RETURN(0);
+}
+
+int lustre_hash_init(struct lustre_class_hash_body **hash_body,
+                     char *hashname, __u32 hashsize, 
+                     struct lustre_hash_operations *hash_operations);
+void lustre_hash_exit(struct lustre_class_hash_body **hash_body);
+int lustre_hash_additem_unique(struct lustre_class_hash_body *hash_body, 
+                               void *key, struct hlist_node *actual_hnode);
+int lustre_hash_additem(struct lustre_class_hash_body *hash_body, void *key, 
+                        struct hlist_node *actual_hnode);
+int lustre_hash_delitem_by_key(struct lustre_class_hash_body *hash_body, 
+                               void *key);
+int lustre_hash_delitem(struct lustre_class_hash_body *hash_body, void *key, 
+                        struct hlist_node *hash_item);
+void * lustre_hash_get_object_by_key(struct lustre_class_hash_body *hash_body,
+                                      void *key);
+
+/* ( uuid <-> export ) hash operations define */
+__u32 uuid_hashfn(struct lustre_class_hash_body *hash_body,  void * key);
+int uuid_hash_key_compare(void *key, struct hlist_node * compared_hnode);
+void * uuid_export_refcount_get(struct hlist_node * actual_hnode);
+void uuid_export_refcount_put(struct hlist_node * actual_hnode);
+
+/* ( nid <-> export ) hash operations define */
+__u32 nid_hashfn(struct lustre_class_hash_body *hash_body,  void * key);
+int nid_hash_key_compare(void *key, struct hlist_node * compared_hnode);
+void * nid_export_refcount_get(struct hlist_node * actual_hnode);
+void nid_export_refcount_put(struct hlist_node * actual_hnode);
+
+/* ( net_peer <-> connection ) hash operations define */
+__u32 conn_hashfn(struct lustre_class_hash_body *hash_body,  void * key);
+int conn_hash_key_compare(void *key, struct hlist_node * compared_hnode);
+void * conn_refcount_get(struct hlist_node * actual_hnode);
+void conn_refcount_put(struct hlist_node * actual_hnode);
+
+#endif /* __CLASS_HASH_H */
index c809c7c..a9c3a91 100644 (file)
@@ -726,6 +726,7 @@ static inline void del_timer(struct timer_list *l)
 typedef struct { volatile int counter; } atomic_t;
 
 #define ATOMIC_INIT(i) { (i) }
 typedef struct { volatile int counter; } atomic_t;
 
 #define ATOMIC_INIT(i) { (i) }
+
 #define atomic_read(a) ((a)->counter)
 #define atomic_set(a,b) do {(a)->counter = b; } while (0)
 #define atomic_dec_and_test(a) ((--((a)->counter)) == 0)
 #define atomic_read(a) ((a)->counter)
 #define atomic_set(a,b) do {(a)->counter = b; } while (0)
 #define atomic_dec_and_test(a) ((--((a)->counter)) == 0)
@@ -734,6 +735,8 @@ typedef struct { volatile int counter; } atomic_t;
 #define atomic_dec(a)  do { (a)->counter--; } while (0)
 #define atomic_add(b,a)  do {(a)->counter += b;} while (0)
 #define atomic_sub(b,a)  do {(a)->counter -= b;} while (0)
 #define atomic_dec(a)  do { (a)->counter--; } while (0)
 #define atomic_add(b,a)  do {(a)->counter += b;} while (0)
 #define atomic_sub(b,a)  do {(a)->counter -= b;} while (0)
+#define atomic_sub_return(n,a) ((a)->counter -= n)
+#define atomic_dec_return(a)  atomic_sub_return(1,a)
 
 #ifndef likely
 #define likely(exp) (exp)
 
 #ifndef likely
 #define likely(exp) (exp)
index 787094c..5c8719d 100644 (file)
@@ -85,6 +85,8 @@ struct obd_export {
         atomic_t                  exp_rpc_count;
         struct obd_uuid           exp_client_uuid;
         struct list_head          exp_obd_chain;
         atomic_t                  exp_rpc_count;
         struct obd_uuid           exp_client_uuid;
         struct list_head          exp_obd_chain;
+        struct hlist_node         exp_uuid_hash; /* uuid-export hash*/
+        struct hlist_node         exp_nid_hash; /* nid-export hash */
         /* exp_obd_chain_timed fo ping evictor, protected by obd_dev_lock */
         struct list_head          exp_obd_chain_timed;
         struct obd_device        *exp_obd;
         /* exp_obd_chain_timed fo ping evictor, protected by obd_dev_lock */
         struct list_head          exp_obd_chain_timed;
         struct obd_device        *exp_obd;
index b55d69e..5baf93a 100644 (file)
 
 struct ptlrpc_connection {
         struct list_head        c_link;
 
 struct ptlrpc_connection {
         struct list_head        c_link;
+        struct hlist_node       c_hash;
         lnet_nid_t              c_self;
         lnet_process_id_t       c_peer;
         struct obd_uuid         c_remote_uuid;
         lnet_nid_t              c_self;
         lnet_process_id_t       c_peer;
         struct obd_uuid         c_remote_uuid;
@@ -671,7 +672,7 @@ struct ptlrpc_connection *ptlrpc_get_connection(lnet_process_id_t peer,
                                                 lnet_nid_t self, struct obd_uuid *uuid);
 int ptlrpc_put_connection(struct ptlrpc_connection *c);
 struct ptlrpc_connection *ptlrpc_connection_addref(struct ptlrpc_connection *);
                                                 lnet_nid_t self, struct obd_uuid *uuid);
 int ptlrpc_put_connection(struct ptlrpc_connection *c);
 struct ptlrpc_connection *ptlrpc_connection_addref(struct ptlrpc_connection *);
-void ptlrpc_init_connection(void);
+int ptlrpc_init_connection(void);
 void ptlrpc_cleanup_connection(void);
 extern lnet_pid_t ptl_get_pid(void);
 
 void ptlrpc_cleanup_connection(void);
 extern lnet_pid_t ptl_get_pid(void);
 
index 9ef5cce..4c40002 100644 (file)
@@ -34,6 +34,7 @@
 #include <lustre_quota.h>
 #include <lustre_fld.h>
 #include <lustre_capa.h>
 #include <lustre_quota.h>
 #include <lustre_fld.h>
 #include <lustre_capa.h>
+#include <class_hash.h>
 
 #define MAX_OBD_DEVICES 8192
 
 
 #define MAX_OBD_DEVICES 8192
 
@@ -847,6 +848,10 @@ struct obd_device {
                      obd_fail:1,          /* cleanup with failover */
                      obd_async_recov:1,   /* allow asyncronous orphan cleanup */
                      obd_no_conn:1;       /* deny new connections */
                      obd_fail:1,          /* cleanup with failover */
                      obd_async_recov:1,   /* allow asyncronous orphan cleanup */
                      obd_no_conn:1;       /* deny new connections */
+        /* uuid-export hash body */
+        struct lustre_class_hash_body *obd_uuid_hash_body;
+        /* nid-export hash body */
+        struct lustre_class_hash_body *obd_nid_hash_body; 
         atomic_t obd_refcount;
         cfs_waitq_t             obd_refcount_waitq;
         struct list_head        obd_exports;
         atomic_t obd_refcount;
         cfs_waitq_t             obd_refcount_waitq;
         struct list_head        obd_exports;
index b2f8c44..18306b2 100644 (file)
@@ -568,7 +568,6 @@ int target_handle_connect(struct ptlrpc_request *req)
         struct obd_uuid tgtuuid;
         struct obd_uuid cluuid;
         struct obd_uuid remote_uuid;
         struct obd_uuid tgtuuid;
         struct obd_uuid cluuid;
         struct obd_uuid remote_uuid;
-        struct list_head *p;
         char *str, *tmp;
         int rc = 0;
         int initial_conn = 0;
         char *str, *tmp;
         int rc = 0;
         int initial_conn = 0;
@@ -689,43 +688,35 @@ int target_handle_connect(struct ptlrpc_request *req)
                 goto dont_check_exports;
 
         spin_lock(&target->obd_dev_lock);
                 goto dont_check_exports;
 
         spin_lock(&target->obd_dev_lock);
-        list_for_each(p, &target->obd_exports) {
-                export = list_entry(p, struct obd_export, exp_obd_chain);
-                if (obd_uuid_equals(&cluuid, &export->exp_client_uuid)) {
-                        if (export->exp_connecting) { /* bug 9635, et. al. */
-                                CWARN("%s: exp %p already connecting\n",
-                                      export->exp_obd->obd_name, export);
-                                export = NULL;
-                                rc = -EALREADY;
-                                break;
-                        }
-
-                        /* make darn sure this is coming from the same peer
-                         * if the UUIDs matched */
-                        if ((export->exp_connection != NULL) &&
-                            (strcmp(libcfs_nid2str(req->rq_peer.nid),
-                                    libcfs_nid2str(export->exp_connection->c_peer.nid)))) {
-                                CWARN("%s: cookie %s seen on new NID %s when "
-                                      "existing NID %s is already connected\n",
-                                      target->obd_name, cluuid.uuid,
-                                      libcfs_nid2str(req->rq_peer.nid),
-                                      libcfs_nid2str(export->exp_connection->c_peer.nid));
-                                export = NULL;
-                                rc = -EALREADY;
-                                break;
-                        }
+        export = lustre_hash_get_object_by_key(target->obd_uuid_hash_body, &cluuid);
 
 
-                        spin_lock(&export->exp_lock);
-                        export->exp_connecting = 1;
-                        spin_unlock(&export->exp_lock);
-                        spin_unlock(&target->obd_dev_lock);
-                        LASSERT(export->exp_obd == target);
-
-                        rc = target_handle_reconnect(&conn, export, &cluuid,
-                                                     initial_conn);
-                        break;
-                }
+        if (export != NULL && export->exp_connecting) { /* bug 9635, et. al. */
+                CWARN("%s: exp %p already connecting\n",
+                      export->exp_obd->obd_name, export);
+                class_export_put(export);
                 export = NULL;
                 export = NULL;
+                rc = -EALREADY;
+        } else if (export != NULL && export->exp_connection != NULL &&
+                   req->rq_peer.nid != export->exp_connection->c_peer.nid) {
+                /* make darn sure this is coming from the same peer
+                 * if the UUIDs matched */
+                  CWARN("%s: cookie %s seen on new NID %s when "
+                          "existing NID %s is already connected\n",
+                        target->obd_name, cluuid.uuid,
+                  libcfs_nid2str(req->rq_peer.nid),
+                  libcfs_nid2str(export->exp_connection->c_peer.nid));
+                  class_export_put(export);
+                  export = NULL;
+                  rc = -EALREADY;
+        } else if (export != NULL) {
+                spin_lock(&export->exp_lock);
+                export->exp_connecting = 1;
+                spin_unlock(&export->exp_lock);
+                class_export_put(export);
+                spin_unlock(&target->obd_dev_lock);
+                LASSERT(export->exp_obd == target);
+
+                rc = target_handle_reconnect(&conn, export, &cluuid, initial_conn);
         }
 
         /* If we found an export, we already unlocked. */
         }
 
         /* If we found an export, we already unlocked. */
@@ -871,6 +862,14 @@ dont_check_exports:
                                                        req->rq_self,
                                                        &remote_uuid);
 
                                                        req->rq_self,
                                                        &remote_uuid);
 
+        spin_lock(&target->obd_dev_lock);
+        /* Export might be hashed already, e.g. if this is reconnect */
+        if (hlist_unhashed(&export->exp_nid_hash))
+                lustre_hash_additem(export->exp_obd->obd_nid_hash_body, 
+                                    &export->exp_connection->c_peer.nid, 
+                                    &export->exp_nid_hash);
+        spin_unlock(&target->obd_dev_lock);
+
         spin_lock_bh(&target->obd_processing_task_lock);
         if (target->obd_recovering && !export->exp_in_recovery) {
                 spin_lock(&export->exp_lock);
         spin_lock_bh(&target->obd_processing_task_lock);
         if (target->obd_recovering && !export->exp_in_recovery) {
                 spin_lock(&export->exp_lock);
index ffd9cbf..30b7361 100644 (file)
@@ -20,7 +20,7 @@ sources:
 endif
 
 obdclass-all-objs := llog.o llog_cat.o llog_lvfs.o llog_obd.o llog_swab.o
 endif
 
 obdclass-all-objs := llog.o llog_cat.o llog_lvfs.o llog_obd.o llog_swab.o
-obdclass-all-objs += class_obd.o
+obdclass-all-objs += class_obd.o class_hash.o
 obdclass-all-objs += debug.o genops.o uuid.o llog_ioctl.o
 obdclass-all-objs += lprocfs_status.o lustre_handles.o lustre_peer.o
 obdclass-all-objs += statfs_pack.o obdo.o obd_config.o obd_mount.o prng.o mea.o
 obdclass-all-objs += debug.o genops.o uuid.o llog_ioctl.o
 obdclass-all-objs += lprocfs_status.o lustre_handles.o lustre_peer.o
 obdclass-all-objs += statfs_pack.o obdo.o obd_config.o obd_mount.o prng.o mea.o
index 5f70443..7a6fdce 100644 (file)
@@ -8,7 +8,7 @@ if LIBLUSTRE
 
 noinst_LIBRARIES = liblustreclass.a
 liblustreclass_a_SOURCES = class_obd.c debug.c genops.c statfs_pack.c mea.c uuid.c 
 
 noinst_LIBRARIES = liblustreclass.a
 liblustreclass_a_SOURCES = class_obd.c debug.c genops.c statfs_pack.c mea.c uuid.c 
-liblustreclass_a_SOURCES += lustre_handles.c lustre_peer.c lprocfs_status.c
+liblustreclass_a_SOURCES += lustre_handles.c lustre_peer.c lprocfs_status.c class_hash.c
 liblustreclass_a_SOURCES += obdo.c obd_config.c llog.c llog_obd.c llog_cat.c 
 liblustreclass_a_SOURCES += llog_lvfs.c llog_swab.c capa.c
 liblustreclass_a_SOURCES += prng.c #llog_ioctl.c rbtree.c
 liblustreclass_a_SOURCES += obdo.c obd_config.c llog.c llog_obd.c llog_cat.c 
 liblustreclass_a_SOURCES += llog_lvfs.c llog_swab.c capa.c
 liblustreclass_a_SOURCES += prng.c #llog_ioctl.c rbtree.c
diff --git a/lustre/obdclass/class_hash.c b/lustre/obdclass/class_hash.c
new file mode 100644 (file)
index 0000000..766e16d
--- /dev/null
@@ -0,0 +1,547 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2005 Cluster File Systems, Inc.
+ *   Author: YuZhangyong <yzy@clusterfs.com>
+ *
+ *   This file is part of Lustre, http://www.lustre.org/
+ *
+ *   No redistribution or use is permitted outside of Cluster File Systems, Inc.
+ *
+ *   Implement a hash class for hash process in lustre system.
+ */
+
+#ifndef __KERNEL__
+#include <liblustre.h>
+#include <obd.h>
+#endif
+
+#include <obd_class.h>
+#include <class_hash.h>
+#include <lustre_export.h>
+#include <obd_support.h>
+#include <lustre_net.h>
+
+int lustre_hash_init(struct lustre_class_hash_body **hash_body_new, 
+                     char *hashname, __u32 hashsize, 
+                     struct lustre_hash_operations *hash_operations)
+{
+        int i, n = 0;
+        struct lustre_class_hash_body *hash_body = NULL;
+
+        LASSERT(hashsize > 0);
+        LASSERT(hash_operations != NULL);
+        ENTRY;
+
+        i = hashsize;
+        while (i != 0) {
+                if (i & 0x1)
+                        n++;
+                i >>= 1;
+        }
+        
+        LASSERTF(n == 1, "hashsize %u isn't 2^n\n", hashsize);
+
+        /* alloc space for hash_body */   
+        OBD_ALLOC(hash_body, sizeof(*hash_body)); 
+
+        if (hash_body == NULL) {
+                CERROR("Cannot alloc space for hash body, hashname = %s \n", 
+                        hashname);
+                RETURN(-ENOMEM);
+        }
+
+        LASSERT(hashname != NULL && 
+                strlen(hashname) <= sizeof(hash_body->hashname));
+        strcpy(hash_body->hashname, hashname);
+        hash_body->lchb_hash_max_size = hashsize;      
+        hash_body->lchb_hash_operations = hash_operations;  
+
+        /* alloc space for the hash tables */
+        OBD_ALLOC(hash_body->lchb_hash_tables, 
+                  sizeof(*hash_body->lchb_hash_tables) * hash_body->lchb_hash_max_size);     
+
+        if (hash_body->lchb_hash_tables == NULL) {
+                OBD_FREE(hash_body, sizeof(*hash_body)); 
+                CERROR("Cannot alloc space for hashtables, hashname = %s \n", 
+                        hash_body->hashname);
+                RETURN(-ENOMEM);
+        }
+
+        spin_lock_init(&hash_body->lchb_lock); /* initialize the body lock */
+
+        for(i = 0 ; i < hash_body->lchb_hash_max_size; i++) {
+                /* initial the bucket lock and list_head */
+                INIT_HLIST_HEAD(&hash_body->lchb_hash_tables[i].lhb_head);
+                spin_lock_init(&hash_body->lchb_hash_tables[i].lhb_lock);
+        }
+        *hash_body_new = hash_body;
+
+        RETURN(0);
+}
+EXPORT_SYMBOL(lustre_hash_init);
+
+void lustre_hash_exit(struct lustre_class_hash_body **new_hash_body)
+{
+        int i;
+        struct lustre_class_hash_body *hash_body = NULL;
+        ENTRY;
+
+        hash_body = *new_hash_body;
+
+        if (hash_body == NULL) {
+                CWARN("hash body has been deleted\n");
+                goto out_hash;
+        }
+
+        spin_lock(&hash_body->lchb_lock); /* lock the hash tables */
+
+        if (hash_body->lchb_hash_tables == NULL ) {
+                spin_unlock(&hash_body->lchb_lock);
+                CWARN("hash tables has been deleted\n");
+                goto out_hash;   
+        }
+
+        for( i = 0; i < hash_body->lchb_hash_max_size; i++ ) {
+                struct lustre_hash_bucket * bucket;
+                struct hlist_node * actual_hnode, *pos;
+
+                bucket = &hash_body->lchb_hash_tables[i];
+                spin_lock(&bucket->lhb_lock); /* lock the bucket */
+                hlist_for_each_safe(actual_hnode, pos, &(bucket->lhb_head)) {
+                        lustre_hash_delitem_nolock(hash_body, i, actual_hnode);
+                }
+                spin_unlock(&bucket->lhb_lock); 
+        }
+
+        /* free the hash_tables's memory space */
+        OBD_FREE(hash_body->lchb_hash_tables,
+                  sizeof(*hash_body->lchb_hash_tables) * hash_body->lchb_hash_max_size);     
+
+        hash_body->lchb_hash_tables = NULL;
+
+        spin_unlock(&hash_body->lchb_lock);
+
+out_hash : 
+        /* free the hash_body's memory space */
+        if (hash_body != NULL) {
+                OBD_FREE(hash_body, sizeof(*hash_body));
+                *new_hash_body = NULL;
+        }
+        EXIT;
+}
+EXPORT_SYMBOL(lustre_hash_exit);
+
+/*
+ * only allow unique @key in hashtables, if the same @key has existed 
+ * in hashtables, it will return with fails.
+ */
+int lustre_hash_additem_unique(struct lustre_class_hash_body *hash_body, 
+                               void *key, struct hlist_node *actual_hnode)
+{
+        int hashent;
+        struct lustre_hash_bucket *bucket = NULL;
+        struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
+        ENTRY;
+
+        LASSERT(hlist_unhashed(actual_hnode));
+        hashent = hop->lustre_hashfn(hash_body, key);
+
+        /* get the hash-bucket and lock it */
+        bucket = &hash_body->lchb_hash_tables[hashent];
+        spin_lock(&bucket->lhb_lock);
+
+        if ( (lustre_hash_getitem_in_bucket_nolock(hash_body, hashent, key)) != NULL) {
+                /* the added-item exist in hashtables, so cannot add it again */
+                spin_unlock(&bucket->lhb_lock);
+
+                CWARN("Already found the key in hash [%s]\n", 
+                      hash_body->hashname);
+                RETURN(-EALREADY);
+        }
+
+        hlist_add_head(actual_hnode, &(bucket->lhb_head));
+
+#ifdef LUSTRE_HASH_DEBUG
+        /* hash distribute debug */
+        hash_body->lchb_hash_tables[hashent].lhb_item_count++; 
+        CDEBUG(D_INFO, "hashname[%s] bucket[%d] has [%d] hashitem\n", 
+                        hash_body->hashname, hashent, 
+                        hash_body->lchb_hash_tables[hashent].lhb_item_count);
+#endif  
+        hop->lustre_hash_object_refcount_get(actual_hnode); 
+
+        spin_unlock(&bucket->lhb_lock);
+
+        RETURN(0);
+}
+EXPORT_SYMBOL(lustre_hash_additem_unique);
+
+/*
+ * this version of additem, it allow multi same @key <key, value> in hashtables. 
+ * in this additem version, we don't need to check if exist same @key in hash 
+ * tables, we only add it to related hashbucket.
+ * example: maybe same nid will be related to multi difference export
+ */
+int lustre_hash_additem(struct lustre_class_hash_body *hash_body, void *key, 
+                         struct hlist_node *actual_hnode)
+{
+        int hashent;
+        struct lustre_hash_bucket *bucket = NULL;
+        struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
+        ENTRY;
+
+        LASSERT(hlist_unhashed(actual_hnode));
+
+        hashent = hop->lustre_hashfn(hash_body, key);
+
+        /* get the hashbucket and lock it */
+        bucket = &hash_body->lchb_hash_tables[hashent];
+        spin_lock(&bucket->lhb_lock);
+
+        hlist_add_head(actual_hnode, &(bucket->lhb_head));
+
+#ifdef LUSTRE_HASH_DEBUG
+        /* hash distribute debug */
+        hash_body->lchb_hash_tables[hashent].lhb_item_count++; 
+        CDEBUG(D_INFO, "hashname[%s] bucket[%d] has [%d] hashitem\n", 
+                        hash_body->hashname, hashent, 
+                        hash_body->lchb_hash_tables[hashent].lhb_item_count);
+#endif  
+        hop->lustre_hash_object_refcount_get(actual_hnode); 
+
+        spin_unlock(&bucket->lhb_lock);
+
+        RETURN(0);
+}
+EXPORT_SYMBOL(lustre_hash_additem);
+
+
+/*
+ * this version of delitem will delete a hashitem with given @key, 
+ * we need to search the <@key, @value> in hashbucket with @key, 
+ * if match, the hashitem will be delete. 
+ * we have a no-search version of delitem, it will directly delete a hashitem, 
+ * doesn't need to search it in hashtables, so it is a O(1) delete.
+ */
+int lustre_hash_delitem_by_key(struct lustre_class_hash_body *hash_body, 
+                               void *key)
+{
+        int hashent ;
+        struct hlist_node * hash_item;
+        struct lustre_hash_bucket *bucket = NULL;
+        struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
+        int retval = 0;
+        ENTRY;
+
+        hashent = hop->lustre_hashfn(hash_body, key);
+
+        /* first, lock the hashbucket */
+        bucket = &hash_body->lchb_hash_tables[hashent];
+        spin_lock(&bucket->lhb_lock);
+
+        /* get the hash_item from hash_bucket */
+        hash_item = lustre_hash_getitem_in_bucket_nolock(hash_body, hashent, 
+                                                         key);
+
+        if (hash_item == NULL) {
+                spin_unlock(&bucket->lhb_lock);
+                RETURN(-ENOENT);
+        }
+
+        /* call delitem_nolock() to delete the hash_item */
+        retval = lustre_hash_delitem_nolock(hash_body, hashent, hash_item);
+
+        spin_unlock(&bucket->lhb_lock);
+
+        RETURN(retval);
+}
+EXPORT_SYMBOL(lustre_hash_delitem_by_key);
+
+/*
+ * the O(1) version of delete hash item, 
+ * it will directly delete the hashitem with given @hash_item,
+ * the parameter @key used to get the relation hash bucket and lock it.
+ */
+int lustre_hash_delitem(struct lustre_class_hash_body *hash_body, 
+                        void *key, struct hlist_node * hash_item)
+{  
+        int hashent = 0;
+        int retval = 0;
+        struct lustre_hash_bucket *bucket = NULL;
+        struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
+        ENTRY;
+
+        hashent = hop->lustre_hashfn(hash_body, key);
+
+        bucket = &hash_body->lchb_hash_tables[hashent];
+        spin_lock(&bucket->lhb_lock);
+
+        /* call delitem_nolock() to delete the hash_item */
+        retval = lustre_hash_delitem_nolock(hash_body, hashent, hash_item);
+
+        spin_unlock(&bucket->lhb_lock);
+
+        RETURN(retval);
+}
+EXPORT_SYMBOL(lustre_hash_delitem);
+
+void * lustre_hash_get_object_by_key(struct lustre_class_hash_body *hash_body,
+                                     void *key)
+{
+        int hashent ;
+        struct hlist_node * hash_item_hnode = NULL;
+        void * obj_value = NULL;
+        struct lustre_hash_bucket *bucket = NULL;
+        struct lustre_hash_operations * hop = hash_body->lchb_hash_operations;
+        ENTRY;
+
+        /* get the hash value from the given item */
+        hashent = hop->lustre_hashfn(hash_body, key);
+
+        bucket = &hash_body->lchb_hash_tables[hashent];
+        spin_lock(&bucket->lhb_lock); /* lock the bucket */
+
+        hash_item_hnode = lustre_hash_getitem_in_bucket_nolock(hash_body, 
+                                                               hashent, key);
+
+        if (hash_item_hnode == NULL) {
+                spin_unlock(&bucket->lhb_lock); /* lock the bucket */
+                RETURN(NULL);
+        }
+
+        obj_value = hop->lustre_hash_object_refcount_get(hash_item_hnode);
+        spin_unlock(&bucket->lhb_lock); /* lock the bucket */
+
+        RETURN(obj_value);
+}
+EXPORT_SYMBOL(lustre_hash_get_object_by_key);
+
+/*
+ * define (uuid <-> export) hash operations and function define
+ */
+
+/* define the uuid hash operations */
+struct lustre_hash_operations uuid_hash_operations = {
+        .lustre_hashfn = uuid_hashfn,
+        .lustre_hash_key_compare = uuid_hash_key_compare,
+        .lustre_hash_object_refcount_get = uuid_export_refcount_get,
+        .lustre_hash_object_refcount_put = uuid_export_refcount_put,
+};
+
+/* string hashing using djb2 hash algorithm */
+__u32 uuid_hashfn(struct lustre_class_hash_body *hash_body,  void * key)
+{
+        __u32 hash = 5381;
+        struct obd_uuid * uuid_key = NULL;
+        int c;
+        char *ptr = NULL;
+
+        LASSERT(key != NULL); 
+
+        uuid_key = (struct obd_uuid*)key;
+        ptr = uuid_key->uuid;
+
+        while ((c = *ptr++)) {
+                hash = hash * 33 + c;
+        }
+
+        hash &= (hash_body->lchb_hash_max_size - 1);
+
+        RETURN(hash);             
+}
+
+/* Note, it is impossible to find an export that is in failed state with
+ * this function */
+int uuid_hash_key_compare(void *key, struct hlist_node *compared_hnode)
+{
+        struct obd_export *export = NULL;
+        struct obd_uuid *uuid_key = NULL, *compared_uuid = NULL;
+
+        LASSERT( key != NULL);
+
+        uuid_key = (struct obd_uuid*)key;
+
+        export = hlist_entry(compared_hnode, struct obd_export, exp_uuid_hash);
+
+        compared_uuid = &export->exp_client_uuid;
+
+        RETURN(obd_uuid_equals(uuid_key, compared_uuid) &&
+               !export->exp_failed);
+}
+
+void * uuid_export_refcount_get(struct hlist_node * actual_hnode)
+{
+        struct obd_export *export = NULL;
+
+        LASSERT(actual_hnode != NULL);
+
+        export = hlist_entry(actual_hnode, struct obd_export, exp_uuid_hash);
+
+        LASSERT(export != NULL);
+
+        class_export_get(export);
+
+        RETURN(export);
+}
+
+void uuid_export_refcount_put(struct hlist_node * actual_hnode)
+{
+        struct obd_export *export = NULL;
+
+        LASSERT(actual_hnode != NULL);
+
+        export = hlist_entry(actual_hnode, struct obd_export, exp_uuid_hash);
+
+        LASSERT(export != NULL);
+
+        class_export_put(export);
+}
+
+/*
+ * define (nid <-> export) hash operations and function define
+ */
+
+/* define the nid hash operations */
+struct lustre_hash_operations nid_hash_operations = {
+        .lustre_hashfn = nid_hashfn,
+        .lustre_hash_key_compare = nid_hash_key_compare,
+        .lustre_hash_object_refcount_get = nid_export_refcount_get,
+        .lustre_hash_object_refcount_put = nid_export_refcount_put,
+};
+
+/* string hashing using djb2 hash algorithm */
+__u32 nid_hashfn(struct lustre_class_hash_body *hash_body,  void * key)
+{
+        __u32 hash = 5381;
+        int i;
+        char *ptr = key;
+
+        LASSERT(key != NULL); 
+
+        for(i = 0 ; i < sizeof(lnet_nid_t) ; i ++)
+                hash = hash * 33 + ptr[i];
+
+        hash &= (hash_body->lchb_hash_max_size - 1);
+
+        RETURN(hash);             
+}
+
+/* Note, it is impossible to find an export that is in failed state with
+ * this function */
+int nid_hash_key_compare(void *key, struct hlist_node *compared_hnode)
+{
+        struct obd_export *export = NULL;
+        lnet_nid_t *nid_key = NULL;
+
+        LASSERT( key != NULL);
+
+        nid_key = (lnet_nid_t*)key;
+
+        export = hlist_entry(compared_hnode, struct obd_export, exp_nid_hash);
+
+        return (export->exp_connection->c_peer.nid == *nid_key &&
+                !export->exp_failed);
+}
+
+void *nid_export_refcount_get(struct hlist_node *actual_hnode)
+{
+        struct obd_export *export = NULL;
+
+        LASSERT(actual_hnode != NULL);
+
+        export = hlist_entry(actual_hnode, struct obd_export, exp_nid_hash);
+
+        LASSERT(export != NULL);
+
+        class_export_get(export);
+
+        RETURN(export);
+}
+
+void nid_export_refcount_put(struct hlist_node *actual_hnode)
+{
+        struct obd_export *export = NULL;
+
+        LASSERT(actual_hnode != NULL);
+
+        export = hlist_entry(actual_hnode, struct obd_export, exp_nid_hash);
+
+        LASSERT(export != NULL);
+
+        class_export_put(export);
+}
+
+/*
+ * define (net_peer <-> connection) hash operations and function define
+ */
+
+/* define the conn hash operations */
+struct lustre_hash_operations conn_hash_operations = {
+        .lustre_hashfn = conn_hashfn,
+        .lustre_hash_key_compare = conn_hash_key_compare,
+        .lustre_hash_object_refcount_get = conn_refcount_get,
+        .lustre_hash_object_refcount_put = conn_refcount_put,
+};
+EXPORT_SYMBOL(conn_hash_operations);
+
+/* string hashing using djb2 hash algorithm */
+__u32 conn_hashfn(struct lustre_class_hash_body *hash_body,  void * key)
+{
+        __u32 hash = 5381;
+        char *ptr = key;
+        int i;
+
+        LASSERT(key != NULL); 
+
+        for(i = 0 ; i < sizeof(lnet_process_id_t) ; i ++)
+                hash = hash * 33 + ptr[i];
+
+        hash &= (hash_body->lchb_hash_max_size - 1);
+
+        RETURN(hash);             
+}
+
+int conn_hash_key_compare(void *key, struct hlist_node *compared_hnode)
+{
+        struct ptlrpc_connection *c = NULL;
+        lnet_process_id_t *conn_key = NULL;
+
+        LASSERT( key != NULL);
+
+        conn_key = (lnet_process_id_t*)key;
+
+        c = hlist_entry(compared_hnode, struct ptlrpc_connection, c_hash);
+
+        return (conn_key->nid == c->c_peer.nid &&
+                conn_key->pid == c->c_peer.pid);
+}
+
+void *conn_refcount_get(struct hlist_node *actual_hnode)
+{
+        struct ptlrpc_connection *c = NULL;
+
+        LASSERT(actual_hnode != NULL);
+
+        c = hlist_entry(actual_hnode, struct ptlrpc_connection, c_hash);
+
+        LASSERT(c != NULL);
+
+        atomic_inc(&c->c_refcount);
+
+        RETURN(c);
+}
+
+void conn_refcount_put(struct hlist_node *actual_hnode)
+{
+        struct ptlrpc_connection *c = NULL;
+
+        LASSERT(actual_hnode != NULL);
+
+        c = hlist_entry(actual_hnode, struct ptlrpc_connection, c_hash);
+
+        LASSERT(c != NULL);
+
+        atomic_dec(&c->c_refcount);
+}
+
index 59f1c29..ab15541 100644 (file)
@@ -32,6 +32,7 @@
 #include <obd_ost.h>
 #include <obd_class.h>
 #include <lprocfs_status.h>
 #include <obd_ost.h>
 #include <obd_class.h>
 #include <lprocfs_status.h>
+#include <class_hash.h>
 
 extern struct list_head obd_types;
 spinlock_t obd_types_lock;
 
 extern struct list_head obd_types;
 spinlock_t obd_types_lock;
@@ -689,9 +690,10 @@ void class_export_destroy(struct obd_export *exp)
 struct obd_export *class_new_export(struct obd_device *obd,
                                     struct obd_uuid *cluuid)
 {
 struct obd_export *class_new_export(struct obd_device *obd,
                                     struct obd_uuid *cluuid)
 {
-        struct obd_export *export, *tmp;
+        struct obd_export *export;
+        int rc = 0;
 
 
-        OBD_ALLOC(export, sizeof(*export));
+        OBD_ALLOC_PTR(export);
         if (!export)
                 return ERR_PTR(-ENOMEM);
 
         if (!export)
                 return ERR_PTR(-ENOMEM);
 
@@ -708,23 +710,26 @@ struct obd_export *class_new_export(struct obd_device *obd,
         class_handle_hash(&export->exp_handle, export_handle_addref);
         export->exp_last_request_time = CURRENT_SECONDS;
         spin_lock_init(&export->exp_lock);
         class_handle_hash(&export->exp_handle, export_handle_addref);
         export->exp_last_request_time = CURRENT_SECONDS;
         spin_lock_init(&export->exp_lock);
+        INIT_HLIST_NODE(&export->exp_uuid_hash);
+        INIT_HLIST_NODE(&export->exp_nid_hash);
 
         export->exp_client_uuid = *cluuid;
         obd_init_export(export);
 
         spin_lock(&obd->obd_dev_lock);
         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
 
         export->exp_client_uuid = *cluuid;
         obd_init_export(export);
 
         spin_lock(&obd->obd_dev_lock);
         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
-                list_for_each_entry(tmp, &obd->obd_exports, exp_obd_chain) {
-                        if (obd_uuid_equals(cluuid, &tmp->exp_client_uuid)) {
-                                spin_unlock(&obd->obd_dev_lock);
-                                CWARN("%s: denying duplicate export for %s\n",
-                                      obd->obd_name, cluuid->uuid);
-                                class_handle_unhash(&export->exp_handle);
-                                OBD_FREE_PTR(export);
-                                return ERR_PTR(-EALREADY);
-                        }
-                }
+               rc = lustre_hash_additem_unique(obd->obd_uuid_hash_body, cluuid, 
+                                               &export->exp_uuid_hash);
+               if (rc != 0) {
+                       CWARN("%s: denying duplicate export for %s\n",
+                             obd->obd_name, cluuid->uuid);
+                       spin_unlock(&obd->obd_dev_lock);
+                       class_handle_unhash(&export->exp_handle);
+                       OBD_FREE_PTR(export);
+                       return ERR_PTR(-EALREADY);
+               }
         }
         }
+
         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
         class_incref(obd);
         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
         LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
         class_incref(obd);
         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
@@ -742,6 +747,11 @@ void class_unlink_export(struct obd_export *exp)
         class_handle_unhash(&exp->exp_handle);
 
         spin_lock(&exp->exp_obd->obd_dev_lock);
         class_handle_unhash(&exp->exp_handle);
 
         spin_lock(&exp->exp_obd->obd_dev_lock);
+        /* delete an uuid-export hashitem from hashtables */
+        if (!hlist_unhashed(&exp->exp_uuid_hash)) {
+                lustre_hash_delitem(exp->exp_obd->obd_uuid_hash_body, 
+                                    &exp->exp_client_uuid, &exp->exp_uuid_hash);
+        }
         list_del_init(&exp->exp_obd_chain);
         list_del_init(&exp->exp_obd_chain_timed);
         exp->exp_obd->obd_num_exports--;
         list_del_init(&exp->exp_obd_chain);
         list_del_init(&exp->exp_obd_chain_timed);
         exp->exp_obd->obd_num_exports--;
@@ -945,6 +955,11 @@ int class_disconnect(struct obd_export *export)
         spin_lock(&export->exp_lock);
         already_disconnected = export->exp_disconnected;
         export->exp_disconnected = 1;
         spin_lock(&export->exp_lock);
         already_disconnected = export->exp_disconnected;
         export->exp_disconnected = 1;
+
+        if (!hlist_unhashed(&export->exp_nid_hash)) {
+                lustre_hash_delitem(export->exp_obd->obd_nid_hash_body,
+                                    &export->exp_connection->c_peer.nid, &export->exp_nid_hash);
+        }
         spin_unlock(&export->exp_lock);
 
         /* class_cleanup(), abort_recovery(), and class_fail_export()
         spin_unlock(&export->exp_lock);
 
         /* class_cleanup(), abort_recovery(), and class_fail_export()
@@ -1004,12 +1019,8 @@ static void class_disconnect_export_list(struct list_head *list, int flags)
 
                 rc = obd_disconnect(fake_exp);
                 class_export_put(exp);
 
                 rc = obd_disconnect(fake_exp);
                 class_export_put(exp);
-                if (rc) {
-                        CDEBUG(D_HA, "disconnecting export %p failed: %d\n",
-                               exp, rc);
-                } else {
-                        CDEBUG(D_HA, "export %p disconnected\n", exp);
-                }
+                CDEBUG(D_HA, "disconnecting export %s (%p): rc %d\n",
+                       exp->exp_client_uuid.uuid, exp, rc);
         }
         EXIT;
 }
         }
         EXIT;
 }
@@ -1266,39 +1277,32 @@ char *obd_export_nid2str(struct obd_export *exp)
 }
 EXPORT_SYMBOL(obd_export_nid2str);
 
 }
 EXPORT_SYMBOL(obd_export_nid2str);
 
-#define EVICT_BATCH 32
 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
 {
 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
 {
-        struct obd_export *doomed_exp[EVICT_BATCH] = { NULL };
-        struct list_head *p;
-        int exports_evicted = 0, num_to_evict = 0, i;
+        struct obd_export *doomed_exp = NULL;
+        int exports_evicted = 0;
 
 
-search_again:
-        spin_lock(&obd->obd_dev_lock);
-        list_for_each(p, &obd->obd_exports) {
-                doomed_exp[num_to_evict] = list_entry(p, struct obd_export,
-                                                      exp_obd_chain);
-                if (strcmp(obd_export_nid2str(doomed_exp[num_to_evict]),
-                           nid) == 0) {
-                        class_export_get(doomed_exp[num_to_evict]);
-                        if (++num_to_evict == EVICT_BATCH)
-                                break;
-                }
-        }
-        spin_unlock(&obd->obd_dev_lock);
+        lnet_nid_t nid_key = libcfs_str2nid(nid);
+
+        do {
+                doomed_exp = lustre_hash_get_object_by_key(obd->obd_nid_hash_body,
+                                                           &nid_key);
+                if (doomed_exp == NULL)
+                        break;
 
 
-        for (i = 0; i < num_to_evict; i++) {
+                LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
+                         "nid %s found, wanted nid %s, requested nid %s\n",
+                         obd_export_nid2str(doomed_exp),
+                         libcfs_nid2str(nid_key), nid);        
+                LASSERTF(doomed_exp != obd->obd_self_export,
+                         "self-export is hashed by NID?\n");
                 exports_evicted++;
                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
                 exports_evicted++;
                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
-                       obd->obd_name, nid, doomed_exp[i]->exp_client_uuid.uuid,
+                       obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
                        exports_evicted);
                        exports_evicted);
-                class_fail_export(doomed_exp[i]);
-                class_export_put(doomed_exp[i]);
-        }
-        if (num_to_evict == EVICT_BATCH) {
-                num_to_evict = 0;
-                goto search_again;
-        }
+                class_fail_export(doomed_exp);
+                class_export_put(doomed_exp);
+        } while (1);
 
         if (!exports_evicted)
                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
 
         if (!exports_evicted)
                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
@@ -1310,27 +1314,17 @@ EXPORT_SYMBOL(obd_export_evict_by_nid);
 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
 {
         struct obd_export *doomed_exp = NULL;
 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
 {
         struct obd_export *doomed_exp = NULL;
-        struct list_head *p;
         struct obd_uuid doomed;
         int exports_evicted = 0;
 
         obd_str2uuid(&doomed, uuid);
         struct obd_uuid doomed;
         int exports_evicted = 0;
 
         obd_str2uuid(&doomed, uuid);
-        if(obd_uuid_equals(&doomed, &obd->obd_uuid)) {
+        if (obd_uuid_equals(&doomed, &obd->obd_uuid)) {
                 CERROR("%s: can't evict myself\n", obd->obd_name);
                 return exports_evicted;
         }
 
                 CERROR("%s: can't evict myself\n", obd->obd_name);
                 return exports_evicted;
         }
 
-        spin_lock(&obd->obd_dev_lock);
-        list_for_each(p, &obd->obd_exports) {
-                doomed_exp = list_entry(p, struct obd_export, exp_obd_chain);
-
-                if (obd_uuid_equals(&doomed, &doomed_exp->exp_client_uuid)) {
-                        class_export_get(doomed_exp);
-                        break;
-                }
-                doomed_exp = NULL;
-        }
-        spin_unlock(&obd->obd_dev_lock);
+        doomed_exp = lustre_hash_get_object_by_key(obd->obd_uuid_hash_body, 
+                                                   &doomed);
 
         if (doomed_exp == NULL) {
                 CERROR("%s: can't disconnect %s: no exports found\n",
 
         if (doomed_exp == NULL) {
                 CERROR("%s: can't disconnect %s: no exports found\n",
index dfdfec4..96fc122 100644 (file)
@@ -52,7 +52,16 @@ static struct handle_bucket {
 
 static atomic_t handle_count = ATOMIC_INIT(0);
 
 
 static atomic_t handle_count = ATOMIC_INIT(0);
 
+#ifdef __arch_um__
+/* For unknown reason, UML uses kmalloc rather than vmalloc to allocate
+ * memory(OBD_VMALLOC). Therefore, we have to redefine the
+ * HANDLE_HASH_SIZE to make the hash heads don't exceed 128K.
+ */
+#define HANDLE_HASH_SIZE 4096
+#else
 #define HANDLE_HASH_SIZE (1 << 14)
 #define HANDLE_HASH_SIZE (1 << 14)
+#endif /* ifdef __arch_um__ */
+
 #define HANDLE_HASH_MASK (HANDLE_HASH_SIZE - 1)
 
 /*
 #define HANDLE_HASH_MASK (HANDLE_HASH_SIZE - 1)
 
 /*
index 0ae76a3..0b5ec84 100644 (file)
 #include <lprocfs_status.h>
 #include <libcfs/list.h>
 #include <lustre_param.h>
 #include <lprocfs_status.h>
 #include <libcfs/list.h>
 #include <lustre_param.h>
+#include <class_hash.h>
+
+extern struct lustre_hash_operations uuid_hash_operations;
+extern struct lustre_hash_operations nid_hash_operations;
 
 /*********** string parsing utils *********/
 
 
 /*********** string parsing utils *********/
 
@@ -253,7 +257,21 @@ int class_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         /* just leave this on forever.  I can't use obd_set_up here because
            other fns check that status, and we're not actually set up yet. */
         obd->obd_starting = 1;
         /* just leave this on forever.  I can't use obd_set_up here because
            other fns check that status, and we're not actually set up yet. */
         obd->obd_starting = 1;
+        /* create an uuid-export hash body */
+        err = lustre_hash_init(&obd->obd_uuid_hash_body, "UUID_HASH", 
+                               128, &uuid_hash_operations);
+        if (err) {
+                spin_unlock(&obd->obd_dev_lock);
+                GOTO(err_hash, err);
+        }
+
+        /* create a nid-export hash body */
+        err = lustre_hash_init(&obd->obd_nid_hash_body, "NID_HASH", 
+                               128, &nid_hash_operations);
         spin_unlock(&obd->obd_dev_lock);
         spin_unlock(&obd->obd_dev_lock);
+        if (err)
+                GOTO(err_hash, err);
 
         exp = class_new_export(obd, &obd->obd_uuid);
         if (IS_ERR(exp))
 
         exp = class_new_export(obd, &obd->obd_uuid);
         if (IS_ERR(exp))
@@ -279,10 +297,13 @@ int class_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         RETURN(0);
 
 err_exp:
         RETURN(0);
 
 err_exp:
-        CERROR("setup %s failed (%d)\n", obd->obd_name, err);
         class_unlink_export(obd->obd_self_export);
         obd->obd_self_export = NULL;
         class_unlink_export(obd->obd_self_export);
         obd->obd_self_export = NULL;
+err_hash:
+        lustre_hash_exit(&obd->obd_uuid_hash_body);
+        lustre_hash_exit(&obd->obd_nid_hash_body);
         obd->obd_starting = 0;
         obd->obd_starting = 0;
+        CERROR("setup %s failed (%d)\n", obd->obd_name, err);
         RETURN(err);
 }
 
         RETURN(err);
 }
 
@@ -411,6 +432,12 @@ int class_cleanup(struct obd_device *obd, struct lustre_cfg *lcfg)
         }
         LASSERT(obd->obd_self_export);
 
         }
         LASSERT(obd->obd_self_export);
 
+        /* destroy an uuid-export hash body */
+        lustre_hash_exit(&obd->obd_uuid_hash_body);
+
+        /* destroy a nid-export hash body */
+        lustre_hash_exit(&obd->obd_nid_hash_body);
+
         /* Precleanup stage 1, we must make sure all exports (other than the
            self-export) get destroyed. */
         err = obd_precleanup(obd, OBD_CLEANUP_EXPORTS);
         /* Precleanup stage 1, we must make sure all exports (other than the
            self-export) get destroyed. */
         err = obd_precleanup(obd, OBD_CLEANUP_EXPORTS);
index 1d2e228..b2e0d85 100644 (file)
 #endif
 
 #include "ptlrpc_internal.h"
 #endif
 
 #include "ptlrpc_internal.h"
+#include <class_hash.h>
 
 static spinlock_t conn_lock;
 static struct list_head conn_list;
 static struct list_head conn_unused_list;
 
 static spinlock_t conn_lock;
 static struct list_head conn_list;
 static struct list_head conn_unused_list;
+static struct lustre_class_hash_body *conn_hash_body;
+static struct lustre_class_hash_body *conn_unused_hash_body;
+
+extern struct lustre_hash_operations conn_hash_operations;
 
 void ptlrpc_dump_connections(void)
 {
 
 void ptlrpc_dump_connections(void)
 {
@@ -58,26 +63,14 @@ struct ptlrpc_connection*
 ptlrpc_lookup_conn_locked (lnet_process_id_t peer)
 {
         struct ptlrpc_connection *c;
 ptlrpc_lookup_conn_locked (lnet_process_id_t peer)
 {
         struct ptlrpc_connection *c;
-        struct list_head         *tmp;
-
-        list_for_each(tmp, &conn_list) {
-                c = list_entry(tmp, struct ptlrpc_connection, c_link);
-
-                if (peer.nid == c->c_peer.nid &&
-                    peer.pid == c->c_peer.pid)
-                        return ptlrpc_connection_addref(c);
-        }
 
 
-        list_for_each(tmp, &conn_unused_list) {
-                c = list_entry(tmp, struct ptlrpc_connection, c_link);
+        c = lustre_hash_get_object_by_key(conn_hash_body, &peer);
+        if (c != NULL)
+                return c;
 
 
-                if (peer.nid == c->c_peer.nid &&
-                    peer.pid == c->c_peer.pid) {
-                        list_del(&c->c_link);
-                        list_add(&c->c_link, &conn_list);
-                        return ptlrpc_connection_addref(c);
-                }
-        }
+        c = lustre_hash_get_object_by_key(conn_unused_hash_body, &peer);
+        if (c != NULL)
+                return c;
 
         return NULL;
 }
 
         return NULL;
 }
@@ -88,6 +81,7 @@ struct ptlrpc_connection *ptlrpc_get_connection(lnet_process_id_t peer,
 {
         struct ptlrpc_connection *c;
         struct ptlrpc_connection *c2;
 {
         struct ptlrpc_connection *c;
         struct ptlrpc_connection *c2;
+        int rc = 0;
         ENTRY;
 
         CDEBUG(D_INFO, "self %s peer %s\n", 
         ENTRY;
 
         CDEBUG(D_INFO, "self %s peer %s\n", 
@@ -115,15 +109,26 @@ struct ptlrpc_connection *ptlrpc_get_connection(lnet_process_id_t peer,
         spin_lock(&conn_lock);
 
         c2 = ptlrpc_lookup_conn_locked(peer);
         spin_lock(&conn_lock);
 
         c2 = ptlrpc_lookup_conn_locked(peer);
-        if (c2 == NULL)
+        if (c2 == NULL) {
                 list_add(&c->c_link, &conn_list);
                 list_add(&c->c_link, &conn_list);
-        
+                rc = lustre_hash_additem_unique(conn_hash_body, &peer, 
+                                                &c->c_hash);
+                if (rc != 0) {
+                        list_del(&c->c_link);
+                        CERROR("Cannot add connection to conn_hash_body\n");
+                        goto out_conn;
+                }
+        }
+
+out_conn:
+
         spin_unlock(&conn_lock);
 
         spin_unlock(&conn_lock);
 
-        if (c2 == NULL)
+        if (c2 == NULL && rc == 0)
                 RETURN (c);
                 RETURN (c);
-        
-        OBD_FREE(c, sizeof(*c));
+
+        if (c != NULL) 
+                OBD_FREE(c, sizeof(*c));
         RETURN (c2);
 }
 
         RETURN (c2);
 }
 
@@ -141,16 +146,35 @@ int ptlrpc_put_connection(struct ptlrpc_connection *c)
                 c, atomic_read(&c->c_refcount) - 1, 
                 libcfs_nid2str(c->c_peer.nid));
 
                 c, atomic_read(&c->c_refcount) - 1, 
                 libcfs_nid2str(c->c_peer.nid));
 
-        if (atomic_dec_and_test(&c->c_refcount)) {
+        spin_lock(&conn_lock);
+        LASSERT(!hlist_unhashed(&c->c_hash));
+        spin_unlock(&conn_lock);
+
+        if (atomic_dec_return(&c->c_refcount) == 1) {
+
                 spin_lock(&conn_lock);
                 spin_lock(&conn_lock);
+
+                lustre_hash_delitem(conn_hash_body, &c->c_peer, &c->c_hash);
                 list_del(&c->c_link);
                 list_del(&c->c_link);
+
                 list_add(&c->c_link, &conn_unused_list);
                 list_add(&c->c_link, &conn_unused_list);
+                rc = lustre_hash_additem_unique(conn_unused_hash_body, &c->c_peer, 
+                                                &c->c_hash);
+                if (rc != 0) {
+                        spin_unlock(&conn_lock);
+                        CERROR("Cannot hash connection to conn_hash_body\n");
+                        GOTO(ret, rc);
+                }
+
                 spin_unlock(&conn_lock);
                 rc = 1;
                 spin_unlock(&conn_lock);
                 rc = 1;
-        }
+        } 
+
         if (atomic_read(&c->c_refcount) < 0)
                 CERROR("connection %p refcount %d!\n",
                        c, atomic_read(&c->c_refcount));
         if (atomic_read(&c->c_refcount) < 0)
                 CERROR("connection %p refcount %d!\n",
                        c, atomic_read(&c->c_refcount));
+ret :
 
         RETURN(rc);
 }
 
         RETURN(rc);
 }
@@ -165,11 +189,28 @@ struct ptlrpc_connection *ptlrpc_connection_addref(struct ptlrpc_connection *c)
         RETURN(c);
 }
 
         RETURN(c);
 }
 
-void ptlrpc_init_connection(void)
+int ptlrpc_init_connection(void)
 {
 {
+        int rc = 0;
         CFS_INIT_LIST_HEAD(&conn_list);
         CFS_INIT_LIST_HEAD(&conn_list);
+        rc = lustre_hash_init(&conn_hash_body, "CONN_HASH", 
+                              128, &conn_hash_operations);
+        if (rc)
+                GOTO(ret, rc);
+
         CFS_INIT_LIST_HEAD(&conn_unused_list);
         CFS_INIT_LIST_HEAD(&conn_unused_list);
+        rc = lustre_hash_init(&conn_unused_hash_body, "CONN_UNUSED_HASH", 
+                              128, &conn_hash_operations);
+        if (rc)
+                GOTO(ret, rc);
+
         spin_lock_init(&conn_lock);
         spin_lock_init(&conn_lock);
+ret:
+        if (rc) {
+                lustre_hash_exit(&conn_hash_body);
+                lustre_hash_exit(&conn_unused_hash_body);
+        }
+        RETURN(rc);
 }
 
 void ptlrpc_cleanup_connection(void)
 }
 
 void ptlrpc_cleanup_connection(void)
@@ -178,11 +219,15 @@ void ptlrpc_cleanup_connection(void)
         struct ptlrpc_connection *c;
 
         spin_lock(&conn_lock);
         struct ptlrpc_connection *c;
 
         spin_lock(&conn_lock);
+
+        lustre_hash_exit(&conn_unused_hash_body);
         list_for_each_safe(tmp, pos, &conn_unused_list) {
                 c = list_entry(tmp, struct ptlrpc_connection, c_link);
                 list_del(&c->c_link);
                 OBD_FREE(c, sizeof(*c));
         }
         list_for_each_safe(tmp, pos, &conn_unused_list) {
                 c = list_entry(tmp, struct ptlrpc_connection, c_link);
                 list_del(&c->c_link);
                 OBD_FREE(c, sizeof(*c));
         }
+
+        lustre_hash_exit(&conn_hash_body);
         list_for_each_safe(tmp, pos, &conn_list) {
                 c = list_entry(tmp, struct ptlrpc_connection, c_link);
                 CERROR("Connection %p/%s has refcount %d (nid=%s)\n",
         list_for_each_safe(tmp, pos, &conn_list) {
                 c = list_entry(tmp, struct ptlrpc_connection, c_link);
                 CERROR("Connection %p/%s has refcount %d (nid=%s)\n",
index b060b1e..65d986e 100644 (file)
@@ -70,22 +70,26 @@ __init int ptlrpc_init(void)
         cleanup_phase = 2;
 
         ptlrpc_init_connection();
         cleanup_phase = 2;
 
         ptlrpc_init_connection();
-        rc = llog_init_commit_master();
         if (rc)
                 GOTO(cleanup, rc);
         cleanup_phase = 3;
 
         if (rc)
                 GOTO(cleanup, rc);
         cleanup_phase = 3;
 
+        rc = llog_init_commit_master();
+        if (rc)
+                GOTO(cleanup, rc);
+        cleanup_phase = 4;
+
         ptlrpc_put_connection_superhack = ptlrpc_put_connection;
 
         rc = ptlrpc_start_pinger();
         if (rc)
                 GOTO(cleanup, rc);
         ptlrpc_put_connection_superhack = ptlrpc_put_connection;
 
         rc = ptlrpc_start_pinger();
         if (rc)
                 GOTO(cleanup, rc);
-        cleanup_phase = 4;
+        cleanup_phase = 5;
 
         rc = ldlm_init();
         if (rc)
                 GOTO(cleanup, rc);
 
         rc = ldlm_init();
         if (rc)
                 GOTO(cleanup, rc);
-        cleanup_phase = 5;
+        cleanup_phase = 6;
 
         rc = sptlrpc_init();
         if (rc)
 
         rc = sptlrpc_init();
         if (rc)
@@ -95,12 +99,13 @@ __init int ptlrpc_init(void)
 
 cleanup:
         switch(cleanup_phase) {
 
 cleanup:
         switch(cleanup_phase) {
-        case 5:
+        case 6:
                 ldlm_exit();
                 ldlm_exit();
-        case 4:
+        case 5:
                 ptlrpc_stop_pinger();
                 ptlrpc_stop_pinger();
-        case 3:
+        case 4:
                 llog_cleanup_commit_master(1);
                 llog_cleanup_commit_master(1);
+        case 3:
                 ptlrpc_cleanup_connection();
         case 2:
                 ptlrpc_exit_portals();
                 ptlrpc_cleanup_connection();
         case 2:
                 ptlrpc_exit_portals();