Whamcloud - gitweb
b=16777
authoryury <yury>
Sat, 13 Sep 2008 17:47:04 +0000 (17:47 +0000)
committeryury <yury>
Sat, 13 Sep 2008 17:47:04 +0000 (17:47 +0000)
r=adilger,robert

- fixes a server side scalability issue which became visible once the lru resize work landed.
It replaces the linear list of held locks on the server with a hash table from class_hash.c, which drastically improves lock find time for the case when one client holds close to the limit of server locks (quite a big number for big servers). This is very possible if all clients are idle and one of them compiles a kernel, etc.

13 files changed:
lustre/include/lustre_dlm.h
lustre/include/lustre_export.h
lustre/ldlm/ldlm_flock.c
lustre/ldlm/ldlm_lib.c
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/mdt/mdt_handler.c
lustre/mgs/mgs_handler.c
lustre/obdclass/genops.c
lustre/obdclass/lprocfs_status.c
lustre/obdecho/echo.c
lustre/obdfilter/filter.c

index 7c65539..c254e32 100644 (file)
@@ -543,81 +543,153 @@ struct ldlm_interval_tree {
 };
 
 struct ldlm_lock {
 };
 
 struct ldlm_lock {
-        struct portals_handle l_handle; // must be first in the structure
-        atomic_t              l_refc;
-
-        /* internal spinlock protects l_resource.  we should hold this lock
-         * first before grabbing res_lock.*/
-        spinlock_t            l_lock;
-
-        /* ldlm_lock_change_resource() can change this */
-        struct ldlm_resource *l_resource;
-
-        /* protected by ns_hash_lock. FIXME */
-        struct list_head      l_lru;
-
-        /* protected by lr_lock, linkage to resource's lock queues */
-        struct list_head      l_res_link;
-
-        struct ldlm_interval *l_tree_node;      /* tree node for ldlm_extent */
-
-        /* protected by led_lock */
-        struct list_head      l_export_chain; // per-export chain of locks
-
-        /* protected by lr_lock */
-        ldlm_mode_t           l_req_mode;
-        ldlm_mode_t           l_granted_mode;
-
+        /** 
+         * Must be first in the structure.
+         */
+        struct portals_handle    l_handle;
+        /**
+         * Lock reference count.
+         */
+        atomic_t                 l_refc;
+        /** 
+         * Internal spinlock protects l_resource.  we should hold this lock
+         * first before grabbing res_lock.
+         */
+        spinlock_t               l_lock;
+        /** 
+         * ldlm_lock_change_resource() can change this. 
+         */
+        struct ldlm_resource    *l_resource;
+        /** 
+         * Protected by ns_hash_lock. List item for client side lru list.
+         */
+        struct list_head         l_lru;
+        /** 
+         * Protected by lr_lock, linkage to resource's lock queues. 
+         */
+        struct list_head         l_res_link;
+        /** 
+         * Tree node for ldlm_extent. 
+         */
+        struct ldlm_interval    *l_tree_node;
+        /** 
+         * Protected by per-bucket exp->exp_lock_hash locks. Per export hash
+         * of locks.
+         */
+        struct hlist_node        l_exp_hash;
+        /** 
+         * Protected by lr_lock. Requested mode. 
+         */
+        ldlm_mode_t              l_req_mode;
+        /**
+         * Granted mode, also protected by lr_lock.
+         */
+        ldlm_mode_t              l_granted_mode;
+        /**
+         * Lock enqueue completion handler.
+         */
         ldlm_completion_callback l_completion_ast;
         ldlm_completion_callback l_completion_ast;
+        /**
+         * Lock blocking ast handler.
+         */
         ldlm_blocking_callback   l_blocking_ast;
         ldlm_blocking_callback   l_blocking_ast;
+        /**
+         * Lock glimpse handler.
+         */
         ldlm_glimpse_callback    l_glimpse_ast;
 
         ldlm_glimpse_callback    l_glimpse_ast;
 
-        struct obd_export    *l_export;
-        struct obd_export    *l_conn_export;
+        /**
+         * Lock export.
+         */
+        struct obd_export       *l_export;
+        /**
+         * Lock connection export.
+         */
+        struct obd_export       *l_conn_export;
 
 
-        struct lustre_handle  l_remote_handle;
-        ldlm_policy_data_t    l_policy_data;
+        /**
+         * Remote lock handle.
+         */
+        struct lustre_handle     l_remote_handle;
 
 
-        /* protected by lr_lock */
+        ldlm_policy_data_t       l_policy_data;
+
+        /*
+         * Protected by lr_lock. Various counters: readers, writers, etc.
+         */
         __u32                 l_flags;
         __u32                 l_readers;
         __u32                 l_writers;
         __u8                  l_destroyed;
 
         __u32                 l_flags;
         __u32                 l_readers;
         __u32                 l_writers;
         __u8                  l_destroyed;
 
-        /* If the lock is granted, a process sleeps on this waitq to learn when
+        /** 
+         * If the lock is granted, a process sleeps on this waitq to learn when
          * it's no longer in use.  If the lock is not granted, a process sleeps
          * it's no longer in use.  If the lock is not granted, a process sleeps
-         * on this waitq to learn when it becomes granted. */
+         * on this waitq to learn when it becomes granted. 
+         */
         cfs_waitq_t           l_waitq;
         cfs_waitq_t           l_waitq;
+
         struct timeval        l_enqueued_time;
 
         struct timeval        l_enqueued_time;
 
-        cfs_time_t            l_last_used;      /* jiffies */
+        /**
+         * Jiffies. Should be converted to time if needed. 
+         */
+        cfs_time_t            l_last_used;
+
         struct ldlm_extent    l_req_extent;
 
         struct ldlm_extent    l_req_extent;
 
-        /* Client-side-only members */
-        __u32                 l_lvb_len;        /* temporary storage for */
-        void                 *l_lvb_data;       /* an LVB received during */
-        void                 *l_lvb_swabber;    /* an enqueue */
+        /* 
+         * Client-side-only members. 
+         */
+         
+        /** 
+         * Temporary storage for an LVB received during an enqueue operation.
+         */
+        __u32                 l_lvb_len;
+        void                 *l_lvb_data;
+        void                 *l_lvb_swabber;
+
         void                 *l_ast_data;
         spinlock_t            l_extents_list_lock;
         struct list_head      l_extents_list;
 
         struct list_head      l_cache_locks_list;
 
         void                 *l_ast_data;
         spinlock_t            l_extents_list_lock;
         struct list_head      l_extents_list;
 
         struct list_head      l_cache_locks_list;
 
-        /* Server-side-only members */
+        /* 
+         * Server-side-only members. 
+         */
 
 
-        /* protected by elt_lock */
-        struct list_head      l_pending_chain;  /* callbacks pending */
-        cfs_time_t            l_callback_timeout; /* jiffies */
+        /** 
+         * Protected by elt_lock. Callbacks pending.
+         */
+        struct list_head      l_pending_chain;
 
 
-        __u32                 l_pid;            /* pid which created this lock */
+        cfs_time_t            l_callback_timeout;
 
 
-        /* for ldlm_add_ast_work_item() */
+        /** 
+         * Pid which created this lock. 
+         */
+        __u32                 l_pid;
+
+        /** 
+         * For ldlm_add_ast_work_item(). 
+         */
         struct list_head      l_bl_ast;
         struct list_head      l_bl_ast;
+        /** 
+         * For ldlm_add_ast_work_item(). 
+         */
         struct list_head      l_cp_ast;
         struct list_head      l_cp_ast;
+        /** 
+         * For ldlm_add_ast_work_item(). 
+         */
+        struct list_head      l_rk_ast;
+
         struct ldlm_lock     *l_blocking_lock;
         int                   l_bl_ast_run;
 
         struct ldlm_lock     *l_blocking_lock;
         int                   l_bl_ast_run;
 
-        /* protected by lr_lock, linkages to "skip lists" */
+        /** 
+         * Protected by lr_lock, linkages to "skip lists". 
+         */
         struct list_head      l_sl_mode;
         struct list_head      l_sl_policy;
 };
         struct list_head      l_sl_mode;
         struct list_head      l_sl_policy;
 };
