From fd330cf53033d8721ead0489448b15c58329657c Mon Sep 17 00:00:00 2001 From: tappro Date: Tue, 6 Nov 2007 13:03:56 +0000 Subject: [PATCH] - hash-based export handling b:13815 i:green, shadow --- lustre/include/Makefile.am | 2 +- lustre/include/class_hash.h | 117 +++++++++ lustre/include/liblustre.h | 3 + lustre/include/lustre_export.h | 2 + lustre/include/lustre_net.h | 3 +- lustre/include/obd.h | 5 + lustre/ldlm/ldlm_lib.c | 71 +++-- lustre/obdclass/Makefile.in | 2 +- lustre/obdclass/autoMakefile.am | 2 +- lustre/obdclass/class_hash.c | 547 +++++++++++++++++++++++++++++++++++++++ lustre/obdclass/genops.c | 108 ++++---- lustre/obdclass/lustre_handles.c | 9 + lustre/obdclass/obd_config.c | 29 ++- lustre/ptlrpc/connection.c | 97 +++++-- lustre/ptlrpc/ptlrpc_module.c | 17 +- 15 files changed, 884 insertions(+), 130 deletions(-) create mode 100644 lustre/include/class_hash.h create mode 100644 lustre/obdclass/class_hash.c diff --git a/lustre/include/Makefile.am b/lustre/include/Makefile.am index 0a5f452..adf7c6e 100644 --- a/lustre/include/Makefile.am +++ b/lustre/include/Makefile.am @@ -11,7 +11,7 @@ EXTRA_DIST = ioctl.h liblustre.h lprocfs_status.h lustre_cfg.h \ lustre_dlm.h lustre_export.h lustre_fsfilt.h lustre_ha.h \ lustre_handles.h lustre_import.h lustre_lib.h lustre_sec.h \ lustre_lite.h lustre_log.h lustre_mds.h lustre_mdc.h \ - lustre_net.h lustre_quota.h lustre_ucache.h lvfs.h \ + lustre_net.h lustre_quota.h lustre_ucache.h lvfs.h class_hash.h \ obd_cache.h obd_class.h obd_echo.h obd.h obd_lov.h \ obd_ost.h obd_support.h lustre_ver.h lu_object.h lu_time.h \ md_object.h dt_object.h lustre_param.h lustre_mdt.h \ diff --git a/lustre/include/class_hash.h b/lustre/include/class_hash.h new file mode 100644 index 0000000..5e8a368 --- /dev/null +++ b/lustre/include/class_hash.h @@ -0,0 +1,117 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + */ + +#ifndef __CLASS_HASH_H +#define __CLASS_HASH_H + +#include + +/* define the hash bucket*/ +struct lustre_hash_bucket { + struct hlist_head lhb_head; + spinlock_t lhb_lock; +#ifdef LUSTRE_HASH_DEBUG + /* the number of hash item per bucket, + * it will help us to analyse the hash distribute + */ + int lhb_item_count; +#endif +}; + +struct lustre_hash_operations; + +struct lustre_class_hash_body { + char hashname[128]; + spinlock_t lchb_lock; /* body lock */ + struct lustre_hash_bucket *lchb_hash_tables; + __u32 lchb_hash_max_size; /* define the hash tables size */ + /* define the hash operations */ + struct lustre_hash_operations *lchb_hash_operations; +}; + +/* hash operations method define */ +struct lustre_hash_operations { + __u32 (*lustre_hashfn) (struct lustre_class_hash_body *hash_body, + void *key); + int (*lustre_hash_key_compare) (void *key, + struct hlist_node *compared_hnode); + /* add refcount */ + void* (*lustre_hash_object_refcount_get) (struct hlist_node *hash_item); + /* dec refcount */ + void (*lustre_hash_object_refcount_put) (struct hlist_node *hash_item); +}; + +static inline struct hlist_node * +lustre_hash_getitem_in_bucket_nolock(struct lustre_class_hash_body *hash_body, + int hashent, void *key) +{ + struct lustre_hash_bucket *bucket; + struct hlist_node *hash_item_node; + struct lustre_hash_operations *hop = hash_body->lchb_hash_operations; + int find = 0; + ENTRY; + + bucket = &hash_body->lchb_hash_tables[hashent]; + hlist_for_each(hash_item_node, &(bucket->lhb_head)) { + find = hop->lustre_hash_key_compare(key, hash_item_node); + if (find == 1) + break; + } + RETURN(find == 1 ? hash_item_node : NULL); +} + +static inline int +lustre_hash_delitem_nolock(struct lustre_class_hash_body *hash_body, + int hashent, struct hlist_node * hash_item) +{ + struct lustre_hash_operations *hop = hash_body->lchb_hash_operations; + + hlist_del_init(hash_item); + + hop->lustre_hash_object_refcount_put(hash_item); + +#ifdef LUSTRE_HASH_DEBUG + hash_body->lchb_hash_tables[hashent].lhb_item_count--; + CDEBUG(D_INFO, "hashname[%s] bucket[%d] has [%d] hashitem\n", + hash_body->hashname, hashent, + hash_body->lchb_hash_tables[hashent].lhb_item_count); +#endif + + RETURN(0); +} + +int lustre_hash_init(struct lustre_class_hash_body **hash_body, + char *hashname, __u32 hashsize, + struct lustre_hash_operations *hash_operations); +void lustre_hash_exit(struct lustre_class_hash_body **hash_body); +int lustre_hash_additem_unique(struct lustre_class_hash_body *hash_body, + void *key, struct hlist_node *actual_hnode); +int lustre_hash_additem(struct lustre_class_hash_body *hash_body, void *key, + struct hlist_node *actual_hnode); +int lustre_hash_delitem_by_key(struct lustre_class_hash_body *hash_body, + void *key); +int lustre_hash_delitem(struct lustre_class_hash_body *hash_body, void *key, + struct hlist_node *hash_item); +void * lustre_hash_get_object_by_key(struct lustre_class_hash_body *hash_body, + void *key); + +/* ( uuid <-> export ) hash operations define */ +__u32 uuid_hashfn(struct lustre_class_hash_body *hash_body, void * key); +int uuid_hash_key_compare(void *key, struct hlist_node * compared_hnode); +void * uuid_export_refcount_get(struct hlist_node * actual_hnode); +void uuid_export_refcount_put(struct hlist_node * actual_hnode); + +/* ( nid <-> export ) hash operations define */ +__u32 nid_hashfn(struct lustre_class_hash_body *hash_body, void * key); +int nid_hash_key_compare(void *key, struct hlist_node * compared_hnode); +void * nid_export_refcount_get(struct hlist_node * actual_hnode); +void nid_export_refcount_put(struct hlist_node * actual_hnode); + +/* ( net_peer <-> connection ) hash operations define */ +__u32 conn_hashfn(struct lustre_class_hash_body *hash_body, void * key); +int conn_hash_key_compare(void *key, struct hlist_node * compared_hnode); +void * conn_refcount_get(struct hlist_node * actual_hnode); +void conn_refcount_put(struct hlist_node * actual_hnode); + +#endif /* __CLASS_HASH_H */ diff --git a/lustre/include/liblustre.h b/lustre/include/liblustre.h index c809c7c..a9c3a91 100644 --- a/lustre/include/liblustre.h +++ b/lustre/include/liblustre.h @@ -726,6 +726,7 @@ static inline void del_timer(struct timer_list *l) typedef struct { volatile int counter; } atomic_t; #define ATOMIC_INIT(i) { (i) } + #define atomic_read(a) ((a)->counter) #define atomic_set(a,b) do {(a)->counter = b; } while (0) #define atomic_dec_and_test(a) ((--((a)->counter)) == 0) @@ -734,6 +735,8 @@ typedef struct { volatile int counter; } atomic_t; #define atomic_dec(a) do { (a)->counter--; } while (0) #define atomic_add(b,a) do {(a)->counter += b;} while (0) #define atomic_sub(b,a) do {(a)->counter -= b;} while (0) +#define atomic_sub_return(n,a) ((a)->counter -= n) +#define atomic_dec_return(a) atomic_sub_return(1,a) #ifndef likely #define likely(exp) (exp) diff --git a/lustre/include/lustre_export.h b/lustre/include/lustre_export.h index 787094c..5c8719d 100644 --- a/lustre/include/lustre_export.h +++ b/lustre/include/lustre_export.h @@ -85,6 +85,8 @@ struct obd_export { atomic_t exp_rpc_count; struct obd_uuid exp_client_uuid; struct list_head exp_obd_chain; + struct hlist_node exp_uuid_hash; /* uuid-export hash*/ + struct hlist_node exp_nid_hash; /* nid-export hash */ /* exp_obd_chain_timed fo ping evictor, protected by obd_dev_lock */ struct list_head exp_obd_chain_timed; struct obd_device *exp_obd; diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index b55d69e..5baf93a 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -167,6 +167,7 @@ struct ptlrpc_connection { struct list_head c_link; + struct hlist_node c_hash; lnet_nid_t c_self; lnet_process_id_t c_peer; struct obd_uuid c_remote_uuid; @@ -671,7 +672,7 @@ struct ptlrpc_connection *ptlrpc_get_connection(lnet_process_id_t peer, lnet_nid_t self, struct obd_uuid *uuid); int ptlrpc_put_connection(struct ptlrpc_connection *c); struct ptlrpc_connection *ptlrpc_connection_addref(struct ptlrpc_connection *); -void ptlrpc_init_connection(void); +int ptlrpc_init_connection(void); void ptlrpc_cleanup_connection(void); extern lnet_pid_t ptl_get_pid(void); diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 9ef5cce..4c40002 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -34,6 +34,7 @@ #include #include #include +#include #define MAX_OBD_DEVICES 8192 @@ -847,6 +848,10 @@ struct obd_device { obd_fail:1, /* cleanup with failover */ obd_async_recov:1, /* allow asyncronous orphan cleanup */ obd_no_conn:1; /* deny new connections */ + /* uuid-export hash body */ + struct lustre_class_hash_body *obd_uuid_hash_body; + /* nid-export hash body */ + struct lustre_class_hash_body *obd_nid_hash_body; atomic_t obd_refcount; cfs_waitq_t obd_refcount_waitq; struct list_head obd_exports; diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index b2f8c44..18306b2 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -568,7 +568,6 @@ int target_handle_connect(struct ptlrpc_request *req) struct obd_uuid tgtuuid; struct obd_uuid cluuid; struct obd_uuid remote_uuid; - struct list_head *p; char *str, *tmp; int rc = 0; int initial_conn = 0; @@ -689,43 +688,35 @@ int target_handle_connect(struct ptlrpc_request *req) goto dont_check_exports; spin_lock(&target->obd_dev_lock); - list_for_each(p, &target->obd_exports) { - export = list_entry(p, struct obd_export, exp_obd_chain); - if (obd_uuid_equals(&cluuid, &export->exp_client_uuid)) { - if (export->exp_connecting) { /* bug 9635, et. al. */ - CWARN("%s: exp %p already connecting\n", - export->exp_obd->obd_name, export); - export = NULL; - rc = -EALREADY; - break; - } - - /* make darn sure this is coming from the same peer - * if the UUIDs matched */ - if ((export->exp_connection != NULL) && - (strcmp(libcfs_nid2str(req->rq_peer.nid), - libcfs_nid2str(export->exp_connection->c_peer.nid)))) { - CWARN("%s: cookie %s seen on new NID %s when " - "existing NID %s is already connected\n", - target->obd_name, cluuid.uuid, - libcfs_nid2str(req->rq_peer.nid), - libcfs_nid2str(export->exp_connection->c_peer.nid)); - export = NULL; - rc = -EALREADY; - break; - } + export = lustre_hash_get_object_by_key(target->obd_uuid_hash_body, &cluuid); - spin_lock(&export->exp_lock); - export->exp_connecting = 1; - spin_unlock(&export->exp_lock); - spin_unlock(&target->obd_dev_lock); - LASSERT(export->exp_obd == target); - - rc = target_handle_reconnect(&conn, export, &cluuid, - initial_conn); - break; - } + if (export != NULL && export->exp_connecting) { /* bug 9635, et. al. */ + CWARN("%s: exp %p already connecting\n", + export->exp_obd->obd_name, export); + class_export_put(export); export = NULL; + rc = -EALREADY; + } else if (export != NULL && export->exp_connection != NULL && + req->rq_peer.nid != export->exp_connection->c_peer.nid) { + /* make darn sure this is coming from the same peer + * if the UUIDs matched */ + CWARN("%s: cookie %s seen on new NID %s when " + "existing NID %s is already connected\n", + target->obd_name, cluuid.uuid, + libcfs_nid2str(req->rq_peer.nid), + libcfs_nid2str(export->exp_connection->c_peer.nid)); + class_export_put(export); + export = NULL; + rc = -EALREADY; + } else if (export != NULL) { + spin_lock(&export->exp_lock); + export->exp_connecting = 1; + spin_unlock(&export->exp_lock); + class_export_put(export); + spin_unlock(&target->obd_dev_lock); + LASSERT(export->exp_obd == target); + + rc = target_handle_reconnect(&conn, export, &cluuid, initial_conn); } /* If we found an export, we already unlocked. */ @@ -871,6 +862,14 @@ dont_check_exports: req->rq_self, &remote_uuid); + spin_lock(&target->obd_dev_lock); + /* Export might be hashed already, e.g. if this is reconnect */ + if (hlist_unhashed(&export->exp_nid_hash)) + lustre_hash_additem(export->exp_obd->obd_nid_hash_body, + &export->exp_connection->c_peer.nid, + &export->exp_nid_hash); + spin_unlock(&target->obd_dev_lock); + spin_lock_bh(&target->obd_processing_task_lock); if (target->obd_recovering && !export->exp_in_recovery) { spin_lock(&export->exp_lock); diff --git a/lustre/obdclass/Makefile.in b/lustre/obdclass/Makefile.in index ffd9cbf..30b7361 100644 --- a/lustre/obdclass/Makefile.in +++ b/lustre/obdclass/Makefile.in @@ -20,7 +20,7 @@ sources: endif obdclass-all-objs := llog.o llog_cat.o llog_lvfs.o llog_obd.o llog_swab.o -obdclass-all-objs += class_obd.o +obdclass-all-objs += class_obd.o class_hash.o obdclass-all-objs += debug.o genops.o uuid.o llog_ioctl.o obdclass-all-objs += lprocfs_status.o lustre_handles.o lustre_peer.o obdclass-all-objs += statfs_pack.o obdo.o obd_config.o obd_mount.o prng.o mea.o diff --git a/lustre/obdclass/autoMakefile.am b/lustre/obdclass/autoMakefile.am index 5f70443..7a6fdce 100644 --- a/lustre/obdclass/autoMakefile.am +++ b/lustre/obdclass/autoMakefile.am @@ -8,7 +8,7 @@ if LIBLUSTRE noinst_LIBRARIES = liblustreclass.a liblustreclass_a_SOURCES = class_obd.c debug.c genops.c statfs_pack.c mea.c uuid.c -liblustreclass_a_SOURCES += lustre_handles.c lustre_peer.c lprocfs_status.c +liblustreclass_a_SOURCES += lustre_handles.c lustre_peer.c lprocfs_status.c class_hash.c liblustreclass_a_SOURCES += obdo.c obd_config.c llog.c llog_obd.c llog_cat.c liblustreclass_a_SOURCES += llog_lvfs.c llog_swab.c capa.c liblustreclass_a_SOURCES += prng.c #llog_ioctl.c rbtree.c diff --git a/lustre/obdclass/class_hash.c b/lustre/obdclass/class_hash.c new file mode 100644 index 0000000..766e16d --- /dev/null +++ b/lustre/obdclass/class_hash.c @@ -0,0 +1,547 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2005 Cluster File Systems, Inc. + * Author: YuZhangyong + * + * This file is part of Lustre, http://www.lustre.org/ + * + * No redistribution or use is permitted outside of Cluster File Systems, Inc. + * + * Implement a hash class for hash process in lustre system. + */ + +#ifndef __KERNEL__ +#include +#include +#endif + +#include +#include +#include +#include +#include + +int lustre_hash_init(struct lustre_class_hash_body **hash_body_new, + char *hashname, __u32 hashsize, + struct lustre_hash_operations *hash_operations) +{ + int i, n = 0; + struct lustre_class_hash_body *hash_body = NULL; + + LASSERT(hashsize > 0); + LASSERT(hash_operations != NULL); + ENTRY; + + i = hashsize; + while (i != 0) { + if (i & 0x1) + n++; + i >>= 1; + } + + LASSERTF(n == 1, "hashsize %u isn't 2^n\n", hashsize); + + /* alloc space for hash_body */ + OBD_ALLOC(hash_body, sizeof(*hash_body)); + + if (hash_body == NULL) { + CERROR("Cannot alloc space for hash body, hashname = %s \n", + hashname); + RETURN(-ENOMEM); + } + + LASSERT(hashname != NULL && + strlen(hashname) <= sizeof(hash_body->hashname)); + strcpy(hash_body->hashname, hashname); + hash_body->lchb_hash_max_size = hashsize; + hash_body->lchb_hash_operations = hash_operations; + + /* alloc space for the hash tables */ + OBD_ALLOC(hash_body->lchb_hash_tables, + sizeof(*hash_body->lchb_hash_tables) * hash_body->lchb_hash_max_size); + + if (hash_body->lchb_hash_tables == NULL) { + OBD_FREE(hash_body, sizeof(*hash_body)); + CERROR("Cannot alloc space for hashtables, hashname = %s \n", + hash_body->hashname); + RETURN(-ENOMEM); + } + + spin_lock_init(&hash_body->lchb_lock); /* initialize the body lock */ + + for(i = 0 ; i < hash_body->lchb_hash_max_size; i++) { + /* initial the bucket lock and list_head */ + INIT_HLIST_HEAD(&hash_body->lchb_hash_tables[i].lhb_head); + spin_lock_init(&hash_body->lchb_hash_tables[i].lhb_lock); + } + *hash_body_new = hash_body; + + RETURN(0); +} +EXPORT_SYMBOL(lustre_hash_init); + +void lustre_hash_exit(struct lustre_class_hash_body **new_hash_body) +{ + int i; + struct lustre_class_hash_body *hash_body = NULL; + ENTRY; + + hash_body = *new_hash_body; + + if (hash_body == NULL) { + CWARN("hash body has been deleted\n"); + goto out_hash; + } + + spin_lock(&hash_body->lchb_lock); /* lock the hash tables */ + + if (hash_body->lchb_hash_tables == NULL ) { + spin_unlock(&hash_body->lchb_lock); + CWARN("hash tables has been deleted\n"); + goto out_hash; + } + + for( i = 0; i < hash_body->lchb_hash_max_size; i++ ) { + struct lustre_hash_bucket * bucket; + struct hlist_node * actual_hnode, *pos; + + bucket = &hash_body->lchb_hash_tables[i]; + spin_lock(&bucket->lhb_lock); /* lock the bucket */ + hlist_for_each_safe(actual_hnode, pos, &(bucket->lhb_head)) { + lustre_hash_delitem_nolock(hash_body, i, actual_hnode); + } + spin_unlock(&bucket->lhb_lock); + } + + /* free the hash_tables's memory space */ + OBD_FREE(hash_body->lchb_hash_tables, + sizeof(*hash_body->lchb_hash_tables) * hash_body->lchb_hash_max_size); + + hash_body->lchb_hash_tables = NULL; + + spin_unlock(&hash_body->lchb_lock); + +out_hash : + /* free the hash_body's memory space */ + if (hash_body != NULL) { + OBD_FREE(hash_body, sizeof(*hash_body)); + *new_hash_body = NULL; + } + EXIT; +} +EXPORT_SYMBOL(lustre_hash_exit); + +/* + * only allow unique @key in hashtables, if the same @key has existed + * in hashtables, it will return with fails. + */ +int lustre_hash_additem_unique(struct lustre_class_hash_body *hash_body, + void *key, struct hlist_node *actual_hnode) +{ + int hashent; + struct lustre_hash_bucket *bucket = NULL; + struct lustre_hash_operations *hop = hash_body->lchb_hash_operations; + ENTRY; + + LASSERT(hlist_unhashed(actual_hnode)); + hashent = hop->lustre_hashfn(hash_body, key); + + /* get the hash-bucket and lock it */ + bucket = &hash_body->lchb_hash_tables[hashent]; + spin_lock(&bucket->lhb_lock); + + if ( (lustre_hash_getitem_in_bucket_nolock(hash_body, hashent, key)) != NULL) { + /* the added-item exist in hashtables, so cannot add it again */ + spin_unlock(&bucket->lhb_lock); + + CWARN("Already found the key in hash [%s]\n", + hash_body->hashname); + RETURN(-EALREADY); + } + + hlist_add_head(actual_hnode, &(bucket->lhb_head)); + +#ifdef LUSTRE_HASH_DEBUG + /* hash distribute debug */ + hash_body->lchb_hash_tables[hashent].lhb_item_count++; + CDEBUG(D_INFO, "hashname[%s] bucket[%d] has [%d] hashitem\n", + hash_body->hashname, hashent, + hash_body->lchb_hash_tables[hashent].lhb_item_count); +#endif + hop->lustre_hash_object_refcount_get(actual_hnode); + + spin_unlock(&bucket->lhb_lock); + + RETURN(0); +} +EXPORT_SYMBOL(lustre_hash_additem_unique); + +/* + * this version of additem, it allow multi same @key in hashtables. + * in this additem version, we don't need to check if exist same @key in hash + * tables, we only add it to related hashbucket. + * example: maybe same nid will be related to multi difference export + */ +int lustre_hash_additem(struct lustre_class_hash_body *hash_body, void *key, + struct hlist_node *actual_hnode) +{ + int hashent; + struct lustre_hash_bucket *bucket = NULL; + struct lustre_hash_operations *hop = hash_body->lchb_hash_operations; + ENTRY; + + LASSERT(hlist_unhashed(actual_hnode)); + + hashent = hop->lustre_hashfn(hash_body, key); + + /* get the hashbucket and lock it */ + bucket = &hash_body->lchb_hash_tables[hashent]; + spin_lock(&bucket->lhb_lock); + + hlist_add_head(actual_hnode, &(bucket->lhb_head)); + +#ifdef LUSTRE_HASH_DEBUG + /* hash distribute debug */ + hash_body->lchb_hash_tables[hashent].lhb_item_count++; + CDEBUG(D_INFO, "hashname[%s] bucket[%d] has [%d] hashitem\n", + hash_body->hashname, hashent, + hash_body->lchb_hash_tables[hashent].lhb_item_count); +#endif + hop->lustre_hash_object_refcount_get(actual_hnode); + + spin_unlock(&bucket->lhb_lock); + + RETURN(0); +} +EXPORT_SYMBOL(lustre_hash_additem); + + +/* + * this version of delitem will delete a hashitem with given @key, + * we need to search the <@key, @value> in hashbucket with @key, + * if match, the hashitem will be delete. + * we have a no-search version of delitem, it will directly delete a hashitem, + * doesn't need to search it in hashtables, so it is a O(1) delete. + */ +int lustre_hash_delitem_by_key(struct lustre_class_hash_body *hash_body, + void *key) +{ + int hashent ; + struct hlist_node * hash_item; + struct lustre_hash_bucket *bucket = NULL; + struct lustre_hash_operations *hop = hash_body->lchb_hash_operations; + int retval = 0; + ENTRY; + + hashent = hop->lustre_hashfn(hash_body, key); + + /* first, lock the hashbucket */ + bucket = &hash_body->lchb_hash_tables[hashent]; + spin_lock(&bucket->lhb_lock); + + /* get the hash_item from hash_bucket */ + hash_item = lustre_hash_getitem_in_bucket_nolock(hash_body, hashent, + key); + + if (hash_item == NULL) { + spin_unlock(&bucket->lhb_lock); + RETURN(-ENOENT); + } + + /* call delitem_nolock() to delete the hash_item */ + retval = lustre_hash_delitem_nolock(hash_body, hashent, hash_item); + + spin_unlock(&bucket->lhb_lock); + + RETURN(retval); +} +EXPORT_SYMBOL(lustre_hash_delitem_by_key); + +/* + * the O(1) version of delete hash item, + * it will directly delete the hashitem with given @hash_item, + * the parameter @key used to get the relation hash bucket and lock it. + */ +int lustre_hash_delitem(struct lustre_class_hash_body *hash_body, + void *key, struct hlist_node * hash_item) +{ + int hashent = 0; + int retval = 0; + struct lustre_hash_bucket *bucket = NULL; + struct lustre_hash_operations *hop = hash_body->lchb_hash_operations; + ENTRY; + + hashent = hop->lustre_hashfn(hash_body, key); + + bucket = &hash_body->lchb_hash_tables[hashent]; + spin_lock(&bucket->lhb_lock); + + /* call delitem_nolock() to delete the hash_item */ + retval = lustre_hash_delitem_nolock(hash_body, hashent, hash_item); + + spin_unlock(&bucket->lhb_lock); + + RETURN(retval); +} +EXPORT_SYMBOL(lustre_hash_delitem); + +void * lustre_hash_get_object_by_key(struct lustre_class_hash_body *hash_body, + void *key) +{ + int hashent ; + struct hlist_node * hash_item_hnode = NULL; + void * obj_value = NULL; + struct lustre_hash_bucket *bucket = NULL; + struct lustre_hash_operations * hop = hash_body->lchb_hash_operations; + ENTRY; + + /* get the hash value from the given item */ + hashent = hop->lustre_hashfn(hash_body, key); + + bucket = &hash_body->lchb_hash_tables[hashent]; + spin_lock(&bucket->lhb_lock); /* lock the bucket */ + + hash_item_hnode = lustre_hash_getitem_in_bucket_nolock(hash_body, + hashent, key); + + if (hash_item_hnode == NULL) { + spin_unlock(&bucket->lhb_lock); /* lock the bucket */ + RETURN(NULL); + } + + obj_value = hop->lustre_hash_object_refcount_get(hash_item_hnode); + spin_unlock(&bucket->lhb_lock); /* lock the bucket */ + + RETURN(obj_value); +} +EXPORT_SYMBOL(lustre_hash_get_object_by_key); + +/* + * define (uuid <-> export) hash operations and function define + */ + +/* define the uuid hash operations */ +struct lustre_hash_operations uuid_hash_operations = { + .lustre_hashfn = uuid_hashfn, + .lustre_hash_key_compare = uuid_hash_key_compare, + .lustre_hash_object_refcount_get = uuid_export_refcount_get, + .lustre_hash_object_refcount_put = uuid_export_refcount_put, +}; + +/* string hashing using djb2 hash algorithm */ +__u32 uuid_hashfn(struct lustre_class_hash_body *hash_body, void * key) +{ + __u32 hash = 5381; + struct obd_uuid * uuid_key = NULL; + int c; + char *ptr = NULL; + + LASSERT(key != NULL); + + uuid_key = (struct obd_uuid*)key; + ptr = uuid_key->uuid; + + while ((c = *ptr++)) { + hash = hash * 33 + c; + } + + hash &= (hash_body->lchb_hash_max_size - 1); + + RETURN(hash); +} + +/* Note, it is impossible to find an export that is in failed state with + * this function */ +int uuid_hash_key_compare(void *key, struct hlist_node *compared_hnode) +{ + struct obd_export *export = NULL; + struct obd_uuid *uuid_key = NULL, *compared_uuid = NULL; + + LASSERT( key != NULL); + + uuid_key = (struct obd_uuid*)key; + + export = hlist_entry(compared_hnode, struct obd_export, exp_uuid_hash); + + compared_uuid = &export->exp_client_uuid; + + RETURN(obd_uuid_equals(uuid_key, compared_uuid) && + !export->exp_failed); +} + +void * uuid_export_refcount_get(struct hlist_node * actual_hnode) +{ + struct obd_export *export = NULL; + + LASSERT(actual_hnode != NULL); + + export = hlist_entry(actual_hnode, struct obd_export, exp_uuid_hash); + + LASSERT(export != NULL); + + class_export_get(export); + + RETURN(export); +} + +void uuid_export_refcount_put(struct hlist_node * actual_hnode) +{ + struct obd_export *export = NULL; + + LASSERT(actual_hnode != NULL); + + export = hlist_entry(actual_hnode, struct obd_export, exp_uuid_hash); + + LASSERT(export != NULL); + + class_export_put(export); +} + +/* + * define (nid <-> export) hash operations and function define + */ + +/* define the nid hash operations */ +struct lustre_hash_operations nid_hash_operations = { + .lustre_hashfn = nid_hashfn, + .lustre_hash_key_compare = nid_hash_key_compare, + .lustre_hash_object_refcount_get = nid_export_refcount_get, + .lustre_hash_object_refcount_put = nid_export_refcount_put, +}; + +/* string hashing using djb2 hash algorithm */ +__u32 nid_hashfn(struct lustre_class_hash_body *hash_body, void * key) +{ + __u32 hash = 5381; + int i; + char *ptr = key; + + LASSERT(key != NULL); + + for(i = 0 ; i < sizeof(lnet_nid_t) ; i ++) + hash = hash * 33 + ptr[i]; + + hash &= (hash_body->lchb_hash_max_size - 1); + + RETURN(hash); +} + +/* Note, it is impossible to find an export that is in failed state with + * this function */ +int nid_hash_key_compare(void *key, struct hlist_node *compared_hnode) +{ + struct obd_export *export = NULL; + lnet_nid_t *nid_key = NULL; + + LASSERT( key != NULL); + + nid_key = (lnet_nid_t*)key; + + export = hlist_entry(compared_hnode, struct obd_export, exp_nid_hash); + + return (export->exp_connection->c_peer.nid == *nid_key && + !export->exp_failed); +} + +void *nid_export_refcount_get(struct hlist_node *actual_hnode) +{ + struct obd_export *export = NULL; + + LASSERT(actual_hnode != NULL); + + export = hlist_entry(actual_hnode, struct obd_export, exp_nid_hash); + + LASSERT(export != NULL); + + class_export_get(export); + + RETURN(export); +} + +void nid_export_refcount_put(struct hlist_node *actual_hnode) +{ + struct obd_export *export = NULL; + + LASSERT(actual_hnode != NULL); + + export = hlist_entry(actual_hnode, struct obd_export, exp_nid_hash); + + LASSERT(export != NULL); + + class_export_put(export); +} + +/* + * define (net_peer <-> connection) hash operations and function define + */ + +/* define the conn hash operations */ +struct lustre_hash_operations conn_hash_operations = { + .lustre_hashfn = conn_hashfn, + .lustre_hash_key_compare = conn_hash_key_compare, + .lustre_hash_object_refcount_get = conn_refcount_get, + .lustre_hash_object_refcount_put = conn_refcount_put, +}; +EXPORT_SYMBOL(conn_hash_operations); + +/* string hashing using djb2 hash algorithm */ +__u32 conn_hashfn(struct lustre_class_hash_body *hash_body, void * key) +{ + __u32 hash = 5381; + char *ptr = key; + int i; + + LASSERT(key != NULL); + + for(i = 0 ; i < sizeof(lnet_process_id_t) ; i ++) + hash = hash * 33 + ptr[i]; + + hash &= (hash_body->lchb_hash_max_size - 1); + + RETURN(hash); +} + +int conn_hash_key_compare(void *key, struct hlist_node *compared_hnode) +{ + struct ptlrpc_connection *c = NULL; + lnet_process_id_t *conn_key = NULL; + + LASSERT( key != NULL); + + conn_key = (lnet_process_id_t*)key; + + c = hlist_entry(compared_hnode, struct ptlrpc_connection, c_hash); + + return (conn_key->nid == c->c_peer.nid && + conn_key->pid == c->c_peer.pid); +} + +void *conn_refcount_get(struct hlist_node *actual_hnode) +{ + struct ptlrpc_connection *c = NULL; + + LASSERT(actual_hnode != NULL); + + c = hlist_entry(actual_hnode, struct ptlrpc_connection, c_hash); + + LASSERT(c != NULL); + + atomic_inc(&c->c_refcount); + + RETURN(c); +} + +void conn_refcount_put(struct hlist_node *actual_hnode) +{ + struct ptlrpc_connection *c = NULL; + + LASSERT(actual_hnode != NULL); + + c = hlist_entry(actual_hnode, struct ptlrpc_connection, c_hash); + + LASSERT(c != NULL); + + atomic_dec(&c->c_refcount); +} + diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 59f1c29..ab15541 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -32,6 +32,7 @@ #include #include #include +#include extern struct list_head obd_types; spinlock_t obd_types_lock; @@ -689,9 +690,10 @@ void class_export_destroy(struct obd_export *exp) struct obd_export *class_new_export(struct obd_device *obd, struct obd_uuid *cluuid) { - struct obd_export *export, *tmp; + struct obd_export *export; + int rc = 0; - OBD_ALLOC(export, sizeof(*export)); + OBD_ALLOC_PTR(export); if (!export) return ERR_PTR(-ENOMEM); @@ -708,23 +710,26 @@ struct obd_export *class_new_export(struct obd_device *obd, class_handle_hash(&export->exp_handle, export_handle_addref); export->exp_last_request_time = CURRENT_SECONDS; spin_lock_init(&export->exp_lock); + INIT_HLIST_NODE(&export->exp_uuid_hash); + INIT_HLIST_NODE(&export->exp_nid_hash); export->exp_client_uuid = *cluuid; obd_init_export(export); spin_lock(&obd->obd_dev_lock); if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) { - list_for_each_entry(tmp, &obd->obd_exports, exp_obd_chain) { - if (obd_uuid_equals(cluuid, &tmp->exp_client_uuid)) { - spin_unlock(&obd->obd_dev_lock); - CWARN("%s: denying duplicate export for %s\n", - obd->obd_name, cluuid->uuid); - class_handle_unhash(&export->exp_handle); - OBD_FREE_PTR(export); - return ERR_PTR(-EALREADY); - } - } + rc = lustre_hash_additem_unique(obd->obd_uuid_hash_body, cluuid, + &export->exp_uuid_hash); + if (rc != 0) { + CWARN("%s: denying duplicate export for %s\n", + obd->obd_name, cluuid->uuid); + spin_unlock(&obd->obd_dev_lock); + class_handle_unhash(&export->exp_handle); + OBD_FREE_PTR(export); + return ERR_PTR(-EALREADY); + } } + LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */ class_incref(obd); list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports); @@ -742,6 +747,11 @@ void class_unlink_export(struct obd_export *exp) class_handle_unhash(&exp->exp_handle); spin_lock(&exp->exp_obd->obd_dev_lock); + /* delete an uuid-export hashitem from hashtables */ + if (!hlist_unhashed(&exp->exp_uuid_hash)) { + lustre_hash_delitem(exp->exp_obd->obd_uuid_hash_body, + &exp->exp_client_uuid, &exp->exp_uuid_hash); + } list_del_init(&exp->exp_obd_chain); list_del_init(&exp->exp_obd_chain_timed); exp->exp_obd->obd_num_exports--; @@ -945,6 +955,11 @@ int class_disconnect(struct obd_export *export) spin_lock(&export->exp_lock); already_disconnected = export->exp_disconnected; export->exp_disconnected = 1; + + if (!hlist_unhashed(&export->exp_nid_hash)) { + lustre_hash_delitem(export->exp_obd->obd_nid_hash_body, + &export->exp_connection->c_peer.nid, &export->exp_nid_hash); + } spin_unlock(&export->exp_lock); /* class_cleanup(), abort_recovery(), and class_fail_export() @@ -1004,12 +1019,8 @@ static void class_disconnect_export_list(struct list_head *list, int flags) rc = obd_disconnect(fake_exp); class_export_put(exp); - if (rc) { - CDEBUG(D_HA, "disconnecting export %p failed: %d\n", - exp, rc); - } else { - CDEBUG(D_HA, "export %p disconnected\n", exp); - } + CDEBUG(D_HA, "disconnecting export %s (%p): rc %d\n", + exp->exp_client_uuid.uuid, exp, rc); } EXIT; } @@ -1266,39 +1277,32 @@ char *obd_export_nid2str(struct obd_export *exp) } EXPORT_SYMBOL(obd_export_nid2str); -#define EVICT_BATCH 32 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid) { - struct obd_export *doomed_exp[EVICT_BATCH] = { NULL }; - struct list_head *p; - int exports_evicted = 0, num_to_evict = 0, i; + struct obd_export *doomed_exp = NULL; + int exports_evicted = 0; -search_again: - spin_lock(&obd->obd_dev_lock); - list_for_each(p, &obd->obd_exports) { - doomed_exp[num_to_evict] = list_entry(p, struct obd_export, - exp_obd_chain); - if (strcmp(obd_export_nid2str(doomed_exp[num_to_evict]), - nid) == 0) { - class_export_get(doomed_exp[num_to_evict]); - if (++num_to_evict == EVICT_BATCH) - break; - } - } - spin_unlock(&obd->obd_dev_lock); + lnet_nid_t nid_key = libcfs_str2nid(nid); + + do { + doomed_exp = lustre_hash_get_object_by_key(obd->obd_nid_hash_body, + &nid_key); + if (doomed_exp == NULL) + break; - for (i = 0; i < num_to_evict; i++) { + LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key, + "nid %s found, wanted nid %s, requested nid %s\n", + obd_export_nid2str(doomed_exp), + libcfs_nid2str(nid_key), nid); + LASSERTF(doomed_exp != obd->obd_self_export, + "self-export is hashed by NID?\n"); exports_evicted++; CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n", - obd->obd_name, nid, doomed_exp[i]->exp_client_uuid.uuid, + obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid, exports_evicted); - class_fail_export(doomed_exp[i]); - class_export_put(doomed_exp[i]); - } - if (num_to_evict == EVICT_BATCH) { - num_to_evict = 0; - goto search_again; - } + class_fail_export(doomed_exp); + class_export_put(doomed_exp); + } while (1); if (!exports_evicted) CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n", @@ -1310,27 +1314,17 @@ EXPORT_SYMBOL(obd_export_evict_by_nid); int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid) { struct obd_export *doomed_exp = NULL; - struct list_head *p; struct obd_uuid doomed; int exports_evicted = 0; obd_str2uuid(&doomed, uuid); - if(obd_uuid_equals(&doomed, &obd->obd_uuid)) { + if (obd_uuid_equals(&doomed, &obd->obd_uuid)) { CERROR("%s: can't evict myself\n", obd->obd_name); return exports_evicted; } - spin_lock(&obd->obd_dev_lock); - list_for_each(p, &obd->obd_exports) { - doomed_exp = list_entry(p, struct obd_export, exp_obd_chain); - - if (obd_uuid_equals(&doomed, &doomed_exp->exp_client_uuid)) { - class_export_get(doomed_exp); - break; - } - doomed_exp = NULL; - } - spin_unlock(&obd->obd_dev_lock); + doomed_exp = lustre_hash_get_object_by_key(obd->obd_uuid_hash_body, + &doomed); if (doomed_exp == NULL) { CERROR("%s: can't disconnect %s: no exports found\n", diff --git a/lustre/obdclass/lustre_handles.c b/lustre/obdclass/lustre_handles.c index dfdfec4..96fc122 100644 --- a/lustre/obdclass/lustre_handles.c +++ b/lustre/obdclass/lustre_handles.c @@ -52,7 +52,16 @@ static struct handle_bucket { static atomic_t handle_count = ATOMIC_INIT(0); +#ifdef __arch_um__ +/* For unknown reason, UML uses kmalloc rather than vmalloc to allocate + * memory(OBD_VMALLOC). Therefore, we have to redefine the + * HANDLE_HASH_SIZE to make the hash heads don't exceed 128K. + */ +#define HANDLE_HASH_SIZE 4096 +#else #define HANDLE_HASH_SIZE (1 << 14) +#endif /* ifdef __arch_um__ */ + #define HANDLE_HASH_MASK (HANDLE_HASH_SIZE - 1) /* diff --git a/lustre/obdclass/obd_config.c b/lustre/obdclass/obd_config.c index 0ae76a3..0b5ec84 100644 --- a/lustre/obdclass/obd_config.c +++ b/lustre/obdclass/obd_config.c @@ -38,6 +38,10 @@ #include #include #include +#include + +extern struct lustre_hash_operations uuid_hash_operations; +extern struct lustre_hash_operations nid_hash_operations; /*********** string parsing utils *********/ @@ -253,7 +257,21 @@ int class_setup(struct obd_device *obd, struct lustre_cfg *lcfg) /* just leave this on forever. I can't use obd_set_up here because other fns check that status, and we're not actually set up yet. */ obd->obd_starting = 1; + + /* create an uuid-export hash body */ + err = lustre_hash_init(&obd->obd_uuid_hash_body, "UUID_HASH", + 128, &uuid_hash_operations); + if (err) { + spin_unlock(&obd->obd_dev_lock); + GOTO(err_hash, err); + } + + /* create a nid-export hash body */ + err = lustre_hash_init(&obd->obd_nid_hash_body, "NID_HASH", + 128, &nid_hash_operations); spin_unlock(&obd->obd_dev_lock); + if (err) + GOTO(err_hash, err); exp = class_new_export(obd, &obd->obd_uuid); if (IS_ERR(exp)) @@ -279,10 +297,13 @@ int class_setup(struct obd_device *obd, struct lustre_cfg *lcfg) RETURN(0); err_exp: - CERROR("setup %s failed (%d)\n", obd->obd_name, err); class_unlink_export(obd->obd_self_export); obd->obd_self_export = NULL; +err_hash: + lustre_hash_exit(&obd->obd_uuid_hash_body); + lustre_hash_exit(&obd->obd_nid_hash_body); obd->obd_starting = 0; + CERROR("setup %s failed (%d)\n", obd->obd_name, err); RETURN(err); } @@ -411,6 +432,12 @@ int class_cleanup(struct obd_device *obd, struct lustre_cfg *lcfg) } LASSERT(obd->obd_self_export); + /* destroy an uuid-export hash body */ + lustre_hash_exit(&obd->obd_uuid_hash_body); + + /* destroy a nid-export hash body */ + lustre_hash_exit(&obd->obd_nid_hash_body); + /* Precleanup stage 1, we must make sure all exports (other than the self-export) get destroyed. */ err = obd_precleanup(obd, OBD_CLEANUP_EXPORTS); diff --git a/lustre/ptlrpc/connection.c b/lustre/ptlrpc/connection.c index 1d2e228..b2e0d85 100644 --- a/lustre/ptlrpc/connection.c +++ b/lustre/ptlrpc/connection.c @@ -33,10 +33,15 @@ #endif #include "ptlrpc_internal.h" +#include static spinlock_t conn_lock; static struct list_head conn_list; static struct list_head conn_unused_list; +static struct lustre_class_hash_body *conn_hash_body; +static struct lustre_class_hash_body *conn_unused_hash_body; + +extern struct lustre_hash_operations conn_hash_operations; void ptlrpc_dump_connections(void) { @@ -58,26 +63,14 @@ struct ptlrpc_connection* ptlrpc_lookup_conn_locked (lnet_process_id_t peer) { struct ptlrpc_connection *c; - struct list_head *tmp; - - list_for_each(tmp, &conn_list) { - c = list_entry(tmp, struct ptlrpc_connection, c_link); - - if (peer.nid == c->c_peer.nid && - peer.pid == c->c_peer.pid) - return ptlrpc_connection_addref(c); - } - list_for_each(tmp, &conn_unused_list) { - c = list_entry(tmp, struct ptlrpc_connection, c_link); + c = lustre_hash_get_object_by_key(conn_hash_body, &peer); + if (c != NULL) + return c; - if (peer.nid == c->c_peer.nid && - peer.pid == c->c_peer.pid) { - list_del(&c->c_link); - list_add(&c->c_link, &conn_list); - return ptlrpc_connection_addref(c); - } - } + c = lustre_hash_get_object_by_key(conn_unused_hash_body, &peer); + if (c != NULL) + return c; return NULL; } @@ -88,6 +81,7 @@ struct ptlrpc_connection *ptlrpc_get_connection(lnet_process_id_t peer, { struct ptlrpc_connection *c; struct ptlrpc_connection *c2; + int rc = 0; ENTRY; CDEBUG(D_INFO, "self %s peer %s\n", @@ -115,15 +109,26 @@ struct ptlrpc_connection *ptlrpc_get_connection(lnet_process_id_t peer, spin_lock(&conn_lock); c2 = ptlrpc_lookup_conn_locked(peer); - if (c2 == NULL) + if (c2 == NULL) { list_add(&c->c_link, &conn_list); - + rc = lustre_hash_additem_unique(conn_hash_body, &peer, + &c->c_hash); + if (rc != 0) { + list_del(&c->c_link); + CERROR("Cannot add connection to conn_hash_body\n"); + goto out_conn; + } + } + +out_conn: + spin_unlock(&conn_lock); - if (c2 == NULL) + if (c2 == NULL && rc == 0) RETURN (c); - - OBD_FREE(c, sizeof(*c)); + + if (c != NULL) + OBD_FREE(c, sizeof(*c)); RETURN (c2); } @@ -141,16 +146,35 @@ int ptlrpc_put_connection(struct ptlrpc_connection *c) c, atomic_read(&c->c_refcount) - 1, libcfs_nid2str(c->c_peer.nid)); - if (atomic_dec_and_test(&c->c_refcount)) { + spin_lock(&conn_lock); + LASSERT(!hlist_unhashed(&c->c_hash)); + spin_unlock(&conn_lock); + + if (atomic_dec_return(&c->c_refcount) == 1) { + spin_lock(&conn_lock); + + lustre_hash_delitem(conn_hash_body, &c->c_peer, &c->c_hash); list_del(&c->c_link); + list_add(&c->c_link, &conn_unused_list); + rc = lustre_hash_additem_unique(conn_unused_hash_body, &c->c_peer, + &c->c_hash); + if (rc != 0) { + spin_unlock(&conn_lock); + CERROR("Cannot hash connection to conn_hash_body\n"); + GOTO(ret, rc); + } + spin_unlock(&conn_lock); rc = 1; - } + + } + if (atomic_read(&c->c_refcount) < 0) CERROR("connection %p refcount %d!\n", c, atomic_read(&c->c_refcount)); +ret : RETURN(rc); } @@ -165,11 +189,28 @@ struct ptlrpc_connection *ptlrpc_connection_addref(struct ptlrpc_connection *c) RETURN(c); } -void ptlrpc_init_connection(void) +int ptlrpc_init_connection(void) { + int rc = 0; CFS_INIT_LIST_HEAD(&conn_list); + rc = lustre_hash_init(&conn_hash_body, "CONN_HASH", + 128, &conn_hash_operations); + if (rc) + GOTO(ret, rc); + CFS_INIT_LIST_HEAD(&conn_unused_list); + rc = lustre_hash_init(&conn_unused_hash_body, "CONN_UNUSED_HASH", + 128, &conn_hash_operations); + if (rc) + GOTO(ret, rc); + spin_lock_init(&conn_lock); +ret: + if (rc) { + lustre_hash_exit(&conn_hash_body); + lustre_hash_exit(&conn_unused_hash_body); + } + RETURN(rc); } void ptlrpc_cleanup_connection(void) @@ -178,11 +219,15 @@ void ptlrpc_cleanup_connection(void) struct ptlrpc_connection *c; spin_lock(&conn_lock); + + lustre_hash_exit(&conn_unused_hash_body); list_for_each_safe(tmp, pos, &conn_unused_list) { c = list_entry(tmp, struct ptlrpc_connection, c_link); list_del(&c->c_link); OBD_FREE(c, sizeof(*c)); } + + lustre_hash_exit(&conn_hash_body); list_for_each_safe(tmp, pos, &conn_list) { c = list_entry(tmp, struct ptlrpc_connection, c_link); CERROR("Connection %p/%s has refcount %d (nid=%s)\n", diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c index b060b1e..65d986e 100644 --- a/lustre/ptlrpc/ptlrpc_module.c +++ b/lustre/ptlrpc/ptlrpc_module.c @@ -70,22 +70,26 @@ __init int ptlrpc_init(void) cleanup_phase = 2; ptlrpc_init_connection(); - rc = llog_init_commit_master(); if (rc) GOTO(cleanup, rc); cleanup_phase = 3; + rc = llog_init_commit_master(); + if (rc) + GOTO(cleanup, rc); + cleanup_phase = 4; + ptlrpc_put_connection_superhack = ptlrpc_put_connection; rc = ptlrpc_start_pinger(); if (rc) GOTO(cleanup, rc); - cleanup_phase = 4; + cleanup_phase = 5; rc = ldlm_init(); if (rc) GOTO(cleanup, rc); - cleanup_phase = 5; + cleanup_phase = 6; rc = sptlrpc_init(); if (rc) @@ -95,12 +99,13 @@ __init int ptlrpc_init(void) cleanup: switch(cleanup_phase) { - case 5: + case 6: ldlm_exit(); - case 4: + case 5: ptlrpc_stop_pinger(); - case 3: + case 4: llog_cleanup_commit_master(1); + case 3: ptlrpc_cleanup_connection(); case 2: ptlrpc_exit_portals(); -- 1.8.3.1