Whamcloud - gitweb
b=15226
authoryury <yury>
Wed, 4 Jun 2008 12:07:32 +0000 (12:07 +0000)
committeryury <yury>
Wed, 4 Jun 2008 12:07:32 +0000 (12:07 +0000)
r=shadow,nikita
- fixes issue with accessing freed @ns on behalf rpc handling when some late rpc comes in fini time;
- add new comments in functions, converts existing ones to be inline wigth doxygen comments policies.

14 files changed:
lustre/include/liblustre.h
lustre/include/lustre_dlm.h
lustre/include/obd.h
lustre/ldlm/ldlm_lib.c
lustre/ldlm/ldlm_pool.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c
lustre/mdt/mdt_handler.c
lustre/mgs/mgs_handler.c
lustre/obdclass/genops.c
lustre/obdclass/obd_config.c
lustre/obdecho/echo.c
lustre/obdfilter/filter.c
lustre/ptlrpc/client.c

index 388de36..e59581f 100644 (file)
@@ -281,7 +281,7 @@ typedef spinlock_t rwlock_t;
 #define read_unlock(l)          spin_unlock(l)
 #define write_lock(l)           spin_lock(l)
 #define write_unlock(l)         spin_unlock(l)
-
+#define rwlock_init(l)          spin_lock_init(l)
 
 #define min(x,y) ((x)<(y) ? (x) : (y))
 #define max(x,y) ((x)>(y) ? (x) : (y))
index b59c05a..5d2941c 100644 (file)
@@ -233,34 +233,71 @@ struct ldlm_pool_ops {
 #define LDLM_POOLS_FAST_SLV_CHANGE (50)
 
 struct ldlm_pool {
-        /* Common pool fields */
-        cfs_proc_dir_entry_t  *pl_proc_dir;      /* Pool proc directory. */
-        char                   pl_name[100];     /* Pool name, should be long 
-                                                  * enough to contain complex
-                                                  * proc entry name. */
-        spinlock_t             pl_lock;          /* Lock for protecting slv/clv 
-                                                  * updates. */
-        atomic_t               pl_limit;         /* Number of allowed locks in
-                                                  * in pool, both, client and 
-                                                  * server side. */
-        atomic_t               pl_granted;       /* Number of granted locks. */
-        atomic_t               pl_grant_rate;    /* Grant rate per T. */
-        atomic_t               pl_cancel_rate;   /* Cancel rate per T. */
-        atomic_t               pl_grant_speed;   /* Grant speed (GR-CR) per T. */
-        __u64                  pl_server_lock_volume; /* Server lock volume. 
-                                                  * Protected by pl_lock */
-        atomic_t               pl_lock_volume_factor; /* Lock volume factor. */
-
-        time_t                 pl_recalc_time;   /* Time when last slv from 
-                                                  * server was obtained. */
-        struct ldlm_pool_ops  *pl_ops;           /* Recalc and shrink ops. */ 
-
-        int                    pl_grant_plan;    /* Planned number of granted 
-                                                  * locks for next T. */
-        int                    pl_grant_step;    /* Grant plan step for next 
-                                                  * T. */
-
-        struct lprocfs_stats  *pl_stats;         /* Pool statistics. */
+        /** 
+         * Pool proc directory. 
+         */
+        cfs_proc_dir_entry_t  *pl_proc_dir;
+        /**
+         * Pool name, should be long enough to contain compound proc entry name.
+         */
+        char                   pl_name[100];
+        /** 
+         * Lock for protecting slv/clv updates. 
+         */
+        spinlock_t             pl_lock;
+        /**
+         * Number of allowed locks in in pool, both, client and server side. 
+         */
+        atomic_t               pl_limit;
+        /** 
+         * Number of granted locks in
+         */
+        atomic_t               pl_granted;
+        /** 
+         * Grant rate per T. 
+         */
+        atomic_t               pl_grant_rate;
+        /** 
+         * Cancel rate per T. 
+         */
+        atomic_t               pl_cancel_rate;
+        /** 
+         * Grant speed (GR-CR) per T. 
+         */
+        atomic_t               pl_grant_speed;
+        /** 
+         * Server lock volume. Protected by pl_lock.
+         */
+        __u64                  pl_server_lock_volume;
+        /** 
+         * Current biggest client lock volume. Protected by pl_lock.
+         */
+        __u64                  pl_client_lock_volume;
+        /** 
+         * Lock volume factor. SLV on client is calculated as following:
+         * server_slv * lock_volume_factor.
+         */
+        atomic_t               pl_lock_volume_factor;
+        /** 
+         * Time when last slv from server was obtained. 
+         */
+        time_t                 pl_recalc_time;
+        /**
+         * Recalc and shrink ops. 
+         */ 
+        struct ldlm_pool_ops  *pl_ops;
+        /**
+         * Planned number of granted locks for next T.
+         */
+        int                    pl_grant_plan;
+        /** 
+         * Grant plan step for next T. 
+         */
+        int                    pl_grant_step;
+        /** 
+         * Pool statistics. 
+         */
+        struct lprocfs_stats  *pl_stats;
 };
 
 typedef int (*ldlm_res_policy)(struct ldlm_namespace *, struct ldlm_lock **,
@@ -278,43 +315,89 @@ typedef enum {
         LDLM_NAMESPACE_MODEST = 1 << 1
 } ldlm_appetite_t;
 
-/* Default value for ->ns_shrink_thumb. If lock is not extent one its cost 
+/* 
+ * Default value for ->ns_shrink_thumb. If lock is not extent one its cost 
  * is one page. Here we have 256 pages which is 1M on i386. Thus by default
  * all extent locks which have more than 1M long extent will be kept in lru,
- * others (including ibits locks) will be canceled on memory pressure event. */
+ * others (including ibits locks) will be canceled on memory pressure event. 
+ */
 #define LDLM_LOCK_SHRINK_THUMB 256
 
-/* default values for the "max_nolock_size", "contention_time"
- * and "contended_locks" namespace tunables */
+/* 
+ * Default values for the "max_nolock_size", "contention_time" and 
+ * "contended_locks" namespace tunables. 
+ */
 #define NS_DEFAULT_MAX_NOLOCK_BYTES 0
 #define NS_DEFAULT_CONTENTION_SECONDS 2
 #define NS_DEFAULT_CONTENDED_LOCKS 32
 
 struct ldlm_namespace {
+        /**
+         * Namespace name. Used for logging, etc.
+         */
         char                  *ns_name;
-        ldlm_side_t            ns_client; /* is this a client-side lock tree? */
-        __u64                  ns_connect_flags; /* ns connect flags supported
-                                           * by server (may be changed via proc,
-                                           * lru resize may be disabled/enabled) */
-        __u64                  ns_orig_connect_flags; /* client side orig connect
-                                           * flags supported by server */
-        struct list_head      *ns_hash;   /* hash table for ns */
+
+        /** 
+         * Is this a client-side lock tree? 
+         */
+        ldlm_side_t            ns_client;
+
+        /** 
+         * Namespce connect flags supported by server (may be changed via proc,
+         * lru resize may be disabled/enabled).
+         */
+        __u64                  ns_connect_flags;
+
+         /** 
+          * Client side orig connect flags supported by server. 
+          */
+        __u64                  ns_orig_connect_flags;
+
+        /** 
+         * Hash table for namespace.
+         */
+        struct list_head      *ns_hash;
         spinlock_t             ns_hash_lock;
-        __u32                  ns_refcount; /* count of resources in the hash */
-        struct list_head       ns_root_list; /* all root resources in ns */
-        struct list_head       ns_list_chain; /* position in global NS list */
 
-        struct list_head       ns_unused_list; /* all root resources in ns */
+         /**
+          * Count of resources in the hash. 
+          */
+        __u32                  ns_refcount;
+
+         /** 
+          * All root resources in namespace. 
+          */
+        struct list_head       ns_root_list;
+
+        /** 
+         * Position in global namespace list.
+         */
+        struct list_head       ns_list_chain; 
+
+        /** 
+         * All root resources in namespace. 
+         */
+        struct list_head       ns_unused_list; 
         int                    ns_nr_unused;
         spinlock_t             ns_unused_lock;
 
         unsigned int           ns_max_unused;
         unsigned int           ns_max_age;
-        unsigned int           ns_ctime_age_limit; /* seconds */
+
+         /**
+          * Seconds.
+          */
+        unsigned int           ns_ctime_age_limit;
         
-        /* Lower limit to number of pages in lock to keep it in cache */
+        /** 
+         * Lower limit to number of pages in lock to keep it in cache.
+         */
         unsigned int           ns_shrink_thumb;
-        cfs_time_t             ns_next_dump;   /* next debug dump, jiffies */
+
+        /**
+         * Next debug dump, jiffies.
+         */
+        cfs_time_t             ns_next_dump;
 
         atomic_t               ns_locks;
         __u64                  ns_resources;
@@ -324,14 +407,28 @@ struct ldlm_namespace {
         cfs_waitq_t            ns_waitq;
         struct ldlm_pool       ns_pool;
         ldlm_appetite_t        ns_appetite;
-        /* if more than @ns_contented_locks found, the resource considered
-         * as contended */
+
+        /** 
+         * If more than @ns_contented_locks found, the resource considered
+         * as contended.
+         */
         unsigned               ns_contended_locks;
-        /* the resource remembers contended state during @ns_contention_time,
-         * in seconds */
+
+        /** 
+         * The resource remembers contended state during @ns_contention_time,
+         * in seconds.
+         */
         unsigned               ns_contention_time;
-        /* limit size of nolock requests, in bytes */
+
+        /** 
+         * Limit size of nolock requests, in bytes.
+         */
         unsigned               ns_max_nolock_size;
+
+        /**
+         * Backward link to obd, required for ldlm pool to store new SLV. 
+         */
+        struct obd_device     *ns_obd;
 };
 
 static inline int ns_is_client(struct ldlm_namespace *ns)
@@ -695,8 +792,9 @@ void ldlm_lock_dump_handle(int level, struct lustre_handle *);
 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req);
 
 /* resource.c */
-struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client, 
-                                          ldlm_appetite_t apt);
+struct ldlm_namespace *
+ldlm_namespace_new(struct obd_device *obd, char *name, 
+                   ldlm_side_t client, ldlm_appetite_t apt);
 int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags);
 void ldlm_namespace_free(struct ldlm_namespace *ns, 
                          struct obd_import *imp, int force);