@@ -767,6 +839,8 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock);
 void ldlm_revoke_export_locks(struct obd_export *exp);
 int ldlm_get_ref(void);
 void ldlm_put_ref(void);
 void ldlm_revoke_export_locks(struct obd_export *exp);
 int ldlm_get_ref(void);
 void ldlm_put_ref(void);
+int ldlm_init_export(struct obd_export *exp);
+void ldlm_destroy_export(struct obd_export *exp);
 
 /* ldlm_lock.c */
 ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res);
 
 /* ldlm_lock.c */
 ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res);
index 387bf9e..55961d1 100644 (file)
@@ -40,6 +40,7 @@
 #include <lustre/lustre_idl.h>
 #include <lustre_dlm.h>
 #include <lprocfs_status.h>
 #include <lustre/lustre_idl.h>
 #include <lustre_dlm.h>
 #include <lprocfs_status.h>
+#include <class_hash.h>
 
 /* Data stored per client in the last_rcvd file.  In le32 order. */
 struct mds_client_data;
 
 /* Data stored per client in the last_rcvd file.  In le32 order. */
 struct mds_client_data;
@@ -76,11 +77,6 @@ struct osc_creator {
         cfs_waitq_t             oscc_waitq; /* creating procs wait on this */
 };
 
         cfs_waitq_t             oscc_waitq; /* creating procs wait on this */
 };
 
-struct ldlm_export_data {
-        struct list_head       led_held_locks; /* protected by led_lock */
-        spinlock_t             led_lock;
-};
-
 struct ec_export_data { /* echo client */
         struct list_head eced_locks;
 };
 struct ec_export_data { /* echo client */
         struct list_head eced_locks;
 };
