From 6d07ea6396b2faf2dd0af3383b0ea6246cefef3d Mon Sep 17 00:00:00 2001
From: yury
Date: Sat, 7 Jun 2008 09:37:41 +0000
Subject: [PATCH] b=15226 r=nikita,shadow
 - fixes access to freed ns and pool on behalf of RPCs coming in at
   finalization time;
 - cleanups, comments.
---
 lustre/include/liblustre.h   |   8 +
 lustre/include/lustre_dlm.h  |  11 +-
 lustre/include/obd.h         |   5 +
 lustre/ldlm/ldlm_lib.c       |  31 +--
 lustre/ldlm/ldlm_pool.c      | 470 +++++++++++++++++++++++++++++++++++--------
 lustre/ldlm/ldlm_request.c   |  62 ++++--
 lustre/ldlm/ldlm_resource.c  |  22 +-
 lustre/mds/handler.c         |   3 +-
 lustre/mgs/mgs_handler.c     |  20 +-
 lustre/obdclass/genops.c     |  10 +-
 lustre/obdclass/obd_config.c |   4 +
 lustre/obdecho/echo.c        |   3 +-
 lustre/obdfilter/filter.c    |   3 +-
 lustre/ptlrpc/client.c       |   9 +-
 lustre/ptlrpc/niobuf.c       |   3 +-
 15 files changed, 498 insertions(+), 166 deletions(-)

diff --git a/lustre/include/liblustre.h b/lustre/include/liblustre.h
index 79b35fd..84f9242 100644
--- a/lustre/include/liblustre.h
+++ b/lustre/include/liblustre.h
@@ -295,6 +295,14 @@ static inline void spin_unlock_bh(spinlock_t *l) {}
 static inline void spin_lock_irqsave(spinlock_t *a, unsigned long b) {}
 static inline void spin_unlock_irqrestore(spinlock_t *a, unsigned long b) {}
 
+typedef spinlock_t rwlock_t;
+#define RW_LOCK_UNLOCKED SPIN_LOCK_UNLOCKED
+#define read_lock(l)     spin_lock(l)
+#define read_unlock(l)   spin_unlock(l)
+#define write_lock(l)    spin_lock(l)
+#define write_unlock(l)  spin_unlock(l)
+#define rwlock_init(l)   spin_lock_init(l)
+
 #define min(x,y) ((x)<(y) ? (x) : (y))
 #define max(x,y) ((x)>(y) ? (x) : (y))
 
diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h
index e8a3bbf..e3fe88f 100644
--- a/lustre/include/lustre_dlm.h
+++ b/lustre/include/lustre_dlm.h
@@ -245,6 +245,7 @@ struct ldlm_pool {
         atomic_t               pl_grant_speed;        /* Grant speed (GR-CR) per T. */
         __u64                  pl_server_lock_volume; /* Server lock volume.
                                                        * Protected by pl_lock */
+        __u64                  pl_client_lock_volume; /* Client lock volume. */
         atomic_t               pl_lock_volume_factor; /* Lock volume factor. */
         time_t                 pl_recalc_time;        /* Time when last slv from
@@ -330,6 +331,8 @@ struct ldlm_namespace {
         unsigned               ns_max_nolock_size;
         struct adaptive_timeout ns_at_estimate;/* estimated lock callback time*/
+        /* backward link to obd, required for ldlm pool to store new SLV.
*/ + struct obd_device *ns_obd; }; static inline int ns_is_client(struct ldlm_namespace *ns) @@ -693,8 +696,9 @@ void ldlm_lock_dump_handle(int level, struct lustre_handle *); void ldlm_unlink_lock_skiplist(struct ldlm_lock *req); /* resource.c */ -struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client, - ldlm_appetite_t apt); +struct ldlm_namespace * +ldlm_namespace_new(struct obd_device *obd, char *name, + ldlm_side_t client, ldlm_appetite_t apt); int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags); void ldlm_namespace_free(struct ldlm_namespace *ns, struct obd_import *imp, int force); @@ -825,9 +829,12 @@ int ldlm_pool_shrink(struct ldlm_pool *pl, int nr, void ldlm_pool_fini(struct ldlm_pool *pl); int ldlm_pool_setup(struct ldlm_pool *pl, int limit); int ldlm_pool_recalc(struct ldlm_pool *pl); +__u32 ldlm_pool_get_lvf(struct ldlm_pool *pl); __u64 ldlm_pool_get_slv(struct ldlm_pool *pl); +__u64 ldlm_pool_get_clv(struct ldlm_pool *pl); __u32 ldlm_pool_get_limit(struct ldlm_pool *pl); void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv); +void ldlm_pool_set_clv(struct ldlm_pool *pl, __u64 clv); void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit); void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock); void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock); diff --git a/lustre/include/obd.h b/lustre/include/obd.h index fd2c060..4dcdf89 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -856,6 +856,11 @@ struct obd_device { unsigned int obd_cntr_base; atomic_t obd_evict_inprogress; cfs_waitq_t obd_evict_inprogress_waitq; + + /* Ldlm pool part. Save last calculated SLV and Limit. */ + rwlock_t obd_pool_lock; + int obd_pool_limit; + __u64 obd_pool_slv; }; #define OBD_OPT_FORCE 0x0001 diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 97f502f..648cd0e 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -383,7 +383,7 @@ int client_connect_import(struct lustre_handle *dlm_handle, if (obd->obd_namespace != NULL) CERROR("already have namespace!\n"); - obd->obd_namespace = ldlm_namespace_new(obd->obd_name, + obd->obd_namespace = ldlm_namespace_new(obd, obd->obd_name, LDLM_NAMESPACE_CLIENT, LDLM_NAMESPACE_GREEDY); if (obd->obd_namespace == NULL) @@ -1562,24 +1562,29 @@ static inline struct ldlm_pool *ldlm_exp2pl(struct obd_export *exp) int target_pack_pool_reply(struct ptlrpc_request *req) { - struct ldlm_pool *pl; + struct obd_device *obd; ENTRY; - - if (!req->rq_export || !req->rq_export->exp_obd || - !req->rq_export->exp_obd->obd_namespace || - !exp_connect_lru_resize(req->rq_export)) { + + /* + * Check that we still have all structures alive as this may + * be some late rpc in shutdown time. + */ + if (unlikely(!req->rq_export || !req->rq_export->exp_obd || + !exp_connect_lru_resize(req->rq_export))) { lustre_msg_set_slv(req->rq_repmsg, 0); lustre_msg_set_limit(req->rq_repmsg, 0); RETURN(0); } - - pl = ldlm_exp2pl(req->rq_export); - spin_lock(&pl->pl_lock); - LASSERT(ldlm_pool_get_slv(pl) != 0 && ldlm_pool_get_limit(pl) != 0); - lustre_msg_set_slv(req->rq_repmsg, ldlm_pool_get_slv(pl)); - lustre_msg_set_limit(req->rq_repmsg, ldlm_pool_get_limit(pl)); - spin_unlock(&pl->pl_lock); + /* + * OBD is alive here as export is alive, which we checked above. 
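
The two header hunks and the target_pack_pool_reply() rework above are the heart of the fix: the last calculated SLV and limit are mirrored into struct obd_device under the new obd_pool_lock, so packing a pool reply only needs the export's obd and never dereferences obd_namespace or the embedded pool, which may already be half torn down when a late RPC arrives. The following stand-alone sketch shows the same publish/read pattern; it is a user-space approximation that substitutes a pthread rwlock for the kernel rwlock_t, and the fake_obd names are illustrative rather than taken from the Lustre tree.

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

/* Stand-in for the few obd_device fields the pattern needs. */
struct fake_obd {
        pthread_rwlock_t pool_lock;  /* kernel code uses rwlock_t           */
        uint64_t         pool_slv;   /* last calculated server lock volume  */
        uint32_t         pool_limit; /* last calculated lock limit          */
};

/* Writer: pool recalc on the server, or reply parsing on the client. */
static void pool_publish(struct fake_obd *obd, uint64_t slv, uint32_t limit)
{
        pthread_rwlock_wrlock(&obd->pool_lock);
        obd->pool_slv = slv;
        obd->pool_limit = limit;
        pthread_rwlock_unlock(&obd->pool_lock);
}

/* Reader: packing a reply; no namespace or pool pointer is touched. */
static void pool_pack_reply(struct fake_obd *obd, uint64_t *slv, uint32_t *limit)
{
        pthread_rwlock_rdlock(&obd->pool_lock);
        *slv = obd->pool_slv;
        *limit = obd->pool_limit;
        pthread_rwlock_unlock(&obd->pool_lock);
}

int main(void)
{
        struct fake_obd obd = { .pool_slv = 0, .pool_limit = 0 };
        uint64_t slv;
        uint32_t limit;

        pthread_rwlock_init(&obd.pool_lock, NULL);
        pool_publish(&obd, 1800000ULL, 50);
        pool_pack_reply(&obd, &slv, &limit);
        printf("slv=%llu limit=%u\n", (unsigned long long)slv, limit);
        pthread_rwlock_destroy(&obd.pool_lock);
        return 0;
}

A reader/writer lock fits here because replies are packed far more often than the cached SLV changes, so concurrent readers should not serialize against each other.
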
+ */ + obd = req->rq_export->exp_obd; + + read_lock(&obd->obd_pool_lock); + lustre_msg_set_slv(req->rq_repmsg, obd->obd_pool_slv); + lustre_msg_set_limit(req->rq_repmsg, obd->obd_pool_limit); + read_unlock(&obd->obd_pool_lock); RETURN(0); } diff --git a/lustre/ldlm/ldlm_pool.c b/lustre/ldlm/ldlm_pool.c index fbcf055..bd89cfa 100644 --- a/lustre/ldlm/ldlm_pool.c +++ b/lustre/ldlm/ldlm_pool.c @@ -23,7 +23,8 @@ * license text for more details. */ -/* Idea of this code is rather simple. Each second, for each server namespace +/* + * Idea of this code is rather simple. Each second, for each server namespace * we have SLV - server lock volume which is calculated on current number of * granted locks, grant speed for past period, etc - that is, locking load. * This SLV number may be thought as a flow definition for simplicity. It is @@ -98,16 +99,24 @@ #ifdef HAVE_LRU_RESIZE_SUPPORT -/* 50 ldlm locks for 1MB of RAM. */ -#define LDLM_POOL_HOST_L ((num_physpages >> (20 - PAGE_SHIFT)) * 50) +/* + * 50 ldlm locks for 1MB of RAM. + */ +#define LDLM_POOL_HOST_L ((num_physpages >> (20 - CFS_PAGE_SHIFT)) * 50) -/* Default step in % for grant plan. */ +/* + * Default step in % for grant plan. + */ #define LDLM_POOL_GSP (10) -/* LDLM_POOL_GSP% of all locks is default GP. */ +/* + * LDLM_POOL_GSP% of all locks is default GP. + */ #define LDLM_POOL_GP(L) (((L) * LDLM_POOL_GSP) / 100) -/* Max age for locks on clients. */ +/* + * Max age for locks on clients. + */ #define LDLM_POOL_MAX_AGE (36000) #ifdef __KERNEL__ @@ -126,8 +135,10 @@ static inline __u64 dru(__u64 val, __u32 div) static inline __u64 ldlm_pool_slv_max(__u32 L) { - /* Allow to have all locks for 1 client for 10 hrs. - * Formula is the following: limit * 10h / 1 client. */ + /* + * Allow to have all locks for 1 client for 10 hrs. + * Formula is the following: limit * 10h / 1 client. + */ __u64 lim = L * LDLM_POOL_MAX_AGE / 1; return lim; } @@ -158,7 +169,11 @@ static inline struct ldlm_namespace *ldlm_pl2ns(struct ldlm_pool *pl) return container_of(pl, struct ldlm_namespace, ns_pool); } -/* Should be called under ->pl_lock taken */ +/** + * Recalculates next grant limit on passed \a pl. + * + * \pre ->pl_lock is locked. + */ static inline void ldlm_pool_recalc_grant_plan(struct ldlm_pool *pl) { int granted, grant_step, limit; @@ -170,14 +185,18 @@ static inline void ldlm_pool_recalc_grant_plan(struct ldlm_pool *pl) pl->pl_grant_plan = granted + grant_step; } -/* Should be called under ->pl_lock taken */ +/** + * Recalculates next SLV on passed \a pl. + * + * \pre ->pl_lock is locked. + */ static inline void ldlm_pool_recalc_slv(struct ldlm_pool *pl) { int grant_usage, granted, grant_plan; __u64 slv, slv_factor; __u32 limit; - slv = ldlm_pool_get_slv(pl); + slv = pl->pl_server_lock_volume; grant_plan = pl->pl_grant_plan; limit = ldlm_pool_get_limit(pl); granted = atomic_read(&pl->pl_granted); @@ -186,12 +205,14 @@ static inline void ldlm_pool_recalc_slv(struct ldlm_pool *pl) if (grant_usage <= 0) grant_usage = 1; - /* Find out SLV change factor which is the ratio of grant usage + /* + * Find out SLV change factor which is the ratio of grant usage * from limit. SLV changes as fast as the ratio of grant plan * consumtion. The more locks from grant plan are not consumed * by clients in last interval (idle time), the faster grows * SLV. And the opposite, the more grant plan is over-consumed - * (load time) the faster drops SLV. */ + * (load time) the faster drops SLV. 
+ */ slv_factor = (grant_usage * 100) / limit; if (2 * abs(granted - limit) > limit) { slv_factor *= slv_factor; @@ -206,13 +227,18 @@ static inline void ldlm_pool_recalc_slv(struct ldlm_pool *pl) slv = ldlm_pool_slv_min(limit); } - ldlm_pool_set_slv(pl, slv); + pl->pl_server_lock_volume = slv; } +/** + * Recalculates next stats on passed \a pl. + * + * \pre ->pl_lock is locked. + */ static inline void ldlm_pool_recalc_stats(struct ldlm_pool *pl) { - __u64 slv = ldlm_pool_get_slv(pl); int grant_plan = pl->pl_grant_plan; + __u64 slv = pl->pl_server_lock_volume; int granted = atomic_read(&pl->pl_granted); int grant_rate = atomic_read(&pl->pl_grant_rate); int cancel_rate = atomic_read(&pl->pl_cancel_rate); @@ -229,6 +255,32 @@ static inline void ldlm_pool_recalc_stats(struct ldlm_pool *pl) cancel_rate); } +/** + * Sets current SLV into obd accessible via ldlm_pl2ns(pl)->ns_obd. + */ +static void ldlm_srv_pool_push_slv(struct ldlm_pool *pl) +{ + struct obd_device *obd; + + /* + * Set new SLV in obd field for using it later without accessing the + * pool. This is required to avoid race between sending reply to client + * with new SLV and cleanup server stack in which we can't guarantee + * that namespace is still alive. We know only that obd is alive as + * long as valid export is alive. + */ + obd = ldlm_pl2ns(pl)->ns_obd; + LASSERT(obd != NULL); + write_lock(&obd->obd_pool_lock); + obd->obd_pool_slv = pl->pl_server_lock_volume; + write_unlock(&obd->obd_pool_lock); +} + +/** + * Recalculates all pool fields on passed \a pl. + * + * \pre ->pl_lock is not locked. + */ static int ldlm_srv_pool_recalc(struct ldlm_pool *pl) { time_t recalc_interval_sec; @@ -237,17 +289,30 @@ static int ldlm_srv_pool_recalc(struct ldlm_pool *pl) spin_lock(&pl->pl_lock); recalc_interval_sec = cfs_time_current_sec() - pl->pl_recalc_time; if (recalc_interval_sec > 0) { - /* Update statistics */ + /* + * Update statistics. + */ ldlm_pool_recalc_stats(pl); - /* Recalc SLV after last period. This should be done - * _before_ recalculating new grant plan. */ + /* + * Recalc SLV after last period. This should be done + * _before_ recalculating new grant plan. + */ ldlm_pool_recalc_slv(pl); - - /* Update grant_plan for new period. */ + + /* + * Make sure that pool informed obd of last SLV changes. + */ + ldlm_srv_pool_push_slv(pl); + + /* + * Update grant_plan for new period. + */ ldlm_pool_recalc_grant_plan(pl); - /* Zero out all rates and speed for the last period. */ + /* + * Zero out all rates and speed for the last period. + */ atomic_set(&pl->pl_grant_rate, 0); atomic_set(&pl->pl_cancel_rate, 0); atomic_set(&pl->pl_grant_speed, 0); @@ -259,26 +324,37 @@ static int ldlm_srv_pool_recalc(struct ldlm_pool *pl) RETURN(0); } -/* Our goal here is to decrease SLV the way to make a client hold - * @nr locks smaller in next 10h. */ +/** + * This function is used on server side as main entry point for memory + * preasure handling. It decreases SLV on \a pl according to passed + * \a nr and \a gfp_mask. + * + * Our goal here is to decrease SLV such a way that clients hold \a nr + * locks smaller in next 10h. + */ static int ldlm_srv_pool_shrink(struct ldlm_pool *pl, int nr, unsigned int gfp_mask) { __u32 limit; ENTRY; - /* VM is asking how many entries may be potentially freed. */ + /* + * VM is asking how many entries may be potentially freed. + */ if (nr == 0) RETURN(atomic_read(&pl->pl_granted)); - /* Client already canceled locks but server is already in shrinker - * and can't cancel anything. Let's catch this race. 
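
The grant-usage arithmetic above is what drives the whole SLV feedback loop, so a worked example may help. The toy model below follows only the parts of ldlm_pool_recalc_slv() visible in this hunk: grant_usage, the percent-scaled slv_factor, and the quadratic reaction when granted strays far from the limit. The final /100 renormalization here is an assumption, and the clamping to ldlm_pool_slv_min()/ldlm_pool_slv_max() performed by the elided lines is omitted.

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* Simplified model of the per-interval server SLV update; clamping to
 * slv_min/slv_max is intentionally left out. */
static uint64_t recalc_slv(uint64_t slv, int granted, int grant_plan, int limit)
{
        uint64_t slv_factor;
        int grant_usage;

        grant_usage = limit - (granted - grant_plan);
        if (grant_usage <= 0)
                grant_usage = 1;

        /* Percentage of the grant plan left unconsumed this interval. */
        slv_factor = ((uint64_t)grant_usage * 100) / limit;

        /* React quadratically when granted is far from the limit
         * (renormalized by an assumed extra /100). */
        if (2 * abs(granted - limit) > limit)
                slv_factor = slv_factor * slv_factor / 100;

        return slv * slv_factor / 100;
}

int main(void)
{
        int limit = 1000;

        /* Idle interval: clients consumed less than the grant plan. */
        printf("idle:   %llu\n",
               (unsigned long long)recalc_slv(100000, 200, 300, limit));
        /* Load interval: clients overran the grant plan. */
        printf("loaded: %llu\n",
               (unsigned long long)recalc_slv(100000, 1900, 1100, limit));
        return 0;
}

With limit 1000, an idle interval (200 granted against a plan of 300) grows an SLV of 100000 to 121000, while an overloaded interval (1900 granted against 1100 planned) collapses it to 4000; this is the "idle time raises SLV, load time drops it" behaviour described in the comment block at the top of this file.
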
*/ + /* + * Client already canceled locks but server is already in shrinker + * and can't cancel anything. Let's catch this race. + */ if (atomic_read(&pl->pl_granted) == 0) RETURN(0); spin_lock(&pl->pl_lock); - /* We want shrinker to possibly cause cancelation of @nr locks from + /* + * We want shrinker to possibly cause cancelation of @nr locks from * clients or grant approximately @nr locks smaller next intervals. * * This is why we decresed SLV by @nr. This effect will only be as @@ -287,27 +363,69 @@ static int ldlm_srv_pool_shrink(struct ldlm_pool *pl, * interval pool will either increase SLV if locks load is not high * or will keep on same level or even decrease again, thus, shrinker * decreased SLV will affect next recalc intervals and this way will - * make locking load lower. */ - if (nr < ldlm_pool_get_slv(pl)) { - ldlm_pool_set_slv(pl, ldlm_pool_get_slv(pl) - nr); + * make locking load lower. + */ + if (nr < pl->pl_server_lock_volume) { + pl->pl_server_lock_volume = pl->pl_server_lock_volume - nr; } else { limit = ldlm_pool_get_limit(pl); - ldlm_pool_set_slv(pl, ldlm_pool_slv_min(limit)); + pl->pl_server_lock_volume = ldlm_pool_slv_min(limit); } + + /* + * Make sure that pool informed obd of last SLV changes. + */ + ldlm_srv_pool_push_slv(pl); spin_unlock(&pl->pl_lock); - /* We did not really free any memory here so far, it only will be - * freed later may be, so that we return 0 to not confuse VM. */ + /* + * We did not really free any memory here so far, it only will be + * freed later may be, so that we return 0 to not confuse VM. + */ RETURN(0); } +/** + * Setup server side pool \a pl with passed \a limit. + */ static int ldlm_srv_pool_setup(struct ldlm_pool *pl, int limit) { + struct obd_device *obd; ENTRY; + + obd = ldlm_pl2ns(pl)->ns_obd; + LASSERT(obd != NULL && obd != LP_POISON); + LASSERT(obd->obd_type != LP_POISON); + write_lock(&obd->obd_pool_lock); + obd->obd_pool_limit = limit; + write_unlock(&obd->obd_pool_lock); + ldlm_pool_set_limit(pl, limit); RETURN(0); } +/** + * Sets SLV and Limit from ldlm_pl2ns(pl)->ns_obd tp passed \a pl. + */ +static void ldlm_cli_pool_pop_slv(struct ldlm_pool *pl) +{ + struct obd_device *obd; + + /* + * Get new SLV and Limit from obd which is updated with comming + * RPCs. + */ + obd = ldlm_pl2ns(pl)->ns_obd; + LASSERT(obd != NULL); + read_lock(&obd->obd_pool_lock); + pl->pl_server_lock_volume = obd->obd_pool_slv; + ldlm_pool_set_limit(pl, obd->obd_pool_limit); + read_unlock(&obd->obd_pool_lock); +} + +/** + * Recalculates client sise pool \a pl according to current SLV and Limit. + */ static int ldlm_cli_pool_recalc(struct ldlm_pool *pl) { time_t recalc_interval_sec; @@ -315,12 +433,21 @@ static int ldlm_cli_pool_recalc(struct ldlm_pool *pl) spin_lock(&pl->pl_lock); + /* + * Make sure that pool knows last SLV and Limit from obd. + */ + ldlm_cli_pool_pop_slv(pl); + recalc_interval_sec = cfs_time_current_sec() - pl->pl_recalc_time; if (recalc_interval_sec > 0) { - /* Update statistics only every T */ + /* + * Update statistics only every T. + */ ldlm_pool_recalc_stats(pl); - /* Zero out grant/cancel rates and speed for last period. */ + /* + * Zero out grant/cancel rates and speed for last period. 
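
Earlier in this hunk the server-side shrinker never frees memory directly; it only lowers SLV so that clients cancel locks over the following recalc intervals, and it reports zero back to the VM. Below is a compact model of that decision logic. The slv_min() floor of 1 is an assumed stand-in for ldlm_pool_slv_min(), whose body is not shown in this diff, and the real code additionally takes pl_lock and republishes the new SLV to the obd via ldlm_srv_pool_push_slv().

#include <stdint.h>
#include <stdio.h>

struct toy_pool {
        int      granted;            /* currently granted locks */
        uint32_t limit;              /* grant limit             */
        uint64_t slv;                /* server lock volume      */
};

static uint64_t toy_slv_min(uint32_t limit)
{
        (void)limit;
        return 1;                    /* assumption: a small floor */
}

/* Returns the value reported back to the VM shrinker. */
static int toy_srv_shrink(struct toy_pool *pl, int nr)
{
        if (nr == 0)                 /* VM only asks how much is cached */
                return pl->granted;
        if (pl->granted == 0)        /* nothing left to shrink          */
                return 0;

        if ((uint64_t)nr < pl->slv)
                pl->slv -= nr;       /* ask clients to hold ~nr fewer locks */
        else
                pl->slv = toy_slv_min(pl->limit);

        return 0;                    /* nothing freed yet, don't confuse VM */
}

int main(void)
{
        struct toy_pool pl = { .granted = 4000, .limit = 1000, .slv = 50000 };
        int freed;

        printf("probe: %d locks are cached\n", toy_srv_shrink(&pl, 0));
        freed = toy_srv_shrink(&pl, 128);
        printf("after shrink(128): slv=%llu, reported freed=%d\n",
               (unsigned long long)pl.slv, freed);
        return 0;
}
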
+ */ atomic_set(&pl->pl_grant_rate, 0); atomic_set(&pl->pl_cancel_rate, 0); atomic_set(&pl->pl_grant_speed, 0); @@ -330,34 +457,54 @@ static int ldlm_cli_pool_recalc(struct ldlm_pool *pl) } spin_unlock(&pl->pl_lock); - /* Do not cancel locks in case lru resize is disabled for this ns */ + /* + * Do not cancel locks in case lru resize is disabled for this ns. + */ if (!ns_connect_lru_resize(ldlm_pl2ns(pl))) RETURN(0); - /* In the time of canceling locks on client we do not need to maintain + /* + * In the time of canceling locks on client we do not need to maintain * sharp timing, we only want to cancel locks asap according to new SLV. - * This may be called when SLV has changed much, this is why we do not - * take into account pl->pl_recalc_time here. */ + * It may be called when SLV has changed much, this is why we do not + * take into account pl->pl_recalc_time here. + */ RETURN(ldlm_cancel_lru(ldlm_pl2ns(pl), 0, LDLM_ASYNC, LDLM_CANCEL_LRUR)); } +/** + * This function is main entry point for memory preasure handling on client side. + * Main goal of this function is to cancel some number of locks on passed \a pl + * according to \a nr and \a gfp_mask. + */ static int ldlm_cli_pool_shrink(struct ldlm_pool *pl, int nr, unsigned int gfp_mask) { ENTRY; - /* Do not cancel locks in case lru resize is disabled for this ns */ + /* + * Do not cancel locks in case lru resize is disabled for this ns. + */ if (!ns_connect_lru_resize(ldlm_pl2ns(pl))) RETURN(0); - /* Find out how many locks may be released according to shrink - * policy. */ + /* + * Make sure that pool knows last SLV and Limit from obd. + */ + ldlm_cli_pool_pop_slv(pl); + + /* + * Find out how many locks may be released according to shrink + * policy. + */ if (nr == 0) RETURN(ldlm_cancel_lru_estimate(ldlm_pl2ns(pl), 0, 0, LDLM_CANCEL_SHRINK)); - /* Cancel @nr locks accoding to shrink policy */ + /* + * Cancel @nr locks accoding to shrink policy. + */ RETURN(ldlm_cancel_lru(ldlm_pl2ns(pl), nr, LDLM_SYNC, LDLM_CANCEL_SHRINK)); } @@ -373,6 +520,10 @@ struct ldlm_pool_ops ldlm_cli_pool_ops = { .po_shrink = ldlm_cli_pool_shrink }; +/** + * Pool recalc wrapper. Will call either client or server pool recalc callback + * depending what pool \a pl is used. + */ int ldlm_pool_recalc(struct ldlm_pool *pl) { int count; @@ -387,6 +538,10 @@ int ldlm_pool_recalc(struct ldlm_pool *pl) } EXPORT_SYMBOL(ldlm_pool_recalc); +/** + * Pool shrink wrapper. Will call either client or server pool recalc callback + * depending what pool \a pl is used. + */ int ldlm_pool_shrink(struct ldlm_pool *pl, int nr, unsigned int gfp_mask) { @@ -409,8 +564,12 @@ int ldlm_pool_shrink(struct ldlm_pool *pl, int nr, } EXPORT_SYMBOL(ldlm_pool_shrink); -/* The purpose of this function is to re-setup limit and maximal allowed - * slv according to the passed limit. */ +/** + * Pool setup wrapper. Will call either client or server pool recalc callback + * depending what pool \a pl is used. + * + * Sets passed \a limit into pool \a pl. 
+ */ int ldlm_pool_setup(struct ldlm_pool *pl, int limit) { ENTRY; @@ -427,11 +586,12 @@ static int lprocfs_rd_pool_state(char *page, char **start, off_t off, int granted, grant_rate, cancel_rate, grant_step; int nr = 0, grant_speed, grant_plan; struct ldlm_pool *pl = data; + __u64 slv, clv; __u32 limit; - __u64 slv; spin_lock(&pl->pl_lock); - slv = ldlm_pool_get_slv(pl); + slv = pl->pl_server_lock_volume; + clv = pl->pl_client_lock_volume; limit = ldlm_pool_get_limit(pl); grant_plan = pl->pl_grant_plan; grant_step = pl->pl_grant_step; @@ -444,6 +604,7 @@ static int lprocfs_rd_pool_state(char *page, char **start, off_t off, nr += snprintf(page + nr, count - nr, "LDLM pool state (%s):\n", pl->pl_name); nr += snprintf(page + nr, count - nr, " SLV: "LPU64"\n", slv); + nr += snprintf(page + nr, count - nr, " CLV: "LPU64"\n", clv); nr += snprintf(page + nr, count - nr, " LVF: %d\n", atomic_read(&pl->pl_lock_volume_factor)); @@ -639,13 +800,13 @@ int ldlm_pool_init(struct ldlm_pool *pl, struct ldlm_namespace *ns, if (client == LDLM_NAMESPACE_SERVER) { pl->pl_ops = &ldlm_srv_pool_ops; ldlm_pool_set_limit(pl, LDLM_POOL_HOST_L); - ldlm_pool_set_slv(pl, ldlm_pool_slv_max(LDLM_POOL_HOST_L)); + pl->pl_server_lock_volume = ldlm_pool_slv_max(LDLM_POOL_HOST_L); } else { - ldlm_pool_set_slv(pl, 1); + pl->pl_server_lock_volume = 1; ldlm_pool_set_limit(pl, 1); pl->pl_ops = &ldlm_cli_pool_ops; } - + pl->pl_client_lock_volume = 0; rc = ldlm_pool_proc_init(pl); if (rc) RETURN(rc); @@ -660,17 +821,28 @@ void ldlm_pool_fini(struct ldlm_pool *pl) { ENTRY; ldlm_pool_proc_fini(pl); - pl->pl_ops = NULL; + + /* + * Pool should not be used after this point. We can't free it here as + * it lives in struct ldlm_namespace, but still interested in catching + * any abnormal using cases. + */ + POISON(pl, 0x5a, sizeof(*pl)); EXIT; } EXPORT_SYMBOL(ldlm_pool_fini); +/** + * Add new taken ldlm lock \a lock into pool \a pl accounting. + */ void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock) { - /* FLOCK locks are special in a sense that they are almost never + /* + * FLOCK locks are special in a sense that they are almost never * cancelled, instead special kind of lock is used to drop them. * also there is no LRU for flock locks, so no point in tracking - * them anyway */ + * them anyway. + */ if (lock->l_resource->lr_type == LDLM_FLOCK) return; @@ -682,18 +854,26 @@ void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock) lprocfs_counter_incr(pl->pl_stats, LDLM_POOL_GRANT_STAT); - /* Do not do pool recalc for client side as all locks which + /* + * Do not do pool recalc for client side as all locks which * potentially may be canceled has already been packed into * enqueue/cancel rpc. Also we do not want to run out of stack - * with too long call paths. */ + * with too long call paths. + */ if (ns_is_server(ldlm_pl2ns(pl))) ldlm_pool_recalc(pl); EXIT; } EXPORT_SYMBOL(ldlm_pool_add); +/** + * Remove ldlm lock \a lock from pool \a pl accounting. + */ void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock) { + /* + * Filter out FLOCK locks. Read above comment in ldlm_pool_add(). + */ if (lock->l_resource->lr_type == LDLM_FLOCK) return; ENTRY; @@ -710,33 +890,89 @@ void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock) } EXPORT_SYMBOL(ldlm_pool_del); -/* ->pl_lock should be taken. */ +/** + * Returns current \a pl SLV. + * + * \pre ->pl_lock is not locked. 
+ */ __u64 ldlm_pool_get_slv(struct ldlm_pool *pl) { - return pl->pl_server_lock_volume; + __u64 slv; + spin_lock(&pl->pl_lock); + slv = pl->pl_server_lock_volume; + spin_unlock(&pl->pl_lock); + return slv; } EXPORT_SYMBOL(ldlm_pool_get_slv); -/* ->pl_lock should be taken. */ +/** + * Sets passed \a slv to \a pl. + * + * \pre ->pl_lock is not locked. + */ void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv) { + spin_lock(&pl->pl_lock); pl->pl_server_lock_volume = slv; + spin_unlock(&pl->pl_lock); } EXPORT_SYMBOL(ldlm_pool_set_slv); +/** + * Returns current \a pl CLV. + * + * \pre ->pl_lock is not locked. + */ +__u64 ldlm_pool_get_clv(struct ldlm_pool *pl) +{ + __u64 slv; + spin_lock(&pl->pl_lock); + slv = pl->pl_client_lock_volume; + spin_unlock(&pl->pl_lock); + return slv; +} +EXPORT_SYMBOL(ldlm_pool_get_clv); + +/** + * Sets passed \a clv to \a pl. + * + * \pre ->pl_lock is not locked. + */ +void ldlm_pool_set_clv(struct ldlm_pool *pl, __u64 clv) +{ + spin_lock(&pl->pl_lock); + pl->pl_client_lock_volume = clv; + spin_unlock(&pl->pl_lock); +} +EXPORT_SYMBOL(ldlm_pool_set_clv); + +/** + * Returns current \a pl limit. + */ __u32 ldlm_pool_get_limit(struct ldlm_pool *pl) { return atomic_read(&pl->pl_limit); } EXPORT_SYMBOL(ldlm_pool_get_limit); +/** + * Sets passed \a limit to \a pl. + */ void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit) { atomic_set(&pl->pl_limit, limit); } EXPORT_SYMBOL(ldlm_pool_set_limit); -/* Server side is only enabled for kernel space for now. */ +/** + * Returns current LVF from \a pl. + */ +__u32 ldlm_pool_get_lvf(struct ldlm_pool *pl) +{ + return atomic_read(&pl->pl_lock_volume_factor); +} +EXPORT_SYMBOL(ldlm_pool_get_lvf); + #ifdef __KERNEL__ static int ldlm_pool_granted(struct ldlm_pool *pl) { @@ -759,9 +995,11 @@ void ldlm_pools_wakeup(void) } EXPORT_SYMBOL(ldlm_pools_wakeup); -/* Cancel @nr locks from all namespaces (if possible). Returns number of +/* + * Cancel \a nr locks from all namespaces (if possible). Returns number of * cached locks after shrink is finished. All namespaces are asked to - * cancel approximately equal amount of locks. */ + * cancel approximately equal amount of locks to keep balancing. + */ static int ldlm_pools_shrink(ldlm_side_t client, int nr, unsigned int gfp_mask) { @@ -771,10 +1009,12 @@ static int ldlm_pools_shrink(ldlm_side_t client, int nr, if (nr != 0 && !(gfp_mask & __GFP_FS)) return -1; - CDEBUG(D_DLMTRACE, "request to shrink %d %s locks from all pools\n", + CDEBUG(D_DLMTRACE, "Request to shrink %d %s locks from all pools\n", nr, client == LDLM_NAMESPACE_CLIENT ? "client" : "server"); - /* Find out how many resources we may release. */ + /* + * Find out how many resources we may release. + */ for (nr_ns = atomic_read(ldlm_namespace_nr(client)); nr_ns > 0; nr_ns--) { @@ -794,19 +1034,25 @@ static int ldlm_pools_shrink(ldlm_side_t client, int nr, if (nr == 0 || total == 0) return total; - /* Shrink at least ldlm_namespace_nr(client) namespaces. */ + /* + * Shrink at least ldlm_namespace_nr(client) namespaces. 
+ */ for (nr_ns = atomic_read(ldlm_namespace_nr(client)); nr_ns > 0; nr_ns--) { int cancel, nr_locks; - /* Do not call shrink under ldlm_namespace_lock(client) */ + /* + * Do not call shrink under ldlm_namespace_lock(client) + */ mutex_down(ldlm_namespace_lock(client)); if (list_empty(ldlm_namespace_list(client))) { mutex_up(ldlm_namespace_lock(client)); - /* If list is empty, we can't return any @cached > 0, + /* + * If list is empty, we can't return any @cached > 0, * that probably would cause needless shrinker - * call. */ + * call. + */ cached = 0; break; } @@ -840,9 +1086,13 @@ void ldlm_pools_recalc(ldlm_side_t client) struct ldlm_namespace *ns; int nr, equal = 0; - /* No need to setup pool limit for client pools. */ + /* + * No need to setup pool limit for client pools. + */ if (client == LDLM_NAMESPACE_SERVER) { - /* Check all modest namespaces first. */ + /* + * Check all modest namespaces first. + */ mutex_down(ldlm_namespace_lock(client)); list_for_each_entry(ns, ldlm_namespace_list(client), ns_list_chain) @@ -854,16 +1104,20 @@ void ldlm_pools_recalc(ldlm_side_t client) if (l == 0) l = 1; - /* Set the modest pools limit equal to their avg granted - * locks + 5%. */ + /* + * Set the modest pools limit equal to their avg granted + * locks + 5%. + */ l += dru(l * LDLM_POOLS_MODEST_MARGIN, 100); ldlm_pool_setup(&ns->ns_pool, l); nr_l += l; nr_p++; } - /* Make sure that modest namespaces did not eat more that 2/3 - * of limit */ + /* + * Make sure that modest namespaces did not eat more that 2/3 + * of limit. + */ if (nr_l >= 2 * (LDLM_POOL_HOST_L / 3)) { CWARN("\"Modest\" pools eat out 2/3 of server locks " "limit (%d of %lu). This means that you have too " @@ -872,7 +1126,9 @@ void ldlm_pools_recalc(ldlm_side_t client) equal = 1; } - /* The rest is given to greedy namespaces. */ + /* + * The rest is given to greedy namespaces. + */ list_for_each_entry(ns, ldlm_namespace_list(client), ns_list_chain) { @@ -880,14 +1136,18 @@ void ldlm_pools_recalc(ldlm_side_t client) continue; if (equal) { - /* In the case 2/3 locks are eaten out by + /* + * In the case 2/3 locks are eaten out by * modest pools, we re-setup equal limit - * for _all_ pools. */ + * for _all_ pools. + */ l = LDLM_POOL_HOST_L / atomic_read(ldlm_namespace_nr(client)); } else { - /* All the rest of greedy pools will have - * all locks in equal parts.*/ + /* + * All the rest of greedy pools will have + * all locks in equal parts. + */ l = (LDLM_POOL_HOST_L - nr_l) / (atomic_read(ldlm_namespace_nr(client)) - nr_p); @@ -897,13 +1157,17 @@ void ldlm_pools_recalc(ldlm_side_t client) mutex_up(ldlm_namespace_lock(client)); } - /* Recalc at least ldlm_namespace_nr(client) namespaces. */ + /* + * Recalc at least ldlm_namespace_nr(client) namespaces. + */ for (nr = atomic_read(ldlm_namespace_nr(client)); nr > 0; nr--) { - /* Lock the list, get first @ns in the list, getref, move it + /* + * Lock the list, get first @ns in the list, getref, move it * to the tail, unlock and call pool recalc. This way we avoid * calling recalc under @ns lock what is really good as we get * rid of potential deadlock on client nodes when canceling - * locks synchronously. */ + * locks synchronously. + */ mutex_down(ldlm_namespace_lock(client)); if (list_empty(ldlm_namespace_list(client))) { mutex_up(ldlm_namespace_lock(client)); @@ -914,7 +1178,9 @@ void ldlm_pools_recalc(ldlm_side_t client) ldlm_namespace_move_locked(ns, client); mutex_up(ldlm_namespace_lock(client)); - /* After setup is done - recalc the pool. 
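
The loop above splits the host-wide limit LDLM_POOL_HOST_L between "modest" and "greedy" namespaces: modest pools are capped at their average granted count plus a small margin, greedy pools share whatever is left, and if the modest pools would consume two thirds or more of the host limit every pool falls back to an equal share. The sketch below reproduces that policy; the 5% value for LDLM_POOLS_MODEST_MARGIN is inferred from the "+ 5%" comment, dru() is assumed to be a round-up division, and HOST_L is an arbitrary stand-in, since none of those definitions appear in this diff.

#include <stdio.h>

#define HOST_L        300000   /* stand-in for LDLM_POOL_HOST_L              */
#define MODEST_MARGIN 5        /* assumed value of LDLM_POOLS_MODEST_MARGIN  */

enum appetite { MODEST, GREEDY };

struct toy_ns {
        enum appetite appetite;
        unsigned long avg_granted;   /* average granted locks in the ns */
        unsigned long limit;         /* computed pool limit             */
};

/* Round-up division, mirroring what dru() is expected to do. */
static unsigned long dru(unsigned long val, unsigned long div)
{
        return (val + div - 1) / div;
}

static void distribute_limits(struct toy_ns *ns, int nr)
{
        unsigned long nr_l = 0;
        int i, nr_p = 0, equal = 0;

        /* Pass 1: modest namespaces get avg granted + MODEST_MARGIN%. */
        for (i = 0; i < nr; i++) {
                if (ns[i].appetite != MODEST)
                        continue;
                unsigned long l = ns[i].avg_granted ? ns[i].avg_granted : 1;
                l += dru(l * MODEST_MARGIN, 100);
                ns[i].limit = l;
                nr_l += l;
                nr_p++;
        }

        /* If modest pools ate 2/3 of the host limit, share equally instead. */
        if (nr_l >= 2 * (HOST_L / 3))
                equal = 1;

        /* Pass 2: greedy namespaces split the remainder (or all go equal). */
        for (i = 0; i < nr; i++) {
                if (!equal && ns[i].appetite == MODEST)
                        continue;
                ns[i].limit = equal ? HOST_L / nr
                                    : (HOST_L - nr_l) / (nr - nr_p);
        }
}

int main(void)
{
        struct toy_ns ns[] = {
                { MODEST, 2000, 0 },
                { MODEST,  500, 0 },
                { GREEDY,    0, 0 },
                { GREEDY,    0, 0 },
        };
        int i;

        distribute_limits(ns, 4);
        for (i = 0; i < 4; i++)
                printf("ns[%d]: limit=%lu\n", i, ns[i].limit);
        return 0;
}
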
*/ + /* + * After setup is done - recalc the pool. + */ ldlm_pool_recalc(&ns->ns_pool); ldlm_namespace_put(ns, 1); } @@ -937,12 +1203,16 @@ static int ldlm_pools_thread_main(void *arg) while (1) { struct l_wait_info lwi; - /* Recal all pools on this tick. */ + /* + * Recal all pools on this tick. + */ ldlm_pools_recalc(LDLM_NAMESPACE_SERVER); ldlm_pools_recalc(LDLM_NAMESPACE_CLIENT); - /* Wait until the next check time, or until we're - * stopped. */ + /* + * Wait until the next check time, or until we're + * stopped. + */ lwi = LWI_TIMEOUT(cfs_time_seconds(LDLM_POOLS_THREAD_PERIOD), NULL, NULL); l_wait_event(thread->t_ctl_waitq, (thread->t_flags & @@ -982,8 +1252,10 @@ static int ldlm_pools_thread_start(void) init_completion(&ldlm_pools_comp); cfs_waitq_init(&ldlm_pools_thread->t_ctl_waitq); - /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we - * just drop the VM and FILES in ptlrpc_daemonize() right away. */ + /* + * CLONE_VM and CLONE_FILES just avoid a needless copy, because we + * just drop the VM and FILES in ptlrpc_daemonize() right away. + */ rc = cfs_kernel_thread(ldlm_pools_thread_main, ldlm_pools_thread, CLONE_VM | CLONE_FILES); if (rc < 0) { @@ -1010,9 +1282,11 @@ static void ldlm_pools_thread_stop(void) ldlm_pools_thread->t_flags = SVC_STOPPING; cfs_waitq_signal(&ldlm_pools_thread->t_ctl_waitq); - /* Make sure that pools thread is finished before freeing @thread. + /* + * Make sure that pools thread is finished before freeing @thread. * This fixes possible race and oops due to accessing freed memory - * in pools thread. */ + * in pools thread. + */ wait_for_completion(&ldlm_pools_comp); OBD_FREE_PTR(ldlm_pools_thread); ldlm_pools_thread = NULL; @@ -1107,6 +1381,18 @@ void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv) } EXPORT_SYMBOL(ldlm_pool_set_slv); +__u64 ldlm_pool_get_clv(struct ldlm_pool *pl) +{ + return 1; +} +EXPORT_SYMBOL(ldlm_pool_get_clv); + +void ldlm_pool_set_clv(struct ldlm_pool *pl, __u64 clv) +{ + return; +} +EXPORT_SYMBOL(ldlm_pool_set_clv); + __u32 ldlm_pool_get_limit(struct ldlm_pool *pl) { return 0; @@ -1119,6 +1405,12 @@ void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit) } EXPORT_SYMBOL(ldlm_pool_set_limit); +__u32 ldlm_pool_get_lvf(struct ldlm_pool *pl) +{ + return 0; +} +EXPORT_SYMBOL(ldlm_pool_get_lvf); + int ldlm_pools_init(void) { return 0; diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 6304423..ddedaeb 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -1002,22 +1002,30 @@ static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp) int ldlm_cli_update_pool(struct ptlrpc_request *req) { + struct obd_device *obd; __u64 old_slv, new_slv; - struct ldlm_pool *pl; __u32 new_limit; ENTRY; - if (!imp_connect_lru_resize(req->rq_import)) + if (unlikely(!req->rq_import || !req->rq_import->imp_obd || + !imp_connect_lru_resize(req->rq_import))) + { + /* + * Do nothing for corner cases. + */ RETURN(0); + } - /* In some cases RPC may contain slv and limit zeroed out. This is + /* + * In some cases RPC may contain slv and limit zeroed out. This is * the case when server does not support lru resize feature. This is * also possible in some recovery cases when server side reqs have no * ref to obd export and thus access to server side namespace is no - * possible. */ + * possible. 
+ */ if (lustre_msg_get_slv(req->rq_repmsg) == 0 || lustre_msg_get_limit(req->rq_repmsg) == 0) { - DEBUG_REQ(D_HA, req, "zero SLV or Limit found " + DEBUG_REQ(D_HA, req, "Zero SLV or Limit found " "(SLV: "LPU64", Limit: %u)", lustre_msg_get_slv(req->rq_repmsg), lustre_msg_get_limit(req->rq_repmsg)); @@ -1026,30 +1034,41 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req) new_limit = lustre_msg_get_limit(req->rq_repmsg); new_slv = lustre_msg_get_slv(req->rq_repmsg); - pl = ldlm_imp2pl(req->rq_import); - - spin_lock(&pl->pl_lock); - old_slv = ldlm_pool_get_slv(pl); - ldlm_pool_set_slv(pl, new_slv); - ldlm_pool_set_limit(pl, new_limit); - - /* Check if we need to wakeup pools thread for fast SLV change. + obd = req->rq_import->imp_obd; + + /* + * Set new SLV and Limit to obd fields to make accessible for pool + * thread. We do not access obd_namespace and pool directly here + * as there is no reliable way to make sure that they are still + * alive in cleanup time. Evil races are possible which may cause + * oops in that time. + */ + write_lock(&obd->obd_pool_lock); + old_slv = obd->obd_pool_slv; + obd->obd_pool_slv = new_slv; + obd->obd_pool_limit = new_limit; + write_unlock(&obd->obd_pool_lock); + + /* + * Check if we need to wakeup pools thread for fast SLV change. * This is only done when threads period is noticably long like - * 10s or more. */ + * 10s or more. + */ #if defined(__KERNEL__) && (LDLM_POOLS_THREAD_PERIOD >= 10) - { + if (old_slv > 0) { __u64 fast_change = old_slv * LDLM_POOLS_FAST_SLV_CHANGE; do_div(fast_change, 100); - /* Wake up pools thread only if SLV has changed more than + /* + * Wake up pools thread only if SLV has changed more than * 50% since last update. In this case we want to react asap. * Otherwise it is no sense to wake up pools as they are - * re-calculated every LDLM_POOLS_THREAD_PERIOD anyways. */ + * re-calculated every LDLM_POOLS_THREAD_PERIOD anyways. + */ if (old_slv > new_slv && old_slv - new_slv > fast_change) ldlm_pools_wakeup(); } #endif - spin_unlock(&pl->pl_lock); RETURN(0); } EXPORT_SYMBOL(ldlm_cli_update_pool); @@ -1201,10 +1220,8 @@ static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns, if (count && added >= count) return LDLM_POLICY_KEEP_LOCK; - spin_lock(&pl->pl_lock); slv = ldlm_pool_get_slv(pl); - lvf = atomic_read(&pl->pl_lock_volume_factor); - spin_unlock(&pl->pl_lock); + lvf = ldlm_pool_get_lvf(pl); la = cfs_duration_sec(cfs_time_sub(cur, lock->l_last_used)); @@ -1212,6 +1229,9 @@ static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns, /* Stop when slv is not yet come from server or * lv is smaller than it is. */ lv = lvf * la * unused; + + /* Inform pool about current CLV to see it via proc. */ + ldlm_pool_set_clv(pl, lv); return (slv == 1 || lv < slv) ? 
LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK; } diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 48399f1..92d41aa 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -289,8 +289,9 @@ void ldlm_proc_namespace(struct ldlm_namespace *ns) #define ldlm_proc_namespace(ns) do {} while (0) #endif /* LPROCFS */ -struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client, - ldlm_appetite_t apt) +struct ldlm_namespace * +ldlm_namespace_new(struct obd_device *obd, char *name, + ldlm_side_t client, ldlm_appetite_t apt) { struct ldlm_namespace *ns = NULL; struct list_head *bucket; @@ -318,6 +319,10 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client, ns->ns_shrink_thumb = LDLM_LOCK_SHRINK_THUMB; ns->ns_appetite = apt; + + LASSERT(obd != NULL); + ns->ns_obd = obd; + strcpy(ns->ns_name, name); CFS_INIT_LIST_HEAD(&ns->ns_root_list); @@ -576,11 +581,9 @@ void ldlm_namespace_free_post(struct ldlm_namespace *ns) return; } - /* - * Fini pool _before_ parent proc dir is removed. This is important as + /* Fini pool _before_ parent proc dir is removed. This is important as * ldlm_pool_fini() removes own proc dir which is child to @dir. Removing - * it after @dir may cause oops. - */ + * it after @dir may cause oops. */ ldlm_pool_fini(&ns->ns_pool); #ifdef LPROCFS @@ -597,10 +600,9 @@ void ldlm_namespace_free_post(struct ldlm_namespace *ns) #endif OBD_VFREE(ns->ns_hash, sizeof(*ns->ns_hash) * RES_HASH_SIZE); OBD_FREE(ns->ns_name, strlen(ns->ns_name) + 1); - /* - * @ns should be not on list in this time, otherwise this will cause - * issues realted to using freed @ns in pools thread. - */ + + /* @ns should be not on list in this time, otherwise this will cause + * issues realted to using freed @ns in pools thread. 
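
Back in the ldlm_request.c hunk, the client-side LRU-resize policy decides whether to keep or cancel a lock by weighing a local lock volume against the server's SLV: lv = lvf * la * unused, where lvf is the lock volume factor, la is the lock's age in seconds since last use, and unused is the unused-lock count handed in by the caller; the computed lv is also recorded as the pool's CLV so it shows up in /proc. A small self-contained version of the predicate, with illustrative names:

#include <stdint.h>
#include <stdio.h>

enum policy { KEEP_LOCK, CANCEL_LOCK };

/*
 * Simplified LRU-resize cancellation predicate.  slv == 1 means no SLV
 * has been received from the server yet, so nothing is cancelled on
 * that basis.
 */
static enum policy lru_resize_policy(uint64_t slv, uint32_t lvf,
                                     long la, unsigned long unused)
{
        uint64_t lv = (uint64_t)lvf * la * unused;

        return (slv == 1 || lv < slv) ? KEEP_LOCK : CANCEL_LOCK;
}

int main(void)
{
        /* Young lock under a generous SLV: kept (prints 0). */
        printf("%d\n", lru_resize_policy(1000000, 1, 5, 100));
        /* Old lock in a long LRU once SLV has been squeezed: cancelled (1). */
        printf("%d\n", lru_resize_policy(50000, 1, 3600, 400));
        return 0;
}
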
*/ LASSERT(list_empty(&ns->ns_list_chain)); OBD_FREE_PTR(ns); ldlm_put_ref(); diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index de4108f..d92026d 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -1990,7 +1990,7 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf) mds->mds_evict_ost_nids = 1; sprintf(ns_name, "mds-%s", obd->obd_uuid.uuid); - obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER, + obd->obd_namespace = ldlm_namespace_new(obd, ns_name, LDLM_NAMESPACE_SERVER, LDLM_NAMESPACE_GREEDY); if (obd->obd_namespace == NULL) { mds_cleanup(obd); @@ -2291,6 +2291,7 @@ static int mds_cleanup(struct obd_device *obd) obd->u.obt.obt_sb = NULL; ldlm_namespace_free(obd->obd_namespace, NULL, obd->obd_force); + obd->obd_namespace = NULL; spin_lock_bh(&obd->obd_processing_task_lock); if (obd->obd_recovering) { diff --git a/lustre/mgs/mgs_handler.c b/lustre/mgs/mgs_handler.c index c9ccaa6..c67e48f 100644 --- a/lustre/mgs/mgs_handler.c +++ b/lustre/mgs/mgs_handler.c @@ -145,7 +145,7 @@ static int mgs_setup(struct obd_device *obd, obd_count len, void *buf) GOTO(err_put, rc = PTR_ERR(obd->obd_fsops)); /* namespace for mgs llog */ - obd->obd_namespace = ldlm_namespace_new("MGS", LDLM_NAMESPACE_SERVER, + obd->obd_namespace = ldlm_namespace_new(obd, "MGS", LDLM_NAMESPACE_SERVER, LDLM_NAMESPACE_MODEST); if (obd->obd_namespace == NULL) GOTO(err_ops, rc = -ENOMEM); @@ -241,16 +241,6 @@ static int mgs_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) RETURN(rc); } -static int mgs_ldlm_nsfree(void *data) -{ - struct ldlm_namespace *ns = (struct ldlm_namespace *)data; - ENTRY; - - ptlrpc_daemonize("ll_mgs_nsfree"); - ldlm_namespace_free(ns, NULL, 1 /* obd_force should always be on */); - RETURN(0); -} - static int mgs_cleanup(struct obd_device *obd) { struct mgs_obd *mgs = &obd->u.mgs; @@ -270,12 +260,8 @@ static int mgs_cleanup(struct obd_device *obd) server_put_mount(obd->obd_name, mgs->mgs_vfsmnt); mgs->mgs_sb = NULL; - /* Free the namespace in it's own thread, so that if the - ldlm_cancel_handler put the last mgs obd ref, we won't - deadlock here. 
*/ - cfs_kernel_thread(mgs_ldlm_nsfree, obd->obd_namespace, - CLONE_VM | CLONE_FILES); - + ldlm_namespace_free(obd->obd_namespace, NULL, 1); + obd->obd_namespace = NULL; fsfilt_put_ops(obd->obd_fsops); LCONSOLE_INFO("%s has stopped.\n", obd->obd_name); diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index f6fbf45..57eaa22 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -70,8 +70,14 @@ EXPORT_SYMBOL(obd_device_alloc); static void obd_device_free(struct obd_device *obd) { LASSERT(obd != NULL); - LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n", - obd, obd->obd_magic, OBD_DEVICE_MAGIC); + LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic " + "%08x != %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC); + if (obd->obd_namespace != NULL) { + CERROR("obd %p: namespace %p was not properly cleaned up " + "(obd_force=%d)!\n", + obd, obd->obd_namespace, obd->obd_force); + LBUG(); + } OBD_SLAB_FREE_PTR(obd, obd_device_cachep); } EXPORT_SYMBOL(obd_device_free); diff --git a/lustre/obdclass/obd_config.c b/lustre/obdclass/obd_config.c index 0f98e59..cf8c393 100644 --- a/lustre/obdclass/obd_config.c +++ b/lustre/obdclass/obd_config.c @@ -188,6 +188,10 @@ int class_attach(struct lustre_cfg *lcfg) LASSERTF(strncmp(obd->obd_name, name, strlen(name)) == 0, "%p obd_name %s != %s\n", obd, obd->obd_name, name); + rwlock_init(&obd->obd_pool_lock); + obd->obd_pool_limit = 0; + obd->obd_pool_slv = 0; + CFS_INIT_LIST_HEAD(&obd->obd_exports); CFS_INIT_LIST_HEAD(&obd->obd_exports_timed); CFS_INIT_LIST_HEAD(&obd->obd_nid_stats); diff --git a/lustre/obdecho/echo.c b/lustre/obdecho/echo.c index d49f801..b822c56 100644 --- a/lustre/obdecho/echo.c +++ b/lustre/obdecho/echo.c @@ -465,7 +465,7 @@ static int echo_setup(struct obd_device *obd, obd_count len, void *buf) spin_lock_init(&obd->u.echo.eo_lock); obd->u.echo.eo_lastino = ECHO_INIT_OBJID; - obd->obd_namespace = ldlm_namespace_new("echo-tgt", + obd->obd_namespace = ldlm_namespace_new(obd, "echo-tgt", LDLM_NAMESPACE_SERVER, LDLM_NAMESPACE_GREEDY); if (obd->obd_namespace == NULL) { @@ -511,6 +511,7 @@ static int echo_cleanup(struct obd_device *obd) cfs_schedule_timeout (CFS_TASK_UNINT, cfs_time_seconds(1)); ldlm_namespace_free(obd->obd_namespace, NULL, obd->obd_force); + obd->obd_namespace = NULL; leaked = atomic_read(&obd->u.echo.eo_prep); if (leaked != 0) diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 8bd25cd..8245564 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -1765,7 +1765,7 @@ int filter_common_setup(struct obd_device *obd, obd_count len, void *buf, filter->fo_fmd_max_age = FILTER_FMD_MAX_AGE_DEFAULT; sprintf(ns_name, "filter-%s", obd->obd_uuid.uuid); - obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER, + obd->obd_namespace = ldlm_namespace_new(obd, ns_name, LDLM_NAMESPACE_SERVER, LDLM_NAMESPACE_GREEDY); if (obd->obd_namespace == NULL) GOTO(err_post, rc = -ENOMEM); @@ -2017,6 +2017,7 @@ static int filter_cleanup(struct obd_device *obd) lquota_cleanup(filter_quota_interface_ref, obd); ldlm_namespace_free(obd->obd_namespace, NULL, obd->obd_force); + obd->obd_namespace = NULL; if (obd->u.obt.obt_sb == NULL) RETURN(0); diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 772667e..1f2a4cb 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -851,14 +851,9 @@ static int after_reply(struct ptlrpc_request *req) RETURN(rc); } } else { - /* Let's look if server send slv. 
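
Taken together, the cleanup hunks above enforce a teardown ordering: each obd's cleanup path now frees its namespace synchronously and clears obd_namespace, the MGS no longer defers that to a helper thread, and obd_device_free() refuses (LBUG) to release an obd whose namespace pointer is still set. A toy illustration of the same invariant, with made-up structure names:

#include <stdio.h>
#include <stdlib.h>

/* Minimal stand-ins; these are not the Lustre structures. */
struct toy_namespace { const char *name; };

struct toy_obd {
        struct toy_namespace *namespace;
};

static void toy_namespace_free(struct toy_namespace *ns)
{
        free(ns);
}

/* Every *_cleanup() path: free the namespace, then clear the pointer so
 * later stages can tell that teardown really happened. */
static void toy_obd_cleanup(struct toy_obd *obd)
{
        toy_namespace_free(obd->namespace);
        obd->namespace = NULL;
}

/* Final device free: refuse to proceed if cleanup skipped the namespace,
 * mirroring the new LBUG() check in obd_device_free(). */
static void toy_obd_free(struct toy_obd *obd)
{
        if (obd->namespace != NULL) {
                fprintf(stderr, "obd %p: namespace %p was not cleaned up!\n",
                        (void *)obd, (void *)obd->namespace);
                abort();
        }
        free(obd);
}

int main(void)
{
        struct toy_obd *obd = calloc(1, sizeof(*obd));

        obd->namespace = calloc(1, sizeof(*obd->namespace));
        toy_obd_cleanup(obd);   /* frees and clears obd->namespace */
        toy_obd_free(obd);      /* passes the check                */
        puts("clean teardown");
        return 0;
}
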
Do it only for RPC with + /* Let's look if server sent slv. Do it only for RPC with * rc == 0. */ - if (imp->imp_obd->obd_namespace) { - /* Disconnect rpc is sent when namespace is already - * destroyed. Let's check this and will not try update - * pool. */ - ldlm_cli_update_pool(req); - } + ldlm_cli_update_pool(req); } /* Store transno in reqmsg for replay. */ diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 3b64084..4c2f988 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -363,8 +363,7 @@ int ptlrpc_send_reply (struct ptlrpc_request *req, int flags) /* Report service time estimate for future client reqs */ lustre_msg_set_timeout(req->rq_repmsg, at_get(&svc->srv_at_estimate)); - if (req->rq_export && req->rq_export->exp_obd) - target_pack_pool_reply(req); + target_pack_pool_reply(req); if (lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT) { /* early replies go to offset 0, regular replies go after that*/ -- 1.8.3.1