@@ -847,9 +945,12 @@ int ldlm_pool_shrink(struct ldlm_pool *pl, int nr,
 void ldlm_pool_fini(struct ldlm_pool *pl);
 int ldlm_pool_setup(struct ldlm_pool *pl, int limit);
 int ldlm_pool_recalc(struct ldlm_pool *pl);
+__u32 ldlm_pool_get_lvf(struct ldlm_pool *pl);
 __u64 ldlm_pool_get_slv(struct ldlm_pool *pl);
+__u64 ldlm_pool_get_clv(struct ldlm_pool *pl);
 __u32 ldlm_pool_get_limit(struct ldlm_pool *pl);
 void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv);
+void ldlm_pool_set_clv(struct ldlm_pool *pl, __u64 clv);
 void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit);
 void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock);
 void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock);
index fd2a42b..188a022 100644 (file)
@@ -965,6 +965,13 @@ struct obd_device {
         struct lprocfs_stats  *obd_svc_stats;
         atomic_t               obd_evict_inprogress;
         cfs_waitq_t            obd_evict_inprogress_waitq;
+
+        /** 
+         * Ldlm pool part. Save last calculated SLV and Limit. 
+         */
+        rwlock_t               obd_pool_lock;
+        int                    obd_pool_limit;
+        __u64                  obd_pool_slv;
 };
 
 #define OBD_OPT_FORCE           0x0001
index 063f9d7..4ae8d06 100644 (file)
@@ -394,7 +394,7 @@ int client_connect_import(const struct lu_env *env,
 
         if (obd->obd_namespace != NULL)
                 CERROR("already have namespace!\n");
-        obd->obd_namespace = ldlm_namespace_new(obd->obd_name,
+        obd->obd_namespace = ldlm_namespace_new(obd, obd->obd_name,
                                                 LDLM_NAMESPACE_CLIENT,
                                                 LDLM_NAMESPACE_GREEDY);
         if (obd->obd_namespace == NULL)
@@ -1833,25 +1833,34 @@ static inline struct ldlm_pool *ldlm_exp2pl(struct obd_export *exp)
         return &exp->exp_obd->obd_namespace->ns_pool;
 }
 
+/**
+ * Packs current SLV and Limit into \a req.
+ */
 int target_pack_pool_reply(struct ptlrpc_request *req)
 {
-        struct ldlm_pool *pl;
+        struct obd_device *obd;
         ENTRY;
    
-        if (!req->rq_export || !req->rq_export->exp_obd ||
-            !exp_connect_lru_resize(req->rq_export)) {
+        /* 
+         * Check that we still have all structures alive as this may 
+         * be some late rpc in shutdown time.
+         */
+        if (unlikely(!req->rq_export || !req->rq_export->exp_obd ||
+                     !exp_connect_lru_resize(req->rq_export))) {
                 lustre_msg_set_slv(req->rq_repmsg, 0);
                 lustre_msg_set_limit(req->rq_repmsg, 0);
                 RETURN(0);
         }
 
-        pl = ldlm_exp2pl(req->rq_export);
+        /* 
+         * OBD is alive here as export is alive, which we checked above. 
+         */
+        obd = req->rq_export->exp_obd;
 
-        spin_lock(&pl->pl_lock);
-        LASSERT(ldlm_pool_get_slv(pl) != 0 && ldlm_pool_get_limit(pl) != 0);
-        lustre_msg_set_slv(req->rq_repmsg, ldlm_pool_get_slv(pl));
-        lustre_msg_set_limit(req->rq_repmsg, ldlm_pool_get_limit(pl));
-        spin_unlock(&pl->pl_lock);
+        read_lock(&obd->obd_pool_lock);
+        lustre_msg_set_slv(req->rq_repmsg, obd->obd_pool_slv);
+        lustre_msg_set_limit(req->rq_repmsg, obd->obd_pool_limit);
+        read_unlock(&obd->obd_pool_lock);
 
         RETURN(0);
 }
index cb96c8a..bd89cfa 100644 (file)
@@ -23,7 +23,8 @@
  *   license text for more details.
  */
 
-/* Idea of this code is rather simple. Each second, for each server namespace
+/* 
+ * Idea of this code is rather simple. Each second, for each server namespace
  * we have SLV - server lock volume which is calculated on current number of
  * granted locks, grant speed for past period, etc - that is, locking load.
  * This SLV number may be thought as a flow definition for simplicity. It is
 
 #ifdef HAVE_LRU_RESIZE_SUPPORT
 
-/* 50 ldlm locks for 1MB of RAM. */
+/*
+ * 50 ldlm locks for 1MB of RAM. 
+ */
 #define LDLM_POOL_HOST_L ((num_physpages >> (20 - CFS_PAGE_SHIFT)) * 50)
 
-/* Default step in % for grant plan. */
+/*
+ * Default step in % for grant plan. 
+ */
 #define LDLM_POOL_GSP (10)
 
-/* LDLM_POOL_GSP% of all locks is default GP. */
+/* 
+ * LDLM_POOL_GSP% of all locks is default GP. 
+ */
 #define LDLM_POOL_GP(L)   (((L) * LDLM_POOL_GSP) / 100)
 
-/* Max age for locks on clients. */
+/* 
+ * Max age for locks on clients. 
+ */
 #define LDLM_POOL_MAX_AGE (36000)
 
 #ifdef __KERNEL__
@@ -126,8 +135,10 @@ static inline __u64 dru(__u64 val, __u32 div)
 
 static inline __u64 ldlm_pool_slv_max(__u32 L)
 {
-        /* Allow to have all locks for 1 client for 10 hrs.
-         * Formula is the following: limit * 10h / 1 client. */
+        /*
+         * Allow to have all locks for 1 client for 10 hrs.
+         * Formula is the following: limit * 10h / 1 client. 
+         */
         __u64 lim = L *  LDLM_POOL_MAX_AGE / 1;
         return lim;
 }
@@ -158,7 +169,11 @@ static inline struct ldlm_namespace *ldlm_pl2ns(struct ldlm_pool *pl)
         return container_of(pl, struct ldlm_namespace, ns_pool);
 }
 
-/* Should be called under ->pl_lock taken */
+/**
+ * Recalculates next grant limit on passed \a pl.
+ *
+ * \pre ->pl_lock is locked. 
+ */
 static inline void ldlm_pool_recalc_grant_plan(struct ldlm_pool *pl)
 {
         int granted, grant_step, limit;
@@ -170,14 +185,18 @@ static inline void ldlm_pool_recalc_grant_plan(struct ldlm_pool *pl)
         pl->pl_grant_plan = granted + grant_step;
 }
 
-/* Should be called under ->pl_lock taken */
+/**
+ * Recalculates next SLV on passed \a pl.
+ *
+ * \pre ->pl_lock is locked. 
+ */
 static inline void ldlm_pool_recalc_slv(struct ldlm_pool *pl)
 {
         int grant_usage, granted, grant_plan;
         __u64 slv, slv_factor;
         __u32 limit;
 
-        slv = ldlm_pool_get_slv(pl);
+        slv = pl->pl_server_lock_volume;
         grant_plan = pl->pl_grant_plan;
         limit = ldlm_pool_get_limit(pl);
         granted = atomic_read(&pl->pl_granted);
@@ -186,12 +205,14 @@ static inline void ldlm_pool_recalc_slv(struct ldlm_pool *pl)
         if (grant_usage <= 0)
                 grant_usage = 1;
 
-        /* Find out SLV change factor which is the ratio of grant usage 
+        /* 
+         * Find out SLV change factor which is the ratio of grant usage 
          * from limit. SLV changes as fast as the ratio of grant plan 
          * consumtion. The more locks from grant plan are not consumed 
          * by clients in last interval (idle time), the faster grows 
          * SLV. And the opposite, the more grant plan is over-consumed
-         * (load time) the faster drops SLV. */
+         * (load time) the faster drops SLV. 
+         */
         slv_factor = (grant_usage * 100) / limit;
         if (2 * abs(granted - limit) > limit) {
                 slv_factor *= slv_factor;
@@ -206,13 +227,18 @@ static inline void ldlm_pool_recalc_slv(struct ldlm_pool *pl)
                 slv = ldlm_pool_slv_min(limit);
         }
 
-        ldlm_pool_set_slv(pl, slv);
+        pl->pl_server_lock_volume = slv;
 }
 
+/**
+ * Recalculates next stats on passed \a pl.
+ *
+ * \pre ->pl_lock is locked. 
+ */
 static inline void ldlm_pool_recalc_stats(struct ldlm_pool *pl)
 {
-        __u64 slv = ldlm_pool_get_slv(pl);
         int grant_plan = pl->pl_grant_plan;
+        __u64 slv = pl->pl_server_lock_volume;
         int granted = atomic_read(&pl->pl_granted);
         int grant_rate = atomic_read(&pl->pl_grant_rate);
         int cancel_rate = atomic_read(&pl->pl_cancel_rate);
@@ -229,6 +255,32 @@ static inline void ldlm_pool_recalc_stats(struct ldlm_pool *pl)
                             cancel_rate);
 }
 