@@ -128,7 +124,8 @@ struct obd_export {
         struct lprocfs_stats     *exp_ldlm_stats;
         struct ptlrpc_connection *exp_connection;
         __u32                     exp_conn_cnt;
         struct lprocfs_stats     *exp_ldlm_stats;
         struct ptlrpc_connection *exp_connection;
         __u32                     exp_conn_cnt;
-        struct ldlm_export_data   exp_ldlm_data;
+        lustre_hash_t            *exp_lock_hash; /* existing lock hash */
+        spinlock_t                exp_lock_hash_lock;
         struct list_head          exp_outstanding_replies;
         time_t                    exp_last_request_time;
         struct list_head          exp_req_replay_queue;
         struct list_head          exp_outstanding_replies;
         time_t                    exp_last_request_time;
         struct list_head          exp_req_replay_queue;
index 86e8cdc..fc3b593 100644 (file)
@@ -398,10 +398,11 @@ reprocess:
                 new2->l_conn_export = lock->l_conn_export;
                 if (lock->l_export != NULL) {
                         new2->l_export = class_export_get(lock->l_export);
                 new2->l_conn_export = lock->l_conn_export;
                 if (lock->l_export != NULL) {
                         new2->l_export = class_export_get(lock->l_export);
-                        spin_lock(&new2->l_export->exp_ldlm_data.led_lock);
-                        list_add(&new2->l_export_chain,
-                                 &new2->l_export->exp_ldlm_data.led_held_locks);
-                        spin_unlock(&new2->l_export->exp_ldlm_data.led_lock);
+                        if (new2->l_export->exp_lock_hash && 
+                            hlist_unhashed(&new2->l_exp_hash))
+                                lustre_hash_add(new2->l_export->exp_lock_hash,
+                                                &new2->l_remote_handle,
+                                                &new2->l_exp_hash);
                 }
                 if (*flags == LDLM_FL_WAIT_NOREPROC) {
                         ldlm_lock_addref_internal_nolock(new2, lock->l_granted_mode);
                 }
                 if (*flags == LDLM_FL_WAIT_NOREPROC) {
                         ldlm_lock_addref_internal_nolock(new2, lock->l_granted_mode);
index c6de21f..3978b71 100644 (file)
@@ -909,6 +909,7 @@ dont_check_exports:
                                                        &remote_uuid);
 
         spin_lock(&target->obd_dev_lock);
                                                        &remote_uuid);
 
         spin_lock(&target->obd_dev_lock);
+
         /* Export might be hashed already, e.g. if this is reconnect */
         if (hlist_unhashed(&export->exp_nid_hash))
                 lustre_hash_add(export->exp_obd->obd_nid_hash,
         /* Export might be hashed already, e.g. if this is reconnect */
         if (hlist_unhashed(&export->exp_nid_hash))
                 lustre_hash_add(export->exp_obd->obd_nid_hash,
index 5678333..27965f4 100644 (file)
@@ -265,11 +265,9 @@ int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
         }
         lock->l_destroyed = 1;
 
         }
         lock->l_destroyed = 1;
 
-        if (lock->l_export)
-                spin_lock(&lock->l_export->exp_ldlm_data.led_lock);
-        list_del_init(&lock->l_export_chain);
-        if (lock->l_export)
-                spin_unlock(&lock->l_export->exp_ldlm_data.led_lock);
+        if (lock->l_export && lock->l_export->exp_lock_hash)
+                lustre_hash_del(lock->l_export->exp_lock_hash,
+                                &lock->l_remote_handle, &lock->l_exp_hash);
 
         ldlm_lock_remove_from_lru(lock);
         class_handle_unhash(&lock->l_handle);
 
         ldlm_lock_remove_from_lru(lock);
         class_handle_unhash(&lock->l_handle);
@@ -343,14 +341,15 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
         atomic_set(&lock->l_refc, 2);
         CFS_INIT_LIST_HEAD(&lock->l_res_link);
         CFS_INIT_LIST_HEAD(&lock->l_lru);
         atomic_set(&lock->l_refc, 2);
         CFS_INIT_LIST_HEAD(&lock->l_res_link);
         CFS_INIT_LIST_HEAD(&lock->l_lru);
-        CFS_INIT_LIST_HEAD(&lock->l_export_chain);
         CFS_INIT_LIST_HEAD(&lock->l_pending_chain);
         CFS_INIT_LIST_HEAD(&lock->l_bl_ast);
         CFS_INIT_LIST_HEAD(&lock->l_cp_ast);
         CFS_INIT_LIST_HEAD(&lock->l_pending_chain);
         CFS_INIT_LIST_HEAD(&lock->l_bl_ast);
         CFS_INIT_LIST_HEAD(&lock->l_cp_ast);
+        CFS_INIT_LIST_HEAD(&lock->l_rk_ast);
         cfs_waitq_init(&lock->l_waitq);
         lock->l_blocking_lock = NULL;
         CFS_INIT_LIST_HEAD(&lock->l_sl_mode);
         CFS_INIT_LIST_HEAD(&lock->l_sl_policy);
         cfs_waitq_init(&lock->l_waitq);
         lock->l_blocking_lock = NULL;
         CFS_INIT_LIST_HEAD(&lock->l_sl_mode);
         CFS_INIT_LIST_HEAD(&lock->l_sl_policy);
+        CFS_INIT_HLIST_NODE(&lock->l_exp_hash);
 
         atomic_inc(&resource->lr_namespace->ns_locks);
         CFS_INIT_LIST_HEAD(&lock->l_handle.h_link);
 
         atomic_inc(&resource->lr_namespace->ns_locks);
         CFS_INIT_LIST_HEAD(&lock->l_handle.h_link);
@@ -1444,10 +1443,10 @@ static int
 ldlm_work_revoke_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg)
 {
         struct ldlm_lock_desc desc;
 ldlm_work_revoke_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg)
 {
         struct ldlm_lock_desc desc;
-        struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock, l_export_chain);
+        struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock, l_rk_ast);
         ENTRY;
 
         ENTRY;
 
-        list_del_init(&lock->l_export_chain);
+        list_del_init(&lock->l_rk_ast);
 
         /* the desc just pretend to exclusive */
         ldlm_lock2desc(lock, &desc);
 
         /* the desc just pretend to exclusive */
         ldlm_lock2desc(lock, &desc);
@@ -1660,30 +1659,27 @@ int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
         RETURN(0);
 }
 
         RETURN(0);
 }
 
-void ldlm_cancel_locks_for_export(struct obd_export *exp)
+void ldlm_cancel_locks_for_export_cb(void *obj, void *data)
 {
 {
-        struct ldlm_lock *lock;
+        struct obd_export    *exp = data;
+        struct ldlm_lock     *lock = obj;
         struct ldlm_resource *res;
 
         struct ldlm_resource *res;
 
-        spin_lock(&exp->exp_ldlm_data.led_lock);
-        while(!list_empty(&exp->exp_ldlm_data.led_held_locks)) {
-                lock = list_entry(exp->exp_ldlm_data.led_held_locks.next,
-                                  struct ldlm_lock, l_export_chain);
-                res = ldlm_resource_getref(lock->l_resource);
-                LDLM_LOCK_GET(lock);
-                spin_unlock(&exp->exp_ldlm_data.led_lock);
-
-                LDLM_DEBUG(lock, "export %p", exp);
-                ldlm_res_lvbo_update(res, NULL, 0, 1);
+        res = ldlm_resource_getref(lock->l_resource);
+        LDLM_LOCK_GET(lock);
 
 
-                ldlm_lock_cancel(lock);
-                ldlm_reprocess_all(res);
+        LDLM_DEBUG(lock, "export %p", exp);
+        ldlm_res_lvbo_update(res, NULL, 0, 1);
+        ldlm_lock_cancel(lock);
+        ldlm_reprocess_all(res);
+        ldlm_resource_putref(res);
+        LDLM_LOCK_PUT(lock);
+}
 
 
-                ldlm_resource_putref(res);
-                LDLM_LOCK_PUT(lock);
-                spin_lock(&exp->exp_ldlm_data.led_lock);
-        }
-        spin_unlock(&exp->exp_ldlm_data.led_lock);
+void ldlm_cancel_locks_for_export(struct obd_export *exp)
+{
+        lustre_hash_for_each_empty(exp->exp_lock_hash,
+                                   ldlm_cancel_locks_for_export_cb, exp);
 }
 
 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
 }
 
 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
