From fce887d1f9e0170ef33807415bcff4660fe7fa39 Mon Sep 17 00:00:00 2001 From: yury Date: Wed, 4 Jun 2008 12:07:32 +0000 Subject: [PATCH] b=15226 r=shadow,nikita - fixes issue with accessing freed @ns on behalf rpc handling when some late rpc comes in fini time; - add new comments in functions, converts existing ones to be inline wigth doxygen comments policies. --- lustre/include/liblustre.h | 2 +- lustre/include/lustre_dlm.h | 207 ++++++++++++++----- lustre/include/obd.h | 7 + lustre/ldlm/ldlm_lib.c | 29 ++- lustre/ldlm/ldlm_pool.c | 468 +++++++++++++++++++++++++++++++++++-------- lustre/ldlm/ldlm_request.c | 66 ++++-- lustre/ldlm/ldlm_resource.c | 44 +++- lustre/mdt/mdt_handler.c | 2 +- lustre/mgs/mgs_handler.c | 23 +-- lustre/obdclass/genops.c | 5 + lustre/obdclass/obd_config.c | 9 +- lustre/obdecho/echo.c | 3 +- lustre/obdfilter/filter.c | 3 +- lustre/ptlrpc/client.c | 50 +++-- 14 files changed, 695 insertions(+), 223 deletions(-) diff --git a/lustre/include/liblustre.h b/lustre/include/liblustre.h index 388de36..e59581f 100644 --- a/lustre/include/liblustre.h +++ b/lustre/include/liblustre.h @@ -281,7 +281,7 @@ typedef spinlock_t rwlock_t; #define read_unlock(l) spin_unlock(l) #define write_lock(l) spin_lock(l) #define write_unlock(l) spin_unlock(l) - +#define rwlock_init(l) spin_lock_init(l) #define min(x,y) ((x)<(y) ? (x) : (y)) #define max(x,y) ((x)>(y) ? (x) : (y)) diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index b59c05a..5d2941c 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -233,34 +233,71 @@ struct ldlm_pool_ops { #define LDLM_POOLS_FAST_SLV_CHANGE (50) struct ldlm_pool { - /* Common pool fields */ - cfs_proc_dir_entry_t *pl_proc_dir; /* Pool proc directory. */ - char pl_name[100]; /* Pool name, should be long - * enough to contain complex - * proc entry name. */ - spinlock_t pl_lock; /* Lock for protecting slv/clv - * updates. */ - atomic_t pl_limit; /* Number of allowed locks in - * in pool, both, client and - * server side. */ - atomic_t pl_granted; /* Number of granted locks. */ - atomic_t pl_grant_rate; /* Grant rate per T. */ - atomic_t pl_cancel_rate; /* Cancel rate per T. */ - atomic_t pl_grant_speed; /* Grant speed (GR-CR) per T. */ - __u64 pl_server_lock_volume; /* Server lock volume. - * Protected by pl_lock */ - atomic_t pl_lock_volume_factor; /* Lock volume factor. */ - - time_t pl_recalc_time; /* Time when last slv from - * server was obtained. */ - struct ldlm_pool_ops *pl_ops; /* Recalc and shrink ops. */ - - int pl_grant_plan; /* Planned number of granted - * locks for next T. */ - int pl_grant_step; /* Grant plan step for next - * T. */ - - struct lprocfs_stats *pl_stats; /* Pool statistics. */ + /** + * Pool proc directory. + */ + cfs_proc_dir_entry_t *pl_proc_dir; + /** + * Pool name, should be long enough to contain compound proc entry name. + */ + char pl_name[100]; + /** + * Lock for protecting slv/clv updates. + */ + spinlock_t pl_lock; + /** + * Number of allowed locks in in pool, both, client and server side. + */ + atomic_t pl_limit; + /** + * Number of granted locks in + */ + atomic_t pl_granted; + /** + * Grant rate per T. + */ + atomic_t pl_grant_rate; + /** + * Cancel rate per T. + */ + atomic_t pl_cancel_rate; + /** + * Grant speed (GR-CR) per T. + */ + atomic_t pl_grant_speed; + /** + * Server lock volume. Protected by pl_lock. + */ + __u64 pl_server_lock_volume; + /** + * Current biggest client lock volume. Protected by pl_lock. 
+ */ + __u64 pl_client_lock_volume; + /** + * Lock volume factor. SLV on client is calculated as following: + * server_slv * lock_volume_factor. + */ + atomic_t pl_lock_volume_factor; + /** + * Time when last slv from server was obtained. + */ + time_t pl_recalc_time; + /** + * Recalc and shrink ops. + */ + struct ldlm_pool_ops *pl_ops; + /** + * Planned number of granted locks for next T. + */ + int pl_grant_plan; + /** + * Grant plan step for next T. + */ + int pl_grant_step; + /** + * Pool statistics. + */ + struct lprocfs_stats *pl_stats; }; typedef int (*ldlm_res_policy)(struct ldlm_namespace *, struct ldlm_lock **, @@ -278,43 +315,89 @@ typedef enum { LDLM_NAMESPACE_MODEST = 1 << 1 } ldlm_appetite_t; -/* Default value for ->ns_shrink_thumb. If lock is not extent one its cost +/* + * Default value for ->ns_shrink_thumb. If lock is not extent one its cost * is one page. Here we have 256 pages which is 1M on i386. Thus by default * all extent locks which have more than 1M long extent will be kept in lru, - * others (including ibits locks) will be canceled on memory pressure event. */ + * others (including ibits locks) will be canceled on memory pressure event. + */ #define LDLM_LOCK_SHRINK_THUMB 256 -/* default values for the "max_nolock_size", "contention_time" - * and "contended_locks" namespace tunables */ +/* + * Default values for the "max_nolock_size", "contention_time" and + * "contended_locks" namespace tunables. + */ #define NS_DEFAULT_MAX_NOLOCK_BYTES 0 #define NS_DEFAULT_CONTENTION_SECONDS 2 #define NS_DEFAULT_CONTENDED_LOCKS 32 struct ldlm_namespace { + /** + * Namespace name. Used for logging, etc. + */ char *ns_name; - ldlm_side_t ns_client; /* is this a client-side lock tree? */ - __u64 ns_connect_flags; /* ns connect flags supported - * by server (may be changed via proc, - * lru resize may be disabled/enabled) */ - __u64 ns_orig_connect_flags; /* client side orig connect - * flags supported by server */ - struct list_head *ns_hash; /* hash table for ns */ + + /** + * Is this a client-side lock tree? + */ + ldlm_side_t ns_client; + + /** + * Namespce connect flags supported by server (may be changed via proc, + * lru resize may be disabled/enabled). + */ + __u64 ns_connect_flags; + + /** + * Client side orig connect flags supported by server. + */ + __u64 ns_orig_connect_flags; + + /** + * Hash table for namespace. + */ + struct list_head *ns_hash; spinlock_t ns_hash_lock; - __u32 ns_refcount; /* count of resources in the hash */ - struct list_head ns_root_list; /* all root resources in ns */ - struct list_head ns_list_chain; /* position in global NS list */ - struct list_head ns_unused_list; /* all root resources in ns */ + /** + * Count of resources in the hash. + */ + __u32 ns_refcount; + + /** + * All root resources in namespace. + */ + struct list_head ns_root_list; + + /** + * Position in global namespace list. + */ + struct list_head ns_list_chain; + + /** + * All root resources in namespace. + */ + struct list_head ns_unused_list; int ns_nr_unused; spinlock_t ns_unused_lock; unsigned int ns_max_unused; unsigned int ns_max_age; - unsigned int ns_ctime_age_limit; /* seconds */ + + /** + * Seconds. + */ + unsigned int ns_ctime_age_limit; - /* Lower limit to number of pages in lock to keep it in cache */ + /** + * Lower limit to number of pages in lock to keep it in cache. + */ unsigned int ns_shrink_thumb; - cfs_time_t ns_next_dump; /* next debug dump, jiffies */ + + /** + * Next debug dump, jiffies. 
+ */ + cfs_time_t ns_next_dump; atomic_t ns_locks; __u64 ns_resources; @@ -324,14 +407,28 @@ struct ldlm_namespace { cfs_waitq_t ns_waitq; struct ldlm_pool ns_pool; ldlm_appetite_t ns_appetite; - /* if more than @ns_contented_locks found, the resource considered - * as contended */ + + /** + * If more than @ns_contented_locks found, the resource considered + * as contended. + */ unsigned ns_contended_locks; - /* the resource remembers contended state during @ns_contention_time, - * in seconds */ + + /** + * The resource remembers contended state during @ns_contention_time, + * in seconds. + */ unsigned ns_contention_time; - /* limit size of nolock requests, in bytes */ + + /** + * Limit size of nolock requests, in bytes. + */ unsigned ns_max_nolock_size; + + /** + * Backward link to obd, required for ldlm pool to store new SLV. + */ + struct obd_device *ns_obd; }; static inline int ns_is_client(struct ldlm_namespace *ns) @@ -695,8 +792,9 @@ void ldlm_lock_dump_handle(int level, struct lustre_handle *); void ldlm_unlink_lock_skiplist(struct ldlm_lock *req); /* resource.c */ -struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client, - ldlm_appetite_t apt); +struct ldlm_namespace * +ldlm_namespace_new(struct obd_device *obd, char *name, + ldlm_side_t client, ldlm_appetite_t apt); int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags); void ldlm_namespace_free(struct ldlm_namespace *ns, struct obd_import *imp, int force); @@ -847,9 +945,12 @@ int ldlm_pool_shrink(struct ldlm_pool *pl, int nr, void ldlm_pool_fini(struct ldlm_pool *pl); int ldlm_pool_setup(struct ldlm_pool *pl, int limit); int ldlm_pool_recalc(struct ldlm_pool *pl); +__u32 ldlm_pool_get_lvf(struct ldlm_pool *pl); __u64 ldlm_pool_get_slv(struct ldlm_pool *pl); +__u64 ldlm_pool_get_clv(struct ldlm_pool *pl); __u32 ldlm_pool_get_limit(struct ldlm_pool *pl); void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv); +void ldlm_pool_set_clv(struct ldlm_pool *pl, __u64 clv); void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit); void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock); void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock); diff --git a/lustre/include/obd.h b/lustre/include/obd.h index fd2a42b..188a022 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -965,6 +965,13 @@ struct obd_device { struct lprocfs_stats *obd_svc_stats; atomic_t obd_evict_inprogress; cfs_waitq_t obd_evict_inprogress_waitq; + + /** + * Ldlm pool part. Save last calculated SLV and Limit. + */ + rwlock_t obd_pool_lock; + int obd_pool_limit; + __u64 obd_pool_slv; }; #define OBD_OPT_FORCE 0x0001 diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 063f9d7..4ae8d06 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -394,7 +394,7 @@ int client_connect_import(const struct lu_env *env, if (obd->obd_namespace != NULL) CERROR("already have namespace!\n"); - obd->obd_namespace = ldlm_namespace_new(obd->obd_name, + obd->obd_namespace = ldlm_namespace_new(obd, obd->obd_name, LDLM_NAMESPACE_CLIENT, LDLM_NAMESPACE_GREEDY); if (obd->obd_namespace == NULL) @@ -1833,25 +1833,34 @@ static inline struct ldlm_pool *ldlm_exp2pl(struct obd_export *exp) return &exp->exp_obd->obd_namespace->ns_pool; } +/** + * Packs current SLV and Limit into \a req. 
+ */ int target_pack_pool_reply(struct ptlrpc_request *req) { - struct ldlm_pool *pl; + struct obd_device *obd; ENTRY; - if (!req->rq_export || !req->rq_export->exp_obd || - !exp_connect_lru_resize(req->rq_export)) { + /* + * Check that we still have all structures alive as this may + * be some late rpc in shutdown time. + */ + if (unlikely(!req->rq_export || !req->rq_export->exp_obd || + !exp_connect_lru_resize(req->rq_export))) { lustre_msg_set_slv(req->rq_repmsg, 0); lustre_msg_set_limit(req->rq_repmsg, 0); RETURN(0); } - pl = ldlm_exp2pl(req->rq_export); + /* + * OBD is alive here as export is alive, which we checked above. + */ + obd = req->rq_export->exp_obd; - spin_lock(&pl->pl_lock); - LASSERT(ldlm_pool_get_slv(pl) != 0 && ldlm_pool_get_limit(pl) != 0); - lustre_msg_set_slv(req->rq_repmsg, ldlm_pool_get_slv(pl)); - lustre_msg_set_limit(req->rq_repmsg, ldlm_pool_get_limit(pl)); - spin_unlock(&pl->pl_lock); + read_lock(&obd->obd_pool_lock); + lustre_msg_set_slv(req->rq_repmsg, obd->obd_pool_slv); + lustre_msg_set_limit(req->rq_repmsg, obd->obd_pool_limit); + read_unlock(&obd->obd_pool_lock); RETURN(0); } diff --git a/lustre/ldlm/ldlm_pool.c b/lustre/ldlm/ldlm_pool.c index cb96c8a..bd89cfa 100644 --- a/lustre/ldlm/ldlm_pool.c +++ b/lustre/ldlm/ldlm_pool.c @@ -23,7 +23,8 @@ * license text for more details. */ -/* Idea of this code is rather simple. Each second, for each server namespace +/* + * Idea of this code is rather simple. Each second, for each server namespace * we have SLV - server lock volume which is calculated on current number of * granted locks, grant speed for past period, etc - that is, locking load. * This SLV number may be thought as a flow definition for simplicity. It is @@ -98,16 +99,24 @@ #ifdef HAVE_LRU_RESIZE_SUPPORT -/* 50 ldlm locks for 1MB of RAM. */ +/* + * 50 ldlm locks for 1MB of RAM. + */ #define LDLM_POOL_HOST_L ((num_physpages >> (20 - CFS_PAGE_SHIFT)) * 50) -/* Default step in % for grant plan. */ +/* + * Default step in % for grant plan. + */ #define LDLM_POOL_GSP (10) -/* LDLM_POOL_GSP% of all locks is default GP. */ +/* + * LDLM_POOL_GSP% of all locks is default GP. + */ #define LDLM_POOL_GP(L) (((L) * LDLM_POOL_GSP) / 100) -/* Max age for locks on clients. */ +/* + * Max age for locks on clients. + */ #define LDLM_POOL_MAX_AGE (36000) #ifdef __KERNEL__ @@ -126,8 +135,10 @@ static inline __u64 dru(__u64 val, __u32 div) static inline __u64 ldlm_pool_slv_max(__u32 L) { - /* Allow to have all locks for 1 client for 10 hrs. - * Formula is the following: limit * 10h / 1 client. */ + /* + * Allow to have all locks for 1 client for 10 hrs. + * Formula is the following: limit * 10h / 1 client. + */ __u64 lim = L * LDLM_POOL_MAX_AGE / 1; return lim; } @@ -158,7 +169,11 @@ static inline struct ldlm_namespace *ldlm_pl2ns(struct ldlm_pool *pl) return container_of(pl, struct ldlm_namespace, ns_pool); } -/* Should be called under ->pl_lock taken */ +/** + * Recalculates next grant limit on passed \a pl. + * + * \pre ->pl_lock is locked. + */ static inline void ldlm_pool_recalc_grant_plan(struct ldlm_pool *pl) { int granted, grant_step, limit; @@ -170,14 +185,18 @@ static inline void ldlm_pool_recalc_grant_plan(struct ldlm_pool *pl) pl->pl_grant_plan = granted + grant_step; } -/* Should be called under ->pl_lock taken */ +/** + * Recalculates next SLV on passed \a pl. + * + * \pre ->pl_lock is locked. 
+ */ static inline void ldlm_pool_recalc_slv(struct ldlm_pool *pl) { int grant_usage, granted, grant_plan; __u64 slv, slv_factor; __u32 limit; - slv = ldlm_pool_get_slv(pl); + slv = pl->pl_server_lock_volume; grant_plan = pl->pl_grant_plan; limit = ldlm_pool_get_limit(pl); granted = atomic_read(&pl->pl_granted); @@ -186,12 +205,14 @@ static inline void ldlm_pool_recalc_slv(struct ldlm_pool *pl) if (grant_usage <= 0) grant_usage = 1; - /* Find out SLV change factor which is the ratio of grant usage + /* + * Find out SLV change factor which is the ratio of grant usage * from limit. SLV changes as fast as the ratio of grant plan * consumtion. The more locks from grant plan are not consumed * by clients in last interval (idle time), the faster grows * SLV. And the opposite, the more grant plan is over-consumed - * (load time) the faster drops SLV. */ + * (load time) the faster drops SLV. + */ slv_factor = (grant_usage * 100) / limit; if (2 * abs(granted - limit) > limit) { slv_factor *= slv_factor; @@ -206,13 +227,18 @@ static inline void ldlm_pool_recalc_slv(struct ldlm_pool *pl) slv = ldlm_pool_slv_min(limit); } - ldlm_pool_set_slv(pl, slv); + pl->pl_server_lock_volume = slv; } +/** + * Recalculates next stats on passed \a pl. + * + * \pre ->pl_lock is locked. + */ static inline void ldlm_pool_recalc_stats(struct ldlm_pool *pl) { - __u64 slv = ldlm_pool_get_slv(pl); int grant_plan = pl->pl_grant_plan; + __u64 slv = pl->pl_server_lock_volume; int granted = atomic_read(&pl->pl_granted); int grant_rate = atomic_read(&pl->pl_grant_rate); int cancel_rate = atomic_read(&pl->pl_cancel_rate); @@ -229,6 +255,32 @@ static inline void ldlm_pool_recalc_stats(struct ldlm_pool *pl) cancel_rate); } +/** + * Sets current SLV into obd accessible via ldlm_pl2ns(pl)->ns_obd. + */ +static void ldlm_srv_pool_push_slv(struct ldlm_pool *pl) +{ + struct obd_device *obd; + + /* + * Set new SLV in obd field for using it later without accessing the + * pool. This is required to avoid race between sending reply to client + * with new SLV and cleanup server stack in which we can't guarantee + * that namespace is still alive. We know only that obd is alive as + * long as valid export is alive. + */ + obd = ldlm_pl2ns(pl)->ns_obd; + LASSERT(obd != NULL); + write_lock(&obd->obd_pool_lock); + obd->obd_pool_slv = pl->pl_server_lock_volume; + write_unlock(&obd->obd_pool_lock); +} + +/** + * Recalculates all pool fields on passed \a pl. + * + * \pre ->pl_lock is not locked. + */ static int ldlm_srv_pool_recalc(struct ldlm_pool *pl) { time_t recalc_interval_sec; @@ -237,17 +289,30 @@ static int ldlm_srv_pool_recalc(struct ldlm_pool *pl) spin_lock(&pl->pl_lock); recalc_interval_sec = cfs_time_current_sec() - pl->pl_recalc_time; if (recalc_interval_sec > 0) { - /* Update statistics */ + /* + * Update statistics. + */ ldlm_pool_recalc_stats(pl); - /* Recalc SLV after last period. This should be done - * _before_ recalculating new grant plan. */ + /* + * Recalc SLV after last period. This should be done + * _before_ recalculating new grant plan. + */ ldlm_pool_recalc_slv(pl); - - /* Update grant_plan for new period. */ + + /* + * Make sure that pool informed obd of last SLV changes. + */ + ldlm_srv_pool_push_slv(pl); + + /* + * Update grant_plan for new period. + */ ldlm_pool_recalc_grant_plan(pl); - /* Zero out all rates and speed for the last period. */ + /* + * Zero out all rates and speed for the last period. 
+ */ atomic_set(&pl->pl_grant_rate, 0); atomic_set(&pl->pl_cancel_rate, 0); atomic_set(&pl->pl_grant_speed, 0); @@ -259,26 +324,37 @@ static int ldlm_srv_pool_recalc(struct ldlm_pool *pl) RETURN(0); } -/* Our goal here is to decrease SLV the way to make a client hold - * @nr locks smaller in next 10h. */ +/** + * This function is used on server side as main entry point for memory + * preasure handling. It decreases SLV on \a pl according to passed + * \a nr and \a gfp_mask. + * + * Our goal here is to decrease SLV such a way that clients hold \a nr + * locks smaller in next 10h. + */ static int ldlm_srv_pool_shrink(struct ldlm_pool *pl, int nr, unsigned int gfp_mask) { __u32 limit; ENTRY; - /* VM is asking how many entries may be potentially freed. */ + /* + * VM is asking how many entries may be potentially freed. + */ if (nr == 0) RETURN(atomic_read(&pl->pl_granted)); - /* Client already canceled locks but server is already in shrinker - * and can't cancel anything. Let's catch this race. */ + /* + * Client already canceled locks but server is already in shrinker + * and can't cancel anything. Let's catch this race. + */ if (atomic_read(&pl->pl_granted) == 0) RETURN(0); spin_lock(&pl->pl_lock); - /* We want shrinker to possibly cause cancelation of @nr locks from + /* + * We want shrinker to possibly cause cancelation of @nr locks from * clients or grant approximately @nr locks smaller next intervals. * * This is why we decresed SLV by @nr. This effect will only be as @@ -287,27 +363,69 @@ static int ldlm_srv_pool_shrink(struct ldlm_pool *pl, * interval pool will either increase SLV if locks load is not high * or will keep on same level or even decrease again, thus, shrinker * decreased SLV will affect next recalc intervals and this way will - * make locking load lower. */ - if (nr < ldlm_pool_get_slv(pl)) { - ldlm_pool_set_slv(pl, ldlm_pool_get_slv(pl) - nr); + * make locking load lower. + */ + if (nr < pl->pl_server_lock_volume) { + pl->pl_server_lock_volume = pl->pl_server_lock_volume - nr; } else { limit = ldlm_pool_get_limit(pl); - ldlm_pool_set_slv(pl, ldlm_pool_slv_min(limit)); + pl->pl_server_lock_volume = ldlm_pool_slv_min(limit); } + + /* + * Make sure that pool informed obd of last SLV changes. + */ + ldlm_srv_pool_push_slv(pl); spin_unlock(&pl->pl_lock); - /* We did not really free any memory here so far, it only will be - * freed later may be, so that we return 0 to not confuse VM. */ + /* + * We did not really free any memory here so far, it only will be + * freed later may be, so that we return 0 to not confuse VM. + */ RETURN(0); } +/** + * Setup server side pool \a pl with passed \a limit. + */ static int ldlm_srv_pool_setup(struct ldlm_pool *pl, int limit) { + struct obd_device *obd; ENTRY; + + obd = ldlm_pl2ns(pl)->ns_obd; + LASSERT(obd != NULL && obd != LP_POISON); + LASSERT(obd->obd_type != LP_POISON); + write_lock(&obd->obd_pool_lock); + obd->obd_pool_limit = limit; + write_unlock(&obd->obd_pool_lock); + ldlm_pool_set_limit(pl, limit); RETURN(0); } +/** + * Sets SLV and Limit from ldlm_pl2ns(pl)->ns_obd tp passed \a pl. + */ +static void ldlm_cli_pool_pop_slv(struct ldlm_pool *pl) +{ + struct obd_device *obd; + + /* + * Get new SLV and Limit from obd which is updated with comming + * RPCs. 
+ */ + obd = ldlm_pl2ns(pl)->ns_obd; + LASSERT(obd != NULL); + read_lock(&obd->obd_pool_lock); + pl->pl_server_lock_volume = obd->obd_pool_slv; + ldlm_pool_set_limit(pl, obd->obd_pool_limit); + read_unlock(&obd->obd_pool_lock); +} + +/** + * Recalculates client sise pool \a pl according to current SLV and Limit. + */ static int ldlm_cli_pool_recalc(struct ldlm_pool *pl) { time_t recalc_interval_sec; @@ -315,12 +433,21 @@ static int ldlm_cli_pool_recalc(struct ldlm_pool *pl) spin_lock(&pl->pl_lock); + /* + * Make sure that pool knows last SLV and Limit from obd. + */ + ldlm_cli_pool_pop_slv(pl); + recalc_interval_sec = cfs_time_current_sec() - pl->pl_recalc_time; if (recalc_interval_sec > 0) { - /* Update statistics only every T */ + /* + * Update statistics only every T. + */ ldlm_pool_recalc_stats(pl); - /* Zero out grant/cancel rates and speed for last period. */ + /* + * Zero out grant/cancel rates and speed for last period. + */ atomic_set(&pl->pl_grant_rate, 0); atomic_set(&pl->pl_cancel_rate, 0); atomic_set(&pl->pl_grant_speed, 0); @@ -330,34 +457,54 @@ static int ldlm_cli_pool_recalc(struct ldlm_pool *pl) } spin_unlock(&pl->pl_lock); - /* Do not cancel locks in case lru resize is disabled for this ns */ + /* + * Do not cancel locks in case lru resize is disabled for this ns. + */ if (!ns_connect_lru_resize(ldlm_pl2ns(pl))) RETURN(0); - /* In the time of canceling locks on client we do not need to maintain + /* + * In the time of canceling locks on client we do not need to maintain * sharp timing, we only want to cancel locks asap according to new SLV. - * This may be called when SLV has changed much, this is why we do not - * take into account pl->pl_recalc_time here. */ + * It may be called when SLV has changed much, this is why we do not + * take into account pl->pl_recalc_time here. + */ RETURN(ldlm_cancel_lru(ldlm_pl2ns(pl), 0, LDLM_ASYNC, LDLM_CANCEL_LRUR)); } +/** + * This function is main entry point for memory preasure handling on client side. + * Main goal of this function is to cancel some number of locks on passed \a pl + * according to \a nr and \a gfp_mask. + */ static int ldlm_cli_pool_shrink(struct ldlm_pool *pl, int nr, unsigned int gfp_mask) { ENTRY; - /* Do not cancel locks in case lru resize is disabled for this ns */ + /* + * Do not cancel locks in case lru resize is disabled for this ns. + */ if (!ns_connect_lru_resize(ldlm_pl2ns(pl))) RETURN(0); - /* Find out how many locks may be released according to shrink - * policy. */ + /* + * Make sure that pool knows last SLV and Limit from obd. + */ + ldlm_cli_pool_pop_slv(pl); + + /* + * Find out how many locks may be released according to shrink + * policy. + */ if (nr == 0) RETURN(ldlm_cancel_lru_estimate(ldlm_pl2ns(pl), 0, 0, LDLM_CANCEL_SHRINK)); - /* Cancel @nr locks accoding to shrink policy */ + /* + * Cancel @nr locks accoding to shrink policy. + */ RETURN(ldlm_cancel_lru(ldlm_pl2ns(pl), nr, LDLM_SYNC, LDLM_CANCEL_SHRINK)); } @@ -373,6 +520,10 @@ struct ldlm_pool_ops ldlm_cli_pool_ops = { .po_shrink = ldlm_cli_pool_shrink }; +/** + * Pool recalc wrapper. Will call either client or server pool recalc callback + * depending what pool \a pl is used. + */ int ldlm_pool_recalc(struct ldlm_pool *pl) { int count; @@ -387,6 +538,10 @@ int ldlm_pool_recalc(struct ldlm_pool *pl) } EXPORT_SYMBOL(ldlm_pool_recalc); +/** + * Pool shrink wrapper. Will call either client or server pool recalc callback + * depending what pool \a pl is used. 
+ */ int ldlm_pool_shrink(struct ldlm_pool *pl, int nr, unsigned int gfp_mask) { @@ -409,8 +564,12 @@ int ldlm_pool_shrink(struct ldlm_pool *pl, int nr, } EXPORT_SYMBOL(ldlm_pool_shrink); -/* The purpose of this function is to re-setup limit and maximal allowed - * slv according to the passed limit. */ +/** + * Pool setup wrapper. Will call either client or server pool recalc callback + * depending what pool \a pl is used. + * + * Sets passed \a limit into pool \a pl. + */ int ldlm_pool_setup(struct ldlm_pool *pl, int limit) { ENTRY; @@ -427,11 +586,12 @@ static int lprocfs_rd_pool_state(char *page, char **start, off_t off, int granted, grant_rate, cancel_rate, grant_step; int nr = 0, grant_speed, grant_plan; struct ldlm_pool *pl = data; + __u64 slv, clv; __u32 limit; - __u64 slv; spin_lock(&pl->pl_lock); - slv = ldlm_pool_get_slv(pl); + slv = pl->pl_server_lock_volume; + clv = pl->pl_client_lock_volume; limit = ldlm_pool_get_limit(pl); grant_plan = pl->pl_grant_plan; grant_step = pl->pl_grant_step; @@ -444,6 +604,7 @@ static int lprocfs_rd_pool_state(char *page, char **start, off_t off, nr += snprintf(page + nr, count - nr, "LDLM pool state (%s):\n", pl->pl_name); nr += snprintf(page + nr, count - nr, " SLV: "LPU64"\n", slv); + nr += snprintf(page + nr, count - nr, " CLV: "LPU64"\n", clv); nr += snprintf(page + nr, count - nr, " LVF: %d\n", atomic_read(&pl->pl_lock_volume_factor)); @@ -639,13 +800,13 @@ int ldlm_pool_init(struct ldlm_pool *pl, struct ldlm_namespace *ns, if (client == LDLM_NAMESPACE_SERVER) { pl->pl_ops = &ldlm_srv_pool_ops; ldlm_pool_set_limit(pl, LDLM_POOL_HOST_L); - ldlm_pool_set_slv(pl, ldlm_pool_slv_max(LDLM_POOL_HOST_L)); + pl->pl_server_lock_volume = ldlm_pool_slv_max(LDLM_POOL_HOST_L); } else { - ldlm_pool_set_slv(pl, 1); + pl->pl_server_lock_volume = 1; ldlm_pool_set_limit(pl, 1); pl->pl_ops = &ldlm_cli_pool_ops; } - + pl->pl_client_lock_volume = 0; rc = ldlm_pool_proc_init(pl); if (rc) RETURN(rc); @@ -660,17 +821,28 @@ void ldlm_pool_fini(struct ldlm_pool *pl) { ENTRY; ldlm_pool_proc_fini(pl); - pl->pl_ops = NULL; + + /* + * Pool should not be used after this point. We can't free it here as + * it lives in struct ldlm_namespace, but still interested in catching + * any abnormal using cases. + */ + POISON(pl, 0x5a, sizeof(*pl)); EXIT; } EXPORT_SYMBOL(ldlm_pool_fini); +/** + * Add new taken ldlm lock \a lock into pool \a pl accounting. + */ void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock) { - /* FLOCK locks are special in a sense that they are almost never + /* + * FLOCK locks are special in a sense that they are almost never * cancelled, instead special kind of lock is used to drop them. * also there is no LRU for flock locks, so no point in tracking - * them anyway */ + * them anyway. + */ if (lock->l_resource->lr_type == LDLM_FLOCK) return; @@ -682,18 +854,26 @@ void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock) lprocfs_counter_incr(pl->pl_stats, LDLM_POOL_GRANT_STAT); - /* Do not do pool recalc for client side as all locks which + /* + * Do not do pool recalc for client side as all locks which * potentially may be canceled has already been packed into * enqueue/cancel rpc. Also we do not want to run out of stack - * with too long call paths. */ + * with too long call paths. + */ if (ns_is_server(ldlm_pl2ns(pl))) ldlm_pool_recalc(pl); EXIT; } EXPORT_SYMBOL(ldlm_pool_add); +/** + * Remove ldlm lock \a lock from pool \a pl accounting. 
+ */ void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock) { + /* + * Filter out FLOCK locks. Read above comment in ldlm_pool_add(). + */ if (lock->l_resource->lr_type == LDLM_FLOCK) return; ENTRY; @@ -710,33 +890,89 @@ void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock) } EXPORT_SYMBOL(ldlm_pool_del); -/* ->pl_lock should be taken. */ +/** + * Returns current \a pl SLV. + * + * \pre ->pl_lock is not locked. + */ __u64 ldlm_pool_get_slv(struct ldlm_pool *pl) { - return pl->pl_server_lock_volume; + __u64 slv; + spin_lock(&pl->pl_lock); + slv = pl->pl_server_lock_volume; + spin_unlock(&pl->pl_lock); + return slv; } EXPORT_SYMBOL(ldlm_pool_get_slv); -/* ->pl_lock should be taken. */ +/** + * Sets passed \a slv to \a pl. + * + * \pre ->pl_lock is not locked. + */ void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv) { + spin_lock(&pl->pl_lock); pl->pl_server_lock_volume = slv; + spin_unlock(&pl->pl_lock); } EXPORT_SYMBOL(ldlm_pool_set_slv); +/** + * Returns current \a pl CLV. + * + * \pre ->pl_lock is not locked. + */ +__u64 ldlm_pool_get_clv(struct ldlm_pool *pl) +{ + __u64 slv; + spin_lock(&pl->pl_lock); + slv = pl->pl_client_lock_volume; + spin_unlock(&pl->pl_lock); + return slv; +} +EXPORT_SYMBOL(ldlm_pool_get_clv); + +/** + * Sets passed \a clv to \a pl. + * + * \pre ->pl_lock is not locked. + */ +void ldlm_pool_set_clv(struct ldlm_pool *pl, __u64 clv) +{ + spin_lock(&pl->pl_lock); + pl->pl_client_lock_volume = clv; + spin_unlock(&pl->pl_lock); +} +EXPORT_SYMBOL(ldlm_pool_set_clv); + +/** + * Returns current \a pl limit. + */ __u32 ldlm_pool_get_limit(struct ldlm_pool *pl) { return atomic_read(&pl->pl_limit); } EXPORT_SYMBOL(ldlm_pool_get_limit); +/** + * Sets passed \a limit to \a pl. + */ void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit) { atomic_set(&pl->pl_limit, limit); } EXPORT_SYMBOL(ldlm_pool_set_limit); -/* Server side is only enabled for kernel space for now. */ +/** + * Returns current LVF from \a pl. + */ +__u32 ldlm_pool_get_lvf(struct ldlm_pool *pl) +{ + return atomic_read(&pl->pl_lock_volume_factor); +} +EXPORT_SYMBOL(ldlm_pool_get_lvf); + #ifdef __KERNEL__ static int ldlm_pool_granted(struct ldlm_pool *pl) { @@ -759,9 +995,11 @@ void ldlm_pools_wakeup(void) } EXPORT_SYMBOL(ldlm_pools_wakeup); -/* Cancel @nr locks from all namespaces (if possible). Returns number of +/* + * Cancel \a nr locks from all namespaces (if possible). Returns number of * cached locks after shrink is finished. All namespaces are asked to - * cancel approximately equal amount of locks. */ + * cancel approximately equal amount of locks to keep balancing. + */ static int ldlm_pools_shrink(ldlm_side_t client, int nr, unsigned int gfp_mask) { @@ -771,10 +1009,12 @@ static int ldlm_pools_shrink(ldlm_side_t client, int nr, if (nr != 0 && !(gfp_mask & __GFP_FS)) return -1; - CDEBUG(D_DLMTRACE, "request to shrink %d %s locks from all pools\n", + CDEBUG(D_DLMTRACE, "Request to shrink %d %s locks from all pools\n", nr, client == LDLM_NAMESPACE_CLIENT ? "client" : "server"); - /* Find out how many resources we may release. */ + /* + * Find out how many resources we may release. + */ for (nr_ns = atomic_read(ldlm_namespace_nr(client)); nr_ns > 0; nr_ns--) { @@ -794,19 +1034,25 @@ static int ldlm_pools_shrink(ldlm_side_t client, int nr, if (nr == 0 || total == 0) return total; - /* Shrink at least ldlm_namespace_nr(client) namespaces. */ + /* + * Shrink at least ldlm_namespace_nr(client) namespaces. 
+ */ for (nr_ns = atomic_read(ldlm_namespace_nr(client)); nr_ns > 0; nr_ns--) { int cancel, nr_locks; - /* Do not call shrink under ldlm_namespace_lock(client) */ + /* + * Do not call shrink under ldlm_namespace_lock(client) + */ mutex_down(ldlm_namespace_lock(client)); if (list_empty(ldlm_namespace_list(client))) { mutex_up(ldlm_namespace_lock(client)); - /* If list is empty, we can't return any @cached > 0, + /* + * If list is empty, we can't return any @cached > 0, * that probably would cause needless shrinker - * call. */ + * call. + */ cached = 0; break; } @@ -840,9 +1086,13 @@ void ldlm_pools_recalc(ldlm_side_t client) struct ldlm_namespace *ns; int nr, equal = 0; - /* No need to setup pool limit for client pools. */ + /* + * No need to setup pool limit for client pools. + */ if (client == LDLM_NAMESPACE_SERVER) { - /* Check all modest namespaces first. */ + /* + * Check all modest namespaces first. + */ mutex_down(ldlm_namespace_lock(client)); list_for_each_entry(ns, ldlm_namespace_list(client), ns_list_chain) @@ -854,16 +1104,20 @@ void ldlm_pools_recalc(ldlm_side_t client) if (l == 0) l = 1; - /* Set the modest pools limit equal to their avg granted - * locks + 5%. */ + /* + * Set the modest pools limit equal to their avg granted + * locks + 5%. + */ l += dru(l * LDLM_POOLS_MODEST_MARGIN, 100); ldlm_pool_setup(&ns->ns_pool, l); nr_l += l; nr_p++; } - /* Make sure that modest namespaces did not eat more that 2/3 - * of limit */ + /* + * Make sure that modest namespaces did not eat more that 2/3 + * of limit. + */ if (nr_l >= 2 * (LDLM_POOL_HOST_L / 3)) { CWARN("\"Modest\" pools eat out 2/3 of server locks " "limit (%d of %lu). This means that you have too " @@ -872,7 +1126,9 @@ void ldlm_pools_recalc(ldlm_side_t client) equal = 1; } - /* The rest is given to greedy namespaces. */ + /* + * The rest is given to greedy namespaces. + */ list_for_each_entry(ns, ldlm_namespace_list(client), ns_list_chain) { @@ -880,14 +1136,18 @@ void ldlm_pools_recalc(ldlm_side_t client) continue; if (equal) { - /* In the case 2/3 locks are eaten out by + /* + * In the case 2/3 locks are eaten out by * modest pools, we re-setup equal limit - * for _all_ pools. */ + * for _all_ pools. + */ l = LDLM_POOL_HOST_L / atomic_read(ldlm_namespace_nr(client)); } else { - /* All the rest of greedy pools will have - * all locks in equal parts.*/ + /* + * All the rest of greedy pools will have + * all locks in equal parts. + */ l = (LDLM_POOL_HOST_L - nr_l) / (atomic_read(ldlm_namespace_nr(client)) - nr_p); @@ -897,13 +1157,17 @@ void ldlm_pools_recalc(ldlm_side_t client) mutex_up(ldlm_namespace_lock(client)); } - /* Recalc at least ldlm_namespace_nr(client) namespaces. */ + /* + * Recalc at least ldlm_namespace_nr(client) namespaces. + */ for (nr = atomic_read(ldlm_namespace_nr(client)); nr > 0; nr--) { - /* Lock the list, get first @ns in the list, getref, move it + /* + * Lock the list, get first @ns in the list, getref, move it * to the tail, unlock and call pool recalc. This way we avoid * calling recalc under @ns lock what is really good as we get * rid of potential deadlock on client nodes when canceling - * locks synchronously. */ + * locks synchronously. + */ mutex_down(ldlm_namespace_lock(client)); if (list_empty(ldlm_namespace_list(client))) { mutex_up(ldlm_namespace_lock(client)); @@ -914,7 +1178,9 @@ void ldlm_pools_recalc(ldlm_side_t client) ldlm_namespace_move_locked(ns, client); mutex_up(ldlm_namespace_lock(client)); - /* After setup is done - recalc the pool. 
*/ + /* + * After setup is done - recalc the pool. + */ ldlm_pool_recalc(&ns->ns_pool); ldlm_namespace_put(ns, 1); } @@ -937,12 +1203,16 @@ static int ldlm_pools_thread_main(void *arg) while (1) { struct l_wait_info lwi; - /* Recal all pools on this tick. */ + /* + * Recal all pools on this tick. + */ ldlm_pools_recalc(LDLM_NAMESPACE_SERVER); ldlm_pools_recalc(LDLM_NAMESPACE_CLIENT); - /* Wait until the next check time, or until we're - * stopped. */ + /* + * Wait until the next check time, or until we're + * stopped. + */ lwi = LWI_TIMEOUT(cfs_time_seconds(LDLM_POOLS_THREAD_PERIOD), NULL, NULL); l_wait_event(thread->t_ctl_waitq, (thread->t_flags & @@ -982,8 +1252,10 @@ static int ldlm_pools_thread_start(void) init_completion(&ldlm_pools_comp); cfs_waitq_init(&ldlm_pools_thread->t_ctl_waitq); - /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we - * just drop the VM and FILES in ptlrpc_daemonize() right away. */ + /* + * CLONE_VM and CLONE_FILES just avoid a needless copy, because we + * just drop the VM and FILES in ptlrpc_daemonize() right away. + */ rc = cfs_kernel_thread(ldlm_pools_thread_main, ldlm_pools_thread, CLONE_VM | CLONE_FILES); if (rc < 0) { @@ -1010,9 +1282,11 @@ static void ldlm_pools_thread_stop(void) ldlm_pools_thread->t_flags = SVC_STOPPING; cfs_waitq_signal(&ldlm_pools_thread->t_ctl_waitq); - /* Make sure that pools thread is finished before freeing @thread. + /* + * Make sure that pools thread is finished before freeing @thread. * This fixes possible race and oops due to accessing freed memory - * in pools thread. */ + * in pools thread. + */ wait_for_completion(&ldlm_pools_comp); OBD_FREE_PTR(ldlm_pools_thread); ldlm_pools_thread = NULL; @@ -1107,6 +1381,18 @@ void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv) } EXPORT_SYMBOL(ldlm_pool_set_slv); +__u64 ldlm_pool_get_clv(struct ldlm_pool *pl) +{ + return 1; +} +EXPORT_SYMBOL(ldlm_pool_get_clv); + +void ldlm_pool_set_clv(struct ldlm_pool *pl, __u64 clv) +{ + return; +} +EXPORT_SYMBOL(ldlm_pool_set_clv); + __u32 ldlm_pool_get_limit(struct ldlm_pool *pl) { return 0; @@ -1119,6 +1405,12 @@ void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit) } EXPORT_SYMBOL(ldlm_pool_set_limit); +__u32 ldlm_pool_get_lvf(struct ldlm_pool *pl) +{ + return 0; +} +EXPORT_SYMBOL(ldlm_pool_get_lvf); + int ldlm_pools_init(void) { return 0; diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 3a02565..45799a0 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -1000,24 +1000,35 @@ static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp) return &imp->imp_obd->obd_namespace->ns_pool; } +/** + * Update client's obd pool related fields with new SLV and Limit from \a req. + */ int ldlm_cli_update_pool(struct ptlrpc_request *req) { + struct obd_device *obd; __u64 old_slv, new_slv; - struct ldlm_pool *pl; __u32 new_limit; ENTRY; - if (!imp_connect_lru_resize(req->rq_import)) + if (unlikely(!req->rq_import || !req->rq_import->imp_obd || + !imp_connect_lru_resize(req->rq_import))) + { + /* + * Do nothing for corner cases. + */ RETURN(0); + } - /* In some cases RPC may contain slv and limit zeroed out. This is + /* + * In some cases RPC may contain slv and limit zeroed out. This is * the case when server does not support lru resize feature. This is * also possible in some recovery cases when server side reqs have no * ref to obd export and thus access to server side namespace is no - * possible. */ + * possible. 
+ */ if (lustre_msg_get_slv(req->rq_repmsg) == 0 || lustre_msg_get_limit(req->rq_repmsg) == 0) { - DEBUG_REQ(D_HA, req, "zero SLV or Limit found " + DEBUG_REQ(D_HA, req, "Zero SLV or Limit found " "(SLV: "LPU64", Limit: %u)", lustre_msg_get_slv(req->rq_repmsg), lustre_msg_get_limit(req->rq_repmsg)); @@ -1026,30 +1037,41 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req) new_limit = lustre_msg_get_limit(req->rq_repmsg); new_slv = lustre_msg_get_slv(req->rq_repmsg); - pl = ldlm_imp2pl(req->rq_import); - - spin_lock(&pl->pl_lock); - old_slv = ldlm_pool_get_slv(pl); - ldlm_pool_set_slv(pl, new_slv); - ldlm_pool_set_limit(pl, new_limit); - - /* Check if we need to wakeup pools thread for fast SLV change. + obd = req->rq_import->imp_obd; + + /* + * Set new SLV and Limit to obd fields to make accessible for pool + * thread. We do not access obd_namespace and pool directly here + * as there is no reliable way to make sure that they are still + * alive in cleanup time. Evil races are possible which may cause + * oops in that time. + */ + write_lock(&obd->obd_pool_lock); + old_slv = obd->obd_pool_slv; + obd->obd_pool_slv = new_slv; + obd->obd_pool_limit = new_limit; + write_unlock(&obd->obd_pool_lock); + + /* + * Check if we need to wakeup pools thread for fast SLV change. * This is only done when threads period is noticably long like - * 10s or more. */ + * 10s or more. + */ #if defined(__KERNEL__) && (LDLM_POOLS_THREAD_PERIOD >= 10) - { + if (old_slv > 0) { __u64 fast_change = old_slv * LDLM_POOLS_FAST_SLV_CHANGE; do_div(fast_change, 100); - /* Wake up pools thread only if SLV has changed more than + /* + * Wake up pools thread only if SLV has changed more than * 50% since last update. In this case we want to react asap. * Otherwise it is no sense to wake up pools as they are - * re-calculated every LDLM_POOLS_THREAD_PERIOD anyways. */ + * re-calculated every LDLM_POOLS_THREAD_PERIOD anyways. + */ if (old_slv > new_slv && old_slv - new_slv > fast_change) ldlm_pools_wakeup(); } #endif - spin_unlock(&pl->pl_lock); RETURN(0); } EXPORT_SYMBOL(ldlm_cli_update_pool); @@ -1205,17 +1227,17 @@ static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns, if (count && added >= count) return LDLM_POLICY_KEEP_LOCK; - spin_lock(&pl->pl_lock); slv = ldlm_pool_get_slv(pl); - lvf = atomic_read(&pl->pl_lock_volume_factor); - spin_unlock(&pl->pl_lock); - + lvf = ldlm_pool_get_lvf(pl); la = cfs_duration_sec(cfs_time_sub(cur, lock->l_last_used)); /* Stop when slv is not yet come from server or * lv is smaller than it is. */ lv = lvf * la * unused; + + /* Inform pool about current CLV to see it via proc. */ + ldlm_pool_set_clv(pl, lv); return (slv == 1 || lv < slv) ? 
LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK; } diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 4974f21..b943494 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -295,8 +295,8 @@ void ldlm_proc_namespace(struct ldlm_namespace *ns) #define ldlm_proc_namespace(ns) do {} while (0) #endif /* LPROCFS */ -struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client, - ldlm_appetite_t apt) +struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obd, char *name, + ldlm_side_t client, ldlm_appetite_t apt) { struct ldlm_namespace *ns = NULL; struct list_head *bucket; @@ -319,6 +319,10 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client, ns->ns_shrink_thumb = LDLM_LOCK_SHRINK_THUMB; ns->ns_appetite = apt; + + LASSERT(obd != NULL); + ns->ns_obd = obd; + namelen = strlen(name); OBD_ALLOC(ns->ns_name, namelen + 1); if (!ns->ns_name) @@ -542,6 +546,15 @@ force_wait: RETURN(ELDLM_OK); } +/** + * Performs various cleanups for passed \a ns to make it drop refc and be ready + * for freeing. Waits for refc == 0. + * + * The following is done: + * (0) Unregister \a ns from its list to make inaccessible for potential users + * like pools thread and others; + * (1) Clear all locks in \a ns. + */ void ldlm_namespace_free_prior(struct ldlm_namespace *ns, struct obd_import *imp, int force) @@ -553,10 +566,14 @@ void ldlm_namespace_free_prior(struct ldlm_namespace *ns, return; } - /* Remove @ns from list. */ + /* + * Make sure that nobody can find this ns in its list. + */ ldlm_namespace_unregister(ns, ns->ns_client); - /* Can fail with -EINTR when force == 0 in which case try harder */ + /* + * Can fail with -EINTR when force == 0 in which case try harder. + */ rc = __ldlm_namespace_free(ns, force); if (rc != ELDLM_OK) { if (imp) { @@ -564,14 +581,21 @@ void ldlm_namespace_free_prior(struct ldlm_namespace *ns, ptlrpc_invalidate_import(imp); } - /* With all requests dropped and the import inactive - * we are gaurenteed all reference will be dropped. */ + /* + * With all requests dropped and the import inactive + * we are gaurenteed all reference will be dropped. + */ rc = __ldlm_namespace_free(ns, 1); LASSERT(rc == 0); } EXIT; } +/** + * Performs freeing memory structures related to \a ns. This is only done when + * ldlm_namespce_free_prior() successfully removed all resources referencing + * \a ns and its refc == 0. + */ void ldlm_namespace_free_post(struct ldlm_namespace *ns) { ENTRY; @@ -586,6 +610,7 @@ void ldlm_namespace_free_post(struct ldlm_namespace *ns) * it after @dir may cause oops. */ ldlm_pool_fini(&ns->ns_pool); + #ifdef LPROCFS { struct proc_dir_entry *dir; @@ -601,9 +626,10 @@ void ldlm_namespace_free_post(struct ldlm_namespace *ns) OBD_VFREE(ns->ns_hash, sizeof(*ns->ns_hash) * RES_HASH_SIZE); OBD_FREE(ns->ns_name, strlen(ns->ns_name) + 1); - /* - * @ns should be not on list in this time, otherwise this will cause - * issues realted to using freed @ns in pools thread. + + /* + * Namespace \a ns should be not on list in this time, otherwise this + * will cause issues realted to using freed \a ns in pools thread. 
*/ LASSERT(list_empty(&ns->ns_list_chain)); OBD_FREE_PTR(ns); diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 2e35988..618638b 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -3968,7 +3968,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name, LUSTRE_MDT_NAME"-%p", m); - m->mdt_namespace = ldlm_namespace_new(info->mti_u.ns_name, + m->mdt_namespace = ldlm_namespace_new(obd, info->mti_u.ns_name, LDLM_NAMESPACE_SERVER, LDLM_NAMESPACE_GREEDY); if (m->mdt_namespace == NULL) diff --git a/lustre/mgs/mgs_handler.c b/lustre/mgs/mgs_handler.c index 4781703..af67705 100644 --- a/lustre/mgs/mgs_handler.c +++ b/lustre/mgs/mgs_handler.c @@ -180,7 +180,7 @@ static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg) GOTO(err_put, rc = PTR_ERR(obd->obd_fsops)); /* namespace for mgs llog */ - obd->obd_namespace = ldlm_namespace_new("MGS", LDLM_NAMESPACE_SERVER, + obd->obd_namespace = ldlm_namespace_new(obd ,"MGS", LDLM_NAMESPACE_SERVER, LDLM_NAMESPACE_MODEST); if (obd->obd_namespace == NULL) GOTO(err_ops, rc = -ENOMEM); @@ -276,16 +276,9 @@ static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) RETURN(rc); } -static int mgs_ldlm_nsfree(void *data) -{ - struct ldlm_namespace *ns = (struct ldlm_namespace *)data; - ENTRY; - - ptlrpc_daemonize("ll_mgs_nsfree"); - ldlm_namespace_free(ns, NULL, 1 /* obd_force should always be on */); - RETURN(0); -} - +/** + * Performs cleanup procedures for passed \a obd given it is mgs obd. + */ static int mgs_cleanup(struct obd_device *obd) { struct mgs_obd *mgs = &obd->u.mgs; @@ -305,12 +298,8 @@ static int mgs_cleanup(struct obd_device *obd) server_put_mount(obd->obd_name, mgs->mgs_vfsmnt); mgs->mgs_sb = NULL; - /* Free the namespace in it's own thread, so that if the - ldlm_cancel_handler put the last mgs obd ref, we won't - deadlock here. */ - cfs_kernel_thread(mgs_ldlm_nsfree, obd->obd_namespace, - CLONE_VM | CLONE_FILES); - + ldlm_namespace_free(obd->obd_namespace, NULL, 1); + obd->obd_namespace = NULL; fsfilt_put_ops(obd->obd_fsops); diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 93a11de..e906d83 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -72,6 +72,11 @@ static void obd_device_free(struct obd_device *obd) LASSERT(obd != NULL); LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC); + if (obd->obd_namespace != NULL) { + CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n", + obd, obd->obd_namespace, obd->obd_force); + LBUG(); + } OBD_SLAB_FREE_PTR(obd, obd_device_cachep); } EXPORT_SYMBOL(obd_device_free); diff --git a/lustre/obdclass/obd_config.c b/lustre/obdclass/obd_config.c index 7cbba44..0dd3ab6 100644 --- a/lustre/obdclass/obd_config.c +++ b/lustre/obdclass/obd_config.c @@ -120,8 +120,9 @@ EXPORT_SYMBOL(class_parse_nid); /********************** class fns **********************/ -/* Create a new device and set the type, name and uuid. If - * successful, the new device can be accessed by either name or uuid. +/** + * Create a new device and set the type, name and uuid. If successful, the new + * device can be accessed by either name or uuid. 
*/ int class_attach(struct lustre_cfg *lcfg) { @@ -168,6 +169,10 @@ int class_attach(struct lustre_cfg *lcfg) LASSERTF(strncmp(obd->obd_name, name, strlen(name)) == 0, "%p obd_name %s != %s\n", obd, obd->obd_name, name); + rwlock_init(&obd->obd_pool_lock); + obd->obd_pool_limit = 0; + obd->obd_pool_slv = 0; + CFS_INIT_LIST_HEAD(&obd->obd_exports); CFS_INIT_LIST_HEAD(&obd->obd_exports_timed); CFS_INIT_LIST_HEAD(&obd->obd_nid_stats); diff --git a/lustre/obdecho/echo.c b/lustre/obdecho/echo.c index 197d3cd..76288a1 100644 --- a/lustre/obdecho/echo.c +++ b/lustre/obdecho/echo.c @@ -466,7 +466,7 @@ static int echo_setup(struct obd_device *obd, struct lustre_cfg *lcfg) spin_lock_init(&obd->u.echo.eo_lock); obd->u.echo.eo_lastino = ECHO_INIT_OBJID; - obd->obd_namespace = ldlm_namespace_new("echo-tgt", + obd->obd_namespace = ldlm_namespace_new(obd, "echo-tgt", LDLM_NAMESPACE_SERVER, LDLM_NAMESPACE_MODEST); if (obd->obd_namespace == NULL) { @@ -512,6 +512,7 @@ static int echo_cleanup(struct obd_device *obd) cfs_schedule_timeout (CFS_TASK_UNINT, cfs_time_seconds(1)); ldlm_namespace_free(obd->obd_namespace, NULL, obd->obd_force); + obd->obd_namespace = NULL; leaked = atomic_read(&obd->u.echo.eo_prep); if (leaked != 0) diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 3b3d1fc..87b6614 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -2008,7 +2008,7 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, GOTO(err_ops, rc = -ENOMEM); sprintf(ns_name, "filter-%s", obd->obd_uuid.uuid); - obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER, + obd->obd_namespace = ldlm_namespace_new(obd, ns_name, LDLM_NAMESPACE_SERVER, LDLM_NAMESPACE_GREEDY); if (obd->obd_namespace == NULL) GOTO(err_post, rc = -ENOMEM); @@ -2403,6 +2403,7 @@ static int filter_cleanup(struct obd_device *obd) target_cleanup_recovery(obd); ldlm_namespace_free(obd->obd_namespace, NULL, obd->obd_force); + obd->obd_namespace = NULL; sptlrpc_rule_set_free(&filter->fo_sptlrpc_rset); diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index b536445..dac8412 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -755,6 +755,9 @@ static int ptlrpc_check_status(struct ptlrpc_request *req) RETURN(err); } +/** + * Callback function called when client receives RPC reply for \a req. + */ static int after_reply(struct ptlrpc_request *req) { struct obd_import *imp = req->rq_import; @@ -768,10 +771,14 @@ static int after_reply(struct ptlrpc_request *req) LASSERT(obd); LASSERT(req->rq_nob_received <= req->rq_repbuf_len); - /* NB Until this point, the whole of the incoming message, - * including buflens, status etc is in the sender's byte order. */ + /* + * NB Until this point, the whole of the incoming message, + * including buflens, status etc is in the sender's byte order. + */ - /* Clear reply swab mask; this is a new reply in sender's byte order */ + /* + * Clear reply swab mask; this is a new reply in sender's byte order. + */ req->rq_rep_swab_mask = 0; rc = sptlrpc_cli_unwrap_reply(req); @@ -780,7 +787,9 @@ static int after_reply(struct ptlrpc_request *req) RETURN(rc); } - /* security layer unwrap might ask resend this request */ + /* + * Security layer unwrap might ask resend this request. 
+ */ if (req->rq_resend) RETURN(0); @@ -813,9 +822,11 @@ static int after_reply(struct ptlrpc_request *req) imp->imp_connect_error = rc; if (rc) { - /* Either we've been evicted, or the server has failed for + /* + * Either we've been evicted, or the server has failed for * some reason. Try to reconnect, and if that fails, punt to - * the upcall. */ + * the upcall. + */ if (ll_rpc_recoverable_error(rc)) { if (req->rq_send_state != LUSTRE_IMP_FULL || imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) { @@ -825,24 +836,25 @@ static int after_reply(struct ptlrpc_request *req) RETURN(rc); } } else { - /* Let's look if server send slv. Do it only for RPC with - * rc == 0. */ - if (imp->imp_obd->obd_namespace) { - /* Disconnect rpc is sent when namespace is already - * destroyed. Let's check this and will not try update - * pool. */ - ldlm_cli_update_pool(req); - } + /* + * Let's look if server sent slv. Do it only for RPC with + * rc == 0. + */ + ldlm_cli_update_pool(req); } - /* Store transno in reqmsg for replay. */ + /* + * Store transno in reqmsg for replay. + */ req->rq_transno = lustre_msg_get_transno(req->rq_repmsg); lustre_msg_set_transno(req->rq_reqmsg, req->rq_transno); if (req->rq_import->imp_replayable) { spin_lock(&imp->imp_lock); - /* no point in adding already-committed requests to the replay - * list, we will just remove them immediately. b=9829 */ + /* + * No point in adding already-committed requests to the replay + * list, we will just remove them immediately. b=9829 + */ if (req->rq_transno != 0 && (req->rq_transno > lustre_msg_get_last_committed(req->rq_repmsg) || @@ -854,7 +866,9 @@ static int after_reply(struct ptlrpc_request *req) spin_lock(&imp->imp_lock); } - /* Replay-enabled imports return commit-status information. */ + /* + * Replay-enabled imports return commit-status information. + */ if (lustre_msg_get_last_committed(req->rq_repmsg)) { imp->imp_peer_committed_transno = lustre_msg_get_last_committed(req->rq_repmsg); -- 1.8.3.1
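
Reviewer note: to make the locking pattern at the heart of this patch easier to follow, below is a minimal, self-contained user-space sketch of the SLV/Limit hand-off it introduces. This is not Lustre code; the names (obd_device_sketch, pool_push_slv, pack_pool_reply, client_update_pool) and the pthread rwlock are illustrative stand-ins for obd_pool_lock and the kernel primitives used in the patch. The point it demonstrates is the fix for b=15226: RPC reply handling never dereferences the namespace or pool, only the obd, whose SLV/Limit mirror is kept current by the pool under a reader/writer lock.

    /*
     * Sketch (assumption-labelled, not the actual Lustre implementation):
     * the pool mirrors its last computed SLV and limit into the obd under
     * a rwlock, so a late RPC arriving while the namespace is being torn
     * down only touches the still-alive obd.
     */
    #include <pthread.h>
    #include <stdint.h>

    struct obd_device_sketch {
            pthread_rwlock_t obd_pool_lock;   /* protects the two fields below */
            uint64_t         obd_pool_slv;    /* last SLV computed by the pool */
            int              obd_pool_limit;  /* last lock limit set for the pool */
    };

    struct ldlm_pool_sketch {
            uint64_t                  pl_server_lock_volume;
            struct obd_device_sketch *pl_obd; /* backward link, as ns_obd in the patch */
    };

    /* Server side: mirror the freshly recalculated SLV into the obd
     * (analogue of ldlm_srv_pool_push_slv(), called after recalc). */
    static void pool_push_slv(struct ldlm_pool_sketch *pl)
    {
            struct obd_device_sketch *obd = pl->pl_obd;

            pthread_rwlock_wrlock(&obd->obd_pool_lock);
            obd->obd_pool_slv = pl->pl_server_lock_volume;
            pthread_rwlock_unlock(&obd->obd_pool_lock);
    }

    /* RPC reply path: read only obd fields, never the namespace or pool
     * (analogue of target_pack_pool_reply()). Safe even if the namespace
     * is already half destroyed, because a valid export keeps the obd alive. */
    static void pack_pool_reply(struct obd_device_sketch *obd,
                                uint64_t *slv, int *limit)
    {
            pthread_rwlock_rdlock(&obd->obd_pool_lock);
            *slv   = obd->obd_pool_slv;
            *limit = obd->obd_pool_limit;
            pthread_rwlock_unlock(&obd->obd_pool_lock);
    }

    /* Client side: store SLV/Limit received in a reply into the obd
     * (analogue of ldlm_cli_update_pool()); the pool later picks them up
     * during its own recalc via the "pop" step. */
    static void client_update_pool(struct obd_device_sketch *obd,
                                   uint64_t new_slv, int new_limit)
    {
            pthread_rwlock_wrlock(&obd->obd_pool_lock);
            obd->obd_pool_slv   = new_slv;
            obd->obd_pool_limit = new_limit;
            pthread_rwlock_unlock(&obd->obd_pool_lock);
    }

A rwlock fits this traffic pattern: the values are read on every RPC reply packed or received, but written only once per recalc period (or on a shrinker event), so readers never block each other and the write path stays short.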