disconnect takes too long if there are many locks to cancel.
besides the amount of time spent on each lock cancel, there is a
resched() in cfs_hash_for_each_relax(), i.e. disconnect or eviction
may take unexpectedly long time.
- do not cancel locks on disconnect_export;
- export will be left in obd_unlinked_exports list pinned by live
locks;
- new re-connects will create other non-conflicting exports;
- new locks will cancel obsolete locks on conflicts;
- once all the locks on the disconnected export are cancelled,
the export will be destroyed on the last ref put;
- do not cancel in small portions, cancel all together in just 1
dedicated thread - use server side blocking thread for that;
- cancel blocked locks first so that waiting locks could proceed;
- take care about blocked waiting locks, so that they would get
cancelled quickly too;
- do not remove lock from waiting list on AST error before moving
it to elt_expired_locks list, because it removes it from export
list too; otherwise this blocked lock will not be cancelled
immediately on failed export;
- cancel lock instead of just destroy for failed export, to make
full cleanup, i.e. remove it from export list.
also make the proper order of events on umount:
- disconnect export;
- cleanup namespace, to cancel all the locks before export barrier;
- exports barrier;
- lprocfs_free_per_client_stats (requires nid_exp_ref_count == 0);
- namespace_free_post is left in cleanup to ensure we will not get a
segfault on an absent namespace.
Signed-off-by: Vitaly Fertman <vitaly_fertman@xyratex.com>
Change-Id: Ia39b09ce967237ed5078c8a71e760b1e103c6f55
Xyratex-bug-id: MRP-395 MRP-1366
Reviewed-by: Andriy Skulysh <Andriy_Skulysh@xyratex.com>
Reviewed-by: Alexey Lyashkov <Alexey_Lyashkov@xyratex.com>
Tested-by: Elena Gryaznova <Elena_Gryaznova@xyratex.com>
Reviewed-on: http://review.whamcloud.com/5843
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
ldlm_side_t client, ldlm_appetite_t apt,
ldlm_ns_type_t ns_type);
int ldlm_namespace_cleanup(struct ldlm_namespace *ns, __u64 flags);
+void ldlm_namespace_free_prior(struct ldlm_namespace *ns,
+ struct obd_import *imp,
+ int force);
+void ldlm_namespace_free_post(struct ldlm_namespace *ns);
void ldlm_namespace_free(struct ldlm_namespace *ns,
struct obd_import *imp, int force);
void ldlm_namespace_register(struct ldlm_namespace *ns, ldlm_side_t client);
struct obd_uuid exp_client_uuid;
/** To link all exports on an obd device */
struct list_head exp_obd_chain;
+ /* Unlinked export list */
+ struct list_head exp_stale_list;
struct hlist_node exp_uuid_hash; /** uuid-export hash*/
struct hlist_node exp_nid_hash; /** nid-export hash */
struct hlist_node exp_gen_hash; /** last_rcvd clt gen hash */
struct llog_rec_hdr;
typedef int (*llog_cb_t)(const struct lu_env *, struct llog_handle *,
struct llog_rec_hdr *, void *);
+
+extern atomic_t obd_stale_export_num;
+extern struct list_head obd_stale_exports;
+extern spinlock_t obd_stale_export_lock;
+
+struct obd_export *obd_stale_export_get(void);
+void obd_stale_export_put(struct obd_export *exp);
+void obd_stale_export_adjust(struct obd_export *exp);
+
/* obd_config.c */
struct lustre_cfg *lustre_cfg_rename(struct lustre_cfg *cfg,
const char *new_name);
#define OBD_FAIL_LDLM_AGL_NOLOCK 0x31b
#define OBD_FAIL_LDLM_OST_LVB 0x31c
#define OBD_FAIL_LDLM_ENQUEUE_HANG 0x31d
+#define OBD_FAIL_LDLM_BL_EVICT 0x31e
#define OBD_FAIL_LDLM_PAUSE_CANCEL2 0x31f
#define OBD_FAIL_LDLM_CP_CB_WAIT2 0x320
#define OBD_FAIL_LDLM_CP_CB_WAIT3 0x321
int ldlm_resource_putref_locked(struct ldlm_resource *res);
void ldlm_resource_insert_lock_after(struct ldlm_lock *original,
struct ldlm_lock *new);
-void ldlm_namespace_free_prior(struct ldlm_namespace *ns,
- struct obd_import *imp, int force);
-void ldlm_namespace_free_post(struct ldlm_namespace *ns);
/* ldlm_lock.c */
void ldlm_lock_touch_in_lru(struct ldlm_lock *lock);
void ldlm_lock_destroy_nolock(struct ldlm_lock *lock);
-void ldlm_cancel_locks_for_export(struct obd_export *export);
+int ldlm_export_cancel_blocked_locks(struct obd_export *exp);
+int ldlm_export_cancel_locks(struct obd_export *exp);
/* ldlm_lockd.c */
int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
struct ldlm_lock_desc *ld,
struct list_head *cancels, int count,
ldlm_cancel_flags_t cancel_flags);
+int ldlm_bl_thread_wakeup(void);
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
struct ldlm_lock_desc *ld, struct ldlm_lock *lock);
if (exp->exp_imp_reverse)
ptlrpc_cleanup_imp(exp->exp_imp_reverse);
- if (exp->exp_obd->obd_namespace != NULL)
- ldlm_cancel_locks_for_export(exp);
+ ldlm_bl_thread_wakeup();
/* complete all outstanding replies */
spin_lock(&exp->exp_lock);
res = lock->l_resource;
LASSERT(ldlm_is_destroyed(lock));
+ LASSERT(list_empty(&lock->l_exp_list));
LASSERT(list_empty(&lock->l_res_link));
LASSERT(list_empty(&lock->l_pending_chain));
int ecl_loop;
};
+static void ldlm_cancel_lock_for_export(struct obd_export *exp,
+ struct ldlm_lock *lock,
+ struct export_cl_data *ecl)
+{
+ struct ldlm_resource *res;
+
+ res = ldlm_resource_getref(lock->l_resource);
+
+ ldlm_res_lvbo_update(res, NULL, 1);
+ ldlm_lock_cancel(lock);
+ if (!exp->exp_obd->obd_stopping)
+ ldlm_reprocess_all(res);
+ ldlm_resource_putref(res);
+
+ ecl->ecl_loop++;
+ if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop) {
+ CDEBUG(D_INFO, "Export %p, %d locks cancelled.\n",
+ exp, ecl->ecl_loop);
+ }
+}
+
/**
- * Iterator function for ldlm_cancel_locks_for_export.
+ * Iterator function for ldlm_export_cancel_locks.
* Cancels passed locks.
*/
static int
{
struct export_cl_data *ecl = (struct export_cl_data *)data;
struct obd_export *exp = ecl->ecl_exp;
- struct ldlm_lock *lock = cfs_hash_object(hs, hnode);
- struct ldlm_resource *res;
+ struct ldlm_lock *lock = cfs_hash_object(hs, hnode);
- res = ldlm_resource_getref(lock->l_resource);
- LDLM_LOCK_GET(lock);
+ LDLM_LOCK_GET(lock);
+ ldlm_cancel_lock_for_export(exp, lock, ecl);
+ LDLM_LOCK_RELEASE(lock);
- LDLM_DEBUG(lock, "export %p", exp);
- ldlm_res_lvbo_update(res, NULL, 1);
- ldlm_lock_cancel(lock);
- ldlm_reprocess_all(res);
- ldlm_resource_putref(res);
- LDLM_LOCK_RELEASE(lock);
+ return 0;
+}
- ecl->ecl_loop++;
- if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop) {
- CDEBUG(D_INFO,
- "Cancel lock %p for export %p (loop %d), still have "
- "%d locks left on hash table.\n",
- lock, exp, ecl->ecl_loop,
- atomic_read(&hs->hs_count));
+/**
+ * Cancel all blocked locks for given export.
+ *
+ * Typically called on client disconnection/eviction
+ */
+int ldlm_export_cancel_blocked_locks(struct obd_export *exp)
+{
+ struct export_cl_data ecl = {
+ .ecl_exp = exp,
+ .ecl_loop = 0,
+ };
+
+ while (!list_empty(&exp->exp_bl_list)) {
+ struct ldlm_lock *lock;
+
+ spin_lock_bh(&exp->exp_bl_list_lock);
+ if (!list_empty(&exp->exp_bl_list)) {
+ lock = list_entry(exp->exp_bl_list.next,
+ struct ldlm_lock, l_exp_list);
+ LDLM_LOCK_GET(lock);
+ list_del_init(&lock->l_exp_list);
+ } else {
+ lock = NULL;
+ }
+ spin_unlock_bh(&exp->exp_bl_list_lock);
+
+ if (lock == NULL)
+ break;
+
+ ldlm_cancel_lock_for_export(exp, lock, &ecl);
+ LDLM_LOCK_RELEASE(lock);
}
- return 0;
+ CDEBUG(D_DLMTRACE, "Export %p, canceled %d locks, "
+ "left on hash table %d.\n", exp, ecl.ecl_loop,
+ atomic_read(&exp->exp_lock_hash->hs_count));
+
+ return ecl.ecl_loop;
}
/**
* Cancel all locks for given export.
*
- * Typically called on client disconnection/eviction
+ * Typically called after client disconnection/eviction
*/
-void ldlm_cancel_locks_for_export(struct obd_export *exp)
+int ldlm_export_cancel_locks(struct obd_export *exp)
{
struct export_cl_data ecl = {
.ecl_exp = exp,
cfs_hash_for_each_empty(exp->exp_lock_hash,
ldlm_cancel_locks_for_export_cb, &ecl);
+
+ CDEBUG(D_DLMTRACE, "Export %p, canceled %d locks, "
+ "left on hash table %d.\n", exp, ecl.ecl_loop,
+ atomic_read(&exp->exp_lock_hash->hs_count));
+
+ return ecl.ecl_loop;
}
/**
return 1;
}
+static void ldlm_add_blocked_lock(struct ldlm_lock *lock)
+{
+ spin_lock_bh(&lock->l_export->exp_bl_list_lock);
+ if (list_empty(&lock->l_exp_list)) {
+ if (lock->l_granted_mode != lock->l_req_mode)
+ list_add_tail(&lock->l_exp_list,
+ &lock->l_export->exp_bl_list);
+ else
+ list_add(&lock->l_exp_list,
+ &lock->l_export->exp_bl_list);
+ }
+ spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
+
+ /* A blocked lock is added. Adjust the position in
+ * the stale list if the export is in the list.
+ * If export is stale and not in the list - it is being
+ * processed and will be placed on the right position
+ * on obd_stale_export_put(). */
+ if (!list_empty(&lock->l_export->exp_stale_list))
+ obd_stale_export_adjust(lock->l_export);
+}
+
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
int ret;
}
spin_unlock_bh(&waiting_locks_spinlock);
- if (ret) {
- spin_lock_bh(&lock->l_export->exp_bl_list_lock);
- if (list_empty(&lock->l_exp_list))
- list_add(&lock->l_exp_list,
- &lock->l_export->exp_bl_list);
- spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
- }
+ if (ret)
+ ldlm_add_blocked_lock(lock);
LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
ret == 0 ? "not re-" : "", timeout,
req->rq_interpret_reply = ldlm_cb_interpret;
lock_res_and_lock(lock);
- if (lock->l_granted_mode != lock->l_req_mode) {
- /* this blocking AST will be communicated as part of the
- * completion AST instead */
+ if (ldlm_is_destroyed(lock)) {
+ /* What's the point? */
unlock_res_and_lock(lock);
-
ptlrpc_req_finished(req);
- LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
RETURN(0);
}
- if (ldlm_is_destroyed(lock)) {
- /* What's the point? */
+ if (lock->l_granted_mode != lock->l_req_mode) {
+ /* this blocking AST will be communicated as part of the
+ * completion AST instead */
+ ldlm_add_blocked_lock(lock);
+ ldlm_set_waited(lock);
unlock_res_and_lock(lock);
+
ptlrpc_req_finished(req);
+ LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");
RETURN(0);
}
ldlm_lock2desc(lock, &dlm_rep->lock_desc);
ldlm_lock2handle(lock, &dlm_rep->lock_handle);
+ if (lock && lock->l_resource->lr_type == LDLM_EXTENT)
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 6);
+
/* We never send a blocking AST until the lock is granted, but
* we can tell it right now */
lock_res_and_lock(lock);
}
if (rc != 0) {
- lock_res_and_lock(lock);
- ldlm_resource_unlink_lock(lock);
- ldlm_lock_destroy_nolock(lock);
- unlock_res_and_lock(lock);
- }
+ if (lock->l_export) {
+ ldlm_lock_cancel(lock);
+ } else {
+ lock_res_and_lock(lock);
+ ldlm_resource_unlink_lock(lock);
+ ldlm_lock_destroy_nolock(lock);
+ unlock_res_and_lock(lock);
+
+ }
+ }
if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
ldlm_reprocess_all(lock->l_resource);
return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
}
+int ldlm_bl_thread_wakeup(void)
+{
+ wake_up(&ldlm_state->ldlm_bl_pool->blp_waitq);
+ return 0;
+}
+
/* Setinfo coming from Server (eg MDT) to Client (eg MDC)! */
static int ldlm_handle_setinfo(struct ptlrpc_request *req)
{
req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
CDEBUG(D_INODE, "cancel\n");
if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_NET) ||
- CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND))
+ CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND) ||
+ CFS_FAIL_CHECK(OBD_FAIL_LDLM_BL_EVICT))
RETURN(0);
rc = ldlm_handle_cancel(req);
if (rc)
EXPORT_SYMBOL(ldlm_revoke_export_locks);
#endif /* HAVE_SERVER_SUPPORT */
-static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
+static int ldlm_bl_get_work(struct ldlm_bl_pool *blp,
+ struct ldlm_bl_work_item **p_blwi,
+ struct obd_export **p_exp)
{
struct ldlm_bl_work_item *blwi = NULL;
static unsigned int num_bl = 0;
+ static unsigned int num_stale;
+ int num_th = atomic_read(&blp->blp_num_threads);
+
+ *p_exp = obd_stale_export_get();
spin_lock(&blp->blp_lock);
+ if (*p_exp != NULL) {
+ if (num_th == 1 || ++num_stale < num_th) {
+ spin_unlock(&blp->blp_lock);
+ return 1;
+ } else {
+ num_stale = 0;
+ }
+ }
+
/* process a request from the blp_list at least every blp_num_threads */
if (!list_empty(&blp->blp_list) &&
(list_empty(&blp->blp_prio_list) || num_bl == 0))
blwi_entry);
if (blwi) {
- if (++num_bl >= atomic_read(&blp->blp_num_threads))
+ if (++num_bl >= num_th)
num_bl = 0;
list_del(&blwi->blwi_entry);
}
spin_unlock(&blp->blp_lock);
+ *p_blwi = blwi;
- return blwi;
+ if (*p_exp != NULL && *p_blwi != NULL) {
+ obd_stale_export_put(*p_exp);
+ *p_exp = NULL;
+ }
+
+ return (*p_blwi != NULL || *p_exp != NULL) ? 1 : 0;
}
/* This only contains temporary data until the thread starts */
return 0;
}
+/* Not fatal if racy and have a few too many threads */
+static int ldlm_bl_thread_need_create(struct ldlm_bl_pool *blp,
+ struct ldlm_bl_work_item *blwi)
+{
+ int busy = atomic_read(&blp->blp_busy_threads);
+
+ if (busy >= blp->blp_max_threads)
+ return 0;
+
+ if (busy < atomic_read(&blp->blp_num_threads))
+ return 0;
+
+ if (blwi != NULL && (blwi->blwi_ns == NULL ||
+ blwi->blwi_mem_pressure))
+ return 0;
+
+ return 1;
+}
+
+static int ldlm_bl_thread_blwi(struct ldlm_bl_pool *blp,
+ struct ldlm_bl_work_item *blwi)
+{
+ ENTRY;
+
+ if (blwi->blwi_ns == NULL)
+ /* added by ldlm_cleanup() */
+ RETURN(LDLM_ITER_STOP);
+
+ if (blwi->blwi_mem_pressure)
+ memory_pressure_set();
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
+
+ if (blwi->blwi_count) {
+ int count;
+ /* The special case when we cancel locks in lru
+ * asynchronously, we pass the list of locks here.
+ * Thus locks are marked LDLM_FL_CANCELING, but NOT
+ * canceled locally yet. */
+ count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
+ blwi->blwi_count,
+ LCF_BL_AST);
+ ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
+ blwi->blwi_flags);
+ } else {
+ ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
+ blwi->blwi_lock);
+ }
+ if (blwi->blwi_mem_pressure)
+ memory_pressure_clr();
+
+ if (blwi->blwi_flags & LCF_ASYNC)
+ OBD_FREE(blwi, sizeof(*blwi));
+ else
+ complete(&blwi->blwi_comp);
+
+ RETURN(0);
+}
+
+/**
+ * Cancel stale locks on export. Cancel blocked locks first.
+ * If the given export has blocked locks, the next in the list may have
+ * them too, thus cancel not blocked locks only if the current export has
+ * no blocked locks.
+ **/
+static int ldlm_bl_thread_exports(struct ldlm_bl_pool *blp,
+ struct obd_export *exp)
+{
+ int num;
+ ENTRY;
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 4);
+
+ num = ldlm_export_cancel_blocked_locks(exp);
+ if (num == 0)
+ ldlm_export_cancel_locks(exp);
+
+ obd_stale_export_put(exp);
+
+ RETURN(0);
+}
+
+
/**
* Main blocking requests processing thread.
*
while (1) {
struct l_wait_info lwi = { 0 };
struct ldlm_bl_work_item *blwi = NULL;
- int busy;
+ struct obd_export *exp = NULL;
+ int rc;
- blwi = ldlm_bl_get_work(blp);
+ rc = ldlm_bl_get_work(blp, &blwi, &exp);
- if (blwi == NULL) {
+ if (rc == 0) {
atomic_dec(&blp->blp_busy_threads);
l_wait_event_exclusive(blp->blp_waitq,
- (blwi = ldlm_bl_get_work(blp)) != NULL,
- &lwi);
- busy = atomic_inc_return(&blp->blp_busy_threads);
- } else {
- busy = atomic_read(&blp->blp_busy_threads);
+ ldlm_bl_get_work(blp, &blwi,
+ &exp),
+ &lwi);
+ atomic_inc(&blp->blp_busy_threads);
}
- if (blwi->blwi_ns == NULL)
- /* added by ldlm_cleanup() */
- break;
-
- /* Not fatal if racy and have a few too many threads */
- if (unlikely(busy < blp->blp_max_threads &&
- busy >= atomic_read(&blp->blp_num_threads) &&
- !blwi->blwi_mem_pressure))
+ if (ldlm_bl_thread_need_create(blp, blwi))
/* discard the return value, we tried */
ldlm_bl_thread_start(blp);
- if (blwi->blwi_mem_pressure)
- memory_pressure_set();
-
- OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
-
- if (blwi->blwi_count) {
- int count;
- /* The special case when we cancel locks in LRU
- * asynchronously, we pass the list of locks here.
- * Thus locks are marked LDLM_FL_CANCELING, but NOT
- * canceled locally yet. */
- count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
- blwi->blwi_count,
- LCF_BL_AST);
- ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
- blwi->blwi_flags);
- } else {
- ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
- blwi->blwi_lock);
- }
- if (blwi->blwi_mem_pressure)
- memory_pressure_clr();
+ if (exp)
+ rc = ldlm_bl_thread_exports(blp, exp);
+ else if (blwi)
+ rc = ldlm_bl_thread_blwi(blp, blwi);
- if (blwi->blwi_flags & LCF_ASYNC)
- OBD_FREE(blwi, sizeof(*blwi));
- else
- complete(&blwi->blwi_comp);
+ if (rc == LDLM_ITER_STOP)
+ break;
}
atomic_dec(&blp->blp_busy_threads);
struct ldlm_namespace *ns;
struct ldlm_namespace *ns_old = NULL;
int nr, equal = 0;
- int time = 50; /* seconds of sleep if no active namespaces */
+ /* seconds of sleep if no active namespaces */
+ int time = client ? LDLM_POOL_CLI_DEF_RECALC_PERIOD :
+ LDLM_POOL_SRV_DEF_RECALC_PERIOD;
/*
* No need to setup pool limit for client pools.
ldlm_namespace_put(ns);
}
}
+
+ /* Wake up the blocking threads from time to time. */
+ ldlm_bl_thread_wakeup();
+
return time;
}
if (rc)
CERROR("ldlm_cli_cancel: %d\n", rc);
} else {
- ldlm_resource_unlink_lock(lock);
unlock_res(res);
LDLM_DEBUG(lock, "Freeing a lock still held by a "
"client node");
- ldlm_lock_destroy(lock);
+ ldlm_lock_cancel(lock);
}
LDLM_LOCK_RELEASE(lock);
} while (1);
}
EXIT;
}
+EXPORT_SYMBOL(ldlm_namespace_free_prior);
/**
* Performs freeing memory structures related to \a ns. This is only done
ldlm_put_ref();
EXIT;
}
+EXPORT_SYMBOL(ldlm_namespace_free_post);
/**
* Cleanup the resource, and free namespace.
mdt_llog_ctxt_unclone(env, m, LLOG_AGENT_ORIG_CTXT);
mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
+
+ if (m->mdt_namespace != NULL)
+ ldlm_namespace_free_prior(m->mdt_namespace, NULL,
+ d->ld_obd->obd_force);
+
obd_exports_barrier(obd);
obd_zombie_barrier();
upcall_cache_cleanup(m->mdt_identity_cache);
m->mdt_identity_cache = NULL;
- if (m->mdt_namespace != NULL) {
- ldlm_namespace_free(m->mdt_namespace, NULL,
- d->ld_obd->obd_force);
- d->ld_obd->obd_namespace = m->mdt_namespace = NULL;
- }
+ if (m->mdt_namespace != NULL) {
+ ldlm_namespace_free_post(m->mdt_namespace);
+ d->ld_obd->obd_namespace = m->mdt_namespace = NULL;
+ }
mdt_quota_fini(env, m);
int mgs_fs_cleanup(const struct lu_env *env, struct mgs_device *mgs)
{
- class_disconnect_exports(mgs->mgs_obd); /* cleans up client info too */
-
if (mgs->mgs_configs_dir) {
lu_object_put(env, &mgs->mgs_configs_dir->do_lu);
mgs->mgs_configs_dir = NULL;
mgs_params_fsdb_cleanup(env, mgs);
mgs_cleanup_fsdb_list(mgs);
+ ldlm_namespace_free_prior(obd->obd_namespace, NULL, 1);
obd_exports_barrier(obd);
obd_zombie_barrier();
mgs_fs_cleanup(env, mgs);
- ldlm_namespace_free(obd->obd_namespace, NULL, 1);
+ ldlm_namespace_free_post(obd->obd_namespace);
obd->obd_namespace = NULL;
lu_site_purge(env, d->ld_site, ~0);
{
int i, err;
+ spin_lock_init(&obd_stale_export_lock);
+ INIT_LIST_HEAD(&obd_stale_exports);
+ atomic_set(&obd_stale_export_num, 0);
+
LCONSOLE_INFO("Lustre: Build Version: "BUILD_VERSION"\n");
spin_lock_init(&obd_types_lock);
class_handle_cleanup();
class_exit_uuidlist();
obd_zombie_impexp_stop();
+ LASSERT(list_empty(&obd_stale_exports));
memory_leaked = obd_memory_sum();
static void print_export_data(struct obd_export *exp,
const char *status, int locks);
+struct list_head obd_stale_exports;
+spinlock_t obd_stale_export_lock;
+atomic_t obd_stale_export_num;
+
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
if (atomic_dec_and_test(&exp->exp_refcount)) {
LASSERT(!list_empty(&exp->exp_obd_chain));
- CDEBUG(D_IOCTL, "final put %p/%s\n",
- exp, exp->exp_client_uuid.uuid);
+ LASSERT(list_empty(&exp->exp_stale_list));
+ CDEBUG(D_IOCTL, "final put %p/%s\n",
+ exp, exp->exp_client_uuid.uuid);
- /* release nid stat refererence */
- lprocfs_exp_cleanup(exp);
+ /* release nid stat refererence */
+ lprocfs_exp_cleanup(exp);
- obd_zombie_export_add(exp);
- }
+ obd_zombie_export_add(exp);
+ }
}
EXPORT_SYMBOL(class_export_put);
INIT_HLIST_NODE(&export->exp_gen_hash);
spin_lock_init(&export->exp_bl_list_lock);
INIT_LIST_HEAD(&export->exp_bl_list);
+ INIT_LIST_HEAD(&export->exp_stale_list);
export->exp_sp_peer = LUSTRE_SP_ANY;
export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
list_del_init(&exp->exp_obd_chain_timed);
exp->exp_obd->obd_num_exports--;
spin_unlock(&exp->exp_obd->obd_dev_lock);
- class_export_put(exp);
+ atomic_inc(&obd_stale_export_num);
+
+ /* A reference is kept by obd_stale_exports list */
+ obd_stale_export_put(exp);
}
/* Import management functions */
* Add export to the obd_zombe thread and notify it.
*/
static void obd_zombie_export_add(struct obd_export *exp) {
+ atomic_dec(&obd_stale_export_num);
spin_lock(&exp->exp_obd->obd_dev_lock);
LASSERT(!list_empty(&exp->exp_obd_chain));
list_del_init(&exp->exp_obd_chain);
EXPORT_SYMBOL(obd_zombie_barrier);
+struct obd_export *obd_stale_export_get(void)
+{
+ struct obd_export *exp = NULL;
+ ENTRY;
+
+ spin_lock(&obd_stale_export_lock);
+ if (!list_empty(&obd_stale_exports)) {
+ exp = list_entry(obd_stale_exports.next,
+ struct obd_export, exp_stale_list);
+ list_del_init(&exp->exp_stale_list);
+ }
+ spin_unlock(&obd_stale_export_lock);
+
+ if (exp) {
+ CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
+ atomic_read(&obd_stale_export_num));
+ }
+ RETURN(exp);
+}
+EXPORT_SYMBOL(obd_stale_export_get);
+
+void obd_stale_export_put(struct obd_export *exp)
+{
+ ENTRY;
+
+ LASSERT(list_empty(&exp->exp_stale_list));
+ if (exp->exp_lock_hash &&
+ atomic_read(&exp->exp_lock_hash->hs_count)) {
+ CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
+ atomic_read(&obd_stale_export_num));
+
+ spin_lock_bh(&exp->exp_bl_list_lock);
+ spin_lock(&obd_stale_export_lock);
+ /* Add to the tail if there is no blocked locks,
+ * to the head otherwise. */
+ if (list_empty(&exp->exp_bl_list))
+ list_add_tail(&exp->exp_stale_list,
+ &obd_stale_exports);
+ else
+ list_add(&exp->exp_stale_list,
+ &obd_stale_exports);
+
+ spin_unlock(&obd_stale_export_lock);
+ spin_unlock_bh(&exp->exp_bl_list_lock);
+ } else {
+ class_export_put(exp);
+ }
+ EXIT;
+}
+EXPORT_SYMBOL(obd_stale_export_put);
+
+/**
+ * Adjust the position of the export in the stale list,
+ * i.e. move to the head of the list if is needed.
+ **/
+void obd_stale_export_adjust(struct obd_export *exp)
+{
+ LASSERT(exp != NULL);
+ spin_lock_bh(&exp->exp_bl_list_lock);
+ spin_lock(&obd_stale_export_lock);
+
+ if (!list_empty(&exp->exp_stale_list) &&
+ !list_empty(&exp->exp_bl_list))
+ list_move(&exp->exp_stale_list, &obd_stale_exports);
+
+ spin_unlock(&obd_stale_export_lock);
+ spin_unlock_bh(&exp->exp_bl_list_lock);
+}
+EXPORT_SYMBOL(obd_stale_export_adjust);
+
/**
* destroy zombie export/import thread.
*/
LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
+ /* ignore waiting locks, no more granted locks in the list */
+ if (lock->l_granted_mode != lock->l_req_mode)
+ break;
+
if (!ldlm_res_eq(&tsi->tsi_resid, &lock->l_resource->lr_name))
continue;
stop.ls_flags = 0;
lfsck_stop(env, m->ofd_osd, &stop);
target_recovery_fini(obd);
+ if (m->ofd_namespace != NULL)
+ ldlm_namespace_free_prior(m->ofd_namespace, NULL,
+ d->ld_obd->obd_force);
+
obd_exports_barrier(obd);
obd_zombie_barrier();
ofd_fs_cleanup(env, m);
if (m->ofd_namespace != NULL) {
- ldlm_namespace_free(m->ofd_namespace, NULL,
- d->ld_obd->obd_force);
+ ldlm_namespace_free_post(m->ofd_namespace);
d->ld_obd->obd_namespace = m->ofd_namespace = NULL;
}
}
run_test 66 "lock enqueue re-send vs client eviction"
+test_65() {
+ mount_client $DIR2
+
+ #grant lock1, export2
+ $SETSTRIPE -i -0 $DIR2/$tfile || return 1
+ $MULTIOP $DIR2/$tfile Ow || return 2
+
+#define OBD_FAIL_LDLM_BL_EVICT 0x31e
+ do_facet ost $LCTL set_param fail_loc=0x31e
+ #get waiting lock2, export1
+ $MULTIOP $DIR/$tfile Ow &
+ PID1=$!
+ # let enqueue to get asleep
+ sleep 2
+
+ #get lock2 blocked
+ $MULTIOP $DIR2/$tfile Ow &
+ PID2=$!
+ sleep 2
+
+ #evict export1
+ ost_evict_client
+
+ sleep 2
+ do_facet ost $LCTL set_param fail_loc=0
+
+ wait $PID1
+ wait $PID2
+
+ umount_client $DIR2
+}
+run_test 65 "lock enqueue for destroyed export"
+
check_cli_ir_state()
{
local NODE=${1:-$HOSTNAME}