Description: use do_facet on sanity.sh for test handling recoverables errors
Details : use do_facet instead of direct use sysctl for set fail_loc on OST
+Severity : normal
+Bugzilla : 11013
+Description: hash tables for lists of nids, connections and uuids
+Details : Hash tables noticeably help when a lot of clients connect to a
+ server, to faster identify duplicate connections or reconnects,
+ also to faster find export to evict in manual eviction case.
+
+
--------------------------------------------------------------------------------
2007-05-03 Cluster File Systems, Inc. <info@clusterfs.com>
EXTRA_DIST = ioctl.h liblustre.h lprocfs_status.h lustre_cfg.h \
lustre_commit_confd.h lustre_debug.h lustre_disk.h \
lustre_dlm.h lustre_export.h lustre_fsfilt.h lustre_ha.h \
- lustre_handles.h lustre_import.h lustre_lib.h \
+ lustre_handles.h lustre_import.h lustre_lib.h class_hash.h \
lustre_lite.h lustre_log.h lustre_mds.h lustre_net.h \
lustre_param.h lustre_quota.h lustre_ucache.h lvfs.h \
obd_cache.h obd_class.h obd_echo.h obd.h obd_lov.h \
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ */
+
+#ifndef __CLASS_HASH_H
+#define __CLASS_HASH_H
+
+#include <lustre_lib.h>
+
+/* define the hash bucket*/
+struct lustre_hash_bucket {
+ struct hlist_head lhb_head;
+ spinlock_t lhb_lock;
+#ifdef LUSTRE_HASH_DEBUG
+ /* the number of hash item per bucket,
+ * it will help us to analyse the hash distribute
+ */
+ int lhb_item_count;
+#endif
+};
+
+struct lustre_hash_operations;
+
+struct lustre_class_hash_body {
+ char hashname[128];
+ spinlock_t lchb_lock; /* body lock */
+ struct lustre_hash_bucket *lchb_hash_tables;
+ __u32 lchb_hash_max_size; /* define the hash tables size */
+ /* define the hash operations */
+ struct lustre_hash_operations *lchb_hash_operations;
+};
+
+/* hash operations method define */
+struct lustre_hash_operations {
+ __u32 (*lustre_hashfn) (struct lustre_class_hash_body *hash_body,
+ void *key);
+ int (*lustre_hash_key_compare) (void *key,
+ struct hlist_node *compared_hnode);
+ /* add refcount */
+ void* (*lustre_hash_object_refcount_get) (struct hlist_node *hash_item);
+ /* dec refcount */
+ void (*lustre_hash_object_refcount_put) (struct hlist_node *hash_item);
+};
+
+static inline struct hlist_node *
+lustre_hash_getitem_in_bucket_nolock(struct lustre_class_hash_body *hash_body,
+ int hashent, void *key)
+{
+ struct lustre_hash_bucket *bucket;
+ struct hlist_node *hash_item_node;
+ struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
+ int find = 0;
+ ENTRY;
+
+ bucket = &hash_body->lchb_hash_tables[hashent];
+ hlist_for_each(hash_item_node, &(bucket->lhb_head)) {
+ find = hop->lustre_hash_key_compare(key, hash_item_node);
+ if (find == 1)
+ break;
+ }
+ RETURN(find == 1 ? hash_item_node : NULL);
+}
+
+static inline int
+lustre_hash_delitem_nolock(struct lustre_class_hash_body *hash_body,
+ int hashent, struct hlist_node * hash_item)
+{
+ struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
+
+ hlist_del_init(hash_item);
+
+ hop->lustre_hash_object_refcount_put(hash_item);
+
+#ifdef LUSTRE_HASH_DEBUG
+ hash_body->lchb_hash_tables[hashent].lhb_item_count--;
+ CDEBUG(D_INFO, "hashname[%s] bucket[%d] has [%d] hashitem\n",
+ hash_body->hashname, hashent,
+ hash_body->lchb_hash_tables[hashent].lhb_item_count);
+#endif
+
+ RETURN(0);
+}
+
+int lustre_hash_init(struct lustre_class_hash_body **hash_body,
+ char *hashname, __u32 hashsize,
+ struct lustre_hash_operations *hash_operations);
+void lustre_hash_exit(struct lustre_class_hash_body **hash_body);
+int lustre_hash_additem_unique(struct lustre_class_hash_body *hash_body,
+ void *key, struct hlist_node *actual_hnode);
+int lustre_hash_additem(struct lustre_class_hash_body *hash_body, void *key,
+ struct hlist_node *actual_hnode);
+int lustre_hash_delitem_by_key(struct lustre_class_hash_body *hash_body,
+ void *key);
+int lustre_hash_delitem(struct lustre_class_hash_body *hash_body, void *key,
+ struct hlist_node *hash_item);
+void * lustre_hash_get_object_by_key(struct lustre_class_hash_body *hash_body,
+ void *key);
+
+/* ( uuid <-> export ) hash operations define */
+__u32 uuid_hashfn(struct lustre_class_hash_body *hash_body, void * key);
+int uuid_hash_key_compare(void *key, struct hlist_node * compared_hnode);
+void * uuid_export_refcount_get(struct hlist_node * actual_hnode);
+void uuid_export_refcount_put(struct hlist_node * actual_hnode);
+
+/* ( nid <-> export ) hash operations define */
+__u32 nid_hashfn(struct lustre_class_hash_body *hash_body, void * key);
+int nid_hash_key_compare(void *key, struct hlist_node * compared_hnode);
+void * nid_export_refcount_get(struct hlist_node * actual_hnode);
+void nid_export_refcount_put(struct hlist_node * actual_hnode);
+
+/* ( net_peer <-> connection ) hash operations define */
+__u32 conn_hashfn(struct lustre_class_hash_body *hash_body, void * key);
+int conn_hash_key_compare(void *key, struct hlist_node * compared_hnode);
+void * conn_refcount_get(struct hlist_node * actual_hnode);
+void conn_refcount_put(struct hlist_node * actual_hnode);
+
+#endif /* __CLASS_HASH_H */
#define atomic_dec(a) do { (a)->counter--; } while (0)
#define atomic_add(b,a) do {(a)->counter += b;} while (0)
#define atomic_sub(b,a) do {(a)->counter -= b;} while (0)
+#define atomic_sub_return(n,a) ((a)->counter -= n)
+#define atomic_dec_return(a) atomic_sub_return(1,a)
#ifndef likely
#define likely(exp) (exp)
atomic_t exp_rpc_count;
struct obd_uuid exp_client_uuid;
struct list_head exp_obd_chain;
+ struct hlist_node exp_uuid_hash; /* uuid-export hash*/
+ struct hlist_node exp_nid_hash; /* nid-export hash */
/* exp_obd_chain_timed fo ping evictor, protected by obd_dev_lock */
struct list_head exp_obd_chain_timed;
struct obd_device *exp_obd;
struct ptlrpc_connection {
struct list_head c_link;
+ struct hlist_node c_hash;
lnet_nid_t c_self;
lnet_process_id_t c_peer;
struct obd_uuid c_remote_uuid;
lnet_nid_t self, struct obd_uuid *uuid);
int ptlrpc_put_connection(struct ptlrpc_connection *c);
struct ptlrpc_connection *ptlrpc_connection_addref(struct ptlrpc_connection *);
-void ptlrpc_init_connection(void);
+int ptlrpc_init_connection(void);
void ptlrpc_cleanup_connection(void);
extern lnet_pid_t ptl_get_pid(void);
#include <lustre/lustre_idl.h>
#include <lustre_export.h>
#include <lustre_quota.h>
+#include <class_hash.h>
#define MAX_OBD_DEVICES 8192
obd_fail:1, /* cleanup with failover */
obd_async_recov:1, /* allow asyncronous orphan cleanup */
obd_no_conn:1; /* deny new connections */
+ /* uuid-export hash body */
+ struct lustre_class_hash_body *obd_uuid_hash_body;
+ /* nid-export hash body */
+ struct lustre_class_hash_body *obd_nid_hash_body;
atomic_t obd_refcount;
cfs_waitq_t obd_refcount_waitq;
struct list_head obd_exports;
--- /dev/null
+diff -urp linux-2.6.5-7.283.orig/include/asm-i386/atomic.h linux-2.6.5-7.283/include/asm-i386/atomic.h
+--- linux-2.6.5-7.283.orig/include/asm-i386/atomic.h 2004-04-04 06:36:52.000000000 +0300
++++ linux-2.6.5-7.283/include/asm-i386/atomic.h 2007-05-21 09:40:48.000000000 +0300
+@@ -176,6 +176,47 @@ static __inline__ int atomic_add_negativ
+ return c;
+ }
+
++/**
++ * atomic_add_return - add and return
++ * @v: pointer of type atomic_t
++ * @i: integer value to add
++ *
++ * Atomically adds @i to @v and returns @i + @v
++ */
++static __inline__ int atomic_add_return(int i, atomic_t *v)
++{
++ int __i;
++#ifdef CONFIG_M386
++ unsigned long flags;
++ if(unlikely(boot_cpu_data.x86==3))
++ goto no_xadd;
++#endif
++ /* Modern 486+ processor */
++ __i = i;
++ __asm__ __volatile__(
++ LOCK_PREFIX "xaddl %0, %1"
++ :"+r" (i), "+m" (v->counter)
++ : : "memory");
++ return i + __i;
++
++#ifdef CONFIG_M386
++no_xadd: /* Legacy 386 processor */
++ local_irq_save(flags);
++ __i = atomic_read(v);
++ atomic_set(v, i + __i);
++ local_irq_restore(flags);
++ return i + __i;
++#endif
++}
++
++static __inline__ int atomic_sub_return(int i, atomic_t *v)
++{
++ return atomic_add_return(-i,v);
++}
++
++#define atomic_inc_return(v) (atomic_add_return(1,v))
++#define atomic_dec_return(v) (atomic_sub_return(1,v))
++
+ /* These are x86-specific, used by some header files */
+ #define atomic_clear_mask(mask, addr) \
+ __asm__ __volatile__(LOCK "andl %0,%1" \
+diff -urp linux-2.6.5-7.283.orig/include/asm-x86_64/atomic.h linux-2.6.5-7.283/include/asm-x86_64/atomic.h
+--- linux-2.6.5-7.283.orig/include/asm-x86_64/atomic.h 2004-04-04 06:38:20.000000000 +0300
++++ linux-2.6.5-7.283/include/asm-x86_64/atomic.h 2007-05-21 09:47:04.000000000 +0300
+@@ -178,6 +178,31 @@ static __inline__ int atomic_add_negativ
+ return c;
+ }
+
++/**
++ * atomic_add_return - add and return
++ * @i: integer value to add
++ * @v: pointer of type atomic_t
++ *
++ * Atomically adds @i to @v and returns @i + @v
++ */
++static __inline__ int atomic_add_return(int i, atomic_t *v)
++{
++ int __i = i;
++ __asm__ __volatile__(
++ LOCK_PREFIX "xaddl %0, %1"
++ :"+r" (i), "+m" (v->counter)
++ : : "memory");
++ return i + __i;
++}
++
++static __inline__ int atomic_sub_return(int i, atomic_t *v)
++{
++ return atomic_add_return(-i,v);
++}
++
++#define atomic_inc_return(v) (atomic_add_return(1,v))
++#define atomic_dec_return(v) (atomic_sub_return(1,v))
++
+ /* These are x86-specific, used by some header files */
+ #define atomic_clear_mask(mask, addr) \
+ __asm__ __volatile__(LOCK "andl %0,%1" \
bitops_ext2_find_next_le_bit-2.6.patch
2.6.5-quotafix.patch
vfs_intent-reduce-stack-usage-2.6-suse-newer.patch
+atomic_add_return-sles9.patch
struct obd_uuid tgtuuid;
struct obd_uuid cluuid;
struct obd_uuid remote_uuid;
- struct list_head *p;
char *str, *tmp;
int rc = 0, abort_recovery;
struct obd_connect_data *data;
goto dont_check_exports;
spin_lock(&target->obd_dev_lock);
- list_for_each(p, &target->obd_exports) {
- export = list_entry(p, struct obd_export, exp_obd_chain);
- if (obd_uuid_equals(&cluuid, &export->exp_client_uuid)) {
- if (export->exp_connecting) { /* bug 9635, et. al. */
- CWARN("%s: exp %p already connecting\n",
- export->exp_obd->obd_name, export);
- export = NULL;
- rc = -EALREADY;
- break;
- }
-
- /* make darn sure this is coming from the same peer
- * if the UUIDs matched */
- if ((export->exp_connection != NULL) &&
- (strcmp(libcfs_nid2str(req->rq_peer.nid),
- libcfs_nid2str(export->exp_connection->c_peer.nid)))) {
- CWARN("%s: cookie %s seen on new NID %s when "
- "existing NID %s is already connected\n",
- target->obd_name, cluuid.uuid,
- libcfs_nid2str(req->rq_peer.nid),
- libcfs_nid2str(export->exp_connection->c_peer.nid));
- export = NULL;
- rc = -EALREADY;
- break;
- }
-
- spin_lock(&export->exp_lock);
- export->exp_connecting = 1;
- spin_unlock(&export->exp_lock);
- spin_unlock(&target->obd_dev_lock);
- LASSERT(export->exp_obd == target);
+ export = lustre_hash_get_object_by_key(target->obd_uuid_hash_body, &cluuid);
- rc = target_handle_reconnect(&conn, export, &cluuid);
- break;
- }
+ if (export != NULL && export->exp_connecting) { /* bug 9635, et. al. */
+ CWARN("%s: exp %p already connecting\n",
+ export->exp_obd->obd_name, export);
+ class_export_put(export);
export = NULL;
+ rc = -EALREADY;
+ } else if (export != NULL && export->exp_connection != NULL &&
+ req->rq_peer.nid != export->exp_connection->c_peer.nid) {
+ /* make darn sure this is coming from the same peer
+ * if the UUIDs matched */
+ CWARN("%s: cookie %s seen on new NID %s when "
+ "existing NID %s is already connected\n",
+ target->obd_name, cluuid.uuid,
+ libcfs_nid2str(req->rq_peer.nid),
+ libcfs_nid2str(export->exp_connection->c_peer.nid));
+ class_export_put(export);
+ export = NULL;
+ rc = -EALREADY;
+ } else if (export != NULL) {
+ spin_lock(&export->exp_lock);
+ export->exp_connecting = 1;
+ spin_unlock(&export->exp_lock);
+ class_export_put(export);
+ spin_unlock(&target->obd_dev_lock);
+ LASSERT(export->exp_obd == target);
+
+ rc = target_handle_reconnect(&conn, export, &cluuid);
}
+
/* If we found an export, we already unlocked. */
if (!export) {
spin_unlock(&target->obd_dev_lock);
req->rq_self,
&remote_uuid);
+ spin_lock(&target->obd_dev_lock);
+ /* Export might be hashed already, e.g. if this is reconnect */
+ if (hlist_unhashed(&export->exp_nid_hash))
+ lustre_hash_additem(export->exp_obd->obd_nid_hash_body,
+ &export->exp_connection->c_peer.nid,
+ &export->exp_nid_hash);
+ spin_unlock(&target->obd_dev_lock);
+
if (lustre_msg_get_op_flags(req->rq_repmsg) & MSG_CONNECT_RECONNECT)
GOTO(out, rc = 0);
endif
obdclass-all-objs := llog.o llog_cat.o llog_lvfs.o llog_obd.o llog_swab.o
-obdclass-all-objs += class_obd.o
+obdclass-all-objs += class_obd.o class_hash.o
obdclass-all-objs += debug.o genops.o uuid.o llog_ioctl.o
obdclass-all-objs += lprocfs_status.o lustre_handles.o lustre_peer.o
obdclass-all-objs += statfs_pack.o obdo.o obd_config.o obd_mount.o prng.o
noinst_LIBRARIES = liblustreclass.a
liblustreclass_a_SOURCES = class_obd.c debug.c genops.c statfs_pack.c uuid.c
-liblustreclass_a_SOURCES += lustre_handles.c lustre_peer.c lprocfs_status.c
+liblustreclass_a_SOURCES += lustre_handles.c lustre_peer.c lprocfs_status.c class_hash.c
liblustreclass_a_SOURCES += obdo.c obd_config.c llog.c llog_obd.c llog_cat.c
liblustreclass_a_SOURCES += llog_lvfs.c llog_swab.c
liblustreclass_a_SOURCES += prng.c #llog_ioctl.c rbtree.c
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2005 Cluster File Systems, Inc.
+ * Author: YuZhangyong <yzy@clusterfs.com>
+ *
+ * This file is part of Lustre, http://www.lustre.org/
+ *
+ * No redistribution or use is permitted outside of Cluster File Systems, Inc.
+ *
+ * Implement a hash class for hash process in lustre system.
+ */
+
+#ifndef __KERNEL__
+#include <liblustre.h>
+#include <obd.h>
+#endif
+
+#include <obd_class.h>
+#include <class_hash.h>
+#include <lustre_export.h>
+#include <obd_support.h>
+#include <lustre_net.h>
+
+int lustre_hash_init(struct lustre_class_hash_body **hash_body_new,
+ char *hashname, __u32 hashsize,
+ struct lustre_hash_operations *hash_operations)
+{
+ int i, n = 0;
+ struct lustre_class_hash_body *hash_body = NULL;
+
+ LASSERT(hashsize > 0);
+ LASSERT(hash_operations != NULL);
+ ENTRY;
+
+ i = hashsize;
+ while (i != 0) {
+ if (i & 0x1)
+ n++;
+ i >>= 1;
+ }
+
+ LASSERTF(n == 1, "hashsize %u isn't 2^n\n", hashsize);
+
+ /* alloc space for hash_body */
+ OBD_ALLOC(hash_body, sizeof(*hash_body));
+
+ if (hash_body == NULL) {
+ CERROR("Cannot alloc space for hash body, hashname = %s \n",
+ hashname);
+ RETURN(-ENOMEM);
+ }
+
+ LASSERT(hashname != NULL &&
+ strlen(hashname) <= sizeof(hash_body->hashname));
+ strcpy(hash_body->hashname, hashname);
+ hash_body->lchb_hash_max_size = hashsize;
+ hash_body->lchb_hash_operations = hash_operations;
+
+ /* alloc space for the hash tables */
+ OBD_ALLOC(hash_body->lchb_hash_tables,
+ sizeof(*hash_body->lchb_hash_tables) * hash_body->lchb_hash_max_size);
+
+ if (hash_body->lchb_hash_tables == NULL) {
+ OBD_FREE(hash_body, sizeof(*hash_body));
+ CERROR("Cannot alloc space for hashtables, hashname = %s \n",
+ hash_body->hashname);
+ RETURN(-ENOMEM);
+ }
+
+ spin_lock_init(&hash_body->lchb_lock); /* initialize the body lock */
+
+ for(i = 0 ; i < hash_body->lchb_hash_max_size; i++) {
+ /* initial the bucket lock and list_head */
+ INIT_HLIST_HEAD(&hash_body->lchb_hash_tables[i].lhb_head);
+ spin_lock_init(&hash_body->lchb_hash_tables[i].lhb_lock);
+ }
+ *hash_body_new = hash_body;
+
+ RETURN(0);
+}
+EXPORT_SYMBOL(lustre_hash_init);
+
+void lustre_hash_exit(struct lustre_class_hash_body **new_hash_body)
+{
+ int i;
+ struct lustre_class_hash_body *hash_body = NULL;
+ ENTRY;
+
+ hash_body = *new_hash_body;
+
+ if (hash_body == NULL) {
+ CWARN("hash body has been deleted\n");
+ goto out_hash;
+ }
+
+ spin_lock(&hash_body->lchb_lock); /* lock the hash tables */
+
+ if (hash_body->lchb_hash_tables == NULL ) {
+ spin_unlock(&hash_body->lchb_lock);
+ CWARN("hash tables has been deleted\n");
+ goto out_hash;
+ }
+
+ for( i = 0; i < hash_body->lchb_hash_max_size; i++ ) {
+ struct lustre_hash_bucket * bucket;
+ struct hlist_node * actual_hnode, *pos;
+
+ bucket = &hash_body->lchb_hash_tables[i];
+ spin_lock(&bucket->lhb_lock); /* lock the bucket */
+ hlist_for_each_safe(actual_hnode, pos, &(bucket->lhb_head)) {
+ lustre_hash_delitem_nolock(hash_body, i, actual_hnode);
+ }
+ spin_unlock(&bucket->lhb_lock);
+ }
+
+ /* free the hash_tables's memory space */
+ OBD_FREE(hash_body->lchb_hash_tables,
+ sizeof(*hash_body->lchb_hash_tables) * hash_body->lchb_hash_max_size);
+
+ hash_body->lchb_hash_tables = NULL;
+
+ spin_unlock(&hash_body->lchb_lock);
+
+out_hash :
+ /* free the hash_body's memory space */
+ if (hash_body != NULL) {
+ OBD_FREE(hash_body, sizeof(*hash_body));
+ *new_hash_body = NULL;
+ }
+ EXIT;
+}
+EXPORT_SYMBOL(lustre_hash_exit);
+
+/*
+ * only allow unique @key in hashtables, if the same @key has existed
+ * in hashtables, it will return with fails.
+ */
+int lustre_hash_additem_unique(struct lustre_class_hash_body *hash_body,
+ void *key, struct hlist_node *actual_hnode)
+{
+ int hashent;
+ struct lustre_hash_bucket *bucket = NULL;
+ struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
+ ENTRY;
+
+ LASSERT(hlist_unhashed(actual_hnode));
+ hashent = hop->lustre_hashfn(hash_body, key);
+
+ /* get the hash-bucket and lock it */
+ bucket = &hash_body->lchb_hash_tables[hashent];
+ spin_lock(&bucket->lhb_lock);
+
+ if ( (lustre_hash_getitem_in_bucket_nolock(hash_body, hashent, key)) != NULL) {
+ /* the added-item exist in hashtables, so cannot add it again */
+ spin_unlock(&bucket->lhb_lock);
+
+ CWARN("Already found the key in hash [%s]\n",
+ hash_body->hashname);
+ RETURN(-EALREADY);
+ }
+
+ hlist_add_head(actual_hnode, &(bucket->lhb_head));
+
+#ifdef LUSTRE_HASH_DEBUG
+ /* hash distribute debug */
+ hash_body->lchb_hash_tables[hashent].lhb_item_count++;
+ CDEBUG(D_INFO, "hashname[%s] bucket[%d] has [%d] hashitem\n",
+ hash_body->hashname, hashent,
+ hash_body->lchb_hash_tables[hashent].lhb_item_count);
+#endif
+ hop->lustre_hash_object_refcount_get(actual_hnode);
+
+ spin_unlock(&bucket->lhb_lock);
+
+ RETURN(0);
+}
+EXPORT_SYMBOL(lustre_hash_additem_unique);
+
+/*
+ * this version of additem, it allow multi same @key <key, value> in hashtables.
+ * in this additem version, we don't need to check if exist same @key in hash
+ * tables, we only add it to related hashbucket.
+ * example: maybe same nid will be related to multi difference export
+ */
+int lustre_hash_additem(struct lustre_class_hash_body *hash_body, void *key,
+ struct hlist_node *actual_hnode)
+{
+ int hashent;
+ struct lustre_hash_bucket *bucket = NULL;
+ struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
+ ENTRY;
+
+ LASSERT(hlist_unhashed(actual_hnode));
+
+ hashent = hop->lustre_hashfn(hash_body, key);
+
+ /* get the hashbucket and lock it */
+ bucket = &hash_body->lchb_hash_tables[hashent];
+ spin_lock(&bucket->lhb_lock);
+
+ hlist_add_head(actual_hnode, &(bucket->lhb_head));
+
+#ifdef LUSTRE_HASH_DEBUG
+ /* hash distribute debug */
+ hash_body->lchb_hash_tables[hashent].lhb_item_count++;
+ CDEBUG(D_INFO, "hashname[%s] bucket[%d] has [%d] hashitem\n",
+ hash_body->hashname, hashent,
+ hash_body->lchb_hash_tables[hashent].lhb_item_count);
+#endif
+ hop->lustre_hash_object_refcount_get(actual_hnode);
+
+ spin_unlock(&bucket->lhb_lock);
+
+ RETURN(0);
+}
+EXPORT_SYMBOL(lustre_hash_additem);
+
+
+/*
+ * this version of delitem will delete a hashitem with given @key,
+ * we need to search the <@key, @value> in hashbucket with @key,
+ * if match, the hashitem will be delete.
+ * we have a no-search version of delitem, it will directly delete a hashitem,
+ * doesn't need to search it in hashtables, so it is a O(1) delete.
+ */
+int lustre_hash_delitem_by_key(struct lustre_class_hash_body *hash_body,
+ void *key)
+{
+ int hashent ;
+ struct hlist_node * hash_item;
+ struct lustre_hash_bucket *bucket = NULL;
+ struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
+ int retval = 0;
+ ENTRY;
+
+ hashent = hop->lustre_hashfn(hash_body, key);
+
+ /* first, lock the hashbucket */
+ bucket = &hash_body->lchb_hash_tables[hashent];
+ spin_lock(&bucket->lhb_lock);
+
+ /* get the hash_item from hash_bucket */
+ hash_item = lustre_hash_getitem_in_bucket_nolock(hash_body, hashent,
+ key);
+
+ if (hash_item == NULL) {
+ spin_unlock(&bucket->lhb_lock);
+ RETURN(-ENOENT);
+ }
+
+ /* call delitem_nolock() to delete the hash_item */
+ retval = lustre_hash_delitem_nolock(hash_body, hashent, hash_item);
+
+ spin_unlock(&bucket->lhb_lock);
+
+ RETURN(retval);
+}
+EXPORT_SYMBOL(lustre_hash_delitem_by_key);
+
+/*
+ * the O(1) version of delete hash item,
+ * it will directly delete the hashitem with given @hash_item,
+ * the parameter @key used to get the relation hash bucket and lock it.
+ */
+int lustre_hash_delitem(struct lustre_class_hash_body *hash_body,
+ void *key, struct hlist_node * hash_item)
+{
+ int hashent = 0;
+ int retval = 0;
+ struct lustre_hash_bucket *bucket = NULL;
+ struct lustre_hash_operations *hop = hash_body->lchb_hash_operations;
+ ENTRY;
+
+ hashent = hop->lustre_hashfn(hash_body, key);
+
+ bucket = &hash_body->lchb_hash_tables[hashent];
+ spin_lock(&bucket->lhb_lock);
+
+ /* call delitem_nolock() to delete the hash_item */
+ retval = lustre_hash_delitem_nolock(hash_body, hashent, hash_item);
+
+ spin_unlock(&bucket->lhb_lock);
+
+ RETURN(retval);
+}
+EXPORT_SYMBOL(lustre_hash_delitem);
+
+void * lustre_hash_get_object_by_key(struct lustre_class_hash_body *hash_body,
+ void *key)
+{
+ int hashent ;
+ struct hlist_node * hash_item_hnode = NULL;
+ void * obj_value = NULL;
+ struct lustre_hash_bucket *bucket = NULL;
+ struct lustre_hash_operations * hop = hash_body->lchb_hash_operations;
+ ENTRY;
+
+ /* get the hash value from the given item */
+ hashent = hop->lustre_hashfn(hash_body, key);
+
+ bucket = &hash_body->lchb_hash_tables[hashent];
+ spin_lock(&bucket->lhb_lock); /* lock the bucket */
+
+ hash_item_hnode = lustre_hash_getitem_in_bucket_nolock(hash_body,
+ hashent, key);
+
+ if (hash_item_hnode == NULL) {
+ spin_unlock(&bucket->lhb_lock); /* lock the bucket */
+ RETURN(NULL);
+ }
+
+ obj_value = hop->lustre_hash_object_refcount_get(hash_item_hnode);
+ spin_unlock(&bucket->lhb_lock); /* lock the bucket */
+
+ RETURN(obj_value);
+}
+EXPORT_SYMBOL(lustre_hash_get_object_by_key);
+
+/*
+ * define (uuid <-> export) hash operations and function define
+ */
+
+/* define the uuid hash operations */
+struct lustre_hash_operations uuid_hash_operations = {
+ .lustre_hashfn = uuid_hashfn,
+ .lustre_hash_key_compare = uuid_hash_key_compare,
+ .lustre_hash_object_refcount_get = uuid_export_refcount_get,
+ .lustre_hash_object_refcount_put = uuid_export_refcount_put,
+};
+
+/* string hashing using djb2 hash algorithm */
+__u32 uuid_hashfn(struct lustre_class_hash_body *hash_body, void * key)
+{
+ __u32 hash = 5381;
+ struct obd_uuid * uuid_key = NULL;
+ int c;
+ char *ptr = NULL;
+
+ LASSERT(key != NULL);
+
+ uuid_key = (struct obd_uuid*)key;
+ ptr = uuid_key->uuid;
+
+ while ((c = *ptr++)) {
+ hash = hash * 33 + c;
+ }
+
+ hash &= (hash_body->lchb_hash_max_size - 1);
+
+ RETURN(hash);
+}
+
+/* Note, it is impossible to find an export that is in failed state with
+ * this function */
+int uuid_hash_key_compare(void *key, struct hlist_node *compared_hnode)
+{
+ struct obd_export *export = NULL;
+ struct obd_uuid *uuid_key = NULL, *compared_uuid = NULL;
+
+ LASSERT( key != NULL);
+
+ uuid_key = (struct obd_uuid*)key;
+
+ export = hlist_entry(compared_hnode, struct obd_export, exp_uuid_hash);
+
+ compared_uuid = &export->exp_client_uuid;
+
+ RETURN(obd_uuid_equals(uuid_key, compared_uuid) &&
+ !export->exp_failed);
+}
+
+void * uuid_export_refcount_get(struct hlist_node * actual_hnode)
+{
+ struct obd_export *export = NULL;
+
+ LASSERT(actual_hnode != NULL);
+
+ export = hlist_entry(actual_hnode, struct obd_export, exp_uuid_hash);
+
+ LASSERT(export != NULL);
+
+ class_export_get(export);
+
+ RETURN(export);
+}
+
+void uuid_export_refcount_put(struct hlist_node * actual_hnode)
+{
+ struct obd_export *export = NULL;
+
+ LASSERT(actual_hnode != NULL);
+
+ export = hlist_entry(actual_hnode, struct obd_export, exp_uuid_hash);
+
+ LASSERT(export != NULL);
+
+ class_export_put(export);
+}
+
+/*
+ * define (nid <-> export) hash operations and function define
+ */
+
+/* define the nid hash operations */
+struct lustre_hash_operations nid_hash_operations = {
+ .lustre_hashfn = nid_hashfn,
+ .lustre_hash_key_compare = nid_hash_key_compare,
+ .lustre_hash_object_refcount_get = nid_export_refcount_get,
+ .lustre_hash_object_refcount_put = nid_export_refcount_put,
+};
+
+/* string hashing using djb2 hash algorithm */
+__u32 nid_hashfn(struct lustre_class_hash_body *hash_body, void * key)
+{
+ __u32 hash = 5381;
+ int i;
+ char *ptr = key;
+
+ LASSERT(key != NULL);
+
+ for(i = 0 ; i < sizeof(lnet_nid_t) ; i ++)
+ hash = hash * 33 + ptr[i];
+
+ hash &= (hash_body->lchb_hash_max_size - 1);
+
+ RETURN(hash);
+}
+
+/* Note, it is impossible to find an export that is in failed state with
+ * this function */
+int nid_hash_key_compare(void *key, struct hlist_node *compared_hnode)
+{
+ struct obd_export *export = NULL;
+ lnet_nid_t *nid_key = NULL;
+
+ LASSERT( key != NULL);
+
+ nid_key = (lnet_nid_t*)key;
+
+ export = hlist_entry(compared_hnode, struct obd_export, exp_nid_hash);
+
+ return (export->exp_connection->c_peer.nid == *nid_key &&
+ !export->exp_failed);
+}
+
+void *nid_export_refcount_get(struct hlist_node *actual_hnode)
+{
+ struct obd_export *export = NULL;
+
+ LASSERT(actual_hnode != NULL);
+
+ export = hlist_entry(actual_hnode, struct obd_export, exp_nid_hash);
+
+ LASSERT(export != NULL);
+
+ class_export_get(export);
+
+ RETURN(export);
+}
+
+void nid_export_refcount_put(struct hlist_node *actual_hnode)
+{
+ struct obd_export *export = NULL;
+
+ LASSERT(actual_hnode != NULL);
+
+ export = hlist_entry(actual_hnode, struct obd_export, exp_nid_hash);
+
+ LASSERT(export != NULL);
+
+ class_export_put(export);
+}
+
+/*
+ * define (net_peer <-> connection) hash operations and function define
+ */
+
+/* define the conn hash operations */
+struct lustre_hash_operations conn_hash_operations = {
+ .lustre_hashfn = conn_hashfn,
+ .lustre_hash_key_compare = conn_hash_key_compare,
+ .lustre_hash_object_refcount_get = conn_refcount_get,
+ .lustre_hash_object_refcount_put = conn_refcount_put,
+};
+EXPORT_SYMBOL(conn_hash_operations);
+
+/* string hashing using djb2 hash algorithm */
+__u32 conn_hashfn(struct lustre_class_hash_body *hash_body, void * key)
+{
+ __u32 hash = 5381;
+ char *ptr = key;
+ int i;
+
+ LASSERT(key != NULL);
+
+ for(i = 0 ; i < sizeof(lnet_process_id_t) ; i ++)
+ hash = hash * 33 + ptr[i];
+
+ hash &= (hash_body->lchb_hash_max_size - 1);
+
+ RETURN(hash);
+}
+
+int conn_hash_key_compare(void *key, struct hlist_node *compared_hnode)
+{
+ struct ptlrpc_connection *c = NULL;
+ lnet_process_id_t *conn_key = NULL;
+
+ LASSERT( key != NULL);
+
+ conn_key = (lnet_process_id_t*)key;
+
+ c = hlist_entry(compared_hnode, struct ptlrpc_connection, c_hash);
+
+ return (conn_key->nid == c->c_peer.nid &&
+ conn_key->pid == c->c_peer.pid);
+}
+
+void *conn_refcount_get(struct hlist_node *actual_hnode)
+{
+ struct ptlrpc_connection *c = NULL;
+
+ LASSERT(actual_hnode != NULL);
+
+ c = hlist_entry(actual_hnode, struct ptlrpc_connection, c_hash);
+
+ LASSERT(c != NULL);
+
+ atomic_inc(&c->c_refcount);
+
+ RETURN(c);
+}
+
+void conn_refcount_put(struct hlist_node *actual_hnode)
+{
+ struct ptlrpc_connection *c = NULL;
+
+ LASSERT(actual_hnode != NULL);
+
+ c = hlist_entry(actual_hnode, struct ptlrpc_connection, c_hash);
+
+ LASSERT(c != NULL);
+
+ atomic_dec(&c->c_refcount);
+}
+
#include <obd_ost.h>
#include <obd_class.h>
#include <lprocfs_status.h>
+#include <class_hash.h>
extern struct list_head obd_types;
spinlock_t obd_types_lock;
struct obd_export *class_new_export(struct obd_device *obd,
struct obd_uuid *cluuid)
{
- struct obd_export *export, *tmp;
+ struct obd_export *export;
+ int rc = 0;
OBD_ALLOC(export, sizeof(*export));
if (!export)
class_handle_hash(&export->exp_handle, export_handle_addref);
export->exp_last_request_time = CURRENT_SECONDS;
spin_lock_init(&export->exp_lock);
+ INIT_HLIST_NODE(&export->exp_uuid_hash);
+ INIT_HLIST_NODE(&export->exp_nid_hash);
export->exp_client_uuid = *cluuid;
obd_init_export(export);
- spin_lock(&obd->obd_dev_lock);
if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
- list_for_each_entry(tmp, &obd->obd_exports, exp_obd_chain) {
- if (obd_uuid_equals(cluuid, &tmp->exp_client_uuid)) {
- spin_unlock(&obd->obd_dev_lock);
- CWARN("%s: denying duplicate export for %s\n",
- obd->obd_name, cluuid->uuid);
- class_handle_unhash(&export->exp_handle);
- OBD_FREE_PTR(export);
- return ERR_PTR(-EALREADY);
- }
- }
+ rc = lustre_hash_additem_unique(obd->obd_uuid_hash_body, cluuid,
+ &export->exp_uuid_hash);
+ if (rc != 0) {
+ CWARN("%s: denying duplicate export for %s\n",
+ obd->obd_name, cluuid->uuid);
+ class_handle_unhash(&export->exp_handle);
+ OBD_FREE_PTR(export);
+ return ERR_PTR(-EALREADY);
+ }
}
+
+ spin_lock(&obd->obd_dev_lock);
LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
class_incref(obd);
list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
class_handle_unhash(&exp->exp_handle);
spin_lock(&exp->exp_obd->obd_dev_lock);
+ /* delete an uuid-export hashitem from hashtables */
+ if (!hlist_unhashed(&exp->exp_uuid_hash)) {
+ lustre_hash_delitem(exp->exp_obd->obd_uuid_hash_body,
+ &exp->exp_client_uuid, &exp->exp_uuid_hash);
+ }
list_del_init(&exp->exp_obd_chain);
list_del_init(&exp->exp_obd_chain_timed);
exp->exp_obd->obd_num_exports--;
spin_lock(&export->exp_lock);
already_disconnected = export->exp_disconnected;
export->exp_disconnected = 1;
+
+ if (!hlist_unhashed(&export->exp_nid_hash)) {
+ lustre_hash_delitem(export->exp_obd->obd_nid_hash_body,
+ &export->exp_connection->c_peer.nid, &export->exp_nid_hash);
+ }
spin_unlock(&export->exp_lock);
/* class_cleanup(), abort_recovery(), and class_fail_export()
}
EXPORT_SYMBOL(obd_export_nid2str);
-#define EVICT_BATCH 32
int obd_export_evict_by_nid(struct obd_device *obd, char *nid)
{
- struct obd_export *doomed_exp[EVICT_BATCH] = { NULL };
- struct list_head *p;
- int exports_evicted = 0, num_to_evict = 0, i;
+ struct obd_export *doomed_exp = NULL;
+ int exports_evicted = 0;
-search_again:
- spin_lock(&obd->obd_dev_lock);
- list_for_each(p, &obd->obd_exports) {
- doomed_exp[num_to_evict] = list_entry(p, struct obd_export,
- exp_obd_chain);
- if (strcmp(obd_export_nid2str(doomed_exp[num_to_evict]),
- nid) == 0) {
- class_export_get(doomed_exp[num_to_evict]);
- if (++num_to_evict == EVICT_BATCH)
- break;
- }
- }
- spin_unlock(&obd->obd_dev_lock);
+ lnet_nid_t nid_key = libcfs_str2nid(nid);
- for (i = 0; i < num_to_evict; i++) {
+ do {
+ doomed_exp = lustre_hash_get_object_by_key(obd->obd_nid_hash_body,
+ &nid_key);
+
+ if (doomed_exp == NULL)
+ break;
+
+ LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
+ "nid %s found, wanted nid %s, requested nid %s\n",
+ obd_export_nid2str(doomed_exp),
+ libcfs_nid2str(nid_key), nid);
+
exports_evicted++;
CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
- obd->obd_name, nid, doomed_exp[i]->exp_client_uuid.uuid,
+ obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
exports_evicted);
- class_fail_export(doomed_exp[i]);
- class_export_put(doomed_exp[i]);
- }
- if (num_to_evict == EVICT_BATCH) {
- num_to_evict = 0;
- goto search_again;
- }
+ class_fail_export(doomed_exp);
+ class_export_put(doomed_exp);
+ } while (1);
if (!exports_evicted)
CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
int obd_export_evict_by_uuid(struct obd_device *obd, char *uuid)
{
struct obd_export *doomed_exp = NULL;
- struct list_head *p;
struct obd_uuid doomed;
int exports_evicted = 0;
obd_str2uuid(&doomed, uuid);
- spin_lock(&obd->obd_dev_lock);
- list_for_each(p, &obd->obd_exports) {
- doomed_exp = list_entry(p, struct obd_export, exp_obd_chain);
-
- if (obd_uuid_equals(&doomed, &doomed_exp->exp_client_uuid)) {
- class_export_get(doomed_exp);
- break;
- }
- doomed_exp = NULL;
- }
- spin_unlock(&obd->obd_dev_lock);
+ doomed_exp = lustre_hash_get_object_by_key(obd->obd_uuid_hash_body,
+ &doomed);
if (doomed_exp == NULL) {
CERROR("%s: can't disconnect %s: no exports found\n",
#include <lprocfs_status.h>
#include <libcfs/list.h>
#include <lustre_param.h>
+#include <class_hash.h>
+
+extern struct lustre_hash_operations uuid_hash_operations;
+extern struct lustre_hash_operations nid_hash_operations;
/*********** string parsing utils *********/
/* just leave this on forever. I can't use obd_set_up here because
other fns check that status, and we're not actually set up yet. */
obd->obd_starting = 1;
+
+ /* create an uuid-export hash body */
+ err = lustre_hash_init(&obd->obd_uuid_hash_body, "UUID_HASH",
+ 128, &uuid_hash_operations);
+ if (err) {
+ spin_unlock(&obd->obd_dev_lock);
+ GOTO(err_hash, err);
+ }
+
+ /* create a nid-export hash body */
+ err = lustre_hash_init(&obd->obd_nid_hash_body, "NID_HASH",
+ 128, &nid_hash_operations);
spin_unlock(&obd->obd_dev_lock);
+ if (err)
+ GOTO(err_hash, err);
exp = class_new_export(obd, &obd->obd_uuid);
if (IS_ERR(exp))
RETURN(0);
err_exp:
- CERROR("setup %s failed (%d)\n", obd->obd_name, err);
class_unlink_export(obd->obd_self_export);
obd->obd_self_export = NULL;
+err_hash:
+ lustre_hash_exit(&obd->obd_uuid_hash_body);
+ lustre_hash_exit(&obd->obd_nid_hash_body);
obd->obd_starting = 0;
+ CERROR("setup %s failed (%d)\n", obd->obd_name, err);
RETURN(err);
}
LASSERT(obd->obd_self_export);
+ /* destroy an uuid-export hash body */
+ lustre_hash_exit(&obd->obd_uuid_hash_body);
+
+ /* destroy a nid-export hash body */
+ lustre_hash_exit(&obd->obd_nid_hash_body);
+
/* Precleanup stage 1, we must make sure all exports (other than the
self-export) get destroyed. */
err = obd_precleanup(obd, OBD_CLEANUP_EXPORTS);
#endif
#include "ptlrpc_internal.h"
+#include <class_hash.h>
static spinlock_t conn_lock;
static struct list_head conn_list;
static struct list_head conn_unused_list;
+static struct lustre_class_hash_body *conn_hash_body;
+static struct lustre_class_hash_body *conn_unused_hash_body;
+
+extern struct lustre_hash_operations conn_hash_operations;
void ptlrpc_dump_connections(void)
{
ptlrpc_lookup_conn_locked (lnet_process_id_t peer)
{
struct ptlrpc_connection *c;
- struct list_head *tmp;
-
- list_for_each(tmp, &conn_list) {
- c = list_entry(tmp, struct ptlrpc_connection, c_link);
-
- if (peer.nid == c->c_peer.nid &&
- peer.pid == c->c_peer.pid)
- return ptlrpc_connection_addref(c);
- }
- list_for_each(tmp, &conn_unused_list) {
- c = list_entry(tmp, struct ptlrpc_connection, c_link);
+ c = lustre_hash_get_object_by_key(conn_hash_body, &peer);
+ if (c != NULL)
+ return c;
- if (peer.nid == c->c_peer.nid &&
- peer.pid == c->c_peer.pid) {
- list_del(&c->c_link);
- list_add(&c->c_link, &conn_list);
- return ptlrpc_connection_addref(c);
- }
- }
+ c = lustre_hash_get_object_by_key(conn_unused_hash_body, &peer);
+ if (c != NULL)
+ return c;
return NULL;
}
{
struct ptlrpc_connection *c;
struct ptlrpc_connection *c2;
+ int rc = 0;
ENTRY;
CDEBUG(D_INFO, "self %s peer %s\n",
spin_lock(&conn_lock);
c2 = ptlrpc_lookup_conn_locked(peer);
- if (c2 == NULL)
+ if (c2 == NULL) {
list_add(&c->c_link, &conn_list);
-
+ rc = lustre_hash_additem_unique(conn_hash_body, &peer,
+ &c->c_hash);
+ if (rc != 0) {
+ list_del(&c->c_link);
+ CERROR("Cannot add connection to conn_hash_body\n");
+ goto out_conn;
+ }
+ }
+
+out_conn:
+
spin_unlock(&conn_lock);
- if (c2 == NULL)
+ if (c2 == NULL && rc == 0)
RETURN (c);
-
- OBD_FREE(c, sizeof(*c));
+
+ if (c != NULL)
+ OBD_FREE(c, sizeof(*c));
RETURN (c2);
}
c, atomic_read(&c->c_refcount) - 1,
libcfs_nid2str(c->c_peer.nid));
- if (atomic_dec_and_test(&c->c_refcount)) {
+ spin_lock(&conn_lock);
+ LASSERT(!hlist_unhashed(&c->c_hash));
+ spin_unlock(&conn_lock);
+
+ if (atomic_dec_return(&c->c_refcount) == 1) {
+
spin_lock(&conn_lock);
+
+ lustre_hash_delitem(conn_hash_body, &peer, &c->c_hash);
list_del(&c->c_link);
+
list_add(&c->c_link, &conn_unused_list);
+ rc = lustre_hash_additem_unique(conn_unused_hash_body, &peer,
+ &c->c_hash);
+ if (rc != 0) {
+ spin_unlock(&conn_lock);
+ CERROR("Cannot hash connection to conn_hash_body\n");
+ GOTO(ret, rc);
+ }
+
spin_unlock(&conn_lock);
rc = 1;
- }
+
+ }
+
if (atomic_read(&c->c_refcount) < 0)
CERROR("connection %p refcount %d!\n",
c, atomic_read(&c->c_refcount));
+ret :
RETURN(rc);
}
RETURN(c);
}
-void ptlrpc_init_connection(void)
+int ptlrpc_init_connection(void)
{
+ int rc = 0;
CFS_INIT_LIST_HEAD(&conn_list);
+ rc = lustre_hash_init(&conn_hash_body, "CONN_HASH",
+ 128, &conn_hash_operations);
+ if (rc)
+ GOTO(ret, rc);
+
CFS_INIT_LIST_HEAD(&conn_unused_list);
+ rc = lustre_hash_init(&conn_unused_hash_body, "CONN_UNUSED_HASH",
+ 128, &conn_hash_operations);
+ if (rc)
+ GOTO(ret, rc);
+
spin_lock_init(&conn_lock);
+ret:
+ if (rc) {
+ lustre_hash_exit(&conn_hash_body);
+ lustre_hash_exit(&conn_unused_hash_body);
+ }
+ RETURN(rc);
}
void ptlrpc_cleanup_connection(void)
struct ptlrpc_connection *c;
spin_lock(&conn_lock);
+
+ lustre_hash_exit(&conn_unused_hash_body);
list_for_each_safe(tmp, pos, &conn_unused_list) {
c = list_entry(tmp, struct ptlrpc_connection, c_link);
list_del(&c->c_link);
OBD_FREE(c, sizeof(*c));
}
+
+ lustre_hash_exit(&conn_hash_body);
list_for_each_safe(tmp, pos, &conn_list) {
c = list_entry(tmp, struct ptlrpc_connection, c_link);
CERROR("Connection %p/%s has refcount %d (nid=%s)\n",
RETURN(rc);
cleanup_phase = 1;
- ptlrpc_init_connection();
- rc = llog_init_commit_master();
+ rc = ptlrpc_init_connection();
if (rc)
GOTO(cleanup, rc);
cleanup_phase = 2;
+ rc = llog_init_commit_master();
+ if (rc)
+ GOTO(cleanup, rc);
+ cleanup_phase = 3;
+
ptlrpc_put_connection_superhack = ptlrpc_put_connection;
rc = ptlrpc_start_pinger();
if (rc)
GOTO(cleanup, rc);
- cleanup_phase = 3;
+ cleanup_phase = 4;
rc = ldlm_init();
if (rc)
cleanup:
switch(cleanup_phase) {
- case 3:
+ case 4:
ptlrpc_stop_pinger();
- case 2:
+ case 3:
llog_cleanup_commit_master(1);
+ case 2:
ptlrpc_cleanup_connection();
case 1:
ptlrpc_exit_portals();