index 88afb3f..870ae23 100644 (file)
@@ -839,26 +839,6 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-static struct ldlm_lock *
-find_existing_lock(struct obd_export *exp,
-                   const struct lustre_handle *remote_hdl)
-{
-        struct list_head *iter;
-
-        spin_lock(&exp->exp_ldlm_data.led_lock);
-        list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
-                struct ldlm_lock *lock;
-                lock = list_entry(iter, struct ldlm_lock, l_export_chain);
-                if (lock->l_remote_handle.cookie == remote_hdl->cookie) {
-                        LDLM_LOCK_GET(lock);
-                        spin_unlock(&exp->exp_ldlm_data.led_lock);
-                        return lock;
-                }
-        }
-        spin_unlock(&exp->exp_ldlm_data.led_lock);
-        return NULL;
-}
-
 #ifdef __KERNEL__
 extern unsigned long long lu_time_stamp_get(void);
 #else
 #ifdef __KERNEL__
 extern unsigned long long lu_time_stamp_get(void);
 #else
@@ -979,8 +959,9 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
 #endif
 
         if (unlikely(flags & LDLM_FL_REPLAY)) {
 #endif
 
         if (unlikely(flags & LDLM_FL_REPLAY)) {
-                lock = find_existing_lock(req->rq_export,
-                                          &dlm_req->lock_handle[0]);
+                /* Find an existing lock in the per-export lock hash */
+                lock = lustre_hash_lookup(req->rq_export->exp_lock_hash,
+                                          (void *)&dlm_req->lock_handle[0]);
                 if (lock != NULL) {
                         DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie "
                                   LPX64, lock->l_handle.h_cookie);
                 if (lock != NULL) {
                         DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie "
                                   LPX64, lock->l_handle.h_cookie);
@@ -1010,10 +991,11 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                 GOTO(out, rc = -ENOTCONN);
         }
         lock->l_export = class_export_get(req->rq_export);
                 GOTO(out, rc = -ENOTCONN);
         }
         lock->l_export = class_export_get(req->rq_export);
-        spin_lock(&lock->l_export->exp_ldlm_data.led_lock);
-        list_add(&lock->l_export_chain,
-                 &lock->l_export->exp_ldlm_data.led_held_locks);
-        spin_unlock(&lock->l_export->exp_ldlm_data.led_lock);
+
+        if (lock->l_export->exp_lock_hash)
+                lustre_hash_add(lock->l_export->exp_lock_hash,
+                                &lock->l_remote_handle, 
+                                &lock->l_exp_hash);
 
 existing_lock:
 
 
 existing_lock:
 
@@ -1829,47 +1811,51 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
         RETURN(0);
 }
 
         RETURN(0);
 }
 
-void ldlm_revoke_export_locks(struct obd_export *exp)
+void ldlm_revoke_lock_cb(void *obj, void *data)
 {
 {
-        struct list_head *locklist = &exp->exp_ldlm_data.led_held_locks;
-        struct list_head  rpc_list;
-        struct ldlm_lock *lock, *next;
+        struct list_head   *rpc_list = data;
+        struct ldlm_lock   *lock = obj;
 
 
-        ENTRY;
-        CFS_INIT_LIST_HEAD(&rpc_list);
+        lock_res_and_lock(lock);
 
 
-        spin_lock(&exp->exp_ldlm_data.led_lock);
-        list_for_each_entry_safe(lock, next, locklist, l_export_chain) {
-                lock_res_and_lock(lock);
+        if (lock->l_req_mode != lock->l_granted_mode) {
+                unlock_res_and_lock(lock);
+                return;
+        }
 
 
-                if (lock->l_req_mode != lock->l_granted_mode) {
-                        unlock_res_and_lock(lock);
-                        continue;
-                }
+        LASSERT(lock->l_resource);
+        if (lock->l_resource->lr_type != LDLM_IBITS &&
+            lock->l_resource->lr_type != LDLM_PLAIN) {
+                unlock_res_and_lock(lock);
+                return;
+        }
 
 
-                LASSERT(lock->l_resource);
-                if (lock->l_resource->lr_type != LDLM_IBITS &&
-                    lock->l_resource->lr_type != LDLM_PLAIN) {
-                        unlock_res_and_lock(lock);
-                        continue;
-                }
+        if (lock->l_flags & LDLM_FL_AST_SENT) {
+                unlock_res_and_lock(lock);
+                return;
+        }
 
 
-                if (lock->l_flags & LDLM_FL_AST_SENT) {
-                        unlock_res_and_lock(lock);
-                        continue;
-                }
+        LASSERT(lock->l_blocking_ast);
+        LASSERT(!lock->l_blocking_lock);
 
 
-                LASSERT(lock->l_blocking_ast);
-                LASSERT(!lock->l_blocking_lock);
+        lock->l_flags |= LDLM_FL_AST_SENT;
+        if (lock->l_export && lock->l_export->exp_lock_hash)
+                lustre_hash_del(lock->l_export->exp_lock_hash,
+                                &lock->l_remote_handle, &lock->l_exp_hash);
+        list_add_tail(&lock->l_rk_ast, rpc_list);
+        LDLM_LOCK_GET(lock);
 
 
-                lock->l_flags |= LDLM_FL_AST_SENT;
-                list_move(&lock->l_export_chain, &rpc_list);
-                LDLM_LOCK_GET(lock);
+        unlock_res_and_lock(lock);
+}
 
 
-                unlock_res_and_lock(lock);
-        }
-        spin_unlock(&exp->exp_ldlm_data.led_lock);
+void ldlm_revoke_export_locks(struct obd_export *exp)
+{
+        struct list_head  rpc_list;
+        ENTRY;
 
 
+        CFS_INIT_LIST_HEAD(&rpc_list);
+        lustre_hash_for_each_empty(exp->exp_lock_hash,
+                                   ldlm_revoke_lock_cb, &rpc_list);
         ldlm_run_ast_work(&rpc_list, LDLM_WORK_REVOKE_AST);
 
         EXIT;
         ldlm_run_ast_work(&rpc_list, LDLM_WORK_REVOKE_AST);
 
         EXIT;
@@ -2038,6 +2024,88 @@ void ldlm_put_ref(void)
         EXIT;
 }
 
         EXIT;
 }
 
+/* 
+ * Export handle<->lock hash operations. 
+ */
+static unsigned
+ldlm_export_lock_hash(lustre_hash_t *lh, void *key, unsigned mask)
+{
+        return lh_u64_hash(((struct lustre_handle *)key)->cookie, mask);
+}
+
+static void *
+ldlm_export_lock_key(struct hlist_node *hnode)
+{
+        struct ldlm_lock *lock;
+        ENTRY;
+
+        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+        RETURN(&lock->l_remote_handle);
+}
+
+static int
+ldlm_export_lock_compare(void *key, struct hlist_node *hnode)
+{
+        ENTRY;
+        RETURN(lustre_handle_equal(ldlm_export_lock_key(hnode), key));
+}
+
+static void *
+ldlm_export_lock_get(struct hlist_node *hnode)
+{
+        struct ldlm_lock *lock;
+        ENTRY;
+
+        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+        LDLM_LOCK_GET(lock);
+
+        RETURN(lock);
+}
+
+static void *
+ldlm_export_lock_put(struct hlist_node *hnode)
+{
+        struct ldlm_lock *lock;
+        ENTRY;
+
+        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_hash);
+        LDLM_LOCK_PUT(lock);
+
+        RETURN(lock);
+}
+
+static lustre_hash_ops_t ldlm_export_lock_ops = {
+        .lh_hash    = ldlm_export_lock_hash,
+        .lh_key     = ldlm_export_lock_key,
+        .lh_compare = ldlm_export_lock_compare,
+        .lh_get     = ldlm_export_lock_get,
+        .lh_put     = ldlm_export_lock_put
+};
+
+int ldlm_init_export(struct obd_export *exp)
+{
+        ENTRY;
+
+        exp->exp_lock_hash =
+                lustre_hash_init(obd_uuid2str(&exp->exp_client_uuid),
+                                 128, 65536, &ldlm_export_lock_ops, LH_REHASH);
+
+        if (!exp->exp_lock_hash)
+                RETURN(-ENOMEM);
+
+        RETURN(0);
+}
+EXPORT_SYMBOL(ldlm_init_export);
+
+void ldlm_destroy_export(struct obd_export *exp)
+{
+        ENTRY;
+        lustre_hash_exit(exp->exp_lock_hash);
+        exp->exp_lock_hash = NULL;
+        EXIT;
+}
+EXPORT_SYMBOL(ldlm_destroy_export);
+
 static int ldlm_setup(void)
 {
         struct ldlm_bl_pool *blp;
 static int ldlm_setup(void)
 {
         struct ldlm_bl_pool *blp;
index a75ef68..99f211e 100644 (file)
@@ -377,6 +377,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
 {
         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
         int is_replay = *flags & LDLM_FL_REPLAY;
 {
         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
         int is_replay = *flags & LDLM_FL_REPLAY;
+        struct lustre_handle old_hash_key;
         struct ldlm_lock *lock;
         struct ldlm_reply *reply;
         int cleanup_phase = 1;
         struct ldlm_lock *lock;
         struct ldlm_reply *reply;
         int cleanup_phase = 1;
@@ -425,7 +426,15 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
         cleanup_phase = 0;
 
         lock_res_and_lock(lock);
         cleanup_phase = 0;
 
         lock_res_and_lock(lock);
+        old_hash_key = lock->l_remote_handle;
         lock->l_remote_handle = reply->lock_handle;
         lock->l_remote_handle = reply->lock_handle;
+
+        /* Key change rehash lock in per-export hash with new key */
+        if (exp->exp_lock_hash)
+                lustre_hash_rehash_key(exp->exp_lock_hash, &old_hash_key,
+                                       &lock->l_remote_handle,
+                                       &lock->l_exp_hash);
+
         *flags = reply->lock_flags;
         lock->l_flags |= reply->lock_flags & LDLM_INHERIT_FLAGS;
         /* move NO_TIMEOUT flag to the lock to force ldlm_lock_match()
         *flags = reply->lock_flags;
         lock->l_flags |= reply->lock_flags & LDLM_INHERIT_FLAGS;
         /* move NO_TIMEOUT flag to the lock to force ldlm_lock_match()
@@ -1973,8 +1982,10 @@ static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
 static int replay_lock_interpret(struct ptlrpc_request *req,
                                  struct ldlm_async_args *aa, int rc)
 {
 static int replay_lock_interpret(struct ptlrpc_request *req,
                                  struct ldlm_async_args *aa, int rc)
 {
-        struct ldlm_lock  *lock;
-        struct ldlm_reply *reply;
+        struct lustre_handle  old_hash_key;
+        struct ldlm_lock     *lock;
+        struct ldlm_reply    *reply;
+        struct obd_export    *exp;
 
         ENTRY;
         atomic_dec(&req->rq_import->imp_replay_inflight);
 
         ENTRY;
         atomic_dec(&req->rq_import->imp_replay_inflight);
@@ -1996,7 +2007,16 @@ static int replay_lock_interpret(struct ptlrpc_request *req,
                 GOTO(out, rc = -ESTALE);
         }
 
                 GOTO(out, rc = -ESTALE);
         }
 
+        old_hash_key = lock->l_remote_handle;
         lock->l_remote_handle = reply->lock_handle;
         lock->l_remote_handle = reply->lock_handle;
+
+        /* Key change rehash lock in per-export hash with new key */
+       exp = req->rq_export;
+        if (exp && exp->exp_lock_hash)
+                lustre_hash_rehash_key(exp->exp_lock_hash, &old_hash_key,
+                                      &lock->l_remote_handle,
+                                       &lock->l_exp_hash);
+
         LDLM_DEBUG(lock, "replayed lock:");
         ptlrpc_import_recovery_state_machine(req->rq_import);
         LDLM_LOCK_PUT(lock);
         LDLM_DEBUG(lock, "replayed lock:");
         ptlrpc_import_recovery_state_machine(req->rq_import);
         LDLM_LOCK_PUT(lock);
@@ -2004,7 +2024,6 @@ out:
         if (rc != ELDLM_OK)
                 ptlrpc_connect_import(req->rq_import, NULL);
 
         if (rc != ELDLM_OK)
                 ptlrpc_connect_import(req->rq_import, NULL);
 
-
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
index d746f0e..bc457d4 100644 (file)
@@ -2712,27 +2712,23 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info,
                 RETURN(ELDLM_LOCK_REPLACED);
         }
 
                 RETURN(ELDLM_LOCK_REPLACED);
         }
 
-        /* This lock might already be given to the client by an resent req,
-         * in this case we should return ELDLM_LOCK_ABORTED,
-         * so we should check led_held_locks here, but it will affect
-         * performance, FIXME
+        /* 
+         * Fixup the lock to be given to the client. 
          */
          */
-        /* Fixup the lock to be given to the client */
         lock_res_and_lock(new_lock);
         new_lock->l_readers = 0;
         new_lock->l_writers = 0;
 
         new_lock->l_export = class_export_get(req->rq_export);
         lock_res_and_lock(new_lock);
         new_lock->l_readers = 0;
         new_lock->l_writers = 0;
 
         new_lock->l_export = class_export_get(req->rq_export);
-        spin_lock(&req->rq_export->exp_ldlm_data.led_lock);
-        list_add(&new_lock->l_export_chain,
-                 &new_lock->l_export->exp_ldlm_data.led_held_locks);
-        spin_unlock(&req->rq_export->exp_ldlm_data.led_lock);
-
         new_lock->l_blocking_ast = lock->l_blocking_ast;
         new_lock->l_completion_ast = lock->l_completion_ast;
         new_lock->l_remote_handle = lock->l_remote_handle;
         new_lock->l_flags &= ~LDLM_FL_LOCAL;
 
         new_lock->l_blocking_ast = lock->l_blocking_ast;
         new_lock->l_completion_ast = lock->l_completion_ast;
         new_lock->l_remote_handle = lock->l_remote_handle;
         new_lock->l_flags &= ~LDLM_FL_LOCAL;
 
+        lustre_hash_add(new_lock->l_export->exp_lock_hash,
+                        &new_lock->l_remote_handle, 
+                        &new_lock->l_exp_hash);
+
         unlock_res_and_lock(new_lock);
         LDLM_LOCK_PUT(new_lock);
         lh->mlh_reg_lh.cookie = 0;
         unlock_res_and_lock(new_lock);
         LDLM_LOCK_PUT(new_lock);
         lh->mlh_reg_lh.cookie = 0;
@@ -2749,7 +2745,7 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
         struct obd_export      *exp = req->rq_export;
         struct lustre_handle    remote_hdl;
         struct ldlm_request    *dlmreq;
         struct obd_export      *exp = req->rq_export;
         struct lustre_handle    remote_hdl;
         struct ldlm_request    *dlmreq;
-        struct list_head       *iter;
+        struct ldlm_lock       *lock;
 
         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
                 return;
 
         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
                 return;
@@ -2757,27 +2753,24 @@ static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
         dlmreq = req_capsule_client_get(info->mti_pill, &RMF_DLM_REQ);
         remote_hdl = dlmreq->lock_handle[0];
 
         dlmreq = req_capsule_client_get(info->mti_pill, &RMF_DLM_REQ);
         remote_hdl = dlmreq->lock_handle[0];
 
-        spin_lock(&exp->exp_ldlm_data.led_lock);
-        list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
-                struct ldlm_lock *lock;
-                lock = list_entry(iter, struct ldlm_lock, l_export_chain);
-                if (lock == new_lock)
-                        continue;
-                if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
+        lock = lustre_hash_lookup(exp->exp_lock_hash, &remote_hdl);
+        if (lock) {
+                if (lock != new_lock) {
                         lh->mlh_reg_lh.cookie = lock->l_handle.h_cookie;
                         lh->mlh_reg_mode = lock->l_granted_mode;
 
                         lh->mlh_reg_lh.cookie = lock->l_handle.h_cookie;
                         lh->mlh_reg_mode = lock->l_granted_mode;
 
-                        LDLM_DEBUG(lock, "restoring lock cookie");
+                        LDLM_DEBUG(lock, "Restoring lock cookie");
                         DEBUG_REQ(D_DLMTRACE, req,
                                   "restoring lock cookie "LPX64,
                                   lh->mlh_reg_lh.cookie);
                         if (old_lock)
                                 *old_lock = LDLM_LOCK_GET(lock);
                         DEBUG_REQ(D_DLMTRACE, req,
                                   "restoring lock cookie "LPX64,
                                   lh->mlh_reg_lh.cookie);
                         if (old_lock)
                                 *old_lock = LDLM_LOCK_GET(lock);
-                        spin_unlock(&exp->exp_ldlm_data.led_lock);
+                        lh_put(exp->exp_lock_hash, &lock->l_exp_hash);
                         return;
                 }
                         return;
                 }
+
+                lh_put(exp->exp_lock_hash, &lock->l_exp_hash);
         }
         }
-        spin_unlock(&exp->exp_ldlm_data.led_lock);
 
         /*
          * If the xid matches, then we know this is a resent request, and allow
 
         /*
          * If the xid matches, then we know this is a resent request, and allow
@@ -3040,12 +3033,6 @@ static int mdt_intent_policy(struct ldlm_namespace *ns,
                 if (it != NULL) {
                         const struct ldlm_request *dlmreq;
                         __u64 req_bits;
                 if (it != NULL) {
                         const struct ldlm_request *dlmreq;
                         __u64 req_bits;
-#if 0
-                        struct ldlm_lock       *lock = *lockp;
-
-                        LDLM_DEBUG(lock, "intent policy opc: %s\n",
-                                   ldlm_it2str(it->opc));
-#endif
 
                         rc = mdt_intent_opc(it->opc, info, lockp, flags);
                         if (rc == 0)
 
                         rc = mdt_intent_opc(it->opc, info, lockp, flags);
                         if (rc == 0)
@@ -4482,6 +4469,7 @@ static int mdt_obd_disconnect(struct obd_export *exp)
 static int mdt_init_export(struct obd_export *exp)
 {
         struct mdt_export_data *med = &exp->exp_mdt_data;
 static int mdt_init_export(struct obd_export *exp)
 {
         struct mdt_export_data *med = &exp->exp_mdt_data;
+        int                     rc;
         ENTRY;
 
         CFS_INIT_LIST_HEAD(&med->med_open_head);
         ENTRY;
 
         CFS_INIT_LIST_HEAD(&med->med_open_head);
@@ -4491,7 +4479,10 @@ static int mdt_init_export(struct obd_export *exp)
         spin_lock(&exp->exp_lock);
         exp->exp_connecting = 1;
         spin_unlock(&exp->exp_lock);
         spin_lock(&exp->exp_lock);
         exp->exp_connecting = 1;
         spin_unlock(&exp->exp_lock);
-        RETURN(0);
+        rc = ldlm_init_export(exp);
+        if (rc)
+                CERROR("Error %d while initializing export\n", rc);
+        RETURN(rc);
 }
 
 static int mdt_destroy_export(struct obd_export *export)
 }
 
 static int mdt_destroy_export(struct obd_export *export)
@@ -4512,6 +4503,7 @@ static int mdt_destroy_export(struct obd_export *export)
                 mdt_cleanup_idmap(med);
 
         target_destroy_export(export);
                 mdt_cleanup_idmap(med);
 
         target_destroy_export(export);
+        ldlm_destroy_export(export);
 
         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
                 RETURN(0);
 
         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
                 RETURN(0);
index afb3255..3208572 100644 (file)
@@ -694,11 +694,17 @@ out:
         RETURN(0);
 }
 
         RETURN(0);
 }
 
+/*
+ * Export-init hook for the MGS: hands the new export to ldlm_init_export()
+ * and returns that call's result unchanged.
+ */
+static inline int mgs_init_export(struct obd_export *exp)
+{
+        int rc = ldlm_init_export(exp);
+
+        return rc;
+}
+
 static inline int mgs_destroy_export(struct obd_export *exp)
 {
         ENTRY;
 
         target_destroy_export(exp);
 static inline int mgs_destroy_export(struct obd_export *exp)
 {
         ENTRY;
 
         target_destroy_export(exp);
+        ldlm_destroy_export(exp);
 
         RETURN(0);
 }
 
         RETURN(0);
 }