+/**
+ * Sets current SLV into obd accessible via ldlm_pl2ns(pl)->ns_obd.
+ */
+static void ldlm_srv_pool_push_slv(struct ldlm_pool *pl)
+{
+        struct obd_device *obd;
+
+        /* 
+         * Set new SLV in obd field for using it later without accessing the
+         * pool. This is required to avoid race between sending reply to client
+         * with new SLV and cleanup server stack in which we can't guarantee
+         * that namespace is still alive. We know only that obd is alive as
+         * long as valid export is alive. 
+         */
+        obd = ldlm_pl2ns(pl)->ns_obd;
+        LASSERT(obd != NULL);
+        write_lock(&obd->obd_pool_lock);
+        obd->obd_pool_slv = pl->pl_server_lock_volume;
+        write_unlock(&obd->obd_pool_lock);
+}
+
+/**
+ * Recalculates all pool fields on passed \a pl.
+ *
+ * \pre ->pl_lock is not locked. 
+ */
 static int ldlm_srv_pool_recalc(struct ldlm_pool *pl)
 {
         time_t recalc_interval_sec;
@@ -237,17 +289,30 @@ static int ldlm_srv_pool_recalc(struct ldlm_pool *pl)
         spin_lock(&pl->pl_lock);
         recalc_interval_sec = cfs_time_current_sec() - pl->pl_recalc_time;
         if (recalc_interval_sec > 0) {
-                /* Update statistics */
+                /* 
+                 * Update statistics.
+                 */
                 ldlm_pool_recalc_stats(pl);
 
-                /* Recalc SLV after last period. This should be done
-                 * _before_ recalculating new grant plan. */
+                /* 
+                 * Recalc SLV after last period. This should be done
+                 * _before_ recalculating new grant plan. 
+                 */
                 ldlm_pool_recalc_slv(pl);
-
-                /* Update grant_plan for new period. */
+                
+                /* 
+                 * Make sure that pool informed obd of last SLV changes. 
+                 */
+                ldlm_srv_pool_push_slv(pl);
+
+                /* 
+                 * Update grant_plan for new period. 
+                 */
                 ldlm_pool_recalc_grant_plan(pl);
 
-                /* Zero out all rates and speed for the last period. */
+                /* 
+                 * Zero out all rates and speed for the last period. 
+                 */
                 atomic_set(&pl->pl_grant_rate, 0);
                 atomic_set(&pl->pl_cancel_rate, 0);
                 atomic_set(&pl->pl_grant_speed, 0);
@@ -259,26 +324,37 @@ static int ldlm_srv_pool_recalc(struct ldlm_pool *pl)
         RETURN(0);
 }
 
-/* Our goal here is to decrease SLV the way to make a client hold
- * @nr locks smaller in next 10h. */
+/**
+ * This function is used on server side as main entry point for memory
+ * preasure handling. It decreases SLV on \a pl according to passed
+ * \a nr and \a gfp_mask.
+ * 
+ * Our goal here is to decrease SLV such a way that clients hold \a nr
+ * locks smaller in next 10h. 
+ */
 static int ldlm_srv_pool_shrink(struct ldlm_pool *pl,
                                 int nr, unsigned int gfp_mask)
 {
         __u32 limit;
         ENTRY;
 
-        /* VM is asking how many entries may be potentially freed. */
+        /* 
+         * VM is asking how many entries may be potentially freed. 
+         */
         if (nr == 0)
                 RETURN(atomic_read(&pl->pl_granted));
 
-        /* Client already canceled locks but server is already in shrinker
-         * and can't cancel anything. Let's catch this race. */
+        /* 
+         * Client already canceled locks but server is already in shrinker
+         * and can't cancel anything. Let's catch this race. 
+         */
         if (atomic_read(&pl->pl_granted) == 0)
                 RETURN(0);
 
         spin_lock(&pl->pl_lock);
 
-        /* We want shrinker to possibly cause cancelation of @nr locks from
+        /* 
+         * We want shrinker to possibly cause cancelation of @nr locks from
          * clients or grant approximately @nr locks smaller next intervals.
          *
          * This is why we decresed SLV by @nr. This effect will only be as
@@ -287,27 +363,69 @@ static int ldlm_srv_pool_shrink(struct ldlm_pool *pl,
          * interval pool will either increase SLV if locks load is not high
          * or will keep on same level or even decrease again, thus, shrinker
          * decreased SLV will affect next recalc intervals and this way will
-         * make locking load lower. */
-        if (nr < ldlm_pool_get_slv(pl)) {
-                ldlm_pool_set_slv(pl, ldlm_pool_get_slv(pl) - nr);
+         * make locking load lower. 
+         */
+        if (nr < pl->pl_server_lock_volume) {
+                pl->pl_server_lock_volume = pl->pl_server_lock_volume - nr;
         } else {
                 limit = ldlm_pool_get_limit(pl);
-                ldlm_pool_set_slv(pl, ldlm_pool_slv_min(limit));
+                pl->pl_server_lock_volume = ldlm_pool_slv_min(limit);
         }
+
+        /* 
+         * Make sure that pool informed obd of last SLV changes. 
+         */
+        ldlm_srv_pool_push_slv(pl);
         spin_unlock(&pl->pl_lock);
 
-        /* We did not really free any memory here so far, it only will be
-         * freed later may be, so that we return 0 to not confuse VM. */
+        /* 
+         * We did not really free any memory here so far, it only will be
+         * freed later may be, so that we return 0 to not confuse VM. 
+         */
         RETURN(0);
 }
 
+/**
+ * Setup server side pool \a pl with passed \a limit.
+ */
 static int ldlm_srv_pool_setup(struct ldlm_pool *pl, int limit)
 {
+        struct obd_device *obd;
         ENTRY;
+        
+        obd = ldlm_pl2ns(pl)->ns_obd;
+        LASSERT(obd != NULL && obd != LP_POISON);
+        LASSERT(obd->obd_type != LP_POISON);
+        write_lock(&obd->obd_pool_lock);
+        obd->obd_pool_limit = limit;
+        write_unlock(&obd->obd_pool_lock);
+
         ldlm_pool_set_limit(pl, limit);
         RETURN(0);
 }
 
+/**
+ * Sets SLV and Limit from ldlm_pl2ns(pl)->ns_obd tp passed \a pl.
+ */
+static void ldlm_cli_pool_pop_slv(struct ldlm_pool *pl)
+{
+        struct obd_device *obd;
+
+        /* 
+         * Get new SLV and Limit from obd which is updated with comming 
+         * RPCs. 
+         */
+        obd = ldlm_pl2ns(pl)->ns_obd;
+        LASSERT(obd != NULL);
+        read_lock(&obd->obd_pool_lock);
+        pl->pl_server_lock_volume = obd->obd_pool_slv;
+        ldlm_pool_set_limit(pl, obd->obd_pool_limit);
+        read_unlock(&obd->obd_pool_lock);
+}
+
+/**
+ * Recalculates client sise pool \a pl according to current SLV and Limit.
+ */
 static int ldlm_cli_pool_recalc(struct ldlm_pool *pl)
 {
         time_t recalc_interval_sec;
@@ -315,12 +433,21 @@ static int ldlm_cli_pool_recalc(struct ldlm_pool *pl)
 
         spin_lock(&pl->pl_lock);
 
+        /* 
+         * Make sure that pool knows last SLV and Limit from obd. 
+         */
+        ldlm_cli_pool_pop_slv(pl);
+
         recalc_interval_sec = cfs_time_current_sec() - pl->pl_recalc_time;
         if (recalc_interval_sec > 0) {
-                /* Update statistics only every T */
+                /* 
+                 * Update statistics only every T. 
+                 */
                 ldlm_pool_recalc_stats(pl);
 
-                /* Zero out grant/cancel rates and speed for last period. */
+                /* 
+                 * Zero out grant/cancel rates and speed for last period. 
+                 */
                 atomic_set(&pl->pl_grant_rate, 0);
                 atomic_set(&pl->pl_cancel_rate, 0);
                 atomic_set(&pl->pl_grant_speed, 0);
@@ -330,34 +457,54 @@ static int ldlm_cli_pool_recalc(struct ldlm_pool *pl)
         }
         spin_unlock(&pl->pl_lock);
 
-        /* Do not cancel locks in case lru resize is disabled for this ns */
+        /* 
+         * Do not cancel locks in case lru resize is disabled for this ns. 
+         */
         if (!ns_connect_lru_resize(ldlm_pl2ns(pl)))
                 RETURN(0);
 
-        /* In the time of canceling locks on client we do not need to maintain
+        /* 
+         * In the time of canceling locks on client we do not need to maintain
          * sharp timing, we only want to cancel locks asap according to new SLV.
-         * This may be called when SLV has changed much, this is why we do not
-         * take into account pl->pl_recalc_time here. */
+         * It may be called when SLV has changed much, this is why we do not
+         * take into account pl->pl_recalc_time here. 
+         */
         RETURN(ldlm_cancel_lru(ldlm_pl2ns(pl), 0, LDLM_ASYNC, 
                                LDLM_CANCEL_LRUR));
 }
 
+/**
+ * This function is main entry point for memory preasure handling on client side.
+ * Main goal of this function is to cancel some number of locks on passed \a pl
+ * according to \a nr and \a gfp_mask.
+ */
 static int ldlm_cli_pool_shrink(struct ldlm_pool *pl,
                                 int nr, unsigned int gfp_mask)
 {
         ENTRY;
         
-        /* Do not cancel locks in case lru resize is disabled for this ns */
+        /* 
+         * Do not cancel locks in case lru resize is disabled for this ns. 
+         */
         if (!ns_connect_lru_resize(ldlm_pl2ns(pl)))
                 RETURN(0);
 
-        /* Find out how many locks may be released according to shrink 
-         * policy. */
+        /* 
+         * Make sure that pool knows last SLV and Limit from obd. 
+         */
+        ldlm_cli_pool_pop_slv(pl);
+
+        /* 
+         * Find out how many locks may be released according to shrink 
+         * policy. 
+         */
         if (nr == 0)
                 RETURN(ldlm_cancel_lru_estimate(ldlm_pl2ns(pl), 0, 0, 
                                                 LDLM_CANCEL_SHRINK));
 
-        /* Cancel @nr locks accoding to shrink policy */
+        /* 
+         * Cancel @nr locks accoding to shrink policy. 
+         */
         RETURN(ldlm_cancel_lru(ldlm_pl2ns(pl), nr, LDLM_SYNC, 
                                LDLM_CANCEL_SHRINK));
 }
@@ -373,6 +520,10 @@ struct ldlm_pool_ops ldlm_cli_pool_ops = {
         .po_shrink = ldlm_cli_pool_shrink
 };
 
+/**
+ * Pool recalc wrapper. Will call either client or server pool recalc callback
+ * depending what pool \a pl is used.
+ */
 int ldlm_pool_recalc(struct ldlm_pool *pl)
 {
         int count;
@@ -387,6 +538,10 @@ int ldlm_pool_recalc(struct ldlm_pool *pl)
 }
 EXPORT_SYMBOL(ldlm_pool_recalc);
 
+/**
+ * Pool shrink wrapper. Will call either client or server pool recalc callback
+ * depending what pool \a pl is used.
+ */
 int ldlm_pool_shrink(struct ldlm_pool *pl, int nr,
                      unsigned int gfp_mask)
 {
@@ -409,8 +564,12 @@ int ldlm_pool_shrink(struct ldlm_pool *pl, int nr,
 }
 EXPORT_SYMBOL(ldlm_pool_shrink);
 
-/* The purpose of this function is to re-setup limit and maximal allowed
- * slv according to the passed limit. */
+/**
+ * Pool setup wrapper. Will call either client or server pool recalc callback
+ * depending what pool \a pl is used.
+ *
+ * Sets passed \a limit into pool \a pl.
+ */
 int ldlm_pool_setup(struct ldlm_pool *pl, int limit)
 {
         ENTRY;
@@ -427,11 +586,12 @@ static int lprocfs_rd_pool_state(char *page, char **start, off_t off,
         int granted, grant_rate, cancel_rate, grant_step;
         int nr = 0, grant_speed, grant_plan;
         struct ldlm_pool *pl = data;
+        __u64 slv, clv;
         __u32 limit;
-        __u64 slv;
 
         spin_lock(&pl->pl_lock);
-        slv = ldlm_pool_get_slv(pl);
+        slv = pl->pl_server_lock_volume;
+        clv = pl->pl_client_lock_volume;
         limit = ldlm_pool_get_limit(pl);
         grant_plan = pl->pl_grant_plan;
         grant_step = pl->pl_grant_step;
@@ -444,6 +604,7 @@ static int lprocfs_rd_pool_state(char *page, char **start, off_t off,
         nr += snprintf(page + nr, count - nr, "LDLM pool state (%s):\n",
                        pl->pl_name);
         nr += snprintf(page + nr, count - nr, "  SLV: "LPU64"\n", slv);
+        nr += snprintf(page + nr, count - nr, "  CLV: "LPU64"\n", clv);
 
         nr += snprintf(page + nr, count - nr, "  LVF: %d\n",
                        atomic_read(&pl->pl_lock_volume_factor));
@@ -639,13 +800,13 @@ int ldlm_pool_init(struct ldlm_pool *pl, struct ldlm_namespace *ns,
         if (client == LDLM_NAMESPACE_SERVER) {
                 pl->pl_ops = &ldlm_srv_pool_ops;
                 ldlm_pool_set_limit(pl, LDLM_POOL_HOST_L);
-                ldlm_pool_set_slv(pl, ldlm_pool_slv_max(LDLM_POOL_HOST_L));
+                pl->pl_server_lock_volume = ldlm_pool_slv_max(LDLM_POOL_HOST_L);
         } else {
-                ldlm_pool_set_slv(pl, 1);
+                pl->pl_server_lock_volume = 1;
                 ldlm_pool_set_limit(pl, 1);
                 pl->pl_ops = &ldlm_cli_pool_ops;
         }
-
+        pl->pl_client_lock_volume = 0;
         rc = ldlm_pool_proc_init(pl);
         if (rc)
                 RETURN(rc);
@@ -660,17 +821,28 @@ void ldlm_pool_fini(struct ldlm_pool *pl)
 {
         ENTRY;
         ldlm_pool_proc_fini(pl);
-        pl->pl_ops = NULL;
+        
+        /* 
+         * Pool should not be used after this point. We can't free it here as
+         * it lives in struct ldlm_namespace, but still interested in catching
+         * any abnormal using cases.
+         */
+        POISON(pl, 0x5a, sizeof(*pl));
         EXIT;
 }
 EXPORT_SYMBOL(ldlm_pool_fini);
 
+/**
+ * Add new taken ldlm lock \a lock into pool \a pl accounting.
+ */
 void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock)
 {
-        /* FLOCK locks are special in a sense that they are almost never
+        /* 
+         * FLOCK locks are special in a sense that they are almost never
          * cancelled, instead special kind of lock is used to drop them.
          * also there is no LRU for flock locks, so no point in tracking
-         * them anyway */
+         * them anyway. 
+         */
         if (lock->l_resource->lr_type == LDLM_FLOCK)
                 return;
 
@@ -682,18 +854,26 @@ void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock)
 
         lprocfs_counter_incr(pl->pl_stats, LDLM_POOL_GRANT_STAT);
  
-        /* Do not do pool recalc for client side as all locks which
+        /* 
+         * Do not do pool recalc for client side as all locks which
          * potentially may be canceled has already been packed into 
          * enqueue/cancel rpc. Also we do not want to run out of stack
-         * with too long call paths. */
+         * with too long call paths. 
+         */
         if (ns_is_server(ldlm_pl2ns(pl)))
                 ldlm_pool_recalc(pl);
         EXIT;
 }
 EXPORT_SYMBOL(ldlm_pool_add);
 
+/**
+ * Remove ldlm lock \a lock from pool \a pl accounting.
+ */
 void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock)
 {
+        /*
+         * Filter out FLOCK locks. Read above comment in ldlm_pool_add().
+         */
         if (lock->l_resource->lr_type == LDLM_FLOCK)
                 return;
         ENTRY;
@@ -710,33 +890,89 @@ void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock)
 }
 EXPORT_SYMBOL(ldlm_pool_del);
 
-/* ->pl_lock should be taken. */
+/**
+ * Returns current \a pl SLV.
+ *
+ * \pre ->pl_lock is not locked. 
+ */
 __u64 ldlm_pool_get_slv(struct ldlm_pool *pl)
 {
-        return pl->pl_server_lock_volume;
+        __u64 slv;
+        spin_lock(&pl->pl_lock);
+        slv = pl->pl_server_lock_volume;
+        spin_unlock(&pl->pl_lock);
+        return slv;
 }
 EXPORT_SYMBOL(ldlm_pool_get_slv);
 
-/* ->pl_lock should be taken. */
+/**
+ * Sets passed \a slv to \a pl.
+ *
+ * \pre ->pl_lock is not locked. 
+ */
 void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv)
 {
+        spin_lock(&pl->pl_lock);
         pl->pl_server_lock_volume = slv;
+        spin_unlock(&pl->pl_lock);
 }
 EXPORT_SYMBOL(ldlm_pool_set_slv);
 
+/**
+ * Returns current \a pl CLV.
+ *
+ * \pre ->pl_lock is not locked. 
+ */
+__u64 ldlm_pool_get_clv(struct ldlm_pool *pl)
+{
+        __u64 slv;
+        spin_lock(&pl->pl_lock);
+        slv = pl->pl_client_lock_volume;
+        spin_unlock(&pl->pl_lock);
+        return slv;
+}
+EXPORT_SYMBOL(ldlm_pool_get_clv);
+
+/**
+ * Sets passed \a clv to \a pl.
+ *
+ * \pre ->pl_lock is not locked. 
+ */
+void ldlm_pool_set_clv(struct ldlm_pool *pl, __u64 clv)
+{
+        spin_lock(&pl->pl_lock);
+        pl->pl_client_lock_volume = clv;
+        spin_unlock(&pl->pl_lock);
+}
+EXPORT_SYMBOL(ldlm_pool_set_clv);
+
+/**
+ * Returns current \a pl limit.
+ */
 __u32 ldlm_pool_get_limit(struct ldlm_pool *pl)
 {
         return atomic_read(&pl->pl_limit);
 }
 EXPORT_SYMBOL(ldlm_pool_get_limit);
 
+/**
+ * Sets passed \a limit to \a pl.
+ */
 void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit)
 {
         atomic_set(&pl->pl_limit, limit);
 }
 EXPORT_SYMBOL(ldlm_pool_set_limit);
 
-/* Server side is only enabled for kernel space for now. */
+/**
+ * Returns current LVF from \a pl.
+ */
+__u32 ldlm_pool_get_lvf(struct ldlm_pool *pl)
+{
+        return atomic_read(&pl->pl_lock_volume_factor);
+}
+EXPORT_SYMBOL(ldlm_pool_get_lvf);
+
 #ifdef __KERNEL__
 static int ldlm_pool_granted(struct ldlm_pool *pl)
 {
@@ -759,9 +995,11 @@ void ldlm_pools_wakeup(void)
 }
 EXPORT_SYMBOL(ldlm_pools_wakeup);
 
-/* Cancel @nr locks from all namespaces (if possible). Returns number of
+/* 
+ * Cancel \a nr locks from all namespaces (if possible). Returns number of
  * cached locks after shrink is finished. All namespaces are asked to
- * cancel approximately equal amount of locks. */
+ * cancel approximately equal amount of locks to keep balancing.
+ */
 static int ldlm_pools_shrink(ldlm_side_t client, int nr, 
                              unsigned int gfp_mask)
 {
@@ -771,10 +1009,12 @@ static int ldlm_pools_shrink(ldlm_side_t client, int nr,
         if (nr != 0 && !(gfp_mask & __GFP_FS))
                 return -1;
 
-        CDEBUG(D_DLMTRACE, "request to shrink %d %s locks from all pools\n",
+        CDEBUG(D_DLMTRACE, "Request to shrink %d %s locks from all pools\n",
                nr, client == LDLM_NAMESPACE_CLIENT ? "client" : "server");
 
-        /* Find out how many resources we may release. */
+        /* 
+         * Find out how many resources we may release. 
+         */
         for (nr_ns = atomic_read(ldlm_namespace_nr(client)); 
              nr_ns > 0; nr_ns--) 
         {
@@ -794,19 +1034,25 @@ static int ldlm_pools_shrink(ldlm_side_t client, int nr,
         if (nr == 0 || total == 0)
                 return total;
 
-        /* Shrink at least ldlm_namespace_nr(client) namespaces. */
+        /* 
+         * Shrink at least ldlm_namespace_nr(client) namespaces. 
+         */
         for (nr_ns = atomic_read(ldlm_namespace_nr(client)); 
              nr_ns > 0; nr_ns--) 
         {
                 int cancel, nr_locks;
 
-                /* Do not call shrink under ldlm_namespace_lock(client) */
+                /* 
+                 * Do not call shrink under ldlm_namespace_lock(client) 
+                 */
                 mutex_down(ldlm_namespace_lock(client));
                 if (list_empty(ldlm_namespace_list(client))) {
                         mutex_up(ldlm_namespace_lock(client));
-                        /* If list is empty, we can't return any @cached > 0,
+                        /* 
+                         * If list is empty, we can't return any @cached > 0,
                          * that probably would cause needless shrinker
-                         * call. */
+                         * call. 
+                         */
                         cached = 0;
                         break;
                 }
@@ -840,9 +1086,13 @@ void ldlm_pools_recalc(ldlm_side_t client)
         struct ldlm_namespace *ns;
         int nr, equal = 0;
 
-        /* No need to setup pool limit for client pools. */
+        /* 
+         * No need to setup pool limit for client pools.
+         */
         if (client == LDLM_NAMESPACE_SERVER) {
-                /* Check all modest namespaces first. */
+                /* 
+                 * Check all modest namespaces first. 
+                 */
                 mutex_down(ldlm_namespace_lock(client));
                 list_for_each_entry(ns, ldlm_namespace_list(client), 
                                     ns_list_chain) 
@@ -854,16 +1104,20 @@ void ldlm_pools_recalc(ldlm_side_t client)
                         if (l == 0)
                                 l = 1;
 
-                        /* Set the modest pools limit equal to their avg granted
-                         * locks + 5%. */
+                        /* 
+                         * Set the modest pools limit equal to their avg granted
+                         * locks + 5%. 
+                         */
                         l += dru(l * LDLM_POOLS_MODEST_MARGIN, 100);
                         ldlm_pool_setup(&ns->ns_pool, l);
                         nr_l += l;
                         nr_p++;
                 }
 
-                /* Make sure that modest namespaces did not eat more that 2/3 
-                 * of limit */
+                /* 
+                 * Make sure that modest namespaces did not eat more that 2/3 
+                 * of limit. 
+                 */
                 if (nr_l >= 2 * (LDLM_POOL_HOST_L / 3)) {
                         CWARN("\"Modest\" pools eat out 2/3 of server locks "
                               "limit (%d of %lu). This means that you have too "
@@ -872,7 +1126,9 @@ void ldlm_pools_recalc(ldlm_side_t client)
                         equal = 1;
                 }
 
-                /* The rest is given to greedy namespaces. */
+                /* 
+                 * The rest is given to greedy namespaces. 
+                 */
                 list_for_each_entry(ns, ldlm_namespace_list(client), 
                                     ns_list_chain) 
                 {
@@ -880,14 +1136,18 @@ void ldlm_pools_recalc(ldlm_side_t client)
                                 continue;
 
                         if (equal) {
-                                /* In the case 2/3 locks are eaten out by
+                                /* 
+                                 * In the case 2/3 locks are eaten out by
                                  * modest pools, we re-setup equal limit
-                                 * for _all_ pools. */
+                                 * for _all_ pools. 
+                                 */
                                 l = LDLM_POOL_HOST_L /
                                         atomic_read(ldlm_namespace_nr(client));
                         } else {
-                                /* All the rest of greedy pools will have
-                                 * all locks in equal parts.*/
+                                /* 
+                                 * All the rest of greedy pools will have
+                                 * all locks in equal parts.
+                                 */
                                 l = (LDLM_POOL_HOST_L - nr_l) /
                                         (atomic_read(ldlm_namespace_nr(client)) -
                                          nr_p);
@@ -897,13 +1157,17 @@ void ldlm_pools_recalc(ldlm_side_t client)
                 mutex_up(ldlm_namespace_lock(client));
         }
 
-        /* Recalc at least ldlm_namespace_nr(client) namespaces. */
+        /* 
+         * Recalc at least ldlm_namespace_nr(client) namespaces. 
+         */
         for (nr = atomic_read(ldlm_namespace_nr(client)); nr > 0; nr--) {
-                /* Lock the list, get first @ns in the list, getref, move it
+                /* 
+                 * Lock the list, get first @ns in the list, getref, move it
                  * to the tail, unlock and call pool recalc. This way we avoid
                  * calling recalc under @ns lock what is really good as we get
                  * rid of potential deadlock on client nodes when canceling
-                 * locks synchronously. */
+                 * locks synchronously. 
+                 */
                 mutex_down(ldlm_namespace_lock(client));
                 if (list_empty(ldlm_namespace_list(client))) {
                         mutex_up(ldlm_namespace_lock(client));
@@ -914,7 +1178,9 @@ void ldlm_pools_recalc(ldlm_side_t client)
                 ldlm_namespace_move_locked(ns, client);
                 mutex_up(ldlm_namespace_lock(client));
 
-                /* After setup is done - recalc the pool. */
+                /* 
+                 * After setup is done - recalc the pool. 
+                 */
                 ldlm_pool_recalc(&ns->ns_pool);
                 ldlm_namespace_put(ns, 1);
         }
@@ -937,12 +1203,16 @@ static int ldlm_pools_thread_main(void *arg)
         while (1) {
                 struct l_wait_info lwi;
 
-                /* Recal all pools on this tick. */
+                /*
+                 * Recal all pools on this tick. 
+                 */
                 ldlm_pools_recalc(LDLM_NAMESPACE_SERVER);
                 ldlm_pools_recalc(LDLM_NAMESPACE_CLIENT);
                 
-                /* Wait until the next check time, or until we're
-                 * stopped. */
+                /*
+                 * Wait until the next check time, or until we're
+                 * stopped. 
+                 */
                 lwi = LWI_TIMEOUT(cfs_time_seconds(LDLM_POOLS_THREAD_PERIOD),
                                   NULL, NULL);
                 l_wait_event(thread->t_ctl_waitq, (thread->t_flags &
@@ -982,8 +1252,10 @@ static int ldlm_pools_thread_start(void)
         init_completion(&ldlm_pools_comp);
         cfs_waitq_init(&ldlm_pools_thread->t_ctl_waitq);
 
-        /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
-         * just drop the VM and FILES in ptlrpc_daemonize() right away. */
+        /* 
+         * CLONE_VM and CLONE_FILES just avoid a needless copy, because we
+         * just drop the VM and FILES in ptlrpc_daemonize() right away. 
+         */
         rc = cfs_kernel_thread(ldlm_pools_thread_main, ldlm_pools_thread,
                                CLONE_VM | CLONE_FILES);
         if (rc < 0) {
@@ -1010,9 +1282,11 @@ static void ldlm_pools_thread_stop(void)
         ldlm_pools_thread->t_flags = SVC_STOPPING;
         cfs_waitq_signal(&ldlm_pools_thread->t_ctl_waitq);
 
-        /* Make sure that pools thread is finished before freeing @thread.
+        /* 
+         * Make sure that pools thread is finished before freeing @thread.
          * This fixes possible race and oops due to accessing freed memory
-         * in pools thread. */
+         * in pools thread. 
+         */
         wait_for_completion(&ldlm_pools_comp);
         OBD_FREE_PTR(ldlm_pools_thread);
         ldlm_pools_thread = NULL;
@@ -1107,6 +1381,18 @@ void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv)
 }
 EXPORT_SYMBOL(ldlm_pool_set_slv);
 
+__u64 ldlm_pool_get_clv(struct ldlm_pool *pl)
+{
+        return 1;
+}
+EXPORT_SYMBOL(ldlm_pool_get_clv);
+
+void ldlm_pool_set_clv(struct ldlm_pool *pl, __u64 clv)
+{
+        return;
+}
+EXPORT_SYMBOL(ldlm_pool_set_clv);
+
 __u32 ldlm_pool_get_limit(struct ldlm_pool *pl)
 {
         return 0;
@@ -1119,6 +1405,12 @@ void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit)
 }
 EXPORT_SYMBOL(ldlm_pool_set_limit);
 
+__u32 ldlm_pool_get_lvf(struct ldlm_pool *pl)
+{
+        return 0;
+}
+EXPORT_SYMBOL(ldlm_pool_get_lvf);
+
 int ldlm_pools_init(void)
 {
         return 0;
index 3a02565..45799a0 100644 (file)
@@ -1000,24 +1000,35 @@ static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
         return &imp->imp_obd->obd_namespace->ns_pool;
 }
 
+/**
+ * Update client's obd pool related fields with new SLV and Limit from \a req.
+ */
 int ldlm_cli_update_pool(struct ptlrpc_request *req)
 {
+        struct obd_device *obd;
         __u64 old_slv, new_slv;
-        struct ldlm_pool *pl;
         __u32 new_limit;
         ENTRY;
     
-        if (!imp_connect_lru_resize(req->rq_import))
+        if (unlikely(!req->rq_import || !req->rq_import->imp_obd || 
+                     !imp_connect_lru_resize(req->rq_import)))
+        {
+                /* 
+                 * Do nothing for corner cases. 
+                 */
                 RETURN(0);
+        }
 
-        /* In some cases RPC may contain slv and limit zeroed out. This is 
+        /* 
+         * In some cases RPC may contain slv and limit zeroed out. This is 
          * the case when server does not support lru resize feature. This is
          * also possible in some recovery cases when server side reqs have no
          * ref to obd export and thus access to server side namespace is no 
-         * possible. */
+         * possible. 
+         */
         if (lustre_msg_get_slv(req->rq_repmsg) == 0 || 
             lustre_msg_get_limit(req->rq_repmsg) == 0) {
-                DEBUG_REQ(D_HA, req, "zero SLV or Limit found "
+                DEBUG_REQ(D_HA, req, "Zero SLV or Limit found "
                           "(SLV: "LPU64", Limit: %u)", 
                           lustre_msg_get_slv(req->rq_repmsg), 
                           lustre_msg_get_limit(req->rq_repmsg));
@@ -1026,30 +1037,41 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
 
         new_limit = lustre_msg_get_limit(req->rq_repmsg);
         new_slv = lustre_msg_get_slv(req->rq_repmsg);
-        pl = ldlm_imp2pl(req->rq_import);
-        
-        spin_lock(&pl->pl_lock);
-        old_slv = ldlm_pool_get_slv(pl);
-        ldlm_pool_set_slv(pl, new_slv);
-        ldlm_pool_set_limit(pl, new_limit);
-
-        /* Check if we need to wakeup pools thread for fast SLV change. 
+        obd = req->rq_import->imp_obd;
+
+        /* 
+         * Set new SLV and Limit to obd fields to make accessible for pool 
+         * thread. We do not access obd_namespace and pool directly here
+         * as there is no reliable way to make sure that they are still
+         * alive in cleanup time. Evil races are possible which may cause
+         * oops in that time. 
+         */
+        write_lock(&obd->obd_pool_lock);
+        old_slv = obd->obd_pool_slv;
+        obd->obd_pool_slv = new_slv;
+        obd->obd_pool_limit = new_limit;
+        write_unlock(&obd->obd_pool_lock);
+
+        /* 
+         * Check if we need to wakeup pools thread for fast SLV change. 
          * This is only done when threads period is noticably long like 
-         * 10s or more. */
+         * 10s or more. 
+         */
 #if defined(__KERNEL__) && (LDLM_POOLS_THREAD_PERIOD >= 10)
-        {
+        if (old_slv > 0) {
                 __u64 fast_change = old_slv * LDLM_POOLS_FAST_SLV_CHANGE;
                 do_div(fast_change, 100);
 
-                /* Wake up pools thread only if SLV has changed more than 
+                /* 
+                 * Wake up pools thread only if SLV has changed more than 
                  * 50% since last update. In this case we want to react asap. 
                  * Otherwise it is no sense to wake up pools as they are 
-                 * re-calculated every LDLM_POOLS_THREAD_PERIOD anyways. */
+                 * re-calculated every LDLM_POOLS_THREAD_PERIOD anyways. 
+                 */
                 if (old_slv > new_slv && old_slv - new_slv > fast_change)
                         ldlm_pools_wakeup();
         }
 #endif
-        spin_unlock(&pl->pl_lock);
         RETURN(0);
 }
 EXPORT_SYMBOL(ldlm_cli_update_pool);
@@ -1205,17 +1227,17 @@ static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
         if (count && added >= count)
                 return LDLM_POLICY_KEEP_LOCK;
 
-        spin_lock(&pl->pl_lock);
         slv = ldlm_pool_get_slv(pl);
-        lvf = atomic_read(&pl->pl_lock_volume_factor);
-        spin_unlock(&pl->pl_lock);
-
+        lvf = ldlm_pool_get_lvf(pl);
         la = cfs_duration_sec(cfs_time_sub(cur, 
                               lock->l_last_used));
 
         /* Stop when slv is not yet come from server or 
          * lv is smaller than it is. */
         lv = lvf * la * unused;
+        
+        /* Inform pool about current CLV to see it via proc. */
+        ldlm_pool_set_clv(pl, lv);
         return (slv == 1 || lv < slv) ? 
                 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK;
 }
index 4974f21..b943494 100644 (file)
@@ -295,8 +295,8 @@ void ldlm_proc_namespace(struct ldlm_namespace *ns)
 #define ldlm_proc_namespace(ns) do {} while (0)
 #endif /* LPROCFS */
 
-struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client
-                                          ldlm_appetite_t apt)
+struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obd, char *name
+                                          ldlm_side_t client, ldlm_appetite_t apt)
 {
         struct ldlm_namespace *ns = NULL;
         struct list_head *bucket;
@@ -319,6 +319,10 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client,
 
         ns->ns_shrink_thumb = LDLM_LOCK_SHRINK_THUMB;
         ns->ns_appetite = apt;
+
+        LASSERT(obd != NULL);
+        ns->ns_obd = obd;
+
         namelen = strlen(name);
         OBD_ALLOC(ns->ns_name, namelen + 1);
         if (!ns->ns_name)
@@ -542,6 +546,15 @@ force_wait:
         RETURN(ELDLM_OK);
 }
 
+/**
+ * Performs various cleanups for passed \a ns to make it drop refc and be ready
+ * for freeing. Waits for refc == 0.
+ *
+ * The following is done:
+ * (0) Unregister \a ns from its list to make inaccessible for potential users
+ * like pools thread and others;
+ * (1) Clear all locks in \a ns.
+ */
 void ldlm_namespace_free_prior(struct ldlm_namespace *ns, 
                                struct obd_import *imp, 
                                int force)
@@ -553,10 +566,14 @@ void ldlm_namespace_free_prior(struct ldlm_namespace *ns,
                 return;
         }
 
-        /* Remove @ns from list. */
+        /* 
+         * Make sure that nobody can find this ns in its list. 
+         */
         ldlm_namespace_unregister(ns, ns->ns_client);
 
-        /* Can fail with -EINTR when force == 0 in which case try harder */
+        /* 
+         * Can fail with -EINTR when force == 0 in which case try harder.
+         */
         rc = __ldlm_namespace_free(ns, force);
         if (rc != ELDLM_OK) {
                 if (imp) {
@@ -564,14 +581,21 @@ void ldlm_namespace_free_prior(struct ldlm_namespace *ns,
                         ptlrpc_invalidate_import(imp);
                 }
 
-                /* With all requests dropped and the import inactive
-                 * we are gaurenteed all reference will be dropped. */
+                /* 
+                 * With all requests dropped and the import inactive
+                 * we are gaurenteed all reference will be dropped. 
+                 */
                 rc = __ldlm_namespace_free(ns, 1);
                 LASSERT(rc == 0);
         }
         EXIT;
 }
 
+/**
+ * Performs freeing memory structures related to \a ns. This is only done when
+ * ldlm_namespce_free_prior() successfully removed all resources referencing
+ * \a ns and its refc == 0.
+ */
 void ldlm_namespace_free_post(struct ldlm_namespace *ns)
 {
         ENTRY;
@@ -586,6 +610,7 @@ void ldlm_namespace_free_post(struct ldlm_namespace *ns)
          * it after @dir may cause oops.
          */
         ldlm_pool_fini(&ns->ns_pool);
+
 #ifdef LPROCFS
         {
                 struct proc_dir_entry *dir;
@@ -601,9 +626,10 @@ void ldlm_namespace_free_post(struct ldlm_namespace *ns)
 
         OBD_VFREE(ns->ns_hash, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
         OBD_FREE(ns->ns_name, strlen(ns->ns_name) + 1);
-        /* 
-         * @ns should be not on list in this time, otherwise this will cause
-         * issues realted to using freed @ns in pools thread. 
+
+        /*
+         * Namespace \a ns should be not on list in this time, otherwise this
+         * will cause issues realted to using freed \a ns in pools thread. 
          */
         LASSERT(list_empty(&ns->ns_list_chain));
         OBD_FREE_PTR(ns);
index 2e35988..618638b 100644 (file)
@@ -3968,7 +3968,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
 
         snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name,
                  LUSTRE_MDT_NAME"-%p", m);
-        m->mdt_namespace = ldlm_namespace_new(info->mti_u.ns_name,
+        m->mdt_namespace = ldlm_namespace_new(obd, info->mti_u.ns_name,
                                               LDLM_NAMESPACE_SERVER,
                                               LDLM_NAMESPACE_GREEDY);
         if (m->mdt_namespace == NULL)
index 4781703..af67705 100644 (file)
@@ -180,7 +180,7 @@ static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
 
         /* namespace for mgs llog */
-        obd->obd_namespace = ldlm_namespace_new("MGS", LDLM_NAMESPACE_SERVER,
+        obd->obd_namespace = ldlm_namespace_new(obd ,"MGS", LDLM_NAMESPACE_SERVER,
                                                 LDLM_NAMESPACE_MODEST);
         if (obd->obd_namespace == NULL)
                 GOTO(err_ops, rc = -ENOMEM);
@@ -276,16 +276,9 @@ static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
         RETURN(rc);
 }
 
-static int mgs_ldlm_nsfree(void *data)
-{
-        struct ldlm_namespace *ns = (struct ldlm_namespace *)data;
-        ENTRY;
-
-        ptlrpc_daemonize("ll_mgs_nsfree");
-        ldlm_namespace_free(ns, NULL, 1 /* obd_force should always be on */);
-        RETURN(0);
-}
-
+/**
+ * Performs cleanup procedures for passed \a obd given it is mgs obd.
+ */
 static int mgs_cleanup(struct obd_device *obd)
 {
         struct mgs_obd *mgs = &obd->u.mgs;
@@ -305,12 +298,8 @@ static int mgs_cleanup(struct obd_device *obd)
         server_put_mount(obd->obd_name, mgs->mgs_vfsmnt);
         mgs->mgs_sb = NULL;
 
-        /* Free the namespace in it's own thread, so that if the
-           ldlm_cancel_handler put the last mgs obd ref, we won't
-           deadlock here. */
-        cfs_kernel_thread(mgs_ldlm_nsfree, obd->obd_namespace,
-                          CLONE_VM | CLONE_FILES);
-
+        ldlm_namespace_free(obd->obd_namespace, NULL, 1);
+        obd->obd_namespace = NULL;
 
         fsfilt_put_ops(obd->obd_fsops);
 
index 93a11de..e906d83 100644 (file)
@@ -72,6 +72,11 @@ static void obd_device_free(struct obd_device *obd)
         LASSERT(obd != NULL);
         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n", 
                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
+        if (obd->obd_namespace != NULL) {
+                CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n", 
+                       obd, obd->obd_namespace, obd->obd_force);
+                LBUG();
+        }
         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
 }
 EXPORT_SYMBOL(obd_device_free);
index 7cbba44..0dd3ab6 100644 (file)
@@ -120,8 +120,9 @@ EXPORT_SYMBOL(class_parse_nid);
 
 /********************** class fns **********************/
 
-/* Create a new device and set the type, name and uuid.  If
- * successful, the new device can be accessed by either name or uuid.
+/**
+ * Create a new device and set the type, name and uuid.  If successful, the new
+ * device can be accessed by either name or uuid.
  */
 int class_attach(struct lustre_cfg *lcfg)
 {
@@ -168,6 +169,10 @@ int class_attach(struct lustre_cfg *lcfg)
         LASSERTF(strncmp(obd->obd_name, name, strlen(name)) == 0, "%p obd_name %s != %s\n",
                  obd, obd->obd_name, name);
 
+        rwlock_init(&obd->obd_pool_lock);
+        obd->obd_pool_limit = 0;
+        obd->obd_pool_slv = 0;
+
         CFS_INIT_LIST_HEAD(&obd->obd_exports);
         CFS_INIT_LIST_HEAD(&obd->obd_exports_timed);
         CFS_INIT_LIST_HEAD(&obd->obd_nid_stats);
index 197d3cd..76288a1 100644 (file)
@@ -466,7 +466,7 @@ static int echo_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
         spin_lock_init(&obd->u.echo.eo_lock);
         obd->u.echo.eo_lastino = ECHO_INIT_OBJID;
 
-        obd->obd_namespace = ldlm_namespace_new("echo-tgt",
+        obd->obd_namespace = ldlm_namespace_new(obd, "echo-tgt",
                                                 LDLM_NAMESPACE_SERVER,
                                                 LDLM_NAMESPACE_MODEST);
         if (obd->obd_namespace == NULL) {
@@ -512,6 +512,7 @@ static int echo_cleanup(struct obd_device *obd)
         cfs_schedule_timeout (CFS_TASK_UNINT, cfs_time_seconds(1));
 
         ldlm_namespace_free(obd->obd_namespace, NULL, obd->obd_force);
+        obd->obd_namespace = NULL;
 
         leaked = atomic_read(&obd->u.echo.eo_prep);
         if (leaked != 0)
index 3b3d1fc..87b6614 100644 (file)
@@ -2008,7 +2008,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg,
                 GOTO(err_ops, rc = -ENOMEM);
 
         sprintf(ns_name, "filter-%s", obd->obd_uuid.uuid);
-        obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER,
+        obd->obd_namespace = ldlm_namespace_new(obd, ns_name, LDLM_NAMESPACE_SERVER,
                                                 LDLM_NAMESPACE_GREEDY);
         if (obd->obd_namespace == NULL)
                 GOTO(err_post, rc = -ENOMEM);
@@ -2403,6 +2403,7 @@ static int filter_cleanup(struct obd_device *obd)
         target_cleanup_recovery(obd);
 
         ldlm_namespace_free(obd->obd_namespace, NULL, obd->obd_force);
+        obd->obd_namespace = NULL;
 
         sptlrpc_rule_set_free(&filter->fo_sptlrpc_rset);
 
index b536445..dac8412 100644 (file)
@@ -755,6 +755,9 @@ static int ptlrpc_check_status(struct ptlrpc_request *req)
         RETURN(err);
 }
 
+/**
+ * Callback function called when client receives RPC reply for \a req.
+ */
 static int after_reply(struct ptlrpc_request *req)
 {
         struct obd_import *imp = req->rq_import;
@@ -768,10 +771,14 @@ static int after_reply(struct ptlrpc_request *req)
         LASSERT(obd);
         LASSERT(req->rq_nob_received <= req->rq_repbuf_len);
 
-        /* NB Until this point, the whole of the incoming message,
-         * including buflens, status etc is in the sender's byte order. */
+        /*
+         * NB Until this point, the whole of the incoming message,
+         * including buflens, status etc is in the sender's byte order. 
+         */
 
-        /* Clear reply swab mask; this is a new reply in sender's byte order */
+        /*
+         * Clear reply swab mask; this is a new reply in sender's byte order. 
+         */
         req->rq_rep_swab_mask = 0;
 
         rc = sptlrpc_cli_unwrap_reply(req);
@@ -780,7 +787,9 @@ static int after_reply(struct ptlrpc_request *req)
                 RETURN(rc);
         }
 
-        /* security layer unwrap might ask resend this request */
+        /*
+         * Security layer unwrap might ask resend this request. 
+         */
         if (req->rq_resend)
                 RETURN(0);
 
@@ -813,9 +822,11 @@ static int after_reply(struct ptlrpc_request *req)
         imp->imp_connect_error = rc;
 
         if (rc) {
-                /* Either we've been evicted, or the server has failed for
+                /*
+                 * Either we've been evicted, or the server has failed for
                  * some reason. Try to reconnect, and if that fails, punt to
-                 * the upcall. */
+                 * the upcall. 
+                 */
                 if (ll_rpc_recoverable_error(rc)) {
                         if (req->rq_send_state != LUSTRE_IMP_FULL ||
                             imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
@@ -825,24 +836,25 @@ static int after_reply(struct ptlrpc_request *req)
                         RETURN(rc);
                 }
         } else {
-                /* Let's look if server send slv. Do it only for RPC with 
-                 * rc == 0. */
-                if (imp->imp_obd->obd_namespace) {
-                        /* Disconnect rpc is sent when namespace is already 
-                         * destroyed. Let's check this and will not try update
-                         * pool. */
-                        ldlm_cli_update_pool(req);
-                }
+                /*
+                 * Let's look if server sent slv. Do it only for RPC with 
+                 * rc == 0. 
+                 */
+                ldlm_cli_update_pool(req);
         }
 
-        /* Store transno in reqmsg for replay. */
+        /*
+         * Store transno in reqmsg for replay. 
+         */
         req->rq_transno = lustre_msg_get_transno(req->rq_repmsg);
         lustre_msg_set_transno(req->rq_reqmsg, req->rq_transno);
 
         if (req->rq_import->imp_replayable) {
                 spin_lock(&imp->imp_lock);
-                /* no point in adding already-committed requests to the replay
-                 * list, we will just remove them immediately. b=9829 */
+                /*
+                 * No point in adding already-committed requests to the replay
+                 * list, we will just remove them immediately. b=9829 
+                 */
                 if (req->rq_transno != 0 && 
                     (req->rq_transno > 
                      lustre_msg_get_last_committed(req->rq_repmsg) ||
@@ -854,7 +866,9 @@ static int after_reply(struct ptlrpc_request *req)
                         spin_lock(&imp->imp_lock);
                 }
 
-                /* Replay-enabled imports return commit-status information. */
+                /*
+                 * Replay-enabled imports return commit-status information. 
+                 */
                 if (lustre_msg_get_last_committed(req->rq_repmsg)) {
                         imp->imp_peer_committed_transno =
                                 lustre_msg_get_last_committed(req->rq_repmsg);