@@ -808,6 +814,7 @@ static struct obd_ops mgs_obd_ops = {
         .o_setup           = mgs_setup,
         .o_precleanup      = mgs_precleanup,
         .o_cleanup         = mgs_cleanup,
         .o_setup           = mgs_setup,
         .o_precleanup      = mgs_precleanup,
         .o_cleanup         = mgs_cleanup,
+        .o_init_export     = mgs_init_export,
         .o_destroy_export  = mgs_destroy_export,
         .o_iocontrol       = mgs_iocontrol,
         .o_llog_init       = mgs_llog_init,
         .o_destroy_export  = mgs_destroy_export,
         .o_iocontrol       = mgs_iocontrol,
         .o_llog_init       = mgs_llog_init,
index 3eb7073..2a92609 100644 (file)
@@ -735,15 +735,12 @@ struct obd_export *class_new_export(struct obd_device *obd,
                 return ERR_PTR(-ENOMEM);
 
         export->exp_conn_cnt = 0;
                 return ERR_PTR(-ENOMEM);
 
         export->exp_conn_cnt = 0;
+        export->exp_lock_hash = NULL;
         atomic_set(&export->exp_refcount, 2);
         atomic_set(&export->exp_rpc_count, 0);
         export->exp_obd = obd;
         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
         atomic_set(&export->exp_refcount, 2);
         atomic_set(&export->exp_rpc_count, 0);
         export->exp_obd = obd;
         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
-        /* XXX this should be in LDLM init */
-        CFS_INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
-        spin_lock_init(&export->exp_ldlm_data.led_lock);
-
         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
         class_handle_hash(&export->exp_handle, export_handle_addref);
         export->exp_last_request_time = cfs_time_current_sec();
         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
         class_handle_hash(&export->exp_handle, export_handle_addref);
         export->exp_last_request_time = cfs_time_current_sec();
index cef248d..a0e0912 100644 (file)
@@ -1352,6 +1352,16 @@ struct exp_uuid_cb_data {
         int                    *len;
 };
 
         int                    *len;
 };
 
+/*
+ * Fill in the callback context used by the per-export procfs printers.
+ *
+ * @cb_data: context to initialize
+ * @page:    output buffer the callbacks write into
+ * @count:   value stored as the buffer size
+ * @eof:     end-of-file flag pointer, stored as given
+ * @len:     pointer to the running output length, stored as given
+ */
+static void
+lprocfs_exp_rd_cb_data_init(struct exp_uuid_cb_data *cb_data, char *page,
+                            int count, int *eof, int *len)
+{
+        /* Result pointers first, then the buffer description. */
+        cb_data->len = len;
+        cb_data->eof = eof;
+        cb_data->count = count;
+        cb_data->page = page;
+}
+
 void lprocfs_exp_print_uuid(void *obj, void *cb_data)
 {
         struct obd_export *exp = (struct obd_export *)obj;
 void lprocfs_exp_print_uuid(void *obj, void *cb_data)
 {
         struct obd_export *exp = (struct obd_export *)obj;
@@ -1373,17 +1383,46 @@ int lprocfs_exp_rd_uuid(char *page, char **start, off_t off, int count,
 
         *eof = 1;
         page[0] = '\0';
 
         *eof = 1;
         page[0] = '\0';
-        LASSERT(obd != NULL);
-
-        cb_data.page = page;
-        cb_data.count = count;
-        cb_data.eof = eof;
-        cb_data.len = &len;
+        lprocfs_exp_rd_cb_data_init(&cb_data, page, count, eof, &len);
         lustre_hash_for_each_key(obd->obd_nid_hash, &stats->nid,
                                  lprocfs_exp_print_uuid, &cb_data);
         return (*cb_data.len);
 }
 
         lustre_hash_for_each_key(obd->obd_nid_hash, &stats->nid,
                                  lprocfs_exp_print_uuid, &cb_data);
         return (*cb_data.len);
 }
 
+/*
+ * Hash-iteration callback: append a debug dump of an export's lock hash
+ * to the output page described by the callback context.
+ *
+ * @obj:     the export (struct obd_export *) being visited
+ * @cb_data: struct exp_uuid_cb_data * holding the page, its size and a
+ *           pointer to the number of bytes written so far
+ *
+ * Exports whose exp_lock_hash is NULL contribute nothing. The header is
+ * emitted only while nothing has been written to the page yet.
+ */
+void lprocfs_exp_print_hash(void *obj, void *cb_data)
+{
+        struct exp_uuid_cb_data *data = cb_data;
+        struct obd_export       *exp = obj;
+        lustre_hash_t           *lh;
+
+        lh = exp->exp_lock_hash;
+        if (lh == NULL)
+                return;
+
+        if (*data->len == 0)
+                *data->len += lustre_hash_debug_header(data->page,
+                                                       data->count);
+
+        /*
+         * The dump starts at page + *len, so only the unused tail of the
+         * page may be offered as its size.  Passing the full data->count
+         * here would allow writing past the end of the page.  Skip the
+         * dump entirely when no space is left.
+         */
+        if (*data->len < data->count)
+                *data->len += lustre_hash_debug_str(lh,
+                                                    data->page + *data->len,
+                                                    data->count - *data->len);
+}
+
+/*
+ * procfs read handler that dumps lock-hash debug information for the
+ * exports found under one NID.
+ *
+ * @page:  output buffer
+ * @start: unused here
+ * @off:   unused here
+ * @count: size value forwarded to the print callback
+ * @eof:   set to 1 before any output is produced
+ * @data:  the struct nid_stat this proc entry was registered with
+ *
+ * Returns the length accumulated by lprocfs_exp_print_hash() over every
+ * entry of obd_nid_hash that matches the NID.
+ */
+int lprocfs_exp_rd_hash(char *page, char **start, off_t off, int count,
+                        int *eof,  void *data)
+{
+        struct nid_stat         *nid_stat = data;
+        struct obd_device       *obd = nid_stat->nid_obd;
+        struct exp_uuid_cb_data  cb;
+        int                      written = 0;
+
+        /* Start from an empty, complete reply. */
+        page[0] = '\0';
+        *eof = 1;
+        lprocfs_exp_rd_cb_data_init(&cb, page, count, eof, &written);
+
+        lustre_hash_for_each_key(obd->obd_nid_hash, &nid_stat->nid,
+                                 lprocfs_exp_print_hash, &cb);
+
+        /* cb.len points at 'written', so this is the accumulated length. */
+        return written;
+}
+
 int lprocfs_nid_stats_clear_read(char *page, char **start, off_t off,
                                         int count, int *eof,  void *data)
 {
 int lprocfs_nid_stats_clear_read(char *page, char **start, off_t off,
                                         int count, int *eof,  void *data)
 {
@@ -1504,6 +1543,11 @@ int lprocfs_exp_setup(struct obd_export *exp, lnet_nid_t *nid, int *newnid)
         if (rc)
                 CWARN("Error adding the uuid file\n");
 
         if (rc)
                 CWARN("Error adding the uuid file\n");
 
+        rc = lprocfs_add_simple(tmp->nid_proc, "hash",
+                                lprocfs_exp_rd_hash, NULL, tmp);
+        if (rc)
+                CWARN("Error adding the hash file\n");
+
         exp->exp_nid_stats = tmp;
         *newnid = 1;
         RETURN(rc);
         exp->exp_nid_stats = tmp;
         *newnid = 1;
         RETURN(rc);
index 11f0bef..f90afb1 100644 (file)
@@ -97,11 +97,17 @@ static int echo_disconnect(struct obd_export *exp)
         return class_disconnect(exp);
 }
 
         return class_disconnect(exp);
 }
 
+/*
+ * Export-init hook for the echo server: hands the new export to
+ * ldlm_init_export() and returns that call's result unchanged.
+ */
+static int echo_init_export(struct obd_export *exp)
+{
+        int rc = ldlm_init_export(exp);
+
+        return rc;
+}
+
 static int echo_destroy_export(struct obd_export *exp)
 {
         ENTRY;
 
         target_destroy_export(exp);
 static int echo_destroy_export(struct obd_export *exp)
 {
         ENTRY;
 
         target_destroy_export(exp);
+        ldlm_destroy_export(exp);
 
         RETURN(0);
 }
 
         RETURN(0);
 }
@@ -539,6 +545,7 @@ static struct obd_ops echo_obd_ops = {
         .o_owner           = THIS_MODULE,
         .o_connect         = echo_connect,
         .o_disconnect      = echo_disconnect,
         .o_owner           = THIS_MODULE,
         .o_connect         = echo_connect,
         .o_disconnect      = echo_disconnect,
+        .o_init_export     = echo_init_export,
         .o_destroy_export  = echo_destroy_export,
         .o_create          = echo_create,
         .o_destroy         = echo_destroy,
         .o_destroy_export  = echo_destroy_export,
         .o_create          = echo_create,
         .o_destroy         = echo_destroy,
index e60e2b5..49f209e 100644 (file)
@@ -596,7 +596,7 @@ static int filter_init_export(struct obd_export *exp)
         exp->exp_connecting = 1;
         spin_unlock(&exp->exp_lock);
 
         exp->exp_connecting = 1;
         spin_unlock(&exp->exp_lock);
 
-        return 0;
+        return ldlm_init_export(exp);
 }
 
 static int filter_free_server_data(struct filter_obd *filter)
 }
 
 static int filter_free_server_data(struct filter_obd *filter)
@@ -2876,6 +2876,7 @@ static int filter_destroy_export(struct obd_export *exp)
          */
 
         target_destroy_export(exp);
          */
 
         target_destroy_export(exp);
+        ldlm_destroy_export(exp);
 
         if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid))
                 RETURN(0);
 
         if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid))
                 RETURN